Merge remote-tracking branch 'ungleich/master' into ssh-mux-sigpipe

This commit is contained in:
Darko Poljak 2017-07-19 07:55:19 +02:00
commit 01d7f63fcb
44 changed files with 322 additions and 148 deletions

View file

@ -151,7 +151,7 @@ class Config(object):
hostcnt += 1
if args.parallel:
log.debug("Creating child process for %s", host)
log.trace("Creating child process for %s", host)
process[host] = multiprocessing.Process(
target=cls.onehost,
args=(host, host_base_path, hostdir, args, True))
@ -166,15 +166,15 @@ class Config(object):
# Catch errors in parallel mode when joining
if args.parallel:
for host in process.keys():
log.debug("Joining process %s", host)
log.trace("Joining process %s", host)
process[host].join()
if not process[host].exitcode == 0:
failed_hosts.append(host)
time_end = time.time()
log.info("Total processing time for %s host(s): %s", hostcnt,
(time_end - time_start))
log.verbose("Total processing time for %s host(s): %s", hostcnt,
(time_end - time_start))
if len(failed_hosts) > 0:
raise cdist.Error("Failed to configure the following hosts: " +
@ -233,13 +233,15 @@ class Config(object):
host_dir_name=host_dir_name,
initial_manifest=args.manifest,
add_conf_dirs=args.conf_dir,
cache_path_pattern=args.cache_path_pattern)
cache_path_pattern=args.cache_path_pattern,
quiet_mode=args.quiet)
remote = cdist.exec.remote.Remote(
target_host=target_host,
remote_exec=remote_exec,
remote_copy=remote_copy,
base_path=args.remote_out_path)
base_path=args.remote_out_path,
quiet_mode=args.quiet)
cleanup_cmds = []
if cleanup_cmd:
@ -284,6 +286,8 @@ class Config(object):
"""Do what is most often done: deploy & cleanup"""
start_time = time.time()
self.log.info("Starting configuration run")
self._init_files_dirs()
self.explorer.run_global_explorers(self.local.global_explorer_out_path)
@ -292,8 +296,8 @@ class Config(object):
self.cleanup()
self.local.save_cache(start_time)
self.log.info("Finished successful run in %s seconds",
time.time() - start_time)
self.log.info("Finished successful run in {:.2f} seconds".format(
time.time() - start_time))
def cleanup(self):
self.log.debug("Running cleanup commands")
@ -329,7 +333,7 @@ class Config(object):
return objects_changed
def _iterate_once_sequential(self):
self.log.info("Iteration in sequential mode")
self.log.debug("Iteration in sequential mode")
objects_changed = False
for cdist_object in self.object_list():
@ -356,7 +360,7 @@ class Config(object):
return objects_changed
def _iterate_once_parallel(self):
self.log.info("Iteration in parallel mode in {} jobs".format(
self.log.debug("Iteration in parallel mode in {} jobs".format(
self.jobs))
objects_changed = False
@ -379,15 +383,39 @@ class Config(object):
self.object_prepare(cargo[0])
objects_changed = True
elif cargo:
self.log.debug("Multiprocessing start method is {}".format(
self.log.trace("Multiprocessing start method is {}".format(
multiprocessing.get_start_method()))
self.log.debug(("Starting multiprocessing Pool for {} parallel "
self.log.trace("Multiprocessing cargo: %s", cargo)
cargo_types = set()
for c in cargo:
cargo_types.add(c.cdist_type)
self.log.trace("Multiprocessing cargo_types: %s", cargo_types)
nt = len(cargo_types)
if nt == 1:
self.log.debug(("Only one type, transfering explorers "
"sequentially"))
self.explorer.transfer_type_explorers(cargo_types.pop())
else:
self.log.trace(("Starting multiprocessing Pool for {} "
"parallel transfering types' explorers".format(
nt)))
args = [
(ct, ) for ct in cargo_types
]
mp_pool_run(self.explorer.transfer_type_explorers, args,
jobs=self.jobs)
self.log.trace(("Multiprocessing for parallel transfering "
"types' explorers finished"))
self.log.trace(("Starting multiprocessing Pool for {} parallel "
"objects preparation".format(n)))
args = [
(c, ) for c in cargo
(c, False, ) for c in cargo
]
mp_pool_run(self.object_prepare, args, jobs=self.jobs)
self.log.debug(("Multiprocessing for parallel object "
self.log.trace(("Multiprocessing for parallel object "
"preparation finished"))
objects_changed = True
@ -407,25 +435,44 @@ class Config(object):
# self.object_run(cdist_object)
# objects_changed = True
cargo.append(cdist_object)
n = len(cargo)
if n == 1:
self.log.debug("Only one object, running sequentially")
self.object_run(cargo[0])
objects_changed = True
elif cargo:
self.log.debug("Multiprocessing start method is {}".format(
multiprocessing.get_start_method()))
self.log.debug(("Starting multiprocessing Pool for {} parallel "
"object run".format(n)))
args = [
(c, ) for c in cargo
]
mp_pool_run(self.object_run, args, jobs=self.jobs)
self.log.debug(("Multiprocessing for parallel object "
"run finished"))
objects_changed = True
# put objects in chuncks of distinct types
# so that there is no more than one object
# of the same type in one chunk because there is a
# possibility of object's process locking which
# prevents parallel execution at remote
# and do this only for nonparallel marked types
for chunk in cargo:
for obj in chunk:
if (obj.cdist_type == cdist_object.cdist_type and
cdist_object.cdist_type.is_nonparallel):
break
else:
chunk.append(cdist_object)
break
else:
chunk = [cdist_object, ]
cargo.append(chunk)
for chunk in cargo:
self.log.trace("Running chunk: %s", chunk)
n = len(chunk)
if n == 1:
self.log.debug("Only one object, running sequentially")
self.object_run(chunk[0])
objects_changed = True
elif chunk:
self.log.trace("Multiprocessing start method is {}".format(
multiprocessing.get_start_method()))
self.log.trace(("Starting multiprocessing Pool for {} "
"parallel object run".format(n)))
args = [
(c, ) for c in chunk
]
mp_pool_run(self.object_run, args, jobs=self.jobs)
self.log.trace(("Multiprocessing for parallel object "
"run finished"))
objects_changed = True
return objects_changed
@ -491,18 +538,19 @@ class Config(object):
("The requirements of the following objects could not be "
"resolved:\n%s") % ("\n".join(info_string)))
def object_prepare(self, cdist_object):
def object_prepare(self, cdist_object, transfer_type_explorers=True):
"""Prepare object: Run type explorer + manifest"""
self.log.info(
self.log.verbose("Preparing object {}".format(cdist_object.name))
self.log.verbose(
"Running manifest and explorers for " + cdist_object.name)
self.explorer.run_type_explorers(cdist_object)
self.explorer.run_type_explorers(cdist_object, transfer_type_explorers)
self.manifest.run_type_manifest(cdist_object)
cdist_object.state = core.CdistObject.STATE_PREPARED
def object_run(self, cdist_object):
"""Run gencode and code for an object"""
self.log.debug("Trying to run object %s" % (cdist_object.name))
self.log.verbose("Running object " + cdist_object.name)
if cdist_object.state == core.CdistObject.STATE_DONE:
raise cdist.Error(("Attempting to run an already finished "
"object: %s"), cdist_object)
@ -510,7 +558,7 @@ class Config(object):
cdist_type = cdist_object.cdist_type
# Generate
self.log.info("Generating code for %s" % (cdist_object.name))
self.log.debug("Generating code for %s" % (cdist_object.name))
cdist_object.code_local = self.code.run_gencode_local(cdist_object)
cdist_object.code_remote = self.code.run_gencode_remote(cdist_object)
if cdist_object.code_local or cdist_object.code_remote:
@ -519,15 +567,19 @@ class Config(object):
# Execute
if not self.dry_run:
if cdist_object.code_local or cdist_object.code_remote:
self.log.info("Executing code for %s" % (cdist_object.name))
self.log.info("Processing %s" % (cdist_object.name))
if cdist_object.code_local:
self.log.trace("Executing local code for %s"
% (cdist_object.name))
self.code.run_code_local(cdist_object)
if cdist_object.code_remote:
self.log.trace("Executing remote code for %s"
% (cdist_object.name))
self.code.transfer_code_remote(cdist_object)
self.code.run_code_remote(cdist_object)
else:
self.log.info("Skipping code execution due to DRY RUN")
self.log.verbose("Skipping code execution due to DRY RUN")
# Mark this object as done
self.log.debug("Finishing run of " + cdist_object.name)
self.log.trace("Finishing run of " + cdist_object.name)
cdist_object.state = core.CdistObject.STATE_DONE