diff --git a/rq/job.py b/rq/job.py index 884b5051..b645010f 100644 --- a/rq/job.py +++ b/rq/job.py @@ -50,11 +50,16 @@ class Job(object): # Job construction @classmethod - def create(cls, func, *args, **kwargs): + def create(cls, func, args=None, kwargs=None, connection=None): """Creates a new Job instance for the given function, arguments, and keyword arguments. """ - connection = kwargs.pop('connection', None) + if args is None: + args = () + if kwargs is None: + kwargs = {} + assert isinstance(args, tuple), '%r is not a valid args list.' % (args,) + assert isinstance(kwargs, dict), '%r is not a valid kwargs dict.' % (kwargs,) job = cls(connection=connection) if inspect.ismethod(func): job._instance = func.im_self diff --git a/rq/queue.py b/rq/queue.py index bc56ce73..e87333fe 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -107,25 +107,43 @@ class Queue(object): """Pushes a job ID on the corresponding Redis queue.""" self.connection.rpush(self.key, job_id) + def enqueue_call(self, func, args, kwargs, **options): + """Creates a job to represent the delayed function call and enqueues + it. + + It is much like `.enqueue()`, except that it takes the function's args + and kwargs as explicit arguments. Any kwargs passed to this function + contain options for RQ itself. + """ + timeout = options.get('timeout', self._default_timeout) + job = Job.create(func, args, kwargs, connection=self.connection) + return self.enqueue_job(job, timeout=timeout) + def enqueue(self, f, *args, **kwargs): """Creates a job to represent the delayed function call and enqueues it. Expects the function to call, along with the arguments and keyword - arguments. May be a fully qualified string of the function instance, - in which case the function must be meaningful to the worker. + arguments. - The special keyword `timeout` is reserved for `enqueue()` itself and - it won't be passed to the actual job function. + The function argument `f` may be any of the following: + + * A reference to a function + * A reference to an object's instance method + * A string, representing the location of a function (must be + meaningful to the import context of the workers) """ if not isinstance(f, basestring) and f.__module__ == '__main__': raise ValueError( 'Functions from the __main__ module cannot be processed ' 'by workers.') - timeout = kwargs.pop('timeout', self._default_timeout) - job = Job.create(f, *args, connection=self.connection, **kwargs) - return self.enqueue_job(job, timeout=timeout) + options = {} + try: + options['timeout'] = kwargs.pop('timeout') + except KeyError: + pass + return self.enqueue_call(func=f, args=args, kwargs=kwargs, **options) def enqueue_job(self, job, timeout=None, set_meta_data=True): """Enqueues a job for delayed execution. diff --git a/tests/test_job.py b/tests/test_job.py index 111c3f32..f35ca764 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -30,7 +30,7 @@ class TestJob(RQTestCase): def test_create_typical_job(self): """Creation of jobs for function calls.""" - job = Job.create(some_calculation, 3, 4, z=2) + job = Job.create(func=some_calculation, args=(3, 4), kwargs=dict(z=2)) # Jobs have a random UUID self.assertIsNotNone(job.id) @@ -51,7 +51,7 @@ class TestJob(RQTestCase): def test_create_instance_method_job(self): """Creation of jobs for instance methods.""" c = Calculator(2) - job = Job.create(c.calculate, 3, 4) + job = Job.create(func=c.calculate, args=(3, 4)) # Job data is set self.assertEquals(job.func, c.calculate) @@ -60,7 +60,7 @@ class TestJob(RQTestCase): def test_create_job_from_string_function(self): """Creation of jobs using string specifier.""" - job = Job.create('tests.fixtures.say_hello', 'World') + job = Job.create(func='tests.fixtures.say_hello', args=('World',)) # Job data is set self.assertEquals(job.func, say_hello) @@ -69,7 +69,7 @@ class TestJob(RQTestCase): def test_save(self): # noqa """Storing jobs.""" - job = Job.create(some_calculation, 3, 4, z=2) + job = Job.create(func=some_calculation, args=(3, 4), kwargs=dict(z=2)) # Saving creates a Redis hash self.assertEquals(self.testconn.exists(job.key), False) @@ -116,7 +116,7 @@ class TestJob(RQTestCase): def test_persistence_of_typical_jobs(self): """Storing typical jobs.""" - job = Job.create(some_calculation, 3, 4, z=2) + job = Job.create(func=some_calculation, args=(3, 4), kwargs=dict(z=2)) job.save() expected_date = strip_milliseconds(job.created_at) @@ -132,7 +132,7 @@ class TestJob(RQTestCase): def test_store_then_fetch(self): """Store, then fetch.""" - job = Job.create(some_calculation, 3, 4, z=2) + job = Job.create(func=some_calculation, args=(3, 4), kwargs=dict(z=2)) job.save() job2 = Job.fetch(job.id) @@ -151,7 +151,7 @@ class TestJob(RQTestCase): def test_fetching_unreadable_data(self): """Fetching fails on unreadable data.""" # Set up - job = Job.create(some_calculation, 3, 4, z=2) + job = Job.create(func=some_calculation, args=(3, 4), kwargs=dict(z=2)) job.save() # Just replace the data hkey with some random noise @@ -161,7 +161,7 @@ class TestJob(RQTestCase): def test_job_is_unimportable(self): """Jobs that cannot be imported throw exception on access.""" - job = Job.create(say_hello, 'Lionel') + job = Job.create(func=say_hello, args=('Lionel',)) job.save() # Now slightly modify the job to make it unimportable (this is @@ -181,7 +181,7 @@ class TestJob(RQTestCase): - Saved in Redis when job.save() is called - Attached back to job instance when job.refresh() is called """ - job = Job.create(say_hello, 'Lionel') + job = Job.create(func=say_hello, args=('Lionel',)) job.foo = 'bar' job.save() self.assertEqual(self.testconn.hget(job.key, 'foo'), 'bar') diff --git a/tests/test_queue.py b/tests/test_queue.py index fb6b1fca..f667bdda 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -86,7 +86,7 @@ class TestQueue(RQTestCase): def test_enqueue_sets_metadata(self): """Enqueueing job onto queues modifies meta data.""" q = Queue() - job = Job.create(say_hello, 'Nick', foo='bar') + job = Job.create(func=say_hello, args=('Nick',), kwargs=dict(foo='bar')) # Preconditions self.assertIsNone(job.origin) @@ -209,7 +209,7 @@ class TestQueue(RQTestCase): class TestFailedQueue(RQTestCase): def test_requeue_job(self): """Requeueing existing jobs.""" - job = Job.create(div_by_zero, 1, 2, 3) + job = Job.create(func=div_by_zero, args=(1, 2, 3)) job.origin = 'fake' job.save() get_failed_queue().quarantine(job, Exception('Some fake error')) # noqa @@ -233,7 +233,7 @@ class TestFailedQueue(RQTestCase): def test_quarantine_preserves_timeout(self): """Quarantine preserves job timeout.""" - job = Job.create(div_by_zero, 1, 2, 3) + job = Job.create(func=div_by_zero, args=(1, 2, 3)) job.origin = 'fake' job.timeout = 200 job.save() @@ -243,7 +243,7 @@ class TestFailedQueue(RQTestCase): def test_requeueing_preserves_timeout(self): """Requeueing preserves job timeout.""" - job = Job.create(div_by_zero, 1, 2, 3) + job = Job.create(func=div_by_zero, args=(1, 2, 3)) job.origin = 'fake' job.timeout = 200 job.save() diff --git a/tests/test_worker.py b/tests/test_worker.py index 9cd8aa26..6bfbe99f 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -45,7 +45,7 @@ class TestWorker(RQTestCase): # NOTE: We have to fake this enqueueing for this test case. # What we're simulating here is a call to a function that is not # importable from the worker process. - job = Job.create(div_by_zero, 3) + job = Job.create(func=div_by_zero, args=(3,)) job.save() data = self.testconn.hget(job.key, 'data') invalid_data = data.replace('div_by_zero', 'nonexisting_job')