diff --git a/pkg/schema/fileread_test.go b/pkg/schema/fileread_test.go
index faa199d3a..1cfb91143 100644
--- a/pkg/schema/fileread_test.go
+++ b/pkg/schema/fileread_test.go
@@ -222,6 +222,81 @@ func TestReaderSeekStress(t *testing.T) {
 	}
 }
 
+/*
+Log output recorded for TestReaderEfficiency (4 MiB file read sequentially via ReadAt), before and after the fix.
+1KB ReadAt calls before:
+fileread_test.go:253: Blob Size: 4194304 raw, 4201523 with meta (1.00172x)
+fileread_test.go:283: Blobs fetched: 4160 (63.03x)
+fileread_test.go:284: Bytes fetched: 361174780 (85.96x)
+
+2KB ReadAt calls before:
+fileread_test.go:253: Blob Size: 4194304 raw, 4201523 with meta (1.00172x)
+fileread_test.go:283: Blobs fetched: 2112 (32.00x)
+fileread_test.go:284: Bytes fetched: 182535389 (43.45x)
+
+After fix:
+fileread_test.go:253: Blob Size: 4194304 raw, 4201523 with meta (1.00172x)
+fileread_test.go:283: Blobs fetched: 66 (1.00x)
+fileread_test.go:284: Bytes fetched: 4201523 (1.00x)
+*/
+func TestReaderEfficiency(t *testing.T) {
+	const fileSize = 4 << 20 // 4 MiB
+	bigFile := make([]byte, fileSize)
+	rnd := rand.New(rand.NewSource(1)) // fixed seed, so the file contents are the same on every run
+	for i := range bigFile {
+		bigFile[i] = byte(rnd.Intn(256))
+	}
+	// Write the file into the store and open a reader on the resulting ref.
+	sto := new(test.Fetcher) // in-memory blob storage
+	fileMap := NewFileMap("testfile")
+	fileref, err := WriteFileMap(sto, fileMap, bytes.NewReader(bigFile))
+	if err != nil {
+		t.Fatalf("WriteFileMap: %v", err)
+	}
+
+	fr, err := NewFileReader(sto, fileref)
+	if err != nil {
+		t.Fatal(err)
+	}
+	// Snapshot what the store holds; the fetch totals are checked against these below.
+	numBlobs := sto.NumBlobs()
+	t.Logf("Num blobs = %d", numBlobs)
+	sumSize := sto.SumBlobSize()
+	t.Logf("Blob Size: %d raw, %d with meta (%.05fx)", fileSize, sumSize, float64(sumSize)/float64(fileSize))
+	// Read the file front to back in readSize pieces, checking each against bigFile.
+	const readSize = 2 << 10 // 2 KiB per ReadAt call
+	buf := make([]byte, readSize)
+	for off := int64(0); off < fileSize; off += readSize {
+		n, err := fr.ReadAt(buf, off)
+		if err != nil {
+			t.Fatalf("ReadAt at offset %d: %v", off, err)
+		}
+		if n != readSize {
+			t.Fatalf("Read %d bytes at offset %d; want %d", n, off, readSize)
+		}
+		got, want := buf, bigFile[off:off+readSize]
+		if !bytes.Equal(buf, want) { // got aliases buf, so this compares the bytes just read
+			t.Errorf("Incorrect read at offset %d:\n got: %s\n want: %s", off, summary(got), summary(want))
+			off := 0 // shadows the loop variable: index of the first differing byte within this chunk
+			for len(got) > 0 && len(want) > 0 && got[0] == want[0] {
+				off++
+				got = got[1:]
+				want = want[1:]
+			}
+			t.Errorf(" differences start at offset %d:\n got: %s\n want: %s\n", off, summary(got), summary(want))
+			break // stop at the first bad chunk
+		}
+	}
+	fr.Close() // NOTE(review): not deferred, so it is skipped when t.Fatalf fires above
+	blobsFetched, bytesFetched := sto.Stats() // presumably the store's cumulative fetch counters — confirm in test.Fetcher
+	if blobsFetched != int64(numBlobs) { // any excess means blobs were fetched more than once
+		t.Errorf("Fetched %d blobs; want %d", blobsFetched, numBlobs)
+	}
+	if bytesFetched != sumSize {
+		t.Errorf("Fetched %d bytes; want %d", bytesFetched, sumSize)
+	}
+}
+
 type summary []byte
 
 func (s summary) String() string {
diff --git a/pkg/schema/filereader.go b/pkg/schema/filereader.go
index 9011d5c8e..f1ef9a165 100644
--- a/pkg/schema/filereader.go
+++ b/pkg/schema/filereader.go
@@ -48,6 +48,9 @@ type FileReader struct {
 
 	sfg singleflight.Group // for loading blobrefs for ssm
 
+	blobmu   sync.Mutex // guards lastBlob
+	lastBlob *blob.Blob // most recently fetched blob; cuts dup reads up to 85x
+
 	ssmmu sync.Mutex             // guards ssm
 	ssm   map[blob.Ref]*superset // blobref -> superset
 }
@@ -272,6 +275,27 @@ func (fr *FileReader) rootReader() *FileReader {
 	return fr
 }
 
+func (fr *FileReader) getBlob(br blob.Ref) (*blob.Blob, error) { // returns the blob for br, reusing lastBlob when its ref matches
+	if root := fr.rootReader(); root != fr {
+		return root.getBlob(br) // delegate, so only the root reader's lastBlob is ever used
+	}
+	fr.blobmu.Lock()
+	last := fr.lastBlob // copy the pointer under the lock; the comparison happens after unlocking
+	fr.blobmu.Unlock()
+	if last != nil && last.Ref() == br {
+		return last, nil
+	}
+	blob, err := blob.FromFetcher(fr.fetcher, br) // NOTE(review): the local "blob" shadows the blob package after this statement
+	if err != nil {
+		return nil, err // a failed fetch leaves lastBlob unchanged
+	}
+	// Remember the fetched blob. The lock is not held across the fetch, so concurrent callers may each fetch the same ref.
+	fr.blobmu.Lock()
+	fr.lastBlob = blob
+	fr.blobmu.Unlock()
+	return blob, nil
+}
+
 func (fr *FileReader) getSuperset(br blob.Ref) (*superset, error) {
 	if root := fr.rootReader(); root != fr {
 		return root.getSuperset(br)
@@ -341,7 +365,7 @@ func (fr *FileReader) readerForOffset(off int64) (io.ReadCloser, error) {
 			io.LimitReader(zeroReader{}, int64(p0.Size-uint64(offRemain)))), nil
 
 	case p0.BlobRef.Valid():
-		blob, err := blob.FromFetcher(fr.fetcher, p0.BlobRef)
+		blob, err := fr.getBlob(p0.BlobRef) // goes through getBlob's one-blob cache instead of fetching directly
 		if err != nil {
 			return nil, err
 		}