ungleich-user/dal/views.py

695 lines
28 KiB
Python

# Imports from django
from django.shortcuts import render
from django.views.generic import View, FormView
from django.contrib.auth import authenticate, login, logout
from django.contrib.auth.models import User
from django.http import HttpResponse
from django.core.validators import validate_email, ValidationError
from django.urls import reverse_lazy
from django.contrib.auth.tokens import PasswordResetTokenGenerator
from django.core.mail import EmailMessage
from django.views.decorators.cache import cache_control
from rest_framework.views import APIView
from rest_framework.response import Response
from .models import ResetToken
from .forms import LoginForm
from .ungleich_ldap import LdapManager
from decouple import config, Csv
from pyotp import TOTP
import logging
logger = logging.getLogger(__name__)
# Imports for the extra stuff not in django
from base64 import b64encode, b64decode
from datetime import datetime
from random import choice, randint
import string
import requests
import json
from django.conf import settings
from django.contrib.auth.mixins import LoginRequiredMixin
admin_seed = config('ADMIN_SEED')
admin_name = config('ADMIN_NAME')
admin_realm = config('ADMIN_REALM')
user_realm = config('USER_REALM')
otp_url = config('OTPSERVER')
def activate_account_link(base_url, user, pwd, firstname, lastname, email, epochutc):
tokengen = PasswordResetTokenGenerator()
pseudouser = PseudoUser()
token = tokengen.make_token(pseudouser)
buser = bytes(user, 'utf-8')
bpwd = bytes(pwd, 'utf-8')
bfirstname = bytes(firstname, 'utf-8')
blasttname = bytes(lastname, 'utf-8')
bemail = bytes(email, 'utf-8')
userpart = b64encode(buser)
pwdpart = b64encode(bpwd)
fnpart = b64encode(bfirstname)
lnpart = b64encode(blasttname)
mailpart = b64encode(bemail)
# create entry into the database
newdbentry = ResetToken(user=user, token=token, creation=epochutc)
newdbentry.save()
# set up the link
link = "{base_url}/activate/{user}/{pwd}/{fn}/{ln}/{mail}/{token}/".format(
base_url=base_url, user=userpart.decode('utf-8'),
pwd=pwdpart.decode('utf-8'),
fn=fnpart.decode('utf-8'),
ln=lnpart.decode('utf-8'),
mail=mailpart.decode('utf-8'),
token=token
)
return link
def clean_db():
"""Revoves outdated tokens"""
# cutoff time is set to 24h hours
# using utcnow() to have no headache with timezones
cutoff = int(datetime.utcnow().timestamp()) - (24*60*60)
# Get all tokens older than 24 hours
oldtokens = ResetToken.objects.all().filter(creation__lt=cutoff)
for token in oldtokens:
# delete all tokens older than 24 hours
token.delete()
return True
class Index(FormView):
template_name = "landing.html"
form_class = LoginForm
success_url = 'useroptions.html'
def form_valid(self, form):
username = form.cleaned_data.get('username')
password = form.cleaned_data.get('password')
user = authenticate(username=username, password=password)
if user is not None:
login(self.request, user)
return render(self.request, 'useroptions.html', { 'user': user } )
return render(self.request, 'loginfailed.html')
@cache_control(no_cache=True, must_revalidate=True, no_store=True)
def get(self, request, *args, **kwargs):
if self.request.user.is_authenticated:
return render(self.request, 'useroptions.html',
{ 'user': self.request.user.username } )
return super(Index, self).get(request, *args, **kwargs)
class Register(View):
def get(self, request):
return render(request, 'registeruser.html')
# Someone filled out the register page, do some basic checks and throw it at nameko
def post(self, request):
service = 'register an user'
urlname = 'register'
username = request.POST.get('username')
if username == "" or not username:
return render(request, 'error.html', { 'urlname': urlname, 'service': service, 'error': 'Please supply a username.' } )
password1 = request.POST.get('password1')
password2 = request.POST.get('password2')
if password1 != password2:
return render(request, 'error.html', { 'urlname': urlname,
'service': service,
'error': "Passwords don't match." } )
email = request.POST.get('email')
try:
validate_email(email)
except ValidationError:
return render(request, 'error.html', { 'urlname': urlname,
'service': service,
'error': 'The supplied email address is invalid.' } )
firstname = request.POST.get('firstname')
lastname = request.POST.get('lastname')
if not firstname or not lastname:
return render(request, 'error.html', { 'urlname': urlname,
'service': service,
'error': 'Please enter your firstname and lastname.' } )
# so nothing strange happens if there are escapable chars
pwd = r'%s' % password1
try:
creationtime = int(datetime.utcnow().timestamp())
base_url = "{0}://{1}".format(self.request.scheme,
self.request.get_host())
link = activate_account_link(base_url, username, pwd, firstname, lastname, email, creationtime)
email_from = settings.EMAIL_FROM_ADDRESS
to = ['%s <%s>' % (username, email)]
subject = 'Activate your ungleich account'.format(firstname)
body = 'You can activate your ungleich account account by clicking <a href="{link}">here</a>.' \
' You can also copy and paste the following link into the address bar of your browser and follow' \
' the link in order to activate your account.\n\n{link}'.format(link=link)
# Build the email
mail = EmailMessage(
subject=subject,
body=body,
from_email=email_from,
to=to
)
mail.send()
except Exception as e:
return render(request, 'error.html', { 'urlname': urlname,
'service': service,
'error': e } )
return render(request, 'confirm_email.html')
class ChangeData(LoginRequiredMixin, View):
login_url = reverse_lazy('login_index')
# provide the form for the change request
def get(self, request):
urlname = 'change_data'
service = 'get default data for logged in user'
user = request.user
ldap_manager = LdapManager()
user_exists, entries = ldap_manager.check_user_exists(
uid=user.username,
attributes=['uid', 'givenName', 'sn', 'mail'],
search_base=settings.ENTIRE_SEARCH_BASE
)
if user_exists:
return render(
request,
'changeuserdata.html',
{ 'user': user.username,
'firstname': entries[0].givenName
if entries[0].givenName.value is not None else '',
'lastname': entries[0].sn
if entries[0].sn.value is not None else '',
'email': entries[0].mail
if entries[0].mail.value is not None else ''}
)
else:
return render(request, 'error.html',
{'urlname': urlname, 'service': service,
'error': request.user.username})
# get the change request
def post(self, request):
# variables for the error page
service = 'change user data'
urlname = 'change_data'
firstname = request.POST.get('firstname')
lastname = request.POST.get('lastname')
email = request.POST.get('email')
# Some sanity checks for the supplied data
if firstname == "":
return render(request, 'error.html', { 'urlname': urlname, 'service': service, 'error': 'Please enter a firstname.' } )
elif lastname == "":
return render(request, 'error.html', { 'urlname': urlname, 'service': service, 'error': 'Please enter a lastname.' } )
elif email == "":
return render(request, 'error.html', { 'urlname': urlname, 'service': service, 'error': 'Please enter an email.' } )
try:
validate_email(email)
except ValidationError:
return render(request, 'error.html', { 'urlname': urlname, 'service': service, 'error': 'The supplied email address is invalid.' } )
ldap_manager = LdapManager()
result, msg = ldap_manager.change_user_details(
uid=request.user.username,
details={"givenName": firstname, "sn": lastname, "mail": email}
)
# Data change worked
if result:
return render(request, 'changeddata.html', { 'user': request.user.username, 'firstname': firstname, 'lastname': lastname, 'email': email } )
# Data change did not work, display error
else:
return render(request, 'error.html', { 'urlname': urlname, 'service': service, 'error': msg } )
class ResetPassword(View):
def get(self, request):
return render(request, 'resetpassword.html')
def post(self, request):
urlname = 'reset_password'
service = 'send a password reset request'
user = request.POST.get('user')
# First, check if the user exists
ldap_manager = LdapManager()
user_exists, entries = ldap_manager.check_user_exists(
uid=user,
search_base=settings.ENTIRE_SEARCH_BASE,
attributes=['uid', 'givenName', 'sn', 'mail']
)
if user_exists:
# user exists, so try to get email
# with get_pool().next() as rpc:
# (state, tmp1, tmp2, email) = rpc.getuserdata.get_data(user)
# Either error with the datalookup or no email provided
email = entries[0].mail.value
if email is None:
return render(
request, 'error.html',
{'urlname': urlname, 'service': service,
'error': 'Unable to retrieve email address for user.'}
)
base_url = "{0}://{1}".format(self.request.scheme,
self.request.get_host())
# Try to send the email out
emailsend = self.email(user, email, base_url)
# Email got sent out
if emailsend == True:
return render(
request, 'send_resetrequest.html', {'user': user}
)
# Error while trying to send email
else:
return render(
request, 'error.html',
{'urlname': urlname, 'service': service,
'error': emailsend}
)
else:
return render(
request, 'error.html',
{ 'urlname': urlname, 'service': service,
'error': 'The user does not exist.' }
)
# Sends an email to the user with the 24h active link for a password reset
def email(self, user, email, base_url):
# getting epoch for the time now in UTC to spare us headache with timezones
creationtime = int(datetime.utcnow().timestamp())
# Construct the data for the email
email_from = settings.EMAIL_FROM_ADDRESS
to = ['%s <%s>' % (user, email)]
subject = 'Password reset request for %s' % user
link = self.build_reset_link(user, creationtime, base_url)
body = 'This is an automated email which was triggered by a reset request for the user %s. Please do not reply to this email.\n' % user
body += 'If you received this email in error, please disregard it. If you get multiple emails like this, please contact us to look into potential abuse.\n'
body += 'To reset your password, please follow the link below:\n'
body += '%s\n\n' % link
body += 'The link will remain active for 24 hours.\n'
# Build the email
mail = EmailMessage(
subject=subject,
body=body,
from_email=email_from,
to=to
)
try:
mail.send()
result = True
except:
result = "An error occurred while trying to send the mail."
return result
# Builds the reset link for the email and puts the token into the database
def build_reset_link(self, user, epochutc, base_url):
# set up the data
tokengen = PasswordResetTokenGenerator()
# create some noise for use in the tokengenerator
pseudouser = PseudoUser()
token = tokengen.make_token(pseudouser)
buser = bytes(user, 'utf-8')
userpart = b64encode(buser)
# create entry into the database
newdbentry = ResetToken(user=user, token=token, creation=epochutc)
newdbentry.save()
# set up the link
link = "{base_url}/reset/{user}/{token}/".format(
base_url=base_url,user=userpart.decode('utf-8'),token=token
)
logger.debug("User reset url is {}".format(link))
return link
# Catch the resetrequest URL and check it
class ResetRequest(View):
# Gets the URL with user in b64 and the token, and checks it
# Also cleans the database
def get(self, request, user=None, token=None):
# Cleans up outdated tokens
# If we expect quite a bit of old tokens, maybe somewhere else is better,
# but for now we don't really expect many unused tokens
clean_db()
# If user and token are not supplied by django, it was called from somewhere else, so it's
# invalid
if user == None or token == None:
return HttpResponse('Invalid URL.', status=404)
# extract user from b64 format
tmp_user = bytes(user, 'utf-8')
user = b64decode(tmp_user)
user_clean = user.decode('utf-8')
# set checks_out = True if token is found in database
checks_out = False
dbentries = ResetToken.objects.all().filter(user=user_clean)
for entry in dbentries:
if entry.token == token:
# found the token, now delete it since it's used
checks_out = True
entry.delete()
# No token was found
if not checks_out:
return HttpResponse('Invalid URL.', status=404)
# Token was found, supply the form
else:
return render(request, 'resetpasswordnew.html', { 'user': user_clean } )
# Gets the post form with the new password and sets it
def post(self, request):
service = 'reset the password'
# get the supplied passwords
password1 = request.POST.get("password1")
password2 = request.POST.get("password2")
# get the hidden value of user
user = request.POST.get("user")
# some checks over the supplied data
if user == "" or not user:
return render(request, 'error.html', { 'service': service, 'error': 'Something went wrong. Did you use the supplied form?' } )
if password1 == "" or not password1 or password2 == "" or not password2:
return render(request, 'error.html', { 'service': service, 'error': 'Please supply a password and confirm it.' } )
if password1 != password2:
return render(request, 'error.html', {'service': service, 'error': 'The supplied passwords do not match.'})
if len(password1) < 8:
return render(request, 'error.html', {'service': service,
'error': 'The password is too short, please use a longer one. At least 8 characters.'})
ldap_manager = LdapManager()
result = ldap_manager.change_password(
user,
password1
)
# password change successful
if result:
return render(request, 'changedpassword.html', { 'user': user } )
# Something went wrong while changing the password
else:
return render(request, 'error.html', { 'service': service, 'error': result } )
# The logged in user can change the password here
class ChangePassword(LoginRequiredMixin, View):
login_url = reverse_lazy('login_index')
# Presents the page for a logged in user
def get(self, request):
if not request.user.is_authenticated:
return render(request, 'mustbeloggedin.html')
return render(request, 'changepassword.html', { 'user': request.user } )
# Does some checks on the supplied data and changes the password
def post(self, request):
# Variables for the error page
urlname = 'change_password'
service = 'change the password'
if not request.user.is_authenticated:
return render(request, 'mustbeloggedin.html')
login(request, request.user)
user = str(request.user)
oldpassword = request.POST.get('oldpassword')
check = authenticate(request, username=user, password=oldpassword)
# Is the right password for the user supplied?
if check is None:
return render(request, 'error.html', { 'urlname': urlname, 'service': service, 'error': 'Wrong password for the user.' } )
password1 = request.POST.get('password1')
password2 = request.POST.get('password2')
# Are both passwords from the form the same?
if password1 != password2:
return render(request, 'error.html', { 'urlname': urlname, 'service': service,
'error': 'Please check if you typed the same password both times for the new password' } )
# Check for password length
if len(password1) < 8:
return render(request, 'error.html', { 'urlname': urlname, 'service': service,
'error': 'The password is too short, please use a longer one. At least 8 characters.' } )
from .ungleich_ldap import LdapManager
ldap_manager = LdapManager()
result = ldap_manager.change_password(
user,
password1
)
# Password was changed
if result:
logout(request)
return render(request, 'changedpassword.html', { 'user': user } )
# Password not changed, instead got some kind of error
else:
return render(request, 'error.html', { 'urlname': urlname, 'service': service, 'error': result } )
# Deletes an account
class DeleteAccount(LoginRequiredMixin, View):
login_url = reverse_lazy('login_index')
# Show the basic form for deleting an account
def get(self, request):
return render(request, 'deleteaccount.html')
# Reads the filled out form
def post(self, request):
# Variables for error page
urlname = 'account_delete'
service = 'delete an account'
# Does the user exist?
username = request.POST.get('username')
ldap_manager = LdapManager()
user_exists, user_details = ldap_manager.check_user_exists(username)
if user_exists and request.user.username == username:
# Do user and password match?
password = request.POST.get('password')
pwd = r'%s' % password
check = authenticate(request, username=username, password=pwd)
if check is None:
return render(request, 'error.html',
{'urlname': urlname, 'service': service,
'error': 'Wrong password for user.'})
result = ldap_manager.delete_user(username)
# User deleted
if result:
logout(request)
return render(request, 'deleteduser.html', {'user': username})
# User not deleted, got some kind of error
else:
return render(request, 'error.html',
{'urlname': urlname, 'service': service,
'error': result})
else:
return render(request, 'error.html', { 'urlname': urlname, 'service': service, 'error': 'Unknown user.' } )
# Log out the session
class LogOut(View):
def get(self, request):
logout(request)
return HttpResponse(
"You have been logged out. You will be redirected in 2 seconds."
"<script>"
"setTimeout(function (){document.location.href='/';}, 2000);"
"</script>",
status=200
)
# TO be clarified
# To trick the tokengenerator to work with us, because we don't really
# have the expected user Class since we are reading the user from a form
# We store the tokens and don't have to use the check function,
# some one time data works fine.
class LastLogin():
def replace(self, microsecond=0, tzinfo=None):
return randint(1,100000)
class PseudoUser():
# easiest way to handle the check for lastlogin
last_login = LastLogin()
# random alphanumeric strings for primary key and password, just used for token generation
pk = ''.join(choice(string.ascii_letters + string.digits) for _ in range(20))
password = ''.join(choice(string.ascii_letters + string.digits) for _ in range(30))
class ActivateAccount(View):
def get(self, request, user=None, pwd=None, firstname=None, lastname=None, email=None, token=None):
clean_db()
if token is None:
return HttpResponse('Invalid URL', status=404)
elem_list = [user, pwd, firstname, lastname, email]
clean_list = []
for value in elem_list:
try:
value_temp = bytes(value, 'utf-8')
value_decode = b64decode(value_temp)
value_clean = value_decode.decode('utf-8')
clean_list.append(value_clean)
except Exception as e:
return HttpResponse('Invalid URL', status=404)
checks_out = False
dbentries = ResetToken.objects.all().filter(user=clean_list[0])
for entry in dbentries:
if entry.token == token:
# found the token, now delete it since it's used
checks_out = True
entry.delete()
# No token was found
if not checks_out:
return HttpResponse('Invalid URL.', status=404)
# Token was found, create user
try:
ldap_manager = LdapManager()
ldap_manager.create_user(
clean_list[0], clean_list[1], clean_list[2], clean_list[3], clean_list[4]
)
req = requests.post(otp_url, data=json.dumps(
{
'auth_token': TOTP(admin_seed).now(),
'auth_name': admin_name,
'auth_realm': admin_realm,
'name': clean_list[0],
'realm': user_realm
}), headers={'Content-Type': 'application/json'})
if req.status_code != 201:
logger.error("User {} failed to create its otp seed".format(clean_list[0]))
#Send welcome email
except Exception as e:
return render(request, 'error.html', {'urlname': 'register',
'service': 'register an user',
'error': e})
return render(request, 'usercreated.html', { 'user': clean_list[0] } )
class UserCreateAPI(APIView):
def post(self, request):
username = request.POST.get('username')
email = request.POST.get('email')
firstname = request.POST.get('firstname')
lastname = request.POST.get('lastname')
if username == "" or not username:
return Response('Please supply a username.', 400)
try:
validate_email(email)
except ValidationError:
return Response('Email is not valid.', 400)
if not firstname or not lastname:
return Response('Please provide firstname and lastname', 400)
pwd = r'%s' % User.objects.make_random_password()
base_url = "{0}://{1}".format(self.request.scheme,
self.request.get_host())
creationtime = int(datetime.utcnow().timestamp())
link = activate_account_link(base_url, username, pwd, firstname, lastname, email, creationtime)
# Construct the data for the email
email_from = settings.EMAIL_FROM_ADDRESS
to = ['%s <%s>' % (username, email)]
subject = 'Ungleich account creation.'
body = 'A request has been sent to our servers to register you as a ungleich user.\n'
body += 'In order to complete the registration process you must ' \
'click <a href="{link}">here</a> or copy & paste the following link into the address bar of ' \
'your browser.\n{link}\n'.format(link=link)
body += 'Your credentials are:\n'
body += 'Username: %s\n\n' % username
body += 'Password: %s\n\n' % pwd
body += 'We strongly recommend after the activation to log in and change your password.\n'
body += 'This link will remain active for 24 hours.\n'
# Build the email
mail = EmailMessage(
subject=subject,
body=body,
from_email=email_from,
to=to
)
try:
mail.send()
except Exception as e:
return Response('Failed to send the email, please try again', 400)
return Response('An email with activation link has been sent in order to complete your registration.\n\
\nPlease check your inbox.', 200)
class SeedRetrieveCreate(APIView):
def post(self, request):
try:
username = request.data['username']
password = request.data[r'password']
realm = request.data['realm']
print(password)
except KeyError:
return Response('You need to specify username, password, and realm values', 400)
# authenticate the user against ldap
user = authenticate(username=username, password=password)
if user is not None:
req = requests.get(otp_url, data=json.dumps(
{
'auth_token': TOTP(admin_seed).now(),
'auth_name': admin_name,
'auth_realm': admin_realm}), headers={'Content-Type': 'application/json'})
response_data = json.loads(req.text)
for elem in response_data:
if elem['name'] == username and elem['realm'] == realm:
return Response(elem, 200)
# If doesn't find a match then check if the realm is allowed and create the user
allowed_realms = config('ALLOWED_REALMS', cast=Csv())
if realm not in allowed_realms:
return Response('Not allowed to perform this action.', 403)
else:
req = requests.post(otp_url, data=json.dumps(
{
'auth_token': TOTP(admin_seed).now(),
'auth_name': admin_name,
'auth_realm': admin_realm,
'name': username,
'realm': realm
}), headers={'Content-Type': 'application/json'})
if req.status_code == 201:
msg = json.loads(req.text)
return Response(msg, 201)
else:
return Response(json.loads(req.text), req.status_code)
else:
return Response('Invalid Credentials', 400)
class Seeds(LoginRequiredMixin, View):
login_url = reverse_lazy('login_index')
def get(self, request):
seedlist = []
response = requests.get(
otp_url,
headers={'Content-Type': 'application/json'},
data=json.dumps(
{'auth_name': admin_name, 'auth_realm': admin_realm, 'auth_token': TOTP(admin_seed).now()}))
response_data = json.loads(response.text)
for i in range(len(response_data)):
if response_data[i]['name'] == request.user.username:
value = {'realm': response_data[i]['realm'], 'seed': response_data[i]['seed']}
seedlist.append(value)
return render(request, 'seed_list.html', {'seed': seedlist})