Transfer and run global explorers in parallel.
This commit is contained in:
		
					parent
					
						
							
								a4c49201c0
							
						
					
				
			
			
				commit
				
					
						5f436f21b8
					
				
			
		
					 6 changed files with 131 additions and 13 deletions
				
			
		| 
						 | 
				
			
			@ -69,7 +69,7 @@ class Explorer(object):
 | 
			
		|||
    def __init__(self, target_host, local, remote, jobs=None):
 | 
			
		||||
        self.target_host = target_host
 | 
			
		||||
 | 
			
		||||
        self.log = logging.getLogger(target_host)
 | 
			
		||||
        self._open_logger()
 | 
			
		||||
 | 
			
		||||
        self.local = local
 | 
			
		||||
        self.remote = remote
 | 
			
		||||
| 
						 | 
				
			
			@ -80,6 +80,9 @@ class Explorer(object):
 | 
			
		|||
        self._type_explorers_transferred = []
 | 
			
		||||
        self.jobs = jobs
 | 
			
		||||
 | 
			
		||||
    def _open_logger(self):
 | 
			
		||||
        self.log = logging.getLogger(self.target_host)
 | 
			
		||||
 | 
			
		||||
    # global
 | 
			
		||||
 | 
			
		||||
    def list_global_explorer_names(self):
 | 
			
		||||
| 
						 | 
				
			
			@ -112,24 +115,46 @@ class Explorer(object):
 | 
			
		|||
    def _run_global_explorers_parallel(self, out_path):
 | 
			
		||||
        self.log.info("Running global explorers in {} parallel jobs".format(
 | 
			
		||||
            self.jobs))
 | 
			
		||||
        self.log.info("Starting multiprocessing Pool")
 | 
			
		||||
        self.log.debug("Multiprocessing start method is {}".format(
 | 
			
		||||
            multiprocessing.get_start_method()))
 | 
			
		||||
        self.log.info(("Starting multiprocessing Pool for global "
 | 
			
		||||
                       "explorers run"))
 | 
			
		||||
        with multiprocessing.Pool(self.jobs) as pool:
 | 
			
		||||
            self.log.info("Starting async global explorer run")
 | 
			
		||||
            self.log.info("Starting async for global explorer run")
 | 
			
		||||
            results = [
 | 
			
		||||
                pool.apply_async(self._run_global_explorer, (e, out_path,))
 | 
			
		||||
                for e in self.list_global_explorer_names()
 | 
			
		||||
            ]
 | 
			
		||||
            self.log.info("Waiting async global explorer run results")
 | 
			
		||||
 | 
			
		||||
            self.log.info("Waiting async results for global explorer runs")
 | 
			
		||||
            for r in results:
 | 
			
		||||
                r.get()
 | 
			
		||||
            self.log.info("Async global explorer run finished")
 | 
			
		||||
        self.log.info("Multiprocessing Pool finished")
 | 
			
		||||
                r.get()  # self._run_global_explorer returns None
 | 
			
		||||
            self.log.info(("Multiprocessing run for global explorers "
 | 
			
		||||
                           "finished"))
 | 
			
		||||
 | 
			
		||||
    # logger is not pickable, so remove it when we pickle
 | 
			
		||||
    def __getstate__(self):
 | 
			
		||||
        state = self.__dict__.copy()
 | 
			
		||||
        if 'log' in state:
 | 
			
		||||
            del state['log']
 | 
			
		||||
        return state
 | 
			
		||||
 | 
			
		||||
    # recreate logger when we unpickle
 | 
			
		||||
    def __setstate__(self, state):
 | 
			
		||||
        self.__dict__.update(state)
 | 
			
		||||
        self._open_logger()
 | 
			
		||||
 | 
			
		||||
    def transfer_global_explorers(self):
 | 
			
		||||
        """Transfer the global explorers to the remote side."""
 | 
			
		||||
        self.remote.mkdir(self.remote.global_explorer_path)
 | 
			
		||||
        if self.jobs is None:
 | 
			
		||||
            self.remote.transfer(self.local.global_explorer_path,
 | 
			
		||||
                                 self.remote.global_explorer_path)
 | 
			
		||||
        else:
 | 
			
		||||
            self.remote.transfer_dir_parallel(
 | 
			
		||||
                    self.local.global_explorer_path,
 | 
			
		||||
                    self.remote.global_explorer_path,
 | 
			
		||||
                    self.jobs)
 | 
			
		||||
        self.remote.run(["chmod", "0700",
 | 
			
		||||
                         "%s/*" % (self.remote.global_explorer_path)])
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -85,6 +85,18 @@ class Local(object):
 | 
			
		|||
    def _init_log(self):
 | 
			
		||||
        self.log = logging.getLogger(self.target_host)
 | 
			
		||||
 | 
			
		||||
    # logger is not pickable, so remove it when we pickle
 | 
			
		||||
    def __getstate__(self):
 | 
			
		||||
        state = self.__dict__.copy()
 | 
			
		||||
        if 'log' in state:
 | 
			
		||||
            del state['log']
 | 
			
		||||
        return state
 | 
			
		||||
 | 
			
		||||
    # recreate logger when we unpickle
 | 
			
		||||
    def __setstate__(self, state):
 | 
			
		||||
        self.__dict__.update(state)
 | 
			
		||||
        self._init_log()
 | 
			
		||||
 | 
			
		||||
    def _init_permissions(self):
 | 
			
		||||
        # Setup file permissions using umask
 | 
			
		||||
        os.umask(0o077)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -26,9 +26,10 @@ import sys
 | 
			
		|||
