888 lines
33 KiB
Python
888 lines
33 KiB
Python
import unicodedata
|
|
import logging
|
|
import random
|
|
import os
|
|
|
|
from django.db import models
|
|
from django.contrib.auth.models import AbstractBaseUser, BaseUserManager, PermissionsMixin
|
|
from django.contrib.sites.models import Site
|
|
from django.core.exceptions import ValidationError
|
|
from django.utils.translation import gettext_lazy as _
|
|
from django.conf import settings
|
|
from django.contrib.auth.hashers import make_password
|
|
from django.urls import reverse
|
|
from django.db import IntegrityError, models
|
|
from guardian.shortcuts import assign_perm
|
|
|
|
from .fields import CountryField
|
|
from .mailer import BaseEmail
|
|
from dynamicweb2.ldap_manager import LdapManager
|
|
from dynamicweb2.stripe_utils import StripeUtils
|
|
from django.utils.crypto import get_random_string
|
|
from Crypto.PublicKey import RSA
|
|
|
|
from django.utils.functional import cached_property
|
|
|
|
# Create your models here.
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def get_validation_slug():
    """Return a freshly generated value for ``CustomUser.validation_slug``.

    The value is whatever ``make_password(None)`` produces. This function is
    passed as the callable ``default`` of the ``validation_slug`` field.
    """
    slug = make_password(None)
    return slug
|
|
|
|
|
|
def validate_name(value):
    """Field validator: accept only letters, spaces and hyphens.

    :param value: the candidate name
    :raises ValidationError: if ``value`` contains any other character
    """
    allowed_extras = ("-", " ")
    for char in value:
        if char.isalpha() or char in allowed_extras:
            continue
        # First offending character found: the whole value is rejected.
        raise ValidationError(
            _('%(value)s is not a valid name. A valid name can only include letters, spaces or -'),
            params={'value': value},
        )
|
|
|
|
|
|
class MyUserManager(BaseUserManager):
    """Manager that creates users identified by their e-mail address."""

    def create_user(self, email, name, password=None):
        """
        Create, save and return a regular (non-admin) user.

        After the row is saved, ``create_ldap_account`` is called on the new
        user with the same raw password.

        :param email: e-mail address of the user; must be truthy
        :param name: name of the user
        :param password: raw password, handed to ``set_password``
        :raises ValueError: if ``email`` is empty
        """
        if not email:
            raise ValueError('Users must have an email address')

        normalized_email = self.normalize_email(email)
        new_user = self.model(
            email=normalized_email,
            name=name,
            validation_slug=make_password(None),
        )
        new_user.is_admin = False
        new_user.set_password(password)
        new_user.save(using=self._db)
        new_user.create_ldap_account(password)
        return new_user

    def create_superuser(self, email, name, password):
        """
        Create a user via ``create_user`` and flag it as admin, active and
        superuser before saving it again.

        :return: the saved user instance
        """
        superuser = self.create_user(email, name=name, password=password)
        for flag in ('is_admin', 'is_active', 'is_superuser'):
            setattr(superuser, flag, True)
        superuser.save(using=self._db)
        return superuser
