dynamicweb2/hosting/models.py
2023-11-22 15:42:05 +05:30

612 lines
23 KiB
Python

import unicodedata
import logging
import random
import os
from django.db import models
from django.contrib.auth.models import AbstractBaseUser, BaseUserManager, PermissionsMixin
from django.contrib.sites.models import Site
from django.core.exceptions import ValidationError
from django.utils.translation import gettext_lazy as _
from django.conf import settings
from django.contrib.auth.hashers import make_password
from django.urls import reverse
from django.db import IntegrityError, models
from guardian.shortcuts import assign_perm
from .fields import CountryField
from .mailer import BaseEmail
from dynamicweb2.ldap_manager import LdapManager
from dynamicweb2.stripe_utils import StripeUtils
from django.utils.crypto import get_random_string
from Crypto.PublicKey import RSA
from django.utils.functional import cached_property
# Create your models here.
logger = logging.getLogger(__name__)
def get_validation_slug():
return make_password(None)
def validate_name(value):
valid_chars = [char for char in value if (char.isalpha() or char == "-" or char == " ")]
if len(valid_chars) < len(value):
raise ValidationError(
_('%(value)s is not a valid name. A valid name can only include letters, spaces or -'),
params={'value': value},
)
class MyUserManager(BaseUserManager):
def create_user(self, email, name, password=None):
"""
Creates and saves a User with the given email,name and password.
"""
if not email:
raise ValueError('Users must have an email address')
user = self.model(
email=self.normalize_email(email),
name=name,
validation_slug=make_password(None)
)
user.is_admin = False
user.set_password(password)
user.save(using=self._db)
user.create_ldap_account(password)
return user
def create_superuser(self, email, name, password):
"""
Creates and saves a superuser with the given email, name and password.
"""
user = self.create_user(email,
password=password,
name=name,
)
user.is_admin = True
user.is_active = True
user.is_superuser = True
user.save(using=self._db)
return user
class CustomUser(AbstractBaseUser, PermissionsMixin):
VALIDATED_CHOICES = ((0, 'Not validated'), (1, 'Validated'))
site = models.ForeignKey(Site, default=1, on_delete=models.CASCADE)
name = models.CharField(max_length=50, validators=[validate_name])
email = models.EmailField(unique=True)
username = models.CharField(max_length=60, unique=True, null=True)
validated = models.IntegerField(choices=VALIDATED_CHOICES, default=0)
in_ldap = models.BooleanField(default=False)
# By default, we initialize the validation_slug with appropriate value
# This is required for User(page) admin
validation_slug = models.CharField(
db_index=True, unique=True, max_length=50,
default=get_validation_slug
)
is_admin = models.BooleanField(
_('staff status'),
default=False,
help_text=_(
'Designates whether the user can log into this admin site.'),
)
import_stripe_bill_remark = models.TextField(
default="",
help_text="Indicates any issues while importing stripe bills"
)
objects = MyUserManager()
USERNAME_FIELD = "email"
REQUIRED_FIELDS = ['name', 'password']
# Define the groups field with a related_name to avoid clash
groups = models.ManyToManyField(
'auth.Group',
related_name='custom_user_set', # Define a unique related_name
blank=True,
verbose_name='groups',
help_text='The groups this user belongs to.',
)
# Define the user_permissions field with a related_name to avoid clash
user_permissions = models.ManyToManyField(
'auth.Permission',
related_name='custom_user_permission_set', # Define a unique related_name
blank=True,
verbose_name='user permissions',
help_text='Specific permissions for this user.',
)
@classmethod
def register(cls, name, password, email, app='digital_glarus',
base_url=None, send_email=True, account_details=None):
user = cls.objects.filter(email=email).first()
if not user:
user = cls.objects.create_user(name=name, email=email,
password=password)
if user:
if app == 'dcl':
dcl_text = settings.DCL_TEXT
user.is_active = False
if send_email is True:
email_data = {
'subject': '{dcl_text} {account_activation}'.format(
dcl_text=dcl_text,
account_activation=_('Account Activation')
),
'from_address': settings.DCL_SUPPORT_FROM_ADDRESS,
'to': user.email,
'context': {'base_url': base_url,
'activation_link': reverse(
'hosting:validate',
kwargs={
'validate_slug': user.validation_slug}),
'dcl_text': dcl_text
},
'template_name': 'user_activation',
'template_path': 'datacenterlight/emails/'
}
if account_details:
email_data['context'][
'account_details'] = account_details
email = BaseEmail(**email_data)
email.send()
return user
else:
return None
else:
return None
@classmethod
def get_all_members(cls):
return cls.objects.filter(
stripecustomer__membershiporder__isnull=False)
@classmethod
def validate_url(cls, validation_slug):
user = cls.objects.filter(validation_slug=validation_slug).first()
if user:
user.validated = 1
user.save()
return True
return False
@classmethod
def get_random_password(cls):
return get_random_string(24)
def is_superuser(self):
return False
def get_full_name(self):
# The user is identified by their email address
return self.email
def get_short_name(self):
# The user is identified by their email address
return self.email
def get_first_and_last_name(self, full_name):
first_name, *last_name = full_name.split(" ")
last_name = " ".join(last_name)
return first_name, last_name
def assign_username(self, user):
if not user.username:
ldap_manager = LdapManager()
# Try to come up with a username
first_name, last_name = self.get_first_and_last_name(user.name)
user.username = unicodedata.normalize('NFKD', first_name + last_name)
user.username = "".join([char for char in user.username if char.isalnum()]).lower()
exist = True
while exist:
# Check if it exists
exist, entries = ldap_manager.check_user_exists(user.username)
if exist:
# If username exists in ldap, come up with a new user name and check it again
user.username = user.username + str(random.randint(0, 2 ** 10))
else:
# If username does not exists in ldap, try to save it in database
try:
user.save()
except IntegrityError:
# If username exists in database then come up with a new username
user.username = user.username + str(random.randint(0, 2 ** 10))
exist = True
def create_ldap_account(self, password):
# create ldap account for user if it does not exists already.
if self.in_ldap:
return
self.assign_username(self)
ldap_manager = LdapManager()
try:
user_exists_in_ldap, entries = ldap_manager.check_user_exists(self.username)
except Exception:
logger.exception("Exception occur while searching for user in LDAP")
else:
if not user_exists_in_ldap:
# IF no ldap account
first_name, last_name = self.get_first_and_last_name(self.name)
if not last_name:
last_name = first_name
ldap_manager.create_user(self.username, password=password,
firstname=first_name, lastname=last_name,
email=self.email)
else:
# User exists already in LDAP, but with a dummy credential
# We are here implies that the user has successfully
# authenticated against Django db, and a corresponding user
# exists in LDAP.
# We just update the LDAP credentials once again, assuming it
# was set to a dummy value while migrating users from Django to
# LDAP
ldap_manager.change_password(self.username, password)
self.in_ldap = True
self.save()
def __str__(self): # __unicode__ on Python 2
return self.email
# def has_perm(self, perm, obj=None):
# "Does the user have a specific permission?"
# # Simplest possible answer: Yes, always
# return self.is_admin
def has_module_perms(self, app_label):
"Does the user have permissions to view the app `app_label`?"
# Simplest possible answer: Yes, always
return self.is_admin
@property
def is_staff(self):
"Is the user a member of staff?"
# Simplest possible answer: All admins are staff
return self.is_admin
@is_staff.setter
def is_staff(self, value):
self._is_staff = value
class StripeCustomer(models.Model):
user = models.OneToOneField(CustomUser, on_delete=models.CASCADE)
stripe_id = models.CharField(unique=True, max_length=100)
def __str__(self):
return "%s - %s" % (self.stripe_id, self.user.email)
@classmethod
def create_stripe_api_customer(cls, email=None, id_payment_method=None,
customer_name=None):
"""
This method creates a Stripe API customer with the given
email, token and customer_name. This is different from
get_or_create method below in that it does not create a
CustomUser and associate the customer created in stripe
with it, while get_or_create does that before creating the
stripe user.
"""
stripe_utils = StripeUtils()
stripe_data = stripe_utils.create_customer(
id_payment_method, email, customer_name)
if stripe_data.get('response_object'):
stripe_cus_id = stripe_data.get('response_object').get('id')
return stripe_cus_id
else:
return None
@classmethod
def get_or_create(cls, email=None, token=None, id_payment_method=None):
"""
Check if there is a registered stripe customer with that email
or create a new one
"""
try:
stripe_utils = StripeUtils()
stripe_customer = cls.objects.get(user__email=email)
# check if user is not in stripe but in database
customer = stripe_utils.check_customer(stripe_customer.stripe_id,
stripe_customer.user, token)
if "deleted" in customer and customer["deleted"]:
raise StripeCustomer.DoesNotExist()
return stripe_customer
except StripeCustomer.DoesNotExist:
user = CustomUser.objects.get(email=email)
stripe_utils = StripeUtils()
stripe_data = stripe_utils.create_customer(token, email, user.name)
if stripe_data.get('response_object'):
stripe_cus_id = stripe_data.get('response_object').get('id')
if hasattr(user, 'stripecustomer'):
# User already had a Stripe account and we are here
# because the account was deleted in dashboard
# So, we simply update the stripe_id
user.stripecustomer.stripe_id = stripe_cus_id
user.stripecustomer.save()
stripe_customer = user.stripecustomer
else:
# The user never had an associated Stripe account
# So, create one
stripe_customer = StripeCustomer.objects.create(
user=user, stripe_id=stripe_cus_id
)
return stripe_customer
else:
return None
class BaseBillingAddress(models.Model):
cardholder_name = models.CharField(max_length=100, default="")
street_address = models.CharField(max_length=100)
city = models.CharField(max_length=50)
postal_code = models.CharField(max_length=50)
country = CountryField()
vat_number = models.CharField(max_length=100, default="", blank=True)
stripe_tax_id = models.CharField(max_length=100, default="", blank=True)
vat_number_validated_on = models.DateTimeField(blank=True, null=True)
vat_validation_status = models.CharField(max_length=25, default="",
blank=True)
class Meta:
abstract = True
class BillingAddress(BaseBillingAddress):
def __str__(self):
if self.vat_number:
return "%s, %s, %s, %s, %s, %s %s %s %s" % (
self.cardholder_name, self.street_address, self.city,
self.postal_code, self.country, self.vat_number,
self.stripe_tax_id, self.vat_number_validated_on,
self.vat_validation_status
)
else:
return "%s, %s, %s, %s, %s" % (
self.cardholder_name, self.street_address, self.city,
self.postal_code, self.country
)
class UserBillingAddress(BaseBillingAddress):
user = models.ForeignKey(CustomUser, related_name='billing_addresses', on_delete=models.CASCADE)
current = models.BooleanField(default=True)
def __str__(self):
if self.vat_number:
return "%s, %s, %s, %s, %s, %s %s %s %s" % (
self.cardholder_name, self.street_address, self.city,
self.postal_code, self.country, self.vat_number,
self.stripe_tax_id, self.vat_number_validated_on,
self.vat_validation_status
)
else:
return "%s, %s, %s, %s, %s" % (
self.cardholder_name, self.street_address, self.city,
self.postal_code, self.country
)
def to_dict(self):
return {
'Cardholder Name': self.cardholder_name,
'Street Address': self.street_address,
'City': self.city,
'Postal Code': self.postal_code,
'Country': self.country,
'VAT Number': self.vat_number
}
class AssignPermissionsMixin(object):
permissions = tuple()
user = None
obj = None
kwargs = dict()
def assign_permissions(self, user):
for permission in self.permissions:
assign_perm(permission, user, self)
class OrderDetail(AssignPermissionsMixin, models.Model):
# vm_template = models.ForeignKey(
# VMTemplate, blank=True, null=True, default=None,
# on_delete=models.SET_NULL
# )
cores = models.IntegerField(default=0)
memory = models.IntegerField(default=0)
hdd_size = models.IntegerField(default=0)
ssd_size = models.IntegerField(default=0)
def __str__(self):
return "Not available" if self.vm_template is None else (
"%s - %s, %s cores, %s GB RAM, %s GB SSD" % (
self.vm_template.name, self.vm_template.vm_type, self.cores,
self.memory, self.ssd_size
)
)
class GenericProduct(AssignPermissionsMixin, models.Model):
permissions = ('view_genericproduct',)
product_name = models.CharField(max_length=128, default="")
product_slug = models.SlugField(
unique=True,
help_text=(
'An mandatory unique slug for the product'
)
)
product_description = models.CharField(max_length=500, default="")
created_at = models.DateTimeField(auto_now_add=True)
product_price = models.DecimalField(max_digits=6, decimal_places=2)
product_vat = models.DecimalField(max_digits=6, decimal_places=4, default=0)
product_is_subscription = models.BooleanField(default=True)
product_subscription_interval = models.CharField(
max_length=10, default="month",
help_text="Choose between `year` and `month`")
exclude_vat_calculations = models.BooleanField(
default=False,
help_text="When checked VAT calculations are excluded for this product"
)
def __str__(self):
return self.product_name
def get_actual_price(self, vat_rate=None):
if self.exclude_vat_calculations:
return round(float(self.product_price), 2)
else:
VAT = vat_rate if vat_rate is not None else self.product_vat
return round(
float(self.product_price) + float(self.product_price) * float(VAT), 2
)
class HostingOrder(AssignPermissionsMixin, models.Model):
ORDER_APPROVED_STATUS = 'Approved'
ORDER_DECLINED_STATUS = 'Declined'
vm_id = models.IntegerField(default=0)
customer = models.ForeignKey(StripeCustomer, on_delete=models.CASCADE)
billing_address = models.ForeignKey(BillingAddress, on_delete=models.CASCADE)
created_at = models.DateTimeField(auto_now_add=True)
approved = models.BooleanField(default=False)
last4 = models.CharField(max_length=4)
cc_brand = models.CharField(max_length=128)
stripe_charge_id = models.CharField(max_length=100, null=True)
price = models.FloatField()
subscription_id = models.CharField(max_length=100, null=True)
# vm_pricing = models.ForeignKey(VMPricing)
order_detail = models.ForeignKey(
OrderDetail, null=True, blank=True, default=None,
on_delete=models.SET_NULL
)
generic_product = models.ForeignKey(
GenericProduct, null=True, blank=True, default=None,
on_delete=models.SET_NULL
)
generic_payment_description = models.CharField(
max_length=500, null=True
)
permissions = ('u_view_hostingorder',)
class Meta:
permissions = (
('u_view_hostingorder', 'View Hosting Order'),
)
def __str__(self):
hosting_order_str = ("Order Nr: #{} - VM_ID: {} - {} - {} - "
"Specs: {} - Price: {}").format(
self.id, self.vm_id, self.customer.user.email, self.created_at,
self.order_detail, self.price
)
if self.generic_product_id is not None:
hosting_order_str += " - Generic Payment"
if self.stripe_charge_id is not None:
hosting_order_str += " - One time charge"
else:
hosting_order_str += " - Recurring"
return hosting_order_str
@cached_property
def status(self):
return self.ORDER_APPROVED_STATUS if self.approved else self.ORDER_DECLINED_STATUS
@classmethod
def create(cls, price=None, vm_id=0, customer=None,
billing_address=None, vm_pricing=None):
instance = cls.objects.create(
price=price,
vm_id=vm_id,
customer=customer,
billing_address=billing_address,
vm_pricing=vm_pricing
)
instance.assign_permissions(customer.user)
return instance
def set_approved(self):
self.approved = True
self.save()
def set_stripe_charge(self, stripe_charge):
self.stripe_charge_id = stripe_charge.id
if stripe_charge.source is None:
self.last4 = stripe_charge.payment_method_details.card.last4
self.cc_brand = stripe_charge.payment_method_details.card.brand
else:
self.last4 = stripe_charge.source.last4
self.cc_brand = stripe_charge.source.brand
self.save()
def set_subscription_id(self, subscription_id, cc_details):
"""
When creating a Stripe subscription, we have subscription id.
We store this in the subscription_id field.
This method sets the subscription id
and the last4 and credit card brands used for this order.
:param subscription_id: Stripe's subscription id
:param cc_details: A dict containing card details
{last4, brand}
:return:
"""
self.subscription_id = subscription_id
self.last4 = cc_details.get('last4')
self.cc_brand = cc_details.get('brand')
self.save()
def get_cc_data(self):
return {
'last4': self.last4,
'cc_brand': self.cc_brand,
} if self.last4 and self.cc_brand else None
class UserHostingKey(models.Model):
user = models.ForeignKey(CustomUser, blank=True, null=True, on_delete=models.CASCADE)
public_key = models.TextField()
private_key = models.FileField(upload_to='private_keys', blank=True)
created_at = models.DateTimeField(auto_now_add=True)
name = models.CharField(max_length=100)
@staticmethod
def generate_RSA(bits=2048):
'''
Generate an RSA keypair with an exponent of 65537 in PEM format
param: bits The key length in bits
Return private key and public key
'''
new_key = RSA.generate(2048, os.urandom)
public_key = new_key.publickey().exportKey("OpenSSH")
private_key = new_key.exportKey("PEM")
return private_key, public_key
@classmethod
def generate_keys(cls):
private_key, public_key = cls.generate_RSA()
# self.public_key = public_key
# self.save(update_fields=['public_key'])
return private_key, public_key
def delete(self, *args, **kwargs):
if bool(self.private_key) and os.path.isfile(self.private_key.path):
logger.debug("Removing private key {}".format(self.private_key.path))
os.remove(self.private_key.path)
else:
logger.debug("No private_key to remove")
super(UserHostingKey, self).delete(*args, **kwargs)
def get_anonymous_user_instance(CustomUser):
return CustomUser(email='anonymous@ungleich.ch')