http2: fix self-depended streams
This commit is contained in:
parent
7a5b21556b
commit
6da166d762
|
@ -206,37 +206,46 @@ class Http2Layer(base.Layer):
|
||||||
self.streams[event.pushed_stream_id].request_data_finished.set()
|
self.streams[event.pushed_stream_id].request_data_finished.set()
|
||||||
self.streams[event.pushed_stream_id].start()
|
self.streams[event.pushed_stream_id].start()
|
||||||
elif isinstance(event, events.PriorityUpdated):
|
elif isinstance(event, events.PriorityUpdated):
|
||||||
|
mapped_stream_id = event.stream_id
|
||||||
|
if mapped_stream_id in self.streams and self.streams[mapped_stream_id].server_stream_id:
|
||||||
|
# if the stream is already up and running and was sent to the server
|
||||||
|
# use the mapped server stream id to update priority information
|
||||||
|
mapped_stream_id = self.streams[mapped_stream_id].server_stream_id
|
||||||
|
|
||||||
if eid in self.streams:
|
if eid in self.streams:
|
||||||
if self.streams[eid].handled_priority_event is event:
|
if self.streams[eid].handled_priority_event is event:
|
||||||
# This event was already handled during stream creation
|
# this event was already handled during stream creation
|
||||||
# HeadersFrame + Priority information as RequestReceived
|
# HeadersFrame + Priority information as RequestReceived
|
||||||
return True
|
return True
|
||||||
if eid in self.streams:
|
self.streams[eid].priority_weight = event.weight
|
||||||
self.streams[eid].priority_weight = event.weight
|
self.streams[eid].priority_depends_on = event.depends_on
|
||||||
self.streams[eid].priority_depends_on = event.depends_on
|
self.streams[eid].priority_exclusive = event.exclusive
|
||||||
self.streams[eid].priority_exclusive = event.exclusive
|
|
||||||
|
|
||||||
stream_id = event.stream_id
|
|
||||||
if stream_id in self.streams.keys() and self.streams[stream_id].server_stream_id:
|
|
||||||
stream_id = self.streams[stream_id].server_stream_id
|
|
||||||
|
|
||||||
depends_on = event.depends_on
|
|
||||||
if depends_on in self.streams.keys() and self.streams[depends_on].server_stream_id:
|
|
||||||
depends_on = self.streams[depends_on].server_stream_id
|
|
||||||
|
|
||||||
with self.server_conn.h2.lock:
|
with self.server_conn.h2.lock:
|
||||||
self.server_conn.h2.prioritize(
|
self.server_conn.h2.prioritize(
|
||||||
stream_id,
|
mapped_stream_id,
|
||||||
weight=event.weight,
|
weight=event.weight,
|
||||||
depends_on=depends_on,
|
depends_on=self._map_depends_on_stream_id(mapped_stream_id, event.depends_on),
|
||||||
exclusive=event.exclusive
|
exclusive=event.exclusive
|
||||||
)
|
)
|
||||||
self.server_conn.send(self.server_conn.h2.data_to_send())
|
self.server_conn.send(self.server_conn.h2.data_to_send())
|
||||||
elif isinstance(event, events.TrailersReceived):
|
elif isinstance(event, events.TrailersReceived):
|
||||||
raise NotImplementedError()
|
raise NotImplementedError("TrailersReceived not implemented")
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
def _map_depends_on_stream_id(self, stream_id, depends_on):
|
||||||
|
mapped_depends_on = depends_on
|
||||||
|
if mapped_depends_on in self.streams and self.streams[mapped_depends_on].server_stream_id:
|
||||||
|
# if the depends-on-stream is already up and running and was sent to the server
|
||||||
|
# use the mapped server stream id to update priority information
|
||||||
|
mapped_depends_on = self.streams[mapped_depends_on].server_stream_id
|
||||||
|
if stream_id == mapped_depends_on:
|
||||||
|
# looks like one of the streams wasn't opened yet
|
||||||
|
# prevent self-dependent streams which result in ProtocolError
|
||||||
|
mapped_depends_on += 2
|
||||||
|
return mapped_depends_on
|
||||||
|
|
||||||
def _cleanup_streams(self):
|
def _cleanup_streams(self):
|
||||||
death_time = time.time() - 10
|
death_time = time.time() - 10
|
||||||
for stream_id in self.streams.keys():
|
for stream_id in self.streams.keys():
|
||||||
|
@ -412,8 +421,6 @@ class Http2SingleStreamLayer(http._HttpTransmissionLayer, basethread.BaseThread)
|
||||||
headers.insert(0, ":path", message.path)
|
headers.insert(0, ":path", message.path)
|
||||||
headers.insert(0, ":method", message.method)
|
headers.insert(0, ":method", message.method)
|
||||||
headers.insert(0, ":scheme", message.scheme)
|
headers.insert(0, ":scheme", message.scheme)
|
||||||
self.server_stream_id = self.server_conn.h2.get_next_available_stream_id()
|
|
||||||
self.server_to_client_stream_ids[self.server_stream_id] = self.client_stream_id
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.server_conn.h2.safe_send_headers(
|
self.server_conn.h2.safe_send_headers(
|
||||||
|
@ -422,7 +429,7 @@ class Http2SingleStreamLayer(http._HttpTransmissionLayer, basethread.BaseThread)
|
||||||
headers,
|
headers,
|
||||||
end_stream=self.no_body,
|
end_stream=self.no_body,
|
||||||
priority_weight=self.priority_weight,
|
priority_weight=self.priority_weight,
|
||||||
priority_depends_on=self.priority_depends_on,
|
priority_depends_on=self._map_depends_on_stream_id(self.server_stream_id, self.priority_depends_on),
|
||||||
priority_exclusive=self.priority_exclusive,
|
priority_exclusive=self.priority_exclusive,
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|
Loading…
Reference in New Issue