From 4d281948f2cb8df985bfda72881bf9c1394f15a6 Mon Sep 17 00:00:00 2001 From: huyenvu2101 Date: Mon, 22 Apr 2024 12:40:20 +0700 Subject: [PATCH] Fix: Fanout exchange messages mixed across virtual databases in Redis sentinel --- kombu/transport/redis.py | 2 ++ t/unit/transport/test_redis.py | 12 ++++++++++++ 2 files changed, 14 insertions(+) diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index 912b0fe0..89cabda6 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -1433,6 +1433,8 @@ class SentinelChannel(Channel): ).connection_pool def _get_pool(self, asynchronous=False): + params = self._connparams(asynchronous=asynchronous) + self.keyprefix_fanout = self.keyprefix_fanout.format(db=params['db']) return self._sentinel_managed_pool(asynchronous) diff --git a/t/unit/transport/test_redis.py b/t/unit/transport/test_redis.py index b471546b..1bc81e0e 100644 --- a/t/unit/transport/test_redis.py +++ b/t/unit/transport/test_redis.py @@ -1634,6 +1634,18 @@ class test_RedisSentinel: connection.channel() p.assert_called() + def test_keyprefix_fanout(self): + from kombu.transport.redis import SentinelChannel + with patch.object(SentinelChannel, '_sentinel_managed_pool'): + connection = Connection( + 'sentinel://localhost:65532/1', + transport_options={ + 'master_name': 'not_important', + }, + ) + channel = connection.channel() + assert channel.keyprefix_fanout == '/1.' + def test_getting_master_from_sentinel(self): with patch('redis.sentinel.Sentinel') as patched: connection = Connection(