mirror of https://github.com/mahmoud/boltons.git
Compare commits
5 Commits
8ba3a7fa0d
...
843d675730
Author | SHA1 | Date |
---|---|---|
Victor Shih | 843d675730 | |
Mezbaul Haque | 93185b224e | |
bwatson-dk | 6505c6b556 | |
Induane | 9420cfeb3b | |
Mahmoud Hashemi | d35693e875 |
|
@ -18,18 +18,17 @@ jobs:
|
||||||
fail-fast: false
|
fail-fast: false
|
||||||
matrix:
|
matrix:
|
||||||
include:
|
include:
|
||||||
- {name: Linux, python: '3.9', os: ubuntu-latest, tox: py39}
|
- {name: Linux, python: '3.10', os: ubuntu-latest, tox: py310}
|
||||||
- {name: Windows, python: '3.9', os: windows-latest, tox: py39}
|
- {name: Windows, python: '3.10', os: windows-latest, tox: py310}
|
||||||
- {name: Mac, python: '3.9', os: macos-latest, tox: py39}
|
- {name: Mac, python: '3.10', os: macos-latest, tox: py310}
|
||||||
|
- {name: '3.9', python: '3.9', os: ubuntu-latest, tox: py39}
|
||||||
- {name: '3.8', python: '3.8', os: ubuntu-latest, tox: py38}
|
- {name: '3.8', python: '3.8', os: ubuntu-latest, tox: py38}
|
||||||
- {name: '3.7', python: '3.7', os: ubuntu-latest, tox: py37}
|
- {name: '3.7', python: '3.7', os: ubuntu-latest, tox: py37}
|
||||||
- {name: '3.6', python: '3.6', os: ubuntu-latest, tox: py36}
|
- {name: '2.7', python: '2.7.18', os: ubuntu-latest, tox: py27}
|
||||||
- {name: '2.7', python: '2.7', os: ubuntu-latest, tox: py27}
|
- {name: 'PyPy3', python: 'pypy-3.9', os: ubuntu-latest, tox: pypy3}
|
||||||
- {name: 'PyPy2', python: 'pypy2', os: ubuntu-latest, tox: pypy}
|
|
||||||
- {name: 'PyPy3', python: 'pypy3', os: ubuntu-latest, tox: pypy3}
|
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v2
|
- uses: actions/checkout@v2
|
||||||
- uses: actions/setup-python@v2
|
- uses: actions/setup-python@v4
|
||||||
with:
|
with:
|
||||||
python-version: ${{ matrix.python }}
|
python-version: ${{ matrix.python }}
|
||||||
- name: update pip
|
- name: update pip
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
docs/_build
|
docs/_build
|
||||||
tmp.py
|
tmp.py
|
||||||
htmlcov/
|
htmlcov/
|
||||||
|
venv/
|
||||||
*.py[cod]
|
*.py[cod]
|
||||||
|
|
||||||
# emacs
|
# emacs
|
||||||
|
|
|
@ -40,7 +40,7 @@ are useful when dealing with input, output, and bytestreams in a variety of
|
||||||
ways.
|
ways.
|
||||||
"""
|
"""
|
||||||
import os
|
import os
|
||||||
from io import BytesIO
|
from io import BytesIO, IOBase
|
||||||
from abc import (
|
from abc import (
|
||||||
ABCMeta,
|
ABCMeta,
|
||||||
abstractmethod,
|
abstractmethod,
|
||||||
|
@ -50,6 +50,11 @@ from errno import EINVAL
|
||||||
from codecs import EncodedFile
|
from codecs import EncodedFile
|
||||||
from tempfile import TemporaryFile
|
from tempfile import TemporaryFile
|
||||||
|
|
||||||
|
try:
|
||||||
|
from itertools import izip_longest as zip_longest # Python 2
|
||||||
|
except ImportError:
|
||||||
|
from itertools import zip_longest # Python 3
|
||||||
|
|
||||||
try:
|
try:
|
||||||
text_type = unicode # Python 2
|
text_type = unicode # Python 2
|
||||||
binary_type = str
|
binary_type = str
|
||||||
|
@ -66,16 +71,14 @@ value.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
class SpooledIOBase(object):
|
class SpooledIOBase(IOBase):
|
||||||
"""
|
"""
|
||||||
The SpooledTempoaryFile class doesn't support a number of attributes and
|
A base class shared by the SpooledBytesIO and SpooledStringIO classes.
|
||||||
methods that a StringIO instance does. This brings the api as close to
|
|
||||||
compatible as possible with StringIO so that it may be used as a near
|
|
||||||
drop-in replacement to save memory.
|
|
||||||
|
|
||||||
Another issue with SpooledTemporaryFile is that the spooled file is always
|
The SpooledTemporaryFile class is missing several attributes and methods
|
||||||
a cStringIO rather than a StringIO which causes issues with some of our
|
present in the StringIO implementation. This brings the api as close to
|
||||||
tools.
|
parity as possible so that classes derived from SpooledIOBase can be used
|
||||||
|
as near drop-in replacements to save memory.
|
||||||
"""
|
"""
|
||||||
__metaclass__ = ABCMeta
|
__metaclass__ = ABCMeta
|
||||||
|
|
||||||
|
@ -83,6 +86,11 @@ class SpooledIOBase(object):
|
||||||
self._max_size = max_size
|
self._max_size = max_size
|
||||||
self._dir = dir
|
self._dir = dir
|
||||||
|
|
||||||
|
def _checkClosed(self, msg=None):
|
||||||
|
"""Raise a ValueError if file is closed"""
|
||||||
|
if self.closed:
|
||||||
|
raise ValueError('I/O operation on closed file.'
|
||||||
|
if msg is None else msg)
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def read(self, n=-1):
|
def read(self, n=-1):
|
||||||
"""Read n characters from the buffer"""
|
"""Read n characters from the buffer"""
|
||||||
|
@ -103,6 +111,16 @@ class SpooledIOBase(object):
|
||||||
def readlines(self, sizehint=0):
|
def readlines(self, sizehint=0):
|
||||||
"""Returns a list of all lines from the current position forward"""
|
"""Returns a list of all lines from the current position forward"""
|
||||||
|
|
||||||
|
def writelines(self, lines):
|
||||||
|
"""
|
||||||
|
Write lines to the file from an interable.
|
||||||
|
|
||||||
|
NOTE: writelines() does NOT add line separators.
|
||||||
|
"""
|
||||||
|
self._checkClosed()
|
||||||
|
for line in lines:
|
||||||
|
self.write(line)
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def rollover(self):
|
def rollover(self):
|
||||||
"""Roll file-like-object over into a real temporary file"""
|
"""Roll file-like-object over into a real temporary file"""
|
||||||
|
@ -139,22 +157,13 @@ class SpooledIOBase(object):
|
||||||
return self.buffer.close()
|
return self.buffer.close()
|
||||||
|
|
||||||
def flush(self):
|
def flush(self):
|
||||||
|
self._checkClosed()
|
||||||
return self.buffer.flush()
|
return self.buffer.flush()
|
||||||
|
|
||||||
def isatty(self):
|
def isatty(self):
|
||||||
|
self._checkClosed()
|
||||||
return self.buffer.isatty()
|
return self.buffer.isatty()
|
||||||
|
|
||||||
def next(self):
|
|
||||||
line = self.readline()
|
|
||||||
if not line:
|
|
||||||
pos = self.buffer.tell()
|
|
||||||
self.buffer.seek(0, os.SEEK_END)
|
|
||||||
if pos == self.buffer.tell():
|
|
||||||
raise StopIteration
|
|
||||||
else:
|
|
||||||
self.buffer.seek(pos)
|
|
||||||
return line
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def closed(self):
|
def closed(self):
|
||||||
return self.buffer.closed
|
return self.buffer.closed
|
||||||
|
@ -173,10 +182,13 @@ class SpooledIOBase(object):
|
||||||
|
|
||||||
def truncate(self, size=None):
|
def truncate(self, size=None):
|
||||||
"""
|
"""
|
||||||
|
Truncate the contents of the buffer.
|
||||||
|
|
||||||
Custom version of truncate that takes either no arguments (like the
|
Custom version of truncate that takes either no arguments (like the
|
||||||
real SpooledTemporaryFile) or a single argument that truncates the
|
real SpooledTemporaryFile) or a single argument that truncates the
|
||||||
value to a certain index location.
|
value to a certain index location.
|
||||||
"""
|
"""
|
||||||
|
self._checkClosed()
|
||||||
if size is None:
|
if size is None:
|
||||||
return self.buffer.truncate()
|
return self.buffer.truncate()
|
||||||
|
|
||||||
|
@ -191,7 +203,8 @@ class SpooledIOBase(object):
|
||||||
self.seek(pos)
|
self.seek(pos)
|
||||||
|
|
||||||
def getvalue(self):
|
def getvalue(self):
|
||||||
"""Return the entire files contents"""
|
"""Return the entire files contents."""
|
||||||
|
self._checkClosed()
|
||||||
pos = self.tell()
|
pos = self.tell()
|
||||||
self.seek(0)
|
self.seek(0)
|
||||||
val = self.read()
|
val = self.read()
|
||||||
|
@ -207,15 +220,29 @@ class SpooledIOBase(object):
|
||||||
def writable(self):
|
def writable(self):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
__next__ = next
|
def __next__(self):
|
||||||
|
self._checkClosed()
|
||||||
|
line = self.readline()
|
||||||
|
if not line:
|
||||||
|
pos = self.buffer.tell()
|
||||||
|
self.buffer.seek(0, os.SEEK_END)
|
||||||
|
if pos == self.buffer.tell():
|
||||||
|
raise StopIteration
|
||||||
|
else:
|
||||||
|
self.buffer.seek(pos)
|
||||||
|
return line
|
||||||
|
|
||||||
|
next = __next__
|
||||||
|
|
||||||
def __len__(self):
|
def __len__(self):
|
||||||
return self.len
|
return self.len
|
||||||
|
|
||||||
def __iter__(self):
|
def __iter__(self):
|
||||||
|
self._checkClosed()
|
||||||
return self
|
return self
|
||||||
|
|
||||||
def __enter__(self):
|
def __enter__(self):
|
||||||
|
self._checkClosed()
|
||||||
return self
|
return self
|
||||||
|
|
||||||
def __exit__(self, *args):
|
def __exit__(self, *args):
|
||||||
|
@ -223,7 +250,31 @@ class SpooledIOBase(object):
|
||||||
|
|
||||||
def __eq__(self, other):
|
def __eq__(self, other):
|
||||||
if isinstance(other, self.__class__):
|
if isinstance(other, self.__class__):
|
||||||
return self.getvalue() == other.getvalue()
|
self_pos = self.tell()
|
||||||
|
other_pos = other.tell()
|
||||||
|
try:
|
||||||
|
self.seek(0)
|
||||||
|
other.seek(0)
|
||||||
|
eq = True
|
||||||
|
for self_line, other_line in zip_longest(self, other):
|
||||||
|
if self_line != other_line:
|
||||||
|
eq = False
|
||||||
|
break
|
||||||
|
self.seek(self_pos)
|
||||||
|
other.seek(other_pos)
|
||||||
|
except Exception:
|
||||||
|
# Attempt to return files to original position if there were any errors
|
||||||
|
try:
|
||||||
|
self.seek(self_pos)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
other.seek(other_pos)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
raise
|
||||||
|
else:
|
||||||
|
return eq
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def __ne__(self, other):
|
def __ne__(self, other):
|
||||||
|
@ -232,6 +283,13 @@ class SpooledIOBase(object):
|
||||||
def __bool__(self):
|
def __bool__(self):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
def __del__(self):
|
||||||
|
"""Can fail when called at program exit so suppress traceback."""
|
||||||
|
try:
|
||||||
|
self.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
__nonzero__ = __bool__
|
__nonzero__ = __bool__
|
||||||
|
|
||||||
|
|
||||||
|
@ -253,11 +311,13 @@ class SpooledBytesIO(SpooledIOBase):
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def read(self, n=-1):
|
def read(self, n=-1):
|
||||||
|
self._checkClosed()
|
||||||
return self.buffer.read(n)
|
return self.buffer.read(n)
|
||||||
|
|
||||||
def write(self, s):
|
def write(self, s):
|
||||||
|
self._checkClosed()
|
||||||
if not isinstance(s, binary_type):
|
if not isinstance(s, binary_type):
|
||||||
raise TypeError("{0} expected, got {1}".format(
|
raise TypeError("{} expected, got {}".format(
|
||||||
binary_type.__name__,
|
binary_type.__name__,
|
||||||
type(s).__name__
|
type(s).__name__
|
||||||
))
|
))
|
||||||
|
@ -267,9 +327,11 @@ class SpooledBytesIO(SpooledIOBase):
|
||||||
self.buffer.write(s)
|
self.buffer.write(s)
|
||||||
|
|
||||||
def seek(self, pos, mode=0):
|
def seek(self, pos, mode=0):
|
||||||
|
self._checkClosed()
|
||||||
return self.buffer.seek(pos, mode)
|
return self.buffer.seek(pos, mode)
|
||||||
|
|
||||||
def readline(self, length=None):
|
def readline(self, length=None):
|
||||||
|
self._checkClosed()
|
||||||
if length:
|
if length:
|
||||||
return self.buffer.readline(length)
|
return self.buffer.readline(length)
|
||||||
else:
|
else:
|
||||||
|
@ -314,6 +376,7 @@ class SpooledBytesIO(SpooledIOBase):
|
||||||
return val
|
return val
|
||||||
|
|
||||||
def tell(self):
|
def tell(self):
|
||||||
|
self._checkClosed()
|
||||||
return self.buffer.tell()
|
return self.buffer.tell()
|
||||||
|
|
||||||
|
|
||||||
|
@ -340,13 +403,15 @@ class SpooledStringIO(SpooledIOBase):
|
||||||
super(SpooledStringIO, self).__init__(*args, **kwargs)
|
super(SpooledStringIO, self).__init__(*args, **kwargs)
|
||||||
|
|
||||||
def read(self, n=-1):
|
def read(self, n=-1):
|
||||||
|
self._checkClosed()
|
||||||
ret = self.buffer.reader.read(n, n)
|
ret = self.buffer.reader.read(n, n)
|
||||||
self._tell = self.tell() + len(ret)
|
self._tell = self.tell() + len(ret)
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
def write(self, s):
|
def write(self, s):
|
||||||
|
self._checkClosed()
|
||||||
if not isinstance(s, text_type):
|
if not isinstance(s, text_type):
|
||||||
raise TypeError("{0} expected, got {1}".format(
|
raise TypeError("{} expected, got {}".format(
|
||||||
text_type.__name__,
|
text_type.__name__,
|
||||||
type(s).__name__
|
type(s).__name__
|
||||||
))
|
))
|
||||||
|
@ -384,6 +449,7 @@ class SpooledStringIO(SpooledIOBase):
|
||||||
|
|
||||||
def seek(self, pos, mode=0):
|
def seek(self, pos, mode=0):
|
||||||
"""Traverse from offset to the specified codepoint"""
|
"""Traverse from offset to the specified codepoint"""
|
||||||
|
self._checkClosed()
|
||||||
# Seek to position from the start of the file
|
# Seek to position from the start of the file
|
||||||
if mode == os.SEEK_SET:
|
if mode == os.SEEK_SET:
|
||||||
self.buffer.seek(0)
|
self.buffer.seek(0)
|
||||||
|
@ -406,6 +472,7 @@ class SpooledStringIO(SpooledIOBase):
|
||||||
return self.tell()
|
return self.tell()
|
||||||
|
|
||||||
def readline(self, length=None):
|
def readline(self, length=None):
|
||||||
|
self._checkClosed()
|
||||||
ret = self.buffer.readline(length).decode('utf-8')
|
ret = self.buffer.readline(length).decode('utf-8')
|
||||||
self._tell = self.tell() + len(ret)
|
self._tell = self.tell() + len(ret)
|
||||||
return ret
|
return ret
|
||||||
|
@ -428,7 +495,7 @@ class SpooledStringIO(SpooledIOBase):
|
||||||
return not isinstance(self.buffer.stream, BytesIO)
|
return not isinstance(self.buffer.stream, BytesIO)
|
||||||
|
|
||||||
def rollover(self):
|
def rollover(self):
|
||||||
"""Roll the StringIO over to a TempFile"""
|
"""Roll the buffer over to a TempFile"""
|
||||||
if not self._rolled:
|
if not self._rolled:
|
||||||
tmp = EncodedFile(TemporaryFile(dir=self._dir),
|
tmp = EncodedFile(TemporaryFile(dir=self._dir),
|
||||||
data_encoding='utf-8')
|
data_encoding='utf-8')
|
||||||
|
@ -440,6 +507,7 @@ class SpooledStringIO(SpooledIOBase):
|
||||||
|
|
||||||
def tell(self):
|
def tell(self):
|
||||||
"""Return the codepoint position"""
|
"""Return the codepoint position"""
|
||||||
|
self._checkClosed()
|
||||||
return self._tell
|
return self._tell
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
|
|
@ -208,7 +208,7 @@ def split_iter(src, sep=None, maxsplit=None):
|
||||||
|
|
||||||
def lstrip(iterable, strip_value=None):
|
def lstrip(iterable, strip_value=None):
|
||||||
"""Strips values from the beginning of an iterable. Stripped items will
|
"""Strips values from the beginning of an iterable. Stripped items will
|
||||||
match the value of the argument strip_value. Functionality is analigous
|
match the value of the argument strip_value. Functionality is analogous
|
||||||
to that of the method str.lstrip. Returns a list.
|
to that of the method str.lstrip. Returns a list.
|
||||||
|
|
||||||
>>> lstrip(['Foo', 'Bar', 'Bam'], 'Foo')
|
>>> lstrip(['Foo', 'Bar', 'Bam'], 'Foo')
|
||||||
|
@ -220,7 +220,7 @@ def lstrip(iterable, strip_value=None):
|
||||||
|
|
||||||
def lstrip_iter(iterable, strip_value=None):
|
def lstrip_iter(iterable, strip_value=None):
|
||||||
"""Strips values from the beginning of an iterable. Stripped items will
|
"""Strips values from the beginning of an iterable. Stripped items will
|
||||||
match the value of the argument strip_value. Functionality is analigous
|
match the value of the argument strip_value. Functionality is analogous
|
||||||
to that of the method str.lstrip. Returns a generator.
|
to that of the method str.lstrip. Returns a generator.
|
||||||
|
|
||||||
>>> list(lstrip_iter(['Foo', 'Bar', 'Bam'], 'Foo'))
|
>>> list(lstrip_iter(['Foo', 'Bar', 'Bam'], 'Foo'))
|
||||||
|
@ -238,7 +238,7 @@ def lstrip_iter(iterable, strip_value=None):
|
||||||
|
|
||||||
def rstrip(iterable, strip_value=None):
|
def rstrip(iterable, strip_value=None):
|
||||||
"""Strips values from the end of an iterable. Stripped items will
|
"""Strips values from the end of an iterable. Stripped items will
|
||||||
match the value of the argument strip_value. Functionality is analigous
|
match the value of the argument strip_value. Functionality is analogous
|
||||||
to that of the method str.rstrip. Returns a list.
|
to that of the method str.rstrip. Returns a list.
|
||||||
|
|
||||||
>>> rstrip(['Foo', 'Bar', 'Bam'], 'Bam')
|
>>> rstrip(['Foo', 'Bar', 'Bam'], 'Bam')
|
||||||
|
@ -250,7 +250,7 @@ def rstrip(iterable, strip_value=None):
|
||||||
|
|
||||||
def rstrip_iter(iterable, strip_value=None):
|
def rstrip_iter(iterable, strip_value=None):
|
||||||
"""Strips values from the end of an iterable. Stripped items will
|
"""Strips values from the end of an iterable. Stripped items will
|
||||||
match the value of the argument strip_value. Functionality is analigous
|
match the value of the argument strip_value. Functionality is analogous
|
||||||
to that of the method str.rstrip. Returns a generator.
|
to that of the method str.rstrip. Returns a generator.
|
||||||
|
|
||||||
>>> list(rstrip_iter(['Foo', 'Bar', 'Bam'], 'Bam'))
|
>>> list(rstrip_iter(['Foo', 'Bar', 'Bam'], 'Bam'))
|
||||||
|
@ -279,7 +279,7 @@ def rstrip_iter(iterable, strip_value=None):
|
||||||
def strip(iterable, strip_value=None):
|
def strip(iterable, strip_value=None):
|
||||||
"""Strips values from the beginning and end of an iterable. Stripped items
|
"""Strips values from the beginning and end of an iterable. Stripped items
|
||||||
will match the value of the argument strip_value. Functionality is
|
will match the value of the argument strip_value. Functionality is
|
||||||
analigous to that of the method str.strip. Returns a list.
|
analogous to that of the method str.strip. Returns a list.
|
||||||
|
|
||||||
>>> strip(['Fu', 'Foo', 'Bar', 'Bam', 'Fu'], 'Fu')
|
>>> strip(['Fu', 'Foo', 'Bar', 'Bam', 'Fu'], 'Fu')
|
||||||
['Foo', 'Bar', 'Bam']
|
['Foo', 'Bar', 'Bam']
|
||||||
|
@ -291,7 +291,7 @@ def strip(iterable, strip_value=None):
|
||||||
def strip_iter(iterable,strip_value=None):
|
def strip_iter(iterable,strip_value=None):
|
||||||
"""Strips values from the beginning and end of an iterable. Stripped items
|
"""Strips values from the beginning and end of an iterable. Stripped items
|
||||||
will match the value of the argument strip_value. Functionality is
|
will match the value of the argument strip_value. Functionality is
|
||||||
analigous to that of the method str.strip. Returns a generator.
|
analogous to that of the method str.strip. Returns a generator.
|
||||||
|
|
||||||
>>> list(strip_iter(['Fu', 'Foo', 'Bar', 'Bam', 'Fu'], 'Fu'))
|
>>> list(strip_iter(['Fu', 'Foo', 'Bar', 'Bam', 'Fu'], 'Fu'))
|
||||||
['Foo', 'Bar', 'Bam']
|
['Foo', 'Bar', 'Bam']
|
||||||
|
|
|
@ -1176,7 +1176,7 @@ class MultiReplace(object):
|
||||||
|
|
||||||
Dictionary Usage::
|
Dictionary Usage::
|
||||||
|
|
||||||
from lrmslib import stringutils
|
from boltons import stringutils
|
||||||
s = stringutils.MultiReplace({
|
s = stringutils.MultiReplace({
|
||||||
'foo': 'zoo',
|
'foo': 'zoo',
|
||||||
'cat': 'hat',
|
'cat': 'hat',
|
||||||
|
@ -1187,7 +1187,7 @@ class MultiReplace(object):
|
||||||
|
|
||||||
Iterable Usage::
|
Iterable Usage::
|
||||||
|
|
||||||
from lrmslib import stringutils
|
from boltons import stringutils
|
||||||
s = stringutils.MultiReplace([
|
s = stringutils.MultiReplace([
|
||||||
('foo', 'zoo'),
|
('foo', 'zoo'),
|
||||||
('cat', 'hat'),
|
('cat', 'hat'),
|
||||||
|
@ -1239,10 +1239,7 @@ class MultiReplace(object):
|
||||||
else:
|
else:
|
||||||
exp = vals[0].pattern
|
exp = vals[0].pattern
|
||||||
|
|
||||||
regex_values.append('(?P<{0}>{1})'.format(
|
regex_values.append('(?P<{}>{})'.format(group_name, exp))
|
||||||
group_name,
|
|
||||||
exp
|
|
||||||
))
|
|
||||||
self.group_map[group_name] = vals[1]
|
self.group_map[group_name] = vals[1]
|
||||||
|
|
||||||
self.combined_pattern = re.compile(
|
self.combined_pattern = re.compile(
|
||||||
|
@ -1267,7 +1264,18 @@ class MultiReplace(object):
|
||||||
|
|
||||||
|
|
||||||
def multi_replace(text, sub_map, **kwargs):
|
def multi_replace(text, sub_map, **kwargs):
|
||||||
"""Shortcut function to invoke MultiReplace in a single call."""
|
"""
|
||||||
|
Shortcut function to invoke MultiReplace in a single call.
|
||||||
|
|
||||||
|
Example Usage::
|
||||||
|
|
||||||
|
from boltons.stringutils import multi_replace
|
||||||
|
new = multi_replace(
|
||||||
|
'The foo bar cat ate a bat',
|
||||||
|
{'foo': 'zoo', 'cat': 'hat', 'bat': 'kraken'}
|
||||||
|
)
|
||||||
|
new == 'The zoo bar hat ate a kraken'
|
||||||
|
"""
|
||||||
m = MultiReplace(sub_map, **kwargs)
|
m = MultiReplace(sub_map, **kwargs)
|
||||||
return m.sub(text)
|
return m.sub(text)
|
||||||
|
|
||||||
|
|
|
@ -382,7 +382,7 @@ def daterange(start, stop, step=1, inclusive=False):
|
||||||
|
|
||||||
if stop is None:
|
if stop is None:
|
||||||
finished = lambda now, stop: False
|
finished = lambda now, stop: False
|
||||||
elif start < stop:
|
elif start <= stop:
|
||||||
finished = operator.gt if inclusive else operator.ge
|
finished = operator.gt if inclusive else operator.ge
|
||||||
else:
|
else:
|
||||||
finished = operator.lt if inclusive else operator.le
|
finished = operator.lt if inclusive else operator.le
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
coverage==4.5.1
|
coverage==6.5.0
|
||||||
pytest==3.0.7
|
pytest==7.2.0
|
||||||
pytest-cov==2.5.1
|
pytest-cov==4.0.0
|
||||||
tox<3.0
|
tox<3.0
|
||||||
|
|
|
@ -29,15 +29,8 @@ _TEST_TMPLS = ["example 1: {hello}",
|
||||||
"example 2: {hello:*10}",
|
"example 2: {hello:*10}",
|
||||||
"example 3: {hello:*{width}}",
|
"example 3: {hello:*{width}}",
|
||||||
"example 4: {hello!r:{fchar}{width}}, {width}, yes",
|
"example 4: {hello!r:{fchar}{width}}, {width}, yes",
|
||||||
"example 5: {0}, {1:d}, {2:f}, {1}"]
|
"example 5: {0}, {1:d}, {2:f}, {1}",
|
||||||
|
"example 6: {}, {}, {}, {1}"]
|
||||||
try:
|
|
||||||
from collections import OrderedDict
|
|
||||||
except ImportError:
|
|
||||||
pass # skip the non-2.6 compatible tests on 2.6
|
|
||||||
else:
|
|
||||||
_TEST_TMPLS.append("example 6: {}, {}, {}, {1}")
|
|
||||||
del OrderedDict
|
|
||||||
|
|
||||||
|
|
||||||
def test_get_fstr_args():
|
def test_get_fstr_args():
|
||||||
|
@ -45,8 +38,7 @@ def test_get_fstr_args():
|
||||||
for t in _TEST_TMPLS:
|
for t in _TEST_TMPLS:
|
||||||
inferred_t = infer_positional_format_args(t)
|
inferred_t = infer_positional_format_args(t)
|
||||||
res = get_format_args(inferred_t)
|
res = get_format_args(inferred_t)
|
||||||
results.append(res)
|
assert res
|
||||||
return results
|
|
||||||
|
|
||||||
|
|
||||||
def test_split_fstr():
|
def test_split_fstr():
|
||||||
|
@ -54,7 +46,7 @@ def test_split_fstr():
|
||||||
for t in _TEST_TMPLS:
|
for t in _TEST_TMPLS:
|
||||||
res = split_format_str(t)
|
res = split_format_str(t)
|
||||||
results.append(res)
|
results.append(res)
|
||||||
return results
|
assert res
|
||||||
|
|
||||||
|
|
||||||
def test_tokenize_format_str():
|
def test_tokenize_format_str():
|
||||||
|
@ -62,7 +54,7 @@ def test_tokenize_format_str():
|
||||||
for t in _TEST_TMPLS:
|
for t in _TEST_TMPLS:
|
||||||
res = tokenize_format_str(t)
|
res = tokenize_format_str(t)
|
||||||
results.append(res)
|
results.append(res)
|
||||||
return results
|
assert res
|
||||||
|
|
||||||
|
|
||||||
def test_deferredvalue():
|
def test_deferredvalue():
|
||||||
|
|
|
@ -75,6 +75,29 @@ class BaseTestMixin(object):
|
||||||
finally:
|
finally:
|
||||||
os.rmdir(custom_dir)
|
os.rmdir(custom_dir)
|
||||||
|
|
||||||
|
def test_compare_err(self):
|
||||||
|
"""Read-heads are reset if a comparison raises an error."""
|
||||||
|
def _monkey_err(*args, **kwargs):
|
||||||
|
raise Exception('A sad error has occurred today')
|
||||||
|
|
||||||
|
a = self.spooled_flo.__class__()
|
||||||
|
a.write(self.test_str)
|
||||||
|
b = self.spooled_flo.__class__()
|
||||||
|
b.write(self.test_str)
|
||||||
|
|
||||||
|
a.seek(1)
|
||||||
|
b.seek(2)
|
||||||
|
|
||||||
|
b.__next__ = _monkey_err
|
||||||
|
|
||||||
|
try:
|
||||||
|
a == b
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
self.assertEqual(a.tell(), 1)
|
||||||
|
self.assertEqual(b.tell(), 2)
|
||||||
|
|
||||||
def test_truncate_noargs_norollover(self):
|
def test_truncate_noargs_norollover(self):
|
||||||
"""Test truncating with no args with in-memory flo"""
|
"""Test truncating with no args with in-memory flo"""
|
||||||
self.spooled_flo.write(self.test_str)
|
self.spooled_flo.write(self.test_str)
|
||||||
|
@ -190,6 +213,23 @@ class BaseTestMixin(object):
|
||||||
if not self.spooled_flo:
|
if not self.spooled_flo:
|
||||||
raise AssertionError("Instance is not truthy")
|
raise AssertionError("Instance is not truthy")
|
||||||
|
|
||||||
|
def test_instance_check(self):
|
||||||
|
"""Instance checks against IOBase succeed."""
|
||||||
|
if not isinstance(self.spooled_flo, io.IOBase):
|
||||||
|
raise AssertionError('{} is not an instance of IOBase'.format(type(self.spooled_flo)))
|
||||||
|
|
||||||
|
def test_closed_file_method_valueerrors(self):
|
||||||
|
"""ValueError raised on closed files for certain methods."""
|
||||||
|
self.spooled_flo.close()
|
||||||
|
methods = (
|
||||||
|
'flush', 'isatty', 'pos', 'buf', 'truncate', '__next__', '__iter__',
|
||||||
|
'__enter__', 'read', 'readline', 'tell',
|
||||||
|
)
|
||||||
|
for method_name in methods:
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
getattr(self.spooled_flo, method_name)()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class TestSpooledBytesIO(TestCase, BaseTestMixin, AssertionsMixin):
|
class TestSpooledBytesIO(TestCase, BaseTestMixin, AssertionsMixin):
|
||||||
linesep = os.linesep.encode('ascii')
|
linesep = os.linesep.encode('ascii')
|
||||||
|
@ -270,6 +310,13 @@ class TestSpooledBytesIO(TestCase, BaseTestMixin, AssertionsMixin):
|
||||||
self.spooled_flo.seek(0)
|
self.spooled_flo.seek(0)
|
||||||
self.assertEqual([x for x in self.spooled_flo], [b"a\n", b"b"])
|
self.assertEqual([x for x in self.spooled_flo], [b"a\n", b"b"])
|
||||||
|
|
||||||
|
def test_writelines(self):
|
||||||
|
"""An iterable of lines can be written"""
|
||||||
|
lines = [b"1", b"2", b"3"]
|
||||||
|
expected = b"123"
|
||||||
|
self.spooled_flo.writelines(lines)
|
||||||
|
self.assertEqual(self.spooled_flo.getvalue(), expected)
|
||||||
|
|
||||||
|
|
||||||
class TestSpooledStringIO(TestCase, BaseTestMixin, AssertionsMixin):
|
class TestSpooledStringIO(TestCase, BaseTestMixin, AssertionsMixin):
|
||||||
linesep = os.linesep
|
linesep = os.linesep
|
||||||
|
@ -408,6 +455,13 @@ class TestSpooledStringIO(TestCase, BaseTestMixin, AssertionsMixin):
|
||||||
self.spooled_flo.seek(0)
|
self.spooled_flo.seek(0)
|
||||||
self.assertEqual([x for x in self.spooled_flo], [u"a\n", u"b"])
|
self.assertEqual([x for x in self.spooled_flo], [u"a\n", u"b"])
|
||||||
|
|
||||||
|
def test_writelines(self):
|
||||||
|
"""An iterable of lines can be written"""
|
||||||
|
lines = [u"1", u"2", u"3"]
|
||||||
|
expected = u"123"
|
||||||
|
self.spooled_flo.writelines(lines)
|
||||||
|
self.assertEqual(self.spooled_flo.getvalue(), expected)
|
||||||
|
|
||||||
|
|
||||||
class TestMultiFileReader(TestCase):
|
class TestMultiFileReader(TestCase):
|
||||||
def test_read_seek_bytes(self):
|
def test_read_seek_bytes(self):
|
||||||
|
|
|
@ -1,5 +1,8 @@
|
||||||
|
|
||||||
from datetime import timedelta, date
|
from datetime import timedelta, date
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
from boltons.timeutils import total_seconds, daterange
|
from boltons.timeutils import total_seconds, daterange
|
||||||
|
|
||||||
|
|
||||||
|
@ -60,10 +63,23 @@ def test_daterange_years_step():
|
||||||
dates = list(daterange(start_day, end_day, step=(0, 13, 0), inclusive=False))
|
dates = list(daterange(start_day, end_day, step=(0, 13, 0), inclusive=False))
|
||||||
expected = [date(year=2012, month=12, day=25), date(year=2014, month=1, day=25), date(year=2015, month=2, day=25)]
|
expected = [date(year=2012, month=12, day=25), date(year=2014, month=1, day=25), date(year=2015, month=2, day=25)]
|
||||||
assert dates == expected
|
assert dates == expected
|
||||||
|
|
||||||
|
|
||||||
def test_daterange_infinite():
|
def test_daterange_infinite():
|
||||||
today = date.today()
|
today = date.today()
|
||||||
infinite_dates = daterange(today, None)
|
infinite_dates = daterange(today, None)
|
||||||
for i in range(10):
|
for i in range(10):
|
||||||
assert next(infinite_dates) == today + timedelta(days=i)
|
assert next(infinite_dates) == today + timedelta(days=i)
|
||||||
|
|
||||||
|
|
||||||
|
def test_daterange_with_same_start_stop():
|
||||||
|
today = date.today()
|
||||||
|
|
||||||
|
date_range = daterange(today, today)
|
||||||
|
with pytest.raises(StopIteration):
|
||||||
|
next(date_range)
|
||||||
|
|
||||||
|
date_range_inclusive = daterange(today, today, inclusive=True)
|
||||||
|
assert next(date_range_inclusive) == today
|
||||||
|
with pytest.raises(StopIteration):
|
||||||
|
next(date_range_inclusive)
|
||||||
|
|
|
@ -99,7 +99,7 @@ def test_idna():
|
||||||
def test_query_params(test_url):
|
def test_query_params(test_url):
|
||||||
url_obj = URL(test_url)
|
url_obj = URL(test_url)
|
||||||
if not url_obj.query_params or url_obj.fragment:
|
if not url_obj.query_params or url_obj.fragment:
|
||||||
return True
|
return
|
||||||
qp_text = url_obj.query_params.to_text(full_quote=True)
|
qp_text = url_obj.query_params.to_text(full_quote=True)
|
||||||
assert test_url.endswith(qp_text)
|
assert test_url.endswith(qp_text)
|
||||||
|
|
||||||
|
|
9
tox.ini
9
tox.ini
|
@ -1,6 +1,13 @@
|
||||||
[tox]
|
[tox]
|
||||||
envlist = py27,py34,py37,py39,pypy
|
envlist = py27,py37,py39,py310,pypy3
|
||||||
[testenv]
|
[testenv]
|
||||||
changedir = .tox
|
changedir = .tox
|
||||||
deps = -rrequirements-test.txt
|
deps = -rrequirements-test.txt
|
||||||
commands = py.test --doctest-modules {envsitepackagesdir}/boltons {toxinidir}/tests {posargs}
|
commands = py.test --doctest-modules {envsitepackagesdir}/boltons {toxinidir}/tests {posargs}
|
||||||
|
|
||||||
|
[testenv:py27]
|
||||||
|
deps =
|
||||||
|
coverage==5.5
|
||||||
|
pytest==4.6.11
|
||||||
|
pytest-cov==2.12
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue