From 32acdcb9fe50f2ab425ddb2ef9c5a544e20c1b10 Mon Sep 17 00:00:00 2001 From: Abhinav Singh <126065+abhinavsingh@users.noreply.github.com> Date: Mon, 10 Jan 2022 19:48:17 +0530 Subject: [PATCH] Decouple transport framework from dashboard plugin (#953) * Decouple transport framework from dashboard plugin * Move `InspectTrafficPlugin` within `http.inspector` module * Avoid exporting plugins within `__init__.py` files * Use `/transport/` prefix to avoid #945 conflict issue * Add todo --- dashboard/src/core/ws.ts | 2 +- proxy/common/constants.py | 11 +-- proxy/common/flag.py | 3 +- proxy/core/event/queue.py | 7 +- proxy/core/event/subscriber.py | 18 ++-- proxy/dashboard/__init__.py | 4 - proxy/dashboard/dashboard.py | 62 +------------- proxy/http/inspector/__init__.py | 10 --- .../inspector}/inspect_traffic.py | 12 ++- proxy/http/websocket/__init__.py | 2 + proxy/{dashboard => http/websocket}/plugin.py | 10 +-- proxy/http/websocket/transport.py | 82 +++++++++++++++++++ tests/test_main.py | 3 +- 13 files changed, 119 insertions(+), 107 deletions(-) rename proxy/{dashboard => http/inspector}/inspect_traffic.py (89%) rename proxy/{dashboard => http/websocket}/plugin.py (89%) create mode 100644 proxy/http/websocket/transport.py diff --git a/dashboard/src/core/ws.ts b/dashboard/src/core/ws.ts index 86d534a7..dc08e0da 100644 --- a/dashboard/src/core/ws.ts +++ b/dashboard/src/core/ws.ts @@ -14,7 +14,7 @@ export class WebsocketApi { private hostname: string = window.location.hostname ? window.location.hostname : 'localhost'; private port: number = window.location.port ? Number(window.location.port) : 8899; // TODO: Must map to route registered by dashboard.py, don't hardcode - private wsPrefix: string = '/dashboard'; + private wsPrefix: string = '/transport/'; private wsScheme: string = window.location.protocol === 'http:' ? 'ws' : 'wss'; private ws: WebSocket; private wsPath: string = this.wsScheme + '://' + this.hostname + ':' + this.port + this.wsPrefix; diff --git a/proxy/common/constants.py b/proxy/common/constants.py index 176a08fe..1da2f5e3 100644 --- a/proxy/common/constants.py +++ b/proxy/common/constants.py @@ -146,15 +146,16 @@ DEFAULT_ABC_PLUGINS = [ 'HttpProtocolHandlerPlugin', 'HttpProxyBasePlugin', 'HttpWebServerBasePlugin', - 'ProxyDashboardWebsocketPlugin', + 'WebSocketTransportBasePlugin', ] +PLUGIN_PROXY_AUTH = 'proxy.http.proxy.AuthPlugin' +PLUGIN_DASHBOARD = 'proxy.dashboard.ProxyDashboard' PLUGIN_HTTP_PROXY = 'proxy.http.proxy.HttpProxyPlugin' PLUGIN_WEB_SERVER = 'proxy.http.server.HttpWebServerPlugin' PLUGIN_PAC_FILE = 'proxy.http.server.HttpWebServerPacFilePlugin' -PLUGIN_DEVTOOLS_PROTOCOL = 'proxy.http.inspector.DevtoolsProtocolPlugin' -PLUGIN_DASHBOARD = 'proxy.dashboard.ProxyDashboard' -PLUGIN_INSPECT_TRAFFIC = 'proxy.dashboard.InspectTrafficPlugin' -PLUGIN_PROXY_AUTH = 'proxy.http.proxy.AuthPlugin' +PLUGIN_DEVTOOLS_PROTOCOL = 'proxy.http.inspector.devtools.DevtoolsProtocolPlugin' +PLUGIN_INSPECT_TRAFFIC = 'proxy.http.inspector.inspect_traffic.InspectTrafficPlugin' +PLUGIN_WEBSOCKET_TRANSPORT = 'proxy.http.websocket.transport.WebSocketTransport' PY2_DEPRECATION_MESSAGE = '''DEPRECATION: proxy.py no longer supports Python 2.7. Kindly upgrade to Python 3+. ' 'If for some reasons you cannot upgrade, use' diff --git a/proxy/common/flag.py b/proxy/common/flag.py index 06783b9a..1908aad4 100644 --- a/proxy/common/flag.py +++ b/proxy/common/flag.py @@ -26,7 +26,7 @@ from .constants import COMMA, DEFAULT_DATA_DIRECTORY_PATH, DEFAULT_NUM_ACCEPTORS from .constants import DEFAULT_DEVTOOLS_WS_PATH, DEFAULT_DISABLE_HEADERS, PY2_DEPRECATION_MESSAGE from .constants import PLUGIN_DASHBOARD, PLUGIN_DEVTOOLS_PROTOCOL, DEFAULT_MIN_COMPRESSION_LIMIT from .constants import PLUGIN_HTTP_PROXY, PLUGIN_INSPECT_TRAFFIC, PLUGIN_PAC_FILE -from .constants import PLUGIN_WEB_SERVER, PLUGIN_PROXY_AUTH, IS_WINDOWS +from .constants import PLUGIN_WEB_SERVER, PLUGIN_PROXY_AUTH, IS_WINDOWS, PLUGIN_WEBSOCKET_TRANSPORT from .logger import Logger from .version import __version__ @@ -395,6 +395,7 @@ class FlagParser: default_plugins.append(PLUGIN_WEB_SERVER) args.enable_static_server = True default_plugins.append(PLUGIN_DASHBOARD) + default_plugins.append(PLUGIN_WEBSOCKET_TRANSPORT) default_plugins.append(PLUGIN_INSPECT_TRAFFIC) args.enable_events = True args.enable_devtools = True diff --git a/proxy/core/event/queue.py b/proxy/core/event/queue.py index f0110e04..f86ba09b 100644 --- a/proxy/core/event/queue.py +++ b/proxy/core/event/queue.py @@ -22,9 +22,10 @@ from .names import eventNames class EventQueue: - """Global event queue. Must be a multiprocessing.Manager queue because - subscribers need to dispatch their subscription queue over this global - queue. + """Global event queue. Must be a multiprocess safe queue capable of + transporting other queues. This is necessary because currently + subscribers use a separate subscription queue to consume events. + Subscription queue is exchanged over the global event queue. Each published event contains following schema:: diff --git a/proxy/core/event/subscriber.py b/proxy/core/event/subscriber.py index 14b9e8f1..14567616 100644 --- a/proxy/core/event/subscriber.py +++ b/proxy/core/event/subscriber.py @@ -34,18 +34,16 @@ class EventSubscriber: can be different from publishers. Publishers can even be processes outside of the proxy.py core. - Note that, EventSubscriber cannot share the `multiprocessing.Manager` - with the EventManager. Because EventSubscriber can be started - in a different process than EventManager. + `multiprocessing.Pipe` is used to initialize a new Queue for + receiving subscribed events from eventing core. Note that, + core EventDispatcher might be running in a separate process + and hence subscription queue must be multiprocess safe. - `multiprocessing.Manager` is used to initialize - a new Queue which is used for subscriptions. EventDispatcher - might be running in a separate process and hence - subscription queue must be multiprocess safe. + When `subscribe` method is called, EventManager stars + a relay thread which consumes event out of the subscription queue + and invoke callback. - When `subscribe` method is called, EventManager will - start a relay thread which consumes using the multiprocess - safe queue passed to the relay thread. + NOTE: Callback is executed in the context of relay thread. """ def __init__(self, event_queue: EventQueue, callback: Callable[[Dict[str, Any]], None]) -> None: diff --git a/proxy/dashboard/__init__.py b/proxy/dashboard/__init__.py index 0f5d3295..96a79aff 100644 --- a/proxy/dashboard/__init__.py +++ b/proxy/dashboard/__init__.py @@ -9,11 +9,7 @@ :license: BSD, see LICENSE for more details. """ from .dashboard import ProxyDashboard -from .inspect_traffic import InspectTrafficPlugin -from .plugin import ProxyDashboardWebsocketPlugin __all__ = [ 'ProxyDashboard', - 'InspectTrafficPlugin', - 'ProxyDashboardWebsocketPlugin', ] diff --git a/proxy/dashboard/dashboard.py b/proxy/dashboard/dashboard.py index 196ff16a..6e719d23 100644 --- a/proxy/dashboard/dashboard.py +++ b/proxy/dashboard/dashboard.py @@ -9,17 +9,11 @@ :license: BSD, see LICENSE for more details. """ import os -import json import logging -from typing import List, Tuple, Any, Dict - -from .plugin import ProxyDashboardWebsocketPlugin - -from ..common.utils import bytes_ +from typing import List, Tuple from ..http.responses import permanentRedirectResponse from ..http.parser import HttpParser -from ..http.websocket import WebsocketFrame from ..http.server import HttpWebServerPlugin, HttpWebServerBasePlugin, httpProtocolTypes logger = logging.getLogger(__name__) @@ -42,24 +36,9 @@ class ProxyDashboard(HttpWebServerBasePlugin): (httpProtocolTypes.HTTPS, r'/dashboard/$'), ] - # Handles WebsocketAPI requests for dashboard - WS_ROUTES = [ - (httpProtocolTypes.WEBSOCKET, r'/dashboard$'), - ] - - def __init__(self, *args: Any, **kwargs: Any) -> None: - super().__init__(*args, **kwargs) - self.plugins: Dict[str, ProxyDashboardWebsocketPlugin] = {} - if b'ProxyDashboardWebsocketPlugin' in self.flags.plugins: - for klass in self.flags.plugins[b'ProxyDashboardWebsocketPlugin']: - p = klass(self.flags, self.client, self.event_queue) - for method in p.methods(): - self.plugins[method] = p - def routes(self) -> List[Tuple[int, str]]: return ProxyDashboard.REDIRECT_ROUTES + \ - ProxyDashboard.INDEX_ROUTES + \ - ProxyDashboard.WS_ROUTES + ProxyDashboard.INDEX_ROUTES def handle_request(self, request: HttpParser) -> None: if request.path == b'/dashboard/': @@ -76,40 +55,3 @@ class ProxyDashboard(HttpWebServerBasePlugin): b'/dashboard/proxy.html', ): self.client.queue(permanentRedirectResponse(b'/dashboard/')) - - def on_websocket_open(self) -> None: - logger.info('app ws opened') - - def on_websocket_message(self, frame: WebsocketFrame) -> None: - try: - assert frame.data - message = json.loads(frame.data) - except UnicodeDecodeError: - logger.error(frame.data) - logger.info(frame.opcode) - return - - method = message['method'] - if method == 'ping': - self.reply({'id': message['id'], 'response': 'pong'}) - elif method in self.plugins: - self.plugins[method].handle_message(message) - else: - logger.info(frame.data) - logger.info(frame.opcode) - self.reply({'id': message['id'], 'response': 'not_implemented'}) - - def on_client_connection_close(self) -> None: - logger.info('app ws closed') - # TODO(abhinavsingh): unsubscribe - - def reply(self, data: Dict[str, Any]) -> None: - self.client.queue( - memoryview( - WebsocketFrame.text( - bytes_( - json.dumps(data), - ), - ), - ), - ) diff --git a/proxy/http/inspector/__init__.py b/proxy/http/inspector/__init__.py index 6e968dde..232621f0 100644 --- a/proxy/http/inspector/__init__.py +++ b/proxy/http/inspector/__init__.py @@ -7,14 +7,4 @@ :copyright: (c) 2013-present by Abhinav Singh and contributors. :license: BSD, see LICENSE for more details. - - .. spelling:: - - http - Submodules """ -from .devtools import DevtoolsProtocolPlugin - -__all__ = [ - 'DevtoolsProtocolPlugin', -] diff --git a/proxy/dashboard/inspect_traffic.py b/proxy/http/inspector/inspect_traffic.py similarity index 89% rename from proxy/dashboard/inspect_traffic.py rename to proxy/http/inspector/inspect_traffic.py index 33fa2120..4c646974 100644 --- a/proxy/dashboard/inspect_traffic.py +++ b/proxy/http/inspector/inspect_traffic.py @@ -11,15 +11,13 @@ import json from typing import List, Dict, Any -from .plugin import ProxyDashboardWebsocketPlugin - -from ..common.utils import bytes_ -from ..core.event import EventSubscriber -from ..core.connection import TcpClientConnection -from ..http.websocket import WebsocketFrame +from ...common.utils import bytes_ +from ...core.event import EventSubscriber +from ...core.connection import TcpClientConnection +from ..websocket import WebsocketFrame, WebSocketTransportBasePlugin -class InspectTrafficPlugin(ProxyDashboardWebsocketPlugin): +class InspectTrafficPlugin(WebSocketTransportBasePlugin): """Websocket API for inspect_traffic.ts frontend plugin.""" def __init__(self, *args: Any, **kwargs: Any) -> None: diff --git a/proxy/http/websocket/__init__.py b/proxy/http/websocket/__init__.py index 350ab7b8..50173873 100644 --- a/proxy/http/websocket/__init__.py +++ b/proxy/http/websocket/__init__.py @@ -17,9 +17,11 @@ """ from .frame import WebsocketFrame, websocketOpcodes from .client import WebsocketClient +from .plugin import WebSocketTransportBasePlugin __all__ = [ 'websocketOpcodes', 'WebsocketFrame', 'WebsocketClient', + 'WebSocketTransportBasePlugin', ] diff --git a/proxy/dashboard/plugin.py b/proxy/http/websocket/plugin.py similarity index 89% rename from proxy/dashboard/plugin.py rename to proxy/http/websocket/plugin.py index 86032c69..f6881632 100644 --- a/proxy/dashboard/plugin.py +++ b/proxy/http/websocket/plugin.py @@ -13,13 +13,13 @@ import json from abc import ABC, abstractmethod from typing import List, Dict, Any -from ..common.utils import bytes_ -from ..http.websocket import WebsocketFrame -from ..core.connection import TcpClientConnection -from ..core.event import EventQueue +from ...common.utils import bytes_ +from . import WebsocketFrame +from ...core.connection import TcpClientConnection +from ...core.event import EventQueue -class ProxyDashboardWebsocketPlugin(ABC): +class WebSocketTransportBasePlugin(ABC): """Abstract class for plugins extending dashboard websocket API.""" def __init__( diff --git a/proxy/http/websocket/transport.py b/proxy/http/websocket/transport.py new file mode 100644 index 00000000..07a8328a --- /dev/null +++ b/proxy/http/websocket/transport.py @@ -0,0 +1,82 @@ +# -*- coding: utf-8 -*- +""" + proxy.py + ~~~~~~~~ + ⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on + Network monitoring, controls & Application development, testing, debugging. + + :copyright: (c) 2013-present by Abhinav Singh and contributors. + :license: BSD, see LICENSE for more details. +""" +import json +import logging +from typing import List, Tuple, Any, Dict + +from ...common.utils import bytes_ + +from ..server import httpProtocolTypes, HttpWebServerBasePlugin +from ..parser import HttpParser + +from .frame import WebsocketFrame +from .plugin import WebSocketTransportBasePlugin + +logger = logging.getLogger(__name__) + + +class WebSocketTransport(HttpWebServerBasePlugin): + """WebSocket transport framework.""" + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + self.plugins: Dict[str, WebSocketTransportBasePlugin] = {} + if b'WebSocketTransportBasePlugin' in self.flags.plugins: + for klass in self.flags.plugins[b'WebSocketTransportBasePlugin']: + p = klass(self.flags, self.client, self.event_queue) + for method in p.methods(): + self.plugins[method] = p + + def routes(self) -> List[Tuple[int, str]]: + return [ + (httpProtocolTypes.WEBSOCKET, r'/transport/$'), + ] + + def handle_request(self, request: HttpParser) -> None: + raise NotImplementedError() + + def on_websocket_open(self) -> None: + # TODO(abhinavsingh): Add connected callback invocation + logger.info('app ws opened') + + def on_websocket_message(self, frame: WebsocketFrame) -> None: + try: + assert frame.data + message = json.loads(frame.data) + except UnicodeDecodeError: + logger.error(frame.data) + logger.info(frame.opcode) + return + + method = message['method'] + if method == 'ping': + self.reply({'id': message['id'], 'response': 'pong'}) + elif method in self.plugins: + self.plugins[method].handle_message(message) + else: + logger.info(frame.data) + logger.info(frame.opcode) + self.reply({'id': message['id'], 'response': 'not_implemented'}) + + def on_client_connection_close(self) -> None: + # TODO(abhinavsingh): Add disconnected callback invocation + logger.info('app ws closed') + + def reply(self, data: Dict[str, Any]) -> None: + self.client.queue( + memoryview( + WebsocketFrame.text( + bytes_( + json.dumps(data), + ), + ), + ), + ) diff --git a/tests/test_main.py b/tests/test_main.py index c2fb4ea8..bcad1e5b 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -29,7 +29,7 @@ from proxy.common.constants import ( # noqa: WPS450 DEFAULT_ENABLE_DASHBOARD, PLUGIN_DEVTOOLS_PROTOCOL, DEFAULT_ENABLE_WEB_SERVER, DEFAULT_DISABLE_HTTP_PROXY, DEFAULT_CA_SIGNING_KEY_FILE, DEFAULT_CLIENT_RECVBUF_SIZE, - DEFAULT_SERVER_RECVBUF_SIZE, DEFAULT_ENABLE_STATIC_SERVER, + DEFAULT_SERVER_RECVBUF_SIZE, DEFAULT_ENABLE_STATIC_SERVER, PLUGIN_WEBSOCKET_TRANSPORT, _env_threadless_compliant, ) @@ -243,6 +243,7 @@ class TestMain(unittest.TestCase): mock_load_plugins.call_args_list[0][0][0], [ bytes_(PLUGIN_WEB_SERVER), bytes_(PLUGIN_DASHBOARD), + bytes_(PLUGIN_WEBSOCKET_TRANSPORT), bytes_(PLUGIN_INSPECT_TRAFFIC), bytes_(PLUGIN_DEVTOOLS_PROTOCOL), bytes_(PLUGIN_HTTP_PROXY),