forked from uncloud/uncloud
338 lines
12 KiB
Python
338 lines
12 KiB
Python
from django.contrib.auth.mixins import LoginRequiredMixin
|
|
from django.views.generic.base import TemplateView, View
|
|
from django.shortcuts import render
|
|
from django.db import transaction
|
|
from django.contrib.auth import get_user_model
|
|
from rest_framework import viewsets, mixins, permissions, status, views
|
|
from rest_framework.renderers import TemplateHTMLRenderer
|
|
from rest_framework.response import Response
|
|
from rest_framework.decorators import action
|
|
from rest_framework.reverse import reverse
|
|
from rest_framework.decorators import renderer_classes
|
|
from vat_validator import validate_vat, vies
|
|
from vat_validator.countries import EU_COUNTRY_CODES
|
|
from hardcopy import bytestring_to_pdf
|
|
from django.core.files.temp import NamedTemporaryFile
|
|
from django.http import FileResponse
|
|
from django.template.loader import render_to_string
|
|
from copy import deepcopy
|
|
|
|
import json
|
|
import logging
|
|
|
|
from .models import *
|
|
from .serializers import *
|
|
from .selectors import *
|
|
from .utils import get_order_total_with_vat
|
|
|
|
from datetime import datetime
|
|
from vat_validator import sanitize_vat
|
|
import uncloud_pay.stripe as uncloud_stripe
|
|
from django.contrib.auth.decorators import login_required
|
|
from django.utils.decorators import method_decorator
|
|
from django.http import JsonResponse
|
|
import stripe
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class PricingView(View):
    """Serve the result of ``get_order_total_with_vat`` as a JSON response."""

    def get(self, request, **args):
        """Build the pricing payload for the pricing named in the URL.

        ``cores``, ``memory`` and ``storage`` are read from the query string
        and forwarded as-is (``None`` when absent). The pricing name comes
        from the ``name`` URL keyword argument. VAT information is derived
        from the billing address of the authenticated user, when there is one.
        """
        user = self.request.user
        billing_address = False
        if user and user.is_authenticated:
            billing_address = get_billing_address_for_user(user)

        if billing_address:
            rate = VATRate.get_vat_rate(billing_address)
            vat_checked = (billing_address.vat_number_validated_on
                           and billing_address.vat_number_verified)
            validation_status = "verified" if vat_checked else False
        else:
            # No usable billing address: ``False * 100`` evaluates to 0 below.
            rate = False
            validation_status = False

        params = request.GET
        totals = get_order_total_with_vat(
            params.get('cores'),
            params.get('memory'),
            params.get('storage'),
            pricing_name=args['name'],
            vat_rate=rate * 100,
            vat_validation_status=validation_status,
        )
        return JsonResponse(totals)
|
|
|
|
class CardActivateView(View):
    """Activate one of the requesting user's stored Stripe credit cards."""

    def post(self, request, **args):
        """Activate the card whose id is posted as ``card_id``.

        Returns a JSON object with ``success`` set to 1 when the card was
        activated, or an ``error`` message when the caller is not logged in,
        no card id was posted, or the id does not match a card owned by the
        caller.
        """
        # Filtering on an anonymous user would fail inside the ORM, so refuse
        # early with an explicit JSON error instead.
        if not request.user.is_authenticated:
            return JsonResponse({'error': "Authentication required"}, status=401)

        card_id = request.POST.get('card_id')
        if not card_id:
            return JsonResponse({'error': "Please select a card"})

        # Restricting the lookup to the owner prevents activating somebody
        # else's card by guessing its id.
        matched_card = StripeCreditCard.objects.filter(
            owner=request.user, id=card_id).first()
        if matched_card is None:
            # .first() yields None when nothing matches; calling .activate()
            # on it used to raise AttributeError.
            return JsonResponse({'error': "Card not found"}, status=404)

        matched_card.activate()
        return JsonResponse({'success': 1})
