From d79458151c6992a1b8258f056e574dade2cc62ac Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Fri, 11 May 2012 21:48:21 -0700 Subject: [PATCH] Move tornado.platform.windows.Waker to new module platform.common. Cleaned up the few windows-specific bits of code. The socket-based Waker is also usable on Jython. --- tornado/platform/auto.py | 3 +- tornado/platform/common.py | 88 +++++++++++++++++++++++++++++++++++++ tornado/platform/windows.py | 78 -------------------------------- 3 files changed, 90 insertions(+), 79 deletions(-) create mode 100644 tornado/platform/common.py diff --git a/tornado/platform/auto.py b/tornado/platform/auto.py index 68cf2d21..7bfec116 100644 --- a/tornado/platform/auto.py +++ b/tornado/platform/auto.py @@ -28,6 +28,7 @@ from __future__ import absolute_import, division, with_statement import os if os.name == 'nt': - from tornado.platform.windows import set_close_exec, Waker + from tornado.platform.common import Waker + from tornado.platform.windows import set_close_exec else: from tornado.platform.posix import set_close_exec, Waker diff --git a/tornado/platform/common.py b/tornado/platform/common.py new file mode 100644 index 00000000..e1eafc2d --- /dev/null +++ b/tornado/platform/common.py @@ -0,0 +1,88 @@ +"""Lowest-common-denominator implementations of platform functionality.""" +from __future__ import absolute_import, division, with_statement + +import errno +import socket + +from tornado.platform import interface +from tornado.util import b + +class Waker(interface.Waker): + """Create an OS independent asynchronous pipe. + + For use on platforms that don't have os.pipe() (or where pipes cannot + be passed to select()), but do have sockets. This includes Windows + and Jython. + """ + def __init__(self): + # Based on Zope async.py: http://svn.zope.org/zc.ngi/trunk/src/zc/ngi/async.py + + self.writer = socket.socket() + # Disable buffering -- pulling the trigger sends 1 byte, + # and we want that sent immediately, to wake up ASAP. + self.writer.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + + count = 0 + while 1: + count += 1 + # Bind to a local port; for efficiency, let the OS pick + # a free port for us. + # Unfortunately, stress tests showed that we may not + # be able to connect to that port ("Address already in + # use") despite that the OS picked it. This appears + # to be a race bug in the Windows socket implementation. + # So we loop until a connect() succeeds (almost always + # on the first try). See the long thread at + # http://mail.zope.org/pipermail/zope/2005-July/160433.html + # for hideous details. + a = socket.socket() + a.bind(("127.0.0.1", 0)) + a.listen(1) + connect_address = a.getsockname() # assigned (host, port) pair + try: + self.writer.connect(connect_address) + break # success + except socket.error, detail: + if (not hasattr(errno, 'WSAEADDRINUSE') or + detail[0] != errno.WSAEADDRINUSE): + # "Address already in use" is the only error + # I've seen on two WinXP Pro SP2 boxes, under + # Pythons 2.3.5 and 2.4.1. + raise + # (10048, 'Address already in use') + # assert count <= 2 # never triggered in Tim's tests + if count >= 10: # I've never seen it go above 2 + a.close() + self.writer.close() + raise socket.error("Cannot bind trigger!") + # Close `a` and try again. Note: I originally put a short + # sleep() here, but it didn't appear to help or hurt. + a.close() + + self.reader, addr = a.accept() + self.reader.setblocking(0) + self.writer.setblocking(0) + a.close() + self.reader_fd = self.reader.fileno() + + def fileno(self): + return self.reader.fileno() + + def wake(self): + try: + self.writer.send(b("x")) + except (IOError, socket.error): + pass + + def consume(self): + try: + while True: + result = self.reader.recv(1024) + if not result: + break + except (IOError, socket.error): + pass + + def close(self): + self.reader.close() + self.writer.close() diff --git a/tornado/platform/windows.py b/tornado/platform/windows.py index a39a83b7..58016bfa 100644 --- a/tornado/platform/windows.py +++ b/tornado/platform/windows.py @@ -5,11 +5,6 @@ from __future__ import absolute_import, division, with_statement import ctypes import ctypes.wintypes -import socket -import errno - -from tornado.platform import interface -from tornado.util import b # See: http://msdn.microsoft.com/en-us/library/ms724935(VS.85).aspx SetHandleInformation = ctypes.windll.kernel32.SetHandleInformation @@ -25,76 +20,3 @@ def set_close_exec(fd): raise ctypes.GetLastError() -class Waker(interface.Waker): - """Create an OS independent asynchronous pipe""" - def __init__(self): - # Based on Zope async.py: http://svn.zope.org/zc.ngi/trunk/src/zc/ngi/async.py - - self.writer = socket.socket() - # Disable buffering -- pulling the trigger sends 1 byte, - # and we want that sent immediately, to wake up ASAP. - self.writer.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - - count = 0 - while 1: - count += 1 - # Bind to a local port; for efficiency, let the OS pick - # a free port for us. - # Unfortunately, stress tests showed that we may not - # be able to connect to that port ("Address already in - # use") despite that the OS picked it. This appears - # to be a race bug in the Windows socket implementation. - # So we loop until a connect() succeeds (almost always - # on the first try). See the long thread at - # http://mail.zope.org/pipermail/zope/2005-July/160433.html - # for hideous details. - a = socket.socket() - a.bind(("127.0.0.1", 0)) - connect_address = a.getsockname() # assigned (host, port) pair - a.listen(1) - try: - self.writer.connect(connect_address) - break # success - except socket.error, detail: - if detail[0] != errno.WSAEADDRINUSE: - # "Address already in use" is the only error - # I've seen on two WinXP Pro SP2 boxes, under - # Pythons 2.3.5 and 2.4.1. - raise - # (10048, 'Address already in use') - # assert count <= 2 # never triggered in Tim's tests - if count >= 10: # I've never seen it go above 2 - a.close() - self.writer.close() - raise socket.error("Cannot bind trigger!") - # Close `a` and try again. Note: I originally put a short - # sleep() here, but it didn't appear to help or hurt. - a.close() - - self.reader, addr = a.accept() - self.reader.setblocking(0) - self.writer.setblocking(0) - a.close() - self.reader_fd = self.reader.fileno() - - def fileno(self): - return self.reader.fileno() - - def wake(self): - try: - self.writer.send(b("x")) - except IOError: - pass - - def consume(self): - try: - while True: - result = self.reader.recv(1024) - if not result: - break - except IOError: - pass - - def close(self): - self.reader.close() - self.writer.close()