Merge branch 'log-server-new-min-py-ver' into 'master'

Log server to capture nested logging output

See merge request ungleich-public/cdist!891
This commit is contained in:
poljakowski 2020-06-13 13:46:05 +02:00
commit 7a48b30d7a
8 changed files with 123 additions and 32 deletions

View File

@ -26,6 +26,7 @@ import hashlib
import cdist.log import cdist.log
import cdist.version import cdist.version
VERSION = cdist.version.VERSION VERSION = cdist.version.VERSION
BANNER = """ BANNER = """
@ -48,6 +49,9 @@ REMOTE_EXEC = "ssh -o User=root"
REMOTE_CMDS_CLEANUP_PATTERN = "ssh -o User=root -O exit -S {}" REMOTE_CMDS_CLEANUP_PATTERN = "ssh -o User=root -O exit -S {}"
MIN_SUPPORTED_PYTHON_VERSION = '3.5'
class Error(Exception): class Error(Exception):
"""Base exception class for this project""" """Base exception class for this project"""
pass pass

View File

@ -206,6 +206,13 @@ def get_parsers():
'supported. Without argument CPU count is used by default. '), 'supported. Without argument CPU count is used by default. '),
action='store', dest='jobs', action='store', dest='jobs',
const=multiprocessing.cpu_count()) const=multiprocessing.cpu_count())
parser['config_main'].add_argument(
'--log-server',
action='store_true',
help=('Start a log server for sub processes to use. '
'This is mainly useful when running cdist nested '
'from a code-local script. Log server is alwasy '
'implicitly started for \'install\' command.'))
parser['config_main'].add_argument( parser['config_main'].add_argument(
'-n', '--dry-run', '-n', '--dry-run',
help='Do not execute code.', action='store_true') help='Do not execute code.', action='store_true')

View File

