Improve callback scheduling.

Collect all timeouts to be run before running any of them; this
prevents starvation when a slow callback reschedules itself.

Call time() again before setting poll_timeout to avoid scheduling
anomalies with slow callbacks.

Closes #947.
This commit is contained in:
Ben Darnell 2014-06-15 12:11:22 -04:00
parent 7a0eda1338
commit 1591ade4d0
1 changed files with 20 additions and 10 deletions

View File

@ -686,19 +686,16 @@ class PollIOLoop(IOLoop):
try:
while True:
poll_timeout = _POLL_TIMEOUT
# Prevent IO event starvation by delaying new callbacks
# to the next iteration of the event loop.
with self._callback_lock:
callbacks = self._callbacks
self._callbacks = []
for callback in callbacks:
self._run_callback(callback)
# Closures may be holding on to a lot of memory, so allow
# them to be freed before we go into our poll wait.
callbacks = callback = None
# Add any timeouts that have come due to the callback list.
# Do not run anything until we have determined which ones
# are ready, so timeouts that call add_timeout cannot
# schedule anything in this iteration.
if self._timeouts:
now = self.time()
while self._timeouts:
@ -708,11 +705,9 @@ class PollIOLoop(IOLoop):
self._cancellations -= 1
elif self._timeouts[0].deadline <= now:
timeout = heapq.heappop(self._timeouts)
self._run_callback(timeout.callback)
callbacks.append(timeout.callback)
del timeout
else:
seconds = self._timeouts[0].deadline - now
poll_timeout = min(seconds, poll_timeout)
break
if (self._cancellations > 512
and self._cancellations > (len(self._timeouts) >> 1)):
@ -723,10 +718,25 @@ class PollIOLoop(IOLoop):
if x.callback is not None]
heapq.heapify(self._timeouts)
for callback in callbacks:
self._run_callback(callback)
# Closures may be holding on to a lot of memory, so allow
# them to be freed before we go into our poll wait.
callbacks = callback = None
if self._callbacks:
# If any callbacks or timeouts called add_callback,
# we don't want to wait in poll() before we run them.
poll_timeout = 0.0
elif self._timeouts:
# If there are any timeouts, schedule the first one.
# Use self.time() instead of 'now' to account for time
# spent running callbacks.
poll_timeout = self._timeouts[0].deadline - self.time()
poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
else:
# No timeouts and no callbacks, so use the default.
poll_timeout = _POLL_TIMEOUT
if not self._running:
break