diff --git a/cdist/conf/type/__package/nonparallel b/cdist/conf/type/__package/nonparallel new file mode 100644 index 00000000..e69de29b diff --git a/cdist/conf/type/__package_apt/nonparallel b/cdist/conf/type/__package_apt/nonparallel new file mode 100644 index 00000000..e69de29b diff --git a/cdist/conf/type/__package_dpkg/nonparallel b/cdist/conf/type/__package_dpkg/nonparallel new file mode 100644 index 00000000..e69de29b diff --git a/cdist/conf/type/__package_emerge/nonparallel b/cdist/conf/type/__package_emerge/nonparallel new file mode 100644 index 00000000..e69de29b diff --git a/cdist/conf/type/__package_emerge_dependencies/nonparallel b/cdist/conf/type/__package_emerge_dependencies/nonparallel new file mode 100644 index 00000000..e69de29b diff --git a/cdist/conf/type/__package_luarocks/nonparallel b/cdist/conf/type/__package_luarocks/nonparallel new file mode 100644 index 00000000..e69de29b diff --git a/cdist/conf/type/__package_opkg/nonparallel b/cdist/conf/type/__package_opkg/nonparallel new file mode 100644 index 00000000..e69de29b diff --git a/cdist/conf/type/__package_pacman/nonparallel b/cdist/conf/type/__package_pacman/nonparallel new file mode 100644 index 00000000..e69de29b diff --git a/cdist/conf/type/__package_pip/nonparallel b/cdist/conf/type/__package_pip/nonparallel new file mode 100644 index 00000000..e69de29b diff --git a/cdist/conf/type/__package_pkg_freebsd/nonparallel b/cdist/conf/type/__package_pkg_freebsd/nonparallel new file mode 100644 index 00000000..e69de29b diff --git a/cdist/conf/type/__package_pkg_openbsd/nonparallel b/cdist/conf/type/__package_pkg_openbsd/nonparallel new file mode 100644 index 00000000..e69de29b diff --git a/cdist/conf/type/__package_pkgng_freebsd/nonparallel b/cdist/conf/type/__package_pkgng_freebsd/nonparallel new file mode 100644 index 00000000..e69de29b diff --git a/cdist/conf/type/__package_rubygem/nonparallel b/cdist/conf/type/__package_rubygem/nonparallel new file mode 100644 index 00000000..e69de29b diff --git a/cdist/conf/type/__package_update_index/nonparallel b/cdist/conf/type/__package_update_index/nonparallel new file mode 100644 index 00000000..e69de29b diff --git a/cdist/conf/type/__package_upgrade_all/nonparallel b/cdist/conf/type/__package_upgrade_all/nonparallel new file mode 100644 index 00000000..e69de29b diff --git a/cdist/conf/type/__package_yum/nonparallel b/cdist/conf/type/__package_yum/nonparallel new file mode 100644 index 00000000..e69de29b diff --git a/cdist/conf/type/__package_zypper/nonparallel b/cdist/conf/type/__package_zypper/nonparallel new file mode 100644 index 00000000..e69de29b diff --git a/cdist/config.py b/cdist/config.py index eee47d03..fccc93a0 100644 --- a/cdist/config.py +++ b/cdist/config.py @@ -167,7 +167,7 @@ class Config(object): time_end = time.time() log.verbose("Total processing time for %s host(s): %s", hostcnt, - (time_end - time_start)) + (time_end - time_start)) if len(failed_hosts) > 0: raise cdist.Error("Failed to configure the following hosts: " + @@ -356,10 +356,34 @@ class Config(object): elif cargo: self.log.trace("Multiprocessing start method is {}".format( multiprocessing.get_start_method())) + + self.log.trace("Multiprocessing cargo: %s", cargo) + + cargo_types = set() + for c in cargo: + cargo_types.add(c.cdist_type) + self.log.trace("Multiprocessing cargo_types: %s", cargo_types) + nt = len(cargo_types) + if nt == 1: + self.log.debug(("Only one type, transfering explorers " + "sequentially")) + self.explorer.transfer_type_explorers(cargo_types.pop()) + else: + self.log.trace(("Starting multiprocessing Pool for {} " + "parallel transfering types' explorers".format( + nt))) + args = [ + (ct, ) for ct in cargo_types + ] + mp_pool_run(self.explorer.transfer_type_explorers, args, + jobs=self.jobs) + self.log.trace(("Multiprocessing for parallel transfering " + "types' explorers finished")) + self.log.trace(("Starting multiprocessing Pool for {} parallel " "objects preparation".format(n))) args = [ - (c, ) for c in cargo + (c, False, ) for c in cargo ] mp_pool_run(self.object_prepare, args, jobs=self.jobs) self.log.trace(("Multiprocessing for parallel object " @@ -382,25 +406,44 @@ class Config(object): # self.object_run(cdist_object) # objects_changed = True - cargo.append(cdist_object) - n = len(cargo) - if n == 1: - self.log.debug("Only one object, running sequentially") - self.object_run(cargo[0]) - objects_changed = True - elif cargo: - self.log.trace("Multiprocessing start method is {}".format( - multiprocessing.get_start_method())) - self.log.trace(("Starting multiprocessing Pool for {} parallel " - "object run".format(n))) - args = [ - (c, ) for c in cargo - ] - mp_pool_run(self.object_run, args, jobs=self.jobs) - self.log.trace(("Multiprocessing for parallel object " - "run finished")) - objects_changed = True + # put objects in chuncks of distinct types + # so that there is no more than one object + # of the same type in one chunk because there is a + # possibility of object's process locking which + # prevents parallel execution at remote + # and do this only for nonparallel marked types + for chunk in cargo: + for obj in chunk: + if (obj.cdist_type == cdist_object.cdist_type and + cdist_object.cdist_type.is_nonparallel): + break + else: + chunk.append(cdist_object) + break + else: + chunk = [cdist_object, ] + cargo.append(chunk) + + for chunk in cargo: + self.log.trace("Running chunk: %s", chunk) + n = len(chunk) + if n == 1: + self.log.debug("Only one object, running sequentially") + self.object_run(chunk[0]) + objects_changed = True + elif chunk: + self.log.trace("Multiprocessing start method is {}".format( + multiprocessing.get_start_method())) + self.log.trace(("Starting multiprocessing Pool for {} " + "parallel object run".format(n))) + args = [ + (c, ) for c in chunk + ] + mp_pool_run(self.object_run, args, jobs=self.jobs) + self.log.trace(("Multiprocessing for parallel object " + "run finished")) + objects_changed = True return objects_changed @@ -466,12 +509,12 @@ class Config(object): ("The requirements of the following objects could not be " "resolved:\n%s") % ("\n".join(info_string))) - def object_prepare(self, cdist_object): + def object_prepare(self, cdist_object, transfer_type_explorers=True): """Prepare object: Run type explorer + manifest""" self.log.verbose("Preparing object {}".format(cdist_object.name)) self.log.verbose( "Running manifest and explorers for " + cdist_object.name) - self.explorer.run_type_explorers(cdist_object) + self.explorer.run_type_explorers(cdist_object, transfer_type_explorers) self.manifest.run_type_manifest(cdist_object) cdist_object.state = core.CdistObject.STATE_PREPARED diff --git a/cdist/core/cdist_type.py b/cdist/core/cdist_type.py index c2d45cdd..1f7c3eb5 100644 --- a/cdist/core/cdist_type.py +++ b/cdist/core/cdist_type.py @@ -66,6 +66,9 @@ class CdistType(object): self.__boolean_parameters = None self.__parameter_defaults = None + def __hash__(self): + return hash(self.name) + @classmethod def list_types(cls, base_path): """Return a list of type instances""" @@ -112,6 +115,12 @@ class CdistType(object): (if not: for configuration)""" return os.path.isfile(os.path.join(self.absolute_path, "install")) + @property + def is_nonparallel(self): + """Check whether a type is a non parallel, i.e. its objects + cannot run in parallel.""" + return os.path.isfile(os.path.join(self.absolute_path, "nonparallel")) + @property def explorers(self): """Return a list of available explorers""" diff --git a/cdist/core/code.py b/cdist/core/code.py index 6917e7ba..900188bf 100644 --- a/cdist/core/code.py +++ b/cdist/core/code.py @@ -141,7 +141,9 @@ class Code(object): destination = os.path.join(self.remote.object_path, cdist_object.code_remote_path) # FIXME: BUG: do not create destination, but top level of destination! - self.remote.mkdir(destination) + # self.remote.mkdir(destination) + # FIX? + self.remote.mkdir(os.path.dirname(destination)) self.remote.transfer(source, destination) def _run_code(self, cdist_object, which, env=None): diff --git a/cdist/core/explorer.py b/cdist/core/explorer.py index 0d47e45e..d604c015 100644 --- a/cdist/core/explorer.py +++ b/cdist/core/explorer.py @@ -163,16 +163,21 @@ class Explorer(object): except EnvironmentError: return [] - def run_type_explorers(self, cdist_object): + def run_type_explorers(self, cdist_object, transfer_type_explorers=True): """Run the type explorers for the given object and save their output in the object. """ self.log.verbose("Running type explorers for {}".format( cdist_object.cdist_type)) - self.log.trace("Transfering type explorers for type: %s", - cdist_object.cdist_type) - self.transfer_type_explorers(cdist_object.cdist_type) + if transfer_type_explorers: + self.log.trace("Transfering type explorers for type: %s", + cdist_object.cdist_type) + self.transfer_type_explorers(cdist_object.cdist_type) + else: + self.log.trace(("No need for transfering type explorers for " + "type: %s"), + cdist_object.cdist_type) self.log.trace("Transfering object parameters for object: %s", cdist_object.name) self.transfer_object_parameters(cdist_object) diff --git a/cdist/exec/remote.py b/cdist/exec/remote.py index 2a27bae4..9588db74 100644 --- a/cdist/exec/remote.py +++ b/cdist/exec/remote.py @@ -124,7 +124,7 @@ class Remote(object): def transfer(self, source, destination, jobs=None): """Transfer a file or directory to the remote side.""" self.log.trace("Remote transfer: %s -> %s", source, destination) - self.rmdir(destination) + # self.rmdir(destination) if os.path.isdir(source): self.mkdir(destination) if jobs: diff --git a/cdist/test/cdist_type/__init__.py b/cdist/test/cdist_type/__init__.py index 6ed3f87c..6e11383b 100644 --- a/cdist/test/cdist_type/__init__.py +++ b/cdist/test/cdist_type/__init__.py @@ -113,6 +113,16 @@ class TypeTestCase(test.CdistTestCase): cdist_type = core.CdistType(base_path, '__not_singleton') self.assertFalse(cdist_type.is_singleton) + def test_nonparallel_is_nonparallel(self): + base_path = fixtures + cdist_type = core.CdistType(base_path, '__nonparallel') + self.assertTrue(cdist_type.is_nonparallel) + + def test_not_nonparallel_is_nonparallel(self): + base_path = fixtures + cdist_type = core.CdistType(base_path, '__not_nonparallel') + self.assertFalse(cdist_type.is_nonparallel) + def test_install_is_install(self): base_path = fixtures cdist_type = core.CdistType(base_path, '__install') diff --git a/cdist/test/cdist_type/fixtures/__nonparallel/nonparallel b/cdist/test/cdist_type/fixtures/__nonparallel/nonparallel new file mode 100644 index 00000000..e69de29b diff --git a/cdist/test/cdist_type/fixtures/__not_nonparallel/.keep b/cdist/test/cdist_type/fixtures/__not_nonparallel/.keep new file mode 100644 index 00000000..e69de29b