From 85c191a48ace265836bbce39a2c8a5fab98833b8 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Sat, 16 Jul 2016 13:33:32 -0700 Subject: [PATCH] Reorganizes kombu.utils.__init__ package --- docs/conf.py | 1 + docs/reference/index.rst | 25 +- docs/reference/kombu.utils.collections.rst | 11 + docs/reference/kombu.utils.compat.rst | 11 + .../{kombu.utils.rst => kombu.utils.div.rst} | 6 +- docs/reference/kombu.utils.imports.rst | 11 + docs/reference/kombu.utils.objects.rst | 11 + docs/reference/kombu.utils.uuid.rst | 11 + docs/userguide/consumers.rst | 2 +- examples/rpc-tut6/rpc_client.py | 3 +- examples/simple_task_queue/worker.py | 2 +- funtests/tests/test_mongodb.py | 2 +- kombu/abstract.py | 2 +- kombu/async/debug.py | 2 +- kombu/async/http/base.py | 4 +- kombu/async/hub.py | 3 +- kombu/common.py | 2 +- kombu/connection.py | 6 +- kombu/log.py | 2 +- kombu/messaging.py | 10 +- kombu/mixins.py | 3 +- kombu/pidbox.py | 5 +- kombu/pools.py | 3 +- kombu/resource.py | 2 +- kombu/serialization.py | 2 +- kombu/tests/async/aws/sqs/test_connection.py | 2 +- kombu/tests/async/aws/sqs/test_message.py | 2 +- kombu/tests/test_messaging.py | 2 +- kombu/tests/test_pidbox.py | 2 +- kombu/tests/test_pools.py | 2 +- kombu/tests/transport/virtual/test_base.py | 4 +- kombu/tests/utils/test_compat.py | 32 ++ kombu/tests/utils/test_div.py | 51 +++ kombu/tests/utils/test_functional.py | 148 +++++- kombu/tests/utils/test_imports.py | 37 ++ kombu/tests/utils/test_objects.py | 55 +++ kombu/tests/utils/test_utils.py | 328 +------------ kombu/tests/utils/test_uuid.py | 17 + kombu/transport/SLMQ.py | 2 +- kombu/transport/SQS.py | 4 +- kombu/transport/__init__.py | 2 +- kombu/transport/base.py | 2 +- kombu/transport/consul.py | 2 +- kombu/transport/filesystem.py | 2 +- kombu/transport/mongodb.py | 2 +- kombu/transport/pyro.py | 2 +- kombu/transport/redis.py | 4 +- kombu/transport/virtual/__init__.py | 3 +- kombu/transport/virtual/exchange.py | 4 +- kombu/utils/__init__.py | 432 +----------------- kombu/utils/collections.py | 37 ++ kombu/utils/compat.py | 92 ++++ kombu/utils/div.py | 32 ++ kombu/utils/functional.py | 123 +++++ kombu/utils/imports.py | 65 +++ kombu/utils/objects.py | 63 +++ kombu/utils/scheduling.py | 2 +- kombu/utils/text.py | 7 + kombu/utils/uuid.py | 13 + 59 files changed, 905 insertions(+), 814 deletions(-) create mode 100644 docs/reference/kombu.utils.collections.rst create mode 100644 docs/reference/kombu.utils.compat.rst rename docs/reference/{kombu.utils.rst => kombu.utils.div.rst} (63%) create mode 100644 docs/reference/kombu.utils.imports.rst create mode 100644 docs/reference/kombu.utils.objects.rst create mode 100644 docs/reference/kombu.utils.uuid.rst create mode 100644 kombu/tests/utils/test_compat.py create mode 100644 kombu/tests/utils/test_div.py create mode 100644 kombu/tests/utils/test_imports.py create mode 100644 kombu/tests/utils/test_objects.py create mode 100644 kombu/tests/utils/test_uuid.py create mode 100644 kombu/utils/collections.py create mode 100644 kombu/utils/compat.py create mode 100644 kombu/utils/div.py create mode 100644 kombu/utils/imports.py create mode 100644 kombu/utils/objects.py create mode 100644 kombu/utils/uuid.py diff --git a/docs/conf.py b/docs/conf.py index f2f7482a..75fefc1c 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -25,5 +25,6 @@ globals().update(conf.build_config( 'kombu.async.aws.ext', 'kombu.async.aws.sqs.ext', 'kombu.transport.qpid_patches', + 'kombu.utils', ], )) diff --git a/docs/reference/index.rst b/docs/reference/index.rst index 6c3285ca..5e963c9b 100644 --- a/docs/reference/index.rst +++ b/docs/reference/index.rst @@ -55,15 +55,20 @@ kombu.transport.virtual kombu.transport.virtual.exchange kombu.serialization - kombu.utils - kombu.utils.scheduling - kombu.utils.eventio - kombu.utils.limits - kombu.utils.debug - kombu.utils.encoding - kombu.utils.functional - kombu.utils.json - kombu.utils.url - kombu.utils.text kombu.utils.amq_manager + kombu.utils.collections + kombu.utils.compat + kombu.utils.debug + kombu.utils.div + kombu.utils.encoding + kombu.utils.eventio + kombu.utils.functional + kombu.utils.imports + kombu.utils.json + kombu.utils.limits + kombu.utils.objects + kombu.utils.scheduling + kombu.utils.text + kombu.utils.url + kombu.utils.uuid kombu.five diff --git a/docs/reference/kombu.utils.collections.rst b/docs/reference/kombu.utils.collections.rst new file mode 100644 index 00000000..dc84ce32 --- /dev/null +++ b/docs/reference/kombu.utils.collections.rst @@ -0,0 +1,11 @@ +========================================================== + Custom Collections - ``kombu.utils.collections`` +========================================================== + +.. contents:: + :local: +.. currentmodule:: kombu.utils.collections + +.. automodule:: kombu.utils.collections + :members: + :undoc-members: diff --git a/docs/reference/kombu.utils.compat.rst b/docs/reference/kombu.utils.compat.rst new file mode 100644 index 00000000..1935a1c6 --- /dev/null +++ b/docs/reference/kombu.utils.compat.rst @@ -0,0 +1,11 @@ +========================================================== + Python Compatibility - ``kombu.utils.compat`` +========================================================== + +.. contents:: + :local: +.. currentmodule:: kombu.utils.compat + +.. automodule:: kombu.utils.compat + :members: + :undoc-members: diff --git a/docs/reference/kombu.utils.rst b/docs/reference/kombu.utils.div.rst similarity index 63% rename from docs/reference/kombu.utils.rst rename to docs/reference/kombu.utils.div.rst index a267d380..6fb07d8e 100644 --- a/docs/reference/kombu.utils.rst +++ b/docs/reference/kombu.utils.div.rst @@ -1,11 +1,11 @@ ========================================================== - Utilities - ``kombu.utils`` + Div Utilities - ``kombu.utils.div`` ========================================================== .. contents:: :local: -.. currentmodule:: kombu.utils +.. currentmodule:: kombu.utils.div -.. automodule:: kombu.utils +.. automodule:: kombu.utils.div :members: :undoc-members: diff --git a/docs/reference/kombu.utils.imports.rst b/docs/reference/kombu.utils.imports.rst new file mode 100644 index 00000000..36fdafe1 --- /dev/null +++ b/docs/reference/kombu.utils.imports.rst @@ -0,0 +1,11 @@ +========================================================== + Module Importing Utilities - ``kombu.utils.imports`` +========================================================== + +.. contents:: + :local: +.. currentmodule:: kombu.utils.imports + +.. automodule:: kombu.utils.imports + :members: + :undoc-members: diff --git a/docs/reference/kombu.utils.objects.rst b/docs/reference/kombu.utils.objects.rst new file mode 100644 index 00000000..3f411ae7 --- /dev/null +++ b/docs/reference/kombu.utils.objects.rst @@ -0,0 +1,11 @@ +========================================================== + Object/Property Utilities - ``kombu.utils.objects`` +========================================================== + +.. contents:: + :local: +.. currentmodule:: kombu.utils.objects + +.. automodule:: kombu.utils.objects + :members: + :undoc-members: diff --git a/docs/reference/kombu.utils.uuid.rst b/docs/reference/kombu.utils.uuid.rst new file mode 100644 index 00000000..5af7ff6b --- /dev/null +++ b/docs/reference/kombu.utils.uuid.rst @@ -0,0 +1,11 @@ +========================================================== + UUID Utilities - ``kombu.utils.uuid`` +========================================================== + +.. contents:: + :local: +.. currentmodule:: kombu.utils.uuid + +.. automodule:: kombu.utils.uuid + :members: + :undoc-members: diff --git a/docs/userguide/consumers.rst b/docs/userguide/consumers.rst index e76a88b6..02ed2cf0 100644 --- a/docs/userguide/consumers.rst +++ b/docs/userguide/consumers.rst @@ -37,7 +37,7 @@ Draining events from several consumers: .. code-block:: python - from kombu.utils import nested + from kombu.utils.compat import nested with connection.channel(), connection.channel() as (channel1, channel2): with nested(Consumer(channel1, queues1, accept=['json']), diff --git a/examples/rpc-tut6/rpc_client.py b/examples/rpc-tut6/rpc_client.py index 27a23903..0d90bed6 100644 --- a/examples/rpc-tut6/rpc_client.py +++ b/examples/rpc-tut6/rpc_client.py @@ -1,8 +1,7 @@ #!/usr/bin/env python from __future__ import absolute_import, unicode_literals -from kombu import Connection, Producer, Consumer, Queue -from kombu.utils import uuid +from kombu import Connection, Producer, Consumer, Queue, uuid class FibonacciRpcClient(object): diff --git a/examples/simple_task_queue/worker.py b/examples/simple_task_queue/worker.py index 6b617cf0..bd2dd7f8 100644 --- a/examples/simple_task_queue/worker.py +++ b/examples/simple_task_queue/worker.py @@ -2,7 +2,7 @@ from __future__ import absolute_import, unicode_literals from kombu.mixins import ConsumerMixin from kombu.log import get_logger -from kombu.utils import reprcall +from kombu.utils.functional import reprcall from .queues import task_queues diff --git a/funtests/tests/test_mongodb.py b/funtests/tests/test_mongodb.py index 2f41972c..dcf00ef1 100644 --- a/funtests/tests/test_mongodb.py +++ b/funtests/tests/test_mongodb.py @@ -2,7 +2,7 @@ from __future__ import absolute_import, unicode_literals from kombu import Consumer, Producer, Exchange, Queue from kombu.five import range -from kombu.utils import nested +from kombu.utils.compat import nested from funtests import transport diff --git a/kombu/abstract.py b/kombu/abstract.py index 6f9423fd..e120fbd6 100644 --- a/kombu/abstract.py +++ b/kombu/abstract.py @@ -6,7 +6,7 @@ from copy import copy from .connection import maybe_channel from .exceptions import NotBoundError from .five import python_2_unicode_compatible -from .utils import ChannelPromise +from .utils.functional import ChannelPromise __all__ = ['Object', 'MaybeChannelBound'] diff --git a/kombu/async/debug.py b/kombu/async/debug.py index d1aebe69..6385d233 100644 --- a/kombu/async/debug.py +++ b/kombu/async/debug.py @@ -2,8 +2,8 @@ from __future__ import absolute_import, unicode_literals from kombu.five import items, string_t -from kombu.utils import reprcall from kombu.utils.eventio import READ, WRITE, ERR +from kombu.utils.functional import reprcall def repr_flag(flag): diff --git a/kombu/async/http/base.py b/kombu/async/http/base.py index e39f0ae9..92c6bdf4 100644 --- a/kombu/async/http/base.py +++ b/kombu/async/http/base.py @@ -6,7 +6,7 @@ from vine import Thenable, promise, maybe_promise from kombu.exceptions import HttpError from kombu.five import items, python_2_unicode_compatible -from kombu.utils import coro +from kombu.utils.compat import coro from kombu.utils.encoding import bytes_to_str from kombu.utils.functional import maybe_list, memoize @@ -223,7 +223,7 @@ class BaseClient(object): self._header_parser = header_parser() def perform(self, request, **kwargs): - for req in maybe_list(request): + for req in maybe_list(request) or []: if not isinstance(req, self.Request): req = self.Request(req, **kwargs) self.add_request(req) diff --git a/kombu/async/hub.py b/kombu/async/hub.py index cf2abde8..5e3da78d 100644 --- a/kombu/async/hub.py +++ b/kombu/async/hub.py @@ -12,8 +12,9 @@ from vine import Thenable, promise from kombu.five import Empty, python_2_unicode_compatible, range from kombu.log import get_logger -from kombu.utils import cached_property, fileno +from kombu.utils.compat import fileno from kombu.utils.eventio import READ, WRITE, ERR, poll +from kombu.utils.objects import cached_property from .timer import Timer diff --git a/kombu/common.py b/kombu/common.py index b2d811c0..fa75cd9f 100644 --- a/kombu/common.py +++ b/kombu/common.py @@ -17,7 +17,7 @@ from .entity import Exchange, Queue from .five import bytes_if_py2, range from .log import get_logger from .serialization import registry as serializers -from .utils import uuid +from .utils.uuid import uuid try: from _thread import get_ident diff --git a/kombu/connection.py b/kombu/connection.py index d6e4b0eb..3f08663d 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -12,12 +12,14 @@ from operator import itemgetter # jython breaks on relative import for .exceptions for some reason # (Issue #112) from kombu import exceptions + from .five import bytes_if_py2, python_2_unicode_compatible, string_t, text_t from .log import get_logger from .resource import Resource from .transport import get_transport_cls, supports_librabbitmq -from .utils import cached_property, retry_over_time, shufflecycle, HashedSeq -from .utils.functional import dictfilter, lazy +from .utils.collections import HashedSeq +from .utils.functional import dictfilter, lazy, retry_over_time, shufflecycle +from .utils.objects import cached_property from .utils.url import as_url, parse_url, quote, urlparse __all__ = ['Connection', 'ConnectionPool', 'ChannelPool'] diff --git a/kombu/log.py b/kombu/log.py index 92a9bb21..7428e5e1 100644 --- a/kombu/log.py +++ b/kombu/log.py @@ -8,9 +8,9 @@ import sys from logging.handlers import WatchedFileHandler from .five import string_t -from .utils import cached_property from .utils.encoding import safe_repr, safe_str from .utils.functional import maybe_evaluate +from .utils.objects import cached_property __all__ = ['LogMixin', 'LOG_LEVELS', 'get_loglevel', 'setup_logging'] diff --git a/kombu/messaging.py b/kombu/messaging.py index a99e1723..ee2ab7e0 100644 --- a/kombu/messaging.py +++ b/kombu/messaging.py @@ -1,10 +1,4 @@ -""" -kombu.messaging -=============== - -Sending and receiving messages. - -""" +"""Sending and receiving messages.""" from __future__ import absolute_import, unicode_literals from itertools import count @@ -16,7 +10,7 @@ from .entity import Exchange, Queue, maybe_delivery_mode from .exceptions import ContentDisallowed from .five import items, python_2_unicode_compatible, text_t, values from .serialization import dumps, prepare_accept_content -from .utils import ChannelPromise, maybe_list +from .utils.functional import ChannelPromise, maybe_list __all__ = ['Exchange', 'Queue', 'Producer', 'Consumer'] diff --git a/kombu/mixins.py b/kombu/mixins.py index bb930c1f..112ce9f4 100644 --- a/kombu/mixins.py +++ b/kombu/mixins.py @@ -19,9 +19,10 @@ from .common import ignore_errors from .five import range from .messaging import Consumer, Producer from .log import get_logger -from .utils import cached_property, nested +from .utils.compat import nested from .utils.encoding import safe_repr from .utils.limits import TokenBucket +from .utils.objects import cached_property __all__ = ['ConsumerMixin'] diff --git a/kombu/pidbox.py b/kombu/pidbox.py index ee443495..99485ca8 100644 --- a/kombu/pidbox.py +++ b/kombu/pidbox.py @@ -17,8 +17,9 @@ from .common import maybe_declare, oid_from from .exceptions import InconsistencyError from .five import range from .log import get_logger -from .utils import cached_property, uuid, reprcall -from .utils.functional import maybe_evaluate +from .utils.functional import maybe_evaluate, reprcall +from .utils.objects import cached_property +from .utils.uuid import uuid REPLY_QUEUE_EXPIRES = 10 diff --git a/kombu/pools.py b/kombu/pools.py index bc1bcb48..ed5fc521 100644 --- a/kombu/pools.py +++ b/kombu/pools.py @@ -8,7 +8,8 @@ from itertools import chain from .connection import Resource from .five import range, values from .messaging import Producer -from .utils import EqualityDict, register_after_fork +from .utils.collections import EqualityDict +from .utils.compat import register_after_fork from .utils.functional import lazy __all__ = ['ProducerPool', 'PoolGroup', 'register_group', diff --git a/kombu/resource.py b/kombu/resource.py index 43d76911..11e44366 100644 --- a/kombu/resource.py +++ b/kombu/resource.py @@ -7,7 +7,7 @@ from collections import deque from . import exceptions from .five import Empty, LifoQueue as _LifoQueue -from .utils import register_after_fork +from .utils.compat import register_after_fork from .utils.functional import lazy diff --git a/kombu/serialization.py b/kombu/serialization.py index a4660f71..78233ad6 100644 --- a/kombu/serialization.py +++ b/kombu/serialization.py @@ -19,7 +19,7 @@ from .exceptions import ( ContentDisallowed, DecodeError, EncodeError, SerializerNotInstalled ) from .five import reraise, text_t -from .utils import entrypoints +from .utils.compat import entrypoints from .utils.encoding import bytes_to_str, str_to_bytes, bytes_t __all__ = ['pickle', 'loads', 'dumps', 'register', 'unregister'] diff --git a/kombu/tests/async/aws/sqs/test_connection.py b/kombu/tests/async/aws/sqs/test_connection.py index a36c8085..9a9dcdf0 100644 --- a/kombu/tests/async/aws/sqs/test_connection.py +++ b/kombu/tests/async/aws/sqs/test_connection.py @@ -6,7 +6,7 @@ from kombu.async.aws.sqs.connection import ( ) from kombu.async.aws.sqs.message import AsyncMessage from kombu.async.aws.sqs.queue import AsyncQueue -from kombu.utils import uuid +from kombu.utils.uuid import uuid from kombu.tests.async.aws.case import AWSCase from kombu.tests.case import PromiseMock, Mock diff --git a/kombu/tests/async/aws/sqs/test_message.py b/kombu/tests/async/aws/sqs/test_message.py index 29107332..0f1a033b 100644 --- a/kombu/tests/async/aws/sqs/test_message.py +++ b/kombu/tests/async/aws/sqs/test_message.py @@ -5,7 +5,7 @@ from kombu.async.aws.sqs.message import AsyncMessage from kombu.tests.async.aws.case import AWSCase from kombu.tests.case import PromiseMock, Mock -from kombu.utils import uuid +from kombu.utils.uuid import uuid class test_AsyncMessage(AWSCase): diff --git a/kombu/tests/test_messaging.py b/kombu/tests/test_messaging.py index 7a0fc134..8f48f7cd 100644 --- a/kombu/tests/test_messaging.py +++ b/kombu/tests/test_messaging.py @@ -7,8 +7,8 @@ from collections import defaultdict from kombu import Connection, Consumer, Producer, Exchange, Queue from kombu.exceptions import MessageStateError -from kombu.utils import ChannelPromise from kombu.utils import json +from kombu.utils.functional import ChannelPromise from .case import Case, Mock, patch from .mocks import Transport diff --git a/kombu/tests/test_pidbox.py b/kombu/tests/test_pidbox.py index 1fa02ead..852dbbf5 100644 --- a/kombu/tests/test_pidbox.py +++ b/kombu/tests/test_pidbox.py @@ -6,7 +6,7 @@ import warnings from kombu import Connection from kombu import pidbox from kombu.exceptions import ContentDisallowed, InconsistencyError -from kombu.utils import uuid +from kombu.utils.uuid import uuid from .case import Case, Mock, patch diff --git a/kombu/tests/test_pools.py b/kombu/tests/test_pools.py index 05bf5bdc..7a632eee 100644 --- a/kombu/tests/test_pools.py +++ b/kombu/tests/test_pools.py @@ -3,7 +3,7 @@ from __future__ import absolute_import, unicode_literals from kombu import Connection, Producer from kombu import pools from kombu.connection import ConnectionPool -from kombu.utils import eqhash +from kombu.utils.collections import eqhash from .case import Case, Mock diff --git a/kombu/tests/transport/virtual/test_base.py b/kombu/tests/transport/virtual/test_base.py index a0c7b945..8b73a7fa 100644 --- a/kombu/tests/transport/virtual/test_base.py +++ b/kombu/tests/transport/virtual/test_base.py @@ -4,10 +4,10 @@ import sys import warnings from kombu import Connection +from kombu.compression import compress from kombu.exceptions import ResourceError, ChannelError from kombu.transport import virtual -from kombu.utils import uuid -from kombu.compression import compress +from kombu.utils.uuid import uuid from kombu.tests.case import Case, MagicMock, Mock, mock, patch diff --git a/kombu/tests/utils/test_compat.py b/kombu/tests/utils/test_compat.py new file mode 100644 index 00000000..e4bd2c7a --- /dev/null +++ b/kombu/tests/utils/test_compat.py @@ -0,0 +1,32 @@ +from __future__ import absolute_import, unicode_literals + +from kombu.utils.compat import entrypoints, maybe_fileno + +from kombu.tests.case import Case, Mock, mock, patch + + +class test_entrypoints(Case): + + @mock.mask_modules('pkg_resources') + def test_without_pkg_resources(self): + self.assertListEqual(list(entrypoints('kombu.test')), []) + + @mock.module_exists('pkg_resources') + def test_with_pkg_resources(self): + with patch('pkg_resources.iter_entry_points', create=True) as iterep: + eps = iterep.return_value = [Mock(), Mock()] + + self.assertTrue(list(entrypoints('kombu.test'))) + iterep.assert_called_with('kombu.test') + eps[0].load.assert_called_with() + eps[1].load.assert_called_with() + + +class test_maybe_fileno(Case): + + def test_maybe_fileno(self): + self.assertEqual(maybe_fileno(3), 3) + f = Mock(name='file') + self.assertIs(maybe_fileno(f), f.fileno()) + f.fileno.side_effect = ValueError() + self.assertIsNone(maybe_fileno(f)) diff --git a/kombu/tests/utils/test_div.py b/kombu/tests/utils/test_div.py new file mode 100644 index 00000000..513cb1b7 --- /dev/null +++ b/kombu/tests/utils/test_div.py @@ -0,0 +1,51 @@ +from __future__ import absolute_import, unicode_literals + +import pickle + +from io import StringIO, BytesIO + +from kombu.utils.div import emergency_dump_state + +from kombu.tests.case import Case, mock + + +class MyStringIO(StringIO): + + def close(self): + pass + + +class MyBytesIO(BytesIO): + + def close(self): + pass + + +class test_emergency_dump_state(Case): + + @mock.stdouts + def test_dump(self, stdout, stderr): + fh = MyBytesIO() + + emergency_dump_state( + {'foo': 'bar'}, open_file=lambda n, m: fh) + self.assertDictEqual( + pickle.loads(fh.getvalue()), {'foo': 'bar'}) + self.assertTrue(stderr.getvalue()) + self.assertFalse(stdout.getvalue()) + + @mock.stdouts + def test_dump_second_strategy(self, stdout, stderr): + fh = MyStringIO() + + def raise_something(*args, **kwargs): + raise KeyError('foo') + + emergency_dump_state( + {'foo': 'bar'}, + open_file=lambda n, m: fh, dump=raise_something + ) + self.assertIn('foo', fh.getvalue()) + self.assertIn('bar', fh.getvalue()) + self.assertTrue(stderr.getvalue()) + self.assertFalse(stdout.getvalue()) diff --git a/kombu/tests/utils/test_functional.py b/kombu/tests/utils/test_functional.py index 5f26df32..e953979f 100644 --- a/kombu/tests/utils/test_functional.py +++ b/kombu/tests/utils/test_functional.py @@ -5,9 +5,44 @@ import pickle from itertools import count from kombu.five import items -from kombu.utils.functional import LRUCache, memoize, lazy, maybe_evaluate +from kombu.utils import functional as utils +from kombu.utils.functional import ( + ChannelPromise, LRUCache, fxrange, fxrangemax, memoize, lazy, + maybe_evaluate, maybe_list, reprcall, reprkwargs, retry_over_time, +) -from kombu.tests.case import Case, skip +from kombu.tests.case import Case, Mock, mock, skip + + +class test_ChannelPromise(Case): + + def test_repr(self): + obj = Mock(name='cb') + self.assertIn( + 'promise', + repr(ChannelPromise(obj)), + ) + obj.assert_not_called() + + +class test_shufflecycle(Case): + + def test_shuffles(self): + prev_repeat, utils.repeat = utils.repeat, Mock() + try: + utils.repeat.return_value = list(range(10)) + values = {'A', 'B', 'C'} + cycle = utils.shufflecycle(values) + seen = set() + for i in range(10): + next(cycle) + utils.repeat.assert_called_with(None) + self.assertTrue(seen.issubset(values)) + with self.assertRaises(StopIteration): + next(cycle) + next(cycle) + finally: + utils.repeat = prev_repeat def double(x): @@ -141,3 +176,112 @@ class test_maybe_evaluate(Case): def test_evaluates(self): self.assertEqual(maybe_evaluate(lazy(lambda: 10)), 10) self.assertEqual(maybe_evaluate(20), 20) + + +class test_retry_over_time(Case): + + def setup(self): + self.index = 0 + + class Predicate(Exception): + pass + + def myfun(self): + if self.index < 9: + raise self.Predicate() + return 42 + + def errback(self, exc, intervals, retries): + interval = next(intervals) + sleepvals = (None, 2.0, 4.0, 6.0, 8.0, 10.0, 12.0, 14.0, 16.0, 16.0) + self.index += 1 + self.assertEqual(interval, sleepvals[self.index]) + return interval + + @mock.sleepdeprived(module=utils) + def test_simple(self): + prev_count, utils.count = utils.count, Mock() + try: + utils.count.return_value = list(range(1)) + x = retry_over_time(self.myfun, self.Predicate, + errback=None, interval_max=14) + self.assertIsNone(x) + utils.count.return_value = list(range(10)) + cb = Mock() + x = retry_over_time(self.myfun, self.Predicate, + errback=self.errback, callback=cb, + interval_max=14) + self.assertEqual(x, 42) + self.assertEqual(self.index, 9) + cb.assert_called_with() + finally: + utils.count = prev_count + + @mock.sleepdeprived(module=utils) + def test_retry_once(self): + with self.assertRaises(self.Predicate): + retry_over_time( + self.myfun, self.Predicate, + max_retries=1, errback=self.errback, interval_max=14, + ) + self.assertEqual(self.index, 1) + # no errback + with self.assertRaises(self.Predicate): + retry_over_time( + self.myfun, self.Predicate, + max_retries=1, errback=None, interval_max=14, + ) + + @mock.sleepdeprived(module=utils) + def test_retry_always(self): + Predicate = self.Predicate + + class Fun(object): + + def __init__(self): + self.calls = 0 + + def __call__(self, *args, **kwargs): + try: + if self.calls >= 10: + return 42 + raise Predicate() + finally: + self.calls += 1 + fun = Fun() + + self.assertEqual( + retry_over_time( + fun, self.Predicate, + max_retries=0, errback=None, interval_max=14, + ), + 42, + ) + self.assertEqual(fun.calls, 11) + + +class test_utils(Case): + + def test_maybe_list(self): + self.assertIsNone(maybe_list(None)) + self.assertEqual(maybe_list(1), [1]) + self.assertEqual(maybe_list([1, 2, 3]), [1, 2, 3]) + + def test_fxrange_no_repeatlast(self): + self.assertEqual(list(fxrange(1.0, 3.0, 1.0)), + [1.0, 2.0, 3.0]) + + def test_fxrangemax(self): + self.assertEqual(list(fxrangemax(1.0, 3.0, 1.0, 30.0)), + [1.0, 2.0, 3.0, 3.0, 3.0, 3.0, + 3.0, 3.0, 3.0, 3.0, 3.0]) + self.assertEqual(list(fxrangemax(1.0, None, 1.0, 30.0)), + [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0]) + + def test_reprkwargs(self): + self.assertTrue(reprkwargs({'foo': 'bar', 1: 2, 'k': 'v'})) + + def test_reprcall(self): + self.assertTrue( + reprcall('add', (2, 2), {'copy': True}), + ) diff --git a/kombu/tests/utils/test_imports.py b/kombu/tests/utils/test_imports.py new file mode 100644 index 00000000..49e2a72f --- /dev/null +++ b/kombu/tests/utils/test_imports.py @@ -0,0 +1,37 @@ +from __future__ import absolute_import, unicode_literals + +from kombu import Exchange +from kombu.utils.imports import symbol_by_name + +from kombu.tests.case import Case, Mock + + +class test_symbol_by_name(Case): + + def test_instance_returns_instance(self): + instance = object() + self.assertIs(symbol_by_name(instance), instance) + + def test_returns_default(self): + default = object() + self.assertIs( + symbol_by_name('xyz.ryx.qedoa.weq:foz', default=default), + default, + ) + + def test_no_default(self): + with self.assertRaises(ImportError): + symbol_by_name('xyz.ryx.qedoa.weq:foz') + + def test_imp_reraises_ValueError(self): + imp = Mock() + imp.side_effect = ValueError() + with self.assertRaises(ValueError): + symbol_by_name('kombu.Connection', imp=imp) + + def test_package(self): + self.assertIs( + symbol_by_name('.entity:Exchange', package='kombu'), + Exchange, + ) + self.assertTrue(symbol_by_name(':Consumer', package='kombu')) diff --git a/kombu/tests/utils/test_objects.py b/kombu/tests/utils/test_objects.py new file mode 100644 index 00000000..22893e48 --- /dev/null +++ b/kombu/tests/utils/test_objects.py @@ -0,0 +1,55 @@ +from __future__ import absolute_import, unicode_literals + +from kombu.utils.objects import cached_property + +from kombu.tests.case import Case + + +class test_cached_property(Case): + + def test_deleting(self): + + class X(object): + xx = False + + @cached_property + def foo(self): + return 42 + + @foo.deleter # noqa + def foo(self, value): + self.xx = value + + x = X() + del(x.foo) + self.assertFalse(x.xx) + x.__dict__['foo'] = 'here' + del(x.foo) + self.assertEqual(x.xx, 'here') + + def test_when_access_from_class(self): + + class X(object): + xx = None + + @cached_property + def foo(self): + return 42 + + @foo.setter # noqa + def foo(self, value): + self.xx = 10 + + desc = X.__dict__['foo'] + self.assertIs(X.foo, desc) + + self.assertIs(desc.__get__(None), desc) + self.assertIs(desc.__set__(None, 1), desc) + self.assertIs(desc.__delete__(None), desc) + self.assertTrue(desc.setter(1)) + + x = X() + x.foo = 30 + self.assertEqual(x.xx, 10) + + del(x.foo) diff --git a/kombu/tests/utils/test_utils.py b/kombu/tests/utils/test_utils.py index 806fb042..84adf6c9 100644 --- a/kombu/tests/utils/test_utils.py +++ b/kombu/tests/utils/test_utils.py @@ -1,31 +1,9 @@ from __future__ import absolute_import, unicode_literals -import pickle - -from io import StringIO, BytesIO - from kombu import version_info_t -from kombu import utils -from kombu.five import python_2_unicode_compatible from kombu.utils.text import version_string_as_tuple -from kombu.tests.case import Case, Mock, patch, mock - - -@python_2_unicode_compatible -class OldString(object): - - def __init__(self, value): - self.value = value - - def __str__(self): - return self.value - - def split(self, *args, **kwargs): - return self.value.split(*args, **kwargs) - - def rsplit(self, *args, **kwargs): - return self.value.rsplit(*args, **kwargs) +from kombu.tests.case import Case class test_kombu_module(Case): @@ -35,300 +13,6 @@ class test_kombu_module(Case): self.assertTrue(dir(kombu)) -class test_utils(Case): - - def test_maybe_list(self): - self.assertEqual(utils.maybe_list(None), []) - self.assertEqual(utils.maybe_list(1), [1]) - self.assertEqual(utils.maybe_list([1, 2, 3]), [1, 2, 3]) - - def test_fxrange_no_repeatlast(self): - self.assertEqual(list(utils.fxrange(1.0, 3.0, 1.0)), - [1.0, 2.0, 3.0]) - - def test_fxrangemax(self): - self.assertEqual(list(utils.fxrangemax(1.0, 3.0, 1.0, 30.0)), - [1.0, 2.0, 3.0, 3.0, 3.0, 3.0, - 3.0, 3.0, 3.0, 3.0, 3.0]) - self.assertEqual(list(utils.fxrangemax(1.0, None, 1.0, 30.0)), - [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0]) - - def test_reprkwargs(self): - self.assertTrue(utils.reprkwargs({'foo': 'bar', 1: 2, 'k': 'v'})) - - def test_reprcall(self): - self.assertTrue( - utils.reprcall('add', (2, 2), {'copy': True}), - ) - - -class test_UUID(Case): - - def test_uuid4(self): - self.assertNotEqual(utils.uuid4(), - utils.uuid4()) - - def test_uuid(self): - i1 = utils.uuid() - i2 = utils.uuid() - self.assertIsInstance(i1, str) - self.assertNotEqual(i1, i2) - - -class MyStringIO(StringIO): - - def close(self): - pass - - -class MyBytesIO(BytesIO): - - def close(self): - pass - - -class test_emergency_dump_state(Case): - - @mock.stdouts - def test_dump(self, stdout, stderr): - fh = MyBytesIO() - - utils.emergency_dump_state( - {'foo': 'bar'}, open_file=lambda n, m: fh) - self.assertDictEqual( - pickle.loads(fh.getvalue()), {'foo': 'bar'}) - self.assertTrue(stderr.getvalue()) - self.assertFalse(stdout.getvalue()) - - @mock.stdouts - def test_dump_second_strategy(self, stdout, stderr): - fh = MyStringIO() - - def raise_something(*args, **kwargs): - raise KeyError('foo') - - utils.emergency_dump_state( - {'foo': 'bar'}, - open_file=lambda n, m: fh, dump=raise_something - ) - self.assertIn('foo', fh.getvalue()) - self.assertIn('bar', fh.getvalue()) - self.assertTrue(stderr.getvalue()) - self.assertFalse(stdout.getvalue()) - - -class test_retry_over_time(Case): - - def setup(self): - self.index = 0 - - class Predicate(Exception): - pass - - def myfun(self): - if self.index < 9: - raise self.Predicate() - return 42 - - def errback(self, exc, intervals, retries): - interval = next(intervals) - sleepvals = (None, 2.0, 4.0, 6.0, 8.0, 10.0, 12.0, 14.0, 16.0, 16.0) - self.index += 1 - self.assertEqual(interval, sleepvals[self.index]) - return interval - - @mock.sleepdeprived(module=utils) - def test_simple(self): - prev_count, utils.count = utils.count, Mock() - try: - utils.count.return_value = list(range(1)) - x = utils.retry_over_time(self.myfun, self.Predicate, - errback=None, interval_max=14) - self.assertIsNone(x) - utils.count.return_value = list(range(10)) - cb = Mock() - x = utils.retry_over_time(self.myfun, self.Predicate, - errback=self.errback, callback=cb, - interval_max=14) - self.assertEqual(x, 42) - self.assertEqual(self.index, 9) - cb.assert_called_with() - finally: - utils.count = prev_count - - @mock.sleepdeprived(module=utils) - def test_retry_once(self): - with self.assertRaises(self.Predicate): - utils.retry_over_time( - self.myfun, self.Predicate, - max_retries=1, errback=self.errback, interval_max=14, - ) - self.assertEqual(self.index, 1) - # no errback - with self.assertRaises(self.Predicate): - utils.retry_over_time( - self.myfun, self.Predicate, - max_retries=1, errback=None, interval_max=14, - ) - - @mock.sleepdeprived(module=utils) - def test_retry_always(self): - Predicate = self.Predicate - - class Fun(object): - - def __init__(self): - self.calls = 0 - - def __call__(self, *args, **kwargs): - try: - if self.calls >= 10: - return 42 - raise Predicate() - finally: - self.calls += 1 - fun = Fun() - - self.assertEqual( - utils.retry_over_time( - fun, self.Predicate, - max_retries=0, errback=None, interval_max=14, - ), - 42, - ) - self.assertEqual(fun.calls, 11) - - -class test_cached_property(Case): - - def test_deleting(self): - - class X(object): - xx = False - - @utils.cached_property - def foo(self): - return 42 - - @foo.deleter # noqa - def foo(self, value): - self.xx = value - - x = X() - del(x.foo) - self.assertFalse(x.xx) - x.__dict__['foo'] = 'here' - del(x.foo) - self.assertEqual(x.xx, 'here') - - def test_when_access_from_class(self): - - class X(object): - xx = None - - @utils.cached_property - def foo(self): - return 42 - - @foo.setter # noqa - def foo(self, value): - self.xx = 10 - - desc = X.__dict__['foo'] - self.assertIs(X.foo, desc) - - self.assertIs(desc.__get__(None), desc) - self.assertIs(desc.__set__(None, 1), desc) - self.assertIs(desc.__delete__(None), desc) - self.assertTrue(desc.setter(1)) - - x = X() - x.foo = 30 - self.assertEqual(x.xx, 10) - - del(x.foo) - - -class test_symbol_by_name(Case): - - def test_instance_returns_instance(self): - instance = object() - self.assertIs(utils.symbol_by_name(instance), instance) - - def test_returns_default(self): - default = object() - self.assertIs( - utils.symbol_by_name('xyz.ryx.qedoa.weq:foz', default=default), - default, - ) - - def test_no_default(self): - with self.assertRaises(ImportError): - utils.symbol_by_name('xyz.ryx.qedoa.weq:foz') - - def test_imp_reraises_ValueError(self): - imp = Mock() - imp.side_effect = ValueError() - with self.assertRaises(ValueError): - utils.symbol_by_name('kombu.Connection', imp=imp) - - def test_package(self): - from kombu.entity import Exchange - self.assertIs( - utils.symbol_by_name('.entity:Exchange', package='kombu'), - Exchange, - ) - self.assertTrue(utils.symbol_by_name(':Consumer', package='kombu')) - - -class test_ChannelPromise(Case): - - def test_repr(self): - obj = Mock(name='cb') - self.assertIn( - 'promise', - repr(utils.ChannelPromise(obj)), - ) - obj.assert_not_called() - - -class test_entrypoints(Case): - - @mock.mask_modules('pkg_resources') - def test_without_pkg_resources(self): - self.assertListEqual(list(utils.entrypoints('kombu.test')), []) - - @mock.module_exists('pkg_resources') - def test_with_pkg_resources(self): - with patch('pkg_resources.iter_entry_points', create=True) as iterep: - eps = iterep.return_value = [Mock(), Mock()] - - self.assertTrue(list(utils.entrypoints('kombu.test'))) - iterep.assert_called_with('kombu.test') - eps[0].load.assert_called_with() - eps[1].load.assert_called_with() - - -class test_shufflecycle(Case): - - def test_shuffles(self): - prev_repeat, utils.repeat = utils.repeat, Mock() - try: - utils.repeat.return_value = list(range(10)) - values = {'A', 'B', 'C'} - cycle = utils.shufflecycle(values) - seen = set() - for i in range(10): - next(cycle) - utils.repeat.assert_called_with(None) - self.assertTrue(seen.issubset(values)) - with self.assertRaises(StopIteration): - next(cycle) - next(cycle) - finally: - utils.repeat = prev_repeat - - class test_version_string_as_tuple(Case): def test_versions(self): @@ -356,13 +40,3 @@ class test_version_string_as_tuple(Case): version_string_as_tuple('3.3.1.a3.40c32'), version_info_t(3, 3, 1, 'a3', '40c32'), ) - - -class test_maybe_fileno(Case): - - def test_maybe_fileno(self): - self.assertEqual(utils.maybe_fileno(3), 3) - f = Mock(name='file') - self.assertIs(utils.maybe_fileno(f), f.fileno()) - f.fileno.side_effect = ValueError() - self.assertIsNone(utils.maybe_fileno(f)) diff --git a/kombu/tests/utils/test_uuid.py b/kombu/tests/utils/test_uuid.py new file mode 100644 index 00000000..ac018f09 --- /dev/null +++ b/kombu/tests/utils/test_uuid.py @@ -0,0 +1,17 @@ +from __future__ import absolute_import, unicode_literals + +from kombu.utils.uuid import uuid + +from kombu.tests.case import Case + + +class test_UUID(Case): + + def test_uuid4(self): + self.assertNotEqual(uuid(), uuid()) + + def test_uuid(self): + i1 = uuid() + i2 = uuid() + self.assertIsInstance(i1, str) + self.assertNotEqual(i1, i2) diff --git a/kombu/transport/SLMQ.py b/kombu/transport/SLMQ.py index 4515e0d3..689046f7 100644 --- a/kombu/transport/SLMQ.py +++ b/kombu/transport/SLMQ.py @@ -7,9 +7,9 @@ import string import os from kombu.five import Empty, text_t -from kombu.utils import cached_property # , uuid from kombu.utils.encoding import bytes_to_str, safe_str from kombu.utils.json import loads, dumps +from kombu.utils.objects import cached_property from . import virtual diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py index ac2f58a7..0f1679c1 100644 --- a/kombu/transport/SQS.py +++ b/kombu/transport/SQS.py @@ -51,10 +51,10 @@ from kombu.async.aws.sqs.ext import regions from kombu.async.aws.sqs.message import Message from kombu.five import Empty, range, string_t, text_t from kombu.log import get_logger -from kombu.utils import cached_property +from kombu.utils import scheduling from kombu.utils.encoding import bytes_to_str, safe_str from kombu.utils.json import loads, dumps -from kombu.utils import scheduling +from kombu.utils.objects import cached_property from . import virtual diff --git a/kombu/transport/__init__.py b/kombu/transport/__init__.py index ff4bf69c..57b67236 100644 --- a/kombu/transport/__init__.py +++ b/kombu/transport/__init__.py @@ -3,7 +3,7 @@ from __future__ import absolute_import, unicode_literals from kombu.five import string_t from kombu.syn import _detect_environment -from kombu.utils import symbol_by_name +from kombu.utils.imports import symbol_by_name def supports_librabbitmq(): diff --git a/kombu/transport/base.py b/kombu/transport/base.py index 7eeace88..4e2d6a9d 100644 --- a/kombu/transport/base.py +++ b/kombu/transport/base.py @@ -8,7 +8,7 @@ from amqp.exceptions import RecoverableConnectionError from kombu.exceptions import ChannelError, ConnectionError from kombu.message import Message -from kombu.utils import cached_property +from kombu.utils.objects import cached_property __all__ = ['Message', 'StdChannel', 'Management', 'Transport'] diff --git a/kombu/transport/consul.py b/kombu/transport/consul.py index 898d68fa..2598e473 100644 --- a/kombu/transport/consul.py +++ b/kombu/transport/consul.py @@ -15,8 +15,8 @@ from contextlib import contextmanager from kombu.exceptions import ChannelError from kombu.five import Empty, monotonic from kombu.log import get_logger -from kombu.utils import cached_property from kombu.utils.json import loads, dumps +from kombu.utils.objects import cached_property from . import virtual diff --git a/kombu/transport/filesystem.py b/kombu/transport/filesystem.py index 55a32f7e..1bace4cf 100644 --- a/kombu/transport/filesystem.py +++ b/kombu/transport/filesystem.py @@ -12,9 +12,9 @@ import tempfile from . import virtual from kombu.exceptions import ChannelError from kombu.five import Empty, monotonic -from kombu.utils import cached_property from kombu.utils.encoding import bytes_to_str, str_to_bytes from kombu.utils.json import loads, dumps +from kombu.utils.objects import cached_property VERSION = (1, 0, 0) diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py index 528b9970..1c1e0280 100644 --- a/kombu/transport/mongodb.py +++ b/kombu/transport/mongodb.py @@ -17,7 +17,7 @@ from kombu.five import Empty from kombu.syn import _detect_environment from kombu.utils.encoding import bytes_to_str from kombu.utils.json import loads, dumps -from kombu.utils import cached_property +from kombu.utils.objects import cached_property from . import virtual diff --git a/kombu/transport/pyro.py b/kombu/transport/pyro.py index 5a726dac..8fb15ad6 100644 --- a/kombu/transport/pyro.py +++ b/kombu/transport/pyro.py @@ -7,7 +7,7 @@ from __future__ import absolute_import, unicode_literals import sys from kombu.five import reraise -from kombu.utils import cached_property +from kombu.utils.objects import cached_property from . import virtual diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index d020eaa0..f1dc3d48 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -14,12 +14,14 @@ from vine import promise from kombu.exceptions import InconsistencyError, VersionMismatch from kombu.five import Empty, values, string_t from kombu.log import get_logger -from kombu.utils import cached_property, register_after_fork, uuid +from kombu.utils.compat import register_after_fork from kombu.utils.eventio import poll, READ, ERR from kombu.utils.encoding import bytes_to_str from kombu.utils.json import loads, dumps +from kombu.utils.objects import cached_property from kombu.utils.scheduling import cycle_by_name from kombu.utils.url import _parse_url +from kombu.utils.uuid import uuid from . import virtual diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py index d6bd6ef9..5940159b 100644 --- a/kombu/transport/virtual/__init__.py +++ b/kombu/transport/virtual/__init__.py @@ -20,9 +20,10 @@ from amqp.protocol import queue_declare_ok_t from kombu.exceptions import ResourceError, ChannelError from kombu.five import Empty, items, monotonic from kombu.log import get_logger -from kombu.utils import emergency_dump_state, uuid from kombu.utils.encoding import str_to_bytes, bytes_to_str +from kombu.utils.div import emergency_dump_state from kombu.utils.scheduling import FairCycle +from kombu.utils.uuid import uuid from kombu.transport import base diff --git a/kombu/transport/virtual/exchange.py b/kombu/transport/virtual/exchange.py index 3f2de089..c61d7147 100644 --- a/kombu/transport/virtual/exchange.py +++ b/kombu/transport/virtual/exchange.py @@ -5,10 +5,10 @@ by the AMQ protocol (excluding the `headers` exchange). """ from __future__ import absolute_import, unicode_literals -from kombu.utils import escape_regex - import re +from kombu.utils.text import escape_regex + class ExchangeType(object): """Implements the specifics for an exchange type. diff --git a/kombu/utils/__init__.py b/kombu/utils/__init__.py index 4ecb7314..b1a37ab7 100644 --- a/kombu/utils/__init__.py +++ b/kombu/utils/__init__.py @@ -1,36 +1,14 @@ -"""Internal utilities.""" +"""DEPRECATED - Import from modules below""" from __future__ import absolute_import, print_function, unicode_literals -import importlib -import numbers -import random -import sys - -from contextlib import contextmanager -from itertools import count, repeat -from time import sleep -from uuid import uuid4 - -from vine.utils import wraps - -from kombu.five import items, python_2_unicode_compatible, reraise, string_t - -from .encoding import default_encode, safe_repr as _safe_repr - -try: - from io import UnsupportedOperation - FILENO_ERRORS = (AttributeError, ValueError, UnsupportedOperation) -except ImportError: # pragma: no cover - # Py2 - FILENO_ERRORS = (AttributeError, ValueError) # noqa - -try: - from billiard.util import register_after_fork -except ImportError: # pragma: no cover - try: - from multiprocessing.util import register_after_fork # noqa - except ImportError: - register_after_fork = None # noqa +from .collections import EqualityDict +from .compat import fileno, maybe_fileno, nested, register_after_fork +from .div import emergency_dump_state +from .functional import ( + fxrange, fxrangemax, maybe_list, reprcall, retry_over_time, +) +from .objects import cached_property +from .uuid import uuid __all__ = [ 'EqualityDict', 'uuid', 'maybe_list', @@ -38,395 +16,3 @@ __all__ = [ 'emergency_dump_state', 'cached_property', 'register_after_fork', 'reprkwargs', 'reprcall', 'nested', 'fileno', 'maybe_fileno', ] - - -def symbol_by_name(name, aliases={}, imp=None, package=None, - sep='.', default=None, **kwargs): - """Get symbol by qualified name. - - The name should be the full dot-separated path to the class:: - - modulename.ClassName - - Example:: - - celery.concurrency.processes.TaskPool - ^- class name - - or using ':' to separate module and symbol:: - - celery.concurrency.processes:TaskPool - - If `aliases` is provided, a dict containing short name/long name - mappings, the name is looked up in the aliases first. - - Examples: - >>> symbol_by_name('celery.concurrency.processes.TaskPool') - - - >>> symbol_by_name('default', { - ... 'default': 'celery.concurrency.processes.TaskPool'}) - - - # Does not try to look up non-string names. - >>> from celery.concurrency.processes import TaskPool - >>> symbol_by_name(TaskPool) is TaskPool - True - """ - if imp is None: - imp = importlib.import_module - - if not isinstance(name, string_t): - return name # already a class - - name = aliases.get(name) or name - sep = ':' if ':' in name else sep - module_name, _, cls_name = name.rpartition(sep) - if not module_name: - cls_name, module_name = None, package if package else cls_name - try: - try: - module = imp(module_name, package=package, **kwargs) - except ValueError as exc: - reraise(ValueError, - ValueError("Couldn't import {0!r}: {1}".format(name, exc)), - sys.exc_info()[2]) - return getattr(module, cls_name) if cls_name else module - except (ImportError, AttributeError): - if default is None: - raise - return default - - -class HashedSeq(list): - """type used for hash() to make sure the hash is not generated - multiple times.""" - __slots__ = 'hashvalue' - - def __init__(self, *seq): - self[:] = seq - self.hashvalue = hash(seq) - - def __hash__(self): - return self.hashvalue - - -def eqhash(o): - try: - return o.__eqhash__() - except AttributeError: - return hash(o) - - -class EqualityDict(dict): - - def __getitem__(self, key): - h = eqhash(key) - if h not in self: - return self.__missing__(key) - return dict.__getitem__(self, h) - - def __setitem__(self, key, value): - return dict.__setitem__(self, eqhash(key), value) - - def __delitem__(self, key): - return dict.__delitem__(self, eqhash(key)) - - -def uuid(): - """Generate a unique id, having - hopefully - a very small chance of - collision. - - For now this is provided by :func:`uuid.uuid4`. - """ - return str(uuid4()) - - -def maybe_list(v): - if v is None: - return [] - if hasattr(v, '__iter__'): - return v - return [v] - - -def fxrange(start=1.0, stop=None, step=1.0, repeatlast=False): - cur = start * 1.0 - while 1: - if not stop or cur <= stop: - yield cur - cur += step - else: - if not repeatlast: - break - yield cur - step - - -def fxrangemax(start=1.0, stop=None, step=1.0, max=100.0): - sum_, cur = 0, start * 1.0 - while 1: - if sum_ >= max: - break - yield cur - if stop: - cur = min(cur + step, stop) - else: - cur += step - sum_ += cur - - -def retry_over_time(fun, catch, args=[], kwargs={}, errback=None, - max_retries=None, interval_start=2, interval_step=2, - interval_max=30, callback=None): - """Retry the function over and over until max retries is exceeded. - - For each retry we sleep a for a while before we try again, this interval - is increased for every retry until the max seconds is reached. - - Arguments: - fun (Callable): The function to try - catch (Tuple[BaseException]): Exceptions to catch, can be either - tuple or a single exception class. - - Keyword Arguments: - args (Tuple): Positional arguments passed on to the function. - kwargs (Dict): Keyword arguments passed on to the function. - errback (Callable): Callback for when an exception in ``catch`` - is raised. The callback must take three arguments: - ``exc``, ``interval_range`` and ``retries``, where ``exc`` - is the exception instance, ``interval_range`` is an iterator - which return the time in seconds to sleep next, and ``retries`` - is the number of previous retries. - max_retries (int): Maximum number of retries before we give up. - If this is not set, we will retry forever. - interval_start (float): How long (in seconds) we start sleeping - between retries. - interval_step (float): By how much the interval is increased for - each retry. - interval_max (float): Maximum number of seconds to sleep - between retries. - """ - retries = 0 - interval_range = fxrange(interval_start, - interval_max + interval_start, - interval_step, repeatlast=True) - for retries in count(): - try: - return fun(*args, **kwargs) - except catch as exc: - if max_retries and retries >= max_retries: - raise - if callback: - callback() - tts = float(errback(exc, interval_range, retries) if errback - else next(interval_range)) - if tts: - for _ in range(int(tts)): - if callback: - callback() - sleep(1.0) - # sleep remainder after int truncation above. - sleep(abs(int(tts) - tts)) - - -def emergency_dump_state(state, open_file=open, dump=None, stderr=None): - from pprint import pformat - from tempfile import mktemp - stderr = sys.stderr if stderr is None else stderr - - if dump is None: - import pickle - dump = pickle.dump - persist = mktemp() - print('EMERGENCY DUMP STATE TO FILE -> {0} <-'.format(persist), ## noqa - file=stderr) - fh = open_file(persist, 'w') - try: - try: - dump(state, fh, protocol=0) - except Exception as exc: - print( # noqa - 'Cannot pickle state: {0!r}. Fallback to pformat.'.format(exc), - file=stderr, - ) - fh.write(default_encode(pformat(state))) - finally: - fh.flush() - fh.close() - return persist - - -class cached_property(object): - """Property descriptor that caches the return value - of the get function. - - Examples: - .. code-block:: python - - @cached_property - def connection(self): - return Connection() - - @connection.setter # Prepares stored value - def connection(self, value): - if value is None: - raise TypeError('Connection must be a connection') - return value - - @connection.deleter - def connection(self, value): - # Additional action to do at del(self.attr) - if value is not None: - print('Connection {0!r} deleted'.format(value) - """ - - def __init__(self, fget=None, fset=None, fdel=None, doc=None): - self.__get = fget - self.__set = fset - self.__del = fdel - self.__doc__ = doc or fget.__doc__ - self.__name__ = fget.__name__ - self.__module__ = fget.__module__ - - def __get__(self, obj, type=None): - if obj is None: - return self - try: - return obj.__dict__[self.__name__] - except KeyError: - value = obj.__dict__[self.__name__] = self.__get(obj) - return value - - def __set__(self, obj, value): - if obj is None: - return self - if self.__set is not None: - value = self.__set(obj, value) - obj.__dict__[self.__name__] = value - - def __delete__(self, obj, _sentinel=object()): - if obj is None: - return self - value = obj.__dict__.pop(self.__name__, _sentinel) - if self.__del is not None and value is not _sentinel: - self.__del(obj, value) - - def setter(self, fset): - return self.__class__(self.__get, fset, self.__del) - - def deleter(self, fdel): - return self.__class__(self.__get, self.__set, fdel) - - -def reprkwargs(kwargs, sep=', ', fmt='{0}={1}'): - return sep.join(fmt.format(k, _safe_repr(v)) for k, v in items(kwargs)) - - -def reprcall(name, args=(), kwargs={}, sep=', '): - return '{0}({1}{2}{3})'.format( - name, sep.join(map(_safe_repr, args or ())), - (args and kwargs) and sep or '', - reprkwargs(kwargs, sep), - ) - - -@contextmanager -def nested(*managers): # pragma: no cover - # flake8: noqa - """Combine multiple context managers into a single nested - context manager.""" - exits = [] - vars = [] - exc = (None, None, None) - try: - try: - for mgr in managers: - exit = mgr.__exit__ - enter = mgr.__enter__ - vars.append(enter()) - exits.append(exit) - yield vars - except: - exc = sys.exc_info() - finally: - while exits: - exit = exits.pop() - try: - if exit(*exc): - exc = (None, None, None) - except: - exc = sys.exc_info() - if exc != (None, None, None): - # Don't rely on sys.exc_info() still containing - # the right information. Another exception may - # have been raised and caught by an exit method - reraise(exc[0], exc[1], exc[2]) - finally: - del(exc) - - -def shufflecycle(it): - it = list(it) # don't modify callers list - shuffle = random.shuffle - for _ in repeat(None): - shuffle(it) - yield it[0] - - -def entrypoints(namespace): - try: - from pkg_resources import iter_entry_points - except ImportError: - return iter([]) - return ((ep, ep.load()) for ep in iter_entry_points(namespace)) - - -@python_2_unicode_compatible -class ChannelPromise(object): - - def __init__(self, contract): - self.__contract__ = contract - - def __call__(self): - try: - return self.__value__ - except AttributeError: - value = self.__value__ = self.__contract__() - return value - - def __repr__(self): - try: - return repr(self.__value__) - except AttributeError: - return ''.format(id(self.__contract__)) - - -def escape_regex(p, white=''): - # what's up with re.escape? that code must be neglected or someting - return ''.join(c if c.isalnum() or c in white - else ('\\000' if c == '\000' else '\\' + c) - for c in p) - - -def fileno(f): - if isinstance(f, numbers.Integral): - return f - return f.fileno() - - -def maybe_fileno(f): - """Get object fileno, or :const:`None` if not defined.""" - try: - return fileno(f) - except FILENO_ERRORS: - pass - - -def coro(gen): - - @wraps(gen) - def wind_up(*args, **kwargs): - it = gen(*args, **kwargs) - next(it) - return it - return wind_up diff --git a/kombu/utils/collections.py b/kombu/utils/collections.py new file mode 100644 index 00000000..7d8fd1ee --- /dev/null +++ b/kombu/utils/collections.py @@ -0,0 +1,37 @@ +"""Custom maps, sequences, etc.""" +from __future__ import absolute_import, unicode_literals + + +class HashedSeq(list): + """type used for hash() to make sure the hash is not generated + multiple times.""" + __slots__ = 'hashvalue' + + def __init__(self, *seq): + self[:] = seq + self.hashvalue = hash(seq) + + def __hash__(self): + return self.hashvalue + + +def eqhash(o): + try: + return o.__eqhash__() + except AttributeError: + return hash(o) + + +class EqualityDict(dict): + + def __getitem__(self, key): + h = eqhash(key) + if h not in self: + return self.__missing__(key) + return dict.__getitem__(self, h) + + def __setitem__(self, key, value): + return dict.__setitem__(self, eqhash(key), value) + + def __delitem__(self, key): + return dict.__delitem__(self, eqhash(key)) diff --git a/kombu/utils/compat.py b/kombu/utils/compat.py new file mode 100644 index 00000000..7d21222d --- /dev/null +++ b/kombu/utils/compat.py @@ -0,0 +1,92 @@ +from __future__ import absolute_import, unicode_literals + +import numbers +import sys + +from functools import wraps + +from contextlib import contextmanager + +from kombu.five import reraise + +try: + from io import UnsupportedOperation + FILENO_ERRORS = (AttributeError, ValueError, UnsupportedOperation) +except ImportError: # pragma: no cover + # Py2 + FILENO_ERRORS = (AttributeError, ValueError) # noqa + +try: + from billiard.util import register_after_fork +except ImportError: # pragma: no cover + try: + from multiprocessing.util import register_after_fork # noqa + except ImportError: + register_after_fork = None # noqa + + +def coro(gen): + + @wraps(gen) + def wind_up(*args, **kwargs): + it = gen(*args, **kwargs) + next(it) + return it + return wind_up + + +def entrypoints(namespace): + try: + from pkg_resources import iter_entry_points + except ImportError: + return iter([]) + return ((ep, ep.load()) for ep in iter_entry_points(namespace)) + + +def fileno(f): + if isinstance(f, numbers.Integral): + return f + return f.fileno() + + +def maybe_fileno(f): + """Get object fileno, or :const:`None` if not defined.""" + try: + return fileno(f) + except FILENO_ERRORS: + pass + + +@contextmanager +def nested(*managers): # pragma: no cover + # flake8: noqa + """Combine multiple context managers into a single nested + context manager.""" + exits = [] + vars = [] + exc = (None, None, None) + try: + try: + for mgr in managers: + exit = mgr.__exit__ + enter = mgr.__enter__ + vars.append(enter()) + exits.append(exit) + yield vars + except: + exc = sys.exc_info() + finally: + while exits: + exit = exits.pop() + try: + if exit(*exc): + exc = (None, None, None) + except: + exc = sys.exc_info() + if exc != (None, None, None): + # Don't rely on sys.exc_info() still containing + # the right information. Another exception may + # have been raised and caught by an exit method + reraise(exc[0], exc[1], exc[2]) + finally: + del(exc) diff --git a/kombu/utils/div.py b/kombu/utils/div.py new file mode 100644 index 00000000..d73afe07 --- /dev/null +++ b/kombu/utils/div.py @@ -0,0 +1,32 @@ +from __future__ import absolute_import, unicode_literals, print_function + +from .encoding import default_encode + +import sys + + +def emergency_dump_state(state, open_file=open, dump=None, stderr=None): + from pprint import pformat + from tempfile import mktemp + stderr = sys.stderr if stderr is None else stderr + + if dump is None: + import pickle + dump = pickle.dump + persist = mktemp() + print('EMERGENCY DUMP STATE TO FILE -> {0} <-'.format(persist), # noqa + file=stderr) + fh = open_file(persist, 'w') + try: + try: + dump(state, fh, protocol=0) + except Exception as exc: + print( # noqa + 'Cannot pickle state: {0!r}. Fallback to pformat.'.format(exc), + file=stderr, + ) + fh.write(default_encode(pformat(state))) + finally: + fh.flush() + fh.close() + return persist diff --git a/kombu/utils/functional.py b/kombu/utils/functional.py index 6ea7ba96..114ecb95 100644 --- a/kombu/utils/functional.py +++ b/kombu/utils/functional.py @@ -1,9 +1,12 @@ from __future__ import absolute_import, unicode_literals +import random import sys import threading from collections import Iterable, Mapping, OrderedDict +from itertools import count, repeat +from time import sleep from vine.utils import wraps @@ -11,6 +14,8 @@ from kombu.five import ( UserDict, items, keys, python_2_unicode_compatible, string_t, ) +from .encoding import safe_repr as _safe_repr + __all__ = [ 'LRUCache', 'memoize', 'lazy', 'maybe_evaluate', 'is_list', 'maybe_list', 'dictfilter', @@ -19,6 +24,26 @@ __all__ = [ KEYWORD_MARK = object() +@python_2_unicode_compatible +class ChannelPromise(object): + + def __init__(self, contract): + self.__contract__ = contract + + def __call__(self): + try: + return self.__value__ + except AttributeError: + value = self.__value__ = self.__contract__() + return value + + def __repr__(self): + try: + return repr(self.__value__) + except AttributeError: + return ''.format(id(self.__contract__)) + + class LRUCache(UserDict): """LRU Cache implementation using a doubly linked list to track access. @@ -231,6 +256,104 @@ def dictfilter(d=None, **kw): return {k: v for k, v in items(d) if v is not None} +def shufflecycle(it): + it = list(it) # don't modify callers list + shuffle = random.shuffle + for _ in repeat(None): + shuffle(it) + yield it[0] + + +def fxrange(start=1.0, stop=None, step=1.0, repeatlast=False): + cur = start * 1.0 + while 1: + if not stop or cur <= stop: + yield cur + cur += step + else: + if not repeatlast: + break + yield cur - step + + +def fxrangemax(start=1.0, stop=None, step=1.0, max=100.0): + sum_, cur = 0, start * 1.0 + while 1: + if sum_ >= max: + break + yield cur + if stop: + cur = min(cur + step, stop) + else: + cur += step + sum_ += cur + + +def retry_over_time(fun, catch, args=[], kwargs={}, errback=None, + max_retries=None, interval_start=2, interval_step=2, + interval_max=30, callback=None): + """Retry the function over and over until max retries is exceeded. + + For each retry we sleep a for a while before we try again, this interval + is increased for every retry until the max seconds is reached. + + Arguments: + fun (Callable): The function to try + catch (Tuple[BaseException]): Exceptions to catch, can be either + tuple or a single exception class. + + Keyword Arguments: + args (Tuple): Positional arguments passed on to the function. + kwargs (Dict): Keyword arguments passed on to the function. + errback (Callable): Callback for when an exception in ``catch`` + is raised. The callback must take three arguments: + ``exc``, ``interval_range`` and ``retries``, where ``exc`` + is the exception instance, ``interval_range`` is an iterator + which return the time in seconds to sleep next, and ``retries`` + is the number of previous retries. + max_retries (int): Maximum number of retries before we give up. + If this is not set, we will retry forever. + interval_start (float): How long (in seconds) we start sleeping + between retries. + interval_step (float): By how much the interval is increased for + each retry. + interval_max (float): Maximum number of seconds to sleep + between retries. + """ + retries = 0 + interval_range = fxrange(interval_start, + interval_max + interval_start, + interval_step, repeatlast=True) + for retries in count(): + try: + return fun(*args, **kwargs) + except catch as exc: + if max_retries and retries >= max_retries: + raise + if callback: + callback() + tts = float(errback(exc, interval_range, retries) if errback + else next(interval_range)) + if tts: + for _ in range(int(tts)): + if callback: + callback() + sleep(1.0) + # sleep remainder after int truncation above. + sleep(abs(int(tts) - tts)) + + +def reprkwargs(kwargs, sep=', ', fmt='{0}={1}'): + return sep.join(fmt.format(k, _safe_repr(v)) for k, v in items(kwargs)) + + +def reprcall(name, args=(), kwargs={}, sep=', '): + return '{0}({1}{2}{3})'.format( + name, sep.join(map(_safe_repr, args or ())), + (args and kwargs) and sep or '', + reprkwargs(kwargs, sep), + ) + # Compat names (before kombu 3.0) promise = lazy maybe_promise = maybe_evaluate diff --git a/kombu/utils/imports.py b/kombu/utils/imports.py new file mode 100644 index 00000000..05f48d1d --- /dev/null +++ b/kombu/utils/imports.py @@ -0,0 +1,65 @@ +"""Import related utilities.""" +from __future__ import absolute_import, unicode_literals + +import importlib +import sys + +from kombu.five import reraise, string_t + + +def symbol_by_name(name, aliases={}, imp=None, package=None, + sep='.', default=None, **kwargs): + """Get symbol by qualified name. + + The name should be the full dot-separated path to the class:: + + modulename.ClassName + + Example:: + + celery.concurrency.processes.TaskPool + ^- class name + + or using ':' to separate module and symbol:: + + celery.concurrency.processes:TaskPool + + If `aliases` is provided, a dict containing short name/long name + mappings, the name is looked up in the aliases first. + + Examples: + >>> symbol_by_name('celery.concurrency.processes.TaskPool') + + + >>> symbol_by_name('default', { + ... 'default': 'celery.concurrency.processes.TaskPool'}) + + + # Does not try to look up non-string names. + >>> from celery.concurrency.processes import TaskPool + >>> symbol_by_name(TaskPool) is TaskPool + True + """ + if imp is None: + imp = importlib.import_module + + if not isinstance(name, string_t): + return name # already a class + + name = aliases.get(name) or name + sep = ':' if ':' in name else sep + module_name, _, cls_name = name.rpartition(sep) + if not module_name: + cls_name, module_name = None, package if package else cls_name + try: + try: + module = imp(module_name, package=package, **kwargs) + except ValueError as exc: + reraise(ValueError, + ValueError("Couldn't import {0!r}: {1}".format(name, exc)), + sys.exc_info()[2]) + return getattr(module, cls_name) if cls_name else module + except (ImportError, AttributeError): + if default is None: + raise + return default diff --git a/kombu/utils/objects.py b/kombu/utils/objects.py new file mode 100644 index 00000000..b1f09e96 --- /dev/null +++ b/kombu/utils/objects.py @@ -0,0 +1,63 @@ +from __future__ import absolute_import, unicode_literals + + +class cached_property(object): + """Property descriptor that caches the return value + of the get function. + + Examples: + .. code-block:: python + + @cached_property + def connection(self): + return Connection() + + @connection.setter # Prepares stored value + def connection(self, value): + if value is None: + raise TypeError('Connection must be a connection') + return value + + @connection.deleter + def connection(self, value): + # Additional action to do at del(self.attr) + if value is not None: + print('Connection {0!r} deleted'.format(value) + """ + + def __init__(self, fget=None, fset=None, fdel=None, doc=None): + self.__get = fget + self.__set = fset + self.__del = fdel + self.__doc__ = doc or fget.__doc__ + self.__name__ = fget.__name__ + self.__module__ = fget.__module__ + + def __get__(self, obj, type=None): + if obj is None: + return self + try: + return obj.__dict__[self.__name__] + except KeyError: + value = obj.__dict__[self.__name__] = self.__get(obj) + return value + + def __set__(self, obj, value): + if obj is None: + return self + if self.__set is not None: + value = self.__set(obj, value) + obj.__dict__[self.__name__] = value + + def __delete__(self, obj, _sentinel=object()): + if obj is None: + return self + value = obj.__dict__.pop(self.__name__, _sentinel) + if self.__del is not None and value is not _sentinel: + self.__del(obj, value) + + def setter(self, fset): + return self.__class__(self.__get, fset, self.__del) + + def deleter(self, fdel): + return self.__class__(self.__get, self.__set, fdel) diff --git a/kombu/utils/scheduling.py b/kombu/utils/scheduling.py index d2ccf272..f44222db 100644 --- a/kombu/utils/scheduling.py +++ b/kombu/utils/scheduling.py @@ -5,7 +5,7 @@ from itertools import count from kombu.five import python_2_unicode_compatible -from . import symbol_by_name +from .imports import symbol_by_name __all__ = [ 'FairCycle', 'priority_cycle', 'round_robin_cycle', 'sorted_cycle', diff --git a/kombu/utils/text.py b/kombu/utils/text.py index 1b450c03..717b7ecc 100644 --- a/kombu/utils/text.py +++ b/kombu/utils/text.py @@ -7,6 +7,13 @@ from kombu import version_info_t from kombu.five import string_t +def escape_regex(p, white=''): + # what's up with re.escape? that code must be neglected or someting + return ''.join(c if c.isalnum() or c in white + else ('\\000' if c == '\000' else '\\' + c) + for c in p) + + def fmatch_iter(needle, haystack, min_ratio=0.6): for key in haystack: ratio = SequenceMatcher(None, needle, key).ratio() diff --git a/kombu/utils/uuid.py b/kombu/utils/uuid.py new file mode 100644 index 00000000..286f07fb --- /dev/null +++ b/kombu/utils/uuid.py @@ -0,0 +1,13 @@ +from __future__ import absolute_import, unicode_literals + +from uuid import uuid4 + + +def uuid(_uuid=uuid4): + """Generate a unique id, having - hopefully - a very small chance of + collision. + + See Also: + For now this is provided by :func:`uuid.uuid4`. + """ + return str(_uuid())