core: Make iter_read() handle deadline (and non-blocking IO) properly
This commit is contained in:
parent
25adb7416a
commit
16950a1620
|
@ -167,27 +167,22 @@ def write_all(fd, s):
|
|||
return written
|
||||
|
||||
|
||||
def read_with_deadline(fd, size, deadline):
|
||||
timeout = deadline - time.time()
|
||||
if timeout > 0:
|
||||
rfds, _, _ = select.select([fd], [], [], timeout)
|
||||
if rfds:
|
||||
return os.read(fd, size)
|
||||
|
||||
raise mitogen.core.TimeoutError('read timed out')
|
||||
|
||||
|
||||
def iter_read(fd, deadline):
|
||||
if deadline is not None:
|
||||
LOG.debug('Warning: iter_read(.., deadline=...) unimplemented')
|
||||
|
||||
def iter_read(fd, deadline=None):
|
||||
bits = []
|
||||
timeout = None
|
||||
|
||||
while True:
|
||||
if deadline is not None:
|
||||
timeout = max(0, deadline - time.time())
|
||||
if timeout == 0:
|
||||
break
|
||||
|
||||
rfds, _, _ = select.select([fd], [], [], timeout)
|
||||
if not rfds:
|
||||
continue
|
||||
|
||||
s, disconnected = mitogen.core.io_op(os.read, fd, 4096)
|
||||
if disconnected:
|
||||
s = ''
|
||||
|
||||
if not s:
|
||||
raise mitogen.core.StreamError(
|
||||
'EOF on stream; last 300 bytes received: %r' %
|
||||
(''.join(bits)[-300:],)
|
||||
|
@ -196,6 +191,8 @@ def iter_read(fd, deadline):
|
|||
bits.append(s)
|
||||
yield s
|
||||
|
||||
raise mitogen.core.TimeoutError('read timed out')
|
||||
|
||||
|
||||
def discard_until(fd, s, deadline):
|
||||
for buf in iter_read(fd, deadline):
|
||||
|
|
|
@ -0,0 +1,10 @@
|
|||
#!/bin/bash
|
||||
# I produce text every 100ms, for testing mitogen.core.iter_read()
|
||||
|
||||
i=0
|
||||
|
||||
while :; do
|
||||
i=$(($i + 1))
|
||||
echo "$i"
|
||||
sleep 0.1
|
||||
done
|
|
@ -0,0 +1,56 @@
|
|||
|
||||
import subprocess
|
||||
import time
|
||||
import unittest
|
||||
|
||||
import testlib
|
||||
import mitogen.master
|
||||
|
||||
|
||||
class IterReadTest(unittest.TestCase):
|
||||
func = staticmethod(mitogen.master.iter_read)
|
||||
|
||||
def make_proc(self):
|
||||
args = [testlib.data_path('iter_read_generator.sh')]
|
||||
return subprocess.Popen(args, stdout=subprocess.PIPE)
|
||||
|
||||
def test_no_deadline(self):
|
||||
proc = self.make_proc()
|
||||
try:
|
||||
reader = self.func(proc.stdout.fileno())
|
||||
for i, chunk in enumerate(reader, 1):
|
||||
assert i == int(chunk)
|
||||
if i > 3:
|
||||
break
|
||||
finally:
|
||||
proc.terminate()
|
||||
|
||||
def test_deadline_exceeded_before_call(self):
|
||||
proc = self.make_proc()
|
||||
reader = self.func(proc.stdout.fileno(), 0)
|
||||
try:
|
||||
got = []
|
||||
try:
|
||||
for chunk in reader:
|
||||
got.append(chunk)
|
||||
assert 0, 'TimeoutError not raised'
|
||||
except mitogen.core.TimeoutError:
|
||||
assert len(got) == 0
|
||||
finally:
|
||||
proc.terminate()
|
||||
|
||||
def test_deadline_exceeded_during_call(self):
|
||||
proc = self.make_proc()
|
||||
reader = self.func(proc.stdout.fileno(), time.time() + 0.4)
|
||||
try:
|
||||
got = []
|
||||
try:
|
||||
for chunk in reader:
|
||||
got.append(chunk)
|
||||
assert 0, 'TimeoutError not raised'
|
||||
except mitogen.core.TimeoutError:
|
||||
# Give a little wiggle room in case of imperfect scheduling.
|
||||
# Ideal number should be 9.
|
||||
assert 3 < len(got) < 5
|
||||
finally:
|
||||
proc.terminate()
|
Loading…
Reference in New Issue