concurrent.futures -> multiprocessing

Darko Poljak 2016-12-08 17:36:57 +01:00
parent e6b9fc90ba
commit 8776a2ee06
3 changed files with 71 additions and 14 deletions

View file

@@ -27,7 +27,7 @@ import time
 import itertools
 import tempfile
 import socket
-import concurrent.futures
+import multiprocessing
 import cdist
 import cdist.hostsource
@ -47,7 +47,7 @@ class Config(object):
self.local = local self.local = local
self.remote = remote self.remote = remote
self.log = logging.getLogger(self.local.target_host[0]) self._open_logger()
self.dry_run = dry_run self.dry_run = dry_run
self.jobs = jobs self.jobs = jobs
@ -123,7 +123,6 @@ class Config(object):
@classmethod @classmethod
def commandline(cls, args): def commandline(cls, args):
"""Configure remote system""" """Configure remote system"""
import multiprocessing
# FIXME: Refactor relict - remove later # FIXME: Refactor relict - remove later
log = logging.getLogger("cdist") log = logging.getLogger("cdist")
@@ -329,11 +328,24 @@ class Config(object):
             self.object_prepare(cargo[0])
             objects_changed = True
         elif cargo:
-            self.log.debug("Preparing {} objects in parallel".format(n))
-            with concurrent.futures.ProcessPoolExecutor(self.jobs) as executor:
-                for x in executor.map(self.object_prepare, cargo):
-                    pass  # returns None
-            self.log.debug("Preparation finished")
+            self.log.debug("Multiprocessing start method is {}".format(
+                multiprocessing.get_start_method()))
+            self.log.debug(("Starting multiprocessing Pool for {} parallel "
+                            "objects preparation".format(n)))
+            with multiprocessing.Pool(self.jobs) as pool:
+                self.log.debug(("Starting async for parallel object "
+                                "preparation"))
+                results = [
+                    pool.apply_async(self.object_prepare, (c,))
+                    for c in cargo
+                ]
+                self.log.debug(("Waiting async results for parallel object "
+                                "preparation"))
+                for r in results:
+                    r.get()
+                self.log.debug(("Multiprocessing for parallel object "
+                                "preparation finished"))
             objects_changed = True
 
         del cargo[:]
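
In the hunk above, executor.map() both dispatched the work and re-raised any worker exception while its results were being consumed; the replacement dispatches one apply_async() call per object and then calls get() on every AsyncResult, which likewise re-raises worker exceptions in the parent. A minimal sketch of the two idioms, using a stand-in prepare() function rather than cdist's object_prepare:

    # Illustrative only: prepare() stands in for Config.object_prepare.
    import concurrent.futures
    import multiprocessing


    def prepare(obj):
        # worker function; exceptions raised here must surface in the parent
        return obj


    if __name__ == '__main__':
        cargo = [1, 2, 3]
        jobs = 2

        # old idiom: map() dispatches and, as results are consumed,
        # re-raises worker exceptions in the parent
        with concurrent.futures.ProcessPoolExecutor(jobs) as executor:
            for _ in executor.map(prepare, cargo):
                pass

        # new idiom: one apply_async() per item, then get() on each
        # AsyncResult, which also re-raises worker exceptions
        with multiprocessing.Pool(jobs) as pool:
            results = [pool.apply_async(prepare, (c,)) for c in cargo]
            for r in results:
                r.get()

Keeping the AsyncResult objects in a list lets everything be submitted first and collected afterwards, which is the shape the new code uses.
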
@@ -360,15 +372,42 @@ class Config(object):
             self.object_run(cargo[0])
             objects_changed = True
         elif cargo:
-            self.log.debug("Running {} objects in parallel".format(n))
-            with concurrent.futures.ProcessPoolExecutor(self.jobs) as executor:
-                for x in executor.map(self.object_run, cargo):
-                    pass  # returns None
-            self.log.debug("Running finished")
+            self.log.debug("Multiprocessing start method is {}".format(
+                multiprocessing.get_start_method()))
+            self.log.debug(("Starting multiprocessing Pool for {} parallel "
+                            "object run".format(n)))
+            with multiprocessing.Pool(self.jobs) as pool:
+                self.log.debug(("Starting async for parallel object run"))
+                results = [
+                    pool.apply_async(self.object_run, (c,))
+                    for c in cargo
+                ]
+                self.log.debug(("Waiting async results for parallel object "
+                                "run"))
+                for r in results:
+                    r.get()
+                self.log.debug(("Multiprocessing for parallel object "
+                                "run finished"))
             objects_changed = True
 
         return objects_changed
 
+    def _open_logger(self):
+        self.log = logging.getLogger(self.local.target_host[0])
+
+    # logger is not pickable, so remove it when we pickle
+    def __getstate__(self):
+        state = self.__dict__.copy()
+        if 'log' in state:
+            del state['log']
+        return state
+
+    # recreate logger when we unpickle
+    def __setstate__(self, state):
+        self.__dict__.update(state)
+        self._open_logger()
+
     def iterate_until_finished(self):
         """
             Go through all objects and solve them
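
The __getstate__/__setstate__ pair added above exists because apply_async() has to send self.object_prepare / self.object_run to the workers, and pickling a bound method pickles the Config instance behind it; logging.Logger objects cannot be pickled, so the logger is dropped from the pickled state and reopened by name on unpickle. A standalone sketch of the same pattern, with a hypothetical Worker class rather than cdist's Config:

    # Hypothetical Worker class illustrating the pattern; not cdist code.
    import logging
    import pickle


    class Worker:
        def __init__(self, target_host):
            self.target_host = target_host
            self._open_logger()

        def _open_logger(self):
            # logging.Logger objects hold locks and are not picklable
            self.log = logging.getLogger(self.target_host)

        def __getstate__(self):
            # drop the logger before pickling
            state = self.__dict__.copy()
            if 'log' in state:
                del state['log']
            return state

        def __setstate__(self, state):
            # restore attributes, then reopen the logger by name
            self.__dict__.update(state)
            self._open_logger()


    w = pickle.loads(pickle.dumps(Worker("host.example.com")))
    print(w.log.name)  # prints: host.example.com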

View file

@@ -79,6 +79,9 @@ class CdistType(object):
 
     _instances = {}
 
+    def __getnewargs__(self):
+        return self.base_path, self.name
+
     def __new__(cls, *args, **kwargs):
         """only one instance of each named type may exist"""
         # name is second argument
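
CdistType keeps one instance per name by caching in __new__ (the _instances dict above), and that __new__ expects arguments. Pickle recreates objects by calling __new__, so without __getnewargs__ it would be called with no arguments and fail once type objects travel to worker processes; returning (base_path, name) makes unpickling go through the same constructor path and cache lookup. A minimal sketch of the mechanism, with a hypothetical NamedSingleton class rather than CdistType:

    # Hypothetical NamedSingleton class illustrating __getnewargs__; not cdist code.
    import pickle


    class NamedSingleton:
        _instances = {}

        def __getnewargs__(self):
            # arguments pickle passes to __new__ when unpickling
            return (self.name,)

        def __new__(cls, name):
            if name not in cls._instances:
                instance = super().__new__(cls)
                instance.name = name
                cls._instances[name] = instance
            return cls._instances[name]


    a = NamedSingleton("web-server")
    b = pickle.loads(pickle.dumps(a))
    print(a is b)  # True: unpickling goes through __new__ and hits the cache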

View file

@@ -98,7 +98,7 @@ class Manifest(object):
         self.target_host = target_host
         self.local = local
 
-        self.log = logging.getLogger(self.target_host[0])
+        self._open_logger()
 
         self.env = {
             'PATH': "%s:%s" % (self.local.bin_path, os.environ['PATH']),
@@ -114,6 +114,21 @@ class Manifest(object):
         if self.log.getEffectiveLevel() == logging.DEBUG:
             self.env.update({'__cdist_debug': "yes"})
 
+    def _open_logger(self):
+        self.log = logging.getLogger(self.target_host[0])
+
+    # logger is not pickable, so remove it when we pickle
+    def __getstate__(self):
+        state = self.__dict__.copy()
+        if 'log' in state:
+            del state['log']
+        return state
+
+    # recreate logger when we unpickle
+    def __setstate__(self, state):
+        self.__dict__.update(state)
+        self._open_logger()
+
     def env_initial_manifest(self, initial_manifest):
         env = os.environ.copy()
         env.update(self.env)