diff --git a/tornado/ioloop.py b/tornado/ioloop.py index dd9639c0..bee3be48 100644 --- a/tornado/ioloop.py +++ b/tornado/ioloop.py @@ -485,6 +485,7 @@ class PollIOLoop(IOLoop): self._callbacks = [] self._callback_lock = threading.Lock() self._timeouts = [] + self._cancellations = 0 self._running = False self._stopped = False self._closing = False @@ -606,6 +607,7 @@ class PollIOLoop(IOLoop): if self._timeouts[0].callback is None: # the timeout was cancelled heapq.heappop(self._timeouts) + self._cancellations -= 1 elif self._timeouts[0].deadline <= now: timeout = heapq.heappop(self._timeouts) self._run_callback(timeout.callback) @@ -613,6 +615,14 @@ class PollIOLoop(IOLoop): seconds = self._timeouts[0].deadline - now poll_timeout = min(seconds, poll_timeout) break + if (self._cancellations > 512 + and self._cancellations > (len(self._timeouts) >> 1)): + # Clean up the timeout queue when it gets large and it's + # more than half cancellations. + self._cancellations = 0 + self._timeouts = [x for x in self._timeouts + if x.callback is not None] + heapq.heapify(self._timeouts) if self._callbacks: # If any callbacks or timeouts called add_callback, @@ -693,6 +703,7 @@ class PollIOLoop(IOLoop): # If this turns out to be a problem, we could add a garbage # collection pass whenever there are too many dead timeouts. timeout.callback = None + self._cancellations += 1 def add_callback(self, callback, *args, **kwargs): with self._callback_lock: diff --git a/tornado/test/ioloop_test.py b/tornado/test/ioloop_test.py index 4986637e..d5a3b3b6 100644 --- a/tornado/test/ioloop_test.py +++ b/tornado/test/ioloop_test.py @@ -157,6 +157,20 @@ class TestIOLoop(AsyncTestCase): self.wait() self.io_loop.remove_timeout(handle) + def test_remove_timeout_cleanup(self): + # Add and remove enough callbacks to trigger cleanup. + # Not a very thorough test, but it ensures that the cleanup code + # gets executed and doesn't blow up. This test is only really useful + # on PollIOLoop subclasses, but it should run silently on any + # implementation. + for i in range(2000): + timeout = self.io_loop.add_timeout(self.io_loop.time() + 3600, + lambda: None) + self.io_loop.remove_timeout(timeout) + # HACK: wait two IOLoop iterations for the GC to happen. + self.io_loop.add_callback(lambda: self.io_loop.add_callback(self.stop)) + self.wait() + # Deliberately not a subclass of AsyncTestCase so the IOLoop isn't # automatically set as current.