diff --git a/helper/monitor_open_files.sh b/helper/monitor_open_files.sh index f353e0db..d4214e8b 100755 --- a/helper/monitor_open_files.sh +++ b/helper/monitor_open_files.sh @@ -20,15 +20,20 @@ if [[ -z "$PROXY_PY_PID" ]]; then exit 1 fi -OPEN_FILES_BY_MAIN=$(lsof -p "$PROXY_PY_PID" | wc -l) -echo "[$PROXY_PY_PID] Main process: $OPEN_FILES_BY_MAIN" +while true; +do + OPEN_FILES_BY_MAIN=$(lsof -p "$PROXY_PY_PID" | wc -l) + echo "[$PROXY_PY_PID] Main process: $OPEN_FILES_BY_MAIN" -pgrep -P "$PROXY_PY_PID" | while read -r acceptorPid; do - OPEN_FILES_BY_ACCEPTOR=$(lsof -p "$acceptorPid" | wc -l) - echo "[$acceptorPid] Acceptor process: $OPEN_FILES_BY_ACCEPTOR" + pgrep -P "$PROXY_PY_PID" | while read -r acceptorPid; do + OPEN_FILES_BY_ACCEPTOR=$(lsof -p "$acceptorPid" | wc -l) + echo "[$acceptorPid] Acceptor process: $OPEN_FILES_BY_ACCEPTOR" - pgrep -P "$acceptorPid" | while read -r childPid; do - OPEN_FILES_BY_CHILD_PROC=$(lsof -p "$childPid" | wc -l) - echo " [$childPid] child process: $OPEN_FILES_BY_CHILD_PROC" + pgrep -P "$acceptorPid" | while read -r childPid; do + OPEN_FILES_BY_CHILD_PROC=$(lsof -p "$childPid" | wc -l) + echo " [$childPid] child process: $OPEN_FILES_BY_CHILD_PROC" + done done + + sleep 1 done diff --git a/proxy/core/connection/connection.py b/proxy/core/connection/connection.py index d0bebe26..63cb62e3 100644 --- a/proxy/core/connection/connection.py +++ b/proxy/core/connection/connection.py @@ -87,7 +87,11 @@ class TcpConnection(ABC): # TODO: Assemble multiple packets if total # size remains below max send size. max_send_size = max_send_size or DEFAULT_MAX_SEND_SIZE - sent: int = self.send(mv[:max_send_size]) + try: + sent: int = self.send(mv[:max_send_size]) + except BlockingIOError: + logger.warning('BlockingIOError when trying send to {0}'.format(self.tag)) + return 0 if sent == len(mv): self.buffer.pop(0) self._num_buffer -= 1 diff --git a/proxy/http/handler.py b/proxy/http/handler.py index b8d207d5..581e911a 100644 --- a/proxy/http/handler.py +++ b/proxy/http/handler.py @@ -49,6 +49,8 @@ class HttpProtocolHandler(BaseTcpServerHandler[HttpClientConnection]): if not self.flags.threadless: self.selector = selectors.DefaultSelector() self.plugin: Optional[HttpProtocolHandlerPlugin] = None + self.writes_teared: bool = False + self.reads_teared: bool = False ## # initialize, is_inactive, shutdown, get_events, handle_events @@ -137,23 +139,26 @@ class HttpProtocolHandler(BaseTcpServerHandler[HttpClientConnection]): ) -> bool: """Returns True if proxy must tear down.""" # Flush buffer for ready to write sockets - teardown = await self.handle_writables(writables) - if teardown: + self.writes_teared = await self.handle_writables(writables) + if self.writes_teared: return True # Invoke plugin.write_to_descriptors if self.plugin: - teardown = await self.plugin.write_to_descriptors(writables) - if teardown: + self.writes_teared = await self.plugin.write_to_descriptors(writables) + if self.writes_teared: return True - # Read from ready to read sockets - teardown = await self.handle_readables(readables) - if teardown: + # Read from ready to read sockets if reads have not already teared down + if not self.reads_teared: + self.reads_teared = await self.handle_readables(readables) + if not self.reads_teared: + # Invoke plugin.read_from_descriptors + if self.plugin: + self.reads_teared = await self.plugin.read_from_descriptors( + readables, + ) + # Wait until client buffer has flushed when reads has teared down but we can still write + if self.reads_teared and not self.work.has_buffer(): return True - # Invoke plugin.read_from_descriptors - if self.plugin: - teardown = await self.plugin.read_from_descriptors(readables) - if teardown: - return True return False def handle_data(self, data: memoryview) -> Optional[bool]: diff --git a/proxy/http/proxy/server.py b/proxy/http/proxy/server.py index f18f45fc..70d3369e 100644 --- a/proxy/http/proxy/server.py +++ b/proxy/http/proxy/server.py @@ -175,7 +175,11 @@ class HttpProxyPlugin(HttpProtocolHandlerPlugin): return r, w async def write_to_descriptors(self, w: Writables) -> bool: - if (self.upstream and self.upstream.connection.fileno() not in w) or not self.upstream: + if ( + self.upstream + and not self.upstream.closed + and self.upstream.connection.fileno() not in w + ) or not self.upstream: # Currently, we just call write/read block of each plugins. It is # plugins responsibility to ignore this callback, if passed descriptors # doesn't contain the descriptor they registered. @@ -208,9 +212,9 @@ class HttpProxyPlugin(HttpProtocolHandlerPlugin): async def read_from_descriptors(self, r: Readables) -> bool: if ( - self.upstream and not - self.upstream.closed and - self.upstream.connection.fileno() not in r + self.upstream + and not self.upstream.closed + and self.upstream.connection.fileno() not in r ) or not self.upstream: # Currently, we just call write/read block of each plugins. It is # plugins responsibility to ignore this callback, if passed descriptors