master-thesis/bin/checksum_delta_diff_test.py
Nico Schottelius 3465dbb10e +perms
Signed-off-by: Nico Schottelius <nico@nico-notebook.schottelius.org>
2019-08-16 19:59:18 +02:00

243 lines
7.1 KiB
Python
Executable file

#!/usr/bin/python3
#from __future__ import unicode_literals
import ipaddress
from scapy.all import *
import array
import struct
# Adapted from scapy's checksum(); unlike the copy it started from, this
# version does not depend on the byte order of the host.
def checksum_scapy(pkt):
    """Return the 16-bit Internet checksum of *pkt*.

    The input is interpreted as a sequence of 16-bit words in network
    (big-endian) byte order.  The words are added up, the carries are
    folded back into the lower 16 bits (one's-complement addition) and
    the result is inverted.

    Args:
        pkt: bytes-like object (bytes, bytearray, ...) to checksum.
            An odd-length input is padded with one trailing zero byte.
            The caller's object is never modified.

    Returns:
        The checksum as an int in the range 0..0xffff.
    """
    # Work on an immutable copy: "+=" on a caller-owned bytearray would
    # otherwise append the padding byte to the caller's buffer.
    pkt = bytes(pkt)

    # Even the length, the data is consumed in 16-bit units.
    # P4: not needed there (always even).
    if len(pkt) % 2 == 1:
        pkt += b"\0"

    # Create the 16-bit values from the input and sum them up; the sum
    # may be slightly (or much) wider than 16 bit because of the additions.
    # Reading the words explicitly as big-endian makes the byte swap that
    # the little-endian-only original did at the very end unnecessary.
    # P4: summing manually into 32 bit.
    s = sum(struct.unpack("!%dH" % (len(pkt) // 2), pkt))

    # Add everything above bit 15 back onto the lower 16 bits.  One round
    # can produce a new carry, so repeat until the value fits in 16 bits.
    while s >> 16:
        s = (s >> 16) + (s & 0xffff)

    # Invert and cut off everything above 16 bit.
    return ~s & 0xffff
# a) Compare for TCP, UDP -> IPv6 does not have checksum!
# 1. convert to array of "bytes"
# 2. import into an array
# 3. sum everything up
# b) Generate checksum from v4 offset and IPv6 for IPv4
# a)
# UDP/TCP CONTENT stays the same
# Only the diff between v4 and v6 "counts"
#
# b)
# Not needed
def sum_for_udp(packet):
    """Return the UDP header fields of *packet* packed for checksumming.

    Args:
        packet: scapy packet that contains a UDP layer.

    Returns:
        6 bytes: source port, destination port and length of the UDP
        layer, each packed as an unsigned 16-bit value.
    """
    udp = packet[UDP]

    # struct.pack() returns bytes, so the pieces are joined as bytes
    # (appending them to a "" text string fails on Python 3).
    # NOTE(review): "H" packs in native byte order, not network byte
    # order -- kept as is to stay consistent with the other sum_for_*
    # helpers; confirm this is intended.
    return b"".join(
        struct.pack("H", field)  # 16 bit each
        for field in (udp.sport, udp.dport, udp.len)
    )
def sum_for_v6(packet):
    """Return the IPv6-specific bytes that go into the checksum.

    Args:
        packet: scapy packet that contains an IPv6 layer.

    Returns:
        35 bytes: packed source address, packed destination address,
        the payload length (16 bit) and the next header value (8 bit).
    """
    # Fields as summed up on the P4 side:
    #
    #   hdr.ipv6.src_addr,               /* 128 */
    #   hdr.ipv6.dst_addr,               /* 128 */
    #   meta.length_without_ip_header,   /* 32 */
    #   24w0,                            /* 24 */
    #   hdr.ipv6.next_header,            /* 8 */
    #                                    /* total: 320 */
    #   // UDP header
    #   hdr.udp.src_port,                /* 16 */
    #   hdr.udp.dst_port,                /* 16 */
    #   hdr.udp.payload_length           /* 16 */
    #                                    /* all: 368 */
    #
    # order does not matter!
    #
    # NOTE(review): the code below packs the length as 16 bit and the
    # next header as a single byte without the 24 zero bits, both in
    # native byte order -- this differs from the listing above; confirm
    # that this is intended.

    def as_text(addr):
        # ipaddress wants text.  The original code always called
        # .decode(), which only exists on byte strings; text strings
        # are passed through unchanged.
        if isinstance(addr, bytes):
            return addr.decode("utf-8")
        return addr

    ipv6 = packet[IPv6]

    # All pieces are bytes, so they are joined as bytes (appending them
    # to a "" text string fails on Python 3).
    sums = b"".join((
        ipaddress.IPv6Address(as_text(ipv6.src)).packed,
        ipaddress.IPv6Address(as_text(ipv6.dst)).packed,
        struct.pack("H", ipv6.plen),
        struct.pack("B", ipv6.nh),
    ))
    # print("{} - {} - {}".format(len(sums), sums, checksum_scapy(sums)))
    return sums
def sum_for_v4(packet):
    """Return the IPv4-specific bytes that go into the checksum.

    Args:
        packet: scapy packet that contains an IP (IPv4) layer.

    Returns:
        12 bytes: packed source address, packed destination address,
        the total length minus 20 (16 bit) and the protocol number
        (16 bit).
    """
    # Get diffs -- for UDP
    # udp_v4 =
    #   hdr.ipv4.src_addr,
    #   hdr.ipv4.dst_addr,
    #   8w0,
    #   hdr.ipv4.protocol,
    #   meta.length_without_ip_header,
    #
    # NOTE(review): "H" packs in native byte order, not network byte
    # order -- kept as is to stay consistent with the other sum_for_*
    # helpers; confirm this is intended.

    def as_text(addr):
        # ipaddress wants text.  The original code always called
        # .decode(), which only exists on byte strings; text strings
        # are passed through unchanged.
        if isinstance(addr, bytes):
            return addr.decode("utf-8")
        return addr

    ipv4 = packet[IP]

    # All pieces are bytes, so they are joined as bytes (appending them
    # to a "" text string fails on Python 3).
    return b"".join((
        ipaddress.IPv4Address(as_text(ipv4.src)).packed,
        ipaddress.IPv4Address(as_text(ipv4.dst)).packed,
        struct.pack("H", ipv4.len - 20),  # -20 for ip header
        struct.pack("H", ipv4.proto),     # udp / tcp
    ))
def sum_for_v4_from_v6(packet):
    """Placeholder: not implemented yet, always returns None.

    The comment below lists the P4 update_checksum() call for the IPv4
    header checksum, i.e. the fields this function would have to cover.

    Args:
        packet: currently unused.

    Returns:
        None.
    """
    # update_checksum(meta.chk_ipv4 == 1,
    #     {
    #         hdr.ipv4.version,
    #         hdr.ipv4.ihl,
    #         hdr.ipv4.diff_serv,
    #         hdr.ipv4.ecn,
    #         hdr.ipv4.totalLen,
    #         hdr.ipv4.identification,
    #         hdr.ipv4.flags,
    #         hdr.ipv4.fragOffset,
    #         hdr.ipv4.ttl,
    #         hdr.ipv4.protocol,
    #         hdr.ipv4.src_addr,
    #         hdr.ipv4.dst_addr
    #     },
    #     hdr.ipv4.hdrChecksum,
    #     HashAlgorithm.csum16
    # );
    return None
if __name__ == '__main__':
    # Experiment: build the same UDP datagram once on top of IPv6 and once
    # on top of IPv4, then compare the UDP checksums with the checksums of
    # the IP-version specific header parts, to see whether the checksum
    # of a translated packet can be derived from the header difference.

    # List of {"ipv6": packet, "ipv4": packet} pairs to compare.
    p = []

    # All-zero building blocks (not used by the comparison below).
    e0 = Ether(src="00:00:00:00:00:00",
               dst="00:00:00:00:00:00")
    i0 = IP(src="0.0.0.0",
            dst="0.0.0.0")
    t0 = TCP(dport=0, sport=0)

    #t = TCP(dport=80, sport=random.randint(49152,65535))
    # print("chk_t = {}".format(t))

    e = Ether(src="00:00:0a:00:00:03", dst='00:00:0a:00:00:01')
    i4 = IP(src="10.0.0.1", dst="10.1.1.1")
    i6 = IPv6(src="2001:db8:1::a00:1", dst="2001:db8::1")

    # e = Ether(src="02:53:55:42:45:01", dst='ff:ff:ff:ff:ff:ff')
    # i4 = IP(src = "192.168.1.1", dst = "192.168.4.2")
    # i6 = IPv6(src = "2001:db8:42::1", dst = "2001:db8::2")
    i62 = IPv6(src="2001:db8:42::2", dst="2001:db8::2")

    t = TCP(dport=80, sport=1337)
    u = UDP(dport=80, sport=1337)
    #print("chk_t = {}".format(t))

    # Payloads: empty (unused below) and a single byte.
    d0 = ""
    d = "A"

    p6_udp = e / i6 / u / d
    p6_udp2 = e / i62 / u / d
    p4_udp = e / i4 / u / d

    p4_p6_1 = {
        "ipv6": p6_udp,
        "ipv4": p4_udp
    }
    p.append(p4_p6_1)

    p4_p6_2 = {
        "ipv6": p6_udp2,
        "ipv4": p4_udp
    }
    p.append(p4_p6_2)
    #p.append(e / i62 / u / d)

    for p_pair in p:
        v6 = p_pair["ipv6"]
        v4 = p_pair["ipv4"]

        checksums = {}         # UDP checksum per IP version
        header_checksums = {}  # checksum over the IP specific parts
        diff_ip_headers = 0

        for packet in [v6, v4]:
            #print("p = {}".format(packet.__repr__()))

            # Serialize the packet and parse it again; the checksum is
            # read from the re-parsed copy below.  bytes() yields the raw
            # packet on Python 3 -- str() does not return the wire format
            # there.
            packet_rebuild = packet.__class__(bytes(packet))
            print("rebuild = {}".format(packet_rebuild.__repr__()))

            chk_old = packet[UDP].chksum
            chk_new = packet_rebuild[UDP].chksum
            # print("chk1 = {} chk2={}".format(chk_old, chk_new))

            # The sum_for_* helpers produce bytes, so start with bytes.
            sums = b""

            if IPv6 in packet:
                headertype = "ipv6"
                sums += sum_for_v6(packet_rebuild)
                if UDP in packet:
                    checksums["ipv6"] = packet_rebuild[UDP].chksum

            if IP in packet:
                headertype = "ipv4"
                sums += sum_for_v4(packet_rebuild)
                if UDP in packet:
                    checksums["ipv4"] = packet_rebuild[UDP].chksum

            header_checksums[headertype] = checksum_scapy(sums)
            print("Checksum-parts {} for {}".format(checksum_scapy(sums), packet_rebuild.__repr__()))

        print("UDP v6: {} v4: {} diff: {}".format(checksums["ipv6"],
                                                  checksums["ipv4"],
                                                  checksums["ipv6"] - checksums["ipv4"]))

        header_diff_from_v6 = header_checksums["ipv6"] - header_checksums["ipv4"]
        print("Header v6: {} v4: {} diff: {}".format(header_checksums["ipv6"],
                                                     header_checksums["ipv4"],
                                                     header_diff_from_v6))

        print("Translating v6 to v4, expected v4 UDP checksum: {}".format(
            checksums["ipv6"] - header_diff_from_v6))
        print("Translating v4 to v6, expected v6 UDP checksum: {}".format(
            checksums["ipv4"] + header_diff_from_v6))

        # Generate the IPv4 header checksum from IPv6: