diff --git a/cdist/config.py b/cdist/config.py index ca8dcfec..d6f8a482 100644 --- a/cdist/config.py +++ b/cdist/config.py @@ -124,6 +124,27 @@ class Config(object): def commandline(cls, args): """Configure remote system""" + # No new child process if only one host at a time. + if args.parallel == 1: + log.debug("Only 1 parallel process, doing it sequentially") + args.parallel = 0 + + if args.parallel or args.jobs: + # If parallel execution then also log process id + del logging.getLogger().handlers[:] + log_format = '%(levelname)s: [%(process)d]: %(message)s' + logging.basicConfig(format=log_format) + + if args.parallel: + import signal + + def sigterm_handler(signum, frame): + log.trace("signal %s, killing whole process group", signum) + os.killpg(os.getpgrp(), signal.SIGKILL) + + signal.signal(signal.SIGTERM, sigterm_handler) + signal.signal(signal.SIGHUP, sigterm_handler) + # FIXME: Refactor relict - remove later log = logging.getLogger("cdist") @@ -154,10 +175,6 @@ class Config(object): cls.hosts(args.hostfile)) process_args = [] - # No new child process if only one host at a time. - if args.parallel == 1: - log.debug("Only 1 parallel process, doing it sequentially") - args.parallel = 0 if args.parallel: log.trace("Processing hosts in parallel") else: @@ -216,8 +233,8 @@ class Config(object): jobs=args.parallel) log.trace(("Multiprocessing for parallel host operation " "finished")) - log.trace(("Multiprocessing for parallel host operation " - "results: {}", results)) + log.trace("Multiprocessing for parallel host operation " + "results: %s", results) failed_hosts = [host for host, result in results if not result] @@ -301,13 +318,6 @@ class Config(object): else: raise - except KeyboardInterrupt: - # Ignore in parallel mode, we are existing anyway - if parallel: - sys.exit(0) - # Pass back to controlling code in sequential mode - else: - raise if parallel: return (host, True, ) diff --git a/cdist/mputil.py b/cdist/mputil.py index e564d749..d823e192 100644 --- a/cdist/mputil.py +++ b/cdist/mputil.py @@ -21,14 +21,21 @@ import multiprocessing +import concurrent.futures as cf import itertools +import os +import signal +import logging + + +log = logging.getLogger("cdist-mputil") def mp_pool_run(func, args=None, kwds=None, jobs=multiprocessing.cpu_count()): - """ Run func using multiprocessing.Pool with jobs jobs and supplied - iterable of args and kwds with one entry for each parallel func - instance. - Return list of results. + """Run func using concurrent.futures.ProcessPoolExecutor with jobs jobs + and supplied iterables of args and kwds with one entry for each + parallel func instance. + Return list of results. """ if args and kwds: fargs = zip(args, kwds) @@ -39,10 +46,18 @@ def mp_pool_run(func, args=None, kwds=None, jobs=multiprocessing.cpu_count()): else: return [func(), ] - with multiprocessing.Pool(jobs) as pool: - results = [ - pool.apply_async(func, a, k) - for a, k in fargs - ] - retval = [r.get() for r in results] - return retval + retval = [] + with cf.ProcessPoolExecutor(jobs) as executor: + try: + results = [ + executor.submit(func, *a, **k) for a, k in fargs + ] + for f in cf.as_completed(results): + retval.append(f.result()) + return retval + except KeyboardInterrupt: + log.trace("KeyboardInterrupt, killing process group") + # When Ctrl+C in terminal then kill whole process group. + # Otherwise there remain processes in sleeping state. + os.killpg(os.getpgrp(), signal.SIGKILL) + raise