|
|
|
|
|
|
class CustomUser(AbstractBaseUser, PermissionsMixin):
    """User account identified by e-mail, with an optional LDAP counterpart."""

    VALIDATED_CHOICES = ((0, 'Not validated'), (1, 'Validated'))

    site = models.ForeignKey(Site, default=1, on_delete=models.CASCADE)
    name = models.CharField(max_length=50, validators=[validate_name])
    email = models.EmailField(unique=True)
    username = models.CharField(max_length=60, unique=True, null=True)
    validated = models.IntegerField(choices=VALIDATED_CHOICES, default=0)
    in_ldap = models.BooleanField(default=False)
    # The slug gets a generated default so that users created through the
    # User(page) admin receive one as well.
    validation_slug = models.CharField(
        db_index=True,
        unique=True,
        max_length=50,
        default=get_validation_slug,
    )
    is_admin = models.BooleanField(
        _('staff status'),
        default=False,
        help_text=_(
            'Designates whether the user can log into this admin site.'),
    )
    import_stripe_bill_remark = models.TextField(
        default="",
        help_text="Indicates any issues while importing stripe bills",
    )

    objects = MyUserManager()

    USERNAME_FIELD = "email"
    REQUIRED_FIELDS = ['name', 'password']

    # ``groups`` is redeclared with its own related_name to avoid a clash.
    groups = models.ManyToManyField(
        'auth.Group',
        related_name='custom_user_set',
        blank=True,
        verbose_name='groups',
        help_text='The groups this user belongs to.',
    )

    # ``user_permissions`` is redeclared with its own related_name to avoid
    # a clash.
    user_permissions = models.ManyToManyField(
        'auth.Permission',
        related_name='custom_user_permission_set',
        blank=True,
        verbose_name='user permissions',
        help_text='Specific permissions for this user.',
    )

    @classmethod
    def register(cls, name, password, email, app='digital_glarus',
                 base_url=None, send_email=True, account_details=None):
        """
        Create a user for ``email`` and, for the ``'dcl'`` app, optionally
        send an account-activation e-mail.

        :param name: name of the new user
        :param password: raw password of the new user
        :param email: e-mail address; must not belong to an existing user
        :param app: only ``'dcl'`` triggers the activation e-mail branch
        :param base_url: placed in the e-mail template context
        :param send_email: the e-mail is sent only when this is ``True``
        :param account_details: optional extra entry for the e-mail context
        :return: the new user, or ``None`` when a user with this e-mail
                 already exists or ``create_user`` returned a falsy value

        NOTE(review): the source lost its indentation; the nesting here was
        reconstructed from the two trailing ``else: return None`` branches.
        """
        if cls.objects.filter(email=email).first():
            return None

        user = cls.objects.create_user(name=name, email=email,
                                       password=password)
        if not user:
            return None

        if app == 'dcl':
            dcl_text = settings.DCL_TEXT
            user.is_active = False
            if send_email is True:
                subject = '{dcl_text} {account_activation}'.format(
                    dcl_text=dcl_text,
                    account_activation=_('Account Activation')
                )
                email_data = {
                    'subject': subject,
                    'from_address': settings.DCL_SUPPORT_FROM_ADDRESS,
                    'to': user.email,
                    'context': {
                        'base_url': base_url,
                        'activation_link': reverse(
                            'hosting:validate',
                            kwargs={'validate_slug': user.validation_slug}
                        ),
                        'dcl_text': dcl_text
                    },
                    'template_name': 'user_activation',
                    'template_path': 'hosting/emails/'
                }
                if account_details:
                    email_data['context']['account_details'] = account_details
                activation_mail = BaseEmail(**email_data)
                activation_mail.send()
        return user

    @classmethod
    def get_all_members(cls):
        """Return users whose stripe customer has at least one membership order."""
        members = cls.objects.filter(
            stripecustomer__membershiporder__isnull=False)
        return members

    @classmethod
    def validate_url(cls, validation_slug):
        """
        Mark the user owning ``validation_slug`` as validated.

        :return: ``True`` if a user was found and saved, ``False`` otherwise
        """
        matched = cls.objects.filter(validation_slug=validation_slug).first()
        if not matched:
            return False
        matched.validated = 1
        matched.save()
        return True

    @classmethod
    def get_random_password(cls):
        """Return a 24-character string from ``get_random_string``."""
        return get_random_string(24)

    def is_superuser(self):
        # NOTE(review): this is a plain method sharing its name with the
        # ``is_superuser`` attribute that ``create_superuser`` assigns on
        # instances. Code that tests ``user.is_superuser`` without calling it
        # sees a (truthy) bound method -- confirm this shadowing is intended.
        return False

    def get_full_name(self):
        """Return the e-mail address, which identifies the user."""
        return self.email

    def get_short_name(self):
        """Return the e-mail address, which identifies the user."""
        return self.email

    def get_first_and_last_name(self, full_name):
        """
        Split ``full_name`` on single spaces.

        :return: ``(first_word, remaining_words_joined_by_space)``; the
                 second item is ``""`` when there is only one word
        """
        parts = full_name.split(" ")
        return parts[0], " ".join(parts[1:])

    def assign_username(self, user):
        """
        Give ``user`` a username if it has none yet, and save the user.

        The candidate is derived from the user's name (NFKD-normalised,
        alphanumeric characters only, lower-cased). While the candidate is
        reported as existing by LDAP, or saving raises ``IntegrityError``, a
        random number is appended and the check is repeated.
        """
        if user.username:
            return

        ldap_manager = LdapManager()
        first_name, last_name = self.get_first_and_last_name(user.name)
        decomposed = unicodedata.normalize('NFKD', first_name + last_name)
        user.username = "".join(
            [char for char in decomposed if char.isalnum()]
        ).lower()

        while True:
            taken_in_ldap, _entries = ldap_manager.check_user_exists(
                user.username)
            if not taken_in_ldap:
                # Free in LDAP: the database's unique constraint decides.
                try:
                    user.save()
                except IntegrityError:
                    pass
                else:
                    break
            # Taken in LDAP or in the database: try another candidate.
            user.username = user.username + str(random.randint(0, 2 ** 10))

    def create_ldap_account(self, password):
        """
        Create the LDAP account for this user unless ``in_ldap`` is set.

        If LDAP already has an entry for the username, only its password is
        changed: the existing comment in the source explains that such
        entries were given a dummy credential when users were migrated from
        Django to LDAP. A failure of the LDAP lookup is logged and the method
        returns without marking the user.

        NOTE(review): the source lost its indentation; setting ``in_ldap``
        and saving are assumed to happen only when the lookup succeeded.
        """
        if self.in_ldap:
            return

        self.assign_username(self)
        ldap_manager = LdapManager()
        try:
            user_exists_in_ldap, _entries = ldap_manager.check_user_exists(
                self.username)
        except Exception:
            logger.exception("Exception occur while searching for user in LDAP")
            return

        if user_exists_in_ldap:
            ldap_manager.change_password(self.username, password)
        else:
            first_name, last_name = self.get_first_and_last_name(self.name)
            # A single-word name is used for both first and last name.
            ldap_manager.create_user(self.username, password=password,
                                     firstname=first_name,
                                     lastname=last_name or first_name,
                                     email=self.email)
        self.in_ldap = True
        self.save()

    def __str__(self):  # __unicode__ on Python 2
        return self.email

    # def has_perm(self, perm, obj=None):
    #     "Does the user have a specific permission?"
    #     # Simplest possible answer: Yes, always
    #     return self.is_admin

    def has_module_perms(self, app_label):
        "Does the user have permissions to view the app `app_label`?"
        # Answered by the admin flag alone, regardless of app_label.
        return self.is_admin

    @property
    def is_staff(self):
        "Is the user a member of staff?"
        # All admins are staff.
        return self.is_admin

    @is_staff.setter
    def is_staff(self, value):
        # Stored separately; the getter above does not read this value.
        self._is_staff = value
