ugly->bad

This commit is contained in:
Darko Poljak 2016-12-04 20:27:42 +01:00
parent 2999f12698
commit 609977b7ff
6 changed files with 54 additions and 51 deletions

View File

@ -68,13 +68,13 @@ class CdistBetaRequired(cdist.Error):
err_msg = ("\'{}\' command is beta, but beta is " err_msg = ("\'{}\' command is beta, but beta is "
"not enabled. If you want to use it please enable beta " "not enabled. If you want to use it please enable beta "
"functionalities by using the -b/--enable-beta command " "functionalities by using the -b/--enable-beta command "
"line flag.") "line flag or setting CDIST_BETA env var.")
fmt_args = [self.command, ] fmt_args = [self.command, ]
else: else:
err_msg = ("\'{}\' argument of \'{}\' command is beta, but beta " err_msg = ("\'{}\' argument of \'{}\' command is beta, but beta "
"is not enabled. If you want to use it please enable " "is not enabled. If you want to use it please enable "
"beta functionalities by using the -b/--enable-beta " "beta functionalities by using the -b/--enable-beta "
"command line flag.") "command line flag or setting CDIST_BETA env var.")
fmt_args = [self.arg, self.command, ] fmt_args = [self.arg, self.command, ]
return err_msg.format(*fmt_args) return err_msg.format(*fmt_args)

View File

@ -149,14 +149,9 @@ class Explorer(object):
def transfer_global_explorers(self): def transfer_global_explorers(self):
"""Transfer the global explorers to the remote side.""" """Transfer the global explorers to the remote side."""
self.remote.mkdir(self.remote.global_explorer_path) self.remote.mkdir(self.remote.global_explorer_path)
if self.jobs is None: self.remote.transfer(self.local.global_explorer_path,
self.remote.transfer(self.local.global_explorer_path, self.remote.global_explorer_path,
self.remote.global_explorer_path) self.jobs)
else:
self.remote.transfer_dir_parallel(
self.local.global_explorer_path,
self.remote.global_explorer_path,
self.jobs)
self.remote.run(["chmod", "0700", self.remote.run(["chmod", "0700",
"%s/*" % (self.remote.global_explorer_path)]) "%s/*" % (self.remote.global_explorer_path)])

View File

@ -118,57 +118,59 @@ class Remote(object):
self.log.debug("Remote mkdir: %s", path) self.log.debug("Remote mkdir: %s", path)
self.run(["mkdir", "-p", path]) self.run(["mkdir", "-p", path])
def transfer(self, source, destination): def transfer(self, source, destination, jobs=None):
"""Transfer a file or directory to the remote side.""" """Transfer a file or directory to the remote side."""
self.log.debug("Remote transfer: %s -> %s", source, destination) self.log.debug("Remote transfer: %s -> %s", source, destination)
self.rmdir(destination) self.rmdir(destination)
if os.path.isdir(source): if os.path.isdir(source):
self.mkdir(destination) self.mkdir(destination)
for f in glob.glob1(source, '*'): if jobs:
command = self._copy.split() self._transfer_dir_parallel(source, destination, jobs)
path = os.path.join(source, f) else:
command.extend([path, '{0}:{1}'.format( self._transfer_dir_sequential(source, destination)
_wrap_addr(self.target_host[0]), destination)]) elif jobs:
self._run_command(command) raise cdist.Error("Source {} is not a directory".format(source))
else: else:
command = self._copy.split() command = self._copy.split()
command.extend([source, '{0}:{1}'.format( command.extend([source, '{0}:{1}'.format(
_wrap_addr(self.target_host[0]), destination)]) _wrap_addr(self.target_host[0]), destination)])
self._run_command(command) self._run_command(command)
def transfer_dir_parallel(self, source, destination, jobs): def _transfer_dir_sequential(self, source, destination):
"""Transfer a directory to the remote side in parallel mode.""" for f in glob.glob1(source, '*'):
self.log.debug("Remote transfer: %s -> %s", source, destination) command = self._copy.split()
self.rmdir(destination) path = os.path.join(source, f)
if os.path.isdir(source): command.extend([path, '{0}:{1}'.format(
self.mkdir(destination) _wrap_addr(self.target_host[0]), destination)])
self.log.info("Remote transfer in {} parallel jobs".format( self._run_command(command)
jobs))
self.log.debug("Multiprocessing start method is {}".format(
multiprocessing.get_start_method()))
self.log.debug(("Starting multiprocessing Pool for parallel "
"remote transfer"))
with multiprocessing.Pool(jobs) as pool:
self.log.debug("Starting async for parallel transfer")
commands = []
for f in glob.glob1(source, '*'):
command = self._copy.split()
path = os.path.join(source, f)
command.extend([path, '{0}:{1}'.format(
_wrap_addr(self.target_host[0]), destination)])
commands.append(command)
results = [
pool.apply_async(self._run_command, (cmd,))
for cmd in commands
]
self.log.debug("Waiting async results for parallel transfer") def _transfer_dir_parallel(self, source, destination, jobs):
for r in results: """Transfer a directory to the remote side in parallel mode."""
r.get() # self._run_command returns None self.log.info("Remote transfer in {} parallel jobs".format(
self.log.debug(("Multiprocessing for parallel transfer " jobs))
"finished")) self.log.debug("Multiprocessing start method is {}".format(
else: multiprocessing.get_start_method()))
raise cdist.Error("Source {} is not a directory".format(source)) self.log.debug(("Starting multiprocessing Pool for parallel "
"remote transfer"))
with multiprocessing.Pool(jobs) as pool:
self.log.debug("Starting async for parallel transfer")
commands = []
for f in glob.glob1(source, '*'):
command = self._copy.split()
path = os.path.join(source, f)
command.extend([path, '{0}:{1}'.format(
_wrap_addr(self.target_host[0]), destination)])
commands.append(command)
results = [
pool.apply_async(self._run_command, (cmd,))
for cmd in commands
]
self.log.debug("Waiting async results for parallel transfer")
for r in results:
r.get() # self._run_command returns None
self.log.debug(("Multiprocessing for parallel transfer "
"finished"))
def run_script(self, script, env=None, return_output=False): def run_script(self, script, env=None, return_output=False):
"""Run the given script with the given environment on the remote side. """Run the given script with the given environment on the remote side.

View File

@ -136,8 +136,8 @@ class RemoteTestCase(test.CdistTestCase):
source_file_name = os.path.split(source_file)[-1] source_file_name = os.path.split(source_file)[-1]
filenames.append(source_file_name) filenames.append(source_file_name)
target = self.mkdtemp(dir=self.temp_dir) target = self.mkdtemp(dir=self.temp_dir)
self.remote.transfer_dir_parallel(source, target, self.remote.transfer(source, target,
multiprocessing.cpu_count()) multiprocessing.cpu_count())
# test if the payload files are in the target directory # test if the payload files are in the target directory
for filename in filenames: for filename in filenames:
self.assertTrue(os.path.isfile(os.path.join(target, filename))) self.assertTrue(os.path.isfile(os.path.join(target, filename)))

View File

@ -273,4 +273,7 @@ CDIST_REMOTE_EXEC
CDIST_REMOTE_COPY CDIST_REMOTE_COPY
Use this command for remote copy (should behave like scp). Use this command for remote copy (should behave like scp).
CDIST_BETA
Enable beta functionalities.
eof eof

View File

@ -236,6 +236,9 @@ CDIST_REMOTE_EXEC
CDIST_REMOTE_COPY CDIST_REMOTE_COPY
Use this command for remote copy (should behave like scp). Use this command for remote copy (should behave like scp).
CDIST_BETA
Enable beta functionalities.
EXIT STATUS EXIT STATUS
----------- -----------
The following exit values shall be returned: The following exit values shall be returned: