uotp/schemas.py

248 lines
7.8 KiB
Python
Raw Normal View History

2019-10-07 17:13:42 +00:00
import json
import decouple
import pyotp
from os.path import join
from helper import is_valid_otp
from config import etcd_client
from typing import Union
2019-10-08 18:07:28 +00:00
2019-10-07 17:13:42 +00:00
class Field:
def __init__(self, _name, _type, _value=None):
self.name = _name
self.value = _value
self.type = _type
self.__errors = []
def validation(self):
return True
def is_valid(self):
if self.value == KeyError:
self.add_error("'{}' field is a required field".format(self.name))
2019-10-07 17:13:42 +00:00
else:
try:
_type = self.type.__args__
except Exception:
_type = self.type
if not isinstance(self.value, _type):
self.add_error("Incorrect Type for '{}' field".format(self.name))
2019-10-07 17:13:42 +00:00
else:
self.validation()
if self.__errors:
return False
return True
def get_errors(self):
return self.__errors
def add_error(self, error):
self.__errors.append(error)
2019-10-08 18:07:28 +00:00
2019-10-07 17:13:42 +00:00
class BaseSchema:
def __init__(self, data, fields=None):
self.__errors = []
if fields is None:
self.fields = []
else:
self.fields = fields
def validation(self):
# custom validation is optional
return True
def is_valid(self):
for field in self.fields:
field.is_valid()
self.add_field_errors(field)
for parent in self.__class__.__bases__:
try:
parent.validation(self)
except AttributeError:
pass
if not self.__errors:
self.validation()
if self.__errors:
return False
return True
def get_errors(self):
return {"message": self.__errors}
def add_field_errors(self, field: Field):
self.__errors += field.get_errors()
def add_error(self, error):
self.__errors.append(error)
2019-10-08 18:07:28 +00:00
class DataRequiredSchema(BaseSchema):
def __init__(self, data, fields=None):
if data is None:
self.add_error("No Data is provided.")
data = {}
super().__init__(data, fields=fields)
class OTPSchema(DataRequiredSchema):
2019-10-07 17:13:42 +00:00
def __init__(self, data: dict, fields=None):
self.name = Field("name", str, data.get("name", KeyError))
self.realm = Field("realm", str, data.get("realm", KeyError))
self.token = Field("token", str, data.get("token", KeyError))
self.auth_name = Field("auth_name", str, data.get("auth_name", KeyError))
self.auth_realm = Field("auth_realm", str, data.get("auth_realm", KeyError))
self.auth_token = Field("auth_token", str, data.get("auth_token", KeyError))
2019-10-07 17:13:42 +00:00
self.auth_realm.validation = self.auth_realm_validation
2019-10-08 18:07:28 +00:00
_fields = [
self.name,
self.realm,
self.token,
self.auth_name,
self.auth_realm,
self.auth_token,
]
2019-10-07 17:13:42 +00:00
if fields:
_fields += fields
2019-10-08 18:07:28 +00:00
super().__init__(data=data, fields=_fields)
2019-10-07 17:13:42 +00:00
def auth_realm_validation(self):
if self.auth_realm.value != decouple.config("AUTH_REALM"):
self.add_error(
"Authentication realm must be {}".format(decouple.config("AUTH_REALM"))
2019-10-07 17:13:42 +00:00
)
2019-10-08 18:07:28 +00:00
2019-10-07 17:13:42 +00:00
def validation(self):
2019-10-08 18:07:28 +00:00
if is_valid_otp(
etcd_client,
self.auth_name.value,
self.auth_realm.value,
self.auth_token.value,
):
if is_valid_otp(
etcd_client, self.name.value, self.realm.value, self.token.value
2019-10-08 18:07:28 +00:00
):
_key = join(decouple.config("BASE_PREFIX"), self.name.value)
2019-10-07 17:13:42 +00:00
entry = etcd_client.get(_key, value_in_json=True)
if not entry:
self.add_error("No such Account Found")
else:
self.add_error("Invalid OTP Credentials")
else:
self.add_error("Invalid Auth Credentials")
2019-10-08 18:07:28 +00:00
class CreateOTPSchema(DataRequiredSchema):
2019-10-07 17:13:42 +00:00
def __init__(self, data: dict, fields=None):
self.name = Field("name", str, data.get("name", KeyError))
self.realm = Field("realm", Union[str, list], data.get("realm", KeyError))
self.admin_name = Field("admin_name", str, data.get("admin_name", KeyError))
self.admin_realm = Field("admin_realm", str, data.get("admin_realm", KeyError))
self.admin_token = Field("admin_token", str, data.get("admin_token", KeyError))
2019-10-08 18:07:28 +00:00
2019-10-07 17:13:42 +00:00
self.admin_realm.validation = self.admin_realm_validation
2019-10-08 18:07:28 +00:00
_fields = [
self.name,
self.realm,
self.admin_name,
self.admin_realm,
self.admin_token,
]
2019-10-07 17:13:42 +00:00
if fields:
_fields += fields
super().__init__(data=data, fields=_fields)
def admin_realm_validation(self):
if self.admin_realm.value != decouple.config("ADMIN_REALM"):
2019-10-08 18:07:28 +00:00
self.add_error(
"Admin must be from {} realm".format(decouple.config("ADMIN_REALM"))
2019-10-07 17:13:42 +00:00
)
2019-10-08 18:07:28 +00:00
2019-10-07 17:13:42 +00:00
def validation(self):
2019-10-08 18:07:28 +00:00
if is_valid_otp(
etcd_client,
self.admin_name.value,
self.admin_realm.value,
self.admin_token.value,
):
2019-10-07 17:13:42 +00:00
_key = join(decouple.config("BASE_PREFIX"), self.name.value)
2019-10-07 17:13:42 +00:00
if etcd_client.get(_key):
self.add_error("Account already exists")
else:
2019-10-08 18:07:28 +00:00
self.add_error("Invalid Admin OTP Credentials")
class DeleteOTPSchema(DataRequiredSchema):
def __init__(self, data: dict, fields=None):
self.name = Field("name", str, data.get("name", KeyError))
self.admin_name = Field("admin_name", str, data.get("admin_name", KeyError))
self.admin_realm = Field("admin_realm", str, data.get("admin_realm", KeyError))
self.admin_token = Field("admin_token", str, data.get("admin_token", KeyError))
2019-10-08 18:07:28 +00:00
self.admin_realm.validation = self.admin_realm_validation
_fields = [self.name, self.admin_name, self.admin_realm, self.admin_token]
2019-10-08 18:07:28 +00:00
if fields:
_fields += fields
super().__init__(data=data, fields=_fields)
def admin_realm_validation(self):
if self.admin_realm.value != decouple.config("ADMIN_REALM"):
self.add_field_errors(
"Admin must be from {} realm".format(decouple.config("ADMIN_REALM"))
2019-10-08 18:07:28 +00:00
)
def validation(self):
if is_valid_otp(
etcd_client,
self.admin_name.value,
self.admin_realm.value,
self.admin_token.value,
):
_key = join(decouple.config("BASE_PREFIX"), self.name.value)
2019-10-08 18:07:28 +00:00
if not etcd_client.get(_key):
self.add_error("Account does not exists")
else:
self.add_error("Invalid Admin OTP Credentials")
class ListAccountSchema(DataRequiredSchema):
def __init__(self, data: dict, fields=None):
data = data or {"": None}
self.admin_name = Field("admin_name", str, data.get("admin_name", KeyError))
self.admin_realm = Field("admin_realm", str, data.get("admin_realm", KeyError))
self.admin_token = Field("admin_token", str, data.get("admin_token", KeyError))
2019-10-08 18:07:28 +00:00
self.admin_realm.validation = self.admin_realm_validation
_fields = [self.admin_name, self.admin_realm, self.admin_token]
2019-10-08 18:07:28 +00:00
if fields:
_fields += fields
super().__init__(data=data, fields=_fields)
def admin_realm_validation(self):
if self.admin_realm.value != decouple.config("ADMIN_REALM"):
self.add_field_errors(
"Admin must be from {} realm".format(decouple.config("ADMIN_REALM"))
2019-10-08 18:07:28 +00:00
)
def validation(self):
if not is_valid_otp(
etcd_client,
self.admin_name.value,
self.admin_realm.value,
self.admin_token.value,
):
self.add_error("Invalid Admin OTP Credentials")