import base64 import hashlib import random import ldap3 from django.conf import settings class LdapManager: __instance = None def __new__(cls): if LdapManager.__instance is None: LdapManager.__instance = object.__new__(cls) return LdapManager.__instance def __init__(self): """ Initialize the LDAP subsystem. """ self.rng = random.SystemRandom() self.server = ldap3.Server(settings.AUTH_LDAP_SERVER) def get_admin_conn(self): """ Return a bound :class:`ldap3.Connection` instance which has write permissions on the dn in which the user accounts reside. """ conn = self.get_conn(user=settings.LDAP_ADMIN_DN, password=settings.LDAP_ADMIN_PASSWORD, raise_exceptions=True) conn.bind() return conn def get_conn(self, **kwargs): """ Return an unbound :class:`ldap3.Connection` which talks to the configured LDAP server. The *kwargs* are passed to the constructor of :class:`ldap3.Connection` and can be used to set *user*, *password* and other useful arguments. """ return ldap3.Connection(self.server, **kwargs) def _ssha_password(self, password): """ Apply the SSHA password hashing scheme to the given *password*. *password* must be a :class:`bytes` object, containing the utf-8 encoded password. Return a :class:`bytes` object containing ``ascii``-compatible data which can be used as LDAP value, e.g. after armoring it once more using base64 or decoding it to unicode from ``ascii``. """ SALT_BYTES = 15 sha1 = hashlib.sha1() salt = self.rng.getrandbits(SALT_BYTES * 8).to_bytes(SALT_BYTES, "little") sha1.update(password) sha1.update(salt) digest = sha1.digest() passwd = b"{SSHA}" + base64.b64encode(digest + salt) return passwd def create_user(self, loginname, displayname, mail, password): """ Create a new user in the LDAP storage. *loginname* must be a unique, valid user id. It is generally safe to pass lower-case ascii letters here. The *loginname* of an account cannot be changed. *displayname* is the name which is shown to other users. This can be changed in the future. *mail* is a valid mail address of the user. *password* is the initial plain text password for the user. """ conn = self.get_admin_conn() try: conn.add( ("uid={uid}," + settings.LDAP_CUSTOMER_DN).format(uid=loginname), ["inetOrgPerson"], { "uid": [loginname], "cn": [displayname], "sn": ["XXX"], "givenName": ["XXX"], "mail": [mail], "userpassword": [self._ssha_password( password.encode("utf-8") )] } ) finally: conn.unbind() def change_password(self, user_dn, new_password): """ Changes the password of the user identified by user_dn :param user_dn: str The distinguished name for identifying the user :param new_password: str The new password string :return: True if password was changed successfully False otherwise """ conn = self.get_admin_conn() return_val = conn.modify( user_dn, { "userpassword": ( ldap3.MODIFY_REPLACE, [self._ssha_password(new_password.encode("utf-8"))] ) } ) conn.unbind() return return_val def check_user_exists(self, uid, is_customer=True): """ Check if the user with the given uid exists in the customer group. :param uid: str representing the user :param is_customer: bool representing whether the current user is a customer. By default, the user is a customer (assume) :return: True if the user exists otherwise return False """ conn = self.get_admin_conn() try: result = conn.search( settings.LDAP_CUSTOMER_DN if is_customer else settings.LDAP_USERS_DN, search_filter='(uid={uid})'.format(uid=uid) ) finally: conn.unbind() return result