mirror of https://github.com/mahmoud/boltons.git
Merge branch 'wrap_trace'
This commit is contained in:
commit
c3b9a04007
5
TODO.rst
5
TODO.rst
|
@ -27,6 +27,11 @@ jsonutils
|
|||
misc?
|
||||
-----
|
||||
|
||||
- wrap_trace debug utility. Takes an object, looks at its dir, wraps
|
||||
everything callable, with a hook. Needs an enable/disable flag.
|
||||
- get/set/call/return/exception
|
||||
- __slots__
|
||||
|
||||
- Tracking proxy. An object that always succeeds for all operations, saving the call history.
|
||||
- Top/Bottom singletons (greater than and less than everything)
|
||||
|
||||
|
|
|
@ -5,6 +5,22 @@ applications. Currently this focuses on ways to use :mod:`pdb`, the
|
|||
built-in Python debugger.
|
||||
"""
|
||||
|
||||
import sys
|
||||
import time
|
||||
|
||||
try:
|
||||
basestring
|
||||
from repr import Repr
|
||||
except NameError:
|
||||
basestring = (str, bytes) # py3
|
||||
from reprlib import Repr
|
||||
|
||||
try:
|
||||
from typeutils import make_sentinel
|
||||
_UNSET = make_sentinel(var_name='_UNSET')
|
||||
except ImportError:
|
||||
_UNSET = object()
|
||||
|
||||
__all__ = ['pdb_on_signal', 'pdb_on_exception']
|
||||
|
||||
|
||||
|
@ -64,3 +80,195 @@ def pdb_on_exception(limit=100):
|
|||
pdb.post_mortem(exc_tb)
|
||||
|
||||
sys.excepthook = pdb_excepthook
|
||||
return
|
||||
|
||||
_repr_obj = Repr()
|
||||
_repr_obj.maxstring = 50
|
||||
_repr_obj.maxother = 50
|
||||
brief_repr = _repr_obj.repr
|
||||
|
||||
|
||||
# events: call, return, get, set, del, raise
|
||||
def trace_print_hook(event, label, obj, attr_name,
|
||||
args=(), kwargs={}, result=_UNSET):
|
||||
fargs = (event.ljust(6), time.time(), label.rjust(10),
|
||||
obj.__class__.__name__, attr_name)
|
||||
if event == 'get':
|
||||
tmpl = '%s %s - %s - %s.%s -> %s'
|
||||
fargs += (brief_repr(result),)
|
||||
elif event == 'set':
|
||||
tmpl = '%s %s - %s - %s.%s = %s'
|
||||
fargs += (brief_repr(args[0]),)
|
||||
elif event == 'del':
|
||||
tmpl = '%s %s - %s - %s.%s'
|
||||
else: # call/return/raise
|
||||
tmpl = '%s %s - %s - %s.%s(%s)'
|
||||
fargs += (', '.join([brief_repr(a) for a in args]),)
|
||||
if kwargs:
|
||||
tmpl = '%s %s - %s - %s.%s(%s, %s)'
|
||||
fargs += (', '.join(['%s=%s' % (k, brief_repr(v))
|
||||
for k, v in kwargs.items()]),)
|
||||
if result is not _UNSET:
|
||||
tmpl += ' -> %s'
|
||||
fargs += (brief_repr(result),)
|
||||
print(tmpl % fargs)
|
||||
return
|
||||
|
||||
|
||||
def wrap_trace(obj, hook=trace_print_hook,
|
||||
which=None, events=None, label=None):
|
||||
"""Monitor an object for interactions. Whenever code calls a method,
|
||||
gets an attribute, or sets an attribute, an event is called. By
|
||||
default the trace output is printed, but a custom tracing *hook*
|
||||
can be passed.
|
||||
|
||||
Args:
|
||||
obj (object): New- or old-style object to be traced. Built-in
|
||||
objects like lists and dicts also supported.
|
||||
hook (callable): A function called once for every event. See
|
||||
below for details.
|
||||
which (str): One or more attribute names to trace, or a
|
||||
function accepting attribute name and value, and returing
|
||||
True/False.
|
||||
events (str): One or more kinds of events to call *hook*
|
||||
on. Expected values are ``['get', 'set', 'del', 'call',
|
||||
'raise', 'return']``. Defaults to all events.
|
||||
label (str): A name to associate with the traced object
|
||||
Defaults to hexadecimal memory address, similar to repr.
|
||||
|
||||
The object returned is not the same object as the one passed
|
||||
in. It will not pass identity checks. However, it will pass
|
||||
:func:`isinstance` checks, as it is a new instance of a new
|
||||
subtype of the object passed.
|
||||
|
||||
"""
|
||||
# other actions: pdb.set_trace, print, aggregate, aggregate_return
|
||||
# (like aggregate but with the return value)
|
||||
|
||||
# TODO: test classmethod/staticmethod/property
|
||||
# TODO: wrap __dict__ for old-style classes?
|
||||
|
||||
if isinstance(which, basestring):
|
||||
which_func = lambda attr_name, attr_val: attr_name == which
|
||||
elif callable(getattr(which, '__contains__', None)):
|
||||
which_func = lambda attr_name, attr_val: attr_name in which
|
||||
elif which is None or callable(which):
|
||||
which_func = which
|
||||
else:
|
||||
raise TypeError('expected attr name(s) or callable, not: %r' % which)
|
||||
|
||||
label = label or hex(id(obj))
|
||||
|
||||
if isinstance(events, basestring):
|
||||
events = [events]
|
||||
do_get = not events or 'get' in events
|
||||
do_set = not events or 'set' in events
|
||||
do_del = not events or 'del' in events
|
||||
do_call = not events or 'call' in events
|
||||
do_raise = not events or 'raise' in events
|
||||
do_return = not events or 'return' in events
|
||||
|
||||
def wrap_method(attr_name, func, _hook=hook, _label=label):
|
||||
def wrapped(*a, **kw):
|
||||
a = a[1:]
|
||||
if do_call:
|
||||
hook(event='call', label=_label, obj=obj,
|
||||
attr_name=attr_name, args=a, kwargs=kw)
|
||||
if do_raise:
|
||||
try:
|
||||
ret = func(*a, **kw)
|
||||
except:
|
||||
if not hook(event='raise', label=_label, obj=obj,
|
||||
attr_name=attr_name, args=a, kwargs=kw,
|
||||
result=sys.exc_info()):
|
||||
raise
|
||||
else:
|
||||
ret = func(*a, **kw)
|
||||
if do_return:
|
||||
hook(event='return', label=_label, obj=obj,
|
||||
attr_name=attr_name, args=a, kwargs=kw, result=ret)
|
||||
return ret
|
||||
|
||||
wrapped.__name__ = func.__name__
|
||||
wrapped.__doc__ = func.__doc__
|
||||
try:
|
||||
wrapped.__module__ = func.__module__
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
if func.__dict__:
|
||||
wrapped.__dict__.update(func.__dict__)
|
||||
except Exception:
|
||||
pass
|
||||
return wrapped
|
||||
|
||||
def __getattribute__(self, attr_name):
|
||||
ret = type(obj).__getattribute__(obj, attr_name)
|
||||
if callable(ret): # wrap any bound methods
|
||||
ret = type(obj).__getattribute__(self, attr_name)
|
||||
if do_get:
|
||||
hook('get', label, obj, attr_name, (), {}, result=ret)
|
||||
return ret
|
||||
|
||||
def __setattr__(self, attr_name, value):
|
||||
type(obj).__setattr__(obj, attr_name, value)
|
||||
if do_set:
|
||||
hook('set', label, obj, attr_name, (value,), {})
|
||||
return
|
||||
|
||||
def __delattr__(self, attr_name):
|
||||
type(obj).__delattr__(obj, attr_name)
|
||||
if do_del:
|
||||
hook('del', label, obj, attr_name, (), {})
|
||||
return
|
||||
|
||||
attrs = {}
|
||||
for attr_name in dir(obj):
|
||||
try:
|
||||
attr_val = getattr(obj, attr_name)
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
if not callable(attr_val) or attr_name in ('__new__',):
|
||||
continue
|
||||
elif which_func and not which_func(attr_name, attr_val):
|
||||
continue
|
||||
|
||||
if attr_name == '__getattribute__':
|
||||
wrapped_method = __getattribute__
|
||||
elif attr_name == '__setattr__':
|
||||
wrapped_method = __setattr__
|
||||
elif attr_name == '__delattr__':
|
||||
wrapped_method = __delattr__
|
||||
else:
|
||||
wrapped_method = wrap_method(attr_name, attr_val)
|
||||
attrs[attr_name] = wrapped_method
|
||||
|
||||
cls_name = obj.__class__.__name__
|
||||
if cls_name == cls_name.lower():
|
||||
type_name = 'traced_' + cls_name
|
||||
else:
|
||||
type_name = 'Traced' + cls_name
|
||||
|
||||
if hasattr(obj, '__mro__'):
|
||||
bases = (obj.__class__,)
|
||||
else:
|
||||
# need new-style class for even basic wrapping of callables to
|
||||
# work. getattribute won't work for old-style classes of course.
|
||||
bases = (obj.__class__, object)
|
||||
|
||||
trace_type = type(type_name, bases, attrs)
|
||||
for cls in trace_type.__mro__:
|
||||
try:
|
||||
return cls.__new__(trace_type)
|
||||
except Exception:
|
||||
pass
|
||||
raise TypeError('unable to wrap_trace %r instance %r'
|
||||
% (obj.__class__, obj))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
obj = wrap_trace({})
|
||||
obj['hi'] = 'hello'
|
||||
obj.fail
|
||||
import pdb;pdb.set_trace()
|
||||
|
|
|
@ -0,0 +1,88 @@
|
|||
|
||||
from collections import namedtuple
|
||||
|
||||
from pytest import raises
|
||||
|
||||
from boltons.debugutils import wrap_trace
|
||||
|
||||
|
||||
def test_trace_dict():
|
||||
target = {}
|
||||
wrapped = wrap_trace(target)
|
||||
|
||||
assert target is not wrapped
|
||||
assert isinstance(wrapped, dict)
|
||||
|
||||
wrapped['a'] = 'A'
|
||||
assert target['a'] == 'A'
|
||||
assert len(wrapped) == len(target)
|
||||
|
||||
wrapped.pop('a')
|
||||
assert 'a' not in target
|
||||
|
||||
with raises(AttributeError):
|
||||
wrapped.nonexistent_attr = 'nope'
|
||||
|
||||
return
|
||||
|
||||
|
||||
def test_trace_bytes():
|
||||
target = u'Hello'.encode('ascii')
|
||||
|
||||
wrapped = wrap_trace(target)
|
||||
|
||||
assert target is not wrapped
|
||||
assert isinstance(wrapped, bytes)
|
||||
|
||||
assert len(wrapped) == len(target)
|
||||
assert wrapped.decode('utf-8') == u'Hello'
|
||||
assert wrapped.lower() == target.lower()
|
||||
|
||||
|
||||
def test_trace_exc():
|
||||
class TestException(Exception):
|
||||
pass
|
||||
|
||||
target = TestException('exceptions can be a good thing')
|
||||
wrapped = wrap_trace(target)
|
||||
|
||||
try:
|
||||
raise wrapped
|
||||
except TestException as te:
|
||||
assert te.args == target.args
|
||||
|
||||
|
||||
def test_trace_which():
|
||||
class Config(object):
|
||||
def __init__(self, value):
|
||||
self.value = value
|
||||
|
||||
config = Config('first')
|
||||
wrapped = wrap_trace(config, which='__setattr__')
|
||||
|
||||
wrapped.value = 'second'
|
||||
assert config.value == 'second'
|
||||
|
||||
|
||||
def test_trace_namedtuple():
|
||||
TargetType = namedtuple('TargetType', 'x y z')
|
||||
target = TargetType(1, 2, 3)
|
||||
|
||||
wrapped = wrap_trace(target)
|
||||
|
||||
assert wrapped == (1, 2, 3)
|
||||
|
||||
|
||||
def test_trace_oldstyle():
|
||||
class Oldie:
|
||||
test = object()
|
||||
|
||||
def get_test(self):
|
||||
return self.test
|
||||
|
||||
oldie = Oldie()
|
||||
|
||||
wrapped = wrap_trace(oldie)
|
||||
assert wrapped.get_test() is oldie.test
|
||||
|
||||
return
|
Loading…
Reference in New Issue