diff --git a/pupy/network/lib/connection.py b/pupy/network/lib/connection.py index e4cd82f3..fb8117b6 100644 --- a/pupy/network/lib/connection.py +++ b/pupy/network/lib/connection.py @@ -278,14 +278,6 @@ class PupyConnection(Connection): self._last_ping = None self._default_serve_timeout = 5 self._queue = SyncRequestDispatchQueue.get_queue() - self._data_queue = Queue() - self._serve_thread = Thread( - target=self._serve_loop, - name="PupyConnection Serve Loop" - ) - self._serve_thread.daemon = True - - self._serve_interrupt = Event() if 'ping' in kwargs: ping = kwargs['ping'] @@ -312,8 +304,6 @@ class PupyConnection(Connection): next(self._seqcounter) - self._serve_thread.start() - def _queue_dispatch_request(self, seq, args): self._queue( self._on_sync_request_exception, @@ -493,9 +483,6 @@ class PupyConnection(Connection): if __debug__: logger.debug('Connection - close - start') - # Stop dispatch queue first - self._data_queue.put(None) - try: self._async_request(consts.HANDLE_CLOSE) except EOFError, e: @@ -562,35 +549,13 @@ class PupyConnection(Connection): self._on_sync_request_exception, self._init_service_with_notify) def loop(self): - if __debug__: - logger.debug('Dispatch loop started') - - while not self.closed: - try: - self._dispatch() - - except EOFError, e: - logger.info('Dispatch loop - EOF ({})'.format(e)) - - except Exception, e: - logger.exception(e) - break - - if __debug__: - logger.debug('Dispatch loop completed - close connection') - - self.close() - - if __debug__: - logger.debug('Dispatch loop completed') - - def _serve_loop(self): if __debug__: logger.debug('Serve loop started') while not self.closed: try: - self._serve() + data = self._serve() + self._dispatch(data) continue except EOFError, e: @@ -651,8 +616,6 @@ class PupyConnection(Connection): if __debug__: logger.debug('Serve: complete / data = {}'.format(len(data) if data else None)) - self._data_queue.put(data) - if not data and interval and ping_timeout: ping = False if not self._last_ping: @@ -669,14 +632,15 @@ class PupyConnection(Connection): logger.debug('Ping not required: {} < {}'.format( now, self._last_ping + interval)) + return data - def _dispatch(self): + + def _dispatch(self, data): if __debug__: logger.debug('Dispatch start') now = time.time() - data = self._data_queue.get() if data: if __debug__: logger.debug('Dispatch - data ({})'.format(len(data)))