import decimal
import logging
import math
import subprocess

from oca.pool import WrongIdError

from datacenterlight.models import VMPricing
from hosting.models import UserHostingKey, VMDetail, VATRates
from opennebula_api.serializers import VirtualMachineSerializer

logger = logging.getLogger(__name__)

# Monetary amounts are rounded to two decimal places.
_CENTS = decimal.Decimal('0.01')
# Factor that turns a percentage (e.g. 7.7) into a fraction (0.077). Built
# from a string so that it is exactly 0.01 and not the nearest binary float.
_PERCENT = decimal.Decimal('0.01')


def _to_decimal(value):
    """
    Converts a number to Decimal without inheriting binary float error

    :param value: A Decimal, int, float or numeric string
    :return: The Decimal equivalent of the value
    """
    if isinstance(value, decimal.Decimal):
        return value
    if isinstance(value, float):
        # Decimal(8.1) is 8.0999999999999996...; going through the string
        # form keeps the value as it was written (8.1).
        return decimal.Decimal(str(value))
    return decimal.Decimal(value)


def _to_cents_float(amount):
    """
    Rounds a Decimal amount half-up to two places and returns it as float

    :param amount: The Decimal amount to round
    :return: The rounded amount as a float
    """
    rounded = amount.quantize(_CENTS, decimal.ROUND_HALF_UP)
    return round(float(rounded), 2)


def _get_pricing(pricing_name):
    """
    Fetches the VMPricing object with the given name

    :param pricing_name: The name of the pricing to look up
    :return: The VMPricing object. None if the lookup fails for any reason
             (the failure is logged)
    """
    try:
        return VMPricing.objects.get(name=pricing_name)
    except Exception as ex:
        # Deliberately broad: the public price helpers report every lookup
        # failure to their callers by returning None.
        logger.error(
            "Error getting VMPricing object for {pricing_name}."
            "Details: {details}".format(
                pricing_name=pricing_name, details=str(ex)
            )
        )
        return None


def _base_price(pricing, cpu, memory, ssd_size, hdd_size):
    """
    Computes the unrounded price of a VM, without VAT or discount

    :param pricing: The VMPricing object that provides the unit prices
    :param cpu: Number of cores of the VM
    :param memory: RAM of the VM
    :param ssd_size: Disk space of the VM (SSD)
    :param hdd_size: The HDD size
    :return: The price as a Decimal
    """
    return (
        (_to_decimal(cpu) * pricing.cores_unit_price) +
        (_to_decimal(memory) * pricing.ram_unit_price) +
        (_to_decimal(ssd_size) * pricing.ssd_unit_price) +
        (_to_decimal(hdd_size) * pricing.hdd_unit_price)
    )


def get_all_public_keys(customer):
    """
    Returns all the public keys of the user

    :param customer: The customer whose public keys are needed
    :return: A list of public keys
    """
    keys_of_customer = UserHostingKey.objects.filter(user_id=customer.id)
    return keys_of_customer.values_list("public_key", flat=True).distinct()


def get_or_create_vm_detail(user, manager, vm_id):
    """
    Returns VMDetail object related to given vm_id. Creates the object
    if it does not exist

    :param vm_id: The ID of the VM which should be greater than 0.
    :param user: The CustomUser object that owns this VM
    :param manager: The OpenNebulaManager object
    :return: The VMDetail object. None if vm_id is less than or equal to 0.
             Also, for the cases where the VMDetail does not exist and we
             can not fetch data about the VM from OpenNebula, the function
             returns None
    """
    if vm_id <= 0:
        return None
    try:
        return VMDetail.objects.get(vm_id=vm_id)
    except VMDetail.DoesNotExist:
        # Not stored yet: fall through and build it from OpenNebula data.
        pass
    try:
        vm_obj = manager.get_vm(vm_id)
    except (WrongIdError, ConnectionRefusedError) as e:
        logger.error(str(e))
        return None
    vm = VirtualMachineSerializer(vm_obj).data
    return VMDetail.objects.create(
        user=user, vm_id=vm_id, disk_size=vm['disk_size'],
        cores=vm['cores'], memory=vm['memory'],
        configuration=vm['configuration'], ipv4=vm['ipv4'],
        ipv6=vm['ipv6']
    )


def get_vm_price(cpu, memory, disk_size, hdd_size=0, pricing_name='default'):
    """
    A helper function that computes price of a VM from given cpu, ram and
    ssd parameters

    :param cpu: Number of cores of the VM
    :param memory: RAM of the VM
    :param disk_size: Disk space of the VM (SSD)
    :param hdd_size: The HDD size
    :param pricing_name: The pricing name to be used
    :return: The price of the VM as a float rounded to two decimals. None
             if the pricing could not be fetched
    """
    pricing = _get_pricing(pricing_name)
    if pricing is None:
        return None
    return _to_cents_float(
        _base_price(pricing, cpu, memory, disk_size, hdd_size)
    )


