cdist/cdist/core/cdist_object.py
Darko Poljak 7a0b697f4c Implement maintaining object relationship graph
For each object maintain parent-child relationship graph, i.e. list of
parent objects ('parents' property) and list of children objects ('children'
property).

Objects without parent(s) are objects specified in init manifest.
Objects without children are objects of types that do not reuse other types.
2021-03-30 12:09:59 +02:00

293 lines
10 KiB
Python

# -*- coding: utf-8 -*-
#
# 2011-2017 Steven Armstrong (steven-cdist at armstrong.cc)
# 2011-2015 Nico Schottelius (nico-cdist at schottelius.org)
# 2014 Daniel Heule (hda at sfs.biz)
#
# This file is part of cdist.
#
# cdist is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# cdist is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with cdist. If not, see <http://www.gnu.org/licenses/>.
#
#
import os
import cdist
import cdist.core
from cdist.util import fsproperty
class IllegalObjectIdError(cdist.Error):
    """Raised when an object id is not acceptable.

    The string form of the error is ``<message>: <object_id>``.
    """

    def __init__(self, object_id, message=None):
        # Any falsy message (None, '') falls back to the generic text.
        if not message:
            message = 'Illegal object id'
        self.object_id = object_id
        self.message = message

    def __str__(self):
        details = (self.message, self.object_id)
        return '%s: %s' % details
class MissingObjectIdError(cdist.Error):
    """Raised when an object of a non-singleton type has no object id."""

    def __init__(self, type_name):
        self.type_name = type_name
        template = ("Type %s requires object id (is not a "
                    "singleton type)")
        self.message = template % self.type_name

    def __str__(self):
        return '%s' % self.message
class CdistObject:
    """Represents a cdist object.

    All interaction with objects in cdist should be done through this class.
    Directly accessing an object through the file system from python code is
    a bug.
    """

    # Constants for use with the ``state`` property of a CdistObject.
    STATE_UNDEF = ""
    STATE_PREPARED = "prepared"
    STATE_RUNNING = "running"
    STATE_DONE = "done"
def __init__(self, cdist_type, base_path, object_marker, object_id):
    """Store the object's identity and derive its name and paths.

    cdist_type    -- the type this object is an instance of
    base_path     -- directory below which the object's path is resolved
    object_marker -- name of the directory, below the object id, that
                     holds the object's own files
    object_id     -- id of the object (empty for singleton types)

    The object id is validated first and sanitised afterwards; the name
    and all paths are derived from the sanitised id.
    """
    self.cdist_type = cdist_type  # instance of Type
    self.base_path = base_path
    self.object_id = object_id
    self.object_marker = object_marker

    # Both steps read the attributes assigned above; sanitising may
    # rewrite self.object_id.
    self.validate_object_id()
    self.sanitise_object_id()

    self.name = self.join_name(self.cdist_type.name, self.object_id)

    relative = os.path.join(self.cdist_type.path, self.object_id,
                            self.object_marker)
    absolute = os.path.join(self.base_path, relative)
    self.path = relative
    self.absolute_path = absolute

    # These three stay relative to base_path ...
    self.code_local_path = os.path.join(relative, "code-local")
    self.code_remote_path = os.path.join(relative, "code-remote")
    self.parameter_path = os.path.join(relative, "parameter")
    # ... while the output locations are absolute.
    self.stdout_path = os.path.join(absolute, "stdout")
    self.stderr_path = os.path.join(absolute, "stderr")
@classmethod
def list_objects(cls, object_base_path, type_base_path, object_marker):
    """Yield an object instance for every object name found below
    object_base_path.

    This is a generator: nothing is looked up until it is iterated.
    """
    names = cls.list_object_names(object_base_path, object_marker)
    for name in names:
        type_name, object_id = cls.split_name(name)
        cdist_type = cdist.core.CdistType(type_base_path, type_name)
        yield cls(cdist_type,
                  base_path=object_base_path,
                  object_marker=object_marker,
                  object_id=object_id)
@classmethod
def list_object_names(cls, object_base_path, object_marker):
    """Yield the name of every object stored below object_base_path.

    A directory is reported when it directly contains a sub-directory
    called object_marker; the name yielded is the directory's path
    relative to object_base_path.
    """
    for current_dir, subdirs, _files in os.walk(object_base_path):
        if object_marker not in subdirs:
            continue
        yield os.path.relpath(current_dir, object_base_path)
@classmethod
def list_type_names(cls, object_base_path):
    """Return the type names found in object_base_path, as reported by
    cdist.core.listdir()."""
    type_names = cdist.core.listdir(object_base_path)
    return type_names
@staticmethod
def split_name(object_name):
    """Split an object name into its type name and object id.

    split_name('__type_name/the/object_id')
        ->
    ('__type_name', 'the/object_id')

    Everything before the first path separator is the type name, the
    remainder is the object id (empty when there is no separator).
    """
    type_name, _sep, object_id = object_name.partition(os.sep)
    return type_name, object_id
@staticmethod
def join_name(type_name, object_id):
    """Build an object name from a type name and an object id.

    join_name('__type_name', 'the/object_id')
        ->
    '__type_name/the/object_id'
    """
    parts = (type_name, object_id)
    return os.path.join(*parts)
def validate_object_id(self):
    """Validate the given object_id and raise IllegalObjectIdError
    if it's not valid.

    Raises:
        IllegalObjectIdError -- the type is a singleton but an object id
            was given, or the id contains the object marker as a path
            component, contains '//', or is exactly '.' or '/'.
        MissingObjectIdError -- the type is not a singleton and no
            object id was given.
    """
    if self.object_id:
        if self.cdist_type.is_singleton:
            # Pass the offending id first and the explanation second, so
            # that the error's object_id attribute really is the id.
            raise IllegalObjectIdError(
                self.object_id,
                'singleton objects can\'t have an object_id')

        if self.object_marker in self.object_id.split(os.sep):
            raise IllegalObjectIdError(
                self.object_id, ('object_id may not contain '
                                 '\'%s\'') % self.object_marker)

        if '//' in self.object_id:
            raise IllegalObjectIdError(
                self.object_id, 'object_id may not contain //')

        _invalid_object_ids = ('.', '/', )
        for ioid in _invalid_object_ids:
            if self.object_id == ioid:
                raise IllegalObjectIdError(
                    self.object_id,
                    'object_id may not be a {}'.format(ioid))

    # If no object_id and type is not singleton => error out
    elif not self.cdist_type.is_singleton:
        # The object's parameters cannot be added to this message: an
        # earlier attempt failed with
        #   AttributeError: 'CdistObject' object has no attribute
        #   'parameter_path'
        raise MissingObjectIdError(self.cdist_type.name)
def object_from_name(self, object_name):
    """Create the object called object_name, next to this one.

    The new object is an instance of this object's own class and shares
    its base path and object marker; its type is built with the class of
    this object's type and that type's base path.

    Mainly intended to create objects when resolving requirements.

    e.g:
        <CdistObject __foo/bar>.object_from_name('__other/object') ->
            <CdistObject __other/object>
    """
    type_name, object_id = self.split_name(object_name)
    type_class = self.cdist_type.__class__
    new_type = type_class(self.cdist_type.base_path, type_name)
    object_class = self.__class__
    return object_class(new_type, self.base_path, self.object_marker,
                        object_id=object_id)
def __repr__(self):
    """Return '<CdistObject NAME>' with NAME being the object's name."""
    label = self.name
    return '<CdistObject %s>' % label
def __eq__(self, other):
    """define equality as 'name is the same'

    Anything that has no ``name`` attribute (None, a plain string, ...)
    cannot be compared this way; NotImplemented is returned for it so
    that Python falls back to its default comparison instead of this
    method raising AttributeError.
    """
    try:
        other_name = other.name
    except AttributeError:
        return NotImplemented
    return self.name == other_name
def __hash__(self):
    """Hash by name, the same attribute equality is based on."""
    key = self.name
    return hash(key)
def __lt__(self, other):
    """Order objects by name.

    Comparing with something that is not an instance of this object's
    class yields False.
    """
    if not isinstance(other, self.__class__):
        return False
    return self.name < other.name
def sanitise_object_id(self):
    """
    Remove leading and trailing slash (one only)

    self.object_id is rewritten in place. An empty (or otherwise falsy)
    object id is left untouched, and an id that consists of a single
    slash becomes the empty string.
    """
    object_id = self.object_id

    # Allow empty object id for singletons
    if not object_id:
        return

    # Remove leading slash
    if object_id.startswith('/'):
        object_id = object_id[1:]

    # Remove trailing slash. endswith() also copes with the empty string
    # that is left when the id was just '/'; indexing with [-1] would
    # raise IndexError there.
    if object_id.endswith('/'):
        object_id = object_id[:-1]

    self.object_id = object_id
# FIXME: still needed?
@property
def explorer_path(self):
    """Relative path to this object's explorers.

    Reading the property has a side effect: the 'explorer' directory is
    created below absolute_path when it does not exist yet.
    """
    leaf = "explorer"

    # The directory is checked for and created at its absolute location ...
    absolute = os.path.join(self.absolute_path, leaf)
    if not os.path.isdir(absolute):
        os.mkdir(absolute)

    # ... but callers get the path relative to the object base path.
    return os.path.join(self.path, leaf)
# File system backed properties. Each fsproperty descriptor below is given a
# callable that maps an object to the path the property's value lives at.
# NOTE(review): how values are read and written is defined by
# cdist.util.fsproperty, which is not visible here.

# <absolute_path>/require
requirements = fsproperty.FileListProperty(
    lambda obj: os.path.join(obj.absolute_path, 'require'))
# <absolute_path>/autorequire
autorequire = fsproperty.FileListProperty(
    lambda obj: os.path.join(obj.absolute_path, 'autorequire'))
# <base_path>/<parameter_path>
parameters = fsproperty.DirectoryDictProperty(
    lambda obj: os.path.join(obj.base_path, obj.parameter_path))
# <base_path>/<explorer_path>; evaluating explorer_path creates the explorer
# directory if it is missing
explorers = fsproperty.DirectoryDictProperty(
    lambda obj: os.path.join(obj.base_path, obj.explorer_path))
# <absolute_path>/state; compared against the STATE_* constants
state = fsproperty.FileStringProperty(
    lambda obj: os.path.join(obj.absolute_path, "state"))
# <absolute_path>/source
source = fsproperty.FileListProperty(
    lambda obj: os.path.join(obj.absolute_path, "source"))
# <base_path>/<code_local_path>
code_local = fsproperty.FileStringProperty(
    lambda obj: os.path.join(obj.base_path, obj.code_local_path))
# <base_path>/<code_remote_path>
code_remote = fsproperty.FileStringProperty(
    lambda obj: os.path.join(obj.base_path, obj.code_remote_path))
# <absolute_path>/typeorder
typeorder = fsproperty.FileListProperty(
    lambda obj: os.path.join(obj.absolute_path, 'typeorder'))
# <absolute_path>/typeorder_dep; this file is removed again by cleanup()
typeorder_dep = fsproperty.FileListProperty(
    lambda obj: os.path.join(obj.absolute_path, 'typeorder_dep'))
# objects without parents are objects specified in the init manifest
parents = fsproperty.FileListProperty(
    lambda obj: os.path.join(obj.absolute_path, 'parents'))
# objects without children are objects of types that do not reuse other
# types
children = fsproperty.FileListProperty(
    lambda obj: os.path.join(obj.absolute_path, 'children'))
def cleanup(self):
    """Delete the object's 'typeorder_dep' file.

    A file that does not exist is not an error.
    """
    leftover = os.path.join(self.absolute_path, 'typeorder_dep')
    try:
        os.remove(leftover)
    except FileNotFoundError:
        # Nothing to remove.
        pass
@property
def exists(self):
    """Whether this cdist object exists on the file system, i.e. whether
    its absolute path exists."""
    location = self.absolute_path
    return os.path.exists(location)
def create(self, allow_overwrite=False):
    """Create this cdist object on the filesystem.

    Creates the object's directory together with its parameter, stdout
    and stderr directories. allow_overwrite is handed to os.makedirs()
    as exist_ok, so with the default of False an already existing
    directory is an error.

    Raises cdist.Error when a directory cannot be created.
    """
    wanted = (
        self.absolute_path,
        os.path.join(self.base_path, self.parameter_path),
        self.stdout_path,
        self.stderr_path,
    )
    try:
        for directory in wanted:
            os.makedirs(directory, exist_ok=allow_overwrite)
    except EnvironmentError as error:
        details = (self, error)
        raise cdist.Error(('Error creating directories for cdist object: '
                           '%s: %s') % details)
def requirements_unfinished(self, requirements):
    """Return the required objects that are not done yet.

    requirements is an iterable of object names. Each name is turned
    into an object, and those whose state is not STATE_DONE are returned
    as a list, in the order given. An empty list means every requirement
    is done.
    """
    required = (self.object_from_name(name) for name in requirements)
    return [candidate for candidate in required
            if not candidate.state == self.STATE_DONE]