You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
483 lines
20 KiB
483 lines
20 KiB
# Imports from the standard library
import os
import secrets
import string
from base64 import b64encode, b64decode
from datetime import datetime
from random import choice, randint

# Imports from django and other third-party packages
from django.conf import settings
from django.contrib.auth import authenticate, login, logout
from django.contrib.auth.models import User
from django.contrib.auth.tokens import PasswordResetTokenGenerator
from django.core.mail import EmailMessage
from django.core.validators import validate_email, ValidationError
from django.http import HttpResponse, HttpResponseRedirect
from django.shortcuts import render
from django.urls import reverse_lazy
from django.views.generic import View
from django_auth_ldap.backend import LDAPBackend
from ldap3 import Server, ServerPool, Connection, ObjectDef, AttrDef, Reader, Writer

# Local imports
from .models import ResetToken
|
|
|
|
|
|
|
def check_user_exists(username):
    """Return True when the LDAP backend can populate a user for `username`.

    The lookup is delegated to ``LDAPBackend().populate_user``; a ``None``
    result is treated as "no such user", anything else as "user exists".
    """
    return LDAPBackend().populate_user(username) is not None
|
|
|
|
|
class LDAP(object):
    """Small ldap3 helper that creates new user entries in the directory.

    The server URI and bind credentials come from the Django settings; the
    search bases and the subtree for new users come from the environment
    variables ``LDAPSEARCH`` and ``LDAPCREATE``.
    """

    def __init__(self):
        """Read the connection settings.

        Raises:
            KeyError: if ``LDAPSEARCH`` or ``LDAPCREATE`` is not set in the
                environment.
        """
        self.server = settings.AUTH_LDAP_SERVER_URI
        self.user = settings.AUTH_LDAP_BIND_DN
        self.password = settings.AUTH_LDAP_BIND_PASSWORD

        # FIXME: take from settings
        # Whitespace-separated list of search bases, split in
        # get_new_uid_number().
        self.search = os.environ['LDAPSEARCH']

        # FIXME: hard coded
        # The doubled braces survive this format() call as a literal "{}",
        # which create_user() later fills in with the uid.
        self.dn = "uid={{}},{}".format(os.environ['LDAPCREATE'])

    def create_user(self, user, password, firstname, lastname, email):
        """Create a new inetOrgPerson/posixAccount entry for `user`.

        Args:
            user: uid of the new account; also used for the DN and the home
                directory.
            password: value written to ``userPassword``.
            firstname: value written to ``givenName``.
            lastname: value written to ``sn``.
            email: value written to ``mail``.

        Raises:
            Exception: if binding to the server fails, if no new uidNumber
                can be determined, or if the commit is rejected.
        """
        conn = Connection(self.server, self.user, self.password)

        if not conn.bind():
            raise Exception("Could not connect to LDAPserver {}".format(self.server))

        # Everything after a successful bind runs under try/finally so the
        # connection is released even when building the entry fails.
        try:
            # set objectClasses for the new user
            obj_new_user = ObjectDef(['inetOrgPerson', 'posixAccount', 'ldapPublicKey'], conn)

            w = Writer(conn, obj_new_user)
            # NOTE(review): `user` goes into the DN and the home directory
            # unescaped; callers should validate it before calling this.
            dn = self.dn.format(user)
            w.new(dn)

            # Filling in some of the data
            # required attributes are sn, cn, homeDirectory, uid (already
            # handled by dn), uidNumber, gidNumber
            entry = w[0]
            entry.givenName = firstname
            entry.sn = lastname
            entry.cn = firstname + " " + lastname
            entry.mail = email
            entry.userPassword = password
            entry.homeDirectory = "/home/{}".format(user)

            # Set uidNumber as last used uidNumber+1
            # NOTE(review): nothing here prevents two concurrent
            # registrations from picking the same number.
            entry.uidNumber = self.get_new_uid_number(conn)
            # gidNumber for users created by userservice, nice and clear
            entry.gidNumber = 10004

            if not w.commit():
                raise Exception("Could not write new user {} to LDAP DB, commit error".format(user))
        finally:
            conn.unbind()

    # Function to get the next uid number. Not elegant, but LAM does it too
    # and didn't really find anything nicer.
    def get_new_uid_number(self, conn):
        """Return the highest uidNumber found in the search bases, plus one.

        Args:
            conn: a bound connection offering ``search()`` and ``response``.

        Raises:
            Exception: if no entry with a uidNumber is found, because there
                is then nothing to derive the next number from.
        """
        uid_numbers = []

        for base in self.search.split():
            conn.search(base, '(&(objectClass=posixAccount)(uidNumber=*))', attributes=['uidNumber'])
            # Tolerate a missing response so an empty search base does not
            # break the iteration.
            for found in conn.response or []:
                uid_numbers.append(found['attributes']['uidNumber'])

        if not uid_numbers:
            raise Exception("Could not determine a new uidNumber, no existing posixAccount entries found")

        # New uid is highest old uidnumber plus one
        return max(uid_numbers) + 1
