"""Views for the account self-service pages.

Covers login, registration, changing user data, password reset via an
emailed link, password change, account deletion and logout.  User creation
and the user-exists lookup talk to LDAP directly through the ``LDAP`` helper
below; the remaining operations still go through an RPC pool.
"""

# Imports for the extra stuff not in django
import binascii
import os
import secrets
import string
from base64 import b64decode, b64encode
from datetime import datetime, timezone
from random import choice, randint

# Use ldap, like django_auth_backend
import ldap
import ldap.dn
import ldap.filter
import ldap.modlist

# Imports from django
from django.conf import settings
from django.contrib.auth import authenticate, login, logout
from django.contrib.auth.models import User
from django.contrib.auth.tokens import PasswordResetTokenGenerator
from django.core.mail import EmailMessage
from django.core.validators import validate_email, ValidationError
from django.http import HttpResponse, HttpResponseRedirect
from django.shortcuts import render
from django.urls import reverse_lazy
from django.views.generic import View

from .models import ResetToken

# Minimum length enforced wherever a new password is set via reset/change.
MIN_PASSWORD_LENGTH = 8

# Reset tokens older than this many seconds are purged.
RESET_TOKEN_LIFETIME_SECONDS = 24 * 60 * 60

# Session key holding the username whose reset token was just validated.
_RESET_SESSION_KEY = 'password_reset_user'

_ALPHANUMERIC = string.ascii_letters + string.digits


def _utc_epoch():
    """Return the current time as whole seconds since the Unix epoch.

    An aware datetime is used on purpose: calling ``timestamp()`` on the
    naive result of ``datetime.utcnow()`` interprets it as local time.
    """
    return int(datetime.now(timezone.utc).timestamp())


def _random_alnum(length):
    """Return ``length`` random ASCII letters/digits from a CSPRNG."""
    return ''.join(secrets.choice(_ALPHANUMERIC) for _ in range(length))


def _to_bytes(value):
    """Encode ``value`` as UTF-8 bytes unless it already is ``bytes``."""
    if isinstance(value, bytes):
        return value
    return str(value).encode('utf-8')


def _render_error(request, service, error, urlname=None):
    """Render ``error.html``; ``urlname`` is only put in the context if given."""
    context = {'service': service, 'error': error}
    if urlname is not None:
        context['urlname'] = urlname
    return render(request, 'error.html', context)


class LDAP(object):
    """Small helper around a bound python-ldap connection.

    Connection parameters come from the Django settings; the search base and
    the DN suffix for new users come from the ``LDAPSEARCH`` and
    ``LDAPCREATE`` environment variables.
    """

    def __init__(self):
        """Open the connection, optionally start TLS, and bind.

        Raises:
            KeyError: if ``LDAPSEARCH`` or ``LDAPCREATE`` is not set.
            ldap.LDAPError: if connecting or binding fails.
        """
        self.uri = settings.AUTH_LDAP_SERVER_URI
        self.user = settings.AUTH_LDAP_BIND_DN
        self.password = settings.AUTH_LDAP_BIND_PASSWORD

        # FIXME: take from settings
        self.search_base = os.environ['LDAPSEARCH']
        self.search_scope = ldap.SCOPE_SUBTREE
        self.search_filter = "objectClass=inetOrgPerson"

        # FIXME: hard coded
        self.dn = "uid={{}},{}".format(os.environ['LDAPCREATE'])
        self.gid = "10004"

        self.conn = ldap.initialize(self.uri)
        if settings.AUTH_LDAP_START_TLS:
            self.conn.start_tls_s()
        # The bind credentials are deliberately not printed or logged.
        self.conn.bind_s(self.user, self.password)

    def check_user_exists(self, username):
        """Return True if an entry with ``uid == username`` exists.

        The search uses a real ``(uid=...)`` filter with the username
        escaped; a DN is not a valid way to filter on the uid attribute.
        An empty or missing username is reported as not existing.
        """
        if not username:
            return False
        user_filter = "(uid={})".format(
            ldap.filter.escape_filter_chars(username))
        result = self.conn.search_s(
            self.search_base, self.search_scope, user_filter)
        return len(result) > 0

    def create_user(self, user, password, firstname, lastname, email):
        """Add a new posix/inetOrgPerson entry for ``user``.

        All attribute values are encoded to UTF-8 bytes before being handed
        to python-ldap.  The password is passed on as given.

        Raises:
            ldap.LDAPError: if the directory rejects the entry.
        """
        # Escape the uid so special characters cannot alter the DN.
        dn = self.dn.format(ldap.dn.escape_dn_chars(user))
        fullname = "{} {}".format(firstname, lastname)
        entry = {
            "objectClass": ["inetOrgPerson", "posixAccount", "ldapPublickey"],
            "uid": [user],
            "sn": [lastname],
            "givenName": [firstname],
            "cn": [fullname],
            "displayName": [fullname],
            # NOTE(review): the uid number is computed without any locking,
            # so two concurrent registrations could pick the same number.
            "uidNumber": [str(self.get_new_uid_number())],
            "gidNumber": [self.gid],
            "loginShell": ["/bin/bash"],
            "homeDirectory": ["/home/{}".format(user)],
            "mail": [email],
            "userPassword": [password],
        }
        encoded = {
            attr: [_to_bytes(value) for value in values]
            for attr, values in entry.items()
        }
        self.conn.add_s(dn, ldap.modlist.addModlist(encoded))

    def get_new_uid_number(self):
        """Return the highest ``uidNumber`` found plus one (1 if none)."""
        highest = 0
        found = self.conn.search_s(
            self.search_base, self.search_scope, self.search_filter)
        for result in found:
            attrs = result[1]
            # Skip results that carry no attribute dict or no uidNumber.
            if not isinstance(attrs, dict):
                continue
            values = attrs.get('uidNumber')
            if values:
                highest = max(highest, int(values[0]))
        return highest + 1


