From a4c49201c0d6f4d821fcb21798fa28a2885262b4 Mon Sep 17 00:00:00 2001
From: Darko Poljak
Date: Thu, 11 Aug 2016 23:54:31 +0200
Subject: [PATCH] Add jobs option for parallel execution, global explorers first.

---
Notes (free text after the three-dash line; not part of the commit message):
  * Config.__init__ and Explorer.__init__ gain an optional jobs=None
    argument; Config forwards it to core.Explorer.
  * Explorer.run_global_explorers() keeps the sequential loop when jobs is
    None and otherwise dispatches each global explorer through
    multiprocessing.Pool(jobs).apply_async(), waiting on every result.
  * scripts/cdist adds -j/--jobs, validated by check_positive_int(); a bare
    -j uses multiprocessing.cpu_count().
  * NOTE(review): line breaks and indentation were restored from a
    whitespace-collapsed copy of this patch; run `git apply --check`
    against the parent tree before relying on it.

 cdist/config.py                 |  7 +++---
 cdist/core/explorer.py          | 38 ++++++++++++++++++++++++++++-----
 cdist/test/explorer/__init__.py | 32 +++++++++++++++++++++++++++
 scripts/cdist                   | 20 +++++++++++++++++
 4 files changed, 89 insertions(+), 8 deletions(-)

diff --git a/cdist/config.py b/cdist/config.py
index da560e91..34d647e1 100644
--- a/cdist/config.py
+++ b/cdist/config.py
@@ -70,15 +70,16 @@ def inspect_ssh_mux_opts():
 class Config(object):
     """Cdist main class to hold arbitrary data"""
 
-    def __init__(self, local, remote, dry_run=False):
+    def __init__(self, local, remote, dry_run=False, jobs=None):
 
         self.local = local
         self.remote = remote
         self.log = logging.getLogger(self.local.target_host)
         self.dry_run = dry_run
+        self.jobs = jobs
 
         self.explorer = core.Explorer(self.local.target_host, self.local,
-                                      self.remote)
+                                      self.remote, jobs=self.jobs)
         self.manifest = core.Manifest(self.local.target_host, self.local)
         self.code = core.Code(self.local.target_host, self.local,
                               self.remote)
@@ -241,7 +242,7 @@ class Config(object):
                 remote_exec=remote_exec,
                 remote_copy=remote_copy)
 
-            c = cls(local, remote, dry_run=args.dry_run)
+            c = cls(local, remote, dry_run=args.dry_run, jobs=args.jobs)
             c.run()
 
         except cdist.Error as e:
diff --git a/cdist/core/explorer.py b/cdist/core/explorer.py
index 345f45ff..ac2a45ca 100644
--- a/cdist/core/explorer.py
+++ b/cdist/core/explorer.py
@@ -23,6 +23,7 @@
 import logging
 import os
 import glob
+import multiprocessing
 
 import cdist
 
@@ -65,7 +66,7 @@ class Explorer(object):
     """Executes cdist explorers.
 
     """
-    def __init__(self, target_host, local, remote):
+    def __init__(self, target_host, local, remote, jobs=None):
         self.target_host = target_host
 
         self.log = logging.getLogger(target_host)
@@ -77,6 +78,7 @@ class Explorer(object):
             '__explorer': self.remote.global_explorer_path,
         }
         self._type_explorers_transferred = []
+        self.jobs = jobs
 
     # global
 
@@ -91,11 +93,37 @@ class Explorer(object):
         """
         self.log.info("Running global explorers")
         self.transfer_global_explorers()
+        if self.jobs is None:
+            self._run_global_explorers_seq(out_path)
+        else:
+            self._run_global_explorers_parallel(out_path)
+
+    def _run_global_explorer(self, explorer, out_path):
+        output = self.run_global_explorer(explorer)
+        path = os.path.join(out_path, explorer)
+        with open(path, 'w') as fd:
+            fd.write(output)
+
+    def _run_global_explorers_seq(self, out_path):
+        self.log.info("Running global explorers sequentially")
         for explorer in self.list_global_explorer_names():
-            output = self.run_global_explorer(explorer)
-            path = os.path.join(out_path, explorer)
-            with open(path, 'w') as fd:
-                fd.write(output)
+            self._run_global_explorer(explorer, out_path)
+
+    def _run_global_explorers_parallel(self, out_path):
+        self.log.info("Running global explorers in {} parallel jobs".format(
+            self.jobs))
+        self.log.info("Starting multiprocessing Pool")
+        with multiprocessing.Pool(self.jobs) as pool:
+            self.log.info("Starting async global explorer run")
+            results = [
+                pool.apply_async(self._run_global_explorer, (e, out_path,))
+                for e in self.list_global_explorer_names()
+            ]
+            self.log.info("Waiting async global explorer run results")
+            for r in results:
+                r.get()
+            self.log.info("Async global explorer run finished")
+        self.log.info("Multiprocessing Pool finished")
 
     def transfer_global_explorers(self):
         """Transfer the global explorers to the remote side."""
diff --git a/cdist/test/explorer/__init__.py b/cdist/test/explorer/__init__.py
index 9a4555b8..27820292 100644
--- a/cdist/test/explorer/__init__.py
+++ b/cdist/test/explorer/__init__.py
@@ -23,6 +23,7 @@
 import os
 import shutil
 import getpass
+import multiprocessing
 
 import cdist
 from cdist import core
@@ -168,3 +169,34 @@ class ExplorerClassTestCase(test.CdistTestCase):
         cdist_object.create()
         self.explorer.run_type_explorers(cdist_object)
         self.assertEqual(cdist_object.explorers, {'world': 'hello'})
+
+    def test_jobs_parameter(self):
+        self.assertIsNone(self.explorer.jobs)
+        expl = explorer.Explorer(
+            self.target_host,
+            self.local,
+            self.remote,
+            jobs=8)
+        self.assertEqual(expl.jobs, 8)
+
+    def test_run_parallel_jobs(self):
+        expl = explorer.Explorer(
+            self.target_host,
+            self.local,
+            self.remote,
+            jobs=multiprocessing.cpu_count())
+        self.assertIsNotNone(expl.jobs)
+        out_path = self.mkdtemp()
+
+        expl.run_global_explorers(out_path)
+        names = sorted(expl.list_global_explorer_names())
+        output = sorted(os.listdir(out_path))
+
+        self.assertEqual(names, output)
+        shutil.rmtree(out_path)
+
+
+if __name__ == '__main__':
+    import unittest
+
+    unittest.main()
diff --git a/scripts/cdist b/scripts/cdist
index 953cad78..b7e3a805 100755
--- a/scripts/cdist
+++ b/scripts/cdist
@@ -22,6 +22,20 @@
 #
 
 
+def check_positive_int(value):
+    import argparse
+
+    try:
+        val = int(value)
+    except ValueError as e:
+        raise argparse.ArgumentTypeError(
+            "{} is invalid int value".format(value))
+    if val <= 0:
+        raise argparse.ArgumentTypeError(
+            "{} is invalid positive int value".format(val))
+    return val
+
+
 def commandline():
     """Parse command line"""
     import argparse
@@ -31,6 +45,7 @@ def commandline():
     import cdist.shell
     import shutil
     import os
+    import multiprocessing
 
     # Construct parser others can reuse
     parser = {}
@@ -105,6 +120,11 @@ def commandline():
                   '(should behave like ssh)'),
             action='store', dest='remote_exec',
             default=os.environ.get('CDIST_REMOTE_EXEC'))
+    parser['config'].add_argument(
+            '-j', '--jobs', nargs='?', type=check_positive_int,
+            help='Specify the maximum number of parallel jobs',
+            action='store', dest='jobs',
+            const=multiprocessing.cpu_count())
     parser['config'].set_defaults(func=cdist.config.Config.commandline)
 
     # Shell