py2--: inline type info

This commit is contained in:
Maximilian Hils 2016-10-16 20:56:46 -07:00
parent 3fbce7e981
commit 5a07892bfc
27 changed files with 213 additions and 246 deletions

View File

@ -295,8 +295,7 @@ class FlowListWalker(urwid.ListWalker):
class FlowListBox(urwid.ListBox):
def __init__(self, master):
# type: (mitmproxy.console.master.ConsoleMaster) -> None
def __init__(self, master: "mitmproxy.console.master.ConsoleMaster"):
self.master = master
super(FlowListBox, self).__init__(FlowListWalker(master, master.state))

View File

@ -102,9 +102,9 @@ footer = [
class FlowViewHeader(urwid.WidgetWrap):
def __init__(self, master, f):
self.master = master # type: "mitmproxy.console.master.ConsoleMaster"
self.flow = f # type: models.HTTPFlow
def __init__(self, master: "mitmproxy.console.master.ConsoleMaster", f: models.HTTPFlow):
self.master = master
self.flow = f
self._w = common.format_flow(
f,
False,

View File

@ -1,20 +1,20 @@
from __future__ import absolute_import, print_function, division
import abc
import copy
import six
import urwid
from mitmproxy.console import common
from mitmproxy.console import signals
from typing import Any # noqa
from typing import Callable # noqa
from typing import Any
from typing import Callable
from typing import Container # noqa
from typing import Iterable # noqa
from typing import Optional # noqa
from typing import Sequence # noqa
from typing import Tuple # noqa
import six
import urwid
from mitmproxy.console import common
from mitmproxy.console import signals
FOOTER = [
('heading_key', "enter"), ":edit ",
('heading_key', "q"), ":back ",
@ -24,33 +24,6 @@ FOOTER_EDITING = [
]
@six.add_metaclass(abc.ABCMeta)
class Column(object):
subeditor = None
def __init__(self, heading):
self.heading = heading
@abc.abstractmethod
def Display(self, data):
# type: () -> Cell
pass
@abc.abstractmethod
def Edit(self, data):
# type: () -> Cell
pass
@abc.abstractmethod
def blank(self):
# type: () -> Any
pass
def keypress(self, key, editor):
# type: (str, GridEditor) -> Optional[str]
return key
class Cell(urwid.WidgetWrap):
def get_data(self):
@ -60,17 +33,40 @@ class Cell(urwid.WidgetWrap):
"""
raise NotImplementedError()
def selectable(self):
def selectable(self) -> bool:
return True
@six.add_metaclass(abc.ABCMeta)
class Column(object):
subeditor = None
def __init__(self, heading):
self.heading = heading
@abc.abstractmethod
def Display(self, data) -> Cell:
pass
@abc.abstractmethod
def Edit(self, data) -> Cell:
pass
@abc.abstractmethod
def blank(self) -> Any:
pass
def keypress(self, key: str, editor: "GridEditor") -> Optional[str]:
return key
class GridRow(urwid.WidgetWrap):
def __init__(
self,
focused, # type: Optional[int]
editing, # type: bool
editor, # type: GridEditor
values # type: Tuple[Iterable[bytes], Container[int]
focused: Optional[int],
editing: bool,
editor: "GridEditor",
values: Tuple[Iterable[bytes], Container[int]]
):
self.focused = focused
self.editor = editor
@ -123,8 +119,8 @@ class GridWalker(urwid.ListWalker):
def __init__(
self,
lst, # type: Iterable[list]
editor # type: GridEditor
lst: Iterable[list],
editor: "GridEditor"
):
self.lst = [(i, set()) for i in lst]
self.editor = editor
@ -265,9 +261,9 @@ class GridEditor(urwid.WidgetWrap):
def __init__(
self,
master, # type: "mitmproxy.console.master.ConsoleMaster"
value, # type: Any
callback, # type: Callable[..., None]
master: "mitmproxy.console.master.ConsoleMaster",
value: Any,
callback: Callable[..., None],
*cb_args,
**cb_kwargs
):
@ -370,23 +366,20 @@ class GridEditor(urwid.WidgetWrap):
elif column.keypress(key, self) and not self.handle_key(key):
return self._w.keypress(size, key)
def data_out(self, data):
# type: (Sequence[list]) -> Any
def data_out(self, data: Sequence[list]) -> Any:
"""
Called on raw list data, before data is returned through the
callback.
"""
return data
def data_in(self, data):
# type: (Any) -> Iterable[list]
def data_in(self, data: Any) -> Iterable[list]:
"""
Called to prepare provided data.
"""
return data
def is_error(self, col, val):
# type: (int, Any) -> Optional[str]
def is_error(self, col: int, val: Any) -> Optional[str]:
"""
Return None, or a string error message.
"""

View File

@ -1,6 +1,7 @@
from __future__ import absolute_import, print_function, division
import os
from typing import Callable, Optional
import urwid
from mitmproxy.console import signals
@ -8,8 +9,7 @@ from mitmproxy.console.grideditor import base
from netlib import strutils
def read_file(filename, callback, escaped):
# type: (str, Callable[...,None], bool) -> Optional[str]
def read_file(filename: str, callback: Callable[..., None], escaped: bool) -> Optional[str]:
if not filename:
return
@ -70,27 +70,24 @@ class Column(base.Column):
class Display(base.Cell):
def __init__(self, data):
# type: (bytes) -> Display
def __init__(self, data: bytes):
self.data = data
escaped = strutils.bytes_to_escaped_str(data)
w = urwid.Text(escaped, wrap="any")
super(Display, self).__init__(w)
def get_data(self):
def get_data(self) -> bytes:
return self.data
class Edit(base.Cell):
def __init__(self, data):
# type: (bytes) -> Edit
def __init__(self, data: bytes):
data = strutils.bytes_to_escaped_str(data)
w = urwid.Edit(edit_text=data, wrap="any", multiline=True)
w = urwid.AttrWrap(w, "editfield")
super(Edit, self).__init__(w)
def get_data(self):
# type: () -> bytes
def get_data(self) -> bytes:
txt = self._w.get_text()[0].strip()
try:
return strutils.escaped_str_to_bytes(txt)

View File

@ -27,13 +27,12 @@ class Column(col_bytes.Column):
# This is the same for both edit and display.
class EncodingMixin(object):
def __init__(self, data, encoding_args):
# type: (str) -> TDisplay
def __init__(self, data: str, encoding_args):
self.encoding_args = encoding_args
data = data.encode(*self.encoding_args)
super(EncodingMixin, self).__init__(data)
def get_data(self):
def get_data(self) -> str:
data = super(EncodingMixin, self).get_data()
try:
return data.decode(*self.encoding_args)

View File

@ -15,7 +15,7 @@ import weakref
import six
import urwid
from typing import Optional # noqa
from typing import Optional
from mitmproxy import builtins
from mitmproxy import contentviews
@ -202,13 +202,13 @@ class ConsoleState(flow.State):
class Options(mitmproxy.options.Options):
def __init__(
self,
eventlog=False, # type: bool
follow=False, # type: bool
intercept=False, # type: bool
filter=None, # type: Optional[str]
palette=None, # type: Optional[str]
palette_transparent=False, # type: bool
no_mouse=False, # type: bool
eventlog: bool = False,
follow: bool = False,
intercept: bool = False,
filter: Optional[str] = None,
palette: Optional[str] = None,
palette_transparent: bool = False,
no_mouse: bool = False,
**kwargs
):
self.eventlog = eventlog

View File

@ -116,8 +116,7 @@ class ActionBar(urwid.WidgetWrap):
class StatusBar(urwid.WidgetWrap):
def __init__(self, master, helptext):
# type: (mitmproxy.console.master.ConsoleMaster, object) -> None
def __init__(self, master: "mitmproxy.console.master.ConsoleMaster", helptext):
self.master = master
self.helptext = helptext
self.ib = urwid.WidgetWrap(urwid.Text(""))

View File

@ -14,16 +14,18 @@ requests, the query parameters are passed as the ``query`` keyword argument.
"""
from __future__ import absolute_import, print_function, division
import cssutils
import datetime
import html2text
import jsbeautifier
import json
import logging
import lxml.etree
import lxml.html
import subprocess
import traceback
from typing import Mapping, Union, Generator, Tuple
import cssutils
import html2text
import jsbeautifier
import lxml.etree
import lxml.html
from PIL import ExifTags
from PIL import Image
from mitmproxy import exceptions
@ -33,7 +35,6 @@ from netlib import multidict
from netlib import strutils
from netlib.http import url
from six import BytesIO
from typing import Mapping # noqa
try:
import pyamf
@ -48,8 +49,7 @@ VIEW_CUTOFF = 512
KEY_MAX = 30
def pretty_json(s):
# type: (bytes) -> bytes
def pretty_json(s: bytes) -> bytes:
try:
p = json.loads(s.decode('utf-8'))
except ValueError:
@ -63,8 +63,9 @@ def pretty_json(s):
return pretty
def format_dict(d):
# type: (Mapping[Union[str,bytes], Union[str,bytes]]) -> Generator[Tuple[Union[str,bytes], Union[str,bytes]]]
def format_dict(
d: Mapping[Union[str, bytes], Union[str, bytes]]
) -> Generator[Tuple[Union[str, bytes], Union[str, bytes]], None, None]:
"""
Helper function that transforms the given dictionary into a list of
("key", key )
@ -97,7 +98,7 @@ class View(object):
def __call__(
self,
data, # type: bytes
data: bytes,
**metadata
):
"""

View File

@ -42,13 +42,12 @@ all other strings are returned as plain bytes.
import collections
import six
from typing import io, Union, Tuple # noqa
from typing import io, Union, Tuple
TSerializable = Union[None, bool, int, float, bytes, list, tuple, dict]
def dumps(value):
# type: (TSerializable) -> bytes
def dumps(value: TSerializable) -> bytes:
"""
This function dumps a python object as a tnetstring.
"""
@ -60,8 +59,7 @@ def dumps(value):
return b''.join(q)
def dump(value, file_handle):
# type: (TSerializable, io.BinaryIO) -> None
def dump(value: TSerializable, file_handle: io.BinaryIO) -> None:
"""
This function dumps a python object as a tnetstring and
writes it to the given file.
@ -69,8 +67,7 @@ def dump(value, file_handle):
file_handle.write(dumps(value))
def _rdumpq(q, size, value):
# type: (collections.deque, int, TSerializable) -> int
def _rdumpq(q: collections.deque, size: int, value: TSerializable) -> int:
"""
Dump value as a tnetstring, to a deque instance, last chunks first.
@ -153,16 +150,14 @@ def _rdumpq(q, size, value):
raise ValueError("unserializable object: {} ({})".format(value, type(value)))
def loads(string):
# type: (bytes) -> TSerializable
def loads(string: bytes) -> TSerializable:
"""
This function parses a tnetstring into a python object.
"""
return pop(string)[0]
def load(file_handle):
# type: (io.BinaryIO) -> TSerializable
def load(file_handle: io.BinaryIO) -> TSerializable:
"""load(file) -> object
This function reads a tnetstring from a file and parses it into a
@ -189,8 +184,7 @@ def load(file_handle):
return parse(data_type, data)
def parse(data_type, data):
# type: (int, bytes) -> TSerializable
def parse(data_type: int, data: bytes) -> TSerializable:
if six.PY2:
data_type = ord(data_type)
if data_type == ord(b','):
@ -236,8 +230,7 @@ def parse(data_type, data):
raise ValueError("unknown type tag: {}".format(data_type))
def pop(data):
# type: (bytes) -> Tuple[TSerializable, bytes]
def pop(data: bytes) -> Tuple[TSerializable, bytes]:
"""
This function parses a tnetstring into a python object.
It returns a tuple giving the parsed object and a string

View File

@ -1,4 +1,2 @@
from typing import Callable # noqa
master = None # type: "mitmproxy.flow.FlowMaster"
log = None # type: "mitmproxy.controller.Log"

View File

@ -1,7 +1,7 @@
from __future__ import absolute_import, print_function, division
from typing import Optional # noqa
import typing # noqa
import typing
from typing import Optional
from mitmproxy import controller
from mitmproxy import exceptions
@ -19,10 +19,10 @@ class DumpError(Exception):
class Options(options.Options):
def __init__(
self,
keepserving=False, # type: bool
filtstr=None, # type: Optional[str]
flow_detail=1, # type: int
tfile=None, # type: Optional[typing.io.TextIO]
keepserving: bool = False,
filtstr: Optional[str] = None,
flow_detail: int = 1,
tfile: Optional[typing.io.TextIO] = None,
**kwargs
):
self.filtstr = filtstr

View File

@ -97,8 +97,7 @@ def python_code(flow):
return code
def is_json(headers, content):
# type: (netlib.http.Headers, bytes) -> bool
def is_json(headers: netlib.http.Headers, content: bytes) -> bool:
if headers:
ct = netlib.http.parse_content_type(headers.get("content-type", ""))
if ct and "%s/%s" % (ct[0], ct[1]) == "application/json":

View File

@ -4,7 +4,7 @@ This module handles the import of mitmproxy flows generated by old versions.
from __future__ import absolute_import, print_function, division
import six
from typing import Any # noqa
from typing import Any
from netlib import version, strutils
@ -70,16 +70,14 @@ def convert_018_019(data):
return data
def _convert_dict_keys(o):
# type: (Any) -> Any
def _convert_dict_keys(o: Any) -> Any:
if isinstance(o, dict):
return {strutils.native(k): _convert_dict_keys(v) for k, v in o.items()}
else:
return o
def _convert_dict_vals(o, values_to_convert):
# type: (dict, dict) -> dict
def _convert_dict_vals(o: dict, values_to_convert: dict) -> dict:
for k, v in values_to_convert.items():
if not o or k not in o:
continue
@ -90,8 +88,7 @@ def _convert_dict_vals(o, values_to_convert):
return o
def convert_unicode(data):
# type: (dict) -> dict
def convert_unicode(data: dict) -> dict:
"""
The Python 2 version of mitmproxy serializes everything as bytes.
This method converts between Python 3 and Python 2 dumpfiles.

View File

@ -489,8 +489,7 @@ bnf = _make()
TFilter = Callable[[Flow], bool]
def parse(s):
# type: (str) -> TFilter
def parse(s: str) -> TFilter:
try:
flt = bnf.parseString(s, parseAll=True)[0]
flt.pattern = s

View File

@ -67,14 +67,20 @@ class Flow(stateobject.StateObject):
This class is usually subclassed for each protocol, e.g. HTTPFlow.
"""
def __init__(self, type, client_conn, server_conn, live=None):
def __init__(
self,
type: str,
client_conn: ClientConnection,
server_conn: ServerConnection,
live=None
):
self.type = type
self.id = str(uuid.uuid4())
self.client_conn = client_conn # type: ClientConnection
self.server_conn = server_conn # type: ServerConnection
self.client_conn = client_conn
self.server_conn = server_conn
self.live = live
self.error = None # type: Error
self.error = None # type: Optional[Error]
self.intercepted = False # type: bool
self._backup = None # type: Optional[Flow]
self.reply = None

View File

@ -1,6 +1,8 @@
from __future__ import absolute_import, print_function, division
from typing import Tuple, Optional, Sequence
from mitmproxy import optmanager
from typing import Tuple, Optional, Sequence # noqa
APP_HOST = "mitm.it"
APP_PORT = 80
@ -24,62 +26,61 @@ class Options(optmanager.OptManager):
def __init__(
self,
# TODO: rename to onboarding_app_*
app=True, # type: bool
app_host=APP_HOST, # type: str
app_port=APP_PORT, # type: int
anticache=False, # type: bool
anticomp=False, # type: bool
client_replay=None, # type: Optional[str]
replay_kill_extra=False, # type: bool
keepserving=True, # type: bool
no_server=False, # type: bool
server_replay_nopop=False, # type: bool
refresh_server_playback=False, # type: bool
rfile=None, # type: Optional[str]
scripts=(), # type: Sequence[str]
showhost=False, # type: bool
replacements=(), # type: Sequence[Tuple[str, str, str]]
server_replay_use_headers=(), # type: Sequence[str]
setheaders=(), # type: Sequence[Tuple[str, str, str]]
server_replay=None, # type: Sequence[str]
stickycookie=None, # type: Optional[str]
stickyauth=None, # type: Optional[str]
stream_large_bodies=None, # type: Optional[str]
verbosity=2, # type: int
outfile=None, # type: Tuple[str, str]
server_replay_ignore_content=False, # type: bool
server_replay_ignore_params=(), # type: Sequence[str]
server_replay_ignore_payload_params=(), # type: Sequence[str]
server_replay_ignore_host=False, # type: bool
app: bool = True,
app_host: str = APP_HOST,
app_port: int = APP_PORT,
anticache: bool = False,
anticomp: bool = False,
client_replay: Optional[str] = None,
replay_kill_extra: bool = False,
keepserving: bool = True,
no_server: bool = False,
server_replay_nopop: bool = False,
refresh_server_playback: bool = False,
rfile: Optional[str] = None,
scripts: Sequence[str] = (),
showhost: bool = False,
replacements: Sequence[Tuple[str, str, str]] = (),
server_replay_use_headers: Sequence[str] = (),
setheaders: Sequence[Tuple[str, str, str]] = (),
server_replay: Sequence[str] = None,
stickycookie: Optional[str] = None,
stickyauth: Optional[str] = None,
stream_large_bodies: Optional[str] = None,
verbosity: int = 2,
outfile: Tuple[str, str] = None,
server_replay_ignore_content: bool = False,
server_replay_ignore_params: Sequence[str] = (),
server_replay_ignore_payload_params: Sequence[str] = (),
server_replay_ignore_host: bool = False,
# Proxy options
auth_nonanonymous=False, # type: bool
auth_singleuser=None, # type: Optional[str]
auth_htpasswd=None, # type: Optional[str]
add_upstream_certs_to_client_chain=False, # type: bool
body_size_limit=None, # type: Optional[int]
cadir = CA_DIR, # type: str
certs = (), # type: Sequence[Tuple[str, str]]
ciphers_client = DEFAULT_CLIENT_CIPHERS, # type: str
ciphers_server = None, # type: Optional[str]
clientcerts = None, # type: Optional[str]
http2 = True, # type: bool
ignore_hosts = (), # type: Sequence[str]
listen_host = "", # type: str
listen_port = LISTEN_PORT, # type: int
mode = "regular", # type: str
no_upstream_cert = False, # type: bool
rawtcp = False, # type: bool
websockets = False, # type: bool
spoof_source_address = False, # type: bool
upstream_server = "", # type: str
upstream_auth = "", # type: str
ssl_version_client="secure", # type: str
ssl_version_server="secure", # type: str
ssl_insecure=False, # type: bool
ssl_verify_upstream_trusted_cadir=None, # type: str
ssl_verify_upstream_trusted_ca=None, # type: str
tcp_hosts = (), # type: Sequence[str]
auth_nonanonymous: bool = False,
auth_singleuser: Optional[str] = None,
auth_htpasswd: Optional[str] = None,
add_upstream_certs_to_client_chain: bool = False,
body_size_limit: Optional[int] = None,
cadir: str = CA_DIR,
certs: Sequence[Tuple[str, str]] = (),
ciphers_client: str=DEFAULT_CLIENT_CIPHERS,
ciphers_server: Optional[str]=None,
clientcerts: Optional[str] = None,
http2: bool = True,
ignore_hosts: Sequence[str] = (),
listen_host: str = "",
listen_port: int = LISTEN_PORT,
mode: str = "regular",
no_upstream_cert: bool = False,
rawtcp: bool = False,
websockets: bool = False,
spoof_source_address: bool = False,
upstream_server: str = "",
upstream_auth: str = "",
ssl_version_client: str = "secure",
ssl_version_server: str = "secure",
ssl_insecure: bool = False,
ssl_verify_upstream_trusted_cadir: str = None,
ssl_verify_upstream_trusted_ca: str = None,
tcp_hosts: Sequence[str] = ()
):
# We could replace all assignments with clever metaprogramming,
# but type hints are a much more valueable asset.

View File

@ -2,6 +2,8 @@ from __future__ import absolute_import, print_function, division
import struct
import sys
from typing import Optional # noqa
from typing import Union
import construct
import six
@ -332,7 +334,7 @@ class TlsLayer(base.Layer):
self._server_tls = server_tls
self._custom_server_sni = custom_server_sni
self._client_hello = None # type: TlsClientHello
self._client_hello = None # type: Optional[TlsClientHello]
def __call__(self):
"""
@ -406,8 +408,7 @@ class TlsLayer(base.Layer):
if self._server_tls and not self.server_conn.tls_established:
self._establish_tls_with_server()
def set_server_tls(self, server_tls, sni=None):
# type: (bool, Union[str, None, False]) -> None
def set_server_tls(self, server_tls: bool, sni: Union[str, None, bool]=None) -> None:
"""
Set the TLS settings for the next server connection that will be established.
This function will not alter an existing connection.

View File

@ -4,13 +4,15 @@ import base64
import collections
import os
import re
from typing import Any
from netlib import strutils
import six
from OpenSSL import SSL, crypto
from mitmproxy import exceptions
from mitmproxy import options as moptions # noqa
from mitmproxy import options as moptions
from netlib import certutils
from netlib import tcp
from netlib.http import authentication
@ -71,8 +73,8 @@ def parse_upstream_auth(auth):
class ProxyConfig:
def __init__(self, options):
self.options = options # type: moptions.Options
def __init__(self, options: moptions.Options):
self.options = options
self.authenticator = None
self.check_ignore = None
@ -83,8 +85,7 @@ class ProxyConfig:
self.configure(options, set(options.keys()))
options.changed.connect(self.configure)
def configure(self, options, updated):
# type: (moptions.Options, Any) -> None
def configure(self, options: moptions.Options, updated: Any) -> None:
if options.add_upstream_certs_to_client_chain and not options.ssl_insecure:
raise exceptions.OptionsError(
"The verify-upstream-cert requires certificate verification to be disabled. "

View File

@ -20,8 +20,7 @@ from mitmproxy import contentviews
from netlib import version
def convert_flow_to_json_dict(flow):
# type: (models.Flow) -> dict
def convert_flow_to_json_dict(flow: models.Flow) -> dict:
"""
Remove flow message content and cert to save transmission space.
@ -123,8 +122,7 @@ class RequestHandler(BasicAuth, tornado.web.RequestHandler):
return self.application.master.state
@property
def master(self):
# type: () -> mitmproxy.web.master.WebMaster
def master(self) -> "mitmproxy.web.master.WebMaster":
return self.application.master
@property

View File

@ -6,7 +6,7 @@ import collections
import tornado.httpserver
import tornado.ioloop
from typing import Optional # noqa
from typing import Optional
from mitmproxy import builtins
from mitmproxy import controller
@ -95,13 +95,13 @@ class WebState(flow.State):
class Options(options.Options):
def __init__(
self,
intercept=None, # type: Optional[str]
wdebug=bool, # type: bool
wport=8081, # type: int
wiface="127.0.0.1", # type: str
wauthenticator=None, # type: Optional[authentication.PassMan]
wsingleuser=None, # type: Optional[str]
whtpasswd=None, # type: Optional[str]
intercept: Optional[str] = None,
wdebug: bool = bool,
wport: int = 8081,
wiface: str = "127.0.0.1",
wauthenticator: Optional[authentication.PassMan] = None,
wsingleuser: Optional[str] = None,
whtpasswd: Optional[str] = None,
**kwargs
):
self.wdebug = wdebug

View File

@ -11,7 +11,7 @@ import gzip
import zlib
import brotli
from typing import Union # noqa
from typing import Union
# We have a shared single-element cache for encoding and decoding.
@ -22,8 +22,7 @@ CachedDecode = collections.namedtuple("CachedDecode", "encoded encoding errors d
_cache = CachedDecode(None, None, None, None)
def decode(encoded, encoding, errors='strict'):
# type: (Union[str, bytes], str, str) -> Union[str, bytes]
def decode(encoded: Union[str, bytes], encoding: str, errors: str='strict') -> Union[str, bytes]:
"""
Decode the given input object
@ -64,8 +63,7 @@ def decode(encoded, encoding, errors='strict'):
))
def encode(decoded, encoding, errors='strict'):
# type: (Union[str, bytes], str, str) -> Union[str, bytes]
def encode(decoded: Union[str, bytes], encoding: str, errors: str='strict') -> Union[str, bytes]:
"""
Encode the given input object

View File

@ -2,6 +2,7 @@ from __future__ import absolute_import, print_function, division
import re
import warnings
from typing import Optional
from netlib import encoding, strutils, basetypes
from netlib.http import headers
@ -77,8 +78,7 @@ class Message(basetypes.Serializable):
self.data.headers = h
@property
def raw_content(self):
# type: () -> bytes
def raw_content(self) -> bytes:
"""
The raw (encoded) HTTP message body
@ -90,8 +90,7 @@ class Message(basetypes.Serializable):
def raw_content(self, content):
self.data.content = content
def get_content(self, strict=True):
# type: (bool) -> bytes
def get_content(self, strict: bool=True) -> bytes:
"""
The HTTP message body decoded with the content-encoding header (e.g. gzip)
@ -168,14 +167,12 @@ class Message(basetypes.Serializable):
def timestamp_end(self, timestamp_end):
self.data.timestamp_end = timestamp_end
def _get_content_type_charset(self):
# type: () -> Optional[str]
def _get_content_type_charset(self) -> Optional[str]:
ct = headers.parse_content_type(self.headers.get("content-type", ""))
if ct:
return ct[2].get("charset")
def _guess_encoding(self):
# type: () -> str
def _guess_encoding(self) -> str:
enc = self._get_content_type_charset()
if enc:
return enc
@ -186,8 +183,7 @@ class Message(basetypes.Serializable):
# We may also want to check for HTML meta tags here at some point.
return "latin-1"
def get_text(self, strict=True):
# type: (bool) -> str
def get_text(self, strict: bool=True) -> str:
"""
The HTTP message body decoded with both content-encoding header (e.g. gzip)
and content-type header charset.

View File

@ -248,8 +248,7 @@ class Request(message.Message):
return netlib.http.url.unparse(self.scheme, self.pretty_host, self.port, self.path)
@property
def query(self):
# type: () -> multidict.MultiDictView
def query(self) -> multidict.MultiDictView:
"""
The request query string as an :py:class:`~netlib.multidict.MultiDictView` object.
"""
@ -272,8 +271,7 @@ class Request(message.Message):
self._set_query(value)
@property
def cookies(self):
# type: () -> multidict.MultiDictView
def cookies(self) -> multidict.MultiDictView:
"""
The request cookies.

View File

@ -8,11 +8,11 @@ from netlib.http import cookies
from netlib.http import headers as nheaders
from netlib.http import message
from netlib.http import status_codes
from typing import AnyStr # noqa
from typing import Dict # noqa
from typing import Iterable # noqa
from typing import Tuple # noqa
from typing import Union # noqa
from typing import AnyStr
from typing import Dict
from typing import Iterable
from typing import Tuple
from typing import Union
class ResponseData(message.MessageData):
@ -69,9 +69,9 @@ class Response(message.Message):
@classmethod
def make(
cls,
status_code=200, # type: int
content=b"", # type: AnyStr
headers=() # type: Union[Dict[AnyStr, AnyStr], Iterable[Tuple[bytes, bytes]]]
status_code: int=200,
content: AnyStr=b"",
headers: Union[Dict[AnyStr, AnyStr], Iterable[Tuple[bytes, bytes]]]=()
):
"""
Simplified API for creating response objects.
@ -130,8 +130,7 @@ class Response(message.Message):
self.data.reason = message._always_bytes(reason)
@property
def cookies(self):
# type: () -> multidict.MultiDictView
def cookies(self) -> multidict.MultiDictView:
"""
The response cookies. A possibly empty
:py:class:`~netlib.multidict.MultiDictView`, where the keys are cookie

View File

@ -1,4 +1,6 @@
import urllib
from typing import Sequence
from typing import Tuple
from netlib import utils
@ -80,8 +82,7 @@ def unparse(scheme, host, port, path=""):
return "%s://%s%s" % (scheme, hostport(scheme, host, port), path)
def encode(s):
# type: Sequence[Tuple[str,str]] -> str
def encode(s: Sequence[Tuple[str, str]]) -> str:
"""
Takes a list of (key, value) tuples and returns a urlencoded string.
"""
@ -95,23 +96,21 @@ def decode(s):
return urllib.parse.parse_qsl(s, keep_blank_values=True, errors='surrogateescape')
def quote(b, safe="/"):
def quote(b: str, safe: str="/") -> str:
"""
Returns:
An ascii-encodable str.
"""
# type: (str) -> str
return urllib.parse.quote(b, safe=safe, errors="surrogateescape")
def unquote(s):
def unquote(s: str) -> str:
"""
Args:
s: A surrogate-escaped str
Returns:
A surrogate-escaped str
"""
# type: (str) -> str
return urllib.parse.unquote(s, errors="surrogateescape")

View File

@ -43,14 +43,13 @@ _control_char_trans = str.maketrans(_control_char_trans)
_control_char_trans_newline = str.maketrans(_control_char_trans_newline)
def escape_control_characters(text, keep_spacing=True):
def escape_control_characters(text: str, keep_spacing=True) -> str:
"""
Replace all unicode C1 control characters from the given text with a single "."
Args:
keep_spacing: If True, tabs and newlines will not be replaced.
"""
# type: (str) -> str
if not isinstance(text, str):
raise ValueError("text type must be unicode but is {}".format(type(text).__name__))
@ -101,8 +100,7 @@ def escaped_str_to_bytes(data):
return codecs.escape_decode(data)[0]
def is_mostly_bin(s):
# type: (bytes) -> bool
def is_mostly_bin(s: bytes) -> bool:
if not s or len(s) == 0:
return False
@ -112,8 +110,7 @@ def is_mostly_bin(s):
) / len(s[:100]) > 0.3
def is_xml(s):
# type: (bytes) -> bool
def is_xml(s: bytes) -> bool:
return s.strip().startswith(b"<")

View File

@ -79,8 +79,7 @@ class Data(object):
_label_valid = re.compile(b"(?!-)[A-Z\d-]{1,63}(?<!-)$", re.IGNORECASE)
def is_valid_host(host):
# type: (bytes) -> bool
def is_valid_host(host: bytes) -> bool:
"""
Checks if a hostname is valid.
"""