diff --git a/pupy/network/lib/connection.py b/pupy/network/lib/connection.py index e1ad1ec0..7858e274 100644 --- a/pupy/network/lib/connection.py +++ b/pupy/network/lib/connection.py @@ -194,35 +194,54 @@ class SyncRequestDispatchQueue(object): except Full: if __debug__: syncqueuelogger.debug( - 'Task not queued - no empty slots. Launch new worker (%s, %s)'.format( - self, self._pending_workers)) + 'Task not queued - no empty slots. Launch new worker (%s, %s)', + self, self._pending_workers) pass if not queued or not ack.wait(timeout=self.MAX_TASK_ACK_TIME, probe=0.1): with self._workers_lock: + if self._closed: + if __debug__: + syncqueuelogger.debug( + 'Queue (%s) closed, do not start new worker', self) + self._workers += 1 if self._workers > self._max_workers: self._max_workers = self._workers if __debug__: syncqueuelogger.info( - 'Max workers({}): {}'.format(self, self._max_workers)) + 'Max workers(%s): %s', + self, self._max_workers) - thread = Thread( - target=self._dispatch_request_worker, - name="SyncQueue Dispatcher" - ) - thread.daemon = True - thread.start() + thread = Thread( + target=self._dispatch_request_worker, + name="SyncQueue Dispatcher" + ) + thread.daemon = True + thread.start() def close(self): - self._closed = True - while True: + with self._workers_lock: + self._closed = True + + if __debug__: + syncqueuelogger.debug('Queue(%s) closing: %s', self) + try: - self._queue.put_nowait(None) - except Full: - break + while True: + try: + self._queue.put_nowait(None) + except Full: + break + + except Exception as e: + if __debug__: + syncqueuelogger.exception('Queue(%s) close: error: %s', self, e) + + if __debug__: + syncqueuelogger.debug('Queue(%s) closed', self) class PupyConnection(Connection): @@ -257,6 +276,9 @@ class PupyConnection(Connection): self._timer_event = None self._timer_event_last = None + self._initialized = False + self._deinitialized = False + self._closing = False if 'ping' in kwargs: ping = kwargs.pop('ping') @@ -287,6 +309,9 @@ class PupyConnection(Connection): logger.debug('New PupyConnection: (%s)', self) def _on_sync_request_exception(self, exc): + if __debug__: + logger.debug('Connection(%s) - sync request exception %s', self, exc) + if not isinstance(exc, EOFError): logger.exception('%s: %s', self, exc) @@ -477,12 +502,11 @@ class PupyConnection(Connection): logger.debug( 'Dispatch async reply(%s): %s - complete', self, seq) - def close(self, _catchall=True): - with self._close_lock: - if self._closed: - return + def _close_rpyc(self, _catchall=True): + if self._closed: + return - self._closed = True + self._closed = True if __debug__: trace = traceback.extract_stack() @@ -491,8 +515,6 @@ class PupyConnection(Connection): self, *trace[-2]) try: - self.buf_in.wake() - self._async_request(consts.HANDLE_CLOSE) except EOFError, e: logger.info( @@ -502,23 +524,76 @@ class PupyConnection(Connection): except Exception: if not _catchall: raise - finally: - try: - self._cleanup(_anyway=True) - except Exception, e: + + def _close_chan(self, _catchall=True): + if self._deinitialized: + if __debug__: + logger.debug('Connection(%s) - already deinitialized', self) + + return + + self._deinitialized = True + + try: + if __debug__: + logger.debug('Connection(%s) - cleanup', self) + + self._cleanup(_anyway=True) + + if self._channel and hasattr(self._channel, 'wake'): if __debug__: - logger.debug('Cleanup exception(%s): %s', self, e) + logger.debug('Connection(%s) - wake buf_in (%s)', self, self._channel) - pass + self._channel.wake() - _sync_events = self._sync_events.keys() - for lock in _sync_events: - lock = self._sync_events.get(lock) - if lock: - lock.set() + except Exception, e: + if __debug__: + logger.debug('Connection(%s) - cleanup exception - %s', self, e) + pass if __debug__: - logger.debug('Connection(%s) - closed', self) + logger.debug('Connection(%s) - cleanup locks', self) + + with self._sync_events_lock: + for lock in self._sync_events.values(): + try: + lock.set() + except Exception as e: + + if __debug__: + logger.exception( + 'Connection(%s) - ack failed: %s', self, e) + + pass + + if __debug__: + try: + logger.debug('Connection(%s) - closed:', self) + except Exception as e: + logger.exception(e) + + def close(self, _catchall=True): + with self._close_lock: + if self._closing: + return + + self._closing = True + + try: + self._close_rpyc(_catchall) + except Exception as e: + if __debug__: + logger.exception('Connection(%s) - rpyc close - %s', self, e) + + pass + + try: + self._close_chan(_catchall) + except Exception as e: + if __debug__: + logger.exception('Connection(%s) - chan close - %s', self, e) + + pass @property def inactive(self): @@ -533,15 +608,19 @@ class PupyConnection(Connection): logger.debug('Check timeout(%s) - start', self) - while (time.time() - now < timeout) and not self._last_ping and not self.closed: - time.sleep(1) + promise = self.async_request(consts.HANDLE_PING, 'ping', timeout=timeout) - if not self._last_ping: - logger.info('Check timeout(%s) - failed', self) - if not self.closed: + while (time.time() - now < timeout) and not self.closed: + if promise.expired: + logger.info('Check timeout(%s) - failed', self) self.close() - else: - logger.debug('Check timeout(%s) - ok', self) + break + elif promise.ready: + logger.debug('Check timeout(%s) - ok', self) + self._initialized = True + break + else: + time.sleep(1) if self._local_root: t = Thread( @@ -555,7 +634,7 @@ class PupyConnection(Connection): self._init_service() except AttributeError: # Connection was broken in the middle - pass + raise EOFError() else: logger.debug('Local root is absent') @@ -585,7 +664,12 @@ class PupyConnection(Connection): self, type(e), e) try: - data = self._serve() + timeout = None + if not self._initialized: + timeout = 1 + + data = self._serve(timeout) + self._dispatch(data) continue diff --git a/pupy/network/lib/streams/PupySocketStream.py b/pupy/network/lib/streams/PupySocketStream.py index f44da3f7..a0b78ba6 100644 --- a/pupy/network/lib/streams/PupySocketStream.py +++ b/pupy/network/lib/streams/PupySocketStream.py @@ -55,10 +55,12 @@ class PupyChannel(Channel): self._recv_channel_lock = threading.Lock() def consume(self): - return self.stream.consume() + if hasattr(self.stream, 'consume'): + return self.stream.consume() def wake(self): - return self.stream.wake() + if hasattr(self.stream, 'wake'): + return self.stream.wake() def recv(self): # print "RECV", threading.currentThread() diff --git a/pupy/pupy/service.py b/pupy/pupy/service.py index 72a41ed3..cc035e17 100644 --- a/pupy/pupy/service.py +++ b/pupy/pupy/service.py @@ -126,30 +126,32 @@ class ReverseSlaveService(Service): ) def on_disconnect(self): - if self.client.terminated: - return - - for cleanup in self.exposed_cleanups: - try: - cleanup() - except Exception as e: - pupy.remote_error('Disconnect/cleanup: {}', e) - - self.exposed_cleanups = [] - try: - self._conn.close() - except: - pupy.remote_error('Disconnect/close: {}', e) + if self.client.terminated: + return - if os.name == 'posix': + for cleanup in self.exposed_cleanups: + try: + cleanup() + except Exception as e: + pupy.remote_error('Disconnect/cleanup: {}', e) + + self.exposed_cleanups = [] + + finally: try: - for _ in xrange(1024): - if not os.waitpid(-1, os.WNOHANG): - break + self._conn.close() + except Exception as e: + pupy.remote_error('Disconnect/close: {}', e) - except OSError: - pass + if os.name == 'posix': + try: + for _ in xrange(1024): + if not os.waitpid(-1, os.WNOHANG): + break + + except OSError: + pass def exposed_exit(self): logger.debug('TERMINATION REQUEST')