class CdistBetaRequired(cdist.Error):
    """Error for beta functionality that is used while beta is disabled.

    Attributes:
        command: name of the command the beta argument belongs to.
        arg: name of the beta argument that was used.
    """

    def __init__(self, command, arg):
        self.arg = arg
        self.command = command

    def __str__(self):
        # Positional fields: the argument comes first, its command second.
        pieces = (
            "'{}' argument of '{}' command is beta, but beta is ",
            "not enabled. If you want to use it please enable beta ",
            "functionalities by using the -b/--enable-beta command ",
            "line flag.",
        )
        return "".join(pieces).format(self.arg, self.command)
def _run_global_explorers_parallel(self, out_path):
    """Run all global explorers through a multiprocessing pool.

    One ``self._run_global_explorer(name, out_path)`` task is submitted
    per name returned by ``self.list_global_explorer_names()``; the pool
    is created with ``self.jobs`` worker processes. The method returns
    only after every task result has been collected.

    Args:
        out_path: directory handed unchanged to every explorer task.
    """
    log = self.log
    log.info(
        "Running global explorers in {} parallel jobs".format(self.jobs))
    start_method = multiprocessing.get_start_method()
    log.debug("Multiprocessing start method is {}".format(start_method))
    log.debug("Starting multiprocessing Pool for global explorers run")

    with multiprocessing.Pool(self.jobs) as pool:
        log.debug("Starting async for global explorer run")
        pending = []
        for name in self.list_global_explorer_names():
            task = pool.apply_async(
                self._run_global_explorer, (name, out_path))
            pending.append(task)

        log.debug("Waiting async results for global explorer runs")
        for task in pending:
            # The task itself returns None; get() is called to wait for
            # it and to re-raise any exception raised in the worker.
            task.get()
        log.debug("Multiprocessing run for global explorers finished")
def transfer_dir_parallel(self, source, destination, jobs):
    """Transfer a directory to the remote side in parallel mode.

    Every non-hidden entry directly below ``source`` gets its own copy
    command, built from ``self._copy`` and targeting
    ``<self.target_host[0]>:<destination>``. The commands are executed
    through ``self._run_command`` in a multiprocessing pool.

    Args:
        source: local directory whose entries are transferred.
        destination: remote directory; it is passed to ``self.rmdir()``
            and then ``self.mkdir()`` before anything is copied.
        jobs: number of worker processes for ``multiprocessing.Pool``.

    Raises:
        cdist.Error: if ``source`` is not a directory. The check happens
            before the destination is touched.
    """
    self.log.debug("Remote transfer: %s -> %s", source, destination)
    # Validate before touching the remote side, so that an invalid source
    # can never wipe an existing destination directory.
    if not os.path.isdir(source):
        raise cdist.Error("Source {} is not a directory".format(source))

    self.rmdir(destination)
    self.mkdir(destination)
    self.log.info("Remote transfer in %s parallel jobs", jobs)
    self.log.debug("Multiprocessing start method is %s",
                   multiprocessing.get_start_method())

    # One copy command per entry. glob.glob() replaces the undocumented
    # glob.glob1() helper; glob.escape() keeps wildcard characters that
    # are part of the directory name literal, only the trailing '*' is a
    # pattern (hidden entries are still skipped, as before).
    target = '{0}:{1}'.format(self.target_host[0], destination)
    paths = glob.glob(os.path.join(glob.escape(source), '*'))
    commands = [self._copy.split() + [path, target] for path in paths]

    self.log.debug("Starting multiprocessing Pool for parallel "
                   "remote transfer")
    with multiprocessing.Pool(jobs) as pool:
        self.log.debug("Starting async for parallel transfer")
        results = [
            pool.apply_async(self._run_command, (cmd,))
            for cmd in commands
        ]

        self.log.debug("Waiting async results for parallel transfer")
        for result in results:
            # self._run_command returns None; get() waits for the copy
            # and re-raises any exception raised in the worker.
            result.get()
        self.log.debug("Multiprocessing for parallel transfer finished")
def test_jobs_parameter(self):
    """The jobs argument is stored on the explorer; the fixture has None."""
    self.assertIsNone(self.explorer.jobs)
    expl = explorer.Explorer(
        self.target_host,
        self.local,
        self.remote,
        jobs=8)
    self.assertEqual(expl.jobs, 8)


def test_transfer_global_explorers_parallel(self):
    """Transferring with jobs set copies every global explorer."""
    expl = explorer.Explorer(
        self.target_host,
        self.local,
        self.remote,
        jobs=multiprocessing.cpu_count())
    self.assertIsNotNone(expl.jobs)

    expl.transfer_global_explorers()
    source = self.local.global_explorer_path
    destination = self.remote.global_explorer_path
    self.assertEqual(sorted(os.listdir(source)),
                     sorted(os.listdir(destination)))


def test_run_parallel_jobs(self):
    """Running with jobs set writes one output file per global explorer."""
    expl = explorer.Explorer(
        self.target_host,
        self.local,
        self.remote,
        jobs=multiprocessing.cpu_count())
    self.assertIsNotNone(expl.jobs)
    out_path = self.mkdtemp()
    # Remove the temporary directory even when the run or the assertion
    # fails, so that a failing test does not leave it behind.
    try:
        expl.run_global_explorers(out_path)
        names = sorted(expl.list_global_explorer_names())
        output = sorted(os.listdir(out_path))

        self.assertEqual(names, output)
    finally:
        shutil.rmtree(out_path)
# list of beta arguments for sub-commands
BETA_ARGS = {
    'config': ['jobs', ],
}


def check_positive_int(value):
    """Convert a command line value to an integer greater than zero.

    Intended as an argparse ``type=`` callable.

    Args:
        value: the raw value to convert.

    Returns:
        The value converted with ``int()``.

    Raises:
        argparse.ArgumentTypeError: if the value cannot be converted to
            an int or the resulting int is not greater than zero.
    """
    import argparse

    try:
        val = int(value)
    except (TypeError, ValueError):
        # TypeError covers non-string, non-numeric input such as None.
        raise argparse.ArgumentTypeError(
            "{} is invalid int value".format(value))
    if val <= 0:
        raise argparse.ArgumentTypeError(
            "{} is invalid positive int value".format(val))
    return val


def check_beta(args_dict):
    """Reject beta arguments that are used while beta is not enabled.

    Args:
        args_dict: mapping of parsed argument names to values. A missing
            'beta' key is added with the value False.

    Raises:
        cdist.CdistBetaRequired: if beta is not enabled and an argument
            listed in BETA_ARGS for the current command has been set.
    """
    if 'beta' not in args_dict:
        args_dict['beta'] = False
    # Check only if beta is not enabled: if beta option is specified then
    # raise error.
    if args_dict['beta']:
        return
    cmd = args_dict.get('command')
    # Sub-commands without beta arguments have no BETA_ARGS entry.
    for arg in BETA_ARGS.get(cmd, ()):
        # argparse stores every declared option in the namespace, with a
        # falsy default (None) when it was not given on the command line,
        # so key presence alone does not mean the option was used.
        if args_dict.get(arg):
            raise cdist.CdistBetaRequired(cmd, arg)