mirror of https://github.com/celery/kombu.git
Use MSG_PEEK to verify amqplib connection
This commit is contained in:
parent
403684a857
commit
61fe421806
|
@ -171,14 +171,14 @@ class TransportCase(unittest.TestCase):
|
|||
for i in xrange(n)]
|
||||
digests = []
|
||||
chan1 = self.connection.channel()
|
||||
consumer = chan1.Consumer(self.queue)
|
||||
self.purge_consumer(consumer)
|
||||
producer = chan1.Producer(self.exchange)
|
||||
for i, message in enumerate(messages):
|
||||
producer.publish({"text": message,
|
||||
"i": i}, routing_key=self.prefix)
|
||||
digests.append(self._digest(message))
|
||||
|
||||
consumer = chan1.Consumer(self.queue)
|
||||
self.purge_consumer(consumer)
|
||||
received = [(msg["i"], msg["text"])
|
||||
for msg in consumeN(self.connection, consumer, n)]
|
||||
self.assertEqual(len(received), n)
|
||||
|
|
|
@ -112,6 +112,20 @@ class Connection(amqp.Connection): # pragma: no cover
|
|||
if prev != timeout:
|
||||
sock.settimeout(prev)
|
||||
|
||||
def is_alive(self):
|
||||
sock = self.transport.sock
|
||||
prev = sock.gettimeout()
|
||||
sock.settimeout(0.0001)
|
||||
try:
|
||||
sock.recv(1, socket.MSG_PEEK)
|
||||
except socket.timeout:
|
||||
pass
|
||||
except socket.error:
|
||||
return False
|
||||
finally:
|
||||
sock.settimeout(prev)
|
||||
return True
|
||||
|
||||
def _wait_multiple(self, channel_ids, allowed_methods, timeout=None):
|
||||
for channel_id in channel_ids:
|
||||
method_queue = self.channels[channel_id].method_queue
|
||||
|
@ -259,13 +273,7 @@ class Transport(base.Transport):
|
|||
connection.close()
|
||||
|
||||
def is_alive(self, connection):
|
||||
try:
|
||||
connection.drain_events(timeout=0.0001)
|
||||
except socket.timeout:
|
||||
return True
|
||||
except self.connection_errors:
|
||||
return False
|
||||
return True
|
||||
return connection.is_alive()
|
||||
|
||||
def verify_connection(self, connection):
|
||||
return connection.channels is not None and self.is_alive(connection)
|
||||
|
|
Loading…
Reference in New Issue