class Index(View):
    """Landing page and login form."""

    def get(self, request):
        """Show the options page when logged in, else the landing page."""
        if request.user.is_authenticated:
            return render(request, 'useroptions.html', {'user': request.user})
        return render(request, 'landing.html')

    def post(self, request):
        """Authenticate the posted credentials and log the user in."""
        username = request.POST.get('username')
        password = request.POST.get('password')
        user = None
        # A missing field is a failed login; it must not be turned into the
        # literal string 'None' and sent to the backend.
        if username and password:
            user = authenticate(request, username=username, password=password)
        if user is None:
            return render(request, 'loginfailed.html')
        login(request, user)
        return render(request, 'useroptions.html', {'user': user})


class Register(View):
    """Registration form."""

    def get(self, request):
        """Show the registration form."""
        return render(request, 'registeruser.html')

    # Someone filled out the register page, do some basic checks and create
    # the user in LDAP
    def post(self, request):
        """Validate the posted form and create the user in LDAP."""
        service = 'register an user'
        urlname = 'register'
        directory = LDAP()

        username = request.POST.get('username')
        if not username:
            return _render_error(request, service,
                                 'Please supply a username.', urlname)
        if directory.check_user_exists(username):
            return _render_error(request, service,
                                 'User already exists.', urlname)

        password1 = request.POST.get('password1')
        password2 = request.POST.get('password2')
        if not password1:
            # Without this check two missing fields compare equal and the
            # account would be created with an unintended password.
            return _render_error(request, service,
                                 'Please supply a password.', urlname)
        if password1 != password2:
            return _render_error(request, service,
                                 "Passwords don't match.", urlname)

        email = request.POST.get('email')
        try:
            validate_email(email)
        except ValidationError:
            return _render_error(request, service,
                                 'The supplied email address is invalid.',
                                 urlname)

        firstname = request.POST.get('firstname')
        lastname = request.POST.get('lastname')
        if not firstname or not lastname:
            return _render_error(request, service,
                                 'Please enter your firstname and lastname.',
                                 urlname)

        try:
            directory.create_user(username, password1, firstname, lastname,
                                  email)
        except Exception as e:
            # Boundary handler: any failure is shown on the error page.
            return _render_error(request, service, str(e), urlname)
        return render(request, 'usercreated.html', {'user': username})


class ChangeData(View):
    """Lets a logged in user change firstname, lastname and email."""

    # provide the form for the change request
    def get(self, request):
        """Show the change form pre-filled with the current data."""
        urlname = 'change_data'
        service = 'get default data for logged in user'
        if not request.user.is_authenticated:
            return render(request, 'mustbeloggedin.html')
        user = request.user
        # Kept from the original.  NOTE(review): confirm that re-running
        # login() for an already authenticated user is still wanted.
        login(request, user)
        # get basic data (firstname, lastname, email)
        # NOTE(review): get_pool is neither defined nor imported in the part
        # of the module visible here - confirm where it comes from.
        with get_pool().next() as rpc:
            (state, firstname, lastname, email) = rpc.getuserdata.get_data(
                str(request.user))
        # If it throws an error, the errormessage gets put into firstname..
        # not great naming, but works best this way
        if state == "error":
            return _render_error(request, service, firstname, urlname)
        # The template puts the old data as standard in the fields
        return render(request, 'changeuserdata.html', {
            'user': str(request.user),
            'firstname': firstname,
            'lastname': lastname,
            'email': email,
        })

    # get the change request
    def post(self, request):
        """Validate the posted data and send the change to the RPC service."""
        # variables for the error page
        service = 'change user data'
        urlname = 'change_data'

        # Only logged in users may change data
        if not request.user.is_authenticated:
            return render(request, 'mustbeloggedin.html')

        user = str(request.user)
        firstname = request.POST.get('firstname')
        lastname = request.POST.get('lastname')
        email = request.POST.get('email')

        # Some sanity checks for the supplied data (missing fields count as
        # empty)
        if not firstname:
            return _render_error(request, service,
                                 'Please enter a firstname.', urlname)
        if not lastname:
            return _render_error(request, service,
                                 'Please enter a lastname.', urlname)
        if not email:
            return _render_error(request, service,
                                 'Please enter an email.', urlname)
        try:
            validate_email(email)
        except ValidationError:
            return _render_error(request, service,
                                 'The supplied email address is invalid.',
                                 urlname)

        # Trying to change the data
        # NOTE(review): get_pool is not defined in the visible module.
        with get_pool().next() as rpc:
            result = rpc.changeuserdata.change_data(
                user, firstname, lastname, email)
        # Data change worked
        if result is True:
            return render(request, 'changeddata.html', {
                'user': user,
                'firstname': firstname,
                'lastname': lastname,
                'email': email,
            })
        # Data change did not work, display error
        return _render_error(request, service, result, urlname)


class ResetPassword(View):
    """Starts a password reset by mailing a reset link to the user."""

    def get(self, request):
        """Show the reset request form."""
        return render(request, 'resetpassword.html')

    def post(self, request):
        """Look up the user's email address and send the reset link."""
        urlname = 'reset_password'
        service = 'send a password reset request'
        user = request.POST.get('user')
        # First, check if the user exists
        if not user or not LDAP().check_user_exists(user):
            return _render_error(request, service,
                                 'The user does not exist.', urlname)
        # user exists, so try to get email
        # NOTE(review): get_pool is not defined in the visible module.
        with get_pool().next() as rpc:
            (state, tmp1, tmp2, email) = rpc.getuserdata.get_data(user)
        # Either error with the datalookup or no email provided
        if state == "error" or email == 'No email given' or not email:
            return _render_error(request, service,
                                 'Unable to retrieve email address for user.',
                                 urlname)
        # Try to send the email out
        emailsend = self.email(user, email)
        # Email got sent out
        if emailsend is True:
            return render(request, 'send_resetrequest.html', {'user': user})
        # Error while trying to send email
        return _render_error(request, service, emailsend, urlname)

    # Sends an email to the user with the 24h active link for a password reset
    def email(self, user, email):
        """Mail the reset link; return True or an error message string."""
        # epoch in UTC to spare us headache with timezones
        creationtime = _utc_epoch()
        # Construct the data for the email
        # NOTE(review): config is neither defined nor imported in the part
        # of the module visible here - confirm where it comes from.
        email_from = 'Userservice at ungleich <%s>' % config['EMAIL']['EMAILFROM']
        to = ['%s <%s>' % (user, email)]
        subject = 'Password reset request for %s' % user
        link = self.build_reset_link(user, creationtime)
        body = 'This is an automated email which was triggered by a reset request for the user %s. Please do not reply to this email.\n' % user
        body += 'If you received this email in error, please disregard it. If you get multiple emails like this, please contact us to look into potential abuse.\n'
        body += 'To reset your password, please follow the link below:\n'
        body += '%s\n\n' % link
        body += 'The link will remain active for 24 hours.\n'
        # Build the email
        mail = EmailMessage(
            subject=subject,
            body=body,
            from_email=email_from,
            to=to
        )
        try:
            mail.send()
        except Exception:
            # Callers expect a message string on failure, not an exception.
            return "An error occurred while trying to send the mail."
        return True

    # Builds the reset link for the email and puts the token into the database
    def build_reset_link(self, user, epochutc):
        """Create and store a reset token; return the link containing it."""
        # set up the data
        host = 'account-staging.ungleich.ch'
        tokengen = PasswordResetTokenGenerator()
        # create some noise for use in the tokengenerator
        pseudouser = PseudoUser()
        token = tokengen.make_token(pseudouser)
        userpart = b64encode(bytes(user, 'utf-8'))
        # create entry into the database
        newdbentry = ResetToken(user=user, token=token, creation=epochutc)
        newdbentry.save()
        # set up the link
        link = 'https://%s/reset/%s/%s/' % (
            host, userpart.decode('utf-8'), token)
        return link


