From 4520ae31a6119ab1ee433e27c9516159581d45da Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Tue, 24 Nov 2020 21:37:11 +0530 Subject: [PATCH] Refactor base server interfaces into core modules (#461) * Ensure pending buffers are flushed before shutting down in base_server.py Handle unsupported scheme cases within connect_tunnel.py * Move base implementations within core module * Update ssl_echo_server --- examples/README.md | 8 +- examples/base_server.py | 75 ------------- examples/https_connect_tunnel.py | 85 ++++++++++++++ examples/ssl_echo_server.py | 14 ++- examples/tcp_echo_server.py | 11 +- proxy/core/base/__init__.py | 17 +++ proxy/core/base/tcp_server.py | 106 ++++++++++++++++++ .../core/base/tcp_tunnel.py | 72 ++---------- 8 files changed, 238 insertions(+), 150 deletions(-) delete mode 100644 examples/base_server.py create mode 100644 examples/https_connect_tunnel.py create mode 100644 proxy/core/base/__init__.py create mode 100644 proxy/core/base/tcp_server.py rename examples/connect_tunnel.py => proxy/core/base/tcp_tunnel.py (58%) diff --git a/examples/README.md b/examples/README.md index 04b38a00..84bf0f44 100644 --- a/examples/README.md +++ b/examples/README.md @@ -12,7 +12,7 @@ Table of Contents * [SSL Echo Server](#ssl-echo-server) * [SSL Echo Client](#ssl-echo-client) * [PubSub Eventing](#pubsub-eventing) -* [Connect Tunnel](#connect-tunnel) +* [Https Connect Tunnel](#https-connect-tunnel) ## WebSocket Client @@ -117,7 +117,7 @@ DEBUG:proxy.core.event.subscriber:Un-subscribed relay sub id 5eb22010764f4d44900 Received 52724 events from main thread, 60172 events from another process, in 21.50117802619934 seconds ``` -## Connect Tunnel +## HTTPS Connect Tunnel A simple HTTP proxy server supporting only CONNECT (https) requests. @@ -125,10 +125,10 @@ A simple HTTP proxy server supporting only CONNECT (https) requests. 2. Uses `TcpServerConnection` to establish upstream connection. 3. Overrides `BaseServer` methods to also register read/write events for upstream connection. -Start `connect_tunnel.py` as: +Start `https_connect_tunnel.py` as: ``` -❯ PYTHONPATH=. python examples/connect_tunnel.py +❯ PYTHONPATH=. python examples/https_connect_tunnel.py ``` Send https requests via tunnel as: diff --git a/examples/base_server.py b/examples/base_server.py deleted file mode 100644 index 19b3c7b6..00000000 --- a/examples/base_server.py +++ /dev/null @@ -1,75 +0,0 @@ -# -*- coding: utf-8 -*- -""" - proxy.py - ~~~~~~~~ - ⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on - Network monitoring, controls & Application development, testing, debugging. - - :copyright: (c) 2013-present by Abhinav Singh and contributors. - :license: BSD, see LICENSE for more details. -""" -from abc import abstractmethod -import socket -import selectors - -from typing import Dict, Any - -from proxy.core.acceptor import Work -from proxy.common.types import Readables, Writables - - -class BaseServerHandler(Work): - """BaseServerHandler implements Work interface. - - An instance of BaseServerHandler is created for each client - connection. BaseServerHandler lifecycle is controlled by - Threadless core using asyncio. - - Implementation must provide: - a) handle_data(data: memoryview) - c) (optionally) intialize, is_inactive and shutdown methods - """ - - def __init__(self, *args: Any, **kwargs: Any) -> None: - super().__init__(*args, **kwargs) - print('Connection accepted from {0}'.format(self.client.addr)) - - @abstractmethod - def handle_data(self, data: memoryview) -> None: - pass # pragma: no cover - - def get_events(self) -> Dict[socket.socket, int]: - # We always want to read from client - # Register for EVENT_READ events - events = {self.client.connection: selectors.EVENT_READ} - # If there is pending buffer for client - # also register for EVENT_WRITE events - if self.client.has_buffer(): - events[self.client.connection] |= selectors.EVENT_WRITE - return events - - def handle_events( - self, - readables: Readables, - writables: Writables) -> bool: - """Return True to shutdown work.""" - if self.client.connection in readables: - try: - data = self.client.recv() - if data is None: - # Client closed connection, signal shutdown - print( - 'Connection closed by client {0}'.format( - self.client.addr)) - return True - self.handle_data(data) - except ConnectionResetError: - print( - 'Connection reset by client {0}'.format( - self.client.addr)) - return True - - if self.client.connection in writables: - self.client.flush() - - return False diff --git a/examples/https_connect_tunnel.py b/examples/https_connect_tunnel.py new file mode 100644 index 00000000..95018646 --- /dev/null +++ b/examples/https_connect_tunnel.py @@ -0,0 +1,85 @@ +# -*- coding: utf-8 -*- +""" + proxy.py + ~~~~~~~~ + ⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on + Network monitoring, controls & Application development, testing, debugging. + + :copyright: (c) 2013-present by Abhinav Singh and contributors. + :license: BSD, see LICENSE for more details. +""" +import time +from typing import Any, Optional + +from proxy.proxy import Proxy +from proxy.common.utils import build_http_response +from proxy.http.codes import httpStatusCodes +from proxy.http.parser import httpParserStates +from proxy.http.methods import httpMethods +from proxy.core.acceptor import AcceptorPool +from proxy.core.base import BaseTcpTunnelHandler + + +class HttpsConnectTunnelHandler(BaseTcpTunnelHandler): + """A https CONNECT tunnel.""" + + PROXY_TUNNEL_ESTABLISHED_RESPONSE_PKT = memoryview(build_http_response( + httpStatusCodes.OK, + reason=b'Connection established' + )) + + PROXY_TUNNEL_UNSUPPORTED_SCHEME = memoryview(build_http_response( + httpStatusCodes.BAD_REQUEST, + headers={b'Connection': b'close'}, + reason=b'Unsupported protocol scheme' + )) + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + + def handle_data(self, data: memoryview) -> Optional[bool]: + # Queue for upstream if connection has been established + if self.upstream and self.upstream._conn is not None: + self.upstream.queue(data) + return None + + # Parse client request + self.request.parse(data) + + # Drop the request if not a CONNECT request + if self.request.method != httpMethods.CONNECT: + self.client.queue( + HttpsConnectTunnelHandler.PROXY_TUNNEL_UNSUPPORTED_SCHEME) + return True + + # CONNECT requests are short and we need not worry about + # receiving partial request bodies here. + assert self.request.state == httpParserStates.COMPLETE + + # Establish connection with upstream + self.connect_upstream() + + # Queue tunnel established response to client + self.client.queue( + HttpsConnectTunnelHandler.PROXY_TUNNEL_ESTABLISHED_RESPONSE_PKT) + + return None + + +def main() -> None: + # This example requires `threadless=True` + pool = AcceptorPool( + flags=Proxy.initialize(port=12345, num_workers=1, threadless=True), + work_klass=HttpsConnectTunnelHandler) + try: + pool.setup() + while True: + time.sleep(1) + except KeyboardInterrupt: + pass + finally: + pool.shutdown() + + +if __name__ == '__main__': + main() diff --git a/examples/ssl_echo_server.py b/examples/ssl_echo_server.py index dfd26dcf..013bc3a5 100644 --- a/examples/ssl_echo_server.py +++ b/examples/ssl_echo_server.py @@ -9,16 +9,17 @@ :license: BSD, see LICENSE for more details. """ import time +from typing import Optional from proxy.proxy import Proxy +from proxy.common.utils import wrap_socket from proxy.core.acceptor import AcceptorPool from proxy.core.connection import TcpClientConnection -from proxy.common.utils import wrap_socket -from examples.base_server import BaseServerHandler +from proxy.core.base import BaseTcpServerHandler -class EchoSSLServerHandler(BaseServerHandler): # type: ignore +class EchoSSLServerHandler(BaseTcpServerHandler): """Wraps client socket during initialization.""" def initialize(self) -> None: @@ -26,17 +27,18 @@ class EchoSSLServerHandler(BaseServerHandler): # type: ignore # here using wrap_socket() utility. assert self.flags.keyfile is not None and self.flags.certfile is not None conn = wrap_socket( - self.client.connection, # type: ignore + self.client.connection, self.flags.keyfile, self.flags.certfile) conn.setblocking(False) # Upgrade plain TcpClientConnection to SSL connection object self.client = TcpClientConnection( - conn=conn, addr=self.client.addr) # type: ignore + conn=conn, addr=self.client.addr) - def handle_data(self, data: memoryview) -> None: + def handle_data(self, data: memoryview) -> Optional[bool]: # echo back to client self.client.queue(data) + return None def main() -> None: diff --git a/examples/tcp_echo_server.py b/examples/tcp_echo_server.py index bb904323..c468b7ea 100644 --- a/examples/tcp_echo_server.py +++ b/examples/tcp_echo_server.py @@ -9,22 +9,23 @@ :license: BSD, see LICENSE for more details. """ import time +from typing import Optional -from proxy.core.acceptor import AcceptorPool from proxy.proxy import Proxy - -from examples.base_server import BaseServerHandler +from proxy.core.acceptor import AcceptorPool +from proxy.core.base import BaseTcpServerHandler -class EchoServerHandler(BaseServerHandler): # type: ignore +class EchoServerHandler(BaseTcpServerHandler): """Sets client socket to non-blocking during initialization.""" def initialize(self) -> None: self.client.connection.setblocking(False) - def handle_data(self, data: memoryview) -> None: + def handle_data(self, data: memoryview) -> Optional[bool]: # echo back to client self.client.queue(data) + return None def main() -> None: diff --git a/proxy/core/base/__init__.py b/proxy/core/base/__init__.py new file mode 100644 index 00000000..c60f1477 --- /dev/null +++ b/proxy/core/base/__init__.py @@ -0,0 +1,17 @@ +# -*- coding: utf-8 -*- +""" + proxy.py + ~~~~~~~~ + ⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on + Network monitoring, controls & Application development, testing, debugging. + + :copyright: (c) 2013-present by Abhinav Singh and contributors. + :license: BSD, see LICENSE for more details. +""" +from .tcp_server import BaseTcpServerHandler +from .tcp_tunnel import BaseTcpTunnelHandler + +__all__ = [ + 'BaseTcpServerHandler', + 'BaseTcpTunnelHandler', +] diff --git a/proxy/core/base/tcp_server.py b/proxy/core/base/tcp_server.py new file mode 100644 index 00000000..bff04311 --- /dev/null +++ b/proxy/core/base/tcp_server.py @@ -0,0 +1,106 @@ +# -*- coding: utf-8 -*- +""" + proxy.py + ~~~~~~~~ + ⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on + Network monitoring, controls & Application development, testing, debugging. + + :copyright: (c) 2013-present by Abhinav Singh and contributors. + :license: BSD, see LICENSE for more details. +""" +from abc import abstractmethod +import socket +import selectors + +from typing import Dict, Any, Optional + +from proxy.core.acceptor import Work +from proxy.common.types import Readables, Writables + + +class BaseTcpServerHandler(Work): + """BaseTcpServerHandler implements Work interface. + + An instance of BaseTcpServerHandler is created for each client + connection. BaseServerHandler lifecycle is controlled by + Threadless core using asyncio. + + BaseServerHandler ensures that pending buffers are flushed + before client connection is closed. + + Implementations must provide: + a) handle_data(data: memoryview) + c) (optionally) intialize, is_inactive and shutdown methods + """ + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + self.must_flush_before_shutdown = False + print('Connection accepted from {0}'.format(self.client.addr)) + + @abstractmethod + def handle_data(self, data: memoryview) -> Optional[bool]: + """Optionally return True to close client connection.""" + pass # pragma: no cover + + def get_events(self) -> Dict[socket.socket, int]: + events = {} + # We always want to read from client + # Register for EVENT_READ events + if self.must_flush_before_shutdown is False: + events[self.client.connection] = selectors.EVENT_READ + # If there is pending buffer for client + # also register for EVENT_WRITE events + if self.client.has_buffer(): + if self.client.connection in events: + events[self.client.connection] |= selectors.EVENT_WRITE + else: + events[self.client.connection] = selectors.EVENT_WRITE + return events + + def handle_events( + self, + readables: Readables, + writables: Writables) -> bool: + """Return True to shutdown work.""" + do_shutdown = False + if self.client.connection in readables: + try: + data = self.client.recv() + if data is None: + # Client closed connection, signal shutdown + print( + 'Connection closed by client {0}'.format( + self.client.addr)) + do_shutdown = True + else: + r = self.handle_data(data) + if isinstance(r, bool) and r is True: + print( + 'Implementation signaled shutdown for client {0}'.format( + self.client.addr)) + if self.client.has_buffer(): + print( + 'Client {0} has pending buffer, will be flushed before shutting down'.format( + self.client.addr)) + self.must_flush_before_shutdown = True + else: + do_shutdown = True + except ConnectionResetError: + print( + 'Connection reset by client {0}'.format( + self.client.addr)) + do_shutdown = True + + if self.client.connection in writables: + print('Flushing buffer to client {0}'.format(self.client.addr)) + self.client.flush() + if self.must_flush_before_shutdown is True: + do_shutdown = True + self.must_flush_before_shutdown = False + + if do_shutdown: + print( + 'Shutting down client {0} connection'.format( + self.client.addr)) + return do_shutdown diff --git a/examples/connect_tunnel.py b/proxy/core/base/tcp_tunnel.py similarity index 58% rename from examples/connect_tunnel.py rename to proxy/core/base/tcp_tunnel.py index ad230ece..d83053ae 100644 --- a/examples/connect_tunnel.py +++ b/proxy/core/base/tcp_tunnel.py @@ -8,36 +8,31 @@ :copyright: (c) 2013-present by Abhinav Singh and contributors. :license: BSD, see LICENSE for more details. """ -import time +from abc import abstractmethod import socket import selectors from typing import Any, Optional, Dict -from proxy.proxy import Proxy -from proxy.core.acceptor import AcceptorPool -from proxy.core.connection import TcpServerConnection -from proxy.http.parser import HttpParser, httpParserTypes, httpParserStates -from proxy.http.codes import httpStatusCodes -from proxy.http.methods import httpMethods -from proxy.common.types import Readables, Writables -from proxy.common.utils import build_http_response, text_ +from ...http.parser import HttpParser, httpParserTypes +from ...common.types import Readables, Writables +from ...common.utils import text_ -from examples.base_server import BaseServerHandler +from ..connection import TcpServerConnection +from .tcp_server import BaseTcpServerHandler -class ConnectTunnelHandler(BaseServerHandler): # type: ignore - """A http CONNECT tunnel server.""" - - PROXY_TUNNEL_ESTABLISHED_RESPONSE_PKT = memoryview(build_http_response( - httpStatusCodes.OK, - reason=b'Connection established' - )) +class BaseTcpTunnelHandler(BaseTcpServerHandler): + """Base TCP tunnel interface.""" def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) self.request = HttpParser(httpParserTypes.REQUEST_PARSER) self.upstream: Optional[TcpServerConnection] = None + @abstractmethod + def handle_data(self, data: memoryview) -> Optional[bool]: + pass # pragma: no cover + def initialize(self) -> None: self.client.connection.setblocking(False) @@ -48,30 +43,6 @@ class ConnectTunnelHandler(BaseServerHandler): # type: ignore self.upstream.close() super().shutdown() - def handle_data(self, data: memoryview) -> None: - # Queue for upstream if connection has been established - if self.upstream and self.upstream._conn is not None: - self.upstream.queue(data) - return - - # Parse client request - self.request.parse(data) - - # Drop the request if not a CONNECT request - if self.request.method != httpMethods.CONNECT: - pass - - # CONNECT requests are short and we need not worry about - # receiving partial request bodies here. - assert self.request.state == httpParserStates.COMPLETE - - # Establish connection with upstream - self.connect_upstream() - - # Queue tunnel established response to client - self.client.queue( - ConnectTunnelHandler.PROXY_TUNNEL_ESTABLISHED_RESPONSE_PKT) - def get_events(self) -> Dict[socket.socket, int]: # Get default client events ev: Dict[socket.socket, int] = super().get_events() @@ -115,22 +86,3 @@ class ConnectTunnelHandler(BaseServerHandler): # type: ignore self.upstream.connect() print('Connection established with upstream {0}:{1}'.format( text_(self.request.host), self.request.port)) - - -def main() -> None: - # This example requires `threadless=True` - pool = AcceptorPool( - flags=Proxy.initialize(port=12345, num_workers=1, threadless=True), - work_klass=ConnectTunnelHandler) - try: - pool.setup() - while True: - time.sleep(1) - except KeyboardInterrupt: - pass - finally: - pool.shutdown() - - -if __name__ == '__main__': - main()