From 6c1b215db8a78a3ae5a9bad036c605bd5fcc3aee Mon Sep 17 00:00:00 2001
From: Darko Poljak <darko.poljak@gmail.com>
Date: Tue, 6 Dec 2016 14:27:17 +0100
Subject: [PATCH 1/7] Begin parallelizing object prepare and run.

---
 cdist/config.py | 43 +++++++++++++++++++++++++++++++++----------
 1 file changed, 33 insertions(+), 10 deletions(-)

diff --git a/cdist/config.py b/cdist/config.py
index b1a120ca..42dd73b8 100644
--- a/cdist/config.py
+++ b/cdist/config.py
@@ -27,6 +27,7 @@ import time
 import itertools
 import tempfile
 import socket
+import concurrent.futures
 
 import cdist
 import cdist.hostsource
@@ -265,13 +266,14 @@ class Config(object):
             else:
                 yield cdist_object
 
-    def iterate_once(self):
+    def iterate_once_parallel(self):
         """
             Iterate over the objects once - helper method for
             iterate_until_finished
         """
         objects_changed = False
 
+        cargo = []
         for cdist_object in self.object_list():
             if cdist_object.requirements_unfinished(cdist_object.requirements):
                 """We cannot do anything for this poor object"""
@@ -280,18 +282,38 @@ class Config(object):
             if cdist_object.state == core.CdistObject.STATE_UNDEF:
                 """Prepare the virgin object"""
 
-                self.object_prepare(cdist_object)
-                objects_changed = True
+                # self.object_prepare(cdist_object)
+                # objects_changed = True
+                cargo.append(cdist_object)
 
-            if cdist_object.requirements_unfinished(cdist_object.autorequire):
-                """The previous step created objects we depend on -
-                   wait for them
-                """
+        if cargo:
+            with concurrent.futures.ProcessPoolExecutor(self.jobs) as executor:
+                for x in executor.map(self.object_prepare, cargo):
+                    pass  # returns None
+            objects_changed = True
+
+        cargo.clear()
+        for cdist_object in self.object_list():
+            if cdist_object.requirements_unfinished(cdist_object.requirements):
+                """We cannot do anything for this poor object"""
                 continue
 
             if cdist_object.state == core.CdistObject.STATE_PREPARED:
-                self.object_run(cdist_object)
-                objects_changed = True
+                if cdist_object.requirements_unfinished(cdist_object.autorequire):
+                    """The previous step created objects we depend on -
+                    wait for them
+                    """
+                    continue
+
+                # self.object_run(cdist_object)
+                # objects_changed = True
+                cargo.append(cdist_object)
+
+        if cargo:
+            with concurrent.futures.ProcessPoolExecutor(self.jobs) as executor:
+                for x in executor.map(self.object_run, cargo):
+                    pass  # returns None
+            objects_changed = True
 
         return objects_changed
 
@@ -304,7 +326,8 @@ class Config(object):
         objects_changed = True
 
         while objects_changed:
-            objects_changed = self.iterate_once()
+            if self.jobs:
+                objects_changed = self.iterate_once_parallel()
 
         # Check whether all objects have been finished
         unfinished_objects = []

From 1952d43073f571ec0b4c1bb19d8203876866fab8 Mon Sep 17 00:00:00 2001
From: Darko Poljak <darko.poljak@gmail.com>
Date: Wed, 7 Dec 2016 19:06:51 +0100
Subject: [PATCH 2/7] Initial try for parallelization.

---
 cdist/config.py | 41 +++++++++++++++++++++++++++++++++++++----
 1 file changed, 37 insertions(+), 4 deletions(-)

diff --git a/cdist/config.py b/cdist/config.py
index a20a4840..f3faa03a 100644
--- a/cdist/config.py
+++ b/cdist/config.py
@@ -267,11 +267,44 @@ class Config(object):
             else:
                 yield cdist_object
 
-    def iterate_once_parallel(self):
+    def iterate_once(self):
         """
             Iterate over the objects once - helper method for
             iterate_until_finished
         """
