Cleanup and add tests

This commit is contained in:
modulos 2017-05-31 18:01:54 +02:00
parent 9a7df541c5
commit 0b5cf2c057
2 changed files with 192 additions and 107 deletions

View file

@ -2,12 +2,14 @@ import oca
import socket
import logging
from oca.pool import WrongNameError
from oca.exceptions import OpenNebulaException
from django.conf import settings
from django.utils.functional import cached_property
from oca.pool import WrongNameError
from oca.exceptions import OpenNebulaException
from utils.models import CustomUser
logger = logging.getLogger(__name__)
@ -35,10 +37,33 @@ class OpenNebulaManager():
)
except:
pass
def _get_client(self, user):
"""Get a opennebula client object for a CustomUser object
Args:
user (CustomUser): dynamicweb CustomUser object
Returns:
oca.Client: Opennebula client object
Raise:
ConnectionError: If the connection to the opennebula server can't be
established
"""
return oca.Client("{0}:{1}".format(
user.email,
user.password),
"{protocol}://{domain}:{port}{endpoint}".format(
protocol=settings.OPENNEBULA_PROTOCOL,
domain=settings.OPENNEBULA_DOMAIN,
port=settings.OPENNEBULA_PORT,
endpoint=settings.OPENNEBULA_ENDPOINT
))
def _get_opennebula_client(self, username, password):
return oca.Client("{0}:{1}".format(
username,
password),
"{protocol}://{domain}:{port}{endpoint}".format(
protocol=settings.OPENNEBULA_PROTOCOL,
@ -47,6 +72,69 @@ class OpenNebulaManager():
endpoint=settings.OPENNEBULA_ENDPOINT
))
def _get_user(self, user):
"""Get the corresponding opennebula user for a CustomUser object
Args:
user (CustomUser): dynamicweb CustomUser object
Returns:
oca.User: Opennebula user object
Raise:
WrongNameError: If no openebula user with this credentials exists
ConnectionError: If the connection to the opennebula server can't be
established
"""
user_pool = self._get_user_pool()
return user_pool.get_by_name(user.email)
def create_user(self, user: CustomUser):
"""Create a new opennebula user or a corresponding CustomUser object
Args:
user (CustomUser): dynamicweb CustomUser object
Returns:
int: Return the opennebula user id
Raises:
ConnectionError: If the connection to the opennebula server can't be
established
UserExistsError: If a user with this credeintals already exits on the
server
UserCredentialError: If a user with this email exists but the
password is worng
"""
try:
self._get_user(user)
try:
self._get_client(self, user)
logger.debug('User already exists')
raise UserExistsError()
except OpenNebulaException as err:
logger.error('OpenNebulaException error: {0}'.format(err))
logger.debug('User exists but password is wrong')
raise UserCredentialError()
except WrongNameError:
user_id = self.oneadmin_client.call(oca.User.METHODS['allocate'],
user.email, user.password, 'core')
logger.debug('Created a user for CustomObject: {user} with user id = {u_id}',
user=user,
u_id=user_id
)
return user_id
except ConnectionRefusedError:
logger.error('Could not connect to host: {host} via protocol {protocol}'.format(
host=settings.OPENNEBULA_DOMAIN,
protocol=settings.OPENNEBULA_PROTOCOL)
)
raise ConnectionRefusedError
def _get_or_create_user(self, email, password):
try:
user_pool = self._get_user_pool()
@ -77,7 +165,7 @@ class OpenNebulaManager():
host=settings.OPENNEBULA_DOMAIN,
protocol=settings.OPENNEBULA_PROTOCOL)
)
raise ConnectionRefusedError
raise
return user_pool
def _get_vm_pool(self):
@ -350,3 +438,42 @@ class OpenNebulaManager():
self.opennebula_user.id,
new_password
)
def add_public_key(self, user, public_key='', replace=True):
"""
Args:
user (CustomUser): Dynamicweb user
public_key (string): Public key to add to the user
replace (bool): Optional if True the new public key replaces the old
Raises:
KeyExistsError: If replace is False and the user already has a
public key
WrongNameError: If no openebula user with this credentials exists
ConnectionError: If the connection to the opennebula server can't be
established
Returns:
True if public_key was added
"""
# TODO: Check if we can remove this first try because we basically just
# raise the possible Errors
try:
open_user = self._get_user(user)
try:
old_key = open_user.template.ssh_public_key
if not replace:
raise KeyExistsError()
except AttributeError:
pass
self.oneadmin_client.call('user.update', open_user.id,
'<CONTEXT><SSH_PUBLIC_KEY>{key}</SSH_PUBLIC_KEY></CONTEXT>'.format(key=public_key))
return True
except WrongNameError:
raise
except ConnectionError:
raise

View file

