rq/tests/fixtures.py

137 lines
3.1 KiB
Python
Raw Normal View History

2014-05-05 08:49:29 +00:00
# -*- coding: utf-8 -*-
"""
This file contains all jobs that are used in tests. Each of these test
fixtures has slightly different characteristics.
"""
2014-05-05 08:49:29 +00:00
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import os
import time
2016-07-25 11:04:47 +00:00
import sys
2014-05-05 08:49:29 +00:00
2016-06-04 14:15:27 +00:00
from rq import Connection, get_current_job, get_current_connection, Queue
2012-07-23 06:25:31 +00:00
from rq.decorators import job
from rq.compat import PY2
2016-06-04 14:15:27 +00:00
from rq.worker import HerokuWorker
2012-07-23 06:25:31 +00:00
2014-09-08 18:29:24 +00:00
def say_pid():
    """Return the id of the process that executes this function."""
    pid = os.getpid()
    return pid
2012-02-22 16:39:14 +00:00
2014-09-08 18:29:24 +00:00
def say_hello(name=None):
    """Return a greeting for ``name``; ``None`` is greeted as 'Stranger'."""
    who = 'Stranger' if name is None else name
    return 'Hi there, %s!' % (who,)
2012-02-22 16:39:14 +00:00
def do_nothing():
    """A job that performs no work and returns ``None``."""
    return None
2012-02-22 16:39:14 +00:00
def div_by_zero(x):
    """Divide ``x`` by zero; for numeric ``x`` this raises ZeroDivisionError."""
    zero = 0
    return x / zero
2012-02-22 16:39:14 +00:00
def some_calculation(x, y, z=1):
    """Return ``x * y / z``.

    Passing ``z=0`` is a convenient way to provoke a division-by-zero
    exception.
    """
    product = x * y
    return product / z
2012-02-22 16:39:14 +00:00
def create_file(path):
    """Write a short sentinel text to ``path``.

    The resulting file serves as evidence that the job ran.
    """
    with open(path, 'w') as sentinel:
        sentinel.write('Just a sentinel.')
def create_file_after_timeout(path, timeout):
    """Sleep for ``timeout`` seconds, then create the sentinel file at ``path``."""
    time.sleep(timeout)
    # Delegate the actual write to create_file().
    create_file(path)
def access_self():
    """Assert that both a current connection and a current job are set."""
    # The lookups stay inside the assert statements on purpose, so they are
    # evaluated under exactly the same conditions as before.
    assert get_current_connection() is not None
    assert get_current_job() is not None
def modify_self(meta):
    """Merge ``meta`` into the current job's ``meta`` mapping and save the job."""
    current = get_current_job()
    current.meta.update(meta)
    current.save()
def echo(*args, **kwargs):
    """Return the received positional and keyword arguments as a pair."""
    return args, kwargs
class Number(object):
    """Tiny numeric wrapper offering a class method and an instance method."""

    def __init__(self, value):
        """Store ``value`` on the instance."""
        self.value = value

    @classmethod
    def divide(cls, x, y):
        """Return ``x`` divided by ``y``."""
        # This used to return ``x * y``, which contradicted the method name.
        return x / y

    def div(self, y):
        """Return the stored value divided by ``y``."""
        return self.value / y
2012-07-23 06:25:31 +00:00
2012-07-24 10:33:22 +00:00
class CallableObject(object):
    """An object whose instances can be invoked like a function."""

    def __call__(self):
        """Return a fixed message."""
        message = u"I'm callable"
        return message
class UnicodeStringObject(object):
    """An object whose ``repr`` contains a non-ASCII character."""

    def __repr__(self):
        # When PY2 is true the text is returned UTF-8 encoded; otherwise the
        # text string is returned unchanged.
        return u'é'.encode('utf-8') if PY2 else u'é'
2012-07-24 10:33:22 +00:00
# The decorator is applied while the Connection() context is active.
with Connection():
    @job(queue='default')
    def decorated_job(x, y):
        """Return ``x + y``; decorated with ``@job`` for the 'default' queue."""
        total = x + y
        return total
def black_hole(job, *exc_info):
    """Exception handler that ignores its arguments and returns ``False``.

    Returning ``False`` means: don't fall through to the default behaviour
    (moving the job to the failed queue).
    """
    fall_through = False
    return fall_through
def long_running_job(timeout=10):
    """Sleep for ``timeout`` seconds and then return a completion message."""
    time.sleep(timeout)
    message = 'Done sleeping...'
    return message
2016-06-04 14:15:27 +00:00
def run_dummy_heroku_worker(sandbox, _imminent_shutdown_delay):
    """
    Run the work horse of a simplified Heroku worker.

    Its ``perform_job`` does not execute the job. It creates a ``started``
    sentinel file, sleeps for about two seconds in short slices, and then
    creates a ``finished`` sentinel file.

    :param sandbox: directory to create files in
    :param _imminent_shutdown_delay: delay to use for HerokuWorker
    """
    # Send everything written to stderr into a log file inside the sandbox.
    stderr_log = os.path.join(sandbox, 'stderr.log')
    sys.stderr = open(stderr_log, 'w')

    class TestHerokuWorker(HerokuWorker):
        imminent_shutdown_delay = _imminent_shutdown_delay

        def perform_job(self, job, queue):
            create_file(os.path.join(sandbox, 'started'))
            # Sleep in 20 short slices instead of one long sleep. Per the
            # original author's note, a single sleep would hold the GIL and
            # prevent signals from being received.
            for _ in range(20):
                time.sleep(0.1)
            create_file(os.path.join(sandbox, 'finished'))

    worker = TestHerokuWorker(Queue('dummy'))
    worker.main_work_horse(None, None)