Create mp_pool_run helper function for running in parallel.

This commit is contained in:
Darko Poljak 2016-12-08 21:48:59 +01:00
parent 6ea1809a30
commit e5a6599ccb
4 changed files with 79 additions and 60 deletions

View file

@ -28,6 +28,7 @@ import itertools
import tempfile import tempfile
import socket import socket
import multiprocessing import multiprocessing
from cdist.mputil import mp_pool_run
import cdist import cdist
import cdist.hostsource import cdist.hostsource
@ -332,20 +333,12 @@ class Config(object):
multiprocessing.get_start_method())) multiprocessing.get_start_method()))
self.log.debug(("Starting multiprocessing Pool for {} parallel " self.log.debug(("Starting multiprocessing Pool for {} parallel "
"objects preparation".format(n))) "objects preparation".format(n)))
with multiprocessing.Pool(self.jobs) as pool: args = [
self.log.debug(("Starting async for parallel object " (c, ) for c in cargo
"preparation")) ]
results = [ mp_pool_run(self.object_prepare, args, jobs=self.jobs)
pool.apply_async(self.object_prepare, (c,)) self.log.debug(("Multiprocessing for parallel object "
for c in cargo "preparation finished"))
]
self.log.debug(("Waiting async results for parallel object "
"preparation"))
for r in results:
r.get()
self.log.debug(("Multiprocessing for parallel object "
"preparation finished"))
objects_changed = True objects_changed = True
del cargo[:] del cargo[:]
@ -376,19 +369,12 @@ class Config(object):
multiprocessing.get_start_method())) multiprocessing.get_start_method()))
self.log.debug(("Starting multiprocessing Pool for {} parallel " self.log.debug(("Starting multiprocessing Pool for {} parallel "
"object run".format(n))) "object run".format(n)))
with multiprocessing.Pool(self.jobs) as pool: args = [
self.log.debug(("Starting async for parallel object run")) (c, ) for c in cargo
results = [ ]
pool.apply_async(self.object_run, (c,)) mp_pool_run(self.object_run, args, jobs=self.jobs)
for c in cargo self.log.debug(("Multiprocessing for parallel object "
] "run finished"))
self.log.debug(("Waiting async results for parallel object "
"run"))
for r in results:
r.get()
self.log.debug(("Multiprocessing for parallel object "
"run finished"))
objects_changed = True objects_changed = True
return objects_changed return objects_changed

View file

@ -24,8 +24,7 @@ import logging
import os import os
import glob import glob
import multiprocessing import multiprocessing
from cdist.mputil import mp_pool_run
import cdist
''' '''
common: common:
@ -121,18 +120,12 @@ class Explorer(object):
multiprocessing.get_start_method())) multiprocessing.get_start_method()))
self.log.debug(("Starting multiprocessing Pool for global " self.log.debug(("Starting multiprocessing Pool for global "
"explorers run")) "explorers run"))
with multiprocessing.Pool(self.jobs) as pool: args = [
self.log.debug("Starting async for global explorer run") (e, out_path, ) for e in self.list_global_explorer_names()
results = [ ]
pool.apply_async(self._run_global_explorer, (e, out_path,)) mp_pool_run(self._run_global_explorer, args, jobs=self.jobs)
for e in self.list_global_explorer_names() self.log.debug(("Multiprocessing run for global explorers "
] "finished"))
self.log.debug("Waiting async results for global explorer runs")
for r in results:
r.get() # self._run_global_explorer returns None
self.log.debug(("Multiprocessing run for global explorers "
"finished"))
# logger is not pickable, so remove it when we pickle # logger is not pickable, so remove it when we pickle
def __getstate__(self): def __getstate__(self):

View file

@ -31,6 +31,7 @@ import multiprocessing
import cdist import cdist
import cdist.exec.util as exec_util import cdist.exec.util as exec_util
import cdist.util.ipaddr as ipaddr import cdist.util.ipaddr as ipaddr
from cdist.mputil import mp_pool_run
def _wrap_addr(addr): def _wrap_addr(addr):
@ -152,25 +153,16 @@ class Remote(object):
multiprocessing.get_start_method())) multiprocessing.get_start_method()))
self.log.debug(("Starting multiprocessing Pool for parallel " self.log.debug(("Starting multiprocessing Pool for parallel "
"remote transfer")) "remote transfer"))
with multiprocessing.Pool(jobs) as pool: args = []
self.log.debug("Starting async for parallel transfer") for f in glob.glob1(source, '*'):
commands = [] command = self._copy.split()
for f in glob.glob1(source, '*'): path = os.path.join(source, f)
command = self._copy.split() command.extend([path, '{0}:{1}'.format(
path = os.path.join(source, f) _wrap_addr(self.target_host[0]), destination)])
command.extend([path, '{0}:{1}'.format( args.append((command, ))
_wrap_addr(self.target_host[0]), destination)]) mp_pool_run(self._run_command, args, jobs=jobs)
commands.append(command) self.log.debug(("Multiprocessing for parallel transfer "
results = [ "finished"))
pool.apply_async(self._run_command, (cmd,))
for cmd in commands
]
self.log.debug("Waiting async results for parallel transfer")
for r in results:
r.get() # self._run_command returns None
self.log.debug(("Multiprocessing for parallel transfer "
"finished"))
def run_script(self, script, env=None, return_output=False): def run_script(self, script, env=None, return_output=False):
"""Run the given script with the given environment on the remote side. """Run the given script with the given environment on the remote side.

48
cdist/mputil.py Normal file
View file

@ -0,0 +1,48 @@
# -*- coding: utf-8 -*-
#
# 2016 Darko Poljak (darko.poljak at gmail.com)
#
# This file is part of cdist.
#
# cdist is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# cdist is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with cdist. If not, see <http://www.gnu.org/licenses/>.
#
#
import multiprocessing
import itertools
def mp_pool_run(func, args=None, kwds=None, jobs=multiprocessing.cpu_count()):
""" Run func using multiprocessing.Pool with jobs jobs and supplied
iterable of args and kwds with one entry for each parallel func
instance.
Return list of results.
"""
if args and kwds:
fargs = zip(args, kdws)
elif args:
fargs = zip(args, itertools.repeat({}))
elif kwds:
fargs = zip(itertools.repeat(()), kwds)
else:
return [func(), ]
with multiprocessing.Pool(jobs) as pool:
results = [
pool.apply_async(func, a, k)
for a, k in fargs
]
retval = [r.get() for r in results]
return retval