diff --git a/cdist/config.py b/cdist/config.py index c25c029b..03a2e6ee 100644 --- a/cdist/config.py +++ b/cdist/config.py @@ -27,6 +27,8 @@ import time import itertools import tempfile import socket +import multiprocessing +from cdist.mputil import mp_pool_run import atexit import shutil @@ -48,7 +50,7 @@ class Config(object): self.local = local self.remote = remote - self.log = logging.getLogger(self.local.target_host[0]) + self._open_logger() self.dry_run = dry_run self.jobs = jobs @@ -123,7 +125,6 @@ class Config(object): @classmethod def commandline(cls, args): """Configure remote system""" - import multiprocessing # FIXME: Refactor relict - remove later log = logging.getLogger("cdist") @@ -278,6 +279,14 @@ class Config(object): Iterate over the objects once - helper method for iterate_until_finished """ + if self.jobs: + objects_changed = self._iterate_once_parallel() + else: + objects_changed = self._iterate_once_sequential() + return objects_changed + + def _iterate_once_sequential(self): + self.log.info("Iteration in sequential mode") objects_changed = False for cdist_object in self.object_list(): @@ -303,6 +312,95 @@ class Config(object): return objects_changed + def _iterate_once_parallel(self): + self.log.info("Iteration in parallel mode in {} jobs".format( + self.jobs)) + objects_changed = False + + cargo = [] + for cdist_object in self.object_list(): + if cdist_object.requirements_unfinished(cdist_object.requirements): + """We cannot do anything for this poor object""" + continue + + if cdist_object.state == core.CdistObject.STATE_UNDEF: + """Prepare the virgin object""" + + # self.object_prepare(cdist_object) + # objects_changed = True + cargo.append(cdist_object) + + n = len(cargo) + if n == 1: + self.log.debug("Only one object, preparing sequentially") + self.object_prepare(cargo[0]) + objects_changed = True + elif cargo: + self.log.debug("Multiprocessing start method is {}".format( + multiprocessing.get_start_method())) + self.log.debug(("Starting multiprocessing Pool for {} parallel " + "objects preparation".format(n))) + args = [ + (c, ) for c in cargo + ] + mp_pool_run(self.object_prepare, args, jobs=self.jobs) + self.log.debug(("Multiprocessing for parallel object " + "preparation finished")) + objects_changed = True + + del cargo[:] + for cdist_object in self.object_list(): + if cdist_object.requirements_unfinished(cdist_object.requirements): + """We cannot do anything for this poor object""" + continue + + if cdist_object.state == core.CdistObject.STATE_PREPARED: + if cdist_object.requirements_unfinished( + cdist_object.autorequire): + """The previous step created objects we depend on - + wait for them + """ + continue + + # self.object_run(cdist_object) + # objects_changed = True + cargo.append(cdist_object) + + n = len(cargo) + if n == 1: + self.log.debug("Only one object, running sequentially") + self.object_run(cargo[0]) + objects_changed = True + elif cargo: + self.log.debug("Multiprocessing start method is {}".format( + multiprocessing.get_start_method())) + self.log.debug(("Starting multiprocessing Pool for {} parallel " + "object run".format(n))) + args = [ + (c, ) for c in cargo + ] + mp_pool_run(self.object_run, args, jobs=self.jobs) + self.log.debug(("Multiprocessing for parallel object " + "run finished")) + objects_changed = True + + return objects_changed + + def _open_logger(self): + self.log = logging.getLogger(self.local.target_host[0]) + + # logger is not pickable, so remove it when we pickle + def __getstate__(self): + state = self.__dict__.copy() + if 'log' in state: + del state['log'] + return state + + # recreate logger when we unpickle + def __setstate__(self, state): + self.__dict__.update(state) + self._open_logger() + def iterate_until_finished(self): """ Go through all objects and solve them diff --git a/cdist/core/cdist_type.py b/cdist/core/cdist_type.py index a548f365..14865386 100644 --- a/cdist/core/cdist_type.py +++ b/cdist/core/cdist_type.py @@ -79,6 +79,9 @@ class CdistType(object): _instances = {} + def __getnewargs__(self): + return self.base_path, self.name + def __new__(cls, *args, **kwargs): """only one instance of each named type may exist""" # name is second argument diff --git a/cdist/core/explorer.py b/cdist/core/explorer.py index 23996240..45afc5c0 100644 --- a/cdist/core/explorer.py +++ b/cdist/core/explorer.py @@ -24,8 +24,7 @@ import logging import os import glob import multiprocessing - -import cdist +from cdist.mputil import mp_pool_run ''' common: @@ -121,18 +120,12 @@ class Explorer(object): multiprocessing.get_start_method())) self.log.debug(("Starting multiprocessing Pool for global " "explorers run")) - with multiprocessing.Pool(self.jobs) as pool: - self.log.debug("Starting async for global explorer run") - results = [ - pool.apply_async(self._run_global_explorer, (e, out_path,)) - for e in self.list_global_explorer_names() - ] - - self.log.debug("Waiting async results for global explorer runs") - for r in results: - r.get() # self._run_global_explorer returns None - self.log.debug(("Multiprocessing run for global explorers " - "finished")) + args = [ + (e, out_path, ) for e in self.list_global_explorer_names() + ] + mp_pool_run(self._run_global_explorer, args, jobs=self.jobs) + self.log.debug(("Multiprocessing run for global explorers " + "finished")) # logger is not pickable, so remove it when we pickle def __getstate__(self): diff --git a/cdist/core/manifest.py b/cdist/core/manifest.py index 574884a0..29f96c4f 100644 --- a/cdist/core/manifest.py +++ b/cdist/core/manifest.py @@ -98,7 +98,7 @@ class Manifest(object): self.target_host = target_host self.local = local - self.log = logging.getLogger(self.target_host[0]) + self._open_logger() self.env = { 'PATH': "%s:%s" % (self.local.bin_path, os.environ['PATH']), @@ -114,6 +114,21 @@ class Manifest(object): if self.log.getEffectiveLevel() == logging.DEBUG: self.env.update({'__cdist_debug': "yes"}) + def _open_logger(self): + self.log = logging.getLogger(self.target_host[0]) + + # logger is not pickable, so remove it when we pickle + def __getstate__(self): + state = self.__dict__.copy() + if 'log' in state: + del state['log'] + return state + + # recreate logger when we unpickle + def __setstate__(self, state): + self.__dict__.update(state) + self._open_logger() + def env_initial_manifest(self, initial_manifest): env = os.environ.copy() env.update(self.env) diff --git a/cdist/exec/remote.py b/cdist/exec/remote.py index 440aafa7..042b7103 100644 --- a/cdist/exec/remote.py +++ b/cdist/exec/remote.py @@ -31,6 +31,7 @@ import multiprocessing import cdist import cdist.exec.util as exec_util import cdist.util.ipaddr as ipaddr +from cdist.mputil import mp_pool_run def _wrap_addr(addr): @@ -152,25 +153,16 @@ class Remote(object): multiprocessing.get_start_method())) self.log.debug(("Starting multiprocessing Pool for parallel " "remote transfer")) - with multiprocessing.Pool(jobs) as pool: - self.log.debug("Starting async for parallel transfer") - commands = [] - for f in glob.glob1(source, '*'): - command = self._copy.split() - path = os.path.join(source, f) - command.extend([path, '{0}:{1}'.format( - _wrap_addr(self.target_host[0]), destination)]) - commands.append(command) - results = [ - pool.apply_async(self._run_command, (cmd,)) - for cmd in commands - ] - - self.log.debug("Waiting async results for parallel transfer") - for r in results: - r.get() # self._run_command returns None - self.log.debug(("Multiprocessing for parallel transfer " - "finished")) + args = [] + for f in glob.glob1(source, '*'): + command = self._copy.split() + path = os.path.join(source, f) + command.extend([path, '{0}:{1}'.format( + _wrap_addr(self.target_host[0]), destination)]) + args.append((command, )) + mp_pool_run(self._run_command, args, jobs=jobs) + self.log.debug(("Multiprocessing for parallel transfer " + "finished")) def run_script(self, script, env=None, return_output=False): """Run the given script with the given environment on the remote side. diff --git a/cdist/mputil.py b/cdist/mputil.py new file mode 100644 index 00000000..e564d749 --- /dev/null +++ b/cdist/mputil.py @@ -0,0 +1,48 @@ +# -*- coding: utf-8 -*- +# +# 2016 Darko Poljak (darko.poljak at gmail.com) +# +# This file is part of cdist. +# +# cdist is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# cdist is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with cdist. If not, see . +# +# + + +import multiprocessing +import itertools + + +def mp_pool_run(func, args=None, kwds=None, jobs=multiprocessing.cpu_count()): + """ Run func using multiprocessing.Pool with jobs jobs and supplied + iterable of args and kwds with one entry for each parallel func + instance. + Return list of results. + """ + if args and kwds: + fargs = zip(args, kwds) + elif args: + fargs = zip(args, itertools.repeat({})) + elif kwds: + fargs = zip(itertools.repeat(()), kwds) + else: + return [func(), ] + + with multiprocessing.Pool(jobs) as pool: + results = [ + pool.apply_async(func, a, k) + for a, k in fargs + ] + retval = [r.get() for r in results] + return retval