diff --git a/funtests/transport.py b/funtests/transport.py index 8b067557..76a0e010 100644 --- a/funtests/transport.py +++ b/funtests/transport.py @@ -171,14 +171,14 @@ class TransportCase(unittest.TestCase): for i in xrange(n)] digests = [] chan1 = self.connection.channel() + consumer = chan1.Consumer(self.queue) + self.purge_consumer(consumer) producer = chan1.Producer(self.exchange) for i, message in enumerate(messages): producer.publish({"text": message, "i": i}, routing_key=self.prefix) digests.append(self._digest(message)) - consumer = chan1.Consumer(self.queue) - self.purge_consumer(consumer) received = [(msg["i"], msg["text"]) for msg in consumeN(self.connection, consumer, n)] self.assertEqual(len(received), n) diff --git a/kombu/transport/amqplib.py b/kombu/transport/amqplib.py index 99fa651f..ec2f1ca9 100644 --- a/kombu/transport/amqplib.py +++ b/kombu/transport/amqplib.py @@ -112,20 +112,6 @@ class Connection(amqp.Connection): # pragma: no cover if prev != timeout: sock.settimeout(prev) - def is_alive(self): - sock = self.transport.sock - prev = sock.gettimeout() - sock.settimeout(0.0001) - try: - sock.recv(1, socket.MSG_PEEK) - except socket.timeout: - pass - except socket.error: - return False - finally: - sock.settimeout(prev) - return True - def _wait_multiple(self, channel_ids, allowed_methods, timeout=None): for channel_id in channel_ids: method_queue = self.channels[channel_id].method_queue @@ -273,7 +259,18 @@ class Transport(base.Transport): connection.close() def is_alive(self, connection): - return connection.is_alive() + sock = connection.transport.sock + prev = sock.gettimeout() + sock.settimeout(0.0001) + try: + sock.recv(1, socket.MSG_PEEK) + except socket.timeout: + pass + except socket.error: + return False + finally: + sock.settimeout(prev) + return True def verify_connection(self, connection): return connection.channels is not None and self.is_alive(connection)