From 23fbabe303ede09812fad7ea285a12aac518f7a6 Mon Sep 17 00:00:00 2001 From: Darko Poljak Date: Wed, 26 Jul 2017 17:39:07 +0200 Subject: [PATCH] Further improve parallel execution. --- cdist/config.py | 13 ++++--------- cdist/mputil.py | 12 +++++++----- 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/cdist/config.py b/cdist/config.py index d6f8a482..5fbb818e 100644 --- a/cdist/config.py +++ b/cdist/config.py @@ -29,7 +29,7 @@ import itertools import tempfile import socket import multiprocessing -from cdist.mputil import mp_pool_run +from cdist.mputil import mp_pool_run, mp_sig_handler import atexit import shutil @@ -138,12 +138,8 @@ class Config(object): if args.parallel: import signal - def sigterm_handler(signum, frame): - log.trace("signal %s, killing whole process group", signum) - os.killpg(os.getpgrp(), signal.SIGKILL) - - signal.signal(signal.SIGTERM, sigterm_handler) - signal.signal(signal.SIGHUP, sigterm_handler) + signal.signal(signal.SIGTERM, mp_sig_handler) + signal.signal(signal.SIGHUP, mp_sig_handler) # FIXME: Refactor relict - remove later log = logging.getLogger("cdist") @@ -221,8 +217,7 @@ class Config(object): cls.onehost(*process_args[0]) except cdist.Error as e: failed_hosts.append(host) - # Catch errors in parallel mode when joining - if args.parallel: + elif args.parallel: log.trace("Multiprocessing start method is {}".format( multiprocessing.get_start_method())) log.trace(("Starting multiprocessing Pool for {} " diff --git a/cdist/mputil.py b/cdist/mputil.py index d823e192..56fcfe39 100644 --- a/cdist/mputil.py +++ b/cdist/mputil.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # -# 2016 Darko Poljak (darko.poljak at gmail.com) +# 2016-2017 Darko Poljak (darko.poljak at gmail.com) # # This file is part of cdist. # @@ -31,6 +31,11 @@ import logging log = logging.getLogger("cdist-mputil") +def mp_sig_handler(signum, frame): + log.trace("signal %s, SIGKILL whole process group", signum) + os.killpg(os.getpgrp(), signal.SIGKILL) + + def mp_pool_run(func, args=None, kwds=None, jobs=multiprocessing.cpu_count()): """Run func using concurrent.futures.ProcessPoolExecutor with jobs jobs and supplied iterables of args and kwds with one entry for each @@ -56,8 +61,5 @@ def mp_pool_run(func, args=None, kwds=None, jobs=multiprocessing.cpu_count()): retval.append(f.result()) return retval except KeyboardInterrupt: - log.trace("KeyboardInterrupt, killing process group") - # When Ctrl+C in terminal then kill whole process group. - # Otherwise there remain processes in sleeping state. - os.killpg(os.getpgrp(), signal.SIGKILL) + mp_sig_handler(signal.SIGINT, None) raise