mirror of https://github.com/MagicStack/uvloop.git
bugfix: write to another transport in resume_writing() fails (#498)
Fixes #496
This commit is contained in:
parent
25b5f1e557
commit
d2deffefa1
|
@ -654,6 +654,59 @@ class _TestTCP:
|
|||
self.assertIsNone(
|
||||
self.loop.run_until_complete(connection_lost_called))
|
||||
|
||||
def test_resume_writing_write_different_transport(self):
|
||||
loop = self.loop
|
||||
|
||||
class P1(asyncio.Protocol):
|
||||
def __init__(self, t2):
|
||||
self.t2 = t2
|
||||
self.paused = False
|
||||
self.waiter = loop.create_future()
|
||||
|
||||
def data_received(self, data):
|
||||
self.waiter.set_result(data)
|
||||
|
||||
def pause_writing(self):
|
||||
self.paused = True
|
||||
|
||||
def resume_writing(self):
|
||||
self.paused = False
|
||||
self.t2.write(b'hello')
|
||||
|
||||
s1, s2 = socket.socketpair()
|
||||
s1.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024)
|
||||
s2.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024)
|
||||
|
||||
async def _test(t1, p1, t2):
|
||||
t1.set_write_buffer_limits(1024, 1023)
|
||||
|
||||
# fill s1 up first
|
||||
t2.pause_reading()
|
||||
while not p1.paused:
|
||||
t1.write(b' ' * 1024)
|
||||
|
||||
# trigger resume_writing() in _exec_queued_writes() with tight loop
|
||||
t2.resume_reading()
|
||||
while p1.paused:
|
||||
t1.write(b' ')
|
||||
await asyncio.sleep(0)
|
||||
|
||||
# t2.write() in p1.resume_writing() should work fine
|
||||
data = await asyncio.wait_for(p1.waiter, 5)
|
||||
self.assertEqual(data, b'hello')
|
||||
|
||||
async def test():
|
||||
t2, _ = await loop.create_connection(asyncio.Protocol, sock=s2)
|
||||
t1, p1 = await loop.create_connection(lambda: P1(t2), sock=s1)
|
||||
try:
|
||||
await _test(t1, p1, t2)
|
||||
finally:
|
||||
t1.close()
|
||||
t2.close()
|
||||
|
||||
with s1, s2:
|
||||
loop.run_until_complete(test())
|
||||
|
||||
|
||||
class Test_UV_TCP(_TestTCP, tb.UVTestCase):
|
||||
|
||||
|
|
|
@ -49,7 +49,7 @@ cdef class Loop:
|
|||
object _exception_handler
|
||||
object _default_executor
|
||||
object _ready
|
||||
set _queued_streams
|
||||
set _queued_streams, _executing_streams
|
||||
Py_ssize_t _ready_len
|
||||
|
||||
set _servers
|
||||
|
|
|
@ -177,6 +177,7 @@ cdef class Loop:
|
|||
self._default_executor = None
|
||||
|
||||
self._queued_streams = set()
|
||||
self._executing_streams = set()
|
||||
self._ready = col_deque()
|
||||
self._ready_len = 0
|
||||
|
||||
|
@ -645,25 +646,20 @@ cdef class Loop:
|
|||
|
||||
cdef:
|
||||
UVStream stream
|
||||
int queued_len
|
||||
|
||||
if UVLOOP_DEBUG:
|
||||
queued_len = len(self._queued_streams)
|
||||
|
||||
for pystream in self._queued_streams:
|
||||
stream = <UVStream>pystream
|
||||
stream._exec_write()
|
||||
|
||||
if UVLOOP_DEBUG:
|
||||
if len(self._queued_streams) != queued_len:
|
||||
raise RuntimeError(
|
||||
'loop._queued_streams are not empty after '
|
||||
'_exec_queued_writes')
|
||||
|
||||
self._queued_streams.clear()
|
||||
streams = self._queued_streams
|
||||
self._queued_streams = self._executing_streams
|
||||
self._executing_streams = streams
|
||||
try:
|
||||
for pystream in streams:
|
||||
stream = <UVStream>pystream
|
||||
stream._exec_write()
|
||||
finally:
|
||||
streams.clear()
|
||||
|
||||
if self.handler_check__exec_writes.running:
|
||||
self.handler_check__exec_writes.stop()
|
||||
if len(self._queued_streams) == 0:
|
||||
self.handler_check__exec_writes.stop()
|
||||
|
||||
cdef inline _call_soon(self, object callback, object args, object context):
|
||||
cdef Handle handle
|
||||
|
|
Loading…
Reference in New Issue