diff --git a/grpclib/protocol.py b/grpclib/protocol.py index 356f77f..91973f3 100644 --- a/grpclib/protocol.py +++ b/grpclib/protocol.py @@ -301,10 +301,12 @@ class Stream: await self._connection.write_ready.wait() window = self._h2_connection.local_flow_control_window(self.id) - if not window: + if not window > 0: self.__window_updated__.clear() await self.__window_updated__.wait() - window = self._h2_connection.local_flow_control_window(self.id) + # during "await" above other streams were able to send data and + # decrease current window size, so try from the beginning + continue max_frame_size = self._h2_connection.max_outbound_frame_size f_chunk = f.read(min(window, max_frame_size, f_last - f_pos)) diff --git a/tests/test_protocol.py b/tests/test_protocol.py index ff42631..32287eb 100644 --- a/tests/test_protocol.py +++ b/tests/test_protocol.py @@ -215,7 +215,7 @@ async def test_initial_window_size_update(loop): assert client_h2c.local_flow_control_window(client_stream.id) == 1 await asyncio.wait([send_task], timeout=0.01) - assert send_task.done() + assert send_task.result() is None @pytest.mark.asyncio @@ -337,3 +337,55 @@ async def test_released_stream_data_ack(loop): to_server_transport.process(server_proc) # client-side flow control window will increase to initial value eventually assert client_h2c.outbound_flow_control_window > 1 + + +@pytest.mark.asyncio +async def test_negative_window_size(loop): + client_h2c, server_h2c = create_connections() + + to_client_transport = TransportStub(client_h2c) + server_conn = Connection(server_h2c, to_client_transport, loop=loop) + + to_server_transport = TransportStub(server_h2c) + client_conn = Connection(client_h2c, to_server_transport, loop=loop) + + client_proc = EventsProcessor(DummyHandler(), client_conn) + client_stream = client_conn.create_stream() + + await client_stream.send_request(create_headers(), _processor=client_proc) + + # data should be bigger than window size + initial_window = server_h2c.local_settings.initial_window_size + data = b'0' * (initial_window + 1) + + assert (client_h2c.local_flow_control_window(client_stream.id) + == initial_window) + + # send_data should wait until settings/window updated + send_task = loop.create_task(client_stream.send_data(data)) + await asyncio.wait([send_task], timeout=0.01) + + assert client_h2c.local_flow_control_window(client_stream.id) == 0 + + # updating settings, this should decrease connection window size + server_h2c.update_settings({ + SettingCodes.INITIAL_WINDOW_SIZE: initial_window - 1 + }) + server_conn.flush() + to_client_transport.process(client_proc) + + # window is negative and client should wait + assert client_h2c.local_flow_control_window(client_stream.id) == -1 + await asyncio.wait([send_task], timeout=0.01) + assert not send_task.done() + + # now `send_data` should succeed + server_h2c.increment_flow_control_window(2, stream_id=client_stream.id) + # INITIAL_WINDOW_SIZE not changes connection's window size, so it should be + # incremented only by 1 + server_h2c.increment_flow_control_window(1, stream_id=None) + server_conn.flush() + to_client_transport.process(client_proc) + assert client_h2c.local_flow_control_window(client_stream.id) == 1 + await asyncio.wait([send_task], timeout=0.01) + assert send_task.result() is None