Delete/List Added, Unit Tests created

This commit is contained in:
ahmadbilalkhalid 2019-10-08 23:07:28 +05:00
parent 90f88cd456
commit ea0e0aeeb3
7 changed files with 562 additions and 137 deletions

2
.gitignore vendored
View file

@ -1,4 +1,4 @@
.vscode .vscode
__pycache__
.env .env
client.py client.py

View file

@ -4,6 +4,7 @@ url = "https://pypi.org/simple"
verify_ssl = true verify_ssl = true
[dev-packages] [dev-packages]
pep8 = "*"
[packages] [packages]
flask = "*" flask = "*"
@ -12,6 +13,7 @@ etcd3-wrapper-wip = {editable = true,git = "https://code.ungleich.ch/ungleich-pu
python-decouple = "*" python-decouple = "*"
requests = "*" requests = "*"
pyotp = "*" pyotp = "*"
pytest = "*"
[requires] [requires]
python_version = "3.5" python_version = "3.5"

92
Pipfile.lock generated
View file

@ -1,7 +1,7 @@
{ {
"_meta": { "_meta": {
"hash": { "hash": {
"sha256": "140985247e5362ad0317970c161febd85cdd856163f9269cc38d802fb8b7d3f6" "sha256": "881d2dd7e14f980c6d76875c7ecc1949989d040129622e51cfe6849217ece4c0"
}, },
"pipfile-spec": 6, "pipfile-spec": 6,
"requires": { "requires": {
@ -23,6 +23,20 @@
], ],
"version": "==8.0.0" "version": "==8.0.0"
}, },
"atomicwrites": {
"hashes": [
"sha256:03472c30eb2c5d1ba9227e4c2ca66ab8287fbfbbda3888aa93dc2e28fc6811b4",
"sha256:75a9445bac02d8d058d5e1fe689654ba5a6556a1dfd8ce6ec55a0ed79866cfa6"
],
"version": "==1.3.0"
},
"attrs": {
"hashes": [
"sha256:ec20e7a4825331c1b5ebf261d111e16fa9612c1f7a5e1f884f12bd53a664dfd2",
"sha256:f913492e1663d3c36f502e5e9ba6cd13cf19d7fab50aa13239e420fef95e1396"
],
"version": "==19.2.0"
},
"certifi": { "certifi": {
"hashes": [ "hashes": [
"sha256:e4f3620cfea4f83eedc95b24abd9cd56f3c4b146dd0177e83a21b4eb49e21e50", "sha256:e4f3620cfea4f83eedc95b24abd9cd56f3c4b146dd0177e83a21b4eb49e21e50",
@ -115,6 +129,14 @@
], ],
"version": "==2.8" "version": "==2.8"
}, },
"importlib-metadata": {
"hashes": [
"sha256:aa18d7378b00b40847790e7c27e11673d7fed219354109d0e7b9e5b25dc3ad26",
"sha256:d5f18a79777f3aa179c145737780282e27b508fc8fd688cb17c7a813e8bd39af"
],
"markers": "python_version < '3.8'",
"version": "==0.23"
},
"itsdangerous": { "itsdangerous": {
"hashes": [ "hashes": [
"sha256:321b033d07f2a4136d3ec762eac9f16a10ccd60f53c0c91af90217ace7ba1f19", "sha256:321b033d07f2a4136d3ec762eac9f16a10ccd60f53c0c91af90217ace7ba1f19",
@ -162,6 +184,27 @@
], ],
"version": "==1.1.1" "version": "==1.1.1"
}, },
"more-itertools": {
"hashes": [
"sha256:409cd48d4db7052af495b09dec721011634af3753ae1ef92d2b32f73a745f832",
"sha256:92b8c4b06dac4f0611c0729b2f2ede52b2e1bac1ab48f089c7ddc12e26bb60c4"
],
"version": "==7.2.0"
},
"packaging": {
"hashes": [
"sha256:28b924174df7a2fa32c1953825ff29c61e2f5e082343165438812f00d3a7fc47",
"sha256:d9551545c6d761f3def1677baf08ab2a3ca17c56879e70fecba2fc4dde4ed108"
],
"version": "==19.2"
},
"pluggy": {
"hashes": [
"sha256:0db4b7601aae1d35b4a033282da476845aa19185c1e6964b25cf324b5e4ec3e6",
"sha256:fa5fa1622fa6dd5c030e9cad086fa19ef6a0cf6d7a2d12318e10cb49d6d68f34"
],
"version": "==0.13.0"
},
"protobuf": { "protobuf": {
"hashes": [ "hashes": [
"sha256:125713564d8cfed7610e52444c9769b8dcb0b55e25cc7841f2290ee7bc86636f", "sha256:125713564d8cfed7610e52444c9769b8dcb0b55e25cc7841f2290ee7bc86636f",
@ -183,6 +226,13 @@
], ],
"version": "==3.10.0" "version": "==3.10.0"
}, },
"py": {
"hashes": [
"sha256:64f65755aee5b381cea27766a3a147c3f15b9b6b9ac88676de66ba2ae36793fa",
"sha256:dc639b046a6e2cff5bbe40194ad65936d6ba360b52b3c3fe1d08a82dd50b5e53"
],
"version": "==1.8.0"
},
"pyotp": { "pyotp": {
"hashes": [ "hashes": [
"sha256:c88f37fd47541a580b744b42136f387cdad481b560ef410c0d85c957eb2a2bc0", "sha256:c88f37fd47541a580b744b42136f387cdad481b560ef410c0d85c957eb2a2bc0",
@ -191,6 +241,21 @@
"index": "pypi", "index": "pypi",
"version": "==2.3.0" "version": "==2.3.0"
}, },
"pyparsing": {
"hashes": [
"sha256:6f98a7b9397e206d78cc01df10131398f1c8b8510a2f4d97d9abd82e1aacdd80",
"sha256:d9338df12903bbf5d65a0e4e87c2161968b10d2e489652bb47001d82a9b028b4"
],
"version": "==2.4.2"
},
"pytest": {
"hashes": [
"sha256:7e4800063ccfc306a53c461442526c5571e1462f61583506ce97e4da6a1d88c8",
"sha256:ca563435f4941d0cb34767301c27bc65c510cb82e90b9ecf9cb52dc2c63caaa0"
],
"index": "pypi",
"version": "==5.2.1"
},
"python-decouple": { "python-decouple": {
"hashes": [ "hashes": [
"sha256:1317df14b43efee4337a4aa02914bf004f010cd56d6c4bd894e6474ec8c4fe2d" "sha256:1317df14b43efee4337a4aa02914bf004f010cd56d6c4bd894e6474ec8c4fe2d"
@ -234,13 +299,36 @@
], ],
"version": "==1.25.6" "version": "==1.25.6"
}, },
"wcwidth": {
"hashes": [
"sha256:3df37372226d6e63e1b1e1eda15c594bca98a22d33a23832a90998faa96bc65e",
"sha256:f4ebe71925af7b40a864553f761ed559b43544f8f71746c2d756c7fe788ade7c"
],
"version": "==0.1.7"
},
"werkzeug": { "werkzeug": {
"hashes": [ "hashes": [
"sha256:7280924747b5733b246fe23972186c6b348f9ae29724135a6dfc1e53cea433e7", "sha256:7280924747b5733b246fe23972186c6b348f9ae29724135a6dfc1e53cea433e7",
"sha256:e5f4a1f98b52b18a93da705a7458e55afb26f32bff83ff5d19189f92462d65c4" "sha256:e5f4a1f98b52b18a93da705a7458e55afb26f32bff83ff5d19189f92462d65c4"
], ],
"version": "==0.16.0" "version": "==0.16.0"
},
"zipp": {
"hashes": [
"sha256:3718b1cbcd963c7d4c5511a8240812904164b7f381b647143a89d3b98f9bcd8e",
"sha256:f06903e9f1f43b12d371004b4ac7b06ab39a44adc747266928ae6debfa7b3335"
],
"version": "==0.6.0"
} }
}, },
"develop": {} "develop": {
"pep8": {
"hashes": [
"sha256:b22cfae5db09833bb9bd7c8463b53e1a9c9b39f12e304a8d0bba729c501827ee",
"sha256:fe249b52e20498e59e0b5c5256aa52ee99fc295b26ec9eaa85776ffdb9fe6374"
],
"index": "pypi",
"version": "==1.7.1"
}
}
} }

148
app.py
View file

@ -1,32 +1,23 @@
import pyotp import pyotp
import decouple import decouple
import os
from os.path import join from os.path import join
from flask import Flask, request from flask import Flask, request
from flask_restful import Resource, Api from flask_restful import Resource, Api
from schemas import OTPSchema, CreateOTPSchema from schemas import (
OTPSchema,
CreateOTPSchema,
DeleteOTPSchema,
ListAccountSchema,
)
from config import etcd_client from config import etcd_client
from helper import is_valid_otp from helper import is_valid_otp, create_admin_if_dont_exists
app = Flask(__name__) app = Flask(__name__)
api = Api(app) api = Api(app)
def create_admin_if_dont_exists(etcd_client):
_key = join(
decouple.config('BASE_PREFIX'),
decouple.config('ADMIN_REALM'),
'admin',
)
if etcd_client.get(_key) is None:
print('admin does not exists!. So, creating one')
_value = {
'seed': pyotp.random_base32(),
}
etcd_client.put(_key, _value, value_in_json=True)
create_admin_if_dont_exists(etcd_client) create_admin_if_dont_exists(etcd_client)
@ -36,90 +27,79 @@ class Verify(Resource):
data = request.json data = request.json
schema = OTPSchema(data) schema = OTPSchema(data)
if schema.is_valid(): if schema.is_valid():
return {'message': 'Verified'}, 200 return {"message": "Verified"}, 200
else: else:
return schema.get_errors(), 400 return schema.get_errors(), 400
# try:
# name = data['name']
# realm = data['realm']
# token = data['token']
# auth_name = data['auth-name']
# auth_realm = data['auth-realm']
# auth_token = data['auth-token']
# except Exception:
# return {
# "message": "Your provided data is not correct."
# }
# else:
# if is_valid_otp(auth_name, auth_realm, auth_token):
# _key = join(decouple.config("BASE_PREFIX"), realm, name)
# entry = etcd_client.get(_key, value_in_json=True)
# if entry:
# totp = pyotp.TOTP(entry.value['seed'])
# try:
# is_token_valid = totp.verify(token)
# except:
# return {'message': 'Invalid Data'}, 400
# else:
# if is_token_valid:
# return {'message': 'Verified'}, 200
# else:
# return {'message': 'Invalid token'}, 400
# else:
# return {"message": "No such Account Found"}, 400
# else:
# return {'message': 'Invalid Auth Credentials'}, 400
class Create(Resource): class Create(Resource):
@staticmethod @staticmethod
def post(): def post():
data = request.json data = request.json
schema = CreateOTPSchema(data) schema = CreateOTPSchema(data)
if schema.is_valid(): if schema.is_valid():
_key = join(decouple.config('BASE_PREFIX'), data['realm'], data['name']) _key = join(
decouple.config("BASE_PREFIX"),
data["realm"],
data["name"],
)
if etcd_client.get(_key) is None: if etcd_client.get(_key) is None:
_value = { _value = {"seed": pyotp.random_base32()}
'seed': pyotp.random_base32(),
}
etcd_client.put(_key, _value, value_in_json=True) etcd_client.put(_key, _value, value_in_json=True)
return {'message': 'Account Created\n' return {
'name: {}, realm: {}, seed: {}'.format(data['name'],data['realm'], _value['seed'])} "message": "Account Created\n"
"name: {}, realm: {}, seed: {}".format(
data["name"], data["realm"], _value["seed"]
)
}
else: else:
return schema.get_errors() return schema.get_errors(), 400
# try:
# name = data['name']
# realm = data['realm'] class Delete(Resource):
# admin_name = data['admin-name'] @staticmethod
# admin_realm = data['admin-realm'] def post():
# admin_token = data['admin-token'] data = request.json
# except Exception:
# return {'message': 'Invalid Data'}, 400 schema = DeleteOTPSchema(data)
# else: if schema.is_valid():
# if admin_realm == decouple.config('ADMIN_REALM'): _key = join(
# if is_valid_otp(admin_name, admin_realm, admin_token): decouple.config("BASE_PREFIX"),
# _key = join(decouple.config('BASE_PREFIX'), realm, name) data["realm"],
data["name"],
)
etcd_client.client.delete(_key)
return {"message": "Account Deleted"}
else:
return schema.get_errors(), 400
class List(Resource):
@staticmethod
def get():
data = request.json
schema = ListAccountSchema(data)
if schema.is_valid():
result = etcd_client.get_prefix(
decouple.config("BASE_PREFIX"), value_in_json=True
)
r = {}
for entry in result:
_realm, _name = entry.key.split("/")[-2:]
r['{}/{}'.format(_realm, _name)] = entry.value['seed']
return r
else:
return schema.get_errors(), 400
# if etcd_client.get(_key) is None:
# _value = {
# 'seed': pyotp.random_base32(),
# }
# etcd_client.put(_key, _value, value_in_json=True)
# return {'message': 'Account Created\n'
# 'name: {}, realm: {}, seed: {}'.format(name, realm, _value['seed'])}
# else:
# return {'message': 'Account already exists'}, 400
# else:
# return {'message': 'Invalid Admin OTP Credentials'}
# else:
# return {'message': 'Admin must be from {} realm'.format(decouple.config('ADMIN_REALM'))}
api.add_resource(Verify, "/verify") api.add_resource(Verify, "/verify")
api.add_resource(Create, "/create") api.add_resource(Create, "/create")
api.add_resource(Delete, "/delete")
api.add_resource(List, "/list")
if __name__ == "__main__": if __name__ == "__main__":
app.run(debug=True) app.run(debug=True)

View file

@ -1,19 +1,30 @@
import pyotp import pyotp
import decouple
from decouple import config
from os.path import join from os.path import join
def is_valid_otp(etcd_client, name, realm, token): def is_valid_otp(etcd_client, name, realm, token):
_key = join(config("BASE_PREFIX"), realm, name) _key = join(decouple.config("BASE_PREFIX"), realm, name)
entry = etcd_client.get(_key, value_in_json=True) entry = etcd_client.get(_key, value_in_json=True)
if entry: if entry:
totp = pyotp.TOTP(entry.value['seed']) totp = pyotp.TOTP(entry.value["seed"])
try: try:
is_token_valid = totp.verify(token) is_token_valid = totp.verify(token)
except: except:
return False return False
else: else:
return is_token_valid return is_token_valid
return False return False
def create_admin_if_dont_exists(etcd_client):
_key = join(
decouple.config("BASE_PREFIX"),
decouple.config("ADMIN_REALM"),
"admin",
)
if etcd_client.get(_key) is None:
print("admin does not exists!. So, creating one")
_value = {"seed": pyotp.random_base32()}
etcd_client.put(_key, _value, value_in_json=True)

View file

@ -6,6 +6,8 @@ from os.path import join
from helper import is_valid_otp from helper import is_valid_otp
from config import etcd_client from config import etcd_client
class Field: class Field:
def __init__(self, _name, _type, _value=None): def __init__(self, _name, _type, _value=None):
self.name = _name self.name = _name
@ -18,10 +20,16 @@ class Field:
def is_valid(self): def is_valid(self):
if self.value == KeyError: if self.value == KeyError:
self.add_error("'{}' field is a required field".format(self.name)) self.add_error(
"'{}' field is a required field".format(self.name)
)
else: else:
if not isinstance(self.value, self.type): if not isinstance(self.value, self.type):
self.add_error("Incorrect Type for '{}' field".format(self.name)) self.add_error(
"Incorrect Type for '{}' field".format(
self.name
)
)
else: else:
self.validation() self.validation()
@ -35,6 +43,7 @@ class Field:
def add_error(self, error): def add_error(self, error):
self.__errors.append(error) self.__errors.append(error)
class BaseSchema: class BaseSchema:
def __init__(self, data, fields=None): def __init__(self, data, fields=None):
_ = data # suppress linter warning _ = data # suppress linter warning
@ -74,40 +83,75 @@ class BaseSchema:
def add_error(self, error): def add_error(self, error):
self.__errors.append(error) self.__errors.append(error)
class OTPSchema(BaseSchema):
class DataRequiredSchema(BaseSchema):
def __init__(self, data, fields=None):
if data is None:
self.add_error("No Data is provided.")
data = {}
super().__init__(data, fields=fields)
class OTPSchema(DataRequiredSchema):
def __init__(self, data: dict, fields=None): def __init__(self, data: dict, fields=None):
self.name = Field("name", str, data.get("name", KeyError)) self.name = Field("name", str, data.get("name", KeyError))
self.realm = Field("realm", str, data.get("realm", KeyError)) self.realm = Field(
self.token = Field("token", str, data.get("token", KeyError)) "realm", str, data.get("realm", KeyError)
)
self.token = Field(
"token", str, data.get("token", KeyError)
)
self.auth_name = Field("auth-name", str, self.auth_name = Field(
data.get("auth-name", KeyError)) "auth-name", str, data.get("auth-name", KeyError)
self.auth_realm = Field("auth-realm", str, )
data.get("auth-realm", KeyError)) self.auth_realm = Field(
self.auth_token = Field("auth-token", str, "auth-realm", str, data.get("auth-realm", KeyError)
data.get("auth-token", KeyError)) )
self.auth_token = Field(
"auth-token", str, data.get("auth-token", KeyError)
)
self.auth_realm.validation = self.auth_realm_validation self.auth_realm.validation = self.auth_realm_validation
_fields = [self.name, self.realm, self.token, _fields = [
self.auth_name, self.auth_realm, self.name,
self.auth_token] self.realm,
self.token,
self.auth_name,
self.auth_realm,
self.auth_token,
]
if fields: if fields:
_fields += fields _fields += fields
super().__init__(data=data, fields=_fields)
super().__init__(data=data, fields=_fields)
def auth_realm_validation(self): def auth_realm_validation(self):
if self.auth_realm.value != decouple.config("AUTH_REALM"): if self.auth_realm.value != decouple.config("AUTH_REALM"):
self.add_error( self.add_error(
"Authentication realm must be {}".format(decouple.config("AUTH_REALM")) "Authentication realm must be {}".format(
decouple.config("AUTH_REALM")
)
) )
def validation(self): def validation(self):
if is_valid_otp(etcd_client, self.auth_name.value, if is_valid_otp(
self.auth_realm.value, self.auth_token.value): etcd_client,
self.auth_name.value,
if is_valid_otp(etcd_client, self.name.value, self.auth_realm.value,
self.realm.value, self.token.value): self.auth_token.value,
_key = join(decouple.config("BASE_PREFIX"), ):
self.realm.value, self.name.value)
if is_valid_otp(
etcd_client,
self.name.value,
self.realm.value,
self.token.value,
):
_key = join(
decouple.config("BASE_PREFIX"),
self.realm.value,
self.name.value,
)
entry = etcd_client.get(_key, value_in_json=True) entry = etcd_client.get(_key, value_in_json=True)
if not entry: if not entry:
self.add_error("No such Account Found") self.add_error("No such Account Found")
@ -117,22 +161,87 @@ class OTPSchema(BaseSchema):
self.add_error("Invalid Auth Credentials") self.add_error("Invalid Auth Credentials")
class CreateOTPSchema(BaseSchema): class CreateOTPSchema(DataRequiredSchema):
def __init__(self, data: dict, fields=None): def __init__(self, data: dict, fields=None):
self.name = Field("name", str, data.get("name", KeyError)) self.name = Field("name", str, data.get("name", KeyError))
self.realm = Field("realm", str, data.get("realm", KeyError)) self.realm = Field(
"realm", str, data.get("realm", KeyError)
)
self.admin_name = Field(
"admin-name", str, data.get("admin-name", KeyError)
)
self.admin_realm = Field(
"admin-realm", str, data.get("admin-realm", KeyError)
)
self.admin_token = Field(
"admin-token", str, data.get("admin-token", KeyError)
)
self.admin_name = Field("admin-name", str,
data.get("admin-name", KeyError))
self.admin_realm = Field("admin-realm", str,
data.get("admin-realm", KeyError))
self.admin_token = Field("admin-token", str,
data.get("admin-token", KeyError))
self.admin_realm.validation = self.admin_realm_validation self.admin_realm.validation = self.admin_realm_validation
_fields = [self.name, self.realm, _fields = [
self.admin_name, self.admin_realm, self.name,
self.admin_token] self.realm,
self.admin_name,
self.admin_realm,
self.admin_token,
]
if fields:
_fields += fields
super().__init__(data=data, fields=_fields)
def admin_realm_validation(self):
if self.admin_realm.value != decouple.config("ADMIN_REALM"):
self.add_error(
"Admin must be from {} realm".format(
decouple.config("ADMIN_REALM")
)
)
def validation(self):
if is_valid_otp(
etcd_client,
self.admin_name.value,
self.admin_realm.value,
self.admin_token.value,
):
_key = join(
decouple.config("BASE_PREFIX"),
self.realm.value,
self.name.value,
)
if etcd_client.get(_key):
self.add_error("Account already exists")
else:
self.add_error("Invalid Admin OTP Credentials")
class DeleteOTPSchema(DataRequiredSchema):
def __init__(self, data: dict, fields=None):
self.name = Field("name", str, data.get("name", KeyError))
self.realm = Field(
"realm", str, data.get("realm", KeyError)
)
self.admin_name = Field(
"admin-name", str, data.get("admin-name", KeyError)
)
self.admin_realm = Field(
"admin-realm", str, data.get("admin-realm", KeyError)
)
self.admin_token = Field(
"admin-token", str, data.get("admin-token", KeyError)
)
self.admin_realm.validation = self.admin_realm_validation
_fields = [
self.name,
self.realm,
self.admin_name,
self.admin_realm,
self.admin_token,
]
if fields: if fields:
_fields += fields _fields += fields
super().__init__(data=data, fields=_fields) super().__init__(data=data, fields=_fields)
@ -140,15 +249,66 @@ class CreateOTPSchema(BaseSchema):
def admin_realm_validation(self): def admin_realm_validation(self):
if self.admin_realm.value != decouple.config("ADMIN_REALM"): if self.admin_realm.value != decouple.config("ADMIN_REALM"):
self.add_field_errors( self.add_field_errors(
'Admin must be from {} realm'.format(decouple.config('ADMIN_REALM')) "Admin must be from {} realm".format(
decouple.config("ADMIN_REALM")
)
) )
def validation(self):
if is_valid_otp(etcd_client, self.admin_name.value,
self.admin_realm.value, self.admin_token.value):
_key = join(decouple.config("BASE_PREFIX"), def validation(self):
self.realm.value, self.name.value) if is_valid_otp(
if etcd_client.get(_key): etcd_client,
self.add_error("Account already exists") self.admin_name.value,
self.admin_realm.value,
self.admin_token.value,
):
_key = join(
decouple.config("BASE_PREFIX"),
self.realm.value,
self.name.value,
)
if not etcd_client.get(_key):
self.add_error("Account does not exists")
else: else:
self.add_error("Invalid Admin OTP Credentials") self.add_error("Invalid Admin OTP Credentials")
class ListAccountSchema(DataRequiredSchema):
def __init__(self, data: dict, fields=None):
data = data or {'': None}
self.admin_name = Field(
"admin-name", str, data.get("admin-name", KeyError)
)
self.admin_realm = Field(
"admin-realm", str, data.get("admin-realm", KeyError)
)
self.admin_token = Field(
"admin-token", str, data.get("admin-token", KeyError)
)
self.admin_realm.validation = self.admin_realm_validation
_fields = [
self.admin_name,
self.admin_realm,
self.admin_token,
]
if fields:
_fields += fields
super().__init__(data=data, fields=_fields)
def admin_realm_validation(self):
if self.admin_realm.value != decouple.config("ADMIN_REALM"):
self.add_field_errors(
"Admin must be from {} realm".format(
decouple.config("ADMIN_REALM")
)
)
def validation(self):
if not is_valid_otp(
etcd_client,
self.admin_name.value,
self.admin_realm.value,
self.admin_token.value,
):
self.add_error("Invalid Admin OTP Credentials")