|
|
|
class Index(View):
    """Landing page that doubles as the login form."""

    def get(self, request):
        """Show the options page to logged-in users, the landing page otherwise."""
        if request.user.is_authenticated:
            return render(request, 'useroptions.html', {'user': request.user})
        return render(request, 'landing.html')

    def post(self, request):
        """Authenticate the posted credentials and log the user in.

        Renders 'useroptions.html' on success and 'loginfailed.html' when
        authentication fails.
        """
        username = request.POST.get('username')
        password = request.POST.get('password')
        # The password is handed over untouched. Formatting it through '%s'
        # changed nothing for strings but turned a missing field (None) into
        # the literal password "None".
        user = authenticate(request, username=username, password=password)
        if user is None:
            return render(request, 'loginfailed.html')
        login(request, user)
        return render(request, 'useroptions.html', {'user': user})
|
|
|
class Register(View):
    """Self-service registration: shows the form and creates the LDAP user."""

    def get(self, request):
        """Render the empty registration form."""
        return render(request, 'registeruser.html')

    def _error(self, request, message):
        """Render the error page with this view's 'service' and 'urlname'."""
        return render(request, 'error.html', {
            # urlname for 'go back' on the errorpage
            'urlname': 'register',
            # message for the error template
            'service': 'register an user',
            'error': message,
        })

    # Someone filled out the register page, do some basic checks and create
    # the user in LDAP
    def post(self, request):
        """Validate the submitted form and create the account.

        Renders 'error.html' with a message for the first failed check, or
        'usercreated.html' once the LDAP entry has been written.
        """
        username = request.POST.get('username')
        if not username:
            return self._error(request, 'Please supply a username.')

        if check_user_exists(username):
            return self._error(request, 'User already exists.')

        password1 = request.POST.get('password1')
        password2 = request.POST.get('password2')

        # Two missing fields compare equal (None == None), so emptiness has
        # to be rejected explicitly before the match check.
        if not password1 or not password2:
            return self._error(request, 'Please supply a password and confirm it.')

        # check if the supplied passwords match
        if password1 != password2:
            return self._error(request, 'Your passwords did not match. Please supply the same password twice.')

        email = request.POST.get('email')
        # Is the emailaddress valid?
        try:
            validate_email(email)
        except ValidationError:
            return self._error(request, 'The supplied email address is invalid.')

        firstname = request.POST.get('firstname')
        lastname = request.POST.get('lastname')
        if not firstname or not lastname:
            return self._error(request, 'Please enter your firstname and lastname.')

        # Built outside the try block on purpose: configuration errors while
        # setting up the helper are not shown on the error page.
        ldap = LDAP()

        try:
            ldap.create_user(username, password1, firstname, lastname, email)
        except Exception as e:
            # The exception itself is handed to the template, which shows
            # its message to the user.
            return self._error(request, e)

        return render(request, 'usercreated.html', {'user': username})
|
|
|
|
|
class ChangeData(View):
    """Lets a logged-in user view and change first name, last name and email.

    NOTE(review): `get_pool` is neither defined nor imported anywhere in the
    visible file; both handlers depend on it being supplied elsewhere.
    """

    # provide the form for the change request
    def get(self, request):
        """Render the change form pre-filled with the user's current data."""
        urlname = 'change_data'
        service = 'get default data for logged in user'
        if not request.user.is_authenticated:
            return render(request, 'mustbeloggedin.html')

        # NOTE(review): logs the already authenticated user in again; kept
        # as found, the intent is not visible from this file.
        user = request.user
        login(request, user)

        # get basic data (firstname, lastname, email)
        with get_pool().next() as rpc:
            (state, firstname, lastname, email) = rpc.getuserdata.get_data(str(request.user))

        # On failure the error message is delivered in the `firstname` slot
        # of the result tuple.
        if state == "error":
            return render(request, 'error.html', {'urlname': urlname, 'service': service, 'error': firstname})

        # The template puts the old data as standard in the fields
        return render(request, 'changeuserdata.html', {
            'user': str(request.user),
            'firstname': firstname,
            'lastname': lastname,
            'email': email,
        })

    def _error(self, request, message):
        """Render the error page for a failed change request."""
        return render(request, 'error.html', {
            'urlname': 'change_data',
            'service': 'change user data',
            'error': message,
        })

    # get the change request
    def post(self, request):
        """Validate the submitted data and forward the change request.

        Renders 'changeddata.html' on success and 'error.html' with the
        reason otherwise.
        """
        # Only logged in users may change data
        if not request.user.is_authenticated:
            return render(request, 'mustbeloggedin.html')

        user = str(request.user)
        firstname = request.POST.get('firstname')
        lastname = request.POST.get('lastname')
        email = request.POST.get('email')

        # Some sanity checks for the supplied data. `not value` also rejects
        # fields that are missing from the form (None), not only empty ones.
        if not firstname:
            return self._error(request, 'Please enter a firstname.')
        if not lastname:
            return self._error(request, 'Please enter a lastname.')
        if not email:
            return self._error(request, 'Please enter an email.')
        try:
            validate_email(email)
        except ValidationError:
            return self._error(request, 'The supplied email address is invalid.')

        # Trying to change the data
        with get_pool().next() as rpc:
            result = rpc.changeuserdata.change_data(user, firstname, lastname, email)

        # Anything other than True is treated as the error to display.
        if result == True:
            return render(request, 'changeddata.html', {
                'user': user,
                'firstname': firstname,
                'lastname': lastname,
                'email': email,
            })
        return self._error(request, result)
