diff --git a/hosting/utils.py b/hosting/utils.py new file mode 100644 index 0000000..2f22996 --- /dev/null +++ b/hosting/utils.py @@ -0,0 +1,303 @@ +import datetime +import decimal +import json + +import stripe +from django.http import JsonResponse + +from dynamicweb2.stripe_utils import StripeUtils +from hosting.models import VATRates, UserBillingAddress, BillingAddress, StripeCustomer, UserHostingKey, VMPricing +from django.conf import settings +from django.contrib import messages +from django.utils.translation import get_language, gettext_lazy as _ +from django.urls import reverse +import logging + +logger = logging.getLogger(__name__) + + +eu_countries = ['at', 'be', 'bg', 'ch', 'cy', 'cz', 'hr', 'dk', + 'ee', 'fi', 'fr', 'mc', 'de', 'gr', 'hu', 'ie', 'it', + 'lv', 'lu', 'mt', 'nl', 'pl', 'pt', 'ro', 'sk', 'si', 'es', + 'se', 'gb'] + + +def get_vat_rate_for_country(country): + try: + vat_rate = VATRates.objects.get( + territory_codes=country, start_date__isnull=False, stop_date=None + ) + logger.debug("VAT rate for %s is %s" % (country, vat_rate.rate)) + return vat_rate.rate + except VATRates.DoesNotExist as dne: + logger.debug(str(dne)) + logger.debug("Did not find VAT rate for %s, returning 0" % country) + return 0 + + +def validate_vat_number(stripe_customer_id, billing_address_id, + is_user_ba=False): + if is_user_ba: + billing_address = get_billing_address(billing_address_id) + else: + try: + billing_address = BillingAddress.objects.get(id=billing_address_id) + except BillingAddress.DoesNotExist: + billing_address = None + logger.debug("BillingAddress does not exist for %s" % billing_address_id) + except BillingAddress.MultipleObjectsReturned: + logger.debug("Multiple BillingAddress exist for %s" % billing_address_id) + billing_address = BillingAddress.objects.filter(id=billing_address_id).order_by('-id').first() + if billing_address is not None: + logger.debug("BillingAddress found: %s %s type=%s" % ( + billing_address_id, str(billing_address), type(billing_address))) + if billing_address.country.lower().strip() not in eu_countries: + return { + "validated_on": "", + "status": "not_needed" + } + if billing_address.vat_number_validated_on: + logger.debug("billing_address verified on %s" % + billing_address.vat_number_validated_on) + return { + "validated_on": billing_address.vat_number_validated_on, + "status": "verified" + } + else: + logger.debug("billing_address not yet verified, " + "Checking if we already have a tax id") + if billing_address.stripe_tax_id: + logger.debug("We have a tax id %s" % billing_address.stripe_tax_id) + tax_id_obj = stripe.Customer.retrieve_tax_id( + stripe_customer_id, + billing_address.stripe_tax_id, + ) + if tax_id_obj.verification.status == "verified": + logger.debug("Latest status on Stripe=%s. Updating" % + tax_id_obj.verification.status) + # update billing address + billing_address.vat_number_validated_on = datetime.datetime.now() + billing_address.vat_validation_status = tax_id_obj.verification.status + billing_address.save() + return { + "status": "verified", + "validated_on": billing_address.vat_number_validated_on + } + else: + billing_address.vat_validation_status = tax_id_obj.verification.status + billing_address.save() + logger.debug( + "Latest status on Stripe=%s" % str(tax_id_obj) + ) + return { + "status": tax_id_obj.verification.status if tax_id_obj + else "unknown", + "validated_on": "" + } + else: + logger.debug("Creating a tax id") + logger.debug("Billing address = %s" % str(billing_address)) + tax_id_obj = create_tax_id( + stripe_customer_id, billing_address_id, + "ch_vat" if billing_address.country.lower() == "ch" else "eu_vat", + is_user_ba=is_user_ba + ) + logger.debug("tax_id_obj = %s" % str(tax_id_obj)) + else: + logger.debug("invalid billing address") + return { + "status": "invalid billing address", + "validated_on": "" + } + + if 'response_object' in tax_id_obj: + return tax_id_obj + + return { + "status": tax_id_obj.verification.status, + "validated_on": datetime.datetime.now() if tax_id_obj.verification.status == "verified" else "" + } + + +def get_billing_address(billing_address_id): + try: + billing_address = UserBillingAddress.objects.get( + id=billing_address_id) + except UserBillingAddress.DoesNotExist: + billing_address = None + logger.debug( + "UserBillingAddress does not exist for %s" % billing_address_id) + except UserBillingAddress.MultipleObjectsReturned: + logger.debug( + "Multiple UserBillingAddress exist for %s" % billing_address_id) + billing_address = UserBillingAddress.objects.filter( + id=billing_address_id).order_by('-id').first() + return billing_address + + +def create_tax_id(stripe_customer_id, billing_address_id, vat_type, is_user_ba=False): + if is_user_ba: + billing_address = get_billing_address(billing_address_id) + else: + try: + billing_address = BillingAddress.objects.get(id=billing_address_id) + except BillingAddress.DoesNotExist: + billing_address = None + logger.debug("BillingAddress does not exist for %s" % billing_address_id) + except BillingAddress.MultipleObjectsReturned: + logger.debug("Multiple BillingAddress exist for %s" % billing_address_id) + billing_address = BillingAddress.objects.filter(billing_address_id).order_by('-id').first() + + tax_id_obj = None + tax_id_response = None + if billing_address: + stripe_utils = StripeUtils() + tax_id_response = stripe_utils.get_or_create_tax_id_for_user( + stripe_customer_id, + vat_number=billing_address.vat_number, + vat_type=vat_type, + country=billing_address.country + ) + + tax_id_obj = tax_id_response.get('response_object') + + if not tax_id_obj: + logger.debug("Received none in tax_id_obj") + return { + 'paid': False, + 'response_object': None, + 'error': tax_id_response["error"] if tax_id_response and 'error' in tax_id_response else + "No such address found" + } + + try: + stripe_customer = StripeCustomer.objects.get(stripe_id=stripe_customer_id) + billing_address_set = set() + logger.debug("Updating billing address") + for ho in stripe_customer.hostingorder_set.all(): + if ho.billing_address.vat_number == billing_address.vat_number: + billing_address_set.add(ho.billing_address) + for b_address in billing_address_set: + b_address.stripe_tax_id = tax_id_obj.id + b_address.vat_validation_status = tax_id_obj.verification.status + b_address.save() + logger.debug("Updated billing_address %s" % str(b_address)) + + ub_addresses = stripe_customer.user.billing_addresses.filter( + vat_number=billing_address.vat_number) + for ub_address in ub_addresses: + ub_address.stripe_tax_id = tax_id_obj.id + ub_address.vat_validation_status = tax_id_obj.verification.status + ub_address.save() + logger.debug("Updated user_billing_address %s" % str(ub_address)) + except StripeCustomer.DoesNotExist: + logger.debug("StripeCustomer %s does not exist" % stripe_customer_id) + billing_address.stripe_tax_id = tax_id_obj.id + billing_address.vat_validation_status = tax_id_obj.verification.status + billing_address.save() + return tax_id_obj + + +def get_vm_price_for_given_vat(cpu, memory, ssd_size, hdd_size=0, + pricing_name='default', vat_rate=0): + try: + pricing = VMPricing.objects.get(name=pricing_name) + except Exception as ex: + logger.error( + "Error getting VMPricing object for {pricing_name}." + "Details: {details}".format( + pricing_name=pricing_name, details=str(ex) + ) + ) + return None + + price = ( + (decimal.Decimal(cpu) * pricing.cores_unit_price) + + (decimal.Decimal(memory) * pricing.ram_unit_price) + + (decimal.Decimal(ssd_size) * pricing.ssd_unit_price) + + (decimal.Decimal(hdd_size) * pricing.hdd_unit_price) + + decimal.Decimal(settings.VM_BASE_PRICE) + ) + + discount_name = pricing.discount_name + discount_amount = round(float(pricing.discount_amount), 2) + vat = price * decimal.Decimal(vat_rate) * decimal.Decimal(0.01) + vat_percent = vat_rate + + cents = decimal.Decimal('.01') + price = price.quantize(cents, decimal.ROUND_HALF_UP) + vat = vat.quantize(cents, decimal.ROUND_HALF_UP) + discount_amount_with_vat = (decimal.Decimal(discount_amount) * + (1 + decimal.Decimal(vat_rate) * decimal.Decimal(0.01))) + discount_amount_with_vat = discount_amount_with_vat.quantize(cents, decimal.ROUND_HALF_UP) + discount = { + 'name': discount_name, + 'amount': discount_amount, + 'amount_with_vat': round(float(discount_amount_with_vat), 2), + 'stripe_coupon_id': pricing.stripe_coupon_id + } + return (round(float(price), 2), round(float(vat), 2), + round(float(vat_percent), 2), discount) + + +def get_all_public_keys(customer): + """ + Returns all the public keys of the user + :param customer: The customer whose public keys are needed + :return: A list of public keys + """ + return UserHostingKey.objects.filter(user_id=customer.id).values_list( + "public_key", flat=True).distinct() + + +def create_incomplete_intent_request(request): + """ + Creates a dictionary of all session variables so that they could be + picked up in the webhook for processing. + + :param request: + :return: + """ + req = { + 'scheme': request.scheme, + 'host': request.get_host(), + 'language': get_language(), + 'new_user_hosting_key_id': request.session.get( + 'new_user_hosting_key_id', None), + 'card_id': request.session.get('card_id', None), + 'generic_payment_type': request.session.get( + 'generic_payment_type', None), + 'generic_payment_details': request.session.get( + 'generic_payment_details', None), + 'user': request.session.get('user', None), + 'id_payment_method': request.session.get('id_payment_method', None), + } + return json.dumps(req) + + +def get_error_response_dict(msg, request): + logger.error(msg) + response = { + 'status': False, + 'redirect': "{url}#{section}".format( + url=(reverse( + 'show_product', + kwargs={'product_slug': request.session['generic_payment_details']['product_slug']} + ) if 'generic_payment_details' in request.session else + reverse('hosting:payment') + ), + section='payment_error' + ), + 'msg_title': str(_('Error.')), + 'msg_body': str( + _('There was a payment related error.' + ' On close of this popup, you will be redirected back to' + ' the payment page.')) + } + return response + + +def show_error(msg, request): + logger.error(msg) + messages.add_message(request, messages.ERROR, msg, extra_tags='failed_payment') + return JsonResponse(get_error_response_dict(msg,request))