# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~

HTTP Proxy Server in Python.

:copyright: (c) 2013-2018 by Abhinav Singh.
:license: BSD, see LICENSE for more details.
"""
import sys
import base64
import socket
import logging
import unittest
from threading import Thread
from contextlib import closing
from proxy import Proxy, ChunkParser, HttpParser, Client
from proxy import ProxyAuthenticationFailed, ProxyConnectionFailed
from proxy import CRLF, version, PROXY_TUNNEL_ESTABLISHED_RESPONSE_PKT

# logging.basicConfig(level=logging.DEBUG,
#                     format='%(asctime)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s')

# True if we are running on Python 3.
if sys.version_info[0] == 3:
    from http.server import HTTPServer, BaseHTTPRequestHandler
else:
    from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler


class TestChunkParser(unittest.TestCase):
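    """Tests for ChunkParser covering full and incremental parsing of chunked transfer encoding."""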

    def setUp(self):
        self.parser = ChunkParser()

    def test_chunk_parse_basic(self):
        self.parser.parse(b''.join([
            b'4\r\n',
            b'Wiki\r\n',
            b'5\r\n',
            b'pedia\r\n',
            b'E\r\n',
            b' in\r\n\r\nchunks.\r\n',
            b'0\r\n',
            b'\r\n'
        ]))
        self.assertEqual(self.parser.chunk, b'')
        self.assertEqual(self.parser.size, None)
        self.assertEqual(self.parser.body, b'Wikipedia in\r\n\r\nchunks.')
        self.assertEqual(self.parser.state, ChunkParser.states.COMPLETE)

    def test_chunk_parse_issue_27(self):
        """Case when data ends with the chunk size but without CRLF."""
        self.parser.parse(b'3')
        self.assertEqual(self.parser.chunk, b'3')
        self.assertEqual(self.parser.size, None)
        self.assertEqual(self.parser.body, b'')
        self.assertEqual(self.parser.state, ChunkParser.states.WAITING_FOR_SIZE)
        self.parser.parse(b'\r\n')
        self.assertEqual(self.parser.chunk, b'')
        self.assertEqual(self.parser.size, 3)
        self.assertEqual(self.parser.body, b'')
        self.assertEqual(self.parser.state, ChunkParser.states.WAITING_FOR_DATA)
        self.parser.parse(b'abc')
        self.assertEqual(self.parser.chunk, b'')
        self.assertEqual(self.parser.size, None)
        self.assertEqual(self.parser.body, b'abc')
        self.assertEqual(self.parser.state, ChunkParser.states.WAITING_FOR_SIZE)
        self.parser.parse(b'\r\n')
        self.assertEqual(self.parser.chunk, b'')
        self.assertEqual(self.parser.size, None)
        self.assertEqual(self.parser.body, b'abc')
        self.assertEqual(self.parser.state, ChunkParser.states.WAITING_FOR_SIZE)
        self.parser.parse(b'4\r\n')
        self.assertEqual(self.parser.chunk, b'')
        self.assertEqual(self.parser.size, 4)
        self.assertEqual(self.parser.body, b'abc')
        self.assertEqual(self.parser.state, ChunkParser.states.WAITING_FOR_DATA)
        self.parser.parse(b'defg\r\n0')
        self.assertEqual(self.parser.chunk, b'0')
        self.assertEqual(self.parser.size, None)
        self.assertEqual(self.parser.body, b'abcdefg')
        self.assertEqual(self.parser.state, ChunkParser.states.WAITING_FOR_SIZE)
        self.parser.parse(b'\r\n\r\n')
        self.assertEqual(self.parser.chunk, b'')
        self.assertEqual(self.parser.size, None)
        self.assertEqual(self.parser.body, b'abcdefg')
        self.assertEqual(self.parser.state, ChunkParser.states.COMPLETE)


class TestHttpParser(unittest.TestCase):
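    """Tests for HttpParser covering request and response parsing, both complete and incremental."""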

    def setUp(self):
        self.parser = HttpParser(HttpParser.types.REQUEST_PARSER)

    def test_build_header(self):
        self.assertEqual(HttpParser.build_header(b'key', b'value'), b'key: value')

    def test_split(self):
        self.assertEqual(HttpParser.split(b'CONNECT python.org:443 HTTP/1.0\r\n\r\n'),
                         (b'CONNECT python.org:443 HTTP/1.0', b'\r\n'))

    def test_split_false_line(self):
        self.assertEqual(HttpParser.split(b'CONNECT python.org:443 HTTP/1.0'),
                         (False, b'CONNECT python.org:443 HTTP/1.0'))

    def test_get_full_parse(self):
        raw = CRLF.join([
            b'GET %s HTTP/1.1',
            b'Host: %s',
            CRLF
        ])
        self.parser.parse(raw % (b'https://example.com/path/dir/?a=b&c=d#p=q', b'example.com'))
        self.assertEqual(self.parser.build_url(), b'/path/dir/?a=b&c=d#p=q')
        self.assertEqual(self.parser.method, b'GET')
        self.assertEqual(self.parser.url.hostname, b'example.com')
        self.assertEqual(self.parser.url.port, None)
        self.assertEqual(self.parser.version, b'HTTP/1.1')
        self.assertEqual(self.parser.state, HttpParser.states.COMPLETE)
        self.assertDictContainsSubset({b'host': (b'Host', b'example.com')}, self.parser.headers)
        self.assertEqual(raw % (b'/path/dir/?a=b&c=d#p=q', b'example.com'),
                         self.parser.build(del_headers=[b'host'], add_headers=[(b'Host', b'example.com')]))

    def test_build_url_none(self):
        self.assertEqual(self.parser.build_url(), b'/None')

    def test_line_rcvd_to_rcving_headers_state_change(self):
        self.parser.parse(b'GET http://localhost HTTP/1.1')
        self.assertEqual(self.parser.state, HttpParser.states.INITIALIZED)
        self.parser.parse(CRLF)
        self.assertEqual(self.parser.state, HttpParser.states.LINE_RCVD)
        self.parser.parse(CRLF)
        self.assertEqual(self.parser.state, HttpParser.states.RCVING_HEADERS)

    def test_get_partial_parse1(self):
        self.parser.parse(CRLF.join([
            b'GET http://localhost:8080 HTTP/1.1'
        ]))
        self.assertEqual(self.parser.method, None)
        self.assertEqual(self.parser.url, None)
        self.assertEqual(self.parser.version, None)
        self.assertEqual(self.parser.state, HttpParser.states.INITIALIZED)

        self.parser.parse(CRLF)
        self.assertEqual(self.parser.method, b'GET')
        self.assertEqual(self.parser.url.hostname, b'localhost')
        self.assertEqual(self.parser.url.port, 8080)
        self.assertEqual(self.parser.version, b'HTTP/1.1')
        self.assertEqual(self.parser.state, HttpParser.states.LINE_RCVD)

        self.parser.parse(b'Host: localhost:8080')
        self.assertDictEqual(self.parser.headers, dict())
        self.assertEqual(self.parser.buffer, b'Host: localhost:8080')
        self.assertEqual(self.parser.state, HttpParser.states.LINE_RCVD)

        self.parser.parse(CRLF * 2)
        self.assertDictContainsSubset({b'host': (b'Host', b'localhost:8080')}, self.parser.headers)
        self.assertEqual(self.parser.state, HttpParser.states.COMPLETE)

    def test_get_partial_parse2(self):
        self.parser.parse(CRLF.join([
            b'GET http://localhost:8080 HTTP/1.1',
            b'Host: '
        ]))
        self.assertEqual(self.parser.method, b'GET')
        self.assertEqual(self.parser.url.hostname, b'localhost')
        self.assertEqual(self.parser.url.port, 8080)
        self.assertEqual(self.parser.version, b'HTTP/1.1')
        self.assertEqual(self.parser.buffer, b'Host: ')
        self.assertEqual(self.parser.state, HttpParser.states.LINE_RCVD)

        self.parser.parse(b'localhost:8080' + CRLF)
        self.assertDictContainsSubset({b'host': (b'Host', b'localhost:8080')}, self.parser.headers)
        self.assertEqual(self.parser.buffer, b'')
        self.assertEqual(self.parser.state, HttpParser.states.RCVING_HEADERS)

        self.parser.parse(b'Content-Type: text/plain' + CRLF)
        self.assertEqual(self.parser.buffer, b'')
        self.assertDictContainsSubset({b'content-type': (b'Content-Type', b'text/plain')}, self.parser.headers)
        self.assertEqual(self.parser.state, HttpParser.states.RCVING_HEADERS)

        self.parser.parse(CRLF)
        self.assertEqual(self.parser.state, HttpParser.states.COMPLETE)

    def test_post_full_parse(self):
        raw = CRLF.join([
            b'POST %s HTTP/1.1',
            b'Host: localhost',
            b'Content-Length: 7',
            b'Content-Type: application/x-www-form-urlencoded' + CRLF,
            b'a=b&c=d'
        ])
        self.parser.parse(raw % b'http://localhost')
        self.assertEqual(self.parser.method, b'POST')
        self.assertEqual(self.parser.url.hostname, b'localhost')
        self.assertEqual(self.parser.url.port, None)
        self.assertEqual(self.parser.version, b'HTTP/1.1')
        self.assertDictContainsSubset({b'content-type': (b'Content-Type', b'application/x-www-form-urlencoded')},
                                      self.parser.headers)
        self.assertDictContainsSubset({b'content-length': (b'Content-Length', b'7')}, self.parser.headers)
        self.assertEqual(self.parser.body, b'a=b&c=d')
        self.assertEqual(self.parser.buffer, b'')
        self.assertEqual(self.parser.state, HttpParser.states.COMPLETE)
        self.assertEqual(len(self.parser.build()), len(raw % b'/'))

    def test_post_partial_parse(self):
        self.parser.parse(CRLF.join([
            b'POST http://localhost HTTP/1.1',
            b'Host: localhost',
            b'Content-Length: 7',
            b'Content-Type: application/x-www-form-urlencoded'
        ]))
        self.assertEqual(self.parser.method, b'POST')
        self.assertEqual(self.parser.url.hostname, b'localhost')
        self.assertEqual(self.parser.url.port, None)
        self.assertEqual(self.parser.version, b'HTTP/1.1')
        self.assertEqual(self.parser.state, HttpParser.states.RCVING_HEADERS)

        self.parser.parse(CRLF)
        self.assertEqual(self.parser.state, HttpParser.states.RCVING_HEADERS)

        self.parser.parse(CRLF)
        self.assertEqual(self.parser.state, HttpParser.states.HEADERS_COMPLETE)

        self.parser.parse(b'a=b')
        self.assertEqual(self.parser.state, HttpParser.states.RCVING_BODY)
        self.assertEqual(self.parser.body, b'a=b')
        self.assertEqual(self.parser.buffer, b'')

        self.parser.parse(b'&c=d')
        self.assertEqual(self.parser.state, HttpParser.states.COMPLETE)
        self.assertEqual(self.parser.body, b'a=b&c=d')
        self.assertEqual(self.parser.buffer, b'')

    def test_connect_request_without_host_header_request_parse(self):
        """Case where clients can send a CONNECT request without a Host header field.

        Example:
            1. pip3 --proxy http://localhost:8899 install <package name>
               Uses HTTP/1.0, Host header missing with CONNECT requests
            2. Android Emulator
               Uses HTTP/1.1, Host header missing with CONNECT requests

        See https://github.com/abhinavsingh/proxy.py/issues/5 for details.
        """
        self.parser.parse(b'CONNECT pypi.org:443 HTTP/1.0\r\n\r\n')
        self.assertEqual(self.parser.method, b'CONNECT')
        self.assertEqual(self.parser.version, b'HTTP/1.0')
        self.assertEqual(self.parser.state, HttpParser.states.COMPLETE)

    def test_request_parse_without_content_length(self):
        """Case when an incoming request doesn't contain a content-length header.

        From http://w3-org.9356.n7.nabble.com/POST-with-empty-body-td103965.html:
        'A POST with no content-length and no body is equivalent to a POST with Content-Length: 0
        and nothing following, as could perfectly happen when you upload an empty file for instance.'

        See https://github.com/abhinavsingh/proxy.py/issues/20 for details.
        """
        self.parser.parse(CRLF.join([
            b'POST http://localhost HTTP/1.1',
            b'Host: localhost',
            b'Content-Type: application/x-www-form-urlencoded',
            CRLF
        ]))
        self.assertEqual(self.parser.method, b'POST')
        self.assertEqual(self.parser.state, HttpParser.states.COMPLETE)

    def test_response_parse_without_content_length(self):
        """Case when a server response doesn't contain a content-length header for non-chunked response types.

        HttpParser by itself has no way to know if more data should be expected.
        In the example below, the parser reaches HttpParser.states.HEADERS_COMPLETE
        and it is the responsibility of the caller to move the state to HttpParser.states.COMPLETE
        when the server stream closes.

        See https://github.com/abhinavsingh/proxy.py/issues/20 for details.
        """
        self.parser.type = HttpParser.types.RESPONSE_PARSER
        self.parser.parse(b'HTTP/1.0 200 OK' + CRLF)
        self.assertEqual(self.parser.code, b'200')
        self.assertEqual(self.parser.version, b'HTTP/1.0')
        self.assertEqual(self.parser.state, HttpParser.states.LINE_RCVD)
        self.parser.parse(CRLF.join([
            b'Server: BaseHTTP/0.3 Python/2.7.10',
            b'Date: Thu, 13 Dec 2018 16:24:09 GMT',
            CRLF
        ]))
        self.assertEqual(self.parser.state, HttpParser.states.HEADERS_COMPLETE)

    def test_response_parse(self):
        self.parser.type = HttpParser.types.RESPONSE_PARSER
        self.parser.parse(b''.join([
            b'HTTP/1.1 301 Moved Permanently\r\n',
            b'Location: http://www.google.com/\r\n',
            b'Content-Type: text/html; charset=UTF-8\r\n',
            b'Date: Wed, 22 May 2013 14:07:29 GMT\r\n',
            b'Expires: Fri, 21 Jun 2013 14:07:29 GMT\r\n',
            b'Cache-Control: public, max-age=2592000\r\n',
            b'Server: gws\r\n',
            b'Content-Length: 219\r\n',
            b'X-XSS-Protection: 1; mode=block\r\n',
            b'X-Frame-Options: SAMEORIGIN\r\n\r\n',
            b'<HTML><HEAD><meta http-equiv="content-type" content="text/html;charset=utf-8">\n' +
            b'<TITLE>301 Moved</TITLE></HEAD>',
            b'<BODY>\n<H1>301 Moved</H1>\nThe document has moved\n' +
            b'<A HREF="http://www.google.com/">here</A>.\r\n</BODY></HTML>\r\n'
        ]))
        self.assertEqual(self.parser.code, b'301')
        self.assertEqual(self.parser.reason, b'Moved Permanently')
        self.assertEqual(self.parser.version, b'HTTP/1.1')
        self.assertEqual(self.parser.body,
                         b'<HTML><HEAD><meta http-equiv="content-type" content="text/html;charset=utf-8">\n' +
                         b'<TITLE>301 Moved</TITLE></HEAD><BODY>\n<H1>301 Moved</H1>\nThe document has moved\n' +
                         b'<A HREF="http://www.google.com/">here</A>.\r\n</BODY></HTML>\r\n')
        self.assertDictContainsSubset({b'content-length': (b'Content-Length', b'219')}, self.parser.headers)
        self.assertEqual(self.parser.state, HttpParser.states.COMPLETE)

    def test_response_partial_parse(self):
        self.parser.type = HttpParser.types.RESPONSE_PARSER
        self.parser.parse(b''.join([
            b'HTTP/1.1 301 Moved Permanently\r\n',
            b'Location: http://www.google.com/\r\n',
            b'Content-Type: text/html; charset=UTF-8\r\n',
            b'Date: Wed, 22 May 2013 14:07:29 GMT\r\n',
            b'Expires: Fri, 21 Jun 2013 14:07:29 GMT\r\n',
            b'Cache-Control: public, max-age=2592000\r\n',
            b'Server: gws\r\n',
            b'Content-Length: 219\r\n',
            b'X-XSS-Protection: 1; mode=block\r\n',
            b'X-Frame-Options: SAMEORIGIN\r\n'
        ]))
        self.assertDictContainsSubset({b'x-frame-options': (b'X-Frame-Options', b'SAMEORIGIN')}, self.parser.headers)
        self.assertEqual(self.parser.state, HttpParser.states.RCVING_HEADERS)
        self.parser.parse(b'\r\n')
        self.assertEqual(self.parser.state, HttpParser.states.HEADERS_COMPLETE)
        self.parser.parse(
            b'<HTML><HEAD><meta http-equiv="content-type" content="text/html;charset=utf-8">\n' +
            b'<TITLE>301 Moved</TITLE></HEAD>')
        self.assertEqual(self.parser.state, HttpParser.states.RCVING_BODY)
        self.parser.parse(
            b'<BODY>\n<H1>301 Moved</H1>\nThe document has moved\n' +
            b'<A HREF="http://www.google.com/">here</A>.\r\n</BODY></HTML>\r\n')
        self.assertEqual(self.parser.state, HttpParser.states.COMPLETE)

    def test_chunked_response_parse(self):
        self.parser.type = HttpParser.types.RESPONSE_PARSER
        self.parser.parse(b''.join([
            b'HTTP/1.1 200 OK\r\n',
            b'Content-Type: application/json\r\n',
            b'Date: Wed, 22 May 2013 15:08:15 GMT\r\n',
            b'Server: gunicorn/0.16.1\r\n',
            b'transfer-encoding: chunked\r\n',
            b'Connection: keep-alive\r\n\r\n',
            b'4\r\n',
            b'Wiki\r\n',
            b'5\r\n',
            b'pedia\r\n',
            b'E\r\n',
            b' in\r\n\r\nchunks.\r\n',
            b'0\r\n',
            b'\r\n'
        ]))
        self.assertEqual(self.parser.body, b'Wikipedia in\r\n\r\nchunks.')
        self.assertEqual(self.parser.state, HttpParser.states.COMPLETE)


class MockConnection(object):
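    """In-memory stand-in for a socket connection; recv() and send() operate on an internal byte buffer."""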

    def __init__(self, b=b''):
        self.buffer = b

    def recv(self, b=8192):
        data = self.buffer[:b]
        self.buffer = self.buffer[b:]
        return data

    def send(self, data):
        return len(data)

    def queue(self, data):
        self.buffer += data


class HTTPRequestHandler(BaseHTTPRequestHandler):
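    """Handler for the embedded HTTP server that the proxy tests connect to; always responds 200 OK."""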

    def do_GET(self):
        self.send_response(200)
        # TODO(abhinavsingh): Proxy should work just fine even without content-length header
        self.send_header('content-length', 2)
        self.end_headers()
        self.wfile.write(b'OK')


class TestProxy(unittest.TestCase):
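    """End-to-end tests driving a Proxy instance, fed through a MockConnection client, against the embedded HTTP server."""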

    http_server = None
    http_server_port = None
    http_server_thread = None

    @staticmethod
    def get_available_port():
        with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
            sock.bind(('', 0))
            _, port = sock.getsockname()
            return port

    @classmethod
    def setUpClass(cls):
        cls.http_server_port = cls.get_available_port()
        cls.http_server = HTTPServer(('127.0.0.1', cls.http_server_port), HTTPRequestHandler)
        cls.http_server_thread = Thread(target=cls.http_server.serve_forever)
        cls.http_server_thread.daemon = True
        cls.http_server_thread.start()

    @classmethod
    def tearDownClass(cls):
        cls.http_server.shutdown()
        cls.http_server.server_close()
        cls.http_server_thread.join()

    def setUp(self):
        self._conn = MockConnection()
        self._addr = ('127.0.0.1', 54382)
        self.proxy = Proxy(Client(self._conn, self._addr))

    def test_http_get(self):
        # Send request line
        self.proxy.client.conn.queue((b'GET http://localhost:%d HTTP/1.1' % self.http_server_port) + CRLF)
        self.proxy._process_request(self.proxy.client.recv())
        self.assertNotEqual(self.proxy.request.state, HttpParser.states.COMPLETE)
        # Send headers and blank line, thus completing HTTP request
        self.proxy.client.conn.queue(CRLF.join([
            b'User-Agent: proxy.py/%s' % version,
            b'Host: localhost:%d' % self.http_server_port,
            b'Accept: */*',
            b'Proxy-Connection: Keep-Alive',
            CRLF
        ]))
        self.proxy._process_request(self.proxy.client.recv())
        self.assertEqual(self.proxy.request.state, HttpParser.states.COMPLETE)
        self.assertEqual(self.proxy.server.addr, (b'localhost', self.http_server_port))
        # Flush data queued for server
        self.proxy.server.flush()
        self.assertEqual(self.proxy.server.buffer_size(), 0)
        # Receive full response from server
        data = self.proxy.server.recv()
        while data:
            self.proxy._process_response(data)
            logging.info(self.proxy.response.state)
            if self.proxy.response.state == HttpParser.states.COMPLETE:
                break
            data = self.proxy.server.recv()
        # Verify 200 success response code
        self.assertEqual(self.proxy.response.state, HttpParser.states.COMPLETE)
        self.assertEqual(int(self.proxy.response.code), 200)

    def test_http_tunnel(self):
        self.proxy.client.conn.queue(CRLF.join([
            b'CONNECT localhost:%d HTTP/1.1' % self.http_server_port,
            b'Host: localhost:%d' % self.http_server_port,
            b'User-Agent: proxy.py/%s' % version,
            b'Proxy-Connection: Keep-Alive',
            CRLF
        ]))
        self.proxy._process_request(self.proxy.client.recv())
        self.assertFalse(self.proxy.server is None)
        self.assertEqual(self.proxy.client.buffer, PROXY_TUNNEL_ESTABLISHED_RESPONSE_PKT)

        parser = HttpParser(HttpParser.types.RESPONSE_PARSER)
        parser.parse(self.proxy.client.buffer)
        self.assertEqual(parser.state, HttpParser.states.HEADERS_COMPLETE)
        self.assertEqual(int(parser.code), 200)

        self.proxy.client.flush()
        self.assertEqual(self.proxy.client.buffer_size(), 0)

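        # With the tunnel established, send a plain GET request through the proxy to the embedded HTTP server.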
        self.proxy.client.conn.queue(CRLF.join([
            b'GET / HTTP/1.1',
            b'Host: localhost:%d' % self.http_server_port,
            b'User-Agent: proxy.py/%s' % version,
            CRLF
        ]))
        self.proxy._process_request(self.proxy.client.recv())
        self.proxy.server.flush()
        self.assertEqual(self.proxy.server.buffer_size(), 0)

        parser = HttpParser(HttpParser.types.RESPONSE_PARSER)
        data = self.proxy.server.recv()
        while data:
            parser.parse(data)
            if parser.state == HttpParser.states.COMPLETE:
                break
            data = self.proxy.server.recv()

        self.assertEqual(parser.state, HttpParser.states.COMPLETE)
        self.assertEqual(int(parser.code), 200)

    def test_proxy_connection_failed(self):
        with self.assertRaises(ProxyConnectionFailed):
            self.proxy._process_request(CRLF.join([
                b'GET http://unknown.domain HTTP/1.1',
                b'Host: unknown.domain',
                CRLF
            ]))

    def test_proxy_authentication_failed(self):
        self.proxy = Proxy(Client(self._conn, self._addr), b'Basic %s' % base64.b64encode(b'user:pass'))

        with self.assertRaises(ProxyAuthenticationFailed):
            self.proxy._process_request(CRLF.join([
                b'GET http://abhinavsingh.com HTTP/1.1',
                b'Host: abhinavsingh.com',
                CRLF
            ]))

    def test_authenticated_proxy_http_get(self):
        self.proxy = Proxy(Client(self._conn, self._addr), b'Basic %s' % base64.b64encode(b'user:pass'))

        self.proxy.client.conn.queue((b'GET http://localhost:%d HTTP/1.1' % self.http_server_port) + CRLF)
        self.proxy._process_request(self.proxy.client.recv())
        self.assertNotEqual(self.proxy.request.state, HttpParser.states.COMPLETE)

        self.proxy.client.conn.queue(CRLF.join([
            b'User-Agent: proxy.py/%s' % version,
            b'Host: localhost:%d' % self.http_server_port,
            b'Accept: */*',
            b'Proxy-Connection: Keep-Alive',
            b'Proxy-Authorization: Basic dXNlcjpwYXNz',
            CRLF
        ]))

        self.proxy._process_request(self.proxy.client.recv())
        self.assertEqual(self.proxy.request.state, HttpParser.states.COMPLETE)
        self.assertEqual(self.proxy.server.addr, (b'localhost', self.http_server_port))

        self.proxy.server.flush()
        self.assertEqual(self.proxy.server.buffer_size(), 0)

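        # Drain the response from the embedded HTTP server until parsing completes.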
        data = self.proxy.server.recv()
        while data:
            self.proxy._process_response(data)
            if self.proxy.response.state == HttpParser.states.COMPLETE:
                break
            data = self.proxy.server.recv()

        self.assertEqual(self.proxy.response.state, HttpParser.states.COMPLETE)
        self.assertEqual(int(self.proxy.response.code), 200)

    def test_authenticated_proxy_http_tunnel(self):
        self.proxy = Proxy(Client(self._conn, self._addr), b'Basic %s' % base64.b64encode(b'user:pass'))

        self.proxy.client.conn.queue(CRLF.join([
            b'CONNECT localhost:%d HTTP/1.1' % self.http_server_port,
            b'Host: localhost:%d' % self.http_server_port,
            b'User-Agent: proxy.py/%s' % version,
            b'Proxy-Connection: Keep-Alive',
            b'Proxy-Authorization: Basic dXNlcjpwYXNz',
            CRLF
        ]))
        self.proxy._process_request(self.proxy.client.recv())
        self.assertFalse(self.proxy.server is None)
        self.assertEqual(self.proxy.client.buffer, PROXY_TUNNEL_ESTABLISHED_RESPONSE_PKT)

        parser = HttpParser(HttpParser.types.RESPONSE_PARSER)
        parser.parse(self.proxy.client.buffer)
        self.assertEqual(parser.state, HttpParser.states.HEADERS_COMPLETE)
        self.assertEqual(int(parser.code), 200)

        self.proxy.client.flush()
        self.assertEqual(self.proxy.client.buffer_size(), 0)

        self.proxy.client.conn.queue(CRLF.join([
            b'GET / HTTP/1.1',
            b'Host: localhost:%d' % self.http_server_port,
            b'User-Agent: proxy.py/%s' % version,
            CRLF
        ]))
        self.proxy._process_request(self.proxy.client.recv())
        self.proxy.server.flush()
        self.assertEqual(self.proxy.server.buffer_size(), 0)

        parser = HttpParser(HttpParser.types.RESPONSE_PARSER)
        data = self.proxy.server.recv()
        while data:
            parser.parse(data)
            if parser.state == HttpParser.states.COMPLETE:
                break
            data = self.proxy.server.recv()

        self.assertEqual(parser.state, HttpParser.states.COMPLETE)
        self.assertEqual(int(parser.code), 200)


if __name__ == '__main__':
    unittest.main()