+        if self.jobs:
+            objects_changed = self._iterate_once_parallel()
+        else:
+            objects_changed = self._iterate_once_sequential()
+        return objects_changed
+
+    def _iterate_once_sequential(self):
+        objects_changed = False
+
+        for cdist_object in self.object_list():
+            if cdist_object.requirements_unfinished(cdist_object.requirements):
+                """We cannot do anything for this poor object"""
+                continue
+
+            if cdist_object.state == core.CdistObject.STATE_UNDEF:
+                """Prepare the virgin object"""
+
+                self.object_prepare(cdist_object)
+                objects_changed = True
+
+            if cdist_object.requirements_unfinished(cdist_object.autorequire):
+                """The previous step created objects we depend on -
+                   wait for them
+                """
+                continue
+
+            if cdist_object.state == core.CdistObject.STATE_PREPARED:
+                self.object_run(cdist_object)
+                objects_changed = True
+
+        return objects_changed
+
+    def _iterate_once_parallel(self):
         objects_changed = False
 
         cargo = []
@@ -300,7 +333,8 @@ class Config(object):
                 continue
 
             if cdist_object.state == core.CdistObject.STATE_PREPARED:
-                if cdist_object.requirements_unfinished(cdist_object.autorequire):
+                if cdist_object.requirements_unfinished(
+                        cdist_object.autorequire):
                     """The previous step created objects we depend on -
                     wait for them
                     """
@@ -327,8 +361,7 @@ class Config(object):
         objects_changed = True
 
         while objects_changed:
-            if self.jobs:
-                objects_changed = self.iterate_once_parallel()
+            objects_changed = self.iterate_once()
 
         # Check whether all objects have been finished
         unfinished_objects = []

From ca3a8ddf67f060414ed3f065791821bd086c3267 Mon Sep 17 00:00:00 2001
From: Darko Poljak <darko.poljak@gmail.com>
Date: Thu, 8 Dec 2016 00:47:07 +0100
Subject: [PATCH 3/7] no clear() in python < 3.3; if only one then do it
 sequentially

---
 cdist/config.py | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/cdist/config.py b/cdist/config.py
index f3faa03a..655153be 100644
--- a/cdist/config.py
+++ b/cdist/config.py
@@ -320,13 +320,16 @@ class Config(object):
                 # objects_changed = True
                 cargo.append(cdist_object)
 
-        if cargo:
+        if len(cargo) == 1:
+            self.object_prepare(cargo[0])
+            objects_changed = True
+        elif cargo:
             with concurrent.futures.ProcessPoolExecutor(self.jobs) as executor:
                 for x in executor.map(self.object_prepare, cargo):
                     pass  # returns None
             objects_changed = True
 
-        cargo.clear()
+        del cargo[:]
         for cdist_object in self.object_list():
             if cdist_object.requirements_unfinished(cdist_object.requirements):
                 """We cannot do anything for this poor object"""
@@ -344,7 +347,10 @@ class Config(object):
                 # objects_changed = True
                 cargo.append(cdist_object)
 
-        if cargo:
+        if len(cargo) == 1:
+            self.object_run(cargo[0])
+            objects_changed = True
+        elif cargo:
             with concurrent.futures.ProcessPoolExecutor(self.jobs) as executor:
                 for x in executor.map(self.object_run, cargo):
                     pass  # returns None

From e6b9fc90bac30090f8ef19512b586770b704767f Mon Sep 17 00:00:00 2001
From: Darko Poljak <darko.poljak@gmail.com>
Date: Thu, 8 Dec 2016 14:11:30 +0100
Subject: [PATCH 4/7] Add log messages.

---
 cdist/config.py | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git a/cdist/config.py b/cdist/config.py
index 655153be..111fa233 100644
--- a/cdist/config.py
+++ b/cdist/config.py
@@ -279,6 +279,7 @@ class Config(object):
         return objects_changed
 
     def _iterate_once_sequential(self):
