Create shortcut extention for compressed buffer

This commit is contained in:
Oleksii Shevchuk 2018-03-02 21:48:19 +02:00
parent fcb6433246
commit a64ff67db3
2 changed files with 36 additions and 10 deletions

View File

@ -3,8 +3,7 @@
import threading import threading
import sys import sys
import zlib
import traceback
DEFAULT_FORCED_FLUSH_BUFFER_SIZE = 32768 DEFAULT_FORCED_FLUSH_BUFFER_SIZE = 32768
DEFAULT_MAX_STR_SIZE = 4096 DEFAULT_MAX_STR_SIZE = 4096
@ -17,7 +16,8 @@ class Buffer(object):
ALLOW_BUFFER_AS_DATA = True ALLOW_BUFFER_AS_DATA = True
def __init__(self, data='', on_write=None, transport_func=None, truncate=False, chunk_size=None): def __init__(self, data='', on_write=None, transport_func=None, truncate=False,
chunk_size=None, compressed=False):
""" """
Initialize a buffer with 'data'. Initialize a buffer with 'data'.
""" """
@ -36,6 +36,11 @@ class Buffer(object):
self.transport = transport_func self.transport = transport_func
self.cookie = None self.cookie = None
self.chunk_size = None self.chunk_size = None
self.compressor = None
if compressed:
self.compressor = zlib.compressobj(
compressed if type(compressed) is int else 9
)
def on_write(self): def on_write(self):
if self.on_write_f: if self.on_write_f:
@ -157,6 +162,9 @@ class Buffer(object):
def insert(self, data): def insert(self, data):
with self.data_lock: with self.data_lock:
if self.compressor:
raise ValueError('Insert is not supported for compressed buffers')
ldata = len(data) ldata = len(data)
if self._bofft: if self._bofft:
if type(self._data[0]) in (bytearray, memoryview) and ldata <= self._bofft: if type(self._data[0]) in (bytearray, memoryview) and ldata <= self._bofft:
@ -179,13 +187,25 @@ class Buffer(object):
return return
if isinstance(data, Buffer): if isinstance(data, Buffer):
if self.compressor:
for chunk in data._data:
chunk = self.compressor.compress(chunk)
self._data.append(chunk)
self._len += len(chunk)
else:
self._data += data._data self._data += data._data
self._len += data._len self._len += data._len
elif type(data) in (tuple, list): elif type(data) in (tuple, list):
for chunk in data: for chunk in data:
self._data.append(data) if self.compressor:
self._len += len(data) chunk = self.compressor.compress(chunk)
self._data.append(chunk)
self._len += len(chunk)
else: else:
if self.compressor:
data = self.compressor.compress(data)
if self._len and type(self._data[-1]) == type(data) and \ if self._len and type(self._data[-1]) == type(data) and \
len(self._data[-1]) + len(data) <= DEFAULT_MAX_STR_SIZE: len(self._data[-1]) + len(data) <= DEFAULT_MAX_STR_SIZE:
self._data[-1] += data self._data[-1] += data
@ -211,6 +231,11 @@ class Buffer(object):
self.on_write() self.on_write()
def flush(self): def flush(self):
if self.compressor:
chunk = self.compressor.flush()
self._data.append(chunk)
self._len += len(chunk)
if self._len > 0: if self._len > 0:
self.on_write() self.on_write()

View File

@ -73,6 +73,7 @@ from network import conf
from network.lib.base_launcher import LauncherError from network.lib.base_launcher import LauncherError
from network.lib.connection import PupyConnection from network.lib.connection import PupyConnection
from network.lib.streams.PupySocketStream import PupyChannel from network.lib.streams.PupySocketStream import PupyChannel
from network.lib.buffer import Buffer
import shlex import shlex
import zlib import zlib
@ -565,9 +566,9 @@ class ReverseSlaveService(Service):
return packed_result return packed_result
def exposed_msgpack_dumps(self, obj, compressed=False): def exposed_msgpack_dumps(self, obj, compressed=False):
data = umsgpack.dumps(obj) data = Buffer(compressed=compressed)
if compressed: umsgpack.dump(obj, data)
data = zlib.compress(data) data.flush()
return data return data
def exposed_json_dumps(self, obj, compressed=False): def exposed_json_dumps(self, obj, compressed=False):