Revert buffers back to Events, but only where really used

This commit is contained in:
Oleksii Shevchuk 2018-03-11 18:53:45 +02:00
parent 6afced9b90
commit 0868a4ddc6
2 changed files with 22 additions and 23 deletions

View File

@ -21,27 +21,20 @@ from time import time, sleep
class Ack(object):
""" Dumb (and fast, and unsafe) event replacement """
__slots__ = ( '_lock', '_is_set', '_wait_lock' )
__slots__ = ( '_lock', '_is_set', '_wait_lock' )
def __init__(self):
self._lock = Lock()
self._is_set = False
self._is_set = None
self._wait_lock = None
def clear(self):
with self._lock:
if self._is_set is None and self._wait_lock:
self._wait_lock.release()
self._is_set = False
def is_set(self):
with self._lock:
return self._is_set is True
def set(self):
with self._lock:
if self._is_set is None and self._wait_lock:
if self._is_set is False and self._wait_lock:
self._wait_lock.release()
self._is_set = True
@ -51,18 +44,18 @@ class Ack(object):
with self._lock:
if self._is_set:
return True
else:
self._is_set = None
elif self._is_set is None:
self._is_set = False
self._wait_lock = Lock()
self._wait_lock.acquire()
else:
raise ValueError('Already in wait state!')
self._wait_lock.acquire()
with self._lock:
if self._is_set is False:
# Cleared
self._wait_lock.release()
self._wait_lock = None
return self._is_set is True
@ -81,7 +74,7 @@ class Ack(object):
delay = min(timeout, probe, delay*2)
with self._lock:
if self._is_set is True:
if self._is_set:
return True
return False

View File

@ -12,8 +12,7 @@ __all__ = (
import sys
import zlib
from threading import Lock
from .ack import Ack
from threading import Lock, Event
DEFAULT_FORCED_FLUSH_BUFFER_SIZE = 32768
DEFAULT_MAX_STR_SIZE = 4096
@ -33,7 +32,7 @@ class Buffer(object):
ALLOW_BUFFER_AS_DATA = True
def __init__(self, data='', on_write=None, transport_func=None, truncate=False,
chunk_size=None, compressed=False):
chunk_size=None, compressed=False, shared=False):
"""
Initialize a buffer with 'data'.
"""
@ -44,7 +43,7 @@ class Buffer(object):
self.on_write_f = on_write
self.data_lock = Lock()
self.waiting = Ack()
self.waiting = Event() if shared else None
self.transport = transport_func
self.cookie = None
self.chunk_size = None
@ -78,19 +77,26 @@ class Buffer(object):
if self.on_write_f:
self.on_write_f()
self.waiting.set()
if self.waiting:
self.waiting.set()
def wait(self, timeout=0.1):
""" wait for a size """
if self._len > 0:
return True
else:
elif self.waiting is not None:
self.waiting.clear()
else:
raise ValueError('Bufer should be shared to use wait()')
self.waiting.wait(timeout)
return self._len > 0
def wake(self):
if not self.waiting:
raise ValueError('Bufer should be shared to use wake()')
self.waiting.set()
def _linearize(self, upto=None):