diff --git a/.travis.yml b/.travis.yml index 1e170244..99870fa6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,10 +1,10 @@ sudo: false language: python python: 3.5 -branches: # remove travis double-check on pull requests in main repo - only: - - master - - /^\d\.\d+$/ +# branches: # remove travis double-check on pull requests in main repo +# only: +# - master +# - /^\d\.\d+$/ env: - TOXENV=py26 - TOXENV=py27 diff --git a/README.rst b/README.rst index c3e47723..d0eb90e1 100644 --- a/README.rst +++ b/README.rst @@ -126,7 +126,7 @@ If the optional variable ``total`` (or an iterable with ``len()``) is provided, predictive stats are displayed. ``with`` is also optional (you can just assign ``tqdm()`` to a variable, -but in this case don't forget to ``close()`` at the end: +but in this case don't forget to ``del`` or ``close()`` at the end: .. code:: python @@ -148,12 +148,11 @@ Documentation progressbar every time a value is requested. """ - def __init__(self, iterable=None, desc=None, total=None, leave=False, + def __init__(self, iterable=None, desc=None, total=None, leave=True, file=sys.stderr, ncols=None, mininterval=0.1, - maxinterval=10.0, miniters=None, ascii=None, - disable=False, unit='it', unit_scale=False, - dynamic_ncols=False, smoothing=0.3, nested=False, - bar_format=None, initial=0, gui=False): + maxinterval=10.0, miniters=None, ascii=None, disable=False, + unit='it', unit_scale=False, dynamic_ncols=False, + smoothing=0.3, bar_format=None, initial=0, position=None): Parameters ~~~~~~~~~~ @@ -170,7 +169,7 @@ Parameters True and this parameter needs subsequent updating, specify an initial arbitrary large positive integer, e.g. int(9e9). * leave : bool, optional - If [default: False], removes all traces of the progressbar + If [default: True], removes all traces of the progressbar upon termination of iteration. * file : `io.TextIOWrapper` or `io.StringIO`, optional Specifies where to output the progress messages @@ -210,10 +209,6 @@ Parameters Exponential moving average smoothing factor for speed estimates (ignored in GUI mode). Ranges from 0 (average speed) to 1 (current/instantaneous speed) [default: 0.3]. -* nested : bool, optional - Whether this iterable is nested in another one also managed by - `tqdm` [default: False]. Allows display of multiple, nested - progress bars. * bar_format : str, optional Specify a custom bar string formatting. May impact performance. [default: '{l_bar}{bar}{r_bar}'], where l_bar is @@ -224,7 +219,9 @@ Parameters * initial : int, optional The initial counter value. Useful when restarting a progress bar [default: 0]. - +* position : int, optional + Specify the line offset to print this bar. Useful to manage + multiple bars at once (eg, from threads). Returns ~~~~~~~ @@ -373,23 +370,25 @@ folder or import the module and run ``help()``. Nested progress bars ~~~~~~~~~~~~~~~~~~~~ -``tqdm`` supports nested progress bars, you just need to specify the -`nested=True` argument for all tqdm instantiations except the **outermost** -bar. Here's an example: +``tqdm`` supports nested progress bars. Here's an example: .. code:: python from tqdm import trange from time import sleep - for i in trange(10, desc='1st loop', leave=True): - for j in trange(5, desc='2nd loop', leave=True, nested=True): - for k in trange(100, desc='3nd loop', leave=True, nested=True): + for i in trange(10, desc='1st loop'): + for j in trange(5, desc='2nd loop', leave=False): + for k in trange(100, desc='3nd loop'): sleep(0.01) On Windows `colorama `__ will be used if available to produce a beautiful nested display. +For manual control over positioning (e.g. for multi-threaded use), +you may specify `position=n` where `n=0` for the outermost bar, +`n=1` for the next, and so on. + How to make a good progress bar ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -400,16 +399,14 @@ a variety of use cases with no or minimal configuration. However, there is one thing that ``tqdm`` cannot do: choose a pertinent progress indicator. To display a useful progress bar, it is very important that -you ensure that you supply ``tqdm`` with the most pertinent progress indicator, -which will reflect most accurately the current state of your program. +``tqdm`` is supplied with the most pertinent progress indicator. +This will reflect most accurately the current state of your program. Usually, a good way is to preprocess quickly to first evaluate the total amount of work to do before beginning the real processing. -To illustrate the importance of a good progress indicator, let's take the +To illustrate the importance of a good progress indicator, take the following example: you want to walk through all files of a directory and -process their contents to do your biddings. - -Here is a basic program to do that: +process their contents with some external function: .. code:: python @@ -436,7 +433,7 @@ Here is a basic program to do that: buf = fh.read(blocksize) dosomething(buf) -``process_content_no_progress()`` does the job alright, but it does not show +``process_content_no_progress()`` does the job, but does not show any information about the current progress, nor how long it will take. To quickly fix that using ``tqdm``, we can use this naive approach: @@ -485,10 +482,10 @@ now we have predictive information: However, the progress is not smooth: it increments in steps, 1 step being 1 file processed. The problem is that we do not just walk through files tree, -but we process the files contents. Thus, if we stumble on one big fat file, -it will take a huge deal more time to process than other smaller files, but -the progress bar cannot know that, because we only supplied the files count, -so it considers that every element is of equal processing weight. +but we process the files contents. Thus, if we stumble on one very large file +which takes a great deal more time to process than other smaller files, +the progress bar +will still considers that file is of equal processing weight. To fix this, we should use another indicator than the files count: the total sum of all files sizes. This would be more pertinent since the data we @@ -525,6 +522,7 @@ predicted time and statistics: 47%|██████████████████▍\ \| 152K/321K [00:03<00:03, 46.2KB/s] + Contributions ------------- @@ -551,7 +549,7 @@ file for more information. License ------- -`MIT LICENSE `__. +Mostly `CC, MIT licence `__. Authors diff --git a/examples/simple_examples.py b/examples/simple_examples.py index 5e3a525b..b84cf212 100644 --- a/examples/simple_examples.py +++ b/examples/simple_examples.py @@ -21,7 +21,7 @@ stmts = ( ' ascii=True, desc="cool", dynamic_ncols=True):\n\tpass', # Nested bars 'from tqdm import trange\nfor i in trange(10):\n\t' - 'for j in trange(int(1e7), nested=True):\n\t\tpass', + 'for j in trange(int(1e7), leave=False, unit_scale=True):\n\t\tpass', # Experimental GUI demo 'import tqdm\nfor i in tqdm.tgrange(int(1e8)):\n\tpass', # Comparison to https://code.google.com/p/python-progressbar/ diff --git a/tqdm/_tqdm.py b/tqdm/_tqdm.py index bb4d6b33..d19e19d3 100644 --- a/tqdm/_tqdm.py +++ b/tqdm/_tqdm.py @@ -12,7 +12,7 @@ Usage: from __future__ import division, absolute_import # import compatibility functions and utilities from ._utils import _supports_unicode, _environ_cols_wrapper, _range, _unich, \ - _term_move_up, _unicode + _term_move_up, _unicode, WeakSet import sys from time import time @@ -88,11 +88,17 @@ class tqdm(object): if not getattr(fp, 'flush', False): # pragma: no cover fp.flush = lambda: None + def fp_write(s): + try: + fp.write(_unicode(s)) + except UnicodeEncodeError: # pragma: no cover + fp.write(repr(s)) + last_printed_len = [0] # closure over mutable variable (fast) def print_status(s): len_s = len(s) - fp.write('\r' + s + (' ' * max(last_printed_len[0] - len_s, 0))) + fp_write('\r' + s + (' ' * max(last_printed_len[0] - len_s, 0))) fp.flush() last_printed_len[0] = len_s return print_status @@ -163,7 +169,7 @@ class tqdm(object): rate_fmt = ((format_sizeof(inv_rate if inv_rate else rate) if unit_scale else '{0:5.2f}'.format(inv_rate if inv_rate else rate)) - if elapsed else '?') \ + if rate else '?') \ + ('s' if inv_rate else unit) + '/' + (unit if inv_rate else 's') if unit_scale: @@ -247,12 +253,44 @@ class tqdm(object): return (prefix if prefix else '') + '{0}{1} [{2}, {3}]'.format( n_fmt, unit, elapsed_str, rate_fmt) - def __init__(self, iterable=None, desc=None, total=None, leave=False, + def __new__(cls, *args, **kwargs): + instance = object.__new__(cls) + if "_instances" not in cls.__dict__: + cls._instances = WeakSet() + cls._instances.add(instance) + return instance + + @classmethod + def _get_free_pos(cls, instance=None): + """ Skips specified instance """ + try: + return max(inst.pos for inst in cls._instances + if inst is not instance) + 1 + except ValueError as e: + if "arg is an empty sequence" in str(e): + return 0 + raise # pragma: no cover + + @classmethod + def _decr_instances(cls, instance): + """ + Remove from list and reposition other bars + so that newer bars won't overlap previous bars + """ + try: # in case instance was explicitly positioned, it won't be in set + cls._instances.remove(instance) + for inst in cls._instances: + if inst.pos > instance.pos: + inst.pos -= 1 + except KeyError: + pass + + def __init__(self, iterable=None, desc=None, total=None, leave=True, file=sys.stderr, ncols=None, mininterval=0.1, - maxinterval=10.0, miniters=None, ascii=None, - disable=False, unit='it', unit_scale=False, - dynamic_ncols=False, smoothing=0.3, nested=False, - bar_format=None, initial=0, gui=False): + maxinterval=10.0, miniters=None, ascii=None, disable=False, + unit='it', unit_scale=False, dynamic_ncols=False, + smoothing=0.3, bar_format=None, initial=0, position=None, + gui=False, **kwargs): """ Parameters ---------- @@ -268,7 +306,7 @@ class tqdm(object): True and this parameter needs subsequent updating, specify an initial arbitrary large positive integer, e.g. int(9e9). leave : bool, optional - If [default: False], removes all traces of the progressbar + If [default: True], removes all traces of the progressbar upon termination of iteration. file : `io.TextIOWrapper` or `io.StringIO`, optional Specifies where to output the progress messages @@ -307,10 +345,6 @@ class tqdm(object): Exponential moving average smoothing factor for speed estimates (ignored in GUI mode). Ranges from 0 (average speed) to 1 (current/instantaneous speed) [default: 0.3]. - nested : bool, optional - Whether this iterable is nested in another one also managed by - `tqdm` [default: False]. Allows display of multiple, nested - progress bars. bar_format : str, optional Specify a custom bar string formatting. May impact performance. [default: '{l_bar}{bar}{r_bar}'], where l_bar is @@ -321,6 +355,9 @@ class tqdm(object): initial : int, optional The initial counter value. Useful when restarting a progress bar [default: 0]. + position : int, optional + Specify the line offset to print this bar [default: 0]. + Useful to manage multiple bars at once (eg, from threads). gui : bool, optional WARNING: internal parameter - do not use. Use tqdm_gui(...) instead. If set, will attempt to use @@ -330,6 +367,23 @@ class tqdm(object): ------- out : decorated iterator. """ + if disable: + self.iterable = iterable + self.disable = disable + self.pos = self._get_free_pos(self) + self._instances.remove(self) + return + + if kwargs: + self.disable = True + self.pos = self._get_free_pos(self) + self._instances.remove(self) + raise (DeprecationWarning("nested is deprecated and" + " automated.\nUse position instead" + " for manual control") + if "nested" in kwargs else + Warning("Unknown argument(s): " + str(kwargs))) + # Preprocess the arguments if total is None and iterable is not None: try: @@ -342,7 +396,7 @@ class tqdm(object): if dynamic_ncols: # pragma: no cover dynamic_ncols = _environ_cols_wrapper() ncols = dynamic_ncols(file) - else: + else: # pragma: no cover ncols = _environ_cols_wrapper()(file) if miniters is None: @@ -387,24 +441,26 @@ class tqdm(object): self.smoothing = smoothing self.avg_time = None self._time = time - # if nested, at initial sp() call we replace '\r' by '\n' to - # not overwrite the outer progress bar - self.nested = nested self.bar_format = bar_format # Init the iterations counters self.last_print_n = initial self.n = initial + # if nested, at initial sp() call we replace '\r' by '\n' to + # not overwrite the outer progress bar + self.pos = self._get_free_pos(self) if position is None else position + if not gui: # Initialize the screen printer self.sp = self.status_printer(self.fp) - if not disable: - if self.nested: - self.fp.write('\n') - self.sp(self.format_meter(self.n, total, 0, - (dynamic_ncols(file) if dynamic_ncols else ncols), - self.desc, ascii, unit, unit_scale, None, bar_format)) + if self.pos: + self.moveto(self.pos) + self.sp(self.format_meter(self.n, total, 0, + (dynamic_ncols(file) if dynamic_ncols else ncols), + self.desc, ascii, unit, unit_scale, None, bar_format)) + if self.pos: + self.moveto(-self.pos) # Init the time counter self.start_t = self.last_print_t = self._time() @@ -419,6 +475,42 @@ class tqdm(object): self.close() return False + def __del__(self): + self.close() + + def __repr__(self): + return self.format_meter(self.n, self.total, time() - self.last_print_t, + self.ncols, self.desc, self.ascii, self.unit, + self.unit_scale, 1 / self.avg_time + if self.avg_time else None, self.bar_format) + + def __lt__(self, other): + # try: + return self.pos < other.pos + # except AttributeError: + # return self.start_t < other.start_t + + def __le__(self, other): + return (self < other) or (self == other) + + def __eq__(self, other): + # try: + return self.pos == other.pos + # except AttributeError: + # return self.start_t == other.start_t + + def __ne__(self, other): + return not (self == other) + + def __gt__(self, other): + return not (self <= other) + + def __ge__(self, other): + return not (self < other) + + def __hash__(self): + return id(self) + def __iter__(self): ''' Backward-compatibility to use: for x in tqdm(iterable) ''' @@ -475,6 +567,10 @@ class tqdm(object): else smoothing * delta_t / delta_it + \ (1 - smoothing) * avg_time + if self.pos: + self.moveto(self.pos) + + # Printing the bar's update sp(format_meter( n, self.total, elapsed, (dynamic_ncols(self.fp) if dynamic_ncols @@ -482,6 +578,9 @@ class tqdm(object): self.desc, ascii, unit, unit_scale, 1 / avg_time if avg_time else None, bar_format)) + if self.pos: + self.moveto(-self.pos) + # If no `miniters` was specified, adjust automatically # to the maximum iteration rate seen so far. if dynamic_miniters: @@ -552,6 +651,10 @@ class tqdm(object): raise DeprecationWarning('Please use tqdm_gui(...)' ' instead of tqdm(..., gui=True)') + if self.pos: + self.moveto(self.pos) + + # Print bar's update self.sp(self.format_meter( self.n, self.total, elapsed, (self.dynamic_ncols(self.fp) if self.dynamic_ncols @@ -560,6 +663,9 @@ class tqdm(object): 1 / self.avg_time if self.avg_time else None, self.bar_format)) + if self.pos: + self.moveto(-self.pos) + # If no `miniters` was specified, adjust automatically to the # maximum iteration rate seen so far. # e.g.: After running `tqdm.update(5)`, subsequent @@ -588,6 +694,34 @@ class tqdm(object): if self.disable: return + # Prevent multiple closures + self.disable = True + + # decrement instance pos and remove from internal set + pos = self.pos + self._decr_instances(self) + + # GUI mode + if not hasattr(self, "sp"): + return + + # annoyingly, _supports_unicode isn't good enough + def fp_write(s): + try: + self.fp.write(_unicode(s)) + except UnicodeEncodeError: # pragma: no cover + self.fp.write(repr(s)) + + try: + fp_write('') + except ValueError as e: + if 'closed' in str(e): + return + raise # pragma: no cover + + if pos: + self.moveto(pos) + if self.leave: if self.last_print_n < self.n: cur_t = self._time() @@ -598,10 +732,16 @@ class tqdm(object): else self.ncols), self.desc, self.ascii, self.unit, self.unit_scale, None, self.bar_format)) - self.fp.write('\r' + _term_move_up() if self.nested else '\n') + if pos: + self.moveto(-pos) + else: + fp_write('\n') else: self.sp('') # clear up last bar - self.fp.write('\r' + _term_move_up() if self.nested else '\r') + if pos: + self.moveto(-pos) + else: + fp_write('\r') def unpause(self): """ @@ -617,6 +757,9 @@ class tqdm(object): """ self.desc = desc + ': ' if desc else '' + def moveto(self, n): + self.fp.write(_unicode('\n' * n + _term_move_up() * -n)) + def trange(*args, **kwargs): """ diff --git a/tqdm/_tqdm_gui.py b/tqdm/_tqdm_gui.py index e3b1e1bb..90b71b2c 100644 --- a/tqdm/_tqdm_gui.py +++ b/tqdm/_tqdm_gui.py @@ -330,6 +330,10 @@ class tqdm_gui(tqdm): # pragma: no cover if self.disable: return + self.disable = True + + self._instances.remove(self) + # Restore toolbars self.mpl.rcParams['toolbar'] = self.toolbar # Return to non-interactive mode diff --git a/tqdm/_utils.py b/tqdm/_utils.py index 51b2e5f8..88a1f019 100755 --- a/tqdm/_utils.py +++ b/tqdm/_utils.py @@ -22,6 +22,11 @@ try: # pragma: no cover except ImportError: # pragma: no cover colorama = None +try: # pragma: no cover + from weakref import WeakSet +except ImportError: # pragma: nocover + WeakSet = set + def _is_utf(encoding): return ('U8' == encoding) or ('utf' in encoding) or ('UTF' in encoding) diff --git a/tqdm/_version.py b/tqdm/_version.py index 84e3a4cb..139196ef 100644 --- a/tqdm/_version.py +++ b/tqdm/_version.py @@ -1,5 +1,5 @@ # Definition of the version number -version_info = 3, 8, 0 # major, minor, patch, -extra +version_info = 4, 0, 0 # major, minor, patch, -extra # Nice string for the version __version__ = '.'.join(map(str, version_info)).replace('.-', '-').strip('.-') diff --git a/tqdm/tests/tests_pandas.py b/tqdm/tests/tests_pandas.py index b245d6c1..7737ad4d 100644 --- a/tqdm/tests/tests_pandas.py +++ b/tqdm/tests/tests_pandas.py @@ -1,21 +1,12 @@ from nose.plugins.skip import SkipTest from tqdm import tqdm - -try: - from StringIO import StringIO -except: - from io import StringIO -# Ensure we can use `with closing(...) as ... :` syntax -if getattr(StringIO, '__exit__', False) and \ - getattr(StringIO, '__enter__', False): - def closing(arg): - return arg -else: - from contextlib import closing +from tests_tqdm import with_setup, pretest, posttest, StringIO, closing +@with_setup(pretest, posttest) def test_pandas(): + """ Test pandas.DataFrame.groupby(0).progress_apply """ try: from numpy.random import randint from tqdm import tqdm_pandas @@ -30,15 +21,18 @@ def test_pandas(): our_file.seek(0) - try: - # don't expect final output since no `leave` and - # high dynamic `miniters` - assert '100%|##########| 101/101' not in our_file.read() - except: - raise AssertionError('Did not expect:\n\t100%|##########| 101/101') + # don't expect final output since no `leave` and + # high dynamic `miniters` + nexres = '100%|##########| 101/101' + if nexres in our_file.read(): + our_file.seek(0) + raise AssertionError("\nDid not expect:\n{0}\nIn:{1}\n".format( + nexres, our_file.read())) +@with_setup(pretest, posttest) def test_pandas_leave(): + """ Test pandas with `leave=True` """ try: from numpy.random import randint from tqdm import tqdm_pandas @@ -53,10 +47,8 @@ def test_pandas_leave(): our_file.seek(0) - try: - assert '100%|##########| 101/101' in our_file.read() - except: + exres = '100%|##########| 101/101' + if exres not in our_file.read(): our_file.seek(0) - raise AssertionError('\n'.join(('Expected:', - '100%|##########| 101/101', 'Got:', - our_file.read()))) + raise AssertionError("\nExpected:\n{0}\nIn:{1}\n".format( + exres, our_file.read())) diff --git a/tqdm/tests/tests_perf.py b/tqdm/tests/tests_perf.py index a11be5c7..1e1e4011 100644 --- a/tqdm/tests/tests_perf.py +++ b/tqdm/tests/tests_perf.py @@ -10,22 +10,7 @@ from time import sleep, time from tqdm import trange from tqdm import tqdm -try: - from StringIO import StringIO -except: - from io import StringIO -# Ensure we can use `with closing(...) as ... :` syntax -if getattr(StringIO, '__exit__', False) and \ - getattr(StringIO, '__enter__', False): - def closing(arg): - return arg -else: - from contextlib import closing - -try: - _range = xrange -except: - _range = range +from tests_tqdm import with_setup, pretest, posttest, StringIO, closing, _range # Use relative/cpu timer to have reliable timings when there is a sudden load try: @@ -64,10 +49,15 @@ def checkCpuTime(sleeptime=0.2): @contextmanager def relative_timer(): start = process_time() - elapser = lambda: process_time() - start + + def elapser(): + return process_time() - start + yield lambda: elapser() spent = process_time() - start - elapser = lambda: spent + + def elapser(): # NOQA + return spent class MockIO(StringIO): @@ -141,6 +131,7 @@ def simple_progress(iterable=None, total=None, file=sys.stdout, desc='', return update_and_print +@with_setup(pretest, posttest) def test_iter_overhead(): """ Test overhead of iteration based tqdm """ try: @@ -164,13 +155,12 @@ def test_iter_overhead(): our_file.write(a) # Compute relative overhead of tqdm against native range() - try: - assert(time_tqdm() < 3 * time_bench()) - except AssertionError: + if time_tqdm() > 9 * time_bench(): raise AssertionError('trange(%g): %f, range(%g): %f' % (total, time_tqdm(), total, time_bench())) +@with_setup(pretest, posttest) def test_manual_overhead(): """ Test overhead of manual tqdm """ try: @@ -181,12 +171,12 @@ def test_manual_overhead(): total = int(1e6) with closing(MockIO()) as our_file: - t = tqdm(total=total * 10, file=our_file, leave=True) - a = 0 - with relative_timer() as time_tqdm: - for i in _range(total): - a += i - t.update(10) + with tqdm(total=total * 10, file=our_file, leave=True) as t: + a = 0 + with relative_timer() as time_tqdm: + for i in _range(total): + a += i + t.update(10) a = 0 with relative_timer() as time_bench: @@ -195,13 +185,12 @@ def test_manual_overhead(): our_file.write(a) # Compute relative overhead of tqdm against native range() - try: - assert(time_tqdm() < 10 * time_bench()) - except AssertionError: + if time_tqdm() > 10 * time_bench(): raise AssertionError('tqdm(%g): %f, range(%g): %f' % (total, time_tqdm(), total, time_bench())) +@with_setup(pretest, posttest) def test_iter_overhead_hard(): """ Test overhead of iteration based tqdm (hard) """ try: @@ -233,6 +222,7 @@ def test_iter_overhead_hard(): (total, time_tqdm(), total, time_bench())) +@with_setup(pretest, posttest) def test_manual_overhead_hard(): """ Test overhead of manual tqdm (hard) """ try: @@ -265,6 +255,7 @@ def test_manual_overhead_hard(): (total, time_tqdm(), total, time_bench())) +@with_setup(pretest, posttest) def test_iter_overhead_simplebar_hard(): """ Test overhead of iteration based tqdm vs simple progress bar (hard) """ try: @@ -298,6 +289,7 @@ def test_iter_overhead_simplebar_hard(): (total, time_tqdm(), total, time_bench())) +@with_setup(pretest, posttest) def test_manual_overhead_simplebar_hard(): """ Test overhead of manual tqdm vs simple progress bar (hard) """ try: diff --git a/tqdm/tests/tests_tqdm.py b/tqdm/tests/tests_tqdm.py index c61d2ffd..7ca433e5 100644 --- a/tqdm/tests/tests_tqdm.py +++ b/tqdm/tests/tests_tqdm.py @@ -5,6 +5,9 @@ from __future__ import unicode_literals import csv import re +import os +from nose import with_setup +from nose.plugins.skip import SkipTest from tqdm import tqdm from tqdm import trange @@ -13,6 +16,9 @@ try: from StringIO import StringIO except: from io import StringIO + +from io import IOBase # to support unicode strings + # Ensure we can use `with closing(...) as ... :` syntax if getattr(StringIO, '__exit__', False) and \ getattr(StringIO, '__enter__', False): @@ -31,6 +37,21 @@ try: except NameError: _unicode = str +nt_and_no_colorama = False +if os.name == 'nt': + try: + import colorama # NOQA + except: + nt_and_no_colorama = True + +# Regex definitions +# List of control characters +CTRLCHR = [r'\r', r'\n', r'\x1b\[A'] # Need to escape [ for regex +# Regular expressions compilation +RE_rate = re.compile(r'(\d+\.\d+)it/s') +RE_ctrlchr = re.compile("(%s)" % '|'.join(CTRLCHR)) # Match control chars +RE_ctrlchr_excl = re.compile('|'.join(CTRLCHR)) # Match and exclude ctrl chars + class DiscreteTimer(object): '''Virtual discrete time manager, to precisely control time for tests''' @@ -56,12 +77,56 @@ def cpu_timify(t, timer=None): return timer -RE_rate = re.compile(r'(\d+\.\d+)it/s') +def pretest(): + if getattr(tqdm, "_instances", False): + n = len(tqdm._instances) + if n: + tqdm._instances.clear() + raise EnvironmentError( + "{0} `tqdm` instances still in existence PRE-test".format(n)) + + +def posttest(): + if getattr(tqdm, "_instances", False): + n = len(tqdm._instances) + if n: + tqdm._instances.clear() + raise EnvironmentError( + "{0} `tqdm` instances still in existence POST-test".format(n)) + + +class UnicodeIO(IOBase): + ''' Unicode version of StringIO ''' + + def __init__(self, *args, **kwargs): + super(UnicodeIO, self).__init__(*args, **kwargs) + self.encoding = 'U8' # io.StringIO supports unicode, but no encoding + self.text = '' + self.cursor = 0 + + def seek(self, offset): + self.cursor = offset + + def tell(self): + return self.cursor + + def write(self, s): + self.text += s + self.cursor = len(self.text) + + def read(self): + return self.text[self.cursor:] + + def getvalue(self): + return self.text def get_bar(all_bars, i): """ Get a specific update from a whole bar traceback """ - return all_bars.strip('\r').split('\r')[i] + # Split according to any used control characters + bars_split = RE_ctrlchr_excl.split(all_bars) + bars_split = list(filter(None, bars_split)) # filter out empty splits + return bars_split[i] def progressbar_rate(bar_str): @@ -71,6 +136,7 @@ def progressbar_rate(bar_str): def test_format_interval(): """ Test time interval format """ format_interval = tqdm.format_interval + assert format_interval(60) == '01:00' assert format_interval(6160) == '1:42:40' assert format_interval(238113) == '66:08:33' @@ -86,9 +152,9 @@ def test_format_meter(): format_meter = tqdm.format_meter assert format_meter(0, 1000, 13) == \ - " 0%| | 0/1000 [00:13= (bigstep - 1) and ((i - (bigstep - 1)) % smallstep) == 0: - timer.sleep(1e-2) - if i >= 3 * bigstep: - break + for i in t2: + if i >= (bigstep - 1) and \ + ((i - (bigstep - 1)) % smallstep) == 0: + timer.sleep(1e-2) + if i >= 3 * bigstep: + break our_file.seek(0) - out = our_file.read() - assert "15%" in out + assert "15%" in our_file.read() +@with_setup(pretest, posttest) def test_min_iters(): """ Test miniters """ with closing(StringIO()) as our_file: @@ -300,8 +379,9 @@ def test_min_iters(): assert '| 3/3 ' in our_file.read() +@with_setup(pretest, posttest) def test_dynamic_min_iters(): - """ Test purely dynamic miniters """ + """ Test purely dynamic miniters (and manual updates and __del__) """ with closing(StringIO()) as our_file: total = 10 t = tqdm(total=total, file=our_file, miniters=None, mininterval=0, @@ -319,11 +399,13 @@ def test_dynamic_min_iters(): our_file.seek(0) out = our_file.read() assert t.dynamic_miniters - assert ' 0%| | 0/10 [00:00<' in out - assert '40%' in out - assert '50%' not in out - assert '60%' not in out - assert '70%' in out + t.__del__() # simulate immediate del gc + + assert ' 0%| | 0/10 [00:00<' in out + assert '40%' in out + assert '50%' not in out + assert '60%' not in out + assert '70%' in out with closing(StringIO()) as our_file: t = tqdm(_range(10), file=our_file, miniters=None, mininterval=None) @@ -338,6 +420,7 @@ def test_dynamic_min_iters(): assert not t.dynamic_miniters +@with_setup(pretest, posttest) def test_big_min_interval(): """ Test large mininterval """ with closing(StringIO()) as our_file: @@ -347,78 +430,80 @@ def test_big_min_interval(): assert '50%' not in our_file.read() with closing(StringIO()) as our_file: - t = tqdm(_range(2), file=our_file, mininterval=1E10) - t.update() - t.update() - our_file.seek(0) - assert '50%' not in our_file.read() + with tqdm(_range(2), file=our_file, mininterval=1E10) as t: + t.update() + t.update() + our_file.seek(0) + assert '50%' not in our_file.read() +@with_setup(pretest, posttest) def test_smoothed_dynamic_min_iters(): """ Test smoothed dynamic miniters """ timer = DiscreteTimer() with closing(StringIO()) as our_file: - total = 100 - t = tqdm(total=total, file=our_file, miniters=None, mininterval=0, - smoothing=0.5, maxinterval=0) - cpu_timify(t, timer) + with tqdm(total=100, file=our_file, miniters=None, mininterval=0, + smoothing=0.5, maxinterval=0) as t: + cpu_timify(t, timer) - # Increase 10 iterations at once - t.update(10) - # The next iterations should be partially skipped - for _ in _range(2): - t.update(4) - for _ in _range(20): - t.update() + # Increase 10 iterations at once + t.update(10) + # The next iterations should be partially skipped + for _ in _range(2): + t.update(4) + for _ in _range(20): + t.update() - our_file.seek(0) - out = our_file.read() - assert t.dynamic_miniters - assert ' 0%| | 0/100 [00:00<' in out - assert '10%' in out - assert '14%' not in out - assert '18%' in out - assert '20%' not in out - assert '25%' in out - assert '30%' not in out - assert '32%' in out + our_file.seek(0) + out = our_file.read() + assert t.dynamic_miniters + assert ' 0%| | 0/100 [00:00<' in out + assert '10%' in out + assert '14%' not in out + assert '18%' in out + assert '20%' not in out + assert '25%' in out + assert '30%' not in out + assert '32%' in out +@with_setup(pretest, posttest) def test_smoothed_dynamic_min_iters_with_min_interval(): """ Test smoothed dynamic miniters with mininterval """ timer = DiscreteTimer() - # Basically in this test, miniters should gradually decline + # In this test, `miniters` should gradually decline + total = 100 + with closing(StringIO()) as our_file: - total = 100 - # Test manual updating tqdm - t = tqdm(total=total, file=our_file, miniters=None, mininterval=1e-3, - smoothing=1, maxinterval=0) - cpu_timify(t, timer) + with tqdm(total=total, file=our_file, miniters=None, mininterval=1e-3, + smoothing=1, maxinterval=0) as t: + cpu_timify(t, timer) - t.update(10) - timer.sleep(1e-2) - for _ in _range(4): - t.update() + t.update(10) timer.sleep(1e-2) - our_file.seek(0) - out = our_file.read() + for _ in _range(4): + t.update() + timer.sleep(1e-2) + our_file.seek(0) + out = our_file.read() + assert t.dynamic_miniters with closing(StringIO()) as our_file: # Test iteration-based tqdm - t2 = tqdm(_range(total), file=our_file, miniters=None, - mininterval=0.01, smoothing=1, maxinterval=0) - cpu_timify(t2, timer) + with tqdm(_range(total), file=our_file, miniters=None, + mininterval=0.01, smoothing=1, maxinterval=0) as t2: + cpu_timify(t2, timer) - for i in t2: - if i >= 10: - timer.sleep(0.1) - if i >= 14: - break - our_file.seek(0) - out2 = our_file.read() + for i in t2: + if i >= 10: + timer.sleep(0.1) + if i >= 14: + break + our_file.seek(0) + out2 = our_file.read() assert t.dynamic_miniters assert ' 0%| | 0/100 [00:00<' in out @@ -428,6 +513,7 @@ def test_smoothed_dynamic_min_iters_with_min_interval(): assert '14%' in out and '14%' in out2 +@with_setup(pretest, posttest) def test_disable(): """ Test disable """ with closing(StringIO()) as our_file: @@ -444,6 +530,7 @@ def test_disable(): assert our_file.read() == '' +@with_setup(pretest, posttest) def test_unit(): """ Test SI unit prefix """ with closing(StringIO()) as our_file: @@ -453,12 +540,13 @@ def test_unit(): assert 'bytes/s' in our_file.read() +@with_setup(pretest, posttest) def test_ascii(): """ Test ascii/unicode bar """ # Test ascii autodetection with closing(StringIO()) as our_file: - t = tqdm(total=10, file=our_file, ascii=None) - assert t.ascii # TODO: this may fail in the future + with tqdm(total=10, file=our_file, ascii=None) as t: + assert t.ascii # TODO: this may fail in the future # Test ascii bar with closing(StringIO()) as our_file: @@ -467,97 +555,108 @@ def test_ascii(): pass our_file.seek(0) res = our_file.read().strip("\r").split("\r") - assert '7%|6' in res[1] - assert '13%|#3' in res[2] - assert '20%|##' in res[3] + assert '7%|6' in res[1] + assert '13%|#3' in res[2] + assert '20%|##' in res[3] # Test unicode bar - from io import StringIO as uIO # supports unicode strings - with closing(uIO()) as our_file: - t = tqdm(total=15, file=our_file, ascii=False, mininterval=0) - for _ in _range(3): - t.update() + with closing(UnicodeIO()) as our_file: + with tqdm(total=15, file=our_file, ascii=False, mininterval=0) as t: + for _ in _range(3): + t.update() our_file.seek(0) res = our_file.read().strip("\r").split("\r") - assert "7%|\u258b" in res[1] - assert "13%|\u2588\u258e" in res[2] - assert "20%|\u2588\u2588" in res[3] + assert "7%|\u258b" in res[1] + assert "13%|\u2588\u258e" in res[2] + assert "20%|\u2588\u2588" in res[3] +@with_setup(pretest, posttest) def test_update(): """ Test manual creation and updates """ with closing(StringIO()) as our_file: - progressbar = tqdm(total=2, file=our_file, miniters=1, mininterval=0) - assert len(progressbar) == 2 - progressbar.update(2) - our_file.seek(0) - assert '| 2/2' in our_file.read() - progressbar.desc = 'dynamically notify of 4 increments in total' - progressbar.total = 4 - try: - progressbar.update(-10) - except ValueError as e: - if str(e) != "n (-10) cannot be less than 1": - raise - progressbar.update() # should default to +1 - else: - raise ValueError("Should not support negative updates") - our_file.seek(0) - assert '| 3/4 ' in our_file.read() - our_file.seek(0) - assert 'dynamically notify of 4 increments in total' in our_file.read() + with tqdm(total=2, file=our_file, miniters=1, mininterval=0) \ + as progressbar: + assert len(progressbar) == 2 + progressbar.update(2) + our_file.seek(0) + assert '| 2/2' in our_file.read() + progressbar.desc = 'dynamically notify of 4 increments in total' + progressbar.total = 4 + try: + progressbar.update(-10) + except ValueError as e: + if str(e) != "n (-10) cannot be less than 1": + raise + progressbar.update() # should default to +1 + else: + raise ValueError("Should not support negative updates") + our_file.seek(0) + res = our_file.read() + assert '| 3/4 ' in res + assert 'dynamically notify of 4 increments in total' in res +@with_setup(pretest, posttest) def test_close(): - """ Test manual creation and closure """ - # With `leave` option - with closing(StringIO()) as our_file: - progressbar = tqdm(total=3, file=our_file, miniters=10, leave=True) - progressbar.update(3) - our_file.seek(0) - assert '| 3/3 ' not in our_file.read() # Should be blank - progressbar.close() - our_file.seek(0) - assert '| 3/3 ' in our_file.read() + """ Test manual creation and closure and n_instances """ - # Without `leave` option + # With `leave` option with closing(StringIO()) as our_file: progressbar = tqdm(total=3, file=our_file, miniters=10) progressbar.update(3) + assert '| 3/3 ' not in our_file.getvalue() # Should be blank + assert len(tqdm._instances) == 1 progressbar.close() - our_file.seek(0) - assert '| 3/3 ' not in our_file.read() # Should be blank + assert len(tqdm._instances) == 0 + assert '| 3/3 ' in our_file.getvalue() + + # Without `leave` option + with closing(StringIO()) as our_file: + progressbar = tqdm(total=3, file=our_file, miniters=10, leave=False) + progressbar.update(3) + progressbar.close() + assert '| 3/3 ' not in our_file.getvalue() # Should be blank # With all updates with closing(StringIO()) as our_file: + assert len(tqdm._instances) == 0 with tqdm(total=3, file=our_file, miniters=0, mininterval=0, leave=True) as progressbar: + assert len(tqdm._instances) == 1 progressbar.update(3) - our_file.seek(0) - res = our_file.read() + res = our_file.getvalue() assert '| 3/3 ' in res # Should be blank # close() called + assert len(tqdm._instances) == 0 our_file.seek(0) - try: - assert res + '\n' == our_file.read() - except AssertionError: + + exres = res + '\n' + if exres != our_file.read(): our_file.seek(0) - raise AssertionError('\n'.join( - ('expected extra line (\\n) after `close`:', - 'before:', res, 'closed:', our_file.read(), 'end debug\n'))) + raise AssertionError("\nExpected:\n{0}\nGot:{1}\n".format( + exres, our_file.read())) + + # Closing after the output stream has closed + with closing(StringIO()) as our_file: + t = tqdm(total=2, file=our_file) + t.update() + t.update() + t.close() +@with_setup(pretest, posttest) def test_smoothing(): """ Test exponential weighted average smoothing """ timer = DiscreteTimer() # -- Test disabling smoothing with closing(StringIO()) as our_file: - t = tqdm(_range(3), file=our_file, smoothing=None, leave=True) - cpu_timify(t, timer) + with tqdm(_range(3), file=our_file, smoothing=None, leave=True) as t: + cpu_timify(t, timer) - for _ in t: - pass + for _ in t: + pass our_file.seek(0) assert '| 3/3 ' in our_file.read() @@ -570,26 +669,28 @@ def test_smoothing(): miniters=1, mininterval=0) cpu_timify(t, timer) - t2 = tqdm(_range(3), file=our_file, smoothing=None, leave=True, - miniters=1, mininterval=0) - cpu_timify(t2, timer) + with tqdm(_range(3), file=our_file, smoothing=None, leave=True, + miniters=1, mininterval=0) as t2: + cpu_timify(t2, timer) - for i in t2: - # Sleep more for first iteration and - # see how quickly rate is updated - if i == 0: - timer.sleep(0.01) - else: - # Need to sleep in all iterations to calculate smoothed rate - # (else delta_t is 0!) - timer.sleep(0.001) - t.update() + for i in t2: + # Sleep more for first iteration and + # see how quickly rate is updated + if i == 0: + timer.sleep(0.01) + else: + # Need to sleep in all iterations + # to calculate smoothed rate + # (else delta_t is 0!) + timer.sleep(0.001) + t.update() + n_old = len(tqdm._instances) + t.close() + assert len(tqdm._instances) == n_old - 1 # Get result for iter-based bar - our_file.seek(0) - a = progressbar_rate(get_bar(our_file.read(), 3)) + a = progressbar_rate(get_bar(our_file.getvalue(), 3)) # Get result for manually updated bar - our_file2.seek(0) - a2 = progressbar_rate(get_bar(our_file2.read(), 3)) + a2 = progressbar_rate(get_bar(our_file2.getvalue(), 3)) # 2nd case: use max smoothing (= instant rate) with closing(StringIO()) as our_file2: @@ -598,22 +699,21 @@ def test_smoothing(): miniters=1, mininterval=0) cpu_timify(t, timer) - t2 = tqdm(_range(3), file=our_file, smoothing=1, leave=True, - miniters=1, mininterval=0) - cpu_timify(t2, timer) + with tqdm(_range(3), file=our_file, smoothing=1, leave=True, + miniters=1, mininterval=0) as t2: + cpu_timify(t2, timer) - for i in t2: - if i == 0: - timer.sleep(0.01) - else: - timer.sleep(0.001) - t.update() + for i in t2: + if i == 0: + timer.sleep(0.01) + else: + timer.sleep(0.001) + t.update() + t.close() # Get result for iter-based bar - our_file.seek(0) - b = progressbar_rate(get_bar(our_file.read(), 3)) + b = progressbar_rate(get_bar(our_file.getvalue(), 3)) # Get result for manually updated bar - our_file2.seek(0) - b2 = progressbar_rate(get_bar(our_file2.read(), 3)) + b2 = progressbar_rate(get_bar(our_file2.getvalue(), 3)) # 3rd case: use medium smoothing with closing(StringIO()) as our_file2: @@ -632,159 +732,56 @@ def test_smoothing(): else: timer.sleep(0.001) t.update() + t2.close() + t.close() # Get result for iter-based bar - our_file.seek(0) - c = progressbar_rate(get_bar(our_file.read(), 3)) + c = progressbar_rate(get_bar(our_file.getvalue(), 3)) # Get result for manually updated bar - our_file2.seek(0) - c2 = progressbar_rate(get_bar(our_file2.read(), 3)) + c2 = progressbar_rate(get_bar(our_file2.getvalue(), 3)) # Check that medium smoothing's rate is between no and max smoothing rates assert a < c < b assert a2 < c2 < b2 -def test_nested(): +@with_setup(pretest, posttest) +def test_deprecated_nested(): """ Test nested progress bars """ - # Use regexp because the it rates can change - RE_nested = re.compile(r'((\x1b\[A|\r|\n)+((outer|inner) loop:\s+\d+%|\s{3,6})?)') # NOQA - RE_nested2 = re.compile(r'((\x1b\[A|\r|\n)+((outer0|inner1|inner2) loop:\s+\d+%|\s{3,6})?)') # NOQA + if nt_and_no_colorama: + raise SkipTest + # TODO: test degradation on windows without colorama? # Artificially test nested loop printing # Without leave our_file = StringIO() - t = tqdm(total=2, file=our_file, miniters=1, mininterval=0, - maxinterval=0, desc='inner loop', leave=False, nested=True) - t.update() - t.close() - our_file.seek(0) - out = our_file.read() - res = [m[0] for m in RE_nested.findall(out)] - assert res == ['\n\rinner loop: 0%', - '\rinner loop: 50%', - '\r ', - '\r\x1b[A'] - - # With leave - our_file = StringIO() - t = tqdm(total=2, file=our_file, miniters=1, mininterval=0, - maxinterval=0, desc='inner loop', leave=True, nested=True) - t.update() - t.close() - our_file.seek(0) - out = our_file.read() - res = [m[0] for m in RE_nested.findall(out)] - assert res == ['\n\rinner loop: 0%', - '\rinner loop: 50%', - '\r\x1b[A'] - - # Test simple nested, without leave - our_file = StringIO() - for i in trange(2, file=our_file, miniters=1, mininterval=0, - maxinterval=0, desc='outer loop', leave=True): - for j in trange(4, file=our_file, miniters=1, mininterval=0, - maxinterval=0, desc='inner loop', nested=True): - pass - our_file.seek(0) - out = our_file.read() - res = [m[0] for m in RE_nested.findall(out)] - assert res == ['\router loop: 0%', - '\n\rinner loop: 0%', - '\rinner loop: 25%', - '\rinner loop: 50%', - '\rinner loop: 75%', - '\rinner loop: 100%', - '\r ', - '\r\x1b[A\router loop: 50%', - '\n\rinner loop: 0%', - '\rinner loop: 25%', - '\rinner loop: 50%', - '\rinner loop: 75%', - '\rinner loop: 100%', - '\r ', - '\r\x1b[A\router loop: 100%', - '\n'] - - # Test nested with leave - our_file = StringIO() - for i in trange(2, file=our_file, miniters=1, mininterval=0, - maxinterval=0, desc='outer loop', leave=True): - for j in trange(4, file=our_file, miniters=1, mininterval=0, - maxinterval=0, desc='inner loop', leave=True, - nested=True): - pass - our_file.seek(0) - out = our_file.read() - res = [m[0] for m in RE_nested.findall(out)] - assert res == ['\router loop: 0%', - '\n\rinner loop: 0%', - '\rinner loop: 25%', - '\rinner loop: 50%', - '\rinner loop: 75%', - '\rinner loop: 100%', - '\r\x1b[A\router loop: 50%', - '\n\rinner loop: 0%', - '\rinner loop: 25%', - '\rinner loop: 50%', - '\rinner loop: 75%', - '\rinner loop: 100%', - '\r\x1b[A\router loop: 100%', - '\n'] - - # Test 2 nested loops with leave - our_file = StringIO() - for i in trange(2, file=our_file, miniters=1, mininterval=0, - maxinterval=0, desc='outer0 loop', leave=True): - for j in trange(2, file=our_file, miniters=1, mininterval=0, - maxinterval=0, desc='inner1 loop', leave=True, - nested=True): - for k in trange(2, file=our_file, miniters=1, mininterval=0, - maxinterval=0, desc='inner2 loop', leave=True, - nested=True): - pass - our_file.seek(0) - out = our_file.read() - res = [m[0] for m in RE_nested2.findall(out)] - assert res == ['\router0 loop: 0%', - '\n\rinner1 loop: 0%', - '\n\rinner2 loop: 0%', - '\rinner2 loop: 50%', - '\rinner2 loop: 100%', - '\r\x1b[A\rinner1 loop: 50%', - '\n\rinner2 loop: 0%', - '\rinner2 loop: 50%', - '\rinner2 loop: 100%', - '\r\x1b[A\rinner1 loop: 100%', - '\r\x1b[A\router0 loop: 50%', - '\n\rinner1 loop: 0%', - '\n\rinner2 loop: 0%', - '\rinner2 loop: 50%', - '\rinner2 loop: 100%', - '\r\x1b[A\rinner1 loop: 50%', - '\n\rinner2 loop: 0%', - '\rinner2 loop: 50%', - '\rinner2 loop: 100%', - '\r\x1b[A\rinner1 loop: 100%', - '\r\x1b[A\router0 loop: 100%', - '\n'] - # TODO: test degradation on windows without colorama? + try: + tqdm(total=2, file=our_file, nested=True) + except DeprecationWarning as e: + if str(e) != ("nested is deprecated and automated.\nUse position" + " instead for manual control"): + raise + else: + raise DeprecationWarning("Should not allow nested kwarg") +@with_setup(pretest, posttest) def test_bar_format(): - ''' Test custom bar formatting''' + """ Test custom bar formatting """ with closing(StringIO()) as our_file: bar_format = r'{l_bar}{bar}|{n_fmt}/{total_fmt}-{n}/{total}{percentage}{rate}{rate_fmt}{elapsed}{remaining}' # NOQA for i in trange(2, file=our_file, leave=True, bar_format=bar_format): pass out = our_file.getvalue() - assert "\r 0%| |0/2-0/20.0None?it/s00:00?\r" in out + assert "\r 0%| |0/2-0/20.0None?it/s00:00?\r" in out - # Test unicode string auto conversion + # Test unicode string auto conversion + with closing(StringIO()) as our_file: bar_format = r'hello world' - t = tqdm(file=our_file, ascii=False, bar_format=bar_format) - assert isinstance(t.bar_format, _unicode) + with tqdm(ascii=False, bar_format=bar_format, file=our_file) as t: + assert isinstance(t.bar_format, _unicode) +@with_setup(pretest, posttest) def test_unpause(): """ Test unpause """ timer = DiscreteTimer() @@ -807,18 +804,156 @@ def test_unpause(): assert r_before == r_after +@with_setup(pretest, posttest) +def test_position(): + """ Test positioned progress bars """ + if nt_and_no_colorama: + raise SkipTest + + # Use regexp because the it rates can change + RE_pos = re.compile(r'((\x1b\[A|\r|\n)+((pos\d+) bar:\s+\d+%|\s{3,6})?)') # NOQA + + # Artificially test nested loop printing + # Without leave + our_file = StringIO() + t = tqdm(total=2, file=our_file, miniters=1, mininterval=0, + maxinterval=0, desc='pos2 bar', leave=False, position=2) + t.update() + t.close() + our_file.seek(0) + out = our_file.read() + res = [m[0] for m in RE_pos.findall(out)] + exres = ['\n\n\rpos2 bar: 0%', + '\x1b[A\x1b[A\n\n\rpos2 bar: 50%', + '\x1b[A\x1b[A\n\n\r ', + '\x1b[A\x1b[A'] + if res != exres: + raise AssertionError("\nExpected:\n{0}\nGot:\n{1}\nRaw:\n{2}\n".format( + str(exres), str(res), str([out]))) + + # Test iteration-based tqdm positioning + our_file = StringIO() + for i in trange(2, file=our_file, miniters=1, mininterval=0, + maxinterval=0, desc='pos0 bar', position=0): + for j in trange(2, file=our_file, miniters=1, mininterval=0, + maxinterval=0, desc='pos1 bar', position=1): + for k in trange(2, file=our_file, miniters=1, mininterval=0, + maxinterval=0, desc='pos2 bar', position=2): + pass + our_file.seek(0) + out = our_file.read() + res = [m[0] for m in RE_pos.findall(out)] + exres = ['\rpos0 bar: 0%', + '\n\rpos1 bar: 0%', + '\x1b[A\n\n\rpos2 bar: 0%', + '\x1b[A\x1b[A\n\n\rpos2 bar: 50%', + '\x1b[A\x1b[A\n\n\rpos2 bar: 100%', + '\x1b[A\x1b[A\n\n\x1b[A\x1b[A\n\rpos1 bar: 50%', + '\x1b[A\n\n\rpos2 bar: 0%', + '\x1b[A\x1b[A\n\n\rpos2 bar: 50%', + '\x1b[A\x1b[A\n\n\rpos2 bar: 100%', + '\x1b[A\x1b[A\n\n\x1b[A\x1b[A\n\rpos1 bar: 100%', + '\x1b[A\n\x1b[A\rpos0 bar: 50%', + '\n\rpos1 bar: 0%', + '\x1b[A\n\n\rpos2 bar: 0%', + '\x1b[A\x1b[A\n\n\rpos2 bar: 50%', + '\x1b[A\x1b[A\n\n\rpos2 bar: 100%', + '\x1b[A\x1b[A\n\n\x1b[A\x1b[A\n\rpos1 bar: 50%', + '\x1b[A\n\n\rpos2 bar: 0%', + '\x1b[A\x1b[A\n\n\rpos2 bar: 50%', + '\x1b[A\x1b[A\n\n\rpos2 bar: 100%', + '\x1b[A\x1b[A\n\n\x1b[A\x1b[A\n\rpos1 bar: 100%', + '\x1b[A\n\x1b[A\rpos0 bar: 100%', + '\n'] + if res != exres: + raise AssertionError("\nExpected:\n{0}\nGot:\n{1}\nRaw:\n{2}\n".format( + str(exres), str(res), str([out]))) + + # Test manual tqdm positioning + our_file = StringIO() + t1 = tqdm(total=2, file=our_file, miniters=1, mininterval=0, + maxinterval=0, desc='pos0 bar', position=0) + t2 = tqdm(total=2, file=our_file, miniters=1, mininterval=0, + maxinterval=0, desc='pos1 bar', position=1) + t3 = tqdm(total=2, file=our_file, miniters=1, mininterval=0, + maxinterval=0, desc='pos2 bar', position=2) + for i in _range(2): + t1.update() + t3.update() + t2.update() + our_file.seek(0) + out = our_file.read() + res = [m[0] for m in RE_pos.findall(out)] + exres = ['\rpos0 bar: 0%', + '\n\rpos1 bar: 0%', + '\x1b[A\n\n\rpos2 bar: 0%', + '\x1b[A\x1b[A\rpos0 bar: 50%', + '\n\n\rpos2 bar: 50%', + '\x1b[A\x1b[A\n\rpos1 bar: 50%', + '\x1b[A\rpos0 bar: 100%', + '\n\n\rpos2 bar: 100%', + '\x1b[A\x1b[A\n\rpos1 bar: 100%', + '\x1b[A'] + if res != exres: + raise AssertionError("\nExpected:\n{0}\nGot:\n{1}\nRaw:\n{2}\n".format( + str(exres), str(res), str([out]))) + t1.close() + t2.close() + t3.close() + + # Test auto repositionning of bars when a bar is prematurely closed + # tqdm._instances.clear() # reset number of instances + with closing(StringIO()) as our_file: + t1 = tqdm(total=10, file=our_file, desc='pos0 bar', mininterval=0) + t2 = tqdm(total=10, file=our_file, desc='pos1 bar', mininterval=0) + t3 = tqdm(total=10, file=our_file, desc='pos2 bar', mininterval=0) + res = [m[0] for m in RE_pos.findall(our_file.getvalue())] + exres = ['\rpos0 bar: 0%', + '\n\rpos1 bar: 0%', + '\x1b[A\n\n\rpos2 bar: 0%', + '\x1b[A\x1b[A'] + if res != exres: + raise AssertionError( + "\nExpected:\n{0}\nGot:\n{1}\n".format( + str(exres), str(res))) + + t2.close() + t4 = tqdm(total=10, file=our_file, desc='pos3 bar', mininterval=0) + t1.update(1) + t3.update(1) + t4.update(1) + res = [m[0] for m in RE_pos.findall(our_file.getvalue())] + exres = ['\rpos0 bar: 0%', + '\n\rpos1 bar: 0%', + '\x1b[A\n\n\rpos2 bar: 0%', + '\x1b[A\x1b[A\n\x1b[A\n\n\rpos3 bar: 0%', + '\x1b[A\x1b[A\rpos0 bar: 10%', + '\n\rpos2 bar: 10%', + '\x1b[A\n\n\rpos3 bar: 10%', + '\x1b[A\x1b[A'] + if res != exres: + raise AssertionError( + "\nExpected:\n{0}\nGot:\n{1}\n".format( + str(exres), str(res))) + t4.close() + t3.close() + t1.close() + + +@with_setup(pretest, posttest) def test_set_description(): """ Test set description """ with closing(StringIO()) as our_file: - t = tqdm(file=our_file, desc='Hello') - assert t.desc == 'Hello: ' - t.set_description('World') - assert t.desc == 'World: ' - t.set_description() - assert t.desc == '' + with tqdm(desc='Hello', file=our_file) as t: + assert t.desc == 'Hello: ' + t.set_description('World') + assert t.desc == 'World: ' + t.set_description() + assert t.desc == '' -def test_no_gui(): +@with_setup(pretest, posttest) +def test_deprecated_gui(): """ Test internal GUI properties """ # Check: StatusPrinter iff gui is disabled with closing(StringIO()) as our_file: @@ -826,21 +961,71 @@ def test_no_gui(): assert not hasattr(t, "sp") try: t.update(1) - except DeprecationWarning: - pass + except DeprecationWarning as e: + if str(e) != ('Please use tqdm_gui(...) instead of' + ' tqdm(..., gui=True)'): + raise else: raise DeprecationWarning('Should not allow manual gui=True without' ' overriding __iter__() and update()') + finally: + t._instances.clear() + # t.close() + # len(tqdm._instances) += 1 # undo the close() decrement + t = tqdm(_range(3), gui=True, file=our_file, + miniters=1, mininterval=0) try: - for _ in tqdm(_range(3), gui=True, file=our_file, - miniters=1, mininterval=0): + for _ in t: pass - except DeprecationWarning: - pass + except DeprecationWarning as e: + if str(e) != ('Please use tqdm_gui(...) instead of' + ' tqdm(..., gui=True)'): + raise e else: raise DeprecationWarning('Should not allow manual gui=True without' ' overriding __iter__() and update()') + finally: + t._instances.clear() + # t.close() + # len(tqdm._instances) += 1 # undo the close() decrement - t = tqdm(total=1, gui=False, file=our_file) - assert hasattr(t, "sp") + with tqdm(total=1, gui=False, file=our_file) as t: + assert hasattr(t, "sp") + + +@with_setup(pretest, posttest) +def test_cmp(): + """ Test comparison functions """ + with closing(StringIO()) as our_file: + t0 = tqdm(total=10, file=our_file) + t1 = tqdm(total=10, file=our_file) + t2 = tqdm(total=10, file=our_file) + + assert t0 < t1 + assert t2 >= t0 + assert t0 <= t2 + + t3 = tqdm(total=10, file=our_file) + t4 = tqdm(total=10, file=our_file) + t5 = tqdm(total=10, file=our_file) + t5.close() + t6 = tqdm(total=10, file=our_file) + + assert t3 != t4 + assert t3 > t2 + assert t5 == t6 + t6.close() + t4.close() + t3.close() + t2.close() + t1.close() + t0.close() + + +@with_setup(pretest, posttest) +def test_repr(): + """ Test representation """ + with closing(StringIO()) as our_file: + with tqdm(total=10, ascii=True, file=our_file) as t: + assert str(t) == ' 0%| | 0/10 [00:00