Switch to BSON for data serialization.

This commit is contained in:
Aldo Cortesi 2011-01-27 10:52:42 +13:00
parent 460107589c
commit 077272ec97
7 changed files with 520 additions and 5 deletions

View File

View File

@ -0,0 +1,24 @@
Copyright (c) 2010, Kou Man Tong
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
* Neither the name of Kou Man Tong nor the
names of its contributors may be used to endorse or promote products
derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -0,0 +1,5 @@
BSON module taken from here:
https://github.com/martinkou/bson

View File

@ -0,0 +1,97 @@
#!/usr/bin/python -OOOO
# vim: set fileencoding=utf8 shiftwidth=4 tabstop=4 textwidth=80 foldmethod=marker :
# Copyright (c) 2010, Kou Man Tong. All rights reserved.
# For licensing, see LICENSE file included in the package.
"""
BSON serialization and deserialization logic.
Specifications taken from: http://bsonspec.org/#/specification
The following types are unsupported, because for data exchange purposes, they're
over-engineered:
0x06 (Undefined)
0x07 (ObjectId)
0x0b (Regex - Exactly which flavor do you want? Better let higher level
programmers make that decision.)
0x0c (DBPointer)
0x0d (JavaScript code)
0x0e (Symbol)
0x0f (JS w/ scope)
0x11 (MongoDB-specific timestamp)
For binaries, only the default 0x0 type is supported.
>>> a = {
... u"Item A" : u"String item A",
... u"Item D" : {u"ROFLOL" : u"Blah blah blah"},
... u"Item C" : [1, 123456789012345, None, "Party and Bad Romance"],
... u"Item B" : u"\u4e00\u9580\u4e94\u5091"
... }
>>> def sorted(obj, dfs_stack):
... keys = obj.keys()
... keys.sort()
... for i in keys: yield i
...
>>> def reverse(obj, dfs_stack):
... keys = obj.keys()
... keys.sort(reverse = True)
... for i in keys: yield i
...
>>> serialized = dumps(a, sorted)
>>> serialized
'\\x9f\\x00\\x00\\x00\\x02Item A\\x00\\x0e\\x00\\x00\\x00String item A\\x00\\x02Item B\\x00\\r\\x00\\x00\\x00\\xe4\\xb8\\x80\\xe9\\x96\\x80\\xe4\\xba\\x94\\xe5\\x82\\x91\\x00\\x04Item C\\x007\\x00\\x00\\x00\\x100\\x00\\x01\\x00\\x00\\x00\\x121\\x00y\\xdf\\r\\x86Hp\\x00\\x00\\n2\\x00\\x053\\x00\\x15\\x00\\x00\\x00\\x00Party and Bad Romance\\x00\\x03Item D\\x00 \\x00\\x00\\x00\\x02ROFLOL\\x00\\x0f\\x00\\x00\\x00Blah blah blah\\x00\\x00\\x00'
>>>
>>> b = loads(serialized)
>>> b
{u'Item C': [1, 123456789012345, None, 'Party and Bad Romance'], u'Item B': u'\\u4e00\\u9580\\u4e94\\u5091', u'Item A': u'String item A', u'Item D': {u'ROFLOL': u'Blah blah blah'}}
>>> reverse_serialized = dumps(a, reverse)
>>> reverse_serialized
'\\x9f\\x00\\x00\\x00\\x03Item D\\x00 \\x00\\x00\\x00\\x02ROFLOL\\x00\\x0f\\x00\\x00\\x00Blah blah blah\\x00\\x00\\x04Item C\\x007\\x00\\x00\\x00\\x100\\x00\\x01\\x00\\x00\\x00\\x121\\x00y\\xdf\\r\\x86Hp\\x00\\x00\\n2\\x00\\x053\\x00\\x15\\x00\\x00\\x00\\x00Party and Bad Romance\\x00\\x02Item B\\x00\\r\\x00\\x00\\x00\\xe4\\xb8\\x80\\xe9\\x96\\x80\\xe4\\xba\\x94\\xe5\\x82\\x91\\x00\\x02Item A\\x00\\x0e\\x00\\x00\\x00String item A\\x00\\x00'
>>> c = loads(reverse_serialized)
>>> c
{u'Item C': [1, 123456789012345, None, 'Party and Bad Romance'], u'Item B': u'\\u4e00\\u9580\\u4e94\\u5091', u'Item A': u'String item A', u'Item D': {u'ROFLOL': u'Blah blah blah'}}
"""
from codec import *
import network
__all__ = ["loads", "dumps"]
# {{{ Serialization and Deserialization
def dumps(obj, generator = None):
"""
Given a dict, outputs a BSON string.
generator is an optional function which accepts the dictionary/array being
encoded, the current DFS traversal stack, and outputs an iterator indicating
the correct encoding order for keys.
"""
if isinstance(obj, BSONCoding):
return encode_object(obj, [], generator_func = generator)
return encode_document(obj, [], generator_func = generator)
def loads(data):
"""
Given a BSON string, outputs a dict.
"""
return decode_document(data, 0)[1]
# }}}
# {{{ Socket Patchers
def patch_socket():
"""
Patches the Python socket class such that sockets can send and receive BSON
objects atomically.
This adds the following functions to socket:
recvbytes(bytes_needed, sock_buf = None) - reads bytes_needed bytes
atomically. Returns None if socket closed.
recvobj() - reads a BSON document from the socket atomically and returns
the deserialized dictionary. Returns None if socket closed.
sendobj(obj) - sends a BSON document to the socket atomically.
"""
from socket import socket
socket.recvbytes = network._recvbytes
socket.recvobj = network._recvobj
socket.sendobj = network._sendobj
# }}}

