From 6e9e9ad557479354938257f04c8fab283b8251e8 Mon Sep 17 00:00:00 2001 From: Steven Armstrong Date: Tue, 12 Nov 2019 00:40:58 +0100 Subject: [PATCH 1/4] implement log server to capture nested logging output Signed-off-by: Steven Armstrong --- cdist/core/code.py | 4 +++ cdist/install.py | 23 +++++++++++++- cdist/log.py | 77 ++++++++++++++++++++++++++++++++++++++-------- 3 files changed, 90 insertions(+), 14 deletions(-) diff --git a/cdist/core/code.py b/cdist/core/code.py index 1550880a..a7d9b7ca 100644 --- a/cdist/core/code.py +++ b/cdist/core/code.py @@ -116,6 +116,10 @@ class Code(object): if dry_run: self.env['__cdist_dry_run'] = '1' + if '__cdist_log_server_socket_to_export' in os.environ: + self.env['__cdist_log_server_socket'] = os.environ['__cdist_log_server_socket_to_export'] + + def _run_gencode(self, cdist_object, which): cdist_type = cdist_object.cdist_type script = os.path.join(self.local.type_path, diff --git a/cdist/install.py b/cdist/install.py index b88ad016..3f94ca68 100644 --- a/cdist/install.py +++ b/cdist/install.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- # -# 2013 Steven Armstrong (steven-cdist at armstrong.cc) +# 2013-2019 Steven Armstrong (steven-cdist at armstrong.cc) # # This file is part of cdist. # @@ -20,11 +20,32 @@ # # +import os +import logging +import tempfile + import cdist.config import cdist.core class Install(cdist.config.Config): + + @classmethod + def onehost(cls, host, host_tags, host_base_path, host_dir_name, args, + parallel, configuration, remove_remote_files_dirs=False): + # Start a log server so nested `cdist config` runs have a place to + # send their logs to. + log_server_socket_dir = tempfile.mkdtemp() + log_server_socket = os.path.join(log_server_socket_dir, 'log-server') + cls._register_path_for_removal(log_server_socket_dir) + log = logging.getLogger(host) + log.debug('Starting logging server on: %s', log_server_socket) + os.environ['__cdist_log_server_socket_to_export'] = log_server_socket + cdist.log.setupLogServer(log_server_socket) + + super().onehost(host, host_tags, host_base_path, host_dir_name, args, + parallel, configuration, remove_remote_files_dirs=False) + def object_list(self): """Short name for object list retrieval. In install mode, we only care about install objects. diff --git a/cdist/log.py b/cdist/log.py index 790059df..94dd11e8 100644 --- a/cdist/log.py +++ b/cdist/log.py @@ -2,6 +2,7 @@ # -*- coding: utf-8 -*- # # 2010-2013 Nico Schottelius (nico-cdist at schottelius.org) +# 2019-2020 Steven Armstrong # # This file is part of cdist. # @@ -20,9 +21,17 @@ # # -import logging -import sys +import asyncio +import contextlib import datetime +import logging +import logging.handlers +import os +import pickle +import struct +import sys +import threading +import time # Define additional cdist logging levels. @@ -89,20 +98,25 @@ class DefaultLog(logging.Logger): super().__init__(name) self.propagate = False - formatter = CdistFormatter(self.FORMAT) + if '__cdist_log_server_socket' in os.environ: + log_server_socket = os.environ['__cdist_log_server_socket'] + socket_handler = logging.handlers.SocketHandler(log_server_socket, None) + self.addHandler(socket_handler) + else: + formatter = CdistFormatter(self.FORMAT) - stdout_handler = logging.StreamHandler(sys.stdout) - stdout_handler.addFilter(self.StdoutFilter()) - stdout_handler.setLevel(logging.TRACE) - stdout_handler.setFormatter(formatter) + stdout_handler = logging.StreamHandler(sys.stdout) + stdout_handler.addFilter(self.StdoutFilter()) + stdout_handler.setLevel(logging.TRACE) + stdout_handler.setFormatter(formatter) - stderr_handler = logging.StreamHandler(sys.stderr) - stderr_handler.addFilter(self.StderrFilter()) - stderr_handler.setLevel(logging.ERROR) - stderr_handler.setFormatter(formatter) + stderr_handler = logging.StreamHandler(sys.stderr) + stderr_handler.addFilter(self.StderrFilter()) + stderr_handler.setLevel(logging.ERROR) + stderr_handler.setFormatter(formatter) - self.addHandler(stdout_handler) - self.addHandler(stderr_handler) + self.addHandler(stdout_handler) + self.addHandler(stderr_handler) def verbose(self, msg, *args, **kwargs): self.log(logging.VERBOSE, msg, *args, **kwargs) @@ -152,4 +166,41 @@ def setupParallelLogging(): logging.setLoggerClass(ParallelLog) +async def handle_log_client(reader, writer): + while True: + chunk = await reader.read(4) + if len(chunk) < 4: + return + + data_size = struct.unpack('>L', chunk)[0] + data = bytearray(data_size) + view = memoryview(data) + data_pending = data_size + data = await reader.read(data_size) + + obj = pickle.loads(data) + record = logging.makeLogRecord(obj) + logger = logging.getLogger(record.name) + logger.handle(record) + + +def run_log_server(server_address): + # Get a new loop inside the current thread to run the log server. + loop = asyncio.new_event_loop() + loop.create_task(asyncio.start_unix_server(handle_log_client, server_address)) + loop.run_forever() + + +def setupLogServer(log_server_socket): + """Run a asyncio based unix socket log server in a background thread. + """ + with contextlib.suppress(FileNotFoundError): + os.remove(log_server_socket) + t = threading.Thread(target=run_log_server, args=(log_server_socket,)) + # Deamonizing the thread means we don't have to care about stoping it. + # It will die together with the main process. + t.daemon = True + t.start() + + setupDefaultLogging() From 831bfc822b61a5e5cf8186a32ea592622693efdb Mon Sep 17 00:00:00 2001 From: Steven Armstrong Date: Tue, 12 Nov 2019 09:22:28 +0100 Subject: [PATCH 2/4] remove unused code Signed-off-by: Steven Armstrong --- cdist/log.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/cdist/log.py b/cdist/log.py index 94dd11e8..c1376a58 100644 --- a/cdist/log.py +++ b/cdist/log.py @@ -173,9 +173,6 @@ async def handle_log_client(reader, writer): return data_size = struct.unpack('>L', chunk)[0] - data = bytearray(data_size) - view = memoryview(data) - data_pending = data_size data = await reader.read(data_size) obj = pickle.loads(data) From 57e352cd1e7b400ca1daf7e1e680874bd9a6cc01 Mon Sep 17 00:00:00 2001 From: Steven Armstrong Date: Sat, 11 Jan 2020 01:59:18 +0100 Subject: [PATCH 3/4] log server is also usefull for cdist config Signed-off-by: Steven Armstrong --- cdist/argparse.py | 6 ++++++ cdist/config.py | 10 ++++++++++ cdist/install.py | 13 +++---------- 3 files changed, 19 insertions(+), 10 deletions(-) diff --git a/cdist/argparse.py b/cdist/argparse.py index 1c83237c..15b9ca7a 100644 --- a/cdist/argparse.py +++ b/cdist/argparse.py @@ -250,6 +250,12 @@ def get_parsers(): '-S', '--disable-saving-output-streams', help='Disable saving output streams.', action='store_false', dest='save_output_streams', default=True) + parser['config_main'].add_argument( + '--log-server', + action='store_true', + help=('Start a log server for sub processes to use.' + 'This is mainly usefull when running cdist nested' + 'from a code-local script.')) # Config parser['config_args'] = argparse.ArgumentParser(add_help=False) diff --git a/cdist/config.py b/cdist/config.py index b2d72f05..82e8559b 100644 --- a/cdist/config.py +++ b/cdist/config.py @@ -385,6 +385,16 @@ class Config(object): log = logging.getLogger(host) + if args.log_server: + # Start a log server so that nested `cdist config` runs have a place + # to send their logs to. + log_server_socket_dir = tempfile.mkdtemp() + log_server_socket = os.path.join(log_server_socket_dir, 'log-server') + cls._register_path_for_removal(log_server_socket_dir) + log.debug('Starting logging server on: %s', log_server_socket) + os.environ['__cdist_log_server_socket_to_export'] = log_server_socket + cdist.log.setupLogServer(log_server_socket) + try: remote_exec, remote_copy, cleanup_cmd = cls._resolve_remote_cmds( args) diff --git a/cdist/install.py b/cdist/install.py index 3f94ca68..ec0a7270 100644 --- a/cdist/install.py +++ b/cdist/install.py @@ -33,16 +33,9 @@ class Install(cdist.config.Config): @classmethod def onehost(cls, host, host_tags, host_base_path, host_dir_name, args, parallel, configuration, remove_remote_files_dirs=False): - # Start a log server so nested `cdist config` runs have a place to - # send their logs to. - log_server_socket_dir = tempfile.mkdtemp() - log_server_socket = os.path.join(log_server_socket_dir, 'log-server') - cls._register_path_for_removal(log_server_socket_dir) - log = logging.getLogger(host) - log.debug('Starting logging server on: %s', log_server_socket) - os.environ['__cdist_log_server_socket_to_export'] = log_server_socket - cdist.log.setupLogServer(log_server_socket) - + # Always start log server during cdist install so that nested + # `cdist config` runs have a place to send their logs to. + args.log_server = True super().onehost(host, host_tags, host_base_path, host_dir_name, args, parallel, configuration, remove_remote_files_dirs=False) From 59b98091d78c5195d2aa4a2ea3ef79f93e5ba67c Mon Sep 17 00:00:00 2001 From: Darko Poljak Date: Tue, 9 Jun 2020 12:47:50 +0200 Subject: [PATCH 4/4] Adapt; update docs and code style --- cdist/__init__.py | 4 ++++ cdist/argparse.py | 13 +++++++------ cdist/config.py | 27 ++++++++++++--------------- cdist/core/code.py | 6 +++--- cdist/install.py | 8 +++----- cdist/log.py | 18 +++++++++++------- docs/src/man1/cdist.rst | 26 ++++++++++++++++---------- scripts/cdist | 5 ++--- 8 files changed, 58 insertions(+), 49 deletions(-) diff --git a/cdist/__init__.py b/cdist/__init__.py index c673b3ba..be573170 100644 --- a/cdist/__init__.py +++ b/cdist/__init__.py @@ -26,6 +26,7 @@ import hashlib import cdist.log import cdist.version + VERSION = cdist.version.VERSION BANNER = """ @@ -48,6 +49,9 @@ REMOTE_EXEC = "ssh -o User=root" REMOTE_CMDS_CLEANUP_PATTERN = "ssh -o User=root -O exit -S {}" +MIN_SUPPORTED_PYTHON_VERSION = '3.5' + + class Error(Exception): """Base exception class for this project""" pass diff --git a/cdist/argparse.py b/cdist/argparse.py index 15b9ca7a..77303591 100644 --- a/cdist/argparse.py +++ b/cdist/argparse.py @@ -206,6 +206,13 @@ def get_parsers(): 'supported. Without argument CPU count is used by default. '), action='store', dest='jobs', const=multiprocessing.cpu_count()) + parser['config_main'].add_argument( + '--log-server', + action='store_true', + help=('Start a log server for sub processes to use. ' + 'This is mainly useful when running cdist nested ' + 'from a code-local script. Log server is alwasy ' + 'implicitly started for \'install\' command.')) parser['config_main'].add_argument( '-n', '--dry-run', help='Do not execute code.', action='store_true') @@ -250,12 +257,6 @@ def get_parsers(): '-S', '--disable-saving-output-streams', help='Disable saving output streams.', action='store_false', dest='save_output_streams', default=True) - parser['config_main'].add_argument( - '--log-server', - action='store_true', - help=('Start a log server for sub processes to use.' - 'This is mainly usefull when running cdist nested' - 'from a code-local script.')) # Config parser['config_args'] = argparse.ArgumentParser(add_help=False) diff --git a/cdist/config.py b/cdist/config.py index 82e8559b..9fe9b676 100644 --- a/cdist/config.py +++ b/cdist/config.py @@ -29,18 +29,20 @@ import time import itertools import tempfile import multiprocessing -from cdist.mputil import mp_pool_run, mp_sig_handler import atexit import shutil import socket + +from cdist.mputil import mp_pool_run, mp_sig_handler +from cdist import core, inventory +from cdist.util.remoteutil import inspect_ssh_mux_opts + import cdist import cdist.hostsource import cdist.exec.local import cdist.exec.remote import cdist.util.ipaddr as ipaddr import cdist.configuration -from cdist import core, inventory -from cdist.util.remoteutil import inspect_ssh_mux_opts def graph_check_cycle(graph): @@ -195,7 +197,6 @@ class Config(object): @classmethod def commandline(cls, args): """Configure remote system""" - if (args.parallel and args.parallel != 1) or args.jobs: if args.timestamp: cdist.log.setupTimestampingParallelLogging() @@ -382,20 +383,16 @@ class Config(object): If operating in parallel then return tuple (host, True|False, ) so that main process knows for which host function was successful. """ - log = logging.getLogger(host) - if args.log_server: - # Start a log server so that nested `cdist config` runs have a place - # to send their logs to. - log_server_socket_dir = tempfile.mkdtemp() - log_server_socket = os.path.join(log_server_socket_dir, 'log-server') - cls._register_path_for_removal(log_server_socket_dir) - log.debug('Starting logging server on: %s', log_server_socket) - os.environ['__cdist_log_server_socket_to_export'] = log_server_socket - cdist.log.setupLogServer(log_server_socket) - try: + if args.log_server: + # Start a log server so that nested `cdist config` runs + # have a place to send their logs to. + log_server_socket_dir = tempfile.mkdtemp() + cls._register_path_for_removal(log_server_socket_dir) + cdist.log.setupLogServer(log_server_socket_dir, log) + remote_exec, remote_copy, cleanup_cmd = cls._resolve_remote_cmds( args) log.debug("remote_exec for host \"{}\": {}".format( diff --git a/cdist/core/code.py b/cdist/core/code.py index a7d9b7ca..2a30908e 100644 --- a/cdist/core/code.py +++ b/cdist/core/code.py @@ -116,9 +116,9 @@ class Code(object): if dry_run: self.env['__cdist_dry_run'] = '1' - if '__cdist_log_server_socket_to_export' in os.environ: - self.env['__cdist_log_server_socket'] = os.environ['__cdist_log_server_socket_to_export'] - + if '__cdist_log_server_socket_export' in os.environ: + self.env['__cdist_log_server_socket'] = os.environ[ + '__cdist_log_server_socket_export'] def _run_gencode(self, cdist_object, which): cdist_type = cdist_object.cdist_type diff --git a/cdist/install.py b/cdist/install.py index ec0a7270..561b2fa6 100644 --- a/cdist/install.py +++ b/cdist/install.py @@ -20,12 +20,9 @@ # # -import os -import logging -import tempfile - import cdist.config import cdist.core +import cdist.log class Install(cdist.config.Config): @@ -36,8 +33,9 @@ class Install(cdist.config.Config): # Always start log server during cdist install so that nested # `cdist config` runs have a place to send their logs to. args.log_server = True + super().onehost(host, host_tags, host_base_path, host_dir_name, args, - parallel, configuration, remove_remote_files_dirs=False) + parallel, configuration, remove_remote_files_dirs) def object_list(self): """Short name for object list retrieval. diff --git a/cdist/log.py b/cdist/log.py index c1376a58..bee99fac 100644 --- a/cdist/log.py +++ b/cdist/log.py @@ -21,17 +21,16 @@ # # -import asyncio -import contextlib import datetime import logging import logging.handlers +import sys import os +import asyncio +import contextlib import pickle import struct -import sys import threading -import time # Define additional cdist logging levels. @@ -100,7 +99,8 @@ class DefaultLog(logging.Logger): if '__cdist_log_server_socket' in os.environ: log_server_socket = os.environ['__cdist_log_server_socket'] - socket_handler = logging.handlers.SocketHandler(log_server_socket, None) + socket_handler = logging.handlers.SocketHandler(log_server_socket, + None) self.addHandler(socket_handler) else: formatter = CdistFormatter(self.FORMAT) @@ -184,13 +184,17 @@ async def handle_log_client(reader, writer): def run_log_server(server_address): # Get a new loop inside the current thread to run the log server. loop = asyncio.new_event_loop() - loop.create_task(asyncio.start_unix_server(handle_log_client, server_address)) + loop.create_task(asyncio.start_unix_server(handle_log_client, + server_address)) loop.run_forever() -def setupLogServer(log_server_socket): +def setupLogServer(socket_dir, log=logging.getLogger(__name__)): """Run a asyncio based unix socket log server in a background thread. """ + log_server_socket = os.path.join(socket_dir, 'log-server') + log.debug('Starting logging server on: %s', log_server_socket) + os.environ['__cdist_log_server_socket_export'] = log_server_socket with contextlib.suppress(FileNotFoundError): os.remove(log_server_socket) t = threading.Thread(target=run_log_server, args=(log_server_socket,)) diff --git a/docs/src/man1/cdist.rst b/docs/src/man1/cdist.rst index 9bf8fc9b..aa2607f8 100644 --- a/docs/src/man1/cdist.rst +++ b/docs/src/man1/cdist.rst @@ -17,20 +17,20 @@ SYNOPSIS cdist config [-h] [-l LOGLEVEL] [-q] [-v] [-b] [--colors WHEN] [-g CONFIG_FILE] [-4] [-6] [-C CACHE_PATH_PATTERN] - [-c CONF_DIR] [-i MANIFEST] [-j [JOBS]] [-n] [-o OUT_PATH] [-P] - [-R [{tar,tgz,tbz2,txz}]] [-r REMOTE_OUT_PATH] - [--remote-copy REMOTE_COPY] [--remote-exec REMOTE_EXEC] - [-S] [-I INVENTORY_DIR] [-A] [-a] [-f HOSTFILE] - [-p [HOST_MAX]] [-s] [-t] + [-c CONF_DIR] [-i MANIFEST] [-j [JOBS]] [--log-server] + [-n] [-o OUT_PATH] [-P] [-R [{tar,tgz,tbz2,txz}]] + [-r REMOTE_OUT_PATH] [--remote-copy REMOTE_COPY] + [--remote-exec REMOTE_EXEC] [-S] [-I INVENTORY_DIR] [-A] + [-a] [-f HOSTFILE] [-p [HOST_MAX]] [-s] [-t] [host [host ...]] cdist install [-h] [-l LOGLEVEL] [-q] [-v] [-b] [--colors WHEN] [-g CONFIG_FILE] [-4] [-6] [-C CACHE_PATH_PATTERN] - [-c CONF_DIR] [-i MANIFEST] [-j [JOBS]] [-n] [-o OUT_PATH] - [-P] [-R [{tar,tgz,tbz2,txz}]] [-r REMOTE_OUT_PATH] - [--remote-copy REMOTE_COPY] [--remote-exec REMOTE_EXEC] - [-S] [-I INVENTORY_DIR] [-A] [-a] [-f HOSTFILE] - [-p [HOST_MAX]] [-s] [-t] + [-c CONF_DIR] [-i MANIFEST] [-j [JOBS]] [--log-server] + [-n] [-o OUT_PATH] [-P] [-R [{tar,tgz,tbz2,txz}]] + [-r REMOTE_OUT_PATH] [--remote-copy REMOTE_COPY] + [--remote-exec REMOTE_EXEC] [-S] [-I INVENTORY_DIR] [-A] + [-a] [-f HOSTFILE] [-p [HOST_MAX]] [-s] [-t] [host [host ...]] cdist inventory [-h] {add-host,add-tag,del-host,del-tag,list} ... @@ -202,6 +202,12 @@ Install command is currently in beta. are supported. Without argument CPU count is used by default. +**--log-server** + Start a log server for sub processes to use. This is + mainly useful when running cdist nested from a code- + local script. Log server is always implicitly started + for 'install' command. + **-n, --dry-run** Do not execute code. diff --git a/scripts/cdist b/scripts/cdist index 664504a0..b1d782ab 100755 --- a/scripts/cdist +++ b/scripts/cdist @@ -60,10 +60,9 @@ def commandline(): if __name__ == "__main__": - cdistpythonversion = '3.5' - if sys.version < cdistpythonversion: + if sys.version < cdist.MIN_SUPPORTED_PYTHON_VERSION: print('Python >= {} is required on the source host.'.format( - cdistpythonversion), file=sys.stderr) + cdist.MIN_SUPPORTED_PYTHON_VERSIO), file=sys.stderr) sys.exit(1) exit_code = 0