Merge branch '2.5'

Conflicts:
	kombu/transport/redis.py
This commit is contained in:
Ask Solem 2012-12-10 15:22:52 +00:00
commit f1b51afe5a
9 changed files with 194 additions and 112 deletions

View File

@@ -4,6 +4,36 @@
Change history
================
.. _version-2.5.3:
2.5.3
=====
:release-date: 2012-11-29 12:35 PM UTC
- Pidbox: Fixed compatibility with Python 2.6
2.5.2
=====
:release-date: 2012-11-29 12:35 PM UTC
.. _version-2.5.2:
2.5.2
=====
:release-date: 2012-11-29 12:35 PM UTC
- [Redis] Fixed connection leak and added a new 'max_connections' transport
option.
.. _version-2.5.1:
2.5.1
=====
:release-date: 2012-11-28 12:45 P.M UTC
- Fixed bug where return value of Queue.as_dict could not be serialized with
JSON (Issue #177).
.. _version-2.5.0:
2.5.0

View File

@@ -2,7 +2,7 @@
kombu - Messaging Framework for Python
========================================
:Version: 2.5.0
:Version: 2.5.3
`Kombu` is a messaging framework for Python.

View File

@@ -27,7 +27,7 @@ method::
You can also check whether the connection is connected::
>>> connection.connected()
>>> connection.connected
True
Connections must always be closed after use::

View File

@@ -1,7 +1,7 @@
"""Messaging Framework for Python"""
from __future__ import absolute_import
VERSION = (2, 5, 0)
VERSION = (2, 5, 3)
__version__ = '.'.join(map(str, VERSION[0:3])) + ''.join(VERSION[3:])
__author__ = 'Ask Solem'
__contact__ = 'ask@celeryproject.org'

View File

@@ -38,11 +38,12 @@ class Object(object):
setattr(self, name, None)
def as_dict(self, recurse=False):
def f(obj):
def f(obj, type):
if recurse and isinstance(obj, Object):
return obj.as_dict(recurse=True)
return obj
return dict((attr, f(getattr(self, attr))) for attr, _ in self.attrs)
return type(obj) if type else obj
return dict((attr, f(getattr(self, attr), type))
for attr, type in self.attrs)
def __reduce__(self):
return unpickle_dict, (self.__class__, self.as_dict())

View File

@@ -17,6 +17,10 @@ DELIVERY_MODES = {'transient': TRANSIENT_DELIVERY_MODE,
__all__ = ['Exchange', 'Queue']
def pretty_bindings(bindings):
return '[%s]' % (', '.join(map(str, bindings)))
class Exchange(MaybeChannelBound):
"""An Exchange declaration.
@@ -266,7 +270,7 @@ class Exchange(MaybeChannelBound):
return super(Exchange, self).__repr__(str(self))
def __str__(self):
return 'Exchange %s(%s)' % (self.name, self.type)
return 'Exchange %s(%s)' % (self.name or repr(''), self.type)
@property
def can_cache_declaration(self):
@@ -311,7 +315,10 @@ class binding(object):
nowait=nowait)
def __repr__(self):
return '<binding: %r -> %r>' % (self.exchange.name, self.routing_key)
return '<binding: %s>' % (self, )
def __str__(self):
return '%s->%s' % (self.exchange.name, self.routing_key)
class Queue(MaybeChannelBound):
@@ -427,7 +434,7 @@ class Queue(MaybeChannelBound):
('auto_delete', bool),
('no_ack', None),
('alias', None),
('bindings', None))
('bindings', list))
def __init__(self, name='', exchange=None, routing_key='', channel=None,
bindings=None, **kwargs):
@@ -611,10 +618,17 @@ class Queue(MaybeChannelBound):
return False
def __repr__(self):
return super(Queue, self).__repr__(
'Queue %s -> %s -> %s' % (self.name,
self.exchange,
self.routing_key))
s = super(Queue, self).__repr__
if self.bindings:
return s('Queue %r -> %s' % (
self.name,
pretty_bindings(self.bindings),
))
return s('Queue %r -> %s -> %r' % (
self.name,
self.exchange,
self.routing_key or '',
))
@property
def can_cache_declaration(self):

View File

@@ -103,7 +103,7 @@ class Node(object):
if message:
self.adjust_clock(message.headers.get('clock') or 0)
if not destination or self.hostname in destination:
return self.dispatch(**body)
return self.dispatch(**kwdict(body))
dispatch_from_message = handle_message
def reply(self, data, exchange, routing_key, ticket, **kwargs):

View File

@@ -382,7 +382,8 @@ class test_Channel(TestCase):
def test_invalid_database_raises_ValueError(self):
with self.assertRaises(ValueError):
Connection('redis:///dwqwewqe').channel()
self.channel.connection.client.virtual_host = 'dwqeq'
self.channel._connparams()
@skip_if_not_module('redis')
def test_get_client(self):
@@ -409,13 +410,18 @@
self.channel._in_poll = False
c = self.channel.client = Mock()
self.assertIs(self.channel._avail_client, c)
with self.channel.conn_or_acquire() as client:
self.assertIs(client, c)
def test_avail_client_when_in_poll(self):
self.channel._in_poll = True
self.channel._pool = Mock()
cc = self.channel._create_client = Mock()
client = cc.return_value = Mock()
self.assertTrue(self.channel._avail_client)
with self.channel.conn_or_acquire():
pass
self.channel.pool.release.assert_called_with(client.connection)
cc.assert_called_with()
@skip_if_not_module('redis')

View File

@@ -24,6 +24,15 @@ from kombu.log import get_logger
from kombu.utils import cached_property, uuid
from kombu.utils.eventio import poll, READ, ERR
try:
from billiard.util import register_after_fork
except ImportError:
try:
from multiprocessing.util import register_after_fork # noqa
except ImportError:
def register_after_fork(*args, **kwargs): # noqa
pass
try:
import redis
except ImportError:
@@ -91,12 +100,12 @@ class QoS(virtual.QoS):
def append(self, message, delivery_tag):
delivery = message.delivery_info
EX, RK = delivery['exchange'], delivery['routing_key']
self.client.pipeline() \
.zadd(self.unacked_index_key, delivery_tag, time()) \
with self.pipe_or_acquire() as pipe:
pipe.zadd(self.unacked_index_key, delivery_tag, time()) \
.hset(self.unacked_key, delivery_tag,
dumps([message._raw, EX, RK])) \
.execute()
super(QoS, self).append(message, delivery_tag)
super(QoS, self).append(message, delivery_tag)
def restore_unacked(self):
for tag in self._delivered:
@@ -110,39 +119,43 @@ class QoS(virtual.QoS):
def reject(self, delivery_tag, requeue=False):
self.ack(delivery_tag)
@contextmanager
def pipe_or_acquire(self, pipe=None):
if pipe:
yield pipe
else:
with self.channel.conn_or_acquire() as client:
yield client.pipeline()
def _remove_from_indices(self, delivery_tag, pipe=None):
return (pipe or self.client.pipeline()) \
.zrem(self.unacked_index_key, delivery_tag) \
.hdel(self.unacked_key, delivery_tag)
with self.pipe_or_acquire(pipe) as pipe:
return pipe.zrem(self.unacked_index_key, delivery_tag) \
.hdel(self.unacked_key, delivery_tag)
def restore_visible(self, start=0, num=10, interval=10):
self._vrestore_count += 1
if (self._vrestore_count - 1) % interval:
return
client = self.client
ceil = time() - self.visibility_timeout
try:
with Mutex(client, self.unacked_mutex_key,
self.unacked_mutex_expire):
visible = client.zrevrangebyscore(
with self.channel.conn_or_acquire() as client:
ceil = time() - self.visibility_timeout
try:
with Mutex(client, self.unacked_mutex_key,
self.unacked_mutex_expire):
visible = client.zrevrangebyscore(
self.unacked_index_key, ceil, 0,
start=num and start, num=num, withscores=True)
for tag, score in visible or []:
self.restore_by_tag(tag, client)
except MutexHeld:
pass
for tag, score in visible or []:
self.restore_by_tag(tag, client)
except MutexHeld:
pass
def restore_by_tag(self, tag, client=None):
client = client or self.client
p, _, _ = self._remove_from_indices(tag,
client.pipeline().hget(self.unacked_key, tag)).execute()
if p:
M, EX, RK = loads(p)
self.channel._do_restore_message(M, EX, RK, client)
@property
def client(self):
return self.channel._avail_client
with self.channel.conn_or_acquire(client) as client:
p, _, _ = self._remove_from_indices(tag,
client.pipeline().hget(self.unacked_key, tag)).execute()
if p:
M, EX, RK = loads(p)
self.channel._do_restore_message(M, EX, RK, client)
@cached_property
def unacked_key(self):
@@ -302,6 +315,7 @@ class Channel(virtual.Channel):
unacked_restore_limit = None
visibility_timeout = 3600 # 1 hour
priority_steps = PRIORITY_STEPS
max_connections = 10
_pool = None
from_transport_options = (virtual.Channel.from_transport_options
@@ -311,6 +325,7 @@ class Channel(virtual.Channel):
'unacked_mutex_expire',
'visibility_timeout',
'unacked_restore_limit',
'max_connections',
'priority_steps'))
def __init__(self, *args, **kwargs):
@@ -338,29 +353,35 @@
# are still waiting for data.
self.connection_errors = self.connection.connection_errors
register_after_fork(self, self._after_fork)
def _after_fork(self):
if self._pool is not None:
self._pool.disconnect()
def _do_restore_message(self, payload, exchange, routing_key, client=None):
client = client or self._avail_client
try:
with self.conn_or_acquire(client) as client:
try:
payload['headers']['redelivered'] = True
except KeyError:
pass
for queue in self._lookup(exchange, routing_key):
client.lpush(queue, dumps(payload))
except Exception:
logger.critical('Could not restore message: %r', payload,
exc_info=True)
try:
payload['headers']['redelivered'] = True
except KeyError:
pass
for queue in self._lookup(exchange, routing_key):
client.lpush(queue, dumps(payload))
except Exception:
logger.critical('Could not restore message: %r', payload,
exc_info=True)
def _restore(self, message, payload=None):
tag = message.delivery_tag
client = self._avail_client
P, _ = client.pipeline() \
.hget(self.unacked_key, tag) \
.hdel(self.unacked_key, tag) \
.execute()
if P:
M, EX, RK = loads(P)
self._do_restore_message(M, EX, RK, client)
with self.conn_or_acquire() as client:
P, _ = client.pipeline() \
.hget(self.unacked_key, tag) \
.hdel(self.unacked_key, tag) \
.execute()
if P:
M, EX, RK = loads(P)
self._do_restore_message(M, EX, RK, client)
def _next_delivery_tag(self):
return uuid()
@@ -460,18 +481,20 @@ class Channel(virtual.Channel):
pass
def _get(self, queue):
for pri in PRIORITY_STEPS:
item = self._avail_client.rpop(self._q_for_pri(queue, pri))
if item:
return loads(item)
raise Empty()
with self.conn_or_acquire() as client:
for pri in PRIORITY_STEPS:
item = client.rpop(self._q_for_pri(queue, pri))
if item:
return loads(item)
raise Empty()
def _size(self, queue):
cmds = self._avail_client.pipeline()
for pri in PRIORITY_STEPS:
cmds = cmds.llen(self._q_for_pri(queue, pri))
sizes = cmds.execute()
return sum(size for size in sizes if isinstance(size, int))
with self.conn_or_acquire() as client:
cmds = client.pipeline()
for pri in PRIORITY_STEPS:
cmds = cmds.llen(self._q_for_pri(queue, pri))
sizes = cmds.execute()
return sum(size for size in sizes if isinstance(size, int))
def _q_for_pri(self, queue, pri):
pri = self.priority(pri)
@@ -488,11 +511,13 @@
message['properties']['delivery_info']['priority']), 9), 0)
except (TypeError, ValueError, KeyError):
pri = 0
self._avail_client.lpush(self._q_for_pri(queue, pri), dumps(message))
with self.conn_or_acquire() as client:
client.lpush(self._q_for_pri(queue, pri), dumps(message))
def _put_fanout(self, exchange, message, **kwargs):
"""Deliver fanout message."""
self._avail_client.publish(exchange, dumps(message))
with self.conn_or_acquire() as client:
client.publish(exchange, dumps(message))
def _new_queue(self, queue, auto_delete=False, **kwargs):
if auto_delete:
@@ -502,43 +527,48 @@
if self.typeof(exchange).type == 'fanout':
# Mark exchange as fanout.
self._fanout_queues[queue] = exchange
self._avail_client.sadd(self.keyprefix_queue % (exchange, ),
self.sep.join([routing_key or '',
pattern or '',
queue or '']))
with self.conn_or_acquire() as client:
client.sadd(self.keyprefix_queue % (exchange, ),
self.sep.join([routing_key or '',
pattern or '',
queue or '']))
def _delete(self, queue, exchange, routing_key, pattern, *args):
self.auto_delete_queues.discard(queue)
self._avail_client.srem(self.keyprefix_queue % (exchange, ),
self.sep.join([routing_key or '',
pattern or '',
queue or '']))
cmds = self._avail_client.pipeline()
for pri in PRIORITY_STEPS:
cmds = cmds.delete(self._q_for_pri(queue, pri))
cmds.execute()
with self.conn_or_acquire() as client:
client.srem(self.keyprefix_queue % (exchange, ),
self.sep.join([routing_key or '',
pattern or '',
queue or '']))
cmds = client.pipeline()
for pri in PRIORITY_STEPS:
cmds = cmds.delete(self._q_for_pri(queue, pri))
cmds.execute()
def _has_queue(self, queue, **kwargs):
cmds = self._avail_client.pipeline()
for pri in PRIORITY_STEPS:
cmds = cmds.exists(self._q_for_pri(queue, pri))
return any(cmds.execute())
with self.conn_or_acquire() as client:
cmds = client.pipeline()
for pri in PRIORITY_STEPS:
cmds = cmds.exists(self._q_for_pri(queue, pri))
return any(cmds.execute())
def get_table(self, exchange):
key = self.keyprefix_queue % exchange
values = self._avail_client.smembers(key)
if not values:
raise InconsistencyError(
'Queue list empty or key does not exist: {0!r}'.format(key))
return [tuple(val.split(self.sep)) for val in values]
with self.conn_or_acquire() as client:
values = client.smembers(key)
if not values:
raise InconsistencyError(
'Queue list empty or does not exist: {0!r}'.format(key))
return [tuple(val.split(self.sep)) for val in values]
def _purge(self, queue):
cmds = self.pipeline()
for pri in PRIORITY_STEPS:
priq = self._q_for_pri(queue, pri)
cmds = cmds.llen(priq).delete(priq)
sizes = cmds.execute()
return sum(sizes[::2])
with self.conn_or_acquire() as client:
cmds = client.pipeline()
for pri in PRIORITY_STEPS:
priq = self._q_for_pri(queue, pri)
cmds = cmds.llen(priq).delete(priq)
sizes = cmds.execute()
return sum(sizes[::2])
def close(self):
if self._pool:
@@ -576,7 +606,8 @@ class Channel(virtual.Channel):
return {'host': conninfo.hostname or '127.0.0.1',
'port': conninfo.port or DEFAULT_PORT,
'db': database,
'password': conninfo.password}
'password': conninfo.password,
'max_connections': self.max_connections}
def _create_client(self):
return self.Client(connection_pool=self.pool)
@@ -612,17 +643,17 @@ class Channel(virtual.Channel):
return KombuRedis
def pipeline(self):
"""Create a new pipeline using available connection."""
return self._avail_client.pipeline()
@property
def _avail_client(self):
"""Returns new connection from the pool if the main client is
currently in polling-mode."""
if self._in_poll:
return self._create_client()
return self.client
@contextmanager
def conn_or_acquire(self, client=None):
if client:
yield client
else:
if self._in_poll:
client = self._create_client()
yield client
self.pool.release(client.connection)
else:
yield self.client
@property
def pool(self):