mirror of https://github.com/perkeep/perkeep.git
schema: reduce FileReader blob reads 32-85x by caching the last blob read
Little reads into big blobs kept re-reading the same blob. Change-Id: I098c9d9a9443dacc93f60f96fff1edd421ced198
This commit is contained in:
parent
fe481e5da7
commit
71a1a1ff8d
|
@ -222,6 +222,81 @@ func TestReaderSeekStress(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
1KB ReadAt calls before:
|
||||
fileread_test.go:253: Blob Size: 4194304 raw, 4201523 with meta (1.00172x)
|
||||
fileread_test.go:283: Blobs fetched: 4160 (63.03x)
|
||||
fileread_test.go:284: Bytes fetched: 361174780 (85.96x)
|
||||
|
||||
2KB ReadAt calls before:
|
||||
fileread_test.go:253: Blob Size: 4194304 raw, 4201523 with meta (1.00172x)
|
||||
fileread_test.go:283: Blobs fetched: 2112 (32.00x)
|
||||
fileread_test.go:284: Bytes fetched: 182535389 (43.45x)
|
||||
|
||||
After fix:
|
||||
fileread_test.go:253: Blob Size: 4194304 raw, 4201523 with meta (1.00172x)
|
||||
fileread_test.go:283: Blobs fetched: 66 (1.00x)
|
||||
fileread_test.go:284: Bytes fetched: 4201523 (1.00x)
|
||||
*/
|
||||
func TestReaderEfficiency(t *testing.T) {
|
||||
const fileSize = 4 << 20
|
||||
bigFile := make([]byte, fileSize)
|
||||
rnd := rand.New(rand.NewSource(1))
|
||||
for i := range bigFile {
|
||||
bigFile[i] = byte(rnd.Intn(256))
|
||||
}
|
||||
|
||||
sto := new(test.Fetcher) // in-memory blob storage
|
||||
fileMap := NewFileMap("testfile")
|
||||
fileref, err := WriteFileMap(sto, fileMap, bytes.NewReader(bigFile))
|
||||
if err != nil {
|
||||
t.Fatalf("WriteFileMap: %v", err)
|
||||
}
|
||||
|
||||
fr, err := NewFileReader(sto, fileref)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
numBlobs := sto.NumBlobs()
|
||||
t.Logf("Num blobs = %d", numBlobs)
|
||||
sumSize := sto.SumBlobSize()
|
||||
t.Logf("Blob Size: %d raw, %d with meta (%.05fx)", fileSize, sumSize, float64(sumSize)/float64(fileSize))
|
||||
|
||||
const readSize = 2 << 10
|
||||
buf := make([]byte, readSize)
|
||||
for off := int64(0); off < fileSize; off += readSize {
|
||||
n, err := fr.ReadAt(buf, off)
|
||||
if err != nil {
|
||||
t.Fatalf("ReadAt at offset %d: %v", off, err)
|
||||
}
|
||||
if n != readSize {
|
||||
t.Fatalf("Read %d bytes at offset %d; want %d", n, off, readSize)
|
||||
}
|
||||
got, want := buf, bigFile[off:off+readSize]
|
||||
if !bytes.Equal(buf, want) {
|
||||
t.Errorf("Incorrect read at offset %d:\n got: %s\n want: %s", off, summary(got), summary(want))
|
||||
off := 0
|
||||
for len(got) > 0 && len(want) > 0 && got[0] == want[0] {
|
||||
off++
|
||||
got = got[1:]
|
||||
want = want[1:]
|
||||
}
|
||||
t.Errorf(" differences start at offset %d:\n got: %s\n want: %s\n", off, summary(got), summary(want))
|
||||
break
|
||||
}
|
||||
}
|
||||
fr.Close()
|
||||
blobsFetched, bytesFetched := sto.Stats()
|
||||
if blobsFetched != int64(numBlobs) {
|
||||
t.Errorf("Fetched %d blobs; want %d", blobsFetched, numBlobs)
|
||||
}
|
||||
if bytesFetched != sumSize {
|
||||
t.Errorf("Fetched %d bytes; want %d", bytesFetched, sumSize)
|
||||
}
|
||||
}
|
||||
|
||||
type summary []byte
|
||||
|
||||
func (s summary) String() string {
|
||||
|
|
|
@ -48,6 +48,9 @@ type FileReader struct {
|
|||
|
||||
sfg singleflight.Group // for loading blobrefs for ssm
|
||||
|
||||
blobmu sync.Mutex // guards lastBlob
|
||||
lastBlob *blob.Blob // most recently fetched blob; cuts dup reads up to 85x
|
||||
|
||||
ssmmu sync.Mutex // guards ssm
|
||||
ssm map[blob.Ref]*superset // blobref -> superset
|
||||
}
|
||||
|
@ -272,6 +275,27 @@ func (fr *FileReader) rootReader() *FileReader {
|
|||
return fr
|
||||
}
|
||||
|
||||
func (fr *FileReader) getBlob(br blob.Ref) (*blob.Blob, error) {
|
||||
if root := fr.rootReader(); root != fr {
|
||||
return root.getBlob(br)
|
||||
}
|
||||
fr.blobmu.Lock()
|
||||
last := fr.lastBlob
|
||||
fr.blobmu.Unlock()
|
||||
if last != nil && last.Ref() == br {
|
||||
return last, nil
|
||||
}
|
||||
blob, err := blob.FromFetcher(fr.fetcher, br)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fr.blobmu.Lock()
|
||||
fr.lastBlob = blob
|
||||
fr.blobmu.Unlock()
|
||||
return blob, nil
|
||||
}
|
||||
|
||||
func (fr *FileReader) getSuperset(br blob.Ref) (*superset, error) {
|
||||
if root := fr.rootReader(); root != fr {
|
||||
return root.getSuperset(br)
|
||||
|
@ -341,7 +365,7 @@ func (fr *FileReader) readerForOffset(off int64) (io.ReadCloser, error) {
|
|||
io.LimitReader(zeroReader{},
|
||||
int64(p0.Size-uint64(offRemain)))), nil
|
||||
case p0.BlobRef.Valid():
|
||||
blob, err := blob.FromFetcher(fr.fetcher, p0.BlobRef)
|
||||
blob, err := fr.getBlob(p0.BlobRef)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue