Decouple transport framework from dashboard plugin (#953)

* Decouple transport framework from dashboard plugin

* Move `InspectTrafficPlugin` within `http.inspector` module

* Avoid exporting plugins within `__init__.py` files

* Use `/transport/` prefix to avoid #945 conflict issue

* Add todo
This commit is contained in:
Abhinav Singh 2022-01-10 19:48:17 +05:30 committed by GitHub
parent 372a9edb44
commit 32acdcb9fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 119 additions and 107 deletions

View File

@ -14,7 +14,7 @@ export class WebsocketApi {
private hostname: string = window.location.hostname ? window.location.hostname : 'localhost';
private port: number = window.location.port ? Number(window.location.port) : 8899;
// TODO: Must map to route registered by dashboard.py, don't hardcode
private wsPrefix: string = '/dashboard';
private wsPrefix: string = '/transport/';
private wsScheme: string = window.location.protocol === 'http:' ? 'ws' : 'wss';
private ws: WebSocket;
private wsPath: string = this.wsScheme + '://' + this.hostname + ':' + this.port + this.wsPrefix;

View File

@ -146,15 +146,16 @@ DEFAULT_ABC_PLUGINS = [
'HttpProtocolHandlerPlugin',
'HttpProxyBasePlugin',
'HttpWebServerBasePlugin',
'ProxyDashboardWebsocketPlugin',
'WebSocketTransportBasePlugin',
]
PLUGIN_PROXY_AUTH = 'proxy.http.proxy.AuthPlugin'
PLUGIN_DASHBOARD = 'proxy.dashboard.ProxyDashboard'
PLUGIN_HTTP_PROXY = 'proxy.http.proxy.HttpProxyPlugin'
PLUGIN_WEB_SERVER = 'proxy.http.server.HttpWebServerPlugin'
PLUGIN_PAC_FILE = 'proxy.http.server.HttpWebServerPacFilePlugin'
PLUGIN_DEVTOOLS_PROTOCOL = 'proxy.http.inspector.DevtoolsProtocolPlugin'
PLUGIN_DASHBOARD = 'proxy.dashboard.ProxyDashboard'
PLUGIN_INSPECT_TRAFFIC = 'proxy.dashboard.InspectTrafficPlugin'
PLUGIN_PROXY_AUTH = 'proxy.http.proxy.AuthPlugin'
PLUGIN_DEVTOOLS_PROTOCOL = 'proxy.http.inspector.devtools.DevtoolsProtocolPlugin'
PLUGIN_INSPECT_TRAFFIC = 'proxy.http.inspector.inspect_traffic.InspectTrafficPlugin'
PLUGIN_WEBSOCKET_TRANSPORT = 'proxy.http.websocket.transport.WebSocketTransport'
PY2_DEPRECATION_MESSAGE = '''DEPRECATION: proxy.py no longer supports Python 2.7. Kindly upgrade to Python 3+. '
'If for some reasons you cannot upgrade, use'

View File

@ -26,7 +26,7 @@ from .constants import COMMA, DEFAULT_DATA_DIRECTORY_PATH, DEFAULT_NUM_ACCEPTORS
from .constants import DEFAULT_DEVTOOLS_WS_PATH, DEFAULT_DISABLE_HEADERS, PY2_DEPRECATION_MESSAGE
from .constants import PLUGIN_DASHBOARD, PLUGIN_DEVTOOLS_PROTOCOL, DEFAULT_MIN_COMPRESSION_LIMIT
from .constants import PLUGIN_HTTP_PROXY, PLUGIN_INSPECT_TRAFFIC, PLUGIN_PAC_FILE
from .constants import PLUGIN_WEB_SERVER, PLUGIN_PROXY_AUTH, IS_WINDOWS
from .constants import PLUGIN_WEB_SERVER, PLUGIN_PROXY_AUTH, IS_WINDOWS, PLUGIN_WEBSOCKET_TRANSPORT
from .logger import Logger
from .version import __version__
@ -395,6 +395,7 @@ class FlagParser:
default_plugins.append(PLUGIN_WEB_SERVER)
args.enable_static_server = True
default_plugins.append(PLUGIN_DASHBOARD)
default_plugins.append(PLUGIN_WEBSOCKET_TRANSPORT)
default_plugins.append(PLUGIN_INSPECT_TRAFFIC)
args.enable_events = True
args.enable_devtools = True

View File

@ -22,9 +22,10 @@ from .names import eventNames
class EventQueue:
"""Global event queue. Must be a multiprocessing.Manager queue because
subscribers need to dispatch their subscription queue over this global
queue.
"""Global event queue. Must be a multiprocess safe queue capable of
transporting other queues. This is necessary because currently
subscribers use a separate subscription queue to consume events.
Subscription queue is exchanged over the global event queue.
Each published event contains following schema::

View File

@ -34,18 +34,16 @@ class EventSubscriber:
can be different from publishers. Publishers can even be processes
outside of the proxy.py core.
Note that, EventSubscriber cannot share the `multiprocessing.Manager`
with the EventManager. Because EventSubscriber can be started
in a different process than EventManager.
`multiprocessing.Pipe` is used to initialize a new Queue for
receiving subscribed events from eventing core. Note that,
core EventDispatcher might be running in a separate process
and hence subscription queue must be multiprocess safe.
`multiprocessing.Manager` is used to initialize
a new Queue which is used for subscriptions. EventDispatcher
might be running in a separate process and hence
subscription queue must be multiprocess safe.
When `subscribe` method is called, EventManager stars
a relay thread which consumes event out of the subscription queue
and invoke callback.
When `subscribe` method is called, EventManager will
start a relay thread which consumes using the multiprocess
safe queue passed to the relay thread.
NOTE: Callback is executed in the context of relay thread.
"""
def __init__(self, event_queue: EventQueue, callback: Callable[[Dict[str, Any]], None]) -> None:

View File

@ -9,11 +9,7 @@
:license: BSD, see LICENSE for more details.
"""
from .dashboard import ProxyDashboard
from .inspect_traffic import InspectTrafficPlugin
from .plugin import ProxyDashboardWebsocketPlugin
__all__ = [
'ProxyDashboard',
'InspectTrafficPlugin',
'ProxyDashboardWebsocketPlugin',
]

View File

@ -9,17 +9,11 @@
:license: BSD, see LICENSE for more details.
"""
import os
import json
import logging
from typing import List, Tuple, Any, Dict
from .plugin import ProxyDashboardWebsocketPlugin
from ..common.utils import bytes_
from typing import List, Tuple
from ..http.responses import permanentRedirectResponse
from ..http.parser import HttpParser
from ..http.websocket import WebsocketFrame
from ..http.server import HttpWebServerPlugin, HttpWebServerBasePlugin, httpProtocolTypes
logger = logging.getLogger(__name__)
@ -42,24 +36,9 @@ class ProxyDashboard(HttpWebServerBasePlugin):
(httpProtocolTypes.HTTPS, r'/dashboard/$'),
]
# Handles WebsocketAPI requests for dashboard
WS_ROUTES = [
(httpProtocolTypes.WEBSOCKET, r'/dashboard$'),
]
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.plugins: Dict[str, ProxyDashboardWebsocketPlugin] = {}
if b'ProxyDashboardWebsocketPlugin' in self.flags.plugins:
for klass in self.flags.plugins[b'ProxyDashboardWebsocketPlugin']:
p = klass(self.flags, self.client, self.event_queue)
for method in p.methods():
self.plugins[method] = p
def routes(self) -> List[Tuple[int, str]]:
return ProxyDashboard.REDIRECT_ROUTES + \
ProxyDashboard.INDEX_ROUTES + \
ProxyDashboard.WS_ROUTES
ProxyDashboard.INDEX_ROUTES
def handle_request(self, request: HttpParser) -> None:
if request.path == b'/dashboard/':
@ -76,40 +55,3 @@ class ProxyDashboard(HttpWebServerBasePlugin):
b'/dashboard/proxy.html',
):
self.client.queue(permanentRedirectResponse(b'/dashboard/'))
def on_websocket_open(self) -> None:
logger.info('app ws opened')
def on_websocket_message(self, frame: WebsocketFrame) -> None:
try:
assert frame.data
message = json.loads(frame.data)
except UnicodeDecodeError:
logger.error(frame.data)
logger.info(frame.opcode)
return
method = message['method']
if method == 'ping':
self.reply({'id': message['id'], 'response': 'pong'})
elif method in self.plugins:
self.plugins[method].handle_message(message)
else:
logger.info(frame.data)
logger.info(frame.opcode)
self.reply({'id': message['id'], 'response': 'not_implemented'})
def on_client_connection_close(self) -> None:
logger.info('app ws closed')
# TODO(abhinavsingh): unsubscribe
def reply(self, data: Dict[str, Any]) -> None:
self.client.queue(
memoryview(
WebsocketFrame.text(
bytes_(
json.dumps(data),
),
),
),
)

View File

@ -7,14 +7,4 @@
:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
.. spelling::
http
Submodules
"""
from .devtools import DevtoolsProtocolPlugin
__all__ = [
'DevtoolsProtocolPlugin',
]

View File

@ -11,15 +11,13 @@
import json
from typing import List, Dict, Any
from .plugin import ProxyDashboardWebsocketPlugin
from ..common.utils import bytes_
from ..core.event import EventSubscriber
from ..core.connection import TcpClientConnection
from ..http.websocket import WebsocketFrame
from ...common.utils import bytes_
from ...core.event import EventSubscriber
from ...core.connection import TcpClientConnection
from ..websocket import WebsocketFrame, WebSocketTransportBasePlugin
class InspectTrafficPlugin(ProxyDashboardWebsocketPlugin):
class InspectTrafficPlugin(WebSocketTransportBasePlugin):
"""Websocket API for inspect_traffic.ts frontend plugin."""
def __init__(self, *args: Any, **kwargs: Any) -> None:

View File

@ -17,9 +17,11 @@
"""
from .frame import WebsocketFrame, websocketOpcodes
from .client import WebsocketClient
from .plugin import WebSocketTransportBasePlugin
__all__ = [
'websocketOpcodes',
'WebsocketFrame',
'WebsocketClient',
'WebSocketTransportBasePlugin',
]

View File

@ -13,13 +13,13 @@ import json
from abc import ABC, abstractmethod
from typing import List, Dict, Any
from ..common.utils import bytes_
from ..http.websocket import WebsocketFrame
from ..core.connection import TcpClientConnection
from ..core.event import EventQueue
from ...common.utils import bytes_
from . import WebsocketFrame
from ...core.connection import TcpClientConnection
from ...core.event import EventQueue
class ProxyDashboardWebsocketPlugin(ABC):
class WebSocketTransportBasePlugin(ABC):
"""Abstract class for plugins extending dashboard websocket API."""
def __init__(

View File

@ -0,0 +1,82 @@
# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~
Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
Network monitoring, controls & Application development, testing, debugging.
:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
import json
import logging
from typing import List, Tuple, Any, Dict
from ...common.utils import bytes_
from ..server import httpProtocolTypes, HttpWebServerBasePlugin
from ..parser import HttpParser
from .frame import WebsocketFrame
from .plugin import WebSocketTransportBasePlugin
logger = logging.getLogger(__name__)
class WebSocketTransport(HttpWebServerBasePlugin):
"""WebSocket transport framework."""
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.plugins: Dict[str, WebSocketTransportBasePlugin] = {}
if b'WebSocketTransportBasePlugin' in self.flags.plugins:
for klass in self.flags.plugins[b'WebSocketTransportBasePlugin']:
p = klass(self.flags, self.client, self.event_queue)
for method in p.methods():
self.plugins[method] = p
def routes(self) -> List[Tuple[int, str]]:
return [
(httpProtocolTypes.WEBSOCKET, r'/transport/$'),
]
def handle_request(self, request: HttpParser) -> None:
raise NotImplementedError()
def on_websocket_open(self) -> None:
# TODO(abhinavsingh): Add connected callback invocation
logger.info('app ws opened')
def on_websocket_message(self, frame: WebsocketFrame) -> None:
try:
assert frame.data
message = json.loads(frame.data)
except UnicodeDecodeError:
logger.error(frame.data)
logger.info(frame.opcode)
return
method = message['method']
if method == 'ping':
self.reply({'id': message['id'], 'response': 'pong'})
elif method in self.plugins:
self.plugins[method].handle_message(message)
else:
logger.info(frame.data)
logger.info(frame.opcode)
self.reply({'id': message['id'], 'response': 'not_implemented'})
def on_client_connection_close(self) -> None:
# TODO(abhinavsingh): Add disconnected callback invocation
logger.info('app ws closed')
def reply(self, data: Dict[str, Any]) -> None:
self.client.queue(
memoryview(
WebsocketFrame.text(
bytes_(
json.dumps(data),
),
),
),
)

View File

@ -29,7 +29,7 @@ from proxy.common.constants import ( # noqa: WPS450
DEFAULT_ENABLE_DASHBOARD, PLUGIN_DEVTOOLS_PROTOCOL,
DEFAULT_ENABLE_WEB_SERVER, DEFAULT_DISABLE_HTTP_PROXY,
DEFAULT_CA_SIGNING_KEY_FILE, DEFAULT_CLIENT_RECVBUF_SIZE,
DEFAULT_SERVER_RECVBUF_SIZE, DEFAULT_ENABLE_STATIC_SERVER,
DEFAULT_SERVER_RECVBUF_SIZE, DEFAULT_ENABLE_STATIC_SERVER, PLUGIN_WEBSOCKET_TRANSPORT,
_env_threadless_compliant,
)
@ -243,6 +243,7 @@ class TestMain(unittest.TestCase):
mock_load_plugins.call_args_list[0][0][0], [
bytes_(PLUGIN_WEB_SERVER),
bytes_(PLUGIN_DASHBOARD),
bytes_(PLUGIN_WEBSOCKET_TRANSPORT),
bytes_(PLUGIN_INSPECT_TRAFFIC),
bytes_(PLUGIN_DEVTOOLS_PROTOCOL),
bytes_(PLUGIN_HTTP_PROXY),