#!/usr/bin/env python3
# ungleich glarus ag, 2025-01-23
"""Log in to a Matrix homeserver and dump the raw message pages of one room."""

import argparse
import base64
import json
import pprint
import sys

import base58
import requests


class UngleichMatrixClient:
    """Small Matrix client-server API wrapper built on ``requests``.

    The client logs in with username/password on first use, then fetches
    the key backup and/or the paginated message history of a single room.
    Results are cached on the instance (``room_keys``, ``room_messages``).
    """

    # Seconds to wait on the homeserver before a request is abandoned;
    # without a timeout ``requests`` would block forever on a dead server.
    REQUEST_TIMEOUT = 30

    def __init__(self, args):
        """Store the connection settings and precompute the endpoint URLs.

        Args:
            args: Object exposing ``server_url``, ``room_id``,
                ``login_username``, ``login_password`` and ``security_key``
                attributes (the ``argparse`` namespace built by this script).
        """
        self.server = args.server_url
        self.room_id = args.room_id
        self.username = args.login_username
        self.password = args.login_password
        # Stored for later use; nothing in this class reads it yet.
        self.security_key = args.security_key

        # ``False`` means "not fetched yet" for both of these.
        self.access_token = False
        self.room_keys = False
        self.login_response = None
        self.room_messages = []

        self.matrix_url = {
            'login': f"{args.server_url}/_matrix/client/v3/login",
            'room_keys': f"{args.server_url}/_matrix/client/v3/room_keys/keys?version=1",
            'room_messages': f"{args.server_url}/_matrix/client/v3/rooms/{self.room_id}/messages",
        }

    def login_to_server(self):
        """Perform a password login and return the HTTP response.

        Returns:
            The ``requests.Response`` of the login call (status 200).

        Raises:
            Exception: If the server answers with any status other than 200.
        """
        login_data = {
            'identifier': {
                "type": "m.id.user",
                "user": f"{self.username}",
            },
            'type': "m.login.password",
            'device_id': "ungleich-matrix-client",
            'initial_device_display_name': "ungleich-matrix-client",
            'password': f"{self.password}",
        }

        r = requests.post(self.matrix_url['login'], json=login_data,
                          timeout=self.REQUEST_TIMEOUT)

        if r.status_code != 200:
            raise Exception("Login Failed")

        return r

    def _ensure_logged_in(self):
        """Log in once and remember the access token for later requests."""
        if not self.access_token:
            self.login_response = self.login_to_server()
            self.access_token = self.login_response.json()['access_token']

    def _auth_headers(self):
        """Return the HTTP headers that authenticate a request."""
        # The token goes into a header rather than the query string so it
        # does not end up in URLs (and therefore in proxy/server logs).
        return {'Authorization': f"Bearer {self.access_token}"}

    def get_room_keys(self):
        """Fetch the key backup once and cache the decoded JSON in ``room_keys``.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        self._ensure_logged_in()

        if not self.room_keys:
            # The backup version is already part of the URL, so it must not
            # be passed again via ``params`` (that would duplicate it).
            r = requests.get(self.matrix_url['room_keys'],
                             headers=self._auth_headers(),
                             timeout=self.REQUEST_TIMEOUT)
            r.raise_for_status()
            self.room_keys = r.json()

    def get_room_messages(self):
        """Fetch the room's messages page by page.

        Every decoded response page is appended to ``room_messages``.
        Pagination continues until the reply no longer carries a usable
        ``end`` token.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        self._ensure_logged_in()

        # ``dir`` is mandatory per the Matrix spec; "f" walks forwards.
        params = {'dir': "f"}

        while True:
            r = requests.get(self.matrix_url['room_messages'],
                             params=params,
                             headers=self._auth_headers(),
                             timeout=self.REQUEST_TIMEOUT)
            r.raise_for_status()

            page = r.json()
            self.room_messages.append(page)

            next_batch = page.get('end')
            # Stop when there is no further token, or when the server hands
            # back the token we just used (which would loop forever).
            if not next_batch or next_batch == params.get('from'):
                break
            params['from'] = next_batch

    def get_messages(self):
        """Fetch all message pages of the room and print each one."""
        # Key retrieval is not wired in yet:
        # self.get_room_keys()
        # print(self.room_keys)
        self.get_room_messages()

        for message in self.room_messages:
            print(message)

        # Remaining steps (not implemented):
        #
        # Get room_keys
        # http 'https://ungleich.matrix.ungleich.cloud/_matrix/client/v3/room_keys/keys' version==1 access_token=="$matrix_access_token"
        # jq '.rooms."!pkP......."' < key-backup.json > room-specific-keys.json
        #
        # Get messages from room
        #
        # Decrypt each message:
        # Retrieve the session key


def main():
    """Parse the command line and print the messages of the given room."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--server-url", required=True,
                        help="Matrix Server URL, i.e. https://your-server ")
    parser.add_argument("--room-id", required=True,
                        help="ID of the room to get messages from, i.e. !...:your-matrix-domain ")
    parser.add_argument("--login-username", required=True,
                        help="Username for logging into the server, i.e. @you:your-matrix-domain ")
    parser.add_argument("--login-password", required=True,
                        help="Password for logging into the server, i.e. your-very-safe-password!! ")
    parser.add_argument("--security-key", required=True,
                        help="Your security backup key, i.e. ABCf defg aaaa - ensure to quote as one argument! ")
    args = parser.parse_args()

    client = UngleichMatrixClient(args)
    client.get_messages()


if __name__ == '__main__':
    main()