Finish implementing storagetest

Add its use to localdisk (diskpacked already uses it).
Add an ErrNotImplemented error to blobserver and document that
RemoveBlobs may return it (a diskpacked deficit).

Change-Id: I6a50f263a58c8d3d1611ff9a060ea9fa4aee6163
This commit is contained in:
Tamás Gulácsi 2013-11-30 22:58:16 +01:00
parent f544114844
commit f69306cbee
6 changed files with 207 additions and 12 deletions

2
TESTS
View File

@ -1,7 +1,5 @@
Tests needed
-- finish storagetest.Test
-- pkg/client --- test FetchVia against a server returning compressed content.
(fix in 3fa6d69405f036308931dd36e5070b2b19dbeadf without a new test)

View File

@ -238,7 +238,7 @@ func (s *storage) filename(file int64) string {
// RemoveBlobs is not implemented for diskpacked; it always returns
// blobserver.ErrNotImplemented so callers can detect the deficit.
// TODO(adg): remove blob from index and pad data with spaces
func (s *storage) RemoveBlobs(blobs []blob.Ref) error {
	return blobserver.ErrNotImplemented
}
var statGate = syncutil.NewGate(20) // arbitrary
@ -268,7 +268,7 @@ func (s *storage) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.Ref) (err er
func (s *storage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit int) (err error) {
t := s.index.Find(after)
for i := 0; i < limit && t.Next(); i++ {
for i := 0; i < limit && t.Next(); {
br, ok := blob.Parse(t.Key())
if !ok {
err = fmt.Errorf("diskpacked: couldn't parse index key %q", t.Key())
@ -280,6 +280,7 @@ func (s *storage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit
continue
}
dest <- m.SizedRef(br)
i++
}
if err2 := t.Close(); err == nil && err2 != nil {
err = err2

View File

@ -30,6 +30,7 @@ func newTempDiskpacked(t *testing.T) (sto blobserver.Storage, cleanup func()) {
if err != nil {
t.Fatal(err)
}
t.Logf("diskpacked test dir is %q", dir)
s, err := newStorage(dir, 1<<20)
if err != nil {
t.Fatalf("newStorage: %v", err)

View File

@ -35,6 +35,9 @@ const MaxBlobSize = 16 << 20
var ErrCorruptBlob = errors.New("corrupt blob; digest doesn't match")
// ErrNotImplemented should be returned by methods that are not implemented.
var ErrNotImplemented = errors.New("not implemented")
// BlobReceiver is the interface for receiving blobs.
type BlobReceiver interface {
// ReceiveBlob accepts a newly uploaded blob and writes it to
@ -120,6 +123,7 @@ type BlobRemover interface {
// RemoveBlobs removes 0 or more blobs. Removal of
// non-existent items isn't an error. Returns failure if any
// items existed but failed to be deleted.
// ErrNotImplemented may be returned for storage types not implementing removal.
RemoveBlobs(blobs []blob.Ref) error
}

View File

@ -27,6 +27,8 @@ import (
"testing"
"camlistore.org/pkg/blob"
"camlistore.org/pkg/blobserver"
"camlistore.org/pkg/blobserver/storagetest"
"camlistore.org/pkg/test"
. "camlistore.org/pkg/test/asserts"
)
@ -187,3 +189,10 @@ func TestRename(t *testing.T) {
t.Fatal(err)
}
}
func TestLocaldisk(t *testing.T) {
storagetest.Test(t, func(t *testing.T) (blobserver.Storage, func()) {
ds := NewStorage(t)
return ds, func() { cleanUp(ds) }
})
}

View File

@ -18,26 +18,208 @@ limitations under the License.
package storagetest
import (
"crypto/sha1"
"io"
"sort"
"strconv"
"strings"
"testing"
"camlistore.org/pkg/blob"
"camlistore.org/pkg/blobserver"
"camlistore.org/pkg/test"
"camlistore.org/pkg/types"
)
func Test(t *testing.T, fn func(*testing.T) (sto blobserver.Storage, cleanup func())) {
sto, cleanup := fn(t)
defer cleanup()
defer func() {
if t.Failed() {
t.Logf("test %T FAILED, skipping cleanup!", sto)
} else {
cleanup()
}
}()
t.Logf("Testing blobserver storage %T", sto)
blobs := make([]*test.Blob, 0, 5)
blobRefs := make([]blob.Ref, 0, cap(blobs))
blobSizedRefs := make([]blob.SizedRef, 0, cap(blobs))
t.Logf("Testing Enumerate for empty")
dest := make(chan blob.SizedRef)
go func() {
if err := sto.EnumerateBlobs(dest, "", 1000); err != nil {
t.Fatalf("EnumerateBlob: %v", err)
}
}()
testEnumerate(t, dest, blobSizedRefs)
contents := []string{"foo", "quux", "asdf", "qwerty", "0123456789"}
if !testing.Short() {
for i := 0; i < 95; i++ {
contents = append(contents, "foo-"+strconv.Itoa(i))
}
}
t.Logf("Testing receive")
for _, x := range contents {
b1 := &test.Blob{x}
b1s, err := sto.ReceiveBlob(b1.BlobRef(), b1.Reader())
if err != nil {
t.Fatalf("ReceiveBlob of %s: %v", b1, err)
}
if b1s != b1.SizedRef() {
t.Fatal("Received %v; want %v", b1s, b1.SizedRef())
}
blobs = append(blobs, b1)
blobRefs = append(blobRefs, b1.BlobRef())
blobSizedRefs = append(blobSizedRefs, b1.SizedRef())
b1 := &test.Blob{"foo"}
b1s, err := sto.ReceiveBlob(b1.BlobRef(), b1.Reader())
if err != nil {
t.Fatalf("ReceiveBlob of b1: %v", err)
switch len(blobSizedRefs) {
case 1, 5, 100:
t.Logf("Testing Enumerate for %d blobs", len(blobSizedRefs))
dest = make(chan blob.SizedRef)
go func() {
if err := sto.EnumerateBlobs(dest, "", 1000); err != nil {
t.Fatalf("EnumerateBlob: %v", err)
}
}()
testEnumerate(t, dest, blobSizedRefs)
}
}
if b1s != b1.SizedRef() {
t.Fatal("Received %v; want %v", b1s, b1.SizedRef())
b1 := blobs[0]
// finish here if you want to examine the test directory
//t.Fatalf("FINISH")
t.Logf("Testing FetchStreaming")
for i, b2 := range blobs {
t.Logf("%d. fetching %s", i, b2.BlobRef())
rc, size, err := sto.FetchStreaming(b2.BlobRef())
if err != nil {
t.Fatalf("error fetching %d. %s: %v", i, b2, err)
}
defer rc.Close()
testSizedBlob(t, rc, b2.BlobRef(), size)
}
// TODO: test all the other methods
if fetcher, ok := sto.(fetcher); ok {
t.Logf("fetching %s", b1.BlobRef())
rsc, size, err := fetcher.Fetch(b1.BlobRef())
if err != nil {
t.Fatalf("error fetching %s: %v", b1, err)
}
defer rsc.Close()
n, err := rsc.Seek(0, 0)
if err != nil {
t.Fatalf("error seeking in %s: %v", rsc, err)
}
if n != 0 {
t.Fatalf("after seeking to 0, we are at %d!", n)
}
testSizedBlob(t, rsc, b1.BlobRef(), size)
}
t.Logf("Testing Stat")
dest = make(chan blob.SizedRef)
go func() {
if err := sto.StatBlobs(dest, blobRefs); err != nil {
t.Fatalf("error stating blobs %s: %v", blobRefs, err)
}
}()
testStat(t, dest, blobSizedRefs)
t.Logf("Testing Enumerate")
dest = make(chan blob.SizedRef)
go func() {
if err := sto.EnumerateBlobs(dest, "", 1000); err != nil {
t.Fatalf("EnumerateBlob: %v", err)
}
}()
testEnumerate(t, dest, blobSizedRefs)
t.Logf("Testing Remove")
if err := sto.RemoveBlobs(blobRefs); err != nil {
if strings.Index(err.Error(), "not implemented") >= 0 {
t.Logf("RemoveBlob %s: %v", b1, err)
} else {
t.Fatalf("RemoveBlob %s: %v", b1, err)
}
}
}
// fetcher is the optional interface implemented by storages whose
// Fetch returns a seekable blob reader, beyond the required
// FetchStreaming of blobserver.Storage.
type fetcher interface {
Fetch(blob blob.Ref) (types.ReadSeekCloser, int64, error)
}
// testSizedBlob reads all of r, failing the test unless exactly size
// bytes were read and their SHA-1 ref equals b1.
func testSizedBlob(t *testing.T, r io.Reader, b1 blob.Ref, size int64) {
	h := sha1.New()
	n, err := io.Copy(h, r)
	if err != nil {
		t.Fatalf("error reading from %s: %v", r, err)
	}
	if n != size {
		// original Fatalf had three verbs but only two args;
		// the reader argument for %s was missing
		t.Fatalf("read %d bytes from %s, metadata said %d!", n, r, size)
	}
	b2 := blob.RefFromHash(h)
	if b2 != b1 {
		t.Fatalf("content mismatch (awaited %s, got %s)", b1, b2)
	}
}
// testEnumerate drains dest until an invalid sentinel, a length match,
// or channel close, checking that blobs arrive in sorted-ref order
// with the sizes recorded in want.
func testEnumerate(t *testing.T, dest <-chan blob.SizedRef, want []blob.SizedRef) {
	// Enumeration yields blobs in sorted ref order, so sort the
	// wanted refs and map each ref back to its index in want.
	m := make(map[string]int, len(want))
	refs := make([]string, 0, len(want))
	for i, sb := range want {
		m[sb.Ref.String()] = i
		refs = append(refs, sb.Ref.String())
	}
	sort.Strings(refs)

	i := 0
	for sb := range dest {
		t.Logf("enumerated %s", sb)
		if !sb.Valid() {
			break
		}
		if i >= len(want) {
			// guard before refs[i]: the original indexed out of
			// range when more blobs arrived than expected
			t.Fatalf("enumerated unexpected extra blob %v", sb)
		}
		wanted := want[m[refs[i]]]
		if wanted.Size != sb.Size {
			t.Fatalf("received blob size is %d, wanted %d for #%d", sb.Size, wanted.Size, i)
		}
		if wanted.Ref != sb.Ref {
			// original message had wanted/got arguments swapped
			t.Fatalf("received blob ref mismatch #%d: wanted %s, got %s", i, wanted.Ref, sb.Ref)
		}
		i++
		if i >= len(want) {
			break
		}
	}
}
// testStat drains dest until an invalid sentinel, a length match, or
// channel close, checking each statted blob against want. Unlike
// enumeration, stat results may arrive in ANY order.
func testStat(t *testing.T, dest <-chan blob.SizedRef, want []blob.SizedRef) {
	m := make(map[string]int, len(want))
	for i, sb := range want {
		m[sb.Ref.String()] = i
	}

	i := 0
	for sb := range dest {
		t.Logf("statted %s", sb)
		if !sb.Valid() {
			break
		}
		idx, ok := m[sb.Ref.String()]
		if !ok {
			// the original silently compared unknown refs against
			// want[0] (map zero value)
			t.Fatalf("statted unexpected blob %v", sb)
		}
		wanted := want[idx]
		if wanted.Size != sb.Size {
			t.Fatalf("received blob size is %d, wanted %d for #%d", sb.Size, wanted.Size, i)
		}
		if wanted.Ref != sb.Ref {
			// original message had wanted/got arguments swapped
			t.Fatalf("received blob ref mismatch #%d: wanted %s, got %s", i, wanted.Ref, sb.Ref)
		}
		i++
		if i >= len(want) {
			break
		}
	}
}