|
|
|
|
@method_decorator(login_required, name='dispatch')
class RegisterCard(TemplateView):
    """Render the Stripe card registration page for a logged-in user."""

    template_name = "uncloud_pay/register_stripe.html"

    def get_context_data(self, **kwargs):
        """Extend the template context with the values the Stripe form needs.

        A setup intent is created for the user's Stripe customer id on every
        call; its client secret, the username and the public Stripe API key
        are added to the context.
        """
        user = self.request.user
        intent = uncloud_stripe.create_setup_intent(
            uncloud_stripe.get_customer_id_for(user))

        context = super().get_context_data(**kwargs)
        context.update({
            'client_secret': intent.client_secret,
            'username': user.username,
            'stripe_pk': uncloud_stripe.public_api_key,
        })
        return context
|
|
|
|
|
|
class CreditCardViewSet(mixins.RetrieveModelMixin,
                        mixins.UpdateModelMixin,
                        mixins.DestroyModelMixin,
                        mixins.ListModelMixin,
                        viewsets.GenericViewSet):
    """Retrieve, update, delete and list the requesting user's credit cards.

    No create mixin is included, so cards cannot be created through this
    viewset.
    """

    permission_classes = [permissions.IsAuthenticated]
    serializer_class = StripeCreditCardSerializer

    def get_queryset(self):
        """Limit every operation to cards owned by the requesting user."""
        owner = self.request.user
        return StripeCreditCard.objects.filter(owner=owner)

    def list(self, request):
        """Run the Stripe card sync for the user, then list the cards."""
        uncloud_stripe.sync_cards_for_user(self.request.user)
        return super().list(request)
|
|
|
|
class PaymentViewSet(viewsets.ModelViewSet):
    """Expose the requesting user's payments.

    NOTE(review): ``ModelViewSet`` also provides create/update/delete
    actions; confirm that exposing them to regular users is intended.
    """

    permission_classes = [permissions.IsAuthenticated]
    serializer_class = PaymentSerializer

    def get_queryset(self):
        """Return only the payments owned by the requesting user."""
        owner = self.request.user
        return Payment.objects.filter(owner=owner)
|
|
|
|
class BalanceViewSet(viewsets.ViewSet):
    """Report the balance of the requesting user."""

    permission_classes = [permissions.IsAuthenticated]

    def list(self, request):
        """Return the user's balance wrapped in a ``BalanceSerializer``."""
        payload = {'balance': get_balance_for_user(self.request.user)}
        serializer = BalanceSerializer(data=payload)
        # Validation must run before ``.data`` can be read on a serializer
        # built from ``data=``; its result is deliberately not checked here.
        serializer.is_valid()
        return Response(serializer.data)
|
|
|
|
|
|
@method_decorator(login_required, name='dispatch')
class ListCards(TemplateView):
    """Render the page listing the logged-in user's Stripe cards."""

    template_name = "uncloud_pay/list_stripe.html"

    def get_context_data(self, **kwargs):
        """Add the user's Stripe cards and the user object to the context."""
        user = self.request.user
        stripe_cards = uncloud_stripe.get_customer_cards(
            uncloud_stripe.get_customer_id_for(user))

        context = super().get_context_data(**kwargs)
        context.update({
            'cards': stripe_cards,
            # The template receives the user object itself under 'username'.
            'username': user,
        })
        return context
|
|
|
|
###
|
|
# Bills and Orders.
|
|
|
|
class BillViewSet(viewsets.ReadOnlyModelViewSet):
    """Read-only access to the requesting user's bills."""

    serializer_class = BillSerializer
    permission_classes = [permissions.IsAuthenticated]

    def get_queryset(self):
        """Return only the bills owned by the requesting user."""
        return Bill.objects.filter(owner=self.request.user)

    @action(detail=False, methods=['get'])
    def unpaid(self, request):
        """List the bills ``Bill.get_unpaid_for`` returns for the user."""
        serializer = self.get_serializer(
            Bill.get_unpaid_for(self.request.user),
            many=True)
        return Response(serializer.data)

    @action(detail=True, methods=['get'])
    def download(self, *args, **kwargs):
        """Render the selected bill to PDF and stream it to the client.

        The bill is rendered through the ``bill.html.j2`` template, converted
        with ``bytestring_to_pdf`` into a temporary file, and returned as an
        ``application/pdf`` response named ``<reference>_<uuid>.pdf``.
        """
        bill = self.get_object()
        # NOTE(review): the provider is fetched but never used below. The
        # call is kept in case it acts as a guard; confirm and drop if not.
        provider = UncloudProvider.get_provider()
        output_file = NamedTemporaryFile()
        bill_html = render_to_string("bill.html.j2", {'bill': bill})

        bytestring_to_pdf(bill_html.encode('utf-8'), output_file)
        # Make sure streaming starts at the beginning of the generated file,
        # whichever way the converter filled it.
        output_file.seek(0)

        # FileResponse closes the temporary file once the response is sent.
        response = FileResponse(output_file, content_type="application/pdf")
        # A Content-Disposition value needs a disposition type before its
        # parameters; a bare 'filename=...' is malformed. 'inline' is used
        # here as the disposition type.
        response['Content-Disposition'] = 'inline; filename="{}_{}.pdf"'.format(
            bill.reference, bill.uuid
        )

        return response
