diff --git a/pupy/network/lib/ack.py b/pupy/network/lib/ack.py index c6d46afc..024e414e 100644 --- a/pupy/network/lib/ack.py +++ b/pupy/network/lib/ack.py @@ -21,27 +21,20 @@ from time import time, sleep class Ack(object): """ Dumb (and fast, and unsafe) event replacement """ - __slots__ = ( '_lock', '_is_set', '_wait_lock' ) + __slots__ = ( '_lock', '_is_set', '_wait_lock' ) def __init__(self): self._lock = Lock() - self._is_set = False + self._is_set = None self._wait_lock = None - def clear(self): - with self._lock: - if self._is_set is None and self._wait_lock: - self._wait_lock.release() - - self._is_set = False - def is_set(self): with self._lock: return self._is_set is True def set(self): with self._lock: - if self._is_set is None and self._wait_lock: + if self._is_set is False and self._wait_lock: self._wait_lock.release() self._is_set = True @@ -51,18 +44,18 @@ class Ack(object): with self._lock: if self._is_set: return True - else: - self._is_set = None + + elif self._is_set is None: + self._is_set = False self._wait_lock = Lock() self._wait_lock.acquire() + else: + raise ValueError('Already in wait state!') + self._wait_lock.acquire() with self._lock: - if self._is_set is False: - # Cleared - self._wait_lock.release() - self._wait_lock = None return self._is_set is True @@ -81,7 +74,7 @@ class Ack(object): delay = min(timeout, probe, delay*2) with self._lock: - if self._is_set is True: + if self._is_set: return True return False diff --git a/pupy/network/lib/buffer.py b/pupy/network/lib/buffer.py index c483a58c..9cd3c057 100644 --- a/pupy/network/lib/buffer.py +++ b/pupy/network/lib/buffer.py @@ -12,8 +12,7 @@ __all__ = ( import sys import zlib -from threading import Lock -from .ack import Ack +from threading import Lock, Event DEFAULT_FORCED_FLUSH_BUFFER_SIZE = 32768 DEFAULT_MAX_STR_SIZE = 4096 @@ -33,7 +32,7 @@ class Buffer(object): ALLOW_BUFFER_AS_DATA = True def __init__(self, data='', on_write=None, transport_func=None, truncate=False, - chunk_size=None, compressed=False): + chunk_size=None, compressed=False, shared=False): """ Initialize a buffer with 'data'. """ @@ -44,7 +43,7 @@ class Buffer(object): self.on_write_f = on_write self.data_lock = Lock() - self.waiting = Ack() + self.waiting = Event() if shared else None self.transport = transport_func self.cookie = None self.chunk_size = None @@ -78,19 +77,26 @@ class Buffer(object): if self.on_write_f: self.on_write_f() - self.waiting.set() + if self.waiting: + self.waiting.set() def wait(self, timeout=0.1): """ wait for a size """ + if self._len > 0: return True - else: + elif self.waiting is not None: self.waiting.clear() + else: + raise ValueError('Bufer should be shared to use wait()') self.waiting.wait(timeout) return self._len > 0 def wake(self): + if not self.waiting: + raise ValueError('Bufer should be shared to use wake()') + self.waiting.set() def _linearize(self, upto=None):