From e9eed5e4c206e507665f5a4ff92654e969f4dd89 Mon Sep 17 00:00:00 2001 From: Thomas Kriechbaumer Date: Tue, 29 Sep 2015 16:23:55 +0200 Subject: [PATCH] --wip-- --- libmproxy/models/http.py | 4 - libmproxy/protocol/http.py | 430 ++++++++++++++++++++++++++----------- requirements.txt | 2 +- setup.py | 1 + 4 files changed, 301 insertions(+), 136 deletions(-) diff --git a/libmproxy/models/http.py b/libmproxy/models/http.py index e07dff69e..3914440ea 100644 --- a/libmproxy/models/http.py +++ b/libmproxy/models/http.py @@ -241,8 +241,6 @@ class HTTPRequest(MessageMixin, Request): timestamp_end=request.timestamp_end, form_out=(request.form_out if hasattr(request, 'form_out') else None), ) - if hasattr(request, 'stream_id'): - req.stream_id = request.stream_id return req def __hash__(self): @@ -347,8 +345,6 @@ class HTTPResponse(MessageMixin, Response): timestamp_start=response.timestamp_start, timestamp_end=response.timestamp_end, ) - if hasattr(response, 'stream_id'): - resp.stream_id = response.stream_id return resp def _refresh_cookie(self, c, delta): diff --git a/libmproxy/protocol/http.py b/libmproxy/protocol/http.py index 12d09e718..fc045e0d4 100644 --- a/libmproxy/protocol/http.py +++ b/libmproxy/protocol/http.py @@ -1,27 +1,37 @@ from __future__ import (absolute_import, print_function, division) + import sys import traceback - import six +import struct +import threading +import Queue from netlib import tcp from netlib.exceptions import HttpException, HttpReadDisconnect, NetlibException -from netlib.http import http1, Headers -from netlib.http import CONTENT_MISSING -from netlib.tcp import Address -from netlib.http.http2.connections import HTTP2Protocol -from netlib.http.http2.frame import GoAwayFrame, PriorityFrame, WindowUpdateFrame +from netlib.http import http1, Headers, CONTENT_MISSING +from netlib.tcp import Address, ssl_read_select + +import h2 +from h2.connection import H2Connection +from h2.events import * +from hyperframe import frame + +from .base import Layer, Kill from .. import utils from ..exceptions import HttpProtocolException, ProtocolException from ..models import ( - HTTPFlow, HTTPRequest, HTTPResponse, make_error_response, make_connect_response, Error, expect_continue_response + HTTPFlow, + HTTPRequest, + HTTPResponse, + make_error_response, + make_connect_response, + Error, + expect_continue_response ) -from .base import Layer, Kill class _HttpLayer(Layer): - supports_streaming = False - def read_request(self): raise NotImplementedError() @@ -31,26 +41,6 @@ class _HttpLayer(Layer): def send_request(self, request): raise NotImplementedError() - def read_response(self, request): - raise NotImplementedError() - - def send_response(self, response): - raise NotImplementedError() - - def check_close_connection(self, flow): - raise NotImplementedError() - - -class _StreamingHttpLayer(_HttpLayer): - supports_streaming = True - - def read_response_headers(self): - raise NotImplementedError - - def read_response_body(self, request, response): - raise NotImplementedError() - yield "this is a generator" # pragma: no cover - def read_response(self, request): response = self.read_response_headers() response.data.content = b"".join( @@ -58,21 +48,30 @@ class _StreamingHttpLayer(_HttpLayer): ) return response - def send_response_headers(self, response): - raise NotImplementedError - - def send_response_body(self, response, chunks): + def read_response_headers(self): raise NotImplementedError() + def read_response_body(self, request, response): + raise NotImplementedError() + yield "this is a generator" # pragma: no cover + def send_response(self, response): if response.content == CONTENT_MISSING: raise HttpException("Cannot assemble flow with CONTENT_MISSING") self.send_response_headers(response) self.send_response_body(response, [response.content]) + def send_response_headers(self, response): + raise NotImplementedError() -class Http1Layer(_StreamingHttpLayer): + def send_response_body(self, response, chunks): + raise NotImplementedError() + def check_close_connection(self, flow): + raise NotImplementedError() + + +class Http1Layer(_HttpLayer): def __init__(self, ctx, mode): super(Http1Layer, self).__init__(ctx) self.mode = mode @@ -130,104 +129,277 @@ class Http1Layer(_StreamingHttpLayer): layer = HttpLayer(self, self.mode) layer() +class SafeH2Connection(H2Connection): + def __init__(self, conn, *args, **kwargs): + super(SafeH2Connection, self).__init__(*args, **kwargs) + self.conn = conn + self.lock = threading.RLock() -# TODO: The HTTP2 layer is missing multiplexing, which requires a major rewrite. -class Http2Layer(_HttpLayer): + def safe_increment_flow_control(self, stream_id, length): + with self.lock: + self.increment_flow_control_window(length) + self.conn.send(self.data_to_send()) + with self.lock: + if stream_id in self.streams and not self.streams[stream_id].closed: + self.increment_flow_control_window(length, stream_id=stream_id) + self.conn.send(self.data_to_send()) + def safe_reset_stream(self, stream_id, error_code): + with self.lock: + self.reset_stream(stream_id, error_code) + self.conn.send(self.h2.data_to_send()) + + def safe_acknowledge_settings(self, event): + with self.conn.h2.lock: + self.conn.h2.acknowledge_settings(event) + self.conn.send(self.data_to_send()) + + def safe_update_settings(self, new_settings): + with self.conn.h2.lock: + self.update_settings(new_settings) + self.conn.send(self.data_to_send()) + + def safe_send_headers(self, stream_id, headers): + with self.lock: + self.send_headers(stream_id, headers) + self.conn.send(self.data_to_send()) + + def safe_send_body(self, stream_id, chunks): + for chunk in chunks: + max_outbound_frame_size = self.max_outbound_frame_size + for i in xrange(0, len(chunk), max_outbound_frame_size): + frame_chunk = chunk[i:i+max_outbound_frame_size] + with self.lock: + self.send_data(stream_id, frame_chunk) + self.conn.send(self.data_to_send()) + with self.lock: + self.end_stream(stream_id) + self.conn.send(self.data_to_send()) + +class Http2Layer(Layer): def __init__(self, ctx, mode): super(Http2Layer, self).__init__(ctx) self.mode = mode - self.client_protocol = HTTP2Protocol(self.client_conn, is_server=True, - unhandled_frame_cb=self.handle_unexpected_frame_from_client) - self.server_protocol = HTTP2Protocol(self.server_conn, is_server=False, - unhandled_frame_cb=self.handle_unexpected_frame_from_server) + self.streams = dict() + self.server_to_client_stream_ids = dict([(0, 0)]) + self.client_conn.h2 = SafeH2Connection(self.client_conn, client_side=False) - def read_request(self): - request = HTTPRequest.from_protocol( - self.client_protocol, - body_size_limit=self.config.body_size_limit - ) - self._stream_id = request.stream_id - return request + # make sure that we only pass actual SSL.Connection objects in here, + # because otherwise ssl_read_select fails! + self.active_conns = [self.client_conn.connection] - def send_request(self, message): - # TODO: implement flow control and WINDOW_UPDATE frames - self.server_conn.send(self.server_protocol.assemble(message)) + if self.server_conn: + self._initiate_server_conn() - def read_response(self, request): - return HTTPResponse.from_protocol( - self.server_protocol, - request_method=request.method, - body_size_limit=self.config.body_size_limit, - include_body=True, - stream_id=self._stream_id - ) - - def send_response(self, message): - # TODO: implement flow control to prevent client buffer filling up - # maintain a send buffer size, and read WindowUpdateFrames from client to increase the send buffer - self.client_conn.send(self.client_protocol.assemble(message)) - - def check_close_connection(self, flow): - # TODO: add a timer to disconnect after a 10 second timeout - return False + def _initiate_server_conn(self): + self.server_conn.h2 = SafeH2Connection(self.server_conn, client_side=True) + self.server_conn.h2.initiate_connection() + self.server_conn.h2.update_settings({frame.SettingsFrame.ENABLE_PUSH: False}) + self.server_conn.send(self.server_conn.h2.data_to_send()) + self.active_conns.append(self.server_conn.connection) def connect(self): self.ctx.connect() - self.server_protocol = HTTP2Protocol(self.server_conn, is_server=False, - unhandled_frame_cb=self.handle_unexpected_frame_from_server) - self.server_protocol.perform_connection_preface() + self.server_conn.connect() + self._initiate_server_conn() - def set_server(self, *args, **kwargs): - self.ctx.set_server(*args, **kwargs) - self.server_protocol = HTTP2Protocol(self.server_conn, is_server=False, - unhandled_frame_cb=self.handle_unexpected_frame_from_server) - self.server_protocol.perform_connection_preface() + def set_server(self): + raise NotImplementedError("Cannot change server for HTTP2 connections.") + + def disconnect(self): + raise NotImplementedError("Cannot dis- or reconnect in HTTP2 connections.") def __call__(self): - self.server_protocol.perform_connection_preface() + preamble = self.client_conn.rfile.read(24) + self.client_conn.h2.initiate_connection() + self.client_conn.h2.update_settings({frame.SettingsFrame.ENABLE_PUSH: False}) + self.client_conn.h2.receive_data(preamble) + self.client_conn.send(self.client_conn.h2.data_to_send()) + + while True: + r = ssl_read_select(self.active_conns, 1) + for conn in r: + source_conn = self.client_conn if conn == self.client_conn.connection else self.server_conn + other_conn = self.server_conn if conn == self.client_conn.connection else self.client_conn + is_server = (conn == self.server_conn.connection) + + fields = struct.unpack("!HB", source_conn.rfile.peek(3)) + length = (fields[0] << 8) + fields[1] + raw_frame = source_conn.rfile.safe_read(9 + length) + + with source_conn.h2.lock: + events = source_conn.h2.receive_data(raw_frame) + source_conn.send(source_conn.h2.data_to_send()) + + for event in events: + if hasattr(event, 'stream_id'): + if is_server: + eid = self.server_to_client_stream_ids[event.stream_id] + else: + eid = event.stream_id + + if isinstance(event, RequestReceived): + headers = Headers([[str(k), str(v)] for k, v in event.headers]) + self.streams[eid] = Http2SingleStreamLayer(self, eid, headers) + self.streams[eid].start() + elif isinstance(event, ResponseReceived): + headers = Headers([[str(k), str(v)] for k, v in event.headers]) + self.streams[eid].response_headers = headers + self.streams[eid].response_arrived.set() + elif isinstance(event, DataReceived): + self.streams[eid].data_queue.put(event.data) + source_conn.h2.safe_increment_flow_control(event.stream_id, len(event.data)) + elif isinstance(event, StreamEnded): + self.streams[eid].data_finished.set() + elif isinstance(event, StreamReset): + self.streams[eid].zombie = True + if eid in self.streams and event.error_code == 0x8: + if is_server: + other_stream_id = self.streams[eid].client_stream_id + else: + other_stream_id = self.streams[eid].server_stream_id + other_conn.h2.safe_reset_stream(other_stream_id, event.error_code) + elif isinstance(event, RemoteSettingsChanged): + source_conn.h2.safe_acknowledge_settings(event) + new_settings = dict([(id, cs.new_value) for (id, cs) in event.changed_settings.iteritems()]) + other_conn.h2.safe_update_settings(new_settings) + + # TODO: cleanup resources once we are sure nobody needs them + # for stream_id in self.streams.keys(): + # if self.streams[stream_id].zombie: + # self.streams.pop(stream_id, None) + + +class Http2SingleStreamLayer(_HttpLayer, threading.Thread): + def __init__(self, ctx, stream_id, request_headers): + super(Http2SingleStreamLayer, self).__init__(ctx) + self.zombie = False + self.client_stream_id = stream_id + self.server_stream_id = None + self.request_headers = request_headers + self.response_headers = None + self.data_queue = Queue.Queue() + + self.response_arrived = threading.Event() + self.data_finished = threading.Event() + + def read_request(self): + self.data_finished.wait() + self.data_finished.clear() + + authority = self.request_headers.get(':authority', '') + method = self.request_headers.get(':method', 'GET') + scheme = self.request_headers.get(':scheme', 'https') + path = self.request_headers.get(':path', '/') + host = None + port = None + + if path == '*' or path.startswith("/"): + form_in = "relative" + elif method == 'CONNECT': + form_in = "authority" + if ":" in authority: + host, port = authority.split(":", 1) + else: + host = authority + else: + form_in = "absolute" + # FIXME: verify if path or :host contains what we need + scheme, host, port, _ = utils.parse_url(path) + + if host is None: + host = 'localhost' + if port is None: + port = 80 if scheme == 'http' else 443 + port = int(port) + + data = [] + while self.data_queue.qsize() > 0: + data.append(self.data_queue.get()) + + return HTTPRequest( + form_in, + method, + scheme, + host, + port, + path, + (2, 0), + self.request_headers, + data, + # TODO: timestamp_start=None, + # TODO: timestamp_end=None, + form_out=None, # TODO: (request.form_out if hasattr(request, 'form_out') else None), + ) + + def send_request(self, message): + with self.server_conn.h2.lock: + self.server_stream_id = self.server_conn.h2.get_next_available_stream_id() + self.server_to_client_stream_ids[self.server_stream_id] = self.client_stream_id + + self.server_conn.h2.safe_send_headers( + self.server_stream_id, + message.headers + ) + self.server_conn.h2.safe_send_body( + self.server_stream_id, + message.body + ) + + def read_response_headers(self): + self.response_arrived.wait() + + status_code = int(self.response_headers.get(':status', 502)) + + return HTTPResponse( + http_version=(2, 0), + status_code=status_code, + reason='', + headers=self.response_headers, + content=None, + # TODO: timestamp_start=response.timestamp_start, + # TODO: timestamp_end=response.timestamp_end, + ) + + def read_response_body(self, request, response): + while True: + try: + yield self.data_queue.get(timeout=1) + except Queue.Empty: + pass + if self.data_finished.is_set(): + while self.data_queue.qsize() > 0: + yield self.data_queue.get() + return + + def send_response_headers(self, response): + self.client_conn.h2.safe_send_headers( + self.client_stream_id, + response.headers + ) + + def send_response_body(self, _response, chunks): + self.client_conn.h2.safe_send_body( + self.client_stream_id, + chunks + ) + + def check_close_connection(self, flow): + # This layer only handles a single stream. + # RFC 7540 8.1: An HTTP request/response exchange fully consumes a single stream. + return True + + def connect(self): + raise ValueError("CONNECT inside an HTTP2 stream is not supported.") + + def set_server(self, *args, **kwargs): + # do not mess with the server connection - all streams share it. + pass + + def run(self): layer = HttpLayer(self, self.mode) layer() - - # terminate the connection - self.client_conn.send(GoAwayFrame().to_bytes()) - - def handle_unexpected_frame_from_client(self, frame): - if isinstance(frame, WindowUpdateFrame): - # Clients are sending WindowUpdate frames depending on their flow control algorithm. - # Since we cannot predict these frames, and we do not need to respond to them, - # simply accept them, and hide them from the log. - # Ideally we should keep track of our own flow control window and - # stall transmission if the outgoing flow control buffer is full. - return - if isinstance(frame, PriorityFrame): - # Clients are sending Priority frames depending on their implementation. - # The RFC does not clearly state when or which priority preferences should be set. - # Since we cannot predict these frames, and we do not need to respond to them, - # simply accept them, and hide them from the log. - # Ideally we should forward them to the server. - return - if isinstance(frame, GoAwayFrame): - # Client wants to terminate the connection, - # relay it to the server. - self.server_conn.send(frame.to_bytes()) - return - self.log("Unexpected HTTP2 frame from client: %s" % frame.human_readable(), "info") - - def handle_unexpected_frame_from_server(self, frame): - if isinstance(frame, WindowUpdateFrame): - # Servers are sending WindowUpdate frames depending on their flow control algorithm. - # Since we cannot predict these frames, and we do not need to respond to them, - # simply accept them, and hide them from the log. - # Ideally we should keep track of our own flow control window and - # stall transmission if the outgoing flow control buffer is full. - return - if isinstance(frame, GoAwayFrame): - # Server wants to terminate the connection, - # relay it to the client. - self.client_conn.send(frame.to_bytes()) - return - self.log("Unexpected HTTP2 frame from server: %s" % frame.human_readable(), "info") + self.zombie = True class ConnectServerConnection(object): @@ -420,7 +592,7 @@ class HttpLayer(Layer): layer() def send_response_to_client(self, flow): - if not (self.supports_streaming and flow.response.stream): + if not flow.response.stream: # no streaming: # we already received the full response from the server and can # send it to the client straight away. @@ -441,10 +613,7 @@ class HttpLayer(Layer): def get_response_from_server(self, flow): def get_response(): self.send_request(flow.request) - if self.supports_streaming: - flow.response = self.read_response_headers() - else: - flow.response = self.read_response(flow.request) + flow.response = self.read_response_headers() try: get_response() @@ -474,15 +643,14 @@ class HttpLayer(Layer): if flow == Kill: raise Kill() - if self.supports_streaming: - if flow.response.stream: - flow.response.data.content = CONTENT_MISSING - else: - flow.response.data.content = b"".join(self.read_response_body( - flow.request, - flow.response - )) - flow.response.timestamp_end = utils.timestamp() + if flow.response.stream: + flow.response.data.content = CONTENT_MISSING + else: + flow.response.data.content = b"".join(self.read_response_body( + flow.request, + flow.response + )) + flow.response.timestamp_end = utils.timestamp() # no further manipulation of self.server_conn beyond this point # we can safely set it as the final attribute value here. diff --git a/requirements.txt b/requirements.txt index 3832c9538..49f86b9a6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ -e git+https://github.com/mitmproxy/netlib.git#egg=netlib -e git+https://github.com/mitmproxy/pathod.git#egg=pathod --e .[dev,examples,contentviews] \ No newline at end of file +-e .[dev,examples,contentviews] diff --git a/setup.py b/setup.py index 29ac4753c..c7686c6eb 100644 --- a/setup.py +++ b/setup.py @@ -17,6 +17,7 @@ with open(os.path.join(here, 'README.rst'), encoding='utf-8') as f: # This will break `pip install` on systems with old setuptools versions. deps = { "netlib>=%s, <%s" % (version.MINORVERSION, version.NEXT_MINORVERSION), + "h2>=2.0.0", "tornado>=4.3.0, <4.4", "configargparse>=0.10.0, <0.11", "pyperclip>=1.5.22, <1.6",