Use ThreadedServer instead of ThreadedPoolServer

ThreadedPoolServer server have an significant issue - accept method
implemented in blocking manner. This means that slow client will hang
rest in queue. In this case when there is more than two slow clients,
one of them will very likely be disconnected because of timeout.
This commit is contained in:
Oleksii Shevchuk 2016-10-16 23:24:14 +03:00
parent 782ae50e52
commit 3d907f7d46
1 changed files with 71 additions and 54 deletions

View File

@ -4,7 +4,7 @@
import sys, logging
from rpyc.utils.server import ThreadPoolServer, Server
from rpyc.utils.server import ThreadedServer
from rpyc.core import Channel, Connection
from rpyc.utils.authenticators import AuthenticationError
from rpyc.utils.registry import UDPRegistryClient
@ -15,18 +15,12 @@ import socket, time
import errno
import random
try:
import multiprocessing
Process=multiprocessing.Process
Event=multiprocessing.Event
except ImportError: #multiprocessing not available on android ?
import threading
Process=threading.Thread
Event=threading.Event
from Queue import Queue, Empty
from threading import Thread, Event
from streams.PupySocketStream import addGetPeer
class PupyTCPServer(ThreadPoolServer):
class PupyTCPServer(ThreadedServer):
def __init__(self, *args, **kwargs):
if not "stream" in kwargs:
@ -38,67 +32,90 @@ class PupyTCPServer(ThreadPoolServer):
self.stream_class = kwargs["stream"]
self.transport_class = kwargs["transport"]
self.transport_kwargs = kwargs["transport_kwargs"]
del kwargs["stream"]
del kwargs["transport"]
del kwargs["transport_kwargs"]
ThreadPoolServer.__init__(self, *args, **kwargs)
ThreadedServer.__init__(self, *args, **kwargs)
def _authenticate_and_build_connection(self, sock):
def _setup_connection(self, sock, queue):
'''Authenticate a client and if it succeeds, wraps the socket in a connection object.
Note that this code is cut and paste from the rpyc internals and may have to be
changed if rpyc evolves'''
# authenticate
addrinfo = sock.getpeername()
h=addrinfo[0]
p=addrinfo[1]
h, p, _, _ = sock.getpeername()
credentials = None
if self.authenticator:
try:
sock, credentials = self.authenticator(sock)
except KeyboardInterrupt:
pass
wrapper, credentials = self.authenticator(sock)
except AuthenticationError:
self.logger.info("%s:%s failed to authenticate, rejecting connection", h, p)
return None
self.logger.info('{}:{} failed to authenticate, rejecting connection'.format(h, p))
queue.put_nowait((None, None, None))
return
else:
credentials = None
wrapper = sock
# build a connection
config = dict(self.protocol_config, credentials=credentials, connid="%s:%d"%(h, p))
def check_timeout(event, cb, timeout=60):
begin = time.time()
duration = 0
while duration < timeout:
try:
time.sleep(timeout - duration)
except KeyboardInterrupt:
pass
finally:
duration = time.time() - begin
if not event.is_set():
logging.info("({}:{}) timeout occured ({}) !".format(
h, p, duration))
cb()
stream = self.stream_class(sock, self.transport_class, self.transport_kwargs)
event = Event()
t = Process(target=check_timeout, args=(event, stream.close))
t.daemon = True
t.start()
config = dict(self.protocol_config, credentials=credentials, connid='{}:{}'.format(h, p))
stream = self.stream_class(wrapper, self.transport_class, self.transport_kwargs)
connection = None
try:
c=Connection(self.service, Channel(stream), config=config)
except KeyboardInterrupt:
pass
finally:
event.set()
t.terminate()
self.logger.debug('{}:{} Authenticated. Starting connection'.format(h, p))
return c
connection = Connection(
self.service,
Channel(stream),
config=config,
_lazy=True
)
self.logger.debug('{}:{} Connection complete'.format(h, p))
finally:
self.logger.debug('{}:{} Report connection: {}'.format(h, p, connection))
queue.put_nowait((connection, wrapper, credentials))
def _authenticate_and_serve_client(self, sock):
queue = Queue(maxsize=1)
authentication = Thread(target=self._setup_connection, args=(sock, queue))
authentication.daemon = True
authentication.start()
connection = None
wrapper = None
h, p, _, _ = sock.getpeername()
try:
self.logger.debug('{}:{} Wait for authentication result'.format(h, p))
connection, wrapper, credentials = queue.get(block=True, timeout=60)
self.logger.debug('{}:{} Wait complete: {}'.format(h, p, connection))
if connection:
connection._init_service()
self.logger.debug('{}:{} Serving'.format(h, p))
connection.serve_all()
except Empty:
self.logger.debug('{}:{} Timeout'.format(h, p))
except Exception as e:
self.logger.exception('{}:{} Exception: {}'.format(h, p, type(e)))
finally:
self.logger.debug('{}:{} Shutting down'.format(h, p))
try:
sock.shutdown(socket.SHUT_RDWR)
except Exception:
pass
if wrapper:
wrapper.close()
self.clients.discard(sock)
class PupyUDPServer(object):
def __init__(self, service, **kwargs):
@ -173,7 +190,7 @@ class PupyUDPServer(object):
raise
self.clients[addr]=self.stream_class((self.sock, addr), self.transport_class, self.transport_kwargs, client_side=False)
conn=Connection(self.service, Channel(self.clients[addr]), config=config, _lazy=True)
t = Process(target = self.handle_new_conn, args=(conn,))
t = Thread(target = self.handle_new_conn, args=(conn,))
t.daemon=True
t.start()
with self.clients[addr].downstream_lock: