From ea0e0aeeb3f2bdefd97fa5cb24604af0a55b5878 Mon Sep 17 00:00:00 2001 From: meow Date: Tue, 8 Oct 2019 23:07:28 +0500 Subject: [PATCH] Delete/List Added, Unit Tests created --- .gitignore | 2 +- Pipfile | 2 + Pipfile.lock | 92 ++++++++++++++++- app.py | 148 ++++++++++++--------------- helper.py | 23 +++-- schemas.py | 248 +++++++++++++++++++++++++++++++++++++-------- tests/test_uotp.py | 184 +++++++++++++++++++++++++++++++++ 7 files changed, 562 insertions(+), 137 deletions(-) create mode 100644 tests/test_uotp.py diff --git a/.gitignore b/.gitignore index ff9c994..eb40727 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,4 @@ .vscode - +__pycache__ .env client.py \ No newline at end of file diff --git a/Pipfile b/Pipfile index 40bcad3..6c10f2c 100644 --- a/Pipfile +++ b/Pipfile @@ -4,6 +4,7 @@ url = "https://pypi.org/simple" verify_ssl = true [dev-packages] +pep8 = "*" [packages] flask = "*" @@ -12,6 +13,7 @@ etcd3-wrapper-wip = {editable = true,git = "https://code.ungleich.ch/ungleich-pu python-decouple = "*" requests = "*" pyotp = "*" +pytest = "*" [requires] python_version = "3.5" diff --git a/Pipfile.lock b/Pipfile.lock index 8376b27..03576f7 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "140985247e5362ad0317970c161febd85cdd856163f9269cc38d802fb8b7d3f6" + "sha256": "881d2dd7e14f980c6d76875c7ecc1949989d040129622e51cfe6849217ece4c0" }, "pipfile-spec": 6, "requires": { @@ -23,6 +23,20 @@ ], "version": "==8.0.0" }, + "atomicwrites": { + "hashes": [ + "sha256:03472c30eb2c5d1ba9227e4c2ca66ab8287fbfbbda3888aa93dc2e28fc6811b4", + "sha256:75a9445bac02d8d058d5e1fe689654ba5a6556a1dfd8ce6ec55a0ed79866cfa6" + ], + "version": "==1.3.0" + }, + "attrs": { + "hashes": [ + "sha256:ec20e7a4825331c1b5ebf261d111e16fa9612c1f7a5e1f884f12bd53a664dfd2", + "sha256:f913492e1663d3c36f502e5e9ba6cd13cf19d7fab50aa13239e420fef95e1396" + ], + "version": "==19.2.0" + }, "certifi": { "hashes": [ "sha256:e4f3620cfea4f83eedc95b24abd9cd56f3c4b146dd0177e83a21b4eb49e21e50", @@ -115,6 +129,14 @@ ], "version": "==2.8" }, + "importlib-metadata": { + "hashes": [ + "sha256:aa18d7378b00b40847790e7c27e11673d7fed219354109d0e7b9e5b25dc3ad26", + "sha256:d5f18a79777f3aa179c145737780282e27b508fc8fd688cb17c7a813e8bd39af" + ], + "markers": "python_version < '3.8'", + "version": "==0.23" + }, "itsdangerous": { "hashes": [ "sha256:321b033d07f2a4136d3ec762eac9f16a10ccd60f53c0c91af90217ace7ba1f19", @@ -162,6 +184,27 @@ ], "version": "==1.1.1" }, + "more-itertools": { + "hashes": [ + "sha256:409cd48d4db7052af495b09dec721011634af3753ae1ef92d2b32f73a745f832", + "sha256:92b8c4b06dac4f0611c0729b2f2ede52b2e1bac1ab48f089c7ddc12e26bb60c4" + ], + "version": "==7.2.0" + }, + "packaging": { + "hashes": [ + "sha256:28b924174df7a2fa32c1953825ff29c61e2f5e082343165438812f00d3a7fc47", + "sha256:d9551545c6d761f3def1677baf08ab2a3ca17c56879e70fecba2fc4dde4ed108" + ], + "version": "==19.2" + }, + "pluggy": { + "hashes": [ + "sha256:0db4b7601aae1d35b4a033282da476845aa19185c1e6964b25cf324b5e4ec3e6", + "sha256:fa5fa1622fa6dd5c030e9cad086fa19ef6a0cf6d7a2d12318e10cb49d6d68f34" + ], + "version": "==0.13.0" + }, "protobuf": { "hashes": [ "sha256:125713564d8cfed7610e52444c9769b8dcb0b55e25cc7841f2290ee7bc86636f", @@ -183,6 +226,13 @@ ], "version": "==3.10.0" }, + "py": { + "hashes": [ + "sha256:64f65755aee5b381cea27766a3a147c3f15b9b6b9ac88676de66ba2ae36793fa", + "sha256:dc639b046a6e2cff5bbe40194ad65936d6ba360b52b3c3fe1d08a82dd50b5e53" + ], + "version": "==1.8.0" + }, "pyotp": { "hashes": [ "sha256:c88f37fd47541a580b744b42136f387cdad481b560ef410c0d85c957eb2a2bc0", @@ -191,6 +241,21 @@ "index": "pypi", "version": "==2.3.0" }, + "pyparsing": { + "hashes": [ + "sha256:6f98a7b9397e206d78cc01df10131398f1c8b8510a2f4d97d9abd82e1aacdd80", + "sha256:d9338df12903bbf5d65a0e4e87c2161968b10d2e489652bb47001d82a9b028b4" + ], + "version": "==2.4.2" + }, + "pytest": { + "hashes": [ + "sha256:7e4800063ccfc306a53c461442526c5571e1462f61583506ce97e4da6a1d88c8", + "sha256:ca563435f4941d0cb34767301c27bc65c510cb82e90b9ecf9cb52dc2c63caaa0" + ], + "index": "pypi", + "version": "==5.2.1" + }, "python-decouple": { "hashes": [ "sha256:1317df14b43efee4337a4aa02914bf004f010cd56d6c4bd894e6474ec8c4fe2d" @@ -234,13 +299,36 @@ ], "version": "==1.25.6" }, + "wcwidth": { + "hashes": [ + "sha256:3df37372226d6e63e1b1e1eda15c594bca98a22d33a23832a90998faa96bc65e", + "sha256:f4ebe71925af7b40a864553f761ed559b43544f8f71746c2d756c7fe788ade7c" + ], + "version": "==0.1.7" + }, "werkzeug": { "hashes": [ "sha256:7280924747b5733b246fe23972186c6b348f9ae29724135a6dfc1e53cea433e7", "sha256:e5f4a1f98b52b18a93da705a7458e55afb26f32bff83ff5d19189f92462d65c4" ], "version": "==0.16.0" + }, + "zipp": { + "hashes": [ + "sha256:3718b1cbcd963c7d4c5511a8240812904164b7f381b647143a89d3b98f9bcd8e", + "sha256:f06903e9f1f43b12d371004b4ac7b06ab39a44adc747266928ae6debfa7b3335" + ], + "version": "==0.6.0" } }, - "develop": {} + "develop": { + "pep8": { + "hashes": [ + "sha256:b22cfae5db09833bb9bd7c8463b53e1a9c9b39f12e304a8d0bba729c501827ee", + "sha256:fe249b52e20498e59e0b5c5256aa52ee99fc295b26ec9eaa85776ffdb9fe6374" + ], + "index": "pypi", + "version": "==1.7.1" + } + } } diff --git a/app.py b/app.py index e06f224..81a9151 100644 --- a/app.py +++ b/app.py @@ -1,32 +1,23 @@ import pyotp import decouple +import os + from os.path import join from flask import Flask, request from flask_restful import Resource, Api -from schemas import OTPSchema, CreateOTPSchema +from schemas import ( + OTPSchema, + CreateOTPSchema, + DeleteOTPSchema, + ListAccountSchema, +) from config import etcd_client -from helper import is_valid_otp +from helper import is_valid_otp, create_admin_if_dont_exists app = Flask(__name__) api = Api(app) - -def create_admin_if_dont_exists(etcd_client): - _key = join( - decouple.config('BASE_PREFIX'), - decouple.config('ADMIN_REALM'), - 'admin', - ) - if etcd_client.get(_key) is None: - print('admin does not exists!. So, creating one') - _value = { - 'seed': pyotp.random_base32(), - } - etcd_client.put(_key, _value, value_in_json=True) - - - create_admin_if_dont_exists(etcd_client) @@ -36,90 +27,79 @@ class Verify(Resource): data = request.json schema = OTPSchema(data) if schema.is_valid(): - return {'message': 'Verified'}, 200 + return {"message": "Verified"}, 200 else: return schema.get_errors(), 400 - # try: - # name = data['name'] - # realm = data['realm'] - # token = data['token'] - - # auth_name = data['auth-name'] - # auth_realm = data['auth-realm'] - # auth_token = data['auth-token'] - # except Exception: - # return { - # "message": "Your provided data is not correct." - # } - # else: - # if is_valid_otp(auth_name, auth_realm, auth_token): - # _key = join(decouple.config("BASE_PREFIX"), realm, name) - # entry = etcd_client.get(_key, value_in_json=True) - # if entry: - # totp = pyotp.TOTP(entry.value['seed']) - # try: - # is_token_valid = totp.verify(token) - # except: - # return {'message': 'Invalid Data'}, 400 - # else: - # if is_token_valid: - # return {'message': 'Verified'}, 200 - # else: - # return {'message': 'Invalid token'}, 400 - # else: - # return {"message": "No such Account Found"}, 400 - # else: - # return {'message': 'Invalid Auth Credentials'}, 400 - class Create(Resource): @staticmethod def post(): data = request.json - + schema = CreateOTPSchema(data) if schema.is_valid(): - _key = join(decouple.config('BASE_PREFIX'), data['realm'], data['name']) + _key = join( + decouple.config("BASE_PREFIX"), + data["realm"], + data["name"], + ) if etcd_client.get(_key) is None: - _value = { - 'seed': pyotp.random_base32(), - } + _value = {"seed": pyotp.random_base32()} etcd_client.put(_key, _value, value_in_json=True) - return {'message': 'Account Created\n' - 'name: {}, realm: {}, seed: {}'.format(data['name'],data['realm'], _value['seed'])} + return { + "message": "Account Created\n" + "name: {}, realm: {}, seed: {}".format( + data["name"], data["realm"], _value["seed"] + ) + } else: - return schema.get_errors() - # try: - # name = data['name'] - # realm = data['realm'] - # admin_name = data['admin-name'] - # admin_realm = data['admin-realm'] - # admin_token = data['admin-token'] - # except Exception: - # return {'message': 'Invalid Data'}, 400 - # else: - # if admin_realm == decouple.config('ADMIN_REALM'): - # if is_valid_otp(admin_name, admin_realm, admin_token): - # _key = join(decouple.config('BASE_PREFIX'), realm, name) + return schema.get_errors(), 400 + + +class Delete(Resource): + @staticmethod + def post(): + data = request.json + + schema = DeleteOTPSchema(data) + if schema.is_valid(): + _key = join( + decouple.config("BASE_PREFIX"), + data["realm"], + data["name"], + ) + etcd_client.client.delete(_key) + + return {"message": "Account Deleted"} + else: + return schema.get_errors(), 400 + + +class List(Resource): + @staticmethod + def get(): + data = request.json + + schema = ListAccountSchema(data) + if schema.is_valid(): + result = etcd_client.get_prefix( + decouple.config("BASE_PREFIX"), value_in_json=True + ) + r = {} + for entry in result: + _realm, _name = entry.key.split("/")[-2:] + r['{}/{}'.format(_realm, _name)] = entry.value['seed'] + return r + else: + return schema.get_errors(), 400 - # if etcd_client.get(_key) is None: - # _value = { - # 'seed': pyotp.random_base32(), - # } - # etcd_client.put(_key, _value, value_in_json=True) - # return {'message': 'Account Created\n' - # 'name: {}, realm: {}, seed: {}'.format(name, realm, _value['seed'])} - # else: - # return {'message': 'Account already exists'}, 400 - # else: - # return {'message': 'Invalid Admin OTP Credentials'} - # else: - # return {'message': 'Admin must be from {} realm'.format(decouple.config('ADMIN_REALM'))} api.add_resource(Verify, "/verify") api.add_resource(Create, "/create") +api.add_resource(Delete, "/delete") +api.add_resource(List, "/list") if __name__ == "__main__": app.run(debug=True) diff --git a/helper.py b/helper.py index 56b9703..23e8b4d 100644 --- a/helper.py +++ b/helper.py @@ -1,19 +1,30 @@ import pyotp - -from decouple import config +import decouple from os.path import join def is_valid_otp(etcd_client, name, realm, token): - _key = join(config("BASE_PREFIX"), realm, name) + _key = join(decouple.config("BASE_PREFIX"), realm, name) entry = etcd_client.get(_key, value_in_json=True) if entry: - totp = pyotp.TOTP(entry.value['seed']) + totp = pyotp.TOTP(entry.value["seed"]) try: is_token_valid = totp.verify(token) except: return False else: return is_token_valid - - return False \ No newline at end of file + + return False + + +def create_admin_if_dont_exists(etcd_client): + _key = join( + decouple.config("BASE_PREFIX"), + decouple.config("ADMIN_REALM"), + "admin", + ) + if etcd_client.get(_key) is None: + print("admin does not exists!. So, creating one") + _value = {"seed": pyotp.random_base32()} + etcd_client.put(_key, _value, value_in_json=True) diff --git a/schemas.py b/schemas.py index 223e1f8..0ea0bb6 100644 --- a/schemas.py +++ b/schemas.py @@ -6,6 +6,8 @@ from os.path import join from helper import is_valid_otp from config import etcd_client + + class Field: def __init__(self, _name, _type, _value=None): self.name = _name @@ -18,10 +20,16 @@ class Field: def is_valid(self): if self.value == KeyError: - self.add_error("'{}' field is a required field".format(self.name)) + self.add_error( + "'{}' field is a required field".format(self.name) + ) else: if not isinstance(self.value, self.type): - self.add_error("Incorrect Type for '{}' field".format(self.name)) + self.add_error( + "Incorrect Type for '{}' field".format( + self.name + ) + ) else: self.validation() @@ -35,6 +43,7 @@ class Field: def add_error(self, error): self.__errors.append(error) + class BaseSchema: def __init__(self, data, fields=None): _ = data # suppress linter warning @@ -74,40 +83,75 @@ class BaseSchema: def add_error(self, error): self.__errors.append(error) -class OTPSchema(BaseSchema): + +class DataRequiredSchema(BaseSchema): + def __init__(self, data, fields=None): + if data is None: + self.add_error("No Data is provided.") + data = {} + super().__init__(data, fields=fields) + + +class OTPSchema(DataRequiredSchema): def __init__(self, data: dict, fields=None): self.name = Field("name", str, data.get("name", KeyError)) - self.realm = Field("realm", str, data.get("realm", KeyError)) - self.token = Field("token", str, data.get("token", KeyError)) + self.realm = Field( + "realm", str, data.get("realm", KeyError) + ) + self.token = Field( + "token", str, data.get("token", KeyError) + ) - self.auth_name = Field("auth-name", str, - data.get("auth-name", KeyError)) - self.auth_realm = Field("auth-realm", str, - data.get("auth-realm", KeyError)) - self.auth_token = Field("auth-token", str, - data.get("auth-token", KeyError)) + self.auth_name = Field( + "auth-name", str, data.get("auth-name", KeyError) + ) + self.auth_realm = Field( + "auth-realm", str, data.get("auth-realm", KeyError) + ) + self.auth_token = Field( + "auth-token", str, data.get("auth-token", KeyError) + ) self.auth_realm.validation = self.auth_realm_validation - _fields = [self.name, self.realm, self.token, - self.auth_name, self.auth_realm, - self.auth_token] + _fields = [ + self.name, + self.realm, + self.token, + self.auth_name, + self.auth_realm, + self.auth_token, + ] if fields: _fields += fields - super().__init__(data=data, fields=_fields) + super().__init__(data=data, fields=_fields) def auth_realm_validation(self): if self.auth_realm.value != decouple.config("AUTH_REALM"): self.add_error( - "Authentication realm must be {}".format(decouple.config("AUTH_REALM")) + "Authentication realm must be {}".format( + decouple.config("AUTH_REALM") + ) ) + def validation(self): - if is_valid_otp(etcd_client, self.auth_name.value, - self.auth_realm.value, self.auth_token.value): - - if is_valid_otp(etcd_client, self.name.value, - self.realm.value, self.token.value): - _key = join(decouple.config("BASE_PREFIX"), - self.realm.value, self.name.value) + if is_valid_otp( + etcd_client, + self.auth_name.value, + self.auth_realm.value, + self.auth_token.value, + ): + + if is_valid_otp( + etcd_client, + self.name.value, + self.realm.value, + self.token.value, + ): + _key = join( + decouple.config("BASE_PREFIX"), + self.realm.value, + self.name.value, + ) entry = etcd_client.get(_key, value_in_json=True) if not entry: self.add_error("No such Account Found") @@ -117,22 +161,87 @@ class OTPSchema(BaseSchema): self.add_error("Invalid Auth Credentials") -class CreateOTPSchema(BaseSchema): +class CreateOTPSchema(DataRequiredSchema): def __init__(self, data: dict, fields=None): self.name = Field("name", str, data.get("name", KeyError)) - self.realm = Field("realm", str, data.get("realm", KeyError)) + self.realm = Field( + "realm", str, data.get("realm", KeyError) + ) + + self.admin_name = Field( + "admin-name", str, data.get("admin-name", KeyError) + ) + self.admin_realm = Field( + "admin-realm", str, data.get("admin-realm", KeyError) + ) + self.admin_token = Field( + "admin-token", str, data.get("admin-token", KeyError) + ) - self.admin_name = Field("admin-name", str, - data.get("admin-name", KeyError)) - self.admin_realm = Field("admin-realm", str, - data.get("admin-realm", KeyError)) - self.admin_token = Field("admin-token", str, - data.get("admin-token", KeyError)) - self.admin_realm.validation = self.admin_realm_validation - _fields = [self.name, self.realm, - self.admin_name, self.admin_realm, - self.admin_token] + _fields = [ + self.name, + self.realm, + self.admin_name, + self.admin_realm, + self.admin_token, + ] + if fields: + _fields += fields + super().__init__(data=data, fields=_fields) + + def admin_realm_validation(self): + if self.admin_realm.value != decouple.config("ADMIN_REALM"): + self.add_error( + "Admin must be from {} realm".format( + decouple.config("ADMIN_REALM") + ) + ) + + def validation(self): + if is_valid_otp( + etcd_client, + self.admin_name.value, + self.admin_realm.value, + self.admin_token.value, + ): + + _key = join( + decouple.config("BASE_PREFIX"), + self.realm.value, + self.name.value, + ) + if etcd_client.get(_key): + self.add_error("Account already exists") + else: + self.add_error("Invalid Admin OTP Credentials") + + +class DeleteOTPSchema(DataRequiredSchema): + def __init__(self, data: dict, fields=None): + self.name = Field("name", str, data.get("name", KeyError)) + self.realm = Field( + "realm", str, data.get("realm", KeyError) + ) + + self.admin_name = Field( + "admin-name", str, data.get("admin-name", KeyError) + ) + self.admin_realm = Field( + "admin-realm", str, data.get("admin-realm", KeyError) + ) + self.admin_token = Field( + "admin-token", str, data.get("admin-token", KeyError) + ) + + self.admin_realm.validation = self.admin_realm_validation + _fields = [ + self.name, + self.realm, + self.admin_name, + self.admin_realm, + self.admin_token, + ] if fields: _fields += fields super().__init__(data=data, fields=_fields) @@ -140,15 +249,66 @@ class CreateOTPSchema(BaseSchema): def admin_realm_validation(self): if self.admin_realm.value != decouple.config("ADMIN_REALM"): self.add_field_errors( - 'Admin must be from {} realm'.format(decouple.config('ADMIN_REALM')) + "Admin must be from {} realm".format( + decouple.config("ADMIN_REALM") + ) ) - def validation(self): - if is_valid_otp(etcd_client, self.admin_name.value, - self.admin_realm.value, self.admin_token.value): - _key = join(decouple.config("BASE_PREFIX"), - self.realm.value, self.name.value) - if etcd_client.get(_key): - self.add_error("Account already exists") + def validation(self): + if is_valid_otp( + etcd_client, + self.admin_name.value, + self.admin_realm.value, + self.admin_token.value, + ): + + _key = join( + decouple.config("BASE_PREFIX"), + self.realm.value, + self.name.value, + ) + if not etcd_client.get(_key): + self.add_error("Account does not exists") else: - self.add_error("Invalid Admin OTP Credentials") \ No newline at end of file + self.add_error("Invalid Admin OTP Credentials") + + +class ListAccountSchema(DataRequiredSchema): + def __init__(self, data: dict, fields=None): + data = data or {'': None} + self.admin_name = Field( + "admin-name", str, data.get("admin-name", KeyError) + ) + self.admin_realm = Field( + "admin-realm", str, data.get("admin-realm", KeyError) + ) + self.admin_token = Field( + "admin-token", str, data.get("admin-token", KeyError) + ) + + self.admin_realm.validation = self.admin_realm_validation + _fields = [ + self.admin_name, + self.admin_realm, + self.admin_token, + ] + if fields: + _fields += fields + super().__init__(data=data, fields=_fields) + + def admin_realm_validation(self): + if self.admin_realm.value != decouple.config("ADMIN_REALM"): + self.add_field_errors( + "Admin must be from {} realm".format( + decouple.config("ADMIN_REALM") + ) + ) + + def validation(self): + if not is_valid_otp( + etcd_client, + self.admin_name.value, + self.admin_realm.value, + self.admin_token.value, + ): + self.add_error("Invalid Admin OTP Credentials") diff --git a/tests/test_uotp.py b/tests/test_uotp.py new file mode 100644 index 0000000..f2ae80c --- /dev/null +++ b/tests/test_uotp.py @@ -0,0 +1,184 @@ +import unittest +import sys +import os +import pytest +import pyotp + +sys.path.append( + os.path.dirname( + os.path.dirname(os.path.abspath(__file__)) + ) +) +import app +import config + + +class TestUOTP(unittest.TestCase): + app_client = None + admin_name = 'admin' + admin_seed = None + admin_realm = 'ungleich-admin' + auth_realm = 'ungleich-auth' + etcd_client = config.etcd_client + + @classmethod + def setUpClass(cls): + app.app.config['TESTING'] = True + os.environ['BASE_PREFIX'] = '/test/uotp/' + os.environ['ADMIN_REALM'] = TestUOTP.admin_realm + os.environ['AUTH_REALM'] = TestUOTP.auth_realm + + with app.app.test_client() as client: + with app.app.app_context(): + app.create_admin_if_dont_exists(TestUOTP.etcd_client) + + entry = TestUOTP.etcd_client.get( + os.path.join( + os.environ['BASE_PREFIX'], + os.environ['ADMIN_REALM'], + 'admin' + ), value_in_json=True + ) + + assert entry is not None + + TestUOTP.app_client = client + TestUOTP.admin_seed = entry.value['seed'] + + return super().setUpClass() + + @classmethod + def tearDownClass(cls): + TestUOTP.etcd_client.client.delete_prefix(os.environ['BASE_PREFIX']) + del os.environ['BASE_PREFIX'] + del os.environ['ADMIN_REALM'] + del os.environ['AUTH_REALM'] + return super().tearDownClass() + + def get_otp_list(self): + r = self.app_client.get('/list', + json={ + 'admin-name': self.admin_name, + 'admin-realm': self.admin_realm, + 'admin-token': pyotp.TOTP(self.admin_seed).now() + } + ) + return r + + def create_otp(self, _name, _realm, _admin_name, _admin_realm, _admin_seed): + r = self.app_client.post('/create', + json={ + "name": _name, + "realm": _realm, + "admin-name": _admin_name, + "admin-realm": _admin_realm, + "admin-token": pyotp.TOTP(_admin_seed).now() + }) + return r + + def verify_otp(self, _name, _realm, _seed, + _auth_name, _auth_realm, _auth_seed): + r = self.app_client.get('/verify', + json={ + "name": _name, + "realm": _realm, + "token": pyotp.TOTP(_seed).now(), + "auth-name": _auth_name, + "auth-realm": _auth_realm, + "auth-token": pyotp.TOTP(_auth_seed).now() + }) + return r + + def test_list(self): + r = self.get_otp_list() + self.assertEqual(r.status_code, 200) + self.assertIn( + '{}/{}'.format(self.admin_realm, self.admin_name), + r.json + ) + + def test_create(self): + _name = 'auth' + _realm = 'ungleich-auth' + + # Test Successful case i.e Admin creating OTP Account + r = self.create_otp(_name, _realm, + self.admin_name, self.admin_realm, + self.admin_seed) + + self.assertEqual(r.status_code, 200) + + r = self.get_otp_list() + self.assertIn( + '{}/{}'.format(_realm, _name), + r.json + ) + + # Test Unsuccesful Creation i.e User from non-admin realm + # tries to create OTP account + + # Get Auth Account + entry = self.etcd_client.get( + os.path.join( + os.environ['BASE_PREFIX'], _realm, _name + ), value_in_json=True + ) + _adversery_name = 'adversery' + _adversery_realm = 'ungleich-admin' + + r = self.create_otp(_adversery_name, _adversery_realm, + _name, _realm, entry.value['seed']) + self.assertEqual(r.status_code, 400) + + r = self.get_otp_list() + self.assertNotIn( + '{}/{}'.format(_adversery_realm, _adversery_name), + r.json + ) + + def test_verify(self): + _auth_name = 'verification-auth' + _auth_realm = 'ungleich-auth' + _auth_seed = None + + r = self.create_otp(_auth_name, _auth_realm, + self.admin_name, self.admin_realm, + self.admin_seed) + + self.assertEqual(r.status_code, 200) + entry = self.etcd_client.get( + os.path.join( + os.environ['BASE_PREFIX'], + _auth_realm, + _auth_name + ), + value_in_json=True + ) + self.assertIsNotNone(entry) + _auth_seed = entry.value['seed'] + + # This should work + r = self.verify_otp(self.admin_name, self.admin_realm, + self.admin_seed, _auth_name, _auth_realm, + _auth_seed) + self.assertEqual(r.status_code, 200) + + # This should not work i.e should rerturn 400 + # because the auth_seed is not correct + r = self.verify_otp(self.admin_name, self.admin_realm, + self.admin_seed, _auth_name, + _auth_realm, 'meowmeowmeow') + self.assertEqual(r.status_code, 400) + + # This should not work i.e should rerturn 400 + # because the auth user is not from ungleich-auth realm + r = self.verify_otp(self.admin_name, self.admin_realm, + self.admin_seed, self.admin_name, + self.admin_realm, self.admin_seed) + self.assertEqual(r.status_code, 400) + + + + +if __name__ == '__main__': + unittest.main()