schema: add FileReader.ForeachChunk

Will be used by the blobpacked storage server.

Also, we can then rewrite many of the legacy crufty methods on
FileReader in terms of this.

Change-Id: Ia371001f1ed502fa1da2dbc4a8e1c29e5f0c9e8c
This commit is contained in:
Brad Fitzpatrick 2014-08-29 14:51:33 -07:00
parent 6517bbc392
commit 3388e363b9
2 changed files with 108 additions and 1 deletion

View File

@ -19,11 +19,13 @@ package schema
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"math/rand"
"os"
"testing"
"camlistore.org/pkg/blob"
"camlistore.org/pkg/test"
)
@ -297,6 +299,72 @@ func TestReaderEfficiency(t *testing.T) {
}
}
// TestReaderForeachChunk writes a random multi-chunk file to an
// in-memory blob store, then verifies that FileReader.ForeachChunk
// visits every leaf chunk in order: each chunk must be a valid,
// fetchable BlobRef (never a BytesRef), the per-part sizes must match
// the stored blob sizes, and the concatenated chunks must reproduce
// the original file exactly.
func TestReaderForeachChunk(t *testing.T) {
	fileSize := 4 << 20
	if testing.Short() {
		fileSize = 1 << 20
	}
	// Deterministic pseudo-random contents so failures are reproducible.
	bigFile := make([]byte, fileSize)
	rnd := rand.New(rand.NewSource(1))
	for i := range bigFile {
		bigFile[i] = byte(rnd.Intn(256))
	}
	sto := new(test.Fetcher) // in-memory blob storage
	fileMap := NewFileMap("testfile")
	fileref, err := WriteFileMap(sto, fileMap, bytes.NewReader(bigFile))
	if err != nil {
		t.Fatalf("WriteFileMap: %v", err)
	}
	fr, err := NewFileReader(sto, fileref)
	if err != nil {
		t.Fatal(err)
	}
	var back bytes.Buffer // reassembled file contents
	var totSize uint64    // sum of all part sizes reported by ForeachChunk
	err = fr.ForeachChunk(func(sref blob.Ref, p BytesPart) error {
		if !sref.Valid() {
			t.Fatal("invalid schema blob")
		}
		if p.BytesRef.Valid() {
			// ForeachChunk promises to recurse through BytesRefs itself.
			t.Fatal("should never see a valid BytesRef")
		}
		if !p.BlobRef.Valid() {
			t.Fatal("saw part with invalid blobref")
		}
		rc, size, err := sto.Fetch(p.BlobRef)
		if err != nil {
			return fmt.Errorf("Error fetching blobref of chunk %+v: %v", p, err)
		}
		defer rc.Close()
		totSize += p.Size
		if uint64(size) != p.Size {
			return fmt.Errorf("fetched size %d doesn't match expected for chunk %+v", size, p)
		}
		n, err := io.Copy(&back, rc)
		if err != nil {
			return err
		}
		if n != int64(size) {
			return fmt.Errorf("Copied unexpected %d bytes of chunk %+v", n, p)
		}
		return nil
	})
	if err != nil {
		// BUG FIX: was t.Fatal with a %v format verb; t.Fatal does not
		// format its arguments (go vet printf check), so use t.Fatalf.
		t.Fatalf("ForeachChunk = %v", err)
	}
	if back.Len() != fileSize {
		t.Fatalf("Read file is %d bytes; want %d", back.Len(), fileSize)
	}
	if totSize != uint64(fileSize) {
		t.Errorf("sum of parts = %d; want %d", totSize, fileSize)
	}
	if !bytes.Equal(back.Bytes(), bigFile) {
		t.Errorf("file read mismatch")
	}
}
type summary []byte
func (s summary) String() string {

View File

@ -82,6 +82,7 @@ func NewFileReader(fetcher blob.Fetcher, fileBlobRef blob.Ref) (*FileReader, err
if err != nil {
return nil, fmt.Errorf("schema/filereader: decoding file schema blob: %v", err)
}
ss.BlobRef = fileBlobRef
if ss.Type != "file" && ss.Type != "bytes" {
return nil, fmt.Errorf("schema/filereader: expected \"file\" or \"bytes\" schema blob, got %q", ss.Type)
}
@ -186,6 +187,43 @@ func (fr *FileReader) ReadAt(p []byte, offset int64) (n int, err error) {
return n, err
}
// ForeachChunk calls fn once per chunk of fr, in file order.
//
// The schema argument is the "file" or "bytes" schema blob in which
// the part is defined, and the BytesPart is the chunk itself. fn is
// never invoked with a part carrying a BytesRef: such indirect parts
// are expanded recursively and their leaf chunks are reported instead.
//
// Iteration stops at the first error: either one returned by fn, or
// one encountered while fetching an intermediate schema blob. That
// error is returned from ForeachChunk.
func (fr *FileReader) ForeachChunk(fn func(schema blob.Ref, p BytesPart) error) error {
	for _, part := range fr.ss.Parts {
		indirect := part.BytesRef.Valid()
		if indirect && part.BlobRef.Valid() {
			// A part may name a data blob or a nested "bytes" blob, never both.
			return fmt.Errorf("part in %v illegally contained both a blobRef and bytesRef", fr.ss.BlobRef)
		}
		if !indirect {
			// Leaf chunk: report it directly.
			if err := fn(fr.ss.BlobRef, *part); err != nil {
				return err
			}
			continue
		}
		// Indirect part: load the nested "bytes" schema blob and recurse.
		ss, err := fr.getSuperset(part.BytesRef)
		if err != nil {
			return err
		}
		sub, err := ss.NewFileReader(fr.fetcher)
		if err != nil {
			return err
		}
		sub.parent = fr
		if err := sub.ForeachChunk(fn); err != nil {
			return err
		}
	}
	return nil
}
// GetChunkOffsets sends c each of the file's chunk offsets.
// The offsets are not necessarily sent in order, and all ranges of the file
// are not necessarily represented if the file contains zero holes.
@ -207,7 +245,7 @@ func (fr *FileReader) sendPartsChunks(c chan<- int64, firstErrc chan error, off
for _, p := range parts {
switch {
case p.BlobRef.Valid() && p.BytesRef.Valid():
return fmt.Errorf("part illegally contained both a blobRef and bytesRef")
return fmt.Errorf("part in %v illegally contained both a blobRef and bytesRef", fr.ss.BlobRef)
case !p.BlobRef.Valid() && !p.BytesRef.Valid():
// Don't send
case p.BlobRef.Valid():
@ -301,6 +339,7 @@ func (fr *FileReader) getSuperset(br blob.Ref) (*superset, error) {
if err != nil {
return nil, err
}
ss.BlobRef = br
fr.ssmmu.Lock()
defer fr.ssmmu.Unlock()
fr.ssm[br] = ss