import glob
 | 
			
		||||
import subprocess
 | 
			
		||||
import logging
 | 
			
		||||
import cdist.exec.util as exec_util
 | 
			
		||||
import multiprocessing
 | 
			
		||||
 | 
			
		||||
import cdist
 | 
			
		||||
import cdist.exec.util as exec_util
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class DecodeError(cdist.Error):
 | 
			
		||||
| 
						 | 
				
			
			@ -66,10 +67,25 @@ class Remote(object):
 | 
			
		|||
        self.type_path = os.path.join(self.conf_path, "type")
 | 
			
		||||
        self.global_explorer_path = os.path.join(self.conf_path, "explorer")
 | 
			
		||||
 | 
			
		||||
        self.log = logging.getLogger(self.target_host)
 | 
			
		||||
        self._open_logger()
 | 
			
		||||
 | 
			
		||||
        self._init_env()
 | 
			
		||||
 | 
			
		||||
    def _open_logger(self):
 | 
			
		||||
        self.log = logging.getLogger(self.target_host)
 | 
			
		||||
 | 
			
		||||
    # logger is not pickable, so remove it when we pickle
 | 
			
		||||
    def __getstate__(self):
 | 
			
		||||
        state = self.__dict__.copy()
 | 
			
		||||
        if 'log' in state:
 | 
			
		||||
            del state['log']
 | 
			
		||||
        return state
 | 
			
		||||
 | 
			
		||||
    # recreate logger when we unpickle
 | 
			
		||||
    def __setstate__(self, state):
 | 
			
		||||
        self.__dict__.update(state)
 | 
			
		||||
        self._open_logger()
 | 
			
		||||
 | 
			
		||||
    def _init_env(self):
 | 
			
		||||
        """Setup environment for scripts - HERE????"""
 | 
			
		||||
        # FIXME: better do so in exec functions that require it!
 | 
			
		||||
