mirror of https://github.com/celery/kombu.git
Fixed functional tests (including basic_get for beanstalk)
This commit is contained in:
parent
c41b385428
commit
b7a07ce272
|
@ -10,6 +10,3 @@ class test_beanstalk(transport.TransportCase):
|
|||
|
||||
def after_connect(self, connection):
|
||||
connection.channel().client
|
||||
|
||||
def test_basic_get(self):
|
||||
raise SkipTest("beanstalk does not support synchronous access.")
|
||||
|
|
|
@ -1,5 +1,3 @@
|
|||
from nose import SkipTest
|
||||
|
||||
from kombu.tests.test_functional import transport
|
||||
|
||||
|
||||
|
@ -10,6 +8,3 @@ class test_mongodb(transport.TransportCase):
|
|||
|
||||
def after_connect(self, connection):
|
||||
connection.channel().client
|
||||
|
||||
#def test_basic_get(self):
|
||||
# raise SkipTest("beanstalk does not support synchronous access.")
|
||||
|
|
|
@ -6,4 +6,5 @@ class test_redis(transport.TransportCase):
|
|||
prefix = "redis"
|
||||
|
||||
def after_connect(self, connection):
|
||||
connection.channel().client
|
||||
client = connection.channel().client
|
||||
client.info()
|
||||
|
|
|
@ -32,33 +32,47 @@ class TransportCase(unittest.TestCase):
|
|||
transport = None
|
||||
prefix = None
|
||||
event_loop_max = 100
|
||||
connection_options = {}
|
||||
|
||||
def purge(self, names):
|
||||
chan = self.connection.channel()
|
||||
map(chan.queue_purge, names)
|
||||
connected = False
|
||||
skip_test_reason = None
|
||||
|
||||
def do_connect(self):
|
||||
self.connection = BrokerConnection(transport=self.transport)
|
||||
try:
|
||||
self.connection.connect()
|
||||
self.after_connect(self.connection)
|
||||
except self.connection.connection_errors:
|
||||
self.connected = False
|
||||
else:
|
||||
self.connected = True
|
||||
def before_connect(self):
|
||||
pass
|
||||
|
||||
def after_connect(self, connection):
|
||||
pass
|
||||
|
||||
def setUp(self):
|
||||
if self.transport:
|
||||
self.do_connect()
|
||||
try:
|
||||
self.before_connect()
|
||||
except SkipTest, exc:
|
||||
self.skip_test_reason = str(exc)
|
||||
else:
|
||||
self.do_connect()
|
||||
self.exchange = Exchange(self.prefix, "direct")
|
||||
self.queue = Queue(self.prefix, self.exchange, self.prefix)
|
||||
|
||||
def purge(self, names):
|
||||
chan = self.connection.channel()
|
||||
map(chan.queue_purge, names)
|
||||
|
||||
def do_connect(self):
|
||||
self.connection = BrokerConnection(transport=self.transport,
|
||||
**self.connection_options)
|
||||
try:
|
||||
self.connection.connect()
|
||||
self.after_connect(self.connection)
|
||||
except self.connection.connection_errors:
|
||||
self.skip_test_reason = "%s transport can't connect" % (
|
||||
self.transport, )
|
||||
else:
|
||||
self.connected = True
|
||||
|
||||
def verify_alive(self):
|
||||
if not self.connected:
|
||||
raise SkipTest("%s not running." % self.transport)
|
||||
raise SkipTest(self.skip_test_reason)
|
||||
|
||||
def test_produce__consume(self):
|
||||
if not self.transport:
|
||||
|
@ -118,13 +132,13 @@ class TransportCase(unittest.TestCase):
|
|||
self.verify_alive()
|
||||
chan1 = self.connection.channel()
|
||||
producer = Producer(chan1, self.exchange)
|
||||
producer.publish({"basic.get": "this"}, routing_key="basic_get")
|
||||
chan1.close()
|
||||
|
||||
chan2 = self.connection.channel()
|
||||
queue = Queue(self.P("basic_get"), self.exchange, "basic_get")
|
||||
queue = queue(chan2)
|
||||
queue.declare()
|
||||
producer.publish({"basic.get": "this"}, routing_key="basic_get")
|
||||
chan1.close()
|
||||
|
||||
for i in range(self.event_loop_max):
|
||||
m = queue.get()
|
||||
if m:
|
||||
|
|
Loading…
Reference in New Issue