|
|
|
|
|
# Resets the password for a user |
|
# Sends email to the user with a link to reset the password |
|
|
|
class ResetPassword(View):
    """Starts a password reset by mailing a one-time link to the user.

    NOTE(review): `get_pool` and `config` are neither defined nor imported
    anywhere in the visible file; `post` and `email` depend on them being
    supplied elsewhere.
    """

    # Presents the form with some information
    def get(self, request):
        """Render the reset request form."""
        return render(request, 'resetpassword.html')

    # gets the data from confirming the reset request and checks if it was
    # not a misclick (by having the user type in his username)
    def post(self, request):
        """Look up the user's email address and send the reset mail.

        Renders 'send_resetrequest.html' once the mail is sent and
        'error.html' with the reason otherwise.
        """
        urlname = 'reset_password'
        service = 'send a password reset request'

        def fail(message):
            return render(request, 'error.html', {'urlname': urlname, 'service': service, 'error': message})

        user = request.POST.get('user')
        # First, check if the user exists
        if not check_user_exists(user):
            return fail('The user does not exist.')

        # user exists, so try to get email
        with get_pool().next() as rpc:
            (state, tmp1, tmp2, email) = rpc.getuserdata.get_data(user)

        # Either error with the datalookup or no email provided
        if state == "error" or email == 'No email given' or not email:
            return fail('Unable to retrieve email address for user.')

        # Try to send the email out; anything but True is the error message.
        emailsend = self.email(user, email)
        if emailsend == True:
            return render(request, 'send_resetrequest.html', {'user': user})
        return fail(emailsend)

    # Sends an email to the user with the 24h active link for a password reset
    def email(self, user, email):
        """Build and send the reset mail.

        Returns:
            True when the mail was handed off successfully, otherwise an
            error message string.
        """
        # NOTE(review): utcnow() is naive and timestamp() interprets naive
        # datetimes as local time, so this is the real epoch only on a
        # server running in UTC. ResetRequest.clean_db computes its cutoff
        # the same way, so the two stay consistent; change both together.
        creationtime = int(datetime.utcnow().timestamp())

        # Construct the data for the email
        email_from = 'Userservice at ungleich <%s>' % config['EMAIL']['EMAILFROM']
        to = ['%s <%s>' % (user, email)]
        subject = 'Password reset request for %s' % user
        link = self.build_reset_link(user, creationtime)
        body = 'This is an automated email which was triggered by a reset request for the user %s. Please do not reply to this email.\n' % user
        body += 'If you received this email in error, please disregard it. If you get multiple emails like this, please contact us to look into potential abuse.\n'
        body += 'To reset your password, please follow the link below:\n'
        body += '%s\n\n' % link
        body += 'The link will remain active for 24 hours.\n'

        # Build the email
        mail = EmailMessage(
            subject=subject,
            body=body,
            from_email=email_from,
            to=to,
        )
        try:
            mail.send()
        except Exception:
            # Deliberately broad so any delivery problem becomes a message
            # for the error page, but no longer a bare except that would
            # also swallow KeyboardInterrupt/SystemExit.
            return "An error occurred while trying to send the mail."
        return True

    # Builds the reset link for the email and puts the token into the database
    def build_reset_link(self, user, epochutc):
        """Create a token, store it for `user` and return the reset URL.

        Args:
            user: username the token is issued for.
            epochutc: creation time stored with the token (integer epoch).
        """
        # set up the data
        host = 'account-staging.ungleich.ch'
        tokengen = PasswordResetTokenGenerator()
        # create some noise for use in the tokengenerator
        pseudouser = PseudoUser()
        token = tokengen.make_token(pseudouser)

        # The username travels in the URL base64-encoded.
        # NOTE(review): standard base64 can emit '/' and '+', which may not
        # survive as a URL path segment — confirm against the URL pattern.
        userpart = b64encode(bytes(user, 'utf-8'))

        # create entry into the database
        newdbentry = ResetToken(user=user, token=token, creation=epochutc)
        newdbentry.save()

        # set up the link
        link = 'https://%s/reset/%s/%s/' % (host, userpart.decode('utf-8'), token)
        return link
