mirror of https://github.com/celery/kombu.git
Adds kombu.clocks.timetuple
This commit is contained in:
parent
d260459269
commit
8ddfb92557
|
@ -7,12 +7,59 @@ Logical Clocks and Synchronization.
|
|||
"""
|
||||
from __future__ import absolute_import
|
||||
|
||||
from itertools import islice
|
||||
from threading import Lock
|
||||
from itertools import islice
|
||||
from operator import itemgetter
|
||||
|
||||
from .five import zip
|
||||
|
||||
__all__ = ['LamportClock']
|
||||
__all__ = ['LamportClock', 'timetuple']
|
||||
|
||||
R_CLOCK = '_lamport(clock={0}, timestamp={1}, id={2} {3!r})'
|
||||
|
||||
|
||||
class timetuple(tuple):
|
||||
"""Tuple of event clock information.
|
||||
|
||||
Can be used as part of a heap to keep events ordered.
|
||||
|
||||
:param clock: Event clock value.
|
||||
:param timestamp: Event UNIX timestamp value.
|
||||
:param id: Event host id (e.g. ``hostname:pid``).
|
||||
:param obj: Optional obj to associate with this event.
|
||||
|
||||
"""
|
||||
__slots__ = ()
|
||||
|
||||
def __new__(cls, clock, timestamp, id, obj=None):
|
||||
return tuple.__new__(cls, (clock, timestamp, id, obj))
|
||||
|
||||
def __repr__(self):
|
||||
return R_CLOCK.format(*self)
|
||||
|
||||
def __getnewargs__(self):
|
||||
return tuple(self)
|
||||
|
||||
def __lt__(self, other):
|
||||
# 0: clock 1: timestamp 3: process id
|
||||
try:
|
||||
A, B = self[0], other[0]
|
||||
# uses logical clock value first
|
||||
if A and B: # use logical clock if available
|
||||
if A == B: # equal clocks use lower process id
|
||||
return self[2] < other[2]
|
||||
return A < B
|
||||
return self[1] < other[1] # ... or use timestamp
|
||||
except IndexError:
|
||||
return NotImplemented
|
||||
__gt__ = lambda self, other: other < self
|
||||
__le__ = lambda self, other: not other < self
|
||||
__ge__ = lambda self, other: not self < other
|
||||
|
||||
clock = property(itemgetter(0))
|
||||
timestamp = property(itemgetter(1))
|
||||
id = property(itemgetter(2))
|
||||
obj = property(itemgetter(3))
|
||||
|
||||
|
||||
class LamportClock(object):
|
||||
|
|
|
@ -1,10 +1,13 @@
|
|||
from __future__ import absolute_import
|
||||
|
||||
import pickle
|
||||
|
||||
from heapq import heappush
|
||||
from time import time
|
||||
|
||||
from kombu.clocks import LamportClock
|
||||
from kombu.clocks import LamportClock, timetuple
|
||||
|
||||
from .case import Case
|
||||
from .case import Mock, Case
|
||||
|
||||
|
||||
class test_LamportClock(Case):
|
||||
|
@ -53,3 +56,48 @@ class test_LamportClock(Case):
|
|||
self.assertEqual(c.sort_heap(events), m1)
|
||||
self.assertEqual(c.sort_heap([m4, m5]), m4)
|
||||
self.assertEqual(c.sort_heap([m4, m5, m1]), m4)
|
||||
|
||||
|
||||
class test_timetuple(Case):
|
||||
|
||||
def test_repr(self):
|
||||
x = timetuple(133, time(), 'id', Mock())
|
||||
self.assertTrue(repr(x))
|
||||
|
||||
def test_pickleable(self):
|
||||
x = timetuple(133, time(), 'id', 'obj')
|
||||
self.assertEqual(pickle.loads(pickle.dumps(x)), tuple(x))
|
||||
|
||||
def test_order(self):
|
||||
t1 = time()
|
||||
a = timetuple(133, t1, 'A', 'obj')
|
||||
b = timetuple(140, t1, 'A', 'obj')
|
||||
self.assertTrue(a.__getnewargs__())
|
||||
self.assertEqual(a.clock, 133)
|
||||
self.assertEqual(a.timestamp, t1)
|
||||
self.assertEqual(a.id, 'A')
|
||||
self.assertEqual(a.obj, 'obj')
|
||||
self.assertTrue(
|
||||
a <= b,
|
||||
)
|
||||
self.assertTrue(
|
||||
b >= a,
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
timetuple(134, time(), 'A', 'obj').__lt__(tuple()),
|
||||
NotImplemented,
|
||||
)
|
||||
self.assertGreater(
|
||||
timetuple(134, time(), 'A', 'obj'),
|
||||
timetuple(133, time(), 'A', 'obj'),
|
||||
)
|
||||
self.assertGreater(
|
||||
timetuple(134, t1, 'B', 'obj'),
|
||||
timetuple(134, t1, 'A', 'obj'),
|
||||
)
|
||||
|
||||
self.assertGreater(
|
||||
timetuple(None, time(), 'B', 'obj'),
|
||||
timetuple(None, t1, 'A', 'obj'),
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue