From 8776a2ee067e6254fd87290cef21bf5f2b669edb Mon Sep 17 00:00:00 2001 From: Darko Poljak Date: Thu, 8 Dec 2016 17:36:57 +0100 Subject: [PATCH] concurrent.futures -> multiprocessing --- cdist/config.py | 65 ++++++++++++++++++++++++++++++++-------- cdist/core/cdist_type.py | 3 ++ cdist/core/manifest.py | 17 ++++++++++- 3 files changed, 71 insertions(+), 14 deletions(-) diff --git a/cdist/config.py b/cdist/config.py index 111fa233..ae63531d 100644 --- a/cdist/config.py +++ b/cdist/config.py @@ -27,7 +27,7 @@ import time import itertools import tempfile import socket -import concurrent.futures +import multiprocessing import cdist import cdist.hostsource @@ -47,7 +47,7 @@ class Config(object): self.local = local self.remote = remote - self.log = logging.getLogger(self.local.target_host[0]) + self._open_logger() self.dry_run = dry_run self.jobs = jobs @@ -123,7 +123,6 @@ class Config(object): @classmethod def commandline(cls, args): """Configure remote system""" - import multiprocessing # FIXME: Refactor relict - remove later log = logging.getLogger("cdist") @@ -329,11 +328,24 @@ class Config(object): self.object_prepare(cargo[0]) objects_changed = True elif cargo: - self.log.debug("Preparing {} objects in parallel".format(n)) - with concurrent.futures.ProcessPoolExecutor(self.jobs) as executor: - for x in executor.map(self.object_prepare, cargo): - pass # returns None - self.log.debug("Preparation finished") + self.log.debug("Multiprocessing start method is {}".format( + multiprocessing.get_start_method())) + self.log.debug(("Starting multiprocessing Pool for {} parallel " + "objects preparation".format(n))) + with multiprocessing.Pool(self.jobs) as pool: + self.log.debug(("Starting async for parallel object " + "preparation")) + results = [ + pool.apply_async(self.object_prepare, (c,)) + for c in cargo + ] + + self.log.debug(("Waiting async results for parallel object " + "preparation")) + for r in results: + r.get() + self.log.debug(("Multiprocessing for parallel object " + "preparation finished")) objects_changed = True del cargo[:] @@ -360,15 +372,42 @@ class Config(object): self.object_run(cargo[0]) objects_changed = True elif cargo: - self.log.debug("Running {} objects in parallel".format(n)) - with concurrent.futures.ProcessPoolExecutor(self.jobs) as executor: - for x in executor.map(self.object_run, cargo): - pass # returns None - self.log.debug("Running finished") + self.log.debug("Multiprocessing start method is {}".format( + multiprocessing.get_start_method())) + self.log.debug(("Starting multiprocessing Pool for {} parallel " + "object run".format(n))) + with multiprocessing.Pool(self.jobs) as pool: + self.log.debug(("Starting async for parallel object run")) + results = [ + pool.apply_async(self.object_run, (c,)) + for c in cargo + ] + + self.log.debug(("Waiting async results for parallel object " + "run")) + for r in results: + r.get() + self.log.debug(("Multiprocessing for parallel object " + "run finished")) objects_changed = True return objects_changed + def _open_logger(self): + self.log = logging.getLogger(self.local.target_host[0]) + + # logger is not pickable, so remove it when we pickle + def __getstate__(self): + state = self.__dict__.copy() + if 'log' in state: + del state['log'] + return state + + # recreate logger when we unpickle + def __setstate__(self, state): + self.__dict__.update(state) + self._open_logger() + def iterate_until_finished(self): """ Go through all objects and solve them diff --git a/cdist/core/cdist_type.py b/cdist/core/cdist_type.py index a548f365..14865386 100644 --- a/cdist/core/cdist_type.py +++ b/cdist/core/cdist_type.py @@ -79,6 +79,9 @@ class CdistType(object): _instances = {} + def __getnewargs__(self): + return self.base_path, self.name + def __new__(cls, *args, **kwargs): """only one instance of each named type may exist""" # name is second argument diff --git a/cdist/core/manifest.py b/cdist/core/manifest.py index a16e9346..92a16190 100644 --- a/cdist/core/manifest.py +++ b/cdist/core/manifest.py @@ -98,7 +98,7 @@ class Manifest(object): self.target_host = target_host self.local = local - self.log = logging.getLogger(self.target_host[0]) + self._open_logger() self.env = { 'PATH': "%s:%s" % (self.local.bin_path, os.environ['PATH']), @@ -114,6 +114,21 @@ class Manifest(object): if self.log.getEffectiveLevel() == logging.DEBUG: self.env.update({'__cdist_debug': "yes"}) + def _open_logger(self): + self.log = logging.getLogger(self.target_host[0]) + + # logger is not pickable, so remove it when we pickle + def __getstate__(self): + state = self.__dict__.copy() + if 'log' in state: + del state['log'] + return state + + # recreate logger when we unpickle + def __setstate__(self, state): + self.__dict__.update(state) + self._open_logger() + def env_initial_manifest(self, initial_manifest): env = os.environ.copy() env.update(self.env)