# Catch the resetrequest URL and check it
class ResetRequest(View):
    """Validates a reset link and sets the new password."""

    # Gets the URL with user in b64 and the token, and checks it
    # Also cleans the database
    def get(self, request, user=None, token=None):
        """Check the token from the link and show the new-password form.

        A matching token is deleted (one-time use) and the username is
        stored in the session so that ``post`` can verify that the reset
        was authorised by a valid link.
        """
        # Cleans up outdated tokens
        # If we expect quite a bit of old tokens, maybe somewhere else is
        # better, but for now we don't really expect many unused tokens
        self.clean_db()
        # If user and token are not supplied by django, it was called from
        # somewhere else, so it's invalid
        if user is None or token is None:
            return HttpResponse('Invalid URL.', status=404)
        # extract user from b64 format; garbage in the URL is simply invalid
        try:
            user_clean = b64decode(bytes(user, 'utf-8')).decode('utf-8')
        except (binascii.Error, ValueError):
            return HttpResponse('Invalid URL.', status=404)
        # set checks_out = True if token is found in database
        checks_out = False
        dbentries = ResetToken.objects.all().filter(user=user_clean)
        for entry in dbentries:
            if entry.token == token:
                # found the token, now delete it since it's used
                checks_out = True
                entry.delete()
        # No token was found
        if not checks_out:
            return HttpResponse('Invalid URL.', status=404)
        # Token was found: remember who may reset, then supply the form
        request.session[_RESET_SESSION_KEY] = user_clean
        return render(request, 'resetpasswordnew.html', {'user': user_clean})

    # Gets the post form with the new password and sets it
    def post(self, request):
        """Set the new password for the user validated by ``get``."""
        service = 'reset the password'
        # get the supplied passwords
        password1 = request.POST.get("password1")
        password2 = request.POST.get("password2")
        # get the hidden value of user
        user = request.POST.get("user")
        # some checks over the supplied data
        if not user:
            return _render_error(
                request, service,
                'Something went wrong. Did you use the supplied form?')
        # The hidden form field alone proves nothing: only the user whose
        # token was validated in this session may get a new password.
        if request.session.get(_RESET_SESSION_KEY) != user:
            return _render_error(
                request, service,
                'Something went wrong. Did you use the supplied form?')
        if not password1 or not password2:
            return _render_error(request, service,
                                 'Please supply a password and confirm it.')
        if password1 != password2:
            return _render_error(request, service,
                                 'The supplied passwords do not match.')
        if len(password1) < MIN_PASSWORD_LENGTH:
            return _render_error(
                request, service,
                'The password is too short, please use a longer one. At least 8 characters.')
        # everything checks out, now change the password
        # NOTE(review): get_pool is not defined in the visible module.
        with get_pool().next() as rpc:
            result = rpc.changepassword.change_password(user, password1)
        # password change successfull
        if result is True:
            # The authorisation is single use.
            request.session.pop(_RESET_SESSION_KEY, None)
            return render(request, 'changedpassword.html', {'user': user})
        # Something went wrong while changing the password
        return _render_error(request, service, result)

    # Cleans up outdated tokens
    def clean_db(self):
        """Delete all reset tokens older than 24 hours; return True."""
        cutoff = _utc_epoch() - RESET_TOKEN_LIFETIME_SECONDS
        ResetToken.objects.filter(creation__lt=cutoff).delete()
        return True


