From 3d907f7d467f7c89477380d3770e53a32e06b953 Mon Sep 17 00:00:00 2001 From: Oleksii Shevchuk Date: Sun, 16 Oct 2016 23:24:14 +0300 Subject: [PATCH] Use ThreadedServer instead of ThreadedPoolServer ThreadedPoolServer server have an significant issue - accept method implemented in blocking manner. This means that slow client will hang rest in queue. In this case when there is more than two slow clients, one of them will very likely be disconnected because of timeout. --- pupy/network/lib/servers.py | 125 ++++++++++++++++++++---------------- 1 file changed, 71 insertions(+), 54 deletions(-) diff --git a/pupy/network/lib/servers.py b/pupy/network/lib/servers.py index 77a6f9bf..0fa56920 100755 --- a/pupy/network/lib/servers.py +++ b/pupy/network/lib/servers.py @@ -4,7 +4,7 @@ import sys, logging -from rpyc.utils.server import ThreadPoolServer, Server +from rpyc.utils.server import ThreadedServer from rpyc.core import Channel, Connection from rpyc.utils.authenticators import AuthenticationError from rpyc.utils.registry import UDPRegistryClient @@ -15,18 +15,12 @@ import socket, time import errno import random -try: - import multiprocessing - Process=multiprocessing.Process - Event=multiprocessing.Event -except ImportError: #multiprocessing not available on android ? - import threading - Process=threading.Thread - Event=threading.Event +from Queue import Queue, Empty +from threading import Thread, Event from streams.PupySocketStream import addGetPeer -class PupyTCPServer(ThreadPoolServer): +class PupyTCPServer(ThreadedServer): def __init__(self, *args, **kwargs): if not "stream" in kwargs: @@ -38,67 +32,90 @@ class PupyTCPServer(ThreadPoolServer): self.stream_class = kwargs["stream"] self.transport_class = kwargs["transport"] self.transport_kwargs = kwargs["transport_kwargs"] + del kwargs["stream"] del kwargs["transport"] del kwargs["transport_kwargs"] - ThreadPoolServer.__init__(self, *args, **kwargs) + ThreadedServer.__init__(self, *args, **kwargs) - def _authenticate_and_build_connection(self, sock): + def _setup_connection(self, sock, queue): '''Authenticate a client and if it succeeds, wraps the socket in a connection object. Note that this code is cut and paste from the rpyc internals and may have to be changed if rpyc evolves''' - # authenticate - addrinfo = sock.getpeername() - h=addrinfo[0] - p=addrinfo[1] + h, p, _, _ = sock.getpeername() + + credentials = None if self.authenticator: try: - sock, credentials = self.authenticator(sock) - except KeyboardInterrupt: - pass + wrapper, credentials = self.authenticator(sock) except AuthenticationError: - self.logger.info("%s:%s failed to authenticate, rejecting connection", h, p) - return None + self.logger.info('{}:{} failed to authenticate, rejecting connection'.format(h, p)) + queue.put_nowait((None, None, None)) + return else: - credentials = None + wrapper = sock # build a connection - config = dict(self.protocol_config, credentials=credentials, connid="%s:%d"%(h, p)) - - def check_timeout(event, cb, timeout=60): - begin = time.time() - duration = 0 - while duration < timeout: - try: - time.sleep(timeout - duration) - except KeyboardInterrupt: - pass - finally: - duration = time.time() - begin - - if not event.is_set(): - logging.info("({}:{}) timeout occured ({}) !".format( - h, p, duration)) - cb() - - stream = self.stream_class(sock, self.transport_class, self.transport_kwargs) - - event = Event() - t = Process(target=check_timeout, args=(event, stream.close)) - t.daemon = True - t.start() + config = dict(self.protocol_config, credentials=credentials, connid='{}:{}'.format(h, p)) + stream = self.stream_class(wrapper, self.transport_class, self.transport_kwargs) + connection = None try: - c=Connection(self.service, Channel(stream), config=config) - except KeyboardInterrupt: - pass - finally: - event.set() - t.terminate() + self.logger.debug('{}:{} Authenticated. Starting connection'.format(h, p)) - return c + connection = Connection( + self.service, + Channel(stream), + config=config, + _lazy=True + ) + + self.logger.debug('{}:{} Connection complete'.format(h, p)) + finally: + self.logger.debug('{}:{} Report connection: {}'.format(h, p, connection)) + queue.put_nowait((connection, wrapper, credentials)) + + def _authenticate_and_serve_client(self, sock): + queue = Queue(maxsize=1) + + authentication = Thread(target=self._setup_connection, args=(sock, queue)) + authentication.daemon = True + authentication.start() + + connection = None + wrapper = None + + h, p, _, _ = sock.getpeername() + + try: + self.logger.debug('{}:{} Wait for authentication result'.format(h, p)) + connection, wrapper, credentials = queue.get(block=True, timeout=60) + self.logger.debug('{}:{} Wait complete: {}'.format(h, p, connection)) + if connection: + connection._init_service() + self.logger.debug('{}:{} Serving'.format(h, p)) + connection.serve_all() + + except Empty: + self.logger.debug('{}:{} Timeout'.format(h, p)) + + except Exception as e: + self.logger.exception('{}:{} Exception: {}'.format(h, p, type(e))) + + finally: + self.logger.debug('{}:{} Shutting down'.format(h, p)) + + try: + sock.shutdown(socket.SHUT_RDWR) + except Exception: + pass + + if wrapper: + wrapper.close() + + self.clients.discard(sock) class PupyUDPServer(object): def __init__(self, service, **kwargs): @@ -173,7 +190,7 @@ class PupyUDPServer(object): raise self.clients[addr]=self.stream_class((self.sock, addr), self.transport_class, self.transport_kwargs, client_side=False) conn=Connection(self.service, Channel(self.clients[addr]), config=config, _lazy=True) - t = Process(target = self.handle_new_conn, args=(conn,)) + t = Thread(target = self.handle_new_conn, args=(conn,)) t.daemon=True t.start() with self.clients[addr].downstream_lock: