From a64ff67db3ae02c8690bd4729ee49b467d6a8619 Mon Sep 17 00:00:00 2001 From: Oleksii Shevchuk Date: Fri, 2 Mar 2018 21:48:19 +0200 Subject: [PATCH] Create shortcut extention for compressed buffer --- pupy/network/lib/buffer.py | 39 +++++++++++++++++++++++++++++++------- pupy/pp.py | 7 ++++--- 2 files changed, 36 insertions(+), 10 deletions(-) diff --git a/pupy/network/lib/buffer.py b/pupy/network/lib/buffer.py index 67ddc58f..f647b256 100644 --- a/pupy/network/lib/buffer.py +++ b/pupy/network/lib/buffer.py @@ -3,8 +3,7 @@ import threading import sys - -import traceback +import zlib DEFAULT_FORCED_FLUSH_BUFFER_SIZE = 32768 DEFAULT_MAX_STR_SIZE = 4096 @@ -17,7 +16,8 @@ class Buffer(object): ALLOW_BUFFER_AS_DATA = True - def __init__(self, data='', on_write=None, transport_func=None, truncate=False, chunk_size=None): + def __init__(self, data='', on_write=None, transport_func=None, truncate=False, + chunk_size=None, compressed=False): """ Initialize a buffer with 'data'. """ @@ -36,6 +36,11 @@ class Buffer(object): self.transport = transport_func self.cookie = None self.chunk_size = None + self.compressor = None + if compressed: + self.compressor = zlib.compressobj( + compressed if type(compressed) is int else 9 + ) def on_write(self): if self.on_write_f: @@ -157,6 +162,9 @@ class Buffer(object): def insert(self, data): with self.data_lock: + if self.compressor: + raise ValueError('Insert is not supported for compressed buffers') + ldata = len(data) if self._bofft: if type(self._data[0]) in (bytearray, memoryview) and ldata <= self._bofft: @@ -179,13 +187,25 @@ class Buffer(object): return if isinstance(data, Buffer): - self._data += data._data - self._len += data._len + if self.compressor: + for chunk in data._data: + chunk = self.compressor.compress(chunk) + self._data.append(chunk) + self._len += len(chunk) + else: + self._data += data._data + self._len += data._len elif type(data) in (tuple, list): for chunk in data: - self._data.append(data) - self._len += len(data) + if self.compressor: + chunk = self.compressor.compress(chunk) + + self._data.append(chunk) + self._len += len(chunk) else: + if self.compressor: + data = self.compressor.compress(data) + if self._len and type(self._data[-1]) == type(data) and \ len(self._data[-1]) + len(data) <= DEFAULT_MAX_STR_SIZE: self._data[-1] += data @@ -211,6 +231,11 @@ class Buffer(object): self.on_write() def flush(self): + if self.compressor: + chunk = self.compressor.flush() + self._data.append(chunk) + self._len += len(chunk) + if self._len > 0: self.on_write() diff --git a/pupy/pp.py b/pupy/pp.py index 11846f2f..c12c2be7 100755 --- a/pupy/pp.py +++ b/pupy/pp.py @@ -73,6 +73,7 @@ from network import conf from network.lib.base_launcher import LauncherError from network.lib.connection import PupyConnection from network.lib.streams.PupySocketStream import PupyChannel +from network.lib.buffer import Buffer import shlex import zlib @@ -565,9 +566,9 @@ class ReverseSlaveService(Service): return packed_result def exposed_msgpack_dumps(self, obj, compressed=False): - data = umsgpack.dumps(obj) - if compressed: - data = zlib.compress(data) + data = Buffer(compressed=compressed) + umsgpack.dump(obj, data) + data.flush() return data def exposed_json_dumps(self, obj, compressed=False):