Clean up cancelled timeout handles when the queue gets full of them.

Based on a patch by Giampaolo Radola:
https://github.com/facebook/tornado/issues/408

Closes #408.
This commit is contained in:
Ben Darnell 2013-05-12 23:32:24 -04:00
parent 6b8a38f07e
commit a6d158f155
2 changed files with 25 additions and 0 deletions

View File

@ -485,6 +485,7 @@ class PollIOLoop(IOLoop):
self._callbacks = []
self._callback_lock = threading.Lock()
self._timeouts = []
self._cancellations = 0
self._running = False
self._stopped = False
self._closing = False
@ -606,6 +607,7 @@ class PollIOLoop(IOLoop):
if self._timeouts[0].callback is None:
# the timeout was cancelled
heapq.heappop(self._timeouts)
self._cancellations -= 1
elif self._timeouts[0].deadline <= now:
timeout = heapq.heappop(self._timeouts)
self._run_callback(timeout.callback)
@ -613,6 +615,14 @@ class PollIOLoop(IOLoop):
seconds = self._timeouts[0].deadline - now
poll_timeout = min(seconds, poll_timeout)
break
if (self._cancellations > 512
and self._cancellations > (len(self._timeouts) >> 1)):
# Clean up the timeout queue when it gets large and it's
# more than half cancellations.
self._cancellations = 0
self._timeouts = [x for x in self._timeouts
if x.callback is not None]
heapq.heapify(self._timeouts)
if self._callbacks:
# If any callbacks or timeouts called add_callback,
@ -693,6 +703,7 @@ class PollIOLoop(IOLoop):
# If this turns out to be a problem, we could add a garbage
# collection pass whenever there are too many dead timeouts.
timeout.callback = None
self._cancellations += 1
def add_callback(self, callback, *args, **kwargs):
with self._callback_lock:

View File

@ -157,6 +157,20 @@ class TestIOLoop(AsyncTestCase):
self.wait()
self.io_loop.remove_timeout(handle)
def test_remove_timeout_cleanup(self):
# Add and remove enough callbacks to trigger cleanup.
# Not a very thorough test, but it ensures that the cleanup code
# gets executed and doesn't blow up. This test is only really useful
# on PollIOLoop subclasses, but it should run silently on any
# implementation.
for i in range(2000):
timeout = self.io_loop.add_timeout(self.io_loop.time() + 3600,
lambda: None)
self.io_loop.remove_timeout(timeout)
# HACK: wait two IOLoop iterations for the GC to happen.
self.io_loop.add_callback(lambda: self.io_loop.add_callback(self.stop))
self.wait()
# Deliberately not a subclass of AsyncTestCase so the IOLoop isn't
# automatically set as current.