diff --git a/pysnooper/tracer.py b/pysnooper/tracer.py index 21dd1ed..0f35500 100644 --- a/pysnooper/tracer.py +++ b/pysnooper/tracer.py @@ -20,6 +20,7 @@ if pycompat.PY2: ipython_filename_pattern = re.compile('^$') +ansible_filename_pattern = re.compile(r'^(.+\.zip)[/|\\](ansible[/|\\]modules[/|\\].+\.py)$') def get_local_reprs(frame, watch=(), custom_repr=(), max_length=None, normalize=False): @@ -67,6 +68,7 @@ def get_path_and_source_from_frame(frame): source = source.splitlines() if source is None: ipython_filename_match = ipython_filename_pattern.match(file_name) + ansible_filename_match = ansible_filename_pattern.match(file_name) if ipython_filename_match: entry_number = int(ipython_filename_match.group(1)) try: @@ -77,6 +79,13 @@ def get_path_and_source_from_frame(frame): source = source_chunk.splitlines() except Exception: pass + elif ansible_filename_match: + try: + import zipfile + archive_file = zipfile.ZipFile(ansible_filename_match.group(1), 'r') + source = archive_file.read(ansible_filename_match.group(2).replace('\\', '/')).splitlines() + except Exception: + pass else: try: with open(file_name, 'rb') as fp: diff --git a/tests/test_pysnooper.py b/tests/test_pysnooper.py index 78dbc80..530a2f7 100644 --- a/tests/test_pysnooper.py +++ b/tests/test_pysnooper.py @@ -8,6 +8,7 @@ import time import types import os import sys +import zipfile from pysnooper.utils import truncate import pytest @@ -1914,3 +1915,153 @@ def test_exception_on_entry(): with pytest.raises(TypeError): f() + + +def test_valid_zipfile(): + with mini_toolbox.create_temp_folder(prefix='pysnooper') as folder, \ + mini_toolbox.TempSysPathAdder(str(folder)): + module_name = 'my_valid_zip_module' + zip_name = 'valid.zip' + zip_base_path = mini_toolbox.pathlib.Path('ansible/modules') + python_file_path = folder / zip_name / zip_base_path / ('%s.py' % (module_name)) + os.makedirs(str(folder / zip_name / zip_base_path)) + try: + sys.path.insert(0, str(folder / zip_name / zip_base_path)) + content = textwrap.dedent(u''' + import pysnooper + @pysnooper.snoop(color=False) + def f(x): + return x + ''') + + python_file_path.write_text(content) + + module = __import__(module_name) + + with zipfile.ZipFile(str(folder / 'foo_bar.zip'), 'w') as myZipFile: + myZipFile.write(str(folder / zip_name / zip_base_path / ('%s.py' % (module_name))), \ + '%s/%s.py' % (zip_base_path, module_name,), \ + zipfile.ZIP_DEFLATED) + + python_file_path.unlink() + folder.joinpath(zip_name).rename(folder.joinpath('%s.delete' % (zip_name))) + folder.joinpath('foo_bar.zip').rename(folder.joinpath(zip_name)) + + with mini_toolbox.OutputCapturer(stdout=False, + stderr=True) as output_capturer: + result = getattr(module, 'f')(7) + assert result == 7 + output = output_capturer.output + + assert_output( + output, + ( + SourcePathEntry(), + VariableEntry(stage='starting'), + CallEntry('def f(x):'), + LineEntry('return x'), + ReturnEntry('return x'), + ReturnValueEntry('7'), + ElapsedTimeEntry(), + ) + ) + finally: + sys.path.remove(str(folder / zip_name / zip_base_path)) + + +def test_invalid_zipfile(): + with mini_toolbox.create_temp_folder(prefix='pysnooper') as folder, \ + mini_toolbox.TempSysPathAdder(str(folder)): + module_name = 'my_invalid_zip_module' + zip_name = 'invalid.zip' + zip_base_path = mini_toolbox.pathlib.Path('invalid/modules/path') + python_file_path = folder / zip_name / zip_base_path / ('%s.py' % (module_name)) + os.makedirs(str(folder / zip_name / zip_base_path)) + try: + sys.path.insert(0, str(folder / zip_name / zip_base_path)) + content = textwrap.dedent(u''' + import pysnooper + @pysnooper.snoop(color=False) + def f(x): + return x + ''') + python_file_path.write_text(content) + + module = __import__(module_name) + + with zipfile.ZipFile(str(folder / 'foo_bar.zip'), 'w') as myZipFile: + myZipFile.write(str(folder / zip_name / zip_base_path / ('%s.py' % (module_name))), \ + str(zip_base_path / ('%s.py' % (module_name,))), \ + zipfile.ZIP_DEFLATED) + + python_file_path.unlink() + folder.joinpath(zip_name).rename(folder.joinpath('%s.delete' % (zip_name))) + folder.joinpath('foo_bar.zip').rename(folder.joinpath(zip_name)) + + with mini_toolbox.OutputCapturer(stdout=False, + stderr=True) as output_capturer: + result = getattr(module, 'f')(7) + assert result == 7 + output = output_capturer.output + + assert_output( + output, + ( + SourcePathEntry(), + VariableEntry(stage='starting'), + CallEntry('SOURCE IS UNAVAILABLE'), + LineEntry('SOURCE IS UNAVAILABLE'), + ReturnEntry('SOURCE IS UNAVAILABLE'), + ReturnValueEntry('7'), + ElapsedTimeEntry(), + ) + ) + finally: + sys.path.remove(str(folder / zip_name / zip_base_path)) + + +def test_valid_damaged_zipfile(): + with mini_toolbox.create_temp_folder(prefix='pysnooper') as folder, \ + mini_toolbox.TempSysPathAdder(str(folder)): + module_name = 'my_damaged_module' + zip_name = 'damaged.zip' + zip_base_path = mini_toolbox.pathlib.Path('ansible/modules') + python_file_path = folder / zip_name / zip_base_path / ('%s.py' % (module_name)) + os.makedirs(str(folder / zip_name / zip_base_path)) + try: + sys.path.insert(0, str(folder / zip_name / zip_base_path)) + content = textwrap.dedent(u''' + import pysnooper + @pysnooper.snoop(color=False) + def f(x): + return x + ''') + python_file_path.write_text(content) + + module = __import__(module_name) + + python_file_path.unlink() + folder.joinpath(zip_name).rename(folder.joinpath('%s.delete' % (zip_name))) + + folder.joinpath(zip_name).write_text(u'I am not a zip file') + + with mini_toolbox.OutputCapturer(stdout=False, + stderr=True) as output_capturer: + result = getattr(module, 'f')(7) + assert result == 7 + output = output_capturer.output + + assert_output( + output, + ( + SourcePathEntry(), + VariableEntry(stage='starting'), + CallEntry('SOURCE IS UNAVAILABLE'), + LineEntry('SOURCE IS UNAVAILABLE'), + ReturnEntry('SOURCE IS UNAVAILABLE'), + ReturnValueEntry('7'), + ElapsedTimeEntry(), + ) + ) + finally: + sys.path.remove(str(folder / zip_name / zip_base_path)) diff --git a/tests/test_utils/test_regex.py b/tests/test_utils/test_regex.py new file mode 100644 index 0000000..17d8805 --- /dev/null +++ b/tests/test_utils/test_regex.py @@ -0,0 +1,72 @@ +# Copyright 2022 Ram Rachum and collaborators. +# This program is distributed under the MIT license. + +import pysnooper +from pysnooper.tracer import ansible_filename_pattern + +def test_ansible_filename_pattern(): + archive_file = '/tmp/ansible_my_module_payload_xyz1234/ansible_my_module_payload.zip' + source_code_file = 'ansible/modules/my_module.py' + file_name = '%s/%s' % (archive_file, source_code_file) + assert ansible_filename_pattern.match(file_name).group(1) == archive_file + assert ansible_filename_pattern.match(file_name).group(2) == source_code_file + + archive_file = '/tmp/ansible_my_module_payload_xyz1234/ansible_my_module_with.zip_name.zip' + source_code_file = 'ansible/modules/my_module.py' + file_name = '%s/%s' % (archive_file, source_code_file) + assert ansible_filename_pattern.match(file_name).group(1) == archive_file + assert ansible_filename_pattern.match(file_name).group(2) == source_code_file + + archive_file = '/my/new/path/payload.zip' + source_code_file = 'ansible/modules/my_module.py' + file_name = '%s/%s' % (archive_file, source_code_file) + assert ansible_filename_pattern.match(file_name).group(1) == archive_file + assert ansible_filename_pattern.match(file_name).group(2) == source_code_file + + archive_file = '/tmp/ansible_my_module_payload_xyz1234/ansible_my_module_payload.zip' + source_code_file = 'ansible/modules/in/new/path/my_module.py' + file_name = '%s/%s' % (archive_file, source_code_file) + assert ansible_filename_pattern.match(file_name).group(1) == archive_file + assert ansible_filename_pattern.match(file_name).group(2) == source_code_file + + archive_file = '/tmp/ansible_my_module_payload_xyz1234/ansible_my_module_payload.zip' + source_code_file = 'ansible/modules/my_module_is_called_.py.py' + file_name = '%s/%s' % (archive_file, source_code_file) + assert ansible_filename_pattern.match(file_name).group(1) == archive_file + assert ansible_filename_pattern.match(file_name).group(2) == source_code_file + + archive_file = 'C:\\Users\\vagrant\\AppData\\Local\\Temp\\pysnooperw5c2lg35\\valid.zip' + source_code_file = 'ansible\\modules\\my_valid_zip_module.py' + file_name = '%s\\%s' % (archive_file, source_code_file) + assert ansible_filename_pattern.match(file_name).group(1) == archive_file + assert ansible_filename_pattern.match(file_name).group(2) == source_code_file + + archive_file = '/tmp/ansible_my_module_payload_xyz1234/ansible_my_module_payload.zip' + source_code_file = 'ANSIBLE/modules/my_module.py' + file_name = '%s/%s' % (archive_file, source_code_file) + assert ansible_filename_pattern.match(file_name) is None + + archive_file = '/tmp/ansible_my_module_payload_xyz1234/ansible_my_module_payload.zip' + source_code_file = 'ansible/modules/my_module.PY' + file_name = '%s/%s' % (archive_file, source_code_file) + assert ansible_filename_pattern.match(file_name) is None + + archive_file = '/tmp/ansible_my_module_payload_xyz1234/ansible_my_module_payload.Zip' + source_code_file = 'ansible/modules/my_module.py' + file_name = '%s/%s' % (archive_file, source_code_file) + assert ansible_filename_pattern.match(file_name) is None + + archive_file = '/tmp/ansible_my_module_payload_xyz1234/ansible_my_module_payload.zip' + source_code_file = 'ansible/my_module.py' + file_name = '%s/%s' % (archive_file, source_code_file) + assert ansible_filename_pattern.match(file_name) is None + + archive_file = '/tmp/ansible_my_module_payload_xyz1234/ansible_my_module_payload.zip' + source_code_file = '' + file_name = '%s/%s' % (archive_file, source_code_file) + assert ansible_filename_pattern.match(file_name) is None + + archive_file = '' + source_code_file = 'ansible/modules/my_module.py' + file_name = '%s/%s' % (archive_file, source_code_file) + assert ansible_filename_pattern.match(file_name) is None