mirror of https://github.com/celery/kombu.git
Added common base class for transport functional tests
This commit is contained in:
parent
caf0069cda
commit
917eb5c3bd
|
@ -1,114 +1,6 @@
|
|||
import socket
|
||||
import time
|
||||
import unittest2 as unittest
|
||||
|
||||
from nose import SkipTest
|
||||
|
||||
from kombu import BrokerConnection
|
||||
from kombu import Producer, Consumer, Exchange, Queue
|
||||
from kombu.tests.test_functional import transport
|
||||
|
||||
|
||||
def consumeN(conn, consumer, n=1):
|
||||
messages = []
|
||||
|
||||
def callback(message_data, message):
|
||||
messages.append(message_data)
|
||||
message.ack()
|
||||
|
||||
prev, consumer.callbacks = consumer.callbacks, [callback]
|
||||
consumer.consume()
|
||||
|
||||
while True:
|
||||
conn.drain_events(timeout=1)
|
||||
if len(messages) >= n:
|
||||
break
|
||||
|
||||
consumer.cancel()
|
||||
consumer.callback = prev
|
||||
return messages
|
||||
|
||||
|
||||
class test_amqplib(unittest.TestCase):
|
||||
|
||||
def purge(self, names):
|
||||
chan = self.connection.channel()
|
||||
map(chan.queue_purge, names)
|
||||
|
||||
def setUp(self):
|
||||
self.connection = BrokerConnection(transport="amqplib")
|
||||
try:
|
||||
self.connection.connect()
|
||||
except socket.error:
|
||||
self.connected = False
|
||||
else:
|
||||
self.connected = True
|
||||
self.exchange = Exchange("tamqplib", "direct")
|
||||
self.queue = Queue("tamqplib", self.exchange, "tamqplib")
|
||||
|
||||
def test_produce__consume(self):
|
||||
if not self.connected:
|
||||
raise SkipTest("Broker not running.")
|
||||
chan1 = self.connection.channel()
|
||||
consumer = Consumer(chan1, self.queue)
|
||||
consumer.queues[0].purge()
|
||||
producer = Producer(chan1, self.exchange)
|
||||
producer.publish({"foo": "bar"}, routing_key="tamqplib")
|
||||
message = consumeN(self.connection, consumer)
|
||||
self.assertDictEqual(message[0], {"foo": "bar"})
|
||||
chan1.close()
|
||||
self.purge([self.queue.name])
|
||||
|
||||
def test_produce__consume_multiple(self):
|
||||
if not self.connected:
|
||||
raise SkipTest("Broker not running.")
|
||||
chan1 = self.connection.channel()
|
||||
producer = Producer(chan1, self.exchange)
|
||||
b1 = Queue("pyamqplib.b1", self.exchange, "b1")(chan1)
|
||||
b2 = Queue("pyamqplib.b2", self.exchange, "b2")(chan1)
|
||||
b3 = Queue("pyamqplib.b3", self.exchange, "b3")(chan1)
|
||||
[q.declare() for q in (b1, b2, b3)]
|
||||
[q.purge() for q in (b1, b2, b3)]
|
||||
|
||||
producer.publish("b1", routing_key="b1")
|
||||
producer.publish("b2", routing_key="b2")
|
||||
producer.publish("b3", routing_key="b3")
|
||||
chan1.close()
|
||||
|
||||
chan2 = self.connection.channel()
|
||||
consumer = Consumer(chan2, [b1, b2, b3])
|
||||
messages = consumeN(self.connection, consumer, 3)
|
||||
self.assertItemsEqual(messages, ["b1", "b2", "b3"])
|
||||
chan2.close()
|
||||
self.purge(["pyamqplib.b1", "pyamqplib.b2", "pyamqplib.b3"])
|
||||
|
||||
def test_timeout(self):
|
||||
if not self.connected:
|
||||
raise SkipTest("Broker not running.")
|
||||
chan = self.connection.channel()
|
||||
self.purge([self.queue.name])
|
||||
consumer = Consumer(chan, self.queue)
|
||||
self.assertRaises(socket.timeout, self.connection.drain_events,
|
||||
timeout=0.3)
|
||||
consumer.cancel()
|
||||
|
||||
def test_basic_get(self):
|
||||
chan1 = self.connection.channel()
|
||||
producer = Producer(chan1, self.exchange)
|
||||
producer.publish({"basic.get": "this"}, routing_key="basic_get")
|
||||
chan1.close()
|
||||
|
||||
chan2 = self.connection.channel()
|
||||
queue = Queue("amqplib_basic_get", self.exchange, "basic_get")
|
||||
queue = queue(chan2)
|
||||
queue.declare()
|
||||
for i in range(50):
|
||||
m = queue.get()
|
||||
if m:
|
||||
break
|
||||
time.sleep(0.1)
|
||||
self.assertEqual(m.payload, {"basic.get": "this"})
|
||||
chan2.close()
|
||||
|
||||
def tearDown(self):
|
||||
if self.connected:
|
||||
self.connection.close()
|
||||
class test_amqplib(transport.TransportCase):
|
||||
transport = "amqplib"
|
||||
prefix = "amqplib"
|
||||
|
|
|
@ -0,0 +1,138 @@
|
|||
import socket
|
||||
import time
|
||||
import unittest2 as unittest
|
||||
|
||||
from nose import SkipTest
|
||||
|
||||
from kombu import BrokerConnection
|
||||
from kombu import Producer, Consumer, Exchange, Queue
|
||||
|
||||
|
||||
def consumeN(conn, consumer, n=1):
|
||||
messages = []
|
||||
|
||||
def callback(message_data, message):
|
||||
messages.append(message_data)
|
||||
message.ack()
|
||||
|
||||
prev, consumer.callbacks = consumer.callbacks, [callback]
|
||||
consumer.consume()
|
||||
|
||||
while True:
|
||||
conn.drain_events(timeout=1)
|
||||
if len(messages) >= n:
|
||||
break
|
||||
|
||||
consumer.cancel()
|
||||
consumer.callback = prev
|
||||
return messages
|
||||
|
||||
|
||||
class TransportCase(unittest.TestCase):
|
||||
transport = None
|
||||
prefix = None
|
||||
event_loop_max = 100
|
||||
|
||||
def purge(self, names):
|
||||
chan = self.connection.channel()
|
||||
map(chan.queue_purge, names)
|
||||
|
||||
def do_connect(self):
|
||||
self.connection = BrokerConnection(transport=self.transport)
|
||||
try:
|
||||
self.connection.connect()
|
||||
self.after_connect(self.connection)
|
||||
except self.connection.connection_errors:
|
||||
self.connected = False
|
||||
else:
|
||||
self.connected = True
|
||||
|
||||
def after_connect(self, connection):
|
||||
pass
|
||||
|
||||
def setUp(self):
|
||||
if self.transport:
|
||||
self.do_connect()
|
||||
self.exchange = Exchange(self.prefix, "direct")
|
||||
self.queue = Queue(self.prefix, self.exchange, self.prefix)
|
||||
|
||||
def verify_alive(self):
|
||||
if not self.connected:
|
||||
raise SkipTest("%s not running." % self.transport)
|
||||
|
||||
def test_produce__consume(self):
|
||||
if not self.transport:
|
||||
return
|
||||
self.verify_alive()
|
||||
chan1 = self.connection.channel()
|
||||
consumer = Consumer(chan1, self.queue)
|
||||
consumer.queues[0].purge()
|
||||
producer = Producer(chan1, self.exchange)
|
||||
producer.publish({"foo": "bar"}, routing_key=self.prefix)
|
||||
message = consumeN(self.connection, consumer)
|
||||
self.assertDictEqual(message[0], {"foo": "bar"})
|
||||
chan1.close()
|
||||
self.purge([self.queue.name])
|
||||
|
||||
def P(self, rest):
|
||||
return "%s.%s" % (self.prefix, rest)
|
||||
|
||||
def test_produce__consume_multiple(self):
|
||||
if not self.transport:
|
||||
return
|
||||
self.verify_alive()
|
||||
chan1 = self.connection.channel()
|
||||
producer = Producer(chan1, self.exchange)
|
||||
b1 = Queue(self.P("b1"), self.exchange, "b1")(chan1)
|
||||
b2 = Queue(self.P("b2"), self.exchange, "b2")(chan1)
|
||||
b3 = Queue(self.P("b3"), self.exchange, "b3")(chan1)
|
||||
[q.declare() for q in (b1, b2, b3)]
|
||||
[q.purge() for q in (b1, b2, b3)]
|
||||
|
||||
producer.publish("b1", routing_key="b1")
|
||||
producer.publish("b2", routing_key="b2")
|
||||
producer.publish("b3", routing_key="b3")
|
||||
chan1.close()
|
||||
|
||||
chan2 = self.connection.channel()
|
||||
consumer = Consumer(chan2, [b1, b2, b3])
|
||||
messages = consumeN(self.connection, consumer, 3)
|
||||
self.assertItemsEqual(messages, ["b1", "b2", "b3"])
|
||||
chan2.close()
|
||||
self.purge([self.P("b1"), self.P("b2"), self.P("b3")])
|
||||
|
||||
def test_timeout(self):
|
||||
if not self.transport:
|
||||
return
|
||||
self.verify_alive()
|
||||
chan = self.connection.channel()
|
||||
self.purge([self.queue.name])
|
||||
consumer = Consumer(chan, self.queue)
|
||||
self.assertRaises(socket.timeout, self.connection.drain_events,
|
||||
timeout=0.3)
|
||||
consumer.cancel()
|
||||
|
||||
def test_basic_get(self):
|
||||
if not self.transport:
|
||||
return
|
||||
self.verify_alive()
|
||||
chan1 = self.connection.channel()
|
||||
producer = Producer(chan1, self.exchange)
|
||||
producer.publish({"basic.get": "this"}, routing_key="basic_get")
|
||||
chan1.close()
|
||||
|
||||
chan2 = self.connection.channel()
|
||||
queue = Queue(self.P("basic_get"), self.exchange, "basic_get")
|
||||
queue = queue(chan2)
|
||||
queue.declare()
|
||||
for i in range(self.event_loop_max):
|
||||
m = queue.get()
|
||||
if m:
|
||||
break
|
||||
time.sleep(0.1)
|
||||
self.assertEqual(m.payload, {"basic.get": "this"})
|
||||
chan2.close()
|
||||
|
||||
def tearDown(self):
|
||||
if self.transport and self.connected:
|
||||
self.connection.close()
|
Loading…
Reference in New Issue