dynamicweb2/hosting/models.py

613 lines
23 KiB
Python
Raw Normal View History

2023-11-22 10:12:05 +00:00
import unicodedata
import logging
import random
import os
from django.db import models
from django.contrib.auth.models import AbstractBaseUser, BaseUserManager, PermissionsMixin
from django.contrib.sites.models import Site
from django.core.exceptions import ValidationError
from django.utils.translation import gettext_lazy as _
from django.conf import settings
from django.contrib.auth.hashers import make_password
from django.urls import reverse
from django.db import IntegrityError, models
from guardian.shortcuts import assign_perm
from .fields import CountryField
from .mailer import BaseEmail
from dynamicweb2.ldap_manager import LdapManager
from dynamicweb2.stripe_utils import StripeUtils
from django.utils.crypto import get_random_string
from Crypto.PublicKey import RSA
from django.utils.functional import cached_property
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
def get_validation_slug():
    """Return a new value for a user's ``validation_slug``.

    The value is whatever ``make_password(None)`` produces. This function is
    used as the callable default of ``CustomUser.validation_slug``.
    """
    slug = make_password(None)
    return slug
def validate_name(value):
    """Validator for name fields: only letters, spaces and ``-`` are allowed.

    Returns None when every character of ``value`` is acceptable.

    Raises:
        ValidationError: if ``value`` contains any other character.
    """
    def _is_allowed(char):
        # A character passes if it is alphabetic, a hyphen or a space.
        return char.isalpha() or char in ("-", " ")

    if not all(_is_allowed(char) for char in value):
        raise ValidationError(
            _('%(value)s is not a valid name. A valid name can only include letters, spaces or -'),
            params={'value': value},
        )
class MyUserManager(BaseUserManager):
    """Manager that creates users identified by their email address."""

    def create_user(self, email, name, password=None):
        """
        Create, save and return a regular (non-admin) user.

        Once the database row has been saved, ``create_ldap_account`` is
        called on the new user with the same password.

        Raises ValueError when ``email`` is empty.
        """
        if not email:
            raise ValueError('Users must have an email address')

        account = self.model(
            email=self.normalize_email(email),
            name=name,
            validation_slug=make_password(None),
        )
        account.is_admin = False
        account.set_password(password)
        account.save(using=self._db)
        account.create_ldap_account(password)
        return account

    def create_superuser(self, email, name, password):
        """
        Create a user through ``create_user``, then set the admin, active
        and superuser flags on it and save it again.
        """
        account = self.create_user(email, name=name, password=password)
        for flag in ('is_admin', 'is_active', 'is_superuser'):
            setattr(account, flag, True)
        account.save(using=self._db)
        return account
