From 885003504e8b5e55dc61b77980163a15cb63b027 Mon Sep 17 00:00:00 2001 From: Pierre LALET Date: Sun, 27 Aug 2017 03:49:13 +0200 Subject: [PATCH] Add tests for TAP sockets & bridge_and_sniff() --- .appveyor.yml | 2 +- test/sendsniff.uts | 75 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 76 insertions(+), 1 deletion(-) create mode 100644 test/sendsniff.uts diff --git a/.appveyor.yml b/.appveyor.yml index 3a744d852..ffa3c7a8a 100644 --- a/.appveyor.yml +++ b/.appveyor.yml @@ -29,7 +29,7 @@ test_script: - 'del test\regression.uts' # Secondary and contrib unit tests - - 'del test\bpf.uts test\linux.uts' # Don't bother with OS dependent regression tests + - 'del test\bpf.uts test\linux.uts test\sendsniff.uts' # Don't bother with OS dependent regression tests - "%PYTHON%\\python -m coverage run --parallel-mode bin\\UTscapy -c test\\configs\\windows.utsc || exit /b 42" # TLS unit tests diff --git a/test/sendsniff.uts b/test/sendsniff.uts new file mode 100644 index 000000000..64658050a --- /dev/null +++ b/test/sendsniff.uts @@ -0,0 +1,75 @@ +% send, sniff, sr* tests for Scapy + +~ netaccess + +############ +############ ++ Bridge using tap interfaces + +~ tap linux + += Create two tap interfaces +tap0, tap1 = [TunTapInterface("tap%d" % i, create=True) for i in range(2)] +from threading import Thread + += Run a sniff thread on the tap1 **interface** +t_sniff = Thread( + target=sniff, + kwargs={"iface": "tap1", "count": 5, "prn": Packet.summary, + "lfilter": lambda p: IP in p and p[IP].src == "1.2.3.4"} +) +t_sniff.start() + += Run a bridge_and_sniff thread between the taps **sockets** +t_bridge = Thread(target=bridge_and_sniff, args=(tap0, tap1), + kwargs={"store": False, "count": 5, 'prn': Packet.summary, + "lfilter": lambda p: IP in p and p[IP].src == "1.2.3.4"}) +t_bridge.start() + += Send five packets to the tap0 **interface** +time.sleep(1) +sendp([Ether(dst=ETHER_BROADCAST) / IP(src="1.2.3.4") / ICMP()], iface="tap0", + count=5) + += Wait for the threads +t_bridge.join() +t_sniff.join() + +# Same tests, with "NAT" using xfrm function + += Run a sniff thread on the tap1 **interface** +t_sniff = Thread( + target=sniff, + kwargs={"iface": "tap1", "count": 5, "prn": Packet.summary, + "lfilter": lambda p: IP in p and p[IP].src == "2.3.4.5"} +) +t_sniff.start() + += Run a bridge_and_sniff thread between the taps **sockets** +def nat_1_2(pkt): + if IP in pkt and pkt[IP].src == "1.2.3.4": + pkt[IP].src = "2.3.4.5" + del pkt[IP].chksum + return pkt + return False + +t_bridge = Thread(target=bridge_and_sniff, args=(tap0, tap1), + kwargs={"store": False, "count": 5, 'prn': Packet.summary, + "xfrm12": nat_1_2, + "lfilter": lambda p: IP in p and p[IP].src == "1.2.3.4"}) +t_bridge.start() + += Send five packets to the tap0 **interface** +time.sleep(1) +sendp([Ether(dst=ETHER_BROADCAST) / IP(src="1.2.3.4") / ICMP()], iface="tap0", + count=5) + += Wait for the threads +t_bridge.join() +t_sniff.join() + += Delete the tap interfaces +tap0.close() +tap1.close() +tap0.delete() +tap1.delete()