Move all HTTP objects to flow.py

That's Request, Response, ClientConnect, ClientDisconnect, Error, and Headers.
This commit is contained in:
Aldo Cortesi 2011-08-03 22:38:23 +12:00
parent cbd8d09849
commit 57c653be5f
11 changed files with 975 additions and 963 deletions

View File

@ -17,7 +17,7 @@ import mailcap, mimetypes, tempfile, os, subprocess, glob, time
import os.path, sys
import cStringIO
import urwid
import controller, utils, filt, proxy, flow, encoding
import controller, utils, filt, flow, encoding
VIEW_CUTOFF = 1024*100
EVENTLOG_SIZE = 500
@ -460,7 +460,7 @@ class ConnectionView(WWrap):
conn.content = self._spawn_editor(conn.content or "")
elif part == "h":
headertext = self._spawn_editor(repr(conn.headers))
headers = utils.Headers()
headers = flow.Headers()
fp = cStringIO.StringIO(headertext)
headers.read(fp)
conn.headers = headers
@ -474,7 +474,7 @@ class ConnectionView(WWrap):
self.master.prompt_edit("Message", conn.msg, self.set_resp_msg)
elif part == "r" and self.state.view_flow_mode == VIEW_FLOW_REQUEST:
if not conn.acked:
response = proxy.Response(conn, "200", "OK", utils.Headers(), "")
response = flow.Response(conn, "200", "OK", flow.Headers(), "")
conn.ack(response)
self.view_response()
self.master.refresh_connection(self.flow)

View File

