Add opennebula_api app

This commit is contained in:
M.Ravi 2023-12-06 16:46:00 +05:30
parent d20b9137bc
commit e1fd0d52b3
10 changed files with 1103 additions and 0 deletions

0
opennebula_api/__init__.py Executable file
View file

3
opennebula_api/admin.py Executable file
View file

@ -0,0 +1,3 @@
# from django.contrib import admin
# Register your models here.

5
opennebula_api/apps.py Executable file
View file

@ -0,0 +1,5 @@
from django.apps import AppConfig
class OpennebulaApiConfig(AppConfig):
name = 'opennebula_api'

10
opennebula_api/exceptions.py Executable file
View file

@ -0,0 +1,10 @@
class KeyExistsError(Exception):
pass
class UserExistsError(Exception):
pass
class UserCredentialError(Exception):
pass

View file

View file

@ -0,0 +1,643 @@
import logging
import socket
import oca
from django.conf import settings
from oca.exceptions import OpenNebulaException
from oca.pool import WrongNameError, WrongIdError
from hosting.models import HostingOrder
from utils.models import CustomUser
from utils.tasks import save_ssh_key, save_ssh_key_error_handler
from .exceptions import KeyExistsError, UserExistsError, UserCredentialError
logger = logging.getLogger(__name__)
class OpenNebulaManager():
"""This class represents an opennebula manager."""
def __init__(self, email=None, password=None):
self.email = email
self.password = password
# Get oneadmin client
self.oneadmin_client = None if settings.BYPASS_OPENNEBULA else self._get_opennebula_client(
settings.OPENNEBULA_USERNAME,
settings.OPENNEBULA_PASSWORD
)
# Get or create oppenebula user using given credentials
try:
self.opennebula_user = self._get_or_create_user(
email,
password
)
# If opennebula user was created/obtained, get his client
self.client = self._get_opennebula_client(
email,
password
)
except:
pass
def _get_client(self, user):
"""Get a opennebula client object for a CustomUser object
Args:
user (CustomUser): dynamicweb CustomUser object
Returns:
oca.Client: Opennebula client object
Raise:
ConnectionError: If the connection to the opennebula server can't be
established
"""
return self._get_opennebula_client(user.email, user.password)
def _get_opennebula_client(self, username, password):
return None if settings.BYPASS_OPENNEBULA else oca.Client(
"{0}:{1}".format(username, password),
"{protocol}://{domain}:{port}{endpoint}".format(
protocol=settings.OPENNEBULA_PROTOCOL,
domain=settings.OPENNEBULA_DOMAIN,
port=settings.OPENNEBULA_PORT,
endpoint=settings.OPENNEBULA_ENDPOINT
)
)
def _get_user(self, user):
"""Get the corresponding opennebula user for a CustomUser object
Args:
user (CustomUser): dynamicweb CustomUser object
Returns:
oca.User: Opennebula user object
Raise:
WrongNameError: If no openebula user with this credentials exists
ConnectionError: If the connection to the opennebula server can't be
established
"""
user_pool = self._get_user_pool()
return user_pool.get_by_name(user.email)
def create_user(self, user: CustomUser):
"""Create a new opennebula user or a corresponding CustomUser object
Args:
user (CustomUser): dynamicweb CustomUser object
Returns:
int: Return the opennebula user id
Raises:
ConnectionError: If the connection to the opennebula server can't be
established
UserExistsError: If a user with this credeintals already exits on the
server
UserCredentialError: If a user with this email exists but the
password is worng
"""
try:
self._get_user(user)
try:
self._get_client(self, user)
logger.debug('User already exists')
raise UserExistsError()
except OpenNebulaException as err:
logger.error('OpenNebulaException error: {0}'.format(err))
logger.error('User exists but password is wrong')
raise UserCredentialError()
except WrongNameError:
user_id = self.oneadmin_client.call(oca.User.METHODS['allocate'],
user.email, user.password,
'core')
logger.debug(
'Created a user for CustomObject: {user} with user id = {u_id}',
user=user,
u_id=user_id
)
return user_id
except ConnectionRefusedError:
logger.error(
'Could not connect to host: {host} via protocol {protocol}'.format(
host=settings.OPENNEBULA_DOMAIN,
protocol=settings.OPENNEBULA_PROTOCOL)
)
raise ConnectionRefusedError
def _get_or_create_user(self, email, password):
try:
user_pool = self._get_user_pool()
opennebula_user = user_pool.get_by_name(email)
return opennebula_user
except WrongNameError as wrong_name_err:
opennebula_user = self.oneadmin_client.call(
oca.User.METHODS['allocate'], email,
password, 'core')
logger.debug(
"User {} does not exist. Created the user. User id = {}".format(
email,
opennebula_user
)
)
return opennebula_user
except ConnectionRefusedError:
logger.error(
'Could not connect to host: {host} via protocol {protocol}'.format(
host=settings.OPENNEBULA_DOMAIN,
protocol=settings.OPENNEBULA_PROTOCOL)
)
raise ConnectionRefusedError
except Exception as ex:
logger.error(str(ex))
def _get_user_pool(self):
try:
user_pool = oca.UserPool(self.oneadmin_client)
user_pool.info()
except ConnectionRefusedError:
logger.error(
'Could not connect to host: {host} via protocol {protocol}'.format(
host=settings.OPENNEBULA_DOMAIN,
protocol=settings.OPENNEBULA_PROTOCOL)
)
raise
return user_pool
def _get_vm_pool(self, infoextended=True):
"""
# filter:
# -4: Resources belonging to the users primary group
# -3: Resources belonging to the user
# -2: All resources
# -1: Resources belonging to the user and any of his groups
# >= 0: UID Users Resources
# vm states:
# *-2 Any state, including DONE
# *-1 Any state, except DONE (Default)
# *0 INIT
# *1 PENDING
# *2 HOLD
# *3 ACTIVE
# *4 STOPPED
# *5 SUSPENDED
# *6 DONE
# *7 FAILED
# *8 POWEROFF
# *9 UNDEPLOYED
:param infoextended: When True calls infoextended api method introduced
in OpenNebula 5.8 else falls back to info which has limited attributes
of a VM
:return: the oca VirtualMachinePool object
"""
try:
vm_pool = oca.VirtualMachinePool(self.client)
if infoextended:
vm_pool.infoextended(
filter=-1, # User's resources and any of his groups
vm_state=-1 # Look for VMs in any state, except DONE
)
else:
vm_pool.info()
return vm_pool
except AttributeError as ae:
logger.error("AttributeError : %s" % str(ae))
except ConnectionRefusedError:
logger.error(
'Could not connect to host: {host} via protocol {protocol}'.format(
host=settings.OPENNEBULA_DOMAIN,
protocol=settings.OPENNEBULA_PROTOCOL)
)
raise ConnectionRefusedError
# For now we'll just handle all other errors as connection errors
except:
raise ConnectionRefusedError
def get_vms(self):
try:
return self._get_vm_pool()
except ConnectionRefusedError:
raise ConnectionRefusedError
def get_vm(self, vm_id):
vm_id = int(vm_id)
try:
vm_pool = self._get_vm_pool()
return vm_pool.get_by_id(vm_id)
except WrongIdError:
raise WrongIdError
except:
raise ConnectionRefusedError
def get_ipv6(self, vm_id):
"""
Returns the first IPv6 of the given vm.
:return: An IPv6 address string, if it exists else returns None
"""
ipv6_list = self.get_all_ipv6_addresses(vm_id)
if len(ipv6_list) > 0:
return ipv6_list[0]
else:
return None
def get_all_ipv6_addresses(self, vm_id):
"""
Returns a list of IPv6 addresses of the given vm
:param vm_id: The ID of the vm
:return:
"""
ipv6_list = []
vm = self.get_vm(vm_id)
for nic in vm.template.nics:
if hasattr(nic, 'ip6_global'):
ipv6_list.append(nic.ip6_global)
return ipv6_list
def create_vm(self, template_id, specs, ssh_key=None, vm_name=None):
template = self.get_template(template_id)
vm_specs_formatter = """<TEMPLATE>
<MEMORY>{memory}</MEMORY>
<VCPU>{vcpu}</VCPU>
<CPU>{cpu}</CPU>
"""
try:
disk = template.template.disks[0]
image_id = disk.image_id
vm_specs = vm_specs_formatter.format(
vcpu=int(specs['cpu']),
cpu=0.1 * int(specs['cpu']),
memory=(512 if specs['memory'] == 0.5 else
1024 * int(specs['memory'])),
)
vm_specs += """<DISK>
<TYPE>fs</TYPE>
<SIZE>{size}</SIZE>
<DEV_PREFIX>vd</DEV_PREFIX>
<IMAGE_ID>{image_id}</IMAGE_ID>
</DISK>
""".format(size=1024 * int(specs['disk_size']),
image_id=image_id)
except:
disk = template.template.disks[0]
image = disk.image
image_uname = disk.image_uname
vm_specs = vm_specs_formatter.format(
vcpu=int(specs['cpu']),
cpu=0.1 * int(specs['cpu']),
memory=(512 if specs['memory'] == 0.5 else
1024 * int(specs['memory'])),
)
vm_specs += """<DISK>
<TYPE>fs</TYPE>
<SIZE>{size}</SIZE>
<DEV_PREFIX>vd</DEV_PREFIX>
<IMAGE>{image}</IMAGE>
<IMAGE_UNAME>{image_uname}</IMAGE_UNAME>
</DISK>
""".format(size=1024 * int(specs['disk_size']),
image=image,
image_uname=image_uname)
vm_specs += "<CONTEXT>"
if ssh_key:
vm_specs += "<SSH_PUBLIC_KEY>{ssh}</SSH_PUBLIC_KEY>".format(
ssh=ssh_key)
vm_specs += """<NETWORK>YES</NETWORK>
</CONTEXT>
</TEMPLATE>
"""
try:
vm_id = self.client.call(
oca.VmTemplate.METHODS['instantiate'], template.id, '', True,
vm_specs, False
)
except OpenNebulaException as err:
logger.error("OpenNebulaException: {0}".format(err))
return None
self.oneadmin_client.call(
oca.VirtualMachine.METHODS['action'],
'release',
vm_id
)
if vm_name is not None:
self.oneadmin_client.call(
'vm.rename',
vm_id,
vm_name
)
return vm_id
def delete_vm(self, vm_id):
TERMINATE_ACTION = 'terminate-hard'
vm_terminated = False
try:
self.oneadmin_client.call(
oca.VirtualMachine.METHODS['action'],
TERMINATE_ACTION,
int(vm_id),
)
vm_terminated = True
except socket.timeout as socket_err:
logger.error("Socket timeout error: {0}".format(socket_err))
except OpenNebulaException as opennebula_err:
logger.error(
"OpenNebulaException error: {0}".format(opennebula_err))
except OSError as os_err:
logger.error("OSError : {0}".format(os_err))
except ValueError as value_err:
logger.error("ValueError : {0}".format(value_err))
return vm_terminated
def save_key_in_opennebula_user(self, ssh_key, update_type=1):
"""
Save the given ssh key in OpenNebula user
# Update type: 0: Replace the whole template.
1: Merge new template with the existing one.
:param ssh_key: The ssh key to be saved
:param update_type: The update type as explained above
:return:
"""
return_value = self.oneadmin_client.call(
'user.update',
self.opennebula_user if type(self.opennebula_user) == int else self.opennebula_user.id,
'<CONTEXT><SSH_PUBLIC_KEY>%s</SSH_PUBLIC_KEY></CONTEXT>' % ssh_key,
update_type
)
if type(return_value) == int:
logger.debug(
"Saved the key in opennebula successfully : %s" % return_value)
else:
logger.error(
"Could not save the key in opennebula. %s" % return_value)
return
def _get_template_pool(self):
try:
template_pool = oca.VmTemplatePool(self.oneadmin_client)
template_pool.info()
return template_pool
except ConnectionRefusedError:
logger.error(
"""Could not connect to host: {host} via protocol
{protocol}""".format(
host=settings.OPENNEBULA_DOMAIN,
protocol=settings.OPENNEBULA_PROTOCOL)
)
raise ConnectionRefusedError
except:
raise ConnectionRefusedError
def get_templates(self, prefix='public-'):
try:
public_templates = [
template
for template in self._get_template_pool()
if template.name.startswith(prefix)
]
return public_templates
except Exception as e:
logger.debug(e)
return []
def try_get_templates(self):
try:
return self.get_templates()
except:
return []
def get_template(self, template_id):
template_id = int(template_id)
try:
template_pool = self._get_template_pool()
if template_id in settings.UPDATED_TEMPLATES_DICT.keys():
template_id = settings.UPDATED_TEMPLATES_DICT[template_id]
return template_pool.get_by_id(template_id)
except Exception as ex:
logger.debug("Template Id we are looking for : %s" % template_id)
logger.error(str(ex))
raise ConnectionRefusedError
def create_template(self, name, cores, memory, disk_size, core_price,
memory_price,
disk_size_price, ssh=''):
"""Create and add a new template to opennebula.
:param name: A string representation describing the template.
Used as label in view.
:param cores: Amount of virtual cpu cores for the VM.
:param memory: Amount of RAM for the VM (GB)
:param disk_size: Amount of disk space for VM (GB)
:param core_price: Price of virtual cpu for the VM per core.
:param memory_price: Price of RAM for the VM per GB
:param disk_size_price: Price of disk space for VM per GB
:param ssh: User public ssh key
"""
template_string_formatter = """<TEMPLATE>
<NAME>{name}</NAME>
<MEMORY>{memory}</MEMORY>
<VCPU>{vcpu}</VCPU>
<CPU>{cpu}</CPU>
<DISK>
<TYPE>fs</TYPE>
<SIZE>{size}</SIZE>
<DEV_PREFIX>vd</DEV_PREFIX>
</DISK>
<CPU_COST>{cpu_cost}</CPU_COST>
<MEMORY_COST>{memory_cost}</MEMORY_COST>
<DISK_COST>{disk_cost}</DISK_COST>
<SSH_PUBLIC_KEY>{ssh}</SSH_PUBLIC_KEY>
</TEMPLATE>
"""
template_id = oca.VmTemplate.allocate(
self.oneadmin_client,
template_string_formatter.format(
name=name,
vcpu=cores,
cpu=0.1 * cores,
size=1024 * disk_size,
memory=1024 * memory,
# * 10 because we set cpu to *0.1
cpu_cost=10 * core_price,
memory_cost=memory_price,
disk_cost=disk_size_price,
ssh=ssh
)
)
return template_id
def delete_template(self, template_id):
self.oneadmin_client.call(
oca.VmTemplate.METHODS['delete'], template_id, False
)
def change_user_password(self, passwd_hash):
if type(self.opennebula_user) == int:
logger.debug("opennebula_user is int and has value = %s" %
self.opennebula_user)
else:
logger.debug("opennebula_user is object and corresponding id is %s"
% self.opennebula_user.id)
self.oneadmin_client.call(
oca.User.METHODS['passwd'],
self.opennebula_user if type(self.opennebula_user) == int else self.opennebula_user.id,
passwd_hash
)
def add_public_key(self, user, public_key='', merge=False):
"""
Args:
user (CustomUser): Dynamicweb user
public_key (string): Public key to add to the user
merge (bool): Optional if True the new public key replaces the old
Raises:
KeyExistsError: If replace is False and the user already has a
public key
WrongNameError: If no openebula user with this credentials exists
ConnectionError: If the connection to the opennebula server can't be
established
Returns:
True if public_key was added
"""
# TODO: Check if we can remove this first try because we basically just
# raise the possible Errors
try:
open_user = self._get_user(user)
try:
old_key = open_user.template.ssh_public_key
if not merge:
raise KeyExistsError()
public_key += '\n{key}'.format(key=old_key)
except AttributeError:
pass
self.oneadmin_client.call('user.update', open_user.id,
'<CONTEXT><SSH_PUBLIC_KEY>{key}</SSH_PUBLIC_KEY></CONTEXT>'
.format(key=public_key))
return True
except WrongNameError:
raise
except ConnectionError:
raise
def remove_public_key(self, user, public_key=''):
"""
Args:
user (CustomUser): Dynamicweb user
public_key (string): Public key to be removed to the user
Raises:
KeyDoesNotExistsError: If replace is False and the user already has a
public key
WrongNameError: If no openebula user with this credentials exists
ConnectionError: If the connection to the opennebula server can't be
established
Returns:
True if public_key was removed
"""
try:
open_user = self._get_user(user)
try:
old_key = open_user.template.ssh_public_key
if public_key not in old_key:
return False
# raise KeyDoesNotExistsError()
if '\n{}'.format(public_key) in old_key:
public_key = old_key.replace('\n{}'.format(public_key), '')
else:
public_key = old_key.replace(public_key, '')
except AttributeError:
return False
# raise KeyDoesNotExistsError()
self.oneadmin_client.call('user.update', open_user.id,
'<CONTEXT><SSH_PUBLIC_KEY>{key}</SSH_PUBLIC_KEY></CONTEXT>'
.format(key=public_key))
return True
except WrongNameError:
raise
except ConnectionError:
raise
def manage_public_key(self, keys, hosts=None, countdown=0):
"""
A function that manages the supplied keys in the
authorized_keys file of the given list of hosts. If hosts
parameter is not supplied, all hosts of this customer
will be configured with the supplied keys
:param keys: A list of ssh keys that are to be added/removed
A key should be a dict of the form
{
'value': 'sha-.....', # public key as string
'state': True # whether key is to be added or
} # removed
:param hosts: A list of hosts IPv6 addresses
:param countdown: Parameter to be passed to celery apply_async
Allows to delay a task by `countdown` number of seconds
:return:
"""
if hosts is None:
hosts = self.get_all_hosts()
if len(hosts) > 0 and len(keys) > 0:
save_ssh_key.apply_async((hosts, keys), countdown=countdown,
link_error=save_ssh_key_error_handler.s())
else:
logger.debug(
"Keys and/or hosts are empty, so not managing any keys"
)
def get_all_hosts(self):
"""
A utility function to obtain all hosts of this owner
:return: A list of IPv6 addresses of all the hosts of this customer or
an empty list if none exist
"""
owner = CustomUser.objects.filter(
email=self.email).first()
all_orders = HostingOrder.objects.filter(customer__user=owner)
hosts = []
if len(all_orders) > 0:
logger.debug("The user {} has 1 or more VMs. We need to configure "
"the ssh keys.".format(self.email))
for order in all_orders:
try:
ip = self.get_ipv6(order.vm_id)
hosts.append(ip)
except WrongIdError:
logger.debug(
"VM with ID {} does not exist".format(order.vm_id))
else:
logger.debug("The user {} has no VMs. We don't need to configure "
"the ssh keys.".format(self.email))
return hosts

