tqdm/tests/tests_synchronisation.py

225 lines
7.5 KiB
Python

from __future__ import division
import sys
from functools import wraps
from threading import Event
from time import sleep, time
from tqdm import TMonitor, tqdm, trange
from .tests_perf import retry_on_except
from .tests_tqdm import StringIO, closing, importorskip, patch_lock, skip
class Time(object):
"""Fake time class class providing an offset"""
offset = 0
@classmethod
def reset(cls):
"""zeroes internal offset"""
cls.offset = 0
@classmethod
def time(cls):
"""time.time() + offset"""
return time() + cls.offset
@staticmethod
def sleep(dur):
"""identical to time.sleep()"""
sleep(dur)
@classmethod
def fake_sleep(cls, dur):
"""adds `dur` to internal offset"""
cls.offset += dur
sleep(0.000001) # sleep to allow interrupt (instead of pass)
def FakeEvent():
"""patched `threading.Event` where `wait()` uses `Time.fake_sleep()`"""
event = Event() # not a class in py2 so can't inherit
def wait(timeout=None):
"""uses Time.fake_sleep"""
if timeout is not None:
Time.fake_sleep(timeout)
return event.is_set()
event.wait = wait
return event
def patch_sleep(func):
"""Temporarily makes TMonitor use Time.fake_sleep"""
@wraps(func)
def inner(*args, **kwargs):
"""restores TMonitor on completion regardless of Exceptions"""
TMonitor._test["time"] = Time.time
TMonitor._test["Event"] = FakeEvent
if tqdm.monitor:
assert not tqdm.monitor.get_instances()
tqdm.monitor.exit()
del tqdm.monitor
tqdm.monitor = None
try:
return func(*args, **kwargs)
finally:
# Check that class var monitor is deleted if no instance left
tqdm.monitor_interval = 10
if tqdm.monitor:
assert not tqdm.monitor.get_instances()
tqdm.monitor.exit()
del tqdm.monitor
tqdm.monitor = None
TMonitor._test.pop("Event")
TMonitor._test.pop("time")
return inner
def cpu_timify(t, timer=Time):
"""Force tqdm to use the specified timer instead of system-wide time"""
t._time = timer.time
t._sleep = timer.fake_sleep
t.start_t = t.last_print_t = t._time()
return timer
class FakeTqdm(object):
_instances = set()
get_lock = tqdm.get_lock
def incr(x):
return x + 1
def incr_bar(x):
with closing(StringIO()) as our_file:
for _ in trange(x, lock_args=(False,), file=our_file):
pass
return incr(x)
@patch_sleep
def test_monitor_thread():
"""Test dummy monitoring thread"""
monitor = TMonitor(FakeTqdm, 10)
# Test if alive, then killed
assert monitor.report()
monitor.exit()
assert not monitor.report()
assert not monitor.is_alive()
del monitor
@patch_sleep
def test_monitoring_and_cleanup():
"""Test for stalled tqdm instance and monitor deletion"""
# Note: should fix miniters for these tests, else with dynamic_miniters
# it's too complicated to handle with monitoring update and maxinterval...
maxinterval = tqdm.monitor_interval
assert maxinterval == 10
total = 1000
with closing(StringIO()) as our_file:
with tqdm(total=total, file=our_file, miniters=500, mininterval=0.1,
maxinterval=maxinterval) as t:
cpu_timify(t, Time)
# Do a lot of iterations in a small timeframe
# (smaller than monitor interval)
Time.fake_sleep(maxinterval / 10) # monitor won't wake up
t.update(500)
# check that our fixed miniters is still there
assert t.miniters <= 500 # TODO: should really be == 500
# Then do 1 it after monitor interval, so that monitor kicks in
Time.fake_sleep(maxinterval)
t.update(1)
# Wait for the monitor to get out of sleep's loop and update tqdm.
timeend = Time.time()
while not (t.monitor.woken >= timeend and t.miniters == 1):
Time.fake_sleep(1) # Force awake up if it woken too soon
assert t.miniters == 1 # check that monitor corrected miniters
# Note: at this point, there may be a race condition: monitor saved
# current woken time but Time.sleep() happen just before monitor
# sleep. To fix that, either sleep here or increase time in a loop
# to ensure that monitor wakes up at some point.
# Try again but already at miniters = 1 so nothing will be done
Time.fake_sleep(maxinterval)
t.update(2)
timeend = Time.time()
while t.monitor.woken < timeend:
Time.fake_sleep(1) # Force awake if it woken too soon
# Wait for the monitor to get out of sleep's loop and update
# tqdm
assert t.miniters == 1 # check that monitor corrected miniters
@patch_sleep
def test_monitoring_multi():
"""Test on multiple bars, one not needing miniters adjustment"""
# Note: should fix miniters for these tests, else with dynamic_miniters
# it's too complicated to handle with monitoring update and maxinterval...
maxinterval = tqdm.monitor_interval
assert maxinterval == 10
total = 1000
with closing(StringIO()) as our_file:
with tqdm(total=total, file=our_file, miniters=500, mininterval=0.1,
maxinterval=maxinterval) as t1:
# Set high maxinterval for t2 so monitor does not need to adjust it
with tqdm(total=total, file=our_file, miniters=500, mininterval=0.1,
maxinterval=1E5) as t2:
cpu_timify(t1, Time)
cpu_timify(t2, Time)
# Do a lot of iterations in a small timeframe
Time.fake_sleep(maxinterval / 10)
t1.update(500)
t2.update(500)
assert t1.miniters <= 500 # TODO: should really be == 500
assert t2.miniters == 500
# Then do 1 it after monitor interval, so that monitor kicks in
Time.fake_sleep(maxinterval)
t1.update(1)
t2.update(1)
# Wait for the monitor to get out of sleep and update tqdm
timeend = Time.time()
while not (t1.monitor.woken >= timeend and t1.miniters == 1):
Time.fake_sleep(1)
assert t1.miniters == 1 # check that monitor corrected miniters
assert t2.miniters == 500 # check that t2 was not adjusted
def test_imap():
"""Test multiprocessing.Pool"""
try:
from multiprocessing import Pool
except ImportError as err:
skip(str(err))
pool = Pool()
res = list(tqdm(pool.imap(incr, range(100)), disable=True))
pool.close()
assert res[-1] == 100
# py2: locks won't propagate to incr_bar so may cause `AttributeError`
@retry_on_except(n=3 if sys.version_info < (3,) else 1, check_cpu_time=False)
@patch_lock(thread=True)
def test_threadpool():
"""Test concurrent.futures.ThreadPoolExecutor"""
ThreadPoolExecutor = importorskip('concurrent.futures').ThreadPoolExecutor
with ThreadPoolExecutor(8) as pool:
try:
res = list(tqdm(pool.map(incr_bar, range(100)), disable=True))
except AttributeError:
if sys.version_info < (3,):
skip("not supported on py2")
else:
raise
assert sum(res) == sum(range(1, 101))