Reorganizes kombu.utils.__init__ package

This commit is contained in:
Ask Solem 2016-07-16 13:33:32 -07:00
parent 2358f990d5
commit 85c191a48a
59 changed files with 905 additions and 814 deletions

View File

@ -25,5 +25,6 @@ globals().update(conf.build_config(
'kombu.async.aws.ext',
'kombu.async.aws.sqs.ext',
'kombu.transport.qpid_patches',
'kombu.utils',
],
))

View File

@ -55,15 +55,20 @@
kombu.transport.virtual
kombu.transport.virtual.exchange
kombu.serialization
kombu.utils
kombu.utils.scheduling
kombu.utils.eventio
kombu.utils.limits
kombu.utils.debug
kombu.utils.encoding
kombu.utils.functional
kombu.utils.json
kombu.utils.url
kombu.utils.text
kombu.utils.amq_manager
kombu.utils.collections
kombu.utils.compat
kombu.utils.debug
kombu.utils.div
kombu.utils.encoding
kombu.utils.eventio
kombu.utils.functional
kombu.utils.imports
kombu.utils.json
kombu.utils.limits
kombu.utils.objects
kombu.utils.scheduling
kombu.utils.text
kombu.utils.url
kombu.utils.uuid
kombu.five

View File

@ -0,0 +1,11 @@
==========================================================
Custom Collections - ``kombu.utils.collections``
==========================================================
.. contents::
:local:
.. currentmodule:: kombu.utils.collections
.. automodule:: kombu.utils.collections
:members:
:undoc-members:

View File

@ -0,0 +1,11 @@
==========================================================
Python Compatibility - ``kombu.utils.compat``
==========================================================
.. contents::
:local:
.. currentmodule:: kombu.utils.compat
.. automodule:: kombu.utils.compat
:members:
:undoc-members:

View File

@ -1,11 +1,11 @@
==========================================================
Utilities - ``kombu.utils``
Div Utilities - ``kombu.utils.div``
==========================================================
.. contents::
:local:
.. currentmodule:: kombu.utils
.. currentmodule:: kombu.utils.div
.. automodule:: kombu.utils
.. automodule:: kombu.utils.div
:members:
:undoc-members:

View File

@ -0,0 +1,11 @@
==========================================================
Module Importing Utilities - ``kombu.utils.imports``
==========================================================
.. contents::
:local:
.. currentmodule:: kombu.utils.imports
.. automodule:: kombu.utils.imports
:members:
:undoc-members:

View File

@ -0,0 +1,11 @@
==========================================================
Object/Property Utilities - ``kombu.utils.objects``
==========================================================
.. contents::
:local:
.. currentmodule:: kombu.utils.objects
.. automodule:: kombu.utils.objects
:members:
:undoc-members:

View File

@ -0,0 +1,11 @@
==========================================================
UUID Utilities - ``kombu.utils.uuid``
==========================================================
.. contents::
:local:
.. currentmodule:: kombu.utils.uuid
.. automodule:: kombu.utils.uuid
:members:
:undoc-members:

View File