|
|
|
|
|
|
class StripeCustomer(models.Model):
    """Links a ``CustomUser`` to the id of its customer object in Stripe."""

    user = models.OneToOneField(CustomUser, on_delete=models.CASCADE)
    stripe_id = models.CharField(unique=True, max_length=100)

    def __str__(self):
        return "%s - %s" % (self.stripe_id, self.user.email)

    @classmethod
    def create_stripe_api_customer(cls, email=None, id_payment_method=None,
                                   customer_name=None):
        """
        Create a customer through ``StripeUtils.create_customer`` only.

        Unlike ``get_or_create`` this neither looks up a ``CustomUser`` nor
        stores a ``StripeCustomer`` row.

        :return: the ``id`` of the response object, or ``None`` when the
                 response carries no ``response_object``
        """
        stripe_data = StripeUtils().create_customer(
            id_payment_method, email, customer_name)
        response_object = stripe_data.get('response_object')
        if not response_object:
            return None
        return response_object.get('id')

    @classmethod
    def get_or_create(cls, email=None, token=None, id_payment_method=None):
        """
        Return the ``StripeCustomer`` of the user with ``email``, creating
        the Stripe customer (and the local row) when needed.

        A local row whose Stripe customer is reported as deleted is treated
        as missing and gets a new ``stripe_id``.

        NOTE(review): ``id_payment_method`` is accepted but never used here;
        ``token`` is what is passed to ``create_customer`` -- confirm.

        :return: the ``StripeCustomer``, or ``None`` when customer creation
                 returned no ``response_object``
        """
        try:
            stripe_utils = StripeUtils()
            stripe_customer = cls.objects.get(user__email=email)
            # The row may exist locally while the customer is gone in Stripe.
            customer = stripe_utils.check_customer(
                stripe_customer.stripe_id, stripe_customer.user, token)
            if "deleted" in customer and customer["deleted"]:
                raise StripeCustomer.DoesNotExist()
            return stripe_customer
        except StripeCustomer.DoesNotExist:
            user = CustomUser.objects.get(email=email)
            stripe_utils = StripeUtils()
            stripe_data = stripe_utils.create_customer(token, email, user.name)
            response_object = stripe_data.get('response_object')
            if not response_object:
                return None

            stripe_cus_id = response_object.get('id')
            if not hasattr(user, 'stripecustomer'):
                # The user never had an associated Stripe account.
                return StripeCustomer.objects.create(
                    user=user, stripe_id=stripe_cus_id
                )

            # The previous Stripe account was deleted in the dashboard:
            # only the stored id needs refreshing.
            user.stripecustomer.stripe_id = stripe_cus_id
            user.stripecustomer.save()
            return user.stripecustomer
|
|
|
|
|
|
class BaseBillingAddress(models.Model):
    """Abstract base holding the address and VAT fields of a billing address."""

    cardholder_name = models.CharField(max_length=100, default="")
    street_address = models.CharField(max_length=100)
    city = models.CharField(max_length=50)
    postal_code = models.CharField(max_length=50)
    country = CountryField()
    vat_number = models.CharField(max_length=100, default="", blank=True)
    stripe_tax_id = models.CharField(max_length=100, default="", blank=True)
    vat_number_validated_on = models.DateTimeField(blank=True, null=True)
    vat_validation_status = models.CharField(
        max_length=25, default="", blank=True
    )

    class Meta:
        # No table of its own; concrete subclasses carry the columns.
        abstract = True