class CustomUser(AbstractBaseUser, PermissionsMixin):
    """User model that logs in by email and is mirrored into LDAP."""

    VALIDATED_CHOICES = ((0, 'Not validated'), (1, 'Validated'))

    site = models.ForeignKey(Site, default=1, on_delete=models.CASCADE)
    name = models.CharField(max_length=50, validators=[validate_name])
    email = models.EmailField(unique=True)
    username = models.CharField(max_length=60, unique=True, null=True)
    validated = models.IntegerField(choices=VALIDATED_CHOICES, default=0)
    in_ldap = models.BooleanField(default=False)
    # validation_slug is populated by default with a generated value;
    # this is required for the User(page) admin.
    validation_slug = models.CharField(
        db_index=True,
        unique=True,
        max_length=50,
        default=get_validation_slug,
    )
    is_admin = models.BooleanField(
        _('staff status'),
        default=False,
        help_text=_(
            'Designates whether the user can log into this admin site.'),
    )
    import_stripe_bill_remark = models.TextField(
        default="",
        help_text="Indicates any issues while importing stripe bills",
    )

    objects = MyUserManager()

    USERNAME_FIELD = "email"
    # NOTE(review): 'password' is listed here explicitly — confirm that this
    # is what the createsuperuser flow expects.
    REQUIRED_FIELDS = ['name', 'password']

    # groups is redeclared with its own related_name to avoid a reverse
    # accessor clash.
    groups = models.ManyToManyField(
        'auth.Group',
        related_name='custom_user_set',
        blank=True,
        verbose_name='groups',
        help_text='The groups this user belongs to.',
    )
    # user_permissions is redeclared with its own related_name for the same
    # reason.
    user_permissions = models.ManyToManyField(
        'auth.Permission',
        related_name='custom_user_permission_set',
        blank=True,
        verbose_name='user permissions',
        help_text='Specific permissions for this user.',
    )

    @classmethod
    def register(cls, name, password, email, app='digital_glarus',
                 base_url=None, send_email=True, account_details=None):
        """
        Create a new user and, for the ``'dcl'`` app, send an activation mail.

        Returns the new user, or None when a user with ``email`` already
        exists (or when user creation yields a falsy result).

        The activation mail is only sent when ``app == 'dcl'`` and
        ``send_email`` is exactly True. ``account_details``, if truthy, is
        added to the mail's template context.
        """
        if cls.objects.filter(email=email).first():
            return None

        user = cls.objects.create_user(name=name, email=email,
                                       password=password)
        if not user:
            return None

        if app == 'dcl':
            dcl_text = settings.DCL_TEXT
            user.is_active = False
            if send_email is True:
                subject = '{dcl_text} {account_activation}'.format(
                    dcl_text=dcl_text,
                    account_activation=_('Account Activation')
                )
                from_address = settings.DCL_SUPPORT_FROM_ADDRESS
                recipient = user.email
                context = {
                    'base_url': base_url,
                    'activation_link': reverse(
                        'hosting:validate',
                        kwargs={'validate_slug': user.validation_slug}),
                    'dcl_text': dcl_text,
                }
                if account_details:
                    context['account_details'] = account_details
                email_data = {
                    'subject': subject,
                    'from_address': from_address,
                    'to': recipient,
                    'context': context,
                    'template_name': 'user_activation',
                    'template_path': 'datacenterlight/emails/',
                }
                activation_email = BaseEmail(**email_data)
                activation_email.send()
        return user

    @classmethod
    def get_all_members(cls):
        """Return users whose Stripe customer has at least one membership order."""
        members = cls.objects.filter(
            stripecustomer__membershiporder__isnull=False)
        return members

    @classmethod
    def validate_url(cls, validation_slug):
        """
        Mark the user owning ``validation_slug`` as validated.

        Returns True when a matching user was found and saved, else False.
        """
        user = cls.objects.filter(validation_slug=validation_slug).first()
        if not user:
            return False
        user.validated = 1
        user.save()
        return True

    @classmethod
    def get_random_password(cls):
        """Return a random 24-character string."""
        password_length = 24
        return get_random_string(password_length)

    # NOTE(review): this is a plain method with the same name as the
    # ``is_superuser`` attribute that PermissionsMixin normally supplies.
    # It appears to take that attribute's place on this class, and a bound
    # method is always truthy when read without calling it — confirm this
    # is intended before relying on ``user.is_superuser`` checks.
    def is_superuser(self):
        return False

    def get_full_name(self):
        """Return the email address, which is what identifies the user."""
        return self.email

    def get_short_name(self):
        """Return the email address, which is what identifies the user."""
        return self.email

    def get_first_and_last_name(self, full_name):
        """
        Split ``full_name`` at its first space.

        Returns a ``(first_name, last_name)`` tuple; ``last_name`` is the
        empty string when ``full_name`` contains no space.
        """
        first_name, _sep, last_name = full_name.partition(" ")
        return first_name, last_name

    def assign_username(self, user):
        """
        Give ``user`` a username if it does not have one yet, and save it.

        The candidate is built from the user's name (NFKD-normalised,
        alphanumeric characters only, lower-cased). While the candidate is
        reported as existing by LDAP, or saving it raises IntegrityError, a
        random number is appended and the check is repeated.
        """
        if user.username:
            return

        ldap_manager = LdapManager()
        first_name, last_name = self.get_first_and_last_name(user.name)
        decomposed = unicodedata.normalize('NFKD', first_name + last_name)
        user.username = "".join(filter(str.isalnum, decomposed)).lower()

        while True:
            taken_in_ldap, _entries = ldap_manager.check_user_exists(
                user.username)
            if not taken_in_ldap:
                # Free in LDAP: the database's unique constraint has the
                # final say.
                try:
                    user.save()
                except IntegrityError:
                    # Taken in the database; fall through and pick another.
                    pass
                else:
                    return
            user.username = user.username + str(random.randint(0, 2 ** 10))

    def create_ldap_account(self, password):
        """
        Ensure this user has an LDAP account with the given password.

        Does nothing when ``in_ldap`` is already set. If the LDAP lookup
        raises, the exception is logged and the method returns without
        marking the user as being in LDAP.
        """
        if self.in_ldap:
            return

        self.assign_username(self)
        ldap_manager = LdapManager()
        try:
            user_exists_in_ldap, _entries = ldap_manager.check_user_exists(
                self.username)
        except Exception:
            logger.exception("Exception occur while searching for user in LDAP")
            return

        if user_exists_in_ldap:
            # The user exists already in LDAP, but with a dummy credential.
            # Reaching this point implies the user authenticated against the
            # Django db and a corresponding LDAP user exists, so the LDAP
            # credentials are updated again, assuming they were set to a
            # dummy value while migrating users from Django to LDAP.
            ldap_manager.change_password(self.username, password)
        else:
            # No LDAP account yet: create one.
            first_name, last_name = self.get_first_and_last_name(self.name)
            last_name = last_name or first_name
            ldap_manager.create_user(self.username, password=password,
                                     firstname=first_name, lastname=last_name,
                                     email=self.email)
        self.in_ldap = True
        self.save()

    def __str__(self):
        return self.email

    # def has_perm(self, perm, obj=None):
    #     "Does the user have a specific permission?"
    #     # Simplest possible answer: Yes, always
    #     return self.is_admin

    def has_module_perms(self, app_label):
        """Return whether the user may view the app ``app_label`` (admins only)."""
        return self.is_admin

    @property
    def is_staff(self):
        """Return whether the user is staff; every admin is staff."""
        return self.is_admin

    @is_staff.setter
    def is_staff(self, value):
        # The value is kept on ``_is_staff``; the getter does not read it.
        self._is_staff = value
