From bdb54d7cefe9c489602492b4b989abd77ab2ac6c Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Sun, 6 Jan 2013 10:50:29 -0800 Subject: [PATCH] schema: various FileReader optimizations and bug fixes Now much faster. Change-Id: I94920b7ab6903e986b045a6ed862eff0796a7f7e --- pkg/schema/filereader.go | 47 ++++++++++++++++++++++++++++++---------- 1 file changed, 35 insertions(+), 12 deletions(-) diff --git a/pkg/schema/filereader.go b/pkg/schema/filereader.go index c730f4215..239f54d81 100644 --- a/pkg/schema/filereader.go +++ b/pkg/schema/filereader.go @@ -38,17 +38,18 @@ var errClosed = errors.New("filereader is closed") // A FileReader reads the bytes of "file" and "bytes" schema blobrefs. type FileReader struct { - *io.SectionReader - - ssmmu sync.Mutex // guards ssm - ssm map[string]*Superset // blobref -> superset + // Immutable stuff: + *io.SectionReader // provides Read, etc. + parent *FileReader // or nil for sub-region readers to find the ssm map in getSuperset + rootOff int64 // this FileReader's offset from the root + fetcher blobref.SeekFetcher + ss *Superset + size int64 // total number of bytes sfg singleflight.Group // for loading blobrefs for ssm - fetcher blobref.SeekFetcher - ss *Superset - - size int64 // total number of bytes + ssmmu sync.Mutex // guards ssm + ssm map[string]*Superset // blobref -> superset } // NewFileReader returns a new FileReader reading the contents of fileBlobRef, @@ -115,7 +116,9 @@ func (fr *FileReader) LoadAllChunks() { go func(off int64) { rc, err := fr.readerForOffset(off) if err == nil { - rc.Close() + defer rc.Close() + var b [1]byte + rc.Read(b[:]) // fault in the blob } }(off) } @@ -190,7 +193,6 @@ func (fr *FileReader) sendPartsChunks(c chan<- int64, off int64, parts []*BytesP return fmt.Errorf("part illegally contained both a blobRef and bytesRef") case p.BlobRef == nil && p.BytesRef == nil: // Don't send - break case p.BlobRef != nil: c <- off case p.BytesRef != nil: @@ -204,8 +206,7 @@ func (fr *FileReader) sendPartsChunks(c chan<- int64, off int64, parts []*BytesP errc <- err return } - fr.sendPartsChunks(c, offNow, ss.Parts) - errc <- nil + errc <- fr.sendPartsChunks(c, offNow, ss.Parts) }() } off += int64(p.Size) @@ -230,7 +231,17 @@ func (sw *sliceWriter) Write(p []byte) (n int, err error) { var eofReader io.ReadCloser = ioutil.NopCloser(strings.NewReader("")) +func (fr *FileReader) rootReader() *FileReader { + if fr.parent != nil { + return fr.parent.rootReader() + } + return fr +} + func (fr *FileReader) getSuperset(br *blobref.BlobRef) (*Superset, error) { + if root := fr.rootReader(); root != fr { + return root.getSuperset(br) + } brStr := br.String() ssi, err := fr.sfg.Do(brStr, func() (interface{}, error) { fr.ssmmu.Lock() @@ -259,10 +270,15 @@ func (fr *FileReader) getSuperset(br *blobref.BlobRef) (*Superset, error) { return ssi.(*Superset), nil } +var debug = os.Getenv("CAMLI_DEBUG") != "" + // readerForOffset returns a ReadCloser that reads some number of bytes and then EOF // from the provided offset. Seeing EOF doesn't mean the end of the whole file; just the // chunk at that offset. The caller must close the ReadCloser when done reading. func (fr *FileReader) readerForOffset(off int64) (io.ReadCloser, error) { + if debug { + log.Printf("(%p) readerForOffset %d + %d = %d", fr, fr.rootOff, off, fr.rootOff + off) + } if off < 0 { panic("negative offset") } @@ -270,9 +286,11 @@ func (fr *FileReader) readerForOffset(off int64) (io.ReadCloser, error) { return eofReader, nil } offRemain := off + var skipped int64 parts := fr.ss.Parts for len(parts) > 0 && parts[0].Size <= uint64(offRemain) { offRemain -= int64(parts[0].Size) + skipped += int64(parts[0].Size) parts = parts[1:] } if len(parts) == 0 { @@ -295,6 +313,11 @@ func (fr *FileReader) readerForOffset(off int64) (io.ReadCloser, error) { return nil, err } rsc, err = ss.NewFileReader(fr.fetcher) + if err == nil { + subFR := rsc.(*FileReader) + subFR.parent = fr.rootReader() + subFR.rootOff = fr.rootOff + skipped + } } if err != nil { return nil, err