ungleich-user/dal/views.py
2019-02-19 23:16:05 +01:00

465 lines
20 KiB
Python

# Imports from django
from django.shortcuts import render
from django.views.generic import View, FormView
from django.contrib.auth import authenticate, login, logout
from django.contrib.auth.models import User
from django.http import HttpResponse, HttpResponseRedirect
from django.core.validators import validate_email, ValidationError
from django.urls import reverse_lazy
from django.contrib.auth.tokens import PasswordResetTokenGenerator
from django.core.mail import EmailMessage
from .models import ResetToken
from .forms import LoginForm
# Imports for the extra stuff not in django
from base64 import b64encode, b64decode
from datetime import datetime
from random import choice, randint
import string
import os
# Use ldap, like django_auth_backend
import ldap
import ldap.modlist as modlist
from django.conf import settings
class LDAP(object):
def __init__(self):
self.uri = settings.AUTH_LDAP_SERVER_URI
self.user = settings.AUTH_LDAP_BIND_DN
self.password = settings.AUTH_LDAP_BIND_PASSWORD
# FIXME: take from settings
self.search_base = os.environ['LDAPSEARCH']
self.search_scope = ldap.SCOPE_SUBTREE
self.search_filter = "objectClass=inetOrgPerson"
# FIXME: hard coded
self.dn = "uid={{}},{}".format(os.environ['LDAPCREATE'])
self.gid = "10004"
self.conn = ldap.initialize(self.uri)
if settings.AUTH_LDAP_START_TLS:
self.conn.start_tls_s()
self.conn.bind_s(self.user, self.password)
def check_user_exists(self, username):
exists = False
result = self.conn.search_s(self.search_base,
self.search_scope,
self.dn.format(username))
if len(result) > 0:
exists = True
return exists
def create_user(self, user, password, firstname, lastname, email):
dn = self.dn.format(user)
attr = {
"objectClass": ["inetOrgPerson".encode("utf-8"),
"posixAccount".encode("utf-8"),
"ldapPublickey".encode("utf-8")],
"uid": [user.encode("utf-8")],
"sn": [lastname.encode("utf-8")],
"givenName": [firstname.encode("utf-8")],
"cn": ["{} {}".format(firstname, lastname).encode("utf-8")],
"displayName": ["{} {}".format(firstname, lastname).encode("utf-8")],
"uidNumber": ["{}".format(self.get_new_uid_number()).encode("utf-8")],
"gidNumber": [self.gid.encode("utf-8")],
"loginShell": ["/bin/bash".encode("utf-8")],
"homeDirectory": ["/home/{}".format(user).encode("utf-8")],
"mail": email.encode("utf-8"),
"userPassword": password.encode("utf-8")
}
ldif = modlist.addModlist(attr)
print("just before: {} {}".format(dn, ldif))
return self.conn.add_s(dn, ldif)
def get_new_uid_number(self):
uidlist = [0]
for result in self.conn.search_s(self.search_base,
self.search_scope,
self.search_filter):
if 'uidNumber' in result[1]:
uidlist.append(int(result[1]['uidNumber'][0]))
return sorted(uidlist)[-1] + 1
class Index(FormView):
template_name = "landing.html"
form_class = LoginForm
class Register(View):
def get(self, request):
return render(request, 'registeruser.html')
# Someone filled out the register page, do some basic checks and throw it at nameko
def post(self, request):
l = LDAP()
service = 'register an user'
urlname = 'register'
username = request.POST.get('username')
if username == "" or not username:
return render(request, 'error.html', { 'urlname': urlname, 'service': service, 'error': 'Please supply a username.' } )
if l.check_user_exists(username):
return render(request, 'error.html', { 'urlname': urlname, 'service': service, 'error': 'User already exists.' } )
password1 = request.POST.get('password1')
password2 = request.POST.get('password2')
if password1 != password2:
return render(request, 'error.html', { 'urlname': urlname,
'service': service,
'error': "Passwords don't match." } )
email = request.POST.get('email')
try:
validate_email(email)
except ValidationError:
return render(request, 'error.html', { 'urlname': urlname,
'service': service,
'error': 'The supplied email address is invalid.' } )
firstname = request.POST.get('firstname')
lastname = request.POST.get('lastname')
if not firstname or not lastname:
return render(request, 'error.html', { 'urlname': urlname,
'service': service,
'error': 'Please enter your firstname and lastname.' } )
# so nothing strange happens if there are escapable chars
pwd = r'%s' % password1
try:
l.create_user(username, pwd, firstname, lastname, email)
except Exception as e:
return render(request, 'error.html', { 'urlname': urlname,
'service': service,
'error': e } )
return render(request, 'usercreated.html', { 'user': username } )
class ChangeData(View):
# provide the form for the change request
def get(self, request):
urlname = 'change_data'
service = 'get default data for logged in user'
if not request.user.is_authenticated:
return render(request, 'mustbeloggedin.html')
user = request.user
login(request, user)
# get basic data (firstname, lastname, email)
with get_pool().next() as rpc:
(state, firstname, lastname, email) = rpc.getuserdata.get_data(str(request.user))
# If it throws an error, the errormessage gets put into firstname.. not great naming, but works best this way
if state == "error":
return render(request, 'error.html', { 'urlname': urlname, 'service': service, 'error': firstname } )
# The template puts the old data as standard in the fields
else:
return render(request, 'changeuserdata.html', { 'user': str(request.user), 'firstname': firstname, 'lastname': lastname, 'email': email } )
# get the change request
def post(self, request):
# variables for the error page
service = 'change user data'
urlname = 'change_data'
# Only logged in users may change data
if not request.user.is_authenticated:
return render(request, 'mustbeloggedin.html')
user = str(request.user)
firstname = request.POST.get('firstname')
lastname = request.POST.get('lastname')
email = request.POST.get('email')
# Some sanity checks for the supplied data
if firstname == "":
return render(request, 'error.html', { 'urlname': urlname, 'service': service, 'error': 'Please enter a firstname.' } )
elif lastname == "":
return render(request, 'error.html', { 'urlname': urlname, 'service': service, 'error': 'Please enter a lastname.' } )
elif email == "":
return render(request, 'error.html', { 'urlname': urlname, 'service': service, 'error': 'Please enter an email.' } )
try:
validate_email(email)
except ValidationError:
return render(request, 'error.html', { 'urlname': urlname, 'service': service, 'error': 'The supplied email address is invalid.' } )
# Trying to change the data
with get_pool().next() as rpc:
result = rpc.changeuserdata.change_data(user, firstname, lastname, email)
# Data change worked
if result == True:
return render(request, 'changeddata.html', { 'user': user, 'firstname': firstname, 'lastname': lastname, 'email': email } )
# Data change did not work, display error
else:
return render(request, 'error.html', { 'urlname': urlname, 'service': service, 'error': result } )
class ResetPassword(View):
def get(self, request):
return render(request, 'resetpassword.html')
def post(self, request):
urlname = 'reset_password'
service = 'send a password reset request'
user = request.POST.get('user')
# First, check if the user exists
if not check_user_exists(user):
return render(request, 'error.html', { 'urlname': urlname, 'service': service, 'error': 'The user does not exist.' } )
# user exists, so try to get email
with get_pool().next() as rpc:
(state, tmp1, tmp2, email) = rpc.getuserdata.get_data(user)
# Either error with the datalookup or no email provided
if state == "error" or email == 'No email given' or not email:
return render(request, 'error.html', { 'urlname': urlname, 'service': service, 'error': 'Unable to retrieve email address for user.' } )
# Try to send the email out
emailsend = self.email(user, email)
# Email got sent out
if emailsend == True:
return render(request, 'send_resetrequest.html', { 'user': user } )
# Error while trying to send email
else:
return render(request, 'error.html', { 'urlname': urlname, 'service': service, 'error': emailsend } )
# Sends an email to the user with the 24h active link for a password reset
def email(self, user, email):
# getting epoch for the time now in UTC to spare us headache with timezones
creationtime = int(datetime.utcnow().timestamp())
# Construct the data for the email
email_from = 'Userservice at ungleich <%s>' % config['EMAIL']['EMAILFROM']
to = ['%s <%s>' % (user, email)]
subject = 'Password reset request for %s' % user
link = self.build_reset_link(user, creationtime)
body = 'This is an automated email which was triggered by a reset request for the user %s. Please do not reply to this email.\n' % user
body += 'If you received this email in error, please disregard it. If you get multiple emails like this, please contact us to look into potential abuse.\n'
body += 'To reset your password, please follow the link below:\n'
body += '%s\n\n' % link
body += 'The link will remain active for 24 hours.\n'
# Build the email
mail = EmailMessage(
subject=subject,
body=body,
from_email=email_from,
to=to
)
try:
mail.send()
result = True
except:
result = "An error occurred while trying to send the mail."
return result
# Builds the reset link for the email and puts the token into the database
def build_reset_link(self, user, epochutc):
# set up the data
host = 'account-staging.ungleich.ch'
tokengen = PasswordResetTokenGenerator()
# create some noise for use in the tokengenerator
pseudouser = PseudoUser()
token = tokengen.make_token(pseudouser)
buser = bytes(user, 'utf-8')
userpart = b64encode(buser)
# create entry into the database
newdbentry = ResetToken(user=user, token=token, creation=epochutc)
newdbentry.save()
# set up the link
link = 'https://%s/reset/%s/%s/' % (host, userpart.decode('utf-8'), token)
return link
# Catch the resetrequest URL and check it
class ResetRequest(View):
# Gets the URL with user in b64 and the token, and checks it
# Also cleans the database
def get(self, request, user=None, token=None):
# Cleans up outdated tokens
# If we expect quite a bit of old tokens, maybe somewhere else is better,
# but for now we don't really expect many unused tokens
self.clean_db()
# If user and token are not supplied by django, it was called from somewhere else, so it's
# invalid
if user == None or token == None:
return HttpResponse('Invalid URL.', status=404)
# extract user from b64 format
tmp_user = bytes(user, 'utf-8')
user = b64decode(tmp_user)
user_clean = user.decode('utf-8')
# set checks_out = True if token is found in database
dbentries = ResetToken.objects.all().filter(user=user_clean)
for entry in dbentries:
if entry.token == token:
# found the token, now delete it since it's used
checks_out = True
entry.delete()
# No token was found
if not checks_out:
return HttpResponse('Invalid URL.', status=404)
# Token was found, supply the form
else:
return render(request, 'resetpasswordnew.html', { 'user': user_clean } )
# Gets the post form with the new password and sets it
def post(self, request):
service = 'reset the password'
# get the supplied passwords
password1 = request.POST.get("password1")
password2 = request.POST.get("password2")
# get the hidden value of user
user = request.POST.get("user")
# some checks over the supplied data
if user == "" or not user:
return render(request, 'error.html', { 'service': service, 'error': 'Something went wrong. Did you use the supplied form?' } )
if password1 == "" or not password1 or password2 == "" or not password2:
return render(request, 'error.html', { 'service': service, 'error': 'Please supply a password and confirm it.' } )
if password1 != password2:
return render(request, 'error.html', { 'service': service, 'error': 'The supplied passwords do not match.' } )
if len(password1) < 8:
return render(request, 'error.html', { 'service': service, 'error': 'The password is too short, please use a longer one. At least 8 characters.' } )
# everything checks out, now change the password
with get_pool().next() as rpc:
pwd = r'%s' % password1
result = rpc.changepassword.change_password(user, pwd)
# password change successfull
if result == True:
return render(request, 'changedpassword.html', { 'user': user } )
# Something went wrong while changing the password
else:
return render(request, 'error.html', { 'service': service, 'error': result } )
# Cleans up outdated tokens
def clean_db(self):
# cutoff time is set to 24h hours
# using utcnow() to have no headache with timezones
cutoff = int(datetime.utcnow().timestamp()) - (24*60*60)
# Get all tokens older than 24 hours
oldtokens = ResetToken.objects.all().filter(creation__lt=cutoff)
for token in oldtokens:
# delete all tokens older than 24 hours
token.delete()
return True
# The logged in user can change the password here
class ChangePassword(View):
# Presents the page for a logged in user
def get(self, request):
if not request.user.is_authenticated:
return render(request, 'mustbeloggedin.html')
return render(request, 'changepassword.html', { 'user': request.user } )
# Does some checks on the supplied data and changes the password
def post(self, request):
# Variables for the error page
urlname = 'change_password'
service = 'change the password'
if not request.user.is_authenticated:
return render(request, 'mustbeloggedin.html')
login(request, request.user)
user = str(request.user)
oldpassword = request.POST.get('oldpassword')
check = authenticate(request, username=user, password=oldpassword)
# Is the right password for the user supplied?
if check is None:
return render(request, 'error.html', { 'urlname': urlname, 'service': service, 'error': 'Wrong password for the user.' } )
password1 = request.POST.get('password1')
password2 = request.POST.get('password2')
# Are both passwords from the form the same?
if password1 != password2:
return render(request, 'error.html', { 'urlname': urlname, 'service': service,
'error': 'Please check if you typed the same password both times for the new password' } )
# Check for password length
if len(password1) < 8:
return render(request, 'error.html', { 'urlname': urlname, 'service': service,
'error': 'The password is too short, please use a longer one. At least 8 characters.' } )
with get_pool().next() as rpc:
# Trying to change the password
pwd = r'%s' % password1
result = rpc.changepassword.change_password(user, pwd)
# Password was changed
if result == True:
return render(request, 'changedpassword.html', { 'user': user } )
# Password not changed, instead got some kind of error
else:
return render(request, 'error.html', { 'urlname': urlname, 'service': service, 'error': result } )
# Deletes an account
class DeleteAccount(View):
# Show the basic form for deleting an account
def get(self, request):
return render(request, 'deleteaccount.html')
# Reads the filled out form
def post(self, request):
# Variables for error page
urlname = 'account_delete'
service = 'delete an account'
# Does the user exist?
username = request.POST.get('username')
if not check_user_exists(username):
return render(request, 'error.html', { 'urlname': urlname, 'service': service, 'error': 'Unknown user.' } )
# Do user and password match?
password = request.POST.get('password')
pwd = r'%s' % password
check = authenticate(request, username=username, password=pwd)
if check is None:
return render(request, 'error.html', { 'urlname': urlname, 'service': service, 'error': 'Wrong password for user.' } )
# Try to delete the user
with get_pool().next() as rpc:
result = rpc.deleteuser.delete_user(username)
# User deleted
if result == True:
logout(request)
return render(request, 'deleteduser.html', { 'user': username } )
# User not deleted, got some kind of error
else:
return render(request, 'error.html', { 'urlname': urlname, 'service': service, 'error': result } )
# Log out the session
class LogOut(View):
def get(self, request):
logout(request)
return HttpResponse("You have been logged out.", status=200)
# TO be clarified
# To trick the tokengenerator to work with us, because we don't really
# have the expected user Class since we are reading the user from a form
# We store the tokens and don't have to use the check function,
# some one time data works fine.
class LastLogin():
def replace(self, microsecond=0, tzinfo=None):
return randint(1,100000)
class PseudoUser():
# easiest way to handle the check for lastlogin
last_login = LastLogin()
# random alphanumeric strings for primary key and password, just used for token generation
pk = ''.join(choice(string.ascii_letters + string.digits) for _ in range(20))
password = ''.join(choice(string.ascii_letters + string.digits) for _ in range(30))