From 0af64c01bf438bcab808b93f973b0ef71b3c4989 Mon Sep 17 00:00:00 2001 From: Darko Poljak Date: Tue, 25 Jul 2017 11:12:18 +0200 Subject: [PATCH] Add -p HOST_MAX argument. --- cdist/argparse.py | 19 ++++++++------ cdist/config.py | 55 +++++++++++++++++++++++++++++------------ docs/src/man1/cdist.rst | 17 +++++++------ 3 files changed, 61 insertions(+), 30 deletions(-) diff --git a/cdist/argparse.py b/cdist/argparse.py index b065a179..d8b2c294 100644 --- a/cdist/argparse.py +++ b/cdist/argparse.py @@ -152,9 +152,10 @@ def get_parsers(): parser['config_main'].add_argument( '-j', '--jobs', nargs='?', type=check_positive_int, - help=('Specify the maximum number of parallel jobs. Global ' - 'explorers, object prepare and object run are supported ' - '(currently in beta)'), + help=('Operate in parallel in specified maximum number of jobs. ' + 'Global explorers, object prepare and object run are ' + 'supported. Without argument CPU count is used by default. ' + 'Currently in beta.'), action='store', dest='jobs', const=multiprocessing.cpu_count()) parser['config_main'].add_argument( @@ -204,13 +205,17 @@ def get_parsers(): 'default, read hosts from stdin.'), dest='hostfile', required=False) parser['config_args'].add_argument( - '-p', '--parallel', - help='operate on multiple hosts in parallel', - action='store_true', dest='parallel') + '-p', '--parallel', nargs='?', metavar='HOST_MAX', + type=check_positive_int, + help=('Operate on multiple hosts in parallel for specified maximum ' + 'hosts at a time. Without argument CPU count is used by ' + 'default.'), + action='store', dest='parallel', + const=multiprocessing.cpu_count()) parser['config_args'].add_argument( '-s', '--sequential', help='operate on multiple hosts sequentially (default)', - action='store_false', dest='parallel') + action='store_const', dest='parallel', const=0) parser['config_args'].add_argument( '-t', '--tag', help=('host is specified by tag, not hostname/address; ' diff --git a/cdist/config.py b/cdist/config.py index 2c9721f5..f153ced5 100644 --- a/cdist/config.py +++ b/cdist/config.py @@ -129,7 +129,6 @@ class Config(object): cls._check_and_prepare_args(args) - process = {} failed_hosts = [] time_start = time.time() @@ -153,6 +152,16 @@ class Config(object): else: it = itertools.chain(cls.hosts(args.host), cls.hosts(args.hostfile)) + + process_args = [] + # No new child process if only one host at a time. + if args.parallel == 1: + log.debug("Only 1 parallel process, doing it sequentially") + args.parallel = 0 + if args.parallel: + log.trace("Processing hosts in parallel") + else: + log.trace("Processing hosts sequentially") for entry in it: if isinstance(entry, tuple): # if configuring by specified tags @@ -178,27 +187,37 @@ class Config(object): hostcnt += 1 if args.parallel: - log.trace("Creating child process for %s", host) - process[host] = multiprocessing.Process( - target=cls.onehost, - args=(host, host_tags, host_base_path, hostdir, args, - True)) - process[host].start() + pargs = (host, host_tags, host_base_path, hostdir, args, True) + log.trace(("Args for multiprocessing operation " + "for host {}: {}".format(host, pargs))) + process_args.append(pargs) else: try: cls.onehost(host, host_tags, host_base_path, hostdir, args, parallel=False) except cdist.Error as e: failed_hosts.append(host) - + if args.parallel and len(process_args) == 1: + log.debug("Only 1 host for parallel processing, doing it " + "sequentially") + try: + cls.onehost(*process_args[0]) + except cdist.Error as e: + failed_hosts.append(host) # Catch errors in parallel mode when joining if args.parallel: - for host in process.keys(): - log.trace("Joining process %s", host) - process[host].join() + log.trace("Multiprocessing start method is {}".format( + multiprocessing.get_start_method())) + log.trace(("Starting multiprocessing Pool for {} " + "parallel host operation".format(args.parallel))) - if not process[host].exitcode == 0: - failed_hosts.append(host) + results = mp_pool_run(cls.onehost, process_args, jobs=args.parallel) + log.trace(("Multiprocessing for parallel host operation " + "finished")) + log.trace(("Multiprocessing for parallel host operation " + "results: {}", results)) + + failed_hosts = [host for host, result in results if not result] time_end = time.time() log.verbose("Total processing time for %s host(s): %s", hostcnt, @@ -236,7 +255,10 @@ class Config(object): @classmethod def onehost(cls, host, host_tags, host_base_path, host_dir_name, args, parallel): - """Configure ONE system""" + """Configure ONE system. + If operating in parallel then return tuple (host, True|False, ) + so that main process knows for which host function was successful. + """ log = logging.getLogger(host) @@ -273,8 +295,7 @@ class Config(object): except cdist.Error as e: log.error(e) if parallel: - # We are running in our own process here, need to sys.exit! - sys.exit(1) + return (host, False, ) else: raise @@ -285,6 +306,8 @@ class Config(object): # Pass back to controlling code in sequential mode else: raise + if parallel: + return (host, True, ) @staticmethod def create_base_root_path(out_path=None): diff --git a/docs/src/man1/cdist.rst b/docs/src/man1/cdist.rst index d6bd1c8f..829b3824 100644 --- a/docs/src/man1/cdist.rst +++ b/docs/src/man1/cdist.rst @@ -19,14 +19,14 @@ SYNOPSIS [-i MANIFEST] [-j [JOBS]] [-n] [-o OUT_PATH] [-r REMOTE_OUT_DIR] [--remote-copy REMOTE_COPY] [--remote-exec REMOTE_EXEC] [-I INVENTORY_DIR] [-A] [-a] - [-f HOSTFILE] [-p] [-s] [-t] + [-f HOSTFILE] [-p [HOST_MAX]] [-s] [-t] [host [host ...]] cdist install [-h] [-q] [-v] [-b] [-C CACHE_PATH_PATTERN] [-c CONF_DIR] [-i MANIFEST] [-j [JOBS]] [-n] [-o OUT_PATH] [-r REMOTE_OUT_DIR] [--remote-copy REMOTE_COPY] [--remote-exec REMOTE_EXEC] [-I INVENTORY_DIR] [-A] [-a] - [-f HOSTFILE] [-p] [-s] [-t] + [-f HOSTFILE] [-p [HOST_MAX]] [-s] [-t] [host [host ...]] cdist inventory [-h] [-q] [-v] [-b] [-I INVENTORY_DIR] @@ -153,9 +153,10 @@ Configure/install one or more hosts. .. option:: -j [JOBS], --jobs [JOBS] - Specify the maximum number of parallel jobs. Global - explorers, object prepare and object run are supported - (currently in beta). + Operate in parallel in specified maximum number of + jobs. Global explorers, object prepare and object run + are supported. Without argument CPU count is used by + default. Currently in beta. .. option:: -n, --dry-run @@ -165,9 +166,11 @@ Configure/install one or more hosts. Directory to save cdist output in -.. option:: -p, --parallel +.. option:: -p [HOST_MAX], --parallel [HOST_MAX] - Operate on multiple hosts in parallel + Operate on multiple hosts in parallel for specified + maximum hosts at a time. Without argument CPU count is + used by default. .. option:: -r REMOTE_OUT_PATH, --remote-out-dir REMOTE_OUT_PATH