From ea092a61e6ac28247022212af844cd3e6b8b6055 Mon Sep 17 00:00:00 2001 From: Oleksii Shevchuk Date: Tue, 18 Oct 2016 23:00:15 +0300 Subject: [PATCH] First attempt to fix stupid shitty horrible default RPyC polling mode So, there was a problem. If you are doing polling and syncronious calls simultaniously the only way to handle responses (both - sync and async) is to poll with some constant timeout the shared socket. This introduces delays, lags and other shit. --- pupy/network/lib/servers.py | 49 ++++++++++++++++++-- pupy/network/lib/streams/PupySocketStream.py | 22 ++++----- 2 files changed, 57 insertions(+), 14 deletions(-) diff --git a/pupy/network/lib/servers.py b/pupy/network/lib/servers.py index 0fa56920..99612441 100755 --- a/pupy/network/lib/servers.py +++ b/pupy/network/lib/servers.py @@ -5,7 +5,7 @@ import sys, logging from rpyc.utils.server import ThreadedServer -from rpyc.core import Channel, Connection +from rpyc.core import Channel, Connection, consts from rpyc.utils.authenticators import AuthenticationError from rpyc.utils.registry import UDPRegistryClient from rpyc.core.stream import Stream @@ -20,6 +20,48 @@ from threading import Thread, Event from streams.PupySocketStream import addGetPeer +class PupyConnection(Connection): + def __init__(self, *args, **kwargs): + self._sync_events = {} + Connection.__init__(self, *args, **kwargs) + + def sync_request(self, handler, *args): + seq = self._send_request(handler, args) + while not self._sync_events[seq].is_set(): + if not self.poll(timeout=None): + self._sync_events[seq].wait() + del self._sync_events[seq] + + isexc, obj = self._sync_replies.pop(seq) + if isexc: + raise obj + else: + return obj + + def _send_request(self, handler, args, async=False): + seq = next(self._seqcounter) + if async: + self._async_callbacks[seq] = callback + else: + self._sync_events[seq] = Event() + + self._send(consts.MSG_REQUEST, seq, (handler, self._box(args))) + return seq + + def _async_request(self, handler, args = (), callback = (lambda a, b: None)): + seq = self._send_request(handler, args) + return seq + + def _dispatch_reply(self, seq, raw): + Connection._dispatch_reply(self, seq, raw) + if not seq in self._async_callbacks: + self._sync_events[seq].set() + + def _dispatch_exception(self, seq, raw): + Connection._dispatch_exception(self, seq, raw) + if not seq in self._async_callbacks: + self._sync_events[seq].set() + class PupyTCPServer(ThreadedServer): def __init__(self, *args, **kwargs): @@ -65,7 +107,7 @@ class PupyTCPServer(ThreadedServer): try: self.logger.debug('{}:{} Authenticated. Starting connection'.format(h, p)) - connection = Connection( + connection = PupyConnection( self.service, Channel(stream), config=config, @@ -96,7 +138,8 @@ class PupyTCPServer(ThreadedServer): if connection: connection._init_service() self.logger.debug('{}:{} Serving'.format(h, p)) - connection.serve_all() + while True: + connection.serve(None) except Empty: self.logger.debug('{}:{} Timeout'.format(h, p)) diff --git a/pupy/network/lib/streams/PupySocketStream.py b/pupy/network/lib/streams/PupySocketStream.py index 5fde5184..e3db6b34 100644 --- a/pupy/network/lib/streams/PupySocketStream.py +++ b/pupy/network/lib/streams/PupySocketStream.py @@ -6,7 +6,7 @@ __all__=["PupySocketStream", "PupyUDPSocketStream"] import sys -from rpyc.core import SocketStream +from rpyc.core import SocketStream, Connection from ..buffer import Buffer import socket import time @@ -16,7 +16,6 @@ import traceback from rpyc.lib.compat import select, select_error, BYTES_LITERAL, get_exc_errno, maxint import threading - class addGetPeer(object): """ add some functions needed by some obfsproxy transports""" def __init__(self, peer): @@ -70,12 +69,19 @@ class PupySocketStream(SocketStream): raise EOFError("connection closed by peer") self.buf_in.write(BYTES_LITERAL(buf)) + # The root of evil def poll(self, timeout): - return len(self.upstream)>0 or self.sock_poll(timeout) + # Just ignore timeout + result = ( len(self.upstream)>0 or self.sock_poll(timeout) ) + return result def sock_poll(self, timeout): with self.downstream_lock: - if super(PupySocketStream, self).poll(timeout): + to_read, _, to_close = select([self.sock], [], [self.sock], timeout) + if to_close: + raise EOFError('sock_poll error') + + if to_read: self._read() self.transport.downstream_recv(self.buf_in) return True @@ -92,15 +98,9 @@ class PupySocketStream(SocketStream): if len(self.upstream)>=count: return self.upstream.read(count) while len(self.upstream)