diff --git a/TESTS b/TESTS index 7b5844db3..8244062d2 100644 --- a/TESTS +++ b/TESTS @@ -1,7 +1,5 @@ Tests needed --- finish storagetest.Test - -- pkg/client --- test FetchVia against a server returning compressed content. (fix in 3fa6d69405f036308931dd36e5070b2b19dbeadf without a new test) diff --git a/pkg/blobserver/diskpacked/diskpacked.go b/pkg/blobserver/diskpacked/diskpacked.go index 6d3e04fe6..e91881e8a 100644 --- a/pkg/blobserver/diskpacked/diskpacked.go +++ b/pkg/blobserver/diskpacked/diskpacked.go @@ -238,7 +238,7 @@ func (s *storage) filename(file int64) string { func (s *storage) RemoveBlobs(blobs []blob.Ref) error { // TODO(adg): remove blob from index and pad data with spaces - return errors.New("diskpacked: RemoveBlobs not implemented") + return blobserver.ErrNotImplemented } var statGate = syncutil.NewGate(20) // arbitrary @@ -268,7 +268,7 @@ func (s *storage) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.Ref) (err er func (s *storage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit int) (err error) { t := s.index.Find(after) - for i := 0; i < limit && t.Next(); i++ { + for i := 0; i < limit && t.Next(); { br, ok := blob.Parse(t.Key()) if !ok { err = fmt.Errorf("diskpacked: couldn't parse index key %q", t.Key()) @@ -280,6 +280,7 @@ func (s *storage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit continue } dest <- m.SizedRef(br) + i++ } if err2 := t.Close(); err == nil && err2 != nil { err = err2 diff --git a/pkg/blobserver/diskpacked/diskpacked_test.go b/pkg/blobserver/diskpacked/diskpacked_test.go index 0958e45b4..21b2d6c92 100644 --- a/pkg/blobserver/diskpacked/diskpacked_test.go +++ b/pkg/blobserver/diskpacked/diskpacked_test.go @@ -30,6 +30,7 @@ func newTempDiskpacked(t *testing.T) (sto blobserver.Storage, cleanup func()) { if err != nil { t.Fatal(err) } + t.Logf("diskpacked test dir is %q", dir) s, err := newStorage(dir, 1<<20) if err != nil { t.Fatalf("newStorage: %v", err) diff --git a/pkg/blobserver/interface.go b/pkg/blobserver/interface.go index 46e1c569c..1c47720ea 100644 --- a/pkg/blobserver/interface.go +++ b/pkg/blobserver/interface.go @@ -35,6 +35,9 @@ const MaxBlobSize = 16 << 20 var ErrCorruptBlob = errors.New("corrupt blob; digest doesn't match") +// ErrNotImplemented should be returned in methods where the function is not implemented +var ErrNotImplemented = errors.New("not implemented") + // BlobReceiver is the interface for receiving type BlobReceiver interface { // ReceiveBlob accepts a newly uploaded blob and writes it to @@ -120,6 +123,7 @@ type BlobRemover interface { // RemoveBlobs removes 0 or more blobs. Removal of // non-existent items isn't an error. Returns failure if any // items existed but failed to be deleted. + // ErrNotImplemented may be returned for storage types not implementing removal. RemoveBlobs(blobs []blob.Ref) error } diff --git a/pkg/blobserver/localdisk/localdisk_test.go b/pkg/blobserver/localdisk/localdisk_test.go index 958f210a0..9d3ba8e76 100644 --- a/pkg/blobserver/localdisk/localdisk_test.go +++ b/pkg/blobserver/localdisk/localdisk_test.go @@ -27,6 +27,8 @@ import ( "testing" "camlistore.org/pkg/blob" + "camlistore.org/pkg/blobserver" + "camlistore.org/pkg/blobserver/storagetest" "camlistore.org/pkg/test" . "camlistore.org/pkg/test/asserts" ) @@ -187,3 +189,10 @@ func TestRename(t *testing.T) { t.Fatal(err) } } + +func TestLocaldisk(t *testing.T) { + storagetest.Test(t, func(t *testing.T) (blobserver.Storage, func()) { + ds := NewStorage(t) + return ds, func() { cleanUp(ds) } + }) +} diff --git a/pkg/blobserver/storagetest/storagetest.go b/pkg/blobserver/storagetest/storagetest.go index 8116cae45..88ff5f4fa 100644 --- a/pkg/blobserver/storagetest/storagetest.go +++ b/pkg/blobserver/storagetest/storagetest.go @@ -18,26 +18,208 @@ limitations under the License. package storagetest import ( + "crypto/sha1" + "io" + "sort" + "strconv" + "strings" "testing" + "camlistore.org/pkg/blob" "camlistore.org/pkg/blobserver" "camlistore.org/pkg/test" + "camlistore.org/pkg/types" ) func Test(t *testing.T, fn func(*testing.T) (sto blobserver.Storage, cleanup func())) { sto, cleanup := fn(t) - defer cleanup() + defer func() { + if t.Failed() { + t.Logf("test %T FAILED, skipping cleanup!", sto) + } else { + cleanup() + } + }() + t.Logf("Testing blobserver storage %T", sto) + blobs := make([]*test.Blob, 0, 5) + blobRefs := make([]blob.Ref, 0, cap(blobs)) + blobSizedRefs := make([]blob.SizedRef, 0, cap(blobs)) + + t.Logf("Testing Enumerate for empty") + dest := make(chan blob.SizedRef) + go func() { + if err := sto.EnumerateBlobs(dest, "", 1000); err != nil { + t.Fatalf("EnumerateBlob: %v", err) + } + }() + testEnumerate(t, dest, blobSizedRefs) + + contents := []string{"foo", "quux", "asdf", "qwerty", "0123456789"} + if !testing.Short() { + for i := 0; i < 95; i++ { + contents = append(contents, "foo-"+strconv.Itoa(i)) + } + } t.Logf("Testing receive") + for _, x := range contents { + b1 := &test.Blob{x} + b1s, err := sto.ReceiveBlob(b1.BlobRef(), b1.Reader()) + if err != nil { + t.Fatalf("ReceiveBlob of %s: %v", b1, err) + } + if b1s != b1.SizedRef() { + t.Fatal("Received %v; want %v", b1s, b1.SizedRef()) + } + blobs = append(blobs, b1) + blobRefs = append(blobRefs, b1.BlobRef()) + blobSizedRefs = append(blobSizedRefs, b1.SizedRef()) - b1 := &test.Blob{"foo"} - b1s, err := sto.ReceiveBlob(b1.BlobRef(), b1.Reader()) - if err != nil { - t.Fatalf("ReceiveBlob of b1: %v", err) + switch len(blobSizedRefs) { + case 1, 5, 100: + t.Logf("Testing Enumerate for %d blobs", len(blobSizedRefs)) + dest = make(chan blob.SizedRef) + go func() { + if err := sto.EnumerateBlobs(dest, "", 1000); err != nil { + t.Fatalf("EnumerateBlob: %v", err) + } + }() + testEnumerate(t, dest, blobSizedRefs) + } } - if b1s != b1.SizedRef() { - t.Fatal("Received %v; want %v", b1s, b1.SizedRef()) + b1 := blobs[0] + + // finish here if you want to examine the test directory + //t.Fatalf("FINISH") + + t.Logf("Testing FetchStreaming") + for i, b2 := range blobs { + t.Logf("%d. fetching %s", i, b2.BlobRef()) + rc, size, err := sto.FetchStreaming(b2.BlobRef()) + if err != nil { + t.Fatalf("error fetching %d. %s: %v", i, b2, err) + } + defer rc.Close() + testSizedBlob(t, rc, b2.BlobRef(), size) } - // TODO: test all the other methods + if fetcher, ok := sto.(fetcher); ok { + t.Logf("fetching %s", b1.BlobRef()) + rsc, size, err := fetcher.Fetch(b1.BlobRef()) + if err != nil { + t.Fatalf("error fetching %s: %v", b1, err) + } + defer rsc.Close() + n, err := rsc.Seek(0, 0) + if err != nil { + t.Fatalf("error seeking in %s: %v", rsc, err) + } + if n != 0 { + t.Fatalf("after seeking to 0, we are at %d!", n) + } + testSizedBlob(t, rsc, b1.BlobRef(), size) + } + + t.Logf("Testing Stat") + dest = make(chan blob.SizedRef) + go func() { + if err := sto.StatBlobs(dest, blobRefs); err != nil { + t.Fatalf("error stating blobs %s: %v", blobRefs, err) + } + }() + testStat(t, dest, blobSizedRefs) + + t.Logf("Testing Enumerate") + dest = make(chan blob.SizedRef) + go func() { + if err := sto.EnumerateBlobs(dest, "", 1000); err != nil { + t.Fatalf("EnumerateBlob: %v", err) + } + }() + testEnumerate(t, dest, blobSizedRefs) + + t.Logf("Testing Remove") + if err := sto.RemoveBlobs(blobRefs); err != nil { + if strings.Index(err.Error(), "not implemented") >= 0 { + t.Logf("RemoveBlob %s: %v", b1, err) + } else { + t.Fatalf("RemoveBlob %s: %v", b1, err) + } + } +} + +type fetcher interface { + Fetch(blob blob.Ref) (types.ReadSeekCloser, int64, error) +} + +func testSizedBlob(t *testing.T, r io.Reader, b1 blob.Ref, size int64) { + h := sha1.New() + n, err := io.Copy(h, r) + if err != nil { + t.Fatalf("error reading from %s: %v", r, err) + } + if n != size { + t.Fatalf("read %d bytes from %s, metadata said %d!", n, size) + } + b2 := blob.RefFromHash(h) + if b2 != b1 { + t.Fatalf("content mismatch (awaited %s, got %s)", b1, b2) + } +} + +func testEnumerate(t *testing.T, dest <-chan blob.SizedRef, want []blob.SizedRef) { + // must sort want slice! + m := make(map[string]int, len(want)) + refs := make([]string, 0, len(want)) + for i, sb := range want { + m[sb.Ref.String()] = i + refs = append(refs, sb.Ref.String()) + } + sort.Strings(refs) + + i := 0 + for sb := range dest { + t.Logf("enumerated %s", sb) + if !sb.Valid() { + break + } + wanted := want[m[refs[i]]] + if wanted.Size != sb.Size { + t.Fatalf("received blob size is %d, wanted %d for &%d", sb.Size, wanted.Size, i) + } + if wanted.Ref != sb.Ref { + t.Fatalf("received blob ref mismatch &%d: wanted %s, got %s", i, sb.Ref, wanted.Ref) + } + i++ + if i >= len(want) { + break + } + } +} + +func testStat(t *testing.T, dest <-chan blob.SizedRef, want []blob.SizedRef) { + // blobs may arrive in ANY order + m := make(map[string]int, len(want)) + for i, sb := range want { + m[sb.Ref.String()] = i + } + + i := 0 + for sb := range dest { + t.Logf("statted %s", sb) + if !sb.Valid() { + break + } + wanted := want[m[sb.Ref.String()]] + if wanted.Size != sb.Size { + t.Fatalf("received blob size is %d, wanted %d for &%d", sb.Size, wanted.Size, i) + } + if wanted.Ref != sb.Ref { + t.Fatalf("received blob ref mismatch &%d: wanted %s, got %s", i, sb.Ref, wanted.Ref) + } + i++ + if i >= len(want) { + break + } + } }