Merge pull request #547 from darko-poljak/p-host_max
Add -p HOST_MAX argument.
This commit is contained in:
commit
55f4528d1e
3 changed files with 61 additions and 30 deletions
|
@ -152,9 +152,10 @@ def get_parsers():
|
||||||
parser['config_main'].add_argument(
|
parser['config_main'].add_argument(
|
||||||
'-j', '--jobs', nargs='?',
|
'-j', '--jobs', nargs='?',
|
||||||
type=check_positive_int,
|
type=check_positive_int,
|
||||||
help=('Specify the maximum number of parallel jobs. Global '
|
help=('Operate in parallel in specified maximum number of jobs. '
|
||||||
'explorers, object prepare and object run are supported '
|
'Global explorers, object prepare and object run are '
|
||||||
'(currently in beta)'),
|
'supported. Without argument CPU count is used by default. '
|
||||||
|
'Currently in beta.'),
|
||||||
action='store', dest='jobs',
|
action='store', dest='jobs',
|
||||||
const=multiprocessing.cpu_count())
|
const=multiprocessing.cpu_count())
|
||||||
parser['config_main'].add_argument(
|
parser['config_main'].add_argument(
|
||||||
|
@ -204,13 +205,17 @@ def get_parsers():
|
||||||
'default, read hosts from stdin.'),
|
'default, read hosts from stdin.'),
|
||||||
dest='hostfile', required=False)
|
dest='hostfile', required=False)
|
||||||
parser['config_args'].add_argument(
|
parser['config_args'].add_argument(
|
||||||
'-p', '--parallel',
|
'-p', '--parallel', nargs='?', metavar='HOST_MAX',
|
||||||
help='operate on multiple hosts in parallel',
|
type=check_positive_int,
|
||||||
action='store_true', dest='parallel')
|
help=('Operate on multiple hosts in parallel for specified maximum '
|
||||||
|
'hosts at a time. Without argument CPU count is used by '
|
||||||
|
'default.'),
|
||||||
|
action='store', dest='parallel',
|
||||||
|
const=multiprocessing.cpu_count())
|
||||||
parser['config_args'].add_argument(
|
parser['config_args'].add_argument(
|
||||||
'-s', '--sequential',
|
'-s', '--sequential',
|
||||||
help='operate on multiple hosts sequentially (default)',
|
help='operate on multiple hosts sequentially (default)',
|
||||||
action='store_false', dest='parallel')
|
action='store_const', dest='parallel', const=0)
|
||||||
parser['config_args'].add_argument(
|
parser['config_args'].add_argument(
|
||||||
'-t', '--tag',
|
'-t', '--tag',
|
||||||
help=('host is specified by tag, not hostname/address; '
|
help=('host is specified by tag, not hostname/address; '
|
||||||
|
|
|
@ -129,7 +129,6 @@ class Config(object):
|
||||||
|
|
||||||
cls._check_and_prepare_args(args)
|
cls._check_and_prepare_args(args)
|
||||||
|
|
||||||
process = {}
|
|
||||||
failed_hosts = []
|
failed_hosts = []
|
||||||
time_start = time.time()
|
time_start = time.time()
|
||||||
|
|
||||||
|
@ -153,6 +152,16 @@ class Config(object):
|
||||||
else:
|
else:
|
||||||
it = itertools.chain(cls.hosts(args.host),
|
it = itertools.chain(cls.hosts(args.host),
|
||||||
cls.hosts(args.hostfile))
|
cls.hosts(args.hostfile))
|
||||||
|
|
||||||
|
process_args = []
|
||||||
|
# No new child process if only one host at a time.
|
||||||
|
if args.parallel == 1:
|
||||||
|
log.debug("Only 1 parallel process, doing it sequentially")
|
||||||
|
args.parallel = 0
|
||||||
|
if args.parallel:
|
||||||
|
log.trace("Processing hosts in parallel")
|
||||||
|
else:
|
||||||
|
log.trace("Processing hosts sequentially")
|
||||||
for entry in it:
|
for entry in it:
|
||||||
if isinstance(entry, tuple):
|
if isinstance(entry, tuple):
|
||||||
# if configuring by specified tags
|
# if configuring by specified tags
|
||||||
|
@ -178,27 +187,37 @@ class Config(object):
|
||||||
|
|
||||||
hostcnt += 1
|
hostcnt += 1
|
||||||
if args.parallel:
|
if args.parallel:
|
||||||
log.trace("Creating child process for %s", host)
|
pargs = (host, host_tags, host_base_path, hostdir, args, True)
|
||||||
process[host] = multiprocessing.Process(
|
log.trace(("Args for multiprocessing operation "
|
||||||
target=cls.onehost,
|
"for host {}: {}".format(host, pargs)))
|
||||||
args=(host, host_tags, host_base_path, hostdir, args,
|
process_args.append(pargs)
|
||||||
True))
|
|
||||||
process[host].start()
|
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
cls.onehost(host, host_tags, host_base_path, hostdir,
|
cls.onehost(host, host_tags, host_base_path, hostdir,
|
||||||
args, parallel=False)
|
args, parallel=False)
|
||||||
except cdist.Error as e:
|
except cdist.Error as e:
|
||||||
failed_hosts.append(host)
|
failed_hosts.append(host)
|
||||||
|
if args.parallel and len(process_args) == 1:
|
||||||
|
log.debug("Only 1 host for parallel processing, doing it "
|
||||||
|
"sequentially")
|
||||||
|
try:
|
||||||
|
cls.onehost(*process_args[0])
|
||||||
|
except cdist.Error as e:
|
||||||
|
failed_hosts.append(host)
|
||||||
# Catch errors in parallel mode when joining
|
# Catch errors in parallel mode when joining
|
||||||
if args.parallel:
|
if args.parallel:
|
||||||
for host in process.keys():
|
log.trace("Multiprocessing start method is {}".format(
|
||||||
log.trace("Joining process %s", host)
|
multiprocessing.get_start_method()))
|
||||||
process[host].join()
|
log.trace(("Starting multiprocessing Pool for {} "
|
||||||
|
"parallel host operation".format(args.parallel)))
|
||||||
|
|
||||||
if not process[host].exitcode == 0:
|
results = mp_pool_run(cls.onehost, process_args, jobs=args.parallel)
|
||||||
failed_hosts.append(host)
|
log.trace(("Multiprocessing for parallel host operation "
|
||||||
|
"finished"))
|
||||||
|
log.trace(("Multiprocessing for parallel host operation "
|
||||||
|
"results: {}", results))
|
||||||
|
|
||||||
|
failed_hosts = [host for host, result in results if not result]
|
||||||
|
|
||||||
time_end = time.time()
|
time_end = time.time()
|
||||||
log.verbose("Total processing time for %s host(s): %s", hostcnt,
|
log.verbose("Total processing time for %s host(s): %s", hostcnt,
|
||||||
|
@ -236,7 +255,10 @@ class Config(object):
|
||||||
@classmethod
|
@classmethod
|
||||||
def onehost(cls, host, host_tags, host_base_path, host_dir_name, args,
|
def onehost(cls, host, host_tags, host_base_path, host_dir_name, args,
|
||||||
parallel):
|
parallel):
|
||||||
"""Configure ONE system"""
|
"""Configure ONE system.
|
||||||
|
If operating in parallel then return tuple (host, True|False, )
|
||||||
|
so that main process knows for which host function was successful.
|
||||||
|
"""
|
||||||
|
|
||||||
log = logging.getLogger(host)
|
log = logging.getLogger(host)
|
||||||
|
|
||||||
|
@ -273,8 +295,7 @@ class Config(object):
|
||||||
except cdist.Error as e:
|
except cdist.Error as e:
|
||||||
log.error(e)
|
log.error(e)
|
||||||
if parallel:
|
if parallel:
|
||||||
# We are running in our own process here, need to sys.exit!
|
return (host, False, )
|
||||||
sys.exit(1)
|
|
||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
@ -285,6 +306,8 @@ class Config(object):
|
||||||
# Pass back to controlling code in sequential mode
|
# Pass back to controlling code in sequential mode
|
||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
|
if parallel:
|
||||||
|
return (host, True, )
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def create_base_root_path(out_path=None):
|
def create_base_root_path(out_path=None):
|
||||||
|
|
|
@ -19,14 +19,14 @@ SYNOPSIS
|
||||||
[-i MANIFEST] [-j [JOBS]] [-n] [-o OUT_PATH]
|
[-i MANIFEST] [-j [JOBS]] [-n] [-o OUT_PATH]
|
||||||
[-r REMOTE_OUT_DIR] [--remote-copy REMOTE_COPY]
|
[-r REMOTE_OUT_DIR] [--remote-copy REMOTE_COPY]
|
||||||
[--remote-exec REMOTE_EXEC] [-I INVENTORY_DIR] [-A] [-a]
|
[--remote-exec REMOTE_EXEC] [-I INVENTORY_DIR] [-A] [-a]
|
||||||
[-f HOSTFILE] [-p] [-s] [-t]
|
[-f HOSTFILE] [-p [HOST_MAX]] [-s] [-t]
|
||||||
[host [host ...]]
|
[host [host ...]]
|
||||||
|
|
||||||
cdist install [-h] [-q] [-v] [-b] [-C CACHE_PATH_PATTERN] [-c CONF_DIR]
|
cdist install [-h] [-q] [-v] [-b] [-C CACHE_PATH_PATTERN] [-c CONF_DIR]
|
||||||
[-i MANIFEST] [-j [JOBS]] [-n] [-o OUT_PATH]
|
[-i MANIFEST] [-j [JOBS]] [-n] [-o OUT_PATH]
|
||||||
[-r REMOTE_OUT_DIR] [--remote-copy REMOTE_COPY]
|
[-r REMOTE_OUT_DIR] [--remote-copy REMOTE_COPY]
|
||||||
[--remote-exec REMOTE_EXEC] [-I INVENTORY_DIR] [-A] [-a]
|
[--remote-exec REMOTE_EXEC] [-I INVENTORY_DIR] [-A] [-a]
|
||||||
[-f HOSTFILE] [-p] [-s] [-t]
|
[-f HOSTFILE] [-p [HOST_MAX]] [-s] [-t]
|
||||||
[host [host ...]]
|
[host [host ...]]
|
||||||
|
|
||||||
cdist inventory [-h] [-q] [-v] [-b] [-I INVENTORY_DIR]
|
cdist inventory [-h] [-q] [-v] [-b] [-I INVENTORY_DIR]
|
||||||
|
@ -153,9 +153,10 @@ Configure/install one or more hosts.
|
||||||
|
|
||||||
.. option:: -j [JOBS], --jobs [JOBS]
|
.. option:: -j [JOBS], --jobs [JOBS]
|
||||||
|
|
||||||
Specify the maximum number of parallel jobs. Global
|
Operate in parallel in specified maximum number of
|
||||||
explorers, object prepare and object run are supported
|
jobs. Global explorers, object prepare and object run
|
||||||
(currently in beta).
|
are supported. Without argument CPU count is used by
|
||||||
|
default. Currently in beta.
|
||||||
|
|
||||||
.. option:: -n, --dry-run
|
.. option:: -n, --dry-run
|
||||||
|
|
||||||
|
@ -165,9 +166,11 @@ Configure/install one or more hosts.
|
||||||
|
|
||||||
Directory to save cdist output in
|
Directory to save cdist output in
|
||||||
|
|
||||||
.. option:: -p, --parallel
|
.. option:: -p [HOST_MAX], --parallel [HOST_MAX]
|
||||||
|
|
||||||
Operate on multiple hosts in parallel
|
Operate on multiple hosts in parallel for specified
|
||||||
|
maximum hosts at a time. Without argument CPU count is
|
||||||
|
used by default.
|
||||||
|
|
||||||
.. option:: -r REMOTE_OUT_PATH, --remote-out-dir REMOTE_OUT_PATH
|
.. option:: -r REMOTE_OUT_PATH, --remote-out-dir REMOTE_OUT_PATH
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue