Add -p HOST_MAX argument.

This commit is contained in:
Darko Poljak 2017-07-25 11:12:18 +02:00
parent bb2cc68169
commit 0af64c01bf
3 changed files with 61 additions and 30 deletions

View file

@ -152,9 +152,10 @@ def get_parsers():
parser['config_main'].add_argument( parser['config_main'].add_argument(
'-j', '--jobs', nargs='?', '-j', '--jobs', nargs='?',
type=check_positive_int, type=check_positive_int,
help=('Specify the maximum number of parallel jobs. Global ' help=('Operate in parallel in specified maximum number of jobs. '
'explorers, object prepare and object run are supported ' 'Global explorers, object prepare and object run are '
'(currently in beta)'), 'supported. Without argument CPU count is used by default. '
'Currently in beta.'),
action='store', dest='jobs', action='store', dest='jobs',
const=multiprocessing.cpu_count()) const=multiprocessing.cpu_count())
parser['config_main'].add_argument( parser['config_main'].add_argument(
@ -204,13 +205,17 @@ def get_parsers():
'default, read hosts from stdin.'), 'default, read hosts from stdin.'),
dest='hostfile', required=False) dest='hostfile', required=False)
parser['config_args'].add_argument( parser['config_args'].add_argument(
'-p', '--parallel', '-p', '--parallel', nargs='?', metavar='HOST_MAX',
help='operate on multiple hosts in parallel', type=check_positive_int,
action='store_true', dest='parallel') help=('Operate on multiple hosts in parallel for specified maximum '
'hosts at a time. Without argument CPU count is used by '
'default.'),
action='store', dest='parallel',
const=multiprocessing.cpu_count())
parser['config_args'].add_argument( parser['config_args'].add_argument(
'-s', '--sequential', '-s', '--sequential',
help='operate on multiple hosts sequentially (default)', help='operate on multiple hosts sequentially (default)',
action='store_false', dest='parallel') action='store_const', dest='parallel', const=0)
parser['config_args'].add_argument( parser['config_args'].add_argument(
'-t', '--tag', '-t', '--tag',
help=('host is specified by tag, not hostname/address; ' help=('host is specified by tag, not hostname/address; '

View file

@ -129,7 +129,6 @@ class Config(object):
cls._check_and_prepare_args(args) cls._check_and_prepare_args(args)
process = {}
failed_hosts = [] failed_hosts = []
time_start = time.time() time_start = time.time()
@ -153,6 +152,16 @@ class Config(object):
else: else:
it = itertools.chain(cls.hosts(args.host), it = itertools.chain(cls.hosts(args.host),
cls.hosts(args.hostfile)) cls.hosts(args.hostfile))
process_args = []
# No new child process if only one host at a time.
if args.parallel == 1:
log.debug("Only 1 parallel process, doing it sequentially")
args.parallel = 0
if args.parallel:
log.trace("Processing hosts in parallel")
else:
log.trace("Processing hosts sequentially")
for entry in it: for entry in it:
if isinstance(entry, tuple): if isinstance(entry, tuple):
# if configuring by specified tags # if configuring by specified tags
@ -178,27 +187,37 @@ class Config(object):
hostcnt += 1 hostcnt += 1
if args.parallel: if args.parallel:
log.trace("Creating child process for %s", host) pargs = (host, host_tags, host_base_path, hostdir, args, True)
process[host] = multiprocessing.Process( log.trace(("Args for multiprocessing operation "
target=cls.onehost, "for host {}: {}".format(host, pargs)))
args=(host, host_tags, host_base_path, hostdir, args, process_args.append(pargs)
True))
process[host].start()
else: else:
try: try:
cls.onehost(host, host_tags, host_base_path, hostdir, cls.onehost(host, host_tags, host_base_path, hostdir,
args, parallel=False) args, parallel=False)
except cdist.Error as e: except cdist.Error as e:
failed_hosts.append(host) failed_hosts.append(host)
if args.parallel and len(process_args) == 1:
log.debug("Only 1 host for parallel processing, doing it "
"sequentially")
try:
cls.onehost(*process_args[0])
except cdist.Error as e:
failed_hosts.append(host)
# Catch errors in parallel mode when joining # Catch errors in parallel mode when joining
if args.parallel: if args.parallel:
for host in process.keys(): log.trace("Multiprocessing start method is {}".format(
log.trace("Joining process %s", host) multiprocessing.get_start_method()))
process[host].join() log.trace(("Starting multiprocessing Pool for {} "
"parallel host operation".format(args.parallel)))
if not process[host].exitcode == 0: results = mp_pool_run(cls.onehost, process_args, jobs=args.parallel)
failed_hosts.append(host) log.trace(("Multiprocessing for parallel host operation "
"finished"))
log.trace(("Multiprocessing for parallel host operation "
"results: {}", results))
failed_hosts = [host for host, result in results if not result]
time_end = time.time() time_end = time.time()
log.verbose("Total processing time for %s host(s): %s", hostcnt, log.verbose("Total processing time for %s host(s): %s", hostcnt,
@ -236,7 +255,10 @@ class Config(object):
@classmethod @classmethod
def onehost(cls, host, host_tags, host_base_path, host_dir_name, args, def onehost(cls, host, host_tags, host_base_path, host_dir_name, args,
parallel): parallel):
"""Configure ONE system""" """Configure ONE system.
If operating in parallel then return tuple (host, True|False, )
so that main process knows for which host function was successful.
"""
log = logging.getLogger(host) log = logging.getLogger(host)
@ -273,8 +295,7 @@ class Config(object):
except cdist.Error as e: except cdist.Error as e:
log.error(e) log.error(e)
if parallel: if parallel:
# We are running in our own process here, need to sys.exit! return (host, False, )
sys.exit(1)
else: else:
raise raise
@ -285,6 +306,8 @@ class Config(object):
# Pass back to controlling code in sequential mode # Pass back to controlling code in sequential mode
else: else:
raise raise
if parallel:
return (host, True, )
@staticmethod @staticmethod
def create_base_root_path(out_path=None): def create_base_root_path(out_path=None):

View file

@ -19,14 +19,14 @@ SYNOPSIS
[-i MANIFEST] [-j [JOBS]] [-n] [-o OUT_PATH] [-i MANIFEST] [-j [JOBS]] [-n] [-o OUT_PATH]
[-r REMOTE_OUT_DIR] [--remote-copy REMOTE_COPY] [-r REMOTE_OUT_DIR] [--remote-copy REMOTE_COPY]
[--remote-exec REMOTE_EXEC] [-I INVENTORY_DIR] [-A] [-a] [--remote-exec REMOTE_EXEC] [-I INVENTORY_DIR] [-A] [-a]
[-f HOSTFILE] [-p] [-s] [-t] [-f HOSTFILE] [-p [HOST_MAX]] [-s] [-t]
[host [host ...]] [host [host ...]]
cdist install [-h] [-q] [-v] [-b] [-C CACHE_PATH_PATTERN] [-c CONF_DIR] cdist install [-h] [-q] [-v] [-b] [-C CACHE_PATH_PATTERN] [-c CONF_DIR]
[-i MANIFEST] [-j [JOBS]] [-n] [-o OUT_PATH] [-i MANIFEST] [-j [JOBS]] [-n] [-o OUT_PATH]
[-r REMOTE_OUT_DIR] [--remote-copy REMOTE_COPY] [-r REMOTE_OUT_DIR] [--remote-copy REMOTE_COPY]
[--remote-exec REMOTE_EXEC] [-I INVENTORY_DIR] [-A] [-a] [--remote-exec REMOTE_EXEC] [-I INVENTORY_DIR] [-A] [-a]
[-f HOSTFILE] [-p] [-s] [-t] [-f HOSTFILE] [-p [HOST_MAX]] [-s] [-t]
[host [host ...]] [host [host ...]]
cdist inventory [-h] [-q] [-v] [-b] [-I INVENTORY_DIR] cdist inventory [-h] [-q] [-v] [-b] [-I INVENTORY_DIR]
@ -153,9 +153,10 @@ Configure/install one or more hosts.
.. option:: -j [JOBS], --jobs [JOBS] .. option:: -j [JOBS], --jobs [JOBS]
Specify the maximum number of parallel jobs. Global Operate in parallel in specified maximum number of
explorers, object prepare and object run are supported jobs. Global explorers, object prepare and object run
(currently in beta). are supported. Without argument CPU count is used by
default. Currently in beta.
.. option:: -n, --dry-run .. option:: -n, --dry-run
@ -165,9 +166,11 @@ Configure/install one or more hosts.
Directory to save cdist output in Directory to save cdist output in
.. option:: -p, --parallel .. option:: -p [HOST_MAX], --parallel [HOST_MAX]
Operate on multiple hosts in parallel Operate on multiple hosts in parallel for specified
maximum hosts at a time. Without argument CPU count is
used by default.
.. option:: -r REMOTE_OUT_PATH, --remote-out-dir REMOTE_OUT_PATH .. option:: -r REMOTE_OUT_PATH, --remote-out-dir REMOTE_OUT_PATH