From 0af64c01bf438bcab808b93f973b0ef71b3c4989 Mon Sep 17 00:00:00 2001
From: Darko Poljak <darko.poljak@gmail.com>
Date: Tue, 25 Jul 2017 11:12:18 +0200
Subject: [PATCH] Add -p HOST_MAX argument.

---
 cdist/argparse.py       | 19 ++++++++------
 cdist/config.py         | 55 +++++++++++++++++++++++++++++------------
 docs/src/man1/cdist.rst | 17 +++++++------
 3 files changed, 61 insertions(+), 30 deletions(-)

diff --git a/cdist/argparse.py b/cdist/argparse.py
index b065a179..d8b2c294 100644
--- a/cdist/argparse.py
+++ b/cdist/argparse.py
@@ -152,9 +152,10 @@ def get_parsers():
     parser['config_main'].add_argument(
            '-j', '--jobs', nargs='?',
            type=check_positive_int,
-           help=('Specify the maximum number of parallel jobs. Global '
-                 'explorers, object prepare and object run are supported '
-                 '(currently in beta)'),
+           help=('Operate in parallel in specified maximum number of jobs. '
+                 'Global explorers, object prepare and object run are '
+                 'supported. Without argument CPU count is used by default. '
+                 'Currently in beta.'),
            action='store', dest='jobs',
            const=multiprocessing.cpu_count())
     parser['config_main'].add_argument(
@@ -204,13 +205,17 @@ def get_parsers():
                   'default, read hosts from stdin.'),
             dest='hostfile', required=False)
     parser['config_args'].add_argument(
-           '-p', '--parallel',
-           help='operate on multiple hosts in parallel',
-           action='store_true', dest='parallel')
+           '-p', '--parallel', nargs='?', metavar='HOST_MAX',
+           type=check_positive_int,
+           help=('Operate on multiple hosts in parallel for specified maximum '
+                 'hosts at a time. Without argument CPU count is used by '
+                 'default.'),
+           action='store', dest='parallel',
+           const=multiprocessing.cpu_count())
     parser['config_args'].add_argument(
            '-s', '--sequential',
            help='operate on multiple hosts sequentially (default)',
-           action='store_false', dest='parallel')
+           action='store_const', dest='parallel', const=0)
     parser['config_args'].add_argument(
              '-t', '--tag',
              help=('host is specified by tag, not hostname/address; '
diff --git a/cdist/config.py b/cdist/config.py
index 2c9721f5..f153ced5 100644
--- a/cdist/config.py
+++ b/cdist/config.py
@@ -129,7 +129,6 @@ class Config(object):
 
         cls._check_and_prepare_args(args)
 
-        process = {}
         failed_hosts = []
         time_start = time.time()
 
@@ -153,6 +152,16 @@ class Config(object):
         else:
             it = itertools.chain(cls.hosts(args.host),
                                  cls.hosts(args.hostfile))
+
+        process_args = []
+        # No new child process if only one host at a time.
+        if args.parallel == 1:
+            log.debug("Only 1 parallel process, doing it sequentially")
+            args.parallel = 0
+        if args.parallel:
+            log.trace("Processing hosts in parallel")
+        else:
+            log.trace("Processing hosts sequentially")
         for entry in it:
             if isinstance(entry, tuple):
                 # if configuring by specified tags
@@ -178,27 +187,37 @@ class Config(object):
 
             hostcnt += 1
             if args.parallel:
-                log.trace("Creating child process for %s", host)
-                process[host] = multiprocessing.Process(
-                        target=cls.onehost,
-                        args=(host, host_tags, host_base_path, hostdir, args,
-                              True))
-                process[host].start()
+                pargs = (host, host_tags, host_base_path, hostdir, args, True)
+                log.trace(("Args for multiprocessing operation "
+                                "for host {}: {}".format(host, pargs)))
+                process_args.append(pargs)
             else:
                 try:
                     cls.onehost(host, host_tags, host_base_path, hostdir,
                                 args, parallel=False)
                 except cdist.Error as e:
                     failed_hosts.append(host)
-
+        if args.parallel and len(process_args) == 1:
+            log.debug("Only 1 host for parallel processing, doing it "
+                      "sequentially")
+            try:
+                cls.onehost(*process_args[0][:-1], parallel=False)
+            except cdist.Error:
+                failed_hosts.append(process_args[0][0])
-        # Catch errors in parallel mode when joining
-        if args.parallel:
+        # Several hosts: run them in a pool and collect per-host results
+        elif args.parallel:
-            for host in process.keys():
-                log.trace("Joining process %s", host)
-                process[host].join()
+            log.trace("Multiprocessing start method is {}".format(
+                multiprocessing.get_start_method()))
+            log.trace(("Starting multiprocessing Pool for {} "
+                            "parallel host operation".format(args.parallel)))
 
-                if not process[host].exitcode == 0:
-                    failed_hosts.append(host)
+            results = mp_pool_run(cls.onehost, process_args, jobs=args.parallel)
+            log.trace(("Multiprocessing for parallel host operation "
+                            "finished"))
+            log.trace(("Multiprocessing for parallel host operation "
+                            "results: {}".format(results)))
+
+            failed_hosts = [host for host, result in results if not result]
 
         time_end = time.time()
         log.verbose("Total processing time for %s host(s): %s", hostcnt,
@@ -236,7 +255,10 @@ class Config(object):
     @classmethod
     def onehost(cls, host, host_tags, host_base_path, host_dir_name, args,
                 parallel):
-        """Configure ONE system"""
+        """Configure ONE system.
+           When parallel is true, return the tuple (host, success) so
+           that the main process can tell which hosts failed.
+        """
 
         log = logging.getLogger(host)
 
@@ -273,8 +295,7 @@ class Config(object):
         except cdist.Error as e:
             log.error(e)
             if parallel:
-                # We are running in our own process here, need to sys.exit!
-                sys.exit(1)
+                return (host, False, )
             else:
                 raise
 
@@ -285,6 +306,8 @@ class Config(object):
             # Pass back to controlling code in sequential mode
             else:
                 raise
+        if parallel:
+            return (host, True, )
 
     @staticmethod
     def create_base_root_path(out_path=None):
diff --git a/docs/src/man1/cdist.rst b/docs/src/man1/cdist.rst
index d6bd1c8f..829b3824 100644
--- a/docs/src/man1/cdist.rst
+++ b/docs/src/man1/cdist.rst
@@ -19,14 +19,14 @@ SYNOPSIS
                  [-i MANIFEST] [-j [JOBS]] [-n] [-o OUT_PATH]
                  [-r REMOTE_OUT_DIR] [--remote-copy REMOTE_COPY]
                  [--remote-exec REMOTE_EXEC] [-I INVENTORY_DIR] [-A] [-a]
-                 [-f HOSTFILE] [-p] [-s] [-t]
+                 [-f HOSTFILE] [-p [HOST_MAX]] [-s] [-t]
                  [host [host ...]] 
 
     cdist install [-h] [-q] [-v] [-b] [-C CACHE_PATH_PATTERN] [-c CONF_DIR]
                   [-i MANIFEST] [-j [JOBS]] [-n] [-o OUT_PATH]
                   [-r REMOTE_OUT_DIR] [--remote-copy REMOTE_COPY]
                   [--remote-exec REMOTE_EXEC] [-I INVENTORY_DIR] [-A] [-a]
-                  [-f HOSTFILE] [-p] [-s] [-t]
+                  [-f HOSTFILE] [-p [HOST_MAX]] [-s] [-t]
                   [host [host ...]] 
 
     cdist inventory [-h] [-q] [-v] [-b] [-I INVENTORY_DIR]
@@ -153,9 +153,10 @@ Configure/install one or more hosts.
 
 .. option:: -j [JOBS], --jobs [JOBS]
 
-    Specify the maximum number of parallel jobs. Global
-    explorers, object prepare and object run are supported
-    (currently in beta).
+    Operate in parallel, using at most the specified number
+    of jobs. Global explorers, object prepare and object run
+    are supported. Without an argument, the CPU count is used
+    by default. Currently in beta.
 
 .. option:: -n, --dry-run
 
@@ -165,9 +166,11 @@ Configure/install one or more hosts.
 
     Directory to save cdist output in
 
-.. option:: -p, --parallel
+.. option:: -p [HOST_MAX], --parallel [HOST_MAX]
 
-    Operate on multiple hosts in parallel
+    Operate on multiple hosts in parallel, processing at
+    most HOST_MAX hosts at a time. Without an argument, the
+    CPU count is used by default.
 
 .. option:: -r REMOTE_OUT_PATH, --remote-out-dir REMOTE_OUT_PATH