|
|
|
|
|
|
class OrderViewSet(viewsets.ReadOnlyModelViewSet):
    """Read-only access to the requesting user's orders."""

    permission_classes = [permissions.IsAuthenticated]
    serializer_class = OrderSerializer

    def get_queryset(self):
        """Return only the orders owned by the requesting user."""
        owner = self.request.user
        return Order.objects.filter(owner=owner)
|
|
|
|
class VATRateViewSet(viewsets.ReadOnlyModelViewSet):
    """Read-only listing of every VAT rate, for authenticated users."""

    queryset = VATRate.objects.all()
    permission_classes = [permissions.IsAuthenticated]
    serializer_class = VATRateSerializer
|
|
|
|
class BillingAddressViewSet(mixins.CreateModelMixin,
                            mixins.RetrieveModelMixin,
                            mixins.UpdateModelMixin,
                            mixins.ListModelMixin,
                            viewsets.GenericViewSet):
    """Create, retrieve, update and list the user's billing addresses."""

    permission_classes = [permissions.IsAuthenticated]

    def get_serializer_class(self):
        """Use the dedicated update serializer for the ``update`` action only."""
        is_update = self.action == 'update'
        return UpdateBillingAddressSerializer if is_update else BillingAddressSerializer

    def get_queryset(self):
        """Return the billing addresses attached to the requesting user."""
        return self.request.user.billing_addresses.all()

    def _vat_error_response(self, country, vat_number):
        """Return an error ``Response`` for a rejected VAT number, else ``None``."""
        if not validate_vat(country, vat_number):
            return Response(
                {'error': 'Malformed VAT number.'},
                status=status.HTTP_400_BAD_REQUEST)

        if country not in EU_COUNTRY_CODES:
            return None

        # XXX: making a synchronous call to a third-party API here might not
        # be a good idea.
        try:
            vies_state = vies.check_vat(country, vat_number)
            if not vies_state.valid:
                return Response(
                    {'error': 'European VAT number does not exist in VIES.'},
                    status=status.HTTP_400_BAD_REQUEST)
        except Exception as e:
            logger.warning(e)
            return Response(
                {'error': 'Could not validate EU VAT number against VIES. Try again later..'},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR)

        return None

    def create(self, request):
        """Validate the submitted address and store it for the requesting user.

        A non-empty VAT number is checked for well-formedness and, for EU
        countries, against VIES; any failure is answered with an error
        response and nothing is saved.
        """
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)

        validated = serializer.validated_data
        country = validated["country"]

        # Empty or missing VAT numbers are ignored.
        vat_number = validated.get("vat_number", "")
        if vat_number != "":
            rejection = self._vat_error_response(country, vat_number)
            if rejection is not None:
                return rejection

        serializer.save(owner=request.user)
        return Response(serializer.data)
|
|
|
|
###
|
|
# Admin stuff.
|
|
|
|
class AdminPaymentViewSet(viewsets.ModelViewSet):
    """Admin-only access to the payments of every user."""

    serializer_class = PaymentSerializer
    permission_classes = [permissions.IsAdminUser]

    def get_queryset(self):
        """Return all payments, regardless of owner."""
        return Payment.objects.all()

    def create(self, request):
        """Record a payment stamped with the current time.

        Responds with the serialized payment and HTTP 201. Invalid input
        raises the serializer's validation error.
        """
        # Imported locally so this method does not depend on the name being
        # provided by one of the module's wildcard imports.
        from django.utils import timezone

        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        # timezone.now() is aware when USE_TZ is enabled and naive otherwise,
        # so the stored value always matches the project's time zone settings
        # (datetime.now() is always naive).
        serializer.save(timestamp=timezone.now())

        headers = self.get_success_headers(serializer.data)
        return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)
