Merge pull request #867 from guedou/sendrecv

[coverage] Add tests for more sending and receiving functions
This commit is contained in:
Pierre Lalet 2017-10-05 09:45:01 +02:00 committed by GitHub
commit c21964f58c
1 changed files with 44 additions and 3 deletions

View File

@ -730,20 +730,61 @@ assert x[IP].ottl() in [32, 64, 128, 255]
assert 0 <= x[IP].hops() <= 126
x is not None and ICMP in x and x[ICMP].type == 0
= Sending an ICMP message at layer 2 and layer 3
~ netaccess IP ICMP
tmp = send(IP(dst="8.8.8.8")/ICMP(), return_packets=True, realtime=True)
assert(len(tmp) == 1)
tmp = sendp(Ether()/IP(dst="8.8.8.8")/ICMP(), return_packets=True, realtime=True)
assert(len(tmp) == 1)
= Sending an ICMP message 'forever' at layer 2 and layer 3
~ netaccess IP ICMP
tmp = srloop(IP(dst="8.8.8.8")/ICMP(), count=1)
assert(type(tmp) == tuple and len(tmp[0]) == 1)
tmp = srploop(Ether()/IP(dst="8.8.8.8")/ICMP(), count=1)
assert(type(tmp) == tuple and len(tmp[0]) == 1)
= Sending and receiving an ICMP with flooding methods
~ netaccess IP ICMP
from functools import partial
# flooding methods do not support timeout. Packing the test for security
def _test_flood():
def _test_flood(flood_function, add_ether=False):
old_debug_dissector = conf.debug_dissector
conf.debug_dissector = False
x = sr1flood(IP(dst="www.google.com")/ICMP())
p = IP(dst="www.google.com")/ICMP()
if add_ether:
p = Ether()/p
x = flood_function(p)
conf.debug_dissector = old_debug_dissector
if type(x) == tuple:
x = x[0][0][1]
x
assert x[IP].ottl() in [32, 64, 128, 255]
assert 0 <= x[IP].hops() <= 126
x is not None and ICMP in x and x[ICMP].type == 0
t = Thread(target=_test_flood)
_test_srflood = partial(_test_flood, srflood)
t = Thread(target=_test_srflood)
t.start()
t.join(3)
assert not t.is_alive()
_test_sr1flood = partial(_test_flood, sr1flood)
t = Thread(target=_test_sr1flood)
t.start()
t.join(3)
assert not t.is_alive()
_test_srpflood = partial(_test_flood, srpflood, True)
t = Thread(target=_test_sr1flood)
t.start()
t.join(3)
assert not t.is_alive()
_test_srp1flood = partial(_test_flood, srp1flood, True)
t = Thread(target=_test_sr1flood)
t.start()
t.join(3)
assert not t.is_alive()