Compare commits

...

5 Commits

Author SHA1 Message Date
Victor Shih 843d675730
Update iterutils.py (#313)
Spelling
2022-12-08 02:11:58 -08:00
Mezbaul Haque 93185b224e
Fix infinite daterange issue when start and stop is same (#302)
Co-authored-by: Mahmoud Hashemi <mahmoud@hatnote.com>
2022-12-08 01:51:37 -08:00
bwatson-dk 6505c6b556
Fixes and improvements to SpooledBytesIO & SpooledStringIO (#305)
- Inherit from IOBase so instance & type checks are simpler
- __eq__ and __ne__ no longer need to load both files completely into memory
- __eq__ and __ne__ reset read-head in comparison errors
- Some methods now raise a ValueError if the file is closed. This brings the implementations closer to StringIO and implementations from the io module

Co-authored-by: Brant Watson <oldspiceap@gmail.com>
2022-12-08 01:45:47 -08:00
Induane 9420cfeb3b
Add more documentation to multi_replace (#306)
- Fixed some documentation copypasta
- Added example usage to multi_replace function
- Removed some old Python 2.6 string formatting
2022-12-08 01:38:13 -08:00
Mahmoud Hashemi d35693e875
Refresh CI matrix (#320)
* add py3.10 to gha and tox
* drop pypy2 from tox/gha
* drop 3.6 from gha
2022-12-08 01:28:02 -08:00
12 changed files with 213 additions and 69 deletions

View File

@ -18,18 +18,17 @@ jobs:
fail-fast: false
matrix:
include:
- {name: Linux, python: '3.9', os: ubuntu-latest, tox: py39}
- {name: Windows, python: '3.9', os: windows-latest, tox: py39}
- {name: Mac, python: '3.9', os: macos-latest, tox: py39}
- {name: Linux, python: '3.10', os: ubuntu-latest, tox: py310}
- {name: Windows, python: '3.10', os: windows-latest, tox: py310}
- {name: Mac, python: '3.10', os: macos-latest, tox: py310}
- {name: '3.9', python: '3.9', os: ubuntu-latest, tox: py39}
- {name: '3.8', python: '3.8', os: ubuntu-latest, tox: py38}
- {name: '3.7', python: '3.7', os: ubuntu-latest, tox: py37}
- {name: '3.6', python: '3.6', os: ubuntu-latest, tox: py36}
- {name: '2.7', python: '2.7', os: ubuntu-latest, tox: py27}
- {name: 'PyPy2', python: 'pypy2', os: ubuntu-latest, tox: pypy}
- {name: 'PyPy3', python: 'pypy3', os: ubuntu-latest, tox: pypy3}
- {name: '2.7', python: '2.7.18', os: ubuntu-latest, tox: py27}
- {name: 'PyPy3', python: 'pypy-3.9', os: ubuntu-latest, tox: pypy3}
steps:
- uses: actions/checkout@v2
- uses: actions/setup-python@v2
- uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python }}
- name: update pip

2
.gitignore vendored
View File

@ -1,7 +1,7 @@
docs/_build
tmp.py
htmlcov/
venv/
*.py[cod]
# emacs

View File

@ -40,7 +40,7 @@ are useful when dealing with input, output, and bytestreams in a variety of
ways.
"""
import os
from io import BytesIO
from io import BytesIO, IOBase
from abc import (
ABCMeta,
abstractmethod,
@ -50,6 +50,11 @@ from errno import EINVAL
from codecs import EncodedFile
from tempfile import TemporaryFile
try:
from itertools import izip_longest as zip_longest # Python 2
except ImportError:
from itertools import zip_longest # Python 3
try:
text_type = unicode # Python 2
binary_type = str
@ -66,16 +71,14 @@ value.
"""
class SpooledIOBase(object):
class SpooledIOBase(IOBase):
"""
The SpooledTempoaryFile class doesn't support a number of attributes and
methods that a StringIO instance does. This brings the api as close to
compatible as possible with StringIO so that it may be used as a near
drop-in replacement to save memory.
A base class shared by the SpooledBytesIO and SpooledStringIO classes.
Another issue with SpooledTemporaryFile is that the spooled file is always
a cStringIO rather than a StringIO which causes issues with some of our
tools.
The SpooledTemporaryFile class is missing several attributes and methods
present in the StringIO implementation. This brings the api as close to
parity as possible so that classes derived from SpooledIOBase can be used
as near drop-in replacements to save memory.
"""
__metaclass__ = ABCMeta
@ -83,6 +86,11 @@ class SpooledIOBase(object):
self._max_size = max_size
self._dir = dir
def _checkClosed(self, msg=None):
"""Raise a ValueError if file is closed"""
if self.closed:
raise ValueError('I/O operation on closed file.'
if msg is None else msg)
@abstractmethod
def read(self, n=-1):
"""Read n characters from the buffer"""
@ -103,6 +111,16 @@ class SpooledIOBase(object):
def readlines(self, sizehint=0):
"""Returns a list of all lines from the current position forward"""
def writelines(self, lines):
"""
Write lines to the file from an interable.
NOTE: writelines() does NOT add line separators.
"""
self._checkClosed()
for line in lines:
self.write(line)
@abstractmethod
def rollover(self):
"""Roll file-like-object over into a real temporary file"""
@ -139,22 +157,13 @@ class SpooledIOBase(object):
return self.buffer.close()
def flush(self):
self._checkClosed()
return self.buffer.flush()
def isatty(self):
self._checkClosed()
return self.buffer.isatty()
def next(self):
line = self.readline()
if not line:
pos = self.buffer.tell()
self.buffer.seek(0, os.SEEK_END)
if pos == self.buffer.tell():
raise StopIteration
else:
self.buffer.seek(pos)
return line
@property
def closed(self):
return self.buffer.closed
@ -173,10 +182,13 @@ class SpooledIOBase(object):
def truncate(self, size=None):
"""
Truncate the contents of the buffer.
Custom version of truncate that takes either no arguments (like the
real SpooledTemporaryFile) or a single argument that truncates the
value to a certain index location.
"""
self._checkClosed()
if size is None:
return self.buffer.truncate()
@ -191,7 +203,8 @@ class SpooledIOBase(object):
self.seek(pos)
def getvalue(self):
"""Return the entire files contents"""
"""Return the entire files contents."""
self._checkClosed()
pos = self.tell()
self.seek(0)
val = self.read()
@ -207,15 +220,29 @@ class SpooledIOBase(object):
def writable(self):
return True
__next__ = next
def __next__(self):
self._checkClosed()
line = self.readline()
if not line:
pos = self.buffer.tell()
self.buffer.seek(0, os.SEEK_END)
if pos == self.buffer.tell():
raise StopIteration
else:
self.buffer.seek(pos)
return line
next = __next__
def __len__(self):
return self.len
def __iter__(self):
self._checkClosed()
return self
def __enter__(self):
self._checkClosed()
return self
def __exit__(self, *args):
@ -223,7 +250,31 @@ class SpooledIOBase(object):
def __eq__(self, other):
if isinstance(other, self.__class__):
return self.getvalue() == other.getvalue()
self_pos = self.tell()
other_pos = other.tell()
try:
self.seek(0)
other.seek(0)
eq = True
for self_line, other_line in zip_longest(self, other):
if self_line != other_line:
eq = False
break
self.seek(self_pos)
other.seek(other_pos)
except Exception:
# Attempt to return files to original position if there were any errors
try:
self.seek(self_pos)
except Exception:
pass
try:
other.seek(other_pos)
except Exception:
pass
raise
else:
return eq
return False
def __ne__(self, other):
@ -232,6 +283,13 @@ class SpooledIOBase(object):
def __bool__(self):
return True
def __del__(self):
"""Can fail when called at program exit so suppress traceback."""
try:
self.close()
except Exception:
pass
__nonzero__ = __bool__
@ -253,11 +311,13 @@ class SpooledBytesIO(SpooledIOBase):
"""
def read(self, n=-1):
self._checkClosed()
return self.buffer.read(n)
def write(self, s):
self._checkClosed()
if not isinstance(s, binary_type):
raise TypeError("{0} expected, got {1}".format(
raise TypeError("{} expected, got {}".format(
binary_type.__name__,
type(s).__name__
))
@ -267,9 +327,11 @@ class SpooledBytesIO(SpooledIOBase):
self.buffer.write(s)
def seek(self, pos, mode=0):
self._checkClosed()
return self.buffer.seek(pos, mode)
def readline(self, length=None):
self._checkClosed()
if length:
return self.buffer.readline(length)
else:
@ -314,6 +376,7 @@ class SpooledBytesIO(SpooledIOBase):
return val
def tell(self):
self._checkClosed()
return self.buffer.tell()
@ -340,13 +403,15 @@ class SpooledStringIO(SpooledIOBase):
super(SpooledStringIO, self).__init__(*args, **kwargs)
def read(self, n=-1):
self._checkClosed()
ret = self.buffer.reader.read(n, n)
self._tell = self.tell() + len(ret)
return ret
def write(self, s):
self._checkClosed()
if not isinstance(s, text_type):
raise TypeError("{0} expected, got {1}".format(
raise TypeError("{} expected, got {}".format(
text_type.__name__,
type(s).__name__
))
@ -384,6 +449,7 @@ class SpooledStringIO(SpooledIOBase):
def seek(self, pos, mode=0):
"""Traverse from offset to the specified codepoint"""
self._checkClosed()
# Seek to position from the start of the file
if mode == os.SEEK_SET:
self.buffer.seek(0)
@ -406,6 +472,7 @@ class SpooledStringIO(SpooledIOBase):
return self.tell()
def readline(self, length=None):
self._checkClosed()
ret = self.buffer.readline(length).decode('utf-8')
self._tell = self.tell() + len(ret)
return ret
@ -428,7 +495,7 @@ class SpooledStringIO(SpooledIOBase):
return not isinstance(self.buffer.stream, BytesIO)
def rollover(self):
"""Roll the StringIO over to a TempFile"""
"""Roll the buffer over to a TempFile"""
if not self._rolled:
tmp = EncodedFile(TemporaryFile(dir=self._dir),
data_encoding='utf-8')
@ -440,6 +507,7 @@ class SpooledStringIO(SpooledIOBase):
def tell(self):
"""Return the codepoint position"""
self._checkClosed()
return self._tell
@property

View File

@ -208,7 +208,7 @@ def split_iter(src, sep=None, maxsplit=None):
def lstrip(iterable, strip_value=None):
"""Strips values from the beginning of an iterable. Stripped items will
match the value of the argument strip_value. Functionality is analigous
match the value of the argument strip_value. Functionality is analogous
to that of the method str.lstrip. Returns a list.
>>> lstrip(['Foo', 'Bar', 'Bam'], 'Foo')
@ -220,7 +220,7 @@ def lstrip(iterable, strip_value=None):
def lstrip_iter(iterable, strip_value=None):
"""Strips values from the beginning of an iterable. Stripped items will
match the value of the argument strip_value. Functionality is analigous
match the value of the argument strip_value. Functionality is analogous
to that of the method str.lstrip. Returns a generator.
>>> list(lstrip_iter(['Foo', 'Bar', 'Bam'], 'Foo'))
@ -238,7 +238,7 @@ def lstrip_iter(iterable, strip_value=None):
def rstrip(iterable, strip_value=None):
"""Strips values from the end of an iterable. Stripped items will
match the value of the argument strip_value. Functionality is analigous
match the value of the argument strip_value. Functionality is analogous
to that of the method str.rstrip. Returns a list.
>>> rstrip(['Foo', 'Bar', 'Bam'], 'Bam')
@ -250,7 +250,7 @@ def rstrip(iterable, strip_value=None):
def rstrip_iter(iterable, strip_value=None):
"""Strips values from the end of an iterable. Stripped items will
match the value of the argument strip_value. Functionality is analigous
match the value of the argument strip_value. Functionality is analogous
to that of the method str.rstrip. Returns a generator.
>>> list(rstrip_iter(['Foo', 'Bar', 'Bam'], 'Bam'))
@ -279,7 +279,7 @@ def rstrip_iter(iterable, strip_value=None):
def strip(iterable, strip_value=None):
"""Strips values from the beginning and end of an iterable. Stripped items
will match the value of the argument strip_value. Functionality is
analigous to that of the method str.strip. Returns a list.
analogous to that of the method str.strip. Returns a list.
>>> strip(['Fu', 'Foo', 'Bar', 'Bam', 'Fu'], 'Fu')
['Foo', 'Bar', 'Bam']
@ -291,7 +291,7 @@ def strip(iterable, strip_value=None):
def strip_iter(iterable,strip_value=None):
"""Strips values from the beginning and end of an iterable. Stripped items
will match the value of the argument strip_value. Functionality is
analigous to that of the method str.strip. Returns a generator.
analogous to that of the method str.strip. Returns a generator.
>>> list(strip_iter(['Fu', 'Foo', 'Bar', 'Bam', 'Fu'], 'Fu'))
['Foo', 'Bar', 'Bam']

View File

@ -1176,7 +1176,7 @@ class MultiReplace(object):
Dictionary Usage::
from lrmslib import stringutils
from boltons import stringutils
s = stringutils.MultiReplace({
'foo': 'zoo',
'cat': 'hat',
@ -1187,7 +1187,7 @@ class MultiReplace(object):
Iterable Usage::
from lrmslib import stringutils
from boltons import stringutils
s = stringutils.MultiReplace([
('foo', 'zoo'),
('cat', 'hat'),
@ -1239,10 +1239,7 @@ class MultiReplace(object):
else:
exp = vals[0].pattern
regex_values.append('(?P<{0}>{1})'.format(
group_name,
exp
))
regex_values.append('(?P<{}>{})'.format(group_name, exp))
self.group_map[group_name] = vals[1]
self.combined_pattern = re.compile(
@ -1267,7 +1264,18 @@ class MultiReplace(object):
def multi_replace(text, sub_map, **kwargs):
"""Shortcut function to invoke MultiReplace in a single call."""
"""
Shortcut function to invoke MultiReplace in a single call.
Example Usage::
from boltons.stringutils import multi_replace
new = multi_replace(
'The foo bar cat ate a bat',
{'foo': 'zoo', 'cat': 'hat', 'bat': 'kraken'}
)
new == 'The zoo bar hat ate a kraken'
"""
m = MultiReplace(sub_map, **kwargs)
return m.sub(text)

View File

@ -382,7 +382,7 @@ def daterange(start, stop, step=1, inclusive=False):
if stop is None:
finished = lambda now, stop: False
elif start < stop:
elif start <= stop:
finished = operator.gt if inclusive else operator.ge
else:
finished = operator.lt if inclusive else operator.le

View File

@ -1,4 +1,4 @@
coverage==4.5.1
pytest==3.0.7
pytest-cov==2.5.1
coverage==6.5.0
pytest==7.2.0
pytest-cov==4.0.0
tox<3.0

View File

@ -29,15 +29,8 @@ _TEST_TMPLS = ["example 1: {hello}",
"example 2: {hello:*10}",
"example 3: {hello:*{width}}",
"example 4: {hello!r:{fchar}{width}}, {width}, yes",
"example 5: {0}, {1:d}, {2:f}, {1}"]
try:
from collections import OrderedDict
except ImportError:
pass # skip the non-2.6 compatible tests on 2.6
else:
_TEST_TMPLS.append("example 6: {}, {}, {}, {1}")
del OrderedDict
"example 5: {0}, {1:d}, {2:f}, {1}",
"example 6: {}, {}, {}, {1}"]
def test_get_fstr_args():
@ -45,8 +38,7 @@ def test_get_fstr_args():
for t in _TEST_TMPLS:
inferred_t = infer_positional_format_args(t)
res = get_format_args(inferred_t)
results.append(res)
return results
assert res
def test_split_fstr():
@ -54,7 +46,7 @@ def test_split_fstr():
for t in _TEST_TMPLS:
res = split_format_str(t)
results.append(res)
return results
assert res
def test_tokenize_format_str():
@ -62,7 +54,7 @@ def test_tokenize_format_str():
for t in _TEST_TMPLS:
res = tokenize_format_str(t)
results.append(res)
return results
assert res
def test_deferredvalue():

View File

@ -75,6 +75,29 @@ class BaseTestMixin(object):
finally:
os.rmdir(custom_dir)
def test_compare_err(self):
"""Read-heads are reset if a comparison raises an error."""
def _monkey_err(*args, **kwargs):
raise Exception('A sad error has occurred today')
a = self.spooled_flo.__class__()
a.write(self.test_str)
b = self.spooled_flo.__class__()
b.write(self.test_str)
a.seek(1)
b.seek(2)
b.__next__ = _monkey_err
try:
a == b
except Exception:
pass
self.assertEqual(a.tell(), 1)
self.assertEqual(b.tell(), 2)
def test_truncate_noargs_norollover(self):
"""Test truncating with no args with in-memory flo"""
self.spooled_flo.write(self.test_str)
@ -190,6 +213,23 @@ class BaseTestMixin(object):
if not self.spooled_flo:
raise AssertionError("Instance is not truthy")
def test_instance_check(self):
"""Instance checks against IOBase succeed."""
if not isinstance(self.spooled_flo, io.IOBase):
raise AssertionError('{} is not an instance of IOBase'.format(type(self.spooled_flo)))
def test_closed_file_method_valueerrors(self):
"""ValueError raised on closed files for certain methods."""
self.spooled_flo.close()
methods = (
'flush', 'isatty', 'pos', 'buf', 'truncate', '__next__', '__iter__',
'__enter__', 'read', 'readline', 'tell',
)
for method_name in methods:
with self.assertRaises(ValueError):
getattr(self.spooled_flo, method_name)()
class TestSpooledBytesIO(TestCase, BaseTestMixin, AssertionsMixin):
linesep = os.linesep.encode('ascii')
@ -270,6 +310,13 @@ class TestSpooledBytesIO(TestCase, BaseTestMixin, AssertionsMixin):
self.spooled_flo.seek(0)
self.assertEqual([x for x in self.spooled_flo], [b"a\n", b"b"])
def test_writelines(self):
"""An iterable of lines can be written"""
lines = [b"1", b"2", b"3"]
expected = b"123"
self.spooled_flo.writelines(lines)
self.assertEqual(self.spooled_flo.getvalue(), expected)
class TestSpooledStringIO(TestCase, BaseTestMixin, AssertionsMixin):
linesep = os.linesep
@ -408,6 +455,13 @@ class TestSpooledStringIO(TestCase, BaseTestMixin, AssertionsMixin):
self.spooled_flo.seek(0)
self.assertEqual([x for x in self.spooled_flo], [u"a\n", u"b"])
def test_writelines(self):
"""An iterable of lines can be written"""
lines = [u"1", u"2", u"3"]
expected = u"123"
self.spooled_flo.writelines(lines)
self.assertEqual(self.spooled_flo.getvalue(), expected)
class TestMultiFileReader(TestCase):
def test_read_seek_bytes(self):

View File

@ -1,5 +1,8 @@
from datetime import timedelta, date
import pytest
from boltons.timeutils import total_seconds, daterange
@ -67,3 +70,16 @@ def test_daterange_infinite():
infinite_dates = daterange(today, None)
for i in range(10):
assert next(infinite_dates) == today + timedelta(days=i)
def test_daterange_with_same_start_stop():
today = date.today()
date_range = daterange(today, today)
with pytest.raises(StopIteration):
next(date_range)
date_range_inclusive = daterange(today, today, inclusive=True)
assert next(date_range_inclusive) == today
with pytest.raises(StopIteration):
next(date_range_inclusive)

View File

@ -99,7 +99,7 @@ def test_idna():
def test_query_params(test_url):
url_obj = URL(test_url)
if not url_obj.query_params or url_obj.fragment:
return True
return
qp_text = url_obj.query_params.to_text(full_quote=True)
assert test_url.endswith(qp_text)

View File

@ -1,6 +1,13 @@
[tox]
envlist = py27,py34,py37,py39,pypy
envlist = py27,py37,py39,py310,pypy3
[testenv]
changedir = .tox
deps = -rrequirements-test.txt
commands = py.test --doctest-modules {envsitepackagesdir}/boltons {toxinidir}/tests {posargs}
[testenv:py27]
deps =
coverage==5.5
pytest==4.6.11
pytest-cov==2.12