Several fixes: bytes, types, layers, networking

This commit is contained in:
gpotter2 2017-07-26 23:34:28 +02:00
parent d64526a001
commit 2d6991edd4
7 changed files with 23 additions and 21 deletions

View File

@ -153,7 +153,7 @@ def attach_filter(s, bpf_filter, iface):
)
return
nb = int(lines[0])
bpf = ""
bpf = b""
for l in lines[1:]:
bpf += struct.pack("HBBI", *(int(e) for e in l.split()))

View File

@ -746,7 +746,7 @@ class Automaton(six.with_metaclass(Automaton_metaclass)):
self.cmdout.send(c)
except Exception as e:
exc_info = sys.exc_info()
self.debug(3, "Transfering exception from tid=%i:\n%s"% (self.threadid, traceback.format_exc(exc_info)))
self.debug(3, "Transfering exception from tid=%i:\n%s"% (self.threadid, traceback.format_exception(*exc_info)))
m = Message(type=_ATMT_Command.EXCEPTION, exception=e, exc_info=exc_info)
self.cmdout.send(m)
self.debug(3, "Stopping control thread (tid=%i)"%self.threadid)

View File

@ -224,7 +224,7 @@ def load_manuf(filename):
lng=shrt
else:
lng = l[i+2:]
manufdb[oui] = shrt, lng
manufdb[oui] = plain_str(shrt), plain_str(lng)
except Exception:
log_loading.warning("Couldn't parse one line from [%s] [%r]",
filename, l, exc_info=True)

View File

@ -34,8 +34,8 @@ class DNSStrField(StrField):
return b"\x00"
# Truncate chunks that cannot be encoded (more than 63 bytes..)
x = b"".join(chr(len(y)) + y for y in (k[:63] for k in x.split(".")))
if ord(x[-1]) != 0:
x = b"".join(chb(len(y)) + y for y in (k[:63] for k in x.split(".")))
if orb(x[-1]) != 0:
x += b"\x00"
return x

View File

@ -753,7 +753,8 @@ class Packet(six.with_metaclass(Packet_metaclass, BasePacket)):
def hide_defaults(self):
"""Removes fields' values that are the same as default values."""
for k, v in self.fields.items(): # use .items(): self.fields is modified in the loop
for k in list(self.fields.keys()): # use .items(): self.fields is modified in the loop
v = self.fields[k]
if k in self.default_fields:
if self.default_fields[k] == v:
del self.fields[k]

View File

