schema filereader: verify top-level size matches part sizes when reading

This commit is contained in:
Brad Fitzpatrick 2011-06-06 08:50:20 -07:00
parent 9b701a0e37
commit 5f86c64b95
2 changed files with 25 additions and 7 deletions

View File

@ -55,6 +55,13 @@ func parts(parts ...*ContentPart) []*ContentPart {
return parts
}
func sizeSum(parts []*ContentPart) (s uint64) {
for _, p := range parts {
s += uint64(p.Size)
}
return
}
var readTests = []readTest{
{parts(all(blobA)), 0, "AAAAAaaaaa"},
{parts(all(blobA)), 2, "AAAaaaaa"},
@ -78,7 +85,7 @@ func TestReader(t *testing.T) {
ss := new(Superset)
ss.Type = "file"
ss.Version = 1
// TODO: set size?
ss.Size = sizeSum(rt.parts)
ss.ContentParts = rt.parts
fr := ss.NewFileReader(testFetcher)
fr.Skip(rt.skip)

View File

@ -32,9 +32,10 @@ type FileReader struct {
ss *Superset
ci int // index into contentparts
ccon uint64 // bytes into current chunk already consumed
remain int64 // bytes remaining
cr blobref.ReadSeekCloser // cached reader
crbr *blobref.BlobRef // the blobref that cr is for
crbr *blobref.BlobRef // the blobref that cr is for
}
// TODO: make this take a blobref.FetcherAt instead?
@ -54,9 +55,9 @@ func NewFileReader(fetcher blobref.SeekFetcher, fileBlobRef *blobref.BlobRef) (*
}
func (ss *Superset) NewFileReader(fetcher blobref.SeekFetcher) *FileReader {
// TODO: return an error if ss isn't a Type "file" ?
// TODO: return some error if the redundant ss.Size field doesn't match ContentParts?
return &FileReader{fetcher: fetcher, ss: ss}
// TODO: return an error if ss isn't a Type "file"
//
return &FileReader{fetcher: fetcher, ss: ss, remain: int64(ss.Size)}
}
// FileSchema returns the reader's schema superset. Don't mutate it.
@ -70,6 +71,7 @@ func (fr *FileReader) Skip(skipBytes uint64) {
thisChunkSkippable := cp.Size - fr.ccon
toSkip := minu64(skipBytes, thisChunkSkippable)
fr.ccon += toSkip
fr.remain -= int64(toSkip)
if fr.ccon == cp.Size {
fr.ci++
fr.ccon = 0
@ -105,6 +107,9 @@ func (fr *FileReader) Read(p []byte) (n int, err os.Error) {
for {
if fr.ci >= len(fr.ss.ContentParts) {
fr.closeOpenBlobs()
if fr.remain > 0 {
return 0, fmt.Errorf("schema: declared file schema size was larger than sum of content parts")
}
return 0, os.EOF
}
cp = fr.ss.ContentParts[fr.ci]
@ -117,6 +122,10 @@ func (fr *FileReader) Read(p []byte) (n int, err os.Error) {
break
}
if cp.Size == 0 {
return 0, fmt.Errorf("blobref content part contained illegal size 0")
}
br := cp.blobref()
if br == nil {
return 0, fmt.Errorf("no blobref in content part %d", fr.ci)
@ -141,8 +150,10 @@ func (fr *FileReader) Read(p []byte) (n int, err os.Error) {
}
n, err = rsc.Read(p[:int(readSize)])
if err == nil || err == os.EOF {
fr.ccon += uint64(n)
fr.ccon += uint64(n)
fr.remain -= int64(n)
if fr.remain < 0 {
err = fmt.Errorf("schema: file schema was invalid; content parts sum to over declared size")
}
return
}