|
|
|
|
|
|
class BillingAddress(BaseBillingAddress):
    """Concrete billing address."""

    def __str__(self):
        """Return the address as text, with the VAT details appended when a
        VAT number is set."""
        address_parts = (
            self.cardholder_name, self.street_address, self.city,
            self.postal_code, self.country,
        )
        if not self.vat_number:
            return "%s, %s, %s, %s, %s" % address_parts

        vat_parts = (
            self.vat_number, self.stripe_tax_id,
            self.vat_number_validated_on, self.vat_validation_status,
        )
        return "%s, %s, %s, %s, %s, %s %s %s %s" % (address_parts + vat_parts)
|
|
|
|
|
|
class UserBillingAddress(BaseBillingAddress):
    """Billing address attached to a user; ``current`` flags the active one."""

    user = models.ForeignKey(
        CustomUser,
        related_name='billing_addresses',
        on_delete=models.CASCADE,
    )
    current = models.BooleanField(default=True)

    def __str__(self):
        """Return the address as text, with the VAT details appended when a
        VAT number is set."""
        address_parts = (
            self.cardholder_name, self.street_address, self.city,
            self.postal_code, self.country,
        )
        if not self.vat_number:
            return "%s, %s, %s, %s, %s" % address_parts

        vat_parts = (
            self.vat_number, self.stripe_tax_id,
            self.vat_number_validated_on, self.vat_validation_status,
        )
        return "%s, %s, %s, %s, %s, %s %s %s %s" % (address_parts + vat_parts)

    def to_dict(self):
        """Return the address fields keyed by their human-readable labels."""
        labelled = {}
        labelled['Cardholder Name'] = self.cardholder_name
        labelled['Street Address'] = self.street_address
        labelled['City'] = self.city
        labelled['Postal Code'] = self.postal_code
        labelled['Country'] = self.country
        labelled['VAT Number'] = self.vat_number
        return labelled
|
|
|
|
|
|
class AssignPermissionsMixin(object):
    """Mixin that grants the permissions listed in ``permissions`` on the
    object itself to a given user."""

    # Permission codenames to grant; subclasses override this.
    permissions = ()
    user = None
    obj = None
    kwargs = {}

    def assign_permissions(self, user):
        """Call ``assign_perm`` for ``user`` on ``self`` once per entry of
        ``self.permissions``."""
        for codename in self.permissions:
            assign_perm(codename, user, self)
|
|
|
|
|
|
class OrderDetail(AssignPermissionsMixin, models.Model):
    """Resource specification (cores, memory, disk sizes) of an order."""

    # The vm_template relation is currently disabled:
    # vm_template = models.ForeignKey(
    #     VMTemplate, blank=True, null=True, default=None,
    #     on_delete=models.SET_NULL
    # )
    cores = models.IntegerField(default=0)
    memory = models.IntegerField(default=0)
    hdd_size = models.IntegerField(default=0)
    ssd_size = models.IntegerField(default=0)

    def __str__(self):
        """
        Return a one-line description of the specification.

        ``vm_template`` is read defensively: its field declaration is
        commented out above, so a direct ``self.vm_template`` access raised
        ``AttributeError`` on every ``str()`` call. A missing or ``None``
        template yields ``"Not available"``.
        """
        vm_template = getattr(self, 'vm_template', None)
        if vm_template is None:
            return "Not available"
        return "%s - %s, %s cores, %s GB RAM, %s GB SSD" % (
            vm_template.name, vm_template.vm_type, self.cores,
            self.memory, self.ssd_size
        )
|
|
|
|
|
|
class GenericProduct(AssignPermissionsMixin, models.Model):
    """A product with a price, a VAT rate and an optional subscription
    interval."""

    permissions = ('view_genericproduct',)

    product_name = models.CharField(max_length=128, default="")
    product_slug = models.SlugField(
        unique=True,
        help_text=(
            'An mandatory unique slug for the product'
        ),
    )
    product_description = models.CharField(max_length=500, default="")
    created_at = models.DateTimeField(auto_now_add=True)
    product_price = models.DecimalField(max_digits=6, decimal_places=2)
    product_vat = models.DecimalField(
        max_digits=6, decimal_places=4, default=0
    )
    product_is_subscription = models.BooleanField(default=True)
    product_subscription_interval = models.CharField(
        max_length=10,
        default="month",
        help_text="Choose between `year` and `month`",
    )
    exclude_vat_calculations = models.BooleanField(
        default=False,
        help_text="When checked VAT calculations are excluded for this product",
    )

    def __str__(self):
        return self.product_name

    def get_actual_price(self, vat_rate=None):
        """
        Return the price as a float rounded to two decimals.

        :param vat_rate: VAT rate to apply instead of ``product_vat``;
                         ignored when ``exclude_vat_calculations`` is set
        :return: the bare price when VAT is excluded, otherwise
                 ``price + price * rate``
        """
        base_price = float(self.product_price)
        if self.exclude_vat_calculations:
            return round(base_price, 2)

        applied_rate = self.product_vat if vat_rate is None else vat_rate
        return round(base_price + base_price * float(applied_rate), 2)
