#!/usr/bin/env python3 # ungleich glarus ag, 2025-01-23 import base58 import base64 import json import argparse import requests from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes, padding from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives.kdf.hkdf import HKDF from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.primitives.ciphers.algorithms import AES from secrets import token_bytes from cryptography.hazmat.primitives.hmac import HMAC from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey,X25519PublicKey from cryptography.hazmat.primitives.serialization import load_pem_private_key from olm import Account,InboundGroupSession class UngleichMatrixClient: def __init__(self, args): self.server = args.server_url self.room_id = args.room_id self.username = args.login_username self.password = args.login_password self.security_key_unparsed = args.security_key self.access_token = False self.room_keys = False self.room_messages = [] self.matrix_url = {} self.matrix_url['login'] = f"{args.server_url}/_matrix/client/v3/login" self.matrix_url['room_keys'] = f"{args.server_url}/_matrix/client/v3/room_keys/keys?version=1" self.matrix_url['room_messages'] = f"{args.server_url}/_matrix/client/v3/rooms/{self.room_id}/messages" def login_to_server(self): login_data = { 'identifier': { "type": "m.id.user", "user": f"{self.username}" }, 'type': "m.login.password", 'device_id': "ungleich-matrix-client", 'initial_device_display_name' : "ungleich-matrix-client", 'password': f"{self.password}" } r = requests.post(self.matrix_url['login'], json=login_data) if not r.status_code == 200: raise Exception("Login Failed") return r def _ensure_logged_in(self): if not self.access_token: self.login_response = self.login_to_server() self.access_token = self.login_response.json()['access_token'] def get_room_keys(self): self._ensure_logged_in() params = { 'version': "1", 'access_token': self.access_token } if not self.room_keys: r = requests.get(self.matrix_url['room_keys'], params=params) self.room_keys = r.json() def get_room_messages(self): """ Get messages from a room, requires to use pagination! Continue until no end property is in the reply anyomer """ self._ensure_logged_in() params = { 'access_token': self.access_token } more_messages = True next_batch = "" while more_messages: if next_batch: params['from'] = next_batch r = requests.get(self.matrix_url['room_messages'], params=params) for message in r.json()['chunk']: self.room_messages.append(message) if 'end' in r.json(): next_batch = r.json()['end'] else: more_messages = False def parse_security_key(self): security_key = self.security_key_unparsed.replace(" ", "") security_key_binary = base58.b58decode(security_key) self.security_key = security_key_binary # without useless bytes and without parity self.real_security_key = security_key_binary[2:-1] def check_security_key_parity(self): parity_byte = self.security_key[-1] calculated_parity=0 for key_byte in self.security_key[:-1]: calculated_parity ^= key_byte print(f"Parity byte = {parity_byte} calculated parity = {calculated_parity}") if parity_byte != calculated_parity: raise Exception("Security key is broken") def setup_security_key_pair(self): self.security_private_key = X25519PrivateKey.from_private_bytes(self.real_security_key) print(f"Private key = {self.security_private_key}") self.security_public_key = self.security_private_key.public_key() print(f"Public key = {self.security_public_key}") def decrypt_message(self, ciphertext, session_id): room_key = self.room_keys['rooms'][self.room_id]['sessions'] print(f"Messages key data: {room_key}") def decrypt_room_messages(self): """ Decrypt messages that are of type 'm.room.encrypted' {'type': 'm.room.encrypted', 'room_id': '!fDjvLemgiriPvvWEeG:ungleich.ch', 'sender': '@nico:ungleich.ch', 'content': {'algorithm': 'm.megolm.v1.aes-sha2', 'ciphertext': 'AwgBEqABNL8ztRQA67gXxkpbeiSp3zkJTkPXUwjQh0VnnFh6+Tff/dWjfF2rYu9q7MhG7BQgtaAoBoFNot8bPan23Y8Niip714ntI7t89F1t79TkUOcn5H0STydqGOOoZqnDf/l63ggWfD8EbudFSxoO7sJLL9iGO2+9HYWTMdTFAhcHg5c/k3aG+fQrXkbv+5afZXH3CxKnWxe4ukkoGMaDAo7jm3l2killUJ/J6NynCiJ/XinFWIdbRXSIUx3cwnFS/KWvdVmhu2iXYFtIvV65UE/JFhDjZ+rCH7lZ9DBD5jKjsVPQJqtFule0CQ', 'device_id': 'SSAUACUQKJ', 'sender_key': 'pEDLuq1RlDI2bxO6/lx9OQZt0NYma+gs6jg3QVYl4Vk', 'session_id': 'nkx3WnUpLL7hblZ9LNBkx0RPrKp3weX2o/aAgp7hx0c'}, 'origin_server_ts': 1738264304685, 'unsigned': {'membership': 'join', 'age': 126031}, 'event_id': '$k9dYdD6b5eG_AZaZtO6imeHU8HGBpiZt3dqM8C3T8-8', 'user_id': '@nico:ungleich.ch', 'age': 126031} """ for message in self.room_messages: if message['type'] == 'm.room.encrypted': sender = message['sender'] ciphertext = message['content']['ciphertext'] session_id = message['content']['session_id'] plaintext = self.decrypt_message(ciphertext, session_id) def get_messages(self): self.parse_security_key() self.check_security_key_parity() self.setup_security_key_pair() self.get_room_messages() for message in self.room_messages: print(message) self.get_room_keys() self.decrypt_room_messages() # Decrypt each message: # Retrieve the session key if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument("--server-url", required=True, help="Matrix Server URL, i.e. https://your-server ") parser.add_argument("--room-id", required=True, help="ID of the room to get messages from, i.e. !...:your-matrix-domain ") parser.add_argument("--login-username", required=True, help="Username for logging into the server, i.e. @you:your-matrix-domain ") parser.add_argument("--login-password", required=True, help="Password for logging into the server, i.e. your-very-safe-password!! ") parser.add_argument("--security-key", required=True, help="Your security backup key, i.e. ABCf defg aaaa - ensure to quote as one argument! ") args = parser.parse_args() client = UngleichMatrixClient(args) client.get_messages()