The changes to the stateful codecs in 2.4 resulted in StreamReader.readline()
trying to return a complete line even if a size parameter was given (see
http://www.python.org/sf/1076985). This leads to buffer overflows with long
source lines under Windows if e.g. cp1252 is used as the source encoding.
This patch reverts the behaviour of readline() to something that behaves more
like Python 2.3: If a size parameter is given, read() is called only once.

As a side effect of this, readline() now supports all types of linebreaks
supported by unicode.splitlines().

Note that the tokenizer is still broken and it's possible to provoke segfaults
(see http://www.python.org/sf/1089395).
This commit is contained in:
Walter Dörwald 2004-12-21 22:24:00 +00:00
parent dcba6622f5
commit e57d7b179a
2 changed files with 121 additions and 43 deletions

View File

@ -230,6 +230,7 @@ def __init__(self, stream, errors='strict'):
self.errors = errors self.errors = errors
self.bytebuffer = "" self.bytebuffer = ""
self.charbuffer = u"" self.charbuffer = u""
self.atcr = False
def decode(self, input, errors='strict'): def decode(self, input, errors='strict'):
raise NotImplementedError raise NotImplementedError
@ -256,41 +257,39 @@ def read(self, size=-1, chars=-1):
definition of the encoding and the given size, e.g. if definition of the encoding and the given size, e.g. if
optional encoding endings or state markers are available optional encoding endings or state markers are available
on the stream, these should be read too. on the stream, these should be read too.
""" """
# read until we get the required number of characters (if available) # read until we get the required number of characters (if available)
done = False
while True: while True:
# can the request can be satisfied from the character buffer? # can the request can be satisfied from the character buffer?
if chars < 0: if chars < 0:
if self.charbuffer: if self.charbuffer:
done = True break
else: else:
if len(self.charbuffer) >= chars: if len(self.charbuffer) >= chars:
done = True
if done:
if chars < 0:
result = self.charbuffer
self.charbuffer = u""
break
else:
result = self.charbuffer[:chars]
self.charbuffer = self.charbuffer[chars:]
break break
# we need more data # we need more data
if size < 0: if size < 0:
newdata = self.stream.read() newdata = self.stream.read()
else: else:
newdata = self.stream.read(size) newdata = self.stream.read(size)
# decode bytes (those remaining from the last call included)
data = self.bytebuffer + newdata data = self.bytebuffer + newdata
object, decodedbytes = self.decode(data, self.errors) newchars, decodedbytes = self.decode(data, self.errors)
# keep undecoded bytes until the next call # keep undecoded bytes until the next call
self.bytebuffer = data[decodedbytes:] self.bytebuffer = data[decodedbytes:]
# put new characters in the character buffer # put new characters in the character buffer
self.charbuffer += object self.charbuffer += newchars
# there was no data available # there was no data available
if not newdata: if not newdata:
done = True break
if chars < 0:
# Return everything we've got
result = self.charbuffer
self.charbuffer = u""
else:
# Return the first chars characters
result = self.charbuffer[:chars]
self.charbuffer = self.charbuffer[chars:]
return result return result
def readline(self, size=None, keepends=True): def readline(self, size=None, keepends=True):
@ -302,24 +301,36 @@ def readline(self, size=None, keepends=True):
read() method. read() method.
""" """
if size is None: readsize = size or 72
size = 10
line = u"" line = u""
# If size is given, we call read() only once
while True: while True:
data = self.read(size) data = self.read(readsize)
if self.atcr and data.startswith(u"\n"):
data = data[1:]
if data:
self.atcr = data.endswith(u"\r")
line += data line += data
pos = line.find("\n") lines = line.splitlines(True)
if pos>=0: if lines:
self.charbuffer = line[pos+1:] + self.charbuffer line0withend = lines[0]
if keepends: line0withoutend = lines[0].splitlines(False)[0]
line = line[:pos+1] if line0withend != line0withoutend: # We really have a line end
else: # Put the rest back together and keep it until the next call
line = line[:pos] self.charbuffer = u"".join(lines[1:]) + self.charbuffer
return line if keepends:
elif not data: line = line0withend
return line else:
if size<8000: line = line0withoutend
size *= 2 break
# we didn't get anything or this was our only try
elif not data or size is not None:
if line and not keepends:
line = line.splitlines(False)[0]
break
if readsize<8000:
readsize *= 2
return line
def readlines(self, sizehint=None, keepends=True): def readlines(self, sizehint=None, keepends=True):

View File

@ -23,16 +23,16 @@ def read(self, size=-1):
self._buffer = self._buffer[size:] self._buffer = self._buffer[size:]
return s return s
class PartialReadTest(unittest.TestCase): class ReadTest(unittest.TestCase):
def check_partial(self, encoding, input, partialresults): def check_partial(self, input, partialresults):
# get a StreamReader for the encoding and feed the bytestring version # get a StreamReader for the encoding and feed the bytestring version
# of input to the reader byte by byte. Read every available from # of input to the reader byte by byte. Read every available from
# the StreamReader and check that the results equal the appropriate # the StreamReader and check that the results equal the appropriate
# entries from partialresults. # entries from partialresults.
q = Queue() q = Queue()
r = codecs.getreader(encoding)(q) r = codecs.getreader(self.encoding)(q)
result = u"" result = u""
for (c, partialresult) in zip(input.encode(encoding), partialresults): for (c, partialresult) in zip(input.encode(self.encoding), partialresults):
q.write(c) q.write(c)
result += r.read() result += r.read()
self.assertEqual(result, partialresult) self.assertEqual(result, partialresult)
@ -41,13 +41,81 @@ def check_partial(self, encoding, input, partialresults):
self.assertEqual(r.bytebuffer, "") self.assertEqual(r.bytebuffer, "")
self.assertEqual(r.charbuffer, u"") self.assertEqual(r.charbuffer, u"")
class UTF16Test(PartialReadTest): def test_readline(self):
def getreader(input):
stream = StringIO.StringIO(input.encode(self.encoding))
return codecs.getreader(self.encoding)(stream)
def readalllines(input, keepends=True):
reader = getreader(input)
lines = []
while True:
line = reader.readline(keepends=keepends)
if not line:
break
lines.append(line)
return "".join(lines)
s = u"foo\nbar\r\nbaz\rspam\u2028eggs"
self.assertEqual(readalllines(s, True), s)
self.assertEqual(readalllines(s, False), u"foobarbazspameggs")
# Test long lines (multiple calls to read() in readline())
vw = []
vwo = []
for (i, lineend) in enumerate(u"\n \r\n \r \u2028".split()):
vw.append((i*200)*u"\3042" + lineend)
vwo.append((i*200)*u"\3042")
self.assertEqual(readalllines("".join(vw), True), "".join(vw))
self.assertEqual(readalllines("".join(vw), False),"".join(vwo))
# Test lines where the first read might end with \r, so the
# reader has to look ahead whether this is a lone \r or a \r\n
for size in xrange(80):
for lineend in u"\n \r\n \r \u2028".split():
s = size*u"a" + lineend + u"xxx\n"
self.assertEqual(
getreader(s).readline(keepends=True),
size*u"a" + lineend,
)
self.assertEqual(
getreader(s).readline(keepends=False),
size*u"a",
)
def test_readlinequeue(self):
q = Queue()
writer = codecs.getwriter(self.encoding)(q)
reader = codecs.getreader(self.encoding)(q)
# No lineends
writer.write(u"foo\r")
self.assertEqual(reader.readline(keepends=False), u"foo")
writer.write(u"\nbar\r")
self.assertEqual(reader.readline(keepends=False), u"bar")
writer.write(u"baz")
self.assertEqual(reader.readline(keepends=False), u"baz")
self.assertEqual(reader.readline(keepends=False), u"")
# Lineends
writer.write(u"foo\r")
self.assertEqual(reader.readline(keepends=True), u"foo\r")
writer.write(u"\nbar\r")
self.assertEqual(reader.readline(keepends=True), u"bar\r")
writer.write(u"baz")
self.assertEqual(reader.readline(keepends=True), u"baz")
self.assertEqual(reader.readline(keepends=True), u"")
writer.write(u"foo\r\n")
self.assertEqual(reader.readline(keepends=True), u"foo\r\n")
class UTF16Test(ReadTest):
encoding = "utf-16"
spamle = '\xff\xfes\x00p\x00a\x00m\x00s\x00p\x00a\x00m\x00' spamle = '\xff\xfes\x00p\x00a\x00m\x00s\x00p\x00a\x00m\x00'
spambe = '\xfe\xff\x00s\x00p\x00a\x00m\x00s\x00p\x00a\x00m' spambe = '\xfe\xff\x00s\x00p\x00a\x00m\x00s\x00p\x00a\x00m'
def test_only_one_bom(self): def test_only_one_bom(self):
_,_,reader,writer = codecs.lookup("utf-16") _,_,reader,writer = codecs.lookup(self.encoding)
# encode some stream # encode some stream
s = StringIO.StringIO() s = StringIO.StringIO()
f = writer(s) f = writer(s)
@ -63,7 +131,6 @@ def test_only_one_bom(self):
def test_partial(self): def test_partial(self):
self.check_partial( self.check_partial(
"utf-16",
u"\x00\xff\u0100\uffff", u"\x00\xff\u0100\uffff",
[ [
u"", # first byte of BOM read u"", # first byte of BOM read
@ -79,11 +146,11 @@ def test_partial(self):
] ]
) )
class UTF16LETest(PartialReadTest): class UTF16LETest(ReadTest):
encoding = "utf-16-le"
def test_partial(self): def test_partial(self):
self.check_partial( self.check_partial(
"utf-16-le",
u"\x00\xff\u0100\uffff", u"\x00\xff\u0100\uffff",
[ [
u"", u"",
@ -97,11 +164,11 @@ def test_partial(self):
] ]
) )
class UTF16BETest(PartialReadTest): class UTF16BETest(ReadTest):
encoding = "utf-16-be"
def test_partial(self): def test_partial(self):
self.check_partial( self.check_partial(
"utf-16-be",
u"\x00\xff\u0100\uffff", u"\x00\xff\u0100\uffff",
[ [
u"", u"",
@ -115,11 +182,11 @@ def test_partial(self):
] ]
) )
class UTF8Test(PartialReadTest): class UTF8Test(ReadTest):
encoding = "utf-8"
def test_partial(self): def test_partial(self):
self.check_partial( self.check_partial(
"utf-8",
u"\x00\xff\u07ff\u0800\uffff", u"\x00\xff\u07ff\u0800\uffff",
[ [
u"\x00", u"\x00",