From 47781af0508a168a8726f024165ce583c1d5de24 Mon Sep 17 00:00:00 2001 From: Paul Brown Date: Thu, 23 Dec 2021 16:26:44 -0600 Subject: [PATCH] prevent event loop polling on closed redis transports (and causing leak) --- kombu/transport/redis.py | 6 ++++++ t/unit/transport/test_redis.py | 15 +++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index 5624658b..b41b0a49 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -1237,6 +1237,12 @@ class Transport(virtual.Transport): def _on_disconnect(connection): if connection._sock: loop.remove(connection._sock) + + # stop polling in the event loop + try: + loop.on_tick.remove(on_poll_start) + except KeyError: + pass cycle._on_connection_disconnect = _on_disconnect def on_poll_start(): diff --git a/t/unit/transport/test_redis.py b/t/unit/transport/test_redis.py index 13dfc356..13f45385 100644 --- a/t/unit/transport/test_redis.py +++ b/t/unit/transport/test_redis.py @@ -854,6 +854,21 @@ class test_Channel: call(13, transport.on_readable, 13), ]) + def test_register_with_event_loop__on_disconnect__loop_cleanup(self): + """Ensure event loop polling stops on disconnect.""" + transport = self.connection.transport + self.connection._sock = None + transport.cycle = Mock(name='cycle') + transport.cycle.fds = {12: 'LISTEN', 13: 'BRPOP'} + conn = Mock(name='conn') + conn.client = Mock(name='client', transport_options={}) + loop = Mock(name='loop') + loop.on_tick = set() + redis.Transport.register_with_event_loop(transport, conn, loop) + assert len(loop.on_tick) == 1 + transport.cycle._on_connection_disconnect(self.connection) + assert loop.on_tick == set() + def test_configurable_health_check(self): transport = self.connection.transport transport.cycle = Mock(name='cycle')