schema: various FileReader optimizations and bug fixes

Now much faster.

Change-Id: I94920b7ab6903e986b045a6ed862eff0796a7f7e
This commit is contained in:
Brad Fitzpatrick 2013-01-06 10:50:29 -08:00
parent d689b8df88
commit bdb54d7cef
1 changed files with 35 additions and 12 deletions

View File

@ -38,17 +38,18 @@ var errClosed = errors.New("filereader is closed")
// A FileReader reads the bytes of "file" and "bytes" schema blobrefs.
type FileReader struct {
*io.SectionReader
ssmmu sync.Mutex // guards ssm
ssm map[string]*Superset // blobref -> superset
// Immutable stuff:
*io.SectionReader // provides Read, etc.
parent *FileReader // or nil for sub-region readers to find the ssm map in getSuperset
rootOff int64 // this FileReader's offset from the root
fetcher blobref.SeekFetcher
ss *Superset
size int64 // total number of bytes
sfg singleflight.Group // for loading blobrefs for ssm
fetcher blobref.SeekFetcher
ss *Superset
size int64 // total number of bytes
ssmmu sync.Mutex // guards ssm
ssm map[string]*Superset // blobref -> superset
}
// NewFileReader returns a new FileReader reading the contents of fileBlobRef,
@ -115,7 +116,9 @@ func (fr *FileReader) LoadAllChunks() {
go func(off int64) {
rc, err := fr.readerForOffset(off)
if err == nil {
rc.Close()
defer rc.Close()
var b [1]byte
rc.Read(b[:]) // fault in the blob
}
}(off)
}
@ -190,7 +193,6 @@ func (fr *FileReader) sendPartsChunks(c chan<- int64, off int64, parts []*BytesP
return fmt.Errorf("part illegally contained both a blobRef and bytesRef")
case p.BlobRef == nil && p.BytesRef == nil:
// Don't send
break
case p.BlobRef != nil:
c <- off
case p.BytesRef != nil:
@ -204,8 +206,7 @@ func (fr *FileReader) sendPartsChunks(c chan<- int64, off int64, parts []*BytesP
errc <- err
return
}
fr.sendPartsChunks(c, offNow, ss.Parts)
errc <- nil
errc <- fr.sendPartsChunks(c, offNow, ss.Parts)
}()
}
off += int64(p.Size)
@ -230,7 +231,17 @@ func (sw *sliceWriter) Write(p []byte) (n int, err error) {
var eofReader io.ReadCloser = ioutil.NopCloser(strings.NewReader(""))
func (fr *FileReader) rootReader() *FileReader {
if fr.parent != nil {
return fr.parent.rootReader()
}
return fr
}
func (fr *FileReader) getSuperset(br *blobref.BlobRef) (*Superset, error) {
if root := fr.rootReader(); root != fr {
return root.getSuperset(br)
}
brStr := br.String()
ssi, err := fr.sfg.Do(brStr, func() (interface{}, error) {
fr.ssmmu.Lock()
@ -259,10 +270,15 @@ func (fr *FileReader) getSuperset(br *blobref.BlobRef) (*Superset, error) {
return ssi.(*Superset), nil
}
var debug = os.Getenv("CAMLI_DEBUG") != ""
// readerForOffset returns a ReadCloser that reads some number of bytes and then EOF
// from the provided offset. Seeing EOF doesn't mean the end of the whole file; just the
// chunk at that offset. The caller must close the ReadCloser when done reading.
func (fr *FileReader) readerForOffset(off int64) (io.ReadCloser, error) {
if debug {
log.Printf("(%p) readerForOffset %d + %d = %d", fr, fr.rootOff, off, fr.rootOff + off)
}
if off < 0 {
panic("negative offset")
}
@ -270,9 +286,11 @@ func (fr *FileReader) readerForOffset(off int64) (io.ReadCloser, error) {
return eofReader, nil
}
offRemain := off
var skipped int64
parts := fr.ss.Parts
for len(parts) > 0 && parts[0].Size <= uint64(offRemain) {
offRemain -= int64(parts[0].Size)
skipped += int64(parts[0].Size)
parts = parts[1:]
}
if len(parts) == 0 {
@ -295,6 +313,11 @@ func (fr *FileReader) readerForOffset(off int64) (io.ReadCloser, error) {
return nil, err
}
rsc, err = ss.NewFileReader(fr.fetcher)
if err == nil {
subFR := rsc.(*FileReader)
subFR.parent = fr.rootReader()
subFR.rootOff = fr.rootOff + skipped
}
}
if err != nil {
return nil, err