locks: Avoid reusing Future objects in Event

Now that futures support cancellation, it is unsafe to return the same
Future object to multiple callers.

Fixes #2189
This commit is contained in:
Ben Darnell 2018-01-21 18:17:47 -05:00
parent b66acec055
commit 606f0b5e90
1 changed files with 24 additions and 8 deletions

View File

@ -15,6 +15,7 @@
from __future__ import absolute_import, division, print_function
import collections
from concurrent.futures import CancelledError
from tornado import gen, ioloop
from tornado.concurrent import Future, future_set_result_unless_cancelled
@ -196,7 +197,8 @@ class Event(object):
Done
"""
def __init__(self):
self._future = Future()
self._value = False
self._waiters = set()
def __repr__(self):
return '<%s %s>' % (
@ -204,23 +206,26 @@ class Event(object):
def is_set(self):
"""Return ``True`` if the internal flag is true."""
return self._future.done()
return self._value
def set(self):
"""Set the internal flag to ``True``. All waiters are awakened.
Calling `.wait` once the flag is set will not block.
"""
if not self._future.done():
self._future.set_result(None)
if not self._value:
self._value = True
for fut in self._waiters:
if not fut.done():
fut.set_result(None)
def clear(self):
"""Reset the internal flag to ``False``.
Calls to `.wait` will block until `.set` is called.
"""
if self._future.done():
self._future = Future()
self._value = False
def wait(self, timeout=None):
"""Block until the internal flag is true.
@ -228,10 +233,21 @@ class Event(object):
Returns a Future, which raises `tornado.util.TimeoutError` after a
timeout.
"""
fut = Future()
if self._value:
fut.set_result(None)
return fut
self._waiters.add(fut)
fut.add_done_callback(lambda fut: self._waiters.remove(fut))
if timeout is None:
return self._future
return fut
else:
return gen.with_timeout(timeout, self._future)
timeout_fut = gen.with_timeout(timeout, fut, quiet_exceptions=(CancelledError,))
# This is a slightly clumsy workaround for the fact that
# gen.with_timeout doesn't cancel its futures. Cancelling
# fut will remove it from the waiters list.
timeout_fut.add_done_callback(lambda tf: fut.cancel() if not fut.done() else None)
return timeout_fut
class _ReleasingContextManager(object):