+        self.log.info("Iteration in sequential mode")
         objects_changed = False
 
         for cdist_object in self.object_list():
@@ -305,6 +306,8 @@ class Config(object):
         return objects_changed
 
     def _iterate_once_parallel(self):
+        self.log.info("Iteration in parallel mode in {} jobs".format(
+            self.jobs))
         objects_changed = False
 
         cargo = []
@@ -320,13 +323,17 @@ class Config(object):
                 # objects_changed = True
                 cargo.append(cdist_object)
 
-        if len(cargo) == 1:
+        n = len(cargo)
+        if n == 1:
+            self.log.debug("Only one object, preparing sequentially")
             self.object_prepare(cargo[0])
             objects_changed = True
         elif cargo:
+            self.log.debug("Preparing {} objects in parallel".format(n))
             with concurrent.futures.ProcessPoolExecutor(self.jobs) as executor:
                 for x in executor.map(self.object_prepare, cargo):
                     pass  # returns None
+            self.log.debug("Preparation finished")
             objects_changed = True
 
         del cargo[:]
@@ -347,13 +354,17 @@ class Config(object):
                 # objects_changed = True
                 cargo.append(cdist_object)
 
-        if len(cargo) == 1:
+        n = len(cargo)
+        if n == 1:
+            self.log.debug("Only one object, running sequentially")
             self.object_run(cargo[0])
             objects_changed = True
         elif cargo:
+            self.log.debug("Running {} objects in parallel".format(n))
             with concurrent.futures.ProcessPoolExecutor(self.jobs) as executor:
                 for x in executor.map(self.object_run, cargo):
                     pass  # returns None
+            self.log.debug("Running finished")
             objects_changed = True
 
         return objects_changed

From 8776a2ee067e6254fd87290cef21bf5f2b669edb Mon Sep 17 00:00:00 2001
From: Darko Poljak <darko.poljak@gmail.com>
Date: Thu, 8 Dec 2016 17:36:57 +0100
Subject: [PATCH 5/7] concurrent.futures -> multiprocessing

---
 cdist/config.py          | 65 ++++++++++++++++++++++++++++++++--------
 cdist/core/cdist_type.py |  3 ++
 cdist/core/manifest.py   | 17 ++++++++++-
 3 files changed, 71 insertions(+), 14 deletions(-)

diff --git a/cdist/config.py b/cdist/config.py
index 111fa233..ae63531d 100644
--- a/cdist/config.py
+++ b/cdist/config.py
@@ -27,7 +27,7 @@ import time
 import itertools
 import tempfile
 import socket
-import concurrent.futures
+import multiprocessing
 
 import cdist
 import cdist.hostsource
@@ -47,7 +47,7 @@ class Config(object):
 
         self.local = local
         self.remote = remote
-        self.log = logging.getLogger(self.local.target_host[0])
+        self._open_logger()
         self.dry_run = dry_run
         self.jobs = jobs
 
@@ -123,7 +123,6 @@ class Config(object):
     @classmethod
     def commandline(cls, args):
         """Configure remote system"""
-        import multiprocessing
 
         # FIXME: Refactor relict - remove later
         log = logging.getLogger("cdist")
@@ -329,11 +328,24 @@ class Config(object):
             self.object_prepare(cargo[0])
             objects_changed = True
         elif cargo:
-            self.log.debug("Preparing {} objects in parallel".format(n))
-            with concurrent.futures.ProcessPoolExecutor(self.jobs) as executor:
-                for x in executor.map(self.object_prepare, cargo):
-                    pass  # returns None
-            self.log.debug("Preparation finished")
+            self.log.debug("Multiprocessing start method is {}".format(
+                multiprocessing.get_start_method()))
+            self.log.debug(("Starting multiprocessing Pool for {} parallel "
+                            "objects preparation".format(n)))
+            with multiprocessing.Pool(self.jobs) as pool:
+                self.log.debug(("Starting async for parallel object "
+                                "preparation"))
+                results = [
+                    pool.apply_async(self.object_prepare, (c,))
+                    for c in cargo
+                ]
+
+                self.log.debug(("Waiting async results for parallel object "
+                                "preparation"))
+                for r in results:
+                    r.get()
+                self.log.debug(("Multiprocessing for parallel object "
+                                "preparation finished"))
             objects_changed = True
 
         del cargo[:]
