diff --git a/cdist/core/explorer.py b/cdist/core/explorer.py index ac2a45ca..efffb6ed 100644 --- a/cdist/core/explorer.py +++ b/cdist/core/explorer.py @@ -69,7 +69,7 @@ class Explorer(object): def __init__(self, target_host, local, remote, jobs=None): self.target_host = target_host - self.log = logging.getLogger(target_host) + self._open_logger() self.local = local self.remote = remote @@ -80,6 +80,9 @@ class Explorer(object): self._type_explorers_transferred = [] self.jobs = jobs + def _open_logger(self): + self.log = logging.getLogger(self.target_host) + # global def list_global_explorer_names(self): @@ -112,24 +115,46 @@ class Explorer(object): def _run_global_explorers_parallel(self, out_path): self.log.info("Running global explorers in {} parallel jobs".format( self.jobs)) - self.log.info("Starting multiprocessing Pool") + self.log.debug("Multiprocessing start method is {}".format( + multiprocessing.get_start_method())) + self.log.info(("Starting multiprocessing Pool for global " + "explorers run")) with multiprocessing.Pool(self.jobs) as pool: - self.log.info("Starting async global explorer run") + self.log.info("Starting async for global explorer run") results = [ pool.apply_async(self._run_global_explorer, (e, out_path,)) for e in self.list_global_explorer_names() ] - self.log.info("Waiting async global explorer run results") + + self.log.info("Waiting async results for global explorer runs") for r in results: - r.get() - self.log.info("Async global explorer run finished") - self.log.info("Multiprocessing Pool finished") + r.get() # self._run_global_explorer returns None + self.log.info(("Multiprocessing run for global explorers " + "finished")) + + # logger is not pickable, so remove it when we pickle + def __getstate__(self): + state = self.__dict__.copy() + if 'log' in state: + del state['log'] + return state + + # recreate logger when we unpickle + def __setstate__(self, state): + self.__dict__.update(state) + self._open_logger() def transfer_global_explorers(self): """Transfer the global explorers to the remote side.""" self.remote.mkdir(self.remote.global_explorer_path) - self.remote.transfer(self.local.global_explorer_path, - self.remote.global_explorer_path) + if self.jobs is None: + self.remote.transfer(self.local.global_explorer_path, + self.remote.global_explorer_path) + else: + self.remote.transfer_dir_parallel( + self.local.global_explorer_path, + self.remote.global_explorer_path, + self.jobs) self.remote.run(["chmod", "0700", "%s/*" % (self.remote.global_explorer_path)]) diff --git a/cdist/exec/local.py b/cdist/exec/local.py index 4fdb5170..34826bfb 100644 --- a/cdist/exec/local.py +++ b/cdist/exec/local.py @@ -85,6 +85,18 @@ class Local(object): def _init_log(self): self.log = logging.getLogger(self.target_host) + # logger is not pickable, so remove it when we pickle + def __getstate__(self): + state = self.__dict__.copy() + if 'log' in state: + del state['log'] + return state + + # recreate logger when we unpickle + def __setstate__(self, state): + self.__dict__.update(state) + self._init_log() + def _init_permissions(self): # Setup file permissions using umask os.umask(0o077) diff --git a/cdist/exec/remote.py b/cdist/exec/remote.py index b6ff8c1f..389c0da1 100644 --- a/cdist/exec/remote.py +++ b/cdist/exec/remote.py @@ -26,9 +26,10 @@ import sys import glob import subprocess import logging -import cdist.exec.util as exec_util +import multiprocessing import cdist +import cdist.exec.util as exec_util class DecodeError(cdist.Error): @@ -66,10 +67,25 @@ class Remote(object): self.type_path = os.path.join(self.conf_path, "type") self.global_explorer_path = os.path.join(self.conf_path, "explorer") - self.log = logging.getLogger(self.target_host) + self._open_logger() self._init_env() + def _open_logger(self): + self.log = logging.getLogger(self.target_host) + + # logger is not pickable, so remove it when we pickle + def __getstate__(self): + state = self.__dict__.copy() + if 'log' in state: + del state['log'] + return state + + # recreate logger when we unpickle + def __setstate__(self, state): + self.__dict__.update(state) + self._open_logger() + def _init_env(self): """Setup environment for scripts - HERE????""" # FIXME: better do so in exec functions that require it! @@ -110,6 +126,40 @@ class Remote(object): self.target_host, destination)]) self._run_command(command) + def transfer_dir_parallel(self, source, destination, jobs): + """Transfer a directory to the remote side in parallel mode.""" + self.log.debug("Remote transfer: %s -> %s", source, destination) + self.rmdir(destination) + if os.path.isdir(source): + self.mkdir(destination) + self.log.info("Remote transfer in {} parallel jobs".format( + jobs)) + self.log.debug("Multiprocessing start method is {}".format( + multiprocessing.get_start_method())) + self.log.info(("Starting multiprocessing Pool for parallel " + "remote transfer")) + with multiprocessing.Pool(jobs) as pool: + self.log.info("Starting async for parallel transfer") + commands = [] + for f in glob.glob1(source, '*'): + command = self._copy.split() + path = os.path.join(source, f) + command.extend([path, '{0}:{1}'.format( + self.target_host, destination)]) + commands.append(command) + results = [ + pool.apply_async(self._run_command, (cmd,)) + for cmd in commands + ] + + self.log.info("Waiting async results for parallel transfer") + for r in results: + r.get() # self._run_command returns None + self.log.info(("Multiprocessing for parallel transfer " + "finished")) + else: + raise cdist.Error("Source {} is not a directory".format(source)) + def run_script(self, script, env=None, return_output=False): """Run the given script with the given environment on the remote side. Return the output as a string. diff --git a/cdist/test/exec/remote.py b/cdist/test/exec/remote.py index 121ec4ac..f498c285 100644 --- a/cdist/test/exec/remote.py +++ b/cdist/test/exec/remote.py @@ -24,6 +24,7 @@ import getpass import shutil import string import random +import multiprocessing import cdist from cdist import test @@ -121,6 +122,22 @@ class RemoteTestCase(test.CdistTestCase): # test if the payload file is in the target directory self.assertTrue(os.path.isfile(os.path.join(target, source_file_name))) + def test_transfer_dir_parallel(self): + source = self.mkdtemp(dir=self.temp_dir) + # put 8 files in the directory as payload + filenames = [] + for x in range(8): + handle, source_file = self.mkstemp(dir=source) + os.close(handle) + source_file_name = os.path.split(source_file)[-1] + filenames.append(source_file_name) + target = self.mkdtemp(dir=self.temp_dir) + self.remote.transfer_dir_parallel(source, target, + multiprocessing.cpu_count()) + # test if the payload files are in the target directory + for filename in filenames: + self.assertTrue(os.path.isfile(os.path.join(target, filename))) + def test_create_files_dirs(self): self.remote.create_files_dirs() self.assertTrue(os.path.isdir(self.remote.base_path)) diff --git a/cdist/test/explorer/__init__.py b/cdist/test/explorer/__init__.py index 27820292..8ac9975e 100644 --- a/cdist/test/explorer/__init__.py +++ b/cdist/test/explorer/__init__.py @@ -179,6 +179,20 @@ class ExplorerClassTestCase(test.CdistTestCase): jobs=8) self.assertEqual(expl.jobs, 8) + def test_transfer_global_explorers_parallel(self): + expl = explorer.Explorer( + self.target_host, + self.local, + self.remote, + jobs=multiprocessing.cpu_count()) + self.assertIsNotNone(expl.jobs) + + expl.transfer_global_explorers() + source = self.local.global_explorer_path + destination = self.remote.global_explorer_path + self.assertEqual(sorted(os.listdir(source)), + sorted(os.listdir(destination))) + def test_run_parallel_jobs(self): expl = explorer.Explorer( self.target_host, diff --git a/docs/changelog b/docs/changelog index 5dd49ef5..13cde84b 100644 --- a/docs/changelog +++ b/docs/changelog @@ -1,8 +1,8 @@ Changelog --------- next: - * New type __filesystem: manage filesystems on devices ( Daniel Heule ) - + * Core: Add -j, --jobs option for parallel execution and add parallel support for global explorers (Darko Poljak) + * New type __filesystem: manage filesystems on devices (Daniel Heule) * New type: __locale_system (Steven Armstrong, Carlos Ortigoza, Nico Schottelius) * New type: __sysctl (Steven Armstrong)