# ungleich-user/dal/views.py
#
# 553 lines
# 22 KiB
# Python

# Imports from the standard library
import logging
import re
import string
from base64 import b64encode, b64decode
from datetime import datetime
from random import choice, randint

# Imports from django
from django.conf import settings
from django.contrib.auth import authenticate, login, logout
from django.contrib.auth.mixins import LoginRequiredMixin
from django.contrib.auth.models import User
from django.contrib.auth.tokens import PasswordResetTokenGenerator
from django.core.mail import EmailMessage
from django.core.validators import validate_email, ValidationError
from django.http import HttpResponse
from django.shortcuts import render
from django.urls import reverse_lazy
from django.utils.decorators import method_decorator
from django.views.decorators.cache import cache_control
from django.views.generic import View, FormView

# Imports from other third-party packages
from rest_framework.response import Response
from rest_framework.views import APIView

# Imports from this package
from .forms import LoginForm
from .models import ResetToken
from .ungleich_ldap import LdapManager

logger = logging.getLogger(__name__)
# Small helper used for registration.
def is_username_valid(username):
    """Check that ``username`` only contains characters allowed in a uid.

    Allowed are lowercase ASCII letters, digits, dashes and underscores.

    Returns the ``re.Match`` object (truthy) when the whole string is valid
    and ``None`` otherwise, so the result can be used directly in a boolean
    context.
    """
    # Inside a character class "|" is a literal, not an alternation; the
    # former pattern "[a-z|0-9|\-|_]" therefore also accepted pipe characters.
    return re.fullmatch(r"[a-z0-9_-]+", username)
class Index(FormView):
    """Landing page: shows the login form or, when logged in, the options page."""

    template_name = "landing.html"
    form_class = LoginForm
    success_url = 'useroptions.html'

    def form_valid(self, form):
        """Authenticate with the submitted credentials and log the user in.

        Renders ``useroptions.html`` on success and ``loginfailed.html`` when
        ``authenticate`` does not return a user.
        """
        username = form.cleaned_data.get('username')
        password = form.cleaned_data.get('password')
        user = authenticate(username=username, password=password)
        if user is None:
            return render(self.request, 'loginfailed.html')
        login(self.request, user)
        return render(self.request, 'useroptions.html', {'user': user})

    # cache_control is written for function views. Applied directly to a
    # method it would be handed ``self`` in place of the request, so it is
    # wrapped in method_decorator.
    @method_decorator(
        cache_control(no_cache=True, must_revalidate=True, no_store=True)
    )
    def get(self, request, *args, **kwargs):
        """Show the options page to logged-in users, the login form otherwise."""
        if self.request.user.is_authenticated:
            return render(
                self.request,
                'useroptions.html',
                {'user': self.request.user.username},
            )
        return super(Index, self).get(request, *args, **kwargs)
class Register(View):
    """Self-service registration of a new user through ``LdapManager``."""

    def get(self, request):
        """Show the registration form."""
        return render(request, 'registeruser.html')

    def _error(self, request, message):
        """Render the shared error page for the registration service."""
        return render(request, 'error.html', {
            'urlname': 'register',
            'service': 'register an user',
            'error': message,
        })

    def post(self, request):
        """Validate the registration form, create the user and send a mail.

        Every failed check renders ``error.html`` with a message. On success
        the user is created via ``LdapManager.create_user``, a welcome mail
        is sent on a best-effort basis and ``usercreated.html`` is rendered.
        """
        username = request.POST.get('username')
        if not username:
            return self._error(request, 'Please supply a username.')
        if not is_username_valid(username):
            return self._error(
                request,
                'You can only use lowercase letters, numbers, underscores '
                'and the dash character in your username.'
            )

        password1 = request.POST.get('password1')
        password2 = request.POST.get('password2')
        # Without this check two missing fields compare equal (None == None)
        # and the account would be created with the literal password "None".
        if not password1 or not password2:
            return self._error(
                request, 'Please supply a password and confirm it.'
            )
        if password1 != password2:
            return self._error(request, "Passwords don't match.")
        # Same minimum length the password change and reset views enforce.
        if len(password1) < 8:
            return self._error(
                request,
                'The password is too short, please use a longer one. '
                'At least 8 characters.'
            )

        email = request.POST.get('email')
        try:
            validate_email(email)
        except ValidationError:
            return self._error(
                request, 'The supplied email address is invalid.'
            )

        firstname = request.POST.get('firstname')
        lastname = request.POST.get('lastname')
        if not firstname or not lastname:
            return self._error(
                request, 'Please enter your firstname and lastname.'
            )

        try:
            ldap_manager = LdapManager()
            ldap_manager.create_user(
                username, password1, firstname, lastname, email
            )
        except Exception as e:
            logger.exception("Creating user %s failed", username)
            return self._error(request, e)

        # Finally, tell the user by mail that the account was created.
        mail = EmailMessage(
            subject='{}, Welcome to datacenterlight'.format(firstname),
            body='The username {} was successfully created.\n'.format(
                username
            ),
            from_email=settings.EMAIL_FROM_ADDRESS,
            to=['%s <%s>' % (username, email)]
        )
        try:
            mail.send()
        except Exception:
            # Best effort: the account exists, so a failed mail must not
            # turn the registration into an error page.
            logger.exception("Sending the welcome mail to %s failed", username)
        return render(request, 'usercreated.html', {'user': username})
