From c556106a38505ea7485daec9420c408f720d2961 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Thu, 26 Aug 2021 19:16:52 +0700 Subject: [PATCH] Main worker should use zadd(xx=True) to update heartbeat. (#1550) --- rq/job.py | 4 ++-- rq/registry.py | 6 +++--- rq/worker.py | 2 +- tests/test_cli.py | 8 +++----- 4 files changed, 9 insertions(+), 11 deletions(-) diff --git a/rq/job.py b/rq/job.py index 0b128b46..188d47bf 100644 --- a/rq/job.py +++ b/rq/job.py @@ -442,11 +442,11 @@ class Job: raise TypeError('id must be a string, not {0}'.format(type(value))) self._id = value - def heartbeat(self, timestamp, ttl, pipeline=None): + def heartbeat(self, timestamp, ttl, pipeline=None, xx=False): self.last_heartbeat = timestamp connection = pipeline if pipeline is not None else self.connection connection.hset(self.key, 'last_heartbeat', utcformat(self.last_heartbeat)) - self.started_job_registry.add(self, ttl, pipeline=pipeline) + self.started_job_registry.add(self, ttl, pipeline=pipeline, xx=xx) id = property(get_id, set_id) diff --git a/rq/registry.py b/rq/registry.py index ac8bca7f..ba1a1dde 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -61,15 +61,15 @@ class BaseRegistry: self.cleanup() return self.connection.zcard(self.key) - def add(self, job, ttl=0, pipeline=None): + def add(self, job, ttl=0, pipeline=None, xx=False): """Adds a job to a registry with expiry time of now + ttl, unless it's -1 which is set to +inf""" score = ttl if ttl < 0 else current_timestamp() + ttl if score == -1: score = '+inf' if pipeline is not None: - return pipeline.zadd(self.key, {job.id: score}) + return pipeline.zadd(self.key, {job.id: score}, xx=xx) - return self.connection.zadd(self.key, {job.id: score}) + return self.connection.zadd(self.key, {job.id: score}, xx=xx) def remove(self, job, pipeline=None, delete_job=False): """Removes job from registry and deletes it if `delete_job == True`""" diff --git a/rq/worker.py b/rq/worker.py index 46ace250..e745eefd 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -814,7 +814,7 @@ class Worker: with self.connection.pipeline() as pipeline: self.heartbeat(self.job_monitoring_interval + 60, pipeline=pipeline) ttl = self.get_heartbeat_ttl(job) - job.heartbeat(utcnow(), ttl, pipeline=pipeline) + job.heartbeat(utcnow(), ttl, pipeline=pipeline, xx=True) pipeline.execute() except OSError as e: diff --git a/tests/test_cli.py b/tests/test_cli.py index 72fc5100..a03aaf64 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -449,12 +449,10 @@ class TestRQCli(RQTestCase): prefix = 'Enqueued tests.fixtures.say_hello() with job-id \'' suffix = '\'.\n' - print(result.stdout) + self.assertTrue(result.output.startswith(prefix)) + self.assertTrue(result.output.endswith(suffix)) - self.assertTrue(result.stdout.startswith(prefix)) - self.assertTrue(result.stdout.endswith(suffix)) - - job_id = result.stdout[len(prefix):-len(suffix)] + job_id = result.output[len(prefix):-len(suffix)] queue_key = 'rq:queue:default' self.assertEqual(self.connection.llen(queue_key), 1) self.assertEqual(self.connection.lrange(queue_key, 0, -1)[0].decode('ascii'), job_id)