import json import decouple import pyotp from os.path import join from helper import is_valid_otp from config import etcd_client class Field: def __init__(self, _name, _type, _value=None): self.name = _name self.value = _value self.type = _type self.__errors = [] def validation(self): return True def is_valid(self): if self.value == KeyError: self.add_error("'{}' field is a required field".format(self.name)) else: if not isinstance(self.value, self.type): self.add_error("Incorrect Type for '{}' field".format(self.name)) else: self.validation() if self.__errors: return False return True def get_errors(self): return self.__errors def add_error(self, error): self.__errors.append(error) class BaseSchema: def __init__(self, data, fields=None): _ = data # suppress linter warning self.__errors = [] if fields is None: self.fields = [] else: self.fields = fields def validation(self): # custom validation is optional return True def is_valid(self): for field in self.fields: field.is_valid() self.add_field_errors(field) for parent in self.__class__.__bases__: try: parent.validation(self) except AttributeError: pass if not self.__errors: self.validation() if self.__errors: return False return True def get_errors(self): return {"message": self.__errors} def add_field_errors(self, field: Field): self.__errors += field.get_errors() def add_error(self, error): self.__errors.append(error) class OTPSchema(BaseSchema): def __init__(self, data: dict, fields=None): self.name = Field("name", str, data.get("name", KeyError)) self.realm = Field("realm", str, data.get("realm", KeyError)) self.token = Field("token", str, data.get("token", KeyError)) self.auth_name = Field("auth-name", str, data.get("auth-name", KeyError)) self.auth_realm = Field("auth-realm", str, data.get("auth-realm", KeyError)) self.auth_token = Field("auth-token", str, data.get("auth-token", KeyError)) self.auth_realm.validation = self.auth_realm_validation _fields = [self.name, self.realm, self.token, self.auth_name, self.auth_realm, self.auth_token] if fields: _fields += fields super().__init__(data=data, fields=_fields) def auth_realm_validation(self): if self.auth_realm.value != decouple.config("AUTH_REALM"): self.add_error( "Authentication realm must be {}".format(decouple.config("AUTH_REALM")) ) def validation(self): if is_valid_otp(etcd_client, self.auth_name.value, self.auth_realm.value, self.auth_token.value): if is_valid_otp(etcd_client, self.name.value, self.realm.value, self.token.value): _key = join(decouple.config("BASE_PREFIX"), self.realm.value, self.name.value) entry = etcd_client.get(_key, value_in_json=True) if not entry: self.add_error("No such Account Found") else: self.add_error("Invalid OTP Credentials") else: self.add_error("Invalid Auth Credentials") class CreateOTPSchema(BaseSchema): def __init__(self, data: dict, fields=None): self.name = Field("name", str, data.get("name", KeyError)) self.realm = Field("realm", str, data.get("realm", KeyError)) self.admin_name = Field("admin-name", str, data.get("admin-name", KeyError)) self.admin_realm = Field("admin-realm", str, data.get("admin-realm", KeyError)) self.admin_token = Field("admin-token", str, data.get("admin-token", KeyError)) self.admin_realm.validation = self.admin_realm_validation _fields = [self.name, self.realm, self.admin_name, self.admin_realm, self.admin_token] if fields: _fields += fields super().__init__(data=data, fields=_fields) def admin_realm_validation(self): if self.admin_realm.value != decouple.config("ADMIN_REALM"): self.add_field_errors( 'Admin must be from {} realm'.format(decouple.config('ADMIN_REALM')) ) def validation(self): if is_valid_otp(etcd_client, self.admin_name.value, self.admin_realm.value, self.admin_token.value): _key = join(decouple.config("BASE_PREFIX"), self.realm.value, self.name.value) if etcd_client.get(_key): self.add_error("Account already exists") else: self.add_error("Invalid Admin OTP Credentials")