184
tests/test_uotp.py Normal file
View file

@ -0,0 +1,184 @@
import unittest
import sys
import os
import pytest
import pyotp
sys.path.append(
os.path.dirname(
os.path.dirname(os.path.abspath(__file__))
)
)
import app
import config
class TestUOTP(unittest.TestCase):
app_client = None
admin_name = 'admin'
admin_seed = None
admin_realm = 'ungleich-admin'
auth_realm = 'ungleich-auth'
etcd_client = config.etcd_client
@classmethod
def setUpClass(cls):
app.app.config['TESTING'] = True
os.environ['BASE_PREFIX'] = '/test/uotp/'
os.environ['ADMIN_REALM'] = TestUOTP.admin_realm
os.environ['AUTH_REALM'] = TestUOTP.auth_realm
with app.app.test_client() as client:
with app.app.app_context():
app.create_admin_if_dont_exists(TestUOTP.etcd_client)
entry = TestUOTP.etcd_client.get(
os.path.join(
os.environ['BASE_PREFIX'],
os.environ['ADMIN_REALM'],
'admin'
), value_in_json=True
)
assert entry is not None
TestUOTP.app_client = client
TestUOTP.admin_seed = entry.value['seed']
return super().setUpClass()
@classmethod
def tearDownClass(cls):
TestUOTP.etcd_client.client.delete_prefix(os.environ['BASE_PREFIX'])
del os.environ['BASE_PREFIX']
del os.environ['ADMIN_REALM']
del os.environ['AUTH_REALM']
return super().tearDownClass()
def get_otp_list(self):
r = self.app_client.get('/list',
json={
'admin-name': self.admin_name,
'admin-realm': self.admin_realm,
'admin-token': pyotp.TOTP(self.admin_seed).now()
}
)
return r
def create_otp(self, _name, _realm, _admin_name, _admin_realm, _admin_seed):
r = self.app_client.post('/create',
json={
"name": _name,
"realm": _realm,
"admin-name": _admin_name,
"admin-realm": _admin_realm,
"admin-token": pyotp.TOTP(_admin_seed).now()
})
return r
def verify_otp(self, _name, _realm, _seed,
_auth_name, _auth_realm, _auth_seed):
r = self.app_client.get('/verify',
json={
"name": _name,
"realm": _realm,
"token": pyotp.TOTP(_seed).now(),
"auth-name": _auth_name,
"auth-realm": _auth_realm,
"auth-token": pyotp.TOTP(_auth_seed).now()
})
return r
def test_list(self):
r = self.get_otp_list()
self.assertEqual(r.status_code, 200)
self.assertIn(
'{}/{}'.format(self.admin_realm, self.admin_name),
r.json
)
def test_create(self):
_name = 'auth'
_realm = 'ungleich-auth'
# Test Successful case i.e Admin creating OTP Account
r = self.create_otp(_name, _realm,
self.admin_name, self.admin_realm,
self.admin_seed)
self.assertEqual(r.status_code, 200)
r = self.get_otp_list()
self.assertIn(
'{}/{}'.format(_realm, _name),
r.json
)
# Test Unsuccesful Creation i.e User from non-admin realm
# tries to create OTP account
# Get Auth Account
entry = self.etcd_client.get(
os.path.join(
os.environ['BASE_PREFIX'], _realm, _name
), value_in_json=True
)
_adversery_name = 'adversery'
_adversery_realm = 'ungleich-admin'
r = self.create_otp(_adversery_name, _adversery_realm,
_name, _realm, entry.value['seed'])
self.assertEqual(r.status_code, 400)
r = self.get_otp_list()
self.assertNotIn(
'{}/{}'.format(_adversery_realm, _adversery_name),
r.json
)
def test_verify(self):
_auth_name = 'verification-auth'
_auth_realm = 'ungleich-auth'
_auth_seed = None
r = self.create_otp(_auth_name, _auth_realm,
self.admin_name, self.admin_realm,
self.admin_seed)
self.assertEqual(r.status_code, 200)
entry = self.etcd_client.get(
os.path.join(
os.environ['BASE_PREFIX'],
_auth_realm,
_auth_name
),
value_in_json=True
)
self.assertIsNotNone(entry)
_auth_seed = entry.value['seed']
# This should work
r = self.verify_otp(self.admin_name, self.admin_realm,
self.admin_seed, _auth_name, _auth_realm,
_auth_seed)
self.assertEqual(r.status_code, 200)
# This should not work i.e should rerturn 400
# because the auth_seed is not correct
r = self.verify_otp(self.admin_name, self.admin_realm,
self.admin_seed, _auth_name,
_auth_realm, 'meowmeowmeow')
self.assertEqual(r.status_code, 400)
# This should not work i.e should rerturn 400
# because the auth user is not from ungleich-auth realm
r = self.verify_otp(self.admin_name, self.admin_realm,
self.admin_seed, self.admin_name,
self.admin_realm, self.admin_seed)
self.assertEqual(r.status_code, 400)
if __name__ == '__main__':
unittest.main()