Merge pull request #543 from darko-poljak/ssh-mux-sigpipe

Fix ssh connection multiplexing race condition #542
This commit is contained in:
Darko Poljak 2017-08-10 23:42:16 +02:00 committed by GitHub
commit 2b0e8160cf
3 changed files with 35 additions and 5 deletions

View file

@ -44,6 +44,7 @@ BANNER = """
REMOTE_COPY = "scp -o User=root" REMOTE_COPY = "scp -o User=root"
REMOTE_EXEC = "ssh -o User=root" REMOTE_EXEC = "ssh -o User=root"
REMOTE_CMDS_CLEANUP_PATTERN = "ssh -o User=root -O exit -S {}"
class Error(Exception): class Error(Exception):

View file

@ -50,13 +50,15 @@ from cdist.util.remoteutil import inspect_ssh_mux_opts
class Config(object): class Config(object):
"""Cdist main class to hold arbitrary data""" """Cdist main class to hold arbitrary data"""
def __init__(self, local, remote, dry_run=False, jobs=None): def __init__(self, local, remote, dry_run=False, jobs=None,
cleanup_cmds=[]):
self.local = local self.local = local
self.remote = remote self.remote = remote
self._open_logger() self._open_logger()
self.dry_run = dry_run self.dry_run = dry_run
self.jobs = jobs self.jobs = jobs
self.cleanup_cmds = cleanup_cmds
self.explorer = core.Explorer(self.local.target_host, self.local, self.explorer = core.Explorer(self.local.target_host, self.local,
self.remote, jobs=self.jobs) self.remote, jobs=self.jobs)
@ -94,6 +96,11 @@ class Config(object):
args.remote_exec_pattern = cdist.REMOTE_EXEC + mux_opts args.remote_exec_pattern = cdist.REMOTE_EXEC + mux_opts
if args_dict['remote_copy'] is None: if args_dict['remote_copy'] is None:
args.remote_copy_pattern = cdist.REMOTE_COPY + mux_opts args.remote_copy_pattern = cdist.REMOTE_COPY + mux_opts
if mux_opts:
cleanup_pattern = cdist.REMOTE_CMDS_CLEANUP_PATTERN
else:
cleanup_pattern = ""
args.remote_cmds_cleanup_pattern = cleanup_pattern
@classmethod @classmethod
def _check_and_prepare_args(cls, args): def _check_and_prepare_args(cls, args):
@ -265,7 +272,12 @@ class Config(object):
remote_copy = args.remote_copy_pattern.format(control_path) remote_copy = args.remote_copy_pattern.format(control_path)
else: else:
remote_copy = args.remote_copy remote_copy = args.remote_copy
return (remote_exec, remote_copy, ) if args.remote_cmds_cleanup_pattern:
remote_cmds_cleanup = args.remote_cmds_cleanup_pattern.format(
control_path)
else:
remote_cmds_cleanup = ""
return (remote_exec, remote_copy, remote_cmds_cleanup, )
@classmethod @classmethod
def onehost(cls, host, host_tags, host_base_path, host_dir_name, args, def onehost(cls, host, host_tags, host_base_path, host_dir_name, args,
@ -278,7 +290,8 @@ class Config(object):
log = logging.getLogger(host) log = logging.getLogger(host)
try: try:
remote_exec, remote_copy = cls._resolve_remote_cmds(args) remote_exec, remote_copy, cleanup_cmd = cls._resolve_remote_cmds(
args)
log.debug("remote_exec for host \"{}\": {}".format( log.debug("remote_exec for host \"{}\": {}".format(
host, remote_exec)) host, remote_exec))
log.debug("remote_copy for host \"{}\": {}".format( log.debug("remote_copy for host \"{}\": {}".format(
@ -305,7 +318,11 @@ class Config(object):
quiet_mode=args.quiet, quiet_mode=args.quiet,
archiving_mode=args.use_archiving) archiving_mode=args.use_archiving)
c = cls(local, remote, dry_run=args.dry_run, jobs=args.jobs) cleanup_cmds = []
if cleanup_cmd:
cleanup_cmds.append(cleanup_cmd)
c = cls(local, remote, dry_run=args.dry_run, jobs=args.jobs,
cleanup_cmds=cleanup_cmds)
c.run() c.run()
except cdist.Error as e: except cdist.Error as e:
@ -345,11 +362,23 @@ class Config(object):
self.explorer.run_global_explorers(self.local.global_explorer_out_path) self.explorer.run_global_explorers(self.local.global_explorer_out_path)
self.manifest.run_initial_manifest(self.local.initial_manifest) self.manifest.run_initial_manifest(self.local.initial_manifest)
self.iterate_until_finished() self.iterate_until_finished()
self.cleanup()
self.local.save_cache(start_time) self.local.save_cache(start_time)
self.log.info("Finished successful run in {:.2f} seconds".format( self.log.info("Finished successful run in {:.2f} seconds".format(
time.time() - start_time)) time.time() - start_time))
def cleanup(self):
self.log.debug("Running cleanup commands")
for cleanup_cmd in self.cleanup_cmds:
cmd = cleanup_cmd.split()
cmd.append(self.local.target_host[0])
try:
self.local.run(cmd, return_output=False, save_output=False)
except cdist.Error as e:
# Log warning but continue.
self.log.warning("Cleanup command failed: %s", e)
def object_list(self): def object_list(self):
"""Short name for object list retrieval""" """Short name for object list retrieval"""
for cdist_object in core.CdistObject.list_objects( for cdist_object in core.CdistObject.list_objects(

View file

@ -36,7 +36,7 @@ def inspect_ssh_mux_opts():
wanted_mux_opts = { wanted_mux_opts = {
"ControlPath": "{}", "ControlPath": "{}",
"ControlMaster": "auto", "ControlMaster": "auto",
"ControlPersist": "30", "ControlPersist": "2h",
} }
mux_opts = " ".join([" -o {}={}".format( mux_opts = " ".join([" -o {}={}".format(
x, wanted_mux_opts[x]) for x in wanted_mux_opts]) x, wanted_mux_opts[x]) for x in wanted_mux_opts])