From 8fdddfd1997847da6da6c9702d19d80a896ae665 Mon Sep 17 00:00:00 2001 From: Abhinav Singh <126065+abhinavsingh@users.noreply.github.com> Date: Sat, 20 Nov 2021 19:42:38 +0530 Subject: [PATCH] Acceptors performance (#767) * Use threads for delegation. Now `run_once` lock before `accept` not `select` * Add support to use master proxy within proxy pool plugin. When used, proxy pool plugin will be a no-op for the master node * Fix acceptor tests now that mask is being used * Use `cached_property` for web server routes * Use `select(timeout=1)` otherwise acceptor wont join if total blocking * mypy, flake, doc spell fixes * R0205: Class `cached_property` inherits from object, can be safely removed from bases in python3 (useless-object-inheritance) --- .gitignore | 1 + .vscode/settings.json | 3 +- Makefile | 4 +- SECURITY.md | 2 +- proxy/common/utils.py | 69 +++++++++++++++++++++++++++ proxy/core/acceptor/acceptor.py | 47 +++++++++++------- proxy/core/acceptor/pool.py | 8 ++-- proxy/http/server/web.py | 62 ++++++++++++------------ proxy/plugin/proxy_pool.py | 33 ++++++++++--- tests/core/test_acceptor.py | 2 +- tests/integration/test_integration.py | 2 +- 11 files changed, 165 insertions(+), 68 deletions(-) diff --git a/.gitignore b/.gitignore index 544c565b..217ce78f 100644 --- a/.gitignore +++ b/.gitignore @@ -30,3 +30,4 @@ dist build proxy/public +profile.svg diff --git a/.vscode/settings.json b/.vscode/settings.json index ccbc392b..05ba4c58 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,6 +1,5 @@ { "editor.rulers": [100], - "editor.defaultFormatter": "esbenp.prettier-vscode", "editor.formatOnSaveMode": "modifications", "editor.formatOnSave": true, "editor.codeActionsOnSave": { @@ -20,7 +19,7 @@ "typescript.preferences.quoteStyle": "single", "[python]": { "editor.wordBasedSuggestions": true, - "editor.defaultFormatter": "rvest.vs-code-prettier-eslint" + "editor.defaultFormatter": null }, "python.testing.unittestEnabled": false, 
"python.testing.autoTestDiscoverOnSaveEnabled": true, diff --git a/Makefile b/Makefile index f522d016..4532c00e 100644 --- a/Makefile +++ b/Makefile @@ -124,7 +124,7 @@ lib-doc: $(OPEN) .tox/build-docs/docs_out/index.html lib-coverage: - pytest --cov=proxy --cov=tests --cov-report=html tests/ + pytest --cov=proxy --cov=tests --cov-report=html tests/ && \ $(OPEN) htmlcov/index.html lib-profile: @@ -137,7 +137,7 @@ lib-profile: --disable-http-proxy \ --enable-web-server \ --plugin proxy.plugin.WebServerPlugin \ - --log-file /tmp/proxy.log + --log-file /dev/null devtools: pushd dashboard && npm run devtools && popd diff --git a/SECURITY.md b/SECURITY.md index 606fe61b..a9fbc572 100644 --- a/SECURITY.md +++ b/SECURITY.md @@ -12,5 +12,5 @@ Follow these steps: 1. Start by [emailing developers](mailto:mailsforabhinav+proxy@gmail.com) -2. If unresponsive, [create a public issue](https://github.com/abhinavsingh/proxy.py/issues/new/choose) +2. If unresponsive, [create a public issue](https://github.com/abhinavsingh/proxy.py/issues/new/choose) without disclosure about the vulnerability itself. 3. [Pull requests](https://github.com/abhinavsingh/proxy.py/pulls) are always welcome diff --git a/proxy/common/utils.py b/proxy/common/utils.py index b25b6805..f289fc49 100644 --- a/proxy/common/utils.py +++ b/proxy/common/utils.py @@ -17,6 +17,7 @@ """ import sys import ssl +import time import socket import logging import functools @@ -290,3 +291,71 @@ def set_open_file_limit(soft_limit: int) -> None: logger.debug( 'Open file soft limit set to %d', soft_limit, ) + + +class cached_property: + """Decorator for read-only properties evaluated only once within TTL period. + It can be used to create a cached property like this:: + + import random + + # the class containing the property must be a new-style class + class MyClass: + # create property whose value is cached for ten minutes + @cached_property(ttl=600) + def randint(self): + # will only be evaluated every 10 min. at maximum. 
+ return random.randint(0, 100) + + The value is cached in the '_cached_properties' attribute of the object instance that + has the property getter method wrapped by this decorator. The '_cached_properties' + attribute value is a dictionary which has a key for every property of the + object which is wrapped by this decorator. Each entry in the cache is + created only when the property is accessed for the first time and is a + two-element tuple with the last computed property value and the last time + it was updated in seconds since the epoch. + + The default time-to-live (TTL) is 300 seconds (5 minutes). Set the TTL to + zero for the cached value to never expire. + + To expire a cached property value manually just do:: + del instance._cached_properties[<property name>] + + Adopted from https://wiki.python.org/moin/PythonDecoratorLibrary#Cached_Properties + © 2011 Christopher Arndt, MIT License. + + NOTE: We need this class only because Python's built-in equivalent is only + available in 3.8+. Hence, we must get rid of this class once proxy.py no longer + supports versions older than 3.8. + + .. 
spelling:: + + getter + Arndt + """ + + def __init__(self, ttl: float = 300.0): + self.ttl = ttl + + def __call__(self, fget: Any, doc: Any = None) -> 'cached_property': + self.fget = fget + self.__doc__ = doc or fget.__doc__ + self.__name__ = fget.__name__ + self.__module__ = fget.__module__ + return self + + def __get__(self, inst: Any, owner: Any) -> Any: + now = time.time() + try: + value, last_update = inst._cached_properties[self.__name__] + if self.ttl > 0 and now - last_update > self.ttl: # noqa: WPS333 + raise AttributeError + except (KeyError, AttributeError): + value = self.fget(inst) + try: + cache = inst._cached_properties + except AttributeError: + cache, inst._cached_properties = {}, {} + finally: + cache[self.__name__] = (value, now) + return value diff --git a/proxy/core/acceptor/acceptor.py b/proxy/core/acceptor/acceptor.py index 64b4d067..164fdba0 100644 --- a/proxy/core/acceptor/acceptor.py +++ b/proxy/core/acceptor/acceptor.py @@ -16,6 +16,7 @@ import socket import logging import argparse +import threading import selectors import multiprocessing import multiprocessing.synchronize @@ -29,7 +30,6 @@ from proxy.core.acceptor.executors import ThreadlessPool from ..event import EventQueue -from ...common.constants import DEFAULT_SELECTOR_SELECT_TIMEOUT from ...common.utils import is_threadless from ...common.logger import Logger @@ -87,8 +87,7 @@ class Acceptor(multiprocessing.Process): self.event_queue = event_queue # Index assigned by `AcceptorPool` self.idd = idd - # Lock shared by all acceptor processes - # to avoid concurrent accept over server socket + # Mutex used for synchronization with acceptors self.lock = lock # Queue over which server socket fd is received on start-up self.fd_queue: connection.Connection = fd_queue @@ -106,14 +105,25 @@ class Acceptor(multiprocessing.Process): self._total: int = 0 def run_once(self) -> None: - with self.lock: - assert self.selector and self.sock - events = 
self.selector.select(timeout=DEFAULT_SELECTOR_SELECT_TIMEOUT) + if self.selector is not None: + events = self.selector.select(timeout=1) if len(events) == 0: return - conn, addr = self.sock.accept() - addr = None if addr == '' else addr - self._work(conn, addr) + locked = False + try: + if self.lock.acquire(block=False): + locked = True + for _, mask in events: + if mask & selectors.EVENT_READ: + if self.sock is not None: + conn, addr = self.sock.accept() + addr = None if addr == '' else addr + self._work(conn, addr) + except BlockingIOError: + pass + finally: + if locked: + self.lock.release() def run(self) -> None: Logger.setup( @@ -134,7 +144,6 @@ class Acceptor(multiprocessing.Process): try: self.selector.register(self.sock, selectors.EVENT_READ) while not self.running.is_set(): - # logger.debug('Looking for new work') self.run_once() except KeyboardInterrupt: pass @@ -151,14 +160,18 @@ class Acceptor(multiprocessing.Process): # By default all acceptors will start sending work to # 1st workers. To randomize, we offset index by idd. index = (self._total + self.idd) % self.flags.num_workers - ThreadlessPool.delegate( - self.executor_pids[index], - self.executor_queues[index], - self.executor_locks[index], - conn, - addr, - self.flags.unix_socket_path, + thread = threading.Thread( + target=ThreadlessPool.delegate, + args=( + self.executor_pids[index], + self.executor_queues[index], + self.executor_locks[index], + conn, + addr, + self.flags.unix_socket_path, + ), ) + thread.start() logger.debug( 'Dispatched work#{0}.{1} to worker#{2}'.format( self.idd, self._total, index, diff --git a/proxy/core/acceptor/pool.py b/proxy/core/acceptor/pool.py index 9114bcfb..dd110d32 100644 --- a/proxy/core/acceptor/pool.py +++ b/proxy/core/acceptor/pool.py @@ -31,10 +31,6 @@ from ..event import EventQueue from ...common.flag import flags from ...common.constants import DEFAULT_NUM_ACCEPTORS -# Lock shared by acceptors for -# sequential acceptance of work. 
-LOCK = multiprocessing.Lock() - logger = logging.getLogger(__name__) @@ -88,6 +84,8 @@ class AcceptorPool: self.acceptors: List[Acceptor] = [] # Fd queues used to share file descriptor with acceptor processes self.fd_queues: List[connection.Connection] = [] + # Internals + self.lock = multiprocessing.Lock() def __enter__(self) -> 'AcceptorPool': self.setup() @@ -126,7 +124,7 @@ class AcceptorPool: idd=acceptor_id, fd_queue=work_queue[1], flags=self.flags, - lock=LOCK, + lock=self.lock, event_queue=self.event_queue, executor_queues=self.executor_queues, executor_pids=self.executor_pids, diff --git a/proxy/http/server/web.py b/proxy/http/server/web.py index 513dbf55..d9ff773d 100644 --- a/proxy/http/server/web.py +++ b/proxy/http/server/web.py @@ -25,6 +25,7 @@ from ...common.constants import DEFAULT_STATIC_SERVER_DIR, PROXY_AGENT_HEADER_VA from ...common.constants import DEFAULT_ENABLE_STATIC_SERVER, DEFAULT_ENABLE_WEB_SERVER from ...common.constants import DEFAULT_MIN_COMPRESSION_LIMIT, DEFAULT_WEB_ACCESS_LOG_FORMAT from ...common.utils import bytes_, text_, build_http_response, build_websocket_handshake_response +from ...common.utils import cached_property from ...common.types import Readables, Writables from ...common.flag import flags @@ -110,11 +111,6 @@ class HttpWebServerPlugin(HttpProtocolHandlerPlugin): self.start_time: float = time.time() self.pipeline_request: Optional[HttpParser] = None self.switched_protocol: Optional[int] = None - self.routes: Dict[int, Dict[Pattern[str], HttpWebServerBasePlugin]] = { - httpProtocolTypes.HTTP: {}, - httpProtocolTypes.HTTPS: {}, - httpProtocolTypes.WEBSOCKET: {}, - } self.route: Optional[HttpWebServerBasePlugin] = None self.plugins: Dict[str, HttpWebServerBasePlugin] = {} @@ -127,8 +123,18 @@ class HttpWebServerPlugin(HttpProtocolHandlerPlugin): self.event_queue, ) self.plugins[instance.name()] = instance - for (protocol, route) in instance.routes(): - self.routes[protocol][re.compile(route)] = instance + + 
@cached_property(ttl=0) + def routes(self) -> Dict[int, Dict[Pattern[str], HttpWebServerBasePlugin]]: + r: Dict[int, Dict[Pattern[str], HttpWebServerBasePlugin]] = { + httpProtocolTypes.HTTP: {}, + httpProtocolTypes.HTTPS: {}, + httpProtocolTypes.WEBSOCKET: {}, + } + for name in self.plugins: + for (protocol, route) in self.plugins[name].routes(): + r[protocol][re.compile(route)] = self.plugins[name] + return r def encryption_enabled(self) -> bool: return self.flags.keyfile is not None and \ @@ -186,42 +192,35 @@ class HttpWebServerPlugin(HttpProtocolHandlerPlugin): def on_request_complete(self) -> Union[socket.socket, bool]: if self.request.has_host(): return False - path = self.request.path or b'/' - - # If a websocket route exists for the path, try upgrade - for route in self.routes[httpProtocolTypes.WEBSOCKET]: - match = route.match(text_(path)) - if match: - self.route = self.routes[httpProtocolTypes.WEBSOCKET][route] - - # Connection upgrade - teardown = self.try_upgrade() - if teardown: - return True - - # For upgraded connections, nothing more to do - if self.switched_protocol: - # Invoke plugin.on_websocket_open - self.route.on_websocket_open() - return False - - break - # Routing for Http(s) requests protocol = httpProtocolTypes.HTTPS \ if self.encryption_enabled() else \ httpProtocolTypes.HTTP for route in self.routes[protocol]: - match = route.match(text_(path)) - if match: + if route.match(text_(path)): self.route = self.routes[protocol][route] + assert self.route self.route.handle_request(self.request) if self.request.has_header(b'connection') and \ self.request.header(b'connection').lower() == b'close': return True return False - + # If a websocket route exists for the path, try upgrade + for route in self.routes[httpProtocolTypes.WEBSOCKET]: + if route.match(text_(path)): + self.route = self.routes[httpProtocolTypes.WEBSOCKET][route] + # Connection upgrade + teardown = self.try_upgrade() + if teardown: + return True + # For upgraded connections, 
nothing more to do + if self.switched_protocol: + # Invoke plugin.on_websocket_open + assert self.route + self.route.on_websocket_open() + return False + break # No-route found, try static serving if enabled if self.flags.enable_static_server: path = text_(path).split('?')[0] @@ -232,7 +231,6 @@ class HttpWebServerPlugin(HttpProtocolHandlerPlugin): ), ) return True - # Catch all unhandled web server requests, return 404 self.client.queue(self.DEFAULT_404_RESPONSE) return True diff --git a/proxy/plugin/proxy_pool.py b/proxy/plugin/proxy_pool.py index 02278671..7ae7372c 100644 --- a/proxy/plugin/proxy_pool.py +++ b/proxy/plugin/proxy_pool.py @@ -39,6 +39,11 @@ DEFAULT_HTTPS_ACCESS_LOG_FORMAT = '{client_ip}:{client_port} - ' + \ # on port 9000 and 9001 BUT WITHOUT ProxyPool plugin # to avoid infinite loops. DEFAULT_PROXY_POOL: List[str] = [ + # Yes you may use the instance running with ProxyPoolPlugin itself. + # ProxyPool plugin will act as a no-op. + # 'localhost:8899', + # + # Remote proxies # 'localhost:9000', # 'localhost:9001', ] @@ -83,11 +88,20 @@ class ProxyPoolPlugin(TcpUpstreamConnectionHandler, HttpProxyBasePlugin): # # Implement your own logic here e.g. round-robin, least connection etc. endpoint = random.choice(self.flags.proxy_pool)[0].split(':') + if endpoint[0] == 'localhost' and endpoint[1] == '8899': + return request logger.debug('Using endpoint: {0}:{1}'.format(*endpoint)) self.initialize_upstream(endpoint[0], int(endpoint[1])) assert self.upstream try: self.upstream.connect() + except TimeoutError: + logger.info( + 'Timed out connecting to upstream proxy {0}:{1}'.format( + *endpoint, + ), + ) + raise HttpProtocolException() except ConnectionRefusedError: # TODO(abhinavsingh): Try another choice, when all (or max configured) choices have # exhausted, retry for configured number of times before giving up. 
@@ -113,6 +127,8 @@ class ProxyPoolPlugin(TcpUpstreamConnectionHandler, HttpProxyBasePlugin): self, request: HttpParser, ) -> Optional[HttpParser]: """Only invoked once after client original proxy request has been received completely.""" + if not self.upstream: + return request assert self.upstream # For log sanity (i.e. to avoid None:None), expose upstream host:port from headers host, port = None, None @@ -143,6 +159,12 @@ class ProxyPoolPlugin(TcpUpstreamConnectionHandler, HttpProxyBasePlugin): self.upstream.queue(raw) return raw + def handle_upstream_chunk(self, chunk: memoryview) -> memoryview: + """Will never be called since we didn't establish an upstream connection.""" + if not self.upstream: + return chunk + raise Exception("This should have never been called") + def on_upstream_connection_close(self) -> None: """Called when client connection has been closed.""" if self.upstream and not self.upstream.closed: @@ -151,9 +173,10 @@ class ProxyPoolPlugin(TcpUpstreamConnectionHandler, HttpProxyBasePlugin): self.upstream = None def on_access_log(self, context: Dict[str, Any]) -> Optional[Dict[str, Any]]: - addr, port = ( - self.upstream.addr[0], self.upstream.addr[1], - ) if self.upstream else (None, None) + if not self.upstream: + return context + addr, port = (self.upstream.addr[0], self.upstream.addr[1]) \ + if self.upstream else (None, None) context.update({ 'upstream_proxy_host': addr, 'upstream_proxy_port': port, @@ -171,7 +194,3 @@ class ProxyPoolPlugin(TcpUpstreamConnectionHandler, HttpProxyBasePlugin): if request_method and request_method != httpMethods.CONNECT: access_log_format = DEFAULT_HTTP_ACCESS_LOG_FORMAT logger.info(access_log_format.format_map(log_attrs)) - - def handle_upstream_chunk(self, chunk: memoryview) -> memoryview: - """Will never be called since we didn't establish an upstream connection.""" - raise Exception("This should have never been called") diff --git a/tests/core/test_acceptor.py b/tests/core/test_acceptor.py index 
48ee7a43..a923a973 100644 --- a/tests/core/test_acceptor.py +++ b/tests/core/test_acceptor.py @@ -84,7 +84,7 @@ class TestAcceptor(unittest.TestCase): mock_thread.return_value.start.side_effect = KeyboardInterrupt() selector = mock_selector.return_value - selector.select.return_value = [(None, None)] + selector.select.return_value = [(None, selectors.EVENT_READ)] self.acceptor.run() diff --git a/tests/integration/test_integration.py b/tests/integration/test_integration.py index 180f3491..c1195b98 100644 --- a/tests/integration/test_integration.py +++ b/tests/integration/test_integration.py @@ -24,7 +24,7 @@ def _proxy_py_instance() -> Generator[None, None, None]: After the testing is over, tear it down. """ proxy_cmd = ( - 'proxy', + 'python', '-m', 'proxy', '--hostname', '127.0.0.1', '--port', str(PROXY_PY_PORT), '--enable-web-server',