Fix various critical bugs on Windows (#4467)

* Cleanup windows native mode and fix Windows tests.

Note: this module should be considered a last chance only, as it comes
with MANY limitations.

* Remove allow_failures on appveyor

* Improve select_objects thanks to WSAEventSelect

* Restore support for legacy Npcap adapter (<0.9983)

* AppVeyor: upgrade tested Python version

* Fix tox breaking AGAIN

* Disable native TLS1.3 for the ancient Windows used by AppVeyor

* Improvements to SSLStreamSocket on Windows

* Disable unstable windows tests

* Disable broken DoIP tests on Windows

* Minor HTTP bugfix
This commit is contained in:
gpotter2 2024-07-26 00:55:14 +02:00 committed by GitHub
parent ee755d0d17
commit 420173c742
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 186 additions and 218 deletions

View File

@ -7,26 +7,23 @@ environment:
# Python versions that will be tested
# Note: it defines variables that can be used later
matrix:
- PYTHON: "C:\\Python37-x64"
PYTHON_VERSION: "3.7.x"
- PYTHON: "C:\\Python312-x64"
PYTHON_VERSION: "3.12.x"
PYTHON_ARCH: "64"
TOXENV: "py37-windows"
TOXENV: "py312-windows"
UT_FLAGS: "-K scanner"
- PYTHON: "C:\\Python37-x64"
PYTHON_VERSION: "3.7.x"
- PYTHON: "C:\\Python312-x64"
PYTHON_VERSION: "3.12.x"
PYTHON_ARCH: "64"
TOXENV: "py37-windows"
TOXENV: "py312-windows"
UT_FLAGS: "-k scanner"
# allow scanner builds to fail
matrix:
allow_failures:
- UT_FLAGS: "-k scanner"
# There is no build phase for Scapy
build: off
install:
# Log some debug info
- ver
# Install the npcap, windump and wireshark suites
- ps: .\.config\appveyor\InstallNpcap.ps1
- ps: .\.config\appveyor\InstallWindumpNpcap.ps1
@ -43,7 +40,7 @@ test_script:
# Set environment variables
- set PYTHONPATH=%APPVEYOR_BUILD_FOLDER%
- set PATH=%APPVEYOR_BUILD_FOLDER%;C:\Program Files\Wireshark\;C:\Program Files\Windump\;%PATH%
- set TOX_PARALLEL_NO_SPINNER=1
# - set TOX_PARALLEL_NO_SPINNER=1
# Main unit tests
- "%PYTHON%\\python -m tox -- %UT_FLAGS%"

View File

@ -117,7 +117,8 @@ then
fi
# Launch Scapy unit tests
TOX_PARALLEL_NO_SPINNER=1 tox -- ${UT_FLAGS} || exit 1
# export TOX_PARALLEL_NO_SPINNER=1
tox -- ${UT_FLAGS} || exit 1
# Stop if NO_BASH_TESTS is set
if [ ! -z "$SIMPLE_TESTS" ]

View File

@ -71,6 +71,7 @@ from scapy.arch.libpcap import ( # noqa: E402
# Detection happens after libpcap import (NPcap detection)
NPCAP_LOOPBACK_NAME = r"\Device\NPF_Loopback"
NPCAP_LOOPBACK_NAME_LEGACY = "Npcap Loopback Adapter" # before npcap 0.9983
if conf.use_npcap:
conf.loopback_name = NPCAP_LOOPBACK_NAME
else:
@ -342,7 +343,7 @@ class NetworkInterface_Win(NetworkInterface):
try:
# Npcap loopback interface
if conf.use_npcap and self.network_name == NPCAP_LOOPBACK_NAME:
if conf.use_npcap and self.network_name == conf.loopback_name:
# https://nmap.org/npcap/guide/npcap-devguide.html
data["mac"] = "00:00:00:00:00:00"
data["ip"] = "127.0.0.1"
@ -602,14 +603,20 @@ class WindowsInterfacesProvider(InterfaceProvider):
# Try a restart
WindowsInterfacesProvider._pcap_check()
legacy_npcap_guid = None
windows_interfaces = dict()
for i in get_windows_if_list():
# Detect Loopback interface
if "Loopback" in i['name']:
i['name'] = conf.loopback_name
# Only consider interfaces with a GUID
if i['guid']:
if conf.use_npcap and i['name'] == conf.loopback_name:
i['guid'] = NPCAP_LOOPBACK_NAME
if conf.use_npcap:
# Detect the legacy Loopback interface
if i['name'] == NPCAP_LOOPBACK_NAME_LEGACY:
# Legacy Npcap (<0.9983)
legacy_npcap_guid = i['guid']
elif "Loopback" in i['name']:
# Newer Npcap
i['guid'] = conf.loopback_name
# Map interface
windows_interfaces[i['guid']] = i
def iterinterfaces() -> Iterator[
@ -621,12 +628,16 @@ class WindowsInterfacesProvider(InterfaceProvider):
for netw, if_data in conf.cache_pcapiflist.items():
name, ips, flags, _ = if_data
guid = _pcapname_to_guid(netw)
if guid == legacy_npcap_guid:
# Legacy Npcap detected !
conf.loopback_name = netw
data = windows_interfaces.get(guid, None)
yield netw, name, ips, flags, guid, data
else:
# We don't have a libpcap provider: only use Windows data
for guid, data in windows_interfaces.items():
yield guid, None, [], 0, guid, data
netw = r'\Device\NPF_' + guid if guid[0] != '\\' else guid
yield netw, None, [], 0, guid, data
index = 0
for netw, name, ips, flags, guid, data in iterinterfaces():
@ -1021,7 +1032,7 @@ class _NotAvailableSocket(SuperSocket):
# type: (*Any, **Any) -> None
raise RuntimeError(
"Sniffing and sending packets is not available at layer 2: "
"winpcap is not installed. You may use conf.L3socket or"
"winpcap is not installed. You may use conf.L3socket or "
"conf.L3socket6 to access layer 3"
)

View File

@ -6,49 +6,23 @@
"""
Native Microsoft Windows sockets (L3 only)
## Notice: ICMP packets
This uses Raw Sockets from winsock
https://learn.microsoft.com/en-us/windows/win32/winsock/tcp-ip-raw-sockets-2
DISCLAIMER: Please use Npcap/Winpcap to send/receive ICMP. It is going to work.
Below is some additional information, mainly implemented in a testing purpose.
.. note::
When in native mode, everything goes through the Windows kernel.
This firstly requires that the Firewall is open. Be sure it allows ICMPv4/6
packets in and out.
Windows may drop packets that it finds wrong. for instance, answers to
ICMP packets with id=0 or seq=0 may be dropped. It means that sent packets
should (most of the time) be perfectly built.
A perfectly built ICMP req packet on Windows means that its id is 1, its
checksum (IP and ICMP) are correctly built, but also that its seq number is
in the "allowed range".
In fact, every time an ICMP packet is sent on Windows, a global sequence
number is increased, which is only reset at boot time. The seq number of the
received ICMP packet must be in the range [current, current + 3] to be valid,
and received by the socket. The current number is quite hard to get, thus we
provide in this module the get_actual_icmp_seq() function.
Example:
>>> conf.use_pcap = False
>>> a = conf.L3socket()
# This will (most likely) work:
>>> current = get_current_icmp_seq()
>>> a.sr(IP(dst="www.google.com", ttl=128)/ICMP(id=1, seq=current))
# This won't:
>>> a.sr(IP(dst="www.google.com", ttl=128)/ICMP())
PS: on computers where the firewall isn't open, Windows temporarily opens it
when using the `ping` util from cmd.exe. One can first call a ping on cmd,
then do custom calls through the socket using get_current_icmp_seq(). See
the tests (windows.uts) for an example.
Don't use this module.
It is a proof of concept, and a worse-case-scenario failover, but you should
consider that raw sockets on Windows don't work and install Npcap to avoid using
it at all cost.
"""
import io
import os
import socket
import subprocess
import struct
import time
from scapy.automaton import select_objects
from scapy.arch.windows.structures import GetIcmpStatistics
from scapy.compat import raw
from scapy.config import conf
from scapy.data import MTU
@ -70,14 +44,31 @@ from typing import (
class L3WinSocket(SuperSocket):
"""
A L3 raw socket implementation native to Windows.
Official "Windows Limitations" from MSDN:
- TCP data cannot be sent over raw sockets.
- UDP datagrams with an invalid source address cannot be sent over raw sockets.
- For IPv6 (address family of AF_INET6), an application receives everything
after the last IPv6 header in each received datagram [...]. The application
does not receive any IPv6 headers using a raw socket.
Unofficial limitations:
- Turns out we actually don't see any incoming TCP data, only the outgoing.
We do properly see UDP, ICMP, etc. both ways though.
- To match IPv6 responses, one must use `conf.checkIPaddr = False` as we can't
get the real destination.
**To overcome those limitations, install Npcap.**
"""
desc = "a native Layer 3 (IPv4) raw socket under Windows"
nonblocking_socket = True
__selectable_force_select__ = True # see automaton.py
__slots__ = ["promisc", "cls", "ipv6", "proto"]
__slots__ = ["promisc", "cls", "ipv6"]
def __init__(self,
iface=None, # type: Optional[_GlobInterfaceType]
proto=None, # type: Optional[int]
ttl=128, # type: int
ipv6=False, # type: bool
promisc=True, # type: bool
@ -89,49 +80,28 @@ class L3WinSocket(SuperSocket):
for kwarg in kwargs:
log_runtime.warning("Dropping unsupported option: %s" % kwarg)
self.iface = iface and resolve_iface(iface) or conf.iface
if not self.iface.is_valid():
log_runtime.warning("Interface is invalid. This will fail.")
af = socket.AF_INET6 if ipv6 else socket.AF_INET
self.ipv6 = ipv6
# Proto and cls
if proto is None:
if self.ipv6:
# On IPv6, the header isn't returned with recvfrom().
# We don't want to guess if it's TCP, UDP or SCTP.. so ask for proto
# (This would be fixable if Python supported recvmsg() on Windows)
log_runtime.warning(
"Due to restrictions, 'proto' must be provided when "
"opening raw IPv6 sockets. Defaulting to socket.IPPROTO_UDP"
)
self.proto = socket.IPPROTO_UDP
else:
self.proto = socket.IPPROTO_IP
elif self.ipv6 and proto == socket.IPPROTO_TCP:
# Ah, sadly this isn't supported either.
log_runtime.warning(
"Be careful, socket.IPPROTO_TCP doesn't work in raw sockets on "
"Windows, so this is equivalent to socket.IPPROTO_IP."
)
self.proto = socket.IPPROTO_IP
else:
self.proto = proto
self.cls = IPv6 if ipv6 else IP
# Promisc
if promisc is None:
promisc = conf.sniff_promisc
self.promisc = promisc
# Notes:
# - IPPROTO_RAW only works to send packets.
# - IPPROTO_RAW is broken. We don't use it.
# - IPPROTO_IPV6 exists in MSDN docs, but using it will result in
# no packets being received. Same for its options (IPV6_HDRINCL...)
# However, using IPPROTO_IP with AF_INET6 will still receive
# the IPv6 packets
try:
# Listening on AF_INET6 IPPROTO_IPV6 is broken. Use IPPROTO_IP
self.ins = socket.socket(af,
socket.SOCK_RAW,
socket.IPPROTO_IP)
self.outs = socket.socket(af,
socket.SOCK_RAW,
socket.IPPROTO_RAW)
self.outs = self.ins = socket.socket(
af,
socket.SOCK_RAW,
socket.IPPROTO_IP,
)
except OSError as e:
if e.errno == 13:
raise OSError("Windows native L3 Raw sockets are only "
@ -139,12 +109,10 @@ class L3WinSocket(SuperSocket):
"Please install Npcap to workaround !")
raise
self.ins.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.outs.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.ins.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 2**30)
self.outs.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 2**30)
# set TTL
self.ins.setsockopt(socket.IPPROTO_IP, socket.IP_TTL, ttl)
self.outs.setsockopt(socket.IPPROTO_IP, socket.IP_TTL, ttl)
# Get as much data as possible: reduce what is cropped
if ipv6:
# IPV6_HDRINCL is broken. Use IP_HDRINCL even on IPv6
@ -159,7 +127,6 @@ class L3WinSocket(SuperSocket):
else:
# IOCTL Include IP headers
self.ins.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
self.outs.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
try: # Not Windows XP
self.ins.setsockopt(socket.IPPROTO_IP,
socket.IP_RECVDSTADDR, 1)
@ -193,6 +160,12 @@ class L3WinSocket(SuperSocket):
if self.cls not in x:
raise Scapy_Exception("L3WinSocket can only send IP/IPv6 packets !"
" Install Npcap/Winpcap to send more")
from scapy.layers.inet import TCP
if TCP in x:
raise Scapy_Exception(
"'TCP data cannot be sent over raw socket': "
"https://learn.microsoft.com/en-us/windows/win32/winsock/tcp-ip-raw-sockets-2" # noqa: E501
)
if not self.outs:
raise Scapy_Exception("Socket not created")
dst_ip = str(x[self.cls].dst)
@ -225,14 +198,20 @@ class L3WinSocket(SuperSocket):
# AF_INET6 does not return the IPv6 header. Let's build it
# (host, port, flowinfo, scopeid)
host, _, flowinfo, _ = address
# We have to guess what the proto is. Ugly heuristics ahead :(
# Waiting for https://github.com/python/cpython/issues/80398
if len(data) > 6 and struct.unpack("!H", data[4:6])[0] == len(data):
proto = socket.IPPROTO_UDP
elif data and data[0] in range(128, 138): # ugh
proto = socket.IPPROTO_ICMPV6
else:
proto = socket.IPPROTO_TCP
header = raw(
IPv6(
src=host,
dst="::",
fl=flowinfo,
# when IPPROTO_IP (0) is selected, we have no idea what's nh,
# so set an invalid value.
nh=self.proto or 0xFF,
nh=proto or 0xFF,
plen=len(data)
)
)
@ -262,22 +241,3 @@ class L3WinSocket6(L3WinSocket):
ipv6=True,
**kwargs,
)
def open_icmp_firewall(host):
# type: (str) -> int
"""Temporarily open the ICMP firewall. Tricks Windows into allowing
ICMP packets for a short period of time (~ 1 minute)"""
# We call ping with a timeout of 1ms: will return instantly
with open(os.devnull, 'wb') as DEVNULL:
return subprocess.Popen("ping -4 -w 1 -n 1 %s" % host,
shell=True,
stdout=DEVNULL,
stderr=DEVNULL).wait()
def get_current_icmp_seq():
# type: () -> int
"""See help(scapy.arch.windows.native) for more information.
Returns the current ICMP seq number."""
return GetIcmpStatistics()['stats']['icmpOutStats']['dwEchos']

View File

@ -205,52 +205,6 @@ class SOCKADDR_INET(ctypes.Union):
("Ipv6", sockaddr_in6),
("si_family", USHORT)]
##############################
######### ICMP stats #########
##############################
class MIBICMPSTATS(Structure):
_fields_ = [("dwMsgs", DWORD),
("dwErrors", DWORD),
("dwDestUnreachs", DWORD),
("dwTimeExcds", DWORD),
("dwParmProbs", DWORD),
("dwSrcQuenchs", DWORD),
("dwRedirects", DWORD),
("dwEchos", DWORD),
("dwEchoReps", DWORD),
("dwTimestamps", DWORD),
("dwTimestampReps", DWORD),
("dwAddrMasks", DWORD),
("dwAddrMaskReps", DWORD)]
class MIBICMPINFO(Structure):
_fields_ = [("icmpInStats", MIBICMPSTATS),
("icmpOutStats", MIBICMPSTATS)]
class MIB_ICMP(Structure):
_fields_ = [("stats", MIBICMPINFO)]
PMIB_ICMP = POINTER(MIB_ICMP)
# Func
_GetIcmpStatistics = WINFUNCTYPE(ULONG, PMIB_ICMP)(
('GetIcmpStatistics', iphlpapi))
def GetIcmpStatistics():
# type: () -> Dict[str, Dict[str, Dict[str, int]]]
"""Return all Windows ICMP stats from iphlpapi"""
statistics = MIB_ICMP()
_GetIcmpStatistics(byref(statistics))
results = _struct_to_dict(statistics)
del statistics
return results
##############################
##### Adapters Addresses #####
@ -668,4 +622,4 @@ def _win_fifo_open(fd: Any) -> IO[bytes]:
def close(self) -> None:
# ignore failures
ctypes.windll.kernel32.CloseHandle(fd)
return _opened() # type: ignore
return _opened() # type: ignore

View File

@ -63,7 +63,10 @@ class AS_resolver:
self.s.send(("%s\n" % ip).encode("utf8"))
x = b""
while not (b"%" in x or b"source" in x):
x += self.s.recv(8192)
d = self.s.recv(8192)
if not d:
break
x += d
asn, desc = self._parse_whois(x)
return ip, asn, desc

View File

@ -57,6 +57,10 @@ from typing import (
from scapy.compat import DecoratorCallable
# winsock.h
FD_READ = 0x00000001
def select_objects(inputs, remain):
# type: (Iterable[Any], Union[float, int, None]) -> List[Any]
"""
@ -83,12 +87,26 @@ def select_objects(inputs, remain):
"""
if not WINDOWS:
return select.select(inputs, [], [], remain)[0]
natives = []
inputs = list(inputs)
events = []
created = []
results = set()
for i in list(inputs):
for i in inputs:
if getattr(i, "__selectable_force_select__", False):
natives.append(i)
# Native socket.socket object. We would normally use select.select.
evt = ctypes.windll.ws2_32.WSACreateEvent()
created.append(evt)
res = ctypes.windll.ws2_32.WSAEventSelect(
ctypes.c_void_p(i.fileno()),
evt,
FD_READ
)
if res == 0:
# Was a socket
events.append(evt)
else:
# Fallback to normal event
events.append(i.fileno())
elif i.fileno() < 0:
# Special case: On Windows, we consider that an object that returns
# a negative fileno (impossible), is always readable. This is used
@ -96,18 +114,13 @@ def select_objects(inputs, remain):
# no valid fileno (and will stop on EOFError).
results.add(i)
else:
events.append(i)
if natives:
results = results.union(set(select.select(natives, [], [], remain)[0]))
if results:
# We have native results, poll.
remain = 0
events.append(i.fileno())
if events:
# 0xFFFFFFFF = INFINITE
remainms = int(remain * 1000 if remain is not None else 0xFFFFFFFF)
if len(events) == 1:
res = ctypes.windll.kernel32.WaitForSingleObject(
ctypes.c_void_p(events[0].fileno()),
ctypes.c_void_p(events[0]),
remainms
)
else:
@ -117,22 +130,25 @@ def select_objects(inputs, remain):
res = ctypes.windll.kernel32.WaitForMultipleObjects(
len(events),
(ctypes.c_void_p * len(events))(
*[x.fileno() for x in events]
*events
),
False,
remainms
)
if res != 0xFFFFFFFF and res != 0x00000102: # Failed or Timeout
results.add(events[res])
results.add(inputs[res])
if len(events) > 1:
# Now poll the others, if any
for evt in events:
for i, evt in enumerate(events):
res = ctypes.windll.kernel32.WaitForSingleObject(
ctypes.c_void_p(evt.fileno()),
ctypes.c_void_p(evt),
0 # poll: don't wait
)
if res == 0:
results.add(evt)
results.add(inputs[i])
# Cleanup created events, if any
for evt in created:
ctypes.windll.ws2_32.WSACloseEvent(evt)
return list(results)

View File

@ -769,10 +769,7 @@ class HTTP_Client(object):
def _connect_or_reuse(self, host, port=None, tls=False, timeout=5):
# Get the port
if port is None:
if tls:
port = 443
else:
port = 80
port = 443 if tls else 80
# If the current socket matches, keep it.
if self._sockinfo == (host, port):
return
@ -800,13 +797,15 @@ class HTTP_Client(object):
)
if tls:
if self.sslcontext is None:
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
if self.no_check_certificate:
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
else:
context = ssl.create_default_context()
else:
context = self.sslcontext
sock = context.wrap_socket(sock)
sock = context.wrap_socket(sock, server_hostname=host)
self.sock = SSLStreamSocket(sock, HTTP)
else:
self.sock = StreamSocket(sock, HTTP)
@ -918,14 +917,14 @@ class HTTP_Client(object):
self.sock.close()
def http_request(host, path="/", port=80, timeout=3,
display=False, verbose=0, **headers):
def http_request(host, path="/", port=None, timeout=3,
display=False, tls=False, verbose=0, **headers):
"""
Util to perform an HTTP request.
:param host: the host to connect to
:param path: the path of the request (default /)
:param port: the port (default 80)
:param port: the port (default 80/443)
:param timeout: timeout before None is returned
:param display: display the result in the default browser (default False)
:param iface: interface to use. Changing this turns on "raw"
@ -934,8 +933,10 @@ def http_request(host, path="/", port=80, timeout=3,
:returns: the HTTPResponse packet
"""
client = HTTP_Client(HTTP_AUTH_MECHS.NONE, verb=verbose)
if port is None:
port = 443 if tls else 80
ans = client.request(
"http://%s:%s%s" % (host, port, path),
"http%s://%s:%s%s" % (tls and "s" or "", host, port, path),
timeout=timeout,
)

View File

@ -17,6 +17,7 @@ import functools
import hashlib
import struct
from scapy.automaton import select_objects
from scapy.config import conf, crypto_validator
from scapy.error import log_runtime
from scapy.packet import Packet, bind_layers, bind_top_down
@ -4237,7 +4238,7 @@ class SMBStreamSocket(StreamSocket):
def select(sockets, remain=conf.recv_poll_rate):
if any(getattr(x, "queue", None) for x in sockets):
return [x for x in sockets if isinstance(x, SMBStreamSocket) and x.queue]
return StreamSocket.select(sockets, remain=remain)
return select_objects(sockets, remain=remain)
class SMBSession(DefaultSession):

View File

@ -1710,7 +1710,10 @@ class smbserver:
Close the smbserver if started in background mode (bg=True)
"""
if self.srv:
self.srv.shutdown(socket.SHUT_RDWR)
try:
self.srv.shutdown(socket.SHUT_RDWR)
except OSError:
pass
self.srv.close()

View File

@ -55,13 +55,13 @@ def load_nss_keys(filename):
try:
client_random = binascii.unhexlify(data[1])
except binascii.Error:
except ValueError:
warning("Invalid ClientRandom: %s", data[1])
return {}
try:
secret = binascii.unhexlify(data[2])
except binascii.Error:
except ValueError:
warning("Invalid Secret: %s", data[2])
return {}

View File

@ -515,7 +515,10 @@ class SSLStreamSocket(StreamSocket):
if x is None:
x = MTU
# Block
data = self.ins.recv(x)
try:
data = self.ins.recv(x)
except OSError:
raise EOFError
try:
pkt = self.sess.process(data, cls=self.basecls) # type: ignore
except struct.error:
@ -527,6 +530,18 @@ class SSLStreamSocket(StreamSocket):
return self.recv(x)
return pkt
@staticmethod
def select(sockets, remain=None):
# type: (List[SuperSocket], Optional[float]) -> List[SuperSocket]
queued = [
x
for x in sockets
if isinstance(x, SSLStreamSocket) and x.sess.data
]
if queued:
return queued # type: ignore
return super(SSLStreamSocket, SSLStreamSocket).select(sockets, remain=remain)
class L2ListenTcpdump(SuperSocket):
desc = "read packets at layer 2 using tcpdump"

View File

@ -23,9 +23,12 @@
"test\\scapy\\layers\\tls\\*.uts": "load_layer(\"tls\")"
},
"kw_ko": [
"as_resolvers",
"brotli",
"broken_windows",
"ipv6",
"linux",
"native_tls13",
"mock_read_routes_bsd",
"open_ssl_client",
"osx",

View File

@ -24,6 +24,7 @@
"kw_ko": [
"osx",
"linux",
"broken_windows",
"crypto_advanced",
"mock_read_routes_bsd",
"appveyor_only",

View File

@ -524,6 +524,7 @@ server_thread.join(timeout=1)
assert len(pkts) == 2
= Test DoIPSslSocket
~ broken_windows
certstring = """
LS0tLS1CRUdJTiBQUklWQVRFIEtFWS0tLS0tCk1JSUV2QUlCQURBTkJna3Foa2lHOXcwQkFRRUZB
@ -626,6 +627,7 @@ server_thread.join(timeout=1)
assert len(pkts) == 2
= Test DoIPSslSocket6
~ broken_windows
server_up = threading.Event()
def server():
@ -662,6 +664,7 @@ server_thread.join(timeout=1)
assert len(pkts) == 2
= Test UDS_DoIPSslSocket6
~ broken_windows
server_up = threading.Event()
def server():
@ -698,6 +701,7 @@ server_thread.join(timeout=1)
assert len(pkts) == 2
= Test UDS_DualDoIPSslSocket6
~ broken_windows
server_tcp_up = threading.Event()
server_tls_up = threading.Event()
@ -755,4 +759,4 @@ sock = UDS_DoIPSocket6(ip="::1", context=context)
pkts = sock.sniff(timeout=1, count=2)
server_tcp_thread.join(timeout=1)
server_tls_thread.join(timeout=1)
assert len(pkts) == 2
assert len(pkts) == 2

View File

@ -1830,7 +1830,7 @@ def _test():
retry_test(_test)
= Whois request
~ netaccess IP
~ netaccess IP as_resolvers
* This test retries on failure because it often fails
def _test():
IP(src="8.8.8.8").whois()
@ -1874,7 +1874,7 @@ assert len(tmp) == 3
assert [l[1] for l in tmp] == ['AS24776', 'AS36459', 'AS26496']
= AS resolver - IPv6
~ netaccess IP
~ netaccess IP as_resolvers
* This test retries on failure because it often fails
def _test():

View File

@ -486,7 +486,10 @@ def run_tls_native_test_server(post_handshake_auth=False,
assert resp == bytes(REQS[1])
ssl_client_socket.send(bytes(RESPS[1]))
# close socket
server.close()
try:
server.shutdown(socket.SHUT_RDWR)
finally:
server.close()
server = threading.Thread(target=ssl_server)
server.start()
@ -524,11 +527,15 @@ def test_tls_client_native(post_handshake_auth=False,
assert not server.is_alive()
# XXX: Ugh, Appveyor uses an ancient Windows 10 build that doesn't support TLS 1.3 natively.
= Testing TLS client against ssl.SSLContext server with TLS 1.3 and a post-handshake authentication
~ native_tls13
test_tls_client_native(post_handshake_auth=True)
= Testing TLS client against ssl.SSLContext server with TLS 1.3 and a Hello-Retry request
~ native_tls13
test_tls_client_native(with_hello_retry=True)

View File

@ -40,6 +40,7 @@ from scapy.config import conf
assert dev_from_networkname(conf.iface.network_name).guid == conf.iface.guid
= test pcap_service_status
~ npcap_service
from scapy.arch.windows import pcap_service_status
@ -54,16 +55,20 @@ print(get_if_list())
assert all(x.startswith(r"\Device\NPF_") for x in get_if_list())
= test pcap_service_stop
~ appveyor_only require_gui
~ appveyor_only require_gui npcap_service
from scapy.arch.windows import pcap_service_stop
pcap_service_stop()
assert pcap_service_status()[2] == False
assert pcap_service_status() == False
= test pcap_service_start
~ appveyor_only require_gui
~ appveyor_only require_gui npcap_service
from scapy.arch.windows import pcap_service_start
pcap_service_start()
assert pcap_service_status()[2] == True
assert pcap_service_status() == True
= Test auto-pcap start UI
@ -86,39 +91,23 @@ finally:
= Set up native mode
conf.use_pcap = False
conf.route.resync()
conf.ifaces.reload()
assert conf.use_pcap == False
= Prepare ping: open firewall & get current seq number
~ netaccess needs_root
from scapy.arch.windows.native import open_icmp_firewall, get_current_icmp_seq
# Note: this method is complicated, but allow us to perform a real test
# it is discouraged otherwise. Npcap/Winpcap does NOT require such mechanics
# output of this may vary, but it doesn't matter:
# if it fails the teat below won't work
open_icmp_firewall("www.google.com")
seq = get_current_icmp_seq()
assert seq > 0
True
= Ping
~ netaccess needs_root
def _test():
with conf.L3socket() as a:
answer = a.sr1(IP(dst="www.google.com", ttl=128)/ICMP(id=1, seq=seq)/"abcdefghijklmnopqrstuvwabcdefghi", timeout=2)
answer = a.sr1(IP(dst="1.1.1.1", ttl=128)/ICMP()/"abcdefghijklmnopqrstuvwabcdefghi", timeout=2)
answer.show()
assert ICMP in answer
retry_test(_test)
= DNS lookup
~ netaccess needs_root require_gui
% XXX currently disabled
~ netaccess needs_root
def _test():
answer = sr1(IP(dst="8.8.8.8")/UDP()/DNS(rd=1, qd=DNSQR(qname="www.google.com")), timeout=2)
@ -131,4 +120,6 @@ retry_test(_test)
= Leave native mode
conf.use_pcap = True
conf.route.resync()
conf.ifaces.reload()
assert conf.use_pcap == True