Merge pull request #473 from darko-poljak/parallel-global-explorers-execution

Parallel global explorers execution
This commit is contained in:
Nico Schottelius 2016-08-15 16:49:00 +02:00 committed by GitHub
commit d7b09507bb
10 changed files with 268 additions and 16 deletions

View file

@ -56,6 +56,21 @@ class UnresolvableRequirementsError(cdist.Error):
pass pass
class CdistBetaRequired(cdist.Error):
"""Beta functionality is used but beta is not enabled"""
def __init__(self, command, arg):
self.command = command
self.arg = arg
def __str__(self):
err_msg = ("\'{}\' argument of \'{}\' command is beta, but beta is "
"not enabled. If you want to use it please enable beta "
"functionalities by using the -b/--enable-beta command "
"line flag.")
return err_msg.format(self.arg, self.command)
class CdistObjectError(Error): class CdistObjectError(Error):
"""Something went wrong with an object""" """Something went wrong with an object"""

View file

@ -71,15 +71,16 @@ def inspect_ssh_mux_opts():
class Config(object): class Config(object):
"""Cdist main class to hold arbitrary data""" """Cdist main class to hold arbitrary data"""
def __init__(self, local, remote, dry_run=False): def __init__(self, local, remote, dry_run=False, jobs=None):
self.local = local self.local = local
self.remote = remote self.remote = remote
self.log = logging.getLogger(self.local.target_host[0]) self.log = logging.getLogger(self.local.target_host[0])
self.dry_run = dry_run self.dry_run = dry_run
self.jobs = jobs
self.explorer = core.Explorer(self.local.target_host, self.local, self.explorer = core.Explorer(self.local.target_host, self.local,
self.remote) self.remote, jobs=self.jobs)
self.manifest = core.Manifest(self.local.target_host, self.local) self.manifest = core.Manifest(self.local.target_host, self.local)
self.code = core.Code(self.local.target_host, self.local, self.remote) self.code = core.Code(self.local.target_host, self.local, self.remote)
@ -119,6 +120,7 @@ class Config(object):
if args.manifest == '-' and args.hostfile == '-': if args.manifest == '-' and args.hostfile == '-':
raise cdist.Error(("Cannot read both, manifest and host file, " raise cdist.Error(("Cannot read both, manifest and host file, "
"from stdin")) "from stdin"))
# if no host source is specified then read hosts from stdin # if no host source is specified then read hosts from stdin
if not (args.hostfile or args.host): if not (args.hostfile or args.host):
args.hostfile = '-' args.hostfile = '-'
@ -266,7 +268,7 @@ class Config(object):
remote_exec=remote_exec, remote_exec=remote_exec,
remote_copy=remote_copy) remote_copy=remote_copy)
c = cls(local, remote, dry_run=args.dry_run) c = cls(local, remote, dry_run=args.dry_run, jobs=args.jobs)
c.run() c.run()
except cdist.Error as e: except cdist.Error as e:

View file

