From b9e4dd62e9ce960ff34c9e850d10248c34603c55 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Tue, 9 Aug 2016 17:38:56 +0100 Subject: [PATCH] Fix _UpdateStream race. --- econtext/core.py | 38 +++++++++++++++++++++----------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/econtext/core.py b/econtext/core.py index a62e45f8..62ce85ad 100644 --- a/econtext/core.py +++ b/econtext/core.py @@ -166,7 +166,7 @@ class Channel(object): Receive an object from the remote, or return ``None`` if `timeout` is reached. """ - LOG.debug('%r.Receive(%r)', self, timeout) + LOG.debug('%r.Receive(timeout=%r)', self, timeout) try: data = self._queue.get(True, timeout) except Queue.Empty: @@ -394,11 +394,10 @@ class Stream(BasicStream): Enqueue `obj` to `handle`, and tell the broker we have output. """ LOG.debug('%r.Enqueue(%r, %r)', self, handle, obj) - encoded = self.Pickle((handle, obj)) - msg = struct.pack('>L', len(encoded)) + encoded - self._lock.acquire() try: + encoded = self.Pickle((handle, obj)) + msg = struct.pack('>L', len(encoded)) + encoded self._whmac.update(msg) self._output_buf += self._whmac.digest() + msg finally: @@ -721,7 +720,7 @@ class Broker(object): """ _waker = None - def __init__(self, log_level=logging.DEBUG): + def __init__(self, log_level=logging.INFO): self.log_level = log_level self._alive = True @@ -743,15 +742,19 @@ class Broker(object): def _UpdateStream(self, stream): LOG.debug('_UpdateStream(%r)', stream) - if stream.ReadMore() and stream.read_side.fileno(): - self._readers.add(stream.read_side) - else: - self._readers.discard(stream.read_side) + self._lock.acquire() + try: + if stream.ReadMore() and stream.read_side.fileno(): + self._readers.add(stream.read_side) + else: + self._readers.discard(stream.read_side) - if stream.WriteMore() and stream.write_side.fileno(): - self._writers.add(stream.write_side) - else: - self._writers.discard(stream.write_side) + if stream.WriteMore() and stream.write_side.fileno(): + self._writers.add(stream.write_side) + else: + self._writers.discard(stream.write_side) + finally: + self._lock.release() def UpdateStream(self, stream): LOG.debug('UpdateStream(%r)', stream) @@ -867,7 +870,7 @@ class ExternalContext(object): klass.__module__ = 'econtext.core' def _SetupLogging(self, log_level): - logging.basicConfig(level=log_level) + logging.basicConfig(level=log_level, filename='slave.txt') logging.getLogger('').handlers[0].formatter = Formatter(False) def _ReapFirstStage(self): @@ -921,9 +924,10 @@ class ExternalContext(object): self._SetupMaster(key) self._SetupImporter() #self._SetupStdio() - fd = open('/dev/null', 'w') - os.dup2(fd.fileno(), 1) - os.dup2(fd.fileno(), 2) + if 0: + fd = open('/dev/null', 'w') + os.dup2(fd.fileno(), 1) + os.dup2(fd.fileno(), 2) self.broker.Register(self.context) self._DispatchCalls()