rq/tests/test_rq.py

52 lines
1.3 KiB
Python
Raw Normal View History

2011-11-14 11:10:59 +00:00
import unittest
from blinker import signal
from redis import Redis
from rq import conn, Queue
2011-11-14 11:10:59 +00:00
class RQTestCase(unittest.TestCase):
def setUp(self):
super(RQTestCase, self).setUp()
# Set up connection to Redis
testconn = Redis()
conn.push(testconn)
# Flush beforewards (we like our hygiene)
conn.flushdb()
2011-11-14 11:10:59 +00:00
signal('setup').send(self)
# Store the connection (for sanity checking)
self.testconn = testconn
2011-11-14 11:10:59 +00:00
def tearDown(self):
signal('teardown').send(self)
# Flush afterwards
conn.flushdb()
# Pop the connection to Redis
testconn = conn.pop()
assert testconn == self.testconn, 'Wow, something really nasty happened to the Redis connection stack. Check your setup.'
2011-11-14 11:10:59 +00:00
super(RQTestCase, self).tearDown()
class TestQueue(RQTestCase):
def test_create_queue(self):
"""Creating queues."""
q = Queue('my-queue')
self.assertEquals(q.name, 'my-queue')
def test_queue_empty(self):
"""Detecting empty queues."""
q = Queue('my-queue')
self.assertEquals(q.empty, True)
conn.rpush('rq:my-queue', 'some val')
self.assertEquals(q.empty, False)
2011-11-14 11:10:59 +00:00
if __name__ == '__main__':
unittest.main()