perkeep/pkg/blobserver/storagetest/storagetest.go

572 lines
15 KiB
Go

/*
Copyright 2013 The Perkeep Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package storagetest tests blobserver.Storage implementations.
package storagetest // import "perkeep.org/pkg/blobserver/storagetest"
import (
"context"
"errors"
"fmt"
"io"
"reflect"
"sort"
"strconv"
"strings"
"testing"
"time"
"perkeep.org/pkg/blob"
"perkeep.org/pkg/blobserver"
"perkeep.org/pkg/test"
"go4.org/syncutil"
)
// Opts configures a run of the storage test suite started by TestOpt.
type Opts struct {
// New is required and must return the storage server to test.
New func(*testing.T) blobserver.Storage
// Retries specifies how long to wait to retry after each failure
// that may be an eventual consistency issue (enumerate, stat), etc.
// Each entry pays for one retry: after a failure the next delay is
// slept and the operation is attempted again. Once the delays are
// used up, the last error is reported.
Retries []time.Duration
// SkipEnum makes the suite log and skip its enumerate checks.
SkipEnum bool // for when EnumerateBlobs is not implemented
}
// Test runs the storage test suite with default options against the
// storage returned by fn. It is shorthand for calling TestOpt with an
// Opts value that only sets New.
func Test(t *testing.T, fn func(*testing.T) blobserver.Storage) {
	defaults := Opts{New: fn}
	TestOpt(t, defaults)
}
// run bundles the state shared by the helper methods of a single
// TestOpt invocation.
type run struct {
t *testing.T // test used for logging and failure reporting
opt Opts // options the suite was started with
sto blobserver.Storage // storage under test, as returned by opt.New
}
// TestOpt runs the storage test suite against the storage returned by
// opt.New.
//
// The steps are order dependent, because each one relies on the state
// left behind by the previous ones:
//
//  1. enumerate and stat on the empty storage;
//  2. receive a fixed set of blobs (5 in -short mode, 100 otherwise),
//     enumerating after the 1st, 5th and 100th;
//  3. fetch and stat every uploaded blob;
//  4. enumerate with the 'limit' and 'after' parameters;
//  5. remove everything, then run the SubFetch checks.
func TestOpt(t *testing.T, opt Opts) {
	sto := opt.New(t)
	r := &run{t: t, opt: opt, sto: sto}
	ctx := context.Background()

	t.Logf("Testing blobserver storage %T", sto)

	t.Logf("Testing Enumerate for empty")
	r.testEnumerate(nil)

	t.Logf("Test stat of blob not existing")
	absent := &test.Blob{Contents: "not exist"}
	testStat(t, sto, []blob.Ref{absent.BlobRef()}, nil)

	contents := []string{"foo", "quux", "asdf", "qwerty", "0123456789"}
	if !testing.Short() {
		for i := 0; i < 95; i++ {
			contents = append(contents, "foo-"+strconv.Itoa(i))
		}
	}

	var (
		uploaded []*test.Blob
		refs     []blob.Ref
		sized    []blob.SizedRef
	)

	t.Logf("Testing receive")
	for i, data := range contents {
		tb := &test.Blob{Contents: data}
		if testing.Short() {
			t.Logf("blob[%d] = %s: %q", i, tb.BlobRef(), data)
		}
		got, err := sto.ReceiveBlob(ctx, tb.BlobRef(), tb.Reader())
		if err != nil {
			t.Fatalf("ReceiveBlob of %s: %v", tb, err)
		}
		if got != tb.SizedRef() {
			t.Fatalf("Received %v; want %v", got, tb.SizedRef())
		}
		uploaded = append(uploaded, tb)
		refs = append(refs, tb.BlobRef())
		sized = append(sized, tb.SizedRef())

		// Enumerate at a few interesting population sizes only.
		switch n := len(sized); n {
		case 1, 5, 100:
			t.Logf("Testing Enumerate for %d blobs", n)
			r.testEnumerate(sized)
		}
	}

	t.Logf("Testing Fetch")
	for i, tb := range uploaded {
		rc, size, err := sto.Fetch(ctx, tb.BlobRef())
		if err != nil {
			t.Fatalf("error fetching %d. %s: %v", i, tb, err)
		}
		testSizedBlob(t, rc, tb.BlobRef(), int64(size))
		rc.Close()
	}

	t.Logf("Testing Stat")
	testStat(t, sto, refs, sized)

	// The remaining enumerate checks slice the expected results by
	// position, so they need them in blobref order.
	sort.Sort(blob.SizedByRef(sized))

	t.Logf("Testing Enumerate on all")
	r.testEnumerate(sized)

	t.Logf("Testing Enumerate 'limit' param")
	r.testEnumerate(sized[:3], 3)

	// 'after' alone, then 'after' combined with a limit, both relative
	// to the third blob in sorted order.
	third := sized[2].Ref.String()
	t.Logf("Testing Enumerate 'after' param; after %q", third)
	r.testEnumerate(sized[3:], third)

	t.Logf("Testing Enumerate 'after' + 'limit' param; after %q, limit 1", third)
	r.testEnumerate(sized[3:4], third, 1)

	// 'after' given only a prefix of a blobref, plus a limit.
	prefix := "a"
	t.Logf("Testing Enumerate 'after' + 'limit' param; after %q, limit 1", prefix)
	r.testEnumerate(sized[:1], prefix, 1)

	r.testRemove(refs)
	r.testSubFetcher()
}
// testRemove removes blobRefs from the storage under test, verifies via
// enumerate that nothing is left, and then removes the first ref a
// second time to check that deleting an already-deleted blob is not an
// error.
//
// A RemoveBlobs error whose text contains "not implemented" is logged
// and ends the check early; any other error is fatal.
func (r *run) testRemove(blobRefs []blob.Ref) {
	t, sto := r.t, r.sto
	ctx := context.Background()

	t.Logf("Testing Remove")
	err := sto.RemoveBlobs(ctx, blobRefs)
	switch {
	case err == nil:
		// Removed; carry on with the verification below.
	case strings.Contains(err.Error(), "not implemented"):
		t.Logf("RemoveBlobs: %v", err)
		return
	default:
		t.Fatalf("RemoveBlobs: %v", err)
	}

	r.testEnumerate(nil) // verify they're all gone

	if len(blobRefs) == 0 {
		return
	}
	t.Logf("Testing double-delete")
	again := []blob.Ref{blobRefs[0]}
	if err := sto.RemoveBlobs(ctx, again); err != nil {
		t.Fatalf("Double RemoveBlobs: %v", err)
	}
}
// testSubFetcher exercises SubFetch when the storage under test
// implements blob.SubFetcher; otherwise it logs that fact and returns.
//
// It uploads a small blob, reads a few valid regions of it and compares
// them with the expected substrings, then checks that an offset past
// the end, a negative offset and a negative limit are each rejected
// with blob.ErrNegativeSubFetch or blob.ErrOutOfRangeOffsetSubFetch.
// If SubFetch returns blob.ErrUnimplemented the remaining checks are
// skipped.
func (r *run) testSubFetcher() {
	t, sto := r.t, r.sto
	sf, ok := sto.(blob.SubFetcher)
	if !ok {
		t.Logf("%T is not a SubFetcher", sto)
		return
	}
	t.Logf("Testing SubFetch")
	ctx := context.Background()
	big := &test.Blob{Contents: "Some big blob"}
	if _, err := sto.ReceiveBlob(ctx, big.BlobRef(), big.Reader()); err != nil {
		t.Fatal(err)
	}

	regions := []struct {
		off, limit int64
		want       string
		errok      bool // a read error is tolerated for this region
	}{
		{5, 3, "big", false},
		{5, 8, "big blob", false},
		{5, 100, "big blob", true}, // limit runs past the end of the blob
	}
	for _, tt := range regions {
		// The reader is named rc, not r, so it does not shadow the
		// method receiver.
		rc, err := sf.SubFetch(ctx, big.BlobRef(), tt.off, tt.limit)
		if err == blob.ErrUnimplemented {
			t.Logf("%T implements SubFetcher but its wrapped value doesn't", sto)
			return
		}
		if err != nil {
			t.Fatalf("Error fetching big blob for SubFetch: %v", err)
		}
		if rc == nil {
			t.Fatal("SubFetch returned nil, nil")
		}
		all, err := io.ReadAll(rc)
		rc.Close()
		if err != nil && !tt.errok {
			t.Errorf("Unexpected error reading SubFetch region %+v: %v", tt, err)
		}
		if string(all) != tt.want {
			t.Errorf("SubFetch region %+v got %q; want %q", tt, all, tt.want)
		}
	}

	// test invalid offsets
	invalids := []struct {
		off, limit int64
	}{
		{int64(len(big.Contents)) + 1, 1},
		{-1, 1},
		{1, -1},
	}
	for _, tt := range invalids {
		rc, err := sf.SubFetch(ctx, big.BlobRef(), tt.off, tt.limit)
		if err == blob.ErrUnimplemented {
			t.Logf("%T implements SubFetcher but its wrapped value doesn't", sto)
			return
		}
		if err == nil {
			// A storage that wrongly accepts the request may also hand
			// back a nil reader; report the failure instead of
			// panicking on Close.
			if rc != nil {
				rc.Close()
			}
			t.Errorf("No error fetching with off=%d limit=%d; wanted an error", tt.off, tt.limit)
			continue
		}
		if err != blob.ErrNegativeSubFetch && err != blob.ErrOutOfRangeOffsetSubFetch {
			t.Errorf("Unexpected error fetching with off=%d limit=%d: %v", tt.off, tt.limit, err)
		}
	}
}
// testSizedBlob reads r to EOF and fails the test unless exactly size
// bytes were read and their digest matches the blobref b1.
//
// The content is hashed with the hash function that b1 itself
// provides, and the resulting ref is compared with b1. Read and size
// failures are reported against b1: the reader is an arbitrary
// io.Reader and generally has no useful string form.
func testSizedBlob(t *testing.T, r io.Reader, b1 blob.Ref, size int64) {
	h := b1.Hash()
	n, err := io.Copy(h, r)
	if err != nil {
		t.Fatalf("error reading from %s: %v", b1, err)
	}
	if n != size {
		t.Fatalf("read %d bytes from %s, metadata said %d!", n, b1, size)
	}
	b2 := blob.RefFromHash(h)
	if b2 != b1 {
		t.Fatalf("content mismatch (awaited %s, got %s)", b1, b2)
	}
}
// CheckEnumerate calls sto.EnumerateBlobs and returns an error unless
// it yields exactly the blobs in wantUnsorted, in blobref order.
//
// opts may contain a string, used as the 'after' argument, and an int,
// used as the limit (default 1000). Any other option type panics.
//
// Besides comparing against the wanted set, it reports invalid entries,
// entries arriving out of order, duplicates, and an enumeration whose
// channel is not closed within 10 seconds.
//
// One context is shared by the enumeration and its watchdogs. It is
// cancelled as soon as the consumer sees a bad entry or the timeout
// fires, so a storage that honors cancellation is released rather than
// left blocked on a send. An enumerator that ignores cancellation and
// never closes its channel can still block this function.
func CheckEnumerate(sto blobserver.Storage, wantUnsorted []blob.SizedRef, opts ...interface{}) error {
	var after string
	n := 1000
	for _, opt := range opts {
		switch v := opt.(type) {
		case string:
			after = v
		case int:
			n = v
		default:
			panic("bad option of type " + fmt.Sprintf("%T", v))
		}
	}

	want := append([]blob.SizedRef(nil), wantUnsorted...)
	sort.Sort(blob.SizedByRef(want))

	ctx, cancel := context.WithCancel(context.TODO())
	defer cancel()

	sbc := make(chan blob.SizedRef, 10)
	consumed := make(chan struct{}) // closed once sbc has been read to its end
	var got []blob.SizedRef
	var grp syncutil.Group

	grp.Go(func() error {
		err := sto.EnumerateBlobs(ctx, sbc, after, n)
		// The context is only cancelled here after another goroutine has
		// decided to fail; that goroutine's error is the interesting one,
		// so an error after cancellation is dropped.
		if err != nil && ctx.Err() == nil {
			return fmt.Errorf("EnumerateBlobs(%q, %d): %v", after, n, err)
		}
		return nil
	})

	grp.Go(func() error {
		defer close(consumed)
		var lastRef blob.Ref
		var firstErr error
		for sb := range sbc {
			if firstErr != nil {
				// Already failed: keep draining so the enumerator is
				// never stuck on a send while it winds down.
				continue
			}
			switch {
			case !sb.Valid():
				firstErr = fmt.Errorf("invalid blobref %#v received in enumerate", sb)
			case lastRef.Valid() && sb.Ref.Less(lastRef):
				firstErr = fmt.Errorf("blobs appearing out of order")
			}
			if firstErr != nil {
				cancel()
				continue
			}
			got = append(got, sb)
			lastRef = sb.Ref
		}
		return firstErr
	})

	grp.Go(func() error {
		select {
		case <-consumed:
			return nil
		case <-time.After(10 * time.Second):
			cancel() // ask the enumerator to give up
			return errors.New("timeout waiting for EnumerateBlobs to close its channel")
		}
	})

	if err := grp.Err(); err != nil {
		return fmt.Errorf("Enumerate error: %v", err)
	}

	// DeepEqual distinguishes nil from empty slices, so treat
	// "nothing wanted, nothing received" as a match up front.
	if len(got) == 0 && len(want) == 0 {
		return nil
	}

	gotSet := map[blob.SizedRef]bool{}
	for _, sb := range got {
		if gotSet[sb] {
			return fmt.Errorf("duplicate blob %v returned in enumerate", sb)
		}
		gotSet[sb] = true
	}

	if !reflect.DeepEqual(got, want) {
		return fmt.Errorf("enumerate mismatch. Got %d; want %d.\n Got: %v\nWant: %v\n",
			len(got), len(want), got, want)
	}
	return nil
}
// testEnumerate checks, via CheckEnumerate, that the storage under test
// enumerates exactly wantUnsorted for the given options, retrying
// according to the configured delays. It fails the test if the final
// attempt still reports an error, and does nothing beyond logging when
// enumeration checks are disabled via Opts.SkipEnum.
func (r *run) testEnumerate(wantUnsorted []blob.SizedRef, opts ...interface{}) {
	if r.opt.SkipEnum {
		r.t.Log("Skipping enum test")
		return
	}
	check := func() error {
		return CheckEnumerate(r.sto, wantUnsorted, opts...)
	}
	err := r.withRetries(check)
	if err != nil {
		r.t.Fatalf("%v", err)
	}
}
// withRetries calls fn and, while it keeps failing, sleeps for the next
// delay in r.opt.Retries and calls it again. It returns nil as soon as
// fn succeeds, or fn's last error once the delays are exhausted. With
// no delays configured fn is called exactly once.
func (r *run) withRetries(fn func() error) error {
	err := fn()
	for _, wait := range r.opt.Retries {
		if err == nil {
			break
		}
		r.t.Logf("(operation failed; retrying after %v)", wait)
		time.Sleep(wait)
		err = fn()
	}
	return err
}
// testStat calls sto.StatBlobs for blobs and checks the results against
// want. Results may arrive in any order, but each wanted ref must be
// reported exactly once with the wanted size, and no other ref may be
// reported. All problems are reported with t.Errorf, so a single call
// can surface several of them.
func testStat(t *testing.T, sto blobserver.BlobStatter, blobs []blob.Ref, want []blob.SizedRef) {
	wantIdx := make(map[blob.Ref]int)  // wanted ref => its index in want
	pending := make(map[blob.Ref]bool) // wanted refs not yet reported
	for i := range want {
		ref := want[i].Ref
		wantIdx[ref] = i
		pending[ref] = true
	}

	onStat := func(sb blob.SizedRef) error {
		if !sb.Valid() {
			t.Errorf("StatBlobs func called with invalid/zero blob.SizedRef")
			return nil
		}
		idx, wanted := wantIdx[sb.Ref]
		switch {
		case !wanted:
			t.Errorf("StatBlobs func called with unrequested ref %v (size %d)", sb.Ref, sb.Size)
		case !pending[sb.Ref]:
			t.Errorf("StatBlobs func called with ref %v multiple times", sb.Ref)
		default:
			delete(pending, sb.Ref)
			if w := want[idx]; sb != w {
				t.Errorf("StatBlobs returned %v; want %v", sb, w)
			}
		}
		return nil
	}

	err := sto.StatBlobs(context.Background(), blobs, onStat)
	for br := range pending {
		t.Errorf("StatBlobs never returned results for %v", br)
	}
	if err != nil {
		t.Errorf("StatBlobs: %v", err)
	}
}
// StreamerTestOpt is an optional check passed to TestStreamer. Its
// verify method is called with the blobs yielded by the first,
// uninterrupted StreamBlobs pass; a non-nil error is reported as a
// test failure.
type StreamerTestOpt interface {
// verify returns an error if got does not satisfy the option.
verify(got []blob.SizedRef) error
}
// WantN is a StreamerTestOpt requiring that exactly N blobs are
// streamed.
type WantN int

// verify reports an error when the number of streamed blobs differs
// from the wanted count.
func (want WantN) verify(got []blob.SizedRef) error {
	if n := len(got); n != int(want) {
		return fmt.Errorf("got %d streamed blobs; want %d", n, int(want))
	}
	return nil
}
// WantSizedRefs is a StreamerTestOpt requiring that the streamed blobs
// are exactly these sized refs, in this order.
type WantSizedRefs []blob.SizedRef

// verify reports an error unless got is deeply equal to the wanted
// slice.
func (s WantSizedRefs) verify(got []blob.SizedRef) error {
	want := []blob.SizedRef(s)
	if reflect.DeepEqual(got, want) {
		return nil
	}
	return fmt.Errorf("mismatch:\n got %d blobs: %q\nwant %d blobs: %q\n", len(got), got, len(want), want)
}
// TestStreamer tests that the BlobStreamer bs implements all of the
// promised interface behavior and ultimately yields the provided
// blobs.
//
// If bs also implements BlobEnumerator, the two are compared for
// consistency.
//
// The test has two passes. The first streams everything without
// interruption, checks for duplicates, compares the result with the
// enumeration (when available) and runs every opt's verify method on
// it. The second "complex" pass is skipped if the first one failed. It
// restarts the stream once per blob, cancelling after each newly seen
// blob and resuming from that blob's continuation token, and requires
// the result to match the first pass exactly.
func TestStreamer(t *testing.T, bs blobserver.BlobStreamer, opts ...StreamerTestOpt) {
	var sawEnum map[blob.SizedRef]bool
	if enumer, ok := bs.(blobserver.BlobEnumerator); ok {
		sawEnum = make(map[blob.SizedRef]bool)
		// First do an enumerate over all blobs as a baseline. The Streamer should
		// yield the same blobs, even if it's in a different order.
		enumCtx, cancel := context.WithCancel(context.TODO())
		defer cancel()
		if err := blobserver.EnumerateAll(enumCtx, enumer, func(sb blob.SizedRef) error {
			sawEnum[sb] = true
			return nil
		}); err != nil {
			t.Fatalf("Enumerate: %v", err)
		}
	}

	// See if, without cancellation, it yields the right
	// result and without errors.
	ch := make(chan blobserver.BlobAndToken)
	errCh := make(chan error, 1)
	go func() {
		ctx, cancel := context.WithCancel(context.TODO())
		defer cancel()
		errCh <- bs.StreamBlobs(ctx, ch, "")
	}()
	var gotRefs []blob.SizedRef
	sawStreamed := map[blob.Ref]int{}
	for b := range ch {
		sawStreamed[b.Ref()]++
		sbr := b.SizedRef()
		if sawEnum != nil {
			if _, ok := sawEnum[sbr]; ok {
				delete(sawEnum, sbr)
			} else {
				t.Errorf("Streamer yielded blob not returned by Enumerate: %v", sbr)
			}
		}
		gotRefs = append(gotRefs, sbr)
	}
	if err := <-errCh; err != nil {
		t.Errorf("initial uninterrupted StreamBlobs error: %v", err)
	}
	for br, n := range sawStreamed {
		if n > 1 {
			t.Errorf("Streamed returned duplicate %v, %d times", br, n)
		}
	}
	// Whatever is still in sawEnum was enumerated but never streamed.
	// Cap the report at 10 entries.
	nMissing := 0
	for sbr := range sawEnum {
		t.Errorf("Enumerate found %v but Streamer didn't return it", sbr)
		nMissing++
		if nMissing == 10 && len(sawEnum) > 10 {
			t.Errorf("... etc ...")
			break
		}
	}
	for _, opt := range opts {
		if err := opt.verify(gotRefs); err != nil {
			t.Errorf("error after first uninterrupted StreamBlobs pass: %v", err)
		}
	}
	if t.Failed() {
		return
	}

	// Next, the "complex pass": test a cancellation at each point,
	// to test that resume works properly.
	//
	// Basic strategy:
	// -- receive 1 blob, note the blobref, cancel.
	// -- start again with that blobref, receive 2, cancel. first should be same,
	// second should be new. note its blobref.
	// Each iteration should yield 1 new unique blob and all but
	// the first and last will return 2 blobs.
	wantRefs := append([]blob.SizedRef(nil), gotRefs...) // copy
	sawStreamed = map[blob.Ref]int{}
	gotRefs = gotRefs[:0]
	contToken := ""
	for i := 0; i < len(wantRefs); i++ {
		// Each cut point runs in its own function so that its deferred
		// cancel fires when the iteration ends (including on t.Fatalf)
		// instead of piling up until TestStreamer returns.
		streamEnded := func() bool {
			ctx, cancel := context.WithCancel(context.TODO())
			defer cancel()
			ch := make(chan blobserver.BlobAndToken)
			errc := make(chan error, 1)
			go func() {
				errc <- bs.StreamBlobs(ctx, ch, contToken)
			}()
			nrecv := 0
			nextToken := ""
			for bt := range ch {
				nrecv++
				sbr := bt.Blob.SizedRef()
				isNew := len(gotRefs) == 0 || sbr != gotRefs[len(gotRefs)-1]
				if isNew {
					if sawStreamed[sbr.Ref] > 0 {
						t.Fatalf("In complex pass, returned duplicate blob %v\n\nSo far, before interrupting:\n%v\n\nWant:\n%v", sbr, gotRefs, wantRefs)
					}
					sawStreamed[sbr.Ref]++
					gotRefs = append(gotRefs, sbr)
					nextToken = bt.Token
					cancel()
					break
				} else if i == 0 {
					t.Fatalf("first iteration should receive a new value")
				} else if nrecv == 2 {
					t.Fatalf("at cut point %d of testStream, Streamer received 2 values, both not unique. Looping?", i)
				}
			}
			err := <-errc
			if err != nil && err != context.Canceled {
				t.Fatalf("StreamBlobs on iteration %d (token %q) returned error: %v", i, contToken, err)
			}
			if err == nil {
				// The stream ran to completion without being cancelled:
				// there is nothing left to resume.
				return true
			}
			contToken = nextToken
			return false
		}()
		if streamEnded {
			break
		}
	}
	if !reflect.DeepEqual(gotRefs, wantRefs) {
		t.Errorf("Mismatch on complex pass (got %d, want %d):\n got %q\nwant %q\n", len(gotRefs), len(wantRefs), gotRefs, wantRefs)
		wantMap := map[blob.SizedRef]bool{}
		for _, sbr := range wantRefs {
			wantMap[sbr] = true
		}
		for _, sbr := range gotRefs {
			if _, ok := wantMap[sbr]; ok {
				delete(wantMap, sbr)
			} else {
				t.Errorf("got has unwanted: %v", sbr)
			}
		}
		missing := wantMap // found stuff has been deleted
		for sbr := range missing {
			t.Errorf("got is missing: %v", sbr)
		}
		t.FailNow()
	}
}