From 04b05ed5f556b888e8c8a6c4c549d5af50ae0afe Mon Sep 17 00:00:00 2001 From: Guillaume Valadon Date: Wed, 19 Apr 2017 10:22:14 +0200 Subject: [PATCH 1/2] Test report_ports() using mock --- test/regression.uts | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/test/regression.uts b/test/regression.uts index a4a9d3e9c..624b157ef 100644 --- a/test/regression.uts +++ b/test/regression.uts @@ -8147,6 +8147,18 @@ assert("192.168.0.254" not in [p[IP].src for p in new_pl]) = IPv4 - reporting +@mock.patch("scapy.layers.inet.sr") +def test_report_ports(mock_sr): + def sr(*args, **kargs): + return [(IP()/TCP(dport=81, flags="S"), IP()/TCP(sport=81, flags="SA")), + (IP()/TCP(dport=82, flags="S"), IP()/ICMP(type=3, code=1)), + (IP()/TCP(dport=83, flags="S"), IP()/TCP(sport=83, flags="R"))], [IP()/TCP(dport=84, flags="S")] + mock_sr.side_effect = sr + report = "\\begin{tabular}{|r|l|l|}\n\hline\n81 & open & SA \\\\\n\hline\n?? & closed & ICMP type dest-unreach/host-unreachable from 127.0.0.1 \\\\\n83 & closed & TCP R \\\\\n\hline\n84 & ? & unanswered \\\\\n\hline\n\end{tabular}\n" + assert(report_ports("www.secdev.org", [81,82,83,84]) == report) + +test_report_ports() + result_IPID_count = "" def test_IPID_count(): def write(s): From 84d5cf965a850ca4893155b1c9aa1e6c828f8e22 Mon Sep 17 00:00:00 2001 From: Guillaume Valadon Date: Wed, 19 Apr 2017 10:38:36 +0200 Subject: [PATCH 2/2] TCP automaton unit tests on Linux --- .appveyor.yml | 2 +- .travis/test.sh | 2 +- test/linux.uts | 55 +++++++++++++++++++++++++++++++++++++++++++++ test/regression.uts | 10 ++++----- 4 files changed, 62 insertions(+), 7 deletions(-) create mode 100644 test/linux.uts diff --git a/.appveyor.yml b/.appveyor.yml index 46361ef07..d92886c00 100644 --- a/.appveyor.yml +++ b/.appveyor.yml @@ -28,7 +28,7 @@ test_script: - 'del test\regression.uts' # Secondary and contrib unit tests - - 'del test\bpf.uts' # Don't bother with BPF regression tests + - 'del test\bpf.uts test\linux.uts' # Don't bother with OS dependent regression tests - "%PYTHON%\\python -m coverage run --parallel-mode bin\\UTscapy -c test\\configs\\windows.utsc || exit /b 42" # TLS unit tests diff --git a/.travis/test.sh b/.travis/test.sh index 691aaf1d4..cb7a55f44 100644 --- a/.travis/test.sh +++ b/.travis/test.sh @@ -61,7 +61,7 @@ then then $SCAPY_SUDO ./run_tests -q -F -t bpf.uts $UT_FLAGS || exit $? fi - UT_FLAGS+=" -K manufdb" + UT_FLAGS+=" -K manufdb -K linux" fi # Run all normal and contrib tests diff --git a/test/linux.uts b/test/linux.uts new file mode 100644 index 000000000..ed251ae43 --- /dev/null +++ b/test/linux.uts @@ -0,0 +1,55 @@ +% Regression tests for Linux only + +# More informations at http://www.secdev.org/projects/UTscapy/ + + +############ +############ + ++ Linux only test + += TCP client automaton +~ automaton netaccess linux needs_root +* This test retries on failure because it often fails + +import os +import time +import signal + +def handler(signum, stack_frame): + raise Exception("Timer expired !") + +tmp = signal.signal(signal.SIGALRM, handler) + +SECDEV_IP4 = "203.178.141.194" +IPTABLE_RULE = "iptables -%c INPUT -s %s -p tcp --sport 80 -j DROP" + +# Drop packets from SECDEV_IP4 +assert(os.system(IPTABLE_RULE % ('A', SECDEV_IP4)) == 0) + +success = False +for i in xrange(10): + tmp = signal.alarm(5) + try: + r, w = os.pipe() + t = TCP_client(SECDEV_IP4, 80, external_fd={ "tcp": (r,w) }) + tmp = os.write(w, "HEAD / HTTP/1.0\r\n\r\n") + t.runbg() + time.sleep(0.5) + response = os.read(r, 4096) + tmp = signal.alarm(0) # cancel the alarm + t.stop() + os.close(r) + os.close(w) + if response.startswith("HTTP/1.1 200 OK"): + success = True + break + else: + time.sleep(0.5) + except Exception as e: + print e + +# Remove the iptables rule +assert(os.system(IPTABLE_RULE % ('D', SECDEV_IP4)) == 0) + +assert(success) diff --git a/test/regression.uts b/test/regression.uts index 624b157ef..59cdcb5c7 100644 --- a/test/regression.uts +++ b/test/regression.uts @@ -8150,12 +8150,12 @@ assert("192.168.0.254" not in [p[IP].src for p in new_pl]) @mock.patch("scapy.layers.inet.sr") def test_report_ports(mock_sr): def sr(*args, **kargs): - return [(IP()/TCP(dport=81, flags="S"), IP()/TCP(sport=81, flags="SA")), - (IP()/TCP(dport=82, flags="S"), IP()/ICMP(type=3, code=1)), - (IP()/TCP(dport=83, flags="S"), IP()/TCP(sport=83, flags="R"))], [IP()/TCP(dport=84, flags="S")] + return [(IP()/TCP(dport=65081, flags="S"), IP()/TCP(sport=65081, flags="SA")), + (IP()/TCP(dport=65082, flags="S"), IP()/ICMP(type=3, code=1)), + (IP()/TCP(dport=65083, flags="S"), IP()/TCP(sport=65083, flags="R"))], [IP()/TCP(dport=65084, flags="S")] mock_sr.side_effect = sr - report = "\\begin{tabular}{|r|l|l|}\n\hline\n81 & open & SA \\\\\n\hline\n?? & closed & ICMP type dest-unreach/host-unreachable from 127.0.0.1 \\\\\n83 & closed & TCP R \\\\\n\hline\n84 & ? & unanswered \\\\\n\hline\n\end{tabular}\n" - assert(report_ports("www.secdev.org", [81,82,83,84]) == report) + report = "\\begin{tabular}{|r|l|l|}\n\hline\n65081 & open & SA \\\\\n\hline\n?? & closed & ICMP type dest-unreach/host-unreachable from 127.0.0.1 \\\\\n65083 & closed & TCP R \\\\\n\hline\n65084 & ? & unanswered \\\\\n\hline\n\end{tabular}\n" + assert(report_ports("www.secdev.org", [65081,65082,65083,65084]) == report) test_report_ports()