From 0bfed5c5783d9e883abd6055079c833340052616 Mon Sep 17 00:00:00 2001 From: gpotter2 Date: Mon, 5 Jun 2017 02:05:07 +0200 Subject: [PATCH] Use passive form to select.select --- scapy/automaton.py | 144 +++++++++++++++++++++++++++++++++------------ scapy/pipetool.py | 47 ++++++++------- test/pipetool.uts | 1 - 3 files changed, 134 insertions(+), 58 deletions(-) diff --git a/scapy/automaton.py b/scapy/automaton.py index 5cf02f56f..0ece6e611 100644 --- a/scapy/automaton.py +++ b/scapy/automaton.py @@ -19,17 +19,119 @@ from scapy.data import MTU from scapy.supersocket import SuperSocket from scapy.consts import WINDOWS -class ObjectPipe: +class SelectableObject: + trigger = threading.Lock() + was_ended = False + def check_recv(self): + """DEV: will be called only once (at beggining) to check if the object is ready.""" + return False + + def _wait_non_ressources(self, callback): + self.call_release() + self.trigger.acquire() + self.trigger.acquire() + if not self.was_ended: + callback(self) + + def wait_return(self, callback): + if self.check_recv(): + return callback(self) + threading.Thread(target=self._wait_non_ressources, args=(callback,)).start() + + def call_release(self, arborted=False): + """DEV: Must be call when the object is ready to read.""" + self.was_ended = arborted + try: + self.trigger.release() + except: + pass + +class SelectableSelector(object): + """ + Select SelectableObject objects. + + inputs: objects to process + remain: timeout. If 0, return []. + customTypes: types of the objects that have the checkRecv function. + """ + results = None + inputs = None + available_lock = None + _ended = False + def _release_all(self): + for i in self.inputs: + i.call_release(True) + self.available_lock.release() + + def _timeout_thread(self, remain): + time.sleep(remain) + if not self._ended: + self._ended = True + self._release_all() + + def _exit_door(self,_input): + self.results.append(_input) + if self._ended: + return + self._ended = True + self._release_all() + + def __init__(self, inputs, remain): + self.results = [] + self.inputs = list(inputs) + self.remain = remain + self.available_lock = threading.Lock() + self.available_lock.acquire() + self._ended = False + + def process(self): + if WINDOWS: + if not self.remain: + for i in self.inputs: + if not isinstance(i, SelectableObject): + warning("Unknown ignored object type: " + type(i)) + else: + if i.check_recv(): + self.results.append(i) + return self.results + + for i in self.inputs: + if not isinstance(i, SelectableObject): + warning("Unknown ignored object type: " + type(i)) + else: + i.wait_return(self._exit_door) + + threading.Thread(target=self._timeout_thread, args=(self.remain,)).start() + if not self._ended: + self.available_lock.acquire() + return self.results + else: + r,_,_ = select(self.inputs,[],[],self.remain) + return r + +def select_objects(inputs, remain): + """ + Select SelectableObject objects. + + inputs: objects to process + remain: timeout. If 0, return []. + customTypes: types of the objects that have the checkRecv function. + """ + handler = SelectableSelector(inputs, remain) + return handler.process() + +class ObjectPipe(SelectableObject): def __init__(self): self.rd,self.wr = os.pipe() self.queue = deque() def fileno(self): return self.rd - def checkRecv(self): + def check_recv(self): return len(self.queue) > 0 def send(self, obj): self.queue.append(obj) os.write(self.wr,"X") + self.call_release() def write(self, obj): self.send(obj) def recv(self, n=0): @@ -323,37 +425,6 @@ class Automaton_metaclass(type): s += '\t"%s" -> "%s" [label="%s",color=blue];\n' % (k,n,l) s += "}\n" return do_graph(s, **kargs) - -def select_objects(inputs, remain, customTypes=()): - """ - Select object that have checkRecv function. - inputs: objects to process - remain: timeout. If 0, return []. - customTypes: types of the objects that have the checkRecv function. - """ - if WINDOWS: - r = [] - def look_for_select(): - for fd in list(inputs): - if isinstance(fd, (ObjectPipe, Automaton._IO_fdwrapper) + customTypes): - if fd.checkRecv(): - r.append(fd) - else: - raise OSError("Not supported type of socket:" + str(type(fd))) - break - def search_select(): - while len(r) == 0: - look_for_select() - if remain == 0: - look_for_select() - return r - t_select = threading.Thread(target=search_select) - t_select.start() - t_select.join(remain) - return r - else: - r,_,_ = select(inputs,[],[],remain) - return r class Automaton: __metaclass__ = Automaton_metaclass @@ -372,7 +443,7 @@ class Automaton: ## Utility classes and exceptions - class _IO_fdwrapper: + class _IO_fdwrapper(SelectableObject): def __init__(self,rd,wr): if WINDOWS: # rd will be used for reading and sending @@ -389,7 +460,7 @@ class Automaton: self.wr = wr def fileno(self): return self.rd - def checkRecv(self): + def check_recv(self): return self.rd.checkRecv() def read(self, n=65535): if WINDOWS: @@ -397,7 +468,8 @@ class Automaton: return os.read(self.rd, n) def write(self, msg): if WINDOWS: - return self.rd.send(msg) + self.rd.send(msg) + return self.call_release() return os.write(self.wr,msg) def recv(self, n=65535): return self.read(n) diff --git a/scapy/pipetool.py b/scapy/pipetool.py index 5f86d73d6..e4fa2f2e6 100644 --- a/scapy/pipetool.py +++ b/scapy/pipetool.py @@ -13,7 +13,7 @@ import time import Queue from threading import Lock, Thread -from scapy.automaton import Message, select_objects +from scapy.automaton import Message, select_objects, SelectableObject from scapy.consts import WINDOWS from scapy.error import log_interactive, warning from scapy.config import conf @@ -21,7 +21,7 @@ from scapy.utils import get_temp_file, do_graph import scapy.arch -class PipeEngine: +class PipeEngine(SelectableObject): pipes = {} @classmethod def list_pipes(cls): @@ -46,9 +46,9 @@ class PipeEngine: self._add_pipes(*pipes) self.thread_lock = Lock() self.command_lock = Lock() - self.__fd_queue = [] + self.__fd_queue = collections.deque() self.__fdr,self.__fdw = os.pipe() - self.threadid = None + self.thread = None def __getattr__(self, attr): if attr.startswith("spawn_"): dname = attr[6:] @@ -61,7 +61,7 @@ class PipeEngine: return f raise AttributeError(attr) - def checkRecv(self): + def check_recv(self): """As select.select is not available, we check if there is some data to read by using a list that stores pointers.""" return len(self.__fd_queue) > 0 @@ -70,12 +70,13 @@ class PipeEngine: return self.__fdr def _read_cmd(self): - self.__fd_queue.pop() - return os.read(self.__fdr,1) + os.read(self.__fdr,1) + return self.__fd_queue.popleft() def _write_cmd(self, _cmd): - os.write(self.__fdw, _cmd) - self.__fd_queue.append("X") + self.__fd_queue.append(_cmd) + os.write(self.__fdw, "X") + self.call_release() def add_one_pipe(self, pipe): self.active_pipes.add(pipe) @@ -117,7 +118,7 @@ class PipeEngine: RUN=True STOP_IF_EXHAUSTED = False while RUN and (not STOP_IF_EXHAUSTED or len(sources) > 1): - fds = select_objects(sources, 2, customTypes=(AutoSource, PipeEngine)) + fds = select_objects(sources, 2) for fd in fds: if fd is self: cmd = self._read_cmd() @@ -154,7 +155,7 @@ class PipeEngine: if self.thread_lock.acquire(0): _t = Thread(target=self.run) _t.start() - self.threadid = _t.ident + self.thread = _t else: warning("Pipe engine already running") def wait_and_stop(self): @@ -162,11 +163,13 @@ class PipeEngine: def stop(self, _cmd="X"): try: with self.command_lock: - if self.threadid is not None: + if self.thread is not None: self._write_cmd(_cmd) - while not self.thread_lock.acquire(0): - time.sleep(0.01) # interruptible wait for thread to terminate - self.thread_lock.release() # (not using .join() because it needs 'threading' module) + self.thread.join() + try: + self.thread_lock.release() + except: + pass else: warning("Pipe engine thread not running") except KeyboardInterrupt: @@ -175,7 +178,7 @@ class PipeEngine: def add(self, *pipes): pipes = self._add_pipes(*pipes) with self.command_lock: - if self.threadid is not None: + if self.thread is not None: for p in pipes: p.start() self._write_cmd("A") @@ -305,7 +308,7 @@ class Pipe(_ConnectorLogic): s += ct.punct(">") return s -class Source(Pipe): +class Source(Pipe, SelectableObject): def __init__(self, name=None): Pipe.__init__(self, name=name) self.is_exhausted = False @@ -316,7 +319,7 @@ class Source(Pipe): self._send(msg) def fileno(self): return None - def checkRecv(self): + def check_recv(self): return False def exhausted(self): return self.is_exhausted @@ -353,14 +356,14 @@ class Sink(Pipe): pass -class AutoSource(Source): +class AutoSource(Source, SelectableObject): def __init__(self, name=None): Source.__init__(self, name=name) self.__fdr,self.__fdw = os.pipe() self._queue = collections.deque() def fileno(self): return self.__fdr - def checkRecv(self): + def check_recv(self): return len(self._queue) > 0 def _gen_data(self, msg): self._queue.append((msg,False)) @@ -369,7 +372,7 @@ class AutoSource(Source): self._queue.append((msg,True)) self._wake_up() def _wake_up(self): - os.write(self.__fdw,"x") + os.write(self.__fdw,"X") def deliver(self): os.read(self.__fdr,1) try: @@ -377,6 +380,7 @@ class AutoSource(Source): except IndexError: #empty queue. Exhausted source pass else: + self.call_release() if high: self._high_send(msg) else: @@ -502,6 +506,7 @@ class TermSink(Sink): if not self.opened: self.opened = True self.__f = get_temp_file() + open(self.__f, "a").close() self.name = "Scapy" if self.name is None else self.name # Start a powershell in a new window and print the PID cmd = "$app = Start-Process PowerShell -ArgumentList '-command &{$host.ui.RawUI.WindowTitle=\\\"%s\\\";Get-Content \\\"%s\\\" -wait}' -passthru; echo $app.Id" % (self.name, self.__f.replace("\\", "\\\\")) diff --git a/test/pipetool.uts b/test/pipetool.uts index 9e0c67c53..06a87ce33 100644 --- a/test/pipetool.uts +++ b/test/pipetool.uts @@ -65,7 +65,6 @@ c = TestSink(name="c") s > d1 > c p.add(s) -time.sleep(1) p.wait_and_stop() assert test_val == "hello"