118 lines
2.9 KiB
Python
118 lines
2.9 KiB
Python
import threading, Queue
|
|
import libpry
|
|
from libmproxy import proxy, flow, controller
|
|
import serv, sslserv
|
|
import random
|
|
|
|
def treq(conn=None):
|
|
if not conn:
|
|
conn = flow.ClientConnect(("address", 22))
|
|
headers = flow.ODictCaseless()
|
|
headers["header"] = ["qvalue"]
|
|
return flow.Request(conn, "host", 80, "http", "GET", "/path", headers, "content")
|
|
|
|
|
|
def tresp(req=None):
|
|
if not req:
|
|
req = treq()
|
|
headers = flow.ODictCaseless()
|
|
headers["header_response"] = ["svalue"]
|
|
return flow.Response(req, 200, "message", headers, "content_response")
|
|
|
|
|
|
def tflow():
|
|
r = treq()
|
|
return flow.Flow(r)
|
|
|
|
|
|
def tflow_full():
|
|
r = treq()
|
|
f = flow.Flow(r)
|
|
f.response = tresp(r)
|
|
return f
|
|
|
|
|
|
def tflow_err():
|
|
r = treq()
|
|
f = flow.Flow(r)
|
|
f.error = flow.Error(r, "error")
|
|
return f
|
|
|
|
|
|
# Yes, the random ports are horrible. During development, sockets are often not
|
|
# properly closed during error conditions, which means you have to wait until
|
|
# you can re-bind to the same port. This is a pain in the ass, so we just pick
|
|
# a random port and keep moving.
|
|
PROXL_PORT = random.randint(10000, 20000)
|
|
HTTP_PORT = random.randint(20000, 30000)
|
|
HTTPS_PORT = random.randint(30000, 40000)
|
|
|
|
|
|
class TestMaster(controller.Master):
|
|
def __init__(self, port, testq):
|
|
s = proxy.ProxyServer(proxy.ProxyConfig("data/testkey.pem"), port)
|
|
controller.Master.__init__(self, s)
|
|
self.testq = testq
|
|
self.log = []
|
|
|
|
def clear(self):
|
|
self.log = []
|
|
|
|
def handle(self, m):
|
|
self.log.append(m)
|
|
m._ack()
|
|
|
|
|
|
class ProxyThread(threading.Thread):
|
|
def __init__(self, port, testq):
|
|
self.tmaster = TestMaster(port, testq)
|
|
controller.should_exit = False
|
|
threading.Thread.__init__(self)
|
|
|
|
def run(self):
|
|
self.tmaster.run()
|
|
|
|
def shutdown(self):
|
|
self.tmaster.shutdown()
|
|
|
|
|
|
class ServerThread(threading.Thread):
|
|
def __init__(self, server):
|
|
self.server = server
|
|
threading.Thread.__init__(self)
|
|
|
|
def run(self):
|
|
self.server.serve_forever()
|
|
|
|
def shutdown(self):
|
|
self.server.shutdown()
|
|
|
|
|
|
class TestServers(libpry.TestContainer):
|
|
def setUpAll(self):
|
|
self.tqueue = Queue.Queue()
|
|
# We don't make any concurrent requests, so we can access
|
|
# the attributes on this object safely.
|
|
self.proxthread = ProxyThread(PROXL_PORT, self.tqueue)
|
|
self.threads = [
|
|
ServerThread(serv.make(HTTP_PORT)),
|
|
ServerThread(sslserv.make(HTTPS_PORT)),
|
|
self.proxthread
|
|
]
|
|
for i in self.threads:
|
|
i.start()
|
|
|
|
def setUp(self):
|
|
self.proxthread.tmaster.clear()
|
|
|
|
def tearDownAll(self):
|
|
for i in self.threads:
|
|
i.shutdown()
|
|
|
|
|
|
class ProxTest(libpry.AutoTree):
|
|
def log(self):
|
|
pthread = self.findAttr("proxthread")
|
|
return pthread.tmaster.log
|
|
|