@@ -360,15 +372,42 @@ class Config(object):
             self.object_run(cargo[0])
             objects_changed = True
         elif cargo:
-            self.log.debug("Running {} objects in parallel".format(n))
-            with concurrent.futures.ProcessPoolExecutor(self.jobs) as executor:
-                for x in executor.map(self.object_run, cargo):
-                    pass  # returns None
-            self.log.debug("Running finished")
+            self.log.debug("Multiprocessing start method is {}".format(
+                multiprocessing.get_start_method()))
+            self.log.debug(("Starting multiprocessing Pool for {} parallel "
+                            "object run".format(n)))
+            with multiprocessing.Pool(self.jobs) as pool:
+                self.log.debug(("Starting async for parallel object run"))
+                results = [
+                    pool.apply_async(self.object_run, (c,))
+                    for c in cargo
+                ]
+
+                self.log.debug(("Waiting async results for parallel object "
+                                "run"))
+                for r in results:
+                    r.get()
+                self.log.debug(("Multiprocessing for parallel object "
+                                "run finished"))
             objects_changed = True
 
         return objects_changed
 
+    def _open_logger(self):
+        self.log = logging.getLogger(self.local.target_host[0])
+
+    # logger is not pickable, so remove it when we pickle
+    def __getstate__(self):
+        state = self.__dict__.copy()
+        if 'log' in state:
+            del state['log']
+        return state
+
+    # recreate logger when we unpickle
+    def __setstate__(self, state):
+        self.__dict__.update(state)
+        self._open_logger()
+
     def iterate_until_finished(self):
         """
             Go through all objects and solve them
diff --git a/cdist/core/cdist_type.py b/cdist/core/cdist_type.py
index a548f365..14865386 100644
--- a/cdist/core/cdist_type.py
+++ b/cdist/core/cdist_type.py
@@ -79,6 +79,9 @@ class CdistType(object):
 
     _instances = {}
 
+    def __getnewargs__(self):
+        return self.base_path, self.name
+
     def __new__(cls, *args, **kwargs):
         """only one instance of each named type may exist"""
         # name is second argument
diff --git a/cdist/core/manifest.py b/cdist/core/manifest.py
index a16e9346..92a16190 100644
--- a/cdist/core/manifest.py
+++ b/cdist/core/manifest.py
@@ -98,7 +98,7 @@ class Manifest(object):
         self.target_host = target_host
         self.local = local
 
-        self.log = logging.getLogger(self.target_host[0])
+        self._open_logger()
 
         self.env = {
             'PATH': "%s:%s" % (self.local.bin_path, os.environ['PATH']),
@@ -114,6 +114,21 @@ class Manifest(object):
         if self.log.getEffectiveLevel() == logging.DEBUG:
             self.env.update({'__cdist_debug': "yes"})
 
+    def _open_logger(self):
+        self.log = logging.getLogger(self.target_host[0])
+
+    # logger is not pickable, so remove it when we pickle
+    def __getstate__(self):
+        state = self.__dict__.copy()
+        if 'log' in state:
+            del state['log']
+        return state
+
+    # recreate logger when we unpickle
+    def __setstate__(self, state):
+        self.__dict__.update(state)
+        self._open_logger()
+
     def env_initial_manifest(self, initial_manifest):
         env = os.environ.copy()
         env.update(self.env)

From e5a6599ccb2173a944adc5ac46ad98e7760de0f4 Mon Sep 17 00:00:00 2001
From: Darko Poljak <darko.poljak@gmail.com>
Date: Thu, 8 Dec 2016 21:48:59 +0100
Subject: [PATCH 6/7] Create mp_pool_run helper function for running in
 parallel.

---
 cdist/config.py        | 40 ++++++++++++-----------------------
 cdist/core/explorer.py | 21 ++++++------------
 cdist/exec/remote.py   | 30 ++++++++++----------------
 cdist/mputil.py        | 48 ++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 79 insertions(+), 60 deletions(-)
 create mode 100644 cdist/mputil.py

diff --git a/cdist/config.py b/cdist/config.py
index ae63531d..a47811a9 100644
--- a/cdist/config.py
+++ b/cdist/config.py
@@ -28,6 +28,7 @@ import itertools
 import tempfile
 import socket
 import multiprocessing
+from cdist.mputil import mp_pool_run
 
 import cdist
 import cdist.hostsource
@@ -332,20 +333,12 @@ class Config(object):
                 multiprocessing.get_start_method()))
             self.log.debug(("Starting multiprocessing Pool for {} parallel "
                             "objects preparation".format(n)))
-            with multiprocessing.Pool(self.jobs) as pool:
-                self.log.debug(("Starting async for parallel object "
-                                "preparation"))
-                results = [
-                    pool.apply_async(self.object_prepare, (c,))
-                    for c in cargo
-                ]
-
-                self.log.debug(("Waiting async results for parallel object "
-                                "preparation"))
-                for r in results:
-                    r.get()
-                self.log.debug(("Multiprocessing for parallel object "
-                                "preparation finished"))
+            args = [
+                (c, ) for c in cargo
+            ]
+            mp_pool_run(self.object_prepare, args, jobs=self.jobs)
+            self.log.debug(("Multiprocessing for parallel object "
+                            "preparation finished"))
             objects_changed = True
 
         del cargo[:]
@@ -376,19 +369,12 @@ class Config(object):
                 multiprocessing.get_start_method()))
             self.log.debug(("Starting multiprocessing Pool for {} parallel "
                             "object run".format(n)))
-            with multiprocessing.Pool(self.jobs) as pool:
-                self.log.debug(("Starting async for parallel object run"))
-                results = [
-                    pool.apply_async(self.object_run, (c,))
-                    for c in cargo
-                ]
-
-                self.log.debug(("Waiting async results for parallel object "
-                                "run"))
-                for r in results:
-                    r.get()
-                self.log.debug(("Multiprocessing for parallel object "
-                                "run finished"))
+            args = [
+                (c, ) for c in cargo
+            ]
+            mp_pool_run(self.object_run, args, jobs=self.jobs)
+            self.log.debug(("Multiprocessing for parallel object "
+                            "run finished"))
             objects_changed = True
 
         return objects_changed
diff --git a/cdist/core/explorer.py b/cdist/core/explorer.py
index 23996240..45afc5c0 100644
--- a/cdist/core/explorer.py
+++ b/cdist/core/explorer.py
@@ -24,8 +24,7 @@ import logging
 import os
 import glob
 import multiprocessing
-
-import cdist
+from cdist.mputil import mp_pool_run
 
 '''
 common:
@@ -121,18 +120,12 @@ class Explorer(object):
             multiprocessing.get_start_method()))
         self.log.debug(("Starting multiprocessing Pool for global "
                        "explorers run"))
-        with multiprocessing.Pool(self.jobs) as pool:
-            self.log.debug("Starting async for global explorer run")
-            results = [
-                pool.apply_async(self._run_global_explorer, (e, out_path,))
-                for e in self.list_global_explorer_names()
-            ]
-
-            self.log.debug("Waiting async results for global explorer runs")
-            for r in results:
-                r.get()  # self._run_global_explorer returns None
-            self.log.debug(("Multiprocessing run for global explorers "
-                           "finished"))
+        args = [
+            (e, out_path, ) for e in self.list_global_explorer_names()
+        ]
+        mp_pool_run(self._run_global_explorer, args, jobs=self.jobs)
+        self.log.debug(("Multiprocessing run for global explorers "
+                        "finished"))
 
     # logger is not pickable, so remove it when we pickle
     def __getstate__(self):
diff --git a/cdist/exec/remote.py b/cdist/exec/remote.py
index 440aafa7..042b7103 100644
--- a/cdist/exec/remote.py
+++ b/cdist/exec/remote.py
@@ -31,6 +31,7 @@ import multiprocessing
 import cdist
 import cdist.exec.util as exec_util
 import cdist.util.ipaddr as ipaddr
+from cdist.mputil import mp_pool_run
 
 
 def _wrap_addr(addr):
@@ -152,25 +153,16 @@ class Remote(object):
             multiprocessing.get_start_method()))
         self.log.debug(("Starting multiprocessing Pool for parallel "
                         "remote transfer"))
-        with multiprocessing.Pool(jobs) as pool:
-            self.log.debug("Starting async for parallel transfer")
-            commands = []
-            for f in glob.glob1(source, '*'):
-                command = self._copy.split()
-                path = os.path.join(source, f)
-                command.extend([path, '{0}:{1}'.format(
-                    _wrap_addr(self.target_host[0]), destination)])
-                commands.append(command)
-            results = [
-                pool.apply_async(self._run_command, (cmd,))
-                for cmd in commands
-            ]
-
-            self.log.debug("Waiting async results for parallel transfer")
-            for r in results:
-                r.get()  # self._run_command returns None
-            self.log.debug(("Multiprocessing for parallel transfer "
-                            "finished"))
+        args = []
+        for f in glob.glob1(source, '*'):
+            command = self._copy.split()
+            path = os.path.join(source, f)
+            command.extend([path, '{0}:{1}'.format(
+                _wrap_addr(self.target_host[0]), destination)])
+            args.append((command, ))
+        mp_pool_run(self._run_command, args, jobs=jobs)
+        self.log.debug(("Multiprocessing for parallel transfer "
+                        "finished"))
 
     def run_script(self, script, env=None, return_output=False):
         """Run the given script with the given environment on the remote side.