# The logged in user can change the password here
class ChangePassword(View):
    """Password change for a logged in user."""

    # Presents the page for a logged in user
    def get(self, request):
        """Show the password change form."""
        if not request.user.is_authenticated:
            return render(request, 'mustbeloggedin.html')
        return render(request, 'changepassword.html', {'user': request.user})

    # Does some checks on the supplied data and changes the password
    def post(self, request):
        """Verify the old password and set the new one."""
        # Variables for the error page
        urlname = 'change_password'
        service = 'change the password'

        if not request.user.is_authenticated:
            return render(request, 'mustbeloggedin.html')
        # Kept from the original.  NOTE(review): confirm that re-running
        # login() for an already authenticated user is still wanted.
        login(request, request.user)

        user = str(request.user)
        oldpassword = request.POST.get('oldpassword')
        check = None
        if oldpassword:
            check = authenticate(request, username=user, password=oldpassword)
        # Is the right password for the user supplied?
        if check is None:
            return _render_error(request, service,
                                 'Wrong password for the user.', urlname)

        password1 = request.POST.get('password1')
        password2 = request.POST.get('password2')
        # Are both passwords from the form the same?
        if password1 != password2:
            return _render_error(
                request, service,
                'Please check if you typed the same password both times for the new password',
                urlname)
        # Check for password length (a missing field counts as too short)
        if not password1 or len(password1) < MIN_PASSWORD_LENGTH:
            return _render_error(
                request, service,
                'The password is too short, please use a longer one. At least 8 characters.',
                urlname)

        # Trying to change the password
        # NOTE(review): get_pool is not defined in the visible module.
        with get_pool().next() as rpc:
            result = rpc.changepassword.change_password(user, password1)
        # Password was changed
        if result is True:
            return render(request, 'changedpassword.html', {'user': user})
        # Password not changed, instead got some kind of error
        return _render_error(request, service, result, urlname)


# Deletes an account
class DeleteAccount(View):
    """Account deletion, confirmed by username and password."""

    # Show the basic form for deleting an account
    def get(self, request):
        """Show the account deletion form."""
        return render(request, 'deleteaccount.html')

    # Reads the filled out form
    def post(self, request):
        """Check the credentials and delete the account via RPC."""
        # Variables for error page
        urlname = 'account_delete'
        service = 'delete an account'

        # Does the user exist?
        username = request.POST.get('username')
        if not username or not LDAP().check_user_exists(username):
            return _render_error(request, service, 'Unknown user.', urlname)

        # Do user and password match?
        password = request.POST.get('password')
        check = None
        if password:
            check = authenticate(request, username=username,
                                 password=password)
        if check is None:
            return _render_error(request, service,
                                 'Wrong password for user.', urlname)

        # Try to delete the user
        # NOTE(review): get_pool is not defined in the visible module.
        with get_pool().next() as rpc:
            result = rpc.deleteuser.delete_user(username)
        # User deleted
        if result is True:
            logout(request)
            return render(request, 'deleteduser.html', {'user': username})
        # User not deleted, got some kind of error
        return _render_error(request, service, result, urlname)


# Log out the session
class LogOut(View):
    """Ends the current session."""

    def get(self, request):
        """Log the user out and confirm with a plain response."""
        logout(request)
        return HttpResponse("You have been logged out.", status=200)


# TO be clarified

# To trick the tokengenerator to work with us, because we don't really
# have the expected user Class since we are reading the user from a form
# We store the tokens and don't have to use the check function,
# some one time data works fine.

class LastLogin():
    """Stand-in for a ``last_login`` datetime; only ``replace`` is offered."""

    def replace(self, microsecond=0, tzinfo=None):
        """Return a random integer between 1 and 100000 as noise."""
        return secrets.randbelow(100000) + 1


class PseudoUser():
    """Throw-away object with the attributes the token generator reads."""

    # easiest way to handle the check for lastlogin
    last_login = LastLogin()
    # random alphanumeric strings for primary key and password, just used for
    # token generation; the class-level values remain as defaults and every
    # instance gets fresh ones in __init__
    pk = _random_alnum(20)
    password = _random_alnum(30)
    # NOTE(review): newer Django token generators also appear to read the
    # user's email field - confirm against the deployed Django version.
    email = ''

    def __init__(self):
        """Give each instance its own random pk and password."""
        self.pk = _random_alnum(20)
        self.password = _random_alnum(30)

    @classmethod
    def get_email_field_name(cls):
        """Return the name of the attribute holding the email address."""
        return 'email'