222 lines
8.3 KiB
Python
222 lines
8.3 KiB
Python
import logging
|
|
import config
|
|
import json
|
|
import math
|
|
|
|
from config import ldap_manager
|
|
from helper import resolve_product
|
|
|
|
# Module-level alias for the etcd client object exposed by the project's
# `config` module; the schemas below pass it to `resolve_product` and query it
# directly for a user's previous orders.
etcd_client = config.etcd_client
|
|
|
|
|
|
class ValidationException(Exception):
    """Signals that submitted data did not pass field or schema validation."""
|
|
|
|
|
|
class Field:
    """A named value paired with its expected type and optional validators.

    Args:
        _name: Name of the field; also what ``repr()`` returns.
        _type: Callable/type the value must be an instance of (or be
            coercible to by calling ``_type(value)``).
        _value: The raw value to validate.
        validators: Optional iterable of zero-argument callables that are
            invoked after the type check.
        disable_validation: When true, ``is_valid`` does nothing at all.
    """

    def __init__(self, _name, _type, _value=None, validators=None, disable_validation=False):
        self.name = _name
        self.type = _type
        self.value = _value
        self.validators = validators if validators else []
        self.validation_disabled = disable_validation

    def is_valid(self):
        """Coerce the value to the declared type and run the validators.

        Raises:
            ValidationException: If the value is not an instance of the
                declared type and calling the type on it fails.
        """
        if self.validation_disabled:
            return

        if not isinstance(self.value, self.type):
            try:
                coerced = self.type(self.value)
            except Exception:
                raise ValidationException("Incorrect Type for '{}' field".format(self.name))
            else:
                self.value = coerced

        # Validators take no arguments; they are expected to raise on failure.
        for check in self.validators:
            check()

    def __repr__(self):
        return self.name
|
|
|
|
|
|
class BaseSchema:
    """Base class for schemas whose data lives in ``Field`` attributes.

    Subclasses assign ``Field`` instances as attributes and may override
    ``validation`` for checks that span several fields. Non-field results
    gathered during validation are kept in ``self.objects``.
    """

    def __init__(self):
        # Extra objects produced by validation; merged into the cleaned values.
        self.objects = {}

    def validation(self):
        """Schema-level validation hook; the default accepts everything."""
        return True

    def get_fields(self):
        """Return every attribute of this schema that is a ``Field``."""
        attributes = (getattr(self, attr_name) for attr_name in dir(self))
        return [attribute for attribute in attributes if isinstance(attribute, Field)]

    def is_valid(self):
        """Validate each field, then run the schema-level ``validation``."""
        for member in self.get_fields():
            member.is_valid()
        self.validation()

    def get_cleaned_values(self):
        """Return a dict of field name -> value, overlaid with ``self.objects``."""
        cleaned = {}
        for member in self.get_fields():
            cleaned[member.name] = member.value
        cleaned.update(self.objects)
        return cleaned

    def add_schema(self, schema, data, under_field_name=None):
        """Instantiate and validate ``schema(data)`` and absorb its fields.

        Without ``under_field_name`` the nested schema's fields become
        attributes of this schema. With it, a single dict-valued field of that
        name is created here (validation disabled) holding the nested field
        values keyed by field name. The nested schema's ``objects`` are merged
        into this schema's ``objects`` in both cases.
        """
        nested = schema(data)
        nested.is_valid()

        if under_field_name:
            container = Field(under_field_name, dict, _value={}, disable_validation=True)
            setattr(self, under_field_name, container)
            for member in nested.get_fields():
                container.value[member.name] = member.value
        else:
            for member in nested.get_fields():
                setattr(self, member.name, member)

        self.objects.update(nested.objects)

    @staticmethod
    def get(dictionary: dict, key: str, return_default=False, default=None):
        """Look up ``key`` and return keyword arguments for a ``Field``.

        Returns:
            ``{'_value': ..., 'disable_validation': ...}``. Validation is
            disabled only when the key is absent and the default is used.

        Raises:
            ValidationException: If ``dictionary`` is ``None``, or the key is
                missing and ``return_default`` is false.
        """
        if dictionary is None:
            raise ValidationException('No data provided at all.')

        try:
            found = dictionary[key]
        except KeyError:
            if not return_default:
                raise ValidationException("Missing data for '{}' field.".format(key))
            return {'_value': default, 'disable_validation': True}

        return {'_value': found, 'disable_validation': False}
|
|
|
|
|
|
class AddProductSchema(BaseSchema):
    """Schema for a request that creates (or updates) a product.

    Expects user credentials (via ``UserCredentialSchema``), a required
    ``specs`` dict and an optional boolean ``update`` flag (default ``False``).
    """

    def __init__(self, data):
        super().__init__()
        self.add_schema(UserCredentialSchema, data)
        self.specs = Field('specs', dict, **self.get(data, 'specs'))
        self.update = Field('update', bool, **self.get(data, 'update', return_default=True, default=False))

    def validation(self):
        """Check the user may create products and look up any existing product.

        Raises:
            ValidationException: If the second component of the user's DN is
                not the configured internal-user OU, or if ``specs`` has no
                ``'usable-id'`` key.
        """
        user = self.objects['user']
        user = json.loads(user.entry_to_json())
        # DN is expected to look like "uid=...,ou=...,dc=...,..."; only the
        # second component (the OU) is used.
        _uid, ou, *_dc = user['dn'].replace('ou=', '').replace('dc=', '').replace('uid=', '').split(',')
        if ou != config.config['ldap']['internal_user_ou']:
            raise ValidationException('You do not have access to create product.')

        # `specs` is user-supplied: report a missing key as a validation
        # failure instead of letting a bare KeyError escape.
        try:
            usable_id = self.specs.value['usable-id']
        except KeyError:
            raise ValidationException("Missing data for 'usable-id' field in specs.") from None

        product = resolve_product(usable_id, etcd_client)
        if product:
            self.objects['product'] = product
|
|
|
|
|
|
class AddressSchema(BaseSchema):
    """Schema for a postal address; every field is a string.

    ``line1``, ``city`` and ``country`` are required; ``line2``, ``state`` and
    ``postal_code`` fall back to ``None`` when absent.
    """

    def __init__(self, data):
        super().__init__()
        # (field name, may be omitted) in the order the fields are read.
        layout = (
            ('line1', False),
            ('line2', True),
            ('city', False),
            ('country', False),
            ('state', True),
            ('postal_code', True),
        )
        for field_name, optional in layout:
            setattr(self, field_name, Field(field_name, str, **self.get(data, field_name, return_default=optional)))
|
|
|
|
|
|
class UserRegisterPaymentSchema(BaseSchema):
    """Schema for registering a payment card for a user.

    Combines user credentials, an address (stored under the ``address``
    field) and the required card details.
    """

    def __init__(self, data):
        super().__init__()

        self.add_schema(UserCredentialSchema, data)
        self.add_schema(AddressSchema, data, under_field_name='address')

        # All card fields are required; listed in the order they are read.
        card_fields = (
            ('card_number', str),
            ('cvc', str),
            ('expiry_year', int),
            ('expiry_month', int),
            ('card_holder_name', str),
        )
        for field_name, field_type in card_fields:
            setattr(self, field_name, Field(field_name, field_type, **self.get(data, field_name)))
|
|
|
|
|
|
class UserCredentialSchema(BaseSchema):
    """Schema holding a required ``username`` and ``password``."""

    def __init__(self, data):
        super().__init__()
        for field_name in ('username', 'password'):
            setattr(self, field_name, Field(field_name, str, **self.get(data, field_name)))

    def validation(self):
        """Verify the credentials through ``ldap_manager``.

        On success the value returned by ``ldap_manager.is_password_valid`` is
        stored as ``self.objects['user']``.

        Raises:
            ValidationException: With a "no user" message when the check
                raises ``ValueError``, or an "invalid username/password"
                message for any other exception.
        """
        try:
            entry = ldap_manager.is_password_valid(self.username.value, self.password.value, query_key='uid')
        except ValueError:
            message = (
                "No user with '{}' username found. You can create account at "
                "https://account.ungleich.ch"
            ).format(self.username.value)
            raise ValidationException(message)
        except Exception:
            raise ValidationException('Invalid username/password.')

        # Only reached when no exception was raised above.
        self.objects['user'] = entry
|
|
|
|
|
|
class ProductOrderSchema(BaseSchema):
    """Schema for ordering a product.

    Takes a required ``product_id``, an optional boolean ``pay`` flag
    (default ``False``, exposed as ``pay_consent``) and user credentials.
    """

    def __init__(self, data):
        super().__init__()
        product_id_kwargs = self.get(data, 'product_id')
        self.product_id = Field('product_id', str, validators=[self.product_id_validation], **product_id_kwargs)
        self.pay_consent = Field('pay', bool, **self.get(data, 'pay', return_default=True, default=False))
        self.add_schema(UserCredentialSchema, data)

    def product_id_validation(self):
        """Resolve the product and make sure it can currently be ordered.

        On success ``product_id`` is replaced by the product's ``uuid`` and
        the product is stored in ``self.objects['product']`` with its
        ``quantity`` converted to ``float``.

        Raises:
            ValidationException: If the product cannot be resolved, is not
                active, or has a quantity of zero or less.
        """
        product = resolve_product(self.product_id.value, etcd_client)
        if not product:
            raise ValidationException('No such product exists.')

        product['quantity'] = float(product['quantity'])
        self.product_id.value = product['uuid']
        self.objects['product'] = product
        logging.debug('Got product {}'.format(product))

        if not product['active']:
            raise ValidationException('Product is not active at the moment.')

        if product['quantity'] <= 0:
            raise ValidationException('Out of stock.')

    def validation(self):
        """Enforce the membership requirement and the per-user order limit.

        Raises:
            ValidationException: If the user has no previous ``membership``
                order and is not ordering it now, or if the user already has
                at least ``max_per_user`` orders of this product.
        """
        username = self.objects['user'].uid
        stored_orders = etcd_client.get_prefix('/v1/user/{}'.format(username), value_in_json=True)
        orders = [entry.value for entry in stored_orders]

        has_membership = any(order['product'] == 'membership' for order in orders)
        product = self.objects['product']
        if not has_membership and product['usable-id'] != 'membership':
            raise ValidationException('Please buy membership first to use this facility')

        # No 'max_per_user' on the product means there is no limit.
        limit = float(product.get('max_per_user', math.inf))
        already_ordered = sum(1 for order in orders if order['product'] == product['usable-id'])
        if already_ordered >= limit:
            message = 'You cannot buy {} more than {} times'.format(product['name'], int(limit))
            raise ValidationException(message)
|
|
|
|
|
|
class OrderListSchema(BaseSchema):
    """Schema for listing orders; it only requires valid user credentials."""

    def __init__(self, data):
        super().__init__()
        self.add_schema(schema=UserCredentialSchema, data=data)
|
|
|
|
|
|
def make_return_message(err, status_code=200):
    """Build a ``({'message': str(err)}, status_code)`` response tuple.

    The message text is also written to the debug log.
    """
    text = str(err)
    logging.debug('message: {}'.format(text))
    body = {'message': text}
    return body, status_code
|
|
|
|
|
|
def create_schema(specification, data):
    """Build a ``BaseSchema`` subclass from a product specification.

    One ``Field`` is created for every entry of ``specification['features']``
    whose ``'constant'`` value is falsy; its value is read from ``data`` under
    the feature's name (which must be present). The fields become class
    attributes of a new class named ``'<specification name>Schema'``.

    Returns:
        The newly created class (not an instance).
    """
    # NOTE(review): eval() turns the type name stored in the specification
    # into a Python object. This is unsafe if specifications can ever come
    # from an untrusted source -- confirm, and consider an explicit whitelist.
    fields = {
        feature_name: Field(
            feature_name,
            eval(feature_detail['unit']['type']),
            **BaseSchema.get(data, feature_name)
        )
        for feature_name, feature_detail in specification['features'].items()
        if not feature_detail['constant']
    }

    schema_name = '{}Schema'.format(specification['name'])
    return type(schema_name, (BaseSchema,), fields)
|