diff --git a/kombu/clocks.py b/kombu/clocks.py index ee06d750..c37e3fff 100644 --- a/kombu/clocks.py +++ b/kombu/clocks.py @@ -7,12 +7,59 @@ Logical Clocks and Synchronization. """ from __future__ import absolute_import -from itertools import islice from threading import Lock +from itertools import islice +from operator import itemgetter from .five import zip -__all__ = ['LamportClock'] +__all__ = ['LamportClock', 'timetuple'] + +R_CLOCK = '_lamport(clock={0}, timestamp={1}, id={2} {3!r})' + + +class timetuple(tuple): + """Tuple of event clock information. + + Can be used as part of a heap to keep events ordered. + + :param clock: Event clock value. + :param timestamp: Event UNIX timestamp value. + :param id: Event host id (e.g. ``hostname:pid``). + :param obj: Optional obj to associate with this event. + + """ + __slots__ = () + + def __new__(cls, clock, timestamp, id, obj=None): + return tuple.__new__(cls, (clock, timestamp, id, obj)) + + def __repr__(self): + return R_CLOCK.format(*self) + + def __getnewargs__(self): + return tuple(self) + + def __lt__(self, other): + # 0: clock 1: timestamp 3: process id + try: + A, B = self[0], other[0] + # uses logical clock value first + if A and B: # use logical clock if available + if A == B: # equal clocks use lower process id + return self[2] < other[2] + return A < B + return self[1] < other[1] # ... or use timestamp + except IndexError: + return NotImplemented + __gt__ = lambda self, other: other < self + __le__ = lambda self, other: not other < self + __ge__ = lambda self, other: not self < other + + clock = property(itemgetter(0)) + timestamp = property(itemgetter(1)) + id = property(itemgetter(2)) + obj = property(itemgetter(3)) class LamportClock(object): diff --git a/kombu/tests/test_clocks.py b/kombu/tests/test_clocks.py index f3e10991..538ed0a5 100644 --- a/kombu/tests/test_clocks.py +++ b/kombu/tests/test_clocks.py @@ -1,10 +1,13 @@ from __future__ import absolute_import +import pickle + from heapq import heappush +from time import time -from kombu.clocks import LamportClock +from kombu.clocks import LamportClock, timetuple -from .case import Case +from .case import Mock, Case class test_LamportClock(Case): @@ -53,3 +56,48 @@ class test_LamportClock(Case): self.assertEqual(c.sort_heap(events), m1) self.assertEqual(c.sort_heap([m4, m5]), m4) self.assertEqual(c.sort_heap([m4, m5, m1]), m4) + + +class test_timetuple(Case): + + def test_repr(self): + x = timetuple(133, time(), 'id', Mock()) + self.assertTrue(repr(x)) + + def test_pickleable(self): + x = timetuple(133, time(), 'id', 'obj') + self.assertEqual(pickle.loads(pickle.dumps(x)), tuple(x)) + + def test_order(self): + t1 = time() + a = timetuple(133, t1, 'A', 'obj') + b = timetuple(140, t1, 'A', 'obj') + self.assertTrue(a.__getnewargs__()) + self.assertEqual(a.clock, 133) + self.assertEqual(a.timestamp, t1) + self.assertEqual(a.id, 'A') + self.assertEqual(a.obj, 'obj') + self.assertTrue( + a <= b, + ) + self.assertTrue( + b >= a, + ) + + self.assertEqual( + timetuple(134, time(), 'A', 'obj').__lt__(tuple()), + NotImplemented, + ) + self.assertGreater( + timetuple(134, time(), 'A', 'obj'), + timetuple(133, time(), 'A', 'obj'), + ) + self.assertGreater( + timetuple(134, t1, 'B', 'obj'), + timetuple(134, t1, 'A', 'obj'), + ) + + self.assertGreater( + timetuple(None, time(), 'B', 'obj'), + timetuple(None, t1, 'A', 'obj'), + )