uncloud-mravi/uncloud_pay/views.py
2021-12-26 20:36:31 +01:00

460 lines
17 KiB
Python

from django.contrib.auth.mixins import LoginRequiredMixin
from django.views.generic.base import TemplateView, View
from django.views.generic import DetailView
from django.shortcuts import render
from django.views.decorators.cache import cache_control
from django.db import transaction
from django.contrib.auth import get_user_model
from rest_framework import viewsets, mixins, permissions, status, views
from rest_framework.renderers import TemplateHTMLRenderer
from rest_framework.response import Response
from rest_framework.decorators import action
from rest_framework.reverse import reverse
from rest_framework.decorators import renderer_classes
from vat_validator import validate_vat, vies
from vat_validator.countries import EU_COUNTRY_CODES
from hardcopy import bytestring_to_pdf
from django.core.files.temp import NamedTemporaryFile
from django.views.generic.list import ListView
from django.http import FileResponse, HttpResponseRedirect
from django.template.loader import render_to_string
#from wkhtmltopdf.views import PDFTemplateResponse
from copy import deepcopy
import json
import logging
from .models import *
from .serializers import *
from .selectors import *
from .utils import get_order_total_with_vat
from datetime import datetime
from vat_validator import sanitize_vat
import uncloud_pay.stripe as uncloud_stripe
from django.contrib.auth.decorators import login_required
from django.utils.decorators import method_decorator
from django.http import JsonResponse
import stripe
logger = logging.getLogger(__name__)
class PricingView(View):
    """JSON endpoint that prices a resource configuration including VAT."""

    def get(self, request, **args):
        """Return the order total for the requested configuration as JSON.

        ``cores``, ``memory``, ``storage`` and the optional ``country`` are
        read from the query string; the pricing name is taken from the
        ``name`` keyword argument.
        """
        query = request.GET
        user = self.request.user
        selected_country = query.get('country', False)

        # A billing address is only looked up for authenticated users.
        address = False
        if user and user.is_authenticated:
            address = get_billing_address_for_user(user)

        vat_rate = False
        vat_validation_status = False
        use_address = address and (
            address.country == selected_country or not selected_country
        )
        if use_address:
            # The stored address matches the requested country (or none was
            # requested), so the address drives the VAT rate.
            vat_rate = VATRate.get_vat_rate(address)
            if address.vat_number_validated_on and address.vat_number_verified:
                vat_validation_status = "verified"
        elif selected_country:
            vat_rate = VATRate.get_vat_rate_for_country(selected_country)

        pricing = get_order_total_with_vat(
            query.get('cores'),
            query.get('memory'),
            query.get('storage'),
            pricing_name=args['name'],
            vat_rate=vat_rate * 100,
            vat_validation_status=vat_validation_status,
        )
        return JsonResponse(pricing)
class CardActivateView(View):
    """Activate one of the requesting user's stored credit cards."""

    def post(self, request, **args):
        """Activate the card whose id is posted as ``card_id``.

        Returns ``{'success': 1}`` on success, or a JSON object with an
        ``error`` key when no card id was posted or when no card with that
        id belongs to the requesting user.
        """
        card_id = request.POST.get('card_id')
        if not card_id:
            return JsonResponse({'error': "Please select a card"})

        matched_card = StripeCreditCard.objects.filter(
            owner=self.request.user, id=card_id
        ).first()
        # ``first()`` yields None for an unknown id or a card owned by
        # someone else; report it instead of crashing on ``None.activate()``.
        if matched_card is None:
            return JsonResponse({'error': "Card not found"})

        matched_card.activate()
        return JsonResponse({'success': 1})
class RegisterCard(TemplateView):
    """Page on which a logged-in user registers a card with Stripe."""

    template_name = "uncloud_pay/register_stripe.html"

    @method_decorator(login_required)
    def dispatch(self, *args, **kwargs):
        """Require login before handling any HTTP method."""
        return super().dispatch(*args, **kwargs)

    def get_context_data(self, **kwargs):
        """Add the setup-intent secret, username and Stripe public key."""
        user = self.request.user
        # A setup intent is created for the user's Stripe customer on every
        # render; its client secret is handed to the template.
        setup_intent = uncloud_stripe.create_setup_intent(
            uncloud_stripe.get_customer_id_for(user)
        )
        context = super().get_context_data(**kwargs)
        context.update(
            client_secret=setup_intent.client_secret,
            username=user.username,
            stripe_pk=uncloud_stripe.public_api_key,
        )
        return context
class OrderSuccessView(DetailView):
    """Confirmation page shown after an order, driven by session data."""

    template_name = "uncloud_pay/order_success.html"
    context_object_name = "order"
    model = Order

    @method_decorator(login_required)
    def dispatch(self, *args, **kwargs):
        """Require login before handling any HTTP method."""
        return super().dispatch(*args, **kwargs)

    # ``cache_control`` is a function-view decorator: it expects the request
    # as the first positional argument. On a method it must be wrapped with
    # ``method_decorator`` so that ``self`` is not mistaken for the request.
    @method_decorator(
        cache_control(no_cache=True, must_revalidate=True, no_store=True)
    )
    def get(self, request, *args, **kwargs):
        """Render the success page, or redirect when the session lacks data.

        Both ``order`` and ``bill_id`` must be present in the session;
        otherwise the user is redirected to ``nextcloud:index``.
        """
        session = request.session
        if 'order' not in session or 'bill_id' not in session:
            return HttpResponseRedirect(reverse('nextcloud:index'))

        context = {
            'order': session.get('order'),
            'bill_id': session['bill_id'],
            'balance': get_balance_for_user(self.request.user),
        }
        return render(request, self.template_name, context)
class InvoiceDownloadView(View):
    """Render one of the requesting user's bills as a PDF invoice."""

    template = 'uncloud_pay/invoice.html'

    @method_decorator(login_required)
    def dispatch(self, *args, **kwargs):
        """Require login before handling any HTTP method."""
        return super().dispatch(*args, **kwargs)

    def get_context_data(self, **kwargs):
        """Return the base template context holding the site's base URL."""
        return {'base_url': f'{self.request.scheme}://{self.request.get_host()}'}

    def get(self, request, bill_id):
        """Return the PDF for bill ``bill_id`` owned by the requesting user.

        Raises:
            Http404: if the user owns no bill with that id.
        """
        from django.http import Http404

        cmd_options = settings.REPORT_FORMAT
        context = self.get_context_data()

        # ``objects.get`` never returns a falsy value; it raises when nothing
        # matches. Turn that into a 404 instead of an unhandled server error.
        try:
            bill = Bill.objects.get(owner=self.request.user, id=bill_id)
        except Bill.DoesNotExist:
            raise Http404("Bill not found") from None

        context['bill'] = bill
        context['vat_rate'] = str(round(bill.vat_rate * 100, 2))
        context['tax_amount'] = round(bill.vat_rate * bill.subtotal, 2)

        # NOTE(review): the wkhtmltopdf import that provides
        # PDFTemplateResponse is commented out at the top of this file;
        # confirm the name is actually in scope.
        return PDFTemplateResponse(
            request=request,
            template=self.template,
            filename=f"bill-{bill_id}.pdf",
            cmd_options=cmd_options,
            footer_template='uncloud_pay/includes/invoice_footer.html',
            context=context,
        )
class PaymentsView(ListView):
    """List the requesting user's payments, optionally filtered by type."""

    template_name = "uncloud_pay/payments.html"
    model = Payment

    @method_decorator(login_required)
    def dispatch(self, *args, **kwargs):
        """Require login before handling any HTTP method."""
        return super().dispatch(*args, **kwargs)

    def get_context_data(self, **kwargs):
        """Add the user's balance and the requested ``type`` filter."""
        context = super().get_context_data(**kwargs)
        context['balance'] = get_balance_for_user(self.request.user)
        context['type'] = self.request.GET.get('type')
        return context

    def get_queryset(self):
        """Return the user's payments, newest first.

        When the ``type`` query parameter is non-empty, only payments of
        that type are returned.
        """
        criteria = {'owner': self.request.user}
        payment_type = self.request.GET.get('type')
        if payment_type:
            criteria['type'] = payment_type
        return Payment.objects.filter(**criteria).order_by('-timestamp')
class CardsView(ListView):
    """List the requesting user's stored credit cards."""

    template_name = "uncloud_pay/cards.html"
    model = StripeCreditCard

    @method_decorator(login_required)
    def dispatch(self, *args, **kwargs):
        """Require login before handling any HTTP method."""
        return super().dispatch(*args, **kwargs)

    def get_context_data(self, **kwargs):
        """Add balance, Stripe setup-intent data and the minimum amount."""
        # The parent context is built once; an earlier duplicate call whose
        # result was immediately discarded has been dropped.
        context = super().get_context_data(**kwargs)
        customer_id = uncloud_stripe.get_customer_id_for(self.request.user)
        setup_intent = uncloud_stripe.create_setup_intent(customer_id)
        context.update({
            'balance': get_balance_for_user(self.request.user),
            'client_secret': setup_intent.client_secret,
            'username': self.request.user.username,
            'stripe_pk': uncloud_stripe.public_api_key,
            'min_amount': settings.MIN_PER_TRANSACTION,
        })
        return context

    def get_queryset(self):
        """Sync the user's cards via ``uncloud_stripe``, then list them.

        Cards are ordered by ``active`` in descending order.
        """
        uncloud_stripe.sync_cards_for_user(self.request.user)
        return StripeCreditCard.objects.filter(
            owner=self.request.user
        ).order_by('-active')
class BillsView(ListView):
    """List the requesting user's bills, newest first."""

    template_name = "uncloud_pay/bills.html"
    model = Bill

    @method_decorator(login_required)
    def dispatch(self, *args, **kwargs):
        """Require login before handling any HTTP method."""
        return super().dispatch(*args, **kwargs)

    def get_context_data(self, **kwargs):
        """Add the user's balance to the list context."""
        # The parent context is built once; an earlier duplicate call whose
        # result was immediately discarded has been dropped.
        context = super().get_context_data(**kwargs)
        context['balance'] = get_balance_for_user(self.request.user)
        return context

    def get_queryset(self):
        """Return the user's bills ordered by descending creation date."""
        return Bill.objects.filter(
            owner=self.request.user
        ).order_by('-creation_date')
class CreditCardViewSet(
    mixins.RetrieveModelMixin,
    mixins.UpdateModelMixin,
    mixins.DestroyModelMixin,
    mixins.ListModelMixin,
    viewsets.GenericViewSet,
):
    """API for the requesting user's credit cards (no create action)."""

    serializer_class = StripeCreditCardSerializer
    permission_classes = [permissions.IsAuthenticated]

    def list(self, request):
        """Sync the user's cards via ``uncloud_stripe`` before listing them."""
        current_user = self.request.user
        uncloud_stripe.sync_cards_for_user(current_user)
        return super().list(request)

    def get_queryset(self):
        """Restrict the queryset to cards owned by the requesting user."""
        current_user = self.request.user
        return StripeCreditCard.objects.filter(owner=current_user)
class PaymentViewSet(viewsets.ModelViewSet):
    """API over the payments owned by the requesting user."""

    permission_classes = [permissions.IsAuthenticated]
    serializer_class = PaymentSerializer

    def get_queryset(self):
        """Restrict the queryset to the requesting user's payments."""
        current_user = self.request.user
        return Payment.objects.filter(owner=current_user)
class BalanceViewSet(viewsets.ViewSet):
    """API exposing the requesting user's balance."""

    permission_classes = [permissions.IsAuthenticated]

    def list(self, request):
        """Return the user's balance run through ``BalanceSerializer``."""
        payload = {'balance': get_balance_for_user(self.request.user)}
        serializer = BalanceSerializer(data=payload)
        # Validation has to run before ``serializer.data`` is read; its
        # boolean result is not inspected here.
        serializer.is_valid()
        return Response(serializer.data)
class ListCards(TemplateView):
    """Page listing the cards Stripe holds for the requesting user."""

    template_name = "uncloud_pay/list_stripe.html"

    @method_decorator(login_required)
    def dispatch(self, *args, **kwargs):
        """Require login before handling any HTTP method."""
        return super().dispatch(*args, **kwargs)

    def get_context_data(self, **kwargs):
        """Add the customer's cards and the requesting user to the context."""
        user = self.request.user
        cards = uncloud_stripe.get_customer_cards(
            uncloud_stripe.get_customer_id_for(user)
        )
        context = super().get_context_data(**kwargs)
        # Note: ``username`` carries the user object itself, not a string.
        context.update(cards=cards, username=user)
        return context
###
# Bills and Orders.
class BillViewSet(viewsets.ReadOnlyModelViewSet):
    """Read-only API over the requesting user's bills."""

    serializer_class = BillSerializer
    permission_classes = [permissions.IsAuthenticated]

    def get_queryset(self):
        """Restrict the queryset to bills owned by the requesting user."""
        return Bill.objects.filter(owner=self.request.user)

    @action(detail=False, methods=['get'])
    def unpaid(self, request):
        """Return the requesting user's unpaid bills."""
        serializer = self.get_serializer(
            Bill.get_unpaid_for(self.request.user),
            many=True)
        return Response(serializer.data)

    @action(detail=True, methods=['get'])
    def download(self, *args, **kwargs):
        """Render the selected bill to PDF and return it as a file response.

        The bill is rendered with the ``bill.html.j2`` template, converted
        to PDF into a temporary file, and streamed back.
        """
        bill = self.get_object()

        output_file = NamedTemporaryFile()
        bill_html = render_to_string("bill.html.j2", {'bill': bill})
        bytestring_to_pdf(bill_html.encode('utf-8'), output_file)

        response = FileResponse(output_file, content_type="application/pdf")
        # A Content-Disposition value must start with a disposition type;
        # a bare ``filename=...`` parameter is not a valid header value.
        response['Content-Disposition'] = (
            f'inline; filename="{bill.reference}_{bill.uuid}.pdf"'
        )
        return response
class OrderViewSet(viewsets.ReadOnlyModelViewSet):
    """Read-only API over the requesting user's orders."""

    permission_classes = [permissions.IsAuthenticated]
    serializer_class = OrderSerializer

    def get_queryset(self):
        """Restrict the queryset to orders owned by the requesting user."""
        current_user = self.request.user
        return Order.objects.filter(owner=current_user)
class VATRateViewSet(viewsets.ReadOnlyModelViewSet):
    """Read-only API over all VAT rates, for authenticated users."""

    queryset = VATRate.objects.all()
    permission_classes = [permissions.IsAuthenticated]
    serializer_class = VATRateSerializer
class BillingAddressViewSet(
    mixins.CreateModelMixin,
    mixins.RetrieveModelMixin,
    mixins.UpdateModelMixin,
    mixins.ListModelMixin,
    viewsets.GenericViewSet,
):
    """API for the requesting user's billing addresses (no delete action)."""

    permission_classes = [permissions.IsAuthenticated]

    def get_serializer_class(self):
        """Use the dedicated update serializer for the ``update`` action."""
        is_update = self.action == 'update'
        return UpdateBillingAddressSerializer if is_update else BillingAddressSerializer

    def get_queryset(self):
        """Return the billing addresses attached to the requesting user."""
        return self.request.user.billing_addresses.all()

    def create(self, request):
        """Create a billing address after validating its VAT number.

        A non-empty VAT number must be well-formed for the given country
        and, for EU countries, must be reported valid by VIES. Validation
        failures return HTTP 400; a failing VIES lookup returns HTTP 500.
        """
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        validated = serializer.validated_data

        country = validated["country"]
        # Empty or missing VAT numbers are not validated at all.
        vat_number = validated.get("vat_number", "")
        if vat_number != "":
            if not validate_vat(country, vat_number):
                return Response(
                    {'error': 'Malformed VAT number.'},
                    status=status.HTTP_400_BAD_REQUEST)

            if country in EU_COUNTRY_CODES:
                # XXX: making a synchronous call to a third-party API here
                # might not be a good idea.
                try:
                    vies_state = vies.check_vat(country, vat_number)
                    if not vies_state.valid:
                        return Response(
                            {'error': 'European VAT number does not exist in VIES.'},
                            status=status.HTTP_400_BAD_REQUEST)
                except Exception as e:
                    logger.warning(e)
                    return Response(
                        {'error': 'Could not validate EU VAT number against VIES. Try again later..'},
                        status=status.HTTP_500_INTERNAL_SERVER_ERROR)

        serializer.save(owner=request.user)
        return Response(serializer.data)