@ -1,35 +1,53 @@
import socket
import random
import string
from django.test import TestCase
from .models import VirtualMachine, VirtualMachineTemplate, OpenNebulaManager
from .models import OpenNebulaManager
from utils.models import CustomUser
class OpenNebulaManagerTestCases(TestCase):
"""This class defines the test suite for the opennebula manager model."""
def setUp(self):
"""Define the test client and other test variables."""
self.cores = 1
self.memory = 1
self.disk_size = 10.0
self.email = 'test@test.com'
self.password = 'testtest'
self.manager = OpenNebulaManager(email=None, password=None, create_user=False)
def test_model_can_connect_to_server(self):
"""Test the opennebula manager model can connect to a server."""
self.email = '{}@ungleich.ch'.format(''.join(random.choices(string.ascii_uppercase, k=10)))
self.password = ''.join(random.choices(string.ascii_uppercase + string.digits, k=20))
self.user = CustomUser.objects.create(name='test', email=self.email,
password=self.password)
self.vm_specs = {}
self.vm_specs['cpu'] = 1
self.vm_specs['memory'] = 2
self.vm_specs['disk_size'] = 10
self.manager = OpenNebulaManager()
def test_connect_to_server(self):
"""Test the opennebula manager can connect to a server."""
try:
user_pool = self.manager._get_user_pool()
ver = self.manager.oneadmin_client.version()
except:
user_pool = None
self.assertFalse(user_pool is None)
ver = None
self.assertTrue(ver is not None)
def test_model_can_create_user(self):
"""Test the opennebula manager model can create a new user."""
def test_get_user(self):
"""Test the opennebula manager can get a existing user."""
self.manager.create_user(self.user)
user = self.manager._get_user(self.user)
name = user.name
self.assertNotEqual(name, None)
def test_create_and_delete_user(self):
"""Test the opennebula manager can create and delete a new user."""
old_count = len(self.manager._get_user_pool())
self.manager = OpenNebulaManager(email=self.email,
password=self.password,
create_user=True)
password=self.password)
user_pool = self.manager._get_user_pool()
new_count = len(user_pool)
# Remove the user afterwards
@ -38,96 +56,36 @@ class OpenNebulaManagerTestCases(TestCase):
self.assertNotEqual(old_count, new_count)
def test_user_can_login(self):
""" Test the manager can login to a new created user"""
self.manager.create_user(self.user)
user = self.manager._get_user(self.user)
client = self.manager._get_client(self.user)
version = client.version()
class VirtualMachineTemplateTestCase(TestCase):
"""This class defines the test suite for the virtualmachine template model."""
# Cleanup
user.delete()
self.assertNotEqual(version, None)
def setUp(self):
"""Define the test client and other test variables."""
self.template_name = "Standard"
self.base_price = 0.0
self.core_price = 5.0
self.memory_price = 2.0
self.disk_size_price = 0.6
def test_add_public_key_to_user(self):
""" Test the manager can add a new public key to an user """
self.manager.create_user(self.user)
user = self.manager._get_user(self.user)
public_key = 'test'
self.manager.add_public_key(self.user, public_key)
# Fetch new user information from opennebula
user.info()
user_public_key = user.template.ssh_public_key
# Cleanup
user.delete()
self.cores = 1
self.memory = 1
self.disk_size = 10.0
self.manager = OpenNebulaManager(email=None, password=None, create_user=False)
self.opennebula_id = self.manager.create_template(name=self.template_name,
cores=self.cores,
memory=self.memory,
disk_size=self.disk_size)
self.template = VirtualMachineTemplate(opennebula_id=self.opennebula_id,
base_price=self.base_price,
memory_price=self.memory_price,
core_price=self.core_price,
disk_size_price=self.disk_size_price)
self.assertEqual(user_public_key, public_key)
def test_model_can_create_a_virtualmachine_template(self):
"""Test the virtualmachine template model can create a template."""
old_count = VirtualMachineTemplate.objects.count()
self.template.save()
new_count = VirtualMachineTemplate.objects.count()
# Remove the template afterwards
template = self.manager._get_template(self.template.opennebula_id)
template.delete()
self.assertNotEqual(old_count, new_count)
def test_model_can_calculate_price(self):
price = self.cores * self.core_price
price += self.memory * self.memory_price
price += self.disk_size * self.disk_size_price
self.assertEqual(price, self.template.calculate_price())
def test_requires_ssh_key_for_new_vm(self):
"""Test the opennebula manager requires the user to have a ssh key when
creating a new vm"""
class VirtualMachineTestCase(TestCase):
def setUp(self):
"""Define the test client and other test variables."""
self.template_name = "Standard"
self.base_price = 0.0
self.core_price = 5.0
self.memory_price = 2.0
self.disk_size_price = 0.6
self.cores = 1
self.memory = 1
self.disk_size = 10.0
self.manager = OpenNebulaManager(email=None, password=None, create_user=False)
self.opennebula_id = self.manager.create_template(name=self.template_name,
cores=self.cores,
memory=self.memory,
disk_size=self.disk_size)
self.template = VirtualMachineTemplate(opennebula_id=self.opennebula_id,
base_price=self.base_price,
memory_price=self.memory_price,
core_price=self.core_price,
disk_size_price=self.disk_size_price)
self.template_id = self.template.opennebula_id()
self.opennebula_id = self.manager.create_virtualmachine(template_id=self.template_id)
self.virtualmachine = VirtualMachine(opennebula_id=self.opennebula_id,
template=self.template)
def test_model_can_create_a_virtualmachine(self):
"""Test the virtualmachine model can create a virtualmachine."""
old_count = VirtualMachine.objects.count()
self.virtualmachine.save()
new_count = VirtualMachine.objects.count()
self.assertNotEqual(old_count, new_count)
def test_model_can_create_a_virtualmachine_for_user(self):
pass
def test_model_can_delete_a_virtualmachine(self):
"""Test the virtualmachine model can delete a virtualmachine."""
self.virtualmachine.save()
old_count = VirtualMachine.objects.count()
VirtualMachine.objects.first().delete()
new_count = VirtualMachine.objects.count()
self.assertNotEqual(old_count, new_count)