mirror of https://github.com/MagicStack/uvloop.git
forcely add UV_HANDLE_READABLE on pipe_t
* in order to detect peer close on O_WRONLY pipe_t
* partially reverted d8fe153
* refs libuv/libuv#2058
* refs #317
* fixes #311, fixes #312
This commit is contained in:
parent
506a2aa1eb
commit
5d41af8079
|
@ -43,6 +43,7 @@ class MyReadPipeProto(asyncio.Protocol):
|
||||||
|
|
||||||
class MyWritePipeProto(asyncio.BaseProtocol):
|
class MyWritePipeProto(asyncio.BaseProtocol):
|
||||||
done = None
|
done = None
|
||||||
|
paused = False
|
||||||
|
|
||||||
def __init__(self, loop=None):
|
def __init__(self, loop=None):
|
||||||
self.state = 'INITIAL'
|
self.state = 'INITIAL'
|
||||||
|
@ -61,6 +62,12 @@ class MyWritePipeProto(asyncio.BaseProtocol):
|
||||||
if self.done:
|
if self.done:
|
||||||
self.done.set_result(None)
|
self.done.set_result(None)
|
||||||
|
|
||||||
|
def pause_writing(self):
|
||||||
|
self.paused = True
|
||||||
|
|
||||||
|
def resume_writing(self):
|
||||||
|
self.paused = False
|
||||||
|
|
||||||
|
|
||||||
class _BasePipeTest:
|
class _BasePipeTest:
|
||||||
def test_read_pipe(self):
|
def test_read_pipe(self):
|
||||||
|
@ -241,6 +248,29 @@ class _BasePipeTest:
|
||||||
self.loop.run_until_complete(proto.done)
|
self.loop.run_until_complete(proto.done)
|
||||||
self.assertEqual('CLOSED', proto.state)
|
self.assertEqual('CLOSED', proto.state)
|
||||||
|
|
||||||
|
def test_write_buffer_full(self):
|
||||||
|
rpipe, wpipe = os.pipe()
|
||||||
|
pipeobj = io.open(wpipe, 'wb', 1024)
|
||||||
|
|
||||||
|
proto = MyWritePipeProto(loop=self.loop)
|
||||||
|
connect = self.loop.connect_write_pipe(lambda: proto, pipeobj)
|
||||||
|
transport, p = self.loop.run_until_complete(connect)
|
||||||
|
self.assertIs(p, proto)
|
||||||
|
self.assertIs(transport, proto.transport)
|
||||||
|
self.assertEqual('CONNECTED', proto.state)
|
||||||
|
|
||||||
|
for i in range(32):
|
||||||
|
transport.write(b'x' * 32768)
|
||||||
|
if proto.paused:
|
||||||
|
transport.write(b'x' * 32768)
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
self.fail("Didn't reach a full buffer")
|
||||||
|
|
||||||
|
os.close(rpipe)
|
||||||
|
self.loop.run_until_complete(asyncio.wait_for(proto.done, 1))
|
||||||
|
self.assertEqual('CLOSED', proto.state)
|
||||||
|
|
||||||
|
|
||||||
class Test_UV_Pipes(_BasePipeTest, tb.UVTestCase):
|
class Test_UV_Pipes(_BasePipeTest, tb.UVTestCase):
|
||||||
pass
|
pass
|
||||||
|
|
|
@ -28,10 +28,6 @@ cdef class ReadUnixTransport(UVStream):
|
||||||
|
|
||||||
cdef class WriteUnixTransport(UVStream):
|
cdef class WriteUnixTransport(UVStream):
|
||||||
|
|
||||||
cdef:
|
|
||||||
uv.uv_poll_t disconnect_listener
|
|
||||||
bint disconnect_listener_inited
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
cdef WriteUnixTransport new(Loop loop, object protocol, Server server,
|
cdef WriteUnixTransport new(Loop loop, object protocol, Server server,
|
||||||
object waiter)
|
object waiter)
|
||||||
|
|
|
@ -12,6 +12,9 @@ cdef __pipe_init_uv_handle(UVStream handle, Loop loop):
|
||||||
err = uv.uv_pipe_init(handle._loop.uvloop,
|
err = uv.uv_pipe_init(handle._loop.uvloop,
|
||||||
<uv.uv_pipe_t*>handle._handle,
|
<uv.uv_pipe_t*>handle._handle,
|
||||||
0)
|
0)
|
||||||
|
# UV_HANDLE_READABLE allows calling uv_read_start() on this pipe
|
||||||
|
# even if it is O_WRONLY, see also #317, libuv/libuv#2058
|
||||||
|
handle._handle.flags |= uv.UV_INTERNAL_HANDLE_READABLE
|
||||||
if err < 0:
|
if err < 0:
|
||||||
handle._abort_init()
|
handle._abort_init()
|
||||||
raise convert_error(err)
|
raise convert_error(err)
|
||||||
|
@ -147,10 +150,6 @@ cdef class ReadUnixTransport(UVStream):
|
||||||
@cython.no_gc_clear
|
@cython.no_gc_clear
|
||||||
cdef class WriteUnixTransport(UVStream):
|
cdef class WriteUnixTransport(UVStream):
|
||||||
|
|
||||||
def __cinit__(self):
|
|
||||||
self.disconnect_listener_inited = False
|
|
||||||
self.disconnect_listener.data = NULL
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
cdef WriteUnixTransport new(Loop loop, object protocol, Server server,
|
cdef WriteUnixTransport new(Loop loop, object protocol, Server server,
|
||||||
object waiter):
|
object waiter):
|
||||||
|
@ -167,46 +166,6 @@ cdef class WriteUnixTransport(UVStream):
|
||||||
__pipe_init_uv_handle(<UVStream>handle, loop)
|
__pipe_init_uv_handle(<UVStream>handle, loop)
|
||||||
return handle
|
return handle
|
||||||
|
|
||||||
cdef _start_reading(self):
|
|
||||||
# A custom implementation for monitoring for EOF:
|
|
||||||
# libuv since v1.23.1 prohibits using uv_read_start on
|
|
||||||
# write-only FDs, so we use a throw-away uv_poll_t handle
|
|
||||||
# for that purpose, as suggested in
|
|
||||||
# https://github.com/libuv/libuv/issues/2058.
|
|
||||||
|
|
||||||
cdef int err
|
|
||||||
|
|
||||||
if not self.disconnect_listener_inited:
|
|
||||||
err = uv.uv_poll_init(self._loop.uvloop,
|
|
||||||
&self.disconnect_listener,
|
|
||||||
self._fileno())
|
|
||||||
if err < 0:
|
|
||||||
raise convert_error(err)
|
|
||||||
self.disconnect_listener.data = <void*>self
|
|
||||||
self.disconnect_listener_inited = True
|
|
||||||
|
|
||||||
err = uv.uv_poll_start(&self.disconnect_listener,
|
|
||||||
uv.UV_READABLE | uv.UV_DISCONNECT,
|
|
||||||
__on_write_pipe_poll_event)
|
|
||||||
if err < 0:
|
|
||||||
raise convert_error(err)
|
|
||||||
|
|
||||||
cdef _stop_reading(self):
|
|
||||||
cdef int err
|
|
||||||
if not self.disconnect_listener_inited:
|
|
||||||
return
|
|
||||||
err = uv.uv_poll_stop(&self.disconnect_listener)
|
|
||||||
if err < 0:
|
|
||||||
raise convert_error(err)
|
|
||||||
|
|
||||||
cdef _close(self):
|
|
||||||
if self.disconnect_listener_inited:
|
|
||||||
self.disconnect_listener.data = NULL
|
|
||||||
uv.uv_close(<uv.uv_handle_t *>(&self.disconnect_listener), NULL)
|
|
||||||
self.disconnect_listener_inited = False
|
|
||||||
|
|
||||||
UVStream._close(self)
|
|
||||||
|
|
||||||
cdef _new_socket(self):
|
cdef _new_socket(self):
|
||||||
return __pipe_get_socket(<UVSocketHandle>self)
|
return __pipe_get_socket(<UVSocketHandle>self)
|
||||||
|
|
||||||
|
@ -220,25 +179,6 @@ cdef class WriteUnixTransport(UVStream):
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
|
|
||||||
cdef void __on_write_pipe_poll_event(uv.uv_poll_t* handle,
|
|
||||||
int status, int events) with gil:
|
|
||||||
cdef WriteUnixTransport tr
|
|
||||||
|
|
||||||
if handle.data is NULL:
|
|
||||||
return
|
|
||||||
|
|
||||||
tr = <WriteUnixTransport>handle.data
|
|
||||||
if tr._closed:
|
|
||||||
return
|
|
||||||
|
|
||||||
if events & uv.UV_DISCONNECT:
|
|
||||||
try:
|
|
||||||
tr._stop_reading()
|
|
||||||
tr._on_eof()
|
|
||||||
except BaseException as ex:
|
|
||||||
tr._fatal_error(ex, False)
|
|
||||||
|
|
||||||
|
|
||||||
cdef class _PipeConnectRequest(UVRequest):
|
cdef class _PipeConnectRequest(UVRequest):
|
||||||
cdef:
|
cdef:
|
||||||
UnixTransport transport
|
UnixTransport transport
|
||||||
|
|
|
@ -3,6 +3,15 @@ from posix.types cimport gid_t, uid_t
|
||||||
|
|
||||||
from . cimport system
|
from . cimport system
|
||||||
|
|
||||||
|
# This is an internal enum UV_HANDLE_READABLE from uv-common.h, used only by
|
||||||
|
# handles/pipe.pyx to temporarily workaround a libuv issue libuv/libuv#2058,
|
||||||
|
# before there is a proper fix in libuv. In short, libuv disallowed feeding a
|
||||||
|
# write-only pipe to uv_read_start(), which was needed by uvloop to detect a
|
||||||
|
# broken pipe without having to send anything on the write-only end. We're
|
||||||
|
# setting UV_HANDLE_READABLE on pipe_t to workaround this limitation
|
||||||
|
# temporarily, please see also #317.
|
||||||
|
cdef enum:
|
||||||
|
UV_INTERNAL_HANDLE_READABLE = 0x00004000
|
||||||
|
|
||||||
cdef extern from "uv.h" nogil:
|
cdef extern from "uv.h" nogil:
|
||||||
cdef int UV_TCP_IPV6ONLY
|
cdef int UV_TCP_IPV6ONLY
|
||||||
|
@ -82,6 +91,7 @@ cdef extern from "uv.h" nogil:
|
||||||
ctypedef struct uv_handle_t:
|
ctypedef struct uv_handle_t:
|
||||||
void* data
|
void* data
|
||||||
uv_loop_t* loop
|
uv_loop_t* loop
|
||||||
|
unsigned int flags
|
||||||
# ...
|
# ...
|
||||||
|
|
||||||
ctypedef struct uv_idle_t:
|
ctypedef struct uv_idle_t:
|
||||||
|
|
Loading…
Reference in New Issue