|
|
|
|
|
|
class HostingOrder(AssignPermissionsMixin, models.Model):
    """An order placed by a Stripe customer, paid either by a one-time
    charge or through a subscription."""

    ORDER_APPROVED_STATUS = 'Approved'
    ORDER_DECLINED_STATUS = 'Declined'

    vm_id = models.IntegerField(default=0)
    customer = models.ForeignKey(StripeCustomer, on_delete=models.CASCADE)
    billing_address = models.ForeignKey(BillingAddress, on_delete=models.CASCADE)
    created_at = models.DateTimeField(auto_now_add=True)
    approved = models.BooleanField(default=False)
    last4 = models.CharField(max_length=4)
    cc_brand = models.CharField(max_length=128)
    stripe_charge_id = models.CharField(max_length=100, null=True)
    price = models.FloatField()
    subscription_id = models.CharField(max_length=100, null=True)
    # The vm_pricing relation is currently disabled:
    # vm_pricing = models.ForeignKey(VMPricing)
    order_detail = models.ForeignKey(
        OrderDetail, null=True, blank=True, default=None,
        on_delete=models.SET_NULL
    )
    generic_product = models.ForeignKey(
        GenericProduct, null=True, blank=True, default=None,
        on_delete=models.SET_NULL
    )
    generic_payment_description = models.CharField(
        max_length=500, null=True
    )
    # Granted to the ordering user by ``create`` via assign_permissions.
    permissions = ('u_view_hostingorder',)

    class Meta:
        permissions = (
            ('u_view_hostingorder', 'View Hosting Order'),
        )

    def __str__(self):
        """Return a one-line summary; generic-product orders additionally
        state whether they are a one-time charge or recurring."""
        hosting_order_str = ("Order Nr: #{} - VM_ID: {} - {} - {} - "
                             "Specs: {} - Price: {}").format(
            self.id, self.vm_id, self.customer.user.email, self.created_at,
            self.order_detail, self.price
        )
        # NOTE(review): the source lost its indentation; the charge/recurring
        # suffix is assumed to apply to generic-product orders only.
        if self.generic_product_id is not None:
            hosting_order_str += " - Generic Payment"
            if self.stripe_charge_id is not None:
                hosting_order_str += " - One time charge"
            else:
                hosting_order_str += " - Recurring"
        return hosting_order_str

    @cached_property
    def status(self):
        """``'Approved'`` or ``'Declined'`` depending on ``approved``.

        The value is cached per instance; ``set_approved`` drops the cache.
        """
        return self.ORDER_APPROVED_STATUS if self.approved else self.ORDER_DECLINED_STATUS

    @classmethod
    def create(cls, price=None, vm_id=0, customer=None,
               billing_address=None, vm_pricing=None):
        """
        Create and save an order, then grant ``customer.user`` the
        permissions listed in ``permissions``.

        :param vm_pricing: accepted for backward compatibility and ignored.
            The model has no ``vm_pricing`` field (its declaration is
            commented out), so forwarding it to ``objects.create`` raised
            ``TypeError`` on every call.
        :return: the saved ``HostingOrder``
        """
        instance = cls.objects.create(
            price=price,
            vm_id=vm_id,
            customer=customer,
            billing_address=billing_address,
        )
        instance.assign_permissions(customer.user)
        return instance

    def set_approved(self):
        """Mark the order as approved and save it."""
        self.approved = True
        # ``status`` is a cached_property stored in the instance __dict__;
        # discard any value cached while the order was still unapproved.
        self.__dict__.pop('status', None)
        self.save()

    def set_stripe_charge(self, stripe_charge):
        """
        Store the charge id and the card's last4/brand, then save.

        The card data is read from ``stripe_charge.source`` when present,
        otherwise from ``stripe_charge.payment_method_details.card``.
        """
        self.stripe_charge_id = stripe_charge.id
        if stripe_charge.source is None:
            card = stripe_charge.payment_method_details.card
        else:
            card = stripe_charge.source
        self.last4 = card.last4
        self.cc_brand = card.brand
        self.save()

    def set_subscription_id(self, subscription_id, cc_details):
        """
        Store the Stripe subscription id together with the last4 and brand
        of the card used for this order, then save.

        :param subscription_id: Stripe's subscription id
        :param cc_details: A dict containing card details
            {last4, brand}
        :return:
        """
        self.subscription_id = subscription_id
        self.last4 = cc_details.get('last4')
        self.cc_brand = cc_details.get('brand')
        self.save()

    def get_cc_data(self):
        """Return ``{'last4', 'cc_brand'}`` or ``None`` if either is empty."""
        return {
            'last4': self.last4,
            'cc_brand': self.cc_brand,
        } if self.last4 and self.cc_brand else None