@ -23,6 +23,7 @@
import logging import logging
import os import os
import glob import glob
import multiprocessing
import cdist import cdist
@ -65,10 +66,10 @@ class Explorer(object):
"""Executes cdist explorers. """Executes cdist explorers.
""" """
def __init__(self, target_host, local, remote): def __init__(self, target_host, local, remote, jobs=None):
self.target_host = target_host self.target_host = target_host
self.log = logging.getLogger(target_host[0]) self._open_logger()
self.local = local self.local = local
self.remote = remote self.remote = remote
@ -79,6 +80,10 @@ class Explorer(object):
'__explorer': self.remote.global_explorer_path, '__explorer': self.remote.global_explorer_path,
} }
self._type_explorers_transferred = [] self._type_explorers_transferred = []
self.jobs = jobs
def _open_logger(self):
self.log = logging.getLogger(self.target_host[0])
# global # global
@ -93,17 +98,65 @@ class Explorer(object):
""" """
self.log.info("Running global explorers") self.log.info("Running global explorers")
self.transfer_global_explorers() self.transfer_global_explorers()
for explorer in self.list_global_explorer_names(): if self.jobs is None:
self._run_global_explorers_seq(out_path)
else:
self._run_global_explorers_parallel(out_path)
def _run_global_explorer(self, explorer, out_path):
output = self.run_global_explorer(explorer) output = self.run_global_explorer(explorer)
path = os.path.join(out_path, explorer) path = os.path.join(out_path, explorer)
with open(path, 'w') as fd: with open(path, 'w') as fd:
fd.write(output) fd.write(output)
def _run_global_explorers_seq(self, out_path):
self.log.info("Running global explorers sequentially")
for explorer in self.list_global_explorer_names():
self._run_global_explorer(explorer, out_path)
def _run_global_explorers_parallel(self, out_path):
self.log.info("Running global explorers in {} parallel jobs".format(
self.jobs))
self.log.debug("Multiprocessing start method is {}".format(
multiprocessing.get_start_method()))
self.log.debug(("Starting multiprocessing Pool for global "
"explorers run"))
with multiprocessing.Pool(self.jobs) as pool:
self.log.debug("Starting async for global explorer run")
results = [
pool.apply_async(self._run_global_explorer, (e, out_path,))
for e in self.list_global_explorer_names()
]
self.log.debug("Waiting async results for global explorer runs")
for r in results:
r.get() # self._run_global_explorer returns None
self.log.debug(("Multiprocessing run for global explorers "
"finished"))
# logger is not pickable, so remove it when we pickle
def __getstate__(self):
state = self.__dict__.copy()
if 'log' in state:
del state['log']
return state
# recreate logger when we unpickle
def __setstate__(self, state):
self.__dict__.update(state)
self._open_logger()
def transfer_global_explorers(self): def transfer_global_explorers(self):
"""Transfer the global explorers to the remote side.""" """Transfer the global explorers to the remote side."""
self.remote.mkdir(self.remote.global_explorer_path) self.remote.mkdir(self.remote.global_explorer_path)
if self.jobs is None:
self.remote.transfer(self.local.global_explorer_path, self.remote.transfer(self.local.global_explorer_path,
self.remote.global_explorer_path) self.remote.global_explorer_path)
else:
self.remote.transfer_dir_parallel(
self.local.global_explorer_path,
self.remote.global_explorer_path,
self.jobs)
self.remote.run(["chmod", "0700", self.remote.run(["chmod", "0700",
"%s/*" % (self.remote.global_explorer_path)]) "%s/*" % (self.remote.global_explorer_path)])

View file

@ -85,6 +85,18 @@ class Local(object):
def _init_log(self): def _init_log(self):
self.log = logging.getLogger(self.target_host[0]) self.log = logging.getLogger(self.target_host[0])
# logger is not pickable, so remove it when we pickle
def __getstate__(self):
state = self.__dict__.copy()
if 'log' in state:
del state['log']
return state
# recreate logger when we unpickle
def __setstate__(self, state):
self.__dict__.update(state)
self._init_log()
def _init_permissions(self): def _init_permissions(self):
# Setup file permissions using umask # Setup file permissions using umask
os.umask(0o077) os.umask(0o077)

View file

@ -26,9 +26,10 @@ import sys
import glob import glob
import subprocess import subprocess
import logging import logging
import cdist.exec.util as exec_util import multiprocessing
import cdist import cdist
import cdist.exec.util as exec_util
class DecodeError(cdist.Error): class DecodeError(cdist.Error):
@ -66,10 +67,25 @@ class Remote(object):
self.type_path = os.path.join(self.conf_path, "type") self.type_path = os.path.join(self.conf_path, "type")
self.global_explorer_path = os.path.join(self.conf_path, "explorer") self.global_explorer_path = os.path.join(self.conf_path, "explorer")
self.log = logging.getLogger(self.target_host[0]) self._open_logger()
self._init_env() self._init_env()
def _open_logger(self):
self.log = logging.getLogger(self.target_host[0])
# logger is not pickable, so remove it when we pickle
def __getstate__(self):
state = self.__dict__.copy()
if 'log' in state:
del state['log']
return state
# recreate logger when we unpickle
def __setstate__(self, state):
self.__dict__.update(state)
self._open_logger()
def _init_env(self): def _init_env(self):
"""Setup environment for scripts - HERE????""" """Setup environment for scripts - HERE????"""
# FIXME: better do so in exec functions that require it! # FIXME: better do so in exec functions that require it!
@ -110,6 +126,40 @@ class Remote(object):
self.target_host[0], destination)]) self.target_host[0], destination)])
self._run_command(command) self._run_command(command)
def transfer_dir_parallel(self, source, destination, jobs):
"""Transfer a directory to the remote side in parallel mode."""
self.log.debug("Remote transfer: %s -> %s", source, destination)
self.rmdir(destination)
if os.path.isdir(source):
self.mkdir(destination)
self.log.info("Remote transfer in {} parallel jobs".format(
jobs))
self.log.debug("Multiprocessing start method is {}".format(
multiprocessing.get_start_method()))
self.log.debug(("Starting multiprocessing Pool for parallel "
"remote transfer"))
with multiprocessing.Pool(jobs) as pool:
self.log.debug("Starting async for parallel transfer")
commands = []
for f in glob.glob1(source, '*'):
command = self._copy.split()
path = os.path.join(source, f)
command.extend([path, '{0}:{1}'.format(
self.target_host[0], destination)])
commands.append(command)
results = [
pool.apply_async(self._run_command, (cmd,))
for cmd in commands
]
self.log.debug("Waiting async results for parallel transfer")
for r in results:
r.get() # self._run_command returns None
self.log.debug(("Multiprocessing for parallel transfer "
"finished"))
else:
raise cdist.Error("Source {} is not a directory".format(source))
def run_script(self, script, env=None, return_output=False): def run_script(self, script, env=None, return_output=False):
"""Run the given script with the given environment on the remote side. """Run the given script with the given environment on the remote side.
Return the output as a string. Return the output as a string.

