From e07328f569fbea0130442a71a22b976317306bf2 Mon Sep 17 00:00:00 2001 From: Nico Schottelius Date: Mon, 12 Sep 2011 01:14:35 +0200 Subject: [PATCH] initial support for parallel running Signed-off-by: Nico Schottelius --- bin/cdist | 43 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 34 insertions(+), 9 deletions(-) diff --git a/bin/cdist b/bin/cdist index 0e85a79c..c50775f8 100755 --- a/bin/cdist +++ b/bin/cdist @@ -23,6 +23,7 @@ import argparse import datetime import logging +import multiprocessing import os import subprocess import shutil @@ -573,6 +574,15 @@ class Cdist: self.target_host, duration.total_seconds()) + def deploy_and_cleanup(self): + """Do what is most often done: deploy & cleanup""" + self.deploy_to() + self.cleanup() + + +def foo(): + print("test") + if __name__ == "__main__": parser = argparse.ArgumentParser(description='cdist ' + VERSION) parser.add_argument('host', nargs='*', help='one or more hosts to operate on') @@ -587,13 +597,12 @@ if __name__ == "__main__": parser.add_argument('-i', '--initial-manifest', help='Path to a cdist manifest or - to read from stdin', dest='manifest', required=False) -# parser.add_argument('-p', '--parallel', -# help='Operate on multiple hosts in parallel', -# action='store_true', dest='parallel') -# parser.add_argument('-s', '--sequential', -# help='Operate on multiple hosts sequentially', -# action='store_false', dest='parallel') - + parser.add_argument('-p', '--parallel', + help='Operate on multiple hosts in parallel', + action='store_true', dest='parallel') + parser.add_argument('-s', '--sequential', + help='Operate on multiple hosts sequentially', + action='store_false', dest='parallel') parser.add_argument('-V', '--version', help='Show version', action='version', version='%(prog)s ' + VERSION) @@ -606,10 +615,26 @@ if __name__ == "__main__": banner() sys.exit(0) + process = {} + time_start = datetime.datetime.now() try: for host in args.host: c = Cdist(host, initial_manifest=args.manifest, home=args.cdist_home) - c.deploy_to() - c.cleanup() + if args.parallel: + log.info("Starting child process for %s", host) + process[host] = multiprocessing.Process(target=c.deploy_and_cleanup) + process[host].start() + log.debug("After process for %s", host) + else: + c.deploy_and_cleanup() + + if args.parallel: + for p in process.keys(): + log.debug("Joining %s", p) + process[p].join() + + time_end = datetime.datetime.now() + log.info("Total processing time: %s", (time_end - time_start).total_seconds()) + except KeyboardInterrupt: sys.exit(0)