Merge pull request #507 from darko-poljak/the-good-the-bad-and-the-ugly

The good, the bad and the ugly - object prepare/run parallelization
Darko Poljak 2017-06-13 22:45:09 +02:00 committed by GitHub
commit e774fc2b2a
6 changed files with 185 additions and 36 deletions

cdist/config.py

@@ -27,6 +27,8 @@ import time
 import itertools
 import tempfile
 import socket
+import multiprocessing
+from cdist.mputil import mp_pool_run
 import atexit
 import shutil
@@ -48,7 +50,7 @@ class Config(object):
         self.local = local
         self.remote = remote
-        self.log = logging.getLogger(self.local.target_host[0])
+        self._open_logger()
         self.dry_run = dry_run
         self.jobs = jobs
@@ -123,7 +125,6 @@ class Config(object):
     @classmethod
     def commandline(cls, args):
         """Configure remote system"""
-        import multiprocessing
 
         # FIXME: Refactor relict - remove later
         log = logging.getLogger("cdist")
@@ -278,6 +279,14 @@ class Config(object):
         Iterate over the objects once - helper method for
         iterate_until_finished
         """
+        if self.jobs:
+            objects_changed = self._iterate_once_parallel()
+        else:
+            objects_changed = self._iterate_once_sequential()
+        return objects_changed
+
+    def _iterate_once_sequential(self):
+        self.log.info("Iteration in sequential mode")
         objects_changed = False
         for cdist_object in self.object_list():
@@ -303,6 +312,95 @@ class Config(object):
 
         return objects_changed
 
+    def _iterate_once_parallel(self):
+        self.log.info("Iteration in parallel mode in {} jobs".format(
+            self.jobs))
+        objects_changed = False
+
+        cargo = []
+        for cdist_object in self.object_list():
+            if cdist_object.requirements_unfinished(cdist_object.requirements):
+                """We cannot do anything for this poor object"""
+                continue
+
+            if cdist_object.state == core.CdistObject.STATE_UNDEF:
+                """Prepare the virgin object"""
+                # self.object_prepare(cdist_object)
+                # objects_changed = True
+                cargo.append(cdist_object)
+
+        n = len(cargo)
+        if n == 1:
+            self.log.debug("Only one object, preparing sequentially")
+            self.object_prepare(cargo[0])
+            objects_changed = True
+        elif cargo:
+            self.log.debug("Multiprocessing start method is {}".format(
+                multiprocessing.get_start_method()))
+            self.log.debug(("Starting multiprocessing Pool for {} parallel "
+                            "objects preparation".format(n)))
+            args = [
+                (c, ) for c in cargo
+            ]
+            mp_pool_run(self.object_prepare, args, jobs=self.jobs)
+            self.log.debug(("Multiprocessing for parallel object "
+                            "preparation finished"))
+            objects_changed = True
+
+        del cargo[:]
+        for cdist_object in self.object_list():
+            if cdist_object.requirements_unfinished(cdist_object.requirements):
+                """We cannot do anything for this poor object"""
+                continue
+
+            if cdist_object.state == core.CdistObject.STATE_PREPARED:
+                if cdist_object.requirements_unfinished(
+                        cdist_object.autorequire):
+                    """The previous step created objects we depend on -
+                    wait for them
+                    """
+                    continue
+
+                # self.object_run(cdist_object)
+                # objects_changed = True
+                cargo.append(cdist_object)
+
+        n = len(cargo)
+        if n == 1:
+            self.log.debug("Only one object, running sequentially")
+            self.object_run(cargo[0])
+            objects_changed = True
+        elif cargo:
+            self.log.debug("Multiprocessing start method is {}".format(
+                multiprocessing.get_start_method()))
+            self.log.debug(("Starting multiprocessing Pool for {} parallel "
+                            "object run".format(n)))
+            args = [
+                (c, ) for c in cargo
+            ]
+            mp_pool_run(self.object_run, args, jobs=self.jobs)
+            self.log.debug(("Multiprocessing for parallel object "
+                            "run finished"))
+            objects_changed = True
+
+        return objects_changed
+
+    def _open_logger(self):
+        self.log = logging.getLogger(self.local.target_host[0])
+
+    # logger is not picklable, so remove it when we pickle
+    def __getstate__(self):
+        state = self.__dict__.copy()
+        if 'log' in state:
+            del state['log']
+        return state
+
+    # recreate logger when we unpickle
+    def __setstate__(self, state):
+        self.__dict__.update(state)
+        self._open_logger()
+
     def iterate_until_finished(self):
         """
         Go through all objects and solve them
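
Note on the pickling hooks above: mp_pool_run hands bound methods such as self.object_prepare and self.object_run to worker processes, so the whole Config instance gets pickled, and logging.Logger attributes do not survive that trip cleanly. A minimal standalone sketch of the same pattern (class and attribute names here are illustrative, not cdist's):

    import logging
    import pickle

    class PickleSafeLogging:
        """Illustrative: drop the logger on pickle, recreate on unpickle."""

        def __init__(self, name):
            self.name = name
            self._open_logger()

        def _open_logger(self):
            self.log = logging.getLogger(self.name)

        def __getstate__(self):
            # copy the instance dict and strip the logger before pickling
            state = self.__dict__.copy()
            state.pop('log', None)
            return state

        def __setstate__(self, state):
            # restore the plain attributes, then reopen the logger by name
            self.__dict__.update(state)
            self._open_logger()

    obj = PickleSafeLogging("example.host")
    clone = pickle.loads(pickle.dumps(obj))
    clone.log.warning("logger recreated after unpickling")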

cdist/core/cdist_type.py

@@ -79,6 +79,9 @@ class CdistType(object):
 
     _instances = {}
 
+    def __getnewargs__(self):
+        return self.base_path, self.name
+
     def __new__(cls, *args, **kwargs):
         """only one instance of each named type may exist"""
         # name is second argument
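
Note: CdistType interns one instance per type name via __new__ and the _instances cache, and pickle would otherwise recreate instances through __new__ with no arguments; __getnewargs__ supplies (base_path, name) so unpickling in a worker process goes through the cache lookup again. A minimal sketch of the mechanism, using an illustrative single-argument class rather than CdistType itself:

    import pickle

    class Interned:
        """Illustrative: one instance per name, as with CdistType."""
        _instances = {}

        def __getnewargs__(self):
            # pickle will call Interned.__new__(Interned, *these args)
            return (self.name,)

        def __new__(cls, name):
            if name not in cls._instances:
                obj = super().__new__(cls)
                obj.name = name
                cls._instances[name] = obj
            return cls._instances[name]

    a = Interned("x")
    b = pickle.loads(pickle.dumps(a))
    assert b is a  # unpickling consulted the cache via __new__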

cdist/core/explorer.py

@@ -24,8 +24,7 @@ import logging
 import os
 import glob
 import multiprocessing
-
-import cdist
+from cdist.mputil import mp_pool_run
 
 '''
 common:
@@ -121,16 +120,10 @@ class Explorer(object):
             multiprocessing.get_start_method()))
         self.log.debug(("Starting multiprocessing Pool for global "
                         "explorers run"))
-        with multiprocessing.Pool(self.jobs) as pool:
-            self.log.debug("Starting async for global explorer run")
-            results = [
-                pool.apply_async(self._run_global_explorer, (e, out_path,))
-                for e in self.list_global_explorer_names()
-            ]
-
-            self.log.debug("Waiting async results for global explorer runs")
-            for r in results:
-                r.get()  # self._run_global_explorer returns None
+        args = [
+            (e, out_path, ) for e in self.list_global_explorer_names()
+        ]
+        mp_pool_run(self._run_global_explorer, args, jobs=self.jobs)
 
         self.log.debug(("Multiprocessing run for global explorers "
                         "finished"))

cdist/core/manifest.py

@@ -98,7 +98,7 @@ class Manifest(object):
         self.target_host = target_host
         self.local = local
 
-        self.log = logging.getLogger(self.target_host[0])
+        self._open_logger()
 
         self.env = {
             'PATH': "%s:%s" % (self.local.bin_path, os.environ['PATH']),
@@ -114,6 +114,21 @@ class Manifest(object):
         if self.log.getEffectiveLevel() == logging.DEBUG:
             self.env.update({'__cdist_debug': "yes"})
 
+    def _open_logger(self):
+        self.log = logging.getLogger(self.target_host[0])
+
+    # logger is not picklable, so remove it when we pickle
+    def __getstate__(self):
+        state = self.__dict__.copy()
+        if 'log' in state:
+            del state['log']
+        return state
+
+    # recreate logger when we unpickle
+    def __setstate__(self, state):
+        self.__dict__.update(state)
+        self._open_logger()
+
     def env_initial_manifest(self, initial_manifest):
         env = os.environ.copy()
         env.update(self.env)

cdist/exec/remote.py

@@ -31,6 +31,7 @@ import multiprocessing
 
 import cdist
 import cdist.exec.util as exec_util
 import cdist.util.ipaddr as ipaddr
+from cdist.mputil import mp_pool_run
 
 
 def _wrap_addr(addr):
@@ -152,23 +153,14 @@ class Remote(object):
             multiprocessing.get_start_method()))
         self.log.debug(("Starting multiprocessing Pool for parallel "
                         "remote transfer"))
-        with multiprocessing.Pool(jobs) as pool:
-            self.log.debug("Starting async for parallel transfer")
-            commands = []
-            for f in glob.glob1(source, '*'):
-                command = self._copy.split()
-                path = os.path.join(source, f)
-                command.extend([path, '{0}:{1}'.format(
-                    _wrap_addr(self.target_host[0]), destination)])
-                commands.append(command)
-            results = [
-                pool.apply_async(self._run_command, (cmd,))
-                for cmd in commands
-            ]
-
-            self.log.debug("Waiting async results for parallel transfer")
-            for r in results:
-                r.get()  # self._run_command returns None
+        args = []
+        for f in glob.glob1(source, '*'):
+            command = self._copy.split()
+            path = os.path.join(source, f)
+            command.extend([path, '{0}:{1}'.format(
+                _wrap_addr(self.target_host[0]), destination)])
+            args.append((command, ))
+        mp_pool_run(self._run_command, args, jobs=jobs)
 
         self.log.debug(("Multiprocessing for parallel transfer "
                         "finished"))

cdist/mputil.py (new file, 48 additions)

@@ -0,0 +1,48 @@
+# -*- coding: utf-8 -*-
+#
+# 2016 Darko Poljak (darko.poljak at gmail.com)
+#
+# This file is part of cdist.
+#
+# cdist is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# cdist is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with cdist. If not, see <http://www.gnu.org/licenses/>.
+#
+#
+
+import multiprocessing
+import itertools
+
+
+def mp_pool_run(func, args=None, kwds=None, jobs=multiprocessing.cpu_count()):
+    """ Run func using multiprocessing.Pool with jobs processes and the
+        supplied iterables of args and kwds, one entry for each parallel
+        func invocation.
+        Return a list of results.
+    """
+    if args and kwds:
+        fargs = zip(args, kwds)
+    elif args:
+        fargs = zip(args, itertools.repeat({}))
+    elif kwds:
+        fargs = zip(itertools.repeat(()), kwds)
+    else:
+        return [func(), ]
+
+    with multiprocessing.Pool(jobs) as pool:
+        results = [
+            pool.apply_async(func, a, k)
+            for a, k in fargs
+        ]
+        retval = [r.get() for r in results]
+    return retval
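
A usage sketch for the helper; the worker functions and values are illustrative, and cdist must be importable:

    from cdist.mputil import mp_pool_run  # assumes cdist is installed

    def greet(name, punct="!"):
        # workers must be module-level so the pool can pickle them
        return "hello " + name + punct

    def answer():
        return 42

    if __name__ == '__main__':
        # positional args only: one tuple per parallel invocation
        print(mp_pool_run(greet, args=[("alice",), ("bob",)], jobs=2))
        # args and kwds are zipped pairwise, one dict per invocation
        print(mp_pool_run(greet,
                          args=[("alice",), ("bob",)],
                          kwds=[{"punct": "?"}, {"punct": "."}],
                          jobs=2))
        # no args and no kwds: func is called once, in this process
        print(mp_pool_run(answer))  # [42], no pool is created

One design point worth noting: the default jobs=multiprocessing.cpu_count() is evaluated once, at import time, rather than per call; since the CPU count is fixed for the life of the process, the effect is the same as evaluating it per call.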