class ChangeData(LoginRequiredMixin, View):
    """Lets the logged-in user view and change name and email in LDAP."""

    login_url = reverse_lazy('login_index')

    @staticmethod
    def _attr_or_blank(attribute):
        """Return the LDAP attribute, or '' when it carries no value."""
        return attribute if attribute.value is not None else ''

    def _error(self, request, service, message):
        """Render the shared error page for this view."""
        return render(request, 'error.html', {
            'urlname': 'change_data',
            'service': service,
            'error': message,
        })

    # provide the form for the change request
    def get(self, request):
        """Show the change form pre-filled with the user's current data."""
        user = request.user
        ldap_manager = LdapManager()
        user_exists, entries = ldap_manager.check_user_exists(
            uid=user.username,
            attributes=['uid', 'givenName', 'sn', 'mail'],
            search_base=settings.ENTIRE_SEARCH_BASE
        )
        if not user_exists:
            return self._error(
                request,
                'get default data for logged in user',
                request.user.username
            )
        entry = entries[0]
        return render(request, 'changeuserdata.html', {
            'user': user.username,
            'firstname': self._attr_or_blank(entry.givenName),
            'lastname': self._attr_or_blank(entry.sn),
            'email': self._attr_or_blank(entry.mail),
        })

    # get the change request
    def post(self, request):
        """Validate the submitted data and write it through ``LdapManager``.

        Renders ``changeddata.html`` when ``change_user_details`` reports
        success and ``error.html`` with its message otherwise.
        """
        service = 'change user data'
        firstname = request.POST.get('firstname')
        lastname = request.POST.get('lastname')
        email = request.POST.get('email')

        # Some sanity checks for the supplied data. ``not value`` also covers
        # fields missing from the request (None), which ``== ""`` let through.
        if not firstname:
            return self._error(request, service, 'Please enter a firstname.')
        if not lastname:
            return self._error(request, service, 'Please enter a lastname.')
        if not email:
            return self._error(request, service, 'Please enter an email.')
        try:
            validate_email(email)
        except ValidationError:
            return self._error(
                request, service, 'The supplied email address is invalid.'
            )

        ldap_manager = LdapManager()
        result, msg = ldap_manager.change_user_details(
            uid=request.user.username,
            details={"givenName": firstname, "sn": lastname, "mail": email}
        )
        if not result:
            # Data change did not work, display error
            return self._error(request, service, msg)
        return render(request, 'changeddata.html', {
            'user': request.user.username,
            'firstname': firstname,
            'lastname': lastname,
            'email': email,
        })
