From 016da14723d21f6801b19cbff97f2fb9bee5c5db Mon Sep 17 00:00:00 2001 From: JackBoreczky <37283387+JackBoreczky@users.noreply.github.com> Date: Fri, 27 Nov 2020 21:05:29 -0800 Subject: [PATCH] Fix custom serializer in job fetches (#1381) * Ensure that the custom serializer defined is passed into the job fetch calls * add serializer as argument to fetch_many and dequeue_any methods * add worker test for custom serializer * move json serializer to serializers.py --- rq/job.py | 8 ++++---- rq/queue.py | 12 +++++++----- rq/serializers.py | 11 +++++++++++ rq/worker.py | 5 +++-- tests/test_worker.py | 16 ++++++++++++++++ 5 files changed, 41 insertions(+), 11 deletions(-) diff --git a/rq/job.py b/rq/job.py index eb3ca457..29e9fc27 100644 --- a/rq/job.py +++ b/rq/job.py @@ -190,7 +190,7 @@ class Job(object): return None if hasattr(self, '_dependency'): return self._dependency - job = self.fetch(self._dependency_ids[0], connection=self.connection) + job = self.fetch(self._dependency_ids[0], connection=self.connection, serializer=self.serializer) self._dependency = job return job @@ -301,7 +301,7 @@ class Job(object): return job @classmethod - def fetch_many(cls, job_ids, connection): + def fetch_many(cls, job_ids, connection, serializer=None): """ Bulk version of Job.fetch @@ -316,7 +316,7 @@ class Job(object): jobs = [] for i, job_id in enumerate(job_ids): if results[i]: - job = cls(job_id, connection=connection) + job = cls(job_id, connection=connection, serializer=serializer) job.restore(results[i]) jobs.append(job) else: @@ -679,7 +679,7 @@ class Job(object): connection = pipeline if pipeline is not None else self.connection for dependent_id in self.dependent_ids: try: - job = Job.fetch(dependent_id, connection=self.connection) + job = Job.fetch(dependent_id, connection=self.connection, serializer=self.serializer) job.delete(pipeline=pipeline, remove_from_queue=False) except NoSuchJobError: diff --git a/rq/queue.py b/rq/queue.py index 5e98bcd8..0e6afc34 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -155,7 +155,7 @@ class Queue(object): def fetch_job(self, job_id): try: - job = self.job_class.fetch(job_id, connection=self.connection) + job = self.job_class.fetch(job_id, connection=self.connection, serializer=self.serializer) except NoSuchJobError: self.remove(job_id) else: @@ -511,7 +511,8 @@ nd dependent_job for dependent_job in self.job_class.fetch_many( dependent_job_ids, - connection=self.connection + connection=self.connection, + serializer=self.serializer ) if dependent_job.dependencies_are_met( exclude_job_id=job.id, pipeline=pipe @@ -581,7 +582,7 @@ nd return None @classmethod - def dequeue_any(cls, queues, timeout, connection=None, job_class=None): + def dequeue_any(cls, queues, timeout, connection=None, job_class=None, serializer=None): """Class method returning the job_class instance at the front of the given set of Queues, where the order of the queues is important. @@ -602,9 +603,10 @@ nd queue_key, job_id = map(as_text, result) queue = cls.from_queue_key(queue_key, connection=connection, - job_class=job_class) + job_class=job_class, + serializer=serializer) try: - job = job_class.fetch(job_id, connection=connection) + job = job_class.fetch(job_id, connection=connection, serializer=serializer) except NoSuchJobError: # Silently pass on jobs that don't exist (anymore), # and continue in the look diff --git a/rq/serializers.py b/rq/serializers.py index 27f892f2..19cd7968 100644 --- a/rq/serializers.py +++ b/rq/serializers.py @@ -1,5 +1,6 @@ from functools import partial import pickle +import json from .compat import string_types from .utils import import_attribute @@ -10,6 +11,16 @@ class DefaultSerializer: loads = pickle.loads +class JSONSerializer(): + @staticmethod + def dumps(*args, **kwargs): + return json.dumps(*args, **kwargs).encode('utf-8') + + @staticmethod + def loads(s, *args, **kwargs): + return json.loads(s.decode('utf-8'), *args, **kwargs) + + def resolve_serializer(serializer): """This function checks the user defined serializer for ('dumps', 'loads') methods It returns a default pickle serializer if not found else it returns a MySerializer diff --git a/rq/worker.py b/rq/worker.py index ad5d0aea..8aa07bdd 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -387,7 +387,7 @@ class Worker(object): if job_id is None: return None - return self.job_class.fetch(job_id, self.connection) + return self.job_class.fetch(job_id, self.connection, self.serializer) def _install_signal_handlers(self): """Installs signal handlers for handling SIGINT and SIGTERM @@ -638,7 +638,8 @@ class Worker(object): try: result = self.queue_class.dequeue_any(self.queues, timeout, connection=self.connection, - job_class=self.job_class) + job_class=self.job_class, + serializer=self.serializer) if result is not None: job, queue = result diff --git a/tests/test_worker.py b/tests/test_worker.py index bf11b926..f30cc1af 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -38,6 +38,7 @@ from rq.suspension import resume, suspend from rq.utils import utcnow from rq.version import VERSION from rq.worker import HerokuWorker, WorkerStatus +from rq.serializers import JSONSerializer class CustomJob(Job): pass @@ -122,6 +123,21 @@ class TestWorker(RQTestCase): 'Expected at least some work done.' ) + def test_work_and_quit_custom_serializer(self): + """Worker processes work, then quits.""" + fooq, barq = Queue('foo', serializer=JSONSerializer), Queue('bar', serializer=JSONSerializer) + w = Worker([fooq, barq], serializer=JSONSerializer) + self.assertEqual( + w.work(burst=True), False, + 'Did not expect any work on the queue.' + ) + + fooq.enqueue(say_hello, name='Frank') + self.assertEqual( + w.work(burst=True), True, + 'Expected at least some work done.' + ) + def test_worker_all(self): """Worker.all() works properly""" foo_queue = Queue('foo')