from django.contrib.auth.mixins import LoginRequiredMixin from django.views.generic.base import TemplateView from django.shortcuts import render from django.db import transaction from django.contrib.auth import get_user_model from rest_framework import viewsets, mixins, permissions, status, views from rest_framework.renderers import TemplateHTMLRenderer from rest_framework.response import Response from rest_framework.decorators import action from rest_framework.reverse import reverse from rest_framework.decorators import renderer_classes from vat_validator import validate_vat, vies from vat_validator.countries import EU_COUNTRY_CODES from hardcopy import bytestring_to_pdf from django.core.files.temp import NamedTemporaryFile from django.http import FileResponse from django.template.loader import render_to_string from copy import deepcopy import json import logging from .models import * from .serializers import * from .selectors import * from datetime import datetime from vat_validator import sanitize_vat import uncloud_pay.stripe as uncloud_stripe from django.contrib.auth.decorators import login_required from django.utils.decorators import method_decorator from django.http import JsonResponse import stripe logger = logging.getLogger(__name__) ### # 2020-12 checked code class RegisterCard(TemplateView): template_name = "uncloud_pay/register_stripe.html" @method_decorator(login_required) def dispatch(self, *args, **kwargs): return super().dispatch(*args, **kwargs) def get_context_data(self, **kwargs): customer_id = uncloud_stripe.get_customer_id_for(self.request.user) setup_intent = uncloud_stripe.create_setup_intent(customer_id) context = super().get_context_data(**kwargs) context['client_secret'] = setup_intent.client_secret context['username'] = self.request.user.username context['stripe_pk'] = uncloud_stripe.public_api_key return context class CreditCardViewSet(mixins.RetrieveModelMixin, mixins.UpdateModelMixin, mixins.DestroyModelMixin, mixins.ListModelMixin, viewsets.GenericViewSet): serializer_class = StripeCreditCardSerializer permission_classes = [permissions.IsAuthenticated] def list(self, request): uncloud_stripe.sync_cards_for_user(self.request.user) return super().list(request) def get_queryset(self): return StripeCreditCard.objects.filter(owner=self.request.user) class PaymentViewSet(viewsets.ModelViewSet): serializer_class = PaymentSerializer permission_classes = [permissions.IsAuthenticated] def get_queryset(self): return Payment.objects.filter(owner=self.request.user) class BalanceViewSet(viewsets.ViewSet): permission_classes = [permissions.IsAuthenticated] def list(self, request): serializer = BalanceSerializer(data={ 'balance': get_balance_for_user(self.request.user) }) serializer.is_valid() return Response(serializer.data) class ListCards(TemplateView): template_name = "uncloud_pay/list_stripe.html" @method_decorator(login_required) def dispatch(self, *args, **kwargs): return super().dispatch(*args, **kwargs) def get_context_data(self, **kwargs): customer_id = uncloud_stripe.get_customer_id_for(self.request.user) cards = uncloud_stripe.get_customer_cards(customer_id) context = super().get_context_data(**kwargs) context['cards'] = cards context['username'] = self.request.user return context ### # Bills and Orders. class BillViewSet(viewsets.ReadOnlyModelViewSet): serializer_class = BillSerializer permission_classes = [permissions.IsAuthenticated] def get_queryset(self): return Bill.objects.filter(owner=self.request.user) @action(detail=False, methods=['get']) def unpaid(self, request): serializer = self.get_serializer( Bill.get_unpaid_for(self.request.user), many=True) return Response(serializer.data) @action(detail=True, methods=['get']) def download(self, *args, **kwargs): """ Allow to download """ bill = self.get_object() provider = UncloudProvider.get_provider() output_file = NamedTemporaryFile() bill_html = render_to_string("bill.html.j2", {'bill': bill}) bytestring_to_pdf(bill_html.encode('utf-8'), output_file) response = FileResponse(output_file, content_type="application/pdf") response['Content-Disposition'] = 'filename="{}_{}.pdf"'.format( bill.reference, bill.uuid ) return response class OrderViewSet(viewsets.ReadOnlyModelViewSet): serializer_class = OrderSerializer permission_classes = [permissions.IsAuthenticated] def get_queryset(self): return Order.objects.filter(owner=self.request.user) class VATRateViewSet(viewsets.ReadOnlyModelViewSet): serializer_class = VATRateSerializer permission_classes = [permissions.IsAuthenticated] queryset = VATRate.objects.all() class BillingAddressViewSet(mixins.CreateModelMixin, mixins.RetrieveModelMixin, mixins.UpdateModelMixin, mixins.ListModelMixin, viewsets.GenericViewSet): permission_classes = [permissions.IsAuthenticated] def get_serializer_class(self): if self.action == 'update': return UpdateBillingAddressSerializer else: return BillingAddressSerializer def get_queryset(self): return self.request.user.billing_addresses.all() def create(self, request): serializer = self.get_serializer(data=request.data) serializer.is_valid(raise_exception=True) # Validate VAT numbers. country = serializer.validated_data["country"] # We ignore empty VAT numbers. if 'vat_number' in serializer.validated_data and serializer.validated_data["vat_number"] != "": vat_number = serializer.validated_data["vat_number"] if not validate_vat(country, vat_number): return Response( {'error': 'Malformed VAT number.'}, status=status.HTTP_400_BAD_REQUEST) elif country in EU_COUNTRY_CODES: # XXX: make a synchroneous call to a third patry API here might not be a good idea.. try: vies_state = vies.check_vat(country, vat_number) if not vies_state.valid: return Response( {'error': 'European VAT number does not exist in VIES.'}, status=status.HTTP_400_BAD_REQUEST) except Exception as e: logger.warning(e) return Response( {'error': 'Could not validate EU VAT number against VIES. Try again later..'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) serializer.save(owner=request.user) return Response(serializer.data) ### # Admin stuff. class AdminPaymentViewSet(viewsets.ModelViewSet): serializer_class = PaymentSerializer permission_classes = [permissions.IsAdminUser] def get_queryset(self): return Payment.objects.all() def create(self, request): serializer = self.get_serializer(data=request.data) serializer.is_valid(raise_exception=True) serializer.save(timestamp=datetime.now()) headers = self.get_success_headers(serializer.data) return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers) # Bills are generated from orders and should not be created or updated by hand. class AdminBillViewSet(BillViewSet): serializer_class = BillSerializer permission_classes = [permissions.IsAdminUser] def get_queryset(self): return Bill.objects.all() @action(detail=False, methods=['get']) def unpaid(self, request): unpaid_bills = [] # XXX: works but we can do better than number of users + 1 SQL requests... for user in get_user_model().objects.all(): unpaid_bills = unpaid_bills + Bill.get_unpaid_for(self.request.user) serializer = self.get_serializer(unpaid_bills, many=True) return Response(serializer.data) @action(detail=False, methods=['post']) def generate(self, request): users = get_user_model().objects.all() generated_bills = [] for user in users: now = timezone.now() generated_bills = generated_bills + Bill.generate_for( year=now.year, month=now.month, user=user) return Response( map(lambda b: b.reference, generated_bills), status=status.HTTP_200_OK) class AdminOrderViewSet(mixins.ListModelMixin, mixins.RetrieveModelMixin, mixins.CreateModelMixin, mixins.UpdateModelMixin, viewsets.GenericViewSet): serializer_class = OrderSerializer permission_classes = [permissions.IsAdminUser] def get_serializer(self, *args, **kwargs): return self.serializer_class(*args, **kwargs, admin=True) def get_queryset(self): return Order.objects.all() # Updates create a new order and terminate the 'old' one. @transaction.atomic def update(self, request, *args, **kwargs): order = self.get_object() partial = kwargs.pop('partial', False) serializer = self.get_serializer(order, data=request.data, partial=partial) serializer.is_valid(raise_exception=True) # Clone existing order for replacement. replacing_order = deepcopy(order) # Yes, that's how you make a new entry in DB: # https://docs.djangoproject.com/en/3.0/topics/db/queries/#copying-model-instances replacing_order.pk = None for attr, value in serializer.validated_data.items(): setattr(replacing_order, attr, value) # Save replacing order and terminate 'previous' one. replacing_order.save() order.replaced_by = replacing_order order.save() order.terminate() return Response(replacing_order) @action(detail=True, methods=['post']) def terminate(self, request, pk): order = self.get_object() if order.is_terminated: return Response( {'error': 'Order is already terminated.'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) else: order.terminate() return Response({}, status=status.HTTP_200_OK)