Commit 126a1812 authored by Darko Poljak's avatar Darko Poljak

Fix parallel object prepare and run steps. Add nonparallel type marker.

parent 31899b22
......@@ -167,7 +167,7 @@ class Config(object):
time_end = time.time()
log.verbose("Total processing time for %s host(s): %s", hostcnt,
(time_end - time_start))
(time_end - time_start))
if len(failed_hosts) > 0:
raise cdist.Error("Failed to configure the following hosts: " +
......@@ -356,10 +356,34 @@ class Config(object):
elif cargo:
self.log.trace("Multiprocessing start method is {}".format(
multiprocessing.get_start_method()))
self.log.trace("Multiprocessing cargo: %s", cargo)
cargo_types = set()
for c in cargo:
cargo_types.add(c.cdist_type)
self.log.trace("Multiprocessing cargo_types: %s", cargo_types)
nt = len(cargo_types)
if nt == 1:
self.log.debug(("Only one type, transfering explorers "
"sequentially"))
self.explorer.transfer_type_explorers(cargo_types.pop())
else:
self.log.trace(("Starting multiprocessing Pool for {} "
"parallel transfering types' explorers".format(
nt)))
args = [
(ct, ) for ct in cargo_types
]
mp_pool_run(self.explorer.transfer_type_explorers, args,
jobs=self.jobs)
self.log.trace(("Multiprocessing for parallel transfering "
"types' explorers finished"))
self.log.trace(("Starting multiprocessing Pool for {} parallel "
"objects preparation".format(n)))
args = [
(c, ) for c in cargo
(c, False, ) for c in cargo
]
mp_pool_run(self.object_prepare, args, jobs=self.jobs)
self.log.trace(("Multiprocessing for parallel object "
......@@ -382,25 +406,44 @@ class Config(object):
# self.object_run(cdist_object)
# objects_changed = True
cargo.append(cdist_object)
n = len(cargo)
if n == 1:
self.log.debug("Only one object, running sequentially")
self.object_run(cargo[0])
objects_changed = True
elif cargo:
self.log.trace("Multiprocessing start method is {}".format(
multiprocessing.get_start_method()))
self.log.trace(("Starting multiprocessing Pool for {} parallel "
"object run".format(n)))
args = [
(c, ) for c in cargo
]
mp_pool_run(self.object_run, args, jobs=self.jobs)
self.log.trace(("Multiprocessing for parallel object "
"run finished"))
objects_changed = True
# put objects in chuncks of distinct types
# so that there is no more than one object
# of the same type in one chunk because there is a
# possibility of object's process locking which
# prevents parallel execution at remote
# and do this only for nonparallel marked types
for chunk in cargo:
for obj in chunk:
if (obj.cdist_type == cdist_object.cdist_type and
cdist_object.cdist_type.is_nonparallel):
break
else:
chunk.append(cdist_object)
break
else:
chunk = [cdist_object, ]
cargo.append(chunk)
for chunk in cargo:
self.log.trace("Running chunk: %s", chunk)
n = len(chunk)
if n == 1:
self.log.debug("Only one object, running sequentially")
self.object_run(chunk[0])
objects_changed = True
elif chunk:
self.log.trace("Multiprocessing start method is {}".format(
multiprocessing.get_start_method()))
self.log.trace(("Starting multiprocessing Pool for {} "
"parallel object run".format(n)))
args = [
(c, ) for c in chunk
]
mp_pool_run(self.object_run, args, jobs=self.jobs)
self.log.trace(("Multiprocessing for parallel object "
"run finished"))
objects_changed = True
return objects_changed
......@@ -466,12 +509,12 @@ class Config(object):
("The requirements of the following objects could not be "
"resolved:\n%s") % ("\n".join(info_string)))
def object_prepare(self, cdist_object):
def object_prepare(self, cdist_object, transfer_type_explorers=True):
"""Prepare object: Run type explorer + manifest"""
self.log.verbose("Preparing object {}".format(cdist_object.name))
self.log.verbose(
"Running manifest and explorers for " + cdist_object.name)
self.explorer.run_type_explorers(cdist_object)
self.explorer.run_type_explorers(cdist_object, transfer_type_explorers)
self.manifest.run_type_manifest(cdist_object)
cdist_object.state = core.CdistObject.STATE_PREPARED
......
......@@ -66,6 +66,9 @@ class CdistType(object):
self.__boolean_parameters = None
self.__parameter_defaults = None
def __hash__(self):
return hash(self.name)
@classmethod
def list_types(cls, base_path):
"""Return a list of type instances"""
......@@ -112,6 +115,12 @@ class CdistType(object):
(if not: for configuration)"""
return os.path.isfile(os.path.join(self.absolute_path, "install"))
@property
def is_nonparallel(self):
"""Check whether a type is a non parallel, i.e. its objects
cannot run in parallel."""
return os.path.isfile(os.path.join(self.absolute_path, "nonparallel"))
@property
def explorers(self):
"""Return a list of available explorers"""
......
......@@ -141,7 +141,9 @@ class Code(object):
destination = os.path.join(self.remote.object_path,
cdist_object.code_remote_path)
# FIXME: BUG: do not create destination, but top level of destination!
self.remote.mkdir(destination)
# self.remote.mkdir(destination)
# FIX?
self.remote.mkdir(os.path.dirname(destination))
self.remote.transfer(source, destination)
def _run_code(self, cdist_object, which, env=None):
......
......@@ -163,16 +163,21 @@ class Explorer(object):
except EnvironmentError:
return []
def run_type_explorers(self, cdist_object):
def run_type_explorers(self, cdist_object, transfer_type_explorers=True):
"""Run the type explorers for the given object and save their output
in the object.
"""
self.log.verbose("Running type explorers for {}".format(
cdist_object.cdist_type))
self.log.trace("Transfering type explorers for type: %s",
cdist_object.cdist_type)
self.transfer_type_explorers(cdist_object.cdist_type)
if transfer_type_explorers:
self.log.trace("Transfering type explorers for type: %s",
cdist_object.cdist_type)
self.transfer_type_explorers(cdist_object.cdist_type)
else:
self.log.trace(("No need for transfering type explorers for "
"type: %s"),
cdist_object.cdist_type)
self.log.trace("Transfering object parameters for object: %s",
cdist_object.name)
self.transfer_object_parameters(cdist_object)
......
......@@ -124,7 +124,7 @@ class Remote(object):
def transfer(self, source, destination, jobs=None):
"""Transfer a file or directory to the remote side."""
self.log.trace("Remote transfer: %s -> %s", source, destination)
self.rmdir(destination)
# self.rmdir(destination)
if os.path.isdir(source):
self.mkdir(destination)
if jobs:
......
......@@ -113,6 +113,16 @@ class TypeTestCase(test.CdistTestCase):
cdist_type = core.CdistType(base_path, '__not_singleton')
self.assertFalse(cdist_type.is_singleton)
def test_nonparallel_is_nonparallel(self):
base_path = fixtures
cdist_type = core.CdistType(base_path, '__nonparallel')
self.assertTrue(cdist_type.is_nonparallel)
def test_not_nonparallel_is_nonparallel(self):
base_path = fixtures
cdist_type = core.CdistType(base_path, '__not_nonparallel')
self.assertFalse(cdist_type.is_nonparallel)
def test_install_is_install(self):
base_path = fixtures
cdist_type = core.CdistType(base_path, '__install')
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment