From 609977b7ff8d5659622f3fff8494b8fd8b9aa427 Mon Sep 17 00:00:00 2001 From: Darko Poljak Date: Sun, 4 Dec 2016 20:27:42 +0100 Subject: [PATCH] ugly->bad --- cdist/__init__.py | 4 +- cdist/core/explorer.py | 11 ++--- cdist/exec/remote.py | 80 +++++++++++++++++---------------- cdist/test/exec/remote.py | 4 +- docs/src/cdist-reference.rst.sh | 3 ++ docs/src/man1/cdist.rst | 3 ++ 6 files changed, 54 insertions(+), 51 deletions(-) diff --git a/cdist/__init__.py b/cdist/__init__.py index b6f5c8cb..c142230c 100644 --- a/cdist/__init__.py +++ b/cdist/__init__.py @@ -68,13 +68,13 @@ class CdistBetaRequired(cdist.Error): err_msg = ("\'{}\' command is beta, but beta is " "not enabled. If you want to use it please enable beta " "functionalities by using the -b/--enable-beta command " - "line flag.") + "line flag or setting CDIST_BETA env var.") fmt_args = [self.command, ] else: err_msg = ("\'{}\' argument of \'{}\' command is beta, but beta " "is not enabled. If you want to use it please enable " "beta functionalities by using the -b/--enable-beta " - "command line flag.") + "command line flag or setting CDIST_BETA env var.") fmt_args = [self.arg, self.command, ] return err_msg.format(*fmt_args) diff --git a/cdist/core/explorer.py b/cdist/core/explorer.py index ef85431c..23996240 100644 --- a/cdist/core/explorer.py +++ b/cdist/core/explorer.py @@ -149,14 +149,9 @@ class Explorer(object): def transfer_global_explorers(self): """Transfer the global explorers to the remote side.""" self.remote.mkdir(self.remote.global_explorer_path) - if self.jobs is None: - self.remote.transfer(self.local.global_explorer_path, - self.remote.global_explorer_path) - else: - self.remote.transfer_dir_parallel( - self.local.global_explorer_path, - self.remote.global_explorer_path, - self.jobs) + self.remote.transfer(self.local.global_explorer_path, + self.remote.global_explorer_path, + self.jobs) self.remote.run(["chmod", "0700", "%s/*" % (self.remote.global_explorer_path)]) diff --git a/cdist/exec/remote.py b/cdist/exec/remote.py index 74a33f73..440aafa7 100644 --- a/cdist/exec/remote.py +++ b/cdist/exec/remote.py @@ -118,57 +118,59 @@ class Remote(object): self.log.debug("Remote mkdir: %s", path) self.run(["mkdir", "-p", path]) - def transfer(self, source, destination): + def transfer(self, source, destination, jobs=None): """Transfer a file or directory to the remote side.""" self.log.debug("Remote transfer: %s -> %s", source, destination) self.rmdir(destination) if os.path.isdir(source): self.mkdir(destination) - for f in glob.glob1(source, '*'): - command = self._copy.split() - path = os.path.join(source, f) - command.extend([path, '{0}:{1}'.format( - _wrap_addr(self.target_host[0]), destination)]) - self._run_command(command) + if jobs: + self._transfer_dir_parallel(source, destination, jobs) + else: + self._transfer_dir_sequential(source, destination) + elif jobs: + raise cdist.Error("Source {} is not a directory".format(source)) else: command = self._copy.split() command.extend([source, '{0}:{1}'.format( _wrap_addr(self.target_host[0]), destination)]) self._run_command(command) - def transfer_dir_parallel(self, source, destination, jobs): - """Transfer a directory to the remote side in parallel mode.""" - self.log.debug("Remote transfer: %s -> %s", source, destination) - self.rmdir(destination) - if os.path.isdir(source): - self.mkdir(destination) - self.log.info("Remote transfer in {} parallel jobs".format( - jobs)) - self.log.debug("Multiprocessing start method is {}".format( - multiprocessing.get_start_method())) - self.log.debug(("Starting multiprocessing Pool for parallel " - "remote transfer")) - with multiprocessing.Pool(jobs) as pool: - self.log.debug("Starting async for parallel transfer") - commands = [] - for f in glob.glob1(source, '*'): - command = self._copy.split() - path = os.path.join(source, f) - command.extend([path, '{0}:{1}'.format( - _wrap_addr(self.target_host[0]), destination)]) - commands.append(command) - results = [ - pool.apply_async(self._run_command, (cmd,)) - for cmd in commands - ] + def _transfer_dir_sequential(self, source, destination): + for f in glob.glob1(source, '*'): + command = self._copy.split() + path = os.path.join(source, f) + command.extend([path, '{0}:{1}'.format( + _wrap_addr(self.target_host[0]), destination)]) + self._run_command(command) - self.log.debug("Waiting async results for parallel transfer") - for r in results: - r.get() # self._run_command returns None - self.log.debug(("Multiprocessing for parallel transfer " - "finished")) - else: - raise cdist.Error("Source {} is not a directory".format(source)) + def _transfer_dir_parallel(self, source, destination, jobs): + """Transfer a directory to the remote side in parallel mode.""" + self.log.info("Remote transfer in {} parallel jobs".format( + jobs)) + self.log.debug("Multiprocessing start method is {}".format( + multiprocessing.get_start_method())) + self.log.debug(("Starting multiprocessing Pool for parallel " + "remote transfer")) + with multiprocessing.Pool(jobs) as pool: + self.log.debug("Starting async for parallel transfer") + commands = [] + for f in glob.glob1(source, '*'): + command = self._copy.split() + path = os.path.join(source, f) + command.extend([path, '{0}:{1}'.format( + _wrap_addr(self.target_host[0]), destination)]) + commands.append(command) + results = [ + pool.apply_async(self._run_command, (cmd,)) + for cmd in commands + ] + + self.log.debug("Waiting async results for parallel transfer") + for r in results: + r.get() # self._run_command returns None + self.log.debug(("Multiprocessing for parallel transfer " + "finished")) def run_script(self, script, env=None, return_output=False): """Run the given script with the given environment on the remote side. diff --git a/cdist/test/exec/remote.py b/cdist/test/exec/remote.py index 45dabb18..371d17e3 100644 --- a/cdist/test/exec/remote.py +++ b/cdist/test/exec/remote.py @@ -136,8 +136,8 @@ class RemoteTestCase(test.CdistTestCase): source_file_name = os.path.split(source_file)[-1] filenames.append(source_file_name) target = self.mkdtemp(dir=self.temp_dir) - self.remote.transfer_dir_parallel(source, target, - multiprocessing.cpu_count()) + self.remote.transfer(source, target, + multiprocessing.cpu_count()) # test if the payload files are in the target directory for filename in filenames: self.assertTrue(os.path.isfile(os.path.join(target, filename))) diff --git a/docs/src/cdist-reference.rst.sh b/docs/src/cdist-reference.rst.sh index 97b22473..4b94b858 100755 --- a/docs/src/cdist-reference.rst.sh +++ b/docs/src/cdist-reference.rst.sh @@ -273,4 +273,7 @@ CDIST_REMOTE_EXEC CDIST_REMOTE_COPY Use this command for remote copy (should behave like scp). + +CDIST_BETA + Enable beta functionalities. eof diff --git a/docs/src/man1/cdist.rst b/docs/src/man1/cdist.rst index 5daedcd4..08c856b1 100644 --- a/docs/src/man1/cdist.rst +++ b/docs/src/man1/cdist.rst @@ -236,6 +236,9 @@ CDIST_REMOTE_EXEC CDIST_REMOTE_COPY Use this command for remote copy (should behave like scp). +CDIST_BETA + Enable beta functionalities. + EXIT STATUS ----------- The following exit values shall be returned: