import json import decouple import pyotp from os.path import join from helper import is_valid_otp from config import etcd_client from typing import Union class Field: def __init__(self, _name, _type, _value=None): self.name = _name self.value = _value self.type = _type self.__errors = [] def validation(self): return True def is_valid(self): if self.value == KeyError: self.add_error("'{}' field is a required field".format(self.name)) else: try: _type = self.type.__args__ except Exception: _type = self.type if not isinstance(self.value, _type): self.add_error("Incorrect Type for '{}' field".format(self.name)) else: self.validation() if self.__errors: return False return True def get_errors(self): return self.__errors def add_error(self, error): self.__errors.append(error) class BaseSchema: def __init__(self, data, fields=None): self.__errors = [] if fields is None: self.fields = [] else: self.fields = fields def validation(self): # custom validation is optional return True def is_valid(self): for field in self.fields: field.is_valid() self.add_field_errors(field) for parent in self.__class__.__bases__: try: parent.validation(self) except AttributeError: pass if not self.__errors: self.validation() if self.__errors: return False return True def get_errors(self): return {"message": self.__errors} def add_field_errors(self, field: Field): self.__errors += field.get_errors() def add_error(self, error): self.__errors.append(error) class DataRequiredSchema(BaseSchema): def __init__(self, data, fields=None): if data is None: self.add_error("No Data is provided.") data = {} super().__init__(data, fields=fields) class OTPSchema(DataRequiredSchema): def __init__(self, data: dict, fields=None): self.name = Field("name", str, data.get("name", KeyError)) self.realm = Field("realm", str, data.get("realm", KeyError)) self.token = Field("token", str, data.get("token", KeyError)) self.auth_name = Field("auth_name", str, data.get("auth_name", KeyError)) self.auth_realm = Field("auth_realm", str, data.get("auth_realm", KeyError)) self.auth_token = Field("auth_token", str, data.get("auth_token", KeyError)) self.auth_realm.validation = self.auth_realm_validation _fields = [ self.name, self.realm, self.token, self.auth_name, self.auth_realm, self.auth_token, ] if fields: _fields += fields super().__init__(data=data, fields=_fields) def auth_realm_validation(self): if self.auth_realm.value != decouple.config("AUTH_REALM"): self.add_error( "Authentication realm must be {}".format(decouple.config("AUTH_REALM")) ) def validation(self): if is_valid_otp( etcd_client, self.auth_name.value, self.auth_realm.value, self.auth_token.value, ): if is_valid_otp( etcd_client, self.name.value, self.realm.value, self.token.value ): _key = join(decouple.config("BASE_PREFIX"), self.name.value) entry = etcd_client.get(_key, value_in_json=True) if not entry: self.add_error("No such Account Found") else: self.add_error("Invalid OTP Credentials") else: self.add_error("Invalid Auth Credentials") class CreateOTPSchema(DataRequiredSchema): def __init__(self, data: dict, fields=None): self.name = Field("name", str, data.get("name", KeyError)) self.realm = Field("realm", Union[str, list], data.get("realm", KeyError)) self.admin_name = Field("admin_name", str, data.get("admin_name", KeyError)) self.admin_realm = Field("admin_realm", str, data.get("admin_realm", KeyError)) self.admin_token = Field("admin_token", str, data.get("admin_token", KeyError)) self.admin_realm.validation = self.admin_realm_validation _fields = [ self.name, self.realm, self.admin_name, self.admin_realm, self.admin_token, ] if fields: _fields += fields super().__init__(data=data, fields=_fields) def admin_realm_validation(self): if self.admin_realm.value != decouple.config("ADMIN_REALM"): self.add_error( "Admin must be from {} realm".format(decouple.config("ADMIN_REALM")) ) def validation(self): if is_valid_otp( etcd_client, self.admin_name.value, self.admin_realm.value, self.admin_token.value, ): _key = join(decouple.config("BASE_PREFIX"), self.name.value) if etcd_client.get(_key): self.add_error("Account already exists") else: self.add_error("Invalid Admin OTP Credentials") class DeleteOTPSchema(DataRequiredSchema): def __init__(self, data: dict, fields=None): self.name = Field("name", str, data.get("name", KeyError)) self.admin_name = Field("admin_name", str, data.get("admin_name", KeyError)) self.admin_realm = Field("admin_realm", str, data.get("admin_realm", KeyError)) self.admin_token = Field("admin_token", str, data.get("admin_token", KeyError)) self.admin_realm.validation = self.admin_realm_validation _fields = [self.name, self.admin_name, self.admin_realm, self.admin_token] if fields: _fields += fields super().__init__(data=data, fields=_fields) def admin_realm_validation(self): if self.admin_realm.value != decouple.config("ADMIN_REALM"): self.add_error( "Admin must be from {} realm".format(decouple.config("ADMIN_REALM")) ) def validation(self): if is_valid_otp( etcd_client, self.admin_name.value, self.admin_realm.value, self.admin_token.value, ): _key = join(decouple.config("BASE_PREFIX"), self.name.value) if not etcd_client.get(_key): self.add_error("Account does not exists") else: self.add_error("Invalid Admin OTP Credentials") class ListAccountSchema(DataRequiredSchema): def __init__(self, data: dict, fields=None): data = data or {"": None} self.admin_name = Field("admin_name", str, data.get("admin_name", KeyError)) self.admin_realm = Field("admin_realm", str, data.get("admin_realm", KeyError)) self.admin_token = Field("admin_token", str, data.get("admin_token", KeyError)) self.admin_realm.validation = self.admin_realm_validation _fields = [self.admin_name, self.admin_realm, self.admin_token] if fields: _fields += fields super().__init__(data=data, fields=_fields) def admin_realm_validation(self): if self.admin_realm.value != decouple.config("ADMIN_REALM"): self.add_error( "Admin must be from {} realm".format(decouple.config("ADMIN_REALM")) ) def validation(self): if not is_valid_otp( etcd_client, self.admin_name.value, self.admin_realm.value, self.admin_token.value, ): self.add_error("Invalid Admin OTP Credentials")