Add plugin "FilterByURLRegexPlugin" (#397)

* Initial draft of filter_by_url_regex.py

* Add FilterByURLRegexPlugin

* Fix dictionary key & add logging

* Add proper logging

* Add better logging

* Add logging

* move code to handle_client_request

* development logging

* development

* development

* development

* dev

* dev

* dev

* dev

* dev

* dev

* dev

* dev

* dev

* dev

* dev

* Fix blocked log

* Add to FILTER_LIST, some tidy up

* Update FILTER_LIST

* dev

* remove scheme from url

* Add to FILTER_LIST

* Add to FILTER_LIST

* Update FILTER_LIST

* commenting

* Update FILTER_LIST

* After autopep8

* Fix Anomalous backslash in string (pep8)

* Address code quality checks - flake8 F401 & W605

* Address flake8 errors

* Attempt to fix flake8 errors

* Fix linting issues

* Address flake8 W292

* Attempt to create tests

* Add FilterByURLRegexPlugin

* Rename test

* Work on tests

* Work on tests

* Work on tests

Co-authored-by: Abhinav Singh <mailsforabhinav@gmail.com>
This commit is contained in:
Mike 2020-07-13 13:10:34 +08:00 committed by GitHub
parent 1867d58338
commit aedf5933f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 168 additions and 1 deletions

View File

@ -19,6 +19,7 @@ from .web_server_route import WebServerPlugin
from .reverse_proxy import ReverseProxyPlugin
from .proxy_pool import ProxyPoolPlugin
from .filter_by_client_ip import FilterByClientIpPlugin
from .filter_by_url_regex import FilterByURLRegexPlugin
from .modify_chunk_response import ModifyChunkResponsePlugin
__all__ = [
@ -35,4 +36,5 @@ __all__ = [
'ProxyPoolPlugin',
'FilterByClientIpPlugin',
'ModifyChunkResponsePlugin',
'FilterByURLRegexPlugin',
]

View File

@ -0,0 +1,136 @@
# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~
Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
Network monitoring, controls & Application development, testing, debugging.
:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
import logging
from typing import Optional, List, Dict, Any
from ..http.exception import HttpRequestRejected
from ..http.parser import HttpParser
from ..http.codes import httpStatusCodes
from ..http.proxy import HttpProxyBasePlugin
from ..common.utils import text_
import re
logger = logging.getLogger(__name__)
class FilterByURLRegexPlugin(HttpProxyBasePlugin):
    """Reject requests whose URL matches a regular expression.

    The URL that is checked is the request host immediately followed by
    the request path (no scheme).  Every entry of ``FILTER_LIST`` is tried
    in order with ``re.search``; on the first match the request is
    rejected with the HTTP status code configured for that entry.
    """

    # Ordered list of filter rules.  Each rule is a dict with the keys:
    #   'regex'       -- bytes pattern searched against "<host><path>"
    #   'status_code' -- status code used when rejecting a matching request
    #   'notes'       -- free-form description of what the rule targets
    FILTER_LIST: List[Dict[str, Any]] = [
        {
            'regex': b'tpc.googlesyndication.com/simgad/.*',
            'status_code': httpStatusCodes.NOT_FOUND,
            'notes': 'Google image ads',
        },
        {
            'regex': b'tpc.googlesyndication.com/sadbundle/.*',
            'status_code': httpStatusCodes.NOT_FOUND,
            'notes': 'Google animated ad bundles',
        },
        {
            'regex': b'pagead\\d+.googlesyndication.com/.*',
            'status_code': httpStatusCodes.NOT_FOUND,
            'notes': 'Google tracking',
        },
        {
            'regex': b'(www){0,1}.google-analytics.com/r/collect\\?.*',
            'status_code': httpStatusCodes.NOT_FOUND,
            'notes': 'Google tracking',
        },
        {
            'regex': b'(www){0,1}.facebook.com/tr/.*',
            'status_code': httpStatusCodes.NOT_FOUND,
            'notes': 'Facebook tracking',
        },
        {
            'regex': b'tpc.googlesyndication.com/daca_images/simgad/.*',
            'status_code': httpStatusCodes.NOT_FOUND,
            'notes': 'Google image ads',
        },
        {
            'regex': b'.*.2mdn.net/videoplayback/.*',
            'status_code': httpStatusCodes.NOT_FOUND,
            'notes': 'Twitch.tv video ads',
        },
        {
            'regex': b'(www.){0,1}google.com(.*)/pagead/.*',
            'status_code': httpStatusCodes.NOT_FOUND,
            'notes': 'Google ads',
        },
    ]

    def before_upstream_connection(
            self, request: HttpParser) -> Optional[HttpParser]:
        """Return the request unchanged; no filtering happens here."""
        return request

    def handle_client_request(
            self, request: HttpParser) -> Optional[HttpParser]:
        """Check the request URL against ``FILTER_LIST``.

        Returns the request unchanged when no rule matches or when the
        host cannot be determined.

        Raises ``HttpRequestRejected`` carrying the matching rule's status
        code, reason ``b'Blocked'`` and a ``Connection: close`` header
        when a rule matches.
        """
        # Prefer the host parsed onto the request; otherwise fall back to
        # the Host header when one is present.
        request_host = request.host
        if not request_host and b'host' in request.headers:
            request_host = request.header(b'host')
        if not request_host:
            logger.error("Cannot determine host")
            return request

        # URL without scheme: "<host><path>".  Converted to text once,
        # since it is the same for every rule and for the log message.
        url = text_(b'%s%s' % (request_host, request.path))

        # Rules are numbered from 1 in the log output.
        for rule_number, blocked_entry in enumerate(self.FILTER_LIST, 1):
            if not re.search(text_(blocked_entry['regex']), url):
                continue
            status_code = blocked_entry['status_code']
            logger.info(
                "Blocked: %r with status_code '%r' by rule number '%r'",
                url, status_code, rule_number,
            )
            # Raising ends the loop: only the first matching rule applies.
            raise HttpRequestRejected(
                status_code=status_code,
                headers={b'Connection': b'close'},
                reason=b'Blocked',
            )
        return request

    def handle_upstream_chunk(self, chunk: memoryview) -> memoryview:
        """Return the upstream chunk unchanged."""
        return chunk

    def on_upstream_connection_close(self) -> None:
        """Do nothing when the upstream connection closes."""
        pass

View File

@ -254,3 +254,30 @@ class TestHttpProxyPluginExamples(unittest.TestCase):
httpStatusCodes.OK,
reason=b'OK', body=b'Hello from man in the middle')
)
@mock.patch('proxy.http.proxy.server.TcpServerConnection')
def test_filter_by_url_regex_plugin(
        self, mock_server_conn: mock.Mock) -> None:
    """A GET for www.facebook.com/tr/ is answered with 404 'Blocked'."""
    # Client sends a request for the URL under test.
    blocked_request = build_http_request(
        b'GET', b'http://www.facebook.com/tr/',
        headers={
            b'Host': b'www.facebook.com',
        }
    )
    self._conn.recv.return_value = blocked_request

    # Selector reports the client connection as readable exactly once.
    client_readable = selectors.SelectorKey(
        fileobj=self._conn,
        fd=self._conn.fileno,
        events=selectors.EVENT_READ,
        data=None)
    self.mock_selector.return_value.select.side_effect = [
        [(client_readable, selectors.EVENT_READ)],
    ]

    self.protocol_handler.run_once()

    # First buffer queued for the client must be the rejection response.
    queued_response = self.protocol_handler.client.buffer[0].tobytes()
    expected_response = build_http_response(
        status_code=httpStatusCodes.NOT_FOUND,
        reason=b'Blocked',
        headers={b'Connection': b'close'},
    )
    self.assertEqual(queued_response, expected_response)

View File

@ -12,7 +12,7 @@ from typing import Type
from proxy.http.proxy import HttpProxyBasePlugin
from proxy.plugin import ModifyPostDataPlugin, ProposedRestApiPlugin, RedirectToCustomServerPlugin, \
FilterByUpstreamHostPlugin, CacheResponsesPlugin, ManInTheMiddlePlugin
FilterByUpstreamHostPlugin, CacheResponsesPlugin, ManInTheMiddlePlugin, FilterByURLRegexPlugin
def get_plugin_by_test_name(test_name: str) -> Type[HttpProxyBasePlugin]:
@ -29,4 +29,6 @@ def get_plugin_by_test_name(test_name: str) -> Type[HttpProxyBasePlugin]:
plugin = CacheResponsesPlugin
elif test_name == 'test_man_in_the_middle_plugin':
plugin = ManInTheMiddlePlugin
elif test_name == 'test_filter_by_url_regex_plugin':
plugin = FilterByURLRegexPlugin
return plugin