103 lines
3.3 KiB
Python
103 lines
3.3 KiB
Python
# -*- coding: utf-8 -*-
"""
    proxy.py
    ~~~~~~~~
    ⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
    Network monitoring, controls & Application development, testing, debugging.

    :copyright: (c) 2013-present by Abhinav Singh and contributors.
    :license: BSD, see LICENSE for more details.
"""
|
|
import time
|
|
import logging
|
|
import multiprocessing
|
|
from typing import Any, Dict, Optional
|
|
|
|
from proxy.core.event import (
|
|
EventQueue, EventManager, EventSubscriber, eventNames,
|
|
)
|
|
from proxy.common.constants import DEFAULT_LOG_FORMAT
|
|
|
|
|
|
# Configure root logging for this example: DEBUG and above, formatted with
# the DEFAULT_LOG_FORMAT imported from proxy.common.constants.
logging.basicConfig(
    format=DEFAULT_LOG_FORMAT,
    level=logging.DEBUG,
)

logger = logging.getLogger(__name__)

# Running tally of events seen by ``on_event``:
#   index 0 -> events whose request_id is '1234'
#   index 1 -> events carrying any other request_id
num_events_received = [0, 0]
|
|
|
|
|
|
# Execute within a separate thread context
def on_event(payload: Dict[str, Any]) -> None:
    """Subscriber callback: tally one received event by its ``request_id``.

    Events whose ``request_id`` equals ``'1234'`` are counted in
    ``num_events_received[0]``; every other event is counted in
    ``num_events_received[1]``.

    :param payload: Event dictionary; must contain a ``request_id`` key.
    :raises KeyError: If ``payload`` has no ``request_id`` key.
    """
    # The module-level list is mutated in place (never rebound), so no
    # ``global`` declaration is required here.
    slot = 0 if payload['request_id'] == '1234' else 1
    num_events_received[slot] += 1
|
|
|
|
|
|
def publisher_process(
        shutdown_event: 'multiprocessing.synchronize.Event',
        dispatcher_queue: EventQueue,
) -> None:
    """Publish ``WORK_STARTED`` events in a tight loop until asked to stop.

    Every event is published with ``request_id='12345'`` and
    ``publisher_id='eventing_pubsub_process'``; the payload carries the
    current wall-clock time under the ``'time'`` key.

    :param shutdown_event: Polled before each publish; once it is set the
        loop ends and the function returns.
    :param dispatcher_queue: Queue whose ``publish`` method is called for
        every event.
    """
    # NOTE: the ``shutdown_event`` annotation is deliberately a string.
    # Annotations are evaluated when this ``def`` statement executes, and a
    # bare ``import multiprocessing`` does not guarantee that the
    # ``multiprocessing.synchronize`` submodule has been loaded, so an eager
    # attribute lookup could raise ``AttributeError`` at import time.
    logger.info('publisher started')
    try:
        while not shutdown_event.is_set():
            dispatcher_queue.publish(
                request_id='12345',
                event_name=eventNames.WORK_STARTED,
                event_payload={'time': time.time()},
                publisher_id='eventing_pubsub_process',
            )
    except KeyboardInterrupt:
        # Deliberate best-effort: an interrupt simply ends the publish loop
        # so that the shutdown message below is still logged.
        pass
    logger.info('publisher shutdown')
|
|
|
|
|
|
if __name__ == '__main__':
    # Reference point for the elapsed time reported in the summary log.
    started_at = time.time()

    # Bring up the eventing core for the duration of the demo.
    subscriber: Optional[EventSubscriber] = None
    with EventManager() as event_manager:
        assert event_manager.queue

        # Register ``on_event`` as the subscriber callback.
        # Internally, the subscriber starts a separate thread
        # to receive incoming published messages.
        with EventSubscriber(event_manager.queue, callback=on_event) as subscriber:
            # Launch a publisher in its own process to demonstrate
            # safe exchange of messages between processes.
            stop_publisher = multiprocessing.Event()
            publisher_proc = multiprocessing.Process(
                target=publisher_process,
                args=(stop_publisher, event_manager.queue),
            )
            publisher_proc.start()

            # Publish from the main process as well, to demonstrate
            # safe exchange of messages between threads. The loop only
            # ends when it is interrupted (e.g. Ctrl+C).
            try:
                while True:
                    event_manager.queue.publish(
                        request_id='1234',
                        event_name=eventNames.WORK_STARTED,
                        event_payload={'time': time.time()},
                        publisher_id='eventing_pubsub_main',
                    )
            except KeyboardInterrupt:
                logger.info('KBE!!!')
            finally:
                # Signal the publisher process to stop and wait for it.
                stop_publisher.set()
                publisher_proc.join()

                # Read the counters first and the clock last, matching the
                # left-to-right evaluation order of the summary arguments.
                main_count = num_events_received[0]
                process_count = num_events_received[1]
                elapsed = time.time() - started_at
                logger.info(
                    'Received {0} events from main thread, {1} events from another process, in {2} seconds'.format(
                        main_count, process_count, elapsed,
                    ),
                )
    logger.info('Done!!!')
|