diff --git a/tests/test_pipes.py b/tests/test_pipes.py index 7d332cb..c2b8a01 100644 --- a/tests/test_pipes.py +++ b/tests/test_pipes.py @@ -43,6 +43,7 @@ class MyReadPipeProto(asyncio.Protocol): class MyWritePipeProto(asyncio.BaseProtocol): done = None + paused = False def __init__(self, loop=None): self.state = 'INITIAL' @@ -61,6 +62,12 @@ class MyWritePipeProto(asyncio.BaseProtocol): if self.done: self.done.set_result(None) + def pause_writing(self): + self.paused = True + + def resume_writing(self): + self.paused = False + class _BasePipeTest: def test_read_pipe(self): @@ -241,6 +248,29 @@ class _BasePipeTest: self.loop.run_until_complete(proto.done) self.assertEqual('CLOSED', proto.state) + def test_write_buffer_full(self): + rpipe, wpipe = os.pipe() + pipeobj = io.open(wpipe, 'wb', 1024) + + proto = MyWritePipeProto(loop=self.loop) + connect = self.loop.connect_write_pipe(lambda: proto, pipeobj) + transport, p = self.loop.run_until_complete(connect) + self.assertIs(p, proto) + self.assertIs(transport, proto.transport) + self.assertEqual('CONNECTED', proto.state) + + for i in range(32): + transport.write(b'x' * 32768) + if proto.paused: + transport.write(b'x' * 32768) + break + else: + self.fail("Didn't reach a full buffer") + + os.close(rpipe) + self.loop.run_until_complete(asyncio.wait_for(proto.done, 1)) + self.assertEqual('CLOSED', proto.state) + class Test_UV_Pipes(_BasePipeTest, tb.UVTestCase): pass diff --git a/uvloop/handles/pipe.pxd b/uvloop/handles/pipe.pxd index f5dc1a9..7c60fc6 100644 --- a/uvloop/handles/pipe.pxd +++ b/uvloop/handles/pipe.pxd @@ -28,10 +28,6 @@ cdef class ReadUnixTransport(UVStream): cdef class WriteUnixTransport(UVStream): - cdef: - uv.uv_poll_t disconnect_listener - bint disconnect_listener_inited - @staticmethod cdef WriteUnixTransport new(Loop loop, object protocol, Server server, object waiter) diff --git a/uvloop/handles/pipe.pyx b/uvloop/handles/pipe.pyx index 581554f..bd8809a 100644 --- a/uvloop/handles/pipe.pyx +++ b/uvloop/handles/pipe.pyx @@ -12,6 +12,9 @@ cdef __pipe_init_uv_handle(UVStream handle, Loop loop): err = uv.uv_pipe_init(handle._loop.uvloop, handle._handle, 0) + # UV_HANDLE_READABLE allows calling uv_read_start() on this pipe + # even if it is O_WRONLY, see also #317, libuv/libuv#2058 + handle._handle.flags |= uv.UV_INTERNAL_HANDLE_READABLE if err < 0: handle._abort_init() raise convert_error(err) @@ -147,10 +150,6 @@ cdef class ReadUnixTransport(UVStream): @cython.no_gc_clear cdef class WriteUnixTransport(UVStream): - def __cinit__(self): - self.disconnect_listener_inited = False - self.disconnect_listener.data = NULL - @staticmethod cdef WriteUnixTransport new(Loop loop, object protocol, Server server, object waiter): @@ -167,46 +166,6 @@ cdef class WriteUnixTransport(UVStream): __pipe_init_uv_handle(handle, loop) return handle - cdef _start_reading(self): - # A custom implementation for monitoring for EOF: - # libuv since v1.23.1 prohibits using uv_read_start on - # write-only FDs, so we use a throw-away uv_poll_t handle - # for that purpose, as suggested in - # https://github.com/libuv/libuv/issues/2058. - - cdef int err - - if not self.disconnect_listener_inited: - err = uv.uv_poll_init(self._loop.uvloop, - &self.disconnect_listener, - self._fileno()) - if err < 0: - raise convert_error(err) - self.disconnect_listener.data = self - self.disconnect_listener_inited = True - - err = uv.uv_poll_start(&self.disconnect_listener, - uv.UV_READABLE | uv.UV_DISCONNECT, - __on_write_pipe_poll_event) - if err < 0: - raise convert_error(err) - - cdef _stop_reading(self): - cdef int err - if not self.disconnect_listener_inited: - return - err = uv.uv_poll_stop(&self.disconnect_listener) - if err < 0: - raise convert_error(err) - - cdef _close(self): - if self.disconnect_listener_inited: - self.disconnect_listener.data = NULL - uv.uv_close((&self.disconnect_listener), NULL) - self.disconnect_listener_inited = False - - UVStream._close(self) - cdef _new_socket(self): return __pipe_get_socket(self) @@ -220,25 +179,6 @@ cdef class WriteUnixTransport(UVStream): raise NotImplementedError -cdef void __on_write_pipe_poll_event(uv.uv_poll_t* handle, - int status, int events) with gil: - cdef WriteUnixTransport tr - - if handle.data is NULL: - return - - tr = handle.data - if tr._closed: - return - - if events & uv.UV_DISCONNECT: - try: - tr._stop_reading() - tr._on_eof() - except BaseException as ex: - tr._fatal_error(ex, False) - - cdef class _PipeConnectRequest(UVRequest): cdef: UnixTransport transport diff --git a/uvloop/includes/uv.pxd b/uvloop/includes/uv.pxd index cfb2be4..0b0f1da 100644 --- a/uvloop/includes/uv.pxd +++ b/uvloop/includes/uv.pxd @@ -3,6 +3,15 @@ from posix.types cimport gid_t, uid_t from . cimport system +# This is an internal enum UV_HANDLE_READABLE from uv-common.h, used only by +# handles/pipe.pyx to temporarily workaround a libuv issue libuv/libuv#2058, +# before there is a proper fix in libuv. In short, libuv disallowed feeding a +# write-only pipe to uv_read_start(), which was needed by uvloop to detect a +# broken pipe without having to send anything on the write-only end. We're +# setting UV_HANDLE_READABLE on pipe_t to workaround this limitation +# temporarily, please see also #317. +cdef enum: + UV_INTERNAL_HANDLE_READABLE = 0x00004000 cdef extern from "uv.h" nogil: cdef int UV_TCP_IPV6ONLY @@ -82,6 +91,7 @@ cdef extern from "uv.h" nogil: ctypedef struct uv_handle_t: void* data uv_loop_t* loop + unsigned int flags # ... ctypedef struct uv_idle_t: