http2: cleanup

This commit is contained in:
Thomas Kriechbaumer 2016-05-17 22:44:38 -07:00
parent 9a280119d2
commit eeccb2faa0
1 changed files with 25 additions and 22 deletions

View File

@ -5,19 +5,19 @@ import time
from six.moves import queue from six.moves import queue
import traceback import traceback
import h2
import six import six
from h2.connection import H2Connection from h2.connection import H2Connection
from h2.exceptions import StreamClosedError
from h2 import events
from netlib.tcp import ssl_read_select from netlib.tcp import ssl_read_select
from netlib.exceptions import HttpException from netlib.exceptions import HttpException
from netlib.http import Headers from netlib.http import Headers
from netlib.utils import http2_read_raw_frame from netlib.utils import http2_read_raw_frame, parse_url
from .base import Layer from .base import Layer
from .http import _HttpTransmissionLayer, HttpLayer from .http import _HttpTransmissionLayer, HttpLayer
from ..exceptions import ProtocolException, Http2ProtocolException from ..exceptions import ProtocolException, Http2ProtocolException
from .. import utils
from ..models import HTTPRequest, HTTPResponse from ..models import HTTPRequest, HTTPResponse
@ -44,7 +44,7 @@ class SafeH2Connection(H2Connection):
with self.lock: with self.lock:
try: try:
self.reset_stream(stream_id, error_code) self.reset_stream(stream_id, error_code)
except h2.exceptions.StreamClosedError: # pragma: no cover except StreamClosedError: # pragma: no cover
# stream is already closed - good # stream is already closed - good
pass pass
self.conn.send(self.data_to_send()) self.conn.send(self.data_to_send())
@ -109,10 +109,10 @@ class Http2Layer(Layer):
raise Http2ProtocolException("HTTP2 layer should already have a connection.") raise Http2ProtocolException("HTTP2 layer should already have a connection.")
def set_server(self): # pragma: no cover def set_server(self): # pragma: no cover
raise NotImplementedError("Cannot change server for HTTP2 connections.") raise Http2ProtocolException("Cannot change server for HTTP2 connections.")
def disconnect(self): # pragma: no cover def disconnect(self): # pragma: no cover
raise NotImplementedError("Cannot dis- or reconnect in HTTP2 connections.") raise Http2ProtocolException("Cannot dis- or reconnect in HTTP2 connections.")
def next_layer(self): # pragma: no cover def next_layer(self): # pragma: no cover
# WebSockets over HTTP/2? # WebSockets over HTTP/2?
@ -132,27 +132,27 @@ class Http2Layer(Layer):
else: else:
eid = event.stream_id eid = event.stream_id
if isinstance(event, h2.events.RequestReceived): if isinstance(event, events.RequestReceived):
headers = Headers([[k, v] for k, v in event.headers]) headers = Headers([[k, v] for k, v in event.headers])
self.streams[eid] = Http2SingleStreamLayer(self, eid, headers) self.streams[eid] = Http2SingleStreamLayer(self, eid, headers)
self.streams[eid].timestamp_start = time.time() self.streams[eid].timestamp_start = time.time()
self.streams[eid].start() self.streams[eid].start()
elif isinstance(event, h2.events.ResponseReceived): elif isinstance(event, events.ResponseReceived):
headers = Headers([[k, v] for k, v in event.headers]) headers = Headers([[k, v] for k, v in event.headers])
self.streams[eid].queued_data_length = 0 self.streams[eid].queued_data_length = 0
self.streams[eid].timestamp_start = time.time() self.streams[eid].timestamp_start = time.time()
self.streams[eid].response_headers = headers self.streams[eid].response_headers = headers
self.streams[eid].response_arrived.set() self.streams[eid].response_arrived.set()
elif isinstance(event, h2.events.DataReceived): elif isinstance(event, events.DataReceived):
if self.config.body_size_limit and self.streams[eid].queued_data_length > self.config.body_size_limit: if self.config.body_size_limit and self.streams[eid].queued_data_length > self.config.body_size_limit:
raise HttpException("HTTP body too large. Limit is {}.".format(self.config.body_size_limit)) raise HttpException("HTTP body too large. Limit is {}.".format(self.config.body_size_limit))
self.streams[eid].data_queue.put(event.data) self.streams[eid].data_queue.put(event.data)
self.streams[eid].queued_data_length += len(event.data) self.streams[eid].queued_data_length += len(event.data)
source_conn.h2.safe_increment_flow_control(event.stream_id, event.flow_controlled_length) source_conn.h2.safe_increment_flow_control(event.stream_id, event.flow_controlled_length)
elif isinstance(event, h2.events.StreamEnded): elif isinstance(event, events.StreamEnded):
self.streams[eid].timestamp_end = time.time() self.streams[eid].timestamp_end = time.time()
self.streams[eid].data_finished.set() self.streams[eid].data_finished.set()
elif isinstance(event, h2.events.StreamReset): elif isinstance(event, events.StreamReset):
self.streams[eid].zombie = time.time() self.streams[eid].zombie = time.time()
if eid in self.streams and event.error_code == 0x8: if eid in self.streams and event.error_code == 0x8:
if is_server: if is_server:
@ -161,14 +161,14 @@ class Http2Layer(Layer):
other_stream_id = self.streams[eid].server_stream_id other_stream_id = self.streams[eid].server_stream_id
if other_stream_id is not None: if other_stream_id is not None:
other_conn.h2.safe_reset_stream(other_stream_id, event.error_code) other_conn.h2.safe_reset_stream(other_stream_id, event.error_code)
elif isinstance(event, h2.events.RemoteSettingsChanged): elif isinstance(event, events.RemoteSettingsChanged):
new_settings = dict([(id, cs.new_value) for (id, cs) in six.iteritems(event.changed_settings)]) new_settings = dict([(id, cs.new_value) for (id, cs) in six.iteritems(event.changed_settings)])
other_conn.h2.safe_update_settings(new_settings) other_conn.h2.safe_update_settings(new_settings)
elif isinstance(event, h2.events.ConnectionTerminated): elif isinstance(event, events.ConnectionTerminated):
# Do not immediately terminate the other connection. # Do not immediately terminate the other connection.
# Some streams might be still sending data to the client. # Some streams might be still sending data to the client.
return False return False
elif isinstance(event, h2.events.PushedStreamReceived): elif isinstance(event, events.PushedStreamReceived):
# pushed stream ids should be uniq and not dependent on race conditions # pushed stream ids should be uniq and not dependent on race conditions
# only the parent stream id must be looked up first # only the parent stream id must be looked up first
parent_eid = self.server_to_client_stream_ids[event.parent_stream_id] parent_eid = self.server_to_client_stream_ids[event.parent_stream_id]
@ -184,7 +184,7 @@ class Http2Layer(Layer):
self.streams[event.pushed_stream_id].timestamp_end = time.time() self.streams[event.pushed_stream_id].timestamp_end = time.time()
self.streams[event.pushed_stream_id].request_data_finished.set() self.streams[event.pushed_stream_id].request_data_finished.set()
self.streams[event.pushed_stream_id].start() self.streams[event.pushed_stream_id].start()
elif isinstance(event, h2.events.TrailersReceived): elif isinstance(event, events.TrailersReceived):
raise NotImplementedError() raise NotImplementedError()
return True return True
@ -222,10 +222,10 @@ class Http2Layer(Layer):
stream.zombie = time.time() stream.zombie = time.time()
return return
events = source_conn.h2.receive_data(raw_frame) incoming_events = source_conn.h2.receive_data(raw_frame)
source_conn.send(source_conn.h2.data_to_send()) source_conn.send(source_conn.h2.data_to_send())
for event in events: for event in incoming_events:
if not self._handle_event(event, source_conn, other_conn, is_server): if not self._handle_event(event, source_conn, other_conn, is_server):
return return
@ -275,10 +275,7 @@ class Http2SingleStreamLayer(_HttpTransmissionLayer, threading.Thread):
@queued_data_length.setter @queued_data_length.setter
def queued_data_length(self, v): def queued_data_length(self, v):
if self.response_arrived.is_set(): self.request_queued_data_length = v
return self.response_queued_data_length
else:
return self.request_queued_data_length
def is_zombie(self): def is_zombie(self):
return self.zombie is not None return self.zombie is not None
@ -300,7 +297,7 @@ class Http2SingleStreamLayer(_HttpTransmissionLayer, threading.Thread):
else: # pragma: no cover else: # pragma: no cover
first_line_format = "absolute" first_line_format = "absolute"
# FIXME: verify if path or :host contains what we need # FIXME: verify if path or :host contains what we need
scheme, host, port, _ = utils.parse_url(path) scheme, host, port, _ = parse_url(path)
if authority: if authority:
host, _, port = authority.partition(':') host, _, port = authority.partition(':')
@ -330,6 +327,9 @@ class Http2SingleStreamLayer(_HttpTransmissionLayer, threading.Thread):
timestamp_end=self.timestamp_end, timestamp_end=self.timestamp_end,
) )
def read_request_body(self, request): # pragma: no cover
raise NotImplementedError()
def send_request(self, message): def send_request(self, message):
if self.pushed: if self.pushed:
# nothing to do here # nothing to do here
@ -412,6 +412,9 @@ class Http2SingleStreamLayer(_HttpTransmissionLayer, threading.Thread):
pass pass
def run(self): def run(self):
self()
def __call__(self):
layer = HttpLayer(self, self.mode) layer = HttpLayer(self, self.mode)
try: try: