230 lines
8.7 KiB
Python
230 lines
8.7 KiB
Python
# -*- coding: utf-8 -*-
|
|
#
|
|
# 2011-2017 Steven Armstrong (steven-cdist at armstrong.cc)
|
|
# 2012-2015 Nico Schottelius (nico-cdist at schottelius.org)
|
|
#
|
|
# This file is part of cdist.
|
|
#
|
|
# cdist is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# cdist is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with cdist. If not, see <http://www.gnu.org/licenses/>.
|
|
#
|
|
#
|
|
|
|
import getpass
|
|
import os
|
|
import shutil
|
|
import logging
|
|
import re
|
|
|
|
import cdist
|
|
from cdist import core
|
|
from cdist import test
|
|
from cdist.exec import local
|
|
from cdist.exec import remote
|
|
from cdist.core import code
|
|
|
|
import os.path as op
|
|
my_dir = op.abspath(op.dirname(__file__))
|
|
fixtures = op.join(my_dir, 'fixtures')
|
|
conf_dir = op.join(fixtures, 'conf')
|
|
|
|
class Burried:
|
|
'''This a bogus -alas needed- wrapper class whose sole
|
|
purpose is to hide the inner classes from `unittest`
|
|
that would otherwise try to run them, even though
|
|
they have now become abstractish base classes.
|
|
'''
|
|
|
|
class CodeTestCase(test.CdistTestCase):
|
|
'''
|
|
This is now a base class (which should not be invoked by unittest).
|
|
|
|
A couple tests had to be refactored, without any behavioral changes,
|
|
so as to suit POLYGLOT testing.
|
|
'''
|
|
|
|
def setUp(self):
|
|
self.setup_for_type('__dump_environment')
|
|
|
|
def setup_for_type(self, tested_type_name):
|
|
self.local_dir = self.mkdtemp()
|
|
self.hostdir = cdist.str_hash(self.target_host[0])
|
|
self.host_base_path = os.path.join(self.local_dir, self.hostdir)
|
|
|
|
self.local = local.Local(
|
|
target_host=self.target_host,
|
|
target_host_tags=self.target_host_tags,
|
|
base_root_path=self.host_base_path,
|
|
host_dir_name=self.hostdir,
|
|
exec_path=cdist.test.cdist_exec_path,
|
|
add_conf_dirs=[conf_dir])
|
|
self.local.create_files_dirs()
|
|
|
|
self.remote_dir = self.mkdtemp()
|
|
remote_exec = self.remote_exec
|
|
remote_copy = self.remote_copy
|
|
self.remote = remote.Remote(
|
|
target_host=self.target_host,
|
|
remote_exec=remote_exec,
|
|
remote_copy=remote_copy,
|
|
base_path=self.remote_dir,
|
|
stdout_base_path=self.local.stdout_base_path,
|
|
stderr_base_path=self.local.stderr_base_path)
|
|
self.remote.create_files_dirs()
|
|
|
|
self.code = code.Code(self.target_host, self.local, self.remote)
|
|
|
|
self.cdist_type = core.CdistType(self.local.type_path,
|
|
tested_type_name)
|
|
self.cdist_object = core.CdistObject(
|
|
self.cdist_type, self.local.object_path, 'whatever',
|
|
self.local.object_marker_name)
|
|
self.cdist_object.create()
|
|
|
|
def tearDown(self):
|
|
shutil.rmtree(self.local_dir)
|
|
shutil.rmtree(self.remote_dir)
|
|
|
|
def _expected_environment(self):
|
|
expected = {
|
|
'__target_host' : self.local.target_host[0],
|
|
'__target_hostname' : self.local.target_host[1],
|
|
'__target_fqdn' : self.local.target_host[2],
|
|
'__global' : self.local.base_path,
|
|
'__type' : self.cdist_type.absolute_path,
|
|
'__object' : self.cdist_object.absolute_path,
|
|
'__object_id' : self.cdist_object.object_id,
|
|
'__object_name' : self.cdist_object.name,
|
|
'__files' : self.local.files_path,
|
|
'__target_host_tags' : self.local.target_host_tags,
|
|
'__cdist_log_level': str(logging.WARNING),
|
|
'__cdist_log_level_name' : 'WARNING'
|
|
}
|
|
return expected
|
|
|
|
# Keeping around older simplex parsing logic for a while
|
|
def _parse_env_from_generated_code(self, script_text):
|
|
output_dict = {}
|
|
for line in script_text.split('\n'):
|
|
if line:
|
|
junk, value = line.split(': ')
|
|
key = junk.split(' ')[1]
|
|
output_dict[key] = value
|
|
return output_dict
|
|
|
|
def test_run_gencode_local_environment(self):
|
|
script_text = self.code.run_gencode_local(self.cdist_object)
|
|
|
|
self.assertDictContainsSubset(
|
|
self._expected_environment(),
|
|
self._parse_env_from_generated_code(script_text)
|
|
)
|
|
|
|
def test_run_gencode_remote_environment(self):
|
|
script_text = self.code.run_gencode_remote(self.cdist_object)
|
|
|
|
self.assertDictContainsSubset(
|
|
self._expected_environment(),
|
|
self._parse_env_from_generated_code(script_text)
|
|
)
|
|
|
|
def test_transfer_code_remote(self):
|
|
self.cdist_object.code_remote = self.code.run_gencode_remote(
|
|
self.cdist_object)
|
|
self.code.transfer_code_remote(self.cdist_object)
|
|
destination = os.path.join(self.remote.object_path,
|
|
self.cdist_object.code_remote_path)
|
|
self.assertTrue(os.path.isfile(destination))
|
|
|
|
def test_run_code_local(self):
|
|
self.cdist_object.code_local = self.code.run_gencode_local(
|
|
self.cdist_object)
|
|
self.code.run_code_local(self.cdist_object)
|
|
|
|
def test_run_code_remote_environment(self):
|
|
self.cdist_object.code_remote = self.code.run_gencode_remote(
|
|
self.cdist_object)
|
|
self.code.transfer_code_remote(self.cdist_object)
|
|
self.code.run_code_remote(self.cdist_object)
|
|
|
|
class CodeTestCasePolyglot(CodeTestCase):
|
|
'''Base class for testing generetaed polyglot code
|
|
|
|
The assumption here is that, if cdist works for Perl code,
|
|
it should work for any script language, as long as the
|
|
corresponding interpretor (given on the shebang line)
|
|
is available.
|
|
'''
|
|
def _parse_env_from_generated_code(self, script_text):
|
|
'''
|
|
New parsing logic allows for some simple generated perlish code
|
|
(as well as basic shell code).
|
|
|
|
This implementation can actually be moved up the parent class
|
|
as a drop in replacement of the older parser.
|
|
|
|
NOTE that this kind of parsing is necesarily fragile and sloppy.
|
|
It has been factored out and can easily be overriden, if needed.
|
|
'''
|
|
output_dict = {}
|
|
for line in script_text.split('\n'):
|
|
line = line.strip()
|
|
|
|
if not line:
|
|
continue # Ignore blank lines
|
|
|
|
if bool(re.match('^#', line)):
|
|
continue # Ignore lines consisting of only comments
|
|
|
|
if not bool(re.search(': ', line)):
|
|
continue # Ignore lines that do not contain our splitter pattern.
|
|
|
|
junk, value = line.split(': ')
|
|
key = junk.split(' ')[1]
|
|
|
|
# Remove leading quotes (single or double)
|
|
key = re.sub('^\s*["\']+\s*', '', key)
|
|
|
|
# Remove trailing quotes
|
|
# and any eventual whitespace escape sequences just before
|
|
# XXX: Why do we need to double escape the backslash (4 in all)
|
|
value = re.sub('\s*([\\\\]+[trn])*\s*["\']*\s*[;]*\s*$', '', value)
|
|
|
|
output_dict[key] = value
|
|
|
|
return output_dict
|
|
|
|
# Actual Test Case classes (visible to unittest)
|
|
class CodeTestCase(Burried.CodeTestCase):
|
|
''' Older tests that cover the monoglot scenario (shell generates shell)
|
|
gencode-* is typically written for /bin/sh (which, in turn, must generate shell code).
|
|
'''
|
|
def setUp(self):
|
|
self.setup_for_type('__dump_environment')
|
|
|
|
class CodeTestCase_polyglot_perl_generates_perl(Burried.CodeTestCasePolyglot):
|
|
def setUp(self):
|
|
self.setup_for_type('__dump_environment_polyglot_perl_generates_perl')
|
|
|
|
class CodeTestCase_polyglot_perl_generates_shell(Burried.CodeTestCasePolyglot):
|
|
def setUp(self):
|
|
self.setup_for_type('__dump_environment_polyglot_perl_generates_shell')
|
|
|
|
class CodeTestCase_polyglot_shell_generates_perl(Burried.CodeTestCasePolyglot):
|
|
def setUp(self):
|
|
self.setup_for_type('__dump_environment_polyglot_shell_generates_perl')
|
|
|
|
if __name__ == '__main__':
|
|
import unittest
|
|
unittest.main()
|