@ -29,18 +29,20 @@ import time
import itertools import itertools
import tempfile import tempfile
import multiprocessing import multiprocessing
from cdist.mputil import mp_pool_run, mp_sig_handler
import atexit import atexit
import shutil import shutil
import socket import socket
from cdist.mputil import mp_pool_run, mp_sig_handler
from cdist import core, inventory
from cdist.util.remoteutil import inspect_ssh_mux_opts
import cdist import cdist
import cdist.hostsource import cdist.hostsource
import cdist.exec.local import cdist.exec.local
import cdist.exec.remote import cdist.exec.remote
import cdist.util.ipaddr as ipaddr import cdist.util.ipaddr as ipaddr
import cdist.configuration import cdist.configuration
from cdist import core, inventory
from cdist.util.remoteutil import inspect_ssh_mux_opts
def graph_check_cycle(graph): def graph_check_cycle(graph):
@ -195,7 +197,6 @@ class Config(object):
@classmethod @classmethod
def commandline(cls, args): def commandline(cls, args):
"""Configure remote system""" """Configure remote system"""
if (args.parallel and args.parallel != 1) or args.jobs: if (args.parallel and args.parallel != 1) or args.jobs:
if args.timestamp: if args.timestamp:
cdist.log.setupTimestampingParallelLogging() cdist.log.setupTimestampingParallelLogging()
@ -382,10 +383,16 @@ class Config(object):
If operating in parallel then return tuple (host, True|False, ) If operating in parallel then return tuple (host, True|False, )
so that main process knows for which host function was successful. so that main process knows for which host function was successful.
""" """
log = logging.getLogger(host) log = logging.getLogger(host)
try: try:
if args.log_server:
# Start a log server so that nested `cdist config` runs
# have a place to send their logs to.
log_server_socket_dir = tempfile.mkdtemp()
cls._register_path_for_removal(log_server_socket_dir)
cdist.log.setupLogServer(log_server_socket_dir, log)
remote_exec, remote_copy, cleanup_cmd = cls._resolve_remote_cmds( remote_exec, remote_copy, cleanup_cmd = cls._resolve_remote_cmds(
args) args)
log.debug("remote_exec for host \"{}\": {}".format( log.debug("remote_exec for host \"{}\": {}".format(

View File

@ -116,6 +116,10 @@ class Code(object):
if dry_run: if dry_run:
self.env['__cdist_dry_run'] = '1' self.env['__cdist_dry_run'] = '1'
if '__cdist_log_server_socket_export' in os.environ:
self.env['__cdist_log_server_socket'] = os.environ[
'__cdist_log_server_socket_export']
def _run_gencode(self, cdist_object, which): def _run_gencode(self, cdist_object, which):
cdist_type = cdist_object.cdist_type cdist_type = cdist_object.cdist_type
script = os.path.join(self.local.type_path, script = os.path.join(self.local.type_path,

View File

@ -1,7 +1,7 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# #
# 2013 Steven Armstrong (steven-cdist at armstrong.cc) # 2013-2019 Steven Armstrong (steven-cdist at armstrong.cc)
# #
# This file is part of cdist. # This file is part of cdist.
# #
@ -22,9 +22,21 @@
import cdist.config import cdist.config
import cdist.core import cdist.core
import cdist.log
class Install(cdist.config.Config): class Install(cdist.config.Config):
@classmethod
def onehost(cls, host, host_tags, host_base_path, host_dir_name, args,
parallel, configuration, remove_remote_files_dirs=False):
# Always start log server during cdist install so that nested
# `cdist config` runs have a place to send their logs to.
args.log_server = True
super().onehost(host, host_tags, host_base_path, host_dir_name, args,
parallel, configuration, remove_remote_files_dirs)
def object_list(self): def object_list(self):
"""Short name for object list retrieval. """Short name for object list retrieval.
In install mode, we only care about install objects. In install mode, we only care about install objects.

View File

@ -2,6 +2,7 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# #
# 2010-2013 Nico Schottelius (nico-cdist at schottelius.org) # 2010-2013 Nico Schottelius (nico-cdist at schottelius.org)
# 2019-2020 Steven Armstrong
# #
# This file is part of cdist. # This file is part of cdist.
# #
@ -20,9 +21,16 @@
# #
# #
import logging
import sys
import datetime import datetime
import logging
import logging.handlers
import sys
import os
import asyncio
import contextlib
import pickle
import struct
import threading
# Define additional cdist logging levels. # Define additional cdist logging levels.
@ -89,20 +97,26 @@ class DefaultLog(logging.Logger):
super().__init__(name) super().__init__(name)
self.propagate = False self.propagate = False
formatter = CdistFormatter(self.FORMAT) if '__cdist_log_server_socket' in os.environ:
log_server_socket = os.environ['__cdist_log_server_socket']
socket_handler = logging.handlers.SocketHandler(log_server_socket,
None)
self.addHandler(socket_handler)
else:
formatter = CdistFormatter(self.FORMAT)
stdout_handler = logging.StreamHandler(sys.stdout) stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.addFilter(self.StdoutFilter()) stdout_handler.addFilter(self.StdoutFilter())
stdout_handler.setLevel(logging.TRACE) stdout_handler.setLevel(logging.TRACE)
stdout_handler.setFormatter(formatter) stdout_handler.setFormatter(formatter)
stderr_handler = logging.StreamHandler(sys.stderr) stderr_handler = logging.StreamHandler(sys.stderr)
stderr_handler.addFilter(self.StderrFilter()) stderr_handler.addFilter(self.StderrFilter())
stderr_handler.setLevel(logging.ERROR) stderr_handler.setLevel(logging.ERROR)
stderr_handler.setFormatter(formatter) stderr_handler.setFormatter(formatter)
self.addHandler(stdout_handler) self.addHandler(stdout_handler)
self.addHandler(stderr_handler) self.addHandler(stderr_handler)
def verbose(self, msg, *args, **kwargs): def verbose(self, msg, *args, **kwargs):
self.log(logging.VERBOSE, msg, *args, **kwargs) self.log(logging.VERBOSE, msg, *args, **kwargs)
@ -152,4 +166,42 @@ def setupParallelLogging():
logging.setLoggerClass(ParallelLog) logging.setLoggerClass(ParallelLog)
async def handle_log_client(reader, writer):
while True:
chunk = await reader.read(4)
if len(chunk) < 4:
return
data_size = struct.unpack('>L', chunk)[0]
data = await reader.read(data_size)
obj = pickle.loads(data)
record = logging.makeLogRecord(obj)
logger = logging.getLogger(record.name)
logger.handle(record)
def run_log_server(server_address):
# Get a new loop inside the current thread to run the log server.
loop = asyncio.new_event_loop()
loop.create_task(asyncio.start_unix_server(handle_log_client,
server_address))
loop.run_forever()
def setupLogServer(socket_dir, log=logging.getLogger(__name__)):
"""Run a asyncio based unix socket log server in a background thread.
"""
log_server_socket = os.path.join(socket_dir, 'log-server')
log.debug('Starting logging server on: %s', log_server_socket)
os.environ['__cdist_log_server_socket_export'] = log_server_socket
with contextlib.suppress(FileNotFoundError):
os.remove(log_server_socket)
t = threading.Thread(target=run_log_server, args=(log_server_socket,))
# Deamonizing the thread means we don't have to care about stoping it.
# It will die together with the main process.
t.daemon = True
t.start()
setupDefaultLogging() setupDefaultLogging()

View File

@ -17,20 +17,20 @@ SYNOPSIS
cdist config [-h] [-l LOGLEVEL] [-q] [-v] [-b] [--colors WHEN] cdist config [-h] [-l LOGLEVEL] [-q] [-v] [-b] [--colors WHEN]
[-g CONFIG_FILE] [-4] [-6] [-C CACHE_PATH_PATTERN] [-g CONFIG_FILE] [-4] [-6] [-C CACHE_PATH_PATTERN]
[-c CONF_DIR] [-i MANIFEST] [-j [JOBS]] [-n] [-o OUT_PATH] [-P] [-c CONF_DIR] [-i MANIFEST] [-j [JOBS]] [--log-server]
[-R [{tar,tgz,tbz2,txz}]] [-r REMOTE_OUT_PATH] [-n] [-o OUT_PATH] [-P] [-R [{tar,tgz,tbz2,txz}]]
[--remote-copy REMOTE_COPY] [--remote-exec REMOTE_EXEC] [-r REMOTE_OUT_PATH] [--remote-copy REMOTE_COPY]
[-S] [-I INVENTORY_DIR] [-A] [-a] [-f HOSTFILE] [--remote-exec REMOTE_EXEC] [-S] [-I INVENTORY_DIR] [-A]
[-p [HOST_MAX]] [-s] [-t] [-a] [-f HOSTFILE] [-p [HOST_MAX]] [-s] [-t]
[host [host ...]] [host [host ...]]
cdist install [-h] [-l LOGLEVEL] [-q] [-v] [-b] [--colors WHEN] cdist install [-h] [-l LOGLEVEL] [-q] [-v] [-b] [--colors WHEN]
[-g CONFIG_FILE] [-4] [-6] [-C CACHE_PATH_PATTERN] [-g CONFIG_FILE] [-4] [-6] [-C CACHE_PATH_PATTERN]
[-c CONF_DIR] [-i MANIFEST] [-j [JOBS]] [-n] [-o OUT_PATH] [-c CONF_DIR] [-i MANIFEST] [-j [JOBS]] [--log-server]
[-P] [-R [{tar,tgz,tbz2,txz}]] [-r REMOTE_OUT_PATH] [-n] [-o OUT_PATH] [-P] [-R [{tar,tgz,tbz2,txz}]]
[--remote-copy REMOTE_COPY] [--remote-exec REMOTE_EXEC] [-r REMOTE_OUT_PATH] [--remote-copy REMOTE_COPY]
[-S] [-I INVENTORY_DIR] [-A] [-a] [-f HOSTFILE] [--remote-exec REMOTE_EXEC] [-S] [-I INVENTORY_DIR] [-A]
[-p [HOST_MAX]] [-s] [-t] [-a] [-f HOSTFILE] [-p [HOST_MAX]] [-s] [-t]
[host [host ...]] [host [host ...]]
cdist inventory [-h] {add-host,add-tag,del-host,del-tag,list} ... cdist inventory [-h] {add-host,add-tag,del-host,del-tag,list} ...
@ -202,6 +202,12 @@ Install command is currently in beta.
are supported. Without argument CPU count is used by are supported. Without argument CPU count is used by
default. default.
**--log-server**
Start a log server for sub processes to use. This is
mainly useful when running cdist nested from a code-
local script. Log server is always implicitly started
for 'install' command.
**-n, --dry-run** **-n, --dry-run**
Do not execute code. Do not execute code.

View File

@ -60,10 +60,9 @@ def commandline():
if __name__ == "__main__": if __name__ == "__main__":
cdistpythonversion = '3.5' if sys.version < cdist.MIN_SUPPORTED_PYTHON_VERSION:
if sys.version < cdistpythonversion:
print('Python >= {} is required on the source host.'.format( print('Python >= {} is required on the source host.'.format(
cdistpythonversion), file=sys.stderr) cdist.MIN_SUPPORTED_PYTHON_VERSIO), file=sys.stderr)
sys.exit(1) sys.exit(1)
exit_code = 0 exit_code = 0