Documented BrokerConnection

This commit is contained in:
Ask Solem 2010-07-23 15:57:05 +02:00
parent cd2022670f
commit a67206541b
2 changed files with 132 additions and 11 deletions

View File

@ -10,10 +10,58 @@ from kombu.backends import get_backend_cls
from kombu.utils import retry_over_time, OrderedDict from kombu.utils import retry_over_time, OrderedDict
class BrokerConnection(object): class BrokerConnection(object):
"""A connection to the broker.
:keyword hostname: Hostname/address of the server to connect to.
Default is ``"localhost"``.
:keyword userid: Username. Default is ``"guest"``.
:keyword password: Password. Default is ``"guest"``.
:keyword virtual_host: Virtual host. Default is ``"/"``.
:keyword port: Port of the server. Default is backend specific.
:keyword insist: Insist on connecting to a server.
In a configuration with multiple load-sharing servers, the insist
option tells the server that the client is insisting on a connection
to the specified server.
Default is ``False``.
:keyword ssl: Use ssl to connect to the server. Default is ``False``.
:keyword backend_cls: Backend class to use. Can be a class,
or a string specifying the path to the class. (e.g.
``kombu.backends.pyamqplib.Backend``), or one of the aliases:
``amqplib``, ``pika``, ``redis``, ``memory``.
:keyword connect_timeout: Timeout in seconds for connecting to the
server. May not be suported by the specified backend.
**Usage**
Creating a connection::
>>> conn = BrokerConnection("rabbit.example.com")
The connection is established lazily when needed. If you need the
connection to be established, then do so expliclty using :meth:`connect`::
>>> conn.connect()
Remember to always close the connection::
>>> conn.release()
"""
port = None port = None
virtual_host = "/" virtual_host = "/"
connect_timeout = 5 connect_timeout = 5
@ -37,19 +85,27 @@ class BrokerConnection(object):
self.pool = pool self.pool = pool
def connect(self): def connect(self):
"""Establish a connection to the AMQP server.""" """Establish connection to server immediately."""
self._closed = False self._closed = False
return self.connection return self.connection
def channel(self): def channel(self):
"""Request a new AMQP channel.""" """Request a new channel."""
return self.backend.create_channel(self.connection) return self.backend.create_channel(self.connection)
def drain_events(self, **kwargs): def drain_events(self, **kwargs):
"""Wait for a single event from the server.
:keyword timeout: Timeout in seconds before we give up.
Raises :exc:`socket.timeout` if the timeout is execeded.
Usually used from an event loop.
"""
return self.backend.drain_events(self.connection, **kwargs) return self.backend.drain_events(self.connection, **kwargs)
def close(self): def close(self):
"""Close the currently open connection.""" """Close the connection (if open)."""
try: try:
if self._connection: if self._connection:
self.backend.close_connection(self._connection) self.backend.close_connection(self._connection)
@ -60,13 +116,66 @@ class BrokerConnection(object):
def ensure_connection(self, errback=None, max_retries=None, def ensure_connection(self, errback=None, max_retries=None,
interval_start=2, interval_step=2, interval_max=30): interval_start=2, interval_step=2, interval_max=30):
"""Ensure we have a connection to the server.
If not retry establishing the connection with the settings
specified.
:keyword errback: Optional callback called each time the connection
can't be established. Arguments provided are the exception
raised and the interval that will be slept ``(exc, interval)``.
:keyword max_retries: Maximum number of times to retry.
If this limit is exceeded the connection error will be re-raised.
:keyword interval_start: The number of seconds we start sleeping for.
:keyword interval_step: How many seconds added to the interval
for each retry.
:keyword interval_max: Maximum number of seconds to sleep between
each retry.
"""
retry_over_time(self.connect, self.connection_errors, (), {}, retry_over_time(self.connect, self.connection_errors, (), {},
errback, max_retries, errback, max_retries,
interval_start, interval_step, interval_max) interval_start, interval_step, interval_max)
return self return self
def ensure(self, fun, errback=None, max_retries=None, def ensure(self, fun, errback=None, max_retries=None,
interval_start=2, interval_step=2, interval_max=30): interval_start=1, interval_step=1, interval_max=1):
"""Ensure operation completes, regardless of any channel/connection
errors occuring.
Will retry by establishing the connection, and reapplying
the function.
:param fun: Method to apply.
:keyword errback: Optional callback called each time the connection
can't be established. Arguments provided are the exception
raised and the interval that will be slept ``(exc, interval)``.
:keyword max_retries: Maximum number of times to retry.
If this limit is exceeded the connection error will be re-raised.
:keyword interval_start: The number of seconds we start sleeping for.
:keyword interval_step: How many seconds added to the interval
for each retry.
:keyword interval_max: Maximum number of seconds to sleep between
each retry.
**Example**
This is an example ensuring a publish operation::
>>> def errback(exc, interval):
... print("Couldn't publish message: %r. Retry in %ds" % (
... exc, interval))
>>> publish = conn.ensure(producer.publish,
... errback=errback, max_retries=3)
>>> publish(message, routing_key)
"""
@wraps(fun) @wraps(fun)
def _insured(*args, **kwargs): def _insured(*args, **kwargs):
@ -87,9 +196,16 @@ class BrokerConnection(object):
return _insured return _insured
def acquire(self): def acquire(self):
"""Acquire connection.
Only here for API compatibility with :class:`BrokerConnectionPool`.
"""
return self return self
def release(self): def release(self):
"""Close the connection, or if the connection is managed by a pool
the connection will be released to the pool so it can be reused."""
if self.pool: if self.pool:
self.pool.release(self) self.pool.release(self)
else: else:
@ -99,6 +215,8 @@ class BrokerConnection(object):
return self.get_backend_cls()(client=self) return self.get_backend_cls()(client=self)
def clone(self, **kwargs): def clone(self, **kwargs):
"""Create a copy of the connection with the same connection
settings."""
return self.__class__(**dict(self.info(), **kwargs)) return self.__class__(**dict(self.info(), **kwargs))
def get_backend_cls(self): def get_backend_cls(self):
@ -109,6 +227,7 @@ class BrokerConnection(object):
return backend_cls return backend_cls
def info(self): def info(self):
"""Get connection info."""
return OrderedDict((("hostname", self.hostname), return OrderedDict((("hostname", self.hostname),
("userid", self.userid), ("userid", self.userid),
("password", self.password), ("password", self.password),
@ -124,12 +243,13 @@ class BrokerConnection(object):
return self.backend.establish_connection() return self.backend.establish_connection()
def __repr__(self): def __repr__(self):
"""``x.__repr__() <==> repr(x)``"""
info = self.info() info = self.info()
return "<BrokerConnection: %s>" % ( return "<BrokerConnection: %s>" % (
", ".join("%s=%r" % (item, info[item]) ", ".join("%s=%r" % (item, info[item])
for item in info.keys()[:8])) for item in info.keys()[:8]))
def __copy__(self): def __copy__(self):
"""``x.__copy__() <==> copy(x)``"""
return self.clone() return self.clone()
def __reduce__(self): def __reduce__(self):
@ -163,14 +283,15 @@ class BrokerConnection(object):
@property @property
def connection_errors(self): def connection_errors(self):
"""List of exceptions that may be raised by the connection."""
return self.backend.connection_errors return self.backend.connection_errors
@property @property
def channel_errors(self): def channel_errors(self):
"""List of exceptions that may be raised by the channel."""
return self.backend.channel_errors return self.backend.channel_errors
class BrokerConnectionPool(object): class BrokerConnectionPool(object):
_t = None _t = None

View File

@ -258,8 +258,8 @@ class Binding(MaybeChannelBound):
The routing key (if any), also called *binding key*. The routing key (if any), also called *binding key*.
The interpretation of the routing key The interpretation of the routing key depends on
depends on the the :attr:`Exchange.exchange_type`. the :attr:`Exchange.type`.
* direct exchange * direct exchange