From a082382f335274985f874b570f58b8b2f822fcca Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Tue, 23 Dec 2014 17:44:05 -0800 Subject: [PATCH] blobserver: simplify interface more, add NewMultiBlobStreamer, storagetest func This is round two of simplifying the BlobStreamer interface. The continuation tokens are now sent per-item. This permits the following item (NewMultiBlobStreamer) but also simplifies the return value (only returns an error now) and permits the use of buffered channels as the destination without getting out of sync. The new NewMultiBlobStreamer is like io.MultiReader, but for BlobStreamers, letting them be stitched together. This will be used by the blobpacked storage layer to simplify its code, so it doesn't need to deal with the handoff between loose and packed blobs itself. This MultiBlobStreamer needs a 1 element buffer, which wasn't compatible with the old BlobStreamer interface, hence the change to include the continuation tokens with the blob on the channel. Finally, add a new storagetest function for testing any blobstreamer and use it for NewMultiBlobStreamer, diskpacked, and later in blobpacked. Updates #532 Change-Id: Iccffed289adec93ca5100c7ef8b0a8d57e05833c --- pkg/blobserver/blobpacked/blobpacked_test.go | 12 +- pkg/blobserver/blobpacked/stream.go | 72 ++++------- pkg/blobserver/diskpacked/diskpacked.go | 36 +++--- pkg/blobserver/diskpacked/stream_test.go | 118 ++++--------------- pkg/blobserver/interface.go | 27 +++-- pkg/blobserver/multistream.go | 76 ++++++++++++ pkg/blobserver/multistream_test.go | 86 ++++++++++++++ pkg/blobserver/storagetest/storagetest.go | 106 +++++++++++++++++ pkg/test/blob.go | 15 +++ 9 files changed, 362 insertions(+), 186 deletions(-) create mode 100644 pkg/blobserver/multistream.go create mode 100644 pkg/blobserver/multistream_test.go diff --git a/pkg/blobserver/blobpacked/blobpacked_test.go b/pkg/blobserver/blobpacked/blobpacked_test.go index 09a867b94..9f11b9fe7 100644 --- a/pkg/blobserver/blobpacked/blobpacked_test.go +++ b/pkg/blobserver/blobpacked/blobpacked_test.go @@ -446,6 +446,7 @@ func TestZ_LeakCheck(t *testing.T) { } func TestStreamBlobs(t *testing.T) { + t.Skip("TODO: blob streaming isn't done in blobpacked") small := new(test.Fetcher) s := &storage{ small: small, @@ -466,21 +467,18 @@ func TestStreamBlobs(t *testing.T) { token := "" // beginning got := map[blob.Ref]bool{} - dest := make(chan *blob.Blob, 16) + dest := make(chan blobserver.BlobAndToken, 16) done := make(chan bool) go func() { defer close(done) - for b := range dest { - got[b.Ref()] = true + for bt := range dest { + got[bt.Blob.Ref()] = true } }() - nextToken, err := s.StreamBlobs(ctx, dest, token) + err := s.StreamBlobs(ctx, dest, token) if err != nil { t.Fatalf("StreamBlobs = %v", err) } - if nextToken != "l:" { - t.Fatalf("nextToken = %q; want \"l:\"", nextToken) - } <-done if !reflect.DeepEqual(got, all) { t.Errorf("Got blobs %v; want %v", got, all) diff --git a/pkg/blobserver/blobpacked/stream.go b/pkg/blobserver/blobpacked/stream.go index bc71fd6bb..680afb969 100644 --- a/pkg/blobserver/blobpacked/stream.go +++ b/pkg/blobserver/blobpacked/stream.go @@ -17,59 +17,36 @@ limitations under the License. package blobpacked import ( - "errors" - "fmt" - "strings" - - "camlistore.org/pkg/blob" "camlistore.org/pkg/blobserver" "camlistore.org/pkg/context" - "camlistore.org/pkg/types" ) // StreamBlobs impl. 
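+//
+// It delegates to blobserver.NewMultiBlobStreamer: the loose (small)
+// blobs are streamed first as sub-streamer 0, then the packed (large)
+// zip blobs as sub-streamer 1, so the continuation tokens handed out
+// carry a "0:" or "1:" prefix.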
-// Continuation token is:
-// "s*" if we're in the small blobs, (or "" to start):
-// "s:pt:" (pass through)
-// "s:after:" (blob ref of already-sent item)
-// "l*" if we're in the large blobs:
-// "l::" (of blob data from beginning of zip)
-// TODO: also care about whether large supports blob streamer?
-// First it streams from small (if available, else enumerates)
-// Then it streams from large (if available, else enumerates),
-// and for each large, streams the contents of the zips.
-func (s *storage) StreamBlobs(ctx *context.Context, dest chan<- *blob.Blob, contToken string) (nextContinueToken string, err error) {
-	defer close(dest)
-	switch {
-	case contToken == "" || strings.HasPrefix(contToken, "s:"):
-		return s.streamSmallBlobs(ctx, dest, strings.TrimPrefix(contToken, "s:"))
-	case strings.HasPrefix(contToken, "l:"):
-		return s.streamLargeBlobs(ctx, dest, strings.TrimPrefix(contToken, "l:"))
-	default:
-		return "", fmt.Errorf("invalid continue token %q", contToken)
-	}
+func (s *storage) StreamBlobs(ctx *context.Context, dest chan<- blobserver.BlobAndToken, contToken string) error {
+	return blobserver.NewMultiBlobStreamer(
+		smallBlobStreamer{s},
+		largeBlobStreamer{s},
+	).StreamBlobs(ctx, dest, contToken)
 }
 
-func (s *storage) streamSmallBlobs(ctx *context.Context, dest chan<- *blob.Blob, contToken string) (nextContinueToken string, err error) {
-	smallStream, ok := s.small.(blobserver.BlobStreamer)
-	if ok {
-		if contToken != "" || !strings.HasPrefix(contToken, "pt:") {
-			return "", errors.New("invalid pass-through stream token")
-		}
-		next, err := smallStream.StreamBlobs(ctx, dest, strings.TrimPrefix(contToken, "pt"))
-		if err == nil || next == "" {
-			next = "l:" // now do large
-		}
-		return next, err
-	}
-	if contToken != "" && !strings.HasPrefix(contToken, "after:") {
-		return "", fmt.Errorf("invalid small continue token %q", contToken)
-	}
-	enumCtx := ctx.New()
-	enumDone := enumCtx.Done()
-	defer enumCtx.Cancel()
-	sbc := make(chan blob.SizedRef) // unbuffered
+type smallBlobStreamer struct{ sto *storage }
+type largeBlobStreamer struct{ sto *storage }
+
+func (st smallBlobStreamer) StreamBlobs(ctx *context.Context, dest chan<- blobserver.BlobAndToken, contToken string) (err error) {
+	panic("TODO")
+}
+
+func (st largeBlobStreamer) StreamBlobs(ctx *context.Context, dest chan<- blobserver.BlobAndToken, contToken string) (err error) {
+	panic("TODO")
+}
+
+// TODO: move some of this old pre-NewMultiBlobStreamer code into
+// blobserver. In particular, transparently using enumerate for
+// BlobStreamer when the storage doesn't support it should be provided
+// by the blobserver package. Inevitably others will want that.
+/*
 	enumErrc := make(chan error, 1)
 	go func() {
 		defer close(sbc)
@@ -114,7 +91,4 @@ func (s *storage) streamSmallBlobs(ctx *context.Context, dest chan<- *blob.Blob,
 	}
 	return "s:after:" + lastRef.String(), nil
 }
-
-func (s *storage) streamLargeBlobs(ctx *context.Context, dest chan<- *blob.Blob, contToken string) (nextContinueToken string, err error) {
-	panic("TODO")
-}
+*/
diff --git a/pkg/blobserver/diskpacked/diskpacked.go b/pkg/blobserver/diskpacked/diskpacked.go
index bdd29bf9c..58c07f7d0 100644
--- a/pkg/blobserver/diskpacked/diskpacked.go
+++ b/pkg/blobserver/diskpacked/diskpacked.go
@@ -528,19 +528,19 @@ var deletedBlobRef = regexp.MustCompile(`^x+-0+$`)
 var _ blobserver.BlobStreamer = (*storage)(nil)
 
 // StreamBlobs implements the blobserver.BlobStreamer interface.
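+//
+// The continuation token is "<pack number> <offset>" (for example
+// "0 50"), where offset points at the header of the blob to resume at.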
-func (s *storage) StreamBlobs(ctx *context.Context, dest chan<- *blob.Blob, contToken string) (nextContinueToken string, err error) {
+func (s *storage) StreamBlobs(ctx *context.Context, dest chan<- blobserver.BlobAndToken, contToken string) error {
 	defer close(dest)
 
 	fileNum, offset, err := parseContToken(contToken)
 	if err != nil {
-		return "", err
+		return fmt.Errorf("diskpacked: invalid continuation token %q: %v", contToken, err)
 	}
 	debug.Printf("Continuing blob streaming from pack %s, offset %d",
 		s.filename(fileNum), offset)
 
 	fd, err := os.Open(s.filename(fileNum))
 	if err != nil {
-		return "", err
+		return err
 	}
 	// fd will change over time; Close whichever is current when we exit.
 	defer func() {
@@ -558,7 +558,7 @@ func (s *storage) StreamBlobs(ctx *context.Context, dest chan<- *blob.Blob, cont
 	// size of the file diskpacked currently still writing.
 	_, err = fd.Seek(offset, os.SEEK_SET)
 	if err != nil {
-		return "", err
+		return err
 	}
 
 	const ioBufSize = 256 * 1024
@@ -566,15 +566,11 @@
 	// We'll use bufio to avoid read system call overhead.
 	r := bufio.NewReaderSize(fd, ioBufSize)
 
-	var lastSent struct {
-		fileNum int
-		offset  int64
-	}
 	for {
 		// Are we at the EOF of this pack?
 		if _, err := r.Peek(1); err != nil {
 			if err != io.EOF {
-				return "", err
+				return err
 			}
 			// EOF case; continue to the next pack, if any.
 			fileNum += 1
@@ -583,9 +579,9 @@
 			fd, err = os.Open(s.filename(fileNum))
 			if os.IsNotExist(err) {
 				// We reached the end.
-				return "", nil
+				return nil
 			} else if err != nil {
-				return "", err
+				return err
 			}
 			r.Reset(fd)
 			continue
@@ -594,14 +590,14 @@
 		thisOffset := offset // of current blob's header
 		consumed, digest, size, err := readHeader(r)
 		if err != nil {
-			return "", err
+			return err
 		}
 
 		offset += int64(consumed)
 		if deletedBlobRef.Match(digest) {
 			// Skip over deletion padding
 			if _, err := io.CopyN(ioutil.Discard, r, int64(size)); err != nil {
-				return "", err
+				return err
 			}
 			offset += int64(size)
 			continue
@@ -617,23 +613,25 @@
 		// don't open any blobs.
 		data := make([]byte, size)
 		if _, err := io.ReadFull(r, data); err != nil {
-			return "", err
+			return err
 		}
 		offset += int64(size)
 		ref, ok := blob.ParseBytes(digest)
 		if !ok {
-			return "", fmt.Errorf("diskpacked: Invalid blobref %q", digest)
+			return fmt.Errorf("diskpacked: Invalid blobref %q", digest)
 		}
 		newReader := func() types.ReadSeekCloser {
 			return newReadSeekNopCloser(bytes.NewReader(data))
 		}
 		blob := blob.NewBlob(ref, size, newReader)
 		select {
-		case dest <- blob:
-			lastSent.fileNum = fileNum
-			lastSent.offset = thisOffset
+		case dest <- blobserver.BlobAndToken{
+			Blob:  blob,
+			Token: fmt.Sprintf("%d %d", fileNum, thisOffset),
+		}:
+			// Nothing.
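+		// On cancelation below, no token is returned: the caller
+		// resumes by passing back the Token of the last value it
+		// received, which re-sends that blob first.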
 		case <-ctx.Done():
-			return fmt.Sprintf("%d %d", lastSent.fileNum, lastSent.offset), context.ErrCanceled
+			return context.ErrCanceled
 		}
 	}
 }
diff --git a/pkg/blobserver/diskpacked/stream_test.go b/pkg/blobserver/diskpacked/stream_test.go
index 20f0b19a2..2ddfc2f88 100644
--- a/pkg/blobserver/diskpacked/stream_test.go
+++ b/pkg/blobserver/diskpacked/stream_test.go
@@ -26,10 +26,10 @@ import (
 	"os"
 	"path/filepath"
 	"testing"
-	"time"
 
 	"camlistore.org/pkg/blob"
 	"camlistore.org/pkg/blobserver"
+	"camlistore.org/pkg/blobserver/storagetest"
 	"camlistore.org/pkg/context"
 	"camlistore.org/pkg/test"
 )
@@ -125,51 +125,25 @@ func newTestStorage(t *testing.T, packs ...pack) (s *storage, clean func()) {
 	return s, clean
 }
 
-// nBlobs is the optional number of blobs after which to cancel the
-// context. 0 means unlimited.
-//
-// It verifies the size and hash of each
-// before returning and fails the test if any of the checks fail. It
-// also fails the test if StreamBlobs returns a non-nil error.
-func getAllUpToLimit(t *testing.T, s *storage, tok string, nBlobs int) (blobs []*blob.Blob, contToken string) {
+// streamAll streams all blobs from s, starting with an empty
+// continuation token. It verifies the size and hash of each blob
+// before returning and fails the test if any of the checks fail. It
+// also fails the test if StreamBlobs returns a non-nil error.
+func streamAll(t *testing.T, s *storage) []*blob.Blob {
+	var blobs []*blob.Blob
 	ctx := context.New()
-	ch := make(chan *blob.Blob)
-	nextCh := make(chan string, 1)
+	ch := make(chan blobserver.BlobAndToken)
 	errCh := make(chan error, 1)
-	go func() {
-		next, err := s.StreamBlobs(ctx, ch, tok)
-		nextCh <- next
-		errCh <- err
-	}()
+	go func() { errCh <- s.StreamBlobs(ctx, ch, "") }()
 
-	nGot := 0
-	var wantErr error
-	for blob := range ch {
-		verifySizeAndHash(t, blob)
-		blobs = append(blobs, blob)
-		nGot++
-		if nGot == nBlobs {
-			ctx.Cancel()
-			wantErr = context.ErrCanceled
-			break
-		}
+	for bt := range ch {
+		verifySizeAndHash(t, bt.Blob)
+		blobs = append(blobs, bt.Blob)
 	}
-
-	if nGot < nBlobs {
-		t.Fatalf("only got %d blobs; wanted at least %d", nGot, nBlobs)
+	if err := <-errCh; err != nil {
+		t.Fatalf("StreamBlobs error = %v", err)
 	}
-
-	select {
-	case err := <-errCh:
-		if err != wantErr {
-			t.Fatalf("StreamBlobs error = %v; want %v", err, wantErr)
-		}
-	case <-time.After(5 * time.Second):
-		t.Fatal("timeout waiting for StreamBlobs to finish (ignored cancel")
-	}
-
-	return blobs, <-nextCh
+	return blobs
 }
 
 // Tests the streaming of all blobs in a storage, with hash verification.
@@ -178,15 +152,16 @@ func TestBasicStreaming(t *testing.T) {
 	defer clean()
 
 	expected := len(pool00001)
-	blobs, next := getAllUpToLimit(t, s, "", 0)
-
+	blobs := streamAll(t, s)
 	if len(blobs) != expected {
 		t.Fatalf("Wrong blob count: Expected %d, got %d", expected,
 			len(blobs))
 	}
 
-	if next != "" {
-		t.Fatalf("Got continuation token %q; want empty", next)
+	wantRefs := make([]blob.SizedRef, len(blobs))
+	for i, b := range blobs {
+		wantRefs[i] = b.SizedRef()
 	}
+	storagetest.TestStreamer(t, s, storagetest.WantSizedRefs(wantRefs))
 }
 
 func verifySizeAndHash(t *testing.T, blob *blob.Blob) {
@@ -207,61 +182,15 @@ func verifySizeAndHash(t *testing.T, blob *blob.Blob) {
 	}
 }
 
-// Tests that StreamBlobs returns a continuation token on cancel
-func TestStreamBlobsContinuationToken(t *testing.T) {
-	s, clean := newTestStorage(t, pack{pool00001})
-	defer clean()
-
-	limit := 2 // get the first blob only
-	wantCount := 2
-	blobs, next := getAllUpToLimit(t, s, "", limit)
-
-	for i, b := range blobs {
-		t.Logf("blob[%d] = %v", i, b.Ref())
-	}
-	if len(blobs) != wantCount {
-		t.Fatalf("got %d blobs; want %d", len(blobs), wantCount)
-	}
-
-	// For pool00001, the header + data of the first blob is has len 50
-	if wantContToken := "0 50"; next != wantContToken {
-		t.Fatalf("Got continuation token %q; want %q", next, wantContToken)
-	}
-}
-
-func TestSeekToContToken(t *testing.T) {
-	s, clean := newTestStorage(t, pack{pool00001})
-	defer clean()
-
-	expected := len(pool00001) - 1
-	blobs, next := getAllUpToLimit(t, s, "0 50", 0)
-
-	if len(blobs) != expected {
-		t.Fatalf("Wrong blob count: Expected %d, got %d", expected,
-			len(blobs))
-	}
-
-	if next != "" {
-		t.Fatalf("Unexpected continuation token. Expected \"%s\", got \"%s\"", "", next)
-	}
-}
-
 // Tests that we can correctly switch over to the next pack if we
 // still need to stream more blobs when a pack reaches EOF.
 func TestStreamMultiplePacks(t *testing.T) {
 	s, clean := newTestStorage(t, pack{pool00001}, pack{pool00001})
 	defer clean()
-
-	expected := 2 * len(pool00001)
-	blobs, _ := getAllUpToLimit(t, s, "", 0)
-
-	if len(blobs) != expected {
-		t.Fatalf("Wrong blob count: Expected %d, got %d", expected,
-			len(blobs))
-	}
+	storagetest.TestStreamer(t, s, storagetest.WantN(2*len(pool00001)))
 }
 
-func TestSkipRemovedBlobs(t *testing.T) {
+func TestStreamSkipRemovedBlobs(t *testing.T) {
 	// Note: This is the only streaming test that makes use of the
 	// index (for RemoveBlobs() to succeed). The others do create
 	// an indexed storage but they do not use the index to stream
@@ -279,17 +208,10 @@ func TestSkipRemovedBlobs(t *testing.T) {
 	err := s.RemoveBlobs([]blob.Ref{ref})
 	if err != nil {
-		t.Fatalf("blobserver.Storage.RemoveBlobs(): %v", err)
+		t.Fatalf("RemoveBlobs: %v", err)
 	}
 
 	diskpackedSto := s.(*storage)
 	expected := len(pool00001) - 1 // We've deleted 1
-	blobs, _ := getAllUpToLimit(t, diskpackedSto, "", 0)
-
-	if len(blobs) != expected {
-		t.Fatalf("Wrong blob count: Expected %d, got %d", expected,
-			len(blobs))
-	}
-
+	storagetest.TestStreamer(t, diskpackedSto, storagetest.WantN(expected))
 }
diff --git a/pkg/blobserver/interface.go b/pkg/blobserver/interface.go
index 646778563..db3b4cfb7 100644
--- a/pkg/blobserver/interface.go
+++ b/pkg/blobserver/interface.go
@@ -96,6 +96,15 @@ type BlobEnumerator interface {
 	limit int) error
 }
 
+// BlobAndToken is the value used by the BlobStreamer interface,
+// containing both a Blob and a continuation token.
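+//
+// A typical consumer remembers the most recent Token so it can resume
+// later. Sketch (streamer bs, channel dest, and ctx assumed):
+//
+//	errc := make(chan error, 1)
+//	go func() { errc <- bs.StreamBlobs(ctx, dest, token) }()
+//	for bt := range dest {
+//		token = bt.Token // resumes at (not after) this blob
+//		// ... use bt.Blob ...
+//	}
+//	err := <-errc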
+type BlobAndToken struct {
+	*blob.Blob
+	// Token is the continuation token to resume streaming
+	// starting at this blob in the future.
+	Token string
+}
+
 type BlobStreamer interface {
 	// BlobStream is an optional interface that may be implemented by
 	// Storage implementations.
@@ -105,28 +114,20 @@ type BlobStreamer interface {
 	// BlobStreamer will send blobs to dest in the most efficient
 	// order possible.
 	//
-	// The provided continuation token resumes the stream from a
-	// point. To start from the beginning, send the empty string.
+	// The provided continuation token resumes the stream at a
+	// point. To start from the beginning, send the empty string.
 	// The token is opaque and must never be interpreted; its
 	// format may change between versions of the server.
 	//
-	// If the content is canceled, the error value is
-	// context.ErrCanceled and the nextContinueToken is a
-	// continuation token to resume exactly _at_ (not after) the
-	// last value sent. This lets callers receive a blob, decide
-	// its size crosses a threshold, and resume at that blob at a
-	// later point. Callers should thus usually pass an unbuffered
-	// channel, although it is not an error to do otherwise, if
-	// the caller is careful.
+	// If the context is canceled, the error value is
+	// context.ErrCanceled.
 	//
 	// StreamBlobs must unconditionally close dest before
 	// returning, and it must return context.ErrCanceled if
 	// ctx.Done() becomes readable.
 	//
-	// When StreamBlobs reaches the end, the return value is ("", nil).
-	// The nextContinueToken must only ever be non-empty if err is
-	// context.ErrCanceled.
-	StreamBlobs(ctx *context.Context, dest chan<- *blob.Blob, contToken string) (nextContinueToken string, err error)
+	// When StreamBlobs reaches the end, the return value is nil.
+	StreamBlobs(ctx *context.Context, dest chan<- BlobAndToken, contToken string) error
 }
 
 // Cache is the minimal interface expected of a blob cache.
diff --git a/pkg/blobserver/multistream.go b/pkg/blobserver/multistream.go
new file mode 100644
index 000000000..79d7fbedc
--- /dev/null
+++ b/pkg/blobserver/multistream.go
@@ -0,0 +1,76 @@
+/*
+Copyright 2014 The Camlistore Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package blobserver
+
+import (
+	"errors"
+	"regexp"
+	"strconv"
+	"strings"
+
+	"camlistore.org/pkg/context"
+)
+
+// NewMultiBlobStreamer concatenates multiple BlobStreamers into one.
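+// It is the BlobStreamer analogue of io.MultiReader: each source is
+// drained in order, and continuation tokens are rewritten as
+// "<part index>:<sub-streamer token>" so a resumed stream can be
+// routed back to the right underlying streamer.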
+func NewMultiBlobStreamer(streamers ...BlobStreamer) BlobStreamer { + return multiStreamer{s: streamers} +} + +type multiStreamer struct { + s []BlobStreamer +} + +var msTokenPrefixRx = regexp.MustCompile(`^(\d+):`) + +func (ms multiStreamer) StreamBlobs(ctx *context.Context, dest chan<- BlobAndToken, contToken string) error { + defer close(dest) + part := 0 + if contToken != "" { + pfx := msTokenPrefixRx.FindString(contToken) + var err error + part, err = strconv.Atoi(strings.TrimSuffix(pfx, ":")) + if pfx == "" || err != nil || part >= len(ms.s) { + return errors.New("invalid continuation token") + } + contToken = contToken[len(pfx):] + } + srcs := ms.s[part:] + for len(srcs) > 0 { + bs := srcs[0] + subDest := make(chan BlobAndToken, 16) + errc := make(chan error, 1) + go func() { + errc <- bs.StreamBlobs(ctx, subDest, contToken) + }() + partStr := strconv.Itoa(part) + for bt := range subDest { + select { + case <-ctx.Done(): + return context.ErrCanceled + case dest <- BlobAndToken{Blob: bt.Blob, Token: partStr + ":" + bt.Token}: + } + } + if err := <-errc; err != nil { + return err + } + // Advance to the next part: + part++ + srcs = srcs[1:] + contToken = "" + } + return nil +} diff --git a/pkg/blobserver/multistream_test.go b/pkg/blobserver/multistream_test.go new file mode 100644 index 000000000..544c2a11a --- /dev/null +++ b/pkg/blobserver/multistream_test.go @@ -0,0 +1,86 @@ +/* +Copyright 2014 The Camlistore Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package blobserver_test
+
+import (
+	"errors"
+	"strconv"
+	"testing"
+
+	"camlistore.org/pkg/blob"
+	"camlistore.org/pkg/blobserver"
+	"camlistore.org/pkg/blobserver/storagetest"
+	"camlistore.org/pkg/context"
+	"camlistore.org/pkg/test"
+)
+
+type staticStreamer []*blob.Blob
+
+func (s staticStreamer) StreamBlobs(ctx *context.Context, dest chan<- blobserver.BlobAndToken, contToken string) error {
+	defer close(dest)
+	var pos int
+	if contToken != "" {
+		var err error
+		pos, err = strconv.Atoi(contToken)
+		if err != nil || pos < 0 || pos >= len(s) {
+			return errors.New("invalid token")
+		}
+		s = s[pos:]
+	}
+	for len(s) > 0 {
+		select {
+		case dest <- blobserver.BlobAndToken{Blob: s[0], Token: strconv.Itoa(pos)}:
+			pos++
+			s = s[1:]
+		case <-ctx.Done():
+			return context.ErrCanceled
+		}
+	}
+	return nil
+}
+
+func TestStaticStreamer(t *testing.T) {
+	var blobs []*blob.Blob
+	var want []blob.SizedRef
+	for i := 0; i < 5; i++ {
+		tb := &test.Blob{strconv.Itoa(i)}
+		b := tb.Blob()
+		blobs = append(blobs, b)
+		want = append(want, b.SizedRef())
+	}
+	bs := staticStreamer(blobs)
+	storagetest.TestStreamer(t, bs, storagetest.WantSizedRefs(want))
+}
+
+func TestMultiStreamer(t *testing.T) {
+	var streamers []blobserver.BlobStreamer
+	var want []blob.SizedRef
+	n := 0
+
+	for st := 0; st < 3; st++ {
+		var blobs []*blob.Blob
+		for i := 0; i < 3; i++ {
+			n++
+			tb := &test.Blob{strconv.Itoa(n)}
+			b := tb.Blob()
+			want = append(want, b.SizedRef()) // overall
+			blobs = append(blobs, b)          // this sub-streamer
+		}
+		streamers = append(streamers, staticStreamer(blobs))
+	}
+	storagetest.TestStreamer(t, blobserver.NewMultiBlobStreamer(streamers...), storagetest.WantSizedRefs(want))
+}
diff --git a/pkg/blobserver/storagetest/storagetest.go b/pkg/blobserver/storagetest/storagetest.go
index 537196b3e..463d54e77 100644
--- a/pkg/blobserver/storagetest/storagetest.go
+++ b/pkg/blobserver/storagetest/storagetest.go
@@ -363,3 +363,109 @@ func testStat(t *testing.T, enum <-chan blob.SizedRef, want []blob.SizedRef) {
 		}
 	}
 }
+
+// StreamerTestOpt is an option for TestStreamer, verifying something
+// about the streamed results.
+type StreamerTestOpt interface {
+	verify(got []blob.SizedRef) error
+}
+
+// WantN is a test option verifying that the streamer yields exactly N blobs.
+type WantN int
+
+func (want WantN) verify(got []blob.SizedRef) error {
+	if int(want) != len(got) {
+		return fmt.Errorf("got %d streamed blobs; want %d", len(got), int(want))
+	}
+	return nil
+}
+
+// WantSizedRefs is a test option verifying that the streamer yields
+// exactly these blobs, in this order.
+type WantSizedRefs []blob.SizedRef
+
+func (s WantSizedRefs) verify(got []blob.SizedRef) error {
+	want := []blob.SizedRef(s)
+	if !reflect.DeepEqual(got, want) {
+		return fmt.Errorf("Mismatch:\n got %d blobs: %q\nwant %d blobs: %q\n", len(got), got, len(want), want)
+	}
+	return nil
+}
+
+// TestStreamer tests that the BlobStreamer implements all of the
+// promised interface behavior and ultimately yields the provided
+// blobs.
+func TestStreamer(t *testing.T, bs blobserver.BlobStreamer, opts ...StreamerTestOpt) {
+	// First see if, without cancelation, it yields the right
+	// results and no error.
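+	// StreamBlobs runs in its own goroutine and must close ch when
+	// it is done, so the range below terminates; its error is read
+	// from errCh only after the channel is drained.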
+ ch := make(chan blobserver.BlobAndToken) + errCh := make(chan error, 1) + go func() { + ctx := context.New() + defer ctx.Cancel() + errCh <- bs.StreamBlobs(ctx, ch, "") + }() + var gotRefs []blob.SizedRef + for b := range ch { + gotRefs = append(gotRefs, b.SizedRef()) + } + if err := <-errCh; err != nil { + t.Errorf("initial uninterrupted StreamBlobs error: %v", err) + } + for _, opt := range opts { + if err := opt.verify(gotRefs); err != nil { + t.Errorf("error after first uninterrupted StreamBlobs pass: %v", err) + } + } + if t.Failed() { + return + } + + // Next, the "complex pass": test a cancelation at each point, + // to test that resume works properly. + // + // Basic strategy: + // -- receive 1 blob, note the blobref, cancel. + // -- start again with that blobref, receive 2, cancel. first should be same, + // second should be new. note its blobref. + // Each iteration should yield 1 new unique blob and all but + // the first and last will return 2 blobs. + wantRefs := append([]blob.SizedRef(nil), gotRefs...) // copy + gotRefs = gotRefs[:0] + contToken := "" + for i := 0; i < len(wantRefs); i++ { + ctx := context.New() + ch := make(chan blobserver.BlobAndToken) + errc := make(chan error, 1) + go func() { + errc <- bs.StreamBlobs(ctx, ch, contToken) + }() + nrecv := 0 + nextToken := "" + for bt := range ch { + nrecv++ + sbr := bt.Blob.SizedRef() + isNew := len(gotRefs) == 0 || sbr != gotRefs[len(gotRefs)-1] + if isNew { + gotRefs = append(gotRefs, sbr) + nextToken = bt.Token + ctx.Cancel() + break + } else if i == 0 { + t.Fatalf("first iteration should receive a new value") + } else if nrecv == 2 { + t.Fatalf("at cut point %d of testStream, Streamer received 2 values, both not unique. Looping?", i) + } + } + err := <-errc + if err != nil && err != context.ErrCanceled { + t.Fatalf("StreamBlobs on iteration %d (token %q) returned error: %v", i, contToken, err) + } + if err == nil { + break + } + contToken = nextToken + } + if !reflect.DeepEqual(gotRefs, wantRefs) { + if len(gotRefs) != len(wantRefs) { + t.Errorf("With complex pass, got %d blobs; want %d", len(gotRefs), len(wantRefs)) + } + t.Fatalf("Mismatch on complex pass:\n got %q\nwant %q\n", gotRefs, wantRefs) + } +} diff --git a/pkg/test/blob.go b/pkg/test/blob.go index 99f14d238..9844d5deb 100644 --- a/pkg/test/blob.go +++ b/pkg/test/blob.go @@ -20,11 +20,13 @@ import ( "crypto/sha1" "fmt" "io" + "io/ioutil" "strings" "testing" "camlistore.org/pkg/blob" "camlistore.org/pkg/blobserver" + "camlistore.org/pkg/types" ) // Blob is a utility class for unit tests. @@ -32,6 +34,19 @@ type Blob struct { Contents string // the contents of the blob } +func (tb *Blob) Blob() *blob.Blob { + s := tb.Contents + return blob.NewBlob(tb.BlobRef(), tb.Size(), func() types.ReadSeekCloser { + return struct { + io.ReadSeeker + io.Closer + }{ + io.NewSectionReader(strings.NewReader(s), 0, int64(len(s))), + ioutil.NopCloser(nil), + } + }) +} + func (tb *Blob) BlobRef() blob.Ref { h := sha1.New() h.Write([]byte(tb.Contents))