diff --git a/contrib/release/verify-reference-index.sh b/contrib/release/verify-reference-index.sh new file mode 100755 index 00000000..f56eefdd --- /dev/null +++ b/contrib/release/verify-reference-index.sh @@ -0,0 +1,21 @@ +#!/bin/bash + +verify_index() { + modules=$(grep "kombu." "$1" | \ + perl -ple's/^\s*|\s*$//g;s{\.}{/}g;') + retval=0 + for module in $modules; do + if [ ! -f "$module.py" ]; then + if [ ! -f "$module/__init__.py" ]; then + echo "Outdated reference: $module" + retval=1 + fi + fi + done + + return $retval +} + +verify_index docs/reference/index.rst && \ + verify_index docs/internals/reference/index.rst + diff --git a/kombu/connection.py b/kombu/connection.py index 7ab6f8a9..905c1d1c 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -16,7 +16,7 @@ from Queue import Empty, Queue as _Queue from kombu import exceptions from kombu.transport import get_transport_cls -from kombu.simple import SimpleQueue +from kombu.simple import SimpleQueue, SimpleBuffer from kombu.utils import retry_over_time, OrderedDict from kombu.utils.functional import wraps diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py index faf7f562..57635cb4 100644 --- a/kombu/transport/virtual/__init__.py +++ b/kombu/transport/virtual/__init__.py @@ -11,8 +11,6 @@ Emulates the AMQ API for non-AMQ transports. """ import socket -import sys -import traceback from itertools import count from multiprocessing.util import Finalize @@ -25,6 +23,11 @@ from kombu.transport.virtual.scheduling import FairCycle from kombu.transport.virtual.exchange import STANDARD_EXCHANGE_TYPES +class NotEquivalentError(Exception): + """Entity declaration is not equivalent to the previous declaration.""" + pass + + class BrokerState(object): #: exchange declarations. @@ -512,13 +515,14 @@ class Transport(base.Transport): def drain_events(self, connection, timeout=None): cycle_seconds = len(self.channels) * self.interval - time_spent = 0 + loop = 0 while 1: try: item, channel = self.cycle.get() except Empty: - if timeout and cycle_seconds >= timeout: + if timeout and cycle_seconds * loop >= timeout: raise socket.timeout() + loop += 1 else: break diff --git a/kombu/transport/virtual/exchange.py b/kombu/transport/virtual/exchange.py index e3c1f836..f75a0769 100644 --- a/kombu/transport/virtual/exchange.py +++ b/kombu/transport/virtual/exchange.py @@ -12,11 +12,6 @@ by the AMQ protocol (excluding the `headers` exchange). import re -class NotEquivalentError(Exception): - """Entity declaration is not equivalent to the previous declaration.""" - pass - - class ExchangeType(object): """Implements the specifics for an exchange type. @@ -42,11 +37,7 @@ class ExchangeType(object): def equivalent(self, prev, exchange, type, durable, auto_delete, arguments): - """Assert equivalence to previous declaration. - - :raises NotEquivalentError: If the declarations are not equivalent. - - """ + """Returns true if `prev` and `*exchange* is equivalent.""" return (type == prev["type"] and durable == prev["durable"] and auto_delete == prev["auto_delete"] and @@ -91,7 +82,7 @@ class TopicExchange(ExchangeType): compiled = self._compiled[pattern] except KeyError: compiled = self._compiled[pattern] = re.compile(pattern, re.u) - return compiled.match(routing_key) + return compiled.match(string) class FanoutExchange(ExchangeType):