class StripeCustomer(models.Model):
    """Links a ``CustomUser`` to the id of its customer object in Stripe."""

    user = models.OneToOneField(CustomUser, on_delete=models.CASCADE)
    stripe_id = models.CharField(unique=True, max_length=100)

    def __str__(self):
        return "%s - %s" % (self.stripe_id, self.user.email)

    @classmethod
    def create_stripe_api_customer(cls, email=None, id_payment_method=None,
                                   customer_name=None):
        """
        Create a customer through the Stripe API and return its id.

        Unlike ``get_or_create``, this does not look up a CustomUser and
        does not store a StripeCustomer row; it only talks to Stripe.

        Returns None when the Stripe call yields no response object.
        """
        stripe_data = StripeUtils().create_customer(
            id_payment_method, email, customer_name)
        response_object = stripe_data.get('response_object')
        if not response_object:
            return None
        return response_object.get('id')

    @classmethod
    def get_or_create(cls, email=None, token=None, id_payment_method=None):
        """
        Return the StripeCustomer registered for ``email``, creating a new
        Stripe customer when none is stored or Stripe reports the stored
        one as deleted.

        ``id_payment_method`` is accepted but not used by this method.

        Returns None when creating the customer in Stripe yields no
        response object.
        """
        try:
            stripe_utils = StripeUtils()
            stored = cls.objects.get(user__email=email)
            # The row may exist in the database while the customer is gone
            # from Stripe.
            remote = stripe_utils.check_customer(stored.stripe_id,
                                                 stored.user, token)
            if not ("deleted" in remote and remote["deleted"]):
                return stored
        except StripeCustomer.DoesNotExist:
            pass

        # Either there is no row, or the Stripe-side customer was deleted.
        user = CustomUser.objects.get(email=email)
        stripe_utils = StripeUtils()
        stripe_data = stripe_utils.create_customer(token, email, user.name)
        if not stripe_data.get('response_object'):
            return None

        new_stripe_id = stripe_data.get('response_object').get('id')
        if hasattr(user, 'stripecustomer'):
            # The user already had a Stripe account that was deleted in the
            # dashboard, so only the stored id is replaced.
            user.stripecustomer.stripe_id = new_stripe_id
            user.stripecustomer.save()
            return user.stripecustomer

        # The user never had an associated Stripe account: create the row.
        return StripeCustomer.objects.create(
            user=user, stripe_id=new_stripe_id
        )
