bugfix: write to another transport in resume_writing() fails (#498)

Fixes #496
This commit is contained in:
Fantix King 2022-09-13 16:50:27 -04:00 committed by GitHub
parent 25b5f1e557
commit d2deffefa1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 66 additions and 17 deletions

View File

@ -654,6 +654,59 @@ class _TestTCP:
self.assertIsNone(
self.loop.run_until_complete(connection_lost_called))
def test_resume_writing_write_different_transport(self):
loop = self.loop
class P1(asyncio.Protocol):
def __init__(self, t2):
self.t2 = t2
self.paused = False
self.waiter = loop.create_future()
def data_received(self, data):
self.waiter.set_result(data)
def pause_writing(self):
self.paused = True
def resume_writing(self):
self.paused = False
self.t2.write(b'hello')
s1, s2 = socket.socketpair()
s1.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024)
s2.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024)
async def _test(t1, p1, t2):
t1.set_write_buffer_limits(1024, 1023)
# fill s1 up first
t2.pause_reading()
while not p1.paused:
t1.write(b' ' * 1024)
# trigger resume_writing() in _exec_queued_writes() with tight loop
t2.resume_reading()
while p1.paused:
t1.write(b' ')
await asyncio.sleep(0)
# t2.write() in p1.resume_writing() should work fine
data = await asyncio.wait_for(p1.waiter, 5)
self.assertEqual(data, b'hello')
async def test():
t2, _ = await loop.create_connection(asyncio.Protocol, sock=s2)
t1, p1 = await loop.create_connection(lambda: P1(t2), sock=s1)
try:
await _test(t1, p1, t2)
finally:
t1.close()
t2.close()
with s1, s2:
loop.run_until_complete(test())
class Test_UV_TCP(_TestTCP, tb.UVTestCase):

View File

@ -49,7 +49,7 @@ cdef class Loop:
object _exception_handler
object _default_executor
object _ready
set _queued_streams
set _queued_streams, _executing_streams
Py_ssize_t _ready_len
set _servers

View File

@ -177,6 +177,7 @@ cdef class Loop:
self._default_executor = None
self._queued_streams = set()
self._executing_streams = set()
self._ready = col_deque()
self._ready_len = 0
@ -645,25 +646,20 @@ cdef class Loop:
cdef:
UVStream stream
int queued_len
if UVLOOP_DEBUG:
queued_len = len(self._queued_streams)
for pystream in self._queued_streams:
stream = <UVStream>pystream
stream._exec_write()
if UVLOOP_DEBUG:
if len(self._queued_streams) != queued_len:
raise RuntimeError(
'loop._queued_streams are not empty after '
'_exec_queued_writes')
self._queued_streams.clear()
streams = self._queued_streams
self._queued_streams = self._executing_streams
self._executing_streams = streams
try:
for pystream in streams:
stream = <UVStream>pystream
stream._exec_write()
finally:
streams.clear()
if self.handler_check__exec_writes.running:
self.handler_check__exec_writes.stop()
if len(self._queued_streams) == 0:
self.handler_check__exec_writes.stop()
cdef inline _call_soon(self, object callback, object args, object context):
cdef Handle handle