Add hosting/utils

This commit is contained in:
M.Ravi 2023-12-02 13:50:26 +05:30
parent 45f84182df
commit f62a2951a0

303
hosting/utils.py Normal file
View file

@ -0,0 +1,303 @@
import datetime
import decimal
import json
import stripe
from django.http import JsonResponse
from dynamicweb2.stripe_utils import StripeUtils
from hosting.models import VATRates, UserBillingAddress, BillingAddress, StripeCustomer, UserHostingKey, VMPricing
from django.conf import settings
from django.contrib import messages
from django.utils.translation import get_language, gettext_lazy as _
from django.urls import reverse
import logging
logger = logging.getLogger(__name__)
eu_countries = ['at', 'be', 'bg', 'ch', 'cy', 'cz', 'hr', 'dk',
'ee', 'fi', 'fr', 'mc', 'de', 'gr', 'hu', 'ie', 'it',
'lv', 'lu', 'mt', 'nl', 'pl', 'pt', 'ro', 'sk', 'si', 'es',
'se', 'gb']
def get_vat_rate_for_country(country):
try:
vat_rate = VATRates.objects.get(
territory_codes=country, start_date__isnull=False, stop_date=None
)
logger.debug("VAT rate for %s is %s" % (country, vat_rate.rate))
return vat_rate.rate
except VATRates.DoesNotExist as dne:
logger.debug(str(dne))
logger.debug("Did not find VAT rate for %s, returning 0" % country)
return 0
def validate_vat_number(stripe_customer_id, billing_address_id,
is_user_ba=False):
if is_user_ba:
billing_address = get_billing_address(billing_address_id)
else:
try:
billing_address = BillingAddress.objects.get(id=billing_address_id)
except BillingAddress.DoesNotExist:
billing_address = None
logger.debug("BillingAddress does not exist for %s" % billing_address_id)
except BillingAddress.MultipleObjectsReturned:
logger.debug("Multiple BillingAddress exist for %s" % billing_address_id)
billing_address = BillingAddress.objects.filter(id=billing_address_id).order_by('-id').first()
if billing_address is not None:
logger.debug("BillingAddress found: %s %s type=%s" % (
billing_address_id, str(billing_address), type(billing_address)))
if billing_address.country.lower().strip() not in eu_countries:
return {
"validated_on": "",
"status": "not_needed"
}
if billing_address.vat_number_validated_on:
logger.debug("billing_address verified on %s" %
billing_address.vat_number_validated_on)
return {
"validated_on": billing_address.vat_number_validated_on,
"status": "verified"
}
else:
logger.debug("billing_address not yet verified, "
"Checking if we already have a tax id")
if billing_address.stripe_tax_id:
logger.debug("We have a tax id %s" % billing_address.stripe_tax_id)
tax_id_obj = stripe.Customer.retrieve_tax_id(
stripe_customer_id,
billing_address.stripe_tax_id,
)
if tax_id_obj.verification.status == "verified":
logger.debug("Latest status on Stripe=%s. Updating" %
tax_id_obj.verification.status)
# update billing address
billing_address.vat_number_validated_on = datetime.datetime.now()
billing_address.vat_validation_status = tax_id_obj.verification.status
billing_address.save()
return {
"status": "verified",
"validated_on": billing_address.vat_number_validated_on
}
else:
billing_address.vat_validation_status = tax_id_obj.verification.status
billing_address.save()
logger.debug(
"Latest status on Stripe=%s" % str(tax_id_obj)
)
return {
"status": tax_id_obj.verification.status if tax_id_obj
else "unknown",
"validated_on": ""
}
else:
logger.debug("Creating a tax id")
logger.debug("Billing address = %s" % str(billing_address))
tax_id_obj = create_tax_id(
stripe_customer_id, billing_address_id,
"ch_vat" if billing_address.country.lower() == "ch" else "eu_vat",
is_user_ba=is_user_ba
)
logger.debug("tax_id_obj = %s" % str(tax_id_obj))
else:
logger.debug("invalid billing address")
return {
"status": "invalid billing address",
"validated_on": ""
}
if 'response_object' in tax_id_obj:
return tax_id_obj
return {
"status": tax_id_obj.verification.status,
"validated_on": datetime.datetime.now() if tax_id_obj.verification.status == "verified" else ""
}
def get_billing_address(billing_address_id):
try:
billing_address = UserBillingAddress.objects.get(
id=billing_address_id)
except UserBillingAddress.DoesNotExist:
billing_address = None
logger.debug(
"UserBillingAddress does not exist for %s" % billing_address_id)
except UserBillingAddress.MultipleObjectsReturned:
logger.debug(
"Multiple UserBillingAddress exist for %s" % billing_address_id)
billing_address = UserBillingAddress.objects.filter(
id=billing_address_id).order_by('-id').first()
return billing_address
def create_tax_id(stripe_customer_id, billing_address_id, vat_type, is_user_ba=False):
if is_user_ba:
billing_address = get_billing_address(billing_address_id)
else:
try:
billing_address = BillingAddress.objects.get(id=billing_address_id)
except BillingAddress.DoesNotExist:
billing_address = None
logger.debug("BillingAddress does not exist for %s" % billing_address_id)
except BillingAddress.MultipleObjectsReturned:
logger.debug("Multiple BillingAddress exist for %s" % billing_address_id)
billing_address = BillingAddress.objects.filter(billing_address_id).order_by('-id').first()
tax_id_obj = None
tax_id_response = None
if billing_address:
stripe_utils = StripeUtils()
tax_id_response = stripe_utils.get_or_create_tax_id_for_user(
stripe_customer_id,
vat_number=billing_address.vat_number,
vat_type=vat_type,
country=billing_address.country
)
tax_id_obj = tax_id_response.get('response_object')
if not tax_id_obj:
logger.debug("Received none in tax_id_obj")
return {
'paid': False,
'response_object': None,
'error': tax_id_response["error"] if tax_id_response and 'error' in tax_id_response else
"No such address found"
}
try:
stripe_customer = StripeCustomer.objects.get(stripe_id=stripe_customer_id)
billing_address_set = set()
logger.debug("Updating billing address")
for ho in stripe_customer.hostingorder_set.all():
if ho.billing_address.vat_number == billing_address.vat_number:
billing_address_set.add(ho.billing_address)
for b_address in billing_address_set:
b_address.stripe_tax_id = tax_id_obj.id
b_address.vat_validation_status = tax_id_obj.verification.status
b_address.save()
logger.debug("Updated billing_address %s" % str(b_address))
ub_addresses = stripe_customer.user.billing_addresses.filter(
vat_number=billing_address.vat_number)
for ub_address in ub_addresses:
ub_address.stripe_tax_id = tax_id_obj.id
ub_address.vat_validation_status = tax_id_obj.verification.status
ub_address.save()
logger.debug("Updated user_billing_address %s" % str(ub_address))
except StripeCustomer.DoesNotExist:
logger.debug("StripeCustomer %s does not exist" % stripe_customer_id)
billing_address.stripe_tax_id = tax_id_obj.id
billing_address.vat_validation_status = tax_id_obj.verification.status
billing_address.save()
return tax_id_obj
def get_vm_price_for_given_vat(cpu, memory, ssd_size, hdd_size=0,
pricing_name='default', vat_rate=0):
try:
pricing = VMPricing.objects.get(name=pricing_name)
except Exception as ex:
logger.error(
"Error getting VMPricing object for {pricing_name}."
"Details: {details}".format(
pricing_name=pricing_name, details=str(ex)
)
)
return None
price = (
(decimal.Decimal(cpu) * pricing.cores_unit_price) +
(decimal.Decimal(memory) * pricing.ram_unit_price) +
(decimal.Decimal(ssd_size) * pricing.ssd_unit_price) +
(decimal.Decimal(hdd_size) * pricing.hdd_unit_price) +
decimal.Decimal(settings.VM_BASE_PRICE)
)
discount_name = pricing.discount_name
discount_amount = round(float(pricing.discount_amount), 2)
vat = price * decimal.Decimal(vat_rate) * decimal.Decimal(0.01)
vat_percent = vat_rate
cents = decimal.Decimal('.01')
price = price.quantize(cents, decimal.ROUND_HALF_UP)
vat = vat.quantize(cents, decimal.ROUND_HALF_UP)
discount_amount_with_vat = (decimal.Decimal(discount_amount) *
(1 + decimal.Decimal(vat_rate) * decimal.Decimal(0.01)))
discount_amount_with_vat = discount_amount_with_vat.quantize(cents, decimal.ROUND_HALF_UP)
discount = {
'name': discount_name,
'amount': discount_amount,
'amount_with_vat': round(float(discount_amount_with_vat), 2),
'stripe_coupon_id': pricing.stripe_coupon_id
}
return (round(float(price), 2), round(float(vat), 2),
round(float(vat_percent), 2), discount)
def get_all_public_keys(customer):
"""
Returns all the public keys of the user
:param customer: The customer whose public keys are needed
:return: A list of public keys
"""
return UserHostingKey.objects.filter(user_id=customer.id).values_list(
"public_key", flat=True).distinct()
def create_incomplete_intent_request(request):
"""
Creates a dictionary of all session variables so that they could be
picked up in the webhook for processing.
:param request:
:return:
"""
req = {
'scheme': request.scheme,
'host': request.get_host(),
'language': get_language(),
'new_user_hosting_key_id': request.session.get(
'new_user_hosting_key_id', None),
'card_id': request.session.get('card_id', None),
'generic_payment_type': request.session.get(
'generic_payment_type', None),
'generic_payment_details': request.session.get(
'generic_payment_details', None),
'user': request.session.get('user', None),
'id_payment_method': request.session.get('id_payment_method', None),
}
return json.dumps(req)
def get_error_response_dict(msg, request):
logger.error(msg)
response = {
'status': False,
'redirect': "{url}#{section}".format(
url=(reverse(
'show_product',
kwargs={'product_slug': request.session['generic_payment_details']['product_slug']}
) if 'generic_payment_details' in request.session else
reverse('hosting:payment')
),
section='payment_error'
),
'msg_title': str(_('Error.')),
'msg_body': str(
_('There was a payment related error.'
' On close of this popup, you will be redirected back to'
' the payment page.'))
}
return response
def show_error(msg, request):
logger.error(msg)
messages.add_message(request, messages.ERROR, msg, extra_tags='failed_payment')
return JsonResponse(get_error_response_dict(msg,request))