Refactor base server interfaces into core modules (#461)

* Ensure pending buffers are flushed before shutting down in base_server.py

Handle unsupported scheme cases within connect_tunnel.py

* Move base implementations within core module

* Update ssl_echo_server
This commit is contained in:
Abhinav Singh 2020-11-24 21:37:11 +05:30 committed by GitHub
parent da23ae03bc
commit 4520ae31a6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 238 additions and 150 deletions

View File

@ -12,7 +12,7 @@ Table of Contents
* [SSL Echo Server](#ssl-echo-server)
* [SSL Echo Client](#ssl-echo-client)
* [PubSub Eventing](#pubsub-eventing)
* [Connect Tunnel](#connect-tunnel)
* [Https Connect Tunnel](#https-connect-tunnel)
## WebSocket Client
@ -117,7 +117,7 @@ DEBUG:proxy.core.event.subscriber:Un-subscribed relay sub id 5eb22010764f4d44900
Received 52724 events from main thread, 60172 events from another process, in 21.50117802619934 seconds
```
## Connect Tunnel
## HTTPS Connect Tunnel
A simple HTTP proxy server supporting only CONNECT (https) requests.
@ -125,10 +125,10 @@ A simple HTTP proxy server supporting only CONNECT (https) requests.
2. Uses `TcpServerConnection` to establish upstream connection.
3. Overrides `BaseServer` methods to also register read/write events for upstream connection.
Start `connect_tunnel.py` as:
Start `https_connect_tunnel.py` as:
```
PYTHONPATH=. python examples/connect_tunnel.py
PYTHONPATH=. python examples/https_connect_tunnel.py
```
Send https requests via tunnel as:

View File

@ -1,75 +0,0 @@
# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~
Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
Network monitoring, controls & Application development, testing, debugging.
:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
from abc import abstractmethod
import socket
import selectors
from typing import Dict, Any
from proxy.core.acceptor import Work
from proxy.common.types import Readables, Writables
class BaseServerHandler(Work):
"""BaseServerHandler implements Work interface.
An instance of BaseServerHandler is created for each client
connection. BaseServerHandler lifecycle is controlled by
Threadless core using asyncio.
Implementation must provide:
a) handle_data(data: memoryview)
c) (optionally) intialize, is_inactive and shutdown methods
"""
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
print('Connection accepted from {0}'.format(self.client.addr))
@abstractmethod
def handle_data(self, data: memoryview) -> None:
pass # pragma: no cover
def get_events(self) -> Dict[socket.socket, int]:
# We always want to read from client
# Register for EVENT_READ events
events = {self.client.connection: selectors.EVENT_READ}
# If there is pending buffer for client
# also register for EVENT_WRITE events
if self.client.has_buffer():
events[self.client.connection] |= selectors.EVENT_WRITE
return events
def handle_events(
self,
readables: Readables,
writables: Writables) -> bool:
"""Return True to shutdown work."""
if self.client.connection in readables:
try:
data = self.client.recv()
if data is None:
# Client closed connection, signal shutdown
print(
'Connection closed by client {0}'.format(
self.client.addr))
return True
self.handle_data(data)
except ConnectionResetError:
print(
'Connection reset by client {0}'.format(
self.client.addr))
return True
if self.client.connection in writables:
self.client.flush()
return False

View File

@ -0,0 +1,85 @@
# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~
Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
Network monitoring, controls & Application development, testing, debugging.
:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
import time
from typing import Any, Optional
from proxy.proxy import Proxy
from proxy.common.utils import build_http_response
from proxy.http.codes import httpStatusCodes
from proxy.http.parser import httpParserStates
from proxy.http.methods import httpMethods
from proxy.core.acceptor import AcceptorPool
from proxy.core.base import BaseTcpTunnelHandler
class HttpsConnectTunnelHandler(BaseTcpTunnelHandler):
"""A https CONNECT tunnel."""
PROXY_TUNNEL_ESTABLISHED_RESPONSE_PKT = memoryview(build_http_response(
httpStatusCodes.OK,
reason=b'Connection established'
))
PROXY_TUNNEL_UNSUPPORTED_SCHEME = memoryview(build_http_response(
httpStatusCodes.BAD_REQUEST,
headers={b'Connection': b'close'},
reason=b'Unsupported protocol scheme'
))
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
def handle_data(self, data: memoryview) -> Optional[bool]:
# Queue for upstream if connection has been established
if self.upstream and self.upstream._conn is not None:
self.upstream.queue(data)
return None
# Parse client request
self.request.parse(data)
# Drop the request if not a CONNECT request
if self.request.method != httpMethods.CONNECT:
self.client.queue(
HttpsConnectTunnelHandler.PROXY_TUNNEL_UNSUPPORTED_SCHEME)
return True
# CONNECT requests are short and we need not worry about
# receiving partial request bodies here.
assert self.request.state == httpParserStates.COMPLETE
# Establish connection with upstream
self.connect_upstream()
# Queue tunnel established response to client
self.client.queue(
HttpsConnectTunnelHandler.PROXY_TUNNEL_ESTABLISHED_RESPONSE_PKT)
return None
def main() -> None:
# This example requires `threadless=True`
pool = AcceptorPool(
flags=Proxy.initialize(port=12345, num_workers=1, threadless=True),
work_klass=HttpsConnectTunnelHandler)
try:
pool.setup()
while True:
time.sleep(1)
except KeyboardInterrupt:
pass
finally:
pool.shutdown()
if __name__ == '__main__':
main()

View File

@ -9,16 +9,17 @@
:license: BSD, see LICENSE for more details.
"""
import time
from typing import Optional
from proxy.proxy import Proxy
from proxy.common.utils import wrap_socket
from proxy.core.acceptor import AcceptorPool
from proxy.core.connection import TcpClientConnection
from proxy.common.utils import wrap_socket
from examples.base_server import BaseServerHandler
from proxy.core.base import BaseTcpServerHandler
class EchoSSLServerHandler(BaseServerHandler): # type: ignore
class EchoSSLServerHandler(BaseTcpServerHandler):
"""Wraps client socket during initialization."""
def initialize(self) -> None:
@ -26,17 +27,18 @@ class EchoSSLServerHandler(BaseServerHandler): # type: ignore
# here using wrap_socket() utility.
assert self.flags.keyfile is not None and self.flags.certfile is not None
conn = wrap_socket(
self.client.connection, # type: ignore
self.client.connection,
self.flags.keyfile,
self.flags.certfile)
conn.setblocking(False)
# Upgrade plain TcpClientConnection to SSL connection object
self.client = TcpClientConnection(
conn=conn, addr=self.client.addr) # type: ignore
conn=conn, addr=self.client.addr)
def handle_data(self, data: memoryview) -> None:
def handle_data(self, data: memoryview) -> Optional[bool]:
# echo back to client
self.client.queue(data)
return None
def main() -> None:

View File

@ -9,22 +9,23 @@
:license: BSD, see LICENSE for more details.
"""
import time
from typing import Optional
from proxy.core.acceptor import AcceptorPool
from proxy.proxy import Proxy
from examples.base_server import BaseServerHandler
from proxy.core.acceptor import AcceptorPool
from proxy.core.base import BaseTcpServerHandler
class EchoServerHandler(BaseServerHandler): # type: ignore
class EchoServerHandler(BaseTcpServerHandler):
"""Sets client socket to non-blocking during initialization."""
def initialize(self) -> None:
self.client.connection.setblocking(False)
def handle_data(self, data: memoryview) -> None:
def handle_data(self, data: memoryview) -> Optional[bool]:
# echo back to client
self.client.queue(data)
return None
def main() -> None:

View File

@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~
Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
Network monitoring, controls & Application development, testing, debugging.
:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
from .tcp_server import BaseTcpServerHandler
from .tcp_tunnel import BaseTcpTunnelHandler
__all__ = [
'BaseTcpServerHandler',
'BaseTcpTunnelHandler',
]

View File

@ -0,0 +1,106 @@
# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~
Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
Network monitoring, controls & Application development, testing, debugging.
:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
from abc import abstractmethod
import socket
import selectors
from typing import Dict, Any, Optional
from proxy.core.acceptor import Work
from proxy.common.types import Readables, Writables
class BaseTcpServerHandler(Work):
"""BaseTcpServerHandler implements Work interface.
An instance of BaseTcpServerHandler is created for each client
connection. BaseServerHandler lifecycle is controlled by
Threadless core using asyncio.
BaseServerHandler ensures that pending buffers are flushed
before client connection is closed.
Implementations must provide:
a) handle_data(data: memoryview)
c) (optionally) intialize, is_inactive and shutdown methods
"""
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.must_flush_before_shutdown = False
print('Connection accepted from {0}'.format(self.client.addr))
@abstractmethod
def handle_data(self, data: memoryview) -> Optional[bool]:
"""Optionally return True to close client connection."""
pass # pragma: no cover
def get_events(self) -> Dict[socket.socket, int]:
events = {}
# We always want to read from client
# Register for EVENT_READ events
if self.must_flush_before_shutdown is False:
events[self.client.connection] = selectors.EVENT_READ
# If there is pending buffer for client
# also register for EVENT_WRITE events
if self.client.has_buffer():
if self.client.connection in events:
events[self.client.connection] |= selectors.EVENT_WRITE
else:
events[self.client.connection] = selectors.EVENT_WRITE
return events
def handle_events(
self,
readables: Readables,
writables: Writables) -> bool:
"""Return True to shutdown work."""
do_shutdown = False
if self.client.connection in readables:
try:
data = self.client.recv()
if data is None:
# Client closed connection, signal shutdown
print(
'Connection closed by client {0}'.format(
self.client.addr))
do_shutdown = True
else:
r = self.handle_data(data)
if isinstance(r, bool) and r is True:
print(
'Implementation signaled shutdown for client {0}'.format(
self.client.addr))
if self.client.has_buffer():
print(
'Client {0} has pending buffer, will be flushed before shutting down'.format(
self.client.addr))
self.must_flush_before_shutdown = True
else:
do_shutdown = True
except ConnectionResetError:
print(
'Connection reset by client {0}'.format(
self.client.addr))
do_shutdown = True
if self.client.connection in writables:
print('Flushing buffer to client {0}'.format(self.client.addr))
self.client.flush()
if self.must_flush_before_shutdown is True:
do_shutdown = True
self.must_flush_before_shutdown = False
if do_shutdown:
print(
'Shutting down client {0} connection'.format(
self.client.addr))
return do_shutdown

View File

@ -8,36 +8,31 @@
:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
import time
from abc import abstractmethod
import socket
import selectors
from typing import Any, Optional, Dict
from proxy.proxy import Proxy
from proxy.core.acceptor import AcceptorPool
from proxy.core.connection import TcpServerConnection
from proxy.http.parser import HttpParser, httpParserTypes, httpParserStates
from proxy.http.codes import httpStatusCodes
from proxy.http.methods import httpMethods
from proxy.common.types import Readables, Writables
from proxy.common.utils import build_http_response, text_
from ...http.parser import HttpParser, httpParserTypes
from ...common.types import Readables, Writables
from ...common.utils import text_
from examples.base_server import BaseServerHandler
from ..connection import TcpServerConnection
from .tcp_server import BaseTcpServerHandler
class ConnectTunnelHandler(BaseServerHandler): # type: ignore
"""A http CONNECT tunnel server."""
PROXY_TUNNEL_ESTABLISHED_RESPONSE_PKT = memoryview(build_http_response(
httpStatusCodes.OK,
reason=b'Connection established'
))
class BaseTcpTunnelHandler(BaseTcpServerHandler):
"""Base TCP tunnel interface."""
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.request = HttpParser(httpParserTypes.REQUEST_PARSER)
self.upstream: Optional[TcpServerConnection] = None
@abstractmethod
def handle_data(self, data: memoryview) -> Optional[bool]:
pass # pragma: no cover
def initialize(self) -> None:
self.client.connection.setblocking(False)
@ -48,30 +43,6 @@ class ConnectTunnelHandler(BaseServerHandler): # type: ignore
self.upstream.close()
super().shutdown()
def handle_data(self, data: memoryview) -> None:
# Queue for upstream if connection has been established
if self.upstream and self.upstream._conn is not None:
self.upstream.queue(data)
return
# Parse client request
self.request.parse(data)
# Drop the request if not a CONNECT request
if self.request.method != httpMethods.CONNECT:
pass
# CONNECT requests are short and we need not worry about
# receiving partial request bodies here.
assert self.request.state == httpParserStates.COMPLETE
# Establish connection with upstream
self.connect_upstream()
# Queue tunnel established response to client
self.client.queue(
ConnectTunnelHandler.PROXY_TUNNEL_ESTABLISHED_RESPONSE_PKT)
def get_events(self) -> Dict[socket.socket, int]:
# Get default client events
ev: Dict[socket.socket, int] = super().get_events()
@ -115,22 +86,3 @@ class ConnectTunnelHandler(BaseServerHandler): # type: ignore
self.upstream.connect()
print('Connection established with upstream {0}:{1}'.format(
text_(self.request.host), self.request.port))
def main() -> None:
# This example requires `threadless=True`
pool = AcceptorPool(
flags=Proxy.initialize(port=12345, num_workers=1, threadless=True),
work_klass=ConnectTunnelHandler)
try:
pool.setup()
while True:
time.sleep(1)
except KeyboardInterrupt:
pass
finally:
pool.shutdown()
if __name__ == '__main__':
main()