###
# Admin stuff.
class AdminPaymentViewSet(viewsets.ModelViewSet):
    """Admin-only API over every payment."""

    serializer_class = PaymentSerializer
    permission_classes = [permissions.IsAdminUser]

    def get_queryset(self):
        """Return all payments, regardless of owner."""
        return Payment.objects.all()

    def create(self, request):
        """Create a payment stamped with the current time; respond with 201."""
        from django.utils import timezone

        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        # ``timezone.now()`` follows the project's USE_TZ setting, unlike
        # ``datetime.now()``, which always yields a naive local timestamp.
        serializer.save(timestamp=timezone.now())

        headers = self.get_success_headers(serializer.data)
        return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)
# Bills are generated from orders and should not be created or updated by hand.
class AdminBillViewSet(BillViewSet):
    """Admin-only API over every bill, with bulk helper actions."""

    serializer_class = BillSerializer
    permission_classes = [permissions.IsAdminUser]

    def get_queryset(self):
        """Return all bills, regardless of owner."""
        return Bill.objects.all()

    @action(detail=False, methods=['get'])
    def unpaid(self, request):
        """Return the unpaid bills of every user."""
        unpaid_bills = []
        # XXX: works but we can do better than number of users + 1 SQL requests...
        for user in get_user_model().objects.all():
            # Query the user being iterated, not the admin making the request.
            unpaid_bills.extend(Bill.get_unpaid_for(user))

        serializer = self.get_serializer(unpaid_bills, many=True)
        return Response(serializer.data)

    @action(detail=False, methods=['post'])
    def generate(self, request):
        """Generate bills for the current month for every user.

        Returns the references of the generated bills.
        """
        from django.utils import timezone

        # One timestamp for the whole run, so every user is billed for the
        # same year and month even if the loop crosses a month boundary.
        now = timezone.now()
        generated_bills = []
        for user in get_user_model().objects.all():
            generated_bills.extend(
                Bill.generate_for(year=now.year, month=now.month, user=user)
            )

        # Hand the response a concrete list instead of a lazy ``map`` object.
        return Response(
            [bill.reference for bill in generated_bills],
            status=status.HTTP_200_OK)
class AdminOrderViewSet(mixins.ListModelMixin,
                        mixins.RetrieveModelMixin,
                        mixins.CreateModelMixin,
                        mixins.UpdateModelMixin,
                        viewsets.GenericViewSet):
    """Admin-only API over every order (no delete action)."""

    serializer_class = OrderSerializer
    permission_classes = [permissions.IsAdminUser]

    def get_serializer(self, *args, **kwargs):
        """Instantiate the serializer with ``admin=True``."""
        return self.serializer_class(*args, **kwargs, admin=True)

    def get_queryset(self):
        """Return all orders, regardless of owner."""
        return Order.objects.all()

    # Updates create a new order and terminate the 'old' one.
    @transaction.atomic
    def update(self, request, *args, **kwargs):
        """Replace an order with an updated copy and terminate the original.

        Returns the serialized replacing order.
        """
        order = self.get_object()
        partial = kwargs.pop('partial', False)
        serializer = self.get_serializer(order, data=request.data, partial=partial)
        serializer.is_valid(raise_exception=True)

        # Clone existing order for replacement.
        replacing_order = deepcopy(order)
        # Yes, that's how you make a new entry in DB:
        # https://docs.djangoproject.com/en/3.0/topics/db/queries/#copying-model-instances
        replacing_order.pk = None
        for attr, value in serializer.validated_data.items():
            setattr(replacing_order, attr, value)

        # Save replacing order and terminate 'previous' one.
        replacing_order.save()
        order.replaced_by = replacing_order
        order.save()
        order.terminate()

        # A model instance cannot be rendered directly; serialize it first.
        return Response(self.get_serializer(replacing_order).data)

    @action(detail=True, methods=['post'])
    def terminate(self, request, pk):
        """Terminate the selected order unless it is already terminated."""
        order = self.get_object()
        if order.is_terminated:
            return Response(
                {'error': 'Order is already terminated.'},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR)

        order.terminate()
        return Response({}, status=status.HTTP_200_OK)