@ -2,8 +2,10 @@
This module provides more sophisticated flow tracking. These match requests
with their responses, and provide filtering and interception facilities.
"""
import subprocess, sys, json, hashlib, Cookie, cookielib
import proxy, threading, netstring, filt, script
import subprocess, sys, json, hashlib, Cookie, cookielib, base64, copy, re
import time
import netstring, filt, script, utils, encoding, proxy
from email.utils import parsedate_tz, formatdate, mktime_tz
import controller, version
class RunException(Exception):
@ -13,22 +15,542 @@ class RunException(Exception):
self.errout = errout
# begin nocover
class RequestReplayThread(threading.Thread):
def __init__(self, flow, masterq):
self.flow, self.masterq = flow, masterq
threading.Thread.__init__(self)
class Headers:
def __init__(self, lst=None):
if lst:
self.lst = lst
else:
self.lst = []
def _kconv(self, s):
return s.lower()
def __eq__(self, other):
return self.lst == other.lst
def __getitem__(self, k):
ret = []
k = self._kconv(k)
for i in self.lst:
if self._kconv(i[0]) == k:
ret.append(i[1])
return ret
def _filter_lst(self, k, lst):
new = []
for i in lst:
if self._kconv(i[0]) != k:
new.append(i)
return new
def __setitem__(self, k, hdrs):
k = self._kconv(k)
new = self._filter_lst(k, self.lst)
for i in hdrs:
new.append((k, i))
self.lst = new
def __delitem__(self, k):
self.lst = self._filter_lst(k, self.lst)
def __contains__(self, k):
for i in self.lst:
if self._kconv(i[0]) == k:
return True
return False
def add(self, key, value):
self.lst.append([key, str(value)])
def get_state(self):
return [tuple(i) for i in self.lst]
@classmethod
def from_state(klass, state):
return klass([list(i) for i in state])
def copy(self):
lst = copy.deepcopy(self.lst)
return Headers(lst)
def __repr__(self):
"""
Returns a string containing a formatted header string.
"""
headerElements = []
for itm in self.lst:
headerElements.append(itm[0] + ": " + itm[1])
headerElements.append("")
return "\r\n".join(headerElements)
def match_re(self, expr):
"""
Match the regular expression against each header (key, value) pair.
"""
for k, v in self.lst:
s = "%s: %s"%(k, v)
if re.search(expr, s):
return True
return False
def read(self, fp):
"""
Read a set of headers from a file pointer. Stop once a blank line
is reached.
"""
ret = []
name = ''
while 1:
line = fp.readline()
if not line or line == '\r\n' or line == '\n':
break
if line[0] in ' \t':
# continued header
ret[-1][1] = ret[-1][1] + '\r\n ' + line.strip()
else:
i = line.find(':')
# We're being liberal in what we accept, here.
if i > 0:
name = line[:i]
value = line[i+1:].strip()
ret.append([name, value])
self.lst = ret
def replace(self, pattern, repl, *args, **kwargs):
"""
Replaces a regular expression pattern with repl in both header keys
and values. Returns the number of replacements made.
"""
nlst, count = [], 0
for i in self.lst:
k, c = re.subn(pattern, repl, i[0], *args, **kwargs)
count += c
v, c = re.subn(pattern, repl, i[1], *args, **kwargs)
count += c
nlst.append([k, v])
self.lst = nlst
return count
class HTTPMsg(controller.Msg):
def decode(self):
"""
Alters Response object, decoding its content based on the current
Content-Encoding header and changing Content-Encoding header to
'identity'.
"""
ce = self.headers["content-encoding"]
if not ce or ce[0] not in encoding.ENCODINGS:
return
self.content = encoding.decode(
ce[0],
self.content
)
del self.headers["content-encoding"]
def encode(self, e):
"""
Alters Response object, encoding its content with the specified
coding. This method should only be called on Responses with
Content-Encoding headers of 'identity'.
"""
self.content = encoding.encode(e, self.content)
self.headers["content-encoding"] = [e]
class Request(HTTPMsg):
FMT = '%s %s HTTP/1.1\r\n%s\r\n%s'
FMT_PROXY = '%s %s://%s:%s%s HTTP/1.1\r\n%s\r\n%s'
def __init__(self, client_conn, host, port, scheme, method, path, headers, content, timestamp=None):
self.client_conn = client_conn
self.host, self.port, self.scheme = host, port, scheme
self.method, self.path, self.headers, self.content = method, path, headers, content
self.timestamp = timestamp or utils.timestamp()
self.close = False
controller.Msg.__init__(self)
# Have this request's cookies been modified by sticky cookies or auth?
self.stickycookie = False
self.stickyauth = False
def anticache(self):
"""
Modifies this request to remove headers that might produce a cached
response. That is, we remove ETags and If-Modified-Since headers.
"""
delheaders = [
"if-modified-since",
"if-none-match",
]
for i in delheaders:
del self.headers[i]
def anticomp(self):
"""
Modifies this request to remove headers that will compress the
resource's data.
"""
self.headers["accept-encoding"] = ["identity"]
def constrain_encoding(self):
"""
Limits the permissible Accept-Encoding values, based on what we can
decode appropriately.
"""
if self.headers["accept-encoding"]:
self.headers["accept-encoding"] = [', '.join([
e for e in encoding.ENCODINGS if e in self.headers["accept-encoding"][0]
])]
def set_replay(self):
self.client_conn = None
def is_replay(self):
if self.client_conn:
return False
else:
return True
def load_state(self, state):
if state["client_conn"]:
if self.client_conn:
self.client_conn.load_state(state["client_conn"])
else:
self.client_conn = ClientConnect.from_state(state["client_conn"])
else:
self.client_conn = None
self.host = state["host"]
self.port = state["port"]
self.scheme = state["scheme"]
self.method = state["method"]
self.path = state["path"]
self.headers = Headers.from_state(state["headers"])
self.content = base64.decodestring(state["content"])
self.timestamp = state["timestamp"]
def get_state(self):
return dict(
client_conn = self.client_conn.get_state() if self.client_conn else None,
host = self.host,
port = self.port,
scheme = self.scheme,
method = self.method,
path = self.path,
headers = self.headers.get_state(),
content = base64.encodestring(self.content),
timestamp = self.timestamp,
)
@classmethod
def from_state(klass, state):
return klass(
ClientConnect.from_state(state["client_conn"]),
str(state["host"]),
state["port"],
str(state["scheme"]),
str(state["method"]),
str(state["path"]),
Headers.from_state(state["headers"]),
base64.decodestring(state["content"]),
state["timestamp"]
)
def __hash__(self):
return id(self)
def __eq__(self, other):
return self.get_state() == other.get_state()
def copy(self):
c = copy.copy(self)
c.headers = self.headers.copy()
return c
def hostport(self):
if (self.port, self.scheme) in [(80, "http"), (443, "https")]:
host = self.host
else:
host = "%s:%s"%(self.host, self.port)
return host
def url(self):
return "%s://%s%s"%(self.scheme, self.hostport(), self.path)
def set_url(self, url):
parts = utils.parse_url(url)
if not parts:
return False
self.scheme, self.host, self.port, self.path = parts
return True
def is_response(self):
return False
def assemble(self, _proxy = False):
"""
Assembles the request for transmission to the server. We make some
modifications to make sure interception works properly.
"""
headers = self.headers.copy()
utils.del_all(
headers,
[
'proxy-connection',
'keep-alive',
'connection',
'content-length',
'transfer-encoding'
]
)
if not 'host' in headers:
headers["host"] = [self.hostport()]
content = self.content
if content is not None:
headers["content-length"] = [str(len(content))]
else:
content = ""
if self.close:
headers["connection"] = ["close"]
if not _proxy:
return self.FMT % (self.method, self.path, str(headers), content)
else:
return self.FMT_PROXY % (self.method, self.scheme, self.host, self.port, self.path, str(headers), content)
def replace(self, pattern, repl, *args, **kwargs):
"""
Replaces a regular expression pattern with repl in both the headers
and the body of the request. Returns the number of replacements
made.
"""
self.content, c = re.subn(pattern, repl, self.content, *args, **kwargs)
self.path, pc = re.subn(pattern, repl, self.path, *args, **kwargs)
c += pc
c += self.headers.replace(pattern, repl, *args, **kwargs)
return c
class Response(HTTPMsg):
FMT = '%s\r\n%s\r\n%s'
def __init__(self, request, code, msg, headers, content, timestamp=None):
self.request = request
self.code, self.msg = code, msg
self.headers, self.content = headers, content
self.timestamp = timestamp or utils.timestamp()
controller.Msg.__init__(self)
self.replay = False
def _refresh_cookie(self, c, delta):
"""
Takes a cookie string c and a time delta in seconds, and returns
a refreshed cookie string.
"""
c = Cookie.SimpleCookie(str(c))
for i in c.values():
if "expires" in i:
d = parsedate_tz(i["expires"])
if d:
d = mktime_tz(d) + delta
i["expires"] = formatdate(d)
else:
# This can happen when the expires tag is invalid.
# reddit.com sends a an expires tag like this: "Thu, 31 Dec
# 2037 23:59:59 GMT", which is valid RFC 1123, but not
# strictly correct according tot he cookie spec. Browsers
# appear to parse this tolerantly - maybe we should too.
# For now, we just ignore this.
del i["expires"]
return c.output(header="").strip()
def refresh(self, now=None):
"""
This fairly complex and heuristic function refreshes a server
response for replay.
- It adjusts date, expires and last-modified headers.
- It adjusts cookie expiration.
"""
if not now:
now = time.time()
delta = now - self.timestamp
refresh_headers = [
"date",
"expires",
"last-modified",
]
for i in refresh_headers:
if i in self.headers:
d = parsedate_tz(self.headers[i][0])
if d:
new = mktime_tz(d) + delta
self.headers[i] = [formatdate(new)]
c = []
for i in self.headers["set-cookie"]:
c.append(self._refresh_cookie(i, delta))
if c:
self.headers["set-cookie"] = c
def set_replay(self):
self.replay = True
def is_replay(self):
return self.replay
def load_state(self, state):
self.code = state["code"]
self.msg = state["msg"]
self.headers = Headers.from_state(state["headers"])
self.content = base64.decodestring(state["content"])
self.timestamp = state["timestamp"]
def get_state(self):
return dict(
code = self.code,
msg = self.msg,
headers = self.headers.get_state(),
timestamp = self.timestamp,
content = base64.encodestring(self.content)
)
@classmethod
def from_state(klass, request, state):
return klass(
request,
state["code"],
str(state["msg"]),
Headers.from_state(state["headers"]),
base64.decodestring(state["content"]),
state["timestamp"],
)
def __eq__(self, other):
return self.get_state() == other.get_state()
def copy(self):
c = copy.copy(self)
c.headers = self.headers.copy()
return c
def is_response(self):
return True
def assemble(self):
"""
Assembles the response for transmission to the client. We make some
modifications to make sure interception works properly.
"""
headers = self.headers.copy()
utils.del_all(
headers,
['proxy-connection', 'connection', 'keep-alive', 'transfer-encoding']
)
content = self.content
if content is not None:
headers["content-length"] = [str(len(content))]
else:
content = ""
if self.request.client_conn.close:
headers["connection"] = ["close"]
proto = "HTTP/1.1 %s %s"%(self.code, str(self.msg))
data = (proto, str(headers), content)
return self.FMT%data
def replace(self, pattern, repl, *args, **kwargs):
"""
Replaces a regular expression pattern with repl in both the headers
and the body of the response. Returns the number of replacements
made.
"""
self.content, c = re.subn(pattern, repl, self.content, *args, **kwargs)
c += self.headers.replace(pattern, repl, *args, **kwargs)
return c
class ClientDisconnect(controller.Msg):
def __init__(self, client_conn):
controller.Msg.__init__(self)
self.client_conn = client_conn
class ClientConnect(controller.Msg):
def __init__(self, address):
"""
address is an (address, port) tuple, or None if this connection has
been replayed from within mitmproxy.
"""
self.address = address
self.close = False
self.requestcount = 0
self.connection_error = None
controller.Msg.__init__(self)
def __eq__(self, other):
return self.get_state() == other.get_state()
def load_state(self, state):
self.address = state
def get_state(self):
return list(self.address) if self.address else None
@classmethod
def from_state(klass, state):
if state:
return klass(state)
else:
return None
def copy(self):
return copy.copy(self)
class Error(controller.Msg):
def __init__(self, request, msg, timestamp=None):
self.request, self.msg = request, msg
self.timestamp = timestamp or utils.timestamp()
controller.Msg.__init__(self)
def load_state(self, state):
self.msg = state["msg"]
self.timestamp = state["timestamp"]
def copy(self):
return copy.copy(self)
def get_state(self):
return dict(
msg = self.msg,
timestamp = self.timestamp,
)
@classmethod
def from_state(klass, state):
return klass(
None,
state["msg"],
state["timestamp"],
)
def __eq__(self, other):
return self.get_state() == other.get_state()
def replace(self, pattern, repl, *args, **kwargs):
"""
Replaces a regular expression pattern with repl in both the headers
and the body of the request. Returns the number of replacements
made.
"""
self.msg, c = re.subn(pattern, repl, self.msg, *args, **kwargs)
return c
def run(self):
try:
server = proxy.ServerConnection(self.flow.request)
server.send_request(self.flow.request)
response = server.read_response()
response.send(self.masterq)
except proxy.ProxyError, v:
err = proxy.Error(self.flow.request, v.msg)
err.send(self.masterq)
# end nocover
class ClientPlaybackState:
@ -217,13 +739,13 @@ class Flow:
if self.request:
self.request.load_state(state["request"])
else:
self.request = proxy.Request.from_state(state["request"])
self.request = Request.from_state(state["request"])
if state["response"]:
if self.response:
self.response.load_state(state["response"])
else:
self.response = proxy.Response.from_state(self.request, state["response"])
self.response = Response.from_state(self.request, state["response"])
else:
self.response = None
@ -231,7 +753,7 @@ class Flow:
if self.error:
self.error.load_state(state["error"])
else:
self.error = proxy.Error.from_state(state["error"])
self.error = Error.from_state(state["error"])
else:
self.error = None
@ -261,7 +783,7 @@ class Flow:
return True
def kill(self, master):
self.error = proxy.Error(self.request, "Connection killed")
self.error = Error(self.request, "Connection killed")
if self.request and not self.request.acked:
self.request.ack(None)
elif self.response and not self.response.acked:
@ -519,7 +1041,7 @@ class FlowMaster(controller.Master):
rflow = self.server_playback.next_flow(flow)
if not rflow:
return None
response = proxy.Response.from_state(flow.request, rflow.response.get_state())
response = Response.from_state(flow.request, rflow.response.get_state())
response.set_replay()
flow.response = response
if self.refresh_server_playback:
@ -594,7 +1116,7 @@ class FlowMaster(controller.Master):
f.response = None
f.error = None
self.process_new_request(f)
rt = RequestReplayThread(f, self.masterq)
rt = proxy.RequestReplayThread(f, self.masterq)
rt.start()
#end nocover

View File

@ -5,11 +5,10 @@
Development started from Neil Schemenauer's munchy.py
"""
import sys, os, string, socket, urlparse, re, select, copy, base64, time, Cookie
from email.utils import parsedate_tz, formatdate, mktime_tz
import shutil, tempfile
import sys, os, string, socket, select, time, Cookie
import shutil, tempfile, threading
import optparse, SocketServer, ssl
import utils, controller, encoding
import utils, controller, flow
NAME = "mitmproxy"
@ -70,28 +69,6 @@ def read_http_body(rfile, connection, headers, all):
return content
def parse_url(url):
"""
Returns a (scheme, host, port, path) tuple, or None on error.
"""
scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)
if not scheme:
return None
if ':' in netloc:
host, port = string.split(netloc, ':')
port = int(port)
else:
host = netloc
if scheme == "https":
port = 443
else:
port = 80
path = urlparse.urlunparse(('', '', path, params, query, fragment))
if not path.startswith("/"):
path = "/" + path
return scheme, host, port, path
def parse_request_line(request):
"""
Parse a proxy request line. Return (method, scheme, host, port, path, minor).
@ -113,7 +90,7 @@ def parse_request_line(request):
if url.startswith("/") or url == "*":
scheme, port, host, path = None, None, None, url
else:
parts = parse_url(url)
parts = utils.parse_url(url)
if not parts:
raise ProxyError(400, "Invalid url: %s"%url)
scheme, host, port, path = parts
@ -127,416 +104,6 @@ def parse_request_line(request):
return method, scheme, host, port, path, minor
class HTTPMsg(controller.Msg):
def decode(self):
"""
Alters Response object, decoding its content based on the current
Content-Encoding header and changing Content-Encoding header to
'identity'.
"""
ce = self.headers["content-encoding"]
if not ce or ce[0] not in encoding.ENCODINGS:
return
self.content = encoding.decode(
ce[0],
self.content
)
del self.headers["content-encoding"]
def encode(self, e):
"""
Alters Response object, encoding its content with the specified
coding. This method should only be called on Responses with
Content-Encoding headers of 'identity'.
"""
self.content = encoding.encode(e, self.content)
self.headers["content-encoding"] = [e]
class Request(HTTPMsg):
FMT = '%s %s HTTP/1.1\r\n%s\r\n%s'
FMT_PROXY = '%s %s://%s:%s%s HTTP/1.1\r\n%s\r\n%s'
def __init__(self, client_conn, host, port, scheme, method, path, headers, content, timestamp=None):
self.client_conn = client_conn
self.host, self.port, self.scheme = host, port, scheme
self.method, self.path, self.headers, self.content = method, path, headers, content
self.timestamp = timestamp or utils.timestamp()
self.close = False
controller.Msg.__init__(self)
# Have this request's cookies been modified by sticky cookies or auth?
self.stickycookie = False
self.stickyauth = False
def anticache(self):
"""
Modifies this request to remove headers that might produce a cached
response. That is, we remove ETags and If-Modified-Since headers.
"""
delheaders = [
"if-modified-since",
"if-none-match",
]
for i in delheaders:
del self.headers[i]
def anticomp(self):
"""
Modifies this request to remove headers that will compress the
resource's data.
"""
self.headers["accept-encoding"] = ["identity"]
def constrain_encoding(self):
"""
Limits the permissible Accept-Encoding values, based on what we can
decode appropriately.
"""
if self.headers["accept-encoding"]:
self.headers["accept-encoding"] = [', '.join([
e for e in encoding.ENCODINGS if e in self.headers["accept-encoding"][0]
])]
def set_replay(self):
self.client_conn = None
def is_replay(self):
if self.client_conn:
return False
else:
return True
def load_state(self, state):
if state["client_conn"]:
if self.client_conn:
self.client_conn.load_state(state["client_conn"])
else:
self.client_conn = ClientConnect.from_state(state["client_conn"])
else:
self.client_conn = None
self.host = state["host"]
self.port = state["port"]
self.scheme = state["scheme"]
self.method = state["method"]
self.path = state["path"]
self.headers = utils.Headers.from_state(state["headers"])
self.content = base64.decodestring(state["content"])
self.timestamp = state["timestamp"]
def get_state(self):
return dict(
client_conn = self.client_conn.get_state() if self.client_conn else None,
host = self.host,
port = self.port,
scheme = self.scheme,
method = self.method,
path = self.path,
headers = self.headers.get_state(),
content = base64.encodestring(self.content),
timestamp = self.timestamp,
)
@classmethod
def from_state(klass, state):
return klass(
ClientConnect.from_state(state["client_conn"]),
str(state["host"]),
state["port"],
str(state["scheme"]),
str(state["method"]),
str(state["path"]),
utils.Headers.from_state(state["headers"]),
base64.decodestring(state["content"]),
state["timestamp"]
)
def __hash__(self):
return id(self)
def __eq__(self, other):
return self.get_state() == other.get_state()
def copy(self):
c = copy.copy(self)
c.headers = self.headers.copy()
return c
def hostport(self):
if (self.port, self.scheme) in [(80, "http"), (443, "https")]:
host = self.host
else:
host = "%s:%s"%(self.host, self.port)
return host
def url(self):
return "%s://%s%s"%(self.scheme, self.hostport(), self.path)
def set_url(self, url):
parts = parse_url(url)
if not parts:
return False
self.scheme, self.host, self.port, self.path = parts
return True
def is_response(self):
return False
def assemble(self, _proxy = False):
"""
Assembles the request for transmission to the server. We make some
modifications to make sure interception works properly.
"""
headers = self.headers.copy()
utils.del_all(
headers,
[
'proxy-connection',
'keep-alive',
'connection',
'content-length',
'transfer-encoding'
]
)
if not 'host' in headers:
headers["host"] = [self.hostport()]
content = self.content
if content is not None:
headers["content-length"] = [str(len(content))]
else:
content = ""
if self.close:
headers["connection"] = ["close"]
if not _proxy:
return self.FMT % (self.method, self.path, str(headers), content)
else:
return self.FMT_PROXY % (self.method, self.scheme, self.host, self.port, self.path, str(headers), content)
def replace(self, pattern, repl, *args, **kwargs):
"""
Replaces a regular expression pattern with repl in both the headers
and the body of the request. Returns the number of replacements
made.
"""
self.content, c = re.subn(pattern, repl, self.content, *args, **kwargs)
self.path, pc = re.subn(pattern, repl, self.path, *args, **kwargs)
c += pc
c += self.headers.replace(pattern, repl, *args, **kwargs)
return c
class Response(HTTPMsg):
FMT = '%s\r\n%s\r\n%s'
def __init__(self, request, code, msg, headers, content, timestamp=None):
self.request = request
self.code, self.msg = code, msg
self.headers, self.content = headers, content
self.timestamp = timestamp or utils.timestamp()
controller.Msg.__init__(self)
self.replay = False
def _refresh_cookie(self, c, delta):
"""
Takes a cookie string c and a time delta in seconds, and returns
a refreshed cookie string.
"""
c = Cookie.SimpleCookie(str(c))
for i in c.values():
if "expires" in i:
d = parsedate_tz(i["expires"])
if d:
d = mktime_tz(d) + delta
i["expires"] = formatdate(d)
else:
# This can happen when the expires tag is invalid.
# reddit.com sends a an expires tag like this: "Thu, 31 Dec
# 2037 23:59:59 GMT", which is valid RFC 1123, but not
# strictly correct according tot he cookie spec. Browsers
# appear to parse this tolerantly - maybe we should too.
# For now, we just ignore this.
del i["expires"]
return c.output(header="").strip()
def refresh(self, now=None):
"""
This fairly complex and heuristic function refreshes a server
response for replay.
- It adjusts date, expires and last-modified headers.
- It adjusts cookie expiration.
"""
if not now:
now = time.time()
delta = now - self.timestamp
refresh_headers = [
"date",
"expires",
"last-modified",
]
for i in refresh_headers:
if i in self.headers:
d = parsedate_tz(self.headers[i][0])
if d:
new = mktime_tz(d) + delta
self.headers[i] = [formatdate(new)]
c = []
for i in self.headers["set-cookie"]:
c.append(self._refresh_cookie(i, delta))
if c:
self.headers["set-cookie"] = c
def set_replay(self):
self.replay = True
def is_replay(self):
return self.replay
def load_state(self, state):
self.code = state["code"]
self.msg = state["msg"]
self.headers = utils.Headers.from_state(state["headers"])
self.content = base64.decodestring(state["content"])
self.timestamp = state["timestamp"]
def get_state(self):
return dict(
code = self.code,
msg = self.msg,
headers = self.headers.get_state(),
timestamp = self.timestamp,
content = base64.encodestring(self.content)
)
@classmethod
def from_state(klass, request, state):
return klass(
request,
state["code"],
str(state["msg"]),
utils.Headers.from_state(state["headers"]),
base64.decodestring(state["content"]),
state["timestamp"],
)
def __eq__(self, other):
return self.get_state() == other.get_state()
def copy(self):
c = copy.copy(self)
c.headers = self.headers.copy()
return c
def is_response(self):
return True
def assemble(self):
"""
Assembles the response for transmission to the client. We make some
modifications to make sure interception works properly.
"""
headers = self.headers.copy()
utils.del_all(
headers,
['proxy-connection', 'connection', 'keep-alive', 'transfer-encoding']
)
content = self.content
if content is not None:
headers["content-length"] = [str(len(content))]
else:
content = ""
if self.request.client_conn.close:
headers["connection"] = ["close"]
proto = "HTTP/1.1 %s %s"%(self.code, str(self.msg))
data = (proto, str(headers), content)
return self.FMT%data
def replace(self, pattern, repl, *args, **kwargs):
"""
Replaces a regular expression pattern with repl in both the headers
and the body of the response. Returns the number of replacements
made.
"""
self.content, c = re.subn(pattern, repl, self.content, *args, **kwargs)
c += self.headers.replace(pattern, repl, *args, **kwargs)
return c
class ClientDisconnect(controller.Msg):
def __init__(self, client_conn):
controller.Msg.__init__(self)
self.client_conn = client_conn
class ClientConnect(controller.Msg):
def __init__(self, address):
"""
address is an (address, port) tuple, or None if this connection has
been replayed from within mitmproxy.
"""
self.address = address
self.close = False
self.requestcount = 0
self.connection_error = None
controller.Msg.__init__(self)
def __eq__(self, other):
return self.get_state() == other.get_state()
def load_state(self, state):
self.address = state
def get_state(self):
return list(self.address) if self.address else None
@classmethod
def from_state(klass, state):
if state:
return klass(state)
else:
return None
def copy(self):
return copy.copy(self)
class Error(controller.Msg):
def __init__(self, request, msg, timestamp=None):
self.request, self.msg = request, msg
self.timestamp = timestamp or utils.timestamp()
controller.Msg.__init__(self)
def load_state(self, state):
self.msg = state["msg"]
self.timestamp = state["timestamp"]
def copy(self):
return copy.copy(self)
def get_state(self):
return dict(
msg = self.msg,
timestamp = self.timestamp,
)
@classmethod
def from_state(klass, state):
return klass(
None,
state["msg"],
state["timestamp"],
)
def __eq__(self, other):
return self.get_state() == other.get_state()
def replace(self, pattern, repl, *args, **kwargs):
"""
Replaces a regular expression pattern with repl in both the headers
and the body of the request. Returns the number of replacements
made.
"""
self.msg, c = re.subn(pattern, repl, self.msg, *args, **kwargs)
return c
class FileLike:
def __init__(self, o):
@ -574,6 +141,21 @@ class FileLike:
#begin nocover
class RequestReplayThread(threading.Thread):
def __init__(self, flow, masterq):
self.flow, self.masterq = flow, masterq
threading.Thread.__init__(self)
def run(self):
try:
server = ServerConnection(self.flow.request)
server.send_request(self.flow.request)
response = server.read_response()
response.send(self.masterq)
except ProxyError, v:
err = flow.Error(self.flow.request, v.msg)
err.send(self.masterq)
class ServerConnection:
def __init__(self, request):
@ -616,7 +198,7 @@ class ServerConnection:
raise ProxyError(502, "Invalid server response: %s."%line)
proto, code, msg = parts
code = int(code)
headers = utils.Headers()
headers = flow.Headers()
headers.read(self.rfile)
if code >= 100 and code <= 199:
return self.read_response()
@ -624,7 +206,7 @@ class ServerConnection:
content = ""
else:
content = read_http_body(self.rfile, self, headers, True)
return Response(self.request, code, msg, headers, content)
return flow.Response(self.request, code, msg, headers, content)
def terminate(self):
try:
@ -642,11 +224,11 @@ class ProxyHandler(SocketServer.StreamRequestHandler):
SocketServer.StreamRequestHandler.__init__(self, request, client_address, server)
def handle(self):
cc = ClientConnect(self.client_address)
cc = flow.ClientConnect(self.client_address)
cc.send(self.mqueue)
while not cc.close:
self.handle_request(cc)
cd = ClientDisconnect(cc)
cd = flow.ClientDisconnect(cc)
cd.send(self.mqueue)
self.finish()
@ -691,7 +273,7 @@ class ProxyHandler(SocketServer.StreamRequestHandler):
cc.close = True
cc.connection_error = "%s: %s"%(e.code, e.msg)
if request:
err = Error(request, e.msg)
err = flow.Error(request, e.msg)
err.send(self.mqueue)
self.send_error(e.code, e.msg)
if server:
@ -742,7 +324,7 @@ class ProxyHandler(SocketServer.StreamRequestHandler):
method, scheme, host, port, path, httpminor = parse_request_line(self.rfile.readline())
if scheme is None:
scheme = "https"
headers = utils.Headers()
headers = flow.Headers()
headers.read(self.rfile)
if host is None and "host" in headers:
netloc = headers["host"][0]
@ -779,7 +361,7 @@ class ProxyHandler(SocketServer.StreamRequestHandler):
if value == "keep-alive":
client_conn.close = False
content = read_http_body(self.rfile, client_conn, headers, False)
return Request(client_conn, host, port, scheme, method, path, headers, content)
return flow.Request(client_conn, host, port, scheme, method, path, headers, content)
def send_response(self, response):
self.wfile.write(response.assemble())

View File

@ -12,8 +12,8 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import re, os, subprocess, datetime, textwrap
import time, functools, copy, cgi
import re, os, subprocess, datetime, urlparse, string
import time, functools, copy, cgi, textwrap
import json
CERT_SLEEP_TIME = 1
@ -151,122 +151,6 @@ def del_all(dict, keys):
del dict[key]
class Headers:
def __init__(self, lst=None):
if lst:
self.lst = lst
else:
self.lst = []
def _kconv(self, s):
return s.lower()
def __eq__(self, other):
return self.lst == other.lst
def __getitem__(self, k):
ret = []
k = self._kconv(k)
for i in self.lst:
if self._kconv(i[0]) == k:
ret.append(i[1])
return ret
def _filter_lst(self, k, lst):
new = []
for i in lst:
if self._kconv(i[0]) != k:
new.append(i)
return new
def __setitem__(self, k, hdrs):
k = self._kconv(k)
new = self._filter_lst(k, self.lst)
for i in hdrs:
new.append((k, i))
self.lst = new
def __delitem__(self, k):
self.lst = self._filter_lst(k, self.lst)
def __contains__(self, k):
for i in self.lst:
if self._kconv(i[0]) == k:
return True
return False
def add(self, key, value):
self.lst.append([key, str(value)])
def get_state(self):
return [tuple(i) for i in self.lst]
@classmethod
def from_state(klass, state):
return klass([list(i) for i in state])
def copy(self):
lst = copy.deepcopy(self.lst)
return Headers(lst)
def __repr__(self):
"""
Returns a string containing a formatted header string.
"""
headerElements = []
for itm in self.lst:
headerElements.append(itm[0] + ": " + itm[1])
headerElements.append("")
return "\r\n".join(headerElements)
def match_re(self, expr):
"""
Match the regular expression against each header (key, value) pair.
"""
for k, v in self.lst:
s = "%s: %s"%(k, v)
if re.search(expr, s):
return True
return False
def read(self, fp):
"""
Read a set of headers from a file pointer. Stop once a blank line
is reached.
"""
ret = []
name = ''
while 1:
line = fp.readline()
if not line or line == '\r\n' or line == '\n':
break
if line[0] in ' \t':
# continued header
ret[-1][1] = ret[-1][1] + '\r\n ' + line.strip()
else:
i = line.find(':')
# We're being liberal in what we accept, here.
if i > 0:
name = line[:i]
value = line[i+1:].strip()
ret.append([name, value])
self.lst = ret
def replace(self, pattern, repl, *args, **kwargs):
"""
Replaces a regular expression pattern with repl in both header keys
and values. Returns the number of replacements made.
"""
nlst, count = [], 0
for i in self.lst:
k, c = re.subn(pattern, repl, i[0], *args, **kwargs)
count += c
v, c = re.subn(pattern, repl, i[1], *args, **kwargs)
count += c
nlst.append([k, v])
self.lst = nlst
return count
def pretty_size(size):
suffixes = [
@ -499,3 +383,27 @@ class LRUCache:
cache.pop(d)
return ret
return wrap
def parse_url(url):
"""
Returns a (scheme, host, port, path) tuple, or None on error.
"""
scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)
if not scheme:
return None
if ':' in netloc:
host, port = string.split(netloc, ':')
port = int(port)
else:
host = netloc
if scheme == "https":
port = 443
else:
port = 80
path = urlparse.urlunparse(('', '', path, params, query, fragment))
if not path.startswith("/"):
path = "/" + path
return scheme, host, port, path

View File

@ -1,4 +1,4 @@
from libmproxy import console, proxy, filt, flow
from libmproxy import console, filt, flow
import tutils
import libpry
@ -114,7 +114,7 @@ class uformat_flow(libpry.AutoTree):
assert ('text', ' text/html') in console.format_flow(f, True, True)
f.response =None
f.error = proxy.Error(f.request, "error")
f.error = flow.Error(f.request, "error")
assert ('error', 'error') in console.format_flow(f, True, True)

View File

@ -28,7 +28,7 @@ class uDumpMaster(libpry.AutoTree):
m.handle_clientconnect(cc)
m.handle_request(req)
m.handle_response(resp)
m.handle_clientdisconnect(proxy.ClientDisconnect(cc))
m.handle_clientdisconnect(flow.ClientDisconnect(cc))
def _dummy_cycle(self, n, filt, content, **options):
cs = StringIO()

View File

@ -1,5 +1,5 @@
import cStringIO
from libmproxy import filt, proxy, utils
from libmproxy import filt, flow
import libpry
@ -72,10 +72,10 @@ class uParsing(libpry.AutoTree):
class uMatching(libpry.AutoTree):
def req(self):
conn = proxy.ClientConnect(("one", 2222))
headers = utils.Headers()
conn = flow.ClientConnect(("one", 2222))
headers = flow.Headers()
headers["header"] = ["qvalue"]
return proxy.Request(
return flow.Request(
conn,
"host",
80,
@ -88,9 +88,9 @@ class uMatching(libpry.AutoTree):
def resp(self):
q = self.req()
headers = utils.Headers()
headers = flow.Headers()
headers["header_response"] = ["svalue"]
return proxy.Response(
return flow.Response(
q,
200,
"message",

View File

@ -1,6 +1,7 @@
import Queue
import Queue, time, textwrap
from cStringIO import StringIO
from libmproxy import console, proxy, filt, flow, controller
import email.utils
from libmproxy import console, proxy, filt, flow, controller, utils
import tutils
import libpry
@ -156,12 +157,12 @@ class uFlow(libpry.AutoTree):
assert f.get_state() == flow.Flow.from_state(state).get_state()
f.response = None
f.error = proxy.Error(f.request, "error")
f.error = flow.Error(f.request, "error")
state = f.get_state()
assert f.get_state() == flow.Flow.from_state(state).get_state()
f2 = tutils.tflow()
f2.error = proxy.Error(f.request, "e2")
f2.error = flow.Error(f.request, "e2")
assert not f == f2
f.load_state(f2.get_state())
assert f.get_state() == f2.get_state()
@ -240,7 +241,7 @@ class uFlow(libpry.AutoTree):
class uState(libpry.AutoTree):
def test_backup(self):
bc = proxy.ClientConnect(("address", 22))
bc = flow.ClientConnect(("address", 22))
c = flow.State()
req = tutils.treq()
f = c.add_request(req)
@ -254,7 +255,7 @@ class uState(libpry.AutoTree):
connect -> request -> response
"""
bc = proxy.ClientConnect(("address", 22))
bc = flow.ClientConnect(("address", 22))
c = flow.State()
req = tutils.treq(bc)
@ -284,17 +285,17 @@ class uState(libpry.AutoTree):
assert c.add_response(resp)
assert c.active_flow_count() == 0
dc = proxy.ClientDisconnect(bc)
dc = flow.ClientDisconnect(bc)
def test_err(self):
bc = proxy.ClientConnect(("address", 22))
bc = flow.ClientConnect(("address", 22))
c = flow.State()
req = tutils.treq()
f = c.add_request(req)
e = proxy.Error(f.request, "message")
e = flow.Error(f.request, "message")
assert c.add_error(e)
e = proxy.Error(tutils.tflow().request, "message")
e = flow.Error(tutils.tflow().request, "message")
assert not c.add_error(e)
@ -348,7 +349,7 @@ class uState(libpry.AutoTree):
def _add_error(self, state):
req = tutils.treq()
f = state.add_request(req)
f.error = proxy.Error(f.request, "msg")
f.error = flow.Error(f.request, "msg")
def test_clear(self):
c = flow.State()
@ -451,10 +452,10 @@ class uFlowMaster(libpry.AutoTree):
resp = tutils.tresp(req)
fm.handle_response(resp)
assert fm.script.ns["log"][-1] == "response"
dc = proxy.ClientDisconnect(req.client_conn)
dc = flow.ClientDisconnect(req.client_conn)
fm.handle_clientdisconnect(dc)
assert fm.script.ns["log"][-1] == "clientdisconnect"
err = proxy.Error(f.request, "msg")
err = flow.Error(f.request, "msg")
fm.handle_error(err)
assert fm.script.ns["log"][-1] == "error"
@ -476,10 +477,10 @@ class uFlowMaster(libpry.AutoTree):
rx = tutils.tresp()
assert not fm.handle_response(rx)
dc = proxy.ClientDisconnect(req.client_conn)
dc = flow.ClientDisconnect(req.client_conn)
fm.handle_clientdisconnect(dc)
err = proxy.Error(f.request, "msg")
err = flow.Error(f.request, "msg")
fm.handle_error(err)
def test_client_playback(self):
@ -496,7 +497,7 @@ class uFlowMaster(libpry.AutoTree):
fm.tick(q)
assert fm.state.flow_count()
fm.handle_error(proxy.Error(f.request, "error"))
fm.handle_error(flow.Error(f.request, "error"))
def test_server_playback(self):
s = flow.State()
@ -564,6 +565,318 @@ class uFlowMaster(libpry.AutoTree):
fm.handle_request(f.request)
assert f.request.headers["authorization"] == ["foo"]
class uRequest(libpry.AutoTree):
def test_simple(self):
h = flow.Headers()
h["test"] = ["test"]
c = flow.ClientConnect(("addr", 2222))
r = flow.Request(c, "host", 22, "https", "GET", "/", h, "content")
u = r.url()
assert r.set_url(u)
assert not r.set_url("")
assert r.url() == u
assert r.assemble()
r2 = r.copy()
assert r == r2
def test_anticache(self):
h = flow.Headers()
r = flow.Request(None, "host", 22, "https", "GET", "/", h, "content")
h["if-modified-since"] = ["test"]
h["if-none-match"] = ["test"]
r.anticache()
assert not "if-modified-since" in r.headers
assert not "if-none-match" in r.headers
def test_getset_state(self):
h = flow.Headers()
h["test"] = ["test"]
c = flow.ClientConnect(("addr", 2222))
r = flow.Request(c, "host", 22, "https", "GET", "/", h, "content")
state = r.get_state()
assert flow.Request.from_state(state) == r
r.client_conn = None
state = r.get_state()
assert flow.Request.from_state(state) == r
r2 = flow.Request(c, "testing", 20, "http", "PUT", "/foo", h, "test")
assert not r == r2
r.load_state(r2.get_state())
assert r == r2
r2.client_conn = None
r.load_state(r2.get_state())
assert not r.client_conn
def test_replace(self):
r = tutils.treq()
r.path = "path/foo"
r.headers["Foo"] = ["fOo"]
r.content = "afoob"
assert r.replace("foo(?i)", "boo") == 4
assert r.path == "path/boo"
assert not "foo" in r.content
assert r.headers["boo"] == ["boo"]
def test_decodeencode(self):
r = tutils.treq()
r.headers["content-encoding"] = ["identity"]
r.content = "falafel"
r.decode()
assert not r.headers["content-encoding"]
assert r.content == "falafel"
r = tutils.treq()
r.headers["content-encoding"] = ["identity"]
r.content = "falafel"
r.encode("identity")
assert r.headers["content-encoding"] == ["identity"]
assert r.content == "falafel"
r = tutils.treq()
r.headers["content-encoding"] = ["identity"]
r.content = "falafel"
r.encode("gzip")
assert r.headers["content-encoding"] == ["gzip"]
assert r.content != "falafel"
r.decode()
assert not r.headers["content-encoding"]
assert r.content == "falafel"
class uResponse(libpry.AutoTree):
def test_simple(self):
h = flow.Headers()
h["test"] = ["test"]
c = flow.ClientConnect(("addr", 2222))
req = flow.Request(c, "host", 22, "https", "GET", "/", h, "content")
resp = flow.Response(req, 200, "msg", h.copy(), "content")
assert resp.assemble()
resp2 = resp.copy()
assert resp2 == resp
def test_refresh(self):
r = tutils.tresp()
n = time.time()
r.headers["date"] = [email.utils.formatdate(n)]
pre = r.headers["date"]
r.refresh(n)
assert pre == r.headers["date"]
r.refresh(n+60)
d = email.utils.parsedate_tz(r.headers["date"][0])
d = email.utils.mktime_tz(d)
# Weird that this is not exact...
assert abs(60-(d-n)) <= 1
r.headers["set-cookie"] = ["MOO=BAR; Expires=Tue, 08-Mar-2011 00:20:38 GMT; Path=foo.com; Secure"]
r.refresh()
def test_refresh_cookie(self):
r = tutils.tresp()
# Invalid expires format, sent to us by Reddit.
c = "rfoo=bar; Domain=reddit.com; expires=Thu, 31 Dec 2037 23:59:59 GMT; Path=/"
assert r._refresh_cookie(c, 60)
c = "MOO=BAR; Expires=Tue, 08-Mar-2011 00:20:38 GMT; Path=foo.com; Secure"
assert "00:21:38" in r._refresh_cookie(c, 60)
def test_getset_state(self):
h = flow.Headers()
h["test"] = ["test"]
c = flow.ClientConnect(("addr", 2222))
r = flow.Request(c, "host", 22, "https", "GET", "/", h, "content")
req = flow.Request(c, "host", 22, "https", "GET", "/", h, "content")
resp = flow.Response(req, 200, "msg", h.copy(), "content")
state = resp.get_state()
assert flow.Response.from_state(req, state) == resp
resp2 = flow.Response(req, 220, "foo", h.copy(), "test")
assert not resp == resp2
resp.load_state(resp2.get_state())
assert resp == resp2
def test_replace(self):
r = tutils.tresp()
r.headers["Foo"] = ["fOo"]
r.content = "afoob"
assert r.replace("foo(?i)", "boo") == 3
assert not "foo" in r.content
assert r.headers["boo"] == ["boo"]
def test_decodeencode(self):
r = tutils.tresp()
r.headers["content-encoding"] = ["identity"]
r.content = "falafel"
r.decode()
assert not r.headers["content-encoding"]
assert r.content == "falafel"
r = tutils.tresp()
r.headers["content-encoding"] = ["identity"]
r.content = "falafel"
r.encode("identity")
assert r.headers["content-encoding"] == ["identity"]
assert r.content == "falafel"
r = tutils.tresp()
r.headers["content-encoding"] = ["identity"]
r.content = "falafel"
r.encode("gzip")
assert r.headers["content-encoding"] == ["gzip"]
assert r.content != "falafel"
r.decode()
assert not r.headers["content-encoding"]
assert r.content == "falafel"
class uError(libpry.AutoTree):
def test_getset_state(self):
e = flow.Error(None, "Error")
state = e.get_state()
assert flow.Error.from_state(state) == e
assert e.copy()
e2 = flow.Error(None, "bar")
assert not e == e2
e.load_state(e2.get_state())
assert e == e2
e3 = e.copy()
assert e3 == e
def test_replace(self):
e = flow.Error(None, "amoop")
e.replace("moo", "bar")
assert e.msg == "abarp"
class uClientConnect(libpry.AutoTree):
def test_state(self):
c = flow.ClientConnect(("a", 22))
assert flow.ClientConnect.from_state(c.get_state()) == c
c2 = flow.ClientConnect(("a", 25))
assert not c == c2
c.load_state(c2.get_state())
assert c == c2
c3 = c.copy()
assert c3 == c
class uHeaders(libpry.AutoTree):
def setUp(self):
self.hd = flow.Headers()
def test_read_simple(self):
data = """
Header: one
Header2: two
\r\n
"""
data = textwrap.dedent(data)
data = data.strip()
s = StringIO(data)
self.hd.read(s)
assert self.hd["header"] == ["one"]
assert self.hd["header2"] == ["two"]
def test_read_multi(self):
data = """
Header: one
Header: two
\r\n
"""
data = textwrap.dedent(data)
data = data.strip()
s = StringIO(data)
self.hd.read(s)
assert self.hd["header"] == ["one", "two"]
def test_read_continued(self):
data = """
Header: one
\ttwo
Header2: three
\r\n
"""
data = textwrap.dedent(data)
data = data.strip()
s = StringIO(data)
self.hd.read(s)
assert self.hd["header"] == ['one\r\n two']
def test_dictToHeader1(self):
self.hd.add("one", "uno")
self.hd.add("two", "due")
self.hd.add("two", "tre")
expected = [
"one: uno\r\n",
"two: due\r\n",
"two: tre\r\n",
"\r\n"
]
out = repr(self.hd)
for i in expected:
assert out.find(i) >= 0
def test_dictToHeader2(self):
self.hd["one"] = ["uno"]
expected1 = "one: uno\r\n"
expected2 = "\r\n"
out = repr(self.hd)
assert out.find(expected1) >= 0
assert out.find(expected2) >= 0
def test_match_re(self):
h = flow.Headers()
h.add("one", "uno")
h.add("two", "due")
h.add("two", "tre")
assert h.match_re("uno")
assert h.match_re("two: due")
assert not h.match_re("nonono")
def test_getset_state(self):
self.hd.add("foo", 1)
self.hd.add("foo", 2)
self.hd.add("bar", 3)
state = self.hd.get_state()
nd = flow.Headers.from_state(state)
assert nd == self.hd
def test_copy(self):
self.hd.add("foo", 1)
self.hd.add("foo", 2)
self.hd.add("bar", 3)
assert self.hd == self.hd.copy()
def test_del(self):
self.hd.add("foo", 1)
self.hd.add("Foo", 2)
self.hd.add("bar", 3)
del self.hd["foo"]
assert len(self.hd.lst) == 1
def test_replace(self):
self.hd.add("one", "two")
self.hd.add("two", "one")
assert self.hd.replace("one", "vun") == 2
assert self.hd.lst == [
["vun", "two"],
["two", "vun"],
]
tests = [
uStickyCookieState(),
@ -574,4 +887,9 @@ tests = [
uState(),
uSerialize(),
uFlowMaster(),
uRequest(),
uResponse(),
uError(),
uClientConnect(),
uHeaders(),
]

View File

@ -1,7 +1,6 @@
import cStringIO, time, re
import libpry
from libmproxy import proxy, controller, utils, dump
import email.utils
import tutils
@ -48,30 +47,6 @@ class u_parse_request_line(libpry.AutoTree):
assert proxy.parse_request_line(u) == ('GET', None, None, None, '/', 1)
class u_parse_url(libpry.AutoTree):
def test_simple(self):
assert not proxy.parse_url("")
u = "http://foo.com:8888/test"
s, h, po, pa = proxy.parse_url(u)
assert s == "http"
assert h == "foo.com"
assert po == 8888
assert pa == "/test"
s, h, po, pa = proxy.parse_url("http://foo/bar")
assert s == "http"
assert h == "foo"
assert po == 80
assert pa == "/bar"
s, h, po, pa = proxy.parse_url("http://foo")
assert pa == "/"
s, h, po, pa = proxy.parse_url("https://foo")
assert po == 443
class uFileLike(libpry.AutoTree):
def test_wrap(self):
s = cStringIO.StringIO("foobar\nfoobar")
@ -83,199 +58,6 @@ class uFileLike(libpry.AutoTree):
assert s.isatty
class uRequest(libpry.AutoTree):
def test_simple(self):
h = utils.Headers()
h["test"] = ["test"]
c = proxy.ClientConnect(("addr", 2222))
r = proxy.Request(c, "host", 22, "https", "GET", "/", h, "content")
u = r.url()
assert r.set_url(u)
assert not r.set_url("")
assert r.url() == u
assert r.assemble()
r2 = r.copy()
assert r == r2
def test_anticache(self):
h = utils.Headers()
r = proxy.Request(None, "host", 22, "https", "GET", "/", h, "content")
h["if-modified-since"] = ["test"]
h["if-none-match"] = ["test"]
r.anticache()
assert not "if-modified-since" in r.headers
assert not "if-none-match" in r.headers
def test_getset_state(self):
h = utils.Headers()
h["test"] = ["test"]
c = proxy.ClientConnect(("addr", 2222))
r = proxy.Request(c, "host", 22, "https", "GET", "/", h, "content")
state = r.get_state()
assert proxy.Request.from_state(state) == r
r.client_conn = None
state = r.get_state()
assert proxy.Request.from_state(state) == r
r2 = proxy.Request(c, "testing", 20, "http", "PUT", "/foo", h, "test")
assert not r == r2
r.load_state(r2.get_state())
assert r == r2
r2.client_conn = None
r.load_state(r2.get_state())
assert not r.client_conn
def test_replace(self):
r = tutils.treq()
r.path = "path/foo"
r.headers["Foo"] = ["fOo"]
r.content = "afoob"
assert r.replace("foo(?i)", "boo") == 4
assert r.path == "path/boo"
assert not "foo" in r.content
assert r.headers["boo"] == ["boo"]
def test_decodeencode(self):
r = tutils.treq()
r.headers["content-encoding"] = ["identity"]
r.content = "falafel"
r.decode()
assert not r.headers["content-encoding"]
assert r.content == "falafel"
r = tutils.treq()
r.headers["content-encoding"] = ["identity"]
r.content = "falafel"
r.encode("identity")
assert r.headers["content-encoding"] == ["identity"]
assert r.content == "falafel"
r = tutils.treq()
r.headers["content-encoding"] = ["identity"]
r.content = "falafel"
r.encode("gzip")
assert r.headers["content-encoding"] == ["gzip"]
assert r.content != "falafel"
r.decode()
assert not r.headers["content-encoding"]
assert r.content == "falafel"
class uResponse(libpry.AutoTree):
def test_simple(self):
h = utils.Headers()
h["test"] = ["test"]
c = proxy.ClientConnect(("addr", 2222))
req = proxy.Request(c, "host", 22, "https", "GET", "/", h, "content")
resp = proxy.Response(req, 200, "msg", h.copy(), "content")
assert resp.assemble()
resp2 = resp.copy()
assert resp2 == resp
def test_refresh(self):
r = tutils.tresp()
n = time.time()
r.headers["date"] = [email.utils.formatdate(n)]
pre = r.headers["date"]
r.refresh(n)
assert pre == r.headers["date"]
r.refresh(n+60)
d = email.utils.parsedate_tz(r.headers["date"][0])
d = email.utils.mktime_tz(d)
# Weird that this is not exact...
assert abs(60-(d-n)) <= 1
r.headers["set-cookie"] = ["MOO=BAR; Expires=Tue, 08-Mar-2011 00:20:38 GMT; Path=foo.com; Secure"]
r.refresh()
def test_refresh_cookie(self):
r = tutils.tresp()
# Invalid expires format, sent to us by Reddit.
c = "rfoo=bar; Domain=reddit.com; expires=Thu, 31 Dec 2037 23:59:59 GMT; Path=/"
assert r._refresh_cookie(c, 60)
c = "MOO=BAR; Expires=Tue, 08-Mar-2011 00:20:38 GMT; Path=foo.com; Secure"
assert "00:21:38" in r._refresh_cookie(c, 60)
def test_getset_state(self):
h = utils.Headers()
h["test"] = ["test"]
c = proxy.ClientConnect(("addr", 2222))
r = proxy.Request(c, "host", 22, "https", "GET", "/", h, "content")
req = proxy.Request(c, "host", 22, "https", "GET", "/", h, "content")
resp = proxy.Response(req, 200, "msg", h.copy(), "content")
state = resp.get_state()
assert proxy.Response.from_state(req, state) == resp
resp2 = proxy.Response(req, 220, "foo", h.copy(), "test")
assert not resp == resp2
resp.load_state(resp2.get_state())
assert resp == resp2
def test_replace(self):
r = tutils.tresp()
r.headers["Foo"] = ["fOo"]
r.content = "afoob"
assert r.replace("foo(?i)", "boo") == 3
assert not "foo" in r.content
assert r.headers["boo"] == ["boo"]
def test_decodeencode(self):
r = tutils.tresp()
r.headers["content-encoding"] = ["identity"]
r.content = "falafel"
r.decode()
assert not r.headers["content-encoding"]
assert r.content == "falafel"
r = tutils.tresp()
r.headers["content-encoding"] = ["identity"]
r.content = "falafel"
r.encode("identity")
assert r.headers["content-encoding"] == ["identity"]
assert r.content == "falafel"
r = tutils.tresp()
r.headers["content-encoding"] = ["identity"]
r.content = "falafel"
r.encode("gzip")
assert r.headers["content-encoding"] == ["gzip"]
assert r.content != "falafel"
r.decode()
assert not r.headers["content-encoding"]
assert r.content == "falafel"
class uError(libpry.AutoTree):
def test_getset_state(self):
e = proxy.Error(None, "Error")
state = e.get_state()
assert proxy.Error.from_state(state) == e
assert e.copy()
e2 = proxy.Error(None, "bar")
assert not e == e2
e.load_state(e2.get_state())
assert e == e2
e3 = e.copy()
assert e3 == e
def test_replace(self):
e = proxy.Error(None, "amoop")
e.replace("moo", "bar")
assert e.msg == "abarp"
class uProxyError(libpry.AutoTree):
def test_simple(self):
@ -283,30 +65,9 @@ class uProxyError(libpry.AutoTree):
assert repr(p)
class uClientConnect(libpry.AutoTree):
def test_state(self):
c = proxy.ClientConnect(("a", 22))
assert proxy.ClientConnect.from_state(c.get_state()) == c
c2 = proxy.ClientConnect(("a", 25))
assert not c == c2
c.load_state(c2.get_state())
assert c == c2
c3 = c.copy()
assert c3 == c
tests = [
uProxyError(),
uRequest(),
uResponse(),
uFileLike(),
u_parse_request_line(),
u_parse_url(),
uError(),
uClientConnect(),
u_read_chunked(),
]

View File

@ -49,109 +49,6 @@ class uData(libpry.AutoTree):
libpry.raises("does not exist", utils.pkg_data.path, "nonexistent")
class uHeaders(libpry.AutoTree):
def setUp(self):
self.hd = utils.Headers()
def test_read_simple(self):
data = """
Header: one
Header2: two
\r\n
"""
data = textwrap.dedent(data)
data = data.strip()
s = cStringIO.StringIO(data)
self.hd.read(s)
assert self.hd["header"] == ["one"]
assert self.hd["header2"] == ["two"]
def test_read_multi(self):
data = """
Header: one
Header: two
\r\n
"""
data = textwrap.dedent(data)
data = data.strip()
s = cStringIO.StringIO(data)
self.hd.read(s)
assert self.hd["header"] == ["one", "two"]
def test_read_continued(self):
data = """
Header: one
\ttwo
Header2: three
\r\n
"""
data = textwrap.dedent(data)
data = data.strip()
s = cStringIO.StringIO(data)
self.hd.read(s)
assert self.hd["header"] == ['one\r\n two']
def test_dictToHeader1(self):
self.hd.add("one", "uno")
self.hd.add("two", "due")
self.hd.add("two", "tre")
expected = [
"one: uno\r\n",
"two: due\r\n",
"two: tre\r\n",
"\r\n"
]
out = repr(self.hd)
for i in expected:
assert out.find(i) >= 0
def test_dictToHeader2(self):
self.hd["one"] = ["uno"]
expected1 = "one: uno\r\n"
expected2 = "\r\n"
out = repr(self.hd)
assert out.find(expected1) >= 0
assert out.find(expected2) >= 0
def test_match_re(self):
h = utils.Headers()
h.add("one", "uno")
h.add("two", "due")
h.add("two", "tre")
assert h.match_re("uno")
assert h.match_re("two: due")
assert not h.match_re("nonono")
def test_getset_state(self):
self.hd.add("foo", 1)
self.hd.add("foo", 2)
self.hd.add("bar", 3)
state = self.hd.get_state()
nd = utils.Headers.from_state(state)
assert nd == self.hd
def test_copy(self):
self.hd.add("foo", 1)
self.hd.add("foo", 2)
self.hd.add("bar", 3)
assert self.hd == self.hd.copy()
def test_del(self):
self.hd.add("foo", 1)
self.hd.add("Foo", 2)
self.hd.add("bar", 3)
del self.hd["foo"]
assert len(self.hd.lst) == 1
def test_replace(self):
self.hd.add("one", "two")
self.hd.add("two", "one")
assert self.hd.replace("one", "vun") == 2
assert self.hd.lst == [
["vun", "two"],
["two", "vun"],
]
class upretty_xmlish(libpry.AutoTree):
def test_tagre(self):
@ -295,13 +192,36 @@ class uLRUCache(libpry.AutoTree):
assert len(f._cachelist_one) == 2
class u_parse_url(libpry.AutoTree):
def test_simple(self):
assert not utils.parse_url("")
u = "http://foo.com:8888/test"
s, h, po, pa = utils.parse_url(u)
assert s == "http"
assert h == "foo.com"
assert po == 8888
assert pa == "/test"
s, h, po, pa = utils.parse_url("http://foo/bar")
assert s == "http"
assert h == "foo"
assert po == 80
assert pa == "/bar"
s, h, po, pa = utils.parse_url("http://foo")
assert pa == "/"
s, h, po, pa = utils.parse_url("https://foo")
assert po == 443
tests = [
uformat_timestamp(),
uisBin(),
uisXML(),
uhexdump(),
upretty_size(),
uHeaders(),
uData(),
upretty_xmlish(),
upretty_json(),
@ -310,4 +230,5 @@ tests = [
udummy_ca(),
udummy_cert(),
uLRUCache(),
u_parse_url()
]

View File

@ -1,23 +1,23 @@
import os.path, threading, Queue
import libpry
from libmproxy import proxy, utils, filt, flow, controller
from libmproxy import proxy, filt, flow, controller
import serv, sslserv
import random
def treq(conn=None):
if not conn:
conn = proxy.ClientConnect(("address", 22))
headers = utils.Headers()
conn = flow.ClientConnect(("address", 22))
headers = flow.Headers()
headers["header"] = ["qvalue"]
return proxy.Request(conn, "host", 80, "http", "GET", "/path", headers, "content")
return flow.Request(conn, "host", 80, "http", "GET", "/path", headers, "content")
def tresp(req=None):
if not req:
req = treq()
headers = utils.Headers()
headers = flow.Headers()
headers["header_response"] = ["svalue"]
return proxy.Response(req, 200, "message", headers, "content_response")
return flow.Response(req, 200, "message", headers, "content_response")
def tflow():
@ -35,7 +35,7 @@ def tflow_full():
def tflow_err():
r = treq()
f = flow.Flow(r)
f.error = proxy.Error(r, "error")
f.error = flow.Error(r, "error")
return f