multiprocessing.Pool -> concurrent.futures.ProcessPoolExecutor

This commit is contained in:
Darko Poljak 2017-07-26 12:01:19 +02:00
parent 7c7a98d083
commit d1a044cc23
2 changed files with 49 additions and 24 deletions

View file

@ -124,6 +124,27 @@ class Config(object):
def commandline(cls, args):
"""Configure remote system"""
# No new child process if only one host at a time.
if args.parallel == 1:
log.debug("Only 1 parallel process, doing it sequentially")
args.parallel = 0
if args.parallel or args.jobs:
# If parallel execution then also log process id
del logging.getLogger().handlers[:]
log_format = '%(levelname)s: [%(process)d]: %(message)s'
logging.basicConfig(format=log_format)
if args.parallel:
import signal
def sigterm_handler(signum, frame):
log.trace("signal %s, killing whole process group", signum)
os.killpg(os.getpgrp(), signal.SIGKILL)
signal.signal(signal.SIGTERM, sigterm_handler)
signal.signal(signal.SIGHUP, sigterm_handler)
# FIXME: Refactor relict - remove later
log = logging.getLogger("cdist")
@ -154,10 +175,6 @@ class Config(object):
cls.hosts(args.hostfile))
process_args = []
# No new child process if only one host at a time.
if args.parallel == 1:
log.debug("Only 1 parallel process, doing it sequentially")
args.parallel = 0
if args.parallel:
log.trace("Processing hosts in parallel")
else:
@ -216,8 +233,8 @@ class Config(object):
jobs=args.parallel)
log.trace(("Multiprocessing for parallel host operation "
"finished"))
log.trace(("Multiprocessing for parallel host operation " log.trace("Multiprocessing for parallel host operation "
"results: {}", results)) "results: %s", results)
failed_hosts = [host for host, result in results if not result]
@ -301,13 +318,6 @@ class Config(object):
else:
raise
except KeyboardInterrupt:
# Ignore in parallel mode, we are existing anyway
if parallel:
sys.exit(0)
# Pass back to controlling code in sequential mode
else:
raise
if parallel:
return (host, True, )

View file

@ -21,13 +21,20 @@
import multiprocessing
import concurrent.futures as cf
import itertools
import os
import signal
import logging
log = logging.getLogger("cdist-mputil")
def mp_pool_run(func, args=None, kwds=None, jobs=multiprocessing.cpu_count()):
""" Run func using multiprocessing.Pool with jobs jobs and supplied """Run func using concurrent.futures.ProcessPoolExecutor with jobs jobs
iterable of args and kwds with one entry for each parallel func and supplied iterables of args and kwds with one entry for each
instance. parallel func instance.
Return list of results.
"""
if args and kwds:
@ -39,10 +46,18 @@ def mp_pool_run(func, args=None, kwds=None, jobs=multiprocessing.cpu_count()):
else:
return [func(), ]
with multiprocessing.Pool(jobs) as pool: retval = []
with cf.ProcessPoolExecutor(jobs) as executor:
try:
results = [
pool.apply_async(func, a, k) executor.submit(func, *a, **k) for a, k in fargs
for a, k in fargs
]
retval = [r.get() for r in results] for f in cf.as_completed(results):
retval.append(f.result())
return retval
except KeyboardInterrupt:
log.trace("KeyboardInterrupt, killing process group")
# When Ctrl+C in terminal then kill whole process group.
# Otherwise there remain processes in sleeping state.
os.killpg(os.getpgrp(), signal.SIGKILL)
raise