Wait until buffer flush (#1385)

* Wait until all data in buffer is flushed to client when upstream server finishes.

(cherry picked from commit d7765067b0)

* Wait until buffer flush

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Avoid shadowing

* _teared not _teardown

* Refactor logic

* Do not try `read_from_descriptors` if reads have previously teared down

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Co-authored-by: yk <1876421041@qq.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
This commit is contained in:
Abhinav Singh 2024-04-13 14:47:35 +05:30 committed by GitHub
parent 2fa320d03f
commit 5ab40f20b8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 43 additions and 25 deletions

View File

@ -20,10 +20,12 @@ if [[ -z "$PROXY_PY_PID" ]]; then
exit 1
fi
OPEN_FILES_BY_MAIN=$(lsof -p "$PROXY_PY_PID" | wc -l)
echo "[$PROXY_PY_PID] Main process: $OPEN_FILES_BY_MAIN"
while true;
do
OPEN_FILES_BY_MAIN=$(lsof -p "$PROXY_PY_PID" | wc -l)
echo "[$PROXY_PY_PID] Main process: $OPEN_FILES_BY_MAIN"
pgrep -P "$PROXY_PY_PID" | while read -r acceptorPid; do
pgrep -P "$PROXY_PY_PID" | while read -r acceptorPid; do
OPEN_FILES_BY_ACCEPTOR=$(lsof -p "$acceptorPid" | wc -l)
echo "[$acceptorPid] Acceptor process: $OPEN_FILES_BY_ACCEPTOR"
@ -31,4 +33,7 @@ pgrep -P "$PROXY_PY_PID" | while read -r acceptorPid; do
OPEN_FILES_BY_CHILD_PROC=$(lsof -p "$childPid" | wc -l)
echo " [$childPid] child process: $OPEN_FILES_BY_CHILD_PROC"
done
done
sleep 1
done

View File

@ -87,7 +87,11 @@ class TcpConnection(ABC):
# TODO: Assemble multiple packets if total
# size remains below max send size.
max_send_size = max_send_size or DEFAULT_MAX_SEND_SIZE
try:
sent: int = self.send(mv[:max_send_size])
except BlockingIOError:
logger.warning('BlockingIOError when trying send to {0}'.format(self.tag))
return 0
if sent == len(mv):
self.buffer.pop(0)
self._num_buffer -= 1

View File

@ -49,6 +49,8 @@ class HttpProtocolHandler(BaseTcpServerHandler[HttpClientConnection]):
if not self.flags.threadless:
self.selector = selectors.DefaultSelector()
self.plugin: Optional[HttpProtocolHandlerPlugin] = None
self.writes_teared: bool = False
self.reads_teared: bool = False
##
# initialize, is_inactive, shutdown, get_events, handle_events
@ -137,22 +139,25 @@ class HttpProtocolHandler(BaseTcpServerHandler[HttpClientConnection]):
) -> bool:
"""Returns True if proxy must tear down."""
# Flush buffer for ready to write sockets
teardown = await self.handle_writables(writables)
if teardown:
self.writes_teared = await self.handle_writables(writables)
if self.writes_teared:
return True
# Invoke plugin.write_to_descriptors
if self.plugin:
teardown = await self.plugin.write_to_descriptors(writables)
if teardown:
return True
# Read from ready to read sockets
teardown = await self.handle_readables(readables)
if teardown:
self.writes_teared = await self.plugin.write_to_descriptors(writables)
if self.writes_teared:
return True
# Read from ready to read sockets if reads have not already teared down
if not self.reads_teared:
self.reads_teared = await self.handle_readables(readables)
if not self.reads_teared:
# Invoke plugin.read_from_descriptors
if self.plugin:
teardown = await self.plugin.read_from_descriptors(readables)
if teardown:
self.reads_teared = await self.plugin.read_from_descriptors(
readables,
)
# Wait until client buffer has flushed when reads has teared down but we can still write
if self.reads_teared and not self.work.has_buffer():
return True
return False

View File

@ -175,7 +175,11 @@ class HttpProxyPlugin(HttpProtocolHandlerPlugin):
return r, w
async def write_to_descriptors(self, w: Writables) -> bool:
if (self.upstream and self.upstream.connection.fileno() not in w) or not self.upstream:
if (
self.upstream
and not self.upstream.closed
and self.upstream.connection.fileno() not in w
) or not self.upstream:
# Currently, we just call write/read block of each plugins. It is
# plugins responsibility to ignore this callback, if passed descriptors
# doesn't contain the descriptor they registered.
@ -208,9 +212,9 @@ class HttpProxyPlugin(HttpProtocolHandlerPlugin):
async def read_from_descriptors(self, r: Readables) -> bool:
if (
self.upstream and not
self.upstream.closed and
self.upstream.connection.fileno() not in r
self.upstream
and not self.upstream.closed
and self.upstream.connection.fileno() not in r
) or not self.upstream:
# Currently, we just call write/read block of each plugins. It is
# plugins responsibility to ignore this callback, if passed descriptors