280 lines
11 KiB
Python
Executable file
280 lines
11 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
# ungleich glarus ag, 2025-01-23
|
|
|
|
import base58
|
|
import base64
|
|
import json
|
|
import argparse
|
|
import requests
|
|
|
|
from cryptography.hazmat.backends import default_backend
|
|
from cryptography.hazmat.primitives import hashes, padding
|
|
from cryptography.hazmat.primitives.asymmetric import ec
|
|
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
|
|
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
|
from cryptography.hazmat.primitives.ciphers.algorithms import AES
|
|
from secrets import token_bytes
|
|
from cryptography.hazmat.primitives.hmac import HMAC
|
|
from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey,X25519PublicKey
|
|
|
|
from cryptography.hazmat.primitives.serialization import load_pem_private_key
|
|
|
|
from olm import Account,InboundGroupSession
|
|
|
|
|
|
class UngleichMatrixClient:
    """Download a Matrix room's history and decrypt it via the server-side key backup.

    The workflow (see ``get_messages``) is:

    1. parse and parity-check the security (recovery) key,
    2. derive the X25519 key pair from it,
    3. page through the room's events,
    4. download the backed-up room keys,
    5. decrypt every ``m.room.encrypted`` event.

    NOTE(review): several methods print key material (derived AES/MAC keys,
    session keys) to stdout. That is kept as-is because this is a diagnostic
    tool, but the output must be treated as secret.
    """

    # Backup version requested from the server. Version "1" is what has been
    # seen in practice; strictly speaking the current version should be
    # queried from the server first.
    ROOM_KEYS_VERSION = "1"

    def __init__(self, args):
        """Store the connection settings and build the endpoint URLs.

        Args:
            args: object with the attributes ``server_url``, ``room_id``,
                ``login_username``, ``login_password`` and ``security_key``
                (e.g. an ``argparse.Namespace``).
        """
        self.server = args.server_url
        self.room_id = args.room_id
        self.username = args.login_username
        self.password = args.login_password
        self.security_key_unparsed = args.security_key

        # Filled lazily by _ensure_logged_in() / get_room_keys().
        self.access_token = None
        self.room_keys = None
        self.room_messages = []

        # A trailing slash on the server URL would otherwise yield "//_matrix".
        api_root = args.server_url.rstrip("/") + "/_matrix/client/v3"
        self.matrix_url = {
            'login': f"{api_root}/login",
            # The backup version is passed as a query parameter by
            # get_room_keys(), so it is not baked into the URL.
            'room_keys': f"{api_root}/room_keys/keys",
            'room_messages': f"{api_root}/rooms/{self.room_id}/messages",
        }

    @staticmethod
    def _b64decode_unpadded(value):
        """Decode base64 text whose trailing ``=`` padding may be missing.

        The backup payload fields are decoded with this helper because they
        are handled as unpadded base64 throughout this client.
        """
        return base64.b64decode(value + "=" * (-len(value) % 4))

    def login_to_server(self):
        """Log in with username and password and return the HTTP response.

        Raises:
            Exception: if the server does not answer with HTTP 200.
        """
        login_data = {
            'identifier': {
                "type": "m.id.user",
                "user": self.username,
            },
            'type': "m.login.password",
            'device_id': "ungleich-matrix-client",
            'initial_device_display_name': "ungleich-matrix-client",
            'password': self.password,
        }
        r = requests.post(self.matrix_url['login'], json=login_data)

        if r.status_code != 200:
            raise Exception("Login Failed")
        return r

    def _ensure_logged_in(self):
        """Log in once and cache the access token for later requests."""
        if not self.access_token:
            self.login_response = self.login_to_server()
            self.access_token = self.login_response.json()['access_token']

    def get_room_keys(self):
        """Download the backed-up room keys into ``self.room_keys`` (once).

        We assume version == 1 is correct because that's what's seen in
        reality. In theory we need to query the current version on the
        server first.

        Raises:
            requests.HTTPError: if the server rejects the request, instead of
                silently storing the error document as "keys".
        """
        self._ensure_logged_in()

        if self.room_keys:
            return

        params = {
            'version': self.ROOM_KEYS_VERSION,
            'access_token': self.access_token,
        }

        print("Getting room keys ... this can take a while ...")
        r = requests.get(self.matrix_url['room_keys'], params=params)
        r.raise_for_status()
        self.room_keys = r.json()

    def get_room_messages(self):
        """Append all events of the room to ``self.room_messages``.

        The endpoint is paginated: each reply's ``end`` token is sent back as
        ``from`` until the reply no longer contains an ``end`` property.

        Raises:
            requests.HTTPError: if any page request fails.
        """
        self._ensure_logged_in()

        params = {
            'access_token': self.access_token,
            # The spec lists `dir` as required; "f" walks the history
            # forwards, matching what servers that tolerate its absence
            # (presumably Synapse's default - verify) did before.
            'dir': 'f',
        }

        while True:
            r = requests.get(self.matrix_url['room_messages'], params=params)
            r.raise_for_status()
            page = r.json()  # parse each reply exactly once

            self.room_messages.extend(page.get('chunk', []))

            next_batch = page.get('end')
            # No token: history exhausted. Unchanged token: the server is not
            # advancing, so stop rather than request the same page forever.
            if not next_batch or next_batch == params.get('from'):
                break
            params['from'] = next_batch

    def parse_security_key(self):
        """Base58-decode the user supplied security key.

        Sets ``self.security_key`` (all decoded bytes) and
        ``self.real_security_key`` (the decoded bytes without the two leading
        bytes and without the trailing parity byte).
        """
        security_key = self.security_key_unparsed.replace(" ", "")
        security_key_binary = base58.b58decode(security_key)

        self.security_key = security_key_binary

        # without useless bytes and without parity
        self.real_security_key = security_key_binary[2:-1]

    def check_security_key_parity(self):
        """Verify the trailing parity byte of the decoded security key.

        The last byte must equal the XOR of all preceding bytes.

        Raises:
            Exception: if the parity byte does not match.
        """
        parity_byte = self.security_key[-1]
        calculated_parity = 0
        for key_byte in self.security_key[:-1]:
            calculated_parity ^= key_byte

        print(f"Parity byte = {parity_byte} calculated parity = {calculated_parity}")

        if parity_byte != calculated_parity:
            raise Exception("Security key is broken")

    def setup_security_key_pair(self):
        """Build the X25519 private/public key objects from the security key."""
        self.security_private_key = X25519PrivateKey.from_private_bytes(self.real_security_key)
        print(f"Private key = {self.security_private_key}")
        self.security_public_key = self.security_private_key.public_key()
        print(f"Public key = {self.security_public_key}")

    def decrypt_session_key(self, encrypted_session_key, ephemeral_key, session_mac):
        """Decrypt one backed-up session and return its base64 session key.

        Args:
            encrypted_session_key: base64 ``ciphertext`` of the backup entry.
            ephemeral_key: base64 ephemeral X25519 public key of the entry.
            session_mac: base64 ``mac`` of the backup entry.

        Returns:
            The ``session_key`` value of the decrypted JSON document.

        Raises:
            Exception: if the MAC derived from the shared secret does not
                match ``session_mac`` (wrong security key or corrupt entry).
            ValueError: if the decrypted data is not correctly padded.
        """
        # Construct the public ephemeral key
        ephemeral_key_bytes = self._b64decode_unpadded(ephemeral_key)
        ephemeral_public_key = X25519PublicKey.from_public_bytes(ephemeral_key_bytes)

        # This is effectively ECDH provided by cryptography library
        shared_key = self.security_private_key.exchange(ephemeral_public_key)

        # When we have the shared secret, use HKDF to get the AES part:
        # "Using the shared secret, generate 80 bytes by performing an HKDF
        #  using SHA-256 as the hash, with a salt of 32 bytes of 0, and with
        #  the empty string as the info.
        #  The first 32 bytes are used as the AES key, the next 32 bytes are
        #  used as the MAC key, and the last 16 bytes are used as the AES
        #  initialization vector."
        derived_key = HKDF(
            algorithm=hashes.SHA256(),
            length=80,
            salt=bytes(32),
            info=b'',
        ).derive(shared_key)

        print(f"Derived key = {derived_key}, len={len(derived_key)}")

        aes_key = derived_key[:32]
        mac_key = derived_key[32:64]
        aes_iv = derived_key[64:]

        print("AES key = {0} / len = {1}".format(aes_key, len(aes_key)))
        print("Mac key = {0} / len = {1}".format(mac_key, len(mac_key)))
        print("AES IV = {0} / len = {1}".format(aes_iv, len(aes_iv)))

        # "Pass an empty string through HMAC-SHA-256 using the MAC key
        #  generated above. The first 8 bytes of the resulting MAC are
        #  base64-encoded, and become the mac property of the session_data."
        # This basically allows us to check if we derived the correct key.
        mac = HMAC(mac_key, hashes.SHA256())
        mac.update(b'')

        # only use first 8 bytes
        signature = mac.finalize()[:8]
        print(f"Calculated signature over empty string = {signature}")

        session_signature = self._b64decode_unpadded(session_mac)
        print(f"Session signature = {session_signature}")

        if signature != session_signature:
            print("Signature likely incorrect")
            raise Exception("Session key signature broken")
        print("Signature seems to be correct")

        cipher = Cipher(algorithms.AES(aes_key), modes.CBC(aes_iv))
        decryptor = cipher.decryptor()

        encrypted_session_key_bytes = self._b64decode_unpadded(encrypted_session_key)
        session_key_bytes = decryptor.update(encrypted_session_key_bytes) + decryptor.finalize()

        # Remove the PKCS7 padding. The padding granularity is the cipher's
        # block size (128 bits for AES, regardless of the 256-bit key).
        # Using 256 here rejects every payload whose padded length is an odd
        # multiple of 16 bytes.
        unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder()
        data = unpadder.update(session_key_bytes) + unpadder.finalize()

        session_key_json_string = data.decode("utf8")

        print(f"Unencrypted session key JSON: {session_key_json_string}")
        session_key_json = json.loads(session_key_json_string)
        session_key_base64 = session_key_json['session_key']

        print("session key = {session_key_base64}, {length}".format(session_key_base64=session_key_base64, length=len(session_key_base64)))

        return session_key_base64

    def decrypt_message(self, ciphertext, session_id):
        """Decrypt one megolm ciphertext with the backed-up session ``session_id``.

        Requires ``get_room_keys()`` and ``setup_security_key_pair()`` to have
        run before.

        Returns:
            Whatever ``InboundGroupSession.decrypt`` returns for the
            ciphertext (previously the result was only printed and dropped).

        Raises:
            KeyError: if the backup holds no entry for this room/session.
        """
        room_key = self.room_keys['rooms'][self.room_id]['sessions']
        print(f"Messages key data: {room_key}")

        session_data = room_key[session_id]['session_data']
        session_key_base64 = self.decrypt_session_key(session_data['ciphertext'],
                                                      session_data['ephemeral'],
                                                      session_data['mac'])

        inbound_group = InboundGroupSession.import_session(session_key_base64)
        plaintext = inbound_group.decrypt(ciphertext)

        print(f"Encrypted message {ciphertext} = {plaintext}")
        return plaintext

    def decrypt_room_messages(self):
        """Decrypt all fetched events of type 'm.room.encrypted'.

        Expected event shape (abridged)::

            {'type': 'm.room.encrypted',
             'sender': '@nico:ungleich.ch',
             'content': {'algorithm': 'm.megolm.v1.aes-sha2',
                         'ciphertext': 'AwgBEqAB...',
                         'session_id': 'nkx3WnUp...'},
             ...}

        Encrypted events that carry no ``ciphertext`` or no ``session_id``
        (for example events whose content was emptied) are skipped instead of
        aborting the whole run with a KeyError.

        Returns:
            list with one ``decrypt_message`` result per decrypted event, in
            the order of ``self.room_messages``.
        """
        decrypted = []
        for message in self.room_messages:
            if message.get('type') != 'm.room.encrypted':
                continue

            content = message.get('content') or {}
            ciphertext = content.get('ciphertext')
            session_id = content.get('session_id')
            if not ciphertext or not session_id:
                continue

            decrypted.append(self.decrypt_message(ciphertext, session_id))
        return decrypted

    def get_messages(self):
        """Run the complete workflow: prepare keys, fetch, print and decrypt."""
        self.parse_security_key()
        self.check_security_key_parity()
        self.setup_security_key_pair()

        self.get_room_messages()
        for message in self.room_messages:
            print(message)
        self.get_room_keys()
        self.decrypt_room_messages()
|
|
|
|
|
|
# Decrypt each message:
|
|
# Retrieve the session key
|
|
|
|
if __name__ == '__main__':
    # Command line interface: every option is mandatory, so the options are
    # declared as (flag, help text) pairs and registered in one loop.
    cli_options = (
        ("--server-url", "Matrix Server URL, i.e. https://your-server "),
        ("--room-id", "ID of the room to get messages from, i.e. !...:your-matrix-domain "),
        ("--login-username", "Username for logging into the server, i.e. @you:your-matrix-domain "),
        ("--login-password", "Password for logging into the server, i.e. your-very-safe-password!! "),
        ("--security-key", "Your security backup key, i.e. ABCf defg aaaa - ensure to quote as one argument! "),
    )

    parser = argparse.ArgumentParser()
    for flag, help_text in cli_options:
        parser.add_argument(flag, required=True, help=help_text)

    # Build the client from the parsed options and run the full workflow.
    client = UngleichMatrixClient(parser.parse_args())
    client.get_messages()
|