blobserver: simplify interface more, add NewMultiBlobStreamer, storagetest func

This is round two of simplifying the BlobStreamer interface. The
continuation tokens are now sent per-item. This enables the new
NewMultiBlobStreamer (described below), simplifies the return value
(only an error now), and permits the use of buffered channels as the
destination without the tokens getting out of sync with the blobs.
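
Concretely, the interface goes from returning a single resume token to
carrying one per item (both forms appear in full in the diff below):

// Old: one token for the whole stream, returned only when streaming stops.
StreamBlobs(ctx *context.Context, dest chan<- *blob.Blob, contToken string) (nextContinueToken string, err error)

// New: each item carries the token that resumes the stream at that blob,
// so the method itself returns only an error.
type BlobAndToken struct {
	*blob.Blob
	Token string // resumes streaming starting at this blob
}
StreamBlobs(ctx *context.Context, dest chan<- BlobAndToken, contToken string) error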

The new NewMultiBlobStreamer is like io.MultiReader, but for
BlobStreamers, letting them be stitched together. It will be used by
the blobpacked storage layer to simplify its code, so it doesn't need
to deal with the handoff between loose and packed blobs itself. This
MultiBlobStreamer needs a one-element buffer, which wasn't compatible
with the old BlobStreamer interface, hence the change to send the
continuation token with each blob on the channel.
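
As a minimal sketch (not part of the commit) of why per-item tokens make
buffering safe: whatever the consumer last pulled off the channel carries
the exact token to resume at, no matter how many items the producer has
already queued. Here bs, ctx, and process are hypothetical stand-ins:

dest := make(chan blobserver.BlobAndToken, 16) // buffered is now fine
errc := make(chan error, 1)
go func() { errc <- bs.StreamBlobs(ctx, dest, "") }()
var resumeToken string
for bt := range dest {
	resumeToken = bt.Token // always names exactly the blob in hand
	process(bt.Blob)
}
err := <-errc

Under the old interface, the single returned token could point past blobs
still sitting unread in a buffered channel.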

Finally, add a new storagetest function for testing any BlobStreamer,
and use it for NewMultiBlobStreamer, diskpacked, and later for
blobpacked.
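
Its usage, as seen in the diskpacked and multistream tests in this diff,
is a one-liner: assert the exact refs the streamer must yield, or just a
count:

storagetest.TestStreamer(t, s, storagetest.WantSizedRefs(wantRefs))
storagetest.TestStreamer(t, s, storagetest.WantN(expected))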

Updates #532

Change-Id: Iccffed289adec93ca5100c7ef8b0a8d57e05833c
Brad Fitzpatrick 2014-12-23 17:44:05 -08:00
parent dec4d7b52a
commit a082382f33
9 changed files with 362 additions and 186 deletions

View File

@ -446,6 +446,7 @@ func TestZ_LeakCheck(t *testing.T) {
}
func TestStreamBlobs(t *testing.T) {
t.Skip("TODO: blob streaming isn't done in blobpacked")
small := new(test.Fetcher)
s := &storage{
small: small,
@ -466,21 +467,18 @@ func TestStreamBlobs(t *testing.T) {
token := "" // beginning
got := map[blob.Ref]bool{}
dest := make(chan *blob.Blob, 16)
dest := make(chan blobserver.BlobAndToken, 16)
done := make(chan bool)
go func() {
defer close(done)
for b := range dest {
got[b.Ref()] = true
for bt := range dest {
got[bt.Blob.Ref()] = true
}
}()
nextToken, err := s.StreamBlobs(ctx, dest, token)
err := s.StreamBlobs(ctx, dest, token)
if err != nil {
t.Fatalf("StreamBlobs = %v", err)
}
if nextToken != "l:" {
t.Fatalf("nextToken = %q; want \"l:\"", nextToken)
}
<-done
if !reflect.DeepEqual(got, all) {
t.Errorf("Got blobs %v; want %v", got, all)

View File

@ -17,59 +17,36 @@ limitations under the License.
package blobpacked
import (
"errors"
"fmt"
"strings"
"camlistore.org/pkg/blob"
"camlistore.org/pkg/blobserver"
"camlistore.org/pkg/context"
"camlistore.org/pkg/types"
)
// StreamBlobs impl.
// Continuation token is:
// "s*" if we're in the small blobs, (or "" to start):
// "s:pt:<underlying token>" (pass through)
// "s:after:<last-blobref-set>" (blob ref of already-sent item)
// "l*" if we're in the large blobs:
// "l:<big-blobref,lexically>:<offset>" (of blob data from beginning of zip)
// TODO: also care about whether large supports blob streamer?
// First it streams from small (if available, else enumerates)
// Then it streams from large (if available, else enumerates),
// and for each large, streams the contents of the zips.
func (s *storage) StreamBlobs(ctx *context.Context, dest chan<- *blob.Blob, contToken string) (nextContinueToken string, err error) {
defer close(dest)
switch {
case contToken == "" || strings.HasPrefix(contToken, "s:"):
return s.streamSmallBlobs(ctx, dest, strings.TrimPrefix(contToken, "s:"))
case strings.HasPrefix(contToken, "l:"):
return s.streamLargeBlobs(ctx, dest, strings.TrimPrefix(contToken, "l:"))
default:
return "", fmt.Errorf("invalid continue token %q", contToken)
}
func (s *storage) StreamBlobs(ctx *context.Context, dest chan<- blobserver.BlobAndToken, contToken string) (err error) {
panic("TODO")
return blobserver.NewMultiBlobStreamer(
smallBlobStreamer{s},
largeBlobStreamer{s},
).StreamBlobs(ctx, dest, contToken)
}
func (s *storage) streamSmallBlobs(ctx *context.Context, dest chan<- *blob.Blob, contToken string) (nextContinueToken string, err error) {
smallStream, ok := s.small.(blobserver.BlobStreamer)
if ok {
if contToken != "" || !strings.HasPrefix(contToken, "pt:") {
return "", errors.New("invalid pass-through stream token")
}
next, err := smallStream.StreamBlobs(ctx, dest, strings.TrimPrefix(contToken, "pt"))
if err == nil || next == "" {
next = "l:" // now do large
}
return next, err
}
if contToken != "" && !strings.HasPrefix(contToken, "after:") {
return "", fmt.Errorf("invalid small continue token %q", contToken)
}
enumCtx := ctx.New()
enumDone := enumCtx.Done()
defer enumCtx.Cancel()
sbc := make(chan blob.SizedRef) // unbuffered
type smallBlobStreamer struct{ sto *storage }
type largeBlobStreamer struct{ sto *storage }
func (st smallBlobStreamer) StreamBlobs(ctx *context.Context, dest chan<- blobserver.BlobAndToken, contToken string) (err error) {
panic("TODO")
}
func (st largeBlobStreamer) StreamBlobs(ctx *context.Context, dest chan<- blobserver.BlobAndToken, contToken string) (err error) {
panic("TODO")
}
// TODO: move some of this old pre-NewMultiBlobStreamer code into
// blobserver. In particular, transparently using enumerate for
// BlobStreamer when the storage doesn't support it should be provided
// by the blobserver package. Inevitably others will want that.
/*
enumErrc := make(chan error, 1)
go func() {
defer close(sbc)
@ -114,7 +91,4 @@ func (s *storage) streamSmallBlobs(ctx *context.Context, dest chan<- *blob.Blob,
}
return "s:after:" + lastRef.String(), nil
}
func (s *storage) streamLargeBlobs(ctx *context.Context, dest chan<- *blob.Blob, contToken string) (nextContinueToken string, err error) {
panic("TODO")
}
*/

View File

@ -528,19 +528,19 @@ var deletedBlobRef = regexp.MustCompile(`^x+-0+$`)
var _ blobserver.BlobStreamer = (*storage)(nil)
// StreamBlobs implements the blobserver.BlobStreamer interface.
func (s *storage) StreamBlobs(ctx *context.Context, dest chan<- *blob.Blob, contToken string) (nextContinueToken string, err error) {
func (s *storage) StreamBlobs(ctx *context.Context, dest chan<- blobserver.BlobAndToken, contToken string) error {
defer close(dest)
fileNum, offset, err := parseContToken(contToken)
if err != nil {
return "", err
return errors.New("diskpacked: invalid continuation token")
}
debug.Printf("Continuing blob streaming from pack %s, offset %d",
s.filename(fileNum), offset)
fd, err := os.Open(s.filename(fileNum))
if err != nil {
return "", err
return err
}
// fd will change over time; Close whichever is current when we exit.
defer func() {
@ -558,7 +558,7 @@ func (s *storage) StreamBlobs(ctx *context.Context, dest chan<- *blob.Blob, cont
// size of the file diskpacked currently still writing.
_, err = fd.Seek(offset, os.SEEK_SET)
if err != nil {
return "", err
return err
}
const ioBufSize = 256 * 1024
@ -566,15 +566,11 @@ func (s *storage) StreamBlobs(ctx *context.Context, dest chan<- *blob.Blob, cont
// We'll use bufio to avoid read system call overhead.
r := bufio.NewReaderSize(fd, ioBufSize)
var lastSent struct {
fileNum int
offset int64
}
for {
// Are we at the EOF of this pack?
if _, err := r.Peek(1); err != nil {
if err != io.EOF {
return "", err
return err
}
// EOF case; continue to the next pack, if any.
fileNum += 1
@ -583,9 +579,9 @@ func (s *storage) StreamBlobs(ctx *context.Context, dest chan<- *blob.Blob, cont
fd, err = os.Open(s.filename(fileNum))
if os.IsNotExist(err) {
// We reached the end.
return "", nil
return nil
} else if err != nil {
return "", err
return err
}
r.Reset(fd)
continue
@ -594,14 +590,14 @@ func (s *storage) StreamBlobs(ctx *context.Context, dest chan<- *blob.Blob, cont
thisOffset := offset // of current blob's header
consumed, digest, size, err := readHeader(r)
if err != nil {
return "", err
return err
}
offset += int64(consumed)
if deletedBlobRef.Match(digest) {
// Skip over deletion padding
if _, err := io.CopyN(ioutil.Discard, r, int64(size)); err != nil {
return "", err
return err
}
offset += int64(size)
continue
@ -617,23 +613,25 @@ func (s *storage) StreamBlobs(ctx *context.Context, dest chan<- *blob.Blob, cont
// don't open any blobs.
data := make([]byte, size)
if _, err := io.ReadFull(r, data); err != nil {
return "", err
return err
}
offset += int64(size)
ref, ok := blob.ParseBytes(digest)
if !ok {
return "", fmt.Errorf("diskpacked: Invalid blobref %q", digest)
return fmt.Errorf("diskpacked: Invalid blobref %q", digest)
}
newReader := func() types.ReadSeekCloser {
return newReadSeekNopCloser(bytes.NewReader(data))
}
blob := blob.NewBlob(ref, size, newReader)
select {
case dest <- blob:
lastSent.fileNum = fileNum
lastSent.offset = thisOffset
case dest <- blobserver.BlobAndToken{
Blob: blob,
Token: fmt.Sprintf("%d %d", fileNum, thisOffset),
}:
// Nothing.
case <-ctx.Done():
return fmt.Sprintf("%d %d", lastSent.fileNum, lastSent.offset), context.ErrCanceled
return context.ErrCanceled
}
}
}

View File

@ -26,10 +26,10 @@ import (
"os"
"path/filepath"
"testing"
"time"
"camlistore.org/pkg/blob"
"camlistore.org/pkg/blobserver"
"camlistore.org/pkg/blobserver/storagetest"
"camlistore.org/pkg/context"
"camlistore.org/pkg/test"
)
@ -125,51 +125,25 @@ func newTestStorage(t *testing.T, packs ...pack) (s *storage, clean func()) {
return s, clean
}
// nBlobs is the optional number of blobs after which to cancel the
// context. 0 means unlimited.
//
// It verifies the size and hash of each
// before returning and fails the test if any of the checks fail. It
// also fails the test if StreamBlobs returns a non-nil error.
func getAllUpToLimit(t *testing.T, s *storage, tok string, nBlobs int) (blobs []*blob.Blob, contToken string) {
func streamAll(t *testing.T, s *storage) []*blob.Blob {
var blobs []*blob.Blob
ctx := context.New()
ch := make(chan *blob.Blob)
nextCh := make(chan string, 1)
ch := make(chan blobserver.BlobAndToken)
errCh := make(chan error, 1)
go func() {
next, err := s.StreamBlobs(ctx, ch, tok)
nextCh <- next
errCh <- err
}()
go func() { errCh <- s.StreamBlobs(ctx, ch, "") }()
nGot := 0
var wantErr error
for blob := range ch {
verifySizeAndHash(t, blob)
blobs = append(blobs, blob)
nGot++
if nGot == nBlobs {
ctx.Cancel()
wantErr = context.ErrCanceled
break
}
for bt := range ch {
verifySizeAndHash(t, bt.Blob)
blobs = append(blobs, bt.Blob)
}
if nGot < nBlobs {
t.Fatalf("only got %d blobs; wanted at least %d", nGot, nBlobs)
if err := <-errCh; err != nil {
t.Fatalf("StreamBlobs error = %v", err)
}
select {
case err := <-errCh:
if err != wantErr {
t.Fatalf("StreamBlobs error = %v; want %v", err, wantErr)
}
case <-time.After(5 * time.Second):
t.Fatal("timeout waiting for StreamBlobs to finish (ignored cancel")
}
return blobs, <-nextCh
return blobs
}
// Tests the streaming of all blobs in a storage, with hash verification.
@ -178,15 +152,16 @@ func TestBasicStreaming(t *testing.T) {
defer clean()
expected := len(pool00001)
blobs, next := getAllUpToLimit(t, s, "", 0)
blobs := streamAll(t, s)
if len(blobs) != expected {
t.Fatalf("Wrong blob count: Expected %d, got %d", expected,
len(blobs))
}
if next != "" {
t.Fatalf("Got continuation token %q; want empty", next)
wantRefs := make([]blob.SizedRef, len(blobs))
for i, b := range blobs {
wantRefs[i] = b.SizedRef()
}
storagetest.TestStreamer(t, s, storagetest.WantSizedRefs(wantRefs))
}
func verifySizeAndHash(t *testing.T, blob *blob.Blob) {
@ -207,61 +182,15 @@ func verifySizeAndHash(t *testing.T, blob *blob.Blob) {
}
}
// Tests that StreamBlobs returns a continuation token on cancel
func TestStreamBlobsContinuationToken(t *testing.T) {
s, clean := newTestStorage(t, pack{pool00001})
defer clean()
limit := 2 // get the first blob only
wantCount := 2
blobs, next := getAllUpToLimit(t, s, "", limit)
for i, b := range blobs {
t.Logf("blob[%d] = %v", i, b.Ref())
}
if len(blobs) != wantCount {
t.Fatalf("got %d blobs; want %d", len(blobs), wantCount)
}
// For pool00001, the header + data of the first blob has len 50
if wantContToken := "0 50"; next != wantContToken {
t.Fatalf("Got continuation token %q; want %q", next, wantContToken)
}
}
func TestSeekToContToken(t *testing.T) {
s, clean := newTestStorage(t, pack{pool00001})
defer clean()
expected := len(pool00001) - 1
blobs, next := getAllUpToLimit(t, s, "0 50", 0)
if len(blobs) != expected {
t.Fatalf("Wrong blob count: Expected %d, got %d", expected,
len(blobs))
}
if next != "" {
t.Fatalf("Unexpected continuation token. Expected \"%s\", got \"%s\"", "", next)
}
}
// Tests that we can correctly switch over to the next pack if we
// still need to stream more blobs when a pack reaches EOF.
func TestStreamMultiplePacks(t *testing.T) {
s, clean := newTestStorage(t, pack{pool00001}, pack{pool00001})
defer clean()
expected := 2 * len(pool00001)
blobs, _ := getAllUpToLimit(t, s, "", 0)
if len(blobs) != expected {
t.Fatalf("Wrong blob count: Expected %d, got %d", expected,
len(blobs))
}
storagetest.TestStreamer(t, s, storagetest.WantN(len(pool00001)+len(pool00001)))
}
func TestSkipRemovedBlobs(t *testing.T) {
func TestStreamSkipRemovedBlobs(t *testing.T) {
// Note: This is the only streaming test that makes use of the
// index (for RemoveBlobs() to succeed). The others do create
// an indexed storage but they do not use the index to stream
@ -279,17 +208,10 @@ func TestSkipRemovedBlobs(t *testing.T) {
err := s.RemoveBlobs([]blob.Ref{ref})
if err != nil {
t.Fatalf("blobserver.Storage.RemoveBlobs(): %v", err)
t.Fatalf("RemoveBlobs: %v", err)
}
diskpackedSto := s.(*storage)
expected := len(pool00001) - 1 // We've deleted 1
blobs, _ := getAllUpToLimit(t, diskpackedSto, "", 0)
if len(blobs) != expected {
t.Fatalf("Wrong blob count: Expected %d, got %d", expected,
len(blobs))
}
storagetest.TestStreamer(t, diskpackedSto, storagetest.WantN(expected))
}

View File

@ -96,6 +96,15 @@ type BlobEnumerator interface {
limit int) error
}
// BlobAndToken is the value used by the BlobStreamer interface,
// containing both a Blob and a continuation token.
type BlobAndToken struct {
*blob.Blob
// Token is the continuation token to resume streaming
// starting at this blob in the future.
Token string
}
type BlobStreamer interface {
// BlobStream is an optional interface that may be implemented by
// Storage implementations.
@ -105,28 +114,20 @@ type BlobStreamer interface {
// BlobStreamer will send blobs to dest in the most efficient
// order possible.
//
// The provided continuation token resumes the stream from a
// point. To start from the beginning, send the empty string.
// The provided continuation token resumes the stream at a
// point. To start from the beginning, send the empty string.
// The token is opaque and must never be interpreted; its
// format may change between versions of the server.
//
// If the content is canceled, the error value is
// context.ErrCanceled and the nextContinueToken is a
// continuation token to resume exactly _at_ (not after) the
// last value sent. This lets callers receive a blob, decide
// its size crosses a threshold, and resume at that blob at a
// later point. Callers should thus usually pass an unbuffered
// channel, although it is not an error to do otherwise, if
// the caller is careful.
// context.ErrCanceled.
//
// StreamBlobs must unconditionally close dest before
// returning, and it must return context.ErrCanceled if
// ctx.Done() becomes readable.
//
// When StreamBlobs reaches the end, the return value is ("", nil).
// The nextContinueToken must only ever be non-empty if err is
// context.ErrCanceled.
StreamBlobs(ctx *context.Context, dest chan<- *blob.Blob, contToken string) (nextContinueToken string, err error)
// When StreamBlobs reaches the end, the return value is nil.
StreamBlobs(ctx *context.Context, dest chan<- BlobAndToken, contToken string) error
}
// Cache is the minimal interface expected of a blob cache.

View File

@ -0,0 +1,76 @@
/*
Copyright 2014 The Camlistore Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package blobserver
import (
"errors"
"regexp"
"strconv"
"strings"
"camlistore.org/pkg/context"
)
// NewMultiBlobStreamer concatenates multiple BlobStreamers into one.
func NewMultiBlobStreamer(streamers ...BlobStreamer) BlobStreamer {
return multiStreamer{s: streamers}
}
type multiStreamer struct {
s []BlobStreamer
}
var msTokenPrefixRx = regexp.MustCompile(`^(\d+):`)
func (ms multiStreamer) StreamBlobs(ctx *context.Context, dest chan<- BlobAndToken, contToken string) error {
defer close(dest)
part := 0
if contToken != "" {
pfx := msTokenPrefixRx.FindString(contToken)
var err error
part, err = strconv.Atoi(strings.TrimSuffix(pfx, ":"))
if pfx == "" || err != nil || part >= len(ms.s) {
return errors.New("invalid continuation token")
}
contToken = contToken[len(pfx):]
}
srcs := ms.s[part:]
for len(srcs) > 0 {
bs := srcs[0]
subDest := make(chan BlobAndToken, 16)
errc := make(chan error, 1)
go func() {
errc <- bs.StreamBlobs(ctx, subDest, contToken)
}()
partStr := strconv.Itoa(part)
for bt := range subDest {
select {
case <-ctx.Done():
return context.ErrCanceled
case dest <- BlobAndToken{Blob: bt.Blob, Token: partStr + ":" + bt.Token}:
}
}
if err := <-errc; err != nil {
return err
}
// Advance to the next part:
part++
srcs = srcs[1:]
contToken = ""
}
return nil
}
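
A composed token is just the sub-streamer's index, a colon, and that
sub-streamer's own token. A hypothetical resume with two sub-streamers
(the "0 50" sub-token borrows diskpacked's "<fileNum> <offset>" format;
the streamer names are illustrative):

ms := blobserver.NewMultiBlobStreamer(looseStreamer, packedStreamer)
// Resume inside the second sub-streamer (index 1) at its token "0 50":
err := ms.StreamBlobs(ctx, dest, "1:0 50")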

View File

@ -0,0 +1,86 @@
/*
Copyright 2014 The Camlistore Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package blobserver_test
import (
"errors"
"strconv"
"testing"
"camlistore.org/pkg/blob"
"camlistore.org/pkg/blobserver"
"camlistore.org/pkg/blobserver/storagetest"
"camlistore.org/pkg/context"
"camlistore.org/pkg/test"
)
type staticStreamer []*blob.Blob
func (s staticStreamer) StreamBlobs(ctx *context.Context, dest chan<- blobserver.BlobAndToken, contToken string) error {
defer close(dest)
var pos int
if contToken != "" {
var err error
pos, err = strconv.Atoi(contToken)
if err != nil || pos < 0 || pos >= len(s) {
return errors.New("invalid token")
}
s = s[pos:]
}
for len(s) > 0 {
select {
case dest <- blobserver.BlobAndToken{Blob: s[0], Token: strconv.Itoa(pos)}:
pos++
s = s[1:]
case <-ctx.Done():
return context.ErrCanceled
}
}
return nil
}
func TestStaticStreamer(t *testing.T) {
var blobs []*blob.Blob
var want []blob.SizedRef
for i := 0; i < 5; i++ {
tb := &test.Blob{strconv.Itoa(i)}
b := tb.Blob()
blobs = append(blobs, b)
want = append(want, b.SizedRef())
}
bs := staticStreamer(blobs)
storagetest.TestStreamer(t, bs, storagetest.WantSizedRefs(want))
}
func TestMultiStreamer(t *testing.T) {
var streamers []blobserver.BlobStreamer
var want []blob.SizedRef
n := 0
for st := 0; st < 3; st++ {
var blobs []*blob.Blob
for i := 0; i < 3; i++ {
n++
tb := &test.Blob{strconv.Itoa(n)}
b := tb.Blob()
want = append(want, b.SizedRef()) // overall
blobs = append(blobs, b) // this sub-streamer
}
streamers = append(streamers, staticStreamer(blobs))
}
storagetest.TestStreamer(t, blobserver.NewMultiBlobStreamer(streamers...), storagetest.WantSizedRefs(want))
}

View File

@ -363,3 +363,109 @@ func testStat(t *testing.T, enum <-chan blob.SizedRef, want []blob.SizedRef) {
}
}
}
type StreamerTestOpt interface {
verify(got []blob.SizedRef) error
}
// WantN is a wanted condition: that the stream yields exactly N items.
type WantN int
func (want WantN) verify(got []blob.SizedRef) error {
if int(want) != len(got) {
return fmt.Errorf("got %d streamed blobs; want %d", len(got), int(want))
}
return nil
}
type WantSizedRefs []blob.SizedRef
func (s WantSizedRefs) verify(got []blob.SizedRef) error {
want := []blob.SizedRef(s)
if !reflect.DeepEqual(got, want) {
return fmt.Errorf("Mismatch:\n got %d blobs: %q\nwant %d blobs: %q\n", len(got), got, len(want), want)
}
return nil
}
// TestStreamer tests that the BlobStreamer implements all of the
// promised interface behavior and ultimately yields the provided
// blobs.
func TestStreamer(t *testing.T, bs blobserver.BlobStreamer, opts ...StreamerTestOpt) {
// First see if, without cancelation, it yields the right
// result and without errors.
ch := make(chan blobserver.BlobAndToken)
errCh := make(chan error, 1)
go func() {
ctx := context.New()
defer ctx.Cancel()
errCh <- bs.StreamBlobs(ctx, ch, "")
}()
var gotRefs []blob.SizedRef
for b := range ch {
gotRefs = append(gotRefs, b.SizedRef())
}
if err := <-errCh; err != nil {
t.Errorf("initial uninterrupted StreamBlobs error: %v", err)
}
for _, opt := range opts {
if err := opt.verify(gotRefs); err != nil {
t.Errorf("error after first uninterrupted StreamBlobs pass: %v", err)
}
}
if t.Failed() {
return
}
// Next, the "complex pass": test a cancelation at each point,
// to test that resume works properly.
//
// Basic strategy:
// -- receive 1 blob, note the blobref, cancel.
// -- start again with that blobref, receive 2, cancel. first should be same,
// second should be new. note its blobref.
// Each iteration should yield 1 new unique blob and all but
// the first and last will return 2 blobs.
wantRefs := append([]blob.SizedRef(nil), gotRefs...) // copy
gotRefs = gotRefs[:0]
contToken := ""
for i := 0; i < len(wantRefs); i++ {
ctx := context.New()
ch := make(chan blobserver.BlobAndToken)
errc := make(chan error, 1)
go func() {
errc <- bs.StreamBlobs(ctx, ch, contToken)
}()
nrecv := 0
nextToken := ""
for bt := range ch {
nrecv++
sbr := bt.Blob.SizedRef()
isNew := len(gotRefs) == 0 || sbr != gotRefs[len(gotRefs)-1]
if isNew {
gotRefs = append(gotRefs, sbr)
nextToken = bt.Token
ctx.Cancel()
break
} else if i == 0 {
t.Fatalf("first iteration should receive a new value")
} else if nrecv == 2 {
t.Fatalf("at cut point %d of testStream, Streamer received 2 values, both not unique. Looping?", i)
}
}
err := <-errc
if err != nil && err != context.ErrCanceled {
t.Fatalf("StreamBlobs on iteration %d (token %q) returned error: %v", i, contToken, err)
}
if err == nil {
break
}
contToken = nextToken
}
if !reflect.DeepEqual(gotRefs, wantRefs) {
if len(gotRefs) != len(wantRefs) {
t.Errorf("With complex pass, got %d blobs; want %d", len(gotRefs), len(wantRefs))
}
t.Fatalf("Mismatch on complex pass:\n got %q\nwant %q\n", gotRefs, wantRefs)
}
}

View File

@ -20,11 +20,13 @@ import (
"crypto/sha1"
"fmt"
"io"
"io/ioutil"
"strings"
"testing"
"camlistore.org/pkg/blob"
"camlistore.org/pkg/blobserver"
"camlistore.org/pkg/types"
)
// Blob is a utility class for unit tests.
@ -32,6 +34,19 @@ type Blob struct {
Contents string // the contents of the blob
}
func (tb *Blob) Blob() *blob.Blob {
s := tb.Contents
return blob.NewBlob(tb.BlobRef(), tb.Size(), func() types.ReadSeekCloser {
return struct {
io.ReadSeeker
io.Closer
}{
io.NewSectionReader(strings.NewReader(s), 0, int64(len(s))),
ioutil.NopCloser(nil),
}
})
}
func (tb *Blob) BlobRef() blob.Ref {
h := sha1.New()
h.Write([]byte(tb.Contents))