Add tornado.process.Subprocess

This commit is contained in:
Ben Darnell 2012-09-16 19:26:03 -07:00
parent 6449189d4e
commit 4cf8344d83
3 changed files with 72 additions and 3 deletions

View File

@ -379,7 +379,7 @@ class BaseIOStream(object):
"""
try:
chunk = self.read_from_fd()
except socket.error, e:
except (socket.error, IOError, OSError), e:
# ssl.SSLError is a subclass of socket.error
gen_log.warning("Read error on %d: %s",
self.fileno(), e)
@ -824,6 +824,11 @@ class PipeIOStream(BaseIOStream):
except (IOError, OSError), e:
if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
return None
elif e.args[0] == errno.EBADF:
# If the writing half of a pipe is closed, select will
# report it as readable but reads will fail with EBADF.
self.close()
return None
else:
raise
if not chunk:

View File

@ -20,12 +20,14 @@ from __future__ import absolute_import, division, with_statement
import errno
import os
import subprocess
import sys
import time
from binascii import hexlify
from tornado import ioloop
from tornado.iostream import PipeIOStream
from tornado.log import gen_log
try:
@ -156,3 +158,41 @@ def task_id():
"""
global _task_id
return _task_id
class Subprocess(object):
"""Wraps ``subprocess.Popen`` with IOStream support.
The constructor is the same as ``subprocess.Popen`` with the following
additions:
* ``stdin``, ``stdout``, and ``stderr`` may have the value
`tornado.process.Subprocess.STREAM`, which will make the corresponding
attribute of the resulting Subprocess a `PipeIOStream`.
* A new keyword argument ``io_loop`` may be used to pass in an IOLoop.
"""
STREAM = object()
def __init__(self, *args, **kwargs):
io_loop = kwargs.pop('io_loop', None)
to_close = []
if kwargs.get('stdin') is Subprocess.STREAM:
in_r, in_w = os.pipe()
kwargs['stdin'] = in_r
to_close.append(in_r)
self.stdin = PipeIOStream(in_w, io_loop=io_loop)
if kwargs.get('stdout') is Subprocess.STREAM:
out_r, out_w = os.pipe()
kwargs['stdout'] = out_w
to_close.append(out_w)
self.stdout = PipeIOStream(out_r, io_loop=io_loop)
if kwargs.get('stderr') is Subprocess.STREAM:
err_r, err_w = os.pipe()
kwargs['stderr'] = err_w
to_close.append(err_w)
self.stdout = PipeIOStream(err_r, io_loop=io_loop)
self.proc = subprocess.Popen(*args, **kwargs)
for fd in to_close:
os.close(fd)
for attr in ['stdin', 'stdout', 'stderr', 'pid']:
if not hasattr(self, attr): # don't clobber streams set above
setattr(self, attr, getattr(self.proc, attr))

View File

@ -5,15 +5,17 @@ from __future__ import absolute_import, division, with_statement
import logging
import os
import signal
import subprocess
import sys
from tornado.httpclient import HTTPClient, HTTPError
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop
from tornado.log import gen_log
from tornado.process import fork_processes, task_id
from tornado.process import fork_processes, task_id, Subprocess
from tornado.simple_httpclient import SimpleAsyncHTTPClient
from tornado.testing import bind_unused_port, ExpectLog
from tornado.testing import bind_unused_port, ExpectLog, AsyncTestCase
from tornado.test.util import unittest
from tornado.util import b
from tornado.web import RequestHandler, Application
# Not using AsyncHTTPTestCase because we need control over the IOLoop.
@ -120,3 +122,25 @@ class ProcessTest(unittest.TestCase):
raise
ProcessTest = unittest.skipIf(os.name != 'posix' or sys.platform == 'cygwin',
"non-unix platform")(ProcessTest)
class SubprocessTest(AsyncTestCase):
def test_subprocess(self):
subproc = Subprocess([sys.executable, '-u', '-i'],
stdin=Subprocess.STREAM,
stdout=Subprocess.STREAM, stderr=subprocess.STDOUT,
io_loop=self.io_loop)
self.addCleanup(lambda: os.kill(subproc.pid, signal.SIGTERM))
subproc.stdout.read_until(b('>>> '), self.stop)
self.wait()
subproc.stdin.write(b("print('hello')\n"))
subproc.stdout.read_until(b('\n'), self.stop)
data = self.wait()
self.assertEqual(data, b("hello\n"))
subproc.stdout.read_until(b(">>> "), self.stop)
self.wait()
subproc.stdin.write(b("raise SystemExit\n"))
subproc.stdout.read_until_close(self.stop)
data = self.wait()
self.assertEqual(data, b(""))