Fixes a race condition bug which causes finished jobs to be moved to FailedJobRegistry

This commit is contained in:
Selwin Ong 2021-08-24 13:53:53 +07:00
parent b80045d615
commit 2cbad700f2
5 changed files with 54 additions and 29 deletions

View File

@@ -794,7 +794,8 @@ class Job:
"""Set job metadata before execution begins"""
self.worker_name = worker_name
self.last_heartbeat = utcnow()
self.started_at = self.last_heartbeat
if not self.started_at: # This should be set by the worker
self.started_at = self.last_heartbeat
self._status = JobStatus.STARTED
mapping = {
'last_heartbeat': utcformat(self.last_heartbeat),

View File

@@ -791,9 +791,13 @@ class Worker:
either executes successfully or the status of the job is set to
failed
"""
ret_val = None
job.started_at = utcnow()
with self.connection.pipeline() as pipeline:
heartbeat_ttl = self.get_heartbeat_ttl(job)
self.heartbeat(heartbeat_ttl, pipeline=pipeline)
job.heartbeat(utcnow(), heartbeat_ttl, pipeline=pipeline)
while True:
try:
with UnixSignalDeathPenalty(self.job_monitoring_interval, HorseMonitorTimeoutException):
@@ -865,6 +869,7 @@ class Worker:
within the given timeout bounds, or will end the work horse with
SIGALRM.
"""
job.started_at = utcnow()
self.set_state(WorkerStatus.BUSY)
self.fork_work_horse(job, queue)
self.monitor_work_horse(job, queue)
@@ -908,9 +913,9 @@ class Worker:
self.set_current_job_id(job.id, pipeline=pipeline)
self.set_current_job_working_time(0, pipeline=pipeline)
heartbeat_ttl = self.get_heartbeat_ttl(job)
self.heartbeat(heartbeat_ttl, pipeline=pipeline)
job.heartbeat(utcnow(), heartbeat_ttl, pipeline=pipeline)
# heartbeat_ttl = self.get_heartbeat_ttl(job)
# self.heartbeat(heartbeat_ttl, pipeline=pipeline)
# job.heartbeat(utcnow(), heartbeat_ttl, pipeline=pipeline)
job.prepare_for_execution(self.name, pipeline=pipeline)
pipeline.execute()
@@ -1037,7 +1042,6 @@ class Worker:
try:
self.prepare_job_execution(job)
job.started_at = utcnow()
timeout = job.timeout or self.queue_class.DEFAULT_TIMEOUT
with self.death_penalty_class(timeout, JobTimeoutException, job_id=job.id):
rv = job.perform()

View File

@@ -449,12 +449,10 @@ class TestRQCli(RQTestCase):
prefix = 'Enqueued tests.fixtures.say_hello() with job-id \''
suffix = '\'.\n'
print(result.stdout)
self.assertTrue(result.output.startswith(prefix))
self.assertTrue(result.output.endswith(suffix))
self.assertTrue(result.stdout.startswith(prefix))
self.assertTrue(result.stdout.endswith(suffix))
job_id = result.stdout[len(prefix):-len(suffix)]
job_id = result.output[len(prefix):-len(suffix)]
queue_key = 'rq:queue:default'
self.assertEqual(self.connection.llen(queue_key), 1)
self.assertEqual(self.connection.lrange(queue_key, 0, -1)[0].decode('ascii'), job_id)

View File

@@ -2,7 +2,7 @@
from __future__ import absolute_import
from datetime import datetime, timedelta
from rq.serializers import JSONSerializer
from multiprocessing import Process
from rq.compat import as_text
from rq.defaults import DEFAULT_FAILURE_TTL
@@ -14,9 +14,10 @@ from rq.worker import Worker
from rq.registry import (CanceledJobRegistry, clean_registries, DeferredJobRegistry,
FailedJobRegistry, FinishedJobRegistry,
StartedJobRegistry)
from rq.serializers import JSONSerializer
from tests import RQTestCase
from tests.fixtures import div_by_zero, say_hello
from tests.fixtures import div_by_zero, long_running_job, say_hello
class CustomJob(Job):
@@ -177,24 +178,24 @@ class TestRegistry(RQTestCase):
queue = Queue(connection=self.testconn)
worker = Worker([queue])
job = queue.enqueue(say_hello)
self.assertTrue(job.is_queued)
def check_registry():
"""Check job ID is in started job registry when job is performed"""
self.assertEqual(registry.get_job_ids(), [job.id])
worker.prepare_job_execution(job)
self.assertIn(job.id, registry.get_job_ids())
self.assertTrue(job.is_started)
job = queue.enqueue(long_running_job, 2)
p = Process(target=check_registry)
worker.perform_job(job, queue)
p.start()
worker.work(burst=True)
p.join(1)
# After work is done, job is no longer in StartedJobRegistry
self.assertNotIn(job.id, registry.get_job_ids())
self.assertTrue(job.is_finished)
# Job that fails
# If a job fails, it will also be removed from registry after execution
job = queue.enqueue(div_by_zero)
worker.prepare_job_execution(job)
self.assertIn(job.id, registry.get_job_ids())
worker.perform_job(job, queue)
worker.work(burst=True)
self.assertNotIn(job.id, registry.get_job_ids())
def test_job_deletion(self):
@@ -207,6 +208,7 @@ class TestRegistry(RQTestCase):
self.assertTrue(job.is_queued)
worker.prepare_job_execution(job)
job.heartbeat(datetime.utcnow(), 60)
self.assertIn(job.id, registry.get_job_ids())
job.delete()

View File

@@ -345,7 +345,7 @@ class TestWorker(RQTestCase):
q = Queue()
w = Worker([q], job_monitoring_interval=5)
for timeout, expected_heartbeats in [(2, 0), (7, 1), (12, 2)]:
for timeout, expected_heartbeats in [(2, 1), (7, 2), (12, 3)]:
job = q.enqueue(long_running_job,
args=(timeout,),
job_timeout=30,
@@ -780,8 +780,8 @@ class TestWorker(RQTestCase):
worker.prepare_job_execution(job)
# Updates working queue
registry = StartedJobRegistry(connection=self.testconn)
self.assertEqual(registry.get_job_ids(), [job.id])
# registry = StartedJobRegistry(connection=self.testconn)
# self.assertEqual(registry.get_job_ids(), [job.id])
# Updates worker statuses
self.assertEqual(worker.get_state(), 'busy')
@@ -791,6 +791,24 @@ class TestWorker(RQTestCase):
self.assertEqual(job._status, JobStatus.STARTED)
self.assertEqual(job.worker_name, worker.name)
def test_monitor_work_horse(self):
"""Monitor work horse calls worker.heartbeat() and job.heartbeat()"""
queue = Queue(connection=self.testconn)
job = queue.enqueue(say_hello)
worker = Worker([queue])
def check_registry():
"""Check job ID is in started job registry when job is performed"""
registry = StartedJobRegistry(queue=queue)
self.assertEqual(registry.get_job_ids(), [job.id])
job = queue.enqueue(long_running_job, 2)
p = Process(target=check_registry)
p.start()
worker.work(burst=True)
p.join(1)
def test_work_unicode_friendly(self):
"""Worker processes work with unicode description, then quits."""
q = Queue('foo')
@@ -1186,6 +1204,7 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
w.fork_work_horse(job, queue)
p = Process(target=wait_and_kill_work_horse, args=(w._horse_pid, 0.5))
p.start()
job.started_at = utcnow()
w.monitor_work_horse(job, queue)
job_status = job.get_status()
p.join(1)
@@ -1216,6 +1235,7 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
with open(sentinel_file) as f:
subprocess_pid = int(f.read().strip())
self.assertTrue(psutil.pid_exists(subprocess_pid))
job.started_at = utcnow()
w.monitor_work_horse(job, queue)
fudge_factor = 1
total_time = w.job_monitoring_interval + 65 + fudge_factor