Prometheus Metrics (#1447)
* Add metrics server endpoint * Setup metrics subscriber * `MetricsSubscriber` as context manager * Fix lint issues * `--enable-metrics` flag which setup Metrics subscriber, collector and web endpoint * Use file storage based mechanism to share internal metrics with prometheus exporter endpoint * Lint fixes * Move `_setup_metrics_directory` within subscriber which only run once * Use global `metrics_lock` via flags * Remove top-level imports for prometheus_client * Add `requirements-metrics.txt` * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Fix typo in makefile * Fix typo * fix type, lint, flake issues * Remove event queue prop * Fix typo * Give any role to `proxy.http.server.metrics.get_collector` * rtype * `emit_request_complete` for web servers * Fix doc issues * Refactor * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Rename metrics to start with proxypy_work_ * Startup `MetricsEventSubscriber` as part of proxy --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
This commit is contained in:
parent
71f3c658e8
commit
091ba361ef
|
@ -163,6 +163,8 @@ repos:
|
|||
- cryptography==36.0.2; python_version <= '3.6'
|
||||
- types-setuptools == 57.4.2
|
||||
- pyyaml==5.3.1
|
||||
# From requirements-metrics.txt
|
||||
- prometheus_client==0.20.0
|
||||
args:
|
||||
# FIXME: get rid of missing imports ignore
|
||||
- --ignore-missing-imports
|
||||
|
|
|
@ -37,5 +37,6 @@ python:
|
|||
path: .
|
||||
- requirements: requirements-tunnel.txt
|
||||
- requirements: docs/requirements.txt
|
||||
- requirements: requirements-metrics.txt
|
||||
|
||||
...
|
||||
|
|
3
Makefile
3
Makefile
|
@ -104,7 +104,8 @@ lib-dep:
|
|||
pip install \
|
||||
-r requirements-testing.txt \
|
||||
-r requirements-release.txt \
|
||||
-r requirements-tunnel.txt && \
|
||||
-r requirements-tunnel.txt \
|
||||
-r requirements-metrics.txt && \
|
||||
pip install "setuptools>=42"
|
||||
|
||||
lib-pre-commit:
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
#
|
||||
# This file is autogenerated by pip-compile with Python 3.10
|
||||
# This file is autogenerated by pip-compile with Python 3.11
|
||||
# by the following command:
|
||||
#
|
||||
# pip-compile --allow-unsafe --generate-hashes --output-file=docs/requirements.txt --strip-extras docs/requirements.in
|
||||
|
|
|
@ -137,6 +137,8 @@ DEFAULT_NUM_WORKERS = 0
|
|||
DEFAULT_OPEN_FILE_LIMIT = 1024
|
||||
DEFAULT_PAC_FILE = None
|
||||
DEFAULT_PAC_FILE_URL_PATH = b'/'
|
||||
DEFAULT_ENABLE_METRICS = False
|
||||
DEFAULT_METRICS_URL_PATH = b"/metrics"
|
||||
DEFAULT_PID_FILE = None
|
||||
DEFAULT_PORT_FILE = None
|
||||
DEFAULT_PLUGINS: List[Any] = []
|
||||
|
@ -172,6 +174,7 @@ DEFAULT_CACHE_DIRECTORY_PATH = os.path.join(
|
|||
)
|
||||
DEFAULT_CACHE_REQUESTS = False
|
||||
DEFAULT_CACHE_BY_CONTENT_TYPE = False
|
||||
DEFAULT_METRICS_DIRECTORY_PATH = os.path.join(DEFAULT_DATA_DIRECTORY_PATH, "metrics")
|
||||
|
||||
# Core plugins enabled by default or via flags
|
||||
DEFAULT_ABC_PLUGINS = [
|
||||
|
@ -190,6 +193,7 @@ PLUGIN_PAC_FILE = 'proxy.http.server.HttpWebServerPacFilePlugin'
|
|||
PLUGIN_DEVTOOLS_PROTOCOL = 'proxy.http.inspector.devtools.DevtoolsProtocolPlugin'
|
||||
PLUGIN_INSPECT_TRAFFIC = 'proxy.http.inspector.inspect_traffic.InspectTrafficPlugin'
|
||||
PLUGIN_WEBSOCKET_TRANSPORT = 'proxy.http.websocket.transport.WebSocketTransport'
|
||||
PLUGIN_METRICS = "proxy.http.server.MetricsWebServerPlugin"
|
||||
|
||||
PY2_DEPRECATION_MESSAGE = '''DEPRECATION: proxy.py no longer supports Python 2.7. Kindly upgrade to Python 3+. '
|
||||
'If for some reasons you cannot upgrade, use'
|
||||
|
|
|
@ -24,12 +24,13 @@ from .logger import Logger
|
|||
from .plugins import Plugins
|
||||
from .version import __version__
|
||||
from .constants import (
|
||||
COMMA, PLUGIN_PAC_FILE, PLUGIN_DASHBOARD, PLUGIN_HTTP_PROXY,
|
||||
PLUGIN_PROXY_AUTH, PLUGIN_WEB_SERVER, DEFAULT_NUM_WORKERS,
|
||||
PLUGIN_REVERSE_PROXY, DEFAULT_NUM_ACCEPTORS, PLUGIN_INSPECT_TRAFFIC,
|
||||
DEFAULT_DISABLE_HEADERS, PY2_DEPRECATION_MESSAGE, DEFAULT_DEVTOOLS_WS_PATH,
|
||||
PLUGIN_DEVTOOLS_PROTOCOL, PLUGIN_WEBSOCKET_TRANSPORT,
|
||||
DEFAULT_DATA_DIRECTORY_PATH, DEFAULT_MIN_COMPRESSION_LENGTH,
|
||||
COMMA, PLUGIN_METRICS, PLUGIN_PAC_FILE, PLUGIN_DASHBOARD,
|
||||
PLUGIN_HTTP_PROXY, PLUGIN_PROXY_AUTH, PLUGIN_WEB_SERVER,
|
||||
DEFAULT_NUM_WORKERS, PLUGIN_REVERSE_PROXY, DEFAULT_NUM_ACCEPTORS,
|
||||
PLUGIN_INSPECT_TRAFFIC, DEFAULT_DISABLE_HEADERS, PY2_DEPRECATION_MESSAGE,
|
||||
DEFAULT_DEVTOOLS_WS_PATH, PLUGIN_DEVTOOLS_PROTOCOL,
|
||||
PLUGIN_WEBSOCKET_TRANSPORT, DEFAULT_DATA_DIRECTORY_PATH,
|
||||
DEFAULT_MIN_COMPRESSION_LENGTH,
|
||||
)
|
||||
|
||||
|
||||
|
@ -182,6 +183,13 @@ class FlagParser:
|
|||
args.enable_events,
|
||||
),
|
||||
)
|
||||
args.enable_metrics = cast(
|
||||
bool,
|
||||
opts.get(
|
||||
'enable_metrics',
|
||||
args.enable_metrics,
|
||||
),
|
||||
)
|
||||
|
||||
# Load default plugins along with user provided --plugins
|
||||
default_plugins = [
|
||||
|
@ -195,6 +203,9 @@ class FlagParser:
|
|||
default_plugins + auth_plugins + requested_plugins,
|
||||
)
|
||||
|
||||
if bytes_(PLUGIN_METRICS) in default_plugins:
|
||||
args.metrics_lock = multiprocessing.Lock()
|
||||
|
||||
# https://github.com/python/mypy/issues/5865
|
||||
#
|
||||
# def option(t: object, key: str, default: Any) -> Any:
|
||||
|
@ -422,6 +433,10 @@ class FlagParser:
|
|||
default_plugins.append(PLUGIN_INSPECT_TRAFFIC)
|
||||
args.enable_events = True
|
||||
args.enable_devtools = True
|
||||
if hasattr(args, 'enable_metrics') and args.enable_metrics:
|
||||
default_plugins.append(PLUGIN_WEB_SERVER)
|
||||
default_plugins.append(PLUGIN_METRICS)
|
||||
args.enable_events = True
|
||||
if hasattr(args, 'enable_devtools') and args.enable_devtools:
|
||||
default_plugins.append(PLUGIN_DEVTOOLS_PROTOCOL)
|
||||
default_plugins.append(PLUGIN_WEB_SERVER)
|
||||
|
|
|
@ -0,0 +1,114 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
proxy.py
|
||||
~~~~~~~~
|
||||
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
|
||||
Network monitoring, controls & Application development, testing, debugging.
|
||||
|
||||
:copyright: (c) 2013-present by Abhinav Singh and contributors.
|
||||
:license: BSD, see LICENSE for more details.
|
||||
"""
|
||||
import os
|
||||
import glob
|
||||
from typing import Any, Dict
|
||||
from pathlib import Path
|
||||
from multiprocessing.synchronize import Lock
|
||||
|
||||
from ...core.event import EventQueue, EventSubscriber, eventNames
|
||||
from ...common.constants import DEFAULT_METRICS_DIRECTORY_PATH
|
||||
|
||||
|
||||
class MetricsStorage:
    """File-backed store for counters and gauges.

    Every metric lives in its own file under
    ``DEFAULT_METRICS_DIRECTORY_PATH`` (``<name>.counter`` /
    ``<name>.gauge``); a shared multiprocessing lock serializes access
    so multiple processes can read and update safely.
    """

    def __init__(self, lock: Lock) -> None:
        self._lock = lock

    @staticmethod
    def _metric_file(name: str, kind: str) -> Path:
        # One file per metric, e.g. <metrics-dir>/<name>.counter
        return Path(DEFAULT_METRICS_DIRECTORY_PATH) / f'{name}.{kind}'

    def get_counter(self, name: str) -> float:
        with self._lock:
            return self._get_counter(name)

    def _get_counter(self, name: str) -> float:
        target = self._metric_file(name, 'counter')
        if not target.exists():
            return 0
        return float(target.read_text(encoding='utf-8').strip())

    def incr_counter(self, name: str, by: float = 1.0) -> None:
        with self._lock:
            self._incr_counter(name, by)

    def _incr_counter(self, name: str, by: float = 1.0) -> None:
        # Read-modify-write; callers must hold the lock (incr_counter does).
        updated = self._get_counter(name) + by
        self._metric_file(name, 'counter').write_text(
            str(updated), encoding='utf-8',
        )

    def get_gauge(self, name: str) -> float:
        with self._lock:
            return self._get_gauge(name)

    def _get_gauge(self, name: str) -> float:
        target = self._metric_file(name, 'gauge')
        if not target.exists():
            return 0
        return float(target.read_text(encoding='utf-8').strip())

    def set_gauge(self, name: str, value: float) -> None:
        """Store a single value, replacing any previous one."""
        with self._lock:
            self._set_gauge(name, value)

    def _set_gauge(self, name: str, value: float) -> None:
        self._metric_file(name, 'gauge').write_text(
            str(value), encoding='utf-8',
        )
|
||||
|
||||
|
||||
class MetricsEventSubscriber:

    def __init__(self, event_queue: EventQueue, metrics_lock: Lock) -> None:
        """Aggregates metric events pushed by proxy.py core and plugins.

        1) Metrics are stored and managed by multiprocessing safe MetricsStorage
        2) Collection must be done via MetricsWebServerPlugin endpoint
        """
        self.storage = MetricsStorage(metrics_lock)
        self.subscriber = EventSubscriber(
            event_queue,
            callback=lambda event: MetricsEventSubscriber.callback(self.storage, event),
        )

    def setup(self) -> None:
        # Directory must exist (and be clean) before any event arrives.
        self._setup_metrics_directory()
        self.subscriber.setup()

    def shutdown(self) -> None:
        self.subscriber.shutdown()

    def __enter__(self) -> 'MetricsEventSubscriber':
        self.setup()
        return self

    def __exit__(self, *args: Any) -> None:
        self.shutdown()

    @staticmethod
    def callback(storage: MetricsStorage, event: Dict[str, Any]) -> None:
        # Map core lifecycle events onto the counter they bump.
        counter_by_event = {
            eventNames.WORK_STARTED: 'work_started',
            eventNames.REQUEST_COMPLETE: 'request_complete',
            eventNames.WORK_FINISHED: 'work_finished',
        }
        counter = counter_by_event.get(event['event_name'])
        if counter is not None:
            storage.incr_counter(counter)
        else:
            print('Unhandled', event)

    def _setup_metrics_directory(self) -> None:
        """Create the metrics directory and wipe stale metric files."""
        os.makedirs(DEFAULT_METRICS_DIRECTORY_PATH, exist_ok=True)
        for pattern in ('*.counter', '*.gauge'):
            stale_files = glob.glob(
                os.path.join(DEFAULT_METRICS_DIRECTORY_PATH, pattern),
            )
            for stale in stale_files:
                try:
                    os.remove(stale)
                except OSError as e:
                    print(f'Error deleting file {stale}: {e}')
|
|
@ -10,6 +10,7 @@
|
|||
"""
|
||||
from .web import HttpWebServerPlugin
|
||||
from .plugin import ReverseProxyBasePlugin, HttpWebServerBasePlugin
|
||||
from .metrics import MetricsWebServerPlugin
|
||||
from .protocols import httpProtocolTypes
|
||||
from .pac_plugin import HttpWebServerPacFilePlugin
|
||||
|
||||
|
@ -20,4 +21,5 @@ __all__ = [
|
|||
'HttpWebServerBasePlugin',
|
||||
'httpProtocolTypes',
|
||||
'ReverseProxyBasePlugin',
|
||||
'MetricsWebServerPlugin',
|
||||
]
|
||||
|
|
|
@ -0,0 +1,159 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
proxy.py
|
||||
~~~~~~~~
|
||||
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
|
||||
Network monitoring, controls & Application development, testing, debugging.
|
||||
|
||||
:copyright: (c) 2013-present by Abhinav Singh and contributors.
|
||||
:license: BSD, see LICENSE for more details.
|
||||
"""
|
||||
from typing import Any, List, Tuple, Generator, cast
|
||||
from urllib.parse import parse_qs, urlparse
|
||||
from multiprocessing.synchronize import Lock
|
||||
|
||||
from .plugin import HttpWebServerBasePlugin
|
||||
from ..parser import HttpParser
|
||||
from .protocols import httpProtocolTypes
|
||||
from ...common.flag import flags
|
||||
from ...common.utils import text_, build_http_response
|
||||
from ...common.constants import (
|
||||
DEFAULT_ENABLE_METRICS, DEFAULT_METRICS_URL_PATH,
|
||||
)
|
||||
from ...core.event.metrics import MetricsStorage
|
||||
|
||||
|
||||
# --enable-metrics toggles the metrics subscriber, collector and
# the Prometheus scrape endpoint on the builtin web server.
flags.add_argument(
    '--enable-metrics',
    action='store_true',
    default=DEFAULT_ENABLE_METRICS,
    help='Default: False. Enables metrics.',
)

# URL path (default b'/metrics') on the builtin web server where
# Prometheus scrapes proxy.py metrics.
flags.add_argument(
    '--metrics-path',
    type=str,
    default=text_(DEFAULT_METRICS_URL_PATH),
    help='Default: %s. Web server path to serve proxy.py metrics.'
    % text_(DEFAULT_METRICS_URL_PATH),
)
|
||||
|
||||
|
||||
def get_collector(metrics_lock: Lock) -> Any:
    """Return a ``prometheus_client`` collector serving proxy.py metrics.

    Imported lazily so that ``prometheus_client`` stays an optional
    dependency, only required when ``--enable-metrics`` is used.

    :param metrics_lock: Multiprocessing lock shared with the
        ``MetricsEventSubscriber`` that writes the underlying storage.
    :rtype: prometheus_client.registry.Collector instance
    """
    # pylint: disable=import-outside-toplevel
    from prometheus_client.core import Metric
    from prometheus_client.registry import Collector

    class MetricsCollector(Collector):

        def __init__(self, metrics_lock: Lock) -> None:
            # Reads the same file backed storage that
            # MetricsEventSubscriber aggregates into.
            self.storage = MetricsStorage(metrics_lock)

        def collect(self) -> Generator[Metric, None, None]:
            """Serves from aggregates metrics managed by MetricsEventSubscriber."""
            # pylint: disable=import-outside-toplevel
            from prometheus_client.core import (
                GaugeMetricFamily, CounterMetricFamily,
            )

            started = self.storage.get_counter('work_started')
            finished = self.storage.get_counter('work_finished')

            work_started = CounterMetricFamily(
                'proxypy_work_started',
                'Total work accepted and started by proxy.py core',
            )
            work_started.add_metric(
                ['proxypy_work_started'],
                started,
            )
            yield work_started

            request_complete = CounterMetricFamily(
                'proxypy_work_request_received',
                'Total work finished sending initial request',
            )
            request_complete.add_metric(
                ['proxypy_work_request_received'],
                self.storage.get_counter('request_complete'),
            )
            yield request_complete

            work_finished = CounterMetricFamily(
                'proxypy_work_finished',
                'Total work finished by proxy.py core',
            )
            work_finished.add_metric(
                # FIX: label value was 'work_finished', inconsistent with
                # the sibling counters above which use the full
                # 'proxypy_*' metric name as the label value.
                ['proxypy_work_finished'],
                finished,
            )
            yield work_finished

            # Active work derived from the two counters; never persisted.
            ongoing_work = GaugeMetricFamily(
                'proxypy_work_active',
                'Total work under active execution',
                value=started - finished,
            )
            yield ongoing_work

    return MetricsCollector(metrics_lock)
|
||||
|
||||
|
||||
class MetricsWebServerPlugin(HttpWebServerBasePlugin):
    """Web server plugin exposing a Prometheus scrape endpoint.

    Registers a collector backed by the shared metrics storage and
    serves its output at ``--metrics-path`` over both HTTP and HTTPS.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)

        # Lazy import keeps prometheus_client an optional dependency.
        # pylint: disable=import-outside-toplevel
        from prometheus_client.core import CollectorRegistry
        from prometheus_client.registry import Collector

        # Dedicated registry: only proxy.py metrics are registered,
        # none of prometheus_client's default process collectors.
        self.registry = CollectorRegistry()
        self.registry.register(cast(Collector, get_collector(self.flags.metrics_lock)))

    def routes(self) -> List[Tuple[int, str]]:
        """Register the metrics path for both plain and TLS protocols."""
        if self.flags.metrics_path:
            return [
                (
                    httpProtocolTypes.HTTP,
                    r'{0}$'.format(
                        text_(self.flags.metrics_path),
                    ),
                ),
                (
                    httpProtocolTypes.HTTPS,
                    r'{0}$'.format(
                        text_(self.flags.metrics_path),
                    ),
                ),
            ]
        return []  # pragma: no cover

    def handle_request(self, request: HttpParser) -> None:
        """Render registry contents honoring content negotiation headers.

        NOTE(review): ``_bake_output`` is a private prometheus_client
        helper; it handles Accept/Accept-Encoding negotiation and gzip
        for us, but may break across prometheus_client versions —
        confirm when bumping the pinned dependency.
        """
        # pylint: disable=import-outside-toplevel
        from prometheus_client.exposition import _bake_output

        # flake8: noqa
        status, headers, output = _bake_output(  # type: ignore[no-untyped-call]
            self.registry,
            (
                request.header(b'Accept').decode()
                if request.has_header(b'Accept')
                else '*/*'
            ),
            (
                request.header(b'Accept-Encoding').decode()
                if request.has_header(b'Accept-Encoding')
                else None
            ),
            parse_qs(urlparse(request.path).query),
            False,
        )
        # _bake_output returns status as e.g. '200 OK'; split code/reason.
        statuses = status.split(' ', maxsplit=1)
        response = build_http_response(
            int(statuses[0]),
            reason=statuses[1].encode(),
            headers={key.encode(): value.encode() for key, value in headers},
            body=output,
        )
        self.client.queue(memoryview(response))
|
|
@ -17,11 +17,13 @@ from typing import Any, Dict, List, Tuple, Union, Pattern, Optional
|
|||
from .plugin import HttpWebServerBasePlugin
|
||||
from ..parser import HttpParser, httpParserTypes
|
||||
from ..plugin import HttpProtocolHandlerPlugin
|
||||
from ..methods import httpMethods
|
||||
from .protocols import httpProtocolTypes
|
||||
from ..exception import HttpProtocolException
|
||||
from ..protocols import httpProtocols
|
||||
from ..responses import NOT_FOUND_RESPONSE_PKT
|
||||
from ..websocket import WebsocketFrame, websocketOpcodes
|
||||
from ...core.event import eventNames
|
||||
from ...common.flag import flags
|
||||
from ...common.types import Readables, Writables, Descriptors
|
||||
from ...common.utils import text_, build_websocket_handshake_response
|
||||
|
@ -139,6 +141,7 @@ class HttpWebServerPlugin(HttpProtocolHandlerPlugin):
|
|||
self.switched_protocol = httpProtocolTypes.WEBSOCKET
|
||||
|
||||
def on_request_complete(self) -> Union[socket.socket, bool]:
|
||||
self.emit_request_complete()
|
||||
path = self.request.path or b'/'
|
||||
teardown = self._try_route(path)
|
||||
if teardown:
|
||||
|
@ -220,8 +223,8 @@ class HttpWebServerPlugin(HttpProtocolHandlerPlugin):
|
|||
self._response_size += sum(len(c) for c in chunk)
|
||||
return chunk
|
||||
|
||||
def on_client_connection_close(self) -> None:
|
||||
context = {
|
||||
def _context(self) -> Dict[str, Any]:
|
||||
return {
|
||||
'client_ip': None if not self.client.addr else self.client.addr[0],
|
||||
'client_port': None if not self.client.addr else self.client.addr[1],
|
||||
'connection_time_ms': '%.2f' % ((time.time() - self.start_time) * 1000),
|
||||
|
@ -249,6 +252,9 @@ class HttpWebServerPlugin(HttpProtocolHandlerPlugin):
|
|||
# 'response_code': text_(self.response.code),
|
||||
# 'response_reason': text_(self.response.reason),
|
||||
}
|
||||
|
||||
def on_client_connection_close(self) -> None:
|
||||
context = self._context()
|
||||
log_handled = False
|
||||
if self.route:
|
||||
# May be merge on_client_connection_close and on_access_log???
|
||||
|
@ -304,3 +310,33 @@ class HttpWebServerPlugin(HttpProtocolHandlerPlugin):
|
|||
self.flags.min_compression_length,
|
||||
),
|
||||
)
|
||||
|
||||
    def emit_request_complete(self) -> None:
        """Publish a REQUEST_COMPLETE event for the current request.

        No-op unless ``--enable-events`` is set. Consumed by event
        subscribers such as the metrics subscriber.
        """
        if not self.flags.enable_events:
            return
        assert self.request.port and self.event_queue
        self.event_queue.publish(
            request_id=self.uid,
            event_name=eventNames.REQUEST_COMPLETE,
            event_payload={
                # URL reconstructed from the Host header plus path.
                'url': 'http://%s%s'
                % (
                    text_(self.request.header(b'host')),
                    text_(self.request.path),
                ),
                'method': text_(self.request.method),
                'headers': (
                    {}
                    if not self.request.headers
                    # v is a (raw-key, value) tuple; v[1] is the value.
                    else {
                        text_(k): text_(v[1]) for k, v in self.request.headers.items()
                    }
                ),
                # Body is only forwarded for POST requests; decode
                # errors are ignored since payloads may be binary.
                'body': (
                    text_(self.request.body, errors='ignore')
                    if self.request.method == httpMethods.POST
                    else None
                ),
            },
            publisher_id=self.__class__.__qualname__,
        )
|
||||
|
|
|
@ -43,6 +43,7 @@ from .common.constants import (
|
|||
DEFAULT_ENABLE_DASHBOARD, DEFAULT_ENABLE_SSH_TUNNEL,
|
||||
DEFAULT_SSH_LISTENER_KLASS,
|
||||
)
|
||||
from .core.event.metrics import MetricsEventSubscriber
|
||||
|
||||
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
|
@ -204,6 +205,7 @@ class Proxy:
|
|||
self.acceptors: Optional[AcceptorPool] = None
|
||||
self.event_manager: Optional[EventManager] = None
|
||||
self.ssh_tunnel_listener: Optional[BaseSshTunnelListener] = None
|
||||
self.metrics_subscriber: Optional[MetricsEventSubscriber] = None
|
||||
|
||||
def __enter__(self) -> 'Proxy':
|
||||
self.setup()
|
||||
|
@ -222,9 +224,6 @@ class Proxy:
|
|||
# TODO: Python shell within running proxy.py environment
|
||||
# https://github.com/abhinavsingh/proxy.py/discussions/1021
|
||||
#
|
||||
# TODO: Near realtime resource / stats monitoring
|
||||
# https://github.com/abhinavsingh/proxy.py/discussions/1023
|
||||
#
|
||||
self._write_pid_file()
|
||||
# We setup listeners first because of flags.port override
|
||||
# in case of ephemeral port being used
|
||||
|
@ -259,9 +258,9 @@ class Proxy:
|
|||
logger.info('Core Event enabled')
|
||||
self.event_manager = EventManager()
|
||||
self.event_manager.setup()
|
||||
event_queue = self.event_manager.queue \
|
||||
if self.event_manager is not None \
|
||||
else None
|
||||
event_queue = (
|
||||
self.event_manager.queue if self.event_manager is not None else None
|
||||
)
|
||||
# Setup remote executors only if
|
||||
# --local-executor mode isn't enabled.
|
||||
if self.remote_executors_enabled:
|
||||
|
@ -287,6 +286,12 @@ class Proxy:
|
|||
flags=self.flags,
|
||||
**self.opts,
|
||||
)
|
||||
if event_queue is not None and self.flags.enable_metrics:
|
||||
self.metrics_subscriber = MetricsEventSubscriber(
|
||||
event_queue,
|
||||
self.flags.metrics_lock,
|
||||
)
|
||||
self.metrics_subscriber.setup()
|
||||
# TODO: May be close listener fd as we don't need it now
|
||||
if threading.current_thread() == threading.main_thread():
|
||||
self._register_signals()
|
||||
|
@ -309,6 +314,8 @@ class Proxy:
|
|||
return tunnel
|
||||
|
||||
def shutdown(self) -> None:
|
||||
if self.metrics_subscriber is not None:
|
||||
self.metrics_subscriber.shutdown()
|
||||
if self.flags.enable_ssh_tunnel:
|
||||
assert self.ssh_tunnel_listener is not None
|
||||
self.ssh_tunnel_listener.shutdown()
|
||||
|
|
|
@ -0,0 +1,2 @@
|
|||
prometheus_client==0.17.1; python_version < '3.8'
|
||||
prometheus_client==0.20.0; python_version >= '3.8'
|
|
@ -27,15 +27,16 @@ from proxy.common.constants import ( # noqa: WPS450
|
|||
DEFAULT_LOG_FORMAT, DEFAULT_THREADLESS, DEFAULT_WORK_KLASS,
|
||||
DEFAULT_CA_CERT_DIR, DEFAULT_CA_KEY_FILE, DEFAULT_NUM_WORKERS,
|
||||
DEFAULT_CA_CERT_FILE, DEFAULT_ENABLE_EVENTS, DEFAULT_IPV6_HOSTNAME,
|
||||
DEFAULT_NUM_ACCEPTORS, DEFAULT_LOCAL_EXECUTOR, PLUGIN_INSPECT_TRAFFIC,
|
||||
DEFAULT_ENABLE_DEVTOOLS, DEFAULT_OPEN_FILE_LIMIT, DEFAULT_DEVTOOLS_WS_PATH,
|
||||
DEFAULT_ENABLE_DASHBOARD, PLUGIN_DEVTOOLS_PROTOCOL,
|
||||
DEFAULT_ENABLE_SSH_TUNNEL, DEFAULT_ENABLE_WEB_SERVER,
|
||||
DEFAULT_DISABLE_HTTP_PROXY, PLUGIN_WEBSOCKET_TRANSPORT,
|
||||
DEFAULT_CA_SIGNING_KEY_FILE, DEFAULT_CLIENT_RECVBUF_SIZE,
|
||||
DEFAULT_DATA_DIRECTORY_PATH, DEFAULT_SERVER_RECVBUF_SIZE,
|
||||
DEFAULT_CACHE_DIRECTORY_PATH, DEFAULT_ENABLE_REVERSE_PROXY,
|
||||
DEFAULT_ENABLE_STATIC_SERVER, _env_threadless_compliant,
|
||||
DEFAULT_NUM_ACCEPTORS, DEFAULT_ENABLE_METRICS, DEFAULT_LOCAL_EXECUTOR,
|
||||
PLUGIN_INSPECT_TRAFFIC, DEFAULT_ENABLE_DEVTOOLS, DEFAULT_OPEN_FILE_LIMIT,
|
||||
DEFAULT_DEVTOOLS_WS_PATH, DEFAULT_ENABLE_DASHBOARD,
|
||||
PLUGIN_DEVTOOLS_PROTOCOL, DEFAULT_ENABLE_SSH_TUNNEL,
|
||||
DEFAULT_ENABLE_WEB_SERVER, DEFAULT_DISABLE_HTTP_PROXY,
|
||||
PLUGIN_WEBSOCKET_TRANSPORT, DEFAULT_CA_SIGNING_KEY_FILE,
|
||||
DEFAULT_CLIENT_RECVBUF_SIZE, DEFAULT_DATA_DIRECTORY_PATH,
|
||||
DEFAULT_SERVER_RECVBUF_SIZE, DEFAULT_CACHE_DIRECTORY_PATH,
|
||||
DEFAULT_ENABLE_REVERSE_PROXY, DEFAULT_ENABLE_STATIC_SERVER,
|
||||
_env_threadless_compliant,
|
||||
)
|
||||
|
||||
|
||||
|
@ -85,6 +86,7 @@ class TestMain(unittest.TestCase):
|
|||
mock_args.unix_socket_path = None
|
||||
mock_args.data_dir = DEFAULT_DATA_DIRECTORY_PATH
|
||||
mock_args.cache_dir = DEFAULT_CACHE_DIRECTORY_PATH
|
||||
mock_args.enable_metrics = DEFAULT_ENABLE_METRICS
|
||||
|
||||
@mock.patch('os.remove')
|
||||
@mock.patch('os.path.exists')
|
||||
|
|
7
tox.ini
7
tox.ini
|
@ -7,6 +7,7 @@ minversion = 3.21.0
|
|||
deps =
|
||||
-rrequirements-testing.txt
|
||||
-rrequirements-tunnel.txt
|
||||
-rrequirements-metrics.txt
|
||||
# NOTE: The command is invoked by the script name and not via
|
||||
# NOTE: `{envpython} -m pytest` because it'd add CWD into $PYTHONPATH
|
||||
# NOTE: testing the project from the Git checkout
|
||||
|
@ -24,8 +25,9 @@ allowlist_externals =
|
|||
git
|
||||
basepython = python3
|
||||
commands_pre =
|
||||
# Paramiko:
|
||||
{envpython} -m pip install -r{toxinidir}/requirements-tunnel.txt
|
||||
{envpython} -m pip install \
|
||||
-r{toxinidir}/requirements-tunnel.txt \
|
||||
-r{toxinidir}/requirements-metrics.txt
|
||||
commands =
|
||||
# Retrieve possibly missing commits:
|
||||
-git fetch --unshallow
|
||||
|
@ -264,6 +266,7 @@ deps =
|
|||
pylint-pytest < 1.1.0
|
||||
-r docs/requirements.in
|
||||
-r requirements-tunnel.txt
|
||||
-r requirements-metrics.txt
|
||||
-r requirements-testing.txt
|
||||
-r benchmark/requirements.txt
|
||||
isolated_build = true
|
||||
|
|
Loading…
Reference in New Issue