Added high push tests + drainers

This commit is contained in:
gpotter2 2017-05-13 13:13:37 +02:00
parent 37b8c8237a
commit 6178b48843
2 changed files with 96 additions and 2 deletions

View File

@ -316,6 +316,8 @@ class Source(Pipe):
self._send(msg)
def fileno(self):
return None
def checkRecv(self):
return False
def exhausted(self):
return self.is_exhausted
def start(self):
@ -418,14 +420,15 @@ class RawConsoleSink(Sink):
def __init__(self, name=None, newlines=True):
Sink.__init__(self, name=name)
self.newlines = newlines
self._write_pipe = 1
def push(self, msg):
if self.newlines:
msg += "\n"
os.write(1, str(msg))
os.write(self._write_pipe, str(msg))
def high_push(self, msg):
if self.newlines:
msg += "\n"
os.write(1, str(msg))
os.write(self._write_pipe, str(msg))
class CLIFeeder(AutoSource):
"""Send messages from python command line

View File

@ -126,3 +126,94 @@ s = AutoSource()
p = PipeEngine(s)
p.list_pipes()
p.list_pipes_detailed()
= Test RawConsoleSink with CLIFeeder
p = PipeEngine()
p.start()
s = CLIFeeder()
s.send("hello")
s.is_exhausted = True
r, w = os.pipe()
d1 = Drain(name="d1")
c = RawConsoleSink(name="c")
c._write_pipe = w
s > d1 > c
p.add(s)
time.sleep(1)
p.wait_and_stop()
assert os.read(r, 20) == "hello\n"
= Test QueueSink with CLIFeeder
p = PipeEngine()
p.start()
s = CLIFeeder()
s.send("hello")
s.is_exhausted = True
d1 = Drain(name="d1")
c = QueueSink(name="c")
s > d1 > c
p.add(s)
time.sleep(1)
p.wait_and_stop()
assert c.recv() == "hello"
= Test UpDrain
test_val = None
class TestSink(Sink):
def high_push(self, msg):
global test_val
test_val = msg
p = PipeEngine()
p.start()
s = CLIFeeder()
s.send("hello")
s.is_exhausted = True
d1 = UpDrain(name="d1")
c = TestSink(name="c")
s > d1
d1 >> c
p.add(s)
time.sleep(1)
p.wait_and_stop()
assert test_val == "hello"
= Test DownDrain
test_val = None
class TestSink(Sink):
def push(self, msg):
global test_val
test_val = msg
p = PipeEngine()
p.start()
s = CLIHighFeeder()
s.send("hello")
s.is_exhausted = True
d1 = DownDrain(name="d1")
c = TestSink(name="c")
s >> d1
d1 > c
p.add(s)
time.sleep(1)
p.wait_and_stop()
assert test_val == "hello"