mirror of https://github.com/python/cpython.git
Issue #1677694: Refactor and improve test_timeout. Original patch by
Björn Lindqvist.
This commit is contained in:
parent
d53dfa3fb1
commit
ca023cab4d
|
@ -7,6 +7,7 @@
|
|||
skip_expected = not support.is_resource_enabled('network')
|
||||
|
||||
import time
|
||||
import errno
|
||||
import socket
|
||||
|
||||
|
||||
|
@ -101,8 +102,29 @@ class TimeoutTestCase(unittest.TestCase):
|
|||
def setUp(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
def tearDown(self):
|
||||
self.sock.close()
|
||||
tearDown = setUp
|
||||
|
||||
def _sock_operation(self, count, timeout, method, *args):
|
||||
"""
|
||||
Test the specified socket method.
|
||||
|
||||
The method is run at most `count` times and must raise a socket.timeout
|
||||
within `timeout` + self.fuzz seconds.
|
||||
"""
|
||||
self.sock.settimeout(timeout)
|
||||
method = getattr(self.sock, method)
|
||||
for i in range(count):
|
||||
t1 = time.time()
|
||||
try:
|
||||
method(*args)
|
||||
except socket.timeout as e:
|
||||
delta = time.time() - t1
|
||||
break
|
||||
else:
|
||||
self.fail('socket.timeout was not raised')
|
||||
# These checks should account for timing unprecision
|
||||
self.assertLess(delta, timeout + self.fuzz)
|
||||
self.assertGreater(delta, timeout - 1.0)
|
||||
|
||||
|
||||
class TCPTimeoutTestCase(TimeoutTestCase):
|
||||
|
@ -112,6 +134,9 @@ def setUp(self):
|
|||
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.addr_remote = ('www.python.org.', 80)
|
||||
|
||||
def tearDown(self):
|
||||
self.sock.close()
|
||||
|
||||
def testConnectTimeout(self):
|
||||
# Choose a private address that is unlikely to exist to prevent
|
||||
# failures due to the connect succeeding before the timeout.
|
||||
|
@ -119,68 +144,48 @@ def testConnectTimeout(self):
|
|||
# with the connect time. This avoids failing the assertion that
|
||||
# the timeout occurred fast enough.
|
||||
addr = ('10.0.0.0', 12345)
|
||||
|
||||
# Test connect() timeout
|
||||
_timeout = 0.001
|
||||
self.sock.settimeout(_timeout)
|
||||
|
||||
_t1 = time.time()
|
||||
self.assertRaises(socket.error, self.sock.connect, addr)
|
||||
_t2 = time.time()
|
||||
|
||||
_delta = abs(_t1 - _t2)
|
||||
self.assertTrue(_delta < _timeout + self.fuzz,
|
||||
"timeout (%g) is more than %g seconds more than expected (%g)"
|
||||
%(_delta, self.fuzz, _timeout))
|
||||
with support.transient_internet(addr[0]):
|
||||
self._sock_operation(1, 0.001, 'connect', addr)
|
||||
|
||||
def testRecvTimeout(self):
|
||||
# Test recv() timeout
|
||||
_timeout = 0.02
|
||||
|
||||
with support.transient_internet(self.addr_remote[0]):
|
||||
self.sock.connect(self.addr_remote)
|
||||
self.sock.settimeout(_timeout)
|
||||
|
||||
_t1 = time.time()
|
||||
self.assertRaises(socket.timeout, self.sock.recv, 1024)
|
||||
_t2 = time.time()
|
||||
|
||||
_delta = abs(_t1 - _t2)
|
||||
self.assertTrue(_delta < _timeout + self.fuzz,
|
||||
"timeout (%g) is %g seconds more than expected (%g)"
|
||||
%(_delta, self.fuzz, _timeout))
|
||||
self._sock_operation(1, 1.5, 'recv', 1024)
|
||||
|
||||
def testAcceptTimeout(self):
|
||||
# Test accept() timeout
|
||||
_timeout = 2
|
||||
self.sock.settimeout(_timeout)
|
||||
# Prevent "Address already in use" socket exceptions
|
||||
support.bind_port(self.sock, self.localhost)
|
||||
self.sock.listen(5)
|
||||
|
||||
_t1 = time.time()
|
||||
self.assertRaises(socket.error, self.sock.accept)
|
||||
_t2 = time.time()
|
||||
|
||||
_delta = abs(_t1 - _t2)
|
||||
self.assertTrue(_delta < _timeout + self.fuzz,
|
||||
"timeout (%g) is %g seconds more than expected (%g)"
|
||||
%(_delta, self.fuzz, _timeout))
|
||||
self._sock_operation(1, 1.5, 'accept')
|
||||
|
||||
def testSend(self):
|
||||
# Test send() timeout
|
||||
# couldn't figure out how to test it
|
||||
pass
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serv:
|
||||
support.bind_port(serv, self.localhost)
|
||||
serv.listen(5)
|
||||
self.sock.connect(serv.getsockname())
|
||||
# Send a lot of data in order to bypass buffering in the TCP stack.
|
||||
self._sock_operation(100, 1.5, 'send', b"X" * 200000)
|
||||
|
||||
def testSendto(self):
|
||||
# Test sendto() timeout
|
||||
# couldn't figure out how to test it
|
||||
pass
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serv:
|
||||
support.bind_port(serv, self.localhost)
|
||||
serv.listen(5)
|
||||
self.sock.connect(serv.getsockname())
|
||||
# The address argument is ignored since we already connected.
|
||||
self._sock_operation(100, 1.5, 'sendto', b"X" * 200000,
|
||||
serv.getsockname())
|
||||
|
||||
def testSendall(self):
|
||||
# Test sendall() timeout
|
||||
# couldn't figure out how to test it
|
||||
pass
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serv:
|
||||
support.bind_port(serv, self.localhost)
|
||||
serv.listen(5)
|
||||
self.sock.connect(serv.getsockname())
|
||||
# Send a lot of data in order to bypass buffering in the TCP stack.
|
||||
self._sock_operation(100, 1.5, 'sendall', b"X" * 200000)
|
||||
|
||||
|
||||
class UDPTimeoutTestCase(TimeoutTestCase):
|
||||
|
@ -189,21 +194,14 @@ class UDPTimeoutTestCase(TimeoutTestCase):
|
|||
def setUp(self):
|
||||
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
|
||||
def tearDown(self):
|
||||
self.sock.close()
|
||||
|
||||
def testRecvfromTimeout(self):
|
||||
# Test recvfrom() timeout
|
||||
_timeout = 2
|
||||
self.sock.settimeout(_timeout)
|
||||
# Prevent "Address already in use" socket exceptions
|
||||
support.bind_port(self.sock, self.localhost)
|
||||
|
||||
_t1 = time.time()
|
||||
self.assertRaises(socket.error, self.sock.recvfrom, 8192)
|
||||
_t2 = time.time()
|
||||
|
||||
_delta = abs(_t1 - _t2)
|
||||
self.assertTrue(_delta < _timeout + self.fuzz,
|
||||
"timeout (%g) is %g seconds more than expected (%g)"
|
||||
%(_delta, self.fuzz, _timeout))
|
||||
self._sock_operation(1, 1.5, 'recvfrom', 1024)
|
||||
|
||||
|
||||
def test_main():
|
||||
|
|
Loading…
Reference in New Issue