From 4cf8344d8307c5845d14e84d45b124b895901702 Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Sun, 16 Sep 2012 19:26:03 -0700 Subject: [PATCH] Add tornado.process.Subprocess --- tornado/iostream.py | 7 ++++++- tornado/process.py | 40 ++++++++++++++++++++++++++++++++++++ tornado/test/process_test.py | 28 +++++++++++++++++++++++-- 3 files changed, 72 insertions(+), 3 deletions(-) diff --git a/tornado/iostream.py b/tornado/iostream.py index 2f716a72..0ec3bd6f 100644 --- a/tornado/iostream.py +++ b/tornado/iostream.py @@ -379,7 +379,7 @@ class BaseIOStream(object): """ try: chunk = self.read_from_fd() - except socket.error, e: + except (socket.error, IOError, OSError), e: # ssl.SSLError is a subclass of socket.error gen_log.warning("Read error on %d: %s", self.fileno(), e) @@ -824,6 +824,11 @@ class PipeIOStream(BaseIOStream): except (IOError, OSError), e: if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN): return None + elif e.args[0] == errno.EBADF: + # If the writing half of a pipe is closed, select will + # report it as readable but reads will fail with EBADF. + self.close() + return None else: raise if not chunk: diff --git a/tornado/process.py b/tornado/process.py index a2c63b83..3e971119 100644 --- a/tornado/process.py +++ b/tornado/process.py @@ -20,12 +20,14 @@ from __future__ import absolute_import, division, with_statement import errno import os +import subprocess import sys import time from binascii import hexlify from tornado import ioloop +from tornado.iostream import PipeIOStream from tornado.log import gen_log try: @@ -156,3 +158,41 @@ def task_id(): """ global _task_id return _task_id + +class Subprocess(object): + """Wraps ``subprocess.Popen`` with IOStream support. + + The constructor is the same as ``subprocess.Popen`` with the following + additions: + + * ``stdin``, ``stdout``, and ``stderr`` may have the value + `tornado.process.Subprocess.STREAM`, which will make the corresponding + attribute of the resulting Subprocess a `PipeIOStream`. + * A new keyword argument ``io_loop`` may be used to pass in an IOLoop. + """ + STREAM = object() + + def __init__(self, *args, **kwargs): + io_loop = kwargs.pop('io_loop', None) + to_close = [] + if kwargs.get('stdin') is Subprocess.STREAM: + in_r, in_w = os.pipe() + kwargs['stdin'] = in_r + to_close.append(in_r) + self.stdin = PipeIOStream(in_w, io_loop=io_loop) + if kwargs.get('stdout') is Subprocess.STREAM: + out_r, out_w = os.pipe() + kwargs['stdout'] = out_w + to_close.append(out_w) + self.stdout = PipeIOStream(out_r, io_loop=io_loop) + if kwargs.get('stderr') is Subprocess.STREAM: + err_r, err_w = os.pipe() + kwargs['stderr'] = err_w + to_close.append(err_w) + self.stdout = PipeIOStream(err_r, io_loop=io_loop) + self.proc = subprocess.Popen(*args, **kwargs) + for fd in to_close: + os.close(fd) + for attr in ['stdin', 'stdout', 'stderr', 'pid']: + if not hasattr(self, attr): # don't clobber streams set above + setattr(self, attr, getattr(self.proc, attr)) diff --git a/tornado/test/process_test.py b/tornado/test/process_test.py index d076cac4..18aeea33 100644 --- a/tornado/test/process_test.py +++ b/tornado/test/process_test.py @@ -5,15 +5,17 @@ from __future__ import absolute_import, division, with_statement import logging import os import signal +import subprocess import sys from tornado.httpclient import HTTPClient, HTTPError from tornado.httpserver import HTTPServer from tornado.ioloop import IOLoop from tornado.log import gen_log -from tornado.process import fork_processes, task_id +from tornado.process import fork_processes, task_id, Subprocess from tornado.simple_httpclient import SimpleAsyncHTTPClient -from tornado.testing import bind_unused_port, ExpectLog +from tornado.testing import bind_unused_port, ExpectLog, AsyncTestCase from tornado.test.util import unittest +from tornado.util import b from tornado.web import RequestHandler, Application # Not using AsyncHTTPTestCase because we need control over the IOLoop. @@ -120,3 +122,25 @@ class ProcessTest(unittest.TestCase): raise ProcessTest = unittest.skipIf(os.name != 'posix' or sys.platform == 'cygwin', "non-unix platform")(ProcessTest) + + +class SubprocessTest(AsyncTestCase): + def test_subprocess(self): + subproc = Subprocess([sys.executable, '-u', '-i'], + stdin=Subprocess.STREAM, + stdout=Subprocess.STREAM, stderr=subprocess.STDOUT, + io_loop=self.io_loop) + self.addCleanup(lambda: os.kill(subproc.pid, signal.SIGTERM)) + subproc.stdout.read_until(b('>>> '), self.stop) + self.wait() + subproc.stdin.write(b("print('hello')\n")) + subproc.stdout.read_until(b('\n'), self.stop) + data = self.wait() + self.assertEqual(data, b("hello\n")) + + subproc.stdout.read_until(b(">>> "), self.stop) + self.wait() + subproc.stdin.write(b("raise SystemExit\n")) + subproc.stdout.read_until_close(self.stop) + data = self.wait() + self.assertEqual(data, b(""))