class BaseBillingAddress(models.Model):
    """Abstract base holding the postal and VAT fields of a billing address."""

    # Postal part of the address.
    cardholder_name = models.CharField(
        max_length=100,
        default="",
    )
    street_address = models.CharField(
        max_length=100,
    )
    city = models.CharField(
        max_length=50,
    )
    postal_code = models.CharField(
        max_length=50,
    )
    country = CountryField()

    # VAT-related part; every field here may be left blank.
    vat_number = models.CharField(
        max_length=100,
        default="",
        blank=True,
    )
    stripe_tax_id = models.CharField(
        max_length=100,
        default="",
        blank=True,
    )
    vat_number_validated_on = models.DateTimeField(
        blank=True,
        null=True,
    )
    vat_validation_status = models.CharField(
        max_length=25,
        default="",
        blank=True,
    )

    class Meta:
        abstract = True
class BillingAddress(BaseBillingAddress):
    """Concrete billing address that is not tied to a user."""

    def __str__(self):
        """
        Render the address as comma-separated text.

        When a VAT number is set, the VAT number, Stripe tax id, validation
        timestamp and validation status are appended, separated by spaces.
        """
        address = ", ".join(str(part) for part in (
            self.cardholder_name, self.street_address, self.city,
            self.postal_code, self.country,
        ))
        if not self.vat_number:
            return address
        vat_details = " ".join(str(part) for part in (
            self.vat_number, self.stripe_tax_id,
            self.vat_number_validated_on, self.vat_validation_status,
        ))
        return address + ", " + vat_details
class UserBillingAddress(BaseBillingAddress):
    """Billing address owned by a user; ``current`` flags the one in use."""

    user = models.ForeignKey(
        CustomUser,
        related_name='billing_addresses',
        on_delete=models.CASCADE,
    )
    current = models.BooleanField(default=True)

    def __str__(self):
        """
        Render the address as comma-separated text.

        When a VAT number is set, the VAT number, Stripe tax id, validation
        timestamp and validation status are appended, separated by spaces.
        """
        address = ", ".join(str(part) for part in (
            self.cardholder_name, self.street_address, self.city,
            self.postal_code, self.country,
        ))
        if not self.vat_number:
            return address
        vat_details = " ".join(str(part) for part in (
            self.vat_number, self.stripe_tax_id,
            self.vat_number_validated_on, self.vat_validation_status,
        ))
        return address + ", " + vat_details

    def to_dict(self):
        """Return the address as a dict keyed by human-readable labels."""
        labelled = {}
        labelled['Cardholder Name'] = self.cardholder_name
        labelled['Street Address'] = self.street_address
        labelled['City'] = self.city
        labelled['Postal Code'] = self.postal_code
        labelled['Country'] = self.country
        labelled['VAT Number'] = self.vat_number
        return labelled
class AssignPermissionsMixin:
    """Mixin that grants each permission named in ``permissions`` on ``self``."""

    # Permission names handed to ``assign_perm``; subclasses override this.
    permissions = ()
    user = None
    obj = None
    kwargs = {}

    def assign_permissions(self, user):
        """Assign every permission in ``self.permissions`` to ``user`` for this object."""
        for perm_name in self.permissions:
            assign_perm(perm_name, user, self)
class OrderDetail(AssignPermissionsMixin, models.Model):
    """Resource specification (cores, memory, disk sizes) of an order."""

    # The vm_template relation is currently disabled:
    # vm_template = models.ForeignKey(
    #     VMTemplate, blank=True, null=True, default=None,
    #     on_delete=models.SET_NULL
    # )
    cores = models.IntegerField(default=0)
    memory = models.IntegerField(default=0)
    hdd_size = models.IntegerField(default=0)
    ssd_size = models.IntegerField(default=0)

    def __str__(self):
        """
        Describe the order detail, or return "Not available" when there is
        no VM template.
        """
        # The vm_template field is disabled above, so the attribute may not
        # exist at all; reading it directly would raise AttributeError.
        vm_template = getattr(self, 'vm_template', None)
        if vm_template is None:
            return "Not available"
        return "%s - %s, %s cores, %s GB RAM, %s GB SSD" % (
            vm_template.name, vm_template.vm_type, self.cores,
            self.memory, self.ssd_size
        )
