mirror of https://github.com/celery/kombu.git
Functional tests for Pika transport, also asyncore drain_events now properly raises socket.timeout on timeout
This commit is contained in:
parent
ff2f590186
commit
952fc27b03
|
@ -0,0 +1,4 @@
|
||||||
|
[nosetests]
|
||||||
|
verbosity = 1
|
||||||
|
detailed-errors = 1
|
||||||
|
where = tests
|
|
@ -0,0 +1,9 @@
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
|
||||||
|
print("HELLO")
|
||||||
|
|
||||||
|
sys.path.insert(0, os.path.join(os.getcwd(), os.pardir))
|
||||||
|
print(sys.path[0])
|
||||||
|
sys.path.insert(0, os.getcwd())
|
||||||
|
print(sys.path[0])
|
|
@ -1,113 +0,0 @@
|
||||||
import socket
|
|
||||||
import time
|
|
||||||
import unittest2 as unittest
|
|
||||||
|
|
||||||
from nose import SkipTest
|
|
||||||
|
|
||||||
from kombu import BrokerConnection
|
|
||||||
from kombu import Producer, Consumer, Exchange, Queue
|
|
||||||
|
|
||||||
|
|
||||||
def consumeN(conn, consumer, n=1):
|
|
||||||
messages = []
|
|
||||||
|
|
||||||
def callback(message_data, message):
|
|
||||||
messages.append(message_data)
|
|
||||||
message.ack()
|
|
||||||
|
|
||||||
prev, consumer.callbacks = consumer.callbacks, [callback]
|
|
||||||
|
|
||||||
while True:
|
|
||||||
conn.drain_events(timeout=1)
|
|
||||||
if len(messages) >= n:
|
|
||||||
break
|
|
||||||
|
|
||||||
consumer.callback = prev
|
|
||||||
return messages
|
|
||||||
|
|
||||||
|
|
||||||
class test_pika(unittest.TestCase):
|
|
||||||
|
|
||||||
def purge(self, names):
|
|
||||||
chan = self.connection.channel()
|
|
||||||
map(chan.queue_purge, names)
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
self.connection = BrokerConnection(transport="pika")
|
|
||||||
try:
|
|
||||||
self.connection.connect()
|
|
||||||
except socket.error:
|
|
||||||
self.connected = False
|
|
||||||
else:
|
|
||||||
self.connected = True
|
|
||||||
self.exchange = Exchange("tamqplib", "direct")
|
|
||||||
self.queue = Queue("tamqplib", self.exchange, "tamqplib")
|
|
||||||
|
|
||||||
def test_produce__consume(self):
|
|
||||||
if not self.connected:
|
|
||||||
raise SkipTest("Broker not running.")
|
|
||||||
chan1 = self.connection.channel()
|
|
||||||
producer = Producer(chan1, self.exchange)
|
|
||||||
|
|
||||||
producer.publish({"foo": "bar"}, routing_key="tamqplib")
|
|
||||||
chan1.close()
|
|
||||||
|
|
||||||
chan2 = self.connection.channel()
|
|
||||||
consumer = Consumer(chan2, self.queue)
|
|
||||||
message = consumeN(self.connection, consumer)
|
|
||||||
self.assertDictEqual(message[0], {"foo": "bar"})
|
|
||||||
chan2.close()
|
|
||||||
self.purge(["tamqplib"])
|
|
||||||
|
|
||||||
def test_produce__consume_multiple(self):
|
|
||||||
if not self.connected:
|
|
||||||
raise SkipTest("Broker not running.")
|
|
||||||
chan1 = self.connection.channel()
|
|
||||||
producer = Producer(chan1, self.exchange)
|
|
||||||
b1 = Queue("pyamqplib.b1", self.exchange, "b1")
|
|
||||||
b2 = Queue("pyamqplib.b2", self.exchange, "b2")
|
|
||||||
b3 = Queue("pyamqplib.b3", self.exchange, "b3")
|
|
||||||
|
|
||||||
producer.publish("b1", routing_key="b1")
|
|
||||||
producer.publish("b2", routing_key="b2")
|
|
||||||
producer.publish("b3", routing_key="b3")
|
|
||||||
chan1.close()
|
|
||||||
|
|
||||||
chan2 = self.connection.channel()
|
|
||||||
consumer = Consumer(chan2, [b1, b2, b3])
|
|
||||||
messages = consumeN(self.connection, consumer, 3)
|
|
||||||
self.assertItemsEqual(messages, ["b1", "b2", "b3"])
|
|
||||||
chan2.close()
|
|
||||||
self.purge(["pyamqplib.b1", "pyamqplib.b2", "pyamqplib.b3"])
|
|
||||||
|
|
||||||
def test_timeout(self):
|
|
||||||
if not self.connected:
|
|
||||||
raise SkipTest("Broker not running.")
|
|
||||||
chan = self.connection.channel()
|
|
||||||
self.purge([self.queue.name])
|
|
||||||
consumer = Consumer(chan, self.queue)
|
|
||||||
self.assertRaises(socket.timeout, self.connection.drain_events,
|
|
||||||
timeout=0.3)
|
|
||||||
consumer.cancel()
|
|
||||||
|
|
||||||
def test_basic_get(self):
|
|
||||||
chan1 = self.connection.channel()
|
|
||||||
producer = Producer(chan1, self.exchange)
|
|
||||||
producer.publish({"basic.get": "this"}, routing_key="basic_get")
|
|
||||||
chan1.close()
|
|
||||||
|
|
||||||
chan2 = self.connection.channel()
|
|
||||||
queue = Queue("amqplib_basic_get", self.exchange, "basic_get")
|
|
||||||
queue = queue(chan2)
|
|
||||||
queue.declare()
|
|
||||||
for i in range(50):
|
|
||||||
m = queue.get()
|
|
||||||
if m:
|
|
||||||
break
|
|
||||||
time.sleep(0.1)
|
|
||||||
self.assertEqual(m.payload, {"basic.get": "this"})
|
|
||||||
chan2.close()
|
|
||||||
|
|
||||||
def tearDown(self):
|
|
||||||
if self.connected:
|
|
||||||
self.connection.close()
|
|
|
@ -0,0 +1,11 @@
|
||||||
|
from funtests import transport
|
||||||
|
|
||||||
|
|
||||||
|
class test_pika_blocking(transport.TransportCase):
|
||||||
|
transport = "syncpika"
|
||||||
|
prefix = "syncpika"
|
||||||
|
|
||||||
|
|
||||||
|
class test_pika_async(transport.TransportCase):
|
||||||
|
transport = "pika"
|
||||||
|
prefix = "pika"
|
|
@ -19,7 +19,10 @@ def consumeN(conn, consumer, n=1):
|
||||||
consumer.consume()
|
consumer.consume()
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
conn.drain_events(timeout=1)
|
try:
|
||||||
|
conn.drain_events(timeout=1)
|
||||||
|
except socket.timeout:
|
||||||
|
pass
|
||||||
if len(messages) >= n:
|
if len(messages) >= n:
|
||||||
break
|
break
|
||||||
|
|
||||||
|
|
|
@ -8,6 +8,8 @@ Pika transport.
|
||||||
:license: BSD, see LICENSE for more details.
|
:license: BSD, see LICENSE for more details.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
import socket
|
||||||
|
|
||||||
from pika import asyncore_adapter
|
from pika import asyncore_adapter
|
||||||
from pika import blocking_adapter
|
from pika import blocking_adapter
|
||||||
from pika import channel
|
from pika import channel
|
||||||
|
@ -30,9 +32,10 @@ class Message(base.Message):
|
||||||
"content_type": header.content_type,
|
"content_type": header.content_type,
|
||||||
"content_encoding": header.content_encoding,
|
"content_encoding": header.content_encoding,
|
||||||
"delivery_info": dict(
|
"delivery_info": dict(
|
||||||
consumer_tag=method.consumer_tag,
|
consumer_tag=getattr(method, "consumer_tag", None),
|
||||||
routing_key=method.routing_key,
|
routing_key=method.routing_key,
|
||||||
delivery_tag=method.delivery_tag,
|
delivery_tag=method.delivery_tag,
|
||||||
|
redelivered=method.redelivered,
|
||||||
exchange=method.exchange)})
|
exchange=method.exchange)})
|
||||||
|
|
||||||
super(Message, self).__init__(channel, **kwargs)
|
super(Message, self).__init__(channel, **kwargs)
|
||||||
|
@ -42,10 +45,10 @@ class Channel(channel.Channel):
|
||||||
Message = Message
|
Message = Message
|
||||||
|
|
||||||
def basic_get(self, queue, no_ack):
|
def basic_get(self, queue, no_ack):
|
||||||
m = channel.Channel.basic_get(self, queue=queue, no_ack=no_ack)
|
method = channel.Channel.basic_get(self, queue=queue, no_ack=no_ack)
|
||||||
if isinstance(m, Basic.GetEmpty):
|
if isinstance(method, Basic.GetEmpty):
|
||||||
return
|
return
|
||||||
return m
|
return None, method, method._properties, method._body
|
||||||
|
|
||||||
def queue_purge(self, queue=None, nowait=False):
|
def queue_purge(self, queue=None, nowait=False):
|
||||||
return channel.Channel.queue_purge(self, queue=queue, nowait=nowait) \
|
return channel.Channel.queue_purge(self, queue=queue, nowait=nowait) \
|
||||||
|
@ -105,12 +108,29 @@ class BlockingConnection(blocking_adapter.BlockingConnection):
|
||||||
def channel(self):
|
def channel(self):
|
||||||
return Channel(channel.ChannelHandler(self))
|
return Channel(channel.ChannelHandler(self))
|
||||||
|
|
||||||
|
def ensure_drain_events(self, timeout=None):
|
||||||
|
return self.drain_events(timeout=timeout)
|
||||||
|
|
||||||
class AsyncoreConnection(asyncore_adapter.AsyncoreConnection):
|
class AsyncoreConnection(asyncore_adapter.AsyncoreConnection):
|
||||||
|
_event_counter = 0
|
||||||
|
Super = asyncore_adapter.AsyncoreConnection
|
||||||
|
|
||||||
def channel(self):
|
def channel(self):
|
||||||
return Channel(channel.ChannelHandler(self))
|
return Channel(channel.ChannelHandler(self))
|
||||||
|
|
||||||
|
def ensure_drain_events(self, timeout=None):
|
||||||
|
# asyncore connection does not raise socket.timeout when timing out
|
||||||
|
# so need to do a little trick here to mimic the behavior
|
||||||
|
# of sync connection.
|
||||||
|
current_events = self._event_counter
|
||||||
|
self.drain_events(timeout=timeout)
|
||||||
|
if self._event_counter <= current_events:
|
||||||
|
raise socket.timeout("timed out")
|
||||||
|
|
||||||
|
def on_data_available(self, buf):
|
||||||
|
self._event_counter += 1
|
||||||
|
self.Super.on_data_available(self, buf)
|
||||||
|
|
||||||
|
|
||||||
class SyncTransport(base.Transport):
|
class SyncTransport(base.Transport):
|
||||||
default_port = DEFAULT_PORT
|
default_port = DEFAULT_PORT
|
||||||
|
@ -141,7 +161,7 @@ class SyncTransport(base.Transport):
|
||||||
return connection.channel()
|
return connection.channel()
|
||||||
|
|
||||||
def drain_events(self, connection, **kwargs):
|
def drain_events(self, connection, **kwargs):
|
||||||
return connection.drain_events(**kwargs)
|
return connection.ensure_drain_events(**kwargs)
|
||||||
|
|
||||||
def establish_connection(self):
|
def establish_connection(self):
|
||||||
"""Establish connection to the AMQP broker."""
|
"""Establish connection to the AMQP broker."""
|
||||||
|
|
Loading…
Reference in New Issue