|
|
|
|
|
# Catch the resetrequest URL and check it |
|
class ResetRequest(View):
    """Handles the link from the reset mail and the new-password form.

    NOTE(review): `get_pool` is neither defined nor imported anywhere in the
    visible file; `post` depends on it being supplied elsewhere.
    """

    # Gets the URL with user in b64 and the token, and checks it
    # Also cleans the database
    def get(self, request, user=None, token=None):
        """Validate the reset link and show the new-password form.

        Args:
            user: base64-encoded username taken from the URL.
            token: one-time token taken from the URL.

        Returns:
            A 404 'Invalid URL.' response when the link is incomplete,
            malformed or unknown, otherwise the rendered form. A matching
            token is deleted, so each link works once.
        """
        # Cleans up outdated tokens
        # If we expect quite a bit of old tokens, maybe somewhere else is
        # better, but for now we don't really expect many unused tokens
        self.clean_db()

        # If user and token are not supplied by django, it was called from
        # somewhere else, so it's invalid
        if user is None or token is None:
            return HttpResponse('Invalid URL.', status=404)

        # extract user from b64 format; both a bad base64 payload and bytes
        # that are not UTF-8 raise ValueError subclasses, and a tampered
        # link should be a 404 rather than a server error
        try:
            user_clean = b64decode(bytes(user, 'utf-8')).decode('utf-8')
        except ValueError:
            return HttpResponse('Invalid URL.', status=404)

        # Must start out False: without it an unknown token left the name
        # unbound and the check below crashed instead of returning 404.
        checks_out = False
        for entry in ResetToken.objects.all().filter(user=user_clean):
            if entry.token == token:
                # found the token, now delete it since it's used
                checks_out = True
                entry.delete()

        # No token was found
        if not checks_out:
            return HttpResponse('Invalid URL.', status=404)

        # Token was found, supply the form
        return render(request, 'resetpasswordnew.html', {'user': user_clean})

    # Gets the post form with the new password and sets it
    def post(self, request):
        """Validate the new password and apply it for the posted user.

        NOTE(review): security — the target account comes from the `user`
        form field, and nothing in this method ties the request to a token
        validated in get(). This needs a follow-up.
        """
        service = 'reset the password'

        def fail(message):
            return render(request, 'error.html', {'service': service, 'error': message})

        # get the supplied passwords
        password1 = request.POST.get("password1")
        password2 = request.POST.get("password2")
        # get the hidden value of user
        user = request.POST.get("user")

        # some checks over the supplied data
        if not user:
            return fail('Something went wrong. Did you use the supplied form?')
        if not password1 or not password2:
            return fail('Please supply a password and confirm it.')
        if password1 != password2:
            return fail('The supplied passwords do not match.')
        if len(password1) < 8:
            return fail('The password is too short, please use a longer one. At least 8 characters.')

        # everything checks out, now change the password
        with get_pool().next() as rpc:
            result = rpc.changepassword.change_password(user, password1)

        # Anything other than True is treated as the error to display.
        if result == True:
            return render(request, 'changedpassword.html', {'user': user})
        return fail(result)

    # Cleans up outdated tokens
    def clean_db(self):
        """Delete every stored token older than 24 hours; always returns True."""
        # NOTE(review): utcnow() is naive and timestamp() interprets naive
        # datetimes as local time; ResetPassword.email stamps tokens the
        # same way, so the comparison is consistent. Change both together.
        cutoff = int(datetime.utcnow().timestamp()) - (24*60*60)
        # Get all tokens older than 24 hours and delete them one by one
        for stale in ResetToken.objects.all().filter(creation__lt=cutoff):
            stale.delete()
        return True
|
|
|
# The logged in user can change the password here |
|
|
|
class ChangePassword(View):
    """Lets a logged-in user replace their password.

    NOTE(review): `get_pool` is neither defined nor imported anywhere in the
    visible file; `post` depends on it being supplied elsewhere.
    """

    # Presents the page for a logged in user
    def get(self, request):
        """Render the change-password form for authenticated users."""
        if not request.user.is_authenticated:
            return render(request, 'mustbeloggedin.html')
        return render(request, 'changepassword.html', {'user': request.user})

    # Does some checks on the supplied data and changes the password
    def post(self, request):
        """Verify the old password, validate the new one and apply it.

        Renders 'changedpassword.html' on success and 'error.html' with the
        reason otherwise.
        """
        # Variables for the error page
        urlname = 'change_password'
        service = 'change the password'

        def fail(message):
            return render(request, 'error.html', {'urlname': urlname, 'service': service, 'error': message})

        if not request.user.is_authenticated:
            return render(request, 'mustbeloggedin.html')
        # NOTE(review): logs the already authenticated user in again; kept
        # as found, the intent is not visible from this file.
        login(request, request.user)

        user = str(request.user)
        oldpassword = request.POST.get('oldpassword')
        # Is the right password for the user supplied?
        if authenticate(request, username=user, password=oldpassword) is None:
            return fail('Wrong password for the user.')

        password1 = request.POST.get('password1')
        password2 = request.POST.get('password2')
        # Are both passwords from the form the same?
        if password1 != password2:
            return fail('Please check if you typed the same password both times for the new password')

        # Check for password length. Two missing fields compare equal above,
        # so None is caught here before len() can raise TypeError.
        if not password1 or len(password1) < 8:
            return fail('The password is too short, please use a longer one. At least 8 characters.')

        # Trying to change the password
        with get_pool().next() as rpc:
            result = rpc.changepassword.change_password(user, password1)

        # Anything other than True is treated as the error to display.
        if result == True:
            return render(request, 'changedpassword.html', {'user': user})
        return fail(result)
|
|
|
|
|
# Deletes an account |
|
class DeleteAccount(View):
    """Deletes an account after re-checking username and password.

    NOTE(review): `get_pool` is neither defined nor imported anywhere in the
    visible file; `post` depends on it being supplied elsewhere.
    """

    # Show the basic form for deleting an account
    def get(self, request):
        """Render the account deletion form."""
        return render(request, 'deleteaccount.html')

    # Reads the filled out form
    def post(self, request):
        """Verify the credentials and delete the user.

        Renders 'deleteduser.html' (after logging the session out) on
        success and 'error.html' with the reason otherwise.
        """
        # Variables for error page
        urlname = 'account_delete'
        service = 'delete an account'

        def fail(message):
            return render(request, 'error.html', {'urlname': urlname, 'service': service, 'error': message})

        # Does the user exist?
        username = request.POST.get('username')
        if not check_user_exists(username):
            return fail('Unknown user.')

        # Do user and password match? The password is passed on untouched:
        # formatting it through '%s' turned a missing field (None) into the
        # literal password "None".
        password = request.POST.get('password')
        if authenticate(request, username=username, password=password) is None:
            return fail('Wrong password for user.')

        # Try to delete the user
        with get_pool().next() as rpc:
            result = rpc.deleteuser.delete_user(username)

        # Anything other than True is treated as the error to display.
        if result == True:
            logout(request)
            return render(request, 'deleteduser.html', {'user': username})
        return fail(result)
|
|
|
# Log out the session |
|
class LogOut(View):
    """Ends the current session."""

    def get(self, request):
        """Log the requester out and confirm with a plain 200 response."""
        logout(request)
        farewell = "You have been logged out."
        return HttpResponse(farewell, status=200)
|
|
|
|
|
|
|
# TO be clarified |
|
|
|
# To trick the tokengenerator to work with us, because we don't really |
|
# have the expected user Class since we are reading the user from a form |
|
# We store the tokens and don't have to use the check function, |
|
# some one time data works fine. |
|
|
|
class LastLogin():
    """Stand-in for a user's `last_login` value that yields throwaway noise."""

    def replace(self, microsecond=0, tzinfo=None):
        """Return a random integer between 1 and 100000 (inclusive).

        Both arguments are accepted and ignored.
        NOTE(review): the signature presumably mirrors the datetime.replace()
        call made by Django's token generator — confirm against the
        installed Django version.
        """
        return secrets.randbelow(100000) + 1


class PseudoUser():
    """Throwaway object carrying the attributes used for token generation.

    The values are random noise only; tokens are stored in the database and
    looked up there, so they never have to be re-derived from a user.
    """

    # easiest way to handle the check for lastlogin; the helper holds no
    # state, so one shared instance is enough
    last_login = LastLogin()

    def __init__(self):
        """Draw fresh random values for every instance.

        As class attributes these were computed once at import time, so
        every token issued by a process shared the same pk and password.
        """
        alphabet = string.ascii_letters + string.digits
        # random alphanumeric strings for primary key and password, just
        # used for token generation
        self.pk = ''.join(secrets.choice(alphabet) for _ in range(20))
        self.password = ''.join(secrets.choice(alphabet) for _ in range(30))
|
|
|