uncloud/uncloud_pay/stripe.py

255 lines
8.4 KiB
Python

import stripe
import stripe.error
import logging
import datetime
from django.core.exceptions import ObjectDoesNotExist, ValidationError
from django.conf import settings
from django.contrib.auth import get_user_model
from .models import StripeCustomer, StripeCreditCard
logger = logging.getLogger(__name__)
CURRENCY = 'chf'
stripe.api_key = settings.STRIPE_KEY
def handle_stripe_error(f):
def handle_problems(*args, **kwargs):
response = {
'paid': False,
'response_object': None,
'error': None
}
common_message = "Currently it is not possible to make payments. Please try agin later."
try:
response_object = f(*args, **kwargs)
return response_object
except stripe.error.CardError as e:
# Since it's a decline, stripe.error.CardError will be caught
body = e.json_body
logging.error(str(e))
raise e # For error handling.
except stripe.error.RateLimitError:
logging.error("Too many requests made to the API too quickly.")
raise Exception(common_message)
except stripe.error.InvalidRequestError as e:
logging.error(str(e))
raise Exception('Invalid parameters.')
except stripe.error.AuthenticationError as e:
# Authentication with Stripe's API failed
# (maybe you changed API keys recently)
logging.error(str(e))
raise Exception(common_message)
except stripe.error.APIConnectionError as e:
logging.error(str(e))
raise Exception(common_message)
except stripe.error.StripeError as e:
# XXX: maybe send email
logging.error(str(e))
raise Exception(common_message)
return handle_problems
def public_api_key():
return settings.STRIPE_PUBLIC_KEY
def get_customer_id_for(user):
try:
# .get() raise if there is no matching entry.
return StripeCustomer.objects.get(owner=user).stripe_id
except ObjectDoesNotExist:
# No entry yet - making a new one.
try:
customer = create_customer(user.username, user.email)
uncloud_stripe_mapping = StripeCustomer.objects.create(
owner=user, stripe_id=customer.id)
return uncloud_stripe_mapping.stripe_id
except Exception as e:
return None
@handle_stripe_error
def create_setup_intent(customer_id):
return stripe.SetupIntent.create(customer=customer_id)
@handle_stripe_error
def get_setup_intent(setup_intent_id):
return stripe.SetupIntent.retrieve(setup_intent_id)
@handle_stripe_error
def get_payment_method(payment_method_id):
return stripe.PaymentMethod.retrieve(payment_method_id)
@handle_stripe_error
def get_card_from_payment(user, payment_method_id):
payment_method = stripe.PaymentMethod.retrieve(payment_method_id)
if payment_method:
if 'card' in payment_method:
sync_cards_for_user(user)
return payment_method['card']
return False
@handle_stripe_error
def attach_payment_method(payment_method_id, user):
customer_id = get_customer_id_for(user)
ret = stripe.PaymentMethod.attach(payment_method_id, customer=customer_id)
sync_cards_for_user(user)
return ret
@handle_stripe_error
def create_customer(name, email):
return stripe.Customer.create(name=name, email=email)
@handle_stripe_error
def get_customer(customer_id):
return stripe.Customer.retrieve(customer_id)
@handle_stripe_error
def get_customer_cards(customer_id):
cards = []
stripe_cards = stripe.PaymentMethod.list(
customer=customer_id,
type="card",
)
for stripe_card in stripe_cards["data"]:
card = {}
card['brand'] = stripe_card["card"]["brand"]
card['last4'] = stripe_card["card"]["last4"]
card['month'] = stripe_card["card"]["exp_month"]
card['year'] = stripe_card["card"]["exp_year"]
card['id'] = stripe_card["id"]
cards.append(card)
return cards
@handle_stripe_error
def delete_card(card_id):
return stripe.PaymentMethod.detach(card_id)
def sync_cards_for_user(user):
customer_id = get_customer_id_for(user)
cards = get_customer_cards(customer_id)
active_cards = StripeCreditCard.objects.filter(owner=user,
active=True)
if len(active_cards) > 0:
has_active_card = True
else:
has_active_card = False
for card in cards:
active = False
if not has_active_card:
active = True
has_active_card = True
StripeCreditCard.objects.get_or_create(card_id=card['id'],
owner = user,
defaults = {
'last4': card['last4'],
'brand': card['brand'],
'expiry_date': datetime.date(card['year'],
card['month'],
1),
'active': active
}
)
@handle_stripe_error
def charge_customer(user, amount, currency='CHF', card=False):
# Amount is in CHF but stripes requires smallest possible unit.
# https://stripe.com/docs/api/payment_intents/create#create_payment_intent-amount
# FIXME: might need to be adjusted for other currencies
if currency == 'CHF':
adjusted_amount = int(amount * 100)
else:
return Exception("Programming error: unsupported currency")
try:
card = card or StripeCreditCard.objects.get(owner=user,
active=True)
except StripeCreditCard.DoesNotExist:
raise ValidationError("No active credit card - cannot create payment")
customer_id = get_customer_id_for(user)
return stripe.PaymentIntent.create(
amount=adjusted_amount,
currency=currency,
customer=customer_id,
payment_method=card.card_id,
off_session=True,
confirm=True,
)
@handle_stripe_error
def get_payment_intent(user, amount, currency='CHF', card=False):
# Amount is in CHF but stripes requires smallest possible unit.
# https://stripe.com/docs/api/payment_intents/create#create_payment_intent-amount
# FIXME: might need to be adjusted for other currencies
if currency == 'CHF':
adjusted_amount = int(amount * 100)
else:
return Exception("Programming error: unsupported currency")
try:
card = card or StripeCreditCard.objects.get(owner=user,
active=True)
except StripeCreditCard.DoesNotExist:
raise ValidationError("No active credit card - cannot create payment")
customer_id = get_customer_id_for(user)
return stripe.PaymentIntent.create(
amount=adjusted_amount,
currency=currency,
customer=customer_id,
payment_method=card.card_id,
setup_future_usage='off_session',
confirm=False,
)
@handle_stripe_error
def get_or_create_tax_id_for_user(stripe_customer_id, vat_number,
type="eu_vat", country=""):
def compare_vat_numbers(vat1, vat2):
_vat1 = vat1.replace(" ", "").replace(".", "").replace("-","")
_vat2 = vat2.replace(" ", "").replace(".", "").replace("-","")
return True if _vat1 == _vat2 else False
tax_ids_list = stripe.Customer.list_tax_ids(
stripe_customer_id,
limit=100,
)
for tax_id_obj in tax_ids_list.data:
if compare_vat_numbers(tax_id_obj.value, vat_number):
return tax_id_obj
else:
logger.debug(
"{val1} is not equal to {val2} or {con1} not same as "
"{con2}".format(val1=tax_id_obj.value, val2=vat_number,
con1=tax_id_obj.country.lower(),
con2=country.lower().strip()))
logger.debug(
"tax id obj does not exist for {val}. Creating a new one".format(
val=vat_number
))
tax_id_obj = stripe.Customer.create_tax_id(
stripe_customer_id,
type=type,
value=vat_number,
)
return tax_id_obj