From 55069e64db2dd8bfee589634850968da33a757a1 Mon Sep 17 00:00:00 2001 From: Oleksii Shevchuk Date: Thu, 8 Mar 2018 14:16:25 +0200 Subject: [PATCH] Rework PupyConnection to avoid nested locks --- pupy/network/lib/connection.py | 403 ++++++++++--------- pupy/network/lib/servers.py | 20 +- pupy/network/lib/streams/PupySocketStream.py | 17 +- 3 files changed, 221 insertions(+), 219 deletions(-) diff --git a/pupy/network/lib/connection.py b/pupy/network/lib/connection.py index 73baf887..7ac49d9e 100644 --- a/pupy/network/lib/connection.py +++ b/pupy/network/lib/connection.py @@ -10,6 +10,7 @@ import logging logger = None logger = logging.getLogger('pconn') +synclogger = logging.getLogger('sync') from network.lib.buffer import Buffer @@ -132,13 +133,12 @@ class SyncRequestDispatchQueue(object): break class PupyConnection(Connection): - def __init__(self, lock, pupy_srv, *args, **kwargs): + def __init__(self, pupy_srv, *args, **kwargs): self._sync_events = {} - self._sync_events_interrupts = {} - self._sync_locks = {} - self._connection_serve_lock = lock + self._sync_raw_replies = {} + self._sync_raw_exceptions = {} + self._async_lock = Lock() - self._sync_events_lock = Lock() self._last_recv = time.time() self._ping = False self._ping_timeout = 30 @@ -146,6 +146,9 @@ class PupyConnection(Connection): self._last_ping = None self._default_serve_timeout = 5 self._queue = SyncRequestDispatchQueue.get_queue() + self._data_queue = Queue() + self._serve_thread = Thread(target=self._serve_loop) + self._serve_thread.daemon = True self._serve_interrupt = Event() @@ -176,6 +179,8 @@ class PupyConnection(Connection): next(self._seqcounter) + self._serve_thread.start() + def _dispatch_request(self, seq, args): super(PupyConnection, self)._dispatch_request(seq, args) @@ -194,25 +199,6 @@ class PupyConnection(Connection): def wake(self): self._channel.wake() - def initialize(self, timeout=10): - try: - Thread( - target=self._initialization_timeout, args=(timeout,) - ).start() - - self._init_service() - self.initialized.set() - except (EOFError, TypeError): - self.close() - return False - - return self.initialized.is_set() - - def _initialization_timeout(self, timeout): - self.initialized.wait(timeout) - if not self.initialized.is_set(): - self.close() - def set_pings(self, ping=None, timeout=None): if ping is not None: try: @@ -243,99 +229,40 @@ class PupyConnection(Connection): def sync_request(self, handler, *args): seq = self._send_request(handler, args) if __debug__: - logger.debug('Sync request: {} / {}'.format(seq, handler)) + synclogger.debug('Sync request wait: {}'.format(seq)) - while not ( self._sync_events[seq].is_set() or self.closed ): - if __debug__: - logger.debug('Sync poll until: {}'.format(seq)) - - synclocked = self._sync_locks[seq].acquire(False) - - if synclocked and self._connection_serve_lock.acquire(False): - data = None - - with self._sync_events_lock: - if seq in self._sync_events_interrupts: - self._sync_events_interrupts[seq].set() - - try: - # Ensure event was not missed between previous lock - if self._sync_events[seq].is_set(): - if __debug__: - logger.debug('Rollback sync poll: {}'.format(seq)) - - break - - if __debug__: - logger.debug('Sync poll serve: {}'.format(seq)) - - data = self.serve(self._serve_timeout) - if not data: - if __debug__: - logger.debug('Sync poll serve interrupted: {}/inactive={}'.format( - seq, self.inactive)) - - if self._ping: - if __debug__: - logger.debug( - 'Submit ping request (timeout: {}): {} - interrupted'.format( - self._serve_timeout, seq)) - - self.ping(timeout=self._ping_timeout) - - if __debug__: - logger.debug('Submit ping request: {} - resumed'.format(seq)) - - finally: - if __debug__: - logger.debug('Sync poll serve complete. release: {}'.format(seq)) - self._connection_serve_lock.release() - self._sync_locks[seq].release() - - self.dispatch(data) - - # Wake one pending event just in case - with self._sync_events_lock: - for event in self._sync_events_interrupts.itervalues(): - if not event.is_set(): - event.set() - break - - else: - if synclocked: - self._sync_locks[seq].release() - - if __debug__: - logger.debug('Sync poll wait: {} / {}'.format(seq, synclocked)) - - with self._sync_events_lock: - self._sync_events_interrupts[seq].clear() - - self._sync_events_interrupts[seq].wait(timeout=self._serve_timeout) - - if self._ping: - if __debug__: - logger.debug('Send ping (timeout: {})'.format(self._ping_timeout)) - - self.ping(timeout=self._ping_timeout) - - if __debug__: - logger.debug('Send ping (timeout: {}) - sent'.format(self._ping_timeout)) - - if not self._sync_events[seq].is_set(): - continue - - if __debug__: - logger.debug('Sync poll complete: {}/inactive={}'.format(seq, self.inactive)) + self._sync_events[seq].wait() if __debug__: - logger.debug('Sync request handled: {}'.format(seq)) + synclogger.debug('Sync request wait: {} - complete'.format(seq)) - with self._sync_events_lock: - with self._sync_locks[seq]: - del self._sync_locks[seq] - del self._sync_events[seq] - del self._sync_events_interrupts[seq] + del self._sync_events[seq] + + if __debug__: + synclogger.debug('Sync request process: {}'.format(seq)) + + if seq in self._sync_raw_replies: + if __debug__: + synclogger.debug('Dispatch sync reply: {} - start'.format(seq)) + + Connection._dispatch_reply( + self, seq, self._sync_raw_replies.pop(seq)) + + if __debug__: + synclogger.debug('Dispatch sync reply: {} - complete'.format(seq)) + + if seq in self._sync_raw_exceptions: + if __debug__: + synclogger.debug('Dispatch sync exception: {} - start'.format(seq)) + + Connection._dispatch_exception( + self, seq, self._sync_raw_exceptions.pop(seq)) + + if __debug__: + synclogger.debug('Dispatch sync exception: {} - complete'.format(seq)) + + if __debug__: + synclogger.debug('Sync request: {} - complete'.format(seq)) if self.closed: raise EOFError('Connection was closed, seq: {}'.format(seq)) @@ -356,17 +283,14 @@ class PupyConnection(Connection): self._async_callbacks[seq] = async else: if __debug__: - logger.debug('Sync request: {}'.format(seq)) + synclogger.debug('Sync request: {}'.format(seq)) - with self._sync_events_lock: - self._sync_locks[seq] = RLock() - self._sync_events[seq] = Event() - self._sync_events_interrupts[seq] = Event() + self._sync_events[seq] = Event() self._send(consts.MSG_REQUEST, seq, (handler, self._box(args))) if __debug__: - logger.debug('Request submitted: {}'.format(seq)) + synclogger.debug('Request submitted: {}'.format(seq)) return seq @@ -383,16 +307,21 @@ class PupyConnection(Connection): with self._async_lock: sync = seq not in self._async_callbacks - Connection._dispatch_reply(self, seq, raw) if sync: - with self._sync_events_lock: - self._sync_events[seq].set() - if seq in self._sync_events_interrupts: - self._sync_events_interrupts[seq].set() + self._sync_raw_replies[seq] = raw + if __debug__: + logger.debug('Dispatch sync reply: {} - pass'.format(seq)) + self._sync_events[seq].set() - if __debug__: - logger.debug('Dispatch reply: {} - complete'.format(seq)) + else: + # We hope here that this request will not block x_x + if __debug__: + logger.debug('Dispatch async reply: {} - start'.format(seq)) + Connection._dispatch_reply(self, seq, raw) + + if __debug__: + logger.debug('Dispatch async reply: {} - complete'.format(seq)) def _dispatch_exception(self, seq, raw): if __debug__: @@ -404,25 +333,118 @@ class PupyConnection(Connection): with self._async_lock: sync = seq not in self._async_callbacks - Connection._dispatch_exception(self, seq, raw) - with self._sync_events_lock: - if sync: - self._sync_events[seq].set() - if seq in self._sync_events_interrupts: - self._sync_events_interrupts[seq].set() + if sync: + self._sync_raw_exceptions[seq] = raw + if __debug__: + logger.debug('Dispatch sync exception: {} - pass'.format(seq)) + self._sync_events[seq].set() + else: + if __debug__: + logger.debug('Dispatch async reply: {} - start'.format(seq)) + Connection._dispatch_exception(self, seq, raw) + if __debug__: + logger.debug('Dispatch async reply: {} - complete'.format(seq)) def close(self, *args): + if self._closed: + return + + if __debug__: + logger.debug('Connection - close - start') + try: Connection.close(self, *args) finally: for lock in self._sync_events.itervalues(): lock.set() + self._data_queue.put(None) + + if __debug__: + logger.debug('Connection - closed') + + @property def inactive(self): return time.time() - self._last_recv def serve(self, timeout=None): + raise NotImplemented('Serve method should not be used!') + + def _init_service_with_notify(self, event): + try: + self._init_service() + finally: + event.set() + + def init(self, timeout=60): + + event = Event() + + def check_timeout(): + now = time.time() + + while ( time.time() - now < timeout ) and not event.is_set() and not self.closed: + time.sleep(1) + + if not event.is_set(): + logger.error('timeout occured!') + if not self.closed: + self.close() + + t = Thread(target=check_timeout) + t.daemon = True + t.start() + + self._queue( + self._on_sync_request_exception, self._init_service_with_notify, event) + + def loop(self): + if __debug__: + logger.debug('Dispatch loop started') + + while not self.closed: + try: + self._dispatch() + + except EOFError: + break + + except Exception, e: + logger.exception(e) + break + + if __debug__: + logger.debug('Dispatch loop completed - close connection') + + self.close() + + if __debug__: + logger.debug('Dispatch loop completed') + + def _serve_loop(self): + if __debug__: + logger.debug('Serve loop started') + + while not self.closed: + try: + self._serve() + continue + + except EOFError: + logger.info('Serve loop - EOF') + + except Exception, e: + logger.exception('Exception: {}: {}'.format(type(e), e)) + + break + + if __debug__: + logger.debug('Serve loop completed') + + self.close() + + def _serve(self, timeout=None): ''' Check timeouts every serve cycle ''' interval, ping_timeout = self.get_pings() @@ -440,12 +462,31 @@ class PupyConnection(Connection): if not hasattr(async_event, '_ttl') or not async_event._ttl: continue + if async_event._ttl < now: + raise EOFError('Async timeout!') + etimeout = async_event._ttl - now + if __debug__: + logger.debug('etimeout = {} / mintimeout = {} / ttl = {}'.format( + etimeout, mintimeout, async_event._ttl)) + if mintimeout is None or etimeout < mintimeout: mintimeout = etimeout + timeout = mintimeout + + if __debug__: + logger.debug('Serve: start / timeout = {} / interval = {} / ping = {} / {}'.format( + timeout, interval, ping_timeout, self._last_ping)) + data = self._recv(timeout, wait_for_lock = False) + + if __debug__: + logger.debug('Serve: complete / data = {}'.format(len(data) if data else None)) + + self._data_queue.put(data) + if not data and interval and ping_timeout: ping = False if not self._last_ping: @@ -457,13 +498,23 @@ class PupyConnection(Connection): interval, ping_timeout)) self._last_ping = self.ping(timeout=ping_timeout, now=now) + else: + if __debug__: + logger.debug('Ping not required: {} < {}'.format( + now, self._last_ping + interval)) - return data - def dispatch(self, data): + def _dispatch(self): + if __debug__: + logger.debug('Dispatch start') + now = time.time() + data = self._data_queue.get() if data: + if __debug__: + logger.debug('Dispatch - data ({})'.format(len(data))) + msg, seq, args = brine._load(data) if msg == consts.MSG_REQUEST: if __debug__: @@ -474,28 +525,26 @@ class PupyConnection(Connection): if __debug__: logger.debug('Processing message response, seq: {} - started'.format(seq)) - locked = False - - if seq in self._sync_locks: - self._sync_locks[seq].acquire() - locked = True - - try: - if msg == consts.MSG_REPLY: - self._dispatch_reply(seq, args) - elif msg == consts.MSG_EXCEPTION: - self._dispatch_exception(seq, args) - else: - raise ValueError("invalid message type: %r" % (msg,)) - - finally: - if locked: - self._sync_locks[seq].release() + if msg == consts.MSG_REPLY: + self._dispatch_reply(seq, args) + elif msg == consts.MSG_EXCEPTION: + self._dispatch_exception(seq, args) + else: + raise ValueError("invalid message type: %r" % (msg,)) if __debug__: logger.debug('Processing message, seq: {} - completed'.format(seq)) - self._last_ping = now + self._last_ping = now + + elif self.closed: + if __debug__: + logger.debug('Dispatch interrupt - closed') + + return + else: + if __debug__: + logger.debug('Dispatch - no data') with self._async_lock: for async_event in self._async_callbacks.itervalues(): @@ -514,18 +563,11 @@ class PupyConnection(Connection): class PupyConnectionThread(Thread): def __init__(self, *args, **kwargs): - if 'lock' in kwargs: - self.lock = kwargs['lock'] - del kwargs['lock'] - else: - self.lock = Lock() - if __debug__: logger.debug('Create connection thread') self.pupy_srv = args[0] - self.connection = PupyConnection(self.lock, *args, **kwargs) - self.Initialized = Event() + self.connection = PupyConnection(*args, **kwargs) Thread.__init__(self) self.daemon = True @@ -537,43 +579,8 @@ class PupyConnectionThread(Thread): if __debug__: logger.debug('Run connection thread') - if __debug__: - logger.debug('Init connection') - - if not self.connection.initialize(): - if __debug__: - logger.debug('Initialization failed') - - return + self.connection.init() + self.connection.loop() if __debug__: - logger.info('Start serve loop') - - if __debug__: - logger.debug('Bind payload, serve with interruptions') - - try: - while not self.connection.closed: - if __debug__: - logger.debug('Connection thread loop. Inactive: {}'.format( - self.connection.inactive)) - - with self.lock: - data = self.connection.serve() - - self.connection.dispatch(data) - - except (EOFError, TypeError): - if __debug__: - logger.debug('Session closed'.format( - self.connection.inactive)) - pass - - except Exception, e: - if __debug__: - logger.exception(e) - - pass - - finally: - self.connection.close() + logger.debug('Connection thread closed') diff --git a/pupy/network/lib/servers.py b/pupy/network/lib/servers.py index 1e09d582..5e8ec290 100644 --- a/pupy/network/lib/servers.py +++ b/pupy/network/lib/servers.py @@ -98,7 +98,7 @@ class PupyTCPServer(ThreadedServer): "Couldn't create IGD mapping: {}".format(e.description)) - def _setup_connection(self, lock, sock, queue): + def _setup_connection(self, sock, queue): '''Authenticate a client and if it succeeds, wraps the socket in a connection object. Note that this code is cut and paste from the rpyc internals and may have to be changed if rpyc evolves''' @@ -125,7 +125,7 @@ class PupyTCPServer(ThreadedServer): self.logger.debug('{}:{} Authenticated. Starting connection'.format(h, p)) connection = PupyConnection( - lock, self.pupy_srv, + self.pupy_srv, self.service, PupyChannel(stream), ping=stream.KEEP_ALIVE_REQUIRED or self.ping_interval, @@ -140,9 +140,8 @@ class PupyTCPServer(ThreadedServer): def _authenticate_and_serve_client(self, sock): queue = Queue(maxsize=1) - lock = Lock() - authentication = Thread(target=self._setup_connection, args=(lock, sock, queue)) + authentication = Thread(target=self._setup_connection, args=(sock, queue)) authentication.daemon = True authentication.start() @@ -156,19 +155,12 @@ class PupyTCPServer(ThreadedServer): self.logger.debug('{}:{} Wait for authentication result'.format(h, p)) connection, wrapper, credentials = queue.get(block=True, timeout=60) self.logger.debug('{}:{} Wait complete: {}'.format(h, p, connection)) - if connection: + if connection and connection._local_root: self.logger.debug('{}:{} Initializing service...') connection._init_service() - self.logger.debug('Bind server. Serving with interruptions') - while not connection.closed: - self.logger.debug('{}:{} Serving main loop. Inactive: {}'.format( - h, p, connection.inactive)) - - with lock: - data = connection.serve() - - connection.dispatch(data) + self.logger.debug('Bind server. Serving ...') + connection.loop() except Empty: self.logger.debug('{}:{} Timeout'.format(h, p)) diff --git a/pupy/network/lib/streams/PupySocketStream.py b/pupy/network/lib/streams/PupySocketStream.py index 7bd99b00..f62e29ad 100644 --- a/pupy/network/lib/streams/PupySocketStream.py +++ b/pupy/network/lib/streams/PupySocketStream.py @@ -46,7 +46,8 @@ class PupyChannel(Channel): self.compress = True self.COMPRESSION_LEVEL = 5 self.COMPRESSION_THRESHOLD = self.stream.MAX_IO_CHUNK - self._channel_lock = threading.RLock() + self._send_channel_lock = threading.RLock() + self._recv_channel_lock = threading.RLock() def consume(self): return self.stream.consume() @@ -56,12 +57,11 @@ class PupyChannel(Channel): def recv(self): # print "RECV", threading.currentThread() - with self._channel_lock: + with self._recv_channel_lock: return self._recv() def send(self, data): - # print "SEND", threading.currentThread() - with self._channel_lock: + with self._send_channel_lock: self._send(data) def _recv(self): @@ -128,7 +128,7 @@ class PupyChannel(Channel): portion = None lportion = 0 - # print "SEND .. ", ldata, data[:64].encode('hex') + # print "SEND .. ", ldata if self.compress and ldata > self.COMPRESSION_THRESHOLD: portion = data.peek(self.COMPRESSION_THRESHOLD) @@ -142,6 +142,7 @@ class PupyChannel(Channel): self.stream.write(self.FRAME_HEADER.pack(ldata, compressed), notify=False) self.stream.write(data, notify=False) self.stream.write(self.FLUSHER) + # print "SEND .. ", ldata, "DONE" return del portion @@ -175,7 +176,7 @@ class PupyChannel(Channel): del portion, data, cdata self.stream.insert(self.FRAME_HEADER.pack(total_length, compressed)) - #print "SEND WITH TOTAL LENGTH", total_length + # print "SEND WITH TOTAL LENGTH", total_length self.stream.write(self.FLUSHER) class PupySocketStream(SocketStream): @@ -290,7 +291,9 @@ class PupySocketStream(SocketStream): raise def read(self, count): - return self.waitfor(count).read(count) + promise = self.waitfor(count) + if promise: + return promise.read(count) def insert(self, data): with self.upstream_lock: