import json
import logging
import math

import config
from config import ldap_manager
from helper import resolve_product

etcd_client = config.etcd_client


class ValidationException(Exception):
    """Validation Error"""


class Field:
    """A named value with an expected type and optional extra validators."""

    def __init__(self, _name, _type, _value=None, validators=None, disable_validation=False):
        """Store the field description.

        Args:
            _name: Name of the field; used in error messages and as the key
                in cleaned values.
            _type: Callable/type the value must be an instance of, or be
                coercible to by calling ``_type(value)``.
            _value: Raw value of the field.
            validators: Optional list of zero-argument callables run after
                the type check.
            disable_validation: When true, ``is_valid`` does nothing.
        """
        self.validation_disabled = disable_validation
        self.name = _name
        self.value = _value
        self.type = _type
        self.validators = validators or []

    def is_valid(self):
        """Coerce the value to the field type and run the validators.

        Raises:
            ValidationException: If the value is not of the expected type and
                cannot be converted to it. Validators may raise it as well.
        """
        if self.validation_disabled:
            return

        if not isinstance(self.value, self.type):
            # NOTE(review): for ``bool`` fields this coercion turns every
            # non-empty string (including "false") into True -- confirm that
            # callers always send real booleans.
            try:
                self.value = self.type(self.value)
            except Exception as err:
                # Chain the cause so the original conversion error is not lost.
                raise ValidationException(
                    "Incorrect Type for '{}' field".format(self.name)
                ) from err

        for validator in self.validators:
            validator()

    def __repr__(self):
        return self.name


class BaseSchema:
    """Base class for schemas: a set of ``Field`` attributes plus ``objects``.

    ``objects`` holds extra values collected during validation; they are
    merged into the result of ``get_cleaned_values``.
    """

    def __init__(self):
        self.objects = {}

    def validation(self):
        """Schema-wide validation hook; subclasses may override it."""
        # custom validation is optional
        return True

    def get_fields(self):
        """Return every attribute of this schema that is a ``Field``."""
        attributes = (getattr(self, name) for name in dir(self))
        return [attribute for attribute in attributes if isinstance(attribute, Field)]

    def is_valid(self):
        """Validate each field, then run the schema-wide ``validation`` hook.

        Raises:
            ValidationException: Raised by a field or by ``validation``.
        """
        for field in self.get_fields():
            field.is_valid()
        self.validation()

    def get_cleaned_values(self):
        """Return a dict of field name -> value, updated with ``objects``."""
        cleaned_values = {field.name: field.value for field in self.get_fields()}
        # Entries in ``objects`` win over fields that share the same name.
        cleaned_values.update(self.objects)
        return cleaned_values

    def add_schema(self, schema, data, under_field_name=None):
        """Instantiate and validate ``schema(data)`` and merge it into self.

        Args:
            schema: Schema class; called as ``schema(data)``.
            data: Input passed to the schema.
            under_field_name: When given, the sub-schema's values are stored
                as a dict in a new field of that name (validation disabled).
                Otherwise each sub-schema field is attached directly to self
                under the field's own name.

        Raises:
            ValidationException: If the sub-schema does not validate.
        """
        s = schema(data)
        s.is_valid()

        if under_field_name:
            # Create a field in self holding the nested values as a dict.
            nested = Field(under_field_name, dict, _value={}, disable_validation=True)
            setattr(self, under_field_name, nested)
            for field in s.get_fields():
                nested.value[field.name] = field.value
        else:
            for field in s.get_fields():
                setattr(self, field.name, field)

        self.objects.update(s.objects)

    @staticmethod
    def get(dictionary: dict, key: str, return_default=False, default=None):
        """Build the ``_value``/``disable_validation`` kwargs for a ``Field``.

        Args:
            dictionary: Input data to read from.
            key: Key to look up.
            return_default: When true, a missing key yields ``default`` with
                validation disabled instead of raising.
            default: Value used for a missing key when ``return_default``.

        Returns:
            A dict with the keys ``_value`` and ``disable_validation``.

        Raises:
            ValidationException: If ``dictionary`` is None, or the key is
                missing and ``return_default`` is false.
        """
        if dictionary is None:
            raise ValidationException('No data provided at all.')

        try:
            value = dictionary[key]
        except KeyError:
            if return_default:
                return {'_value': default, 'disable_validation': True}
            raise ValidationException("Missing data for '{}' field.".format(key))
        else:
            return {'_value': value, 'disable_validation': False}


class AddProductSchema(BaseSchema):
    """Schema for adding a product: user credentials, ``specs`` and ``update``."""

    def __init__(self, data):
        super().__init__()
        self.add_schema(UserCredentialSchema, data)
        self.specs = Field('specs', dict, **self.get(data, 'specs'))
        self.update = Field('update', bool, **self.get(data, 'update', return_default=True, default=False))

    def validation(self):
        """Check the user's OU and look up an already existing product.

        Raises:
            ValidationException: If the user is not in the configured internal
                OU, or ``specs`` has no ``usable-id`` entry.
        """
        user = self.objects['user']
        user = json.loads(user.entry_to_json())
        # Strip the attribute prefixes from the DN and split it into parts;
        # only the second component (taken to be the OU) is checked.
        uid, ou, *dc = user['dn'].replace('ou=', '').replace('dc=', '').replace('uid=', '').split(',')
        if ou != config.config['ldap']['internal_user_ou']:
            raise ValidationException('You do not have access to create product.')

        # A missing 'usable-id' is a client error, so report it the same way
        # as any other missing field instead of leaking a KeyError.
        try:
            usable_id = self.specs.value['usable-id']
        except KeyError as err:
            raise ValidationException("Missing data for 'usable-id' field.") from err

        product = resolve_product(usable_id, etcd_client)
        if product:
            self.objects['product'] = product


class AddressSchema(BaseSchema):
    """Postal address; ``line2``, ``state`` and ``postal_code`` are optional."""

    def __init__(self, data):
        super().__init__()
        self.line1 = Field('line1', str, **self.get(data, 'line1'))
        self.line2 = Field('line2', str, **self.get(data, 'line2', return_default=True))
        self.city = Field('city', str, **self.get(data, 'city'))
        self.country = Field('country', str, **self.get(data, 'country'))
        self.state = Field('state', str, **self.get(data, 'state', return_default=True))
        self.postal_code = Field('postal_code', str, **self.get(data, 'postal_code', return_default=True))


class UserRegisterPaymentSchema(BaseSchema):
    """User credentials, an address (nested under ``address``) and card data."""

    def __init__(self, data):
        super().__init__()
        self.add_schema(UserCredentialSchema, data)
        self.add_schema(AddressSchema, data, under_field_name='address')
        self.card_number = Field('card_number', str, **self.get(data, 'card_number'))
        self.cvc = Field('cvc', str, **self.get(data, 'cvc'))
        self.expiry_year = Field('expiry_year', int, **self.get(data, 'expiry_year'))
        self.expiry_month = Field('expiry_month', int, **self.get(data, 'expiry_month'))
        self.card_holder_name = Field('card_holder_name', str, **self.get(data, 'card_holder_name'))


class UserCredentialSchema(BaseSchema):
    """Username/password pair checked through ``ldap_manager``."""

    def __init__(self, data):
        super().__init__()
        self.username = Field('username', str, **self.get(data, 'username'))
        self.password = Field('password', str, **self.get(data, 'password'))

    def validation(self):
        """Verify the credentials and store the result in ``objects['user']``.

        Raises:
            ValidationException: If ``ldap_manager.is_password_valid`` raises.
                A ``ValueError`` is reported as an unknown username, anything
                else as invalid credentials.
        """
        try:
            entry = ldap_manager.is_password_valid(
                self.username.value, self.password.value, query_key='uid'
            )
        except ValueError as err:
            raise ValidationException(
                "No user with '{}' username found. You can create account at "
                "https://account.ungleich.ch".format(self.username.value)
            ) from err
        except Exception as err:
            # Deliberately broad: any other failure is shown to the client as
            # bad credentials. The cause stays attached for debugging.
            raise ValidationException('Invalid username/password.') from err
        else:
            self.objects['user'] = entry


class ProductOrderSchema(BaseSchema):
    """Schema for ordering a product: ``product_id``, ``pay`` and credentials."""

    def __init__(self, data):
        super().__init__()
        self.product_id = Field(
            'product_id', str, **self.get(data, 'product_id'), validators=[self.product_id_validation]
        )
        self.pay_consent = Field('pay', bool, **self.get(data, 'pay', return_default=True, default=False))
        self.add_schema(UserCredentialSchema, data)

    def product_id_validation(self):
        """Resolve the product and check that it can currently be ordered.

        On success the field value is replaced by the product's ``uuid`` and
        the product is stored in ``objects['product']``.

        Raises:
            ValidationException: If the product does not exist, is not active
                or has a quantity of zero or less.
        """
        product = resolve_product(self.product_id.value, etcd_client)
        if not product:
            raise ValidationException('No such product exists.')

        product['quantity'] = float(product['quantity'])
        self.product_id.value = product['uuid']
        self.objects['product'] = product

        logging.debug('Got product %s', product)

        if not product['active']:
            raise ValidationException('Product is not active at the moment.')

        if product['quantity'] <= 0:
            raise ValidationException('Out of stock.')

    def validation(self):
        """Enforce the membership requirement and the per-user order limit.

        Raises:
            ValidationException: If the user has no membership order and is
                not ordering the membership itself, or has already ordered
                this product ``max_per_user`` times.
        """
        username = self.objects['user'].uid
        product = self.objects['product']

        # NOTE(review): the prefix has no trailing separator, so it may also
        # match keys of users whose name merely starts with this username --
        # verify against the etcd key layout.
        stored_orders = etcd_client.get_prefix('/v1/user/{}'.format(username), value_in_json=True)
        customer_previous_orders = [o.value for o in stored_orders]

        has_membership = any(o['product'] == 'membership' for o in customer_previous_orders)
        if not has_membership and product['usable-id'] != 'membership':
            raise ValidationException('Please buy membership first to use this facility')

        # Products without 'max_per_user' have no limit (infinity).
        max_quantity_user_can_order = float(product.get('max_per_user', math.inf))
        previous_order_of_same_product = [
            o for o in customer_previous_orders if o['product'] == product['usable-id']
        ]
        if len(previous_order_of_same_product) >= max_quantity_user_can_order:
            raise ValidationException(
                'You cannot buy {} more than {} times'.format(
                    product['name'], int(max_quantity_user_can_order)
                )
            )


class OrderListSchema(BaseSchema):
    """Schema for listing orders; only the user credentials are required."""

    def __init__(self, data):
        super().__init__()
        self.add_schema(UserCredentialSchema, data)


def make_return_message(err, status_code=200):
    """Log ``err`` and return ``({'message': str(err)}, status_code)``."""
    message = str(err)
    logging.debug('message: %s', message)
    return {'message': message}, status_code


def create_schema(specification, data):
    """Build a ``BaseSchema`` subclass from a product specification.

    One ``Field`` is created for every entry of ``specification['features']``
    whose ``constant`` flag is false; its value is read from ``data``.

    Args:
        specification: Mapping with a ``name`` and a ``features`` mapping.
        data: Input data holding the feature values.

    Returns:
        A new class named ``'<name>Schema'`` deriving from ``BaseSchema``,
        with the fields as class attributes.

    Raises:
        ValidationException: If ``data`` is None or lacks a required feature.
    """
    fields = {}
    for feature_name, feature_detail in specification['features'].items():
        if feature_detail['constant']:
            continue
        # SECURITY NOTE(review): eval() executes whatever string the
        # specification carries as the unit type. This is only safe if
        # specifications come from trusted users -- consider mapping the
        # permitted type names to types explicitly instead.
        field_type = eval(feature_detail['unit']['type'])
        fields[feature_name] = Field(feature_name, field_type, **BaseSchema.get(data, feature_name))

    return type('{}Schema'.format(specification['name']), (BaseSchema,), fields)