87 lines
3 KiB
Python
87 lines
3 KiB
Python
import logging
|
|
import pyotp
|
|
import otpauth
|
|
from rest_framework import serializers, exceptions
|
|
from otpauth.models import OTPSeed
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# For accessing / modifying the data -- currently unused
|
|
class OTPSerializer(serializers.ModelSerializer):
    """Model serializer for ``OTPSeed`` exposing name, realm and seed.

    The seed is declared read-only and is generated in ``create()``
    instead of being taken from the submitted data.
    """

    class Meta:
        model = OTPSeed
        fields = ('name', 'realm', 'seed')
        read_only_fields = ('seed',)

    def create(self, validated_data):
        """Create and return an ``OTPSeed`` with a fresh random base32 seed.

        Any ``seed`` key already present in ``validated_data`` is
        overwritten; the dict is modified in place.
        """
        validated_data.update(seed=pyotp.random_base32())
        return OTPSeed.objects.create(**validated_data)
|
|
|
|
class TokenSerializer(serializers.Serializer):
    """Check a (name, realm, token) triple against a stored OTP seed.

    This class is mainly / only used for authentication.

    ``save()`` does not persist anything: it looks up the ``OTPSeed``
    matching name and realm and verifies the submitted TOTP token
    against it.
    """

    auth_name = serializers.CharField(max_length=128)
    auth_token = serializers.CharField(max_length=128)
    auth_realm = serializers.CharField(max_length=128)

    # Keys that save() reads from ``validated_data``. Subclasses may point
    # them at different fields to verify another name/realm/token triple.
    token_name = 'auth_token'
    name_name = 'auth_name'
    realm_name = 'auth_realm'

    def save(self):
        """Verify the submitted token and return the matching seed.

        Returns:
            A ``(db_instance, auth_token)`` tuple: the ``OTPSeed`` found
            for the given name and realm, and the token that was verified.

        Raises:
            exceptions.AuthenticationFailed: if no unique ``OTPSeed``
                exists for the name/realm pair, or the token does not
                verify against its seed.
        """
        auth_token = self.validated_data.get(self.token_name)
        auth_name = self.validated_data.get(self.name_name)
        auth_realm = self.validated_data.get(self.realm_name)

        # NOTE: a realm allow-list ("ungleich-admin" / "ungleich-auth") was
        # sketched here at some point but is not enforced by this class.

        # Never log the submitted token, the stored seed or the expected
        # token: anybody with read access to the logs could authenticate.
        logger.debug(
            "auth: [%s] %s@'%s'", self.name_name, auth_name, auth_realm
        )

        # 1. Verify that the connection might authenticate
        logger.debug(
            "Checking in db for name:%s & realm:%s", auth_name, auth_realm
        )
        try:
            db_instance = OTPSeed.objects.get(name=auth_name, realm=auth_realm)
        except OTPSeed.DoesNotExist:
            logger.error(
                "OTPSeed name: %s, realm: %s does not exist",
                auth_name, auth_realm
            )
            raise exceptions.AuthenticationFailed()
        except OTPSeed.MultipleObjectsReturned:
            # Ambiguous entries are rejected exactly like a missing one,
            # but are logged distinctly so the data problem can be found.
            logger.error(
                "OTPSeed name: %s, realm: %s is not unique",
                auth_name, auth_realm
            )
            raise exceptions.AuthenticationFailed()

        logger.debug("Found seed for %s@'%s'", auth_name, auth_realm)

        # 2. Verify the token itself. valid_window=3 also accepts tokens
        # from up to 3 time steps before/after the current one
        # (presumably to tolerate clock drift -- confirm).
        totp = pyotp.TOTP(db_instance.seed)
        if not totp.verify(auth_token, valid_window=3):
            logger.error("totp not verified")
            raise exceptions.AuthenticationFailed()

        return (db_instance, auth_token)
|
|
|
|
# For verifying somebody else's token
|
|
class VerifySerializer(TokenSerializer):
    """Verify a third party's token for a caller in realm ungleich-auth.

    In addition to the inherited ``auth_*`` fields, the request carries
    the ``name`` / ``realm`` / ``token`` triple that should be verified.
    """

    name = serializers.CharField(max_length=128)
    token = serializers.CharField(max_length=128)
    realm = serializers.CharField(max_length=128)

    # Point the inherited save() at the triple to verify instead of the
    # caller's own auth_* fields.
    token_name = 'token'
    name_name = 'name'
    realm_name = 'realm'

    def save(self):
        """Verify the ``name`` / ``realm`` / ``token`` triple.

        Only callers whose ``auth_realm`` is ``"ungleich-auth"`` may
        verify other tokens.

        Returns:
            The ``(db_instance, token)`` tuple produced by
            ``TokenSerializer.save()`` for the verified triple.

        Raises:
            exceptions.AuthenticationFailed: if ``auth_realm`` is not
                ``"ungleich-auth"`` or the triple does not verify.
        """
        auth_realm = self.validated_data.get("auth_realm")

        # NOTE(review): only the caller's realm is checked here; the
        # caller's own auth_token is not verified by this method --
        # presumably that happens elsewhere; confirm.
        if auth_realm != "ungleich-auth":
            logger.error("Auth-realm is not ungleich-auth")
            raise exceptions.AuthenticationFailed()

        # Because of the overridden *_name attributes, the parent checks
        # name/realm/token. Hand its result back to the caller instead of
        # silently dropping it.
        return super().save()
|