View File

@ -0,0 +1,323 @@
#!/usr/bin/python -OOOO
# vim: set fileencoding=utf8 shiftwidth=4 tabstop=4 textwidth=80 foldmethod=marker :
# Copyright (c) 2010, Kou Man Tong. All rights reserved.
# For licensing, see LICENSE file included in the package.
"""
Base codec functions for bson.
"""
import struct
import cStringIO
import calendar, pytz
from datetime import datetime
import warnings
from abc import ABCMeta, abstractmethod
# {{{ Error Classes
class MissingClassDefinition(ValueError):
def __init__(self, class_name):
super(MissingClassDefinition, self).__init__(
"No class definition for class %s" % (class_name,))
# }}}
# {{{ Warning Classes
class MissingTimezoneWarning(RuntimeWarning):
def __init__(self, *args):
args = list(args)
if len(args) < 1:
args.append("Input datetime object has no tzinfo, assuming UTC.")
super(MissingTimezoneWarning, self).__init__(*args)
# }}}
# {{{ Traversal Step
class TraversalStep(object):
def __init__(self, parent, key):
self.parent = parent
self.key = key
# }}}
# {{{ Custom Object Codec
class BSONCoding(object):
__metaclass__ = ABCMeta
@abstractmethod
def bson_encode(self):
pass
@abstractmethod
def bson_init(self, raw_values):
pass
classes = {}
def import_class(cls):
if not issubclass(cls, BSONCoding):
return
global classes
classes[cls.__name__] = cls
def import_classes(*args):
for cls in args:
import_class(cls)
def import_classes_from_modules(*args):
for module in args:
for item in module.__dict__:
if hasattr(item, "__new__") and hasattr(item, "__name__"):
import_class(item)
def encode_object(obj, traversal_stack, generator_func):
values = obj.bson_encode()
class_name = obj.__class__.__name__
values["$$__CLASS_NAME__$$"] = class_name
return encode_document(values, traversal_stack, obj, generator_func)
def encode_object_element(name, value, traversal_stack, generator_func):
return "\x03" + encode_cstring(name) + \
encode_object(value, traversal_stack,
generator_func = generator_func)
class _EmptyClass(object):
pass
def decode_object(raw_values):
global classes
class_name = raw_values["$$__CLASS_NAME__$$"]
cls = None
try:
cls = classes[class_name]
except KeyError, e:
raise MissingClassDefinition(class_name)
retval = _EmptyClass()
retval.__class__ = cls
retval.bson_init(raw_values)
return retval
# }}}
# {{{ Codec Logic
def encode_string(value):
value = value.encode("utf8")
length = len(value)
return struct.pack("<i%dsb" % (length,), length + 1, value, 0)
def decode_string(data, base):
length = struct.unpack("<i", data[base:base + 4])[0]
value = data[base + 4: base + 4 + length - 1]
value = value.decode("utf8")
return (base + 4 + length, value)
def encode_cstring(value):
if isinstance(value, unicode):
value = value.encode("utf8")
return value + "\x00"
def decode_cstring(data, base):
buf = cStringIO.StringIO()
length = 0
for character in data[base:]:
length += 1
if character == "\x00":
break
buf.write(character)
return (base + length, buf.getvalue().decode("utf8"))
def encode_binary(value):
length = len(value)
return struct.pack("<ib", length, 0) + value
def decode_binary(data, base):
length, binary_type = struct.unpack("<ib", data[base:base + 5])
return (base + 5 + length, data[base + 5:base + 5 + length])
def encode_double(value):
return struct.pack("<d", value)
def decode_double(data, base):
return (base + 8, struct.unpack("<d", data[base: base + 8])[0])
ELEMENT_TYPES = {
0x01 : "double",
0x02 : "string",
0x03 : "document",
0x04 : "array",
0x05 : "binary",
0x08 : "boolean",
0x09 : "UTCdatetime",
0x0A : "none",
0x10 : "int32",
0x12 : "int64"
}
def encode_double_element(name, value):
return "\x01" + encode_cstring(name) + encode_double(value)
def decode_double_element(data, base):
base, name = decode_cstring(data, base + 1)
base, value = decode_double(data, base)
return (base, name, value)
def encode_string_element(name, value):
return "\x02" + encode_cstring(name) + encode_string(value)
def decode_string_element(data, base):
base, name = decode_cstring(data, base + 1)
base, value = decode_string(data, base)
return (base, name, value)
def encode_value(name, value, buf, traversal_stack, generator_func):
if isinstance(value, BSONCoding):
buf.write(encode_object_element(name, value))
elif isinstance(value, float):
buf.write(encode_double_element(name, value))
elif isinstance(value, unicode):
buf.write(encode_string_element(name, value))
elif isinstance(value, dict):
buf.write(encode_document_element(name, value,
traversal_stack, generator_func))
elif isinstance(value, list) or isinstance(value, tuple):
buf.write(encode_array_element(name, value,
traversal_stack, generator_func))
elif isinstance(value, str):
buf.write(encode_binary_element(name, value))
elif isinstance(value, bool):
buf.write(encode_boolean_element(name, value))
elif isinstance(value, datetime):
buf.write(encode_UTCdatetime_element(name, value))
elif value is None:
buf.write(encode_none_element(name, value))
elif isinstance(value, int):
if value < -0x80000000 or value > 0x7fffffff:
buf.write(encode_int64_element(name, value))
else:
buf.write(encode_int32_element(name, value))
elif isinstance(value, long):
buf.write(encode_int64_element(name, value))
def encode_document(obj, traversal_stack,
traversal_parent = None,
generator_func = None):
buf = cStringIO.StringIO()
key_iter = obj.iterkeys()
if generator_func is not None:
key_iter = generator_func(obj, traversal_stack)
for name in key_iter:
value = obj[name]
traversal_stack.append(TraversalStep(traversal_parent or obj, name))
encode_value(name, value, buf, traversal_stack, generator_func)
traversal_stack.pop()
e_list = buf.getvalue()
e_list_length = len(e_list)
return struct.pack("<i%dsb" % (e_list_length,), e_list_length + 4 + 1,
e_list, 0)
def encode_array(array, traversal_stack,
traversal_parent = None,
generator_func = None):
buf = cStringIO.StringIO()
for i in xrange(0, len(array)):
value = array[i]
traversal_stack.append(TraversalStep(traversal_parent or array, i))
encode_value(unicode(i), value, buf, traversal_stack, generator_func)
traversal_stack.pop()
e_list = buf.getvalue()
e_list_length = len(e_list)
return struct.pack("<i%dsb" % (e_list_length,), e_list_length + 4 + 1,
e_list, 0)
def decode_element(data, base):
element_type = struct.unpack("<b", data[base:base + 1])[0]
element_description = ELEMENT_TYPES[element_type]
decode_func = globals()["decode_" + element_description + "_element"]
return decode_func(data, base)
def decode_document(data, base):
length = struct.unpack("<i", data[base:base + 4])[0]
end_point = base + length
base += 4
retval = {}
while base < end_point - 1:
base, name, value = decode_element(data, base)
retval[name] = value
if "$$__CLASS_NAME__$$" in retval:
retval = decode_object(retval)
return (end_point, retval)
def encode_document_element(name, value, traversal_stack, generator_func):
return "\x03" + encode_cstring(name) + \
encode_document(value, traversal_stack,
generator_func = generator_func)
def decode_document_element(data, base):
base, name = decode_cstring(data, base + 1)
base, value = decode_document(data, base)
return (base, name, value)
def encode_array_element(name, value, traversal_stack, generator_func):
return "\x04" + encode_cstring(name) + \
encode_array(value, traversal_stack, generator_func = generator_func)
def decode_array_element(data, base):
base, name = decode_cstring(data, base + 1)
base, value = decode_document(data, base)
retval = []
try:
i = 0
while True:
retval.append(value[unicode(i)])
i += 1
except KeyError:
pass
return (base, name, retval)
def encode_binary_element(name, value):
return "\x05" + encode_cstring(name) + encode_binary(value)
def decode_binary_element(data, base):
base, name = decode_cstring(data, base + 1)
base, value = decode_binary(data, base)
return (base, name, value)
def encode_boolean_element(name, value):
return "\x08" + encode_cstring(name) + struct.pack("<b", value)
def decode_boolean_element(data, base):
base, name = decode_cstring(data, base + 1)
value = not not struct.unpack("<b", data[base:base + 1])[0]
return (base + 1, name, value)
def encode_UTCdatetime_element(name, value):
if value.tzinfo is None:
warnings.warn(MissingTimezoneWarning(), None, 4)
value = int(round(calendar.timegm(value.utctimetuple()) * 1000 +
(value.microsecond / 1000.0)))
return "\x09" + encode_cstring(name) + struct.pack("<q", value)
def decode_UTCdatetime_element(data, base):
base, name = decode_cstring(data, base + 1)
value = datetime.fromtimestamp(struct.unpack("<q",
data[base:base + 8])[0] / 1000.0, pytz.utc)
return (base + 8, name, value)
def encode_none_element(name, value):
return "\x0a" + encode_cstring(name)
def decode_none_element(data, base):
base, name = decode_cstring(data, base + 1)
return (base, name, None)
def encode_int32_element(name, value):
return "\x10" + encode_cstring(name) + struct.pack("<i", value)
def decode_int32_element(data, base):
base, name = decode_cstring(data, base + 1)
value = struct.unpack("<i", data[base:base + 4])[0]
return (base + 4, name, value)
def encode_int64_element(name, value):
return "\x12" + encode_cstring(name) + struct.pack("<q", value)
def decode_int64_element(data, base):
base, name = decode_cstring(data, base + 1)
value = struct.unpack("<q", data[base:base + 8])[0]
return (base + 8, name, value)
# }}}

View File

@ -0,0 +1,64 @@
#!/usr/bin/env python
import socket
try:
from cStringIO import StringIO
except ImportError, e:
from StringIO import StringIO
from struct import unpack
from __init__ import dumps, loads
def _bintoint(data):
return unpack("<i", data)[0]
def _sendobj(self, obj):
"""
Atomically send a BSON message.
"""
data = dumps(obj)
self.sendall(data)
def _recvobj(self):
"""
Atomic read of a BSON message.
This function either returns a dict, None, or raises a socket error.
If the return value is None, it means the socket is closed by the other side.
"""
sock_buf = self.recvbytes(4)
if sock_buf is None:
return None
message_length = _bintoint(sock_buf.getvalue())
sock_buf = self.recvbytes(message_length - 4, sock_buf)
if sock_buf is None:
return None
retval = loads(sock_buf.getvalue())
return retval
def _recvbytes(self, bytes_needed, sock_buf = None):
"""
Atomic read of bytes_needed bytes.
This function either returns exactly the nmber of bytes requested in a
StringIO buffer, None, or raises a socket error.
If the return value is None, it means the socket is closed by the other side.
"""
if sock_buf is None:
sock_buf = StringIO()
bytes_count = 0
while bytes_count < bytes_needed:
chunk = self.recv(min(bytes_needed - bytes_count, 32768))
part_count = len(chunk)
if part_count < 1:
return None
bytes_count += part_count
sock_buf.write(chunk)
return sock_buf

View File

@ -2,7 +2,7 @@
This module provides more sophisticated flow tracking. These match requests
with their responses, and provide filtering and interception facilities.
"""
import json
from contrib import bson
import proxy, threading
class ReplayConnection:
@ -148,12 +148,14 @@ class State:
return f
def dump_flows(self):
data = [i.get_state() for i in self.view]
return json.dumps(data)
data = dict(
flows =[i.get_state() for i in self.view]
)
return bson.dumps(data)
def load_flows(self, js, klass):
data = json.loads(js)
data = [klass.from_state(i) for i in data]
data = bson.loads(js)
data = [klass.from_state(i) for i in data["flows"]]
self.flow_list.extend(data)
def set_limit(self, limit):