2012-01-28 06:58:40 +00:00
|
|
|
from tests import RQTestCase
|
2012-08-22 10:21:22 +00:00
|
|
|
from tests.fixtures import Calculator, div_by_zero, say_hello, some_calculation
|
2012-03-21 14:24:32 +00:00
|
|
|
from rq import Queue, get_failed_queue
|
2012-08-27 07:40:59 +00:00
|
|
|
from rq.job import Job, Status
|
2012-02-15 15:59:27 +00:00
|
|
|
from rq.exceptions import InvalidJobOperationError
|
2011-11-15 07:13:16 +00:00
|
|
|
|
|
|
|
|
2011-11-14 13:18:21 +00:00
|
|
|
class TestQueue(RQTestCase):
|
|
|
|
def test_create_queue(self):
|
|
|
|
"""Creating queues."""
|
|
|
|
q = Queue('my-queue')
|
|
|
|
self.assertEquals(q.name, 'my-queue')
|
|
|
|
|
2011-11-15 20:15:51 +00:00
|
|
|
def test_create_default_queue(self):
|
|
|
|
"""Instantiating the default queue."""
|
|
|
|
q = Queue()
|
|
|
|
self.assertEquals(q.name, 'default')
|
|
|
|
|
2011-11-16 11:45:16 +00:00
|
|
|
|
2012-02-14 21:55:39 +00:00
|
|
|
def test_equality(self): # noqa
|
2011-11-16 11:45:16 +00:00
|
|
|
"""Mathematical equality of queues."""
|
|
|
|
q1 = Queue('foo')
|
|
|
|
q2 = Queue('foo')
|
|
|
|
q3 = Queue('bar')
|
|
|
|
|
|
|
|
self.assertEquals(q1, q2)
|
|
|
|
self.assertEquals(q2, q1)
|
|
|
|
self.assertNotEquals(q1, q3)
|
|
|
|
self.assertNotEquals(q2, q3)
|
|
|
|
|
|
|
|
|
2012-02-14 21:55:39 +00:00
|
|
|
def test_empty_queue(self): # noqa
|
2012-02-14 16:53:09 +00:00
|
|
|
"""Emptying queues."""
|
|
|
|
q = Queue('example')
|
|
|
|
|
|
|
|
self.testconn.rpush('rq:queue:example', 'foo')
|
|
|
|
self.testconn.rpush('rq:queue:example', 'bar')
|
|
|
|
self.assertEquals(q.is_empty(), False)
|
|
|
|
|
|
|
|
q.empty()
|
|
|
|
|
|
|
|
self.assertEquals(q.is_empty(), True)
|
|
|
|
self.assertIsNone(self.testconn.lpop('rq:queue:example'))
|
|
|
|
|
|
|
|
def test_queue_is_empty(self):
|
2011-11-14 13:18:21 +00:00
|
|
|
"""Detecting empty queues."""
|
2012-02-07 23:40:43 +00:00
|
|
|
q = Queue('example')
|
2012-02-07 19:53:06 +00:00
|
|
|
self.assertEquals(q.is_empty(), True)
|
2011-11-14 13:18:21 +00:00
|
|
|
|
2012-02-07 23:40:43 +00:00
|
|
|
self.testconn.rpush('rq:queue:example', 'sentinel message')
|
2012-02-07 19:53:06 +00:00
|
|
|
self.assertEquals(q.is_empty(), False)
|
2011-11-14 11:10:59 +00:00
|
|
|
|
2012-02-14 21:55:51 +00:00
|
|
|
def test_compact(self):
|
|
|
|
"""Compacting queueus."""
|
|
|
|
q = Queue()
|
|
|
|
|
2012-02-15 20:55:06 +00:00
|
|
|
q.enqueue(say_hello, 'Alice')
|
|
|
|
bob = q.enqueue(say_hello, 'Bob')
|
|
|
|
q.enqueue(say_hello, 'Charlie')
|
|
|
|
debrah = q.enqueue(say_hello, 'Debrah')
|
2012-02-14 21:55:51 +00:00
|
|
|
|
|
|
|
bob.cancel()
|
|
|
|
debrah.cancel()
|
|
|
|
|
|
|
|
self.assertEquals(q.count, 4)
|
|
|
|
|
|
|
|
q.compact()
|
|
|
|
|
|
|
|
self.assertEquals(q.count, 2)
|
|
|
|
|
2011-11-15 08:36:32 +00:00
|
|
|
|
2012-02-14 21:55:39 +00:00
|
|
|
def test_enqueue(self): # noqa
|
2012-02-08 13:18:17 +00:00
|
|
|
"""Enqueueing job onto queues."""
|
2012-02-07 23:40:43 +00:00
|
|
|
q = Queue()
|
2012-02-07 19:53:06 +00:00
|
|
|
self.assertEquals(q.is_empty(), True)
|
2011-11-15 07:13:16 +00:00
|
|
|
|
2012-02-15 20:55:06 +00:00
|
|
|
# say_hello spec holds which queue this is sent to
|
|
|
|
job = q.enqueue(say_hello, 'Nick', foo='bar')
|
2012-02-07 23:40:43 +00:00
|
|
|
job_id = job.id
|
|
|
|
|
|
|
|
# Inspect data inside Redis
|
|
|
|
q_key = 'rq:queue:default'
|
|
|
|
self.assertEquals(self.testconn.llen(q_key), 1)
|
|
|
|
self.assertEquals(self.testconn.lrange(q_key, 0, -1)[0], job_id)
|
2011-11-15 07:43:06 +00:00
|
|
|
|
2012-02-10 16:19:30 +00:00
|
|
|
def test_enqueue_sets_metadata(self):
|
|
|
|
"""Enqueueing job onto queues modifies meta data."""
|
|
|
|
q = Queue()
|
2012-07-23 09:39:21 +00:00
|
|
|
job = Job.create(func=say_hello, args=('Nick',), kwargs=dict(foo='bar'))
|
2012-02-10 16:19:30 +00:00
|
|
|
|
|
|
|
# Preconditions
|
|
|
|
self.assertIsNone(job.origin)
|
|
|
|
self.assertIsNone(job.enqueued_at)
|
|
|
|
|
|
|
|
# Action
|
|
|
|
q.enqueue_job(job)
|
|
|
|
|
|
|
|
# Postconditions
|
|
|
|
self.assertEquals(job.origin, q.name)
|
|
|
|
self.assertIsNotNone(job.enqueued_at)
|
|
|
|
|
2012-01-30 18:41:13 +00:00
|
|
|
|
2012-02-14 21:55:39 +00:00
|
|
|
def test_pop_job_id(self): # noqa
|
2012-02-08 13:18:17 +00:00
|
|
|
"""Popping job IDs from queues."""
|
|
|
|
# Set up
|
|
|
|
q = Queue()
|
|
|
|
uuid = '112188ae-4e9d-4a5b-a5b3-f26f2cb054da'
|
|
|
|
q.push_job_id(uuid)
|
|
|
|
|
|
|
|
# Pop it off the queue...
|
|
|
|
self.assertEquals(q.count, 1)
|
|
|
|
self.assertEquals(q.pop_job_id(), uuid)
|
|
|
|
|
|
|
|
# ...and assert the queue count when down
|
|
|
|
self.assertEquals(q.count, 0)
|
|
|
|
|
2011-11-15 08:36:29 +00:00
|
|
|
def test_dequeue(self):
|
2012-02-08 13:18:17 +00:00
|
|
|
"""Dequeueing jobs from queues."""
|
|
|
|
# Set up
|
|
|
|
q = Queue()
|
2012-02-15 20:55:06 +00:00
|
|
|
result = q.enqueue(say_hello, 'Rick', foo='bar')
|
2011-11-15 08:36:29 +00:00
|
|
|
|
2012-02-08 13:18:17 +00:00
|
|
|
# Dequeue a job (not a job ID) off the queue
|
|
|
|
self.assertEquals(q.count, 1)
|
2011-11-16 11:44:33 +00:00
|
|
|
job = q.dequeue()
|
2012-02-08 13:18:17 +00:00
|
|
|
self.assertEquals(job.id, result.id)
|
2012-02-15 20:55:06 +00:00
|
|
|
self.assertEquals(job.func, say_hello)
|
2012-02-08 16:55:38 +00:00
|
|
|
self.assertEquals(job.origin, q.name)
|
2011-11-16 11:44:33 +00:00
|
|
|
self.assertEquals(job.args[0], 'Rick')
|
|
|
|
self.assertEquals(job.kwargs['foo'], 'bar')
|
|
|
|
|
2012-02-08 13:18:17 +00:00
|
|
|
# ...and assert the queue count when down
|
|
|
|
self.assertEquals(q.count, 0)
|
|
|
|
|
2012-07-17 10:48:41 +00:00
|
|
|
def test_dequeue_instance_method(self):
|
|
|
|
"""Dequeueing instance method jobs from queues."""
|
|
|
|
q = Queue()
|
|
|
|
c = Calculator(2)
|
|
|
|
result = q.enqueue(c.calculate, 3, 4)
|
|
|
|
|
|
|
|
job = q.dequeue()
|
|
|
|
# The instance has been pickled and unpickled, so it is now a separate
|
|
|
|
# object. Test for equality using each object's __dict__ instead.
|
|
|
|
self.assertEquals(job.instance.__dict__, c.__dict__)
|
|
|
|
self.assertEquals(job.func.__name__, 'calculate')
|
|
|
|
self.assertEquals(job.args, (3, 4))
|
|
|
|
|
2012-02-08 13:18:17 +00:00
|
|
|
def test_dequeue_ignores_nonexisting_jobs(self):
|
|
|
|
"""Dequeuing silently ignores non-existing jobs."""
|
|
|
|
|
|
|
|
q = Queue()
|
|
|
|
uuid = '49f205ab-8ea3-47dd-a1b5-bfa186870fc8'
|
|
|
|
q.push_job_id(uuid)
|
2012-02-08 14:18:24 +00:00
|
|
|
q.push_job_id(uuid)
|
2012-02-15 20:55:06 +00:00
|
|
|
result = q.enqueue(say_hello, 'Nick', foo='bar')
|
2012-02-08 14:18:24 +00:00
|
|
|
q.push_job_id(uuid)
|
2012-02-08 13:18:17 +00:00
|
|
|
|
|
|
|
# Dequeue simply ignores the missing job and returns None
|
2012-02-08 14:18:24 +00:00
|
|
|
self.assertEquals(q.count, 4)
|
|
|
|
self.assertEquals(q.dequeue().id, result.id)
|
|
|
|
self.assertIsNone(q.dequeue())
|
2012-02-08 13:18:17 +00:00
|
|
|
self.assertEquals(q.count, 0)
|
|
|
|
|
2011-11-16 11:44:33 +00:00
|
|
|
def test_dequeue_any(self):
|
|
|
|
"""Fetching work from any given queue."""
|
|
|
|
fooq = Queue('foo')
|
|
|
|
barq = Queue('bar')
|
|
|
|
|
|
|
|
self.assertEquals(Queue.dequeue_any([fooq, barq], False), None)
|
|
|
|
|
|
|
|
# Enqueue a single item
|
2012-02-15 20:55:06 +00:00
|
|
|
barq.enqueue(say_hello)
|
2012-02-08 16:55:38 +00:00
|
|
|
job, queue = Queue.dequeue_any([fooq, barq], False)
|
2012-02-15 20:55:06 +00:00
|
|
|
self.assertEquals(job.func, say_hello)
|
2012-02-08 16:55:38 +00:00
|
|
|
self.assertEquals(queue, barq)
|
2011-11-16 11:44:33 +00:00
|
|
|
|
|
|
|
# Enqueue items on both queues
|
2012-02-15 20:55:06 +00:00
|
|
|
barq.enqueue(say_hello, 'for Bar')
|
|
|
|
fooq.enqueue(say_hello, 'for Foo')
|
2011-11-16 11:44:33 +00:00
|
|
|
|
2012-02-08 16:55:38 +00:00
|
|
|
job, queue = Queue.dequeue_any([fooq, barq], False)
|
|
|
|
self.assertEquals(queue, fooq)
|
2012-02-15 20:55:06 +00:00
|
|
|
self.assertEquals(job.func, say_hello)
|
2012-02-08 16:55:38 +00:00
|
|
|
self.assertEquals(job.origin, fooq.name)
|
2012-02-14 21:55:39 +00:00
|
|
|
self.assertEquals(job.args[0], 'for Foo',
|
|
|
|
'Foo should be dequeued first.')
|
2011-11-16 11:44:33 +00:00
|
|
|
|
2012-02-08 16:55:38 +00:00
|
|
|
job, queue = Queue.dequeue_any([fooq, barq], False)
|
|
|
|
self.assertEquals(queue, barq)
|
2012-02-15 20:55:06 +00:00
|
|
|
self.assertEquals(job.func, say_hello)
|
2012-02-08 16:55:38 +00:00
|
|
|
self.assertEquals(job.origin, barq.name)
|
2012-02-14 21:55:39 +00:00
|
|
|
self.assertEquals(job.args[0], 'for Bar',
|
|
|
|
'Bar should be dequeued second.')
|
2011-11-16 11:44:33 +00:00
|
|
|
|
2012-02-08 13:18:17 +00:00
|
|
|
def test_dequeue_any_ignores_nonexisting_jobs(self):
|
|
|
|
"""Dequeuing (from any queue) silently ignores non-existing jobs."""
|
2012-01-27 14:15:13 +00:00
|
|
|
|
2012-02-08 13:18:17 +00:00
|
|
|
q = Queue('low')
|
|
|
|
uuid = '49f205ab-8ea3-47dd-a1b5-bfa186870fc8'
|
|
|
|
q.push_job_id(uuid)
|
2012-01-27 14:15:13 +00:00
|
|
|
|
2012-02-08 13:18:17 +00:00
|
|
|
# Dequeue simply ignores the missing job and returns None
|
|
|
|
self.assertEquals(q.count, 1)
|
New connection management.
Connections can now be set explicitly on Queues, Workers, and Jobs.
Jobs that are implicitly created by Queue or Worker API calls now
inherit the connection of their creator's.
For all RQ object instances that are created now holds that the
"current" connection is used if none is passed in explicitly. The
"current" connection is thus hold on to at creation time and won't be
changed for the lifetime of the object.
Effectively, this means that, given a default Redis connection, say you
create a queue Q1, then push another Redis connection onto the
connection stack, then create Q2. In that case, Q1 means a queue on the
first connection and Q2 on the second connection.
This is way more clear than it used to be.
Also, I've removed the `use_redis()` call, which was named ugly.
Instead, some new alternatives for connection management now exist.
You can push/pop connections now:
>>> my_conn = Redis()
>>> push_connection(my_conn)
>>> q = Queue()
>>> q.connection == my_conn
True
>>> pop_connection() == my_conn
Also, you can stack them syntactically:
>>> conn1 = Redis()
>>> conn2 = Redis('example.org', 1234)
>>> with Connection(conn1):
... q = Queue()
... with Connection(conn2):
... q2 = Queue()
... q3 = Queue()
>>> q.connection == conn1
True
>>> q2.connection == conn2
True
>>> q3.connection == conn1
True
Or, if you only require a single connection to Redis (for most uses):
>>> use_connection(Redis())
2012-03-23 13:33:49 +00:00
|
|
|
self.assertEquals(Queue.dequeue_any([Queue(), Queue('low')], False), # noqa
|
2012-02-14 21:55:39 +00:00
|
|
|
None)
|
2012-02-08 13:18:17 +00:00
|
|
|
self.assertEquals(q.count, 0)
|
2012-02-15 15:59:27 +00:00
|
|
|
|
2012-08-25 10:58:15 +00:00
|
|
|
def test_enqueue_sets_status(self):
|
|
|
|
"""Enqueueing a job sets its status to "queued"."""
|
|
|
|
q = Queue()
|
|
|
|
job = q.enqueue(say_hello)
|
2012-08-27 07:40:59 +00:00
|
|
|
self.assertEqual(job.status, Status.QUEUED)
|
2012-08-25 10:58:15 +00:00
|
|
|
|
2012-02-15 15:59:27 +00:00
|
|
|
|
|
|
|
class TestFailedQueue(RQTestCase):
|
|
|
|
def test_requeue_job(self):
|
|
|
|
"""Requeueing existing jobs."""
|
2012-07-23 09:39:21 +00:00
|
|
|
job = Job.create(func=div_by_zero, args=(1, 2, 3))
|
2012-02-15 15:59:27 +00:00
|
|
|
job.origin = 'fake'
|
|
|
|
job.save()
|
New connection management.
Connections can now be set explicitly on Queues, Workers, and Jobs.
Jobs that are implicitly created by Queue or Worker API calls now
inherit the connection of their creator's.
For all RQ object instances that are created now holds that the
"current" connection is used if none is passed in explicitly. The
"current" connection is thus hold on to at creation time and won't be
changed for the lifetime of the object.
Effectively, this means that, given a default Redis connection, say you
create a queue Q1, then push another Redis connection onto the
connection stack, then create Q2. In that case, Q1 means a queue on the
first connection and Q2 on the second connection.
This is way more clear than it used to be.
Also, I've removed the `use_redis()` call, which was named ugly.
Instead, some new alternatives for connection management now exist.
You can push/pop connections now:
>>> my_conn = Redis()
>>> push_connection(my_conn)
>>> q = Queue()
>>> q.connection == my_conn
True
>>> pop_connection() == my_conn
Also, you can stack them syntactically:
>>> conn1 = Redis()
>>> conn2 = Redis('example.org', 1234)
>>> with Connection(conn1):
... q = Queue()
... with Connection(conn2):
... q2 = Queue()
... q3 = Queue()
>>> q.connection == conn1
True
>>> q2.connection == conn2
True
>>> q3.connection == conn1
True
Or, if you only require a single connection to Redis (for most uses):
>>> use_connection(Redis())
2012-03-23 13:33:49 +00:00
|
|
|
get_failed_queue().quarantine(job, Exception('Some fake error')) # noqa
|
2012-02-15 15:59:27 +00:00
|
|
|
|
New connection management.
Connections can now be set explicitly on Queues, Workers, and Jobs.
Jobs that are implicitly created by Queue or Worker API calls now
inherit the connection of their creator's.
For all RQ object instances that are created now holds that the
"current" connection is used if none is passed in explicitly. The
"current" connection is thus hold on to at creation time and won't be
changed for the lifetime of the object.
Effectively, this means that, given a default Redis connection, say you
create a queue Q1, then push another Redis connection onto the
connection stack, then create Q2. In that case, Q1 means a queue on the
first connection and Q2 on the second connection.
This is way more clear than it used to be.
Also, I've removed the `use_redis()` call, which was named ugly.
Instead, some new alternatives for connection management now exist.
You can push/pop connections now:
>>> my_conn = Redis()
>>> push_connection(my_conn)
>>> q = Queue()
>>> q.connection == my_conn
True
>>> pop_connection() == my_conn
Also, you can stack them syntactically:
>>> conn1 = Redis()
>>> conn2 = Redis('example.org', 1234)
>>> with Connection(conn1):
... q = Queue()
... with Connection(conn2):
... q2 = Queue()
... q3 = Queue()
>>> q.connection == conn1
True
>>> q2.connection == conn2
True
>>> q3.connection == conn1
True
Or, if you only require a single connection to Redis (for most uses):
>>> use_connection(Redis())
2012-03-23 13:33:49 +00:00
|
|
|
self.assertItemsEqual(Queue.all(), [get_failed_queue()]) # noqa
|
2012-03-21 14:24:32 +00:00
|
|
|
self.assertEquals(get_failed_queue().count, 1)
|
2012-02-15 15:59:27 +00:00
|
|
|
|
2012-03-21 14:24:32 +00:00
|
|
|
get_failed_queue().requeue(job.id)
|
2012-02-15 15:59:27 +00:00
|
|
|
|
2012-03-21 14:24:32 +00:00
|
|
|
self.assertEquals(get_failed_queue().count, 0)
|
2012-02-15 15:59:27 +00:00
|
|
|
self.assertEquals(Queue('fake').count, 1)
|
|
|
|
|
|
|
|
def test_requeue_nonfailed_job_fails(self):
|
|
|
|
"""Requeueing non-failed jobs raises error."""
|
|
|
|
q = Queue()
|
2012-02-15 20:55:06 +00:00
|
|
|
job = q.enqueue(say_hello, 'Nick', foo='bar')
|
2012-02-15 15:59:27 +00:00
|
|
|
|
|
|
|
# Assert that we cannot requeue a job that's not on the failed queue
|
|
|
|
with self.assertRaises(InvalidJobOperationError):
|
2012-03-21 14:24:32 +00:00
|
|
|
get_failed_queue().requeue(job.id)
|
2012-07-17 06:08:35 +00:00
|
|
|
|
|
|
|
def test_quarantine_preserves_timeout(self):
|
|
|
|
"""Quarantine preserves job timeout."""
|
2012-07-23 09:39:21 +00:00
|
|
|
job = Job.create(func=div_by_zero, args=(1, 2, 3))
|
2012-07-17 06:08:35 +00:00
|
|
|
job.origin = 'fake'
|
|
|
|
job.timeout = 200
|
|
|
|
job.save()
|
|
|
|
get_failed_queue().quarantine(job, Exception('Some fake error'))
|
|
|
|
|
|
|
|
self.assertEquals(job.timeout, 200)
|
2012-07-17 06:41:24 +00:00
|
|
|
|
|
|
|
def test_requeueing_preserves_timeout(self):
|
|
|
|
"""Requeueing preserves job timeout."""
|
2012-07-23 09:39:21 +00:00
|
|
|
job = Job.create(func=div_by_zero, args=(1, 2, 3))
|
2012-07-17 06:41:24 +00:00
|
|
|
job.origin = 'fake'
|
|
|
|
job.timeout = 200
|
|
|
|
job.save()
|
|
|
|
get_failed_queue().quarantine(job, Exception('Some fake error'))
|
|
|
|
get_failed_queue().requeue(job.id)
|
|
|
|
|
|
|
|
job = Job.fetch(job.id)
|
|
|
|
self.assertEquals(job.timeout, 200)
|
2012-08-07 07:34:51 +00:00
|
|
|
|
|
|
|
def test_enqueue_preserves_result_ttl(self):
|
2012-08-07 09:05:50 +00:00
|
|
|
"""Enqueueing persists result_ttl."""
|
2012-08-07 07:34:51 +00:00
|
|
|
q = Queue()
|
|
|
|
job = q.enqueue(div_by_zero, args=(1, 2, 3), result_ttl=10)
|
|
|
|
self.assertEqual(job.result_ttl, 10)
|
|
|
|
job_from_queue = Job.fetch(job.id, connection=self.testconn)
|
|
|
|
self.assertEqual(int(job_from_queue.result_ttl), 10)
|
2012-08-22 10:21:22 +00:00
|
|
|
|
|
|
|
def test_async_false(self):
|
|
|
|
"""Executes a job immediately if async=False."""
|
|
|
|
q = Queue(async=False)
|
|
|
|
job = q.enqueue(some_calculation, args=(2, 3))
|
2012-08-27 07:40:59 +00:00
|
|
|
self.assertEqual(job.return_value, 6)
|