phase in message decryption

This commit is contained in:
Nico Schottelius 2025-01-30 21:07:31 +01:00
commit fa7451c791

View file

@ -138,7 +138,7 @@ class UngleichMatrixClient:
# use + b'==') to expand padding https://stackoverflow.com/questions/2941995/python-ignore-incorrect-padding-error-when-base64-decoding
ephemeral_key_bytes = base64.b64decode(ephemeral_key + '==')
ephemeral_public_key = X25519PublicKey.from_public_bytes(ephemeral_key_bytes)
shared_key = private_key.exchange(ephemeral_public_key)
shared_key = self.security_private_key.exchange(ephemeral_public_key)
# when we have shared secret, use HDKF to get the AES part
# "Using the shared secret,
@ -186,6 +186,27 @@ class UngleichMatrixClient:
print("Signature likely incorrect")
raise Exception("Session key signature broken")
cipher = Cipher(algorithms.AES(aes_key), modes.CBC(aes_iv))
decryptor = cipher.decryptor()
# use + b'==') to expand padding https://stackoverflow.com/questions/2941995/python-ignore-incorrect-padding-error-when-base64-decoding
encrypted_session_key_bytes = base64.b64decode(encrypted_session_key + '==')
session_key_bytes = decryptor.update(encrypted_session_key_bytes) + decryptor.finalize()
# Remove PKCS7 padding - block size 128 was guessed / tested to be correct
unpadder = padding.PKCS7(128).unpadder()
data = unpadder.update(session_key_bytes)
data += unpadder.finalize()
session_key_json_string = data.decode("utf8")
print(f"Unencrypted session key JSON: {session_key_json_string}")
session_key_json = json.loads(session_key_json_string)
session_key_base64 = session_key_json['session_key']
print("session key = {session_key_base64}, {length}".format(session_key_base64=session_key_base64, length=len(session_key_base64)))
return session_key
def decrypt_message(self, ciphertext, session_id):
room_key = self.room_keys['rooms'][self.room_id]['sessions']
@ -195,10 +216,15 @@ class UngleichMatrixClient:
ephemeral_key = room_key[session_id]['session_data']['ephemeral']
session_mac = room_key[session_id]['session_data']['mac']
session_key = self.decrypt_session_key(encrypted_session_key,
session_key_base64 = self.decrypt_session_key(encrypted_session_key,
ephemeral_key,
session_mac)
inbound_group = InboundGroupSession.import_session(session_key_base64)
plaintext = inbound_group.decrypt(message_ciphertext_base64)
print(f"Encrypted {ciphertext} = {plaintext}")
def decrypt_room_messages(self):
"""
Decrypt messages that are of type 'm.room.encrypted'