204
opennebula_api/serializers.py Executable file
View file

@ -0,0 +1,204 @@
import ipaddress
from builtins import hasattr
from rest_framework import serializers
from oca import OpenNebulaException
from .opennebula_manager import OpenNebulaManager
class VirtualMachineTemplateSerializer(serializers.Serializer):
"""Serializer to map the virtual machine template instance into JSON format."""
id = serializers.IntegerField(read_only=True)
name = serializers.SerializerMethodField()
cores = serializers.SerializerMethodField()
disk_size = serializers.SerializerMethodField()
memory = serializers.SerializerMethodField()
def get_cores(self, obj):
if hasattr(obj.template, 'vcpu'):
return obj.template.vcpu
return ''
def get_disk_size(self, obj):
template = obj.template
disk_size = 0
try:
for disk in template.disks:
disk_size += int(disk.size)
return disk_size / 1024
except:
return 0
def get_memory(self, obj):
return int(obj.template.memory) / 1024
def get_name(self, obj):
if obj.name.startswith('public-'):
return obj.name.lstrip('public-')
else:
return obj.name
class VirtualMachineSerializer(serializers.Serializer):
"""Serializer to map the virtual machine instance into JSON format."""
name = serializers.SerializerMethodField()
cores = serializers.IntegerField(source='template.vcpu')
disk = serializers.IntegerField(write_only=True)
set_memory = serializers.IntegerField(write_only=True, label='Memory')
memory = serializers.SerializerMethodField()
disk_size = serializers.SerializerMethodField()
hdd_size = serializers.SerializerMethodField()
ssd_size = serializers.SerializerMethodField()
ipv4 = serializers.SerializerMethodField()
ipv6 = serializers.SerializerMethodField()
vm_id = serializers.IntegerField(read_only=True, source='id')
state = serializers.CharField(read_only=True, source='str_state')
price = serializers.SerializerMethodField()
ssh_key = serializers.CharField(write_only=True)
configuration = serializers.SerializerMethodField()
template_id = serializers.ChoiceField(
choices=[(key.id, key.name) for key in
OpenNebulaManager().try_get_templates()
],
source='template.template_id',
write_only=True,
default=[]
)
def create(self, validated_data):
owner = validated_data['owner']
ssh_key = validated_data['ssh_key']
cores = validated_data['template']['vcpu']
memory = validated_data['set_memory']
disk = validated_data['disk']
template_id = validated_data['template']['template_id']
specs = {
'cpu': cores,
'disk_size': disk,
'memory': memory,
}
try:
manager = OpenNebulaManager(email=owner.username,
password=owner.password,
)
opennebula_id = manager.create_vm(template_id=template_id,
ssh_key=ssh_key,
specs=specs)
except OpenNebulaException as err:
raise serializers.ValidationError(
"OpenNebulaException occured. {0}".format(err)
)
return manager.get_vm(opennebula_id)
def get_memory(self, obj):
return int(obj.template.memory) / 1024
def get_disk_size(self, obj):
template = obj.template
disk_size = 0
for disk in template.disks:
disk_size += int(disk.size)
return disk_size / 1024
def get_ssd_size(self, obj):
template = obj.template
disk_size = 0
for disk in template.disks:
if disk.datastore == 'cephds':
disk_size += int(disk.size)
return disk_size / 1024
def get_hdd_size(self, obj):
template = obj.template
disk_size = 0
for disk in template.disks:
if disk.datastore == 'ceph_hdd_ds':
disk_size += int(disk.size)
return disk_size / 1024
def get_price(self, obj):
template = obj.template
price = float(template.vcpu) * 5.0
price += (int(template.memory) / 1024 * 2.0)
for disk in template.disks:
price += int(disk.size) / 1024 * 0.6
return price
def get_configuration(self, obj):
template_id = obj.template.template_id
template = OpenNebulaManager().get_template(template_id)
if template.name.startswith('public-'):
return template.name.lstrip('public-')
else:
return template.name
def get_ipv4(self, obj):
"""
Get the IPv4s from the given VM
:param obj: The VM in contention
:return: Returns csv string of all IPv4s added to this VM otherwise returns "-" if no IPv4 is available
"""
ipv4 = []
for nic in obj.template.nics:
if hasattr(nic, 'ip'):
ipv4.append(nic.ip)
if len(ipv4) > 0:
return ', '.join(ipv4)
else:
return '-'
def get_ipv6(self, obj):
ipv6 = []
for nic in obj.template.nics:
if hasattr(nic, 'ip6_global'):
ipv6.append(nic.ip6_global)
if len(ipv6) > 0:
return ', '.join(ipv6)
else:
return '-'
def get_name(self, obj):
if obj.name.startswith('public-'):
return obj.name.lstrip('public-')
else:
return obj.name
class VMTemplateSerializer(serializers.Serializer):
"""Serializer to map the VMTemplate instance into JSON format."""
id = serializers.IntegerField(
read_only=True, source='opennebula_vm_template_id'
)
name = serializers.CharField(read_only=True)
def hexstr2int(string):
return int(string.replace(':', ''), 16)
FIRST_MAC = hexstr2int('02:00:b3:39:79:4d')
FIRST_V4 = ipaddress.ip_address('185.203.112.2')
COUNT = 1000
def v4_from_mac(mac):
"""Calculates the IPv4 address from a MAC address.
mac: string (the colon-separated representation)
returns: ipaddress.ip_address object with the v4 address
"""
return FIRST_V4 + (hexstr2int(mac) - FIRST_MAC)
def is_in_v4_range(mac):
return FIRST_MAC <= hexstr2int(mac) < FIRST_MAC + 1000