|
|
|
|
|
|
class UserHostingKey(models.Model):
    """An SSH key pair entry of a user; the private key is an optional
    uploaded file."""

    user = models.ForeignKey(CustomUser, blank=True, null=True, on_delete=models.CASCADE)
    public_key = models.TextField()
    private_key = models.FileField(upload_to='private_keys', blank=True)
    created_at = models.DateTimeField(auto_now_add=True)
    name = models.CharField(max_length=100)

    @staticmethod
    def generate_RSA(bits=2048):
        '''
        Generate an RSA keypair with an exponent of 65537 in PEM format
        param: bits The key length in bits
        Return private key and public key

        The private key is exported as "PEM" and the public key as
        "OpenSSH". ``bits`` is now honoured; the key length was previously
        hard-coded to 2048 regardless of the argument.
        '''
        new_key = RSA.generate(bits, os.urandom)
        public_key = new_key.publickey().exportKey("OpenSSH")
        private_key = new_key.exportKey("PEM")
        return private_key, public_key

    @classmethod
    def generate_keys(cls):
        """Return a new ``(private_key, public_key)`` pair; nothing is
        stored on any instance."""
        private_key, public_key = cls.generate_RSA()
        return private_key, public_key

    def delete(self, *args, **kwargs):
        """Remove the private key file from disk, if there is one, and then
        delete the row.

        :return: whatever the parent ``delete`` returns
        """
        if bool(self.private_key) and os.path.isfile(self.private_key.path):
            logger.debug("Removing private key {}".format(self.private_key.path))
            os.remove(self.private_key.path)
        else:
            logger.debug("No private_key to remove")

        return super(UserHostingKey, self).delete(*args, **kwargs)
|
|
|
|
|
|
def get_anonymous_user_instance(CustomUser):
    """Build (without saving) the anonymous user.

    :param CustomUser: the user class to instantiate
    :return: ``CustomUser(email='anonymous@ungleich.ch')``
    """
    # presumably the django-guardian anonymous-user hook -- verify in settings
    anonymous_email = 'anonymous@ungleich.ch'
    return CustomUser(email=anonymous_email)
|
|
|
|
|
|
class VATRates(AssignPermissionsMixin, models.Model):
    """A VAT rate entry with an optional start/stop date, territory codes
    and a currency code."""

    start_date = models.DateField(null=True, blank=True)
    stop_date = models.DateField(null=True, blank=True)
    territory_codes = models.TextField(default='', blank=True)
    currency_code = models.CharField(max_length=10)
    rate = models.FloatField()
    rate_type = models.TextField(default='', blank=True)
    description = models.TextField(default='', blank=True)
|
|
|
|
|
|
class StripeTaxRate(AssignPermissionsMixin, models.Model):
    """A tax rate record identified by a unique ``tax_rate_id``."""

    tax_rate_id = models.CharField(unique=True, max_length=100)
    jurisdiction = models.CharField(max_length=10)
    inclusive = models.BooleanField(default=False)
    display_name = models.CharField(max_length=100)
    percentage = models.FloatField(default=0)
    description = models.CharField(max_length=100)
|
|
|
|
|
|
class IncompletePaymentIntents(AssignPermissionsMixin, models.Model):
    """Data stored for a payment intent; ``completed_at`` is null until a
    completion time is recorded."""

    completed_at = models.DateTimeField(null=True)
    created_at = models.DateTimeField(auto_now_add=True)
    payment_intent_id = models.CharField(max_length=100)
    # Free-text payloads; their format is not defined in this file.
    request = models.TextField()
    stripe_api_cus_id = models.CharField(max_length=30)
    card_details_response = models.TextField()
    stripe_subscription_id = models.CharField(null=True, max_length=100)
    stripe_charge_id = models.CharField(null=True, max_length=100)
    gp_details = models.TextField()
    billing_address_data = models.TextField()
|
|
|
|
|
|
class IncompleteSubscriptions(AssignPermissionsMixin, models.Model):
    """Data stored for a subscription; ``completed_at`` is null until a
    completion time is recorded."""

    created_at = models.DateTimeField(auto_now_add=True)
    completed_at = models.DateTimeField(null=True)
    subscription_id = models.CharField(max_length=100)
    subscription_status = models.CharField(max_length=30)
    name = models.CharField(max_length=50)
    email = models.EmailField()
    # Free-text payloads; their format is not defined in this file.
    request = models.TextField()
    stripe_api_cus_id = models.CharField(max_length=30)
    card_details_response = models.TextField()
    stripe_subscription_obj = models.TextField()
    stripe_onetime_charge = models.TextField()
    gp_details = models.TextField()
    specs = models.TextField()
    vm_template_id = models.PositiveIntegerField(default=0)
    template = models.TextField()
    billing_address_data = models.TextField()
|
|
|
|
|
|
class UserCardDetail(AssignPermissionsMixin, models.Model):
    """A payment card stored for a ``StripeCustomer``; at most one card per
    customer is meant to be flagged ``preferred``."""

    permissions = ('view_usercarddetail',)
    # on_delete is mandatory for ForeignKey on Django >= 2.0; CASCADE matches
    # every other foreign key declared in this module.
    stripe_customer = models.ForeignKey(
        StripeCustomer, on_delete=models.CASCADE
    )
    last4 = models.CharField(max_length=4)
    brand = models.CharField(max_length=128)
    card_id = models.CharField(max_length=100, blank=True, default='')
    fingerprint = models.CharField(max_length=100)
    exp_month = models.IntegerField(null=False)
    exp_year = models.IntegerField(null=False)
    preferred = models.BooleanField(default=False)

    class Meta:
        permissions = (
            ('view_usercarddetail', 'View User Card'),
        )

    @classmethod
    def create(cls, stripe_customer=None, last4=None, brand=None,
               fingerprint=None, exp_month=None, exp_year=None, card_id=None,
               preferred=False):
        """Create and save a card, then grant ``stripe_customer.user`` the
        permissions listed in ``permissions``.

        :return: the saved ``UserCardDetail``
        """
        instance = cls.objects.create(
            stripe_customer=stripe_customer, last4=last4, brand=brand,
            fingerprint=fingerprint, exp_month=exp_month, exp_year=exp_year,
            card_id=card_id, preferred=preferred
        )
        instance.assign_permissions(stripe_customer.user)
        return instance

    @classmethod
    def get_all_cards_list(cls, stripe_customer):
        """
        Get all the cards of the given customer as a list

        Preferred cards come first, then cards ordered by id. ``exp_month``
        is rendered as a two-digit string.

        :param stripe_customer: The StripeCustomer object
        :return: A list of all cards; an empty list if the customer object is
                 None
        """
        if stripe_customer is None:
            return []
        user_card_details = UserCardDetail.objects.filter(
            stripe_customer_id=stripe_customer.id
        ).order_by('-preferred', 'id')
        return [
            {
                'last4': card.last4, 'brand': card.brand, 'id': card.id,
                'exp_year': card.exp_year,
                'exp_month': '{:02d}'.format(card.exp_month),
                'preferred': card.preferred
            }
            for card in user_card_details
        ]

    @classmethod
    def get_or_create_user_card_detail(cls, stripe_customer, card_details):
        """
        A method that checks if a UserCardDetail object exists already
        based upon the given card_details and creates it for the given
        customer if it does not exist. It returns the UserCardDetail object
        matching the given card_details if it exists.

        A card is looked up by fingerprint, exp_month and exp_year; if any
        of these keys is missing the lookup is skipped and creation is
        attempted directly.

        :param stripe_customer: The given StripeCustomer object to whom the
                                card object should belong to
        :param card_details: A dictionary identifying a given card
        :return: UserCardDetail object
        """
        try:
            if ('fingerprint' in card_details and 'exp_month' in card_details
                    and 'exp_year' in card_details):
                card_detail = UserCardDetail.objects.get(
                    stripe_customer=stripe_customer,
                    fingerprint=card_details['fingerprint'],
                    exp_month=card_details['exp_month'],
                    exp_year=card_details['exp_year']
                )
            else:
                raise UserCardDetail.DoesNotExist()
        except UserCardDetail.DoesNotExist:
            preferred = False
            if 'preferred' in card_details:
                preferred = card_details['preferred']
            card_detail = UserCardDetail.create(
                stripe_customer=stripe_customer,
                last4=card_details['last4'],
                brand=card_details['brand'],
                fingerprint=card_details['fingerprint'],
                exp_month=card_details['exp_month'],
                exp_year=card_details['exp_year'],
                card_id=card_details['card_id'],
                preferred=preferred
            )
        return card_detail

    @staticmethod
    def set_default_card(stripe_api_cus_id, stripe_source_id):
        """
        Sets the given stripe source as the default source for the given
        Stripe customer, then mirrors the choice in the local card rows.

        Ids starting with "pm" are stored as the customer's default payment
        method for invoices; any other id is stored as ``default_source``.

        :param stripe_api_cus_id: Stripe customer id
        :param stripe_source_id: The Stripe source id
        :return:
        """
        stripe_utils = StripeUtils()
        cus_response = stripe_utils.get_customer(stripe_api_cus_id)
        cu = cus_response['response_object']
        if stripe_source_id.startswith("pm"):
            # card is a payment method
            cu.invoice_settings.default_payment_method = stripe_source_id
        else:
            cu.default_source = stripe_source_id
        cu.save()
        UserCardDetail.save_default_card_local(
            stripe_api_cus_id, stripe_source_id
        )

    @staticmethod
    def set_default_card_from_stripe(stripe_api_cus_id):
        """Copy the customer's ``default_source`` from Stripe, when it has
        one, into the local ``preferred`` flags."""
        stripe_utils = StripeUtils()
        cus_response = stripe_utils.get_customer(stripe_api_cus_id)
        cu = cus_response['response_object']
        default_source = cu.default_source
        if default_source is not None:
            UserCardDetail.save_default_card_local(
                stripe_api_cus_id, default_source
            )

    @staticmethod
    def save_default_card_local(stripe_api_cus_id, card_id):
        """
        Flag the card with ``card_id`` as the customer's only preferred card.

        Both lookups use ``get`` and therefore raise ``DoesNotExist`` when
        the customer or the card is unknown locally.
        """
        stripe_cust = StripeCustomer.objects.get(stripe_id=stripe_api_cus_id)
        user_card_detail = UserCardDetail.objects.get(
            stripe_customer=stripe_cust, card_id=card_id
        )
        # Clear the flag everywhere first, then set it on the chosen card.
        for card in stripe_cust.usercarddetail_set.all():
            card.preferred = False
            card.save()
        user_card_detail.preferred = True
        user_card_detail.save()

    @staticmethod
    def get_user_card_details(stripe_customer, card_details):
        """
        A utility function to check whether a StripeCustomer is already
        associated with the card having given details

        :param stripe_customer: the StripeCustomer owning the card
        :param card_details: dict with 'fingerprint', 'exp_month' and
                             'exp_year'
        :return: The UserCardDetails object if it exists, None otherwise
        """
        try:
            ucd = UserCardDetail.objects.get(
                stripe_customer=stripe_customer,
                fingerprint=card_details['fingerprint'],
                exp_month=card_details['exp_month'],
                exp_year=card_details['exp_year']
            )
            return ucd
        except UserCardDetail.DoesNotExist:
            return None