class ResetPassword(View):
    """First half of the password reset: mail a one-time link to the user."""

    def get(self, request):
        """Show the form asking for the username."""
        return render(request, 'resetpassword.html')

    def _error(self, request, message):
        """Render the shared error page for the reset request service."""
        return render(request, 'error.html', {
            'urlname': 'reset_password',
            'service': 'send a password reset request',
            'error': message,
        })

    def post(self, request):
        """Look up the user's mail address and send the reset link to it."""
        user = request.POST.get('user')
        # A request without the field must not reach the LDAP lookup.
        if not user:
            return self._error(request, 'Please supply a username.')

        # First, check if the user exists
        ldap_manager = LdapManager()
        user_exists, entries = ldap_manager.check_user_exists(
            uid=user,
            search_base=settings.ENTIRE_SEARCH_BASE,
            attributes=['uid', 'givenName', 'sn', 'mail']
        )
        if not user_exists:
            return self._error(request, 'The user does not exist.')

        email = entries[0].mail.value
        if email is None:
            return self._error(
                request, 'Unable to retrieve email address for user.'
            )

        base_url = "{0}://{1}".format(
            self.request.scheme, self.request.get_host()
        )
        emailsend = self.email(user, email, base_url)
        # email() returns True on success and an error message otherwise, so
        # the result has to be compared by identity, not by truthiness.
        if emailsend is True:
            return render(request, 'send_resetrequest.html', {'user': user})
        return self._error(request, emailsend)

    # Sends an email to the user with the 24h active link for a password reset
    def email(self, user, email, base_url):
        """Mail the reset link for ``user`` to ``email``.

        Returns ``True`` when the mail was handed over for sending, otherwise
        an error message string.
        """
        # Epoch seconds are stored with the token; ResetRequest.clean_db
        # compares against them to expire links after 24 hours.
        creationtime = int(datetime.utcnow().timestamp())
        link = self.build_reset_link(user, creationtime, base_url)
        body = 'This is an automated email which was triggered by a reset request for the user %s. Please do not reply to this email.\n' % user
        body += 'If you received this email in error, please disregard it. If you get multiple emails like this, please contact us to look into potential abuse.\n'
        body += 'To reset your password, please follow the link below:\n'
        body += '%s\n\n' % link
        body += 'The link will remain active for 24 hours.\n'
        mail = EmailMessage(
            subject='Password reset request for %s' % user,
            body=body,
            from_email=settings.EMAIL_FROM_ADDRESS,
            to=['%s <%s>' % (user, email)]
        )
        try:
            mail.send()
        except Exception:
            # A bare ``except:`` would also swallow SystemExit and
            # KeyboardInterrupt; log the cause instead of dropping it.
            logger.exception("Sending the reset mail for %s failed", user)
            return "An error occurred while trying to send the mail."
        return True

    # Builds the reset link for the email and puts the token into the database
    def build_reset_link(self, user, epochutc, base_url):
        """Create a token, store it as a ``ResetToken`` and return the link.

        The link has the form ``<base_url>/reset/<base64 user>/<token>/``.
        """
        # PseudoUser only supplies noise for the token generator; the token
        # is validated later by looking it up in the database.
        token = PasswordResetTokenGenerator().make_token(PseudoUser())
        userpart = b64encode(bytes(user, 'utf-8'))
        # create entry into the database
        ResetToken(user=user, token=token, creation=epochutc).save()
        link = "{base_url}/reset/{user}/{token}/".format(
            base_url=base_url, user=userpart.decode('utf-8'), token=token
        )
        # NOTE(review): this writes a usable reset link to the debug log;
        # confirm debug logging is never enabled in production.
        logger.debug("User reset url is %s", link)
        return link
# Catch the resetrequest URL and check it
class ResetRequest(View):
    """Second half of the password reset: check the link, set the password."""

    # Session key remembering for which user a valid reset link was opened.
    SESSION_KEY = 'reset_verified_user'

    def _error(self, request, message):
        """Render the shared error page for the reset service."""
        return render(request, 'error.html', {
            'service': 'reset the password',
            'error': message,
        })

    # Gets the URL with user in b64 and the token, and checks it
    # Also cleans the database
    def get(self, request, user=None, token=None):
        """Validate the mailed link and show the new-password form.

        The token is single use: it is deleted as soon as it matched. Any
        invalid link is answered with a 404.
        """
        # Cleans up outdated tokens
        # If we expect quite a bit of old tokens, maybe somewhere else is
        # better, but for now we don't really expect many unused tokens
        self.clean_db()
        # If user and token are not supplied by django, it was called from
        # somewhere else, so it's invalid
        if user is None or token is None:
            return HttpResponse('Invalid URL.', status=404)
        # extract user from b64 format
        try:
            user_clean = b64decode(bytes(user, 'utf-8')).decode('utf-8')
        except ValueError:
            # binascii.Error (malformed base64) and UnicodeDecodeError are
            # both ValueError subclasses; a mangled link is just invalid.
            return HttpResponse('Invalid URL.', status=404)
        # Delete the matching token in one query; the number of deleted rows
        # tells whether the token existed.
        deleted, _ = ResetToken.objects.filter(
            user=user_clean, token=token
        ).delete()
        if not deleted:
            return HttpResponse('Invalid URL.', status=404)
        # Remember who proved ownership of the link. post() refuses to
        # change a password for anybody else.
        request.session[self.SESSION_KEY] = user_clean
        return render(request, 'resetpasswordnew.html', {'user': user_clean})

    # Gets the post form with the new password and sets it
    def post(self, request):
        """Set the new password for the user whose link was validated."""
        password1 = request.POST.get("password1")
        password2 = request.POST.get("password2")
        # get the hidden value of user
        user = request.POST.get("user")
        # The hidden field is client controlled. Only accept it when this
        # session opened a valid reset link for exactly that user.
        if not user or request.session.get(self.SESSION_KEY) != user:
            return self._error(
                request,
                'Something went wrong. Did you use the supplied form?'
            )
        if not password1 or not password2:
            return self._error(
                request, 'Please supply a password and confirm it.'
            )
        if password1 != password2:
            return self._error(
                request, 'The supplied passwords do not match.'
            )
        if len(password1) < 8:
            return self._error(
                request,
                'The password is too short, please use a longer one. '
                'At least 8 characters.'
            )
        # everything checks out, now change the password
        ldap_manager = LdapManager()
        result = ldap_manager.change_password(user, password1)
        if not result:
            # Something went wrong while changing the password
            return self._error(request, result)
        # The reset is done; the authorisation must not be reusable.
        request.session.pop(self.SESSION_KEY, None)
        return render(request, 'changedpassword.html', {'user': user})

    # Cleans up outdated tokens
    def clean_db(self):
        """Delete all reset tokens older than 24 hours. Always returns True."""
        # using utcnow() to have no headache with timezones
        cutoff = int(datetime.utcnow().timestamp()) - (24 * 60 * 60)
        ResetToken.objects.filter(creation__lt=cutoff).delete()
        return True
