diff --git a/cdist/config.py b/cdist/config.py index ae63531d..a47811a9 100644 --- a/cdist/config.py +++ b/cdist/config.py @@ -28,6 +28,7 @@ import itertools import tempfile import socket import multiprocessing +from cdist.mputil import mp_pool_run import cdist import cdist.hostsource @@ -332,20 +333,12 @@ class Config(object): multiprocessing.get_start_method())) self.log.debug(("Starting multiprocessing Pool for {} parallel " "objects preparation".format(n))) - with multiprocessing.Pool(self.jobs) as pool: - self.log.debug(("Starting async for parallel object " - "preparation")) - results = [ - pool.apply_async(self.object_prepare, (c,)) - for c in cargo - ] - - self.log.debug(("Waiting async results for parallel object " - "preparation")) - for r in results: - r.get() - self.log.debug(("Multiprocessing for parallel object " - "preparation finished")) + args = [ + (c, ) for c in cargo + ] + mp_pool_run(self.object_prepare, args, jobs=self.jobs) + self.log.debug(("Multiprocessing for parallel object " + "preparation finished")) objects_changed = True del cargo[:] @@ -376,19 +369,12 @@ class Config(object): multiprocessing.get_start_method())) self.log.debug(("Starting multiprocessing Pool for {} parallel " "object run".format(n))) - with multiprocessing.Pool(self.jobs) as pool: - self.log.debug(("Starting async for parallel object run")) - results = [ - pool.apply_async(self.object_run, (c,)) - for c in cargo - ] - - self.log.debug(("Waiting async results for parallel object " - "run")) - for r in results: - r.get() - self.log.debug(("Multiprocessing for parallel object " - "run finished")) + args = [ + (c, ) for c in cargo + ] + mp_pool_run(self.object_run, args, jobs=self.jobs) + self.log.debug(("Multiprocessing for parallel object " + "run finished")) objects_changed = True return objects_changed diff --git a/cdist/core/explorer.py b/cdist/core/explorer.py index 23996240..45afc5c0 100644 --- a/cdist/core/explorer.py +++ b/cdist/core/explorer.py @@ -24,8 +24,7 @@ import logging import os import glob import multiprocessing - -import cdist +from cdist.mputil import mp_pool_run ''' common: @@ -121,18 +120,12 @@ class Explorer(object): multiprocessing.get_start_method())) self.log.debug(("Starting multiprocessing Pool for global " "explorers run")) - with multiprocessing.Pool(self.jobs) as pool: - self.log.debug("Starting async for global explorer run") - results = [ - pool.apply_async(self._run_global_explorer, (e, out_path,)) - for e in self.list_global_explorer_names() - ] - - self.log.debug("Waiting async results for global explorer runs") - for r in results: - r.get() # self._run_global_explorer returns None - self.log.debug(("Multiprocessing run for global explorers " - "finished")) + args = [ + (e, out_path, ) for e in self.list_global_explorer_names() + ] + mp_pool_run(self._run_global_explorer, args, jobs=self.jobs) + self.log.debug(("Multiprocessing run for global explorers " + "finished")) # logger is not pickable, so remove it when we pickle def __getstate__(self): diff --git a/cdist/exec/remote.py b/cdist/exec/remote.py index 440aafa7..042b7103 100644 --- a/cdist/exec/remote.py +++ b/cdist/exec/remote.py @@ -31,6 +31,7 @@ import multiprocessing import cdist import cdist.exec.util as exec_util import cdist.util.ipaddr as ipaddr +from cdist.mputil import mp_pool_run def _wrap_addr(addr): @@ -152,25 +153,16 @@ class Remote(object): multiprocessing.get_start_method())) self.log.debug(("Starting multiprocessing Pool for parallel " "remote transfer")) - with multiprocessing.Pool(jobs) as pool: - self.log.debug("Starting async for parallel transfer") - commands = [] - for f in glob.glob1(source, '*'): - command = self._copy.split() - path = os.path.join(source, f) - command.extend([path, '{0}:{1}'.format( - _wrap_addr(self.target_host[0]), destination)]) - commands.append(command) - results = [ - pool.apply_async(self._run_command, (cmd,)) - for cmd in commands - ] - - self.log.debug("Waiting async results for parallel transfer") - for r in results: - r.get() # self._run_command returns None - self.log.debug(("Multiprocessing for parallel transfer " - "finished")) + args = [] + for f in glob.glob1(source, '*'): + command = self._copy.split() + path = os.path.join(source, f) + command.extend([path, '{0}:{1}'.format( + _wrap_addr(self.target_host[0]), destination)]) + args.append((command, )) + mp_pool_run(self._run_command, args, jobs=jobs) + self.log.debug(("Multiprocessing for parallel transfer " + "finished")) def run_script(self, script, env=None, return_output=False): """Run the given script with the given environment on the remote side. diff --git a/cdist/mputil.py b/cdist/mputil.py new file mode 100644 index 00000000..24289df2 --- /dev/null +++ b/cdist/mputil.py @@ -0,0 +1,48 @@ +# -*- coding: utf-8 -*- +# +# 2016 Darko Poljak (darko.poljak at gmail.com) +# +# This file is part of cdist. +# +# cdist is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# cdist is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with cdist. If not, see . +# +# + + +import multiprocessing +import itertools + + +def mp_pool_run(func, args=None, kwds=None, jobs=multiprocessing.cpu_count()): + """ Run func using multiprocessing.Pool with jobs jobs and supplied + iterable of args and kwds with one entry for each parallel func + instance. + Return list of results. + """ + if args and kwds: + fargs = zip(args, kdws) + elif args: + fargs = zip(args, itertools.repeat({})) + elif kwds: + fargs = zip(itertools.repeat(()), kwds) + else: + return [func(), ] + + with multiprocessing.Pool(jobs) as pool: + results = [ + pool.apply_async(func, a, k) + for a, k in fargs + ] + retval = [r.get() for r in results] + return retval