From f927701e74a3b6a22694a0d55e918febbeca9e98 Mon Sep 17 00:00:00 2001 From: Aldo Cortesi Date: Thu, 30 Apr 2015 08:03:26 +1200 Subject: [PATCH] Websocket frame read limit. --- libpathod/language.py | 13 +++++++-- libpathod/pathoc.py | 64 ++++++++++++++++++++++++++++++++++++++----- test/test_language.py | 1 - test/test_pathod.py | 4 +-- test/tutils.py | 15 ++++++++-- 5 files changed, 82 insertions(+), 15 deletions(-) diff --git a/libpathod/language.py b/libpathod/language.py index d4c5b880e..0fd418a55 100644 --- a/libpathod/language.py +++ b/libpathod/language.py @@ -1172,9 +1172,16 @@ class WebsocketFrame(_Message): return resp def values(self, settings): - vals = [ - websockets.FrameHeader().to_bytes() - ] + vals = [] + if self.body: + length = len(self.body.value.get_generator(settings)) + else: + length = 0 + frame = websockets.FrameHeader( + mask = True, + payload_length = length + ) + vals = [frame.to_bytes()] if self.body: vals.append(self.body.value.get_generator(settings)) return vals diff --git a/libpathod/pathoc.py b/libpathod/pathoc.py index 89a8280bb..aee28c37d 100644 --- a/libpathod/pathoc.py +++ b/libpathod/pathoc.py @@ -1,7 +1,9 @@ import sys import os import hashlib +import Queue import random +import select import time import threading @@ -77,14 +79,28 @@ class Response: class WebsocketFrameReader(threading.Thread): - def __init__(self, rfile, callback): + def __init__(self, rfile, callback, ws_read_limit): threading.Thread.__init__(self) + self.ws_read_limit = ws_read_limit self.rfile, self.callback = rfile, callback - self.daemon = True + self.terminate = Queue.Queue() + self.is_done = Queue.Queue() def run(self): while 1: - print websockets.Frame.from_file(self.rfile) + if self.ws_read_limit == 0: + break + r, _, _ = select.select([self.rfile], [], [], 0.05) + try: + self.terminate.get_nowait() + break + except Queue.Empty: + pass + for rfile in r: + print websockets.Frame.from_file(self.rfile).human_readable() + if self.ws_read_limit is not None: + self.ws_read_limit -= 1 + self.is_done.put(None) class Pathoc(tcp.TCPClient): @@ -99,6 +115,9 @@ class Pathoc(tcp.TCPClient): clientcert=None, ciphers=None, + # Websockets + ws_read_limit = None, + # Output control showreq = False, showresp = False, @@ -131,6 +150,8 @@ class Pathoc(tcp.TCPClient): self.ciphers = ciphers self.sslinfo = None + self.ws_read_limit = ws_read_limit + self.showreq = showreq self.showresp = showresp self.explain = explain @@ -140,6 +161,8 @@ class Pathoc(tcp.TCPClient): self.showsummary = showsummary self.fp = fp + self.ws_framereader = None + def http_connect(self, connect_to): self.wfile.write( 'CONNECT %s:%s HTTP/1.1\r\n'%tuple(connect_to) + @@ -196,6 +219,19 @@ class Pathoc(tcp.TCPClient): print >> fp, "%s (unprintables escaped):"%header print >> fp, netlib.utils.cleanBin(data) + def stop(self): + self.ws_framereader.terminate.put(None) + + def wait(self): + if self.ws_framereader: + while 1: + try: + self.ws_framereader.is_done.get(timeout=0.05) + self.ws_framereader.join() + return + except Queue.Empty: + pass + def websocket_get_frame(self, frame): """ Called when a frame is received from the server. @@ -230,21 +266,30 @@ class Pathoc(tcp.TCPClient): print >> self.fp, ">> Spec:", r.spec() if self.showreq: self._show( - self.fp, ">> Request", + self.fp, ">> Websocket Frame", self.wfile.get_log(), self.hexdump ) - def websocket_start(self, r, callback=None): + def websocket_start(self, r, callback=None, limit=None): """ Performs an HTTP request, and attempts to drop into websocket connection. + + callback: A callback called within the websocket thread for every + server frame. + limit: Disconnect after receiving N server frames. """ resp = self.http(r) if resp.status_code == 101: if self.showsummary: - print >> self.fp, "Websocket connection established..." - WebsocketFrameReader(self.rfile, self.websocket_get_frame).start() + print >> self.fp, "<< websocket connection established..." + self.ws_framereader = WebsocketFrameReader( + self.rfile, + self.websocket_get_frame, + self.ws_read_limit + ) + self.ws_framereader.start() return resp def http(self, r): @@ -340,6 +385,7 @@ class Pathoc(tcp.TCPClient): def main(args): # pragma: nocover memo = set([]) trycount = 0 + p = None try: cnt = 0 while 1: @@ -406,5 +452,9 @@ def main(args): # pragma: nocover return except (http.HttpError, tcp.NetLibError), v: pass + p.wait() except KeyboardInterrupt: pass + if p: + p.stop() + p.wait() diff --git a/test/test_language.py b/test/test_language.py index 0fb8479d4..c0eafcaa9 100644 --- a/test/test_language.py +++ b/test/test_language.py @@ -638,7 +638,6 @@ class TestRequest: class TestWebsocketFrame: - def test_spec(self): e = language.WebsocketFrame.expr() wf = e.parseString("wf:b'foo'") diff --git a/test/test_pathod.py b/test/test_pathod.py index 1a10d2c25..bfff3274a 100644 --- a/test/test_pathod.py +++ b/test/test_pathod.py @@ -185,10 +185,10 @@ class CommonTests(tutils.DaemonTests): assert r.status_code == 202 def test_websocket(self): - r = self.pathoc("ws:/p/") + r = self.pathoc("ws:/p/", ws_read_limit=0) assert r.status_code == 101 - r = self.pathoc("ws:/p/ws") + r = self.pathoc("ws:/p/ws", ws_read_limit=0) assert r.status_code == 101 diff --git a/test/tutils.py b/test/tutils.py index 4c29f5b2d..f8a37a5e0 100644 --- a/test/tutils.py +++ b/test/tutils.py @@ -64,10 +64,21 @@ class DaemonTests(object): def get(self, spec): return requests.get(self.d.p(spec), verify=False) - def pathoc(self, spec, timeout=None, connect_to=None, ssl=None): + def pathoc( + self, + spec, + timeout=None, + connect_to=None, + ssl=None, + ws_read_limit=None + ): if ssl is None: ssl = self.ssl - c = pathoc.Pathoc(("localhost", self.d.port), ssl=ssl) + c = pathoc.Pathoc( + ("localhost", self.d.port), + ssl=ssl, + ws_read_limit=ws_read_limit + ) c.connect(connect_to) if timeout: c.settimeout(timeout)