# The logged in user can change the password here
class ChangePassword(LoginRequiredMixin, View):
    """Password change for a logged-in user.

    LoginRequiredMixin sends anonymous requests to ``login_url`` before
    ``get``/``post`` run, so the handlers need no own authentication check.
    """

    login_url = reverse_lazy('login_index')

    def _error(self, request, message):
        """Render the shared error page for the password change service."""
        return render(request, 'error.html', {
            'urlname': 'change_password',
            'service': 'change the password',
            'error': message,
        })

    # Presents the page for a logged in user
    def get(self, request):
        """Show the password change form."""
        return render(request, 'changepassword.html', {'user': request.user})

    # Does some checks on the supplied data and changes the password
    def post(self, request):
        """Check the old password, validate the new one and store it.

        On success the user is logged out and ``changedpassword.html`` is
        rendered; every failed check renders ``error.html``.
        """
        # NOTE(review): re-login of the already authenticated user kept from
        # the original code; presumably meant to refresh the session.
        login(request, request.user)
        user = str(request.user)
        oldpassword = request.POST.get('oldpassword')
        check = authenticate(request, username=user, password=oldpassword)
        # Is the right password for the user supplied?
        if check is None:
            return self._error(request, 'Wrong password for the user.')

        password1 = request.POST.get('password1')
        password2 = request.POST.get('password2')
        # Missing fields arrive as None; without this check len(None) below
        # would raise a TypeError instead of showing an error page.
        if not password1 or not password2:
            return self._error(
                request, 'Please supply a password and confirm it.'
            )
        # Are both passwords from the form the same?
        if password1 != password2:
            return self._error(
                request,
                'Please check if you typed the same password both times '
                'for the new password'
            )
        # Check for password length
        if len(password1) < 8:
            return self._error(
                request,
                'The password is too short, please use a longer one. '
                'At least 8 characters.'
            )

        ldap_manager = LdapManager()
        result = ldap_manager.change_password(user, password1)
        if not result:
            # Password not changed, instead got some kind of error
            return self._error(request, result)
        # Password was changed
        logout(request)
        return render(request, 'changedpassword.html', {'user': user})
# Deletes an account
class DeleteAccount(LoginRequiredMixin, View):
    """Lets a logged-in user delete the own account after a password check."""

    login_url = reverse_lazy('login_index')

    # Show the basic form for deleting an account
    def get(self, request):
        """Show the confirmation form."""
        return render(request, 'deleteaccount.html')

    # Reads the filled out form
    def post(self, request):
        """Delete the account named in the form if it is the caller's own.

        The username must exist in LDAP, match the logged-in user and
        authenticate with the submitted password. On success the session is
        logged out.
        """

        def show_error(message):
            # All failures share the same error page and service labels.
            return render(request, 'error.html', {
                'urlname': 'account_delete',
                'service': 'delete an account',
                'error': message,
            })

        username = request.POST.get('username')
        ldap_manager = LdapManager()
        found, _details = ldap_manager.check_user_exists(username)
        # Unknown name, or an attempt to delete somebody else's account.
        if not found or request.user.username != username:
            return show_error('Unknown user.')

        # Do user and password match?
        secret = r'%s' % request.POST.get('password')
        if authenticate(request, username=username, password=secret) is None:
            return show_error('Wrong password for user.')

        outcome = ldap_manager.delete_user(username)
        if not outcome:
            # User not deleted, got some kind of error
            return show_error(outcome)
        # User deleted
        logout(request)
        return render(request, 'deleteduser.html', {'user': username})
# Log out the session
class LogOut(View):
    """Ends the session and sends the browser back to the start page."""

    def get(self, request):
        """Log out and return a short notice that redirects after 2 seconds."""
        logout(request)
        # Plain text notice followed by a script doing the delayed redirect.
        page = "".join([
            "You have been logged out. You will be redirected in 2 seconds.",
            "<script>",
            "setTimeout(function (){document.location.href='/';}, 2000);",
            "</script>",
        ])
        return HttpResponse(page, status=200)
# To trick the tokengenerator to work with us, because we don't really
# have the expected user class since we are reading the user from a form.
# We store the tokens and don't have to use the check function,
# some one time data works fine.
class LastLogin():
    """Stand-in for a user's ``last_login`` datetime."""

    def replace(self, microsecond=0, tzinfo=None):
        """Return a random integer between 1 and 100000.

        Accepts the keyword arguments of ``datetime.replace`` so it can be
        called like one; the arguments are ignored.
        """
        return randint(1, 100000)


class PseudoUser():
    """Throwaway object carrying the attributes a token generator reads."""

    # easiest way to handle the check for lastlogin
    last_login = LastLogin()
    # Newer token generators also mix in the user's email, looked up through
    # get_email_field_name(); an empty value is enough for one-time noise.
    email = ''

    def __init__(self):
        # random alphanumeric strings for primary key and password, just
        # used for token generation. Drawn per instance so that every token
        # gets fresh noise instead of values fixed at import time.
        alphabet = string.ascii_letters + string.digits
        self.pk = ''.join(choice(alphabet) for _ in range(20))
        self.password = ''.join(choice(alphabet) for _ in range(30))

    @classmethod
    def get_email_field_name(cls):
        """Return the name of the attribute holding the email address."""
        return 'email'
class UserCreateAPI(APIView):
    """API endpoint creating a user with a generated password."""

    def post(self, request):
        """Create the user in LDAP and mail the generated credentials.

        Responds with status 400 for invalid input or a failed creation,
        201 when the user exists but the mail could not be sent, and 200
        when both steps worked.
        """
        username = request.POST.get('username')
        email = request.POST.get('email')
        firstname = request.POST.get('firstname')
        lastname = request.POST.get('lastname')

        if not username:
            return Response('Please supply a username.', 400)
        # Same rule the registration form applies; the name is handed to
        # LDAP and must not contain arbitrary characters.
        if not is_username_valid(username):
            return Response(
                'You can only use lowercase letters, numbers, underscores '
                'and the dash character in your username.',
                400
            )
        try:
            validate_email(email)
        except ValidationError:
            return Response('Email is not valid.', 400)
        if not firstname or not lastname:
            return Response('Please provide firstname and lastname', 400)

        # NOTE(review): make_random_password() is deprecated in recent
        # Django releases - confirm against the Django version in use.
        pwd = r'%s' % User.objects.make_random_password()

        try:
            ldap_manager = LdapManager()
            ldap_manager.create_user(
                username, pwd, firstname, lastname, email
            )
        except Exception as e:
            return Response(
                'While trying to create the user, an error was '
                'encountered: %s' % e,
                400
            )

        # send user credentials via email
        body = 'Your user was successfully created.\n'
        body += 'Your credentials are:\n'
        body += 'Username: %s\n\n' % username
        body += 'Password: %s\n\n' % pwd
        body += 'We strongly recommend you to after log in change your password.\n'
        mail = EmailMessage(
            subject='Your datacenterlight credentials',
            body=body,
            from_email=settings.EMAIL_FROM_ADDRESS,
            to=['%s <%s>' % (username, email)]
        )
        try:
            mail.send()
        except Exception:
            # A bare ``except:`` would also catch SystemExit and
            # KeyboardInterrupt; keep the cause in the log.
            logger.exception(
                "Sending the credentials mail to %s failed", username
            )
            return Response(
                'User was created, but failed to send the email', 201
            )
        return Response('User successfully created', 200)