371 lines
14 KiB
Python
371 lines
14 KiB
Python
from django.shortcuts import render
|
|
from django.db import transaction
|
|
from django.contrib.auth import get_user_model
|
|
from rest_framework import viewsets, mixins, permissions, status, views
|
|
from rest_framework.renderers import TemplateHTMLRenderer
|
|
from rest_framework.response import Response
|
|
from rest_framework.decorators import action
|
|
from rest_framework.reverse import reverse
|
|
from rest_framework.decorators import renderer_classes
|
|
from vat_validator import validate_vat, vies
|
|
from vat_validator.countries import EU_COUNTRY_CODES
|
|
from hardcopy import bytestring_to_pdf
|
|
from django.core.files.temp import NamedTemporaryFile
|
|
from django.http import FileResponse
|
|
from django.template.loader import render_to_string
|
|
from copy import deepcopy
|
|
|
|
import json
|
|
import logging
|
|
|
|
from .models import *
|
|
from .serializers import *
|
|
from datetime import datetime
|
|
from vat_validator import sanitize_vat
|
|
import uncloud_pay.stripe as uncloud_stripe
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
###
|
|
# Payments and Payment Methods.
|
|
|
|
class PaymentViewSet(viewsets.ReadOnlyModelViewSet):
|
|
serializer_class = PaymentSerializer
|
|
permission_classes = [permissions.IsAuthenticated]
|
|
|
|
def get_queryset(self):
|
|
return Payment.objects.filter(owner=self.request.user)
|
|
|
|
class OrderViewSet(viewsets.ReadOnlyModelViewSet):
|
|
serializer_class = OrderSerializer
|
|
permission_classes = [permissions.IsAuthenticated]
|
|
|
|
def get_queryset(self):
|
|
return Order.objects.filter(owner=self.request.user)
|
|
|
|
class PaymentMethodViewSet(viewsets.ModelViewSet):
|
|
permission_classes = [permissions.IsAuthenticated]
|
|
|
|
def get_serializer_class(self):
|
|
if self.action == 'create':
|
|
return CreatePaymentMethodSerializer
|
|
elif self.action == 'update':
|
|
return UpdatePaymentMethodSerializer
|
|
elif self.action == 'charge':
|
|
return ChargePaymentMethodSerializer
|
|
else:
|
|
return PaymentMethodSerializer
|
|
|
|
def get_queryset(self):
|
|
return PaymentMethod.objects.filter(owner=self.request.user)
|
|
|
|
# XXX: Handling of errors is far from great down there.
|
|
@transaction.atomic
|
|
def create(self, request):
|
|
serializer = self.get_serializer(data=request.data)
|
|
serializer.is_valid(raise_exception=True)
|
|
|
|
# Set newly created method as primary if no other method is.
|
|
if PaymentMethod.get_primary_for(request.user) == None:
|
|
serializer.validated_data['primary'] = True
|
|
|
|
if serializer.validated_data['source'] == "stripe":
|
|
# Retrieve Stripe customer ID for user.
|
|
customer_id = uncloud_stripe.get_customer_id_for(request.user)
|
|
if customer_id == None:
|
|
return Response(
|
|
{'error': 'Could not resolve customer stripe ID.'},
|
|
status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
|
|
try:
|
|
setup_intent = uncloud_stripe.create_setup_intent(customer_id)
|
|
except Exception as e:
|
|
return Response({'error': str(e)},
|
|
status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
|
|
payment_method = PaymentMethod.objects.create(
|
|
owner=request.user,
|
|
stripe_setup_intent_id=setup_intent.id,
|
|
**serializer.validated_data)
|
|
|
|
# TODO: find a way to use reverse properly:
|
|
# https://www.django-rest-framework.org/api-guide/reverse/
|
|
path = "payment-method/{}/register-stripe-cc".format(
|
|
payment_method.uuid)
|
|
stripe_registration_url = reverse('api-root', request=request) + path
|
|
return Response({'please_visit': stripe_registration_url})
|
|
else:
|
|
serializer.save(owner=request.user, **serializer.validated_data)
|
|
return Response(serializer.data)
|
|
|
|
@action(detail=True, methods=['post'])
|
|
def charge(self, request, pk=None):
|
|
payment_method = self.get_object()
|
|
serializer = self.get_serializer(data=request.data)
|
|
serializer.is_valid(raise_exception=True)
|
|
amount = serializer.validated_data['amount']
|
|
try:
|
|
payment = payment_method.charge(amount)
|
|
output_serializer = PaymentSerializer(payment)
|
|
return Response(output_serializer.data)
|
|
except Exception as e:
|
|
return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
|
|
@action(detail=True, methods=['get'], url_path='register-stripe-cc', renderer_classes=[TemplateHTMLRenderer])
|
|
def register_stripe_cc(self, request, pk=None):
|
|
payment_method = self.get_object()
|
|
|
|
if payment_method.source != 'stripe':
|
|
return Response(
|
|
{'error': 'This is not a Stripe-based payment method.'},
|
|
template_name='error.html.j2')
|
|
|
|
if payment_method.active:
|
|
return Response(
|
|
{'error': 'This payment method is already active'},
|
|
template_name='error.html.j2')
|
|
|
|
try:
|
|
setup_intent = uncloud_stripe.get_setup_intent(
|
|
payment_method.stripe_setup_intent_id)
|
|
except Exception as e:
|
|
return Response(
|
|
{'error': str(e)},
|
|
template_name='error.html.j2')
|
|
|
|
# TODO: find a way to use reverse properly:
|
|
# https://www.django-rest-framework.org/api-guide/reverse/
|
|
callback_path= "payment-method/{}/activate-stripe-cc/".format(
|
|
payment_method.uuid)
|
|
callback = reverse('api-root', request=request) + callback_path
|
|
|
|
# Render stripe card registration form.
|
|
template_args = {
|
|
'client_secret': setup_intent.client_secret,
|
|
'stripe_pk': uncloud_stripe.public_api_key,
|
|
'callback': callback
|
|
}
|
|
return Response(template_args, template_name='stripe-payment.html.j2')
|
|
|
|
@action(detail=True, methods=['post'], url_path='activate-stripe-cc')
|
|
def activate_stripe_cc(self, request, pk=None):
|
|
payment_method = self.get_object()
|
|
try:
|
|
setup_intent = uncloud_stripe.get_setup_intent(
|
|
payment_method.stripe_setup_intent_id)
|
|
except Exception as e:
|
|
return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
|
|
# Card had been registered, fetching payment method.
|
|
print(setup_intent)
|
|
if setup_intent.payment_method:
|
|
payment_method.stripe_payment_method_id = setup_intent.payment_method
|
|
payment_method.save()
|
|
|
|
return Response({
|
|
'uuid': payment_method.uuid,
|
|
'activated': payment_method.active})
|
|
else:
|
|
error = 'Could not fetch payment method from stripe. Please try again.'
|
|
return Response({'error': error})
|
|
|
|
@action(detail=True, methods=['post'], url_path='set-as-primary')
|
|
def set_as_primary(self, request, pk=None):
|
|
payment_method = self.get_object()
|
|
payment_method.set_as_primary_for(request.user)
|
|
|
|
serializer = self.get_serializer(payment_method)
|
|
return Response(serializer.data)
|
|
|
|
###
|
|
# Bills and Orders.
|
|
|
|
class BillViewSet(viewsets.ReadOnlyModelViewSet):
|
|
serializer_class = BillSerializer
|
|
permission_classes = [permissions.IsAuthenticated]
|
|
|
|
def get_queryset(self):
|
|
return Bill.objects.filter(owner=self.request.user)
|
|
|
|
|
|
@action(detail=False, methods=['get'])
|
|
def unpaid(self, request):
|
|
serializer = self.get_serializer(
|
|
Bill.get_unpaid_for(self.request.user),
|
|
many=True)
|
|
return Response(serializer.data)
|
|
|
|
@action(detail=True, methods=['get'])
|
|
def download(self, *args, **kwargs):
|
|
bill = self.get_object()
|
|
output_file = NamedTemporaryFile()
|
|
bill_html = render_to_string("bill.html.j2", {'bill': bill})
|
|
|
|
bytestring_to_pdf(bill_html.encode('utf-8'), output_file)
|
|
response = FileResponse(output_file, content_type="application/pdf")
|
|
response['Content-Disposition'] = 'filename="{}_{}.pdf"'.format(
|
|
bill.reference, bill.uuid
|
|
)
|
|
|
|
return response
|
|
|
|
|
|
class OrderViewSet(viewsets.ReadOnlyModelViewSet):
|
|
serializer_class = OrderSerializer
|
|
permission_classes = [permissions.IsAuthenticated]
|
|
|
|
def get_queryset(self):
|
|
return Order.objects.filter(owner=self.request.user)
|
|
|
|
class BillingAddressViewSet(mixins.CreateModelMixin,
|
|
mixins.RetrieveModelMixin,
|
|
mixins.UpdateModelMixin,
|
|
mixins.ListModelMixin,
|
|
viewsets.GenericViewSet):
|
|
permission_classes = [permissions.IsAuthenticated]
|
|
|
|
def get_serializer_class(self):
|
|
if self.action == 'update':
|
|
return UpdateBillingAddressSerializer
|
|
else:
|
|
return BillingAddressSerializer
|
|
|
|
def get_queryset(self):
|
|
return self.request.user.billingaddress_set.all()
|
|
|
|
def create(self, request):
|
|
serializer = self.get_serializer(data=request.data)
|
|
serializer.is_valid(raise_exception=True)
|
|
|
|
# Validate VAT numbers.
|
|
country = serializer.validated_data["country"]
|
|
|
|
# We ignore empty VAT numbers.
|
|
if 'vat_number' in serializer.validated_data and serializer.validated_data["vat_number"] != "":
|
|
vat_number = serializer.validated_data["vat_number"]
|
|
|
|
if not validate_vat(country, vat_number):
|
|
return Response(
|
|
{'error': 'Malformed VAT number.'},
|
|
status=status.HTTP_400_BAD_REQUEST)
|
|
elif country in EU_COUNTRY_CODES:
|
|
# XXX: make a synchroneous call to a third patry API here might not be a good idea..
|
|
try:
|
|
vies_state = vies.check_vat(country, vat_number)
|
|
if not vies_state.valid:
|
|
return Response(
|
|
{'error': 'European VAT number does not exist in VIES.'},
|
|
status=status.HTTP_400_BAD_REQUEST)
|
|
except Exception as e:
|
|
logger.warning(e)
|
|
return Response(
|
|
{'error': 'Could not validate EU VAT number against VIES. Try again later..'},
|
|
status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
|
|
|
|
serializer.save(owner=request.user)
|
|
return Response(serializer.data)
|
|
|
|
###
|
|
# Admin stuff.
|
|
|
|
class AdminPaymentViewSet(viewsets.ModelViewSet):
|
|
serializer_class = PaymentSerializer
|
|
permission_classes = [permissions.IsAdminUser]
|
|
|
|
def get_queryset(self):
|
|
return Payment.objects.all()
|
|
|
|
def create(self, request):
|
|
serializer = self.get_serializer(data=request.data)
|
|
serializer.is_valid(raise_exception=True)
|
|
serializer.save(timestamp=datetime.now())
|
|
|
|
headers = self.get_success_headers(serializer.data)
|
|
return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)
|
|
|
|
# Bills are generated from orders and should not be created or updated by hand.
|
|
class AdminBillViewSet(BillViewSet):
|
|
serializer_class = BillSerializer
|
|
permission_classes = [permissions.IsAdminUser]
|
|
|
|
def get_queryset(self):
|
|
return Bill.objects.all()
|
|
|
|
@action(detail=False, methods=['get'])
|
|
def unpaid(self, request):
|
|
unpaid_bills = []
|
|
# XXX: works but we can do better than number of users + 1 SQL requests...
|
|
for user in get_user_model().objects.all():
|
|
unpaid_bills = unpaid_bills + Bill.get_unpaid_for(self.request.user)
|
|
|
|
serializer = self.get_serializer(unpaid_bills, many=True)
|
|
return Response(serializer.data)
|
|
|
|
@action(detail=False, methods=['post'])
|
|
def generate(self, request):
|
|
users = get_user_model().objects.all()
|
|
|
|
generated_bills = []
|
|
for user in users:
|
|
now = timezone.now()
|
|
generated_bills = generated_bills + Bill.generate_for(
|
|
year=now.year,
|
|
month=now.month,
|
|
user=user)
|
|
|
|
return Response(
|
|
map(lambda b: b.reference, generated_bills),
|
|
status=status.HTTP_200_OK)
|
|
|
|
class AdminOrderViewSet(mixins.ListModelMixin,
|
|
mixins.RetrieveModelMixin,
|
|
mixins.CreateModelMixin,
|
|
mixins.UpdateModelMixin,
|
|
viewsets.GenericViewSet):
|
|
serializer_class = OrderSerializer
|
|
permission_classes = [permissions.IsAdminUser]
|
|
|
|
def get_serializer(self, *args, **kwargs):
|
|
return self.serializer_class(*args, **kwargs, admin=True)
|
|
|
|
def get_queryset(self):
|
|
return Order.objects.all()
|
|
|
|
# Updates create a new order and terminate the 'old' one.
|
|
@transaction.atomic
|
|
def update(self, request, *args, **kwargs):
|
|
order = self.get_object()
|
|
partial = kwargs.pop('partial', False)
|
|
serializer = self.get_serializer(order, data=request.data, partial=partial)
|
|
serializer.is_valid(raise_exception=True)
|
|
|
|
# Clone existing order for replacement.
|
|
replacing_order = deepcopy(order)
|
|
|
|
# Yes, that's how you make a new entry in DB:
|
|
# https://docs.djangoproject.com/en/3.0/topics/db/queries/#copying-model-instances
|
|
replacing_order.pk = None
|
|
|
|
for attr, value in serializer.validated_data.items():
|
|
setattr(replacing_order, attr, value)
|
|
|
|
# Save replacing order and terminate 'previous' one.
|
|
replacing_order.save()
|
|
order.replaced_by = replacing_order
|
|
order.save()
|
|
order.terminate()
|
|
|
|
return Response(replacing_order)
|
|
|
|
@action(detail=True, methods=['post'])
|
|
def terminate(self, request, pk):
|
|
order = self.get_object()
|
|
if order.is_terminated:
|
|
return Response(
|
|
{'error': 'Order is already terminated.'},
|
|
status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
|
|
else:
|
|
order.terminate()
|
|
return Response({}, status=status.HTTP_200_OK)
|