Move `TCPServer` to `netutil`, change `handle_stream` callback to overridden method hook, move IP address hack for unix domain sockets to `HTTPConnection`
This commit is contained in:
parent
dfda3a555f
commit
437c477ac6
|
@ -35,7 +35,7 @@ from tornado.escape import utf8, native_str, parse_qs_bytes
|
|||
from tornado import httputil
|
||||
from tornado import ioloop
|
||||
from tornado import iostream
|
||||
from tornado import netutil
|
||||
from tornado.netutil import TCPServer
|
||||
from tornado import process
|
||||
from tornado import stack_context
|
||||
from tornado.util import b, bytes_type
|
||||
|
@ -45,188 +45,6 @@ try:
|
|||
except ImportError:
|
||||
ssl = None
|
||||
|
||||
|
||||
class TCPServer(object):
|
||||
r"""A non-blocking, single-threaded TCP server.
|
||||
|
||||
`TCPServer` can serve SSL traffic with Python 2.6+ and OpenSSL.
|
||||
To make this server serve SSL traffic, send the ssl_options dictionary
|
||||
argument with the arguments required for the `ssl.wrap_socket` method,
|
||||
including "certfile" and "keyfile"::
|
||||
|
||||
TCPServer(applicaton, ssl_options={
|
||||
"certfile": os.path.join(data_dir, "mydomain.crt"),
|
||||
"keyfile": os.path.join(data_dir, "mydomain.key"),
|
||||
})
|
||||
|
||||
`TCPServer` initialization follows one of three patterns:
|
||||
|
||||
1. `listen`: simple single-process::
|
||||
|
||||
server = TCPServer(app)
|
||||
server.listen(8888)
|
||||
IOLoop.instance().start()
|
||||
|
||||
2. `bind`/`start`: simple multi-process::
|
||||
|
||||
server = TCPServer(app)
|
||||
server.bind(8888)
|
||||
server.start(0) # Forks multiple sub-processes
|
||||
IOLoop.instance().start()
|
||||
|
||||
When using this interface, an `IOLoop` must *not* be passed
|
||||
to the `TCPServer` constructor. `start` will always start
|
||||
the server on the default singleton `IOLoop`.
|
||||
|
||||
3. `add_sockets`: advanced multi-process::
|
||||
|
||||
sockets = tornado.netutil.bind_sockets(8888)
|
||||
tornado.process.fork_processes(0)
|
||||
server = TCPServer(app)
|
||||
server.add_sockets(sockets)
|
||||
IOLoop.instance().start()
|
||||
|
||||
The `add_sockets` interface is more complicated, but it can be
|
||||
used with `tornado.process.fork_processes` to give you more
|
||||
flexibility in when the fork happens. `add_sockets` can
|
||||
also be used in single-process servers if you want to create
|
||||
your listening sockets in some way other than
|
||||
`tornado.netutil.bind_sockets`.
|
||||
"""
|
||||
def __init__(self, handle_stream, io_loop=None, ssl_options=None):
|
||||
self.handle_stream = handle_stream
|
||||
self.io_loop = io_loop
|
||||
self.ssl_options = ssl_options
|
||||
self._sockets = {} # fd -> socket object
|
||||
self._pending_sockets = []
|
||||
self._started = False
|
||||
|
||||
def listen(self, port, address=""):
|
||||
"""Starts accepting connections on the given port.
|
||||
|
||||
This method may be called more than once to listen on multiple ports.
|
||||
`listen` takes effect immediately; it is not necessary to call
|
||||
`TCPServer.start` afterwards. It is, however, necessary to start
|
||||
the `IOLoop`.
|
||||
"""
|
||||
sockets = netutil.bind_sockets(port, address=address)
|
||||
self.add_sockets(sockets)
|
||||
|
||||
def add_sockets(self, sockets):
|
||||
"""Makes this server start accepting connections on the given sockets.
|
||||
|
||||
The ``sockets`` parameter is a list of socket objects such as
|
||||
those returned by `tornado.netutil.bind_sockets`.
|
||||
`add_sockets` is typically used in combination with that
|
||||
method and `tornado.process.fork_processes` to provide greater
|
||||
control over the initialization of a multi-process server.
|
||||
"""
|
||||
if self.io_loop is None:
|
||||
self.io_loop = ioloop.IOLoop.instance()
|
||||
|
||||
for sock in sockets:
|
||||
self._sockets[sock.fileno()] = sock
|
||||
netutil.add_accept_handler(sock, self._handle_connection,
|
||||
io_loop=self.io_loop)
|
||||
|
||||
def add_socket(self, socket):
|
||||
"""Singular version of `add_sockets`. Takes a single socket object."""
|
||||
self.add_sockets([socket])
|
||||
|
||||
def bind(self, port, address=None, family=socket.AF_UNSPEC, backlog=128):
|
||||
"""Binds this server to the given port on the given address.
|
||||
|
||||
To start the server, call `start`. If you want to run this server
|
||||
in a single process, you can call `listen` as a shortcut to the
|
||||
sequence of `bind` and `start` calls.
|
||||
|
||||
Address may be either an IP address or hostname. If it's a hostname,
|
||||
the server will listen on all IP addresses associated with the
|
||||
name. Address may be an empty string or None to listen on all
|
||||
available interfaces. Family may be set to either ``socket.AF_INET``
|
||||
or ``socket.AF_INET6`` to restrict to ipv4 or ipv6 addresses, otherwise
|
||||
both will be used if available.
|
||||
|
||||
The ``backlog`` argument has the same meaning as for
|
||||
`socket.listen`.
|
||||
|
||||
This method may be called multiple times prior to `start` to listen
|
||||
on multiple ports or interfaces.
|
||||
"""
|
||||
sockets = netutil.bind_sockets(port, address=address,
|
||||
family=family, backlog=backlog)
|
||||
if self._started:
|
||||
self.add_sockets(sockets)
|
||||
else:
|
||||
self._pending_sockets.extend(sockets)
|
||||
|
||||
def start(self, num_processes=1):
|
||||
"""Starts this server in the IOLoop.
|
||||
|
||||
By default, we run the server in this process and do not fork any
|
||||
additional child process.
|
||||
|
||||
If num_processes is ``None`` or <= ``0``, we detect the number of cores
|
||||
available on this machine and fork that number of child
|
||||
processes. If num_processes is given and > ```1``, we fork that
|
||||
specific number of sub-processes.
|
||||
|
||||
Since we use processes and not threads, there is no shared memory
|
||||
between any server code.
|
||||
|
||||
Note that multiple processes are not compatible with the autoreload
|
||||
module (or the ``debug=True`` option to `tornado.web.Application`).
|
||||
When using multiple processes, no IOLoops can be created or
|
||||
referenced until after the call to ``TCPServer.start(n)``.
|
||||
"""
|
||||
assert not self._started
|
||||
self._started = True
|
||||
if num_processes != 1:
|
||||
process.fork_processes(num_processes)
|
||||
sockets = self._pending_sockets
|
||||
self._pending_sockets = []
|
||||
self.add_sockets(sockets)
|
||||
|
||||
def stop(self):
|
||||
"""Stops listening for new connections.
|
||||
|
||||
Requests currently in progress may still continue after the
|
||||
server is stopped.
|
||||
"""
|
||||
for fd, sock in self._sockets.iteritems():
|
||||
self.io_loop.remove_handler(fd)
|
||||
sock.close()
|
||||
|
||||
def _handle_connection(self, connection, address):
|
||||
if self.ssl_options is not None:
|
||||
assert ssl, "Python 2.6+ and OpenSSL required for SSL"
|
||||
try:
|
||||
connection = ssl.wrap_socket(connection,
|
||||
server_side=True,
|
||||
do_handshake_on_connect=False,
|
||||
**self.ssl_options)
|
||||
except ssl.SSLError, err:
|
||||
if err.args[0] == ssl.SSL_ERROR_EOF:
|
||||
return connection.close()
|
||||
else:
|
||||
raise
|
||||
except socket.error, err:
|
||||
if err.args[0] == errno.ECONNABORTED:
|
||||
return connection.close()
|
||||
else:
|
||||
raise
|
||||
try:
|
||||
if self.ssl_options is not None:
|
||||
stream = iostream.SSLIOStream(connection, io_loop=self.io_loop)
|
||||
else:
|
||||
stream = iostream.IOStream(connection, io_loop=self.io_loop)
|
||||
if connection.family not in (socket.AF_INET, socket.AF_INET6):
|
||||
# Unix (or other) socket; fake the remote address
|
||||
address = ('0.0.0.0', 0)
|
||||
self.handle_stream(stream, address)
|
||||
except Exception:
|
||||
logging.error("Error in connection callback", exc_info=True)
|
||||
|
||||
class HTTPServer(TCPServer):
|
||||
"""
|
||||
A server is defined by a request callback that takes an HTTPRequest
|
||||
|
@ -273,9 +91,9 @@ class HTTPServer(TCPServer):
|
|||
self.request_callback = request_callback
|
||||
self.no_keep_alive = no_keep_alive
|
||||
self.xheaders = xheaders
|
||||
TCPServer.__init__(self, self._handle_stream, **kwargs)
|
||||
TCPServer.__init__(self, **kwargs)
|
||||
|
||||
def _handle_stream(self, stream, address):
|
||||
def handle_stream(self, stream, address):
|
||||
HTTPConnection(stream, address, self.request_callback,
|
||||
self.no_keep_alive, self.xheaders)
|
||||
|
||||
|
@ -292,6 +110,9 @@ class HTTPConnection(object):
|
|||
def __init__(self, stream, address, request_callback, no_keep_alive=False,
|
||||
xheaders=False):
|
||||
self.stream = stream
|
||||
if self.stream.socket.family not in (socket.AF_INET, socket.AF_INET6):
|
||||
# Unix (or other) socket; fake the remote address
|
||||
address = ('0.0.0.0', 0)
|
||||
self.address = address
|
||||
self.request_callback = request_callback
|
||||
self.no_keep_alive = no_keep_alive
|
||||
|
|
|
@ -17,13 +17,203 @@
|
|||
"""Miscellaneous network utility code."""
|
||||
|
||||
import errno
|
||||
import logging
|
||||
import os
|
||||
import socket
|
||||
import stat
|
||||
|
||||
from tornado.ioloop import IOLoop
|
||||
from tornado.iostream import IOStream, SSLIOStream
|
||||
from tornado.platform.auto import set_close_exec
|
||||
|
||||
try:
|
||||
import ssl # Python 2.6+
|
||||
except ImportError:
|
||||
ssl = None
|
||||
|
||||
class TCPServer(object):
|
||||
r"""A non-blocking, single-threaded server with additional methods for TCP
|
||||
connections.
|
||||
|
||||
`TCPServer` can serve SSL traffic with Python 2.6+ and OpenSSL.
|
||||
To make this server serve SSL traffic, send the ssl_options dictionary
|
||||
argument with the arguments required for the `ssl.wrap_socket` method,
|
||||
including "certfile" and "keyfile"::
|
||||
|
||||
TCPServer(applicaton, ssl_options={
|
||||
"certfile": os.path.join(data_dir, "mydomain.crt"),
|
||||
"keyfile": os.path.join(data_dir, "mydomain.key"),
|
||||
})
|
||||
|
||||
`TCPServer` initialization follows one of three patterns:
|
||||
|
||||
1. `listen`: simple single-process::
|
||||
|
||||
server = TCPServer(app)
|
||||
server.listen(8888)
|
||||
IOLoop.instance().start()
|
||||
|
||||
2. `bind`/`start`: simple multi-process::
|
||||
|
||||
server = TCPServer(app)
|
||||
server.bind(8888)
|
||||
server.start(0) # Forks multiple sub-processes
|
||||
IOLoop.instance().start()
|
||||
|
||||
When using this interface, an `IOLoop` must *not* be passed
|
||||
to the `TCPServer` constructor. `start` will always start
|
||||
the server on the default singleton `IOLoop`.
|
||||
|
||||
3. `add_sockets`: advanced multi-process::
|
||||
|
||||
sockets = bind_sockets(8888)
|
||||
tornado.process.fork_processes(0)
|
||||
server = TCPServer(app)
|
||||
server.add_sockets(sockets)
|
||||
IOLoop.instance().start()
|
||||
|
||||
The `add_sockets` interface is more complicated, but it can be
|
||||
used with `tornado.process.fork_processes` to give you more
|
||||
flexibility in when the fork happens. `add_sockets` can
|
||||
also be used in single-process servers if you want to create
|
||||
your listening sockets in some way other than
|
||||
`bind_sockets`.
|
||||
"""
|
||||
def __init__(self, io_loop=None, ssl_options=None):
|
||||
self.io_loop = io_loop
|
||||
self.ssl_options = ssl_options
|
||||
self._sockets = {} # fd -> socket object
|
||||
self._pending_sockets = []
|
||||
self._started = False
|
||||
|
||||
def listen(self, port, address=""):
|
||||
"""Starts accepting connections on the given port.
|
||||
|
||||
This method may be called more than once to listen on multiple ports.
|
||||
`listen` takes effect immediately; it is not necessary to call
|
||||
`TCPServer.start` afterwards. It is, however, necessary to start
|
||||
the `IOLoop`.
|
||||
"""
|
||||
sockets = bind_sockets(port, address=address)
|
||||
self.add_sockets(sockets)
|
||||
|
||||
def add_sockets(self, sockets):
|
||||
"""Makes this server start accepting connections on the given sockets.
|
||||
|
||||
The ``sockets`` parameter is a list of socket objects such as
|
||||
those returned by `bind_sockets`.
|
||||
`add_sockets` is typically used in combination with that
|
||||
method and `tornado.process.fork_processes` to provide greater
|
||||
control over the initialization of a multi-process server.
|
||||
"""
|
||||
if self.io_loop is None:
|
||||
self.io_loop = IOLoop.instance()
|
||||
|
||||
for sock in sockets:
|
||||
self._sockets[sock.fileno()] = sock
|
||||
add_accept_handler(sock, self._handle_connection,
|
||||
io_loop=self.io_loop)
|
||||
|
||||
def add_socket(self, socket):
|
||||
"""Singular version of `add_sockets`. Takes a single socket object."""
|
||||
self.add_sockets([socket])
|
||||
|
||||
def bind(self, port, address=None, family=socket.AF_UNSPEC, backlog=128):
|
||||
"""Binds this server to the given port on the given address.
|
||||
|
||||
To start the server, call `start`. If you want to run this server
|
||||
in a single process, you can call `listen` as a shortcut to the
|
||||
sequence of `bind` and `start` calls.
|
||||
|
||||
Address may be either an IP address or hostname. If it's a hostname,
|
||||
the server will listen on all IP addresses associated with the
|
||||
name. Address may be an empty string or None to listen on all
|
||||
available interfaces. Family may be set to either ``socket.AF_INET``
|
||||
or ``socket.AF_INET6`` to restrict to ipv4 or ipv6 addresses, otherwise
|
||||
both will be used if available.
|
||||
|
||||
The ``backlog`` argument has the same meaning as for
|
||||
`socket.listen`.
|
||||
|
||||
This method may be called multiple times prior to `start` to listen
|
||||
on multiple ports or interfaces.
|
||||
"""
|
||||
sockets = bind_sockets(port, address=address, family=family,
|
||||
backlog=backlog)
|
||||
if self._started:
|
||||
self.add_sockets(sockets)
|
||||
else:
|
||||
self._pending_sockets.extend(sockets)
|
||||
|
||||
def start(self, num_processes=1):
|
||||
"""Starts this server in the IOLoop.
|
||||
|
||||
By default, we run the server in this process and do not fork any
|
||||
additional child process.
|
||||
|
||||
If num_processes is ``None`` or <= 0, we detect the number of cores
|
||||
available on this machine and fork that number of child
|
||||
processes. If num_processes is given and > 1, we fork that
|
||||
specific number of sub-processes.
|
||||
|
||||
Since we use processes and not threads, there is no shared memory
|
||||
between any server code.
|
||||
|
||||
Note that multiple processes are not compatible with the autoreload
|
||||
module (or the ``debug=True`` option to `tornado.web.Application`).
|
||||
When using multiple processes, no IOLoops can be created or
|
||||
referenced until after the call to ``TCPServer.start(n)``.
|
||||
"""
|
||||
assert not self._started
|
||||
self._started = True
|
||||
if num_processes != 1:
|
||||
process.fork_processes(num_processes)
|
||||
sockets = self._pending_sockets
|
||||
self._pending_sockets = []
|
||||
self.add_sockets(sockets)
|
||||
|
||||
def stop(self):
|
||||
"""Stops listening for new connections.
|
||||
|
||||
Requests currently in progress may still continue after the
|
||||
server is stopped.
|
||||
"""
|
||||
for fd, sock in self._sockets.iteritems():
|
||||
self.io_loop.remove_handler(fd)
|
||||
sock.close()
|
||||
|
||||
def handle_stream(self, stream, address):
|
||||
"""Override to handle a new `IOStream` from an incoming connection."""
|
||||
raise NotImplementedError()
|
||||
|
||||
def _handle_connection(self, connection, address):
|
||||
if self.ssl_options is not None:
|
||||
assert ssl, "Python 2.6+ and OpenSSL required for SSL"
|
||||
try:
|
||||
connection = ssl.wrap_socket(connection,
|
||||
server_side=True,
|
||||
do_handshake_on_connect=False,
|
||||
**self.ssl_options)
|
||||
except ssl.SSLError, err:
|
||||
if err.args[0] == ssl.SSL_ERROR_EOF:
|
||||
return connection.close()
|
||||
else:
|
||||
raise
|
||||
except socket.error, err:
|
||||
if err.args[0] == errno.ECONNABORTED:
|
||||
return connection.close()
|
||||
else:
|
||||
raise
|
||||
try:
|
||||
if self.ssl_options is not None:
|
||||
stream = SSLIOStream(connection, io_loop=self.io_loop)
|
||||
else:
|
||||
stream = IOStream(connection, io_loop=self.io_loop)
|
||||
self.handle_stream(stream, address)
|
||||
except Exception:
|
||||
logging.error("Error in connection callback", exc_info=True)
|
||||
|
||||
|
||||
def bind_sockets(port, address=None, family=socket.AF_UNSPEC, backlog=128):
|
||||
"""Creates listening sockets bound to the given port and address.
|
||||
|
||||
|
|
Loading…
Reference in New Issue