From d2deffefa18653636eb03ea77a5dab6e4febf6c6 Mon Sep 17 00:00:00 2001 From: Fantix King Date: Tue, 13 Sep 2022 16:50:27 -0400 Subject: [PATCH] bugfix: write to another transport in resume_writing() fails (#498) Fixes #496 --- tests/test_tcp.py | 53 +++++++++++++++++++++++++++++++++++++++++++++++ uvloop/loop.pxd | 2 +- uvloop/loop.pyx | 28 +++++++++++-------------- 3 files changed, 66 insertions(+), 17 deletions(-) diff --git a/tests/test_tcp.py b/tests/test_tcp.py index aa86d65..375f330 100644 --- a/tests/test_tcp.py +++ b/tests/test_tcp.py @@ -654,6 +654,59 @@ class _TestTCP: self.assertIsNone( self.loop.run_until_complete(connection_lost_called)) + def test_resume_writing_write_different_transport(self): + loop = self.loop + + class P1(asyncio.Protocol): + def __init__(self, t2): + self.t2 = t2 + self.paused = False + self.waiter = loop.create_future() + + def data_received(self, data): + self.waiter.set_result(data) + + def pause_writing(self): + self.paused = True + + def resume_writing(self): + self.paused = False + self.t2.write(b'hello') + + s1, s2 = socket.socketpair() + s1.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024) + s2.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024) + + async def _test(t1, p1, t2): + t1.set_write_buffer_limits(1024, 1023) + + # fill s1 up first + t2.pause_reading() + while not p1.paused: + t1.write(b' ' * 1024) + + # trigger resume_writing() in _exec_queued_writes() with tight loop + t2.resume_reading() + while p1.paused: + t1.write(b' ') + await asyncio.sleep(0) + + # t2.write() in p1.resume_writing() should work fine + data = await asyncio.wait_for(p1.waiter, 5) + self.assertEqual(data, b'hello') + + async def test(): + t2, _ = await loop.create_connection(asyncio.Protocol, sock=s2) + t1, p1 = await loop.create_connection(lambda: P1(t2), sock=s1) + try: + await _test(t1, p1, t2) + finally: + t1.close() + t2.close() + + with s1, s2: + loop.run_until_complete(test()) + class Test_UV_TCP(_TestTCP, tb.UVTestCase): diff --git a/uvloop/loop.pxd b/uvloop/loop.pxd index 2080dfe..5613473 100644 --- a/uvloop/loop.pxd +++ b/uvloop/loop.pxd @@ -49,7 +49,7 @@ cdef class Loop: object _exception_handler object _default_executor object _ready - set _queued_streams + set _queued_streams, _executing_streams Py_ssize_t _ready_len set _servers diff --git a/uvloop/loop.pyx b/uvloop/loop.pyx index afaa94e..1a735b1 100644 --- a/uvloop/loop.pyx +++ b/uvloop/loop.pyx @@ -177,6 +177,7 @@ cdef class Loop: self._default_executor = None self._queued_streams = set() + self._executing_streams = set() self._ready = col_deque() self._ready_len = 0 @@ -645,25 +646,20 @@ cdef class Loop: cdef: UVStream stream - int queued_len - if UVLOOP_DEBUG: - queued_len = len(self._queued_streams) - - for pystream in self._queued_streams: - stream = pystream - stream._exec_write() - - if UVLOOP_DEBUG: - if len(self._queued_streams) != queued_len: - raise RuntimeError( - 'loop._queued_streams are not empty after ' - '_exec_queued_writes') - - self._queued_streams.clear() + streams = self._queued_streams + self._queued_streams = self._executing_streams + self._executing_streams = streams + try: + for pystream in streams: + stream = pystream + stream._exec_write() + finally: + streams.clear() if self.handler_check__exec_writes.running: - self.handler_check__exec_writes.stop() + if len(self._queued_streams) == 0: + self.handler_check__exec_writes.stop() cdef inline _call_soon(self, object callback, object args, object context): cdef Handle handle