mirror of https://github.com/celery/kombu.git
flakes
This commit is contained in:
parent
d5eb0d7da7
commit
a8569f3a82
|
@ -6,7 +6,7 @@ import os
|
|||
# If your extensions are in another directory, add it here. If the directory
|
||||
# is relative to the documentation root, use os.path.abspath to make it
|
||||
# absolute, like shown here.
|
||||
sys.path.append(os.path.join(os.pardir, "tests"))
|
||||
sys.path.append(os.path.join(os.pardir, 'tests'))
|
||||
import kombu # noqa
|
||||
|
||||
from django.conf import settings # noqa
|
||||
|
@ -41,7 +41,7 @@ copyright = '2009-2016, Ask Solem'
|
|||
# built documents.
|
||||
#
|
||||
# The short X.Y version.
|
||||
version = ".".join(map(str, kombu.VERSION[0:2]))
|
||||
version = '.'.join(map(str, kombu.VERSION[0:2]))
|
||||
# The full version, including alpha/beta/rc tags.
|
||||
release = kombu.__version__
|
||||
|
||||
|
@ -71,8 +71,8 @@ latex_documents = [
|
|||
'Ask Solem', 'manual'),
|
||||
]
|
||||
|
||||
html_theme = "celery"
|
||||
html_theme_path = ["_theme"]
|
||||
html_theme = 'celery'
|
||||
html_theme_path = ['_theme']
|
||||
html_sidebars = {
|
||||
'index': ['sidebarintro.html', 'sourcelink.html', 'searchbox.html'],
|
||||
'**': ['sidebarlogo.html', 'localtoc.html', 'relations.html',
|
||||
|
|
|
@ -13,15 +13,15 @@
|
|||
|
||||
.. code-block:: python
|
||||
|
||||
>>> mailbox = pidbox.Mailbox("celerybeat", type="direct")
|
||||
>>> mailbox = pidbox.Mailbox('celerybeat', type='direct')
|
||||
|
||||
>>> @mailbox.handler
|
||||
>>> def reload_schedule(state, **kwargs):
|
||||
... state["beat"].reload_schedule()
|
||||
... state['beat'].reload_schedule()
|
||||
|
||||
>>> @mailbox.handler
|
||||
>>> def connection_info(state, **kwargs):
|
||||
... return {"connection": state["connection"].info()}
|
||||
... return {'connection': state['connection'].info()}
|
||||
|
||||
Example Node
|
||||
~~~~~~~~~~~~
|
||||
|
@ -29,8 +29,8 @@
|
|||
.. code-block:: python
|
||||
|
||||
>>> connection = kombu.Connection()
|
||||
>>> state = {"beat": beat,
|
||||
"connection": connection}
|
||||
>>> state = {'beat': beat,
|
||||
'connection': connection}
|
||||
>>> consumer = mailbox(connection).Node(hostname).listen()
|
||||
>>> try:
|
||||
... while True:
|
||||
|
@ -43,8 +43,8 @@
|
|||
|
||||
.. code-block:: python
|
||||
|
||||
>>> mailbox.cast("reload_schedule") # cast is async.
|
||||
>>> info = celerybeat.call("connection_info", timeout=1)
|
||||
>>> mailbox.cast('reload_schedule') # cast is async.
|
||||
>>> info = celerybeat.call('connection_info', timeout=1)
|
||||
|
||||
Mailbox
|
||||
-------
|
||||
|
|
|
@ -60,7 +60,7 @@ Or using :class:`~kombu.mixins.ConsumerMixin`:
|
|||
]
|
||||
|
||||
def on_message(self, body, message):
|
||||
print("RECEIVED MESSAGE: {0!r}".format(body))
|
||||
print('RECEIVED MESSAGE: {0!r}'.format(body))
|
||||
message.ack()
|
||||
|
||||
C(connection).run()
|
||||
|
|
|
@ -89,12 +89,12 @@ use one of the following options.
|
|||
|
||||
>>> producer = Producer(channel,
|
||||
... exchange=exchange,
|
||||
... serializer="yaml")
|
||||
... serializer='yaml')
|
||||
|
||||
2. Set the serialization option per message::
|
||||
|
||||
>>> producer.publish(message, routing_key=rkey,
|
||||
... serializer="pickle")
|
||||
... serializer='pickle')
|
||||
|
||||
Note that a `Consumer` do not need the serialization method specified.
|
||||
They can auto-detect the serialization method as the
|
||||
|
@ -112,10 +112,10 @@ not waste cycles serializing/deserializing the data.
|
|||
You can optionally specify a `content_encoding`
|
||||
for the raw data::
|
||||
|
||||
>>> with open("~/my_picture.jpg", "rb") as fh:
|
||||
>>> with open('~/my_picture.jpg', 'rb') as fh:
|
||||
... producer.publish(fh.read(),
|
||||
content_type="image/jpeg",
|
||||
content_encoding="binary",
|
||||
content_type='image/jpeg',
|
||||
content_encoding='binary',
|
||||
routing_key=rkey)
|
||||
|
||||
The `Message` object returned by the `Consumer` class will have a
|
||||
|
|
|
@ -3,6 +3,6 @@ from kombu import Connection
|
|||
with Connection('amqp://guest:guest@localhost:5672//') as conn:
|
||||
simple_queue = conn.SimpleQueue('simple_queue')
|
||||
message = simple_queue.get(block=True, timeout=1)
|
||||
print("Received: %s" % message.payload)
|
||||
print('Received: %s' % message.payload)
|
||||
message.ack()
|
||||
simple_queue.close()
|
||||
|
|
|
@ -45,7 +45,7 @@ class Worker(ConsumerProducerMixin):
|
|||
|
||||
def start_worker(broker_url):
|
||||
connection = Connection(broker_url)
|
||||
print " [x] Awaiting RPC requests"
|
||||
print(' [x] Awaiting RPC requests')
|
||||
worker = Worker(connection)
|
||||
worker.run()
|
||||
|
||||
|
|
|
@ -1,2 +1,2 @@
|
|||
def hello_task(who="world"):
|
||||
print("Hello {0}".format(who))
|
||||
def hello_task(who='world'):
|
||||
print('Hello {0}'.format(who))
|
||||
|
|
|
@ -34,15 +34,15 @@ class StringVersion(object):
|
|||
|
||||
def decode(self, s):
|
||||
s = rq(s)
|
||||
text = ""
|
||||
major, minor, release = s.split(".")
|
||||
text = ''
|
||||
major, minor, release = s.split('.')
|
||||
if not release.isdigit():
|
||||
pos = release.index(re.split("\d+", release)[1][0])
|
||||
pos = release.index(re.split('\d+', release)[1][0])
|
||||
release, text = release[:pos], release[pos:]
|
||||
return int(major), int(minor), int(release), text
|
||||
|
||||
def encode(self, v):
|
||||
return ".".join(map(str, v[:3])) + v[3]
|
||||
return '.'.join(map(str, v[:3])) + v[3]
|
||||
to_str = StringVersion().encode
|
||||
from_str = StringVersion().decode
|
||||
|
||||
|
@ -50,9 +50,9 @@ from_str = StringVersion().decode
|
|||
class TupleVersion(object):
|
||||
|
||||
def decode(self, s):
|
||||
v = list(map(rq, s.split(", ")))
|
||||
v = list(map(rq, s.split(', ')))
|
||||
return (tuple(map(int, v[0:3])) +
|
||||
tuple(["".join(v[3:])]))
|
||||
tuple([''.join(v[3:])]))
|
||||
|
||||
def encode(self, v):
|
||||
v = list(v)
|
||||
|
@ -64,7 +64,7 @@ class TupleVersion(object):
|
|||
|
||||
if not v[-1]:
|
||||
v.pop()
|
||||
return ", ".join(map(quote, v))
|
||||
return ', '.join(map(quote, v))
|
||||
|
||||
|
||||
class VersionFile(object):
|
||||
|
@ -74,8 +74,8 @@ class VersionFile(object):
|
|||
self._kept = None
|
||||
|
||||
def _as_orig(self, version):
|
||||
return self.wb % {"version": self.type.encode(version),
|
||||
"kept": self._kept}
|
||||
return self.wb % {'version': self.type.encode(version),
|
||||
'kept': self._kept}
|
||||
|
||||
def write(self, version):
|
||||
pattern = self.regex
|
||||
|
@ -96,14 +96,14 @@ class VersionFile(object):
|
|||
for line in fh:
|
||||
m = pattern.match(line)
|
||||
if m:
|
||||
if "?P<keep>" in pattern.pattern:
|
||||
self._kept, gpos = m.groupdict()["keep"], 1
|
||||
if '?P<keep>' in pattern.pattern:
|
||||
self._kept, gpos = m.groupdict()['keep'], 1
|
||||
return self.type.decode(m.groups()[gpos])
|
||||
|
||||
|
||||
class PyVersion(VersionFile):
|
||||
regex = re.compile(r'^VERSION\s*=\s*\((.+?)\)')
|
||||
wb = "VERSION = (%(version)s)\n"
|
||||
wb = 'VERSION = (%(version)s)\n'
|
||||
type = TupleVersion()
|
||||
|
||||
|
||||
|
@ -119,19 +119,19 @@ class CPPVersion(VersionFile):
|
|||
type = StringVersion()
|
||||
|
||||
|
||||
_filetype_to_type = {"py": PyVersion,
|
||||
"rst": SphinxVersion,
|
||||
"c": CPPVersion,
|
||||
"h": CPPVersion}
|
||||
_filetype_to_type = {'py': PyVersion,
|
||||
'rst': SphinxVersion,
|
||||
'c': CPPVersion,
|
||||
'h': CPPVersion}
|
||||
|
||||
|
||||
def filetype_to_type(filename):
|
||||
_, _, suffix = filename.rpartition(".")
|
||||
_, _, suffix = filename.rpartition('.')
|
||||
return _filetype_to_type[suffix](filename)
|
||||
|
||||
|
||||
def bump(*files, **kwargs):
|
||||
version = kwargs.get("version")
|
||||
version = kwargs.get('version')
|
||||
files = [filetype_to_type(f) for f in files]
|
||||
versions = [v.parse() for v in files]
|
||||
current = list(reversed(sorted(versions)))[0] # find highest
|
||||
|
@ -144,26 +144,26 @@ def bump(*files, **kwargs):
|
|||
raise Exception("Can't bump alpha releases")
|
||||
next = (major, minor, release + 1, text)
|
||||
|
||||
print("Bump version from %s -> %s" % (to_str(current), to_str(next)))
|
||||
print('Bump version from %s -> %s' % (to_str(current), to_str(next)))
|
||||
|
||||
for v in files:
|
||||
print(" writing %r..." % (v.filename,))
|
||||
print(' writing %r...' % (v.filename,))
|
||||
v.write(next)
|
||||
|
||||
print(cmd("git", "commit", "-m", "Bumps version to %s" % (to_str(next),),
|
||||
print(cmd('git', 'commit', '-m', 'Bumps version to %s' % (to_str(next),),
|
||||
*[f.filename for f in files]))
|
||||
print(cmd("git", "tag", "v%s" % (to_str(next),)))
|
||||
print(cmd('git', 'tag', 'v%s' % (to_str(next),)))
|
||||
|
||||
|
||||
def main(argv=sys.argv, version=None):
|
||||
if not len(argv) > 1:
|
||||
print("Usage: distdir [docfile] -- <custom version>")
|
||||
print('Usage: distdir [docfile] -- <custom version>')
|
||||
sys.exit(0)
|
||||
if "--" in argv:
|
||||
if '--' in argv:
|
||||
c = argv.index('--')
|
||||
version = argv[c + 1]
|
||||
argv = argv[:c]
|
||||
bump(*argv[1:], version=version)
|
||||
|
||||
if __name__ == "__main__":
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
|
|
@ -1,125 +0,0 @@
|
|||
#!/usr/bin/env python
|
||||
from __future__ import absolute_import
|
||||
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
|
||||
from collections import defaultdict
|
||||
from unipath import Path
|
||||
|
||||
RE_COMMENT = r'^\s*\#'
|
||||
RE_NOQA = r'.+?\#\s+noqa+'
|
||||
RE_MULTILINE_COMMENT_O = r'^\s*(?:\'\'\'|""").+?(?:\'\'\'|""")'
|
||||
RE_MULTILINE_COMMENT_S = r'^\s*(?:\'\'\'|""")'
|
||||
RE_MULTILINE_COMMENT_E = r'(?:^|.+?)(?:\'\'\'|""")'
|
||||
RE_WITH = r'(?:^|\s+)with\s+'
|
||||
RE_WITH_IMPORT = r'''from\s+ __future__\s+ import\s+ with_statement'''
|
||||
RE_PRINT = r'''(?:^|\s+)print\((?:"|')(?:\W+?)?[A-Z0-9:]{2,}'''
|
||||
RE_ABS_IMPORT = r'''from\s+ __future__\s+ import\s+ absolute_import'''
|
||||
|
||||
acc = defaultdict(lambda: {"abs": False, "print": False})
|
||||
|
||||
|
||||
def compile(regex):
|
||||
return re.compile(regex, re.VERBOSE)
|
||||
|
||||
|
||||
class FlakePP(object):
|
||||
re_comment = compile(RE_COMMENT)
|
||||
re_ml_comment_o = compile(RE_MULTILINE_COMMENT_O)
|
||||
re_ml_comment_s = compile(RE_MULTILINE_COMMENT_S)
|
||||
re_ml_comment_e = compile(RE_MULTILINE_COMMENT_E)
|
||||
re_abs_import = compile(RE_ABS_IMPORT)
|
||||
re_print = compile(RE_PRINT)
|
||||
re_with_import = compile(RE_WITH_IMPORT)
|
||||
re_with = compile(RE_WITH)
|
||||
re_noqa = compile(RE_NOQA)
|
||||
map = {"abs": True, "print": False,
|
||||
"with": False, "with-used": False}
|
||||
|
||||
def __init__(self, verbose=False):
|
||||
self.verbose = verbose
|
||||
self.steps = (("abs", self.re_abs_import),
|
||||
("with", self.re_with_import),
|
||||
("with-used", self.re_with),
|
||||
("print", self.re_print))
|
||||
|
||||
def analyze_fh(self, fh):
|
||||
steps = self.steps
|
||||
filename = fh.name
|
||||
acc = dict(self.map)
|
||||
index = 0
|
||||
errors = [0]
|
||||
|
||||
def error(fmt, **kwargs):
|
||||
errors[0] += 1
|
||||
self.announce(fmt, **dict(kwargs, filename=filename))
|
||||
|
||||
for index, line in enumerate(self.strip_comments(fh)):
|
||||
for key, pattern in steps:
|
||||
if pattern.match(line):
|
||||
acc[key] = True
|
||||
if index:
|
||||
if not acc["abs"]:
|
||||
error("%(filename)s: missing abs import")
|
||||
if acc["with-used"] and not acc["with"]:
|
||||
error("%(filename)s: missing with import")
|
||||
if acc["print"]:
|
||||
error("%(filename)s: left over print statement")
|
||||
|
||||
return filename, errors[0], acc
|
||||
|
||||
def analyze_file(self, filename):
|
||||
with open(filename) as fh:
|
||||
return self.analyze_fh(fh)
|
||||
|
||||
def analyze_tree(self, dir):
|
||||
for dirpath, _, filenames in os.walk(dir):
|
||||
for path in (Path(dirpath, f) for f in filenames):
|
||||
if path.endswith(".py"):
|
||||
yield self.analyze_file(path)
|
||||
|
||||
def analyze(self, *paths):
|
||||
for path in map(Path, paths):
|
||||
if path.isdir():
|
||||
for res in self.analyze_tree(path):
|
||||
yield res
|
||||
else:
|
||||
yield self.analyze_file(path)
|
||||
|
||||
def strip_comments(self, fh):
|
||||
re_comment = self.re_comment
|
||||
re_ml_comment_o = self.re_ml_comment_o
|
||||
re_ml_comment_s = self.re_ml_comment_s
|
||||
re_ml_comment_e = self.re_ml_comment_e
|
||||
re_noqa = self.re_noqa
|
||||
in_ml = False
|
||||
|
||||
for line in fh.readlines():
|
||||
if in_ml:
|
||||
if re_ml_comment_e.match(line):
|
||||
in_ml = False
|
||||
else:
|
||||
if re_noqa.match(line) or re_ml_comment_o.match(line):
|
||||
pass
|
||||
elif re_ml_comment_s.match(line):
|
||||
in_ml = True
|
||||
elif re_comment.match(line):
|
||||
pass
|
||||
else:
|
||||
yield line
|
||||
|
||||
def announce(self, fmt, **kwargs):
|
||||
sys.stderr.write((fmt + "\n") % kwargs)
|
||||
|
||||
|
||||
def main(argv=sys.argv, exitcode=0):
|
||||
for _, errors, _ in FlakePP(verbose=True).analyze(*argv[1:]):
|
||||
if errors:
|
||||
exitcode = 1
|
||||
return exitcode
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
|
@ -5,8 +5,8 @@ import os
|
|||
|
||||
|
||||
class test_SLMQ(transport.TransportCase):
|
||||
transport = "SLMQ"
|
||||
prefix = "slmq"
|
||||
transport = 'SLMQ'
|
||||
prefix = 'slmq'
|
||||
event_loop_max = 100
|
||||
message_size_limit = 4192
|
||||
reliable_purge = False
|
||||
|
@ -14,16 +14,16 @@ class test_SLMQ(transport.TransportCase):
|
|||
suppress_disorder_warning = True
|
||||
|
||||
def before_connect(self):
|
||||
if "SLMQ_ACCOUNT" not in os.environ:
|
||||
raise SkipTest("Missing envvar SLMQ_ACCOUNT")
|
||||
if "SL_USERNAME" not in os.environ:
|
||||
raise SkipTest("Missing envvar SL_USERNAME")
|
||||
if "SL_API_KEY" not in os.environ:
|
||||
raise SkipTest("Missing envvar SL_API_KEY")
|
||||
if "SLMQ_HOST" not in os.environ:
|
||||
raise SkipTest("Missing envvar SLMQ_HOST")
|
||||
if "SLMQ_SECURE" not in os.environ:
|
||||
raise SkipTest("Missing envvar SLMQ_SECURE")
|
||||
if 'SLMQ_ACCOUNT' not in os.environ:
|
||||
raise SkipTest('Missing envvar SLMQ_ACCOUNT')
|
||||
if 'SL_USERNAME' not in os.environ:
|
||||
raise SkipTest('Missing envvar SL_USERNAME')
|
||||
if 'SL_API_KEY' not in os.environ:
|
||||
raise SkipTest('Missing envvar SL_API_KEY')
|
||||
if 'SLMQ_HOST' not in os.environ:
|
||||
raise SkipTest('Missing envvar SLMQ_HOST')
|
||||
if 'SLMQ_SECURE' not in os.environ:
|
||||
raise SkipTest('Missing envvar SLMQ_SECURE')
|
||||
|
||||
def after_connect(self, connection):
|
||||
pass
|
||||
|
|
|
@ -177,5 +177,5 @@ class Message(object):
|
|||
details['delivery_info'] = {
|
||||
'routing_key': self.delivery_info['routing_key'],
|
||||
}
|
||||
return "<%s object at 0x%x with details %s>" % (
|
||||
return '<%s object at 0x%x with details %s>' % (
|
||||
self.__class__.__name__, id(self), details)
|
||||
|
|
|
@ -11,7 +11,7 @@ class FakeJob(object):
|
|||
|
||||
def stats(self):
|
||||
return {
|
||||
"tube": self.tube
|
||||
'tube': self.tube
|
||||
}
|
||||
|
||||
def delete(self):
|
||||
|
|
|
@ -30,22 +30,20 @@ class ExtraAssertionsMixin(object):
|
|||
"""A mixin class adding assertDictEqual and assertDictContainsSubset"""
|
||||
|
||||
def assertDictEqual(self, a, b):
|
||||
"""
|
||||
Test that two dictionaries are equal.
|
||||
"""Test that two dictionaries are equal.
|
||||
|
||||
Implemented here because this method was not available until Python
|
||||
2.6. This asserts that the unique set of keys are the same in a and b.
|
||||
Also asserts that the value of each key is the same in a and b using
|
||||
the is operator.
|
||||
|
||||
"""
|
||||
self.assertEqual(set(a.keys()), set(b.keys()))
|
||||
for key in a.keys():
|
||||
self.assertEqual(a[key], b[key])
|
||||
|
||||
def assertDictContainsSubset(self, a, b):
|
||||
"""
|
||||
Assert that all the key/value pairs in a exist in b.
|
||||
"""
|
||||
"""Assert that all the key/value pairs in a exist in b."""
|
||||
for key in a.keys():
|
||||
self.assertTrue(key in b)
|
||||
self.assertTrue(a[key] == b[key])
|
||||
|
@ -85,8 +83,8 @@ class TestQpidMessagingExceptionHandler(Case):
|
|||
try:
|
||||
decorated_fun()
|
||||
except:
|
||||
self.fail("QpidMessagingExceptionHandler allowed an exception "
|
||||
"to be raised that should have been silenced!")
|
||||
self.fail('QpidMessagingExceptionHandler allowed an exception '
|
||||
'to be raised that should have been silenced!')
|
||||
|
||||
def test_exception_negative(self):
|
||||
"""Assert that an exception that does not contain the
|
||||
|
@ -385,7 +383,7 @@ class TestConnectionInit(ExtraAssertionsMixin, ConnectionTestBase):
|
|||
except Exception as error:
|
||||
self.assertEqual(error.code, 999)
|
||||
else:
|
||||
self.fail("Connection should have thrown an exception")
|
||||
self.fail('Connection should have thrown an exception')
|
||||
|
||||
@patch.object(Transport, 'channel_errors', new=(MockException,))
|
||||
@patch(QPID_MODULE + '.qpid')
|
||||
|
@ -903,8 +901,7 @@ class TestChannel(ExtraAssertionsMixin, Case):
|
|||
|
||||
def test_verify_Message_class_attribute(self):
|
||||
"""Verify that the class attribute Message refers to the Message
|
||||
object
|
||||
"""
|
||||
object."""
|
||||
self.assertIs(Message, Channel.Message)
|
||||
|
||||
def test_body_encoding_class_attribute(self):
|
||||
|
@ -913,8 +910,7 @@ class TestChannel(ExtraAssertionsMixin, Case):
|
|||
|
||||
def test_codecs_class_attribute(self):
|
||||
"""Verify that the codecs class attribute has a correct key and
|
||||
value
|
||||
"""
|
||||
value."""
|
||||
self.assertIsInstance(Channel.codecs, dict)
|
||||
self.assertIn('base64', Channel.codecs)
|
||||
self.assertIsInstance(Channel.codecs['base64'], Base64)
|
||||
|
@ -925,8 +921,7 @@ class TestChannel(ExtraAssertionsMixin, Case):
|
|||
|
||||
def test_size(self):
|
||||
"""Test getting the number of messages in a queue specified by
|
||||
name and returning them.
|
||||
"""
|
||||
name and returning them."""
|
||||
message_count = 5
|
||||
queue = Mock(name='queue')
|
||||
queue_to_check = Mock(name='queue_to_check')
|
||||
|
@ -1937,7 +1932,7 @@ class TestTransportVerifyRuntimeEnvironment(Case):
|
|||
self.verify_runtime_environment(self.transport)
|
||||
except Exception:
|
||||
self.fail(
|
||||
"verify_runtime_environment raised an unexpected Exception")
|
||||
'verify_runtime_environment raised an unexpected Exception')
|
||||
|
||||
|
||||
@case_no_python3
|
||||
|
|
|
@ -41,10 +41,10 @@ class Channel(virtual.Channel):
|
|||
|
||||
This function will change '@' to '.'.
|
||||
"""
|
||||
if "@" not in tube_name:
|
||||
if '@' not in tube_name:
|
||||
new_tube_name = tube_name
|
||||
else:
|
||||
new_tube_name = tube_name.replace("@", ".")
|
||||
new_tube_name = tube_name.replace('@', '.')
|
||||
self._tube_map[new_tube_name] = tube_name
|
||||
return new_tube_name
|
||||
|
||||
|
|
|
@ -92,9 +92,9 @@ except ImportError: # pragma: no cover
|
|||
|
||||
# Prepare for Monkey Patch 2
|
||||
def resolve_declare_monkey(self, sst, lnk, dir, action): # pragma: no cover
|
||||
declare = lnk.options.get("create") in ("always", dir)
|
||||
assrt = lnk.options.get("assert") in ("always", dir)
|
||||
requested_type = lnk.options.get("node", {}).get("type")
|
||||
declare = lnk.options.get('create') in ('always', dir)
|
||||
assrt = lnk.options.get('assert') in ('always', dir)
|
||||
requested_type = lnk.options.get('node', {}).get('type')
|
||||
|
||||
def do_resolved(type, subtype):
|
||||
err = None
|
||||
|
@ -102,13 +102,13 @@ def resolve_declare_monkey(self, sst, lnk, dir, action): # pragma: no cover
|
|||
if declare:
|
||||
err = self.declare(sst, lnk, action)
|
||||
else:
|
||||
err = NotFound(text="no such queue: %s" % lnk.name)
|
||||
err = NotFound(text='no such queue: %s' % lnk.name)
|
||||
else:
|
||||
if assrt:
|
||||
expected = lnk.options.get("node", {}).get("type")
|
||||
expected = lnk.options.get('node', {}).get('type')
|
||||
if expected and type != expected:
|
||||
err = AssertionFailed(
|
||||
text="expected %s, got %s" % (expected, type))
|
||||
text='expected %s, got %s' % (expected, type))
|
||||
if err is None:
|
||||
action(type, subtype)
|
||||
if err:
|
||||
|
@ -139,16 +139,16 @@ def resolve_monkey(self, sst, name, action, force=False,
|
|||
def do_action(r):
|
||||
do_result(r)
|
||||
er, qr = args
|
||||
if node_type == "topic" and not er.not_found:
|
||||
type, subtype = "topic", er.type
|
||||
elif node_type == "queue" and qr.queue:
|
||||
type, subtype = "queue", None
|
||||
if node_type == 'topic' and not er.not_found:
|
||||
type, subtype = 'topic', er.type
|
||||
elif node_type == 'queue' and qr.queue:
|
||||
type, subtype = 'queue', None
|
||||
elif er.not_found and not qr.queue:
|
||||
type, subtype = None, None
|
||||
elif qr.queue:
|
||||
type, subtype = "queue", None
|
||||
type, subtype = 'queue', None
|
||||
else:
|
||||
type, subtype = "topic", er.type
|
||||
type, subtype = 'topic', er.type
|
||||
if type is not None:
|
||||
self.address_cache[name] = (type, subtype)
|
||||
action(type, subtype)
|
||||
|
|
|
@ -164,7 +164,7 @@ class Channel(virtual.Channel):
|
|||
host_port = (conninfo.hostname, conninfo.port or DEFAULT_PORT)
|
||||
if host_port not in hosts:
|
||||
hosts.insert(0, host_port)
|
||||
conn_str = ",".join(['%s:%s' % (h, p) for h, p in hosts])
|
||||
conn_str = ','.join(['%s:%s' % (h, p) for h, p in hosts])
|
||||
conn = KazooClient(conn_str)
|
||||
conn.start()
|
||||
return conn
|
||||
|
|
Loading…
Reference in New Issue