|
|
|
|
# Bills are generated from orders and should not be created or updated by hand.
|
|
class AdminBillViewSet(BillViewSet):
    """Admin-only access to the bills of every user.

    Bills are generated from orders and should not be created or updated by
    hand, hence the read-only base class plus the ``generate`` action.
    """

    serializer_class = BillSerializer
    permission_classes = [permissions.IsAdminUser]

    def get_queryset(self):
        """Return all bills, regardless of owner."""
        return Bill.objects.all()

    @action(detail=False, methods=['get'])
    def unpaid(self, request):
        """List the unpaid bills of all users."""
        unpaid_bills = []
        # XXX: works, but we can do better than (number of users + 1) SQL
        # requests...
        for user in get_user_model().objects.all():
            # Query the user being iterated over, not the admin issuing the
            # request. extend() accepts any iterable of bills.
            unpaid_bills.extend(Bill.get_unpaid_for(user))

        serializer = self.get_serializer(unpaid_bills, many=True)
        return Response(serializer.data)

    @action(detail=False, methods=['post'])
    def generate(self, request):
        """Generate the current month's bills for every user.

        Responds with the list of references of the generated bills.
        """
        # Imported locally so this method does not depend on the name being
        # provided by one of the module's wildcard imports.
        from django.utils import timezone

        # Taken once so all users are billed for the same period, even if
        # the run crosses a month boundary.
        now = timezone.now()

        generated_bills = []
        for user in get_user_model().objects.all():
            generated_bills.extend(Bill.generate_for(
                year=now.year,
                month=now.month,
                user=user))

        # A concrete list rather than a lazy map object as response data.
        return Response(
            [bill.reference for bill in generated_bills],
            status=status.HTTP_200_OK)
|
|
|
|
class AdminOrderViewSet(mixins.ListModelMixin,
                        mixins.RetrieveModelMixin,
                        mixins.CreateModelMixin,
                        mixins.UpdateModelMixin,
                        viewsets.GenericViewSet):
    """Admin-only listing, retrieval, creation and replacement of orders."""

    serializer_class = OrderSerializer
    permission_classes = [permissions.IsAdminUser]

    def get_serializer(self, *args, **kwargs):
        """Instantiate the order serializer with ``admin=True``.

        NOTE(review): unlike the default ``get_serializer``, this does not
        pass the view's serializer context; confirm that is intended.
        """
        return self.serializer_class(*args, **kwargs, admin=True)

    def get_queryset(self):
        """Return all orders, regardless of owner."""
        return Order.objects.all()

    # Updates create a new order and terminate the 'old' one.
    @transaction.atomic
    def update(self, request, *args, **kwargs):
        """Replace an order instead of modifying it in place.

        The existing order is cloned, the validated changes are applied to
        the clone, the clone is saved as a new row, and the original is
        linked to it through ``replaced_by`` and terminated. Responds with
        the serialized replacement order.
        """
        order = self.get_object()
        partial = kwargs.pop('partial', False)
        serializer = self.get_serializer(order, data=request.data, partial=partial)
        serializer.is_valid(raise_exception=True)

        # Clone existing order for replacement.
        replacing_order = deepcopy(order)

        # Clearing the primary key makes save() insert a new row:
        # https://docs.djangoproject.com/en/3.0/topics/db/queries/#copying-model-instances
        replacing_order.pk = None

        for attr, value in serializer.validated_data.items():
            setattr(replacing_order, attr, value)

        # Save replacing order and terminate 'previous' one.
        replacing_order.save()
        order.replaced_by = replacing_order
        order.save()
        order.terminate()

        # Response data must be serialized primitives; a raw model instance
        # cannot be rendered.
        return Response(self.get_serializer(replacing_order).data)

    @action(detail=True, methods=['post'])
    def terminate(self, request, pk=None):
        """Terminate the selected order, unless it is already terminated."""
        order = self.get_object()
        if order.is_terminated:
            # NOTE(review): this is a client error reported as HTTP 500; the
            # status is kept unchanged for existing API consumers.
            return Response(
                {'error': 'Order is already terminated.'},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR)

        order.terminate()
        return Response({}, status=status.HTTP_200_OK)
|