From 64fcc3e61349b8a5ce3df31404b6316eea90f324 Mon Sep 17 00:00:00 2001 From: "M.Ravi" Date: Tue, 21 Nov 2023 13:08:29 +0530 Subject: [PATCH] Introduce dotenv and ldap --- dynamicweb2/ldap_manager.py | 282 ++++++++++++++++++++++++++++++++++++ dynamicweb2/settings.py | 36 +++++ requirements.txt | 4 + 3 files changed, 322 insertions(+) create mode 100644 dynamicweb2/ldap_manager.py diff --git a/dynamicweb2/ldap_manager.py b/dynamicweb2/ldap_manager.py new file mode 100644 index 0000000..d40e931 --- /dev/null +++ b/dynamicweb2/ldap_manager.py @@ -0,0 +1,282 @@ +import base64 +import hashlib +import random +import ldap3 +import logging +import unicodedata + +from django.conf import settings + +logger = logging.getLogger(__name__) + + +class LdapManager: + __instance = None + + def __new__(cls): + if LdapManager.__instance is None: + LdapManager.__instance = object.__new__(cls) + return LdapManager.__instance + + def __init__(self): + """ + Initialize the LDAP subsystem. + """ + self.rng = random.SystemRandom() + self.server = ldap3.Server(settings.AUTH_LDAP_SERVER) + + def get_admin_conn(self): + """ + Return a bound :class:`ldap3.Connection` instance which has write + permissions on the dn in which the user accounts reside. + """ + conn = self.get_conn(user=settings.LDAP_ADMIN_DN, + password=settings.LDAP_ADMIN_PASSWORD, + raise_exceptions=True) + conn.bind() + return conn + + def get_conn(self, **kwargs): + """ + Return an unbound :class:`ldap3.Connection` which talks to the configured + LDAP server. + + The *kwargs* are passed to the constructor of :class:`ldap3.Connection` and + can be used to set *user*, *password* and other useful arguments. + """ + return ldap3.Connection(self.server, **kwargs) + + def _ssha_password(self, password): + """ + Apply the SSHA password hashing scheme to the given *password*. + *password* must be a :class:`bytes` object, containing the utf-8 + encoded password. + + Return a :class:`bytes` object containing ``ascii``-compatible data + which can be used as LDAP value, e.g. after armoring it once more using + base64 or decoding it to unicode from ``ascii``. + """ + SALT_BYTES = 15 + + sha1 = hashlib.sha1() + salt = self.rng.getrandbits(SALT_BYTES * 8).to_bytes(SALT_BYTES, "little") + sha1.update(password) + sha1.update(salt) + + digest = sha1.digest() + passwd = b"{SSHA}" + base64.b64encode(digest + salt) + return passwd + + def create_user(self, user, password, firstname, lastname, email): + conn = self.get_admin_conn() + uidNumber = self._get_max_uid() + 1 + logger.debug("uidNumber={uidNumber}".format(uidNumber=uidNumber)) + user_exists = True + while user_exists: + user_exists, _ = self.check_user_exists( + "", + '(&(objectClass=inetOrgPerson)(objectClass=posixAccount)' + '(objectClass=top)(uidNumber={uidNumber}))'.format( + uidNumber=uidNumber + ) + ) + if user_exists: + logger.debug( + "{uid} exists. Trying next.".format(uid=uidNumber) + ) + uidNumber += 1 + logger.debug("{uid} does not exist. Using it".format(uid=uidNumber)) + self._set_max_uid(uidNumber) + try: + uid = user + conn.add("uid={uid},{customer_dn}".format( + uid=uid, customer_dn=settings.LDAP_CUSTOMER_DN + ), + ["inetOrgPerson", "posixAccount", "ldapPublickey"], + { + "uid": [uid], + "sn": [lastname.encode("utf-8")], + "givenName": [firstname.encode("utf-8")], + "cn": [uid], + "displayName": ["{} {}".format(firstname, lastname).encode("utf-8")], + "uidNumber": [str(uidNumber)], + "gidNumber": [str(settings.LDAP_CUSTOMER_GROUP_ID)], + "loginShell": ["/bin/bash"], + "homeDirectory": ["/home/{}".format(unicodedata.normalize('NFKD', user).encode('ascii','ignore'))], + "mail": email.encode("utf-8"), + "userPassword": [self._ssha_password( + password.encode("utf-8") + )] + } + ) + logger.debug('Created user %s %s' % (user.encode('utf-8'), + uidNumber)) + except Exception as ex: + logger.debug('Could not create user %s' % user.encode('utf-8')) + logger.error("Exception: " + str(ex)) + raise Exception(ex) + finally: + conn.unbind() + + def change_password(self, uid, new_password): + """ + Changes the password of the user identified by user_dn + + :param uid: str The uid that identifies the user + :param new_password: str The new password string + :return: True if password was changed successfully False otherwise + """ + conn = self.get_admin_conn() + + # Make sure the user exists first to change his/her details + user_exists, entries = self.check_user_exists( + uid=uid, + search_base=settings.ENTIRE_SEARCH_BASE + ) + return_val = False + if user_exists: + try: + return_val = conn.modify( + entries[0].entry_dn, + { + "userpassword": ( + ldap3.MODIFY_REPLACE, + [self._ssha_password(new_password.encode("utf-8"))] + ) + } + ) + except Exception as ex: + logger.error("Exception: " + str(ex)) + else: + logger.error("User {} not found".format(uid)) + + conn.unbind() + return return_val + + + def change_user_details(self, uid, details): + """ + Updates the user details as per given values in kwargs of the user + identified by user_dn. + + Assumes that all attributes passed in kwargs are valid. + + :param uid: str The uid that identifies the user + :param details: dict A dictionary containing the new values + :return: True if user details were updated successfully False otherwise + """ + conn = self.get_admin_conn() + + # Make sure the user exists first to change his/her details + user_exists, entries = self.check_user_exists( + uid=uid, + search_base=settings.ENTIRE_SEARCH_BASE + ) + + return_val = False + if user_exists: + details_dict = {k: (ldap3.MODIFY_REPLACE, [v.encode("utf-8")]) for + k, v in details.items()} + try: + return_val = conn.modify(entries[0].entry_dn, details_dict) + msg = "success" + except Exception as ex: + msg = str(ex) + logger.error("Exception: " + msg) + finally: + conn.unbind() + else: + msg = "User {} not found".format(uid) + logger.error(msg) + conn.unbind() + return return_val, msg + + def check_user_exists(self, uid, search_filter="", attributes=None, + search_base=settings.LDAP_CUSTOMER_DN, search_attr="uid"): + """ + Check if the user with the given uid exists in the customer group. + + :param uid: str representing the user + :param search_filter: str representing the filter condition to find + users. If its empty, the search finds the user with + the given uid. + :param attributes: list A list of str representing all the attributes + to be obtained in the result entries + :param search_base: str + :return: tuple (bool, [ldap3.abstract.entry.Entry ..]) + A bool indicating if the user exists + A list of all entries obtained in the search + """ + conn = self.get_admin_conn() + entries = [] + try: + result = conn.search( + search_base=search_base, + search_filter=search_filter if len(search_filter) > 0 else + '(uid={uid})'.format(uid=uid), + attributes=attributes + ) + entries = conn.entries + finally: + conn.unbind() + return result, entries + + def delete_user(self, uid): + """ + Deletes the user with the given uid from ldap + + :param uid: str representing the user + :return: True if the delete was successful False otherwise + """ + conn = self.get_admin_conn() + try: + return_val = conn.delete( + ("uid={uid}," + settings.LDAP_CUSTOMER_DN).format(uid=uid), + ) + msg = "success" + except Exception as ex: + msg = str(ex) + logger.error("Exception: " + msg) + return_val = False + finally: + conn.unbind() + return return_val, msg + + def _set_max_uid(self, max_uid): + """ + a utility function to save max_uid value to a file + + :param max_uid: an integer representing the max uid + :return: + """ + with open(settings.LDAP_MAX_UID_FILE_PATH, 'w+') as handler: + handler.write(str(max_uid)) + + def _get_max_uid(self): + """ + A utility function to read the max uid value that was previously set + + :return: An integer representing the max uid value that was previously + set + """ + try: + with open(settings.LDAP_MAX_UID_FILE_PATH, 'r+') as handler: + try: + return_value = int(handler.read()) + except ValueError as ve: + logger.error( + "Error reading int value from {}. {}" + "Returning default value {} instead".format( + settings.LDAP_MAX_UID_FILE_PATH, + str(ve), + settings.LDAP_DEFAULT_START_UID + ) + ) + return_value = settings.LDAP_DEFAULT_START_UID + return return_value + except FileNotFoundError as fnfe: + logger.error("File not found : " + str(fnfe)) + return_value = settings.LDAP_DEFAULT_START_UID + logger.error("So, returning UID={}".format(return_value)) + return return_value + diff --git a/dynamicweb2/settings.py b/dynamicweb2/settings.py index a220fe9..9e81dbd 100644 --- a/dynamicweb2/settings.py +++ b/dynamicweb2/settings.py @@ -9,6 +9,8 @@ https://docs.djangoproject.com/en/4.2/topics/settings/ For the full list of settings and their values, see https://docs.djangoproject.com/en/4.2/ref/settings/ """ +import dotenv +import os from pathlib import Path @@ -122,3 +124,37 @@ STATIC_URL = 'static/' # https://docs.djangoproject.com/en/4.2/ref/settings/#default-auto-field DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField' + + +def env(env_name): + return os.environ.get(env_name) + + +PROJECT_DIR = os.path.abspath( + os.path.join(os.path.dirname(__file__), ".."), +) + +dotenv.load_dotenv("{0}/.env".format(PROJECT_DIR)) + +# LDAP setup +LDAP_ADMIN_DN = env('LDAP_ADMIN_DN') +LDAP_ADMIN_PASSWORD = env('LDAP_ADMIN_PASSWORD') +AUTH_LDAP_SERVER = env('LDAPSERVER') + +LDAP_CUSTOMER_DN = env('LDAP_CUSTOMER_DN') +LDAP_CUSTOMER_GROUP_ID = int(env('LDAP_CUSTOMER_GROUP_ID')) +LDAP_MAX_UID_FILE_PATH = os.environ.get('LDAP_MAX_UID_FILE_PATH', + os.path.join(os.path.abspath(os.path.dirname(__file__)), 'ldap_max_uid_file') + ) +LDAP_DEFAULT_START_UID = int(env('LDAP_DEFAULT_START_UID')) + +# Search union over OUs +AUTH_LDAP_START_TLS = bool(os.environ.get('LDAP_USE_TLS', False)) + +ENTIRE_SEARCH_BASE = env("ENTIRE_SEARCH_BASE") + +AUTH_LDAP_USER_ATTR_MAP = { + "first_name": "givenName", + "last_name": "sn", + "email": "mail" +} \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index a543531..c2f848d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,7 @@ asgiref==3.7.2 Django==4.2.7 +ldap3==2.9.1 +pyasn1==0.5.1 +python-dotenv==1.0.0 sqlparse==0.4.4 +typing_extensions==4.8.0