Queue write only after processing all buffers (#445)

Allows `writelines` to leverage vectorized IO within uvloop to send
multiple buffers in one `sendmsg` call.

* Update license copyright

Co-authored-by: Fantix King <fantix.king@gmail.com>
This commit is contained in:
jakirkham 2022-09-09 13:48:59 -07:00 committed by GitHub
parent 089f6cbf57
commit 9c6ecb62ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 21 additions and 11 deletions

View File

@ -1,4 +1,4 @@
Copyright (c) 2015-present MagicStack Inc. http://magic.io
Copyright (C) 2016-present the uvloop authors and contributors.
Apache License
Version 2.0, January 2004

View File

@ -1,6 +1,6 @@
The MIT License
Copyright (c) 2015-present MagicStack Inc. http://magic.io
Copyright (C) 2016-present the uvloop authors and contributors.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal

View File

@ -21,7 +21,6 @@ cdef class UVStream(UVBaseTransport):
cdef inline _init(self, Loop loop, object protocol, Server server,
object waiter, object context)
cdef inline _exec_write(self)
cdef inline _shutdown(self)
cdef inline _accept(self, UVStream server)
@ -31,7 +30,15 @@ cdef class UVStream(UVBaseTransport):
cdef inline __reading_started(self)
cdef inline __reading_stopped(self)
cdef inline _write(self, object data)
# The user API firstly calls _buffer_write() to buffer up user data chunks,
# potentially multiple times in writelines(), and then call _start_write()
# to start writing either immediately or in the next iteration.
cdef inline _buffer_write(self, object data)
cdef inline _start_write(self)
# _exec_write() is the method that does the actual send, and _try_write()
# is a fast-path used in _exec_write() to send a single chunk.
cdef inline _exec_write(self)
cdef inline _try_write(self, object data)
cdef _close(self)

View File

@ -162,7 +162,7 @@ cdef class _StreamWriteContext:
PyObject_GetBuffer(
buf, &p_pybufs[py_bufs_len], PyBUF_SIMPLE)
except Exception:
# This shouldn't ever happen, as `UVStream._write`
# This shouldn't ever happen, as `UVStream._buffer_write`
# casts non-bytes objects to `memoryviews`.
ctx.py_bufs_len = py_bufs_len
ctx.free_bufs()
@ -407,7 +407,7 @@ cdef class UVStream(UVBaseTransport):
return written
cdef inline _write(self, object data):
cdef inline _buffer_write(self, object data):
cdef int dlen
if not PyBytes_CheckExact(data):
@ -420,6 +420,7 @@ cdef class UVStream(UVBaseTransport):
self._buffer_size += dlen
self._buffer.append(data)
cdef inline _start_write(self):
if (not self._protocol_paused and
(<uv.uv_stream_t*>self._handle).write_queue_size == 0 and
self._buffer_size > self._high_water):
@ -443,10 +444,10 @@ cdef class UVStream(UVBaseTransport):
# If not all of the data was sent successfully,
# we might need to pause the protocol.
self._maybe_pause_protocol()
return
self._maybe_pause_protocol()
self._loop._queue_write(self)
elif self._buffer_size > 0:
self._maybe_pause_protocol()
self._loop._queue_write(self)
cdef inline _exec_write(self):
cdef:
@ -679,7 +680,8 @@ cdef class UVStream(UVBaseTransport):
if self._conn_lost:
self._conn_lost += 1
return
self._write(buf)
self._buffer_write(buf)
self._start_write()
def writelines(self, bufs):
self._ensure_alive()
@ -690,7 +692,8 @@ cdef class UVStream(UVBaseTransport):
self._conn_lost += 1
return
for buf in bufs:
self._write(buf)
self._buffer_write(buf)
self._start_write()
def write_eof(self):
self._ensure_alive()