151
opennebula_api/tests.py Executable file
View file

@ -0,0 +1,151 @@
import random
import string
from django.conf import settings
from django.test import TestCase
from unittest import skipIf
from .opennebula_manager import OpenNebulaManager
from .serializers import VirtualMachineSerializer
from utils.models import CustomUser
@skipIf(
settings.OPENNEBULA_DOMAIN is None or
settings.OPENNEBULA_DOMAIN == "test_domain",
"""OpenNebula details unavailable, so skipping
OpenNebulaManagerTestCases"""
)
class OpenNebulaManagerTestCases(TestCase):
"""This class defines the test suite for the opennebula manager model."""
def setUp(self):
"""Define the test client and other test variables."""
self.email = '{}@ungleich.ch'.format(''.join(random.choices(string.ascii_uppercase, k=10)))
self.password = ''.join(random.choices(string.ascii_uppercase + string.digits, k=20))
self.user = CustomUser.objects.create(name='test', email=self.email,
password=self.password)
self.vm_specs = {}
self.vm_specs['cpu'] = 1
self.vm_specs['memory'] = 2
self.vm_specs['disk_size'] = 10
self.manager = OpenNebulaManager()
def test_connect_to_server(self):
"""Test the opennebula manager can connect to a server."""
try:
ver = self.manager.oneadmin_client.version()
except:
ver = None
self.assertTrue(ver is not None)
def test_get_user(self):
"""Test the opennebula manager can get a existing user."""
self.manager.create_user(self.user)
user = self.manager._get_user(self.user)
name = user.name
self.assertNotEqual(name, None)
def test_create_and_delete_user(self):
"""Test the opennebula manager can create and delete a new user."""
old_count = len(self.manager._get_user_pool())
self.manager = OpenNebulaManager(email=self.email,
password=self.password)
user_pool = self.manager._get_user_pool()
new_count = len(user_pool)
# Remove the user afterwards
user = user_pool.get_by_name(self.email)
user.delete()
self.assertNotEqual(old_count, new_count)
def test_user_can_login(self):
""" Test the manager can login to a new created user"""
self.manager.create_user(self.user)
user = self.manager._get_user(self.user)
client = self.manager._get_client(self.user)
version = client.version()
# Cleanup
user.delete()
self.assertNotEqual(version, None)
def test_add_public_key_to_user(self):
""" Test the manager can add a new public key to an user """
self.manager.create_user(self.user)
user = self.manager._get_user(self.user)
public_key = 'test'
self.manager.add_public_key(self.user, public_key)
# Fetch new user information from opennebula
user.info()
user_public_key = user.template.ssh_public_key
# Cleanup
user.delete()
self.assertEqual(user_public_key, public_key)
def test_append_public_key_to_user(self):
""" Test the manager can append a new public key to an user """
self.manager.create_user(self.user)
user = self.manager._get_user(self.user)
public_key = 'test'
self.manager.add_public_key(self.user, public_key)
# Fetch new user information from opennebula
user.info()
old_public_key = user.template.ssh_public_key
self.manager.add_public_key(self.user, public_key, merge=True)
user.info()
new_public_key = user.template.ssh_public_key
# Cleanup
user.delete()
self.assertEqual(new_public_key, '{}\n{}'.format(old_public_key,
public_key))
def test_remove_public_key_to_user(self):
""" Test the manager can remove a public key from an user """
self.manager.create_user(self.user)
user = self.manager._get_user(self.user)
public_key = 'test'
self.manager.add_public_key(self.user, public_key)
self.manager.add_public_key(self.user, public_key, merge=True)
user.info()
old_public_key = user.template.ssh_public_key
self.manager.remove_public_key(self.user, public_key)
user.info()
new_public_key = user.template.ssh_public_key
# Cleanup
user.delete()
self.assertEqual(new_public_key,
old_public_key.replace('{}\n'.format(public_key), '', 1))
def test_requires_ssh_key_for_new_vm(self):
"""Test the opennebula manager requires the user to have a ssh key when
creating a new vm"""
@skipIf(
settings.OPENNEBULA_DOMAIN is None or
settings.OPENNEBULA_DOMAIN == "test_domain",
"""OpenNebula details unavailable, so skipping
VirtualMachineSerializerTestCase"""
)
class VirtualMachineSerializerTestCase(TestCase):
def setUp(self):
"""Define the test client and other test variables."""
self.manager = OpenNebulaManager(email=None, password=None)
def test_serializer_strips_of_public(self):
""" Test the serialized virtual machine object contains no
'public-'."""
for vm in self.manager.get_vms():
serialized = VirtualMachineSerializer(vm)
self.assertEqual(
serialized.data.get('name'), vm.name.lstrip('public-')
)
break

