websockets: lINTEGER to specify payload length
This commit is contained in:
parent
cd2fb13b3e
commit
bddf81edfc
|
@ -357,19 +357,34 @@ class OptionsOrValue(_Component):
|
|||
|
||||
|
||||
class Integer(_Component):
|
||||
bounds = (None, None)
|
||||
preamble = ""
|
||||
|
||||
def __init__(self, value):
|
||||
v = int(value)
|
||||
outofbounds = any([
|
||||
self.bounds[0] is not None and v < self.bounds[0],
|
||||
self.bounds[1] is not None and v > self.bounds[1]
|
||||
])
|
||||
if outofbounds:
|
||||
raise exceptions.ParseException(
|
||||
"Integer value must be between %s and %s."%self.bounds,
|
||||
0, 0
|
||||
)
|
||||
self.value = str(value)
|
||||
|
||||
@classmethod
|
||||
def expr(klass):
|
||||
e = v_integer.copy()
|
||||
if klass.preamble:
|
||||
e = pp.Literal(klass.preamble).suppress() + e
|
||||
return e.setParseAction(lambda x: klass(*x))
|
||||
|
||||
def values(self, settings):
|
||||
return self.value
|
||||
|
||||
def spec(self):
|
||||
return "%s"%(self.value)
|
||||
return "%s%s"%(self.preamble, self.value)
|
||||
|
||||
def freeze(self, settings):
|
||||
return self
|
||||
|
|
|
@ -10,6 +10,7 @@ from . import base, generators, actions, message
|
|||
wf:-fin:-rsv1:-rsv2:-rsv3:-mask
|
||||
|
||||
wf:l234
|
||||
wf:mask:r"foo
|
||||
"""
|
||||
|
||||
|
||||
|
@ -64,11 +65,17 @@ class KeyNone(base.CaselessLiteral):
|
|||
TOK = "knone"
|
||||
|
||||
|
||||
class Length(base.Integer):
|
||||
bounds = (0, 1<<64)
|
||||
preamble = "l"
|
||||
|
||||
|
||||
class WebsocketFrame(message.Message):
|
||||
comps = (
|
||||
Body,
|
||||
|
||||
OpCode,
|
||||
Length,
|
||||
# Bit flags
|
||||
Fin,
|
||||
RSV1,
|
||||
|
@ -122,6 +129,10 @@ class WebsocketFrame(message.Message):
|
|||
def knone(self):
|
||||
return self.tok(KeyNone)
|
||||
|
||||
@property
|
||||
def toklength(self):
|
||||
return self.tok(Length)
|
||||
|
||||
@classmethod
|
||||
def expr(klass):
|
||||
parts = [i.expr() for i in klass.comps]
|
||||
|
@ -157,6 +168,8 @@ class WebsocketFrame(message.Message):
|
|||
else:
|
||||
bodygen = None
|
||||
length = 0
|
||||
if self.toklength:
|
||||
length = int(self.toklength.value)
|
||||
frameparts = dict(
|
||||
payload_length = length
|
||||
)
|
||||
|
|
|
@ -53,6 +53,14 @@
|
|||
</td>
|
||||
</tr>
|
||||
|
||||
<tr>
|
||||
<td> l<a href="#valuespec">INTEGER</a> </td>
|
||||
<td>
|
||||
Set the payload length in the frame header, regardless of the
|
||||
actual body length.
|
||||
</td>
|
||||
</tr>
|
||||
|
||||
<tr>
|
||||
<td> [-]mask </td>
|
||||
<td>
|
||||
|
|
|
@ -305,6 +305,17 @@ def test_integer():
|
|||
|
||||
assert v.freeze({}).value == v.value
|
||||
|
||||
class BInt(base.Integer):
|
||||
bounds = (1, 5)
|
||||
|
||||
tutils.raises("must be between", BInt, 0)
|
||||
tutils.raises("must be between", BInt, 6)
|
||||
assert BInt(5)
|
||||
assert BInt(1)
|
||||
assert BInt(3)
|
||||
|
||||
|
||||
|
||||
|
||||
class TBoolean(base.Boolean):
|
||||
name = "test"
|
||||
|
|
|
@ -15,6 +15,7 @@ class TestWebsocketFrame:
|
|||
"wf",
|
||||
"wf:dr",
|
||||
"wf:b'foo'",
|
||||
"wf:l1024:b'foo'",
|
||||
"wf:cbinary",
|
||||
"wf:c1",
|
||||
"wf:mask:knone",
|
||||
|
@ -106,3 +107,10 @@ class TestWebsocketFrame:
|
|||
self.fr,
|
||||
"wf:b'foo':mask:knone",
|
||||
)
|
||||
|
||||
def test_length(self):
|
||||
assert self.fr("wf:l3:b'foo'").header.payload_length == 3
|
||||
frm = self.fr("wf:l2:b'foo'")
|
||||
assert frm.header.payload_length == 2
|
||||
assert frm.payload == "fo"
|
||||
tutils.raises("expected 1024 bytes", self.fr, "wf:l1024:b'foo'")
|
||||
|
|
Loading…
Reference in New Issue