add FileTracker which could track opening files

This commit is contained in:
Prodesire 2017-12-27 01:02:58 +08:00
parent 3ecb79e505
commit e39d406dd2
3 changed files with 90 additions and 2 deletions

View File

@ -5,6 +5,13 @@ import types
PY2 = sys.version_info[0] == 2
# builtins
if PY2:
import __builtin__ as builtins
else:
import builtins
# url*
if PY2:
import urllib as urlib

View File

@ -2,8 +2,64 @@ import os
import sys
import shutil
from . import logger
from .platform import WINDOWS
from .compat import PY2
from .compat import PY2, builtins
_openfiles = set()
_origin_open = builtins.open
if PY2:
_origin_file = builtins.file
class _trackfile(builtins.file):
def __init__(self, *args):
self.path = args[0]
logger.debug('Opening "%s"', self.path)
super(_trackfile, self).__init__(*args)
_openfiles.add(self)
def close(self):
logger.debug('Closing "%s"', self.path)
super(_trackfile, self).close()
_openfiles.remove(self)
def _trackopen(*args):
return _trackfile(*args)
else:
def _trackopen(*args, **kwargs):
f = _origin_open(*args, **kwargs)
path = args[0]
logger.debug('Opening "%s"', path)
_openfiles.add(f)
origin_close = f.close
def close():
logger.debug('Closing "%s"', path)
origin_close()
_openfiles.remove(f)
f.close = close
return f
class FileTracker(object):
@classmethod
def track(cls):
builtins.open = _trackopen
if PY2:
builtins.file = _trackfile
@classmethod
def untrack(cls):
builtins.open = _origin_open
if PY2:
builtins.file = _origin_file
@classmethod
def get_openfiles(cls):
return _openfiles
def makedirs(path, mode=0o755, ignore_errors=False, exist_ok=False):

View File

@ -4,13 +4,38 @@ import time
import pytest
from pydu.platform import WINDOWS
from pydu.system import (makedirs, remove, removes, open_file, copy, touch,
from pydu.system import (FileTracker,
makedirs, remove, removes, open_file, copy, touch,
chmod, which)
if not WINDOWS:
from pydu.system import link, symlink
class TestFileTracker:
def test_track_open(self, tmpdir):
FileTracker.track()
path = tmpdir.join('test').strpath
f = open(path, 'w')
assert f in FileTracker.get_openfiles()
f.close()
assert f not in FileTracker.get_openfiles()
def test_track_context_open(self, tmpdir):
FileTracker.track()
path = tmpdir.join('test').strpath
with open(path, 'w') as f:
assert f in FileTracker.get_openfiles()
assert f not in FileTracker.get_openfiles()
def test_untrack(self, tmpdir):
FileTracker.track()
FileTracker.untrack()
path = tmpdir.join('test').strpath
f = open(path, 'w')
assert f not in FileTracker.get_openfiles()
class TestMakeDirs:
def test_makedirs(self, tmpdir):
path = str(tmpdir.join('test'))