This commit is contained in:
Ask Solem 2010-10-27 09:41:35 +02:00
parent 897f81a5a5
commit a3d7a07e21
4 changed files with 32 additions and 16 deletions

View File

@ -0,0 +1,21 @@
#!/bin/bash
verify_index() {
modules=$(grep "kombu." "$1" | \
perl -ple's/^\s*|\s*$//g;s{\.}{/}g;')
retval=0
for module in $modules; do
if [ ! -f "$module.py" ]; then
if [ ! -f "$module/__init__.py" ]; then
echo "Outdated reference: $module"
retval=1
fi
fi
done
return $retval
}
verify_index docs/reference/index.rst && \
verify_index docs/internals/reference/index.rst

View File

@ -16,7 +16,7 @@ from Queue import Empty, Queue as _Queue
from kombu import exceptions
from kombu.transport import get_transport_cls
from kombu.simple import SimpleQueue
from kombu.simple import SimpleQueue, SimpleBuffer
from kombu.utils import retry_over_time, OrderedDict
from kombu.utils.functional import wraps

View File

@ -11,8 +11,6 @@ Emulates the AMQ API for non-AMQ transports.
"""
import socket
import sys
import traceback
from itertools import count
from multiprocessing.util import Finalize
@ -25,6 +23,11 @@ from kombu.transport.virtual.scheduling import FairCycle
from kombu.transport.virtual.exchange import STANDARD_EXCHANGE_TYPES
class NotEquivalentError(Exception):
"""Entity declaration is not equivalent to the previous declaration."""
pass
class BrokerState(object):
#: exchange declarations.
@ -512,13 +515,14 @@ class Transport(base.Transport):
def drain_events(self, connection, timeout=None):
cycle_seconds = len(self.channels) * self.interval
time_spent = 0
loop = 0
while 1:
try:
item, channel = self.cycle.get()
except Empty:
if timeout and cycle_seconds >= timeout:
if timeout and cycle_seconds * loop >= timeout:
raise socket.timeout()
loop += 1
else:
break

View File

@ -12,11 +12,6 @@ by the AMQ protocol (excluding the `headers` exchange).
import re
class NotEquivalentError(Exception):
"""Entity declaration is not equivalent to the previous declaration."""
pass
class ExchangeType(object):
"""Implements the specifics for an exchange type.
@ -42,11 +37,7 @@ class ExchangeType(object):
def equivalent(self, prev, exchange, type, durable, auto_delete,
arguments):
"""Assert equivalence to previous declaration.
:raises NotEquivalentError: If the declarations are not equivalent.
"""
"""Returns true if `prev` and `*exchange* is equivalent."""
return (type == prev["type"] and
durable == prev["durable"] and
auto_delete == prev["auto_delete"] and
@ -91,7 +82,7 @@ class TopicExchange(ExchangeType):
compiled = self._compiled[pattern]
except KeyError:
compiled = self._compiled[pattern] = re.compile(pattern, re.u)
return compiled.match(routing_key)
return compiled.match(string)
class FanoutExchange(ExchangeType):