Fix TwistedIOLoop on epoll.

Twisted's removeAll doesn't like it if a socket is closed without being
unregistered.  EPoll generates events slightly differently so be more
careful about doubled connectionLost events.
This commit is contained in:
Ben Darnell 2012-10-07 16:59:36 -07:00
parent 3654790fcb
commit c04005b7ba
5 changed files with 33 additions and 10 deletions

View File

@ -1,5 +1,5 @@
[tox]
envlist = py27-full, py25-full, py32, py25, py27, py27-select
envlist = py27-full, py25-full, py32, py25, py27, py27-select, py27-twisted
setupdir=/tornado
toxworkdir=/home/vagrant/tox-tornado
@ -36,3 +36,11 @@ deps =
pycurl
twisted==12.2.0
commands = python -m tornado.test.runtests --ioloop=tornado.platform.select.SelectIOLoop {posargs:}
[testenv:py27-twisted]
basepython = python2.7
deps =
futures
pycurl
twisted==12.2.0
commands = python -m tornado.test.runtests --ioloop=tornado.platform.twisted.TwistedIOLoop {posargs:}

View File

@ -191,6 +191,8 @@ class TornadoReactor(PosixReactorBase):
# IReactorFDSet
def _invoke_callback(self, fd, events):
if fd not in self._fds:
return
(reader, writer) = self._fds[fd]
if reader:
err = None
@ -356,18 +358,23 @@ class _FD(object):
self.handler = handler
self.reading = False
self.writing = False
self.lost = False
def fileno(self):
return self.fd
def doRead(self):
self.handler(self.fd, tornado.ioloop.IOLoop.READ)
if not self.lost:
self.handler(self.fd, tornado.ioloop.IOLoop.READ)
def doWrite(self):
self.handler(self.fd, tornado.ioloop.IOLoop.WRITE)
if not self.lost:
self.handler(self.fd, tornado.ioloop.IOLoop.WRITE)
def connectionLost(self, reason):
self.handler(self.fd, tornado.ioloop.IOLoop.ERROR)
if not self.lost:
self.handler(self.fd, tornado.ioloop.IOLoop.ERROR)
self.lost = True
def logPrefix(self):
return ''
@ -424,6 +431,7 @@ class TwistedIOLoop(tornado.ioloop.IOLoop):
self.reactor.removeWriter(self.fds[fd])
def remove_handler(self, fd):
self.fds[fd].lost = True
if self.fds[fd].reading:
self.reactor.removeReader(self.fds[fd])
if self.fds[fd].writing:

View File

@ -133,6 +133,7 @@ Transfer-Encoding: chunked
resp = self.wait()
resp.rethrow()
self.assertEqual(resp.body, b("12"))
self.io_loop.remove_handler(sock.fileno())
def test_basic_auth(self):
self.assertEqual(self.fetch("/auth", auth_username="Aladdin",

View File

@ -62,6 +62,7 @@ class TestIOLoop(AsyncTestCase):
sock.fileno(), lambda fd, events: None,
IOLoop.READ)
finally:
self.io_loop.remove_handler(sock.fileno())
sock.close()
def test_add_callback_from_signal(self):

View File

@ -18,6 +18,11 @@ from tornado.test.util import unittest, skipIfNonUnix
from tornado.util import b
from tornado.web import RequestHandler, Application
def skip_if_twisted():
if IOLoop.configured_class().__name__ == 'TwistedIOLoop':
raise unittest.SkipTest("Process tests not compatible with TwistedIOLoop")
# Not using AsyncHTTPTestCase because we need control over the IOLoop.
class ProcessTest(unittest.TestCase):
def get_app(self):
@ -49,6 +54,9 @@ class ProcessTest(unittest.TestCase):
super(ProcessTest, self).tearDown()
def test_multi_process(self):
# This test can't work on twisted because we use the global reactor
# and have no way to get it back into a sane state after the fork.
skip_if_twisted()
with ExpectLog(gen_log, "(Starting .* processes|child .* exited|uncaught exception)"):
self.assertFalse(IOLoop.initialized())
sock, port = bind_unused_port()
@ -144,12 +152,9 @@ class SubprocessTest(AsyncTestCase):
data = self.wait()
self.assertEqual(data, b(""))
def skip_if_twisted(self):
if self.io_loop.__class__.__name__ == 'TwistedIOLoop':
raise unittest.SkipTest("SIGCHLD not compatible with Twisted IOLoop")
def test_sigchild(self):
self.skip_if_twisted()
# Twisted's SIGCHLD handler and Subprocess's conflict with each other.
skip_if_twisted()
Subprocess.initialize(io_loop=self.io_loop)
self.addCleanup(Subprocess.uninitialize)
subproc = Subprocess([sys.executable, '-c', 'pass'],
@ -160,7 +165,7 @@ class SubprocessTest(AsyncTestCase):
self.assertEqual(subproc.returncode, ret)
def test_sigchild_signal(self):
self.skip_if_twisted()
skip_if_twisted()
Subprocess.initialize(io_loop=self.io_loop)
self.addCleanup(Subprocess.uninitialize)
subproc = Subprocess([sys.executable, '-c',