@ -33,7 +33,8 @@ from scapy.base_classes import BasePacketList
###########
def get_temp_file(keep=False, autoext=""):
f = os.tempnam("","scapy")
with tempfile.NamedTemporaryFile(prefix="scapy") as _f:
f = _f.name
if not keep:
conf.temp_files.append(f+autoext)
return f + autoext
@ -55,7 +56,7 @@ def sane(x):
if (j < 32) or (j >= 127):
r=r+"."
else:
r=r+i
r=r+chb(i)
return r
def lhex(x):
@ -485,7 +486,7 @@ def do_graph(graph,prog=None,format=None,target=None,type=None,string=None,optio
start_viewer=False
if target is None:
if WINDOWS:
tempfile = os.tempnam("", "scapy") + "." + format
tempfile = get_temp_file() + "." + format
target = "> %s" % tempfile
start_viewer = True
else:

View File

@ -27,7 +27,7 @@ def test_list_contrib():
list_contrib()
result_list_contrib = cmco.get_output()
assert("http2 : HTTP/2 (RFC 7540, RFC 7541) status=loads" in result_list_contrib)
assert(result_list_contrib.split('\n') > 40)
assert(len(result_list_contrib.split('\n')) > 40)
test_list_contrib()
@ -94,7 +94,7 @@ else:
if len(routes6):
assert(len([r for r in routes6 if r[0] == "::1" and r[-1] == ["::1"]]) >= 1)
if iflist >= 2:
if len(iflist) >= 2:
assert(len([r for r in routes6 if r[0] == "fe80::" and r[1] == 64]) >= 1)
len([r for r in routes6 if in6_islladdr(r[0]) and r[1] == 128 and r[-1] == ["::1"]]) >= 1
else:
@ -262,8 +262,8 @@ Ether(dst="ff:ff:ff:ff:ff:ff", src="ff:ff:ff:ff:ff:ff", type=0x9000)
assert _.mysummary() == 'ff:ff:ff:ff:ff:ff > ff:ff:ff:ff:ff:ff (0x9000)'
= Test fletcher16_* functions
assert(fletcher16_checksum("\x28\x07") == 22319)
assert(fletcher16_checkbytes("ABCDEF", 2) == "\x89\x67")
assert(fletcher16_checksum(b"\x28\x07") == 22319)
assert(fletcher16_checkbytes(b"ABCDEF", 2) == "\x89\x67")
= Test zerofree_randstring function
random.seed(0x2807)
@ -275,7 +275,7 @@ def test_export_import_object():
with ContextManagerCaptureOutput() as cmco:
export_object(2807)
result_export_object = cmco.get_output()
assert(result_export_object.endswith("eNprYPL9zqUHAAdrAf8=\n\n"))
assert(result_export_object.endswith(b"eNprYPL9zqUHAAdrAf8=\n\n"))
assert(import_object(result_export_object) == 2807)
test_export_import_object()
@ -285,11 +285,11 @@ tex_escape("$#_") == "\\$\\#\\_"
= Test colgen function
f = colgen(range(3))
assert(len([f.next() for i in range(2)]) == 2)
assert(len([next(f) for i in range(2)]) == 2)
= Test incremental_label function
f = incremental_label()
assert([f.next() for i in range(2)] == ["tag00000", "tag00001"])
assert([next(f) for i in range(2)] == ["tag00000", "tag00001"])
= Test corrupt_* functions
import random
@ -309,7 +309,7 @@ assert(load_object(fname) == 2807)
= Test whois function
if not WINDOWS:
result = whois("193.0.6.139")
assert("inetnum" in result and "Amsterdam" in result)
assert(b"inetnum" in result and b"Amsterdam" in result)
= Test manuf DB methods
~ manufdb
@ -351,7 +351,7 @@ assert a.next() == (3, 2, 3)
saved_conf_verb = conf.verb
fd, fname = tempfile.mkstemp()
os.write(fd, "conf.verb = 42\n")
os.write(fd, b"conf.verb = 42\n")
os.close(fd)
from scapy.main import _read_config_file
_read_config_file(fname, globals(), locals())
@ -671,9 +671,9 @@ x=SNMP(b'0y\x02\x01\x00\x04\x06public\xa2l\x02\x01)\x02\x01\x00\x02\x01\x000a0!\
x.show()
assert(x.community==b"public" and x.version == 0)
assert(x.PDU.id == 41 and len(x.PDU.varbindlist) == 3)
assert(x.PDU.varbindlist[0].oid == "1.3.6.1.4.1.253.8.64.4.2.1.7.10.14130104")
assert(x.PDU.varbindlist[0].oid == b"1.3.6.1.4.1.253.8.64.4.2.1.7.10.14130104")
assert(x.PDU.varbindlist[0].value == "172.31.19.2")
assert(x.PDU.varbindlist[2].oid == "1.3.6.1.4.1.253.8.64.4.2.1.5.10.14130400")
assert(x.PDU.varbindlist[2].oid == b"1.3.6.1.4.1.253.8.64.4.2.1.5.10.14130400")
assert(x.PDU.varbindlist[2].value == 1)
@ -891,7 +891,7 @@ def _send_or_sniff(pkt, timeout, flt, pid, fork, t_other=None):
os.waitpid(pid, 0)
else:
t_other.join()
assert raw(pkt) in (str(p[pkt.__class__]) for p in pkts if pkt.__class__ in p)
assert raw(pkt) in (raw(p[pkt.__class__]) for p in pkts if pkt.__class__ in p)
def send_and_sniff(pkt, timeout=2, flt=None):
"""Send a packet, sniff, and check the packet has been seen"""