From 61fe421806a495186e261bbca79f4023673c9a97 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Fri, 3 Feb 2012 18:03:11 +0000 Subject: [PATCH] Use MSG_PEEK to verify amqplib connection --- funtests/transport.py | 4 ++-- kombu/transport/amqplib.py | 22 +++++++++++++++------- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/funtests/transport.py b/funtests/transport.py index 76a0e010..8b067557 100644 --- a/funtests/transport.py +++ b/funtests/transport.py @@ -171,14 +171,14 @@ class TransportCase(unittest.TestCase): for i in xrange(n)] digests = [] chan1 = self.connection.channel() - consumer = chan1.Consumer(self.queue) - self.purge_consumer(consumer) producer = chan1.Producer(self.exchange) for i, message in enumerate(messages): producer.publish({"text": message, "i": i}, routing_key=self.prefix) digests.append(self._digest(message)) + consumer = chan1.Consumer(self.queue) + self.purge_consumer(consumer) received = [(msg["i"], msg["text"]) for msg in consumeN(self.connection, consumer, n)] self.assertEqual(len(received), n) diff --git a/kombu/transport/amqplib.py b/kombu/transport/amqplib.py index e8870549..99fa651f 100644 --- a/kombu/transport/amqplib.py +++ b/kombu/transport/amqplib.py @@ -112,6 +112,20 @@ class Connection(amqp.Connection): # pragma: no cover if prev != timeout: sock.settimeout(prev) + def is_alive(self): + sock = self.transport.sock + prev = sock.gettimeout() + sock.settimeout(0.0001) + try: + sock.recv(1, socket.MSG_PEEK) + except socket.timeout: + pass + except socket.error: + return False + finally: + sock.settimeout(prev) + return True + def _wait_multiple(self, channel_ids, allowed_methods, timeout=None): for channel_id in channel_ids: method_queue = self.channels[channel_id].method_queue @@ -259,13 +273,7 @@ class Transport(base.Transport): connection.close() def is_alive(self, connection): - try: - connection.drain_events(timeout=0.0001) - except socket.timeout: - return True - except self.connection_errors: - return False - return True + return connection.is_alive() def verify_connection(self, connection): return connection.channels is not None and self.is_alive(connection)