diff --git a/rq/job.py b/rq/job.py index ebb87c15..3e4fc41c 100644 --- a/rq/job.py +++ b/rq/job.py @@ -794,7 +794,8 @@ class Job: """Set job metadata before execution begins""" self.worker_name = worker_name self.last_heartbeat = utcnow() - self.started_at = self.last_heartbeat + if not self.started_at: # This should be set by the worker + self.started_at = self.last_heartbeat self._status = JobStatus.STARTED mapping = { 'last_heartbeat': utcformat(self.last_heartbeat), diff --git a/rq/worker.py b/rq/worker.py index 46ace250..10c22641 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -791,9 +791,13 @@ class Worker: either executes successfully or the status of the job is set to failed """ - ret_val = None - job.started_at = utcnow() + + with self.connection.pipeline() as pipeline: + heartbeat_ttl = self.get_heartbeat_ttl(job) + self.heartbeat(heartbeat_ttl, pipeline=pipeline) + job.heartbeat(utcnow(), heartbeat_ttl, pipeline=pipeline) + while True: try: with UnixSignalDeathPenalty(self.job_monitoring_interval, HorseMonitorTimeoutException): @@ -865,6 +869,7 @@ class Worker: within the given timeout bounds, or will end the work horse with SIGALRM. """ + job.started_at = utcnow() self.set_state(WorkerStatus.BUSY) self.fork_work_horse(job, queue) self.monitor_work_horse(job, queue) @@ -908,9 +913,9 @@ class Worker: self.set_current_job_id(job.id, pipeline=pipeline) self.set_current_job_working_time(0, pipeline=pipeline) - heartbeat_ttl = self.get_heartbeat_ttl(job) - self.heartbeat(heartbeat_ttl, pipeline=pipeline) - job.heartbeat(utcnow(), heartbeat_ttl, pipeline=pipeline) + # heartbeat_ttl = self.get_heartbeat_ttl(job) + # self.heartbeat(heartbeat_ttl, pipeline=pipeline) + # job.heartbeat(utcnow(), heartbeat_ttl, pipeline=pipeline) job.prepare_for_execution(self.name, pipeline=pipeline) pipeline.execute() @@ -1037,7 +1042,6 @@ class Worker: try: self.prepare_job_execution(job) - job.started_at = utcnow() timeout = job.timeout or self.queue_class.DEFAULT_TIMEOUT with self.death_penalty_class(timeout, JobTimeoutException, job_id=job.id): rv = job.perform() diff --git a/tests/test_cli.py b/tests/test_cli.py index 72fc5100..a03aaf64 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -449,12 +449,10 @@ class TestRQCli(RQTestCase): prefix = 'Enqueued tests.fixtures.say_hello() with job-id \'' suffix = '\'.\n' - print(result.stdout) + self.assertTrue(result.output.startswith(prefix)) + self.assertTrue(result.output.endswith(suffix)) - self.assertTrue(result.stdout.startswith(prefix)) - self.assertTrue(result.stdout.endswith(suffix)) - - job_id = result.stdout[len(prefix):-len(suffix)] + job_id = result.output[len(prefix):-len(suffix)] queue_key = 'rq:queue:default' self.assertEqual(self.connection.llen(queue_key), 1) self.assertEqual(self.connection.lrange(queue_key, 0, -1)[0].decode('ascii'), job_id) diff --git a/tests/test_registry.py b/tests/test_registry.py index 6e569db0..e2a7ea43 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -2,7 +2,7 @@ from __future__ import absolute_import from datetime import datetime, timedelta -from rq.serializers import JSONSerializer +from multiprocessing import Process from rq.compat import as_text from rq.defaults import DEFAULT_FAILURE_TTL @@ -14,9 +14,10 @@ from rq.worker import Worker from rq.registry import (CanceledJobRegistry, clean_registries, DeferredJobRegistry, FailedJobRegistry, FinishedJobRegistry, StartedJobRegistry) +from rq.serializers import JSONSerializer from tests import RQTestCase -from tests.fixtures import div_by_zero, say_hello +from tests.fixtures import div_by_zero, long_running_job, say_hello class CustomJob(Job): @@ -177,24 +178,24 @@ class TestRegistry(RQTestCase): queue = Queue(connection=self.testconn) worker = Worker([queue]) - job = queue.enqueue(say_hello) - self.assertTrue(job.is_queued) + def check_registry(): + """Check job ID is in started job registry when job is performed""" + self.assertEqual(registry.get_job_ids(), [job.id]) - worker.prepare_job_execution(job) - self.assertIn(job.id, registry.get_job_ids()) - self.assertTrue(job.is_started) + job = queue.enqueue(long_running_job, 2) + p = Process(target=check_registry) - worker.perform_job(job, queue) + p.start() + worker.work(burst=True) + p.join(1) + + # After work is done, job is no longer in StartedJobRegistry self.assertNotIn(job.id, registry.get_job_ids()) self.assertTrue(job.is_finished) - # Job that fails + # If a job fails, it will also be removed from registry after execution job = queue.enqueue(div_by_zero) - - worker.prepare_job_execution(job) - self.assertIn(job.id, registry.get_job_ids()) - - worker.perform_job(job, queue) + worker.work(burst=True) self.assertNotIn(job.id, registry.get_job_ids()) def test_job_deletion(self): @@ -207,6 +208,7 @@ class TestRegistry(RQTestCase): self.assertTrue(job.is_queued) worker.prepare_job_execution(job) + job.heartbeat(datetime.utcnow(), 60) self.assertIn(job.id, registry.get_job_ids()) job.delete() diff --git a/tests/test_worker.py b/tests/test_worker.py index 327c6176..b3bf6a6e 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -345,7 +345,7 @@ class TestWorker(RQTestCase): q = Queue() w = Worker([q], job_monitoring_interval=5) - for timeout, expected_heartbeats in [(2, 0), (7, 1), (12, 2)]: + for timeout, expected_heartbeats in [(2, 1), (7, 2), (12, 3)]: job = q.enqueue(long_running_job, args=(timeout,), job_timeout=30, @@ -780,8 +780,8 @@ class TestWorker(RQTestCase): worker.prepare_job_execution(job) # Updates working queue - registry = StartedJobRegistry(connection=self.testconn) - self.assertEqual(registry.get_job_ids(), [job.id]) + # registry = StartedJobRegistry(connection=self.testconn) + # self.assertEqual(registry.get_job_ids(), [job.id]) # Updates worker statuses self.assertEqual(worker.get_state(), 'busy') @@ -791,6 +791,24 @@ class TestWorker(RQTestCase): self.assertEqual(job._status, JobStatus.STARTED) self.assertEqual(job.worker_name, worker.name) + def test_monitor_work_horse(self): + """Monitor work horse calls worker.heartbeat() and job.heartbeat()""" + queue = Queue(connection=self.testconn) + job = queue.enqueue(say_hello) + worker = Worker([queue]) + + def check_registry(): + """Check job ID is in started job registry when job is performed""" + registry = StartedJobRegistry(queue=queue) + self.assertEqual(registry.get_job_ids(), [job.id]) + + job = queue.enqueue(long_running_job, 2) + p = Process(target=check_registry) + + p.start() + worker.work(burst=True) + p.join(1) + def test_work_unicode_friendly(self): """Worker processes work with unicode description, then quits.""" q = Queue('foo') @@ -1186,6 +1204,7 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase): w.fork_work_horse(job, queue) p = Process(target=wait_and_kill_work_horse, args=(w._horse_pid, 0.5)) p.start() + job.started_at = utcnow() w.monitor_work_horse(job, queue) job_status = job.get_status() p.join(1) @@ -1216,6 +1235,7 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase): with open(sentinel_file) as f: subprocess_pid = int(f.read().strip()) self.assertTrue(psutil.pid_exists(subprocess_pid)) + job.started_at = utcnow() w.monitor_work_horse(job, queue) fudge_factor = 1 total_time = w.job_monitoring_interval + 65 + fudge_factor