11
opennebula_api/urls.py Executable file
View file

@ -0,0 +1,11 @@
from django.urls import re_path, include
from rest_framework.urlpatterns import format_suffix_patterns
from .views import VmCreateView, VmDetailsView
urlpatterns = {
re_path(r'^auth/', include('rest_framework.urls', namespace='rest_framework')),
re_path(r'^vms/$', VmCreateView.as_view(), name="vm_create"),
re_path(r'^vms/(?P<pk>[0-9]+)/$', VmDetailsView.as_view(), name="vm_details"),
}
urlpatterns = format_suffix_patterns(urlpatterns)

76
opennebula_api/views.py Executable file
View file

@ -0,0 +1,76 @@
from rest_framework import generics
from rest_framework import permissions
from .serializers import VirtualMachineSerializer
from .opennebula_manager import OpenNebulaManager
from rest_framework.exceptions import APIException
class ServiceUnavailable(APIException):
status_code = 503
default_detail = 'Service temporarily unavailable, try again later.'
default_code = 'service_unavailable'
class VmCreateView(generics.ListCreateAPIView):
"""This class handles the GET and POST requests."""
serializer_class = VirtualMachineSerializer
permission_classes = (permissions.IsAuthenticated, )
def get_queryset(self):
owner = self.request.user
manager = OpenNebulaManager(email=owner.username,
password=owner.password)
# We may have ConnectionRefusedError if we don't have a
# connection to OpenNebula. For now, we raise ServiceUnavailable
try:
vms = manager.get_vms()
except ConnectionRefusedError:
raise ServiceUnavailable
return vms
def perform_create(self, serializer):
"""Save the post data when creating a new template."""
serializer.save(owner=self.request.user)
class VmDetailsView(generics.RetrieveUpdateDestroyAPIView):
"""This class handles the http GET, PUT and DELETE requests."""
permission_classes = (permissions.IsAuthenticated, )
serializer_class = VirtualMachineSerializer
def get_queryset(self):
owner = self.request.user
manager = OpenNebulaManager(email=owner.username,
password=owner.password)
# We may have ConnectionRefusedError if we don't have a
# connection to OpenNebula. For now, we raise ServiceUnavailable
try:
vms = manager.get_vms()
except ConnectionRefusedError:
raise ServiceUnavailable
return vms
def get_object(self):
owner = self.request.user
manager = OpenNebulaManager(email=owner.username,
password=owner.password)
# We may have ConnectionRefusedError if we don't have a
# connection to OpenNebula. For now, we raise ServiceUnavailable
try:
vm = manager.get_vm(self.kwargs.get('pk'))
except ConnectionRefusedError:
raise ServiceUnavailable
return vm
def perform_destroy(self, instance):
owner = self.request.user
manager = OpenNebulaManager(email=owner.username,
password=owner.password)
# We may have ConnectionRefusedError if we don't have a
# connection to OpenNebula. For now, we raise ServiceUnavailable
try:
manager.delete_vm(instance.id)
except ConnectionRefusedError:
raise ServiceUnavailable