First attempt to fix stupid shitty horrible default RPyC polling mode

So, there was a problem. If you are doing polling and syncronious calls
simultaniously the only way to handle responses (both - sync and async)
is to poll with some constant timeout the shared socket. This introduces
delays, lags and other shit.
This commit is contained in:
Oleksii Shevchuk 2016-10-18 23:00:15 +03:00
parent 3d907f7d46
commit ea092a61e6
2 changed files with 57 additions and 14 deletions

View File

@ -5,7 +5,7 @@
import sys, logging
from rpyc.utils.server import ThreadedServer
from rpyc.core import Channel, Connection
from rpyc.core import Channel, Connection, consts
from rpyc.utils.authenticators import AuthenticationError
from rpyc.utils.registry import UDPRegistryClient
from rpyc.core.stream import Stream
@ -20,6 +20,48 @@ from threading import Thread, Event
from streams.PupySocketStream import addGetPeer
class PupyConnection(Connection):
def __init__(self, *args, **kwargs):
self._sync_events = {}
Connection.__init__(self, *args, **kwargs)
def sync_request(self, handler, *args):
seq = self._send_request(handler, args)
while not self._sync_events[seq].is_set():
if not self.poll(timeout=None):
self._sync_events[seq].wait()
del self._sync_events[seq]
isexc, obj = self._sync_replies.pop(seq)
if isexc:
raise obj
else:
return obj
def _send_request(self, handler, args, async=False):
seq = next(self._seqcounter)
if async:
self._async_callbacks[seq] = callback
else:
self._sync_events[seq] = Event()
self._send(consts.MSG_REQUEST, seq, (handler, self._box(args)))
return seq
def _async_request(self, handler, args = (), callback = (lambda a, b: None)):
seq = self._send_request(handler, args)
return seq
def _dispatch_reply(self, seq, raw):
Connection._dispatch_reply(self, seq, raw)
if not seq in self._async_callbacks:
self._sync_events[seq].set()
def _dispatch_exception(self, seq, raw):
Connection._dispatch_exception(self, seq, raw)
if not seq in self._async_callbacks:
self._sync_events[seq].set()
class PupyTCPServer(ThreadedServer):
def __init__(self, *args, **kwargs):
@ -65,7 +107,7 @@ class PupyTCPServer(ThreadedServer):
try:
self.logger.debug('{}:{} Authenticated. Starting connection'.format(h, p))
connection = Connection(
connection = PupyConnection(
self.service,
Channel(stream),
config=config,
@ -96,7 +138,8 @@ class PupyTCPServer(ThreadedServer):
if connection:
connection._init_service()
self.logger.debug('{}:{} Serving'.format(h, p))
connection.serve_all()
while True:
connection.serve(None)
except Empty:
self.logger.debug('{}:{} Timeout'.format(h, p))

View File

@ -6,7 +6,7 @@
__all__=["PupySocketStream", "PupyUDPSocketStream"]
import sys
from rpyc.core import SocketStream
from rpyc.core import SocketStream, Connection
from ..buffer import Buffer
import socket
import time
@ -16,7 +16,6 @@ import traceback
from rpyc.lib.compat import select, select_error, BYTES_LITERAL, get_exc_errno, maxint
import threading
class addGetPeer(object):
""" add some functions needed by some obfsproxy transports"""
def __init__(self, peer):
@ -70,12 +69,19 @@ class PupySocketStream(SocketStream):
raise EOFError("connection closed by peer")
self.buf_in.write(BYTES_LITERAL(buf))
# The root of evil
def poll(self, timeout):
return len(self.upstream)>0 or self.sock_poll(timeout)
# Just ignore timeout
result = ( len(self.upstream)>0 or self.sock_poll(timeout) )
return result
def sock_poll(self, timeout):
with self.downstream_lock:
if super(PupySocketStream, self).poll(timeout):
to_read, _, to_close = select([self.sock], [], [self.sock], timeout)
if to_close:
raise EOFError('sock_poll error')
if to_read:
self._read()
self.transport.downstream_recv(self.buf_in)
return True
@ -92,15 +98,9 @@ class PupySocketStream(SocketStream):
if len(self.upstream)>=count:
return self.upstream.read(count)
while len(self.upstream)<count:
if not self.sock_poll(0.1) and self.closed:
if not self.sock_poll(None) and self.closed:
return None
#self._read()
#it seems we can actively wait here with only perf enhancement
#if len(self.upstream)<count:
# self.upstream.wait(0.1)#to avoid active wait
return self.upstream.read(count)
except Exception as e:
logging.debug(traceback.format_exc())