Rework PupyConnection to avoid nested locks

This commit is contained in:
Oleksii Shevchuk 2018-03-08 14:16:25 +02:00
parent 96e67364dd
commit 55069e64db
3 changed files with 221 additions and 219 deletions

View File

@ -10,6 +10,7 @@ import logging
logger = None
logger = logging.getLogger('pconn')
synclogger = logging.getLogger('sync')
from network.lib.buffer import Buffer
@ -132,13 +133,12 @@ class SyncRequestDispatchQueue(object):
break
class PupyConnection(Connection):
def __init__(self, lock, pupy_srv, *args, **kwargs):
def __init__(self, pupy_srv, *args, **kwargs):
self._sync_events = {}
self._sync_events_interrupts = {}
self._sync_locks = {}
self._connection_serve_lock = lock
self._sync_raw_replies = {}
self._sync_raw_exceptions = {}
self._async_lock = Lock()
self._sync_events_lock = Lock()
self._last_recv = time.time()
self._ping = False
self._ping_timeout = 30
@ -146,6 +146,9 @@ class PupyConnection(Connection):
self._last_ping = None
self._default_serve_timeout = 5
self._queue = SyncRequestDispatchQueue.get_queue()
self._data_queue = Queue()
self._serve_thread = Thread(target=self._serve_loop)
self._serve_thread.daemon = True
self._serve_interrupt = Event()
@ -176,6 +179,8 @@ class PupyConnection(Connection):
next(self._seqcounter)
self._serve_thread.start()
def _dispatch_request(self, seq, args):
super(PupyConnection, self)._dispatch_request(seq, args)
@ -194,25 +199,6 @@ class PupyConnection(Connection):
def wake(self):
self._channel.wake()
def initialize(self, timeout=10):
try:
Thread(
target=self._initialization_timeout, args=(timeout,)
).start()
self._init_service()
self.initialized.set()
except (EOFError, TypeError):
self.close()
return False
return self.initialized.is_set()
def _initialization_timeout(self, timeout):
self.initialized.wait(timeout)
if not self.initialized.is_set():
self.close()
def set_pings(self, ping=None, timeout=None):
if ping is not None:
try:
@ -243,99 +229,40 @@ class PupyConnection(Connection):
def sync_request(self, handler, *args):
seq = self._send_request(handler, args)
if __debug__:
logger.debug('Sync request: {} / {}'.format(seq, handler))
synclogger.debug('Sync request wait: {}'.format(seq))
while not ( self._sync_events[seq].is_set() or self.closed ):
if __debug__:
logger.debug('Sync poll until: {}'.format(seq))
synclocked = self._sync_locks[seq].acquire(False)
if synclocked and self._connection_serve_lock.acquire(False):
data = None
with self._sync_events_lock:
if seq in self._sync_events_interrupts:
self._sync_events_interrupts[seq].set()
try:
# Ensure event was not missed between previous lock
if self._sync_events[seq].is_set():
if __debug__:
logger.debug('Rollback sync poll: {}'.format(seq))
break
if __debug__:
logger.debug('Sync poll serve: {}'.format(seq))
data = self.serve(self._serve_timeout)
if not data:
if __debug__:
logger.debug('Sync poll serve interrupted: {}/inactive={}'.format(
seq, self.inactive))
if self._ping:
if __debug__:
logger.debug(
'Submit ping request (timeout: {}): {} - interrupted'.format(
self._serve_timeout, seq))
self.ping(timeout=self._ping_timeout)
if __debug__:
logger.debug('Submit ping request: {} - resumed'.format(seq))
finally:
if __debug__:
logger.debug('Sync poll serve complete. release: {}'.format(seq))
self._connection_serve_lock.release()
self._sync_locks[seq].release()
self.dispatch(data)
# Wake one pending event just in case
with self._sync_events_lock:
for event in self._sync_events_interrupts.itervalues():
if not event.is_set():
event.set()
break
else:
if synclocked:
self._sync_locks[seq].release()
if __debug__:
logger.debug('Sync poll wait: {} / {}'.format(seq, synclocked))
with self._sync_events_lock:
self._sync_events_interrupts[seq].clear()
self._sync_events_interrupts[seq].wait(timeout=self._serve_timeout)
if self._ping:
if __debug__:
logger.debug('Send ping (timeout: {})'.format(self._ping_timeout))
self.ping(timeout=self._ping_timeout)
if __debug__:
logger.debug('Send ping (timeout: {}) - sent'.format(self._ping_timeout))
if not self._sync_events[seq].is_set():
continue
if __debug__:
logger.debug('Sync poll complete: {}/inactive={}'.format(seq, self.inactive))
self._sync_events[seq].wait()
if __debug__:
logger.debug('Sync request handled: {}'.format(seq))
synclogger.debug('Sync request wait: {} - complete'.format(seq))
with self._sync_events_lock:
with self._sync_locks[seq]:
del self._sync_locks[seq]
del self._sync_events[seq]
del self._sync_events_interrupts[seq]
del self._sync_events[seq]
if __debug__:
synclogger.debug('Sync request process: {}'.format(seq))
if seq in self._sync_raw_replies:
if __debug__:
synclogger.debug('Dispatch sync reply: {} - start'.format(seq))
Connection._dispatch_reply(
self, seq, self._sync_raw_replies.pop(seq))
if __debug__:
synclogger.debug('Dispatch sync reply: {} - complete'.format(seq))
if seq in self._sync_raw_exceptions:
if __debug__:
synclogger.debug('Dispatch sync exception: {} - start'.format(seq))
Connection._dispatch_exception(
self, seq, self._sync_raw_exceptions.pop(seq))
if __debug__:
synclogger.debug('Dispatch sync exception: {} - complete'.format(seq))
if __debug__:
synclogger.debug('Sync request: {} - complete'.format(seq))
if self.closed:
raise EOFError('Connection was closed, seq: {}'.format(seq))
@ -356,17 +283,14 @@ class PupyConnection(Connection):
self._async_callbacks[seq] = async
else:
if __debug__:
logger.debug('Sync request: {}'.format(seq))
synclogger.debug('Sync request: {}'.format(seq))
with self._sync_events_lock:
self._sync_locks[seq] = RLock()
self._sync_events[seq] = Event()
self._sync_events_interrupts[seq] = Event()
self._sync_events[seq] = Event()
self._send(consts.MSG_REQUEST, seq, (handler, self._box(args)))
if __debug__:
logger.debug('Request submitted: {}'.format(seq))
synclogger.debug('Request submitted: {}'.format(seq))
return seq
@ -383,16 +307,21 @@ class PupyConnection(Connection):
with self._async_lock:
sync = seq not in self._async_callbacks
Connection._dispatch_reply(self, seq, raw)
if sync:
with self._sync_events_lock:
self._sync_events[seq].set()
if seq in self._sync_events_interrupts:
self._sync_events_interrupts[seq].set()
self._sync_raw_replies[seq] = raw
if __debug__:
logger.debug('Dispatch sync reply: {} - pass'.format(seq))
self._sync_events[seq].set()
if __debug__:
logger.debug('Dispatch reply: {} - complete'.format(seq))
else:
# We hope here that this request will not block x_x
if __debug__:
logger.debug('Dispatch async reply: {} - start'.format(seq))
Connection._dispatch_reply(self, seq, raw)
if __debug__:
logger.debug('Dispatch async reply: {} - complete'.format(seq))
def _dispatch_exception(self, seq, raw):
if __debug__:
@ -404,25 +333,118 @@ class PupyConnection(Connection):
with self._async_lock:
sync = seq not in self._async_callbacks
Connection._dispatch_exception(self, seq, raw)
with self._sync_events_lock:
if sync:
self._sync_events[seq].set()
if seq in self._sync_events_interrupts:
self._sync_events_interrupts[seq].set()
if sync:
self._sync_raw_exceptions[seq] = raw
if __debug__:
logger.debug('Dispatch sync exception: {} - pass'.format(seq))
self._sync_events[seq].set()
else:
if __debug__:
logger.debug('Dispatch async reply: {} - start'.format(seq))
Connection._dispatch_exception(self, seq, raw)
if __debug__:
logger.debug('Dispatch async reply: {} - complete'.format(seq))
def close(self, *args):
if self._closed:
return
if __debug__:
logger.debug('Connection - close - start')
try:
Connection.close(self, *args)
finally:
for lock in self._sync_events.itervalues():
lock.set()
self._data_queue.put(None)
if __debug__:
logger.debug('Connection - closed')
@property
def inactive(self):
return time.time() - self._last_recv
def serve(self, timeout=None):
raise NotImplemented('Serve method should not be used!')
def _init_service_with_notify(self, event):
try:
self._init_service()
finally:
event.set()
def init(self, timeout=60):
event = Event()
def check_timeout():
now = time.time()
while ( time.time() - now < timeout ) and not event.is_set() and not self.closed:
time.sleep(1)
if not event.is_set():
logger.error('timeout occured!')
if not self.closed:
self.close()
t = Thread(target=check_timeout)
t.daemon = True
t.start()
self._queue(
self._on_sync_request_exception, self._init_service_with_notify, event)
def loop(self):
if __debug__:
logger.debug('Dispatch loop started')
while not self.closed:
try:
self._dispatch()
except EOFError:
break
except Exception, e:
logger.exception(e)
break
if __debug__:
logger.debug('Dispatch loop completed - close connection')
self.close()
if __debug__:
logger.debug('Dispatch loop completed')
def _serve_loop(self):
if __debug__:
logger.debug('Serve loop started')
while not self.closed:
try:
self._serve()
continue
except EOFError:
logger.info('Serve loop - EOF')
except Exception, e:
logger.exception('Exception: {}: {}'.format(type(e), e))
break
if __debug__:
logger.debug('Serve loop completed')
self.close()
def _serve(self, timeout=None):
''' Check timeouts every serve cycle '''
interval, ping_timeout = self.get_pings()
@ -440,12 +462,31 @@ class PupyConnection(Connection):
if not hasattr(async_event, '_ttl') or not async_event._ttl:
continue
if async_event._ttl < now:
raise EOFError('Async timeout!')
etimeout = async_event._ttl - now
if __debug__:
logger.debug('etimeout = {} / mintimeout = {} / ttl = {}'.format(
etimeout, mintimeout, async_event._ttl))
if mintimeout is None or etimeout < mintimeout:
mintimeout = etimeout
timeout = mintimeout
if __debug__:
logger.debug('Serve: start / timeout = {} / interval = {} / ping = {} / {}'.format(
timeout, interval, ping_timeout, self._last_ping))
data = self._recv(timeout, wait_for_lock = False)
if __debug__:
logger.debug('Serve: complete / data = {}'.format(len(data) if data else None))
self._data_queue.put(data)
if not data and interval and ping_timeout:
ping = False
if not self._last_ping:
@ -457,13 +498,23 @@ class PupyConnection(Connection):
interval, ping_timeout))
self._last_ping = self.ping(timeout=ping_timeout, now=now)
else:
if __debug__:
logger.debug('Ping not required: {} < {}'.format(
now, self._last_ping + interval))
return data
def dispatch(self, data):
def _dispatch(self):
if __debug__:
logger.debug('Dispatch start')
now = time.time()
data = self._data_queue.get()
if data:
if __debug__:
logger.debug('Dispatch - data ({})'.format(len(data)))
msg, seq, args = brine._load(data)
if msg == consts.MSG_REQUEST:
if __debug__:
@ -474,28 +525,26 @@ class PupyConnection(Connection):
if __debug__:
logger.debug('Processing message response, seq: {} - started'.format(seq))
locked = False
if seq in self._sync_locks:
self._sync_locks[seq].acquire()
locked = True
try:
if msg == consts.MSG_REPLY:
self._dispatch_reply(seq, args)
elif msg == consts.MSG_EXCEPTION:
self._dispatch_exception(seq, args)
else:
raise ValueError("invalid message type: %r" % (msg,))
finally:
if locked:
self._sync_locks[seq].release()
if msg == consts.MSG_REPLY:
self._dispatch_reply(seq, args)
elif msg == consts.MSG_EXCEPTION:
self._dispatch_exception(seq, args)
else:
raise ValueError("invalid message type: %r" % (msg,))
if __debug__:
logger.debug('Processing message, seq: {} - completed'.format(seq))
self._last_ping = now
self._last_ping = now
elif self.closed:
if __debug__:
logger.debug('Dispatch interrupt - closed')
return
else:
if __debug__:
logger.debug('Dispatch - no data')
with self._async_lock:
for async_event in self._async_callbacks.itervalues():
@ -514,18 +563,11 @@ class PupyConnection(Connection):
class PupyConnectionThread(Thread):
def __init__(self, *args, **kwargs):
if 'lock' in kwargs:
self.lock = kwargs['lock']
del kwargs['lock']
else:
self.lock = Lock()
if __debug__:
logger.debug('Create connection thread')
self.pupy_srv = args[0]
self.connection = PupyConnection(self.lock, *args, **kwargs)
self.Initialized = Event()
self.connection = PupyConnection(*args, **kwargs)
Thread.__init__(self)
self.daemon = True
@ -537,43 +579,8 @@ class PupyConnectionThread(Thread):
if __debug__:
logger.debug('Run connection thread')
if __debug__:
logger.debug('Init connection')
if not self.connection.initialize():
if __debug__:
logger.debug('Initialization failed')
return
self.connection.init()
self.connection.loop()
if __debug__:
logger.info('Start serve loop')
if __debug__:
logger.debug('Bind payload, serve with interruptions')
try:
while not self.connection.closed:
if __debug__:
logger.debug('Connection thread loop. Inactive: {}'.format(
self.connection.inactive))
with self.lock:
data = self.connection.serve()
self.connection.dispatch(data)
except (EOFError, TypeError):
if __debug__:
logger.debug('Session closed'.format(
self.connection.inactive))
pass
except Exception, e:
if __debug__:
logger.exception(e)
pass
finally:
self.connection.close()
logger.debug('Connection thread closed')

View File

@ -98,7 +98,7 @@ class PupyTCPServer(ThreadedServer):
"Couldn't create IGD mapping: {}".format(e.description))
def _setup_connection(self, lock, sock, queue):
def _setup_connection(self, sock, queue):
'''Authenticate a client and if it succeeds, wraps the socket in a connection object.
Note that this code is cut and paste from the rpyc internals and may have to be
changed if rpyc evolves'''
@ -125,7 +125,7 @@ class PupyTCPServer(ThreadedServer):
self.logger.debug('{}:{} Authenticated. Starting connection'.format(h, p))
connection = PupyConnection(
lock, self.pupy_srv,
self.pupy_srv,
self.service,
PupyChannel(stream),
ping=stream.KEEP_ALIVE_REQUIRED or self.ping_interval,
@ -140,9 +140,8 @@ class PupyTCPServer(ThreadedServer):
def _authenticate_and_serve_client(self, sock):
queue = Queue(maxsize=1)
lock = Lock()
authentication = Thread(target=self._setup_connection, args=(lock, sock, queue))
authentication = Thread(target=self._setup_connection, args=(sock, queue))
authentication.daemon = True
authentication.start()
@ -156,19 +155,12 @@ class PupyTCPServer(ThreadedServer):
self.logger.debug('{}:{} Wait for authentication result'.format(h, p))
connection, wrapper, credentials = queue.get(block=True, timeout=60)
self.logger.debug('{}:{} Wait complete: {}'.format(h, p, connection))
if connection:
if connection and connection._local_root:
self.logger.debug('{}:{} Initializing service...')
connection._init_service()
self.logger.debug('Bind server. Serving with interruptions')
while not connection.closed:
self.logger.debug('{}:{} Serving main loop. Inactive: {}'.format(
h, p, connection.inactive))
with lock:
data = connection.serve()
connection.dispatch(data)
self.logger.debug('Bind server. Serving ...')
connection.loop()
except Empty:
self.logger.debug('{}:{} Timeout'.format(h, p))

View File

@ -46,7 +46,8 @@ class PupyChannel(Channel):
self.compress = True
self.COMPRESSION_LEVEL = 5
self.COMPRESSION_THRESHOLD = self.stream.MAX_IO_CHUNK
self._channel_lock = threading.RLock()
self._send_channel_lock = threading.RLock()
self._recv_channel_lock = threading.RLock()
def consume(self):
return self.stream.consume()
@ -56,12 +57,11 @@ class PupyChannel(Channel):
def recv(self):
# print "RECV", threading.currentThread()
with self._channel_lock:
with self._recv_channel_lock:
return self._recv()
def send(self, data):
# print "SEND", threading.currentThread()
with self._channel_lock:
with self._send_channel_lock:
self._send(data)
def _recv(self):
@ -128,7 +128,7 @@ class PupyChannel(Channel):
portion = None
lportion = 0
# print "SEND .. ", ldata, data[:64].encode('hex')
# print "SEND .. ", ldata
if self.compress and ldata > self.COMPRESSION_THRESHOLD:
portion = data.peek(self.COMPRESSION_THRESHOLD)
@ -142,6 +142,7 @@ class PupyChannel(Channel):
self.stream.write(self.FRAME_HEADER.pack(ldata, compressed), notify=False)
self.stream.write(data, notify=False)
self.stream.write(self.FLUSHER)
# print "SEND .. ", ldata, "DONE"
return
del portion
@ -175,7 +176,7 @@ class PupyChannel(Channel):
del portion, data, cdata
self.stream.insert(self.FRAME_HEADER.pack(total_length, compressed))
#print "SEND WITH TOTAL LENGTH", total_length
# print "SEND WITH TOTAL LENGTH", total_length
self.stream.write(self.FLUSHER)
class PupySocketStream(SocketStream):
@ -290,7 +291,9 @@ class PupySocketStream(SocketStream):
raise
def read(self, count):
return self.waitfor(count).read(count)
promise = self.waitfor(count)
if promise:
return promise.read(count)
def insert(self, data):
with self.upstream_lock: