Add jobs option for parallel execution, global explorers first.

This commit is contained in:
Darko Poljak 2016-08-11 23:54:31 +02:00
parent 8d6e0760dc
commit a4c49201c0
4 changed files with 89 additions and 8 deletions

View file

@ -70,15 +70,16 @@ def inspect_ssh_mux_opts():
class Config(object): class Config(object):
"""Cdist main class to hold arbitrary data""" """Cdist main class to hold arbitrary data"""
def __init__(self, local, remote, dry_run=False): def __init__(self, local, remote, dry_run=False, jobs=None):
self.local = local self.local = local
self.remote = remote self.remote = remote
self.log = logging.getLogger(self.local.target_host) self.log = logging.getLogger(self.local.target_host)
self.dry_run = dry_run self.dry_run = dry_run
self.jobs = jobs
self.explorer = core.Explorer(self.local.target_host, self.local, self.explorer = core.Explorer(self.local.target_host, self.local,
self.remote) self.remote, jobs=self.jobs)
self.manifest = core.Manifest(self.local.target_host, self.local) self.manifest = core.Manifest(self.local.target_host, self.local)
self.code = core.Code(self.local.target_host, self.local, self.remote) self.code = core.Code(self.local.target_host, self.local, self.remote)
@ -241,7 +242,7 @@ class Config(object):
remote_exec=remote_exec, remote_exec=remote_exec,
remote_copy=remote_copy) remote_copy=remote_copy)
c = cls(local, remote, dry_run=args.dry_run) c = cls(local, remote, dry_run=args.dry_run, jobs=args.jobs)
c.run() c.run()
except cdist.Error as e: except cdist.Error as e:

View file

@ -23,6 +23,7 @@
import logging import logging
import os import os
import glob import glob
import multiprocessing
import cdist import cdist
@ -65,7 +66,7 @@ class Explorer(object):
"""Executes cdist explorers. """Executes cdist explorers.
""" """
def __init__(self, target_host, local, remote): def __init__(self, target_host, local, remote, jobs=None):
self.target_host = target_host self.target_host = target_host
self.log = logging.getLogger(target_host) self.log = logging.getLogger(target_host)
@ -77,6 +78,7 @@ class Explorer(object):
'__explorer': self.remote.global_explorer_path, '__explorer': self.remote.global_explorer_path,
} }
self._type_explorers_transferred = [] self._type_explorers_transferred = []
self.jobs = jobs
# global # global
@ -91,12 +93,38 @@ class Explorer(object):
""" """
self.log.info("Running global explorers") self.log.info("Running global explorers")
self.transfer_global_explorers() self.transfer_global_explorers()
for explorer in self.list_global_explorer_names(): if self.jobs is None:
self._run_global_explorers_seq(out_path)
else:
self._run_global_explorers_parallel(out_path)
def _run_global_explorer(self, explorer, out_path):
    """Run a single global explorer and persist its output.

    Executes the named explorer on the remote side and writes whatever
    it printed to the file <out_path>/<explorer name>.
    """
    result = self.run_global_explorer(explorer)
    with open(os.path.join(out_path, explorer), 'w') as fd:
        fd.write(result)
def _run_global_explorers_seq(self, out_path):
    """Run every global explorer one after another in this process."""
    self.log.info("Running global explorers sequentially")
    for name in self.list_global_explorer_names():
        self._run_global_explorer(name, out_path)
def _run_global_explorers_parallel(self, out_path):
    """Run all global explorers concurrently.

    Fans the explorers out over a multiprocessing.Pool of self.jobs
    workers, then waits for every async result so that any exception
    raised in a worker is re-raised here.
    """
    self.log.info("Running global explorers in {} parallel jobs".format(
        self.jobs))
    self.log.info("Starting multiprocessing Pool")
    with multiprocessing.Pool(self.jobs) as pool:
        self.log.info("Starting async global explorer run")
        pending = []
        for name in self.list_global_explorer_names():
            pending.append(
                pool.apply_async(self._run_global_explorer,
                                 (name, out_path,)))
        self.log.info("Waiting async global explorer run results")
        for handle in pending:
            # get() blocks until done and re-raises worker exceptions
            handle.get()
        self.log.info("Async global explorer run finished")
    self.log.info("Multiprocessing Pool finished")