diff --git a/cdist/mputil.py b/cdist/mputil.py
new file mode 100644
index 00000000..24289df2
--- /dev/null
+++ b/cdist/mputil.py
@@ -0,0 +1,48 @@
+# -*- coding: utf-8 -*-
+#
+# 2016 Darko Poljak (darko.poljak at gmail.com)
+#
+# This file is part of cdist.
+#
+# cdist is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# cdist is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with cdist. If not, see <http://www.gnu.org/licenses/>.
+#
+#
+
+
+import multiprocessing
+import itertools
+
+
+def mp_pool_run(func, args=None, kwds=None, jobs=multiprocessing.cpu_count()):
+    """ Run func using multiprocessing.Pool with jobs jobs and supplied
+        iterable of args and kwds with one entry for each parallel func
+        instance.
+        Return list of results.
+    """
+    if args and kwds:
+        fargs = zip(args, kdws)
+    elif args:
+        fargs = zip(args, itertools.repeat({}))
+    elif kwds:
+        fargs = zip(itertools.repeat(()), kwds)
+    else:
+        return [func(), ]
+
+    with multiprocessing.Pool(jobs) as pool:
+        results = [
+            pool.apply_async(func, a, k)
+            for a, k in fargs
+        ]
+        retval = [r.get() for r in results]
+    return retval

From 8d2d538660c4812e170bc63a622910cda5d4a582 Mon Sep 17 00:00:00 2001
From: Darko Poljak <darko.poljak@gmail.com>
Date: Sun, 11 Dec 2016 21:17:22 +0100
Subject: [PATCH 7/7] Fix typo.

---
 cdist/mputil.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/cdist/mputil.py b/cdist/mputil.py
index 24289df2..e564d749 100644
--- a/cdist/mputil.py
+++ b/cdist/mputil.py
@@ -31,7 +31,7 @@ def mp_pool_run(func, args=None, kwds=None, jobs=multiprocessing.cpu_count()):
         Return list of results.
     """
     if args and kwds:
-        fargs = zip(args, kdws)
+        fargs = zip(args, kwds)
     elif args:
         fargs = zip(args, itertools.repeat({}))
     elif kwds: