diff --git a/libmproxy/flow.py b/libmproxy/flow.py index b6b490222..5b99427a1 100644 --- a/libmproxy/flow.py +++ b/libmproxy/flow.py @@ -706,6 +706,13 @@ class FlowMaster(controller.Master): self.process_new_request(f) return f + def handle_responseheaders(self, r): + f = self.state.add_response(r) + if f: + self.run_script_hook("responseheaders", f) + r.reply() + return f + def handle_response(self, r): f = self.state.add_response(r) if f: diff --git a/libmproxy/protocol/http.py b/libmproxy/protocol/http.py index 8a3210785..242443ec2 100644 --- a/libmproxy/protocol/http.py +++ b/libmproxy/protocol/http.py @@ -632,24 +632,21 @@ class HTTPResponse(HTTPMessage): return 'HTTP/%s.%s %s %s' % \ (self.httpversion[0], self.httpversion[1], self.code, self.msg) - def _assemble_headers(self): + def _assemble_headers(self, preserve_transfer_encoding=False): headers = self.headers.copy() - utils.del_all( - headers, - [ - 'Proxy-Connection', - 'Transfer-Encoding' - ] - ) + utils.del_all(headers,['Proxy-Connection']) + if not preserve_transfer_encoding: + utils.del_all(headers,['Transfer-Encoding']) + if self.content: headers["Content-Length"] = [str(len(self.content))] - elif 'Transfer-Encoding' in self.headers: # add content-length for chuncked transfer-encoding with no content + elif not preserve_transfer_encoding and 'Transfer-Encoding' in self.headers: # add content-length for chuncked transfer-encoding with no content headers["Content-Length"] = ["0"] return str(headers) - def _assemble_head(self): - return '%s\r\n%s\r\n' % (self._assemble_first_line(), self._assemble_headers()) + def _assemble_head(self, preserve_transfer_encoding=False): + return '%s\r\n%s\r\n' % (self._assemble_first_line(), self._assemble_headers(preserve_transfer_encoding=preserve_transfer_encoding)) def _assemble(self): """ @@ -865,15 +862,16 @@ class HTTPHandler(ProtocolHandler, TemporaryServerChangeMixin): pass self.c.close = True - def get_response_from_server(self, request): + def get_response_from_server(self, request, stream=False): self.c.establish_server_connection() request_raw = request._assemble() for i in range(2): try: self.c.server_conn.send(request_raw) - return HTTPResponse.from_stream(self.c.server_conn.rfile, request.method, - body_size_limit=self.c.config.body_size_limit) + res = HTTPResponse.from_stream(self.c.server_conn.rfile, request.method, + body_size_limit=self.c.config.body_size_limit, include_content=(not stream)) + return res except (tcp.NetLibDisconnect, http.HttpErrorConnClosed), v: self.c.log("error in server communication: %s" % str(v), level="debug") if i < 1: @@ -892,6 +890,8 @@ class HTTPHandler(ProtocolHandler, TemporaryServerChangeMixin): def handle_flow(self): flow = HTTPFlow(self.c.client_conn, self.c.server_conn, self.change_server) + flow.stream_expecting_body = False + flow.stream = False try: req = HTTPRequest.from_stream(self.c.client_conn.rfile, body_size_limit=self.c.config.body_size_limit) @@ -915,7 +915,23 @@ class HTTPHandler(ProtocolHandler, TemporaryServerChangeMixin): if isinstance(request_reply, HTTPResponse): flow.response = request_reply else: - flow.response = self.get_response_from_server(flow.request) + + # read initially in "stream" mode, so we can get the headers separately + flow.response = self.get_response_from_server(flow.request,stream=True) + + if flow.response.content == None: + flow.stream_expecting_body = True + flow.response.content = "" # set this to empty string or other things get really confused, + # flow.stream_expecting_body now contains the state info of whether or not + # body still remains to be read + + # call the appropriate script hook - this is an opportunity for an inline script to set flow.stream = True + responseheaders_reply = self.c.channel.ask("responseheaders", flow.response) + # hm - do we need to do something with responseheaders_reply?? + + # now get the rest of the request body, if body still needs to be read but not streaming this response + if flow.stream_expecting_body and not flow.stream: + flow.response.content = http.read_http_body(self.c.server_conn.rfile, flow.response.headers, self.c.config.body_size_limit, False) flow.server_conn = self.c.server_conn # no further manipulation of self.c.server_conn beyond this point # we can safely set it as the final attribute value here. @@ -925,10 +941,50 @@ class HTTPHandler(ProtocolHandler, TemporaryServerChangeMixin): if response_reply is None or response_reply == KILL: return False - self.c.client_conn.send(flow.response._assemble()) + disconnected_while_streaming = False + + if not flow.stream or not flow.stream_expecting_body: + # if not streaming or there is no body to be read, we'll already have the body, just send it + self.c.client_conn.send(flow.response._assemble()) + else: + + # if streaming, we still need to read the body and stream its bits back to the client + + # start with head + h = flow.response._assemble_head(preserve_transfer_encoding=True) + self.c.client_conn.send(h) + + # if chunked then we send back each chunk + if http.has_chunked_encoding(flow.response.headers): + while 1: + content = http.read_next_chunk(self.c.server_conn.rfile, flow.response.headers, False) + if not http.write_chunk(self.c.client_conn.wfile, content): + break + self.c.client_conn.wfile.flush() + self.c.client_conn.wfile.flush() + + else: # not chunked, we send back 4k at a time + clen = http.expected_http_body_size(flow.response.headers, False) + clen = clen if clen >= 0 else (64 * 1024 * 1024 * 1024) # arbitrary max of 64G if no length set + rcount = 0 + blocksize = 4096 + while 1: + bytes_to_read = min(blocksize, clen - rcount) + if bytes_to_read == 0: + break + content = self.c.server_conn.rfile.read(bytes_to_read) + if content == "": # check for EOF + disconnected_while_streaming = True + break + rcount += len(content) + self.c.client_conn.wfile.write(content) + self.c.client_conn.wfile.flush() + if rcount >= clen: # check for having read up to clen + break + flow.timestamp_end = utils.timestamp() - if (http.connection_close(flow.request.httpversion, flow.request.headers) or + if (disconnected_while_streaming or http.connection_close(flow.request.httpversion, flow.request.headers) or http.connection_close(flow.response.httpversion, flow.response.headers)): return False @@ -938,6 +994,7 @@ class HTTPHandler(ProtocolHandler, TemporaryServerChangeMixin): # If the user has changed the target server on this connection, # restore the original target server self.restore_server() + return True except (HttpAuthenticationError, http.HttpError, proxy.ProxyError, tcp.NetLibError), e: self.handle_error(e, flow) diff --git a/test/scripts/all.py b/test/scripts/all.py index 7d30d7571..15a5fc026 100644 --- a/test/scripts/all.py +++ b/test/scripts/all.py @@ -15,6 +15,10 @@ def response(ctx, r): ctx.log("XRESPONSE") log.append("response") +def responseheaders(ctx, r): + ctx.log("XRESPONSEHEADERS") + log.append("responseheaders") + def clientdisconnect(ctx, cc): ctx.log("XCLIENTDISCONNECT") log.append("clientdisconnect") diff --git a/test/test_server.py b/test/test_server.py index e1da31256..795a749ff 100644 --- a/test/test_server.py +++ b/test/test_server.py @@ -383,6 +383,64 @@ class TestRedirectRequest(tservers.HTTPProxTest): assert r3.content == r2.content == r1.content # Make sure that we actually use the same connection in this test case +class MasterStreamRequest(tservers.TestMaster): + """ + Enables the stream flag on the flow for all requests + """ + def handle_responseheaders(self, r): + f = self.state.add_response(r) + f.stream = True + r.reply() + return f + +class TestStreamRequest(tservers.HTTPProxTest): + masterclass = MasterStreamRequest + + def test_stream_simple(self): + p = self.pathoc() + + # a request with 100k of data but without content-length + self.server.clear_log() + r1 = p.request("get:'%s/p/200:r:b@100k:d102400'"%self.server.urlbase) + assert r1.status_code == 200 + assert len(r1.content) > 100000 + assert self.server.last_log() + + def test_stream_multiple(self): + p = self.pathoc() + + # simple request with streaming turned on + self.server.clear_log() + r1 = p.request("get:'%s/p/200'"%self.server.urlbase) + assert r1.status_code == 200 + assert self.server.last_log() + + # now send back 100k of data, streamed but not chunked + self.server.clear_log() + r1 = p.request("get:'%s/p/200:b@100k'"%self.server.urlbase) + assert r1.status_code == 200 + assert self.server.last_log() + + def test_stream_chunked(self): + + connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + connection.connect(("127.0.0.1", self.proxy.port)) + fconn = connection.makefile() + spec = '200:h"Transfer-Encoding"="chunked":r:b"4\\r\\nthis\\r\\n7\\r\\nisatest\\r\\n0\\r\\n\\r\\n"' + connection.send("GET %s/p/%s HTTP/1.1\r\n"%(self.server.urlbase, spec)) + connection.send("\r\n"); + + httpversion, code, msg, headers, content = http.read_response(fconn, "GET", 100000, include_body=False) + + assert headers["Transfer-Encoding"][0] == 'chunked' + assert code == 200 + + assert http.read_next_chunk(fconn, headers, False) == "this" + assert http.read_next_chunk(fconn, headers, False) == "isatest" + assert http.read_next_chunk(fconn, headers, False) == None + + connection.close() + class MasterFakeResponse(tservers.TestMaster): def handle_request(self, m):