def transfer_global_explorers(self): def transfer_global_explorers(self):
"""Transfer the global explorers to the remote side.""" """Transfer the global explorers to the remote side."""
self.remote.mkdir(self.remote.global_explorer_path) self.remote.mkdir(self.remote.global_explorer_path)

View file

@ -23,6 +23,7 @@
import os import os
import shutil import shutil
import getpass import getpass
import multiprocessing
import cdist import cdist
from cdist import core from cdist import core
@ -168,3 +169,34 @@ class ExplorerClassTestCase(test.CdistTestCase):
cdist_object.create() cdist_object.create()
self.explorer.run_type_explorers(cdist_object) self.explorer.run_type_explorers(cdist_object)
self.assertEqual(cdist_object.explorers, {'world': 'hello'}) self.assertEqual(cdist_object.explorers, {'world': 'hello'})
def test_jobs_parameter(self):
    """jobs defaults to None and is stored verbatim when passed."""
    self.assertIsNone(self.explorer.jobs)
    expl = explorer.Explorer(self.target_host, self.local, self.remote,
                             jobs=8)
    self.assertEqual(expl.jobs, 8)
def test_run_parallel_jobs(self):
    """Parallel run produces exactly one output file per global explorer.

    Fix: the temporary output directory was leaked whenever an
    assertion or the explorer run itself failed; cleanup now happens
    in a finally block.
    """
    expl = explorer.Explorer(self.target_host, self.local, self.remote,
                             jobs=multiprocessing.cpu_count())
    self.assertIsNotNone(expl.jobs)
    out_path = self.mkdtemp()
    try:
        expl.run_global_explorers(out_path)
        names = sorted(expl.list_global_explorer_names())
        output = sorted(os.listdir(out_path))
        self.assertEqual(names, output)
    finally:
        shutil.rmtree(out_path)
# Allow running this test module directly (python explorer_test.py)
# in addition to running it via the project's test runner.
if __name__ == '__main__':
    import unittest
    unittest.main()

View file

@ -22,6 +22,20 @@
# #
def check_positive_int(value):
    """argparse type callable: parse value as a strictly positive int.

    :param value: command line string (or int) to validate
    :return: the parsed int, guaranteed > 0
    :raises argparse.ArgumentTypeError: if value is not an int or is <= 0

    Fix: the caught ValueError was bound to ``e`` but never used; chain
    it with ``from e`` so the original parse failure is preserved in the
    traceback instead of being silently discarded.
    """
    import argparse

    try:
        val = int(value)
    except ValueError as e:
        raise argparse.ArgumentTypeError(
            "{} is invalid int value".format(value)) from e
    if val <= 0:
        raise argparse.ArgumentTypeError(
            "{} is invalid positive int value".format(val))
    return val
def commandline(): def commandline():
"""Parse command line""" """Parse command line"""
import argparse import argparse
@ -31,6 +45,7 @@ def commandline():
import cdist.shell import cdist.shell
import shutil import shutil
import os import os
import multiprocessing
# Construct parser others can reuse # Construct parser others can reuse
parser = {} parser = {}
@ -105,6 +120,11 @@ def commandline():
'(should behave like ssh)'), '(should behave like ssh)'),
action='store', dest='remote_exec', action='store', dest='remote_exec',
default=os.environ.get('CDIST_REMOTE_EXEC')) default=os.environ.get('CDIST_REMOTE_EXEC'))
parser['config'].add_argument(
'-j', '--jobs', nargs='?', type=check_positive_int,
help='Specify the maximum number of parallel jobs',
action='store', dest='jobs',
const=multiprocessing.cpu_count())
parser['config'].set_defaults(func=cdist.config.Config.commandline) parser['config'].set_defaults(func=cdist.config.Config.commandline)
# Shell # Shell