#!/usr/bin/env python

# Generates the stimulus/expected pcap files and SUME tuple files for the
# NAT64 simulation: packets are "applied" on an ingress port and "expected"
# on an egress port, then dumped with scapy's wrpcap().

# from switch_calc_headers import *
from scapy.all import *
from nf_sim_tools import *
from collections import OrderedDict
import sss_sdnet_tuples

########################
# pkt generation tools #
########################

# Every packet applied / expected, in call order (written to src/dst.pcap).
pktsApplied = []
pktsExpected = []

# Pkt lists for SUME simulations, keyed by nf interface index (0..3)
nf_applied = OrderedDict()
nf_applied[0] = []
nf_applied[1] = []
nf_applied[2] = []
nf_applied[3] = []
nf_expected = OrderedDict()
nf_expected[0] = []
nf_expected[1] = []
nf_expected[2] = []
nf_expected[3] = []

# Port name -> bit written into the src_port / dst_port tuple fields.
nf_port_map = {
    "nf0": 0b00000001,
    "nf1": 0b00000100,
    "nf2": 0b00010000,
    "nf3": 0b01000000,
    "dma0": 0b00000010,
}

# Port name -> index into nf_applied / nf_expected.
nf_id_map = {
    "nf0": 0,
    "nf1": 1,
    "nf2": 2,
    "nf3": 3,
}

sss_sdnet_tuples.clear_tuple_files()


def applyPkt(pkt, ingress, time):
    """Record ``pkt`` as applied on port ``ingress`` at timestamp ``time``.

    Sets the ``src_port`` field of both the input and the expected SUME
    tuples, stamps ``pkt.time`` and appends the packet to the global and the
    per-port applied lists.

    Raises KeyError if ``ingress`` is missing from ``nf_port_map`` or
    ``nf_id_map`` (note that "dma0" is only present in the former).
    """
    print("Applying pkt on {} at {}: ".format(ingress, time))
    pktsApplied.append(pkt)
    sss_sdnet_tuples.sume_tuple_in['src_port'] = nf_port_map[ingress]
    sss_sdnet_tuples.sume_tuple_expect['src_port'] = nf_port_map[ingress]
    pkt.time = time
    nf_applied[nf_id_map[ingress]].append(pkt)


def expPkt(pkt, egress):
    """Record ``pkt`` as expected on ``egress`` and write the tuple files.

    ``egress`` is a key of ``nf_port_map`` or the special value 'bcast',
    which queues the packet on all four nf ports.

    Raises KeyError for any other ``egress`` value.
    """
    if egress == 'bcast':
        # 'bcast' has no entry in nf_port_map; looking it up directly raised
        # KeyError and made the broadcast branch below unreachable.
        # NOTE(review): assumes broadcast is encoded as the union of the four
        # nf port bits -- confirm against the pipeline's dst_port convention.
        dst_port = 0
        for name in nf_id_map:
            dst_port |= nf_port_map[name]
    else:
        dst_port = nf_port_map[egress]

    pktsExpected.append(pkt)
    sss_sdnet_tuples.sume_tuple_expect['dst_port'] = dst_port
    sss_sdnet_tuples.write_tuples()

    if egress in ["nf0", "nf1", "nf2", "nf3"]:
        nf_expected[nf_id_map[egress]].append(pkt)
    elif egress == 'bcast':
        nf_expected[0].append(pkt)
        nf_expected[1].append(pkt)
        nf_expected[2].append(pkt)
        nf_expected[3].append(pkt)


def print_summary(pkts):
    """Print the scapy one-line summary of every packet in ``pkts``."""
    for pkt in pkts:
        # Single-argument print() behaves the same on Python 2 and 3; the
        # double space reproduces the output of the old print statement.
        print("summary =  {0}".format(pkt.summary()))


def write_pcap_files():
    """Write all recorded packets to pcap files in the current directory.

    Always writes src.pcap and dst.pcap; writes nfN_applied.pcap and
    nfN_expected.pcap only for ports that have at least one packet. Finally
    prints the timestamps of the applied packets for each port.
    """
    wrpcap("src.pcap", pktsApplied)
    wrpcap("dst.pcap", pktsExpected)

    for i in nf_applied.keys():
        if (len(nf_applied[i]) > 0):
            wrpcap('nf{0}_applied.pcap'.format(i), nf_applied[i])

    for i in nf_expected.keys():
        if (len(nf_expected[i]) > 0):
            wrpcap('nf{0}_expected.pcap'.format(i), nf_expected[i])

    # i = 0..3
    for i in nf_applied.keys():
        times = [p.time for p in nf_applied[i]]
        print("nf{0}_applied times:  {1}".format(i, times))


#####################
# generate testdata #
#####################

MAC1 = "08:11:11:11:11:08"
MAC2 = "08:22:22:22:22:08"
pktCnt = 0

INDEX_WIDTH = 4
REG_DEPTH = 2**INDEX_WIDTH

# Not sure what this is used for
NUM_KEYS = 4

lookup_table = {
    0: 0x00000001,
    1: 0x00000010,
    2: 0x00000100,
    3: 0x00001000,
}


def test_nat64():
    """Generate one packet pair per translation direction.

    1. An IPv4/UDP packet applied on nf2 at time 1, expected on nf3 as an
       IPv6/UDP packet with the same Ethernet header, UDP ports and payload.
    2. The reverse IPv6/UDP packet applied on nf3 at time 2, expected on nf2
       as an IPv4/UDP packet (IP id 0).
    """
    pkgCnt = 1

    # From v4 to v6
    e4 = Ether(dst=MAC2, src=MAC1)
    i4 = IP(src="10.0.0.42", dst="10.0.0.66")
    u4 = UDP(sport=5000, dport=2345)
    p4 = 500 * "A"

    pkg4 = e4 / i4 / u4 / p4
    applyPkt(pkg4, "nf2", pkgCnt)

    e6 = e4
    i6 = IPv6(src="2001:db8:42::a00:2a", dst="2001:db8:42::42")
    u6 = u4
    p6 = p4

    pkg6 = e6 / i6 / u6 / p6
    expPkt(pkg6, 'nf3')

    pkgCnt += 1

    # From v6 to v4
    e6 = Ether(dst=MAC1, src=MAC2)
    i6 = IPv6(dst="2001:db8:42::a00:2a", src="2001:db8:42::42")
    u6 = UDP(dport=5000, sport=2345)
    p6 = 500 * "A"
    pkg6 = e6 / i6 / u6 / p6

    e4 = e6
    i4 = IP(dst="10.0.0.42", src="10.0.0.66", id=0)
    u4 = u6
    p4 = p6
    pkg4 = e4 / i4 / u4 / p4

    applyPkt(pkg6, "nf3", pkgCnt)
    expPkt(pkg4, 'nf2')


test_nat64()
write_pcap_files()