dynamicweb2/utils/hosting_utils.py

242 lines
8.3 KiB
Python
Raw Normal View History

2023-12-06 11:16:30 +00:00
import decimal
import logging
import subprocess
from django.conf import settings
from oca.pool import WrongIdError
from datacenterlight.models import VMPricing
from hosting.models import UserHostingKey, VMDetail, VATRates
from opennebula_api.serializers import VirtualMachineSerializer
logger = logging.getLogger(__name__)
def get_all_public_keys(customer):
"""
Returns all the public keys of the user
:param customer: The customer whose public keys are needed
:return: A list of public keys
"""
return UserHostingKey.objects.filter(user_id=customer.id).values_list(
"public_key", flat=True).distinct()
def get_or_create_vm_detail(user, manager, vm_id):
"""
Returns VMDetail object related to given vm_id. Creates the object
if it does not exist
:param vm_id: The ID of the VM which should be greater than 0.
:param user: The CustomUser object that owns this VM
:param manager: The OpenNebulaManager object
:return: The VMDetail object. None if vm_id is less than or equal to 0.
Also, for the cases where the VMDetail does not exist and we can not
fetch data about the VM from OpenNebula, the function returns None
"""
if vm_id <= 0:
return None
try:
vm_detail_obj = VMDetail.objects.get(vm_id=vm_id)
except VMDetail.DoesNotExist:
try:
vm_obj = manager.get_vm(vm_id)
except (WrongIdError, ConnectionRefusedError) as e:
logger.error(str(e))
return None
vm = VirtualMachineSerializer(vm_obj).data
vm_detail_obj = VMDetail.objects.create(
user=user, vm_id=vm_id, disk_size=vm['disk_size'],
cores=vm['cores'], memory=vm['memory'],
configuration=vm['configuration'], ipv4=vm['ipv4'],
ipv6=vm['ipv6']
)
return vm_detail_obj
def get_vm_price(cpu, memory, disk_size, hdd_size=0, pricing_name='default'):
"""
A helper function that computes price of a VM from given cpu, ram and
ssd parameters
:param cpu: Number of cores of the VM
:param memory: RAM of the VM
:param disk_size: Disk space of the VM (SSD)
:param hdd_size: The HDD size
:param pricing_name: The pricing name to be used
:return: The price of the VM
"""
try:
pricing = VMPricing.objects.get(name=pricing_name)
except Exception as ex:
logger.error(
"Error getting VMPricing object for {pricing_name}."
"Details: {details}".format(
pricing_name=pricing_name, details=str(ex)
)
)
return None
price = ((decimal.Decimal(cpu) * pricing.cores_unit_price) +
(decimal.Decimal(memory) * pricing.ram_unit_price) +
(decimal.Decimal(disk_size) * pricing.ssd_unit_price) +
(decimal.Decimal(hdd_size) * pricing.hdd_unit_price) +
decimal.Decimal(settings.VM_BASE_PRICE))
cents = decimal.Decimal('.01')
price = price.quantize(cents, decimal.ROUND_HALF_UP)
return round(float(price), 2)
def get_vm_price_for_given_vat(cpu, memory, ssd_size, hdd_size=0,
pricing_name='default', vat_rate=0):
try:
pricing = VMPricing.objects.get(name=pricing_name)
except Exception as ex:
logger.error(
"Error getting VMPricing object for {pricing_name}."
"Details: {details}".format(
pricing_name=pricing_name, details=str(ex)
)
)
return None
price = (
(decimal.Decimal(cpu) * pricing.cores_unit_price) +
(decimal.Decimal(memory) * pricing.ram_unit_price) +
(decimal.Decimal(ssd_size) * pricing.ssd_unit_price) +
(decimal.Decimal(hdd_size) * pricing.hdd_unit_price) +
decimal.Decimal(settings.VM_BASE_PRICE)
)
discount_name = pricing.discount_name
discount_amount = round(float(pricing.discount_amount), 2)
vat = price * decimal.Decimal(vat_rate) * decimal.Decimal(0.01)
vat_percent = vat_rate
cents = decimal.Decimal('.01')
price = price.quantize(cents, decimal.ROUND_HALF_UP)
vat = vat.quantize(cents, decimal.ROUND_HALF_UP)
discount_amount_with_vat = decimal.Decimal(discount_amount) * (1 + decimal.Decimal(vat_rate) * decimal.Decimal(0.01))
discount_amount_with_vat = discount_amount_with_vat.quantize(cents, decimal.ROUND_HALF_UP)
discount = {
'name': discount_name,
'amount': discount_amount,
'amount_with_vat': round(float(discount_amount_with_vat), 2),
'stripe_coupon_id': pricing.stripe_coupon_id
}
return (round(float(price), 2), round(float(vat), 2),
round(float(vat_percent), 2), discount)
def get_vm_price_with_vat(cpu, memory, ssd_size, hdd_size=0,
pricing_name='default'):
"""
A helper function that computes price of a VM from given cpu, ram and
ssd, hdd and the pricing parameters
:param cpu: Number of cores of the VM
:param memory: RAM of the VM
:param ssd_size: Disk space of the VM (SSD)
:param hdd_size: The HDD size
:param pricing_name: The pricing name to be used
:return: The a tuple containing the price of the VM, the VAT and the
VAT percentage
"""
try:
pricing = VMPricing.objects.get(name=pricing_name)
except Exception as ex:
logger.error(
"Error getting VMPricing object for {pricing_name}."
"Details: {details}".format(
pricing_name=pricing_name, details=str(ex)
)
)
return None
price = (
(decimal.Decimal(cpu) * pricing.cores_unit_price) +
(decimal.Decimal(memory) * pricing.ram_unit_price) +
(decimal.Decimal(ssd_size) * pricing.ssd_unit_price) +
(decimal.Decimal(hdd_size) * pricing.hdd_unit_price) +
decimal.Decimal(settings.VM_BASE_PRICE)
)
if pricing.vat_inclusive:
vat = decimal.Decimal(0)
vat_percent = decimal.Decimal(0)
else:
vat = price * pricing.vat_percentage * decimal.Decimal(0.01)
vat_percent = pricing.vat_percentage
cents = decimal.Decimal('.01')
price = price.quantize(cents, decimal.ROUND_HALF_UP)
vat = vat.quantize(cents, decimal.ROUND_HALF_UP)
discount = {
'name': pricing.discount_name,
'amount': round(float(pricing.discount_amount), 2),
'stripe_coupon_id': pricing.stripe_coupon_id
}
return (round(float(price), 2), round(float(vat), 2),
round(float(vat_percent), 2), discount)
def ping_ok(host_ipv6):
"""
A utility method to check if a host responds to ping requests. Note: the
function relies on `ping6` utility of debian to check.
:param host_ipv6 str type parameter that represets the ipv6 of the host to
checked
:return True if the host responds to ping else returns False
"""
try:
subprocess.check_output("ping6 -c 1 " + host_ipv6, shell=True)
except Exception as ex:
logger.debug(host_ipv6 + " not reachable via ping. Error = " + str(ex))
return False
return True
def get_vat_rate_for_country(country):
vat_rate = None
try:
vat_rate = VATRates.objects.get(
territory_codes=country, start_date__isnull=False, stop_date=None
)
logger.debug("VAT rate for %s is %s" % (country, vat_rate.rate))
return vat_rate.rate
except VATRates.DoesNotExist as dne:
logger.debug(str(dne))
logger.debug("Did not find VAT rate for %s, returning 0" % country)
return 0
def get_ip_addresses(vm_id):
try:
vm_detail = VMDetail.objects.get(vm_id=vm_id)
return "%s <br/>%s" % (vm_detail.ipv6, vm_detail.ipv4)
except VMDetail.DoesNotExist as dne:
logger.error(str(dne))
logger.error("VMDetail for %s does not exist" % vm_id)
return "--"
class HostingUtils:
@staticmethod
def clear_items_from_list(from_list, items_list):
"""
A utility function to clear items from a given list.
Useful when deleting items in bulk from session.
e.g.:
HostingUtils.clear_items_from_list(
request.session,
['token', 'billing_address_data', 'card_id',]
)
:param from_list:
:param items_list:
:return:
"""
for var in items_list:
if var in from_list:
del from_list[var]