From a67206541bf6ebaf64622ea7a27de5a7708612f8 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Fri, 23 Jul 2010 15:57:05 +0200 Subject: [PATCH] Documented BrokerConnection --- kombu/connection.py | 139 +++++++++++++++++++++++++++++++++++++++++--- kombu/entity.py | 4 +- 2 files changed, 132 insertions(+), 11 deletions(-) diff --git a/kombu/connection.py b/kombu/connection.py index 50dddd47..009dfa73 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -10,10 +10,58 @@ from kombu.backends import get_backend_cls from kombu.utils import retry_over_time, OrderedDict - - - class BrokerConnection(object): + """A connection to the broker. + + :keyword hostname: Hostname/address of the server to connect to. + Default is ``"localhost"``. + + :keyword userid: Username. Default is ``"guest"``. + + :keyword password: Password. Default is ``"guest"``. + + :keyword virtual_host: Virtual host. Default is ``"/"``. + + :keyword port: Port of the server. Default is backend specific. + + :keyword insist: Insist on connecting to a server. + + In a configuration with multiple load-sharing servers, the insist + option tells the server that the client is insisting on a connection + to the specified server. + + Default is ``False``. + + :keyword ssl: Use ssl to connect to the server. Default is ``False``. + + :keyword backend_cls: Backend class to use. Can be a class, + or a string specifying the path to the class. (e.g. + ``kombu.backends.pyamqplib.Backend``), or one of the aliases: + ``amqplib``, ``pika``, ``redis``, ``memory``. + + :keyword connect_timeout: Timeout in seconds for connecting to the + server. May not be suported by the specified backend. + + + **Usage** + + Creating a connection:: + + >>> conn = BrokerConnection("rabbit.example.com") + + The connection is established lazily when needed. If you need the + connection to be established, then do so expliclty using :meth:`connect`:: + + >>> conn.connect() + + + Remember to always close the connection:: + + >>> conn.release() + + + + """ port = None virtual_host = "/" connect_timeout = 5 @@ -37,19 +85,27 @@ class BrokerConnection(object): self.pool = pool def connect(self): - """Establish a connection to the AMQP server.""" + """Establish connection to server immediately.""" self._closed = False return self.connection def channel(self): - """Request a new AMQP channel.""" + """Request a new channel.""" return self.backend.create_channel(self.connection) def drain_events(self, **kwargs): + """Wait for a single event from the server. + + :keyword timeout: Timeout in seconds before we give up. + Raises :exc:`socket.timeout` if the timeout is execeded. + + Usually used from an event loop. + + """ return self.backend.drain_events(self.connection, **kwargs) def close(self): - """Close the currently open connection.""" + """Close the connection (if open).""" try: if self._connection: self.backend.close_connection(self._connection) @@ -60,13 +116,66 @@ class BrokerConnection(object): def ensure_connection(self, errback=None, max_retries=None, interval_start=2, interval_step=2, interval_max=30): + """Ensure we have a connection to the server. + + If not retry establishing the connection with the settings + specified. + + :keyword errback: Optional callback called each time the connection + can't be established. Arguments provided are the exception + raised and the interval that will be slept ``(exc, interval)``. + + :keyword max_retries: Maximum number of times to retry. + If this limit is exceeded the connection error will be re-raised. + + :keyword interval_start: The number of seconds we start sleeping for. + :keyword interval_step: How many seconds added to the interval + for each retry. + :keyword interval_max: Maximum number of seconds to sleep between + each retry. + + """ retry_over_time(self.connect, self.connection_errors, (), {}, errback, max_retries, interval_start, interval_step, interval_max) return self def ensure(self, fun, errback=None, max_retries=None, - interval_start=2, interval_step=2, interval_max=30): + interval_start=1, interval_step=1, interval_max=1): + """Ensure operation completes, regardless of any channel/connection + errors occuring. + + Will retry by establishing the connection, and reapplying + the function. + + :param fun: Method to apply. + + :keyword errback: Optional callback called each time the connection + can't be established. Arguments provided are the exception + raised and the interval that will be slept ``(exc, interval)``. + + :keyword max_retries: Maximum number of times to retry. + If this limit is exceeded the connection error will be re-raised. + + :keyword interval_start: The number of seconds we start sleeping for. + :keyword interval_step: How many seconds added to the interval + for each retry. + :keyword interval_max: Maximum number of seconds to sleep between + each retry. + + + **Example** + + This is an example ensuring a publish operation:: + + >>> def errback(exc, interval): + ... print("Couldn't publish message: %r. Retry in %ds" % ( + ... exc, interval)) + >>> publish = conn.ensure(producer.publish, + ... errback=errback, max_retries=3) + >>> publish(message, routing_key) + + """ @wraps(fun) def _insured(*args, **kwargs): @@ -87,9 +196,16 @@ class BrokerConnection(object): return _insured def acquire(self): + """Acquire connection. + + Only here for API compatibility with :class:`BrokerConnectionPool`. + + """ return self def release(self): + """Close the connection, or if the connection is managed by a pool + the connection will be released to the pool so it can be reused.""" if self.pool: self.pool.release(self) else: @@ -99,6 +215,8 @@ class BrokerConnection(object): return self.get_backend_cls()(client=self) def clone(self, **kwargs): + """Create a copy of the connection with the same connection + settings.""" return self.__class__(**dict(self.info(), **kwargs)) def get_backend_cls(self): @@ -109,6 +227,7 @@ class BrokerConnection(object): return backend_cls def info(self): + """Get connection info.""" return OrderedDict((("hostname", self.hostname), ("userid", self.userid), ("password", self.password), @@ -124,12 +243,13 @@ class BrokerConnection(object): return self.backend.establish_connection() def __repr__(self): + """``x.__repr__() <==> repr(x)``""" info = self.info() return "" % ( ", ".join("%s=%r" % (item, info[item]) for item in info.keys()[:8])) - def __copy__(self): + """``x.__copy__() <==> copy(x)``""" return self.clone() def __reduce__(self): @@ -163,14 +283,15 @@ class BrokerConnection(object): @property def connection_errors(self): + """List of exceptions that may be raised by the connection.""" return self.backend.connection_errors @property def channel_errors(self): + """List of exceptions that may be raised by the channel.""" return self.backend.channel_errors - class BrokerConnectionPool(object): _t = None diff --git a/kombu/entity.py b/kombu/entity.py index dbfbde94..94f12a1f 100644 --- a/kombu/entity.py +++ b/kombu/entity.py @@ -258,8 +258,8 @@ class Binding(MaybeChannelBound): The routing key (if any), also called *binding key*. - The interpretation of the routing key - depends on the the :attr:`Exchange.exchange_type`. + The interpretation of the routing key depends on + the :attr:`Exchange.type`. * direct exchange