diff --git a/pupy/network/lib/servers.py b/pupy/network/lib/servers.py index 77a6f9bf..0fa56920 100755 --- a/pupy/network/lib/servers.py +++ b/pupy/network/lib/servers.py @@ -4,7 +4,7 @@ import sys, logging -from rpyc.utils.server import ThreadPoolServer, Server +from rpyc.utils.server import ThreadedServer from rpyc.core import Channel, Connection from rpyc.utils.authenticators import AuthenticationError from rpyc.utils.registry import UDPRegistryClient @@ -15,18 +15,12 @@ import socket, time import errno import random -try: - import multiprocessing - Process=multiprocessing.Process - Event=multiprocessing.Event -except ImportError: #multiprocessing not available on android ? - import threading - Process=threading.Thread - Event=threading.Event +from Queue import Queue, Empty +from threading import Thread, Event from streams.PupySocketStream import addGetPeer -class PupyTCPServer(ThreadPoolServer): +class PupyTCPServer(ThreadedServer): def __init__(self, *args, **kwargs): if not "stream" in kwargs: @@ -38,67 +32,90 @@ class PupyTCPServer(ThreadPoolServer): self.stream_class = kwargs["stream"] self.transport_class = kwargs["transport"] self.transport_kwargs = kwargs["transport_kwargs"] + del kwargs["stream"] del kwargs["transport"] del kwargs["transport_kwargs"] - ThreadPoolServer.__init__(self, *args, **kwargs) + ThreadedServer.__init__(self, *args, **kwargs) - def _authenticate_and_build_connection(self, sock): + def _setup_connection(self, sock, queue): '''Authenticate a client and if it succeeds, wraps the socket in a connection object. Note that this code is cut and paste from the rpyc internals and may have to be changed if rpyc evolves''' - # authenticate - addrinfo = sock.getpeername() - h=addrinfo[0] - p=addrinfo[1] + h, p, _, _ = sock.getpeername() + + credentials = None if self.authenticator: try: - sock, credentials = self.authenticator(sock) - except KeyboardInterrupt: - pass + wrapper, credentials = self.authenticator(sock) except AuthenticationError: - self.logger.info("%s:%s failed to authenticate, rejecting connection", h, p) - return None + self.logger.info('{}:{} failed to authenticate, rejecting connection'.format(h, p)) + queue.put_nowait((None, None, None)) + return else: - credentials = None + wrapper = sock # build a connection - config = dict(self.protocol_config, credentials=credentials, connid="%s:%d"%(h, p)) - - def check_timeout(event, cb, timeout=60): - begin = time.time() - duration = 0 - while duration < timeout: - try: - time.sleep(timeout - duration) - except KeyboardInterrupt: - pass - finally: - duration = time.time() - begin - - if not event.is_set(): - logging.info("({}:{}) timeout occured ({}) !".format( - h, p, duration)) - cb() - - stream = self.stream_class(sock, self.transport_class, self.transport_kwargs) - - event = Event() - t = Process(target=check_timeout, args=(event, stream.close)) - t.daemon = True - t.start() + config = dict(self.protocol_config, credentials=credentials, connid='{}:{}'.format(h, p)) + stream = self.stream_class(wrapper, self.transport_class, self.transport_kwargs) + connection = None try: - c=Connection(self.service, Channel(stream), config=config) - except KeyboardInterrupt: - pass - finally: - event.set() - t.terminate() + self.logger.debug('{}:{} Authenticated. Starting connection'.format(h, p)) - return c + connection = Connection( + self.service, + Channel(stream), + config=config, + _lazy=True + ) + + self.logger.debug('{}:{} Connection complete'.format(h, p)) + finally: + self.logger.debug('{}:{} Report connection: {}'.format(h, p, connection)) + queue.put_nowait((connection, wrapper, credentials)) + + def _authenticate_and_serve_client(self, sock): + queue = Queue(maxsize=1) + + authentication = Thread(target=self._setup_connection, args=(sock, queue)) + authentication.daemon = True + authentication.start() + + connection = None + wrapper = None + + h, p, _, _ = sock.getpeername() + + try: + self.logger.debug('{}:{} Wait for authentication result'.format(h, p)) + connection, wrapper, credentials = queue.get(block=True, timeout=60) + self.logger.debug('{}:{} Wait complete: {}'.format(h, p, connection)) + if connection: + connection._init_service() + self.logger.debug('{}:{} Serving'.format(h, p)) + connection.serve_all() + + except Empty: + self.logger.debug('{}:{} Timeout'.format(h, p)) + + except Exception as e: + self.logger.exception('{}:{} Exception: {}'.format(h, p, type(e))) + + finally: + self.logger.debug('{}:{} Shutting down'.format(h, p)) + + try: + sock.shutdown(socket.SHUT_RDWR) + except Exception: + pass + + if wrapper: + wrapper.close() + + self.clients.discard(sock) class PupyUDPServer(object): def __init__(self, service, **kwargs): @@ -173,7 +190,7 @@ class PupyUDPServer(object): raise self.clients[addr]=self.stream_class((self.sock, addr), self.transport_class, self.transport_kwargs, client_side=False) conn=Connection(self.service, Channel(self.clients[addr]), config=config, _lazy=True) - t = Process(target = self.handle_new_conn, args=(conn,)) + t = Thread(target = self.handle_new_conn, args=(conn,)) t.daemon=True t.start() with self.clients[addr].downstream_lock: