diff --git a/kombu/tests/transport/test_redis.py b/kombu/tests/transport/test_redis.py index 9000c1b1..952ec8cd 100644 --- a/kombu/tests/transport/test_redis.py +++ b/kombu/tests/transport/test_redis.py @@ -496,7 +496,7 @@ class test_Channel(Case): c.parse_response = Mock() self.channel._poll_error('BRPOP') - c.parse_response.assert_called_with('BRPOP') + c.parse_response.assert_called_with(c.connection, 'BRPOP') c.parse_response.side_effect = KeyError('foo') self.assertIsNone(self.channel._poll_error('BRPOP')) diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index b4d613b4..64fc865d 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -529,9 +529,25 @@ class Channel(virtual.Channel): def _poll_error(self, type, **options): try: - self.client.parse_response(type) + # This might error out if the (redis)client.connection is already disconnected + self.client.parse_response(self.client.connection, type) except self.connection_errors: - pass + warn("Connection poll error on command type {}".format(type), exc_info=True) + except AttributeError as ex: + # This is definitely hacky, but currently fixes a problem where Redis connection has already been closed + # when we call parse_response. Hoping for suggestions on the best way to handle this. + if not "object has no attribute 'readline'" in ex.message: + raise + # Maybe we can get more information on the root cause from this logging. + warn("Unable to read from channel {} of type {}. Err: {}".format(self, type, ex.message)) + # Disconnect so that we will automatically reconnect in the next polling cycle + # Someone may want to verify this and determine if there's a better way + # Used _brpop_read and close methods as examples to deduce what should work + for attr in 'client', 'subclient': + try: + self.__dict__[attr].connection.disconnect() + except (KeyError, AttributeError, self.ResponseError): + pass def _get(self, queue): with self.conn_or_acquire() as client: