Revamp header components in language

This commit is contained in:
Aldo Cortesi 2015-05-02 22:32:57 +12:00
parent fffee660e5
commit 24437ba180
4 changed files with 153 additions and 150 deletions

View File

@ -4,7 +4,6 @@ import os
import copy
import abc
import contrib.pyparsing as pp
from netlib import http_uastrings
from .. import utils
from . import generators, exceptions
@ -234,91 +233,29 @@ class _Component(_Token):
return "".join(i[:] for i in self.values(settings or {}))
class _Header(_Component):
class KeyValue(_Component):
"""
A key/value pair.
klass.preamble: leader
"""
def __init__(self, key, value):
self.key, self.value = key, value
def values(self, settings):
return [
self.key.get_generator(settings),
": ",
self.value.get_generator(settings),
"\r\n",
]
class Header(_Header):
@classmethod
def expr(klass):
e = pp.Literal("h").suppress()
e = pp.Literal(klass.preamble).suppress()
e += Value
e += pp.Literal("=").suppress()
e += Value
return e.setParseAction(lambda x: klass(*x))
def spec(self):
return "h%s=%s"%(self.key.spec(), self.value.spec())
return "%s%s=%s"%(self.preamble, self.key.spec(), self.value.spec())
def freeze(self, settings):
return Header(self.key.freeze(settings), self.value.freeze(settings))
class ShortcutContentType(_Header):
def __init__(self, value):
_Header.__init__(self, ValueLiteral("Content-Type"), value)
@classmethod
def expr(klass):
e = pp.Literal("c").suppress()
e = e + Value
return e.setParseAction(lambda x: klass(*x))
def spec(self):
return "c%s"%(self.value.spec())
def freeze(self, settings):
return ShortcutContentType(self.value.freeze(settings))
class ShortcutLocation(_Header):
def __init__(self, value):
_Header.__init__(self, ValueLiteral("Location"), value)
@classmethod
def expr(klass):
e = pp.Literal("l").suppress()
e = e + Value
return e.setParseAction(lambda x: klass(*x))
def spec(self):
return "l%s"%(self.value.spec())
def freeze(self, settings):
return ShortcutLocation(self.value.freeze(settings))
class ShortcutUserAgent(_Header):
def __init__(self, value):
self.specvalue = value
if isinstance(value, basestring):
value = ValueLiteral(http_uastrings.get_by_shortcut(value)[2])
_Header.__init__(self, ValueLiteral("User-Agent"), value)
@classmethod
def expr(klass):
e = pp.Literal("u").suppress()
u = reduce(
operator.or_,
[pp.Literal(i[1]) for i in http_uastrings.UASTRINGS]
return self.__class__(
self.key.freeze(settings), self.value.freeze(settings)
)
e += u | Value
return e.setParseAction(lambda x: klass(*x))
def spec(self):
return "u%s"%self.specvalue
def freeze(self, settings):
return ShortcutUserAgent(self.value.freeze(settings))
class PathodSpec(_Token):
@ -407,12 +344,15 @@ class OptionsOrValue(_Component):
"""
Can be any of a specified set of options, or a value specifier.
"""
preamble = ""
def __init__(self, value):
# If it's a string, we were passed one of the options, so we upper-case
# it to be canonical. The user can specify a different case by using a
# string value literal.
self.option_used = False
if isinstance(value, basestring):
value = ValueLiteral(value.upper())
self.option_used = True
self.value = value
@classmethod
@ -421,6 +361,8 @@ class OptionsOrValue(_Component):
m = pp.MatchFirst(parts)
spec = m | Value.copy()
spec = spec.setParseAction(lambda x: klass(*x))
if klass.preamble:
spec = pp.Literal(klass.preamble).suppress() + spec
return spec
def values(self, settings):
@ -432,7 +374,7 @@ class OptionsOrValue(_Component):
s = self.value.spec()
if s[1:-1].lower() in self.options:
s = s[1:-1].lower()
return "%s"%s
return "%s%s"%(self.preamble, s)
def freeze(self, settings):
return self.__class__(self.value.freeze(settings))
@ -617,10 +559,6 @@ class _Message(object):
def actions(self):
return self.toks(_Action)
@property
def headers(self):
return self.toks(_Header)
def length(self, settings):
"""
Calculate the length of the base message without any applied

View File

@ -4,7 +4,7 @@ import abc
import contrib.pyparsing as pp
import netlib.websockets
from netlib import http_status
from netlib import http_status, http_uastrings
from . import base, generators, exceptions
@ -45,6 +45,49 @@ class Method(base.OptionsOrValue):
]
class _HeaderMixin(object):
def format_header(self, key, value):
return [key, ": ", value, "\r\n"]
def values(self, settings):
return self.format_header(
self.key.get_generator(settings),
self.value.get_generator(settings),
)
class Header(_HeaderMixin, base.KeyValue):
preamble = "h"
class ShortcutContentType(_HeaderMixin, base.PreValue):
preamble = "c"
key = base.ValueLiteral("Content-Type")
class ShortcutLocation(_HeaderMixin, base.PreValue):
preamble = "l"
key = base.ValueLiteral("Location")
class ShortcutUserAgent(_HeaderMixin, base.OptionsOrValue):
preamble = "u"
options = [i[1] for i in http_uastrings.UASTRINGS]
key = base.ValueLiteral("User-Agent")
def values(self, settings):
if self.option_used:
value = http_uastrings.get_by_shortcut(
self.value.val.lower()
)[2]
else:
value = self.value
return self.format_header(
self.key.get_generator(settings),
value
)
def get_header(val, headers):
"""
Header keys may be Values, so we have to "generate" them as we try the
@ -72,6 +115,10 @@ class _HTTPMessage(base._Message):
def preamble(self, settings): # pragma: no cover
pass
@property
def headers(self):
return self.toks(_HeaderMixin)
def values(self, settings):
vals = self.preamble(settings)
vals.append("\r\n")
@ -86,12 +133,12 @@ class _HTTPMessage(base._Message):
class Response(_HTTPMessage):
comps = (
Body,
base.Header,
Header,
base.PauseAt,
base.DisconnectAt,
base.InjectAt,
base.ShortcutContentType,
base.ShortcutLocation,
ShortcutContentType,
ShortcutLocation,
Raw,
Reason
)
@ -145,7 +192,7 @@ class Response(_HTTPMessage):
for i in hdrs.lst:
if not get_header(i[0], self.headers):
tokens.append(
base.Header(
Header(
base.ValueLiteral(i[0]),
base.ValueLiteral(i[1]))
)
@ -156,7 +203,7 @@ class Response(_HTTPMessage):
else:
length = len(self.body.value.get_generator(settings))
tokens.append(
base.Header(
Header(
base.ValueLiteral("Content-Length"),
base.ValueLiteral(str(length)),
)
@ -193,12 +240,12 @@ class Response(_HTTPMessage):
class Request(_HTTPMessage):
comps = (
Body,
base.Header,
Header,
base.PauseAt,
base.DisconnectAt,
base.InjectAt,
base.ShortcutContentType,
base.ShortcutUserAgent,
ShortcutContentType,
ShortcutUserAgent,
Raw,
base.PathodSpec,
)
@ -241,7 +288,7 @@ class Request(_HTTPMessage):
for i in netlib.websockets.client_handshake_headers().lst:
if not get_header(i[0], self.headers):
tokens.append(
base.Header(
Header(
base.ValueLiteral(i[0]),
base.ValueLiteral(i[1])
)
@ -251,7 +298,7 @@ class Request(_HTTPMessage):
if self.body:
length = len(self.body.value.get_generator(settings))
tokens.append(
base.Header(
Header(
base.ValueLiteral("Content-Length"),
base.ValueLiteral(str(length)),
)
@ -259,7 +306,7 @@ class Request(_HTTPMessage):
if settings.request_host:
if not get_header("Host", self.headers):
tokens.append(
base.Header(
Header(
base.ValueLiteral("Host"),
base.ValueLiteral(settings.request_host)
)
@ -302,7 +349,7 @@ class PathodErrorResponse(Response):
def make_error_response(reason, body=None):
tokens = [
Code("800"),
base.Header(
Header(
base.ValueLiteral("Content-Type"),
base.ValueLiteral("text/plain")
),

View File

@ -225,9 +225,20 @@ class TestMisc:
assert v2.value.val == v3.value.val
class TestHeaders:
def test_header(self):
e = base.Header.expr()
class TKeyValue(base.KeyValue):
preamble = "h"
def values(self, settings):
return [
self.key.get_generator(settings),
": ",
self.value.get_generator(settings),
"\r\n",
]
class TestKeyValue:
def test_simple(self):
e = TKeyValue.expr()
v = e.parseString("h'foo'='bar'")[0]
assert v.key.val == "foo"
assert v.value.val == "bar"
@ -239,69 +250,14 @@ class TestHeaders:
s = v.spec()
assert s == e.parseString(s)[0].spec()
def test_header_freeze(self):
e = base.Header.expr()
def test_freeze(self):
e = TKeyValue.expr()
v = e.parseString("h@10=@10'")[0]
v2 = v.freeze({})
v3 = v2.freeze({})
assert v2.key.val == v3.key.val
assert v2.value.val == v3.value.val
def test_ctype_shortcut(self):
e = base.ShortcutContentType.expr()
v = e.parseString("c'foo'")[0]
assert v.key.val == "Content-Type"
assert v.value.val == "foo"
s = v.spec()
assert s == e.parseString(s)[0].spec()
e = base.ShortcutContentType.expr()
v = e.parseString("c@100")[0]
v2 = v.freeze({})
v3 = v2.freeze({})
assert v2.value.val == v3.value.val
def test_location_shortcut(self):
e = base.ShortcutLocation.expr()
v = e.parseString("l'foo'")[0]
assert v.key.val == "Location"
assert v.value.val == "foo"
s = v.spec()
assert s == e.parseString(s)[0].spec()
e = base.ShortcutLocation.expr()
v = e.parseString("l@100")[0]
v2 = v.freeze({})
v3 = v2.freeze({})
assert v2.value.val == v3.value.val
def test_shortcuts(self):
assert language.parse_response("400:c'foo'").headers[0].key.val == "Content-Type"
assert language.parse_response("400:l'foo'").headers[0].key.val == "Location"
assert 'Android' in parse_request("get:/:ua").headers[0].value.val
assert parse_request("get:/:ua").headers[0].key.val == "User-Agent"
class TestShortcutUserAgent:
def test_location_shortcut(self):
e = base.ShortcutUserAgent.expr()
v = e.parseString("ua")[0]
assert "Android" in str(v.value)
assert v.spec() == "ua"
assert v.key.val == "User-Agent"
v = e.parseString("u'foo'")[0]
assert "foo" in str(v.value)
assert "foo" in v.spec()
v = e.parseString("u@100'")[0]
assert len(str(v.freeze({}).value)) > 100
v2 = v.freeze({})
v3 = v2.freeze({})
assert v2.value.val == v3.value.val
class Test_Action:

View File

@ -9,6 +9,12 @@ def parse_request(s):
return language.parse_requests(s)[0]
def render(r, settings=language.Settings()):
s = cStringIO.StringIO()
assert language.serve(r, s, settings)
return s.getvalue()
def test_make_error_response():
d = cStringIO.StringIO()
s = http.make_error_response("foo")
@ -258,3 +264,59 @@ class TestResponse:
tutils.raises("no websocket key", r.resolve, language.Settings())
res = r.resolve(language.Settings(websocket_key="foo"))
assert res.code.string() == "101"
def test_ctype_shortcut():
e = http.ShortcutContentType.expr()
v = e.parseString("c'foo'")[0]
assert v.key.val == "Content-Type"
assert v.value.val == "foo"
s = v.spec()
assert s == e.parseString(s)[0].spec()
e = http.ShortcutContentType.expr()
v = e.parseString("c@100")[0]
v2 = v.freeze({})
v3 = v2.freeze({})
assert v2.value.val == v3.value.val
def test_location_shortcut():
e = http.ShortcutLocation.expr()
v = e.parseString("l'foo'")[0]
assert v.key.val == "Location"
assert v.value.val == "foo"
s = v.spec()
assert s == e.parseString(s)[0].spec()
e = http.ShortcutLocation.expr()
v = e.parseString("l@100")[0]
v2 = v.freeze({})
v3 = v2.freeze({})
assert v2.value.val == v3.value.val
def test_shortcuts():
assert language.parse_response("400:c'foo'").headers[0].key.val == "Content-Type"
assert language.parse_response("400:l'foo'").headers[0].key.val == "Location"
assert "Android" in render(parse_request("get:/:ua"))
assert "User-Agent" in render(parse_request("get:/:ua"))
def test_user_agent():
e = http.ShortcutUserAgent.expr()
v = e.parseString("ua")[0]
assert "Android" in str(v.values({})[2])
e = http.ShortcutUserAgent.expr()
v = e.parseString("u'a'")[0]
assert "Android" not in str(v.values({})[2])
v = e.parseString("u@100'")[0]
assert len(str(v.freeze({}).value)) > 100
v2 = v.freeze({})
v3 = v2.freeze({})
assert v2.value.val == v3.value.val