2025-01-30 19:30:46 +01:00
#!/usr/bin/env python3
# ungleich glarus ag, 2025-01-23
import base58
import base64
import json
import argparse
import requests
2025-01-30 20:35:14 +01:00
from cryptography . hazmat . backends import default_backend
from cryptography . hazmat . primitives import hashes , padding
from cryptography . hazmat . primitives . asymmetric import ec
from cryptography . hazmat . primitives . kdf . hkdf import HKDF
from cryptography . hazmat . primitives . ciphers import Cipher , algorithms , modes
from cryptography . hazmat . primitives . ciphers . algorithms import AES
from secrets import token_bytes
from cryptography . hazmat . primitives . hmac import HMAC
from cryptography . hazmat . primitives . asymmetric . x25519 import X25519PrivateKey , X25519PublicKey
from cryptography . hazmat . primitives . serialization import load_pem_private_key
from olm import Account , InboundGroupSession
2025-01-30 19:30:46 +01:00
class UngleichMatrixClient :
def __init__ ( self , args ) :
self . server = args . server_url
self . room_id = args . room_id
self . username = args . login_username
self . password = args . login_password
2025-01-30 20:35:14 +01:00
self . security_key_unparsed = args . security_key
2025-01-30 19:30:46 +01:00
self . access_token = False
2025-01-30 20:12:39 +01:00
self . room_keys = False
self . room_messages = [ ]
2025-01-30 19:30:46 +01:00
self . matrix_url = { }
self . matrix_url [ ' login ' ] = f " { args . server_url } /_matrix/client/v3/login "
2025-01-30 20:12:39 +01:00
self . matrix_url [ ' room_keys ' ] = f " { args . server_url } /_matrix/client/v3/room_keys/keys?version=1 "
self . matrix_url [ ' room_messages ' ] = f " { args . server_url } /_matrix/client/v3/rooms/ { self . room_id } /messages "
2025-01-30 19:30:46 +01:00
def login_to_server ( self ) :
login_data = {
' identifier ' : {
" type " : " m.id.user " ,
" user " : f " { self . username } "
} ,
' type ' : " m.login.password " ,
' device_id ' : " ungleich-matrix-client " ,
' initial_device_display_name ' : " ungleich-matrix-client " ,
' password ' : f " { self . password } "
}
r = requests . post ( self . matrix_url [ ' login ' ] , json = login_data )
if not r . status_code == 200 :
raise Exception ( " Login Failed " )
return r
2025-01-30 20:12:39 +01:00
def _ensure_logged_in ( self ) :
if not self . access_token :
self . login_response = self . login_to_server ( )
self . access_token = self . login_response . json ( ) [ ' access_token ' ]
def get_room_keys ( self ) :
2025-01-31 15:25:10 +01:00
"""
We assume version == 1 is correct because that ' s what ' s seen in reality
In theory we need to query the current version on the server first .
"""
2025-01-30 20:12:39 +01:00
self . _ensure_logged_in ( )
2025-01-30 19:30:46 +01:00
2025-01-30 20:12:39 +01:00
params = {
' version ' : " 1 " ,
' access_token ' : self . access_token
}
if not self . room_keys :
2025-01-30 21:30:46 +01:00
print ( " Getting room keys ... this can take a while ... " )
2025-01-30 20:12:39 +01:00
r = requests . get ( self . matrix_url [ ' room_keys ' ] ,
params = params )
self . room_keys = r . json ( )
def get_room_messages ( self ) :
"""
Get messages from a room , requires to use pagination !
Continue until no end property is in the reply anyomer
"""
self . _ensure_logged_in ( )
params = {
' access_token ' : self . access_token
}
more_messages = True
next_batch = " "
while more_messages :
if next_batch :
params [ ' from ' ] = next_batch
2025-01-30 19:30:46 +01:00
2025-01-30 20:12:39 +01:00
r = requests . get ( self . matrix_url [ ' room_messages ' ] ,
params = params )
2025-01-30 20:35:14 +01:00
for message in r . json ( ) [ ' chunk ' ] :
self . room_messages . append ( message )
2025-01-30 19:30:46 +01:00
2025-01-30 20:12:39 +01:00
if ' end ' in r . json ( ) :
next_batch = r . json ( ) [ ' end ' ]
else :
more_messages = False
2025-01-30 19:30:46 +01:00
2025-01-30 20:35:14 +01:00
def parse_security_key ( self ) :
security_key = self . security_key_unparsed . replace ( " " , " " )
security_key_binary = base58 . b58decode ( security_key )
self . security_key = security_key_binary
# without useless bytes and without parity
self . real_security_key = security_key_binary [ 2 : - 1 ]
def check_security_key_parity ( self ) :
parity_byte = self . security_key [ - 1 ]
calculated_parity = 0
for key_byte in self . security_key [ : - 1 ] :
calculated_parity ^ = key_byte
print ( f " Parity byte = { parity_byte } calculated parity = { calculated_parity } " )
if parity_byte != calculated_parity :
raise Exception ( " Security key is broken " )
def setup_security_key_pair ( self ) :
self . security_private_key = X25519PrivateKey . from_private_bytes ( self . real_security_key )
print ( f " Private key = { self . security_private_key } " )
self . security_public_key = self . security_private_key . public_key ( )
print ( f " Public key = { self . security_public_key } " )
2025-01-30 20:52:26 +01:00
def decrypt_session_key ( self , encrypted_session_key , ephemeral_key , session_mac ) :
# Construct the public ephemeral key
# use + b'==') to expand padding https://stackoverflow.com/questions/2941995/python-ignore-incorrect-padding-error-when-base64-decoding
ephemeral_key_bytes = base64 . b64decode ( ephemeral_key + ' == ' )
ephemeral_public_key = X25519PublicKey . from_public_bytes ( ephemeral_key_bytes )
2025-01-31 15:25:10 +01:00
# This is effectively ECDH provided by cryptography library
2025-01-30 21:07:31 +01:00
shared_key = self . security_private_key . exchange ( ephemeral_public_key )
2025-01-30 20:52:26 +01:00
# when we have shared secret, use HDKF to get the AES part
# "Using the shared secret,
# generate 80 bytes
# by performing an HKDF
# using SHA-256 as the hash,
# with a salt of 32 bytes of 0,
# and with the empty string as the info.
# The first 32 bytes are used as the AES key,
# the next 32 bytes are used as the MAC key,
# and the last 16 bytes are used as the AES initialization vector."
2025-01-31 15:25:10 +01:00
# Using a key derivation function
2025-01-30 20:52:26 +01:00
derived_key = HKDF (
algorithm = hashes . SHA256 ( ) ,
length = 80 ,
salt = bytes ( 32 ) ,
info = b ' ' ,
) . derive ( shared_key )
print ( f " Derived key = %s, len=%s " % ( derived_key , len ( derived_key ) ) )
aes_key = derived_key [ : 32 ]
mac_key = derived_key [ 32 : 64 ]
aes_iv = derived_key [ 64 : ]
print ( " AES key = {0} / len = {1} " . format ( aes_key , len ( aes_key ) ) )
print ( " Mac key = {0} / len = {1} " . format ( mac_key , len ( mac_key ) ) )
print ( " AES IV = {0} / len = {1} " . format ( aes_iv , len ( aes_iv ) ) )
# Pass an empty string through HMAC-SHA-256 using the MAC key generated above. The first 8 bytes of the resulting MAC are base64-encoded, and become the mac property of the session_data.
2025-01-31 15:25:10 +01:00
# hashed message authentication code = HMAC
# This basically allows us to check if we derived the correct key
2025-01-30 20:52:26 +01:00
mac = HMAC ( mac_key , hashes . SHA256 ( ) )
mac . update ( b ' ' )
# only use first 8 bytes
signature = mac . finalize ( ) [ : 8 ]
print ( f " Calculated signature over empty string = { signature } " )
session_signature = base64 . b64decode ( session_mac + ' == ' )
print ( f " Session signature = { session_signature } " )
if signature == session_signature :
print ( " Signature seems to be correct " )
else :
print ( " Signature likely incorrect " )
raise Exception ( " Session key signature broken " )
2025-01-30 21:07:31 +01:00
cipher = Cipher ( algorithms . AES ( aes_key ) , modes . CBC ( aes_iv ) )
decryptor = cipher . decryptor ( )
# use + b'==') to expand padding https://stackoverflow.com/questions/2941995/python-ignore-incorrect-padding-error-when-base64-decoding
encrypted_session_key_bytes = base64 . b64decode ( encrypted_session_key + ' == ' )
session_key_bytes = decryptor . update ( encrypted_session_key_bytes ) + decryptor . finalize ( )
# Remove PKCS7 padding - block size 128 was guessed / tested to be correct
2025-01-31 15:25:10 +01:00
# Needs to be verified - it should in theory be 256
2025-01-31 15:26:12 +01:00
unpadder = padding . PKCS7 ( 256 ) . unpadder ( )
2025-01-30 21:07:31 +01:00
data = unpadder . update ( session_key_bytes )
data + = unpadder . finalize ( )
session_key_json_string = data . decode ( " utf8 " )
print ( f " Unencrypted session key JSON: { session_key_json_string } " )
session_key_json = json . loads ( session_key_json_string )
session_key_base64 = session_key_json [ ' session_key ' ]
print ( " session key = {session_key_base64} , {length} " . format ( session_key_base64 = session_key_base64 , length = len ( session_key_base64 ) ) )
2025-01-30 21:10:05 +01:00
return session_key_base64
2025-01-30 20:52:26 +01:00
2025-01-30 20:35:14 +01:00
def decrypt_message ( self , ciphertext , session_id ) :
room_key = self . room_keys [ ' rooms ' ] [ self . room_id ] [ ' sessions ' ]
print ( f " Messages key data: { room_key } " )
2025-01-30 20:52:26 +01:00
encrypted_session_key = room_key [ session_id ] [ ' session_data ' ] [ ' ciphertext ' ]
ephemeral_key = room_key [ session_id ] [ ' session_data ' ] [ ' ephemeral ' ]
session_mac = room_key [ session_id ] [ ' session_data ' ] [ ' mac ' ]
2025-01-30 21:07:31 +01:00
session_key_base64 = self . decrypt_session_key ( encrypted_session_key ,
2025-01-30 20:52:26 +01:00
ephemeral_key ,
session_mac )
2025-01-30 20:39:21 +01:00
2025-01-30 21:07:31 +01:00
inbound_group = InboundGroupSession . import_session ( session_key_base64 )
2025-01-30 21:10:05 +01:00
plaintext = inbound_group . decrypt ( ciphertext )
2025-01-30 21:07:31 +01:00
2025-01-30 21:10:05 +01:00
print ( f " Encrypted message { ciphertext } = { plaintext } " )
2025-01-30 21:07:31 +01:00
2025-01-30 20:35:14 +01:00
def decrypt_room_messages ( self ) :
"""
Decrypt messages that are of type ' m.room.encrypted '
{ ' type ' : ' m.room.encrypted ' , ' room_id ' : ' !fDjvLemgiriPvvWEeG:ungleich.ch ' , ' sender ' : ' @nico:ungleich.ch ' , ' content ' : { ' algorithm ' : ' m.megolm.v1.aes-sha2 ' , ' ciphertext ' : ' AwgBEqABNL8ztRQA67gXxkpbeiSp3zkJTkPXUwjQh0VnnFh6+Tff/dWjfF2rYu9q7MhG7BQgtaAoBoFNot8bPan23Y8Niip714ntI7t89F1t79TkUOcn5H0STydqGOOoZqnDf/l63ggWfD8EbudFSxoO7sJLL9iGO2+9HYWTMdTFAhcHg5c/k3aG+fQrXkbv+5afZXH3CxKnWxe4ukkoGMaDAo7jm3l2killUJ/J6NynCiJ/XinFWIdbRXSIUx3cwnFS/KWvdVmhu2iXYFtIvV65UE/JFhDjZ+rCH7lZ9DBD5jKjsVPQJqtFule0CQ ' , ' device_id ' : ' SSAUACUQKJ ' , ' sender_key ' : ' pEDLuq1RlDI2bxO6/lx9OQZt0NYma+gs6jg3QVYl4Vk ' , ' session_id ' : ' nkx3WnUpLL7hblZ9LNBkx0RPrKp3weX2o/aAgp7hx0c ' } , ' origin_server_ts ' : 1738264304685 , ' unsigned ' : { ' membership ' : ' join ' , ' age ' : 126031 } , ' event_id ' : ' $k9dYdD6b5eG_AZaZtO6imeHU8HGBpiZt3dqM8C3T8-8 ' , ' user_id ' : ' @nico:ungleich.ch ' , ' age ' : 126031 }
"""
2025-01-30 20:12:39 +01:00
for message in self . room_messages :
2025-01-30 20:35:14 +01:00
if message [ ' type ' ] == ' m.room.encrypted ' :
sender = message [ ' sender ' ]
ciphertext = message [ ' content ' ] [ ' ciphertext ' ]
session_id = message [ ' content ' ] [ ' session_id ' ]
2025-01-30 19:30:46 +01:00
2025-01-30 20:35:14 +01:00
plaintext = self . decrypt_message ( ciphertext , session_id )
2025-01-30 19:30:46 +01:00
2025-01-30 20:35:14 +01:00
def get_messages ( self ) :
self . parse_security_key ( )
self . check_security_key_parity ( )
self . setup_security_key_pair ( )
2025-01-30 20:12:39 +01:00
2025-01-30 20:35:14 +01:00
self . get_room_messages ( )
for message in self . room_messages :
print ( message )
self . get_room_keys ( )
self . decrypt_room_messages ( )
2025-01-30 19:30:46 +01:00
# Decrypt each message:
# Retrieve the session key
if __name__ == ' __main__ ' :
parser = argparse . ArgumentParser ( )
parser . add_argument ( " --server-url " , required = True , help = " Matrix Server URL, i.e. https://your-server " )
parser . add_argument ( " --room-id " , required = True , help = " ID of the room to get messages from, i.e. !...:your-matrix-domain " )
parser . add_argument ( " --login-username " , required = True , help = " Username for logging into the server, i.e. @you:your-matrix-domain " )
parser . add_argument ( " --login-password " , required = True , help = " Password for logging into the server, i.e. your-very-safe-password!! " )
2025-01-30 20:12:39 +01:00
parser . add_argument ( " --security-key " , required = True , help = " Your security backup key, i.e. ABCf defg aaaa - ensure to quote as one argument! " )
2025-01-30 19:30:46 +01:00
args = parser . parse_args ( )
client = UngleichMatrixClient ( args )
client . get_messages ( )