@ -37,7 +37,7 @@ Draining events from several consumers:
.. code-block:: python
from kombu.utils import nested
from kombu.utils.compat import nested
with connection.channel(), connection.channel() as (channel1, channel2):
with nested(Consumer(channel1, queues1, accept=['json']),

View File

@ -1,8 +1,7 @@
#!/usr/bin/env python
from __future__ import absolute_import, unicode_literals
from kombu import Connection, Producer, Consumer, Queue
from kombu.utils import uuid
from kombu import Connection, Producer, Consumer, Queue, uuid
class FibonacciRpcClient(object):

View File

@ -2,7 +2,7 @@ from __future__ import absolute_import, unicode_literals
from kombu.mixins import ConsumerMixin
from kombu.log import get_logger
from kombu.utils import reprcall
from kombu.utils.functional import reprcall
from .queues import task_queues

View File

@ -2,7 +2,7 @@ from __future__ import absolute_import, unicode_literals
from kombu import Consumer, Producer, Exchange, Queue
from kombu.five import range
from kombu.utils import nested
from kombu.utils.compat import nested
from funtests import transport

View File

@ -6,7 +6,7 @@ from copy import copy
from .connection import maybe_channel
from .exceptions import NotBoundError
from .five import python_2_unicode_compatible
from .utils import ChannelPromise
from .utils.functional import ChannelPromise
__all__ = ['Object', 'MaybeChannelBound']

View File

@ -2,8 +2,8 @@
from __future__ import absolute_import, unicode_literals
from kombu.five import items, string_t
from kombu.utils import reprcall
from kombu.utils.eventio import READ, WRITE, ERR
from kombu.utils.functional import reprcall
def repr_flag(flag):

View File

@ -6,7 +6,7 @@ from vine import Thenable, promise, maybe_promise
from kombu.exceptions import HttpError
from kombu.five import items, python_2_unicode_compatible
from kombu.utils import coro
from kombu.utils.compat import coro
from kombu.utils.encoding import bytes_to_str
from kombu.utils.functional import maybe_list, memoize
@ -223,7 +223,7 @@ class BaseClient(object):
self._header_parser = header_parser()
def perform(self, request, **kwargs):
for req in maybe_list(request):
for req in maybe_list(request) or []:
if not isinstance(req, self.Request):
req = self.Request(req, **kwargs)
self.add_request(req)

View File

@ -12,8 +12,9 @@ from vine import Thenable, promise
from kombu.five import Empty, python_2_unicode_compatible, range
from kombu.log import get_logger
from kombu.utils import cached_property, fileno
from kombu.utils.compat import fileno
from kombu.utils.eventio import READ, WRITE, ERR, poll
from kombu.utils.objects import cached_property
from .timer import Timer

View File

@ -17,7 +17,7 @@ from .entity import Exchange, Queue
from .five import bytes_if_py2, range
from .log import get_logger
from .serialization import registry as serializers
from .utils import uuid
from .utils.uuid import uuid
try:
from _thread import get_ident

View File

@ -12,12 +12,14 @@ from operator import itemgetter
# jython breaks on relative import for .exceptions for some reason
# (Issue #112)
from kombu import exceptions
from .five import bytes_if_py2, python_2_unicode_compatible, string_t, text_t
from .log import get_logger
from .resource import Resource
from .transport import get_transport_cls, supports_librabbitmq
from .utils import cached_property, retry_over_time, shufflecycle, HashedSeq
from .utils.functional import dictfilter, lazy
from .utils.collections import HashedSeq
from .utils.functional import dictfilter, lazy, retry_over_time, shufflecycle
from .utils.objects import cached_property
from .utils.url import as_url, parse_url, quote, urlparse
__all__ = ['Connection', 'ConnectionPool', 'ChannelPool']

View File

@ -8,9 +8,9 @@ import sys
from logging.handlers import WatchedFileHandler
from .five import string_t
from .utils import cached_property
from .utils.encoding import safe_repr, safe_str
from .utils.functional import maybe_evaluate
from .utils.objects import cached_property
__all__ = ['LogMixin', 'LOG_LEVELS', 'get_loglevel', 'setup_logging']

View File

@ -1,10 +1,4 @@
"""
kombu.messaging
===============
Sending and receiving messages.
"""
"""Sending and receiving messages."""
from __future__ import absolute_import, unicode_literals
from itertools import count
@ -16,7 +10,7 @@ from .entity import Exchange, Queue, maybe_delivery_mode
from .exceptions import ContentDisallowed
from .five import items, python_2_unicode_compatible, text_t, values
from .serialization import dumps, prepare_accept_content
from .utils import ChannelPromise, maybe_list
from .utils.functional import ChannelPromise, maybe_list
__all__ = ['Exchange', 'Queue', 'Producer', 'Consumer']

View File

@ -19,9 +19,10 @@ from .common import ignore_errors
from .five import range
from .messaging import Consumer, Producer
from .log import get_logger
from .utils import cached_property, nested
from .utils.compat import nested
from .utils.encoding import safe_repr
from .utils.limits import TokenBucket
from .utils.objects import cached_property
__all__ = ['ConsumerMixin']

View File

@ -17,8 +17,9 @@ from .common import maybe_declare, oid_from
from .exceptions import InconsistencyError
from .five import range
from .log import get_logger
from .utils import cached_property, uuid, reprcall
from .utils.functional import maybe_evaluate
from .utils.functional import maybe_evaluate, reprcall
from .utils.objects import cached_property
from .utils.uuid import uuid
REPLY_QUEUE_EXPIRES = 10

View File

@ -8,7 +8,8 @@ from itertools import chain
from .connection import Resource
from .five import range, values
from .messaging import Producer
from .utils import EqualityDict, register_after_fork
from .utils.collections import EqualityDict
from .utils.compat import register_after_fork
from .utils.functional import lazy
__all__ = ['ProducerPool', 'PoolGroup', 'register_group',

View File

@ -7,7 +7,7 @@ from collections import deque
from . import exceptions
from .five import Empty, LifoQueue as _LifoQueue
from .utils import register_after_fork
from .utils.compat import register_after_fork
from .utils.functional import lazy

View File

@ -19,7 +19,7 @@ from .exceptions import (
ContentDisallowed, DecodeError, EncodeError, SerializerNotInstalled
)
from .five import reraise, text_t
from .utils import entrypoints
from .utils.compat import entrypoints
from .utils.encoding import bytes_to_str, str_to_bytes, bytes_t
__all__ = ['pickle', 'loads', 'dumps', 'register', 'unregister']

View File

@ -6,7 +6,7 @@ from kombu.async.aws.sqs.connection import (
)
from kombu.async.aws.sqs.message import AsyncMessage
from kombu.async.aws.sqs.queue import AsyncQueue
from kombu.utils import uuid
from kombu.utils.uuid import uuid
from kombu.tests.async.aws.case import AWSCase
from kombu.tests.case import PromiseMock, Mock

View File

@ -5,7 +5,7 @@ from kombu.async.aws.sqs.message import AsyncMessage
from kombu.tests.async.aws.case import AWSCase
from kombu.tests.case import PromiseMock, Mock
from kombu.utils import uuid
from kombu.utils.uuid import uuid
class test_AsyncMessage(AWSCase):

View File

@ -7,8 +7,8 @@ from collections import defaultdict
from kombu import Connection, Consumer, Producer, Exchange, Queue
from kombu.exceptions import MessageStateError
from kombu.utils import ChannelPromise
from kombu.utils import json
from kombu.utils.functional import ChannelPromise
from .case import Case, Mock, patch
from .mocks import Transport

View File

@ -6,7 +6,7 @@ import warnings
from kombu import Connection
from kombu import pidbox
from kombu.exceptions import ContentDisallowed, InconsistencyError
from kombu.utils import uuid
from kombu.utils.uuid import uuid
from .case import Case, Mock, patch

View File

@ -3,7 +3,7 @@ from __future__ import absolute_import, unicode_literals
from kombu import Connection, Producer
from kombu import pools
from kombu.connection import ConnectionPool
from kombu.utils import eqhash
from kombu.utils.collections import eqhash
from .case import Case, Mock

View File

@ -4,10 +4,10 @@ import sys
import warnings
from kombu import Connection
from kombu.compression import compress
from kombu.exceptions import ResourceError, ChannelError
from kombu.transport import virtual
from kombu.utils import uuid
from kombu.compression import compress
from kombu.utils.uuid import uuid
from kombu.tests.case import Case, MagicMock, Mock, mock, patch

View File

@ -0,0 +1,32 @@
from __future__ import absolute_import, unicode_literals
from kombu.utils.compat import entrypoints, maybe_fileno
from kombu.tests.case import Case, Mock, mock, patch
class test_entrypoints(Case):
@mock.mask_modules('pkg_resources')
def test_without_pkg_resources(self):
self.assertListEqual(list(entrypoints('kombu.test')), [])
@mock.module_exists('pkg_resources')
def test_with_pkg_resources(self):
with patch('pkg_resources.iter_entry_points', create=True) as iterep:
eps = iterep.return_value = [Mock(), Mock()]
self.assertTrue(list(entrypoints('kombu.test')))
iterep.assert_called_with('kombu.test')
eps[0].load.assert_called_with()
eps[1].load.assert_called_with()
class test_maybe_fileno(Case):
def test_maybe_fileno(self):
self.assertEqual(maybe_fileno(3), 3)
f = Mock(name='file')
self.assertIs(maybe_fileno(f), f.fileno())
f.fileno.side_effect = ValueError()
self.assertIsNone(maybe_fileno(f))

View File

@ -0,0 +1,51 @@
from __future__ import absolute_import, unicode_literals
import pickle
from io import StringIO, BytesIO
from kombu.utils.div import emergency_dump_state
from kombu.tests.case import Case, mock
class MyStringIO(StringIO):
def close(self):
pass
class MyBytesIO(BytesIO):
def close(self):
pass
class test_emergency_dump_state(Case):
@mock.stdouts
def test_dump(self, stdout, stderr):
fh = MyBytesIO()
emergency_dump_state(
{'foo': 'bar'}, open_file=lambda n, m: fh)
self.assertDictEqual(
pickle.loads(fh.getvalue()), {'foo': 'bar'})
self.assertTrue(stderr.getvalue())
self.assertFalse(stdout.getvalue())
@mock.stdouts
def test_dump_second_strategy(self, stdout, stderr):
fh = MyStringIO()
def raise_something(*args, **kwargs):
raise KeyError('foo')
emergency_dump_state(
{'foo': 'bar'},
open_file=lambda n, m: fh, dump=raise_something
)
self.assertIn('foo', fh.getvalue())
self.assertIn('bar', fh.getvalue())
self.assertTrue(stderr.getvalue())
self.assertFalse(stdout.getvalue())

View File

@ -5,9 +5,44 @@ import pickle
from itertools import count
from kombu.five import items
from kombu.utils.functional import LRUCache, memoize, lazy, maybe_evaluate
from kombu.utils import functional as utils
from kombu.utils.functional import (
ChannelPromise, LRUCache, fxrange, fxrangemax, memoize, lazy,
maybe_evaluate, maybe_list, reprcall, reprkwargs, retry_over_time,
)
from kombu.tests.case import Case, skip
from kombu.tests.case import Case, Mock, mock, skip
class test_ChannelPromise(Case):
def test_repr(self):
obj = Mock(name='cb')
self.assertIn(
'promise',
repr(ChannelPromise(obj)),
)
obj.assert_not_called()
class test_shufflecycle(Case):
def test_shuffles(self):
prev_repeat, utils.repeat = utils.repeat, Mock()
try:
utils.repeat.return_value = list(range(10))
values = {'A', 'B', 'C'}
cycle = utils.shufflecycle(values)
seen = set()
for i in range(10):
next(cycle)
utils.repeat.assert_called_with(None)
self.assertTrue(seen.issubset(values))
with self.assertRaises(StopIteration):
next(cycle)
next(cycle)
finally:
utils.repeat = prev_repeat
def double(x):
@ -141,3 +176,112 @@ class test_maybe_evaluate(Case):
def test_evaluates(self):
self.assertEqual(maybe_evaluate(lazy(lambda: 10)), 10)
self.assertEqual(maybe_evaluate(20), 20)
class test_retry_over_time(Case):
def setup(self):
self.index = 0
class Predicate(Exception):
pass
def myfun(self):
if self.index < 9:
raise self.Predicate()
return 42
def errback(self, exc, intervals, retries):
interval = next(intervals)
sleepvals = (None, 2.0, 4.0, 6.0, 8.0, 10.0, 12.0, 14.0, 16.0, 16.0)
self.index += 1
self.assertEqual(interval, sleepvals[self.index])
return interval
@mock.sleepdeprived(module=utils)
def test_simple(self):
prev_count, utils.count = utils.count, Mock()
try:
utils.count.return_value = list(range(1))
x = retry_over_time(self.myfun, self.Predicate,
errback=None, interval_max=14)
self.assertIsNone(x)
utils.count.return_value = list(range(10))
cb = Mock()
x = retry_over_time(self.myfun, self.Predicate,
errback=self.errback, callback=cb,
interval_max=14)
self.assertEqual(x, 42)
self.assertEqual(self.index, 9)
cb.assert_called_with()
finally:
utils.count = prev_count
@mock.sleepdeprived(module=utils)
def test_retry_once(self):
with self.assertRaises(self.Predicate):
retry_over_time(
self.myfun, self.Predicate,
max_retries=1, errback=self.errback, interval_max=14,
)
self.assertEqual(self.index, 1)
# no errback
with self.assertRaises(self.Predicate):
retry_over_time(
self.myfun, self.Predicate,
max_retries=1, errback=None, interval_max=14,
)
@mock.sleepdeprived(module=utils)
def test_retry_always(self):
Predicate = self.Predicate
class Fun(object):
def __init__(self):
self.calls = 0
def __call__(self, *args, **kwargs):
try:
if self.calls >= 10:
return 42
raise Predicate()
finally:
self.calls += 1
fun = Fun()
self.assertEqual(
retry_over_time(
fun, self.Predicate,
max_retries=0, errback=None, interval_max=14,
),
42,
)
self.assertEqual(fun.calls, 11)
class test_utils(Case):
def test_maybe_list(self):
self.assertIsNone(maybe_list(None))
self.assertEqual(maybe_list(1), [1])
self.assertEqual(maybe_list([1, 2, 3]), [1, 2, 3])
def test_fxrange_no_repeatlast(self):
self.assertEqual(list(fxrange(1.0, 3.0, 1.0)),
[1.0, 2.0, 3.0])
def test_fxrangemax(self):
self.assertEqual(list(fxrangemax(1.0, 3.0, 1.0, 30.0)),
[1.0, 2.0, 3.0, 3.0, 3.0, 3.0,
3.0, 3.0, 3.0, 3.0, 3.0])
self.assertEqual(list(fxrangemax(1.0, None, 1.0, 30.0)),
[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0])
def test_reprkwargs(self):
self.assertTrue(reprkwargs({'foo': 'bar', 1: 2, 'k': 'v'}))
def test_reprcall(self):
self.assertTrue(
reprcall('add', (2, 2), {'copy': True}),
)

View File

@ -0,0 +1,37 @@
from __future__ import absolute_import, unicode_literals
from kombu import Exchange
from kombu.utils.imports import symbol_by_name
from kombu.tests.case import Case, Mock
class test_symbol_by_name(Case):
def test_instance_returns_instance(self):
instance = object()
self.assertIs(symbol_by_name(instance), instance)
def test_returns_default(self):
default = object()
self.assertIs(
symbol_by_name('xyz.ryx.qedoa.weq:foz', default=default),
default,
)
def test_no_default(self):
with self.assertRaises(ImportError):
symbol_by_name('xyz.ryx.qedoa.weq:foz')
def test_imp_reraises_ValueError(self):
imp = Mock()
imp.side_effect = ValueError()
with self.assertRaises(ValueError):
symbol_by_name('kombu.Connection', imp=imp)
def test_package(self):
self.assertIs(
symbol_by_name('.entity:Exchange', package='kombu'),
Exchange,
)
self.assertTrue(symbol_by_name(':Consumer', package='kombu'))

View File

@ -0,0 +1,55 @@
from __future__ import absolute_import, unicode_literals
from kombu.utils.objects import cached_property
from kombu.tests.case import Case
class test_cached_property(Case):
def test_deleting(self):
class X(object):
xx = False
@cached_property
def foo(self):
return 42
@foo.deleter # noqa
def foo(self, value):
self.xx = value
x = X()
del(x.foo)
self.assertFalse(x.xx)
x.__dict__['foo'] = 'here'
del(x.foo)
self.assertEqual(x.xx, 'here')
def test_when_access_from_class(self):
class X(object):
xx = None
@cached_property
def foo(self):
return 42
@foo.setter # noqa
def foo(self, value):
self.xx = 10
desc = X.__dict__['foo']
self.assertIs(X.foo, desc)
self.assertIs(desc.__get__(None), desc)
self.assertIs(desc.__set__(None, 1), desc)
self.assertIs(desc.__delete__(None), desc)
self.assertTrue(desc.setter(1))
x = X()
x.foo = 30
self.assertEqual(x.xx, 10)
del(x.foo)

View File

@ -1,31 +1,9 @@
from __future__ import absolute_import, unicode_literals
import pickle
from io import StringIO, BytesIO
from kombu import version_info_t
from kombu import utils
from kombu.five import python_2_unicode_compatible
from kombu.utils.text import version_string_as_tuple
from kombu.tests.case import Case, Mock, patch, mock
@python_2_unicode_compatible
class OldString(object):
def __init__(self, value):
self.value = value
def __str__(self):
return self.value
def split(self, *args, **kwargs):
return self.value.split(*args, **kwargs)
def rsplit(self, *args, **kwargs):
return self.value.rsplit(*args, **kwargs)
from kombu.tests.case import Case
class test_kombu_module(Case):
@ -35,300 +13,6 @@ class test_kombu_module(Case):
self.assertTrue(dir(kombu))
class test_utils(Case):
def test_maybe_list(self):
self.assertEqual(utils.maybe_list(None), [])
self.assertEqual(utils.maybe_list(1), [1])
self.assertEqual(utils.maybe_list([1, 2, 3]), [1, 2, 3])
def test_fxrange_no_repeatlast(self):
self.assertEqual(list(utils.fxrange(1.0, 3.0, 1.0)),
[1.0, 2.0, 3.0])
def test_fxrangemax(self):
self.assertEqual(list(utils.fxrangemax(1.0, 3.0, 1.0, 30.0)),
[1.0, 2.0, 3.0, 3.0, 3.0, 3.0,
3.0, 3.0, 3.0, 3.0, 3.0])
self.assertEqual(list(utils.fxrangemax(1.0, None, 1.0, 30.0)),
[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0])
def test_reprkwargs(self):
self.assertTrue(utils.reprkwargs({'foo': 'bar', 1: 2, 'k': 'v'}))
def test_reprcall(self):
self.assertTrue(
utils.reprcall('add', (2, 2), {'copy': True}),
)
class test_UUID(Case):
def test_uuid4(self):
self.assertNotEqual(utils.uuid4(),
utils.uuid4())
def test_uuid(self):
i1 = utils.uuid()
i2 = utils.uuid()
self.assertIsInstance(i1, str)
self.assertNotEqual(i1, i2)
class MyStringIO(StringIO):
def close(self):
pass
class MyBytesIO(BytesIO):
def close(self):
pass
class test_emergency_dump_state(Case):
@mock.stdouts
def test_dump(self, stdout, stderr):
fh = MyBytesIO()
utils.emergency_dump_state(
{'foo': 'bar'}, open_file=lambda n, m: fh)
self.assertDictEqual(
pickle.loads(fh.getvalue()), {'foo': 'bar'})
self.assertTrue(stderr.getvalue())
self.assertFalse(stdout.getvalue())
@mock.stdouts
def test_dump_second_strategy(self, stdout, stderr):
fh = MyStringIO()
def raise_something(*args, **kwargs):
raise KeyError('foo')
utils.emergency_dump_state(
{'foo': 'bar'},
open_file=lambda n, m: fh, dump=raise_something
)
self.assertIn('foo', fh.getvalue())
self.assertIn('bar', fh.getvalue())
self.assertTrue(stderr.getvalue())
self.assertFalse(stdout.getvalue())
class test_retry_over_time(Case):
def setup(self):
self.index = 0
class Predicate(Exception):
pass
def myfun(self):
if self.index < 9:
raise self.Predicate()
return 42
def errback(self, exc, intervals, retries):
interval = next(intervals)
sleepvals = (None, 2.0, 4.0, 6.0, 8.0, 10.0, 12.0, 14.0, 16.0, 16.0)
self.index += 1
self.assertEqual(interval, sleepvals[self.index])
return interval
@mock.sleepdeprived(module=utils)
def test_simple(self):
prev_count, utils.count = utils.count, Mock()
try:
utils.count.return_value = list(range(1))
x = utils.retry_over_time(self.myfun, self.Predicate,
errback=None, interval_max=14)
self.assertIsNone(x)
utils.count.return_value = list(range(10))
cb = Mock()
x = utils.retry_over_time(self.myfun, self.Predicate,
errback=self.errback, callback=cb,
interval_max=14)
self.assertEqual(x, 42)
self.assertEqual(self.index, 9)
cb.assert_called_with()
finally:
utils.count = prev_count
@mock.sleepdeprived(module=utils)
def test_retry_once(self):
with self.assertRaises(self.Predicate):
utils.retry_over_time(
self.myfun, self.Predicate,
max_retries=1, errback=self.errback, interval_max=14,
)
self.assertEqual(self.index, 1)
# no errback
with self.assertRaises(self.Predicate):
utils.retry_over_time(
self.myfun, self.Predicate,
max_retries=1, errback=None, interval_max=14,
)
@mock.sleepdeprived(module=utils)
def test_retry_always(self):
Predicate = self.Predicate
class Fun(object):
def __init__(self):
self.calls = 0
def __call__(self, *args, **kwargs):
try:
if self.calls >= 10:
return 42
raise Predicate()
finally:
self.calls += 1
fun = Fun()
self.assertEqual(
utils.retry_over_time(
fun, self.Predicate,
max_retries=0, errback=None, interval_max=14,
),
42,
)
self.assertEqual(fun.calls, 11)
class test_cached_property(Case):
def test_deleting(self):
class X(object):
xx = False
@utils.cached_property
def foo(self):
return 42
@foo.deleter # noqa
def foo(self, value):
self.xx = value
x = X()
del(x.foo)
self.assertFalse(x.xx)
x.__dict__['foo'] = 'here'
del(x.foo)
self.assertEqual(x.xx, 'here')
def test_when_access_from_class(self):
class X(object):
xx = None
@utils.cached_property
def foo(self):
return 42
@foo.setter # noqa
def foo(self, value):
self.xx = 10
desc = X.__dict__['foo']
self.assertIs(X.foo, desc)
self.assertIs(desc.__get__(None), desc)
self.assertIs(desc.__set__(None, 1), desc)
self.assertIs(desc.__delete__(None), desc)
self.assertTrue(desc.setter(1))
x = X()
x.foo = 30
self.assertEqual(x.xx, 10)
del(x.foo)
class test_symbol_by_name(Case):
def test_instance_returns_instance(self):
instance = object()
self.assertIs(utils.symbol_by_name(instance), instance)
def test_returns_default(self):
default = object()
self.assertIs(
utils.symbol_by_name('xyz.ryx.qedoa.weq:foz', default=default),
default,
)
def test_no_default(self):
with self.assertRaises(ImportError):
utils.symbol_by_name('xyz.ryx.qedoa.weq:foz')
def test_imp_reraises_ValueError(self):
imp = Mock()
imp.side_effect = ValueError()
with self.assertRaises(ValueError):
utils.symbol_by_name('kombu.Connection', imp=imp)
def test_package(self):
from kombu.entity import Exchange
self.assertIs(
utils.symbol_by_name('.entity:Exchange', package='kombu'),
Exchange,
)
self.assertTrue(utils.symbol_by_name(':Consumer', package='kombu'))
class test_ChannelPromise(Case):
def test_repr(self):
obj = Mock(name='cb')
self.assertIn(
'promise',
repr(utils.ChannelPromise(obj)),
)
obj.assert_not_called()
class test_entrypoints(Case):
@mock.mask_modules('pkg_resources')
def test_without_pkg_resources(self):
self.assertListEqual(list(utils.entrypoints('kombu.test')), [])
@mock.module_exists('pkg_resources')
def test_with_pkg_resources(self):
with patch('pkg_resources.iter_entry_points', create=True) as iterep:
eps = iterep.return_value = [Mock(), Mock()]
self.assertTrue(list(utils.entrypoints('kombu.test')))
iterep.assert_called_with('kombu.test')
eps[0].load.assert_called_with()
eps[1].load.assert_called_with()
class test_shufflecycle(Case):
def test_shuffles(self):
prev_repeat, utils.repeat = utils.repeat, Mock()
try:
utils.repeat.return_value = list(range(10))
values = {'A', 'B', 'C'}
cycle = utils.shufflecycle(values)
seen = set()
for i in range(10):
next(cycle)
utils.repeat.assert_called_with(None)
self.assertTrue(seen.issubset(values))
with self.assertRaises(StopIteration):
next(cycle)
next(cycle)
finally:
utils.repeat = prev_repeat
class test_version_string_as_tuple(Case):
def test_versions(self):
@ -356,13 +40,3 @@ class test_version_string_as_tuple(Case):
version_string_as_tuple('3.3.1.a3.40c32'),
version_info_t(3, 3, 1, 'a3', '40c32'),
)
class test_maybe_fileno(Case):
def test_maybe_fileno(self):
self.assertEqual(utils.maybe_fileno(3), 3)
f = Mock(name='file')
self.assertIs(utils.maybe_fileno(f), f.fileno())
f.fileno.side_effect = ValueError()
self.assertIsNone(utils.maybe_fileno(f))

View File

@ -0,0 +1,17 @@
from __future__ import absolute_import, unicode_literals
from kombu.utils.uuid import uuid
from kombu.tests.case import Case
class test_UUID(Case):
def test_uuid4(self):
self.assertNotEqual(uuid(), uuid())
def test_uuid(self):
i1 = uuid()
i2 = uuid()
self.assertIsInstance(i1, str)
self.assertNotEqual(i1, i2)

View File

@ -7,9 +7,9 @@ import string
import os
from kombu.five import Empty, text_t
from kombu.utils import cached_property # , uuid
from kombu.utils.encoding import bytes_to_str, safe_str
from kombu.utils.json import loads, dumps
from kombu.utils.objects import cached_property
from . import virtual

View File

@ -51,10 +51,10 @@ from kombu.async.aws.sqs.ext import regions
from kombu.async.aws.sqs.message import Message
from kombu.five import Empty, range, string_t, text_t
from kombu.log import get_logger
from kombu.utils import cached_property
from kombu.utils import scheduling
from kombu.utils.encoding import bytes_to_str, safe_str
from kombu.utils.json import loads, dumps
from kombu.utils import scheduling
from kombu.utils.objects import cached_property
from . import virtual

View File

@ -3,7 +3,7 @@ from __future__ import absolute_import, unicode_literals
from kombu.five import string_t
from kombu.syn import _detect_environment
from kombu.utils import symbol_by_name
from kombu.utils.imports import symbol_by_name
def supports_librabbitmq():

View File

@ -8,7 +8,7 @@ from amqp.exceptions import RecoverableConnectionError
from kombu.exceptions import ChannelError, ConnectionError
from kombu.message import Message
from kombu.utils import cached_property
from kombu.utils.objects import cached_property
__all__ = ['Message', 'StdChannel', 'Management', 'Transport']

View File

@ -15,8 +15,8 @@ from contextlib import contextmanager
from kombu.exceptions import ChannelError
from kombu.five import Empty, monotonic
from kombu.log import get_logger
from kombu.utils import cached_property
from kombu.utils.json import loads, dumps
from kombu.utils.objects import cached_property
from . import virtual

View File

@ -12,9 +12,9 @@ import tempfile
from . import virtual
from kombu.exceptions import ChannelError
from kombu.five import Empty, monotonic
from kombu.utils import cached_property
from kombu.utils.encoding import bytes_to_str, str_to_bytes
from kombu.utils.json import loads, dumps
from kombu.utils.objects import cached_property
VERSION = (1, 0, 0)

View File

@ -17,7 +17,7 @@ from kombu.five import Empty
from kombu.syn import _detect_environment
from kombu.utils.encoding import bytes_to_str
from kombu.utils.json import loads, dumps
from kombu.utils import cached_property
from kombu.utils.objects import cached_property
from . import virtual

View File

@ -7,7 +7,7 @@ from __future__ import absolute_import, unicode_literals
import sys
from kombu.five import reraise
from kombu.utils import cached_property
from kombu.utils.objects import cached_property
from . import virtual

View File

@ -14,12 +14,14 @@ from vine import promise
from kombu.exceptions import InconsistencyError, VersionMismatch
from kombu.five import Empty, values, string_t
from kombu.log import get_logger
from kombu.utils import cached_property, register_after_fork, uuid
from kombu.utils.compat import register_after_fork
from kombu.utils.eventio import poll, READ, ERR
from kombu.utils.encoding import bytes_to_str
from kombu.utils.json import loads, dumps
from kombu.utils.objects import cached_property
from kombu.utils.scheduling import cycle_by_name
from kombu.utils.url import _parse_url
from kombu.utils.uuid import uuid
from . import virtual

View File

@ -20,9 +20,10 @@ from amqp.protocol import queue_declare_ok_t
from kombu.exceptions import ResourceError, ChannelError
from kombu.five import Empty, items, monotonic
from kombu.log import get_logger
from kombu.utils import emergency_dump_state, uuid
from kombu.utils.encoding import str_to_bytes, bytes_to_str
from kombu.utils.div import emergency_dump_state
from kombu.utils.scheduling import FairCycle
from kombu.utils.uuid import uuid
from kombu.transport import base

View File

@ -5,10 +5,10 @@ by the AMQ protocol (excluding the `headers` exchange).
"""
from __future__ import absolute_import, unicode_literals
from kombu.utils import escape_regex
import re
from kombu.utils.text import escape_regex
class ExchangeType(object):
"""Implements the specifics for an exchange type.

View File

@ -1,36 +1,14 @@
"""Internal utilities."""
"""DEPRECATED - Import from modules below"""
from __future__ import absolute_import, print_function, unicode_literals
import importlib
import numbers
import random
import sys
from contextlib import contextmanager
from itertools import count, repeat
from time import sleep
from uuid import uuid4
from vine.utils import wraps
from kombu.five import items, python_2_unicode_compatible, reraise, string_t
from .encoding import default_encode, safe_repr as _safe_repr
try:
from io import UnsupportedOperation
FILENO_ERRORS = (AttributeError, ValueError, UnsupportedOperation)
except ImportError: # pragma: no cover
# Py2
FILENO_ERRORS = (AttributeError, ValueError) # noqa
try:
from billiard.util import register_after_fork
except ImportError: # pragma: no cover
try:
from multiprocessing.util import register_after_fork # noqa
except ImportError:
register_after_fork = None # noqa
from .collections import EqualityDict
from .compat import fileno, maybe_fileno, nested, register_after_fork
from .div import emergency_dump_state
from .functional import (
fxrange, fxrangemax, maybe_list, reprcall, retry_over_time,
)
from .objects import cached_property
from .uuid import uuid
__all__ = [
'EqualityDict', 'uuid', 'maybe_list',
@ -38,395 +16,3 @@ __all__ = [
'emergency_dump_state', 'cached_property', 'register_after_fork',
'reprkwargs', 'reprcall', 'nested', 'fileno', 'maybe_fileno',
]
def symbol_by_name(name, aliases={}, imp=None, package=None,
sep='.', default=None, **kwargs):
"""Get symbol by qualified name.
The name should be the full dot-separated path to the class::
modulename.ClassName
Example::
celery.concurrency.processes.TaskPool
^- class name
or using ':' to separate module and symbol::
celery.concurrency.processes:TaskPool
If `aliases` is provided, a dict containing short name/long name
mappings, the name is looked up in the aliases first.
Examples:
>>> symbol_by_name('celery.concurrency.processes.TaskPool')
<class 'celery.concurrency.processes.TaskPool'>
>>> symbol_by_name('default', {
... 'default': 'celery.concurrency.processes.TaskPool'})
<class 'celery.concurrency.processes.TaskPool'>
# Does not try to look up non-string names.
>>> from celery.concurrency.processes import TaskPool
>>> symbol_by_name(TaskPool) is TaskPool
True
"""
if imp is None:
imp = importlib.import_module
if not isinstance(name, string_t):
return name # already a class
name = aliases.get(name) or name
sep = ':' if ':' in name else sep
module_name, _, cls_name = name.rpartition(sep)
if not module_name:
cls_name, module_name = None, package if package else cls_name
try:
try:
module = imp(module_name, package=package, **kwargs)
except ValueError as exc:
reraise(ValueError,
ValueError("Couldn't import {0!r}: {1}".format(name, exc)),
sys.exc_info()[2])
return getattr(module, cls_name) if cls_name else module
except (ImportError, AttributeError):
if default is None:
raise
return default
class HashedSeq(list):
"""type used for hash() to make sure the hash is not generated
multiple times."""
__slots__ = 'hashvalue'
def __init__(self, *seq):
self[:] = seq
self.hashvalue = hash(seq)
def __hash__(self):
return self.hashvalue
def eqhash(o):
try:
return o.__eqhash__()
except AttributeError:
return hash(o)
class EqualityDict(dict):
def __getitem__(self, key):
h = eqhash(key)
if h not in self:
return self.__missing__(key)
return dict.__getitem__(self, h)
def __setitem__(self, key, value):
return dict.__setitem__(self, eqhash(key), value)
def __delitem__(self, key):
return dict.__delitem__(self, eqhash(key))
def uuid():
"""Generate a unique id, having - hopefully - a very small chance of
collision.
For now this is provided by :func:`uuid.uuid4`.
"""
return str(uuid4())
def maybe_list(v):
if v is None:
return []
if hasattr(v, '__iter__'):
return v
return [v]
def fxrange(start=1.0, stop=None, step=1.0, repeatlast=False):
cur = start * 1.0
while 1:
if not stop or cur <= stop:
yield cur
cur += step
else:
if not repeatlast:
break
yield cur - step
def fxrangemax(start=1.0, stop=None, step=1.0, max=100.0):
sum_, cur = 0, start * 1.0
while 1:
if sum_ >= max:
break
yield cur
if stop:
cur = min(cur + step, stop)
else:
cur += step
sum_ += cur
def retry_over_time(fun, catch, args=[], kwargs={}, errback=None,
max_retries=None, interval_start=2, interval_step=2,
interval_max=30, callback=None):
"""Retry the function over and over until max retries is exceeded.
For each retry we sleep a for a while before we try again, this interval
is increased for every retry until the max seconds is reached.
Arguments:
fun (Callable): The function to try
catch (Tuple[BaseException]): Exceptions to catch, can be either
tuple or a single exception class.
Keyword Arguments:
args (Tuple): Positional arguments passed on to the function.
kwargs (Dict): Keyword arguments passed on to the function.
errback (Callable): Callback for when an exception in ``catch``
is raised. The callback must take three arguments:
``exc``, ``interval_range`` and ``retries``, where ``exc``
is the exception instance, ``interval_range`` is an iterator
which return the time in seconds to sleep next, and ``retries``
is the number of previous retries.
max_retries (int): Maximum number of retries before we give up.
If this is not set, we will retry forever.
interval_start (float): How long (in seconds) we start sleeping
between retries.
interval_step (float): By how much the interval is increased for
each retry.
interval_max (float): Maximum number of seconds to sleep
between retries.
"""
retries = 0
interval_range = fxrange(interval_start,
interval_max + interval_start,
interval_step, repeatlast=True)
for retries in count():
try:
return fun(*args, **kwargs)
except catch as exc:
if max_retries and retries >= max_retries:
raise
if callback:
callback()
tts = float(errback(exc, interval_range, retries) if errback
else next(interval_range))
if tts:
for _ in range(int(tts)):
if callback:
callback()
sleep(1.0)
# sleep remainder after int truncation above.
sleep(abs(int(tts) - tts))
def emergency_dump_state(state, open_file=open, dump=None, stderr=None):
from pprint import pformat
from tempfile import mktemp
stderr = sys.stderr if stderr is None else stderr
if dump is None:
import pickle
dump = pickle.dump
persist = mktemp()
print('EMERGENCY DUMP STATE TO FILE -> {0} <-'.format(persist), ## noqa
file=stderr)
fh = open_file(persist, 'w')
try:
try:
dump(state, fh, protocol=0)
except Exception as exc:
print( # noqa
'Cannot pickle state: {0!r}. Fallback to pformat.'.format(exc),
file=stderr,
)
fh.write(default_encode(pformat(state)))
finally:
fh.flush()
fh.close()
return persist
class cached_property(object):
"""Property descriptor that caches the return value
of the get function.
Examples:
.. code-block:: python
@cached_property
def connection(self):
return Connection()
@connection.setter # Prepares stored value
def connection(self, value):
if value is None:
raise TypeError('Connection must be a connection')
return value
@connection.deleter
def connection(self, value):
# Additional action to do at del(self.attr)
if value is not None:
print('Connection {0!r} deleted'.format(value)
"""
def __init__(self, fget=None, fset=None, fdel=None, doc=None):
self.__get = fget
self.__set = fset
self.__del = fdel
self.__doc__ = doc or fget.__doc__
self.__name__ = fget.__name__
self.__module__ = fget.__module__
def __get__(self, obj, type=None):
if obj is None:
return self
try:
return obj.__dict__[self.__name__]
except KeyError:
value = obj.__dict__[self.__name__] = self.__get(obj)
return value
def __set__(self, obj, value):
if obj is None:
return self
if self.__set is not None:
value = self.__set(obj, value)
obj.__dict__[self.__name__] = value
def __delete__(self, obj, _sentinel=object()):
if obj is None:
return self
value = obj.__dict__.pop(self.__name__, _sentinel)
if self.__del is not None and value is not _sentinel:
self.__del(obj, value)
def setter(self, fset):
return self.__class__(self.__get, fset, self.__del)
def deleter(self, fdel):
return self.__class__(self.__get, self.__set, fdel)
def reprkwargs(kwargs, sep=', ', fmt='{0}={1}'):
return sep.join(fmt.format(k, _safe_repr(v)) for k, v in items(kwargs))
def reprcall(name, args=(), kwargs={}, sep=', '):
return '{0}({1}{2}{3})'.format(
name, sep.join(map(_safe_repr, args or ())),
(args and kwargs) and sep or '',
reprkwargs(kwargs, sep),
)
@contextmanager
def nested(*managers): # pragma: no cover
# flake8: noqa
"""Combine multiple context managers into a single nested
context manager."""
exits = []
vars = []
exc = (None, None, None)
try:
try:
for mgr in managers:
exit = mgr.__exit__
enter = mgr.__enter__
vars.append(enter())
exits.append(exit)
yield vars
except:
exc = sys.exc_info()
finally:
while exits:
exit = exits.pop()
try:
if exit(*exc):
exc = (None, None, None)
except:
exc = sys.exc_info()
if exc != (None, None, None):
# Don't rely on sys.exc_info() still containing
# the right information. Another exception may
# have been raised and caught by an exit method
reraise(exc[0], exc[1], exc[2])
finally:
del(exc)
def shufflecycle(it):
it = list(it) # don't modify callers list
shuffle = random.shuffle
for _ in repeat(None):
shuffle(it)
yield it[0]
def entrypoints(namespace):
try:
from pkg_resources import iter_entry_points
except ImportError:
return iter([])
return ((ep, ep.load()) for ep in iter_entry_points(namespace))
@python_2_unicode_compatible
class ChannelPromise(object):
def __init__(self, contract):
self.__contract__ = contract
def __call__(self):
try:
return self.__value__
except AttributeError:
value = self.__value__ = self.__contract__()
return value
def __repr__(self):
try:
return repr(self.__value__)
except AttributeError:
return '<promise: 0x{0:x}>'.format(id(self.__contract__))
def escape_regex(p, white=''):
# what's up with re.escape? that code must be neglected or someting
return ''.join(c if c.isalnum() or c in white
else ('\\000' if c == '\000' else '\\' + c)
for c in p)
def fileno(f):
if isinstance(f, numbers.Integral):
return f
return f.fileno()
def maybe_fileno(f):
"""Get object fileno, or :const:`None` if not defined."""
try:
return fileno(f)
except FILENO_ERRORS:
pass
def coro(gen):
@wraps(gen)
def wind_up(*args, **kwargs):
it = gen(*args, **kwargs)
next(it)
return it
return wind_up

View File

@ -0,0 +1,37 @@
"""Custom maps, sequences, etc."""
from __future__ import absolute_import, unicode_literals
class HashedSeq(list):
"""type used for hash() to make sure the hash is not generated
multiple times."""
__slots__ = 'hashvalue'
def __init__(self, *seq):
self[:] = seq
self.hashvalue = hash(seq)
def __hash__(self):
return self.hashvalue
def eqhash(o):
try:
return o.__eqhash__()
except AttributeError:
return hash(o)
class EqualityDict(dict):
def __getitem__(self, key):
h = eqhash(key)
if h not in self:
return self.__missing__(key)
return dict.__getitem__(self, h)
def __setitem__(self, key, value):
return dict.__setitem__(self, eqhash(key), value)
def __delitem__(self, key):
return dict.__delitem__(self, eqhash(key))

92
kombu/utils/compat.py Normal file
View File

@ -0,0 +1,92 @@
from __future__ import absolute_import, unicode_literals
import numbers
import sys
from functools import wraps
from contextlib import contextmanager
from kombu.five import reraise
try:
from io import UnsupportedOperation
FILENO_ERRORS = (AttributeError, ValueError, UnsupportedOperation)
except ImportError: # pragma: no cover
# Py2
FILENO_ERRORS = (AttributeError, ValueError) # noqa
try:
from billiard.util import register_after_fork
except ImportError: # pragma: no cover
try:
from multiprocessing.util import register_after_fork # noqa
except ImportError:
register_after_fork = None # noqa
def coro(gen):
@wraps(gen)
def wind_up(*args, **kwargs):
it = gen(*args, **kwargs)
next(it)
return it
return wind_up
def entrypoints(namespace):
try:
from pkg_resources import iter_entry_points
except ImportError:
return iter([])
return ((ep, ep.load()) for ep in iter_entry_points(namespace))
def fileno(f):
if isinstance(f, numbers.Integral):
return f
return f.fileno()
def maybe_fileno(f):
"""Get object fileno, or :const:`None` if not defined."""
try:
return fileno(f)
except FILENO_ERRORS:
pass
@contextmanager
def nested(*managers): # pragma: no cover
# flake8: noqa
"""Combine multiple context managers into a single nested
context manager."""
exits = []
vars = []
exc = (None, None, None)
try:
try:
for mgr in managers:
exit = mgr.__exit__
enter = mgr.__enter__
vars.append(enter())
exits.append(exit)
yield vars
except:
exc = sys.exc_info()
finally:
while exits:
exit = exits.pop()
try:
if exit(*exc):
exc = (None, None, None)
except:
exc = sys.exc_info()
if exc != (None, None, None):
# Don't rely on sys.exc_info() still containing
# the right information. Another exception may
# have been raised and caught by an exit method
reraise(exc[0], exc[1], exc[2])
finally:
del(exc)

32
kombu/utils/div.py Normal file
View File

@ -0,0 +1,32 @@
from __future__ import absolute_import, unicode_literals, print_function
from .encoding import default_encode
import sys
def emergency_dump_state(state, open_file=open, dump=None, stderr=None):
from pprint import pformat
from tempfile import mktemp
stderr = sys.stderr if stderr is None else stderr
if dump is None:
import pickle
dump = pickle.dump
persist = mktemp()
print('EMERGENCY DUMP STATE TO FILE -> {0} <-'.format(persist), # noqa
file=stderr)
fh = open_file(persist, 'w')
try:
try:
dump(state, fh, protocol=0)
except Exception as exc:
print( # noqa
'Cannot pickle state: {0!r}. Fallback to pformat.'.format(exc),
file=stderr,
)
fh.write(default_encode(pformat(state)))
finally:
fh.flush()
fh.close()
return persist

View File

@ -1,9 +1,12 @@
from __future__ import absolute_import, unicode_literals
import random
import sys
import threading
from collections import Iterable, Mapping, OrderedDict
from itertools import count, repeat
from time import sleep
from vine.utils import wraps
@ -11,6 +14,8 @@ from kombu.five import (
UserDict, items, keys, python_2_unicode_compatible, string_t,
)
from .encoding import safe_repr as _safe_repr
__all__ = [
'LRUCache', 'memoize', 'lazy', 'maybe_evaluate',
'is_list', 'maybe_list', 'dictfilter',
@ -19,6 +24,26 @@ __all__ = [
KEYWORD_MARK = object()
@python_2_unicode_compatible
class ChannelPromise(object):
def __init__(self, contract):
self.__contract__ = contract
def __call__(self):
try:
return self.__value__
except AttributeError:
value = self.__value__ = self.__contract__()
return value
def __repr__(self):
try:
return repr(self.__value__)
except AttributeError:
return '<promise: 0x{0:x}>'.format(id(self.__contract__))
class LRUCache(UserDict):
"""LRU Cache implementation using a doubly linked list to track access.
@ -231,6 +256,104 @@ def dictfilter(d=None, **kw):
return {k: v for k, v in items(d) if v is not None}
def shufflecycle(it):
it = list(it) # don't modify callers list
shuffle = random.shuffle
for _ in repeat(None):
shuffle(it)
yield it[0]
def fxrange(start=1.0, stop=None, step=1.0, repeatlast=False):
cur = start * 1.0
while 1:
if not stop or cur <= stop:
yield cur
cur += step
else:
if not repeatlast:
break
yield cur - step
def fxrangemax(start=1.0, stop=None, step=1.0, max=100.0):
sum_, cur = 0, start * 1.0
while 1:
if sum_ >= max:
break
yield cur
if stop:
cur = min(cur + step, stop)
else:
cur += step
sum_ += cur
def retry_over_time(fun, catch, args=[], kwargs={}, errback=None,
max_retries=None, interval_start=2, interval_step=2,
interval_max=30, callback=None):
"""Retry the function over and over until max retries is exceeded.
For each retry we sleep a for a while before we try again, this interval
is increased for every retry until the max seconds is reached.
Arguments:
fun (Callable): The function to try
catch (Tuple[BaseException]): Exceptions to catch, can be either
tuple or a single exception class.
Keyword Arguments:
args (Tuple): Positional arguments passed on to the function.
kwargs (Dict): Keyword arguments passed on to the function.
errback (Callable): Callback for when an exception in ``catch``
is raised. The callback must take three arguments:
``exc``, ``interval_range`` and ``retries``, where ``exc``
is the exception instance, ``interval_range`` is an iterator
which return the time in seconds to sleep next, and ``retries``
is the number of previous retries.
max_retries (int): Maximum number of retries before we give up.
If this is not set, we will retry forever.
interval_start (float): How long (in seconds) we start sleeping
between retries.
interval_step (float): By how much the interval is increased for
each retry.
interval_max (float): Maximum number of seconds to sleep
between retries.
"""
retries = 0
interval_range = fxrange(interval_start,
interval_max + interval_start,
interval_step, repeatlast=True)
for retries in count():
try:
return fun(*args, **kwargs)
except catch as exc:
if max_retries and retries >= max_retries:
raise
if callback:
callback()
tts = float(errback(exc, interval_range, retries) if errback
else next(interval_range))
if tts:
for _ in range(int(tts)):
if callback:
callback()
sleep(1.0)
# sleep remainder after int truncation above.
sleep(abs(int(tts) - tts))
def reprkwargs(kwargs, sep=', ', fmt='{0}={1}'):
return sep.join(fmt.format(k, _safe_repr(v)) for k, v in items(kwargs))
def reprcall(name, args=(), kwargs={}, sep=', '):
return '{0}({1}{2}{3})'.format(
name, sep.join(map(_safe_repr, args or ())),
(args and kwargs) and sep or '',
reprkwargs(kwargs, sep),
)
# Compat names (before kombu 3.0)
promise = lazy
maybe_promise = maybe_evaluate

65
kombu/utils/imports.py Normal file
View File

@ -0,0 +1,65 @@
"""Import related utilities."""
from __future__ import absolute_import, unicode_literals
import importlib
import sys
from kombu.five import reraise, string_t
def symbol_by_name(name, aliases={}, imp=None, package=None,
sep='.', default=None, **kwargs):
"""Get symbol by qualified name.
The name should be the full dot-separated path to the class::
modulename.ClassName
Example::
celery.concurrency.processes.TaskPool
^- class name
or using ':' to separate module and symbol::
celery.concurrency.processes:TaskPool
If `aliases` is provided, a dict containing short name/long name
mappings, the name is looked up in the aliases first.
Examples:
>>> symbol_by_name('celery.concurrency.processes.TaskPool')
<class 'celery.concurrency.processes.TaskPool'>
>>> symbol_by_name('default', {
... 'default': 'celery.concurrency.processes.TaskPool'})
<class 'celery.concurrency.processes.TaskPool'>
# Does not try to look up non-string names.
>>> from celery.concurrency.processes import TaskPool
>>> symbol_by_name(TaskPool) is TaskPool
True
"""
if imp is None:
imp = importlib.import_module
if not isinstance(name, string_t):
return name # already a class
name = aliases.get(name) or name
sep = ':' if ':' in name else sep
module_name, _, cls_name = name.rpartition(sep)
if not module_name:
cls_name, module_name = None, package if package else cls_name
try:
try:
module = imp(module_name, package=package, **kwargs)
except ValueError as exc:
reraise(ValueError,
ValueError("Couldn't import {0!r}: {1}".format(name, exc)),
sys.exc_info()[2])
return getattr(module, cls_name) if cls_name else module
except (ImportError, AttributeError):
if default is None:
raise
return default

63
kombu/utils/objects.py Normal file
View File

@ -0,0 +1,63 @@
from __future__ import absolute_import, unicode_literals
class cached_property(object):
"""Property descriptor that caches the return value
of the get function.
Examples:
.. code-block:: python
@cached_property
def connection(self):
return Connection()
@connection.setter # Prepares stored value
def connection(self, value):
if value is None:
raise TypeError('Connection must be a connection')
return value
@connection.deleter
def connection(self, value):
# Additional action to do at del(self.attr)
if value is not None:
print('Connection {0!r} deleted'.format(value)
"""
def __init__(self, fget=None, fset=None, fdel=None, doc=None):
self.__get = fget
self.__set = fset
self.__del = fdel
self.__doc__ = doc or fget.__doc__
self.__name__ = fget.__name__
self.__module__ = fget.__module__
def __get__(self, obj, type=None):
if obj is None:
return self
try:
return obj.__dict__[self.__name__]
except KeyError:
value = obj.__dict__[self.__name__] = self.__get(obj)
return value
def __set__(self, obj, value):
if obj is None:
return self
if self.__set is not None:
value = self.__set(obj, value)
obj.__dict__[self.__name__] = value
def __delete__(self, obj, _sentinel=object()):
if obj is None:
return self
value = obj.__dict__.pop(self.__name__, _sentinel)
if self.__del is not None and value is not _sentinel:
self.__del(obj, value)
def setter(self, fset):
return self.__class__(self.__get, fset, self.__del)
def deleter(self, fdel):
return self.__class__(self.__get, self.__set, fdel)

View File

@ -5,7 +5,7 @@ from itertools import count
from kombu.five import python_2_unicode_compatible
from . import symbol_by_name
from .imports import symbol_by_name
__all__ = [
'FairCycle', 'priority_cycle', 'round_robin_cycle', 'sorted_cycle',

View File

@ -7,6 +7,13 @@ from kombu import version_info_t
from kombu.five import string_t
def escape_regex(p, white=''):
# what's up with re.escape? that code must be neglected or someting
return ''.join(c if c.isalnum() or c in white
else ('\\000' if c == '\000' else '\\' + c)
for c in p)
def fmatch_iter(needle, haystack, min_ratio=0.6):
for key in haystack:
ratio = SequenceMatcher(None, needle, key).ratio()

13
kombu/utils/uuid.py Normal file
View File

@ -0,0 +1,13 @@
from __future__ import absolute_import, unicode_literals
from uuid import uuid4
def uuid(_uuid=uuid4):
"""Generate a unique id, having - hopefully - a very small chance of
collision.
See Also:
For now this is provided by :func:`uuid.uuid4`.
"""
return str(_uuid())