From 6c1b215db8a78a3ae5a9bad036c605bd5fcc3aee Mon Sep 17 00:00:00 2001 From: Darko Poljak Date: Tue, 6 Dec 2016 14:27:17 +0100 Subject: [PATCH] Begin parallelizing object prepare and run. --- cdist/config.py | 43 +++++++++++++++++++++++++++++++++---------- 1 file changed, 33 insertions(+), 10 deletions(-) diff --git a/cdist/config.py b/cdist/config.py index b1a120ca..42dd73b8 100644 --- a/cdist/config.py +++ b/cdist/config.py @@ -27,6 +27,7 @@ import time import itertools import tempfile import socket +import concurrent.futures import cdist import cdist.hostsource @@ -265,13 +266,14 @@ class Config(object): else: yield cdist_object - def iterate_once(self): + def iterate_once_parallel(self): """ Iterate over the objects once - helper method for iterate_until_finished """ objects_changed = False + cargo = [] for cdist_object in self.object_list(): if cdist_object.requirements_unfinished(cdist_object.requirements): """We cannot do anything for this poor object""" @@ -280,18 +282,38 @@ class Config(object): if cdist_object.state == core.CdistObject.STATE_UNDEF: """Prepare the virgin object""" - self.object_prepare(cdist_object) - objects_changed = True + # self.object_prepare(cdist_object) + # objects_changed = True + cargo.append(cdist_object) - if cdist_object.requirements_unfinished(cdist_object.autorequire): - """The previous step created objects we depend on - - wait for them - """ + if cargo: + with concurrent.futures.ProcessPoolExecutor(self.jobs) as executor: + for x in executor.map(self.object_prepare, cargo): + pass # returns None + objects_changed = True + + cargo.clear() + for cdist_object in self.object_list(): + if cdist_object.requirements_unfinished(cdist_object.requirements): + """We cannot do anything for this poor object""" continue if cdist_object.state == core.CdistObject.STATE_PREPARED: - self.object_run(cdist_object) - objects_changed = True + if cdist_object.requirements_unfinished(cdist_object.autorequire): + """The previous step created objects we depend on - + wait for them + """ + continue + + # self.object_run(cdist_object) + # objects_changed = True + cargo.append(cdist_object) + + if cargo: + with concurrent.futures.ProcessPoolExecutor(self.jobs) as executor: + for x in executor.map(self.object_run, cargo): + pass # returns None + objects_changed = True return objects_changed @@ -304,7 +326,8 @@ class Config(object): objects_changed = True while objects_changed: - objects_changed = self.iterate_once() + if self.jobs: + objects_changed = self.iterate_once_parallel() # Check whether all objects have been finished unfinished_objects = []