# coding=utf-8 # Copyright 2014 Google Inc. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import os.path import sys PY_VERSION = sys.version_info[:2] import ctypes from collections import defaultdict import timeit import unittest from flatbuffers import compat from flatbuffers.compat import range_func as compat_range import flatbuffers from flatbuffers import number_types as N import MyGame # refers to generated code import MyGame.Example # refers to generated code import MyGame.Example.Any # refers to generated code import MyGame.Example.Color # refers to generated code import MyGame.Example.Monster # refers to generated code import MyGame.Example.Test # refers to generated code import MyGame.Example.Stat # refers to generated code import MyGame.Example.Vec3 # refers to generated code def assertRaises(test_case, fn, exception_class): ''' Backwards-compatible assertion for exceptions raised. ''' exc = None try: fn() except Exception as e: exc = e test_case.assertTrue(exc is not None) test_case.assertTrue(isinstance(exc, exception_class)) class TestWireFormat(unittest.TestCase): def test_wire_format(self): # Verify that using the generated Python code builds a buffer without # returning errors, and is interpreted correctly: gen_buf, gen_off = make_monster_from_generated_code() CheckReadBuffer(gen_buf, gen_off) # Verify that the canonical flatbuffer file is readable by the # generated Python code. Note that context managers are not part of # Python 2.5, so we use the simpler open/close methods here: f = open('monsterdata_test.mon', 'rb') canonicalWireData = f.read() f.close() CheckReadBuffer(bytearray(canonicalWireData), 0) # Write the generated buffer out to a file: f = open('monsterdata_python_wire.mon', 'wb') f.write(gen_buf[gen_off:]) f.close() def CheckReadBuffer(buf, offset): ''' CheckReadBuffer checks that the given buffer is evaluated correctly as the example Monster. ''' def asserter(stmt): ''' An assertion helper that is separated from TestCase classes. ''' if not stmt: raise AssertionError('CheckReadBuffer case failed') monster = MyGame.Example.Monster.Monster.GetRootAsMonster(buf, offset) asserter(monster.Hp() == 80) asserter(monster.Mana() == 150) asserter(monster.Name() == b'MyMonster') # initialize a Vec3 from Pos() vec = monster.Pos() asserter(vec is not None) # verify the properties of the Vec3 asserter(vec.X() == 1.0) asserter(vec.Y() == 2.0) asserter(vec.Z() == 3.0) asserter(vec.Test1() == 3.0) asserter(vec.Test2() == 2) # initialize a Test from Test3(...) t = MyGame.Example.Test.Test() t = vec.Test3(t) asserter(t is not None) # verify the properties of the Test asserter(t.A() == 5) asserter(t.B() == 6) # verify that the enum code matches the enum declaration: union_type = MyGame.Example.Any.Any asserter(monster.TestType() == union_type.Monster) # initialize a Table from a union field Test(...) table2 = monster.Test() asserter(type(table2) is flatbuffers.table.Table) # initialize a Monster from the Table from the union monster2 = MyGame.Example.Monster.Monster() monster2.Init(table2.Bytes, table2.Pos) asserter(monster2.Name() == b"Fred") # iterate through the first monster's inventory: asserter(monster.InventoryLength() == 5) invsum = 0 for i in compat_range(monster.InventoryLength()): v = monster.Inventory(i) invsum += int(v) asserter(invsum == 10) asserter(monster.Test4Length() == 2) # create a 'Test' object and populate it: test0 = monster.Test4(0) asserter(type(test0) is MyGame.Example.Test.Test) test1 = monster.Test4(1) asserter(type(test1) is MyGame.Example.Test.Test) # the position of test0 and test1 are swapped in monsterdata_java_wire # and monsterdata_test_wire, so ignore ordering v0 = test0.A() v1 = test0.B() v2 = test1.A() v3 = test1.B() sumtest12 = int(v0) + int(v1) + int(v2) + int(v3) asserter(sumtest12 == 100) asserter(monster.TestarrayofstringLength() == 2) asserter(monster.Testarrayofstring(0) == b"test1") asserter(monster.Testarrayofstring(1) == b"test2") asserter(monster.TestarrayoftablesLength() == 0) asserter(monster.TestnestedflatbufferLength() == 0) asserter(monster.Testempty() is None) class TestFuzz(unittest.TestCase): ''' Low level stress/fuzz test: serialize/deserialize a variety of different kinds of data in different combinations ''' binary_type = compat.binary_types[0] # this will always exist ofInt32Bytes = binary_type([0x83, 0x33, 0x33, 0x33]) ofInt64Bytes = binary_type([0x84, 0x44, 0x44, 0x44, 0x44, 0x44, 0x44, 0x44]) overflowingInt32Val = flatbuffers.encode.Get(flatbuffers.packer.int32, ofInt32Bytes, 0) overflowingInt64Val = flatbuffers.encode.Get(flatbuffers.packer.int64, ofInt64Bytes, 0) # Values we're testing against: chosen to ensure no bits get chopped # off anywhere, and also be different from eachother. boolVal = True int8Val = N.Int8Flags.py_type(-127) # 0x81 uint8Val = N.Uint8Flags.py_type(0xFF) int16Val = N.Int16Flags.py_type(-32222) # 0x8222 uint16Val = N.Uint16Flags.py_type(0xFEEE) int32Val = N.Int32Flags.py_type(overflowingInt32Val) uint32Val = N.Uint32Flags.py_type(0xFDDDDDDD) int64Val = N.Int64Flags.py_type(overflowingInt64Val) uint64Val = N.Uint64Flags.py_type(0xFCCCCCCCCCCCCCCC) # Python uses doubles, so force it here float32Val = N.Float32Flags.py_type(ctypes.c_float(3.14159).value) float64Val = N.Float64Flags.py_type(3.14159265359) def test_fuzz(self): return self.check_once(11, 100) def check_once(self, fuzzFields, fuzzObjects): testValuesMax = 11 # hardcoded to the number of scalar types builder = flatbuffers.Builder(0) l = LCG() objects = [0 for _ in compat_range(fuzzObjects)] # Generate fuzzObjects random objects each consisting of # fuzzFields fields, each of a random type. for i in compat_range(fuzzObjects): builder.StartObject(fuzzFields) for j in compat_range(fuzzFields): choice = int(l.Next()) % testValuesMax if choice == 0: builder.PrependBoolSlot(int(j), self.boolVal, False) elif choice == 1: builder.PrependInt8Slot(int(j), self.int8Val, 0) elif choice == 2: builder.PrependUint8Slot(int(j), self.uint8Val, 0) elif choice == 3: builder.PrependInt16Slot(int(j), self.int16Val, 0) elif choice == 4: builder.PrependUint16Slot(int(j), self.uint16Val, 0) elif choice == 5: builder.PrependInt32Slot(int(j), self.int32Val, 0) elif choice == 6: builder.PrependUint32Slot(int(j), self.uint32Val, 0) elif choice == 7: builder.PrependInt64Slot(int(j), self.int64Val, 0) elif choice == 8: builder.PrependUint64Slot(int(j), self.uint64Val, 0) elif choice == 9: builder.PrependFloat32Slot(int(j), self.float32Val, 0) elif choice == 10: builder.PrependFloat64Slot(int(j), self.float64Val, 0) else: raise RuntimeError('unreachable') off = builder.EndObject() # store the offset from the end of the builder buffer, # since it will keep growing: objects[i] = off # Do some bookkeeping to generate stats on fuzzes: stats = defaultdict(int) def check(table, desc, want, got): stats[desc] += 1 self.assertEqual(want, got, "%s != %s, %s" % (want, got, desc)) l = LCG() # Reset. # Test that all objects we generated are readable and return the # expected values. We generate random objects in the same order # so this is deterministic. for i in compat_range(fuzzObjects): table = flatbuffers.table.Table(builder.Bytes, len(builder.Bytes) - objects[i]) for j in compat_range(fuzzFields): field_count = flatbuffers.builder.VtableMetadataFields + j f = N.VOffsetTFlags.py_type(field_count * N.VOffsetTFlags.bytewidth) choice = int(l.Next()) % testValuesMax if choice == 0: check(table, "bool", self.boolVal, table.GetSlot(f, False, N.BoolFlags)) elif choice == 1: check(table, "int8", self.int8Val, table.GetSlot(f, 0, N.Int8Flags)) elif choice == 2: check(table, "uint8", self.uint8Val, table.GetSlot(f, 0, N.Uint8Flags)) elif choice == 3: check(table, "int16", self.int16Val, table.GetSlot(f, 0, N.Int16Flags)) elif choice == 4: check(table, "uint16", self.uint16Val, table.GetSlot(f, 0, N.Uint16Flags)) elif choice == 5: check(table, "int32", self.int32Val, table.GetSlot(f, 0, N.Int32Flags)) elif choice == 6: check(table, "uint32", self.uint32Val, table.GetSlot(f, 0, N.Uint32Flags)) elif choice == 7: check(table, "int64", self.int64Val, table.GetSlot(f, 0, N.Int64Flags)) elif choice == 8: check(table, "uint64", self.uint64Val, table.GetSlot(f, 0, N.Uint64Flags)) elif choice == 9: check(table, "float32", self.float32Val, table.GetSlot(f, 0, N.Float32Flags)) elif choice == 10: check(table, "float64", self.float64Val, table.GetSlot(f, 0, N.Float64Flags)) else: raise RuntimeError('unreachable') # If enough checks were made, verify that all scalar types were used: self.assertEqual(testValuesMax, len(stats), "fuzzing failed to test all scalar types: %s" % stats) class TestByteLayout(unittest.TestCase): ''' TestByteLayout checks the bytes of a Builder in various scenarios. ''' def assertBuilderEquals(self, builder, want_chars_or_ints): def integerize(x): if isinstance(x, compat.string_types): return ord(x) return x want_ints = list(map(integerize, want_chars_or_ints)) want = bytearray(want_ints) got = builder.Bytes[builder.Head():] # use the buffer directly self.assertEqual(want, got) def test_numbers(self): b = flatbuffers.Builder(0) self.assertBuilderEquals(b, []) b.PrependBool(True) self.assertBuilderEquals(b, [1]) b.PrependInt8(-127) self.assertBuilderEquals(b, [129, 1]) b.PrependUint8(255) self.assertBuilderEquals(b, [255, 129, 1]) b.PrependInt16(-32222) self.assertBuilderEquals(b, [0x22, 0x82, 0, 255, 129, 1]) # first pad b.PrependUint16(0xFEEE) # no pad this time: self.assertBuilderEquals(b, [0xEE, 0xFE, 0x22, 0x82, 0, 255, 129, 1]) b.PrependInt32(-53687092) self.assertBuilderEquals(b, [204, 204, 204, 252, 0xEE, 0xFE, 0x22, 0x82, 0, 255, 129, 1]) b.PrependUint32(0x98765432) self.assertBuilderEquals(b, [0x32, 0x54, 0x76, 0x98, 204, 204, 204, 252, 0xEE, 0xFE, 0x22, 0x82, 0, 255, 129, 1]) def test_numbers64(self): b = flatbuffers.Builder(0) b.PrependUint64(0x1122334455667788) self.assertBuilderEquals(b, [0x88, 0x77, 0x66, 0x55, 0x44, 0x33, 0x22, 0x11]) b = flatbuffers.Builder(0) b.PrependInt64(0x1122334455667788) self.assertBuilderEquals(b, [0x88, 0x77, 0x66, 0x55, 0x44, 0x33, 0x22, 0x11]) def test_1xbyte_vector(self): b = flatbuffers.Builder(0) self.assertBuilderEquals(b, []) b.StartVector(flatbuffers.number_types.Uint8Flags.bytewidth, 1, 1) self.assertBuilderEquals(b, [0, 0, 0]) # align to 4bytes b.PrependByte(1) self.assertBuilderEquals(b, [1, 0, 0, 0]) b.EndVector(1) self.assertBuilderEquals(b, [1, 0, 0, 0, 1, 0, 0, 0]) # padding def test_2xbyte_vector(self): b = flatbuffers.Builder(0) b.StartVector(flatbuffers.number_types.Uint8Flags.bytewidth, 2, 1) self.assertBuilderEquals(b, [0, 0]) # align to 4bytes b.PrependByte(1) self.assertBuilderEquals(b, [1, 0, 0]) b.PrependByte(2) self.assertBuilderEquals(b, [2, 1, 0, 0]) b.EndVector(2) self.assertBuilderEquals(b, [2, 0, 0, 0, 2, 1, 0, 0]) # padding def test_1xuint16_vector(self): b = flatbuffers.Builder(0) b.StartVector(flatbuffers.number_types.Uint16Flags.bytewidth, 1, 1) self.assertBuilderEquals(b, [0, 0]) # align to 4bytes b.PrependUint16(1) self.assertBuilderEquals(b, [1, 0, 0, 0]) b.EndVector(1) self.assertBuilderEquals(b, [1, 0, 0, 0, 1, 0, 0, 0]) # padding def test_2xuint16_vector(self): b = flatbuffers.Builder(0) b.StartVector(flatbuffers.number_types.Uint16Flags.bytewidth, 2, 1) self.assertBuilderEquals(b, []) # align to 4bytes b.PrependUint16(0xABCD) self.assertBuilderEquals(b, [0xCD, 0xAB]) b.PrependUint16(0xDCBA) self.assertBuilderEquals(b, [0xBA, 0xDC, 0xCD, 0xAB]) b.EndVector(2) self.assertBuilderEquals(b, [2, 0, 0, 0, 0xBA, 0xDC, 0xCD, 0xAB]) def test_create_ascii_string(self): b = flatbuffers.Builder(0) b.CreateString(u"foo", encoding='ascii') # 0-terminated, no pad: self.assertBuilderEquals(b, [3, 0, 0, 0, 'f', 'o', 'o', 0]) b.CreateString(u"moop", encoding='ascii') # 0-terminated, 3-byte pad: self.assertBuilderEquals(b, [4, 0, 0, 0, 'm', 'o', 'o', 'p', 0, 0, 0, 0, 3, 0, 0, 0, 'f', 'o', 'o', 0]) def test_create_utf8_string(self): b = flatbuffers.Builder(0) b.CreateString(u"Цлїςσδε") self.assertBuilderEquals(b, "\x0e\x00\x00\x00\xd0\xa6\xd0\xbb\xd1\x97" \ "\xcf\x82\xcf\x83\xce\xb4\xce\xb5\x00\x00") b.CreateString(u"フムアムカモケモ") self.assertBuilderEquals(b, "\x18\x00\x00\x00\xef\xbe\x8c\xef\xbe\x91" \ "\xef\xbd\xb1\xef\xbe\x91\xef\xbd\xb6\xef\xbe\x93\xef\xbd\xb9\xef" \ "\xbe\x93\x00\x00\x00\x00\x0e\x00\x00\x00\xd0\xa6\xd0\xbb\xd1\x97" \ "\xcf\x82\xcf\x83\xce\xb4\xce\xb5\x00\x00") def test_create_arbitrary_string(self): b = flatbuffers.Builder(0) s = "\x01\x02\x03" b.CreateString(s) # Default encoding is utf-8. # 0-terminated, no pad: self.assertBuilderEquals(b, [3, 0, 0, 0, 1, 2, 3, 0]) s2 = "\x04\x05\x06\x07" b.CreateString(s2) # Default encoding is utf-8. # 0-terminated, 3-byte pad: self.assertBuilderEquals(b, [4, 0, 0, 0, 4, 5, 6, 7, 0, 0, 0, 0, 3, 0, 0, 0, 1, 2, 3, 0]) def test_empty_vtable(self): b = flatbuffers.Builder(0) b.StartObject(0) self.assertBuilderEquals(b, []) b.EndObject() self.assertBuilderEquals(b, [4, 0, 4, 0, 4, 0, 0, 0]) def test_vtable_with_one_true_bool(self): b = flatbuffers.Builder(0) self.assertBuilderEquals(b, []) b.StartObject(1) self.assertBuilderEquals(b, []) b.PrependBoolSlot(0, True, False) b.EndObject() self.assertBuilderEquals(b, [ 6, 0, # vtable bytes 8, 0, # length of object including vtable offset 7, 0, # start of bool value 6, 0, 0, 0, # offset for start of vtable (int32) 0, 0, 0, # padded to 4 bytes 1, # bool value ]) def test_vtable_with_one_default_bool(self): b = flatbuffers.Builder(0) self.assertBuilderEquals(b, []) b.StartObject(1) self.assertBuilderEquals(b, []) b.PrependBoolSlot(0, False, False) b.EndObject() self.assertBuilderEquals(b, [ 6, 0, # vtable bytes 4, 0, # end of object from here 0, 0, # entry 1 is zero 6, 0, 0, 0, # offset for start of vtable (int32) ]) def test_vtable_with_one_int16(self): b = flatbuffers.Builder(0) b.StartObject(1) b.PrependInt16Slot(0, 0x789A, 0) b.EndObject() self.assertBuilderEquals(b, [ 6, 0, # vtable bytes 8, 0, # end of object from here 6, 0, # offset to value 6, 0, 0, 0, # offset for start of vtable (int32) 0, 0, # padding to 4 bytes 0x9A, 0x78, ]) def test_vtable_with_two_int16(self): b = flatbuffers.Builder(0) b.StartObject(2) b.PrependInt16Slot(0, 0x3456, 0) b.PrependInt16Slot(1, 0x789A, 0) b.EndObject() self.assertBuilderEquals(b, [ 8, 0, # vtable bytes 8, 0, # end of object from here 6, 0, # offset to value 0 4, 0, # offset to value 1 8, 0, 0, 0, # offset for start of vtable (int32) 0x9A, 0x78, # value 1 0x56, 0x34, # value 0 ]) def test_vtable_with_int16_and_bool(self): b = flatbuffers.Builder(0) b.StartObject(2) b.PrependInt16Slot(0, 0x3456, 0) b.PrependBoolSlot(1, True, False) b.EndObject() self.assertBuilderEquals(b, [ 8, 0, # vtable bytes 8, 0, # end of object from here 6, 0, # offset to value 0 5, 0, # offset to value 1 8, 0, 0, 0, # offset for start of vtable (int32) 0, # padding 1, # value 1 0x56, 0x34, # value 0 ]) def test_vtable_with_empty_vector(self): b = flatbuffers.Builder(0) b.StartVector(flatbuffers.number_types.Uint8Flags.bytewidth, 0, 1) vecend = b.EndVector(0) b.StartObject(1) b.PrependUOffsetTRelativeSlot(0, vecend, 0) b.EndObject() self.assertBuilderEquals(b, [ 6, 0, # vtable bytes 8, 0, 4, 0, # offset to vector offset 6, 0, 0, 0, # offset for start of vtable (int32) 4, 0, 0, 0, 0, 0, 0, 0, # length of vector (not in struct) ]) def test_vtable_with_empty_vector_of_byte_and_some_scalars(self): b = flatbuffers.Builder(0) b.StartVector(flatbuffers.number_types.Uint8Flags.bytewidth, 0, 1) vecend = b.EndVector(0) b.StartObject(2) b.PrependInt16Slot(0, 55, 0) b.PrependUOffsetTRelativeSlot(1, vecend, 0) b.EndObject() self.assertBuilderEquals(b, [ 8, 0, # vtable bytes 12, 0, 10, 0, # offset to value 0 4, 0, # offset to vector offset 8, 0, 0, 0, # vtable loc 8, 0, 0, 0, # value 1 0, 0, 55, 0, # value 0 0, 0, 0, 0, # length of vector (not in struct) ]) def test_vtable_with_1_int16_and_2vector_of_int16(self): b = flatbuffers.Builder(0) b.StartVector(flatbuffers.number_types.Int16Flags.bytewidth, 2, 1) b.PrependInt16(0x1234) b.PrependInt16(0x5678) vecend = b.EndVector(2) b.StartObject(2) b.PrependUOffsetTRelativeSlot(1, vecend, 0) b.PrependInt16Slot(0, 55, 0) b.EndObject() self.assertBuilderEquals(b, [ 8, 0, # vtable bytes 12, 0, # length of object 6, 0, # start of value 0 from end of vtable 8, 0, # start of value 1 from end of buffer 8, 0, 0, 0, # offset for start of vtable (int32) 0, 0, # padding 55, 0, # value 0 4, 0, 0, 0, # vector position from here 2, 0, 0, 0, # length of vector (uint32) 0x78, 0x56, # vector value 1 0x34, 0x12, # vector value 0 ]) def test_vtable_with_1_struct_of_1_int8__1_int16__1_int32(self): b = flatbuffers.Builder(0) b.StartObject(1) b.Prep(4+4+4, 0) b.PrependInt8(55) b.Pad(3) b.PrependInt16(0x1234) b.Pad(2) b.PrependInt32(0x12345678) structStart = b.Offset() b.PrependStructSlot(0, structStart, 0) b.EndObject() self.assertBuilderEquals(b, [ 6, 0, # vtable bytes 16, 0, # end of object from here 4, 0, # start of struct from here 6, 0, 0, 0, # offset for start of vtable (int32) 0x78, 0x56, 0x34, 0x12, # value 2 0, 0, # padding 0x34, 0x12, # value 1 0, 0, 0, # padding 55, # value 0 ]) def test_vtable_with_1_vector_of_2_struct_of_2_int8(self): b = flatbuffers.Builder(0) b.StartVector(flatbuffers.number_types.Int8Flags.bytewidth*2, 2, 1) b.PrependInt8(33) b.PrependInt8(44) b.PrependInt8(55) b.PrependInt8(66) vecend = b.EndVector(2) b.StartObject(1) b.PrependUOffsetTRelativeSlot(0, vecend, 0) b.EndObject() self.assertBuilderEquals(b, [ 6, 0, # vtable bytes 8, 0, 4, 0, # offset of vector offset 6, 0, 0, 0, # offset for start of vtable (int32) 4, 0, 0, 0, # vector start offset 2, 0, 0, 0, # vector length 66, # vector value 1,1 55, # vector value 1,0 44, # vector value 0,1 33, # vector value 0,0 ]) def test_table_with_some_elements(self): b = flatbuffers.Builder(0) b.StartObject(2) b.PrependInt8Slot(0, 33, 0) b.PrependInt16Slot(1, 66, 0) off = b.EndObject() b.Finish(off) self.assertBuilderEquals(b, [ 12, 0, 0, 0, # root of table: points to vtable offset 8, 0, # vtable bytes 8, 0, # end of object from here 7, 0, # start of value 0 4, 0, # start of value 1 8, 0, 0, 0, # offset for start of vtable (int32) 66, 0, # value 1 0, # padding 33, # value 0 ]) def test__one_unfinished_table_and_one_finished_table(self): b = flatbuffers.Builder(0) b.StartObject(2) b.PrependInt8Slot(0, 33, 0) b.PrependInt8Slot(1, 44, 0) off = b.EndObject() b.Finish(off) b.StartObject(3) b.PrependInt8Slot(0, 55, 0) b.PrependInt8Slot(1, 66, 0) b.PrependInt8Slot(2, 77, 0) off = b.EndObject() b.Finish(off) self.assertBuilderEquals(b, [ 16, 0, 0, 0, # root of table: points to object 0, 0, # padding 10, 0, # vtable bytes 8, 0, # size of object 7, 0, # start of value 0 6, 0, # start of value 1 5, 0, # start of value 2 10, 0, 0, 0, # offset for start of vtable (int32) 0, # padding 77, # value 2 66, # value 1 55, # value 0 12, 0, 0, 0, # root of table: points to object 8, 0, # vtable bytes 8, 0, # size of object 7, 0, # start of value 0 6, 0, # start of value 1 8, 0, 0, 0, # offset for start of vtable (int32) 0, 0, # padding 44, # value 1 33, # value 0 ]) def test_a_bunch_of_bools(self): b = flatbuffers.Builder(0) b.StartObject(8) b.PrependBoolSlot(0, True, False) b.PrependBoolSlot(1, True, False) b.PrependBoolSlot(2, True, False) b.PrependBoolSlot(3, True, False) b.PrependBoolSlot(4, True, False) b.PrependBoolSlot(5, True, False) b.PrependBoolSlot(6, True, False) b.PrependBoolSlot(7, True, False) off = b.EndObject() b.Finish(off) self.assertBuilderEquals(b, [ 24, 0, 0, 0, # root of table: points to vtable offset 20, 0, # vtable bytes 12, 0, # size of object 11, 0, # start of value 0 10, 0, # start of value 1 9, 0, # start of value 2 8, 0, # start of value 3 7, 0, # start of value 4 6, 0, # start of value 5 5, 0, # start of value 6 4, 0, # start of value 7 20, 0, 0, 0, # vtable offset 1, # value 7 1, # value 6 1, # value 5 1, # value 4 1, # value 3 1, # value 2 1, # value 1 1, # value 0 ]) def test_three_bools(self): b = flatbuffers.Builder(0) b.StartObject(3) b.PrependBoolSlot(0, True, False) b.PrependBoolSlot(1, True, False) b.PrependBoolSlot(2, True, False) off = b.EndObject() b.Finish(off) self.assertBuilderEquals(b, [ 16, 0, 0, 0, # root of table: points to vtable offset 0, 0, # padding 10, 0, # vtable bytes 8, 0, # size of object 7, 0, # start of value 0 6, 0, # start of value 1 5, 0, # start of value 2 10, 0, 0, 0, # vtable offset from here 0, # padding 1, # value 2 1, # value 1 1, # value 0 ]) def test_some_floats(self): b = flatbuffers.Builder(0) b.StartObject(1) b.PrependFloat32Slot(0, 1.0, 0.0) off = b.EndObject() self.assertBuilderEquals(b, [ 6, 0, # vtable bytes 8, 0, # size of object 4, 0, # start of value 0 6, 0, 0, 0, # vtable offset 0, 0, 128, 63, # value 0 ]) def make_monster_from_generated_code(): ''' Use generated code to build the example Monster. ''' b = flatbuffers.Builder(0) string = b.CreateString("MyMonster") test1 = b.CreateString("test1") test2 = b.CreateString("test2") fred = b.CreateString("Fred") MyGame.Example.Monster.MonsterStartInventoryVector(b, 5) b.PrependByte(4) b.PrependByte(3) b.PrependByte(2) b.PrependByte(1) b.PrependByte(0) inv = b.EndVector(5) MyGame.Example.Monster.MonsterStart(b) MyGame.Example.Monster.MonsterAddName(b, fred) mon2 = MyGame.Example.Monster.MonsterEnd(b) MyGame.Example.Monster.MonsterStartTest4Vector(b, 2) MyGame.Example.Test.CreateTest(b, 10, 20) MyGame.Example.Test.CreateTest(b, 30, 40) test4 = b.EndVector(2) MyGame.Example.Monster.MonsterStartTestarrayofstringVector(b, 2) b.PrependUOffsetTRelative(test2) b.PrependUOffsetTRelative(test1) testArrayOfString = b.EndVector(2) MyGame.Example.Monster.MonsterStart(b) pos = MyGame.Example.Vec3.CreateVec3(b, 1.0, 2.0, 3.0, 3.0, 2, 5, 6) MyGame.Example.Monster.MonsterAddPos(b, pos) MyGame.Example.Monster.MonsterAddHp(b, 80) MyGame.Example.Monster.MonsterAddName(b, string) MyGame.Example.Monster.MonsterAddInventory(b, inv) MyGame.Example.Monster.MonsterAddTestType(b, 1) MyGame.Example.Monster.MonsterAddTest(b, mon2) MyGame.Example.Monster.MonsterAddTest4(b, test4) MyGame.Example.Monster.MonsterAddTestarrayofstring(b, testArrayOfString) mon = MyGame.Example.Monster.MonsterEnd(b) b.Finish(mon) return b.Bytes, b.Head() class TestAllCodePathsOfExampleSchema(unittest.TestCase): def setUp(self, *args, **kwargs): super(TestAllCodePathsOfExampleSchema, self).setUp(*args, **kwargs) b = flatbuffers.Builder(0) MyGame.Example.Monster.MonsterStart(b) gen_mon = MyGame.Example.Monster.MonsterEnd(b) b.Finish(gen_mon) self.mon = MyGame.Example.Monster.Monster.GetRootAsMonster(b.Bytes, b.Head()) def test_default_monster_pos(self): self.assertTrue(self.mon.Pos() is None) def test_nondefault_monster_mana(self): b = flatbuffers.Builder(0) MyGame.Example.Monster.MonsterStart(b) MyGame.Example.Monster.MonsterAddMana(b, 50) mon = MyGame.Example.Monster.MonsterEnd(b) b.Finish(mon) got_mon = MyGame.Example.Monster.Monster.GetRootAsMonster(b.Bytes, b.Head()) self.assertEqual(50, got_mon.Mana()) def test_default_monster_hp(self): self.assertEqual(100, self.mon.Hp()) def test_default_monster_name(self): self.assertEqual('', self.mon.Name()) def test_default_monster_inventory_item(self): self.assertEqual(0, self.mon.Inventory(0)) def test_default_monster_inventory_length(self): self.assertEqual(0, self.mon.InventoryLength()) def test_default_monster_color(self): self.assertEqual(MyGame.Example.Color.Color.Blue, self.mon.Color()) def test_nondefault_monster_color(self): b = flatbuffers.Builder(0) color = MyGame.Example.Color.Color.Red MyGame.Example.Monster.MonsterStart(b) MyGame.Example.Monster.MonsterAddColor(b, color) mon = MyGame.Example.Monster.MonsterEnd(b) b.Finish(mon) mon2 = MyGame.Example.Monster.Monster.GetRootAsMonster(b.Bytes, b.Head()) self.assertEqual(MyGame.Example.Color.Color.Red, mon2.Color()) def test_default_monster_testtype(self): self.assertEqual(0, self.mon.TestType()) def test_default_monster_test_field(self): self.assertEqual(None, self.mon.Test()) def test_default_monster_test4_item(self): self.assertEqual(None, self.mon.Test4(0)) def test_default_monster_test4_length(self): self.assertEqual(0, self.mon.Test4Length()) def test_default_monster_testarrayofstring(self): self.assertEqual("", self.mon.Testarrayofstring(0)) def test_default_monster_testarrayofstring_length(self): self.assertEqual(0, self.mon.TestarrayofstringLength()) def test_default_monster_testarrayoftables(self): self.assertEqual(None, self.mon.Testarrayoftables(0)) def test_nondefault_monster_testarrayoftables(self): b = flatbuffers.Builder(0) # make a child Monster within a vector of Monsters: MyGame.Example.Monster.MonsterStart(b) MyGame.Example.Monster.MonsterAddHp(b, 99) sub_monster = MyGame.Example.Monster.MonsterEnd(b) # build the vector: MyGame.Example.Monster.MonsterStartTestarrayoftablesVector(b, 1) b.PrependUOffsetTRelative(sub_monster) vec = b.EndVector(1) # make the parent monster and include the vector of Monster: MyGame.Example.Monster.MonsterStart(b) MyGame.Example.Monster.MonsterAddTestarrayoftables(b, vec) mon = MyGame.Example.Monster.MonsterEnd(b) b.Finish(mon) # inspect the resulting data: mon2 = MyGame.Example.Monster.Monster.GetRootAsMonster(b.Output(), 0) self.assertEqual(99, mon2.Testarrayoftables(0).Hp()) self.assertEqual(1, mon2.TestarrayoftablesLength()) def test_default_monster_testarrayoftables_length(self): self.assertEqual(0, self.mon.TestarrayoftablesLength()) def test_nondefault_monster_enemy(self): b = flatbuffers.Builder(0) # make an Enemy object: MyGame.Example.Monster.MonsterStart(b) MyGame.Example.Monster.MonsterAddHp(b, 88) enemy = MyGame.Example.Monster.MonsterEnd(b) b.Finish(enemy) # make the parent monster and include the vector of Monster: MyGame.Example.Monster.MonsterStart(b) MyGame.Example.Monster.MonsterAddEnemy(b, enemy) mon = MyGame.Example.Monster.MonsterEnd(b) b.Finish(mon) # inspect the resulting data: mon2 = MyGame.Example.Monster.Monster.GetRootAsMonster(b.Bytes, b.Head()) self.assertEqual(88, mon2.Enemy().Hp()) def test_default_monster_testnestedflatbuffer(self): self.assertEqual(0, self.mon.Testnestedflatbuffer(0)) def test_default_monster_testnestedflatbuffer_length(self): self.assertEqual(0, self.mon.TestnestedflatbufferLength()) def test_nondefault_monster_testnestedflatbuffer(self): b = flatbuffers.Builder(0) MyGame.Example.Monster.MonsterStartTestnestedflatbufferVector(b, 3) b.PrependByte(4) b.PrependByte(2) b.PrependByte(0) sub_buf = b.EndVector(3) # make the parent monster and include the vector of Monster: MyGame.Example.Monster.MonsterStart(b) MyGame.Example.Monster.MonsterAddTestnestedflatbuffer(b, sub_buf) mon = MyGame.Example.Monster.MonsterEnd(b) b.Finish(mon) # inspect the resulting data: mon2 = MyGame.Example.Monster.Monster.GetRootAsMonster(b.Bytes, b.Head()) self.assertEqual(3, mon2.TestnestedflatbufferLength()) self.assertEqual(0, mon2.Testnestedflatbuffer(0)) self.assertEqual(2, mon2.Testnestedflatbuffer(1)) self.assertEqual(4, mon2.Testnestedflatbuffer(2)) def test_nondefault_monster_testempty(self): b = flatbuffers.Builder(0) # make a Stat object: MyGame.Example.Stat.StatStart(b) MyGame.Example.Stat.StatAddVal(b, 123) my_stat = MyGame.Example.Stat.StatEnd(b) b.Finish(my_stat) # include the stat object in a monster: MyGame.Example.Monster.MonsterStart(b) MyGame.Example.Monster.MonsterAddTestempty(b, my_stat) mon = MyGame.Example.Monster.MonsterEnd(b) b.Finish(mon) # inspect the resulting data: mon2 = MyGame.Example.Monster.Monster.GetRootAsMonster(b.Bytes, b.Head()) self.assertEqual(123, mon2.Testempty().Val()) def test_default_monster_testbool(self): self.assertFalse(self.mon.Testbool()) def test_nondefault_monster_testbool(self): b = flatbuffers.Builder(0) MyGame.Example.Monster.MonsterStart(b) MyGame.Example.Monster.MonsterAddTestbool(b, True) mon = MyGame.Example.Monster.MonsterEnd(b) b.Finish(mon) # inspect the resulting data: mon2 = MyGame.Example.Monster.Monster.GetRootAsMonster(b.Bytes, b.Head()) self.assertTrue(mon2.Testbool()) def test_default_monster_testhashes(self): self.assertEqual(0, self.mon.Testhashs32Fnv1()) self.assertEqual(0, self.mon.Testhashu32Fnv1()) self.assertEqual(0, self.mon.Testhashs64Fnv1()) self.assertEqual(0, self.mon.Testhashu64Fnv1()) self.assertEqual(0, self.mon.Testhashs32Fnv1a()) self.assertEqual(0, self.mon.Testhashu32Fnv1a()) self.assertEqual(0, self.mon.Testhashs64Fnv1a()) self.assertEqual(0, self.mon.Testhashu64Fnv1a()) def test_nondefault_monster_testhashes(self): b = flatbuffers.Builder(0) MyGame.Example.Monster.MonsterStart(b) MyGame.Example.Monster.MonsterAddTesthashs32Fnv1(b, 1) MyGame.Example.Monster.MonsterAddTesthashu32Fnv1(b, 2) MyGame.Example.Monster.MonsterAddTesthashs64Fnv1(b, 3) MyGame.Example.Monster.MonsterAddTesthashu64Fnv1(b, 4) MyGame.Example.Monster.MonsterAddTesthashs32Fnv1a(b, 5) MyGame.Example.Monster.MonsterAddTesthashu32Fnv1a(b, 6) MyGame.Example.Monster.MonsterAddTesthashs64Fnv1a(b, 7) MyGame.Example.Monster.MonsterAddTesthashu64Fnv1a(b, 8) mon = MyGame.Example.Monster.MonsterEnd(b) b.Finish(mon) # inspect the resulting data: mon2 = MyGame.Example.Monster.Monster.GetRootAsMonster(b.Bytes, b.Head()) self.assertEqual(1, mon2.Testhashs32Fnv1()) self.assertEqual(2, mon2.Testhashu32Fnv1()) self.assertEqual(3, mon2.Testhashs64Fnv1()) self.assertEqual(4, mon2.Testhashu64Fnv1()) self.assertEqual(5, mon2.Testhashs32Fnv1a()) self.assertEqual(6, mon2.Testhashu32Fnv1a()) self.assertEqual(7, mon2.Testhashs64Fnv1a()) self.assertEqual(8, mon2.Testhashu64Fnv1a()) class TestVtableDeduplication(unittest.TestCase): ''' TestVtableDeduplication verifies that vtables are deduplicated. ''' def test_vtable_deduplication(self): b = flatbuffers.Builder(0) b.StartObject(4) b.PrependByteSlot(0, 0, 0) b.PrependByteSlot(1, 11, 0) b.PrependByteSlot(2, 22, 0) b.PrependInt16Slot(3, 33, 0) obj0 = b.EndObject() b.StartObject(4) b.PrependByteSlot(0, 0, 0) b.PrependByteSlot(1, 44, 0) b.PrependByteSlot(2, 55, 0) b.PrependInt16Slot(3, 66, 0) obj1 = b.EndObject() b.StartObject(4) b.PrependByteSlot(0, 0, 0) b.PrependByteSlot(1, 77, 0) b.PrependByteSlot(2, 88, 0) b.PrependInt16Slot(3, 99, 0) obj2 = b.EndObject() got = b.Bytes[b.Head():] want = bytearray([ 240, 255, 255, 255, # == -12. offset to dedupped vtable. 99, 0, 88, 77, 248, 255, 255, 255, # == -8. offset to dedupped vtable. 66, 0, 55, 44, 12, 0, 8, 0, 0, 0, 7, 0, 6, 0, 4, 0, 12, 0, 0, 0, 33, 0, 22, 11, ]) self.assertEqual((len(want), want), (len(got), got)) table0 = flatbuffers.table.Table(b.Bytes, len(b.Bytes) - obj0) table1 = flatbuffers.table.Table(b.Bytes, len(b.Bytes) - obj1) table2 = flatbuffers.table.Table(b.Bytes, len(b.Bytes) - obj2) def _checkTable(tab, voffsett_value, b, c, d): # vtable size got = tab.GetVOffsetTSlot(0, 0) self.assertEqual(12, got, 'case 0, 0') # object size got = tab.GetVOffsetTSlot(2, 0) self.assertEqual(8, got, 'case 2, 0') # default value got = tab.GetVOffsetTSlot(4, 0) self.assertEqual(voffsett_value, got, 'case 4, 0') got = tab.GetSlot(6, 0, N.Uint8Flags) self.assertEqual(b, got, 'case 6, 0') val = tab.GetSlot(8, 0, N.Uint8Flags) self.assertEqual(c, val, 'failed 8, 0') got = tab.GetSlot(10, 0, N.Uint8Flags) self.assertEqual(d, got, 'failed 10, 0') _checkTable(table0, 0, 11, 22, 33) _checkTable(table1, 0, 44, 55, 66) _checkTable(table2, 0, 77, 88, 99) class TestExceptions(unittest.TestCase): def test_object_is_nested_error(self): b = flatbuffers.Builder(0) b.StartObject(0) assertRaises(self, lambda: b.StartObject(0), flatbuffers.builder.IsNestedError) def test_object_is_not_nested_error(self): b = flatbuffers.Builder(0) assertRaises(self, lambda: b.EndObject(), flatbuffers.builder.IsNotNestedError) def test_struct_is_not_inline_error(self): b = flatbuffers.Builder(0) b.StartObject(0) assertRaises(self, lambda: b.PrependStructSlot(0, 1, 0), flatbuffers.builder.StructIsNotInlineError) def test_unreachable_error(self): b = flatbuffers.Builder(0) assertRaises(self, lambda: b.PrependUOffsetTRelative(1), flatbuffers.builder.OffsetArithmeticError) def test_create_string_is_nested_error(self): b = flatbuffers.Builder(0) b.StartObject(0) s = 'test1' assertRaises(self, lambda: b.CreateString(s), flatbuffers.builder.IsNestedError) def test_finished_bytes_error(self): b = flatbuffers.Builder(0) assertRaises(self, lambda: b.Output(), flatbuffers.builder.BuilderNotFinishedError) def CheckAgainstGoldDataGo(): try: gen_buf, gen_off = make_monster_from_generated_code() fn = 'monsterdata_go_wire.mon' if not os.path.exists(fn): print('Go-generated data does not exist, failed.') return False # would like to use a context manager here, but it's less # backwards-compatible: f = open(fn, 'rb') go_wire_data = f.read() f.close() CheckReadBuffer(bytearray(go_wire_data), 0) if not bytearray(gen_buf[gen_off:]) == bytearray(go_wire_data): raise AssertionError('CheckAgainstGoldDataGo failed') except: print('Failed to test against Go-generated test data.') return False print('Can read Go-generated test data, and Python generates bytewise identical data.') return True def CheckAgainstGoldDataJava(): try: gen_buf, gen_off = make_monster_from_generated_code() fn = 'monsterdata_java_wire.mon' if not os.path.exists(fn): print('Java-generated data does not exist, failed.') return False f = open(fn, 'rb') java_wire_data = f.read() f.close() CheckReadBuffer(bytearray(java_wire_data), 0) except: print('Failed to read Java-generated test data.') return False print('Can read Java-generated test data.') return True class LCG(object): ''' Include simple random number generator to ensure results will be the same cross platform. http://en.wikipedia.org/wiki/Park%E2%80%93Miller_random_number_generator ''' __slots__ = ['n'] InitialLCGSeed = 48271 def __init__(self): self.n = self.InitialLCGSeed def Reset(self): self.n = self.InitialLCGSeed def Next(self): self.n = ((self.n * 279470273) % 4294967291) & 0xFFFFFFFF return self.n def BenchmarkVtableDeduplication(count): ''' BenchmarkVtableDeduplication measures the speed of vtable deduplication by creating `prePop` vtables, then populating `count` objects with a different single vtable. When count is large (as in long benchmarks), memory usage may be high. ''' prePop = 10 builder = flatbuffers.Builder(0) # pre-populate some vtables: for i in compat_range(prePop): builder.StartObject(i) for j in compat_range(i): builder.PrependInt16Slot(j, j, 0) builder.EndObject() # benchmark deduplication of a new vtable: def f(): builder.StartObject(prePop) for j in compat_range(prePop): builder.PrependInt16Slot(j, j, 0) builder.EndObject() duration = timeit.timeit(stmt=f, number=count) rate = float(count) / duration print(('vtable deduplication rate: %.2f/sec' % rate)) def BenchmarkCheckReadBuffer(count, buf, off): ''' BenchmarkCheckReadBuffer measures the speed of flatbuffer reading by re-using the CheckReadBuffer function with the gold data. ''' def f(): CheckReadBuffer(buf, off) duration = timeit.timeit(stmt=f, number=count) rate = float(count) / duration data = float(len(buf) * count) / float(1024 * 1024) data_rate = data / float(duration) print(('traversed %d %d-byte flatbuffers in %.2fsec: %.2f/sec, %.2fMB/sec') % (count, len(buf), duration, rate, data_rate)) def BenchmarkMakeMonsterFromGeneratedCode(count, length): ''' BenchmarkMakeMonsterFromGeneratedCode measures the speed of flatbuffer creation by re-using the make_monster_from_generated_code function for generating gold data examples. ''' duration = timeit.timeit(stmt=make_monster_from_generated_code, number=count) rate = float(count) / duration data = float(length * count) / float(1024 * 1024) data_rate = data / float(duration) print(('built %d %d-byte flatbuffers in %.2fsec: %.2f/sec, %.2fMB/sec' % \ (count, length, duration, rate, data_rate))) def backward_compatible_run_tests(**kwargs): if PY_VERSION < (2, 6): sys.stderr.write("Python version less than 2.6 are not supported") sys.stderr.flush() return False # python2.6 has a reduced-functionality unittest.main function: if PY_VERSION == (2, 6): try: unittest.main(**kwargs) except SystemExit as e: if not e.code == 0: return False return True # python2.7 and above let us not exit once unittest.main is run: kwargs['exit'] = False kwargs['verbosity'] = 0 ret = unittest.main(**kwargs) if ret.result.errors or ret.result.failures: return False return True def main(): import os import sys if not len(sys.argv) == 4: sys.stderr.write('Usage: %s ' ' \n' % sys.argv[0]) sys.stderr.write(' Provide COMPARE_GENERATED_TO_GO=1 to check' 'for bytewise comparison to Go data.\n') sys.stderr.write(' Provide COMPARE_GENERATED_TO_JAVA=1 to check' 'for bytewise comparison to Java data.\n') sys.stderr.flush() sys.exit(1) kwargs = dict(argv=sys.argv[:-3]) # run tests, and run some language comparison checks if needed: success = backward_compatible_run_tests(**kwargs) if success and os.environ.get('COMPARE_GENERATED_TO_GO', 0) == "1": success = success and CheckAgainstGoldDataGo() if success and os.environ.get('COMPARE_GENERATED_TO_JAVA', 0) == "1": success = success and CheckAgainstGoldDataJava() if not success: sys.stderr.write('Tests failed, skipping benchmarks.\n') sys.stderr.flush() sys.exit(1) # run benchmarks (if 0, they will be a noop): bench_vtable = int(sys.argv[1]) bench_traverse = int(sys.argv[2]) bench_build = int(sys.argv[3]) if bench_vtable: BenchmarkVtableDeduplication(bench_vtable) if bench_traverse: buf, off = make_monster_from_generated_code() BenchmarkCheckReadBuffer(bench_traverse, buf, off) if bench_build: buf, off = make_monster_from_generated_code() BenchmarkMakeMonsterFromGeneratedCode(bench_build, len(buf)) if __name__ == '__main__': main()