class GenericProduct(AssignPermissionsMixin, models.Model):
    """A sellable product with a price, a VAT rate and optional subscription."""

    permissions = ('view_genericproduct',)

    product_name = models.CharField(max_length=128, default="")
    product_slug = models.SlugField(
        unique=True,
        help_text=(
            'An mandatory unique slug for the product'
        ),
    )
    product_description = models.CharField(max_length=500, default="")
    created_at = models.DateTimeField(auto_now_add=True)
    product_price = models.DecimalField(max_digits=6, decimal_places=2)
    product_vat = models.DecimalField(
        max_digits=6, decimal_places=4, default=0
    )
    product_is_subscription = models.BooleanField(default=True)
    product_subscription_interval = models.CharField(
        max_length=10,
        default="month",
        help_text="Choose between `year` and `month`",
    )
    exclude_vat_calculations = models.BooleanField(
        default=False,
        help_text="When checked VAT calculations are excluded for this product",
    )

    def __str__(self):
        return self.product_name

    def get_actual_price(self, vat_rate=None):
        """
        Return the price as a float rounded to two decimals.

        VAT is added on top of ``product_price`` unless
        ``exclude_vat_calculations`` is set. ``vat_rate`` overrides the
        product's own ``product_vat`` when it is not None.
        """
        base_price = float(self.product_price)
        if self.exclude_vat_calculations:
            return round(base_price, 2)
        applied_rate = self.product_vat if vat_rate is None else vat_rate
        return round(base_price + base_price * float(applied_rate), 2)
class HostingOrder(AssignPermissionsMixin, models.Model):
    """
    Order record tying a Stripe customer and billing address to a VM id
    and/or a generic product, together with the card and charge details.
    """

    ORDER_APPROVED_STATUS = 'Approved'
    ORDER_DECLINED_STATUS = 'Declined'

    vm_id = models.IntegerField(default=0)
    customer = models.ForeignKey(StripeCustomer, on_delete=models.CASCADE)
    billing_address = models.ForeignKey(BillingAddress, on_delete=models.CASCADE)
    created_at = models.DateTimeField(auto_now_add=True)
    approved = models.BooleanField(default=False)
    last4 = models.CharField(max_length=4)
    cc_brand = models.CharField(max_length=128)
    stripe_charge_id = models.CharField(max_length=100, null=True)
    price = models.FloatField()
    subscription_id = models.CharField(max_length=100, null=True)
    # The vm_pricing relation is currently disabled:
    # vm_pricing = models.ForeignKey(VMPricing)
    order_detail = models.ForeignKey(
        OrderDetail, null=True, blank=True, default=None,
        on_delete=models.SET_NULL
    )
    generic_product = models.ForeignKey(
        GenericProduct, null=True, blank=True, default=None,
        on_delete=models.SET_NULL
    )
    generic_payment_description = models.CharField(
        max_length=500, null=True
    )

    # Permission granted to the ordering user by ``assign_permissions``.
    permissions = ('u_view_hostingorder',)

    class Meta:
        permissions = (
            ('u_view_hostingorder', 'View Hosting Order'),
        )

    def __str__(self):
        hosting_order_str = ("Order Nr: #{} - VM_ID: {} - {} - {} - "
                             "Specs: {} - Price: {}").format(
            self.id, self.vm_id, self.customer.user.email, self.created_at,
            self.order_detail, self.price
        )
        if self.generic_product_id is not None:
            hosting_order_str += " - Generic Payment"
            if self.stripe_charge_id is not None:
                hosting_order_str += " - One time charge"
            else:
                hosting_order_str += " - Recurring"
        return hosting_order_str

    @cached_property
    def status(self):
        """Return the approved or declined status label for this order."""
        return self.ORDER_APPROVED_STATUS if self.approved else self.ORDER_DECLINED_STATUS

    @classmethod
    def create(cls, price=None, vm_id=0, customer=None,
               billing_address=None, vm_pricing=None):
        """
        Create and save an order, then grant ``customer.user`` the order's
        permissions.

        ``vm_pricing`` is accepted for backward compatibility but ignored:
        the model has no ``vm_pricing`` field at present, so forwarding it
        to ``objects.create`` would raise TypeError.
        """
        instance = cls.objects.create(
            price=price,
            vm_id=vm_id,
            customer=customer,
            billing_address=billing_address,
        )
        instance.assign_permissions(customer.user)
        return instance

    def set_approved(self):
        """Mark the order as approved and save it."""
        self.approved = True
        # ``status`` is a cached_property: drop any value cached while the
        # order was still unapproved so it is recomputed on next access.
        self.__dict__.pop('status', None)
        self.save()

    def set_stripe_charge(self, stripe_charge):
        """
        Store the charge id and the card's last4/brand, then save.

        Card details are read from ``stripe_charge.source`` when it is set,
        otherwise from ``stripe_charge.payment_method_details.card``.
        """
        self.stripe_charge_id = stripe_charge.id
        if stripe_charge.source is None:
            card = stripe_charge.payment_method_details.card
        else:
            card = stripe_charge.source
        self.last4 = card.last4
        self.cc_brand = card.brand
        self.save()

    def set_subscription_id(self, subscription_id, cc_details):
        """
        Store a Stripe subscription id together with the card used, then
        save.

        :param subscription_id: Stripe's subscription id
        :param cc_details: A dict containing card details
                           {last4, brand}
        :return: None
        """
        self.subscription_id = subscription_id
        self.last4 = cc_details.get('last4')
        self.cc_brand = cc_details.get('brand')
        self.save()

    def get_cc_data(self):
        """
        Return ``{'last4', 'cc_brand'}`` for this order, or None when either
        value is missing.
        """
        if not (self.last4 and self.cc_brand):
            return None
        return {
            'last4': self.last4,
            'cc_brand': self.cc_brand,
        }
class UserHostingKey(models.Model):
    """An SSH key belonging to a user; the private key file is optional."""

    user = models.ForeignKey(CustomUser, blank=True, null=True, on_delete=models.CASCADE)
    public_key = models.TextField()
    private_key = models.FileField(upload_to='private_keys', blank=True)
    created_at = models.DateTimeField(auto_now_add=True)
    name = models.CharField(max_length=100)

    @staticmethod
    def generate_RSA(bits=2048):
        """
        Generate an RSA key pair.

        :param bits: The key length in bits
        :return: ``(private_key, public_key)``; the private key is exported
                 as PEM, the public key in OpenSSH format
        """
        # Honour the requested key length instead of a hard-coded 2048.
        new_key = RSA.generate(bits, os.urandom)
        public_key = new_key.publickey().exportKey("OpenSSH")
        private_key = new_key.exportKey("PEM")
        return private_key, public_key

    @classmethod
    def generate_keys(cls):
        """Return a freshly generated ``(private_key, public_key)`` pair."""
        private_key, public_key = cls.generate_RSA()
        return private_key, public_key

    def delete(self, *args, **kwargs):
        """
        Delete the row, first removing the private key file from disk if
        one is stored and present.

        Returns whatever the parent ``delete`` returns.
        """
        if bool(self.private_key) and os.path.isfile(self.private_key.path):
            logger.debug("Removing private key {}".format(self.private_key.path))
            os.remove(self.private_key.path)
        else:
            logger.debug("No private_key to remove")
        return super(UserHostingKey, self).delete(*args, **kwargs)
def get_anonymous_user_instance(CustomUser):
    """Build the anonymous user from the user model class passed in.

    The instance is constructed with a fixed email address and is not saved
    here. Presumably this is the hook django-guardian calls to obtain its
    anonymous user — verify against the project settings.
    """
    anonymous_email = 'anonymous@ungleich.ch'
    return CustomUser(email=anonymous_email)