######################## % Pipetool related tests ######################## + Basic tests = Test default test case s = PeriodicSource("hello", 1, name="src") d1 = Drain(name="d1") c = ConsoleSink(name="c") tf = TransformDrain(lambda x:"Got %r" % x) t = TermSink(name="t", keepterm=False) s > d1 > c d1 > tf > t p = PipeEngine(s) p.graph(type="png",target="> /tmp/pipe.png") p.start() time.sleep(3) p.stop() = Test add_pipe s = AutoSource() p = PipeEngine(s) p.add(Pipe()) assert len(p.active_pipes) == 2 x = p.spawn_Pipe() assert len(p.active_pipes) == 3 assert isinstance(x, Pipe) = Test exhausted source s = AutoSource() s._gen_data("hello") s.is_exhausted = True d1 = Drain(name="d1") c = ConsoleSink(name="c") s > d1 > c p = PipeEngine(s) p.start() p.wait_and_stop() = Test add_pipe on running instance test_val = None class TestSink(Sink): def push(self, msg): global test_val test_val = msg p = PipeEngine() p.start() s = AutoSource() s._gen_data("hello") s.is_exhausted = True d1 = Drain(name="d1") c = TestSink(name="c") s > d1 > c p.add(s) time.sleep(1) p.wait_and_stop() assert test_val == "hello" = Test Operators s = AutoSource() p = PipeEngine(s) assert p == p assert not p < p assert not p > p a = AutoSource() b = AutoSource() a >> b assert len(a.high_sinks) == 1 assert len(a.high_sources) == 0 assert len(b.high_sinks) == 0 assert len(b.high_sources) == 1 a b a = AutoSource() b = AutoSource() a << b assert len(a.high_sinks) == 0 assert len(a.high_sources) == 1 assert len(b.high_sinks) == 1 assert len(b.high_sources) == 0 a b a = AutoSource() b = AutoSource() a == b assert len(a.sinks) == 1 assert len(a.sources) == 1 assert len(b.sinks) == 1 assert len(b.sources) == 1 a = AutoSource() b = AutoSource() a//b assert len(a.high_sinks) == 1 assert len(a.high_sources) == 1 assert len(b.high_sinks) == 1 assert len(b.high_sources) == 1 a = AutoSource() b = AutoSource() a^b assert len(b.trigger_sources) == 1 assert len(a.trigger_sinks) == 1 = Test doc s = AutoSource() p = PipeEngine(s) p.list_pipes() p.list_pipes_detailed() = Test RawConsoleSink with CLIFeeder p = PipeEngine() s = CLIFeeder() s.send("hello") s.is_exhausted = True r, w = os.pipe() d1 = Drain(name="d1") c = RawConsoleSink(name="c") c._write_pipe = w s > d1 > c p.add(s) p.start() assert os.read(r, 20) == "hello\n" p.wait_and_stop() = Test QueueSink with CLIFeeder p = PipeEngine() s = CLIFeeder() s.send("hello") s.is_exhausted = True d1 = Drain(name="d1") c = QueueSink(name="c") s > d1 > c p.add(s) p.start() p.wait_and_stop() assert c.recv() == "hello" = Test UpDrain test_val = None class TestSink(Sink): def high_push(self, msg): global test_val test_val = msg p = PipeEngine() s = CLIFeeder() s.send("hello") s.is_exhausted = True d1 = UpDrain(name="d1") c = TestSink(name="c") s > d1 d1 >> c p.add(s) p.start() p.wait_and_stop() assert test_val == "hello" = Test DownDrain test_val = None class TestSink(Sink): def push(self, msg): global test_val test_val = msg p = PipeEngine() s = CLIHighFeeder() s.send("hello") s.is_exhausted = True d1 = DownDrain(name="d1") c = TestSink(name="c") s >> d1 d1 > c p.add(s) p.start() p.wait_and_stop() assert test_val == "hello"