From 3388e363b908a02b947900bd6982c3040bc0ee12 Mon Sep 17 00:00:00 2001
From: Brad Fitzpatrick
Date: Fri, 29 Aug 2014 14:51:33 -0700
Subject: [PATCH] schema: add FileReader.ForeachChunk

Will be used by the blobpacked storage server.

Also, we can then rewrite much of the legacy crufty methods on
FileReader in terms of this.

Change-Id: Ia371001f1ed502fa1da2dbc4a8e1c29e5f0c9e8c
---
 pkg/schema/fileread_test.go | 68 +++++++++++++++++++++++++++++++++++++
 pkg/schema/filereader.go    | 41 +++++++++++++++++++++-
 2 files changed, 108 insertions(+), 1 deletion(-)

diff --git a/pkg/schema/fileread_test.go b/pkg/schema/fileread_test.go
index 1cfb91143..d507171fb 100644
--- a/pkg/schema/fileread_test.go
+++ b/pkg/schema/fileread_test.go
@@ -19,11 +19,13 @@ package schema
 import (
 	"bytes"
 	"fmt"
+	"io"
 	"io/ioutil"
 	"math/rand"
 	"os"
 	"testing"
 
+	"camlistore.org/pkg/blob"
 	"camlistore.org/pkg/test"
 )
 
@@ -297,6 +299,72 @@ func TestReaderEfficiency(t *testing.T) {
 	}
 }
 
+func TestReaderForeachChunk(t *testing.T) {
+	fileSize := 4 << 20
+	if testing.Short() {
+		fileSize = 1 << 20
+	}
+	bigFile := make([]byte, fileSize)
+	rnd := rand.New(rand.NewSource(1)) // fixed seed keeps the file contents reproducible
+	for i := range bigFile {
+		bigFile[i] = byte(rnd.Intn(256))
+	}
+	sto := new(test.Fetcher) // in-memory blob storage
+	fileMap := NewFileMap("testfile")
+	fileref, err := WriteFileMap(sto, fileMap, bytes.NewReader(bigFile))
+	if err != nil {
+		t.Fatalf("WriteFileMap: %v", err)
+	}
+
+	fr, err := NewFileReader(sto, fileref)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	var back bytes.Buffer
+	var totSize uint64
+	err = fr.ForeachChunk(func(sref blob.Ref, p BytesPart) error {
+		if !sref.Valid() {
+			t.Fatal("invalid schema blob")
+		}
+		if p.BytesRef.Valid() {
+			t.Fatal("should never see a valid BytesRef") // ForeachChunk follows these itself
+		}
+		if !p.BlobRef.Valid() {
+			t.Fatal("saw part with invalid blobref")
+		}
+		rc, size, err := sto.Fetch(p.BlobRef)
+		if err != nil {
+			return fmt.Errorf("Error fetching blobref of chunk %+v: %v", p, err)
+		}
+		defer rc.Close()
+		totSize += p.Size
+		if uint64(size) != p.Size {
+			return fmt.Errorf("fetched size %d doesn't match expected for chunk %+v", size, p)
+		}
+		n, err := io.Copy(&back, rc)
+		if err != nil {
+			return err
+		}
+		if n != int64(size) {
+			return fmt.Errorf("Copied unexpected %d bytes of chunk %+v", n, p)
+		}
+		return nil
+	})
+	if err != nil {
+		t.Fatalf("ForeachChunk = %v", err)
+	}
+	if back.Len() != fileSize {
+		t.Fatalf("Read file is %d bytes; want %d", back.Len(), fileSize)
+	}
+	if totSize != uint64(fileSize) {
+		t.Errorf("sum of parts = %d; want %d", totSize, fileSize)
+	}
+	if !bytes.Equal(back.Bytes(), bigFile) {
+		t.Errorf("file read mismatch")
+	}
+}
+
 type summary []byte
 
 func (s summary) String() string {
diff --git a/pkg/schema/filereader.go b/pkg/schema/filereader.go
index ad4b778fa..4570dc775 100644
--- a/pkg/schema/filereader.go
+++ b/pkg/schema/filereader.go
@@ -82,6 +82,7 @@ func NewFileReader(fetcher blob.Fetcher, fileBlobRef blob.Ref) (*FileReader, err
 	if err != nil {
 		return nil, fmt.Errorf("schema/filereader: decoding file schema blob: %v", err)
 	}
+	ss.BlobRef = fileBlobRef
 	if ss.Type != "file" && ss.Type != "bytes" {
 		return nil, fmt.Errorf("schema/filereader: expected \"file\" or \"bytes\" schema blob, got %q", ss.Type)
 	}
@@ -186,6 +187,43 @@ func (fr *FileReader) ReadAt(p []byte, offset int64) (n int, err error) {
 	return n, err
 }
 
+// ForeachChunk calls fn for each chunk of fr, in order.
+//
+// The schema argument will be the "file" or "bytes" schema blob in
+// which the part is defined. The BytesPart will be the actual
+// chunk. The fn function will not be called with BytesParts
+// referencing a "BytesRef"; those are followed recursively instead.
+//
+// If fn returns an error, iteration stops and that error is returned
+// from ForeachChunk. Other errors may be returned from ForeachChunk
+// if schema blob fetches fail.
+func (fr *FileReader) ForeachChunk(fn func(schema blob.Ref, p BytesPart) error) error {
+	for _, part := range fr.ss.Parts {
+		switch {
+		case part.BytesRef.Valid() && part.BlobRef.Valid():
+			return fmt.Errorf("part in %v illegally contained both a blobRef and bytesRef", fr.ss.BlobRef)
+		case part.BytesRef.Valid(): // nested schema blob: walk its parts instead of calling fn
+			sub, err := fr.getSuperset(part.BytesRef)
+			if err != nil {
+				return err
+			}
+			child, err := sub.NewFileReader(fr.fetcher)
+			if err != nil {
+				return err
+			}
+			child.parent = fr
+			if err := child.ForeachChunk(fn); err != nil {
+				return err
+			}
+		default: // every other part is passed to fn with the schema blob that lists it
+			if err := fn(fr.ss.BlobRef, *part); err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
 // GetChunkOffsets sends c each of the file's chunk offsets.
 // The offsets are not necessarily sent in order, and all ranges of the file
 // are not necessarily represented if the file contains zero holes.
@@ -207,7 +245,7 @@ func (fr *FileReader) sendPartsChunks(c chan<- int64, firstErrc chan error, off
 	for _, p := range parts {
 		switch {
 		case p.BlobRef.Valid() && p.BytesRef.Valid():
-			return fmt.Errorf("part illegally contained both a blobRef and bytesRef")
+			return fmt.Errorf("part in %v illegally contained both a blobRef and bytesRef", fr.ss.BlobRef)
 		case !p.BlobRef.Valid() && !p.BytesRef.Valid():
 			// Don't send
 		case p.BlobRef.Valid():
@@ -301,6 +339,7 @@ func (fr *FileReader) getSuperset(br blob.Ref) (*superset, error) {
 	if err != nil {
 		return nil, err
 	}
+	ss.BlobRef = br
 	fr.ssmmu.Lock()
 	defer fr.ssmmu.Unlock()
 	fr.ssm[br] = ss