|
|
|
|
|
|
class VMPricing(models.Model):
    """A named set of unit prices (cores, RAM, SSD, HDD) with VAT and
    discount settings."""

    name = models.CharField(max_length=255, unique=True)
    vat_inclusive = models.BooleanField(default=True)
    vat_percentage = models.DecimalField(
        max_digits=7, decimal_places=5, blank=True, default=0
    )
    cores_unit_price = models.DecimalField(
        max_digits=7, decimal_places=5, default=0
    )
    ram_unit_price = models.DecimalField(
        max_digits=7, decimal_places=5, default=0
    )
    ssd_unit_price = models.DecimalField(
        max_digits=7, decimal_places=5, default=0
    )
    hdd_unit_price = models.DecimalField(
        max_digits=7, decimal_places=6, default=0
    )
    discount_name = models.CharField(max_length=255, null=True, blank=True)
    discount_amount = models.DecimalField(
        max_digits=6, decimal_places=2, default=0
    )
    stripe_coupon_id = models.CharField(max_length=255, null=True, blank=True)

    def __str__(self):
        """Return the name followed by the unit prices, the VAT setting
        and, when a discount amount is set, the discount."""
        display_str = self.name + ' => ' + ' - '.join([
            '{}/Core'.format(self.cores_unit_price.normalize()),
            '{}/GB RAM'.format(self.ram_unit_price.normalize()),
            '{}/GB SSD'.format(self.ssd_unit_price.normalize()),
            '{}/GB HDD'.format(self.hdd_unit_price.normalize()),
            '{}% VAT'.format(self.vat_percentage.normalize())
            if not self.vat_inclusive else 'VAT-Incl',
        ])
        if self.discount_amount:
            display_str = ' - '.join([
                display_str,
                '{} {}'.format(
                    self.discount_amount,
                    self.discount_name if self.discount_name else 'Discount'
                )
            ])
        return display_str

    @classmethod
    def get_vm_pricing_by_name(cls, name):
        """
        Return the pricing called ``name``.

        Any error during the lookup is logged and the result of
        ``get_default_pricing`` (possibly ``None``) is returned instead.
        """
        try:
            pricing = VMPricing.objects.get(name=name)
        except Exception as e:
            # Deliberately broad: a failed lookup falls back to the default.
            # The message previously ran "default" and "pricing." together.
            logger.error(
                "Error getting VMPricing with name {name}. "
                "Details: {details}. Attempting to return default "
                "pricing.".format(name=name, details=str(e))
            )
            pricing = VMPricing.get_default_pricing()
        return pricing

    @classmethod
    def get_default_pricing(cls):
        """ Returns the default pricing or None """
        try:
            default_pricing = VMPricing.objects.get(name='default')
        except Exception as e:
            logger.error(str(e))
            default_pricing = None
        return default_pricing
|