Begin parallelizing object prepare and run.

This commit is contained in:
Darko Poljak 2016-12-06 14:27:17 +01:00
parent 609977b7ff
commit 6c1b215db8
1 changed files with 33 additions and 10 deletions

View File

@ -27,6 +27,7 @@ import time
import itertools import itertools
import tempfile import tempfile
import socket import socket
import concurrent.futures
import cdist import cdist
import cdist.hostsource import cdist.hostsource
@ -265,13 +266,14 @@ class Config(object):
else: else:
yield cdist_object yield cdist_object
def iterate_once(self): def iterate_once_parallel(self):
""" """
Iterate over the objects once - helper method for Iterate over the objects once - helper method for
iterate_until_finished iterate_until_finished
""" """
objects_changed = False objects_changed = False
cargo = []
for cdist_object in self.object_list(): for cdist_object in self.object_list():
if cdist_object.requirements_unfinished(cdist_object.requirements): if cdist_object.requirements_unfinished(cdist_object.requirements):
"""We cannot do anything for this poor object""" """We cannot do anything for this poor object"""
@ -280,18 +282,38 @@ class Config(object):
if cdist_object.state == core.CdistObject.STATE_UNDEF: if cdist_object.state == core.CdistObject.STATE_UNDEF:
"""Prepare the virgin object""" """Prepare the virgin object"""
self.object_prepare(cdist_object) # self.object_prepare(cdist_object)
objects_changed = True # objects_changed = True
cargo.append(cdist_object)
if cdist_object.requirements_unfinished(cdist_object.autorequire): if cargo:
"""The previous step created objects we depend on - with concurrent.futures.ProcessPoolExecutor(self.jobs) as executor:
wait for them for x in executor.map(self.object_prepare, cargo):
""" pass # returns None
objects_changed = True
cargo.clear()
for cdist_object in self.object_list():
if cdist_object.requirements_unfinished(cdist_object.requirements):
"""We cannot do anything for this poor object"""
continue continue
if cdist_object.state == core.CdistObject.STATE_PREPARED: if cdist_object.state == core.CdistObject.STATE_PREPARED:
self.object_run(cdist_object) if cdist_object.requirements_unfinished(cdist_object.autorequire):
objects_changed = True """The previous step created objects we depend on -
wait for them
"""
continue
# self.object_run(cdist_object)
# objects_changed = True
cargo.append(cdist_object)
if cargo:
with concurrent.futures.ProcessPoolExecutor(self.jobs) as executor:
for x in executor.map(self.object_run, cargo):
pass # returns None
objects_changed = True
return objects_changed return objects_changed
@ -304,7 +326,8 @@ class Config(object):
objects_changed = True objects_changed = True
while objects_changed: while objects_changed:
objects_changed = self.iterate_once() if self.jobs:
objects_changed = self.iterate_once_parallel()
# Check whether all objects have been finished # Check whether all objects have been finished
unfinished_objects = [] unfinished_objects = []