View file

@ -24,6 +24,7 @@ import getpass
import shutil import shutil
import string import string
import random import random
import multiprocessing
import cdist import cdist
from cdist import test from cdist import test
@ -125,6 +126,22 @@ class RemoteTestCase(test.CdistTestCase):
# test if the payload file is in the target directory # test if the payload file is in the target directory
self.assertTrue(os.path.isfile(os.path.join(target, source_file_name))) self.assertTrue(os.path.isfile(os.path.join(target, source_file_name)))
def test_transfer_dir_parallel(self):
source = self.mkdtemp(dir=self.temp_dir)
# put 8 files in the directory as payload
filenames = []
for x in range(8):
handle, source_file = self.mkstemp(dir=source)
os.close(handle)
source_file_name = os.path.split(source_file)[-1]
filenames.append(source_file_name)
target = self.mkdtemp(dir=self.temp_dir)
self.remote.transfer_dir_parallel(source, target,
multiprocessing.cpu_count())
# test if the payload files are in the target directory
for filename in filenames:
self.assertTrue(os.path.isfile(os.path.join(target, filename)))
def test_create_files_dirs(self): def test_create_files_dirs(self):
self.remote.create_files_dirs() self.remote.create_files_dirs()
self.assertTrue(os.path.isdir(self.remote.base_path)) self.assertTrue(os.path.isdir(self.remote.base_path))

View file

@ -23,6 +23,7 @@
import os import os
import shutil import shutil
import getpass import getpass
import multiprocessing
import cdist import cdist
from cdist import core from cdist import core
@ -168,3 +169,48 @@ class ExplorerClassTestCase(test.CdistTestCase):
cdist_object.create() cdist_object.create()
self.explorer.run_type_explorers(cdist_object) self.explorer.run_type_explorers(cdist_object)
self.assertEqual(cdist_object.explorers, {'world': 'hello'}) self.assertEqual(cdist_object.explorers, {'world': 'hello'})
def test_jobs_parameter(self):
self.assertIsNone(self.explorer.jobs)
expl = explorer.Explorer(
self.target_host,
self.local,
self.remote,
jobs=8)
self.assertEqual(expl.jobs, 8)
def test_transfer_global_explorers_parallel(self):
expl = explorer.Explorer(
self.target_host,
self.local,
self.remote,
jobs=multiprocessing.cpu_count())
self.assertIsNotNone(expl.jobs)
expl.transfer_global_explorers()
source = self.local.global_explorer_path
destination = self.remote.global_explorer_path
self.assertEqual(sorted(os.listdir(source)),
sorted(os.listdir(destination)))
def test_run_parallel_jobs(self):
expl = explorer.Explorer(
self.target_host,
self.local,
self.remote,
jobs=multiprocessing.cpu_count())
self.assertIsNotNone(expl.jobs)
out_path = self.mkdtemp()
expl.run_global_explorers(out_path)
names = sorted(expl.list_global_explorer_names())
output = sorted(os.listdir(out_path))
self.assertEqual(names, output)
shutil.rmtree(out_path)
if __name__ == '__main__':
import unittest
unittest.main()

View file

@ -1,6 +1,8 @@
Changelog Changelog
--------- ---------
next: next:
* Core: Add -b, --enable-beta option for enabling beta functionalities (Darko Poljak)
* Core: Add -j, --jobs option for parallel execution and add parallel support for global explorers (currently in beta) (Darko Poljak)
* Core: Add derived env vars for target hostname and fqdn (Darko Poljak) * Core: Add derived env vars for target hostname and fqdn (Darko Poljak)
* New type: __keyboard: Set keyboard layout (Carlos Ortigoza) * New type: __keyboard: Set keyboard layout (Carlos Ortigoza)
* Documentation: Re-license types' man pages to GPLV3+ (Dmitry Bogatov, Darko Poljak) * Documentation: Re-license types' man pages to GPLV3+ (Dmitry Bogatov, Darko Poljak)

View file