| 
						 | 
				
			
			@ -110,6 +126,40 @@ class Remote(object):
 | 
			
		|||
                self.target_host, destination)])
 | 
			
		||||
            self._run_command(command)
 | 
			
		||||
 | 
			
		||||
    def transfer_dir_parallel(self, source, destination, jobs):
 | 
			
		||||
        """Transfer a directory to the remote side in parallel mode."""
 | 
			
		||||
        self.log.debug("Remote transfer: %s -> %s", source, destination)
 | 
			
		||||
        self.rmdir(destination)
 | 
			
		||||
        if os.path.isdir(source):
 | 
			
		||||
            self.mkdir(destination)
 | 
			
		||||
            self.log.info("Remote transfer in {} parallel jobs".format(
 | 
			
		||||
                jobs))
 | 
			
		||||
            self.log.debug("Multiprocessing start method is {}".format(
 | 
			
		||||
                multiprocessing.get_start_method()))
 | 
			
		||||
            self.log.info(("Starting multiprocessing Pool for parallel "
 | 
			
		||||
                           "remote transfer"))
 | 
			
		||||
            with multiprocessing.Pool(jobs) as pool:
 | 
			
		||||
                self.log.info("Starting async for parallel transfer")
 | 
			
		||||
                commands = []
 | 
			
		||||
                for f in glob.glob1(source, '*'):
 | 
			
		||||
                    command = self._copy.split()
 | 
			
		||||
                    path = os.path.join(source, f)
 | 
			
		||||
                    command.extend([path, '{0}:{1}'.format(
 | 
			
		||||
                        self.target_host, destination)])
 | 
			
		||||
                    commands.append(command)
 | 
			
		||||
                results = [
 | 
			
		||||
                    pool.apply_async(self._run_command, (cmd,))
 | 
			
		||||
                    for cmd in commands
 | 
			
		||||
                ]
 | 
			
		||||
 | 
			
		||||
                self.log.info("Waiting async results for parallel transfer")
 | 
			
		||||
                for r in results:
 | 
			
		||||
                    r.get()  # self._run_command returns None
 | 
			
		||||
                self.log.info(("Multiprocessing for parallel transfer "
 | 
			
		||||
                               "finished"))
 | 
			
		||||
        else:
 | 
			
		||||
            raise cdist.Error("Source {} is not a directory".format(source))
 | 
			
		||||
 | 
			
		||||
    def run_script(self, script, env=None, return_output=False):
 | 
			
		||||
        """Run the given script with the given environment on the remote side.
 | 
			
		||||
        Return the output as a string.
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -24,6 +24,7 @@ import getpass
 | 
			
		|||
import shutil
 | 
			
		||||
import string
 | 
			
		||||
import random
 | 
			
		||||
import multiprocessing
 | 
			
		||||
 | 
			
		||||
import cdist
 | 
			
		||||
from cdist import test
 | 
			
		||||
| 
						 | 
				
			
			@ -121,6 +122,22 @@ class RemoteTestCase(test.CdistTestCase):
 | 
			
		|||
        # test if the payload file is in the target directory
 | 
			
		||||
        self.assertTrue(os.path.isfile(os.path.join(target, source_file_name)))
 | 
			
		||||
 | 
			
		||||
    def test_transfer_dir_parallel(self):
 | 
			
		||||
        source = self.mkdtemp(dir=self.temp_dir)
 | 
			
		||||
        # put 8 files in the directory as payload
 | 
			
		||||
        filenames = []
 | 
			
		||||
        for x in range(8):
 | 
			
		||||
            handle, source_file = self.mkstemp(dir=source)
 | 
			
		||||
            os.close(handle)
 | 
			
		||||
            source_file_name = os.path.split(source_file)[-1]
 | 
			
		||||
            filenames.append(source_file_name)
 | 
			
		||||
        target = self.mkdtemp(dir=self.temp_dir)
 | 
			
		||||
        self.remote.transfer_dir_parallel(source, target,
 | 
			
		||||
                                          multiprocessing.cpu_count())
 | 
			
		||||
        # test if the payload files are in the target directory
 | 
			
		||||
        for filename in filenames:
 | 
			
		||||
            self.assertTrue(os.path.isfile(os.path.join(target, filename)))
 | 
			
		||||
 | 
			
		||||
    def test_create_files_dirs(self):
 | 
			
		||||
        self.remote.create_files_dirs()
 | 
			
		||||
        self.assertTrue(os.path.isdir(self.remote.base_path))
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -179,6 +179,20 @@ class ExplorerClassTestCase(test.CdistTestCase):
 | 
			
		|||
            jobs=8)
 | 
			
		||||
        self.assertEqual(expl.jobs, 8)
 | 
			
		||||
 | 
			
		||||
    def test_transfer_global_explorers_parallel(self):
 | 
			
		||||
        expl = explorer.Explorer(
 | 
			
		||||
            self.target_host,
 | 
			
		||||
            self.local,
 | 
			
		||||
            self.remote,
 | 
			
		||||
            jobs=multiprocessing.cpu_count())
 | 
			
		||||
        self.assertIsNotNone(expl.jobs)
 | 
			
		||||
 | 
			
		||||
        expl.transfer_global_explorers()
 | 
			
		||||
        source = self.local.global_explorer_path
 | 
			
		||||
        destination = self.remote.global_explorer_path
 | 
			
		||||
        self.assertEqual(sorted(os.listdir(source)),
 | 
			
		||||
                         sorted(os.listdir(destination)))
 | 
			
		||||
 | 
			
		||||
    def test_run_parallel_jobs(self):
 | 
			
		||||
        expl = explorer.Explorer(
 | 
			
		||||
            self.target_host,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,8 +1,8 @@
 | 
			
		|||
Changelog
 | 
			
		||||
---------
 | 
			
		||||
next:
 | 
			
		||||
	* New type __filesystem: manage filesystems on devices ( Daniel Heule )
 | 
			
		||||
 | 
			
		||||
	* Core: Add -j, --jobs option for parallel execution and add parallel support for global explorers (Darko Poljak)
 | 
			
		||||
	* New type __filesystem: manage filesystems on devices (Daniel Heule)
 | 
			
		||||
	* New type: __locale_system (Steven Armstrong, Carlos Ortigoza, Nico Schottelius)
 | 
			
		||||
	* New type: __sysctl (Steven Armstrong)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue