From 952fc27b034b4ab7e6e82a19c2894efca8dd4ff8 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Fri, 12 Nov 2010 13:30:30 +0100 Subject: [PATCH] Functional tests for Pika transport, also asyncore drain_events now properly raises socket.timeout on timeout --- funtests/setup.cfg | 4 ++ funtests/tests/__init__.py | 9 +++ funtests/tests/disabled_pika.py | 113 -------------------------------- funtests/tests/test_pika.py | 11 ++++ funtests/transport.py | 5 +- kombu/transport/pypika.py | 30 +++++++-- 6 files changed, 53 insertions(+), 119 deletions(-) create mode 100644 funtests/setup.cfg delete mode 100644 funtests/tests/disabled_pika.py create mode 100644 funtests/tests/test_pika.py diff --git a/funtests/setup.cfg b/funtests/setup.cfg new file mode 100644 index 00000000..321d1e49 --- /dev/null +++ b/funtests/setup.cfg @@ -0,0 +1,4 @@ +[nosetests] +verbosity = 1 +detailed-errors = 1 +where = tests diff --git a/funtests/tests/__init__.py b/funtests/tests/__init__.py index e69de29b..0300fda8 100644 --- a/funtests/tests/__init__.py +++ b/funtests/tests/__init__.py @@ -0,0 +1,9 @@ +import os +import sys + +print("HELLO") + +sys.path.insert(0, os.path.join(os.getcwd(), os.pardir)) +print(sys.path[0]) +sys.path.insert(0, os.getcwd()) +print(sys.path[0]) diff --git a/funtests/tests/disabled_pika.py b/funtests/tests/disabled_pika.py deleted file mode 100644 index 835dd41f..00000000 --- a/funtests/tests/disabled_pika.py +++ /dev/null @@ -1,113 +0,0 @@ -import socket -import time -import unittest2 as unittest - -from nose import SkipTest - -from kombu import BrokerConnection -from kombu import Producer, Consumer, Exchange, Queue - - -def consumeN(conn, consumer, n=1): - messages = [] - - def callback(message_data, message): - messages.append(message_data) - message.ack() - - prev, consumer.callbacks = consumer.callbacks, [callback] - - while True: - conn.drain_events(timeout=1) - if len(messages) >= n: - break - - consumer.callback = prev - return messages - - -class test_pika(unittest.TestCase): - - def purge(self, names): - chan = self.connection.channel() - map(chan.queue_purge, names) - - def setUp(self): - self.connection = BrokerConnection(transport="pika") - try: - self.connection.connect() - except socket.error: - self.connected = False - else: - self.connected = True - self.exchange = Exchange("tamqplib", "direct") - self.queue = Queue("tamqplib", self.exchange, "tamqplib") - - def test_produce__consume(self): - if not self.connected: - raise SkipTest("Broker not running.") - chan1 = self.connection.channel() - producer = Producer(chan1, self.exchange) - - producer.publish({"foo": "bar"}, routing_key="tamqplib") - chan1.close() - - chan2 = self.connection.channel() - consumer = Consumer(chan2, self.queue) - message = consumeN(self.connection, consumer) - self.assertDictEqual(message[0], {"foo": "bar"}) - chan2.close() - self.purge(["tamqplib"]) - - def test_produce__consume_multiple(self): - if not self.connected: - raise SkipTest("Broker not running.") - chan1 = self.connection.channel() - producer = Producer(chan1, self.exchange) - b1 = Queue("pyamqplib.b1", self.exchange, "b1") - b2 = Queue("pyamqplib.b2", self.exchange, "b2") - b3 = Queue("pyamqplib.b3", self.exchange, "b3") - - producer.publish("b1", routing_key="b1") - producer.publish("b2", routing_key="b2") - producer.publish("b3", routing_key="b3") - chan1.close() - - chan2 = self.connection.channel() - consumer = Consumer(chan2, [b1, b2, b3]) - messages = consumeN(self.connection, consumer, 3) - self.assertItemsEqual(messages, ["b1", "b2", "b3"]) - chan2.close() - self.purge(["pyamqplib.b1", "pyamqplib.b2", "pyamqplib.b3"]) - - def test_timeout(self): - if not self.connected: - raise SkipTest("Broker not running.") - chan = self.connection.channel() - self.purge([self.queue.name]) - consumer = Consumer(chan, self.queue) - self.assertRaises(socket.timeout, self.connection.drain_events, - timeout=0.3) - consumer.cancel() - - def test_basic_get(self): - chan1 = self.connection.channel() - producer = Producer(chan1, self.exchange) - producer.publish({"basic.get": "this"}, routing_key="basic_get") - chan1.close() - - chan2 = self.connection.channel() - queue = Queue("amqplib_basic_get", self.exchange, "basic_get") - queue = queue(chan2) - queue.declare() - for i in range(50): - m = queue.get() - if m: - break - time.sleep(0.1) - self.assertEqual(m.payload, {"basic.get": "this"}) - chan2.close() - - def tearDown(self): - if self.connected: - self.connection.close() diff --git a/funtests/tests/test_pika.py b/funtests/tests/test_pika.py new file mode 100644 index 00000000..02e51303 --- /dev/null +++ b/funtests/tests/test_pika.py @@ -0,0 +1,11 @@ +from funtests import transport + + +class test_pika_blocking(transport.TransportCase): + transport = "syncpika" + prefix = "syncpika" + + +class test_pika_async(transport.TransportCase): + transport = "pika" + prefix = "pika" diff --git a/funtests/transport.py b/funtests/transport.py index 14b647b4..6c1d79b4 100644 --- a/funtests/transport.py +++ b/funtests/transport.py @@ -19,7 +19,10 @@ def consumeN(conn, consumer, n=1): consumer.consume() while True: - conn.drain_events(timeout=1) + try: + conn.drain_events(timeout=1) + except socket.timeout: + pass if len(messages) >= n: break diff --git a/kombu/transport/pypika.py b/kombu/transport/pypika.py index 30daf813..71ad54d4 100644 --- a/kombu/transport/pypika.py +++ b/kombu/transport/pypika.py @@ -8,6 +8,8 @@ Pika transport. :license: BSD, see LICENSE for more details. """ +import socket + from pika import asyncore_adapter from pika import blocking_adapter from pika import channel @@ -30,9 +32,10 @@ class Message(base.Message): "content_type": header.content_type, "content_encoding": header.content_encoding, "delivery_info": dict( - consumer_tag=method.consumer_tag, + consumer_tag=getattr(method, "consumer_tag", None), routing_key=method.routing_key, delivery_tag=method.delivery_tag, + redelivered=method.redelivered, exchange=method.exchange)}) super(Message, self).__init__(channel, **kwargs) @@ -42,10 +45,10 @@ class Channel(channel.Channel): Message = Message def basic_get(self, queue, no_ack): - m = channel.Channel.basic_get(self, queue=queue, no_ack=no_ack) - if isinstance(m, Basic.GetEmpty): + method = channel.Channel.basic_get(self, queue=queue, no_ack=no_ack) + if isinstance(method, Basic.GetEmpty): return - return m + return None, method, method._properties, method._body def queue_purge(self, queue=None, nowait=False): return channel.Channel.queue_purge(self, queue=queue, nowait=nowait) \ @@ -105,12 +108,29 @@ class BlockingConnection(blocking_adapter.BlockingConnection): def channel(self): return Channel(channel.ChannelHandler(self)) + def ensure_drain_events(self, timeout=None): + return self.drain_events(timeout=timeout) class AsyncoreConnection(asyncore_adapter.AsyncoreConnection): + _event_counter = 0 + Super = asyncore_adapter.AsyncoreConnection def channel(self): return Channel(channel.ChannelHandler(self)) + def ensure_drain_events(self, timeout=None): + # asyncore connection does not raise socket.timeout when timing out + # so need to do a little trick here to mimic the behavior + # of sync connection. + current_events = self._event_counter + self.drain_events(timeout=timeout) + if self._event_counter <= current_events: + raise socket.timeout("timed out") + + def on_data_available(self, buf): + self._event_counter += 1 + self.Super.on_data_available(self, buf) + class SyncTransport(base.Transport): default_port = DEFAULT_PORT @@ -141,7 +161,7 @@ class SyncTransport(base.Transport): return connection.channel() def drain_events(self, connection, **kwargs): - return connection.drain_events(**kwargs) + return connection.ensure_drain_events(**kwargs) def establish_connection(self): """Establish connection to the AMQP broker."""