@ -15,9 +15,9 @@ SYNOPSIS
cdist banner [-h] [-d] [-v] cdist banner [-h] [-d] [-v]
cdist config [-h] [-d] [-v] [-c CONF_DIR] [-f HOSTFILE] [-i MANIFEST] cdist config [-h] [-d] [-v] [-b] [-c CONF_DIR] [-f HOSTFILE]
[-n] [-o OUT_PATH] [-p] [-s] [--remote-copy REMOTE_COPY] [-i MANIFEST] [-j [JOBS]] [-n] [-o OUT_PATH] [-p] [-s]
[--remote-exec REMOTE_EXEC] [--remote-copy REMOTE_COPY] [--remote-exec REMOTE_EXEC]
[host [host ...]] [host [host ...]]
cdist shell [-h] [-d] [-v] [-s SHELL] cdist shell [-h] [-d] [-v] [-s SHELL]
@ -62,6 +62,11 @@ CONFIG
------ ------
Configure one or more hosts. Configure one or more hosts.
.. option:: -b, --enable-beta
Enable beta functionalities. Beta functionalities include the
following options: -j/--jobs.
.. option:: -c CONF_DIR, --conf-dir CONF_DIR .. option:: -c CONF_DIR, --conf-dir CONF_DIR
Add a configuration directory. Can be specified multiple times. Add a configuration directory. Can be specified multiple times.
@ -83,6 +88,11 @@ Configure one or more hosts.
Path to a cdist manifest or - to read from stdin Path to a cdist manifest or - to read from stdin
.. option:: -j [JOBS], --jobs [JOBS]
Specify the maximum number of parallel jobs; currently only
global explorers are supported (currently in beta)
.. option:: -n, --dry-run .. option:: -n, --dry-run
Do not execute code Do not execute code

View file

@ -22,6 +22,38 @@
# #
# list of beta arguments for sub-commands
BETA_ARGS = {
'config': ['jobs', ],
}
def check_positive_int(value):
import argparse
try:
val = int(value)
except ValueError as e:
raise argparse.ArgumentTypeError(
"{} is invalid int value".format(value))
if val <= 0:
raise argparse.ArgumentTypeError(
"{} is invalid positive int value".format(val))
return val
def check_beta(args_dict):
if 'beta' not in args_dict:
args_dict['beta'] = False
# Check only if beta is not enabled: if beta option is specified then
# raise error.
if not args_dict['beta']:
cmd = args_dict['command']
for arg in BETA_ARGS[cmd]:
if arg in args_dict:
raise cdist.CdistBetaRequired(cmd, arg)
def commandline(): def commandline():
"""Parse command line""" """Parse command line"""
import argparse import argparse
@ -31,6 +63,7 @@ def commandline():
import cdist.shell import cdist.shell
import shutil import shutil
import os import os
import multiprocessing
# Construct parser others can reuse # Construct parser others can reuse
parser = {} parser = {}
@ -62,6 +95,11 @@ def commandline():
'config', parents=[parser['loglevel']]) 'config', parents=[parser['loglevel']])
parser['config'].add_argument( parser['config'].add_argument(
'host', nargs='*', help='host(s) to operate on') 'host', nargs='*', help='host(s) to operate on')
parser['config'].add_argument(
'-b', '--enable-beta',
help=('Enable beta functionalities. Beta functionalities '
'include the following options: -j/--jobs.'),
action='store_true', dest='beta', default=False)
parser['config'].add_argument( parser['config'].add_argument(
'-c', '--conf-dir', '-c', '--conf-dir',
help=('Add configuration directory (can be repeated, ' help=('Add configuration directory (can be repeated, '
@ -77,6 +115,12 @@ def commandline():
'-i', '--initial-manifest', '-i', '--initial-manifest',
help='Path to a cdist manifest or \'-\' to read from stdin.', help='Path to a cdist manifest or \'-\' to read from stdin.',
dest='manifest', required=False) dest='manifest', required=False)
parser['config'].add_argument(
'-j', '--jobs', nargs='?', type=check_positive_int,
help=('Specify the maximum number of parallel jobs, currently '
'only global explorers are supported (currently in beta'),
action='store', dest='jobs',
const=multiprocessing.cpu_count())
parser['config'].add_argument( parser['config'].add_argument(
'-n', '--dry-run', '-n', '--dry-run',
help='Do not execute code', action='store_true') help='Do not execute code', action='store_true')
@ -146,6 +190,7 @@ def commandline():
parser['main'].print_help() parser['main'].print_help()
sys.exit(0) sys.exit(0)
check_beta(vars(args))
args.func(args) args.func(args)
if __name__ == "__main__": if __name__ == "__main__":