Added Flake8 lib and fixed PEP8 violations

This commit is contained in:
Siarhei Puhach 2017-06-29 17:34:40 +03:00
commit edcfd3e9f4
65 changed files with 340 additions and 387 deletions

View file

@ -1,3 +1,3 @@
from django.contrib import admin
# from django.contrib import admin
# Register your models here.

View file

@ -1,9 +1,10 @@
class KeyExistsError(Exception):
pass
class UserExistsError(Exception):
pass
class UserCredentialError(Exception):
pass

View file

@ -6,7 +6,6 @@ from oca.pool import WrongNameError, WrongIdError
from oca.exceptions import OpenNebulaException
from django.conf import settings
from django.utils.functional import cached_property
from utils.models import CustomUser
from .exceptions import KeyExistsError, UserExistsError, UserCredentialError
@ -38,9 +37,10 @@ class OpenNebulaManager():
)
except:
pass
def _get_client(self, user):
"""Get a opennebula client object for a CustomUser object
"""Get a opennebula client object for a CustomUser object
Args:
user (CustomUser): dynamicweb CustomUser object
@ -49,7 +49,7 @@ class OpenNebulaManager():
Raise:
ConnectionError: If the connection to the opennebula server can't be
established
established
"""
return oca.Client("{0}:{1}".format(
user.email,
@ -74,8 +74,8 @@ class OpenNebulaManager():
))
def _get_user(self, user):
"""Get the corresponding opennebula user for a CustomUser object
"""Get the corresponding opennebula user for a CustomUser object
Args:
user (CustomUser): dynamicweb CustomUser object
@ -85,7 +85,7 @@ class OpenNebulaManager():
Raise:
WrongNameError: If no openebula user with this credentials exists
ConnectionError: If the connection to the opennebula server can't be
established
established
"""
user_pool = self._get_user_pool()
return user_pool.get_by_name(user.email)
@ -93,16 +93,16 @@ class OpenNebulaManager():
def create_user(self, user: CustomUser):
"""Create a new opennebula user or a corresponding CustomUser object
Args:
user (CustomUser): dynamicweb CustomUser object
Returns:
int: Return the opennebula user id
Raises:
ConnectionError: If the connection to the opennebula server can't be
established
established
UserExistsError: If a user with this credeintals already exits on the
server
UserCredentialError: If a user with this email exists but the
@ -111,7 +111,7 @@ class OpenNebulaManager():
"""
try:
self._get_user(user)
try:
try:
self._get_client(self, user)
logger.debug('User already exists')
raise UserExistsError()
@ -122,19 +122,18 @@ class OpenNebulaManager():
except WrongNameError:
user_id = self.oneadmin_client.call(oca.User.METHODS['allocate'],
user.email, user.password, 'core')
user.email, user.password, 'core')
logger.debug('Created a user for CustomObject: {user} with user id = {u_id}',
user=user,
u_id=user_id
)
return user_id
user=user,
u_id=user_id
)
return user_id
except ConnectionRefusedError:
logger.error('Could not connect to host: {host} via protocol {protocol}'.format(
host=settings.OPENNEBULA_DOMAIN,
protocol=settings.OPENNEBULA_PROTOCOL)
)
raise ConnectionRefusedError
def _get_or_create_user(self, email, password):
try:
@ -166,7 +165,7 @@ class OpenNebulaManager():
host=settings.OPENNEBULA_DOMAIN,
protocol=settings.OPENNEBULA_PROTOCOL)
)
raise
raise
return user_pool
def _get_vm_pool(self):
@ -209,36 +208,6 @@ class OpenNebulaManager():
except:
raise ConnectionRefusedError
def create_template(self, name, cores, memory, disk_size, core_price, memory_price,
disk_size_price, ssh=''):
"""Create and add a new template to opennebula.
:param name: A string representation describing the template.
Used as label in view.
:param cores: Amount of virtual cpu cores for the VM.
:param memory: Amount of RAM for the VM (GB)
:param disk_size: Amount of disk space for VM (GB)
:param core_price: Price of virtual cpu for the VM per core.
:param memory_price: Price of RAM for the VM per GB
:param disk_size_price: Price of disk space for VM per GB
:param ssh: User public ssh key
"""
template_id = oca.VmTemplate.allocate(
self.oneadmin_client,
template_string_formatter.format(
name=name,
vcpu=cores,
cpu=0.1 * cores,
size=1024 * disk_size,
memory=1024 * memory,
# * 10 because we set cpu to *0.1
cpu_cost=10 * core_price,
memory_cost=memory_price,
disk_cost=disk_size_price,
ssh=ssh
)
)
def create_vm(self, template_id, specs, ssh_key=None):
template = self.get_template(template_id)
@ -286,8 +255,7 @@ class OpenNebulaManager():
""".format(size=1024 * int(specs['disk_size']),
image=image,
image_uname=image_uname)
if ssh_key:
vm_specs += """<CONTEXT>
<SSH_PUBLIC_KEY>{ssh}</SSH_PUBLIC_KEY>
@ -431,30 +399,30 @@ class OpenNebulaManager():
)
def add_public_key(self, user, public_key='', merge=False):
"""
"""
Args:
user (CustomUser): Dynamicweb user
Args:
user (CustomUser): Dynamicweb user
public_key (string): Public key to add to the user
merge (bool): Optional if True the new public key replaces the old
Raises:
KeyExistsError: If replace is False and the user already has a
public key
public key
WrongNameError: If no openebula user with this credentials exists
ConnectionError: If the connection to the opennebula server can't be
established
established
Returns:
True if public_key was added
"""
# TODO: Check if we can remove this first try because we basically just
# raise the possible Errors
# raise the possible Errors
try:
open_user = self._get_user(user)
try:
old_key = open_user.template.ssh_public_key
old_key = open_user.template.ssh_public_key
if not merge:
raise KeyExistsError()
public_key += '\n{key}'.format(key=old_key)
@ -462,7 +430,8 @@ class OpenNebulaManager():
except AttributeError:
pass
self.oneadmin_client.call('user.update', open_user.id,
'<CONTEXT><SSH_PUBLIC_KEY>{key}</SSH_PUBLIC_KEY></CONTEXT>'.format(key=public_key))
'<CONTEXT><SSH_PUBLIC_KEY>{key}</SSH_PUBLIC_KEY></CONTEXT>'
.format(key=public_key))
return True
except WrongNameError:
raise
@ -471,18 +440,18 @@ class OpenNebulaManager():
raise
def remove_public_key(self, user, public_key=''):
"""
"""
Args:
user (CustomUser): Dynamicweb user
Args:
user (CustomUser): Dynamicweb user
public_key (string): Public key to be removed to the user
Raises:
KeyDoesNotExistsError: If replace is False and the user already has a
public key
public key
WrongNameError: If no openebula user with this credentials exists
ConnectionError: If the connection to the opennebula server can't be
established
established
Returns:
True if public_key was removed
@ -492,21 +461,22 @@ class OpenNebulaManager():
try:
open_user = self._get_user(user)
try:
old_key = open_user.template.ssh_public_key
old_key = open_user.template.ssh_public_key
if public_key not in old_key:
return False
# raise KeyDoesNotExistsError()
if '\n{}'.format(public_key) in old_key:
public_key = old_key.replace('\n{}'.format(public_key), '')
else:
else:
public_key = old_key.replace(public_key, '')
except AttributeError:
return False
#raise KeyDoesNotExistsError()
# raise KeyDoesNotExistsError()
self.oneadmin_client.call('user.update', open_user.id,
'<CONTEXT><SSH_PUBLIC_KEY>{key}</SSH_PUBLIC_KEY></CONTEXT>'.format(key=public_key))
'<CONTEXT><SSH_PUBLIC_KEY>{key}</SSH_PUBLIC_KEY></CONTEXT>'
.format(key=public_key))
return True
except WrongNameError:
raise

View file

@ -1,20 +1,19 @@
import oca
import ipaddress
from rest_framework import serializers
from oca import OpenNebulaException
from oca.template import VmTemplate
from .models import OpenNebulaManager
class VirtualMachineTemplateSerializer(serializers.Serializer):
"""Serializer to map the virtual machine template instance into JSON format."""
id = serializers.IntegerField(read_only=True)
name = serializers.SerializerMethodField()
cores = serializers.SerializerMethodField()
disk_size = serializers.SerializerMethodField()
memory = serializers.SerializerMethodField()
id = serializers.IntegerField(read_only=True)
name = serializers.SerializerMethodField()
cores = serializers.SerializerMethodField()
disk_size = serializers.SerializerMethodField()
memory = serializers.SerializerMethodField()
def get_cores(self, obj):
if hasattr(obj.template, 'vcpu'):
@ -28,7 +27,7 @@ class VirtualMachineTemplateSerializer(serializers.Serializer):
try:
for disk in template.disks:
disk_size += int(disk.size)
return disk_size / 1024
return disk_size / 1024
except:
return 0
@ -39,30 +38,28 @@ class VirtualMachineTemplateSerializer(serializers.Serializer):
return obj.name.strip('public-')
class VirtualMachineSerializer(serializers.Serializer):
"""Serializer to map the virtual machine instance into JSON format."""
name = serializers.SerializerMethodField()
cores = serializers.IntegerField(source='template.vcpu')
disk = serializers.IntegerField(write_only=True)
set_memory = serializers.IntegerField(write_only=True, label='Memory')
memory = serializers.SerializerMethodField()
cores = serializers.IntegerField(source='template.vcpu')
disk = serializers.IntegerField(write_only=True)
set_memory = serializers.IntegerField(write_only=True, label='Memory')
memory = serializers.SerializerMethodField()
disk_size = serializers.SerializerMethodField()
ipv4 = serializers.SerializerMethodField()
ipv6 = serializers.SerializerMethodField()
vm_id = serializers.IntegerField(read_only=True, source='id')
state = serializers.CharField(read_only=True, source='str_state')
price = serializers.SerializerMethodField()
ssh_key = serializers.CharField(write_only=True)
disk_size = serializers.SerializerMethodField()
ipv4 = serializers.SerializerMethodField()
ipv6 = serializers.SerializerMethodField()
vm_id = serializers.IntegerField(read_only=True, source='id')
state = serializers.CharField(read_only=True, source='str_state')
price = serializers.SerializerMethodField()
ssh_key = serializers.CharField(write_only=True)
configuration = serializers.SerializerMethodField()
template_id = serializers.ChoiceField(
choices=[(key.id, key.name) for key in
OpenNebulaManager().try_get_templates()
],
OpenNebulaManager().try_get_templates()
],
source='template.template_id',
write_only=True,
default=[]
@ -77,12 +74,11 @@ class VirtualMachineSerializer(serializers.Serializer):
template_id = validated_data['template']['template_id']
specs = {
'cpu' : cores,
'disk_size' : disk,
'memory' : memory,
'cpu': cores,
'disk_size': disk,
'memory': memory,
}
try:
manager = OpenNebulaManager(email=owner.email,
password=owner.password,
@ -92,7 +88,7 @@ class VirtualMachineSerializer(serializers.Serializer):
specs=specs)
except OpenNebulaException as err:
raise serializers.ValidationError("OpenNebulaException occured. {0}".format(err))
return manager.get_vm(opennebula_id)
def get_memory(self, obj):
@ -112,6 +108,7 @@ class VirtualMachineSerializer(serializers.Serializer):
for disk in template.disks:
price += int(disk.size)/1024 * 0.6
return price
def get_configuration(self, obj):
template_id = obj.template.template_id
template = OpenNebulaManager().get_template(template_id)
@ -123,7 +120,7 @@ class VirtualMachineSerializer(serializers.Serializer):
return str(v4_from_mac(nic.mac))
else:
return '-'
def get_ipv6(self, obj):
nic = obj.template.nics[0]
return nic.ip6_global
@ -131,12 +128,15 @@ class VirtualMachineSerializer(serializers.Serializer):
def get_name(self, obj):
return obj.name.strip('public-')
def hexstr2int(string):
return int(string.replace(':', ''), 16)
FIRST_MAC = hexstr2int('02:00:b3:39:79:4d')
FIRST_V4 = ipaddress.ip_address('185.203.112.2')
COUNT = 1000
FIRST_V4 = ipaddress.ip_address('185.203.112.2')
COUNT = 1000
def v4_from_mac(mac):
"""Calculates the IPv4 address from a MAC address.
@ -146,5 +146,6 @@ def v4_from_mac(mac):
"""
return FIRST_V4 + (hexstr2int(mac) - FIRST_MAC)
def is_in_v4_range(mac):
return FIRST_MAC <= hexstr2int(mac) < FIRST_MAC + 1000

View file

@ -1,4 +1,3 @@
import socket
import random
import string
@ -8,32 +7,30 @@ from .models import OpenNebulaManager
from .serializers import VirtualMachineSerializer
from utils.models import CustomUser
class OpenNebulaManagerTestCases(TestCase):
"""This class defines the test suite for the opennebula manager model."""
def setUp(self):
"""Define the test client and other test variables."""
self.email = '{}@ungleich.ch'.format(''.join(random.choices(string.ascii_uppercase, k=10)))
self.password = ''.join(random.choices(string.ascii_uppercase + string.digits, k=20))
self.password = ''.join(random.choices(string.ascii_uppercase + string.digits, k=20))
self.user = CustomUser.objects.create(name='test', email=self.email,
password=self.password)
password=self.password)
self.vm_specs = {}
self.vm_specs['cpu'] = 1
self.vm_specs['memory'] = 2
self.vm_specs['disk_size'] = 10
self.manager = OpenNebulaManager()
self.manager = OpenNebulaManager()
def test_connect_to_server(self):
"""Test the opennebula manager can connect to a server."""
try:
ver = self.manager.oneadmin_client.version()
except:
except:
ver = None
self.assertTrue(ver is not None)
@ -54,7 +51,7 @@ class OpenNebulaManagerTestCases(TestCase):
# Remove the user afterwards
user = user_pool.get_by_name(self.email)
user.delete()
self.assertNotEqual(old_count, new_count)
def test_user_can_login(self):
@ -64,7 +61,7 @@ class OpenNebulaManagerTestCases(TestCase):
client = self.manager._get_client(self.user)
version = client.version()
# Cleanup
# Cleanup
user.delete()
self.assertNotEqual(version, None)
@ -77,11 +74,11 @@ class OpenNebulaManagerTestCases(TestCase):
# Fetch new user information from opennebula
user.info()
user_public_key = user.template.ssh_public_key
# Cleanup
# Cleanup
user.delete()
self.assertEqual(user_public_key, public_key)
def test_append_public_key_to_user(self):
""" Test the manager can append a new public key to an user """
self.manager.create_user(self.user)
@ -94,11 +91,11 @@ class OpenNebulaManagerTestCases(TestCase):
self.manager.add_public_key(self.user, public_key, merge=True)
user.info()
new_public_key = user.template.ssh_public_key
# Cleanup
# Cleanup
user.delete()
self.assertEqual(new_public_key, '{}\n{}'.format(old_public_key,
public_key))
public_key))
def test_remove_public_key_to_user(self):
""" Test the manager can remove a public key from an user """
@ -112,12 +109,11 @@ class OpenNebulaManagerTestCases(TestCase):
self.manager.remove_public_key(self.user, public_key)
user.info()
new_public_key = user.template.ssh_public_key
# Cleanup
# Cleanup
user.delete()
self.assertEqual(new_public_key,
old_public_key.replace('{}\n'.format(public_key), '', 1))
old_public_key.replace('{}\n'.format(public_key), '', 1))
def test_requires_ssh_key_for_new_vm(self):
"""Test the opennebula manager requires the user to have a ssh key when
@ -128,14 +124,11 @@ class VirtualMachineSerializerTestCase(TestCase):
def setUp(self):
"""Define the test client and other test variables."""
self.manager = OpenNebulaManager(email=None, password=None)
def test_serializer_strips_of_public(self):
""" Test the serialized virtual machine object contains no 'public-'."""
""" Test the serialized virtual machine object contains no 'public-'."""
for vm in self.manager.get_vms():
serialized = VirtualMachineSerializer(vm)
self.assertEqual(serialized.data.get('name'), vm.name.strip('public-'))
break

View file

@ -1,19 +1,11 @@
from rest_framework import generics
from rest_framework import permissions
from django.contrib.auth.mixins import LoginRequiredMixin
from django.contrib.auth import authenticate, login
from utils.views import LoginViewMixin
from membership.models import CustomUser, StripeCustomer
from guardian.mixins import PermissionRequiredMixin
from .serializers import VirtualMachineTemplateSerializer, \
VirtualMachineSerializer
from .serializers import VirtualMachineSerializer
from .models import OpenNebulaManager
from rest_framework.exceptions import APIException
class ServiceUnavailable(APIException):
status_code = 503
default_detail = 'Service temporarily unavailable, try again later.'
@ -29,7 +21,7 @@ class VmCreateView(generics.ListCreateAPIView):
owner = self.request.user
manager = OpenNebulaManager(email=owner.email,
password=owner.password)
# We may have ConnectionRefusedError if we don't have a
# We may have ConnectionRefusedError if we don't have a
# connection to OpenNebula. For now, we raise ServiceUnavailable
try:
vms = manager.get_vms()
@ -41,6 +33,7 @@ class VmCreateView(generics.ListCreateAPIView):
"""Save the post data when creating a new template."""
serializer.save(owner=self.request.user)
class VmDetailsView(generics.RetrieveUpdateDestroyAPIView):
"""This class handles the http GET, PUT and DELETE requests."""
permission_classes = (permissions.IsAuthenticated, )
@ -51,7 +44,7 @@ class VmDetailsView(generics.RetrieveUpdateDestroyAPIView):
owner = self.request.user
manager = OpenNebulaManager(email=owner.email,
password=owner.password)
# We may have ConnectionRefusedError if we don't have a
# We may have ConnectionRefusedError if we don't have a
# connection to OpenNebula. For now, we raise ServiceUnavailable
try:
vms = manager.get_vms()
@ -63,22 +56,21 @@ class VmDetailsView(generics.RetrieveUpdateDestroyAPIView):
owner = self.request.user
manager = OpenNebulaManager(email=owner.email,
password=owner.password)
# We may have ConnectionRefusedError if we don't have a
# We may have ConnectionRefusedError if we don't have a
# connection to OpenNebula. For now, we raise ServiceUnavailable
try:
vm = manager.get_vm(self.kwargs.get('pk'))
except ConnectionRefusedError:
raise ServiceUnavailable
return vm
return vm
def perform_destroy(self, instance):
owner = self.request.user
manager = OpenNebulaManager(email=owner.email,
password=owner.password)
# We may have ConnectionRefusedError if we don't have a
# We may have ConnectionRefusedError if we don't have a
# connection to OpenNebula. For now, we raise ServiceUnavailable
try:
manager.delete_vm(instance.id)
except ConnectionRefusedError:
raise ServiceUnavailable