Add file locking for -j parallel execution.

This commit is contained in:
Darko Poljak 2017-08-11 15:04:26 +02:00
parent 1fa37566cb
commit 75fe3272b3
5 changed files with 85 additions and 17 deletions

View File

@ -39,11 +39,9 @@ import cdist.hostsource
import cdist.exec.local import cdist.exec.local
import cdist.exec.remote import cdist.exec.remote
from cdist import inventory
import cdist.util.ipaddr as ipaddr import cdist.util.ipaddr as ipaddr
from cdist import core from cdist import core, inventory
from cdist.util.remoteutil import inspect_ssh_mux_opts from cdist.util.remoteutil import inspect_ssh_mux_opts

View File

@ -113,8 +113,8 @@ class Manifest(object):
'__target_host_tags': self.local.target_host_tags, '__target_host_tags': self.local.target_host_tags,
} }
if self.log.getEffectiveLevel() == logging.DEBUG: self.env.update(
self.env.update({'__cdist_debug': "yes"}) {'__cdist_loglevel': str(self.log.getEffectiveLevel())})
def _open_logger(self): def _open_logger(self):
self.log = logging.getLogger(self.target_host[0]) self.log = logging.getLogger(self.target_host[0])

View File

@ -28,6 +28,7 @@ import sys
import cdist import cdist
from cdist import core from cdist import core
from cdist import flock
class MissingRequiredEnvironmentVariableError(cdist.Error): class MissingRequiredEnvironmentVariableError(cdist.Error):
@ -94,20 +95,25 @@ class Emulator(object):
"""Emulate type commands (i.e. __file and co)""" """Emulate type commands (i.e. __file and co)"""
self.commandline() self.commandline()
self.setup_object() self.init_object()
self.save_stdin()
self.record_requirements() # locking for parallel execution
self.record_auto_requirements() with flock.Flock(self.flock_path) as lock:
self.log.trace("Finished %s %s" % ( self.setup_object()
self.cdist_object.path, self.parameters)) self.save_stdin()
self.record_requirements()
self.record_auto_requirements()
self.log.trace("Finished %s %s" % (
self.cdist_object.path, self.parameters))
def __init_log(self): def __init_log(self):
"""Setup logging facility""" """Setup logging facility"""
if '__cdist_debug' in self.env: if '__cdist_loglevel' in self.env:
logging.root.setLevel(logging.DEBUG) level = int(self.env['__cdist_loglevel'])
else: else:
logging.root.setLevel(logging.INFO) level = logging.OFF
logging.root.setLevel(level)
self.log = logging.getLogger(self.target_host[0]) self.log = logging.getLogger(self.target_host[0])
@ -150,8 +156,8 @@ class Emulator(object):
self.args = parser.parse_args(self.argv[1:]) self.args = parser.parse_args(self.argv[1:])
self.log.trace('Args: %s' % self.args) self.log.trace('Args: %s' % self.args)
def setup_object(self): def init_object(self):
# Setup object - and ensure it is not in args # Initialize object - and ensure it is not in args
if self.cdist_type.is_singleton: if self.cdist_type.is_singleton:
self.object_id = '' self.object_id = ''
else: else:
@ -162,7 +168,13 @@ class Emulator(object):
self.cdist_object = core.CdistObject( self.cdist_object = core.CdistObject(
self.cdist_type, self.object_base_path, self.object_marker, self.cdist_type, self.object_base_path, self.object_marker,
self.object_id) self.object_id)
lockfname = ('.' + self.cdist_type.name +
self.object_id + '_' +
self.object_marker + '.lock')
lockfname = lockfname.replace(os.sep, '_')
self.flock_path = os.path.join(self.object_base_path, lockfname)
def setup_object(self):
# Create object with given parameters # Create object with given parameters
self.parameters = {} self.parameters = {}
for key, value in vars(self.args).items(): for key, value in vars(self.args).items():

58
cdist/flock.py Normal file
View File

@ -0,0 +1,58 @@
# -*- coding: utf-8 -*-
#
# 2017 Darko Poljak (darko.poljak at gmail.com)
#
# This file is part of cdist.
#
# cdist is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# cdist is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with cdist. If not, see <http://www.gnu.org/licenses/>.
#
#
import fcntl
import logging
import os
log = logging.getLogger('cdist-flock')
class Flock():
def __init__(self, path):
self.path = path
self.lockfd = None
def flock(self):
log.debug('Acquiring lock on %s', self.path)
self.lockfd = open(self.path, 'w+')
fcntl.flock(self.lockfd, fcntl.LOCK_EX)
log.debug('Acquired lock on %s', self.path)
def funlock(self):
log.debug('Releasing lock on %s', self.path)
fcntl.flock(self.lockfd, fcntl.LOCK_UN)
self.lockfd.close()
self.lockfd = None
try:
os.remove(self.path)
except FileNotFoundError:
pass
log.debug('Released lock on %s', self.path)
def __enter__(self):
self.flock()
return self
def __exit__(self, *args):
self.funlock()
return False

View File

@ -136,7 +136,7 @@ class ManifestTestCase(test.CdistTestCase):
current_level = self.log.getEffectiveLevel() current_level = self.log.getEffectiveLevel()
self.log.setLevel(logging.DEBUG) self.log.setLevel(logging.DEBUG)
manifest = cdist.core.manifest.Manifest(self.target_host, self.local) manifest = cdist.core.manifest.Manifest(self.target_host, self.local)
self.assertTrue("__cdist_debug" in manifest.env) self.assertTrue("__cdist_loglevel" in manifest.env)
self.log.setLevel(current_level) self.log.setLevel(current_level)