From 0b5cf2c057c4636bc9e7789a25739dcf2e2f8cc8 Mon Sep 17 00:00:00 2001 From: modulos Date: Wed, 31 May 2017 18:01:54 +0200 Subject: [PATCH] Cleanup and add tests --- opennebula_api/models.py | 133 ++++++++++++++++++++++++++++++- opennebula_api/tests.py | 166 +++++++++++++++------------------------ 2 files changed, 192 insertions(+), 107 deletions(-) diff --git a/opennebula_api/models.py b/opennebula_api/models.py index 3eac0b6e..423013e4 100644 --- a/opennebula_api/models.py +++ b/opennebula_api/models.py @@ -2,12 +2,14 @@ import oca import socket import logging +from oca.pool import WrongNameError +from oca.exceptions import OpenNebulaException from django.conf import settings from django.utils.functional import cached_property -from oca.pool import WrongNameError -from oca.exceptions import OpenNebulaException +from utils.models import CustomUser + logger = logging.getLogger(__name__) @@ -35,10 +37,33 @@ class OpenNebulaManager(): ) except: pass + def _get_client(self, user): + """Get a opennebula client object for a CustomUser object + + Args: + user (CustomUser): dynamicweb CustomUser object + + Returns: + oca.Client: Opennebula client object + + Raise: + ConnectionError: If the connection to the opennebula server can't be + established + """ + return oca.Client("{0}:{1}".format( + user.email, + user.password), + "{protocol}://{domain}:{port}{endpoint}".format( + protocol=settings.OPENNEBULA_PROTOCOL, + domain=settings.OPENNEBULA_DOMAIN, + port=settings.OPENNEBULA_PORT, + endpoint=settings.OPENNEBULA_ENDPOINT + )) def _get_opennebula_client(self, username, password): return oca.Client("{0}:{1}".format( username, + password), "{protocol}://{domain}:{port}{endpoint}".format( protocol=settings.OPENNEBULA_PROTOCOL, @@ -47,6 +72,69 @@ class OpenNebulaManager(): endpoint=settings.OPENNEBULA_ENDPOINT )) + def _get_user(self, user): + """Get the corresponding opennebula user for a CustomUser object + + Args: + user (CustomUser): dynamicweb CustomUser object + + Returns: + oca.User: Opennebula user object + + Raise: + WrongNameError: If no openebula user with this credentials exists + ConnectionError: If the connection to the opennebula server can't be + established + """ + user_pool = self._get_user_pool() + return user_pool.get_by_name(user.email) + + def create_user(self, user: CustomUser): + """Create a new opennebula user or a corresponding CustomUser object + + + Args: + user (CustomUser): dynamicweb CustomUser object + + Returns: + int: Return the opennebula user id + + Raises: + ConnectionError: If the connection to the opennebula server can't be + established + UserExistsError: If a user with this credeintals already exits on the + server + UserCredentialError: If a user with this email exists but the + password is worng + + """ + try: + self._get_user(user) + try: + self._get_client(self, user) + logger.debug('User already exists') + raise UserExistsError() + except OpenNebulaException as err: + logger.error('OpenNebulaException error: {0}'.format(err)) + logger.debug('User exists but password is wrong') + raise UserCredentialError() + + except WrongNameError: + user_id = self.oneadmin_client.call(oca.User.METHODS['allocate'], + user.email, user.password, 'core') + logger.debug('Created a user for CustomObject: {user} with user id = {u_id}', + user=user, + u_id=user_id + ) + return user_id + except ConnectionRefusedError: + logger.error('Could not connect to host: {host} via protocol {protocol}'.format( + host=settings.OPENNEBULA_DOMAIN, + protocol=settings.OPENNEBULA_PROTOCOL) + ) + raise ConnectionRefusedError + + def _get_or_create_user(self, email, password): try: user_pool = self._get_user_pool() @@ -77,7 +165,7 @@ class OpenNebulaManager(): host=settings.OPENNEBULA_DOMAIN, protocol=settings.OPENNEBULA_PROTOCOL) ) - raise ConnectionRefusedError + raise return user_pool def _get_vm_pool(self): @@ -350,3 +438,42 @@ class OpenNebulaManager(): self.opennebula_user.id, new_password ) + + def add_public_key(self, user, public_key='', replace=True): + """ + + Args: + user (CustomUser): Dynamicweb user + public_key (string): Public key to add to the user + replace (bool): Optional if True the new public key replaces the old + + Raises: + KeyExistsError: If replace is False and the user already has a + public key + WrongNameError: If no openebula user with this credentials exists + ConnectionError: If the connection to the opennebula server can't be + established + + Returns: + True if public_key was added + + """ + # TODO: Check if we can remove this first try because we basically just + # raise the possible Errors + try: + open_user = self._get_user(user) + try: + old_key = open_user.template.ssh_public_key + if not replace: + raise KeyExistsError() + + except AttributeError: + pass + self.oneadmin_client.call('user.update', open_user.id, + '{key}'.format(key=public_key)) + return True + except WrongNameError: + raise + + except ConnectionError: + raise diff --git a/opennebula_api/tests.py b/opennebula_api/tests.py index 14694e6f..bd744a66 100644 --- a/opennebula_api/tests.py +++ b/opennebula_api/tests.py @@ -1,35 +1,53 @@ +import socket +import random +import string + from django.test import TestCase -from .models import VirtualMachine, VirtualMachineTemplate, OpenNebulaManager + +from .models import OpenNebulaManager +from utils.models import CustomUser class OpenNebulaManagerTestCases(TestCase): """This class defines the test suite for the opennebula manager model.""" def setUp(self): """Define the test client and other test variables.""" - self.cores = 1 - self.memory = 1 - self.disk_size = 10.0 - self.email = 'test@test.com' - self.password = 'testtest' - self.manager = OpenNebulaManager(email=None, password=None, create_user=False) + self.email = '{}@ungleich.ch'.format(''.join(random.choices(string.ascii_uppercase, k=10))) + self.password = ''.join(random.choices(string.ascii_uppercase + string.digits, k=20)) + + self.user = CustomUser.objects.create(name='test', email=self.email, + password=self.password) + + self.vm_specs = {} + self.vm_specs['cpu'] = 1 + self.vm_specs['memory'] = 2 + self.vm_specs['disk_size'] = 10 + + self.manager = OpenNebulaManager() - def test_model_can_connect_to_server(self): - """Test the opennebula manager model can connect to a server.""" + def test_connect_to_server(self): + """Test the opennebula manager can connect to a server.""" try: - user_pool = self.manager._get_user_pool() - except: - user_pool = None - self.assertFalse(user_pool is None) + ver = self.manager.oneadmin_client.version() + except: + ver = None + self.assertTrue(ver is not None) - def test_model_can_create_user(self): - """Test the opennebula manager model can create a new user.""" + def test_get_user(self): + """Test the opennebula manager can get a existing user.""" + self.manager.create_user(self.user) + user = self.manager._get_user(self.user) + name = user.name + self.assertNotEqual(name, None) + + def test_create_and_delete_user(self): + """Test the opennebula manager can create and delete a new user.""" old_count = len(self.manager._get_user_pool()) self.manager = OpenNebulaManager(email=self.email, - password=self.password, - create_user=True) + password=self.password) user_pool = self.manager._get_user_pool() new_count = len(user_pool) # Remove the user afterwards @@ -38,96 +56,36 @@ class OpenNebulaManagerTestCases(TestCase): self.assertNotEqual(old_count, new_count) + def test_user_can_login(self): + """ Test the manager can login to a new created user""" + self.manager.create_user(self.user) + user = self.manager._get_user(self.user) + client = self.manager._get_client(self.user) + version = client.version() -class VirtualMachineTemplateTestCase(TestCase): - """This class defines the test suite for the virtualmachine template model.""" + # Cleanup + user.delete() + self.assertNotEqual(version, None) - def setUp(self): - """Define the test client and other test variables.""" - self.template_name = "Standard" - self.base_price = 0.0 - self.core_price = 5.0 - self.memory_price = 2.0 - self.disk_size_price = 0.6 + def test_add_public_key_to_user(self): + """ Test the manager can add a new public key to an user """ + self.manager.create_user(self.user) + user = self.manager._get_user(self.user) + public_key = 'test' + self.manager.add_public_key(self.user, public_key) + # Fetch new user information from opennebula + user.info() + user_public_key = user.template.ssh_public_key + # Cleanup + user.delete() - self.cores = 1 - self.memory = 1 - self.disk_size = 10.0 - - self.manager = OpenNebulaManager(email=None, password=None, create_user=False) - self.opennebula_id = self.manager.create_template(name=self.template_name, - cores=self.cores, - memory=self.memory, - disk_size=self.disk_size) - - self.template = VirtualMachineTemplate(opennebula_id=self.opennebula_id, - base_price=self.base_price, - memory_price=self.memory_price, - core_price=self.core_price, - disk_size_price=self.disk_size_price) - - - def test_model_can_create_a_virtualmachine_template(self): - """Test the virtualmachine template model can create a template.""" - old_count = VirtualMachineTemplate.objects.count() - self.template.save() - new_count = VirtualMachineTemplate.objects.count() - # Remove the template afterwards - template = self.manager._get_template(self.template.opennebula_id) - template.delete() - self.assertNotEqual(old_count, new_count) - - def test_model_can_calculate_price(self): - price = self.cores * self.core_price - price += self.memory * self.memory_price - price += self.disk_size * self.disk_size_price - self.assertEqual(price, self.template.calculate_price()) - - - -class VirtualMachineTestCase(TestCase): - def setUp(self): - """Define the test client and other test variables.""" - self.template_name = "Standard" - self.base_price = 0.0 - self.core_price = 5.0 - self.memory_price = 2.0 - self.disk_size_price = 0.6 - - self.cores = 1 - self.memory = 1 - self.disk_size = 10.0 - self.manager = OpenNebulaManager(email=None, password=None, create_user=False) - self.opennebula_id = self.manager.create_template(name=self.template_name, - cores=self.cores, - memory=self.memory, - disk_size=self.disk_size) - - self.template = VirtualMachineTemplate(opennebula_id=self.opennebula_id, - base_price=self.base_price, - memory_price=self.memory_price, - core_price=self.core_price, - disk_size_price=self.disk_size_price) - self.template_id = self.template.opennebula_id() - self.opennebula_id = self.manager.create_virtualmachine(template_id=self.template_id) - - self.virtualmachine = VirtualMachine(opennebula_id=self.opennebula_id, - template=self.template) + self.assertEqual(user_public_key, public_key) - def test_model_can_create_a_virtualmachine(self): - """Test the virtualmachine model can create a virtualmachine.""" - old_count = VirtualMachine.objects.count() - self.virtualmachine.save() - new_count = VirtualMachine.objects.count() - self.assertNotEqual(old_count, new_count) - def test_model_can_create_a_virtualmachine_for_user(self): - pass + def test_requires_ssh_key_for_new_vm(self): + """Test the opennebula manager requires the user to have a ssh key when + creating a new vm""" + + + - def test_model_can_delete_a_virtualmachine(self): - """Test the virtualmachine model can delete a virtualmachine.""" - self.virtualmachine.save() - old_count = VirtualMachine.objects.count() - VirtualMachine.objects.first().delete() - new_count = VirtualMachine.objects.count() - self.assertNotEqual(old_count, new_count)