diff --git a/Changelog b/Changelog index f9be9733..5da016b9 100644 --- a/Changelog +++ b/Changelog @@ -42,7 +42,7 @@ can be used to check if the connection instance has established a connection. -* ``ConnectionPool.acquire_channel` now returns the connections +* ``ConnectionPool.acquire_channel`` now returns the connections default channel rather than establising a new channel that must be manually handled. diff --git a/contrib/release/flakeplus.py b/contrib/release/flakeplus.py new file mode 100755 index 00000000..6fe1f1fc --- /dev/null +++ b/contrib/release/flakeplus.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python +from __future__ import absolute_import +from __future__ import with_statement + +import os +import re +import sys + +from collections import defaultdict +from unipath import Path + +RE_COMMENT = r'^\s*\#' +RE_NOQA = r'.+?\#\s+noqa+' +RE_MULTILINE_COMMENT_O = r'^\s*(?:\'\'\'|""").+?(?:\'\'\'|""")' +RE_MULTILINE_COMMENT_S = r'^\s*(?:\'\'\'|""")' +RE_MULTILINE_COMMENT_E = r'(?:^|.+?)(?:\'\'\'|""")' +RE_WITH = r'(?:^|\s+)with\s+' +RE_WITH_IMPORT = r'''from\s+ __future__\s+ import\s+ with_statement''' +RE_PRINT = r'''(?:^|\s+)print\((?:"|')(?:\W+?)?[A-Z0-9:]{2,}''' +RE_ABS_IMPORT = r'''from\s+ __future__\s+ import\s+ absolute_import''' + +acc = defaultdict(lambda: {"abs": False, "print": False}) + + +def compile(regex): + return re.compile(regex, re.VERBOSE) + + +class FlakePP(object): + re_comment = compile(RE_COMMENT) + re_ml_comment_o = compile(RE_MULTILINE_COMMENT_O) + re_ml_comment_s = compile(RE_MULTILINE_COMMENT_S) + re_ml_comment_e = compile(RE_MULTILINE_COMMENT_E) + re_abs_import = compile(RE_ABS_IMPORT) + re_print = compile(RE_PRINT) + re_with_import = compile(RE_WITH_IMPORT) + re_with = compile(RE_WITH) + re_noqa = compile(RE_NOQA) + map = {"abs": True, "print": False, + "with": False, "with-used": False} + + def __init__(self, verbose=False): + self.verbose = verbose + self.steps = (("abs", self.re_abs_import), + ("with", self.re_with_import), + ("with-used", self.re_with), + ("print", self.re_print)) + + def analyze_fh(self, fh): + steps = self.steps + filename = fh.name + acc = dict(self.map) + index = 0 + errors = [0] + + def error(fmt, **kwargs): + errors[0] += 1 + self.announce(fmt, **dict(kwargs, filename=filename)) + + for index, line in enumerate(self.strip_comments(fh)): + for key, pattern in steps: + if pattern.match(line): + acc[key] = True + if index: + if not acc["abs"]: + error("%(filename)s: missing abs import") + if acc["with-used"] and not acc["with"]: + error("%(filename)s: missing with import") + if acc["print"]: + error("%(filename)s: left over print statement") + + return filename, errors[0], acc + + def analyze_file(self, filename): + with open(filename) as fh: + return self.analyze_fh(fh) + + def analyze_tree(self, dir): + for dirpath, _, filenames in os.walk(dir): + for path in (Path(dirpath, f) for f in filenames): + if path.endswith(".py"): + yield self.analyze_file(path) + + def analyze(self, *paths): + for path in map(Path, paths): + if path.isdir(): + for res in self.analyze_tree(path): + yield res + else: + yield self.analyze_file(path) + + def strip_comments(self, fh): + re_comment = self.re_comment + re_ml_comment_o = self.re_ml_comment_o + re_ml_comment_s = self.re_ml_comment_s + re_ml_comment_e = self.re_ml_comment_e + re_noqa = self.re_noqa + in_ml = False + + for line in fh.readlines(): + if in_ml: + if re_ml_comment_e.match(line): + in_ml = False + else: + if re_noqa.match(line) or re_ml_comment_o.match(line): + pass + elif re_ml_comment_s.match(line): + in_ml = True + elif re_comment.match(line): + pass + else: + yield line + + def announce(self, fmt, **kwargs): + sys.stderr.write((fmt + "\n") % kwargs) + + +def main(argv=sys.argv, exitcode=0): + for _, errors, _ in FlakePP(verbose=True).analyze(*argv[1:]): + if errors: + exitcode = 1 + return exitcode + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/docs/reference/index.rst b/docs/reference/index.rst index 59925ada..709e8366 100644 --- a/docs/reference/index.rst +++ b/docs/reference/index.rst @@ -13,6 +13,7 @@ kombu.messaging kombu.entity kombu.common + kombu.mixins kombu.clocks kombu.compat kombu.pidbox @@ -37,6 +38,7 @@ kombu.abstract kombu.syn kombu.utils + kombu.utils.limits kombu.utils.compat kombu.utils.debug kombu.utils.encoding diff --git a/docs/reference/kombu.mixins.rst b/docs/reference/kombu.mixins.rst new file mode 100644 index 00000000..7a87ac27 --- /dev/null +++ b/docs/reference/kombu.mixins.rst @@ -0,0 +1,11 @@ +========================================================== + Mixin Classes - kombu.mixins +========================================================== + +.. contents:: + :local: +.. currentmodule:: kombu.mixins + +.. automodule:: kombu.mixins + :members: + :undoc-members: diff --git a/docs/reference/kombu.utils.limits.rst b/docs/reference/kombu.utils.limits.rst new file mode 100644 index 00000000..59df5508 --- /dev/null +++ b/docs/reference/kombu.utils.limits.rst @@ -0,0 +1,11 @@ +========================================================== + Rate limiting - kombu.utils.limits +========================================================== + +.. contents:: + :local: +.. currentmodule:: kombu.utils.limits + +.. automodule:: kombu.utils.limits + :members: + :undoc-members: diff --git a/docs/userguide/examples.rst b/docs/userguide/examples.rst index ed807780..6e9c4e3d 100644 --- a/docs/userguide/examples.rst +++ b/docs/userguide/examples.rst @@ -1,3 +1,11 @@ +.. _examples: + +======================== + Examples +======================== + +.. _task-queue-example: + Task Queue Example ================== diff --git a/docs/userguide/index.rst b/docs/userguide/index.rst index b400e289..0f427251 100644 --- a/docs/userguide/index.rst +++ b/docs/userguide/index.rst @@ -9,7 +9,7 @@ :maxdepth: 2 connections - simple examples + simple pools serialization diff --git a/examples/simple_task_queue/client.py b/examples/simple_task_queue/client.py index 489aa2e6..edf63f76 100644 --- a/examples/simple_task_queue/client.py +++ b/examples/simple_task_queue/client.py @@ -9,6 +9,7 @@ priority_to_routing_key = {"high": "hipri", "mid": "midpri", "low": "lopri"} + def send_as_task(connection, fun, args, kwargs, priority="mid"): payload = {"fun": fun, "args": args, "kwargs": kwargs} routing_key = priority_to_routing_key[priority] @@ -23,6 +24,6 @@ if __name__ == "__main__": from kombu import BrokerConnection from .tasks import hello_task - conection = BrokerConnection("amqp://guest:guest@localhost:5672//") + connection = BrokerConnection("amqp://guest:guest@localhost:5672//") send_as_task(connection, fun=hello_task, args=("Kombu", ), priority="high") diff --git a/examples/simple_task_queue/queues.py b/examples/simple_task_queue/queues.py index 5f1feef2..680e7575 100644 --- a/examples/simple_task_queue/queues.py +++ b/examples/simple_task_queue/queues.py @@ -4,4 +4,3 @@ task_exchange = Exchange("tasks", type="direct") task_queues = [Queue("hipri", task_exchange, routing_key="hipri"), Queue("midpri", task_exchange, routing_key="midpri"), Queue("lopri", task_exchange, routing_key="lopri")] - diff --git a/examples/simple_task_queue/tasks.py b/examples/simple_task_queue/tasks.py index 47146e45..f6e9da03 100644 --- a/examples/simple_task_queue/tasks.py +++ b/examples/simple_task_queue/tasks.py @@ -1,4 +1,2 @@ def hello_task(who="world"): print("Hello %s" % (who, )) - - diff --git a/examples/simple_task_queue/worker.py b/examples/simple_task_queue/worker.py index 68e7d6b6..fb9e9a7c 100644 --- a/examples/simple_task_queue/worker.py +++ b/examples/simple_task_queue/worker.py @@ -1,11 +1,11 @@ from __future__ import with_statement -from kombu import Exchange, Queue from kombu.mixins import ConsumerMixin from kombu.utils import kwdict from queues import task_queues + class Worker(ConsumerMixin): def get_consumers(self, Consumer, channel): @@ -26,5 +26,3 @@ if __name__ == "__main__": with BrokerConnection("amqp://guest:guest@localhost:5672//") as conn: Worker(conn).run() - - diff --git a/kombu/clocks.py b/kombu/clocks.py index 2e6a7b9b..ac3f9f53 100644 --- a/kombu/clocks.py +++ b/kombu/clocks.py @@ -33,8 +33,12 @@ class LamportClock(object): .. seealso:: - http://en.wikipedia.org/wiki/Lamport_timestamps - http://en.wikipedia.org/wiki/Lamport's_Distributed_Mutual_Exclusion_Algorithm + * `Lamport timestamps`_ + + * `Lamports distributed mutex`_ + + .. _`Lamport Timestamps`: http://en.wikipedia.org/wiki/Lamport_timestamps + .. _`Lamports distributed mutex`: http://bit.ly/p99ybE *Usage* diff --git a/kombu/connection.py b/kombu/connection.py index dc348433..f278212c 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -19,7 +19,7 @@ from urlparse import urlparse try: from urlparse import parse_qsl except ImportError: - from cgi import parse_qsl + from cgi import parse_qsl # noqa from kombu import exceptions from kombu.transport import get_transport_cls diff --git a/kombu/mixins.py b/kombu/mixins.py index 97d44ba9..5f637a41 100644 --- a/kombu/mixins.py +++ b/kombu/mixins.py @@ -12,7 +12,6 @@ from __future__ import absolute_import from __future__ import with_statement import socket -import sys from contextlib import nested, contextmanager from functools import partial diff --git a/kombu/utils/compat.py b/kombu/utils/compat.py index 9db5057d..bef67d31 100644 --- a/kombu/utils/compat.py +++ b/kombu/utils/compat.py @@ -280,9 +280,10 @@ class LifoQueue(Queue): try: from collections import defaultdict except ImportError: + # Written by Jason Kirtland, taken from Python Cookbook: # - class defaultdict(dict): + class defaultdict(dict): # noqa def __init__(self, default_factory=None, *args, **kwargs): dict.__init__(self, *args, **kwargs) @@ -321,4 +322,3 @@ except ImportError: dict.__repr__(self)) import collections collections.defaultdict = defaultdict # Pickle needs this. - diff --git a/kombu/utils/encoding.py b/kombu/utils/encoding.py index 3f9e4fce..4a4e3d41 100644 --- a/kombu/utils/encoding.py +++ b/kombu/utils/encoding.py @@ -13,6 +13,7 @@ import traceback if sys.version_info >= (3, 0): + def str_to_bytes(s): if isinstance(s, str): return s.encode() @@ -23,18 +24,24 @@ if sys.version_info >= (3, 0): return s.decode() return s -else: # noqa - def str_to_bytes(s): +else: + + def str_to_bytes(s): # noqa return s - def bytes_to_str(s): + def bytes_to_str(s): # noqa return s -def default_encoding(): - if sys.platform.startswith("java"): +if sys.platform.startswith("java"): + + def default_encoding(): return "utf-8" - return sys.getfilesystemencoding() + +else: + + def default_encoding(): # noqa + return sys.getfilesystemencoding() def safe_str(s, errors="replace"): diff --git a/pavement.py b/pavement.py index 4c0c2070..45e6d54a 100644 --- a/pavement.py +++ b/pavement.py @@ -67,11 +67,6 @@ def verifyindex(options): sh("contrib/release/verify-reference-index.sh") -@task -def flakes(options): - sh("find kombu funtests examples -name '*.py' | xargs pyflakes") - - @task def clean_readme(options): path("README").unlink() @@ -122,6 +117,25 @@ def flake8(options): '""" % (complexity, ), ignore_error=noerror) +@task +@cmdopts([ + ("noerror", "E", "Ignore errors"), +]) +def flakeplus(options): + noerror = getattr(options, "noerror", False) + sh("python contrib/release/flakeplus.py kombu", + ignore_error=noerror) + + +@task +@cmdopts([ + ("noerror", "E", "Ignore errors"), +]) +def flakes(options): + flake8(options) + flakeplus(options) + + @task @cmdopts([ ("noerror", "E", "Ignore errors"), @@ -150,7 +164,7 @@ def gitcleanforce(options): @task -@needs("flake8", "autodoc", "verifyindex", "test", "gitclean") +@needs("flakes", "autodoc", "verifyindex", "test", "gitclean") def releaseok(options): pass