# ungleich-user/dal/ungleich_ldap.py (Python; originally 144 lines, 4.6 KiB)

import base64
import hashlib
import random
import ldap3
from django.conf import settings
class LdapManager:
    """
    Process-wide helper for managing user accounts on the configured LDAP
    server.

    The class is a singleton: every call to ``LdapManager()`` returns the same
    object. Note that ``__init__`` still runs on each call, so the random
    generator and the server object are re-created every time.
    """

    __instance = None

    def __new__(cls):
        # Create the shared instance lazily on first use and hand it out
        # for every subsequent construction.
        if LdapManager.__instance is None:
            LdapManager.__instance = object.__new__(cls)
        return LdapManager.__instance

    def __init__(self):
        """
        Initialize the LDAP subsystem.
        """
        # SystemRandom draws from the OS entropy source; used for the
        # password salts.
        self.rng = random.SystemRandom()
        self.server = ldap3.Server(settings.AUTH_LDAP_SERVER)

    def get_admin_conn(self):
        """
        Return a bound :class:`ldap3.Connection` instance which has write
        permissions on the dn in which the user accounts reside.

        The connection is created with ``raise_exceptions=True``. The caller
        is responsible for calling ``unbind()`` on the returned connection.
        """
        conn = self.get_conn(user=settings.LDAP_ADMIN_DN,
                             password=settings.LDAP_ADMIN_PASSWORD,
                             raise_exceptions=True)
        conn.bind()
        return conn

    def get_conn(self, **kwargs):
        """
        Return an unbound :class:`ldap3.Connection` which talks to the
        configured LDAP server.

        The *kwargs* are passed to the constructor of
        :class:`ldap3.Connection` and can be used to set *user*, *password*
        and other useful arguments.
        """
        return ldap3.Connection(self.server, **kwargs)

    def _ssha_password(self, password):
        """
        Apply the SSHA password hashing scheme to the given *password*.

        *password* must be a :class:`bytes` object, containing the utf-8
        encoded password.

        Return a :class:`bytes` object containing ``ascii``-compatible data
        which can be used as LDAP value, e.g. after armoring it once more
        using base64 or decoding it to unicode from ``ascii``.
        """
        SALT_BYTES = 15

        salt = self.rng.getrandbits(SALT_BYTES * 8).to_bytes(SALT_BYTES,
                                                             "little")
        # SSHA = SHA-1 over password followed by salt; the salt is appended
        # to the digest so that a verifier can recover it.
        sha1 = hashlib.sha1()
        sha1.update(password)
        sha1.update(salt)
        digest = sha1.digest()

        return b"{SSHA}" + base64.b64encode(digest + salt)

    def create_user(self, loginname, displayname, mail, password):
        """
        Create a new user in the LDAP storage.

        *loginname* must be a unique, valid user id. It is generally safe to
        pass lower-case ascii letters here. The *loginname* of an account
        cannot be changed.

        *displayname* is the name which is shown to other users. This can be
        changed in the future.

        *mail* is a valid mail address of the user.

        *password* is the initial plain text password for the user.
        """
        conn = self.get_admin_conn()
        try:
            # NOTE(review): loginname is interpolated verbatim into the DN;
            # callers must make sure it contains no DN special characters.
            conn.add(
                ("uid={uid}," + settings.LDAP_CUSTOMER_DN).format(
                    uid=loginname),
                ["inetOrgPerson"],
                {
                    "uid": [loginname],
                    "cn": [displayname],
                    # Placeholder values for sn / givenName.
                    "sn": ["XXX"],
                    "givenName": ["XXX"],
                    "mail": [mail],
                    "userpassword": [self._ssha_password(
                        password.encode("utf-8")
                    )]
                }
            )
        finally:
            conn.unbind()

    def change_password(self, user_dn, new_password):
        """
        Changes the password of the user identified by user_dn

        :param user_dn: str The distinguished name for identifying the user
        :param new_password: str The new password string
        :return: True if password was changed successfully False otherwise
        """
        conn = self.get_admin_conn()
        try:
            return conn.modify(
                user_dn,
                {
                    "userpassword": (
                        ldap3.MODIFY_REPLACE,
                        [self._ssha_password(new_password.encode("utf-8"))]
                    )
                }
            )
        finally:
            # Always release the admin connection, even when modify() raises
            # (the connection is created with raise_exceptions=True).
            conn.unbind()

    def check_user_exists(self, uid, is_customer=True):
        """
        Check if the user with the given uid exists.

        The search runs below ``settings.LDAP_CUSTOMER_DN`` when
        *is_customer* is true and below ``settings.LDAP_USERS_DN`` otherwise.

        :param uid: str representing the user
        :param is_customer: bool representing whether the current user is a
                            customer. By default, the user is a customer
                            (assume)
        :return: True if the user exists otherwise return False
        """
        # Imported locally so the block does not depend on a new module-level
        # import; ldap3 itself is already a dependency of this file.
        from ldap3.utils.conv import escape_filter_chars

        search_base = (settings.LDAP_CUSTOMER_DN if is_customer
                       else settings.LDAP_USERS_DN)
        conn = self.get_admin_conn()
        try:
            # Escape the uid so characters such as '*', '(' or ')' are
            # matched literally instead of altering the search filter.
            result = conn.search(
                search_base,
                search_filter='(uid={uid})'.format(
                    uid=escape_filter_chars(uid))
            )
        finally:
            conn.unbind()
        return result