Merge pull request #802 from gpotter2/tests-scapypipes

[coverage] Add ScapyPipes tests
This commit is contained in:
Guillaume Valadon 2017-09-11 12:29:36 +02:00 committed by GitHub
commit 9a2e4bfec9
3 changed files with 99 additions and 3 deletions

View File

@ -28,6 +28,12 @@ except ImportError:
else:
THREAD_EXCEPTION = thread.error
if WINDOWS:
from scapy.arch.pcapdnet import PcapTimeoutElapsed
recv_error = PcapTimeoutElapsed
else:
recv_error = ()
""" In Windows, select.select is not available for custom objects. Here's the implementation of scapy to re-create this functionnality
# Passive way: using no-ressources locks
+---------+ +---------------+ +-------------------------+
@ -350,7 +356,11 @@ class _ATMT_supersocket(SuperSocket):
s = str(s)
return self.spa.send(s)
def recv(self, n=MTU):
r = self.spa.recv(n)
try:
r = self.spa.recv(n)
except recv_error:
if not WINDOWS:
raise
if self.proto is not None:
r = self.proto(r)
return r

View File

@ -9,7 +9,7 @@ import Queue
from scapy.pipetool import Source,Drain,Sink
from scapy.config import conf
from scapy.utils import PcapReader, PcapWriter
from scapy.automaton import recv_error
class SniffSource(Source):
"""Read packets from an interface and send them to low exit.
@ -29,8 +29,14 @@ class SniffSource(Source):
self.s.close()
def fileno(self):
return self.s.fileno()
def check_recv(self):
return True
def deliver(self):
self._send(self.s.recv())
try:
self._send(self.s.recv())
except recv_error:
if not WINDOWS:
raise
class RdpcapSource(Source):
"""Read packets from a PCAP file send them to low exit.
@ -53,6 +59,8 @@ class RdpcapSource(Source):
self.f.close()
def fileno(self):
return self.f.fileno()
def check_recv(self):
return True
def deliver(self):
p = self.f.recv()
print("deliver %r" % p)
@ -100,6 +108,7 @@ class WrpcapSink(Sink):
self.f = PcapWriter(fname)
def stop(self):
self.f.flush()
self.f.close()
def push(self, msg):
self.f.write(msg)

View File

@ -216,3 +216,80 @@ p.start()
p.wait_and_stop()
assert test_val == "hello"
+ Advanced ScapyPipes pipetools tests
= Test SniffSource
~ netaccess
p = PipeEngine()
s = SniffSource()
d1 = Drain(name="d1")
c = QueueSink(name="c")
s > d1 > c
p.add(s)
p.start()
sniff(count=3)
p.stop()
assert c.q.get()
= Test RdpcapSource and WrpcapSink
~ needs_root
req = Ether()/IP()/ICMP()
rpy = Ether()/IP('E\x00\x00\x1c\x00\x00\x00\x004\x01\x1d\x04\xd8:\xd0\x83\xc0\xa8\x00w\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
wrpcap("t.pcap", [req, rpy])
p = PipeEngine()
s = RdpcapSource("t.pcap")
d1 = Drain(name="d1")
c = WrpcapSink("t2.pcap", name="c")
s > d1 > c
p.add(s)
p.start()
p.wait_and_stop()
results = rdpcap("t2.pcap")
assert str(results[0]) == str(req)
assert str(results[1]) == str(rpy)
os.unlink("t.pcap")
os.unlink("t2.pcap")
= Test InjectSink and Inject3Sink
~ needs_root
a = IP(dst="192.168.0.1")/ICMP()
msgs = []
class FakeSocket(object):
def __init__(self, *arg, **karg):
pass
def close(self):
pass
def send(self, msg):
global msgs
msgs.append(msg)
@mock.patch("scapy.scapypipes.conf.L2socket", FakeSocket)
@mock.patch("scapy.scapypipes.conf.L3socket", FakeSocket)
def _inject_sink(i3):
s = CLIFeeder()
s.send(a)
s.is_exhausted = True
d1 = Drain(name="d1")
c = Inject3Sink() if i3 else InjectSink()
s > d1 > c
p = PipeEngine(s)
p.start()
p.wait_and_stop()
_inject_sink(False) # InjectSink
_inject_sink(True) # Inject3Sink
assert msgs == [a,a]