def get_vm_price_for_given_vat(cpu, memory, ssd_size, hdd_size=0,
                               pricing_name='default', vat_rate=0):
    """
    A helper function that computes the price of a VM and the VAT on it
    for an explicitly given VAT rate

    :param cpu: Number of cores of the VM
    :param memory: RAM of the VM
    :param ssd_size: Disk space of the VM (SSD)
    :param hdd_size: The HDD size
    :param pricing_name: The pricing name to be used
    :param vat_rate: The VAT rate in percent (e.g. 7.7 for 7.7%)
    :return: A tuple (price, vat, vat_percent, discount) where the first
             three are floats rounded to two decimals and discount is a
             dict with the keys 'name', 'amount' and 'amount_with_vat'.
             None if the pricing could not be fetched
    """
    pricing = _get_pricing(pricing_name)
    if pricing is None:
        return None
    price = _base_price(pricing, cpu, memory, ssd_size, hdd_size)

    # The rate often arrives as a float; converting it exactly (instead of
    # via Decimal(float)) keeps half-cent amounts rounding the right way.
    vat_fraction = _to_decimal(vat_rate) * _PERCENT
    vat = price * vat_fraction

    discount_amount = round(float(pricing.discount_amount), 2)
    discount_amount_with_vat = (
        _to_decimal(discount_amount) * (1 + vat_fraction)
    )
    discount = {
        'name': pricing.discount_name,
        'amount': discount_amount,
        'amount_with_vat': _to_cents_float(discount_amount_with_vat)
    }
    return (_to_cents_float(price), _to_cents_float(vat),
            round(float(vat_rate), 2), discount)


def get_vm_price_with_vat(cpu, memory, ssd_size, hdd_size=0,
                          pricing_name='default'):
    """
    A helper function that computes price of a VM from given cpu, ram and
    ssd, hdd and the pricing parameters

    :param cpu: Number of cores of the VM
    :param memory: RAM of the VM
    :param ssd_size: Disk space of the VM (SSD)
    :param hdd_size: The HDD size
    :param pricing_name: The pricing name to be used
    :return: A tuple (price, vat, vat_percent, discount) containing the
             price of the VM, the VAT, the VAT percentage and a dict with
             the keys 'name' and 'amount' describing the discount. VAT and
             VAT percentage are 0 when the pricing is VAT inclusive. None
             if the pricing could not be fetched
    """
    pricing = _get_pricing(pricing_name)
    if pricing is None:
        return None
    price = _base_price(pricing, cpu, memory, ssd_size, hdd_size)

    if pricing.vat_inclusive:
        vat = decimal.Decimal(0)
        vat_percent = decimal.Decimal(0)
    else:
        vat = price * pricing.vat_percentage * _PERCENT
        vat_percent = pricing.vat_percentage

    discount = {
        'name': pricing.discount_name,
        'amount': round(float(pricing.discount_amount), 2)
    }
    return (_to_cents_float(price), _to_cents_float(vat),
            round(float(vat_percent), 2), discount)


def ping_ok(host_ipv6):
    """
    A utility method to check if a host responds to ping requests.
    Note: the function relies on `ping6` utility of debian to check.

    :param host_ipv6 str type parameter that represents the ipv6 of the
           host to be checked
    :return True if the host responds to ping else returns False
    """
    try:
        # Argument list without a shell: the address is passed to ping6 as
        # a single argument and can not be interpreted as shell syntax.
        subprocess.check_output(["ping6", "-c", "1", host_ipv6])
    except Exception as ex:
        logger.debug(
            "%s not reachable via ping. Error = %s", host_ipv6, ex
        )
        return False
    return True


def get_vat_rate_for_country(country):
    """
    Returns the currently valid VAT rate of a country

    :param country: The territory code to look up in VATRates
    :return: The rate of the VATRates entry that has a start date and no
             stop date. 0 if no such entry exists
    """
    try:
        vat_rate = VATRates.objects.get(
            territory_codes=country, start_date__isnull=False,
            stop_date=None
        )
    except VATRates.DoesNotExist as dne:
        logger.debug(str(dne))
        logger.debug("Did not find VAT rate for %s, returning 0", country)
        return 0
    logger.debug("VAT rate for %s is %s", country, vat_rate.rate)
    return vat_rate.rate


def get_ip_addresses(vm_id):
    """
    Returns the IP addresses stored for a VM as a single string

    :param vm_id: The ID of the VM
    :return: The IPv6 and the IPv4 address of the VM joined by a line
             break. "--" if no VMDetail exists for the VM
    """
    try:
        vm_detail = VMDetail.objects.get(vm_id=vm_id)
    except VMDetail.DoesNotExist as dne:
        logger.error(str(dne))
        logger.error("VMDetail for %s does not exist", vm_id)
        return "--"
    # NOTE(review): the separator is a raw line break in the source as
    # received; confirm that this is the intended separator.
    return "%s\n%s" % (vm_detail.ipv6, vm_detail.ipv4)


def round_up(n, decimals=0):
    """
    Rounds a number up (towards positive infinity)

    :param n: The number to round
    :param decimals: The number of decimal places to keep
    :return: The smallest value with the given number of decimal places
             that is not less than n
    """
    multiplier = 10 ** decimals
    return math.ceil(n * multiplier) / multiplier


class HostingUtils:
    @staticmethod
    def clear_items_from_list(from_list, items_list):
        """
        A utility function to clear items from a given list.
        Useful when deleting items in bulk from session.

        e.g.:
        HostingUtils.clear_items_from_list(
            request.session,
            ['token', 'billing_address_data', 'card_id',]
        )

        Despite the name, from_list is addressed by key: every entry of
        items_list that is contained in from_list is removed with
        `del from_list[key]`; entries that are not contained are skipped.

        :param from_list: The container (e.g. the session) to delete from
        :param items_list: The keys to delete
        :return: None; from_list is modified in place
        """
        for var in items_list:
            if var in from_list:
                del from_list[var]