Acceptors performance (#767)

* Use threads for delegation. `run_once` now acquires the lock before `accept`, not around `select` (see the sketch after this list)

* Add support for using the master proxy within the proxy pool plugin. When used, the proxy pool plugin becomes a no-op on the master node

* Fix acceptor tests now that the event mask is being used

* Use `cached_property` for web server routes

* Use `select(timeout=1)`, otherwise the acceptor won't join if `select` blocks indefinitely

* mypy, flake8, and doc spelling fixes

* R0205: Class `cached_property` inherits from object, can be safely removed from bases in python3 (useless-object-inheritance)
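
The locking and `select(timeout=1)` bullets above describe a single accept pattern, sketched below. This is a minimal standalone sketch, not the actual `Acceptor` class; `handle` is a hypothetical work dispatcher:

import socket
import selectors

def run_once(sock: socket.socket, sel: selectors.BaseSelector, lock) -> None:
    # timeout=1 keeps the loop responsive: a fully idle acceptor would
    # otherwise block in select() forever, never observe the shutdown
    # flag, and the parent process could not join() it.
    events = sel.select(timeout=1)
    if len(events) == 0:
        return
    locked = False
    try:
        # Contend for the lock *after* select, and never block on it.
        # Losers go straight back to select() instead of queueing up;
        # only the winner calls accept() on the shared server socket.
        if lock.acquire(block=False):
            locked = True
            conn, addr = sock.accept()
            handle(conn, addr)  # hypothetical dispatcher
    except BlockingIOError:
        # Another process raced us to the pending connection.
        pass
    finally:
        if locked:
            lock.release()
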
Abhinav Singh, 2021-11-20 19:42:38 +05:30, committed via GitHub
commit 8fdddfd199, parent 27f85038b1
11 changed files with 165 additions and 68 deletions

.gitignore

@@ -30,3 +30,4 @@ dist
build
proxy/public
profile.svg

@@ -1,6 +1,5 @@
{
    "editor.rulers": [100],
    "editor.defaultFormatter": "esbenp.prettier-vscode",
    "editor.formatOnSaveMode": "modifications",
    "editor.formatOnSave": true,
    "editor.codeActionsOnSave": {
@@ -20,7 +19,7 @@
    "typescript.preferences.quoteStyle": "single",
    "[python]": {
        "editor.wordBasedSuggestions": true,
        "editor.defaultFormatter": "rvest.vs-code-prettier-eslint"
        "editor.defaultFormatter": null
    },
    "python.testing.unittestEnabled": false,
    "python.testing.autoTestDiscoverOnSaveEnabled": true,

@@ -124,7 +124,7 @@ lib-doc:
	$(OPEN) .tox/build-docs/docs_out/index.html
lib-coverage:
	pytest --cov=proxy --cov=tests --cov-report=html tests/
	pytest --cov=proxy --cov=tests --cov-report=html tests/ && \
		$(OPEN) htmlcov/index.html
lib-profile:
@@ -137,7 +137,7 @@ lib-profile:
	--disable-http-proxy \
	--enable-web-server \
	--plugin proxy.plugin.WebServerPlugin \
	--log-file /tmp/proxy.log
	--log-file /dev/null
devtools:
	pushd dashboard && npm run devtools && popd

@@ -12,5 +12,5 @@
Follow these steps:
1. Start by [emailing developers](mailto:mailsforabhinav+proxy@gmail.com)
2. If unresponsive, [create a public issue](https://github.com/abhinavsingh/proxy.py/issues/new/choose)
2. If unresponsive, [create a public issue](https://github.com/abhinavsingh/proxy.py/issues/new/choose) without disclosing details of the vulnerability itself.
3. [Pull requests](https://github.com/abhinavsingh/proxy.py/pulls) are always welcome

@@ -17,6 +17,7 @@
"""
import sys
import ssl
import time
import socket
import logging
import functools
@@ -290,3 +291,71 @@ def set_open_file_limit(soft_limit: int) -> None:
    logger.debug(
        'Open file soft limit set to %d', soft_limit,
    )
class cached_property:
    """Decorator for read-only properties evaluated only once within TTL period.

    It can be used to create a cached property like this::

        import random

        # the class containing the property must be a new-style class
        class MyClass:
            # create property whose value is cached for ten minutes
            @cached_property(ttl=600)
            def randint(self):
                # will only be evaluated every 10 min. at maximum.
                return random.randint(0, 100)

    The value is cached in the '_cached_properties' attribute of the object
    instance that has the property getter method wrapped by this decorator.
    The '_cached_properties' attribute value is a dictionary which has a key
    for every property of the object which is wrapped by this decorator. Each
    entry in the cache is created only when the property is accessed for the
    first time and is a two-element tuple with the last computed property
    value and the last time it was updated in seconds since the epoch.

    The default time-to-live (TTL) is 300 seconds (5 minutes). Set the TTL
    to zero for the cached value to never expire.

    To expire a cached property value manually just do::

        del instance._cached_properties[<property name>]

    Adopted from https://wiki.python.org/moin/PythonDecoratorLibrary#Cached_Properties
    © 2011 Christopher Arndt, MIT License.

    NOTE: We need this class only because Python's in-built
    `functools.cached_property` is available only for 3.8+. Hence, we must
    get rid of it once proxy.py no longer supports versions older than 3.8.

    .. spelling::

       getter
       Arndt
    """

    def __init__(self, ttl: float = 300.0):
        self.ttl = ttl

    def __call__(self, fget: Any, doc: Any = None) -> 'cached_property':
        self.fget = fget
        self.__doc__ = doc or fget.__doc__
        self.__name__ = fget.__name__
        self.__module__ = fget.__module__
        return self

    def __get__(self, inst: Any, owner: Any) -> Any:
        now = time.time()
        try:
            value, last_update = inst._cached_properties[self.__name__]
            if self.ttl > 0 and now - last_update > self.ttl:  # noqa: WPS333
                raise AttributeError
        except (KeyError, AttributeError):
            value = self.fget(inst)
            try:
                cache = inst._cached_properties
            except AttributeError:
                # Bind both names to the *same* dict; assigning two separate
                # dict literals here would store the first computed value in
                # a throwaway dict and force one extra recomputation.
                cache = inst._cached_properties = {}
            finally:
                cache[self.__name__] = (value, now)
        return value
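
A minimal usage sketch of the decorator above (the class and numbers are illustrative, not proxy.py code):

import time

class Config:
    loads = 0

    @cached_property(ttl=1.0)
    def settings(self):
        # Pretend this is expensive, e.g. parsing a config file.
        Config.loads += 1
        return {'debug': True}

c = Config()
assert c.settings == {'debug': True}   # computed on first access
assert c.settings == {'debug': True}   # served from the instance cache
assert Config.loads == 1
time.sleep(1.1)                        # let the 1 second TTL lapse
assert c.settings == {'debug': True}   # recomputed after expiry
assert Config.loads == 2
del c._cached_properties['settings']   # manual invalidation also works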

@@ -16,6 +16,7 @@
import socket
import logging
import argparse
import threading
import selectors
import multiprocessing
import multiprocessing.synchronize
@@ -29,7 +30,6 @@ from proxy.core.acceptor.executors import ThreadlessPool
from ..event import EventQueue
from ...common.constants import DEFAULT_SELECTOR_SELECT_TIMEOUT
from ...common.utils import is_threadless
from ...common.logger import Logger
@@ -87,8 +87,7 @@ class Acceptor(multiprocessing.Process):
        self.event_queue = event_queue
        # Index assigned by `AcceptorPool`
        self.idd = idd
        # Lock shared by all acceptor processes
        # to avoid concurrent accept over server socket
        # Mutex used for synchronization with acceptors
        self.lock = lock
        # Queue over which server socket fd is received on start-up
        self.fd_queue: connection.Connection = fd_queue
@@ -106,14 +105,25 @@
        self._total: int = 0

    def run_once(self) -> None:
        with self.lock:
            assert self.selector and self.sock
            events = self.selector.select(timeout=DEFAULT_SELECTOR_SELECT_TIMEOUT)
        if self.selector is not None:
            events = self.selector.select(timeout=1)
            if len(events) == 0:
                return
            locked = False
            try:
                if self.lock.acquire(block=False):
                    locked = True
                    for _, mask in events:
                        if mask & selectors.EVENT_READ:
                            if self.sock is not None:
                                conn, addr = self.sock.accept()
                                addr = None if addr == '' else addr
                                self._work(conn, addr)
            except BlockingIOError:
                pass
            finally:
                if locked:
                    self.lock.release()
    def run(self) -> None:
        Logger.setup(
@@ -134,7 +144,6 @@
        try:
            self.selector.register(self.sock, selectors.EVENT_READ)
            while not self.running.is_set():
                # logger.debug('Looking for new work')
                self.run_once()
        except KeyboardInterrupt:
            pass
@@ -151,14 +160,18 @@
        # By default all acceptors will start sending work to
        # 1st workers. To randomize, we offset index by idd.
        index = (self._total + self.idd) % self.flags.num_workers
        ThreadlessPool.delegate(
        thread = threading.Thread(
            target=ThreadlessPool.delegate,
            args=(
                self.executor_pids[index],
                self.executor_queues[index],
                self.executor_locks[index],
                conn,
                addr,
                self.flags.unix_socket_path,
            ),
        )
        thread.start()
        logger.debug(
            'Dispatched work#{0}.{1} to worker#{2}'.format(
                self.idd, self._total, index,
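
Why delegate from a thread: `ThreadlessPool.delegate` passes the accepted descriptor to an executor process, and that handoff can block. Doing it inline would stall further accepts; a short-lived thread keeps the accept loop hot. A condensed sketch with illustrative names (`dispatch` and `workers` are not proxy.py API):

import threading

def dispatch(delegate, workers, total, idd, conn, addr):
    # Offset the starting index by the acceptor id so all acceptors
    # don't begin by flooding worker #0, then round-robin from there.
    index = (total + idd) % len(workers)
    # Fire-and-forget: the handoff runs off the accept loop's critical
    # path; 'delegate' stands in for ThreadlessPool.delegate.
    threading.Thread(target=delegate, args=(workers[index], conn, addr)).start()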

@@ -31,10 +31,6 @@ from ..event import EventQueue
from ...common.flag import flags
from ...common.constants import DEFAULT_NUM_ACCEPTORS
# Lock shared by acceptors for
# sequential acceptance of work.
LOCK = multiprocessing.Lock()
logger = logging.getLogger(__name__)
@@ -88,6 +84,8 @@ class AcceptorPool:
        self.acceptors: List[Acceptor] = []
        # Fd queues used to share file descriptor with acceptor processes
        self.fd_queues: List[connection.Connection] = []
        # Internals
        self.lock = multiprocessing.Lock()
    def __enter__(self) -> 'AcceptorPool':
        self.setup()
@@ -126,7 +124,7 @@
                idd=acceptor_id,
                fd_queue=work_queue[1],
                flags=self.flags,
                lock=LOCK,
                lock=self.lock,
                event_queue=self.event_queue,
                executor_queues=self.executor_queues,
                executor_pids=self.executor_pids,
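
Design note: the old module-level `LOCK` was created at import time and shared by every pool in the interpreter; creating the lock in `AcceptorPool.__init__` scopes it to one pool instance. A rough sketch of the shape (not the actual class):

import multiprocessing

class Pool:
    def __init__(self) -> None:
        # One lock per pool instance, passed down to every acceptor
        # process, instead of a single global lock for all pools.
        self.lock = multiprocessing.Lock()

    def spawn(self, target) -> None:
        multiprocessing.Process(target=target, args=(self.lock,)).start()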

@@ -25,6 +25,7 @@ from ...common.constants import DEFAULT_STATIC_SERVER_DIR, PROXY_AGENT_HEADER_VA
from ...common.constants import DEFAULT_ENABLE_STATIC_SERVER, DEFAULT_ENABLE_WEB_SERVER
from ...common.constants import DEFAULT_MIN_COMPRESSION_LIMIT, DEFAULT_WEB_ACCESS_LOG_FORMAT
from ...common.utils import bytes_, text_, build_http_response, build_websocket_handshake_response
from ...common.utils import cached_property
from ...common.types import Readables, Writables
from ...common.flag import flags
@@ -110,11 +111,6 @@ class HttpWebServerPlugin(HttpProtocolHandlerPlugin):
        self.start_time: float = time.time()
        self.pipeline_request: Optional[HttpParser] = None
        self.switched_protocol: Optional[int] = None
        self.routes: Dict[int, Dict[Pattern[str], HttpWebServerBasePlugin]] = {
            httpProtocolTypes.HTTP: {},
            httpProtocolTypes.HTTPS: {},
            httpProtocolTypes.WEBSOCKET: {},
        }
        self.route: Optional[HttpWebServerBasePlugin] = None
        self.plugins: Dict[str, HttpWebServerBasePlugin] = {}
@@ -127,8 +123,18 @@ class HttpWebServerPlugin(HttpProtocolHandlerPlugin):
                self.event_queue,
            )
            self.plugins[instance.name()] = instance
            for (protocol, route) in instance.routes():
                self.routes[protocol][re.compile(route)] = instance

    @cached_property(ttl=0)
    def routes(self) -> Dict[int, Dict[Pattern[str], HttpWebServerBasePlugin]]:
        r: Dict[int, Dict[Pattern[str], HttpWebServerBasePlugin]] = {
            httpProtocolTypes.HTTP: {},
            httpProtocolTypes.HTTPS: {},
            httpProtocolTypes.WEBSOCKET: {},
        }
        for name in self.plugins:
            for (protocol, route) in self.plugins[name].routes():
                r[protocol][re.compile(route)] = self.plugins[name]
        return r

    def encryption_enabled(self) -> bool:
        return self.flags.keyfile is not None and \
@@ -186,42 +192,35 @@ class HttpWebServerPlugin(HttpProtocolHandlerPlugin):
    def on_request_complete(self) -> Union[socket.socket, bool]:
        if self.request.has_host():
            return False
        path = self.request.path or b'/'
        # If a websocket route exists for the path, try upgrade
        for route in self.routes[httpProtocolTypes.WEBSOCKET]:
            match = route.match(text_(path))
            if match:
                self.route = self.routes[httpProtocolTypes.WEBSOCKET][route]
                # Connection upgrade
                teardown = self.try_upgrade()
                if teardown:
                    return True
                # For upgraded connections, nothing more to do
                if self.switched_protocol:
                    # Invoke plugin.on_websocket_open
                    self.route.on_websocket_open()
                    return False
                break
        # Routing for Http(s) requests
        protocol = httpProtocolTypes.HTTPS \
            if self.encryption_enabled() else \
            httpProtocolTypes.HTTP
        for route in self.routes[protocol]:
            match = route.match(text_(path))
            if match:
            if route.match(text_(path)):
                self.route = self.routes[protocol][route]
                assert self.route
                self.route.handle_request(self.request)
                if self.request.has_header(b'connection') and \
                        self.request.header(b'connection').lower() == b'close':
                    return True
                return False
        # If a websocket route exists for the path, try upgrade
        for route in self.routes[httpProtocolTypes.WEBSOCKET]:
            if route.match(text_(path)):
                self.route = self.routes[httpProtocolTypes.WEBSOCKET][route]
                # Connection upgrade
                teardown = self.try_upgrade()
                if teardown:
                    return True
                # For upgraded connections, nothing more to do
                if self.switched_protocol:
                    # Invoke plugin.on_websocket_open
                    assert self.route
                    self.route.on_websocket_open()
                    return False
                break
        # No-route found, try static serving if enabled
        if self.flags.enable_static_server:
            path = text_(path).split('?')[0]
@@ -232,7 +231,6 @@ class HttpWebServerPlugin(HttpProtocolHandlerPlugin):
                ),
            )
            return True
        # Catch all unhandled web server requests, return 404
        self.client.queue(self.DEFAULT_404_RESPONSE)
        return True
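
Note on `ttl=0`: the route table is computed once, on first access after plugins have been registered, and never expires on its own. If plugins could ever be registered later, the decorator's documented escape hatch applies (`web_server` here is a hypothetical instance name):

# Force the route table to be rebuilt on the next access.
del web_server._cached_properties['routes']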

@@ -39,6 +39,11 @@ DEFAULT_HTTPS_ACCESS_LOG_FORMAT = '{client_ip}:{client_port} - ' + \
# on port 9000 and 9001 BUT WITHOUT ProxyPool plugin
# to avoid infinite loops.
DEFAULT_PROXY_POOL: List[str] = [
    # Yes you may use the instance running with ProxyPoolPlugin itself.
    # ProxyPool plugin will act as a no-op.
    # 'localhost:8899',
    #
    # Remote proxies
    # 'localhost:9000',
    # 'localhost:9001',
]
@@ -83,11 +88,20 @@ class ProxyPoolPlugin(TcpUpstreamConnectionHandler, HttpProxyBasePlugin):
        #
        # Implement your own logic here e.g. round-robin, least connection etc.
        endpoint = random.choice(self.flags.proxy_pool)[0].split(':')
        if endpoint[0] == 'localhost' and endpoint[1] == '8899':
            return request
        logger.debug('Using endpoint: {0}:{1}'.format(*endpoint))
        self.initialize_upstream(endpoint[0], int(endpoint[1]))
        assert self.upstream
        try:
            self.upstream.connect()
        except TimeoutError:
            logger.info(
                'Timed out connecting to upstream proxy {0}:{1}'.format(
                    *endpoint,
                ),
            )
            raise HttpProtocolException()
        except ConnectionRefusedError:
            # TODO(abhinavsingh): Try another choice, when all (or max configured) choices have
            # exhausted, retry for configured number of times before giving up.
@@ -113,6 +127,8 @@ class ProxyPoolPlugin(TcpUpstreamConnectionHandler, HttpProxyBasePlugin):
            self, request: HttpParser,
    ) -> Optional[HttpParser]:
        """Only invoked once after client original proxy request has been received completely."""
        if not self.upstream:
            return request
        assert self.upstream
        # For log sanity (i.e. to avoid None:None), expose upstream host:port from headers
        host, port = None, None
@@ -143,6 +159,12 @@ class ProxyPoolPlugin(TcpUpstreamConnectionHandler, HttpProxyBasePlugin):
            self.upstream.queue(raw)
        return raw

    def handle_upstream_chunk(self, chunk: memoryview) -> memoryview:
        """Will never be called since we didn't establish an upstream connection."""
        if not self.upstream:
            return chunk
        raise Exception("This should have never been called")

    def on_upstream_connection_close(self) -> None:
        """Called when client connection has been closed."""
        if self.upstream and not self.upstream.closed:
@@ -151,9 +173,10 @@ class ProxyPoolPlugin(TcpUpstreamConnectionHandler, HttpProxyBasePlugin):
        self.upstream = None

    def on_access_log(self, context: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        addr, port = (
            self.upstream.addr[0], self.upstream.addr[1],
        ) if self.upstream else (None, None)
        if not self.upstream:
            return context
        addr, port = (self.upstream.addr[0], self.upstream.addr[1]) \
            if self.upstream else (None, None)
        context.update({
            'upstream_proxy_host': addr,
            'upstream_proxy_port': port,
@@ -171,7 +194,3 @@ class ProxyPoolPlugin(TcpUpstreamConnectionHandler, HttpProxyBasePlugin):
        if request_method and request_method != httpMethods.CONNECT:
            access_log_format = DEFAULT_HTTP_ACCESS_LOG_FORMAT
        logger.info(access_log_format.format_map(log_attrs))

    def handle_upstream_chunk(self, chunk: memoryview) -> memoryview:
        """Will never be called since we didn't establish an upstream connection."""
        raise Exception("This should have never been called")
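
A sketch of the deployment these defaults hint at, assuming the plugin's `--proxy-pool` flag (it appends one endpoint per occurrence, which is why the code above indexes with `[0]`); ports are illustrative:

# Two plain upstream proxies WITHOUT the pool plugin (avoids infinite loops)
python -m proxy --port 9000
python -m proxy --port 9001

# Master node on the default 8899 port; whenever 'localhost:8899' is
# picked from the pool, the connection hook above returns the request
# untouched and this instance serves it directly (the no-op path).
python -m proxy --port 8899 \
    --plugin proxy.plugin.ProxyPoolPlugin \
    --proxy-pool localhost:8899 \
    --proxy-pool localhost:9000 \
    --proxy-pool localhost:9001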

@@ -84,7 +84,7 @@ class TestAcceptor(unittest.TestCase):
        mock_thread.return_value.start.side_effect = KeyboardInterrupt()
        selector = mock_selector.return_value
        selector.select.return_value = [(None, None)]
        selector.select.return_value = [(None, selectors.EVENT_READ)]
        self.acceptor.run()

@@ -24,7 +24,7 @@ def _proxy_py_instance() -> Generator[None, None, None]:
    After the testing is over, tear it down.
    """
    proxy_cmd = (
        'proxy',
        'python', '-m', 'proxy',
        '--hostname', '127.0.0.1',
        '--port', str(PROXY_PY_PORT),
        '--enable-web-server',