diff --git a/pkg/blobserver/blobpacked/blobpacked_test.go b/pkg/blobserver/blobpacked/blobpacked_test.go index 09a867b94..9f11b9fe7 100644 --- a/pkg/blobserver/blobpacked/blobpacked_test.go +++ b/pkg/blobserver/blobpacked/blobpacked_test.go @@ -446,6 +446,7 @@ func TestZ_LeakCheck(t *testing.T) { } func TestStreamBlobs(t *testing.T) { + t.Skip("TODO: blob streaming isn't done in blobpacked") small := new(test.Fetcher) s := &storage{ small: small, @@ -466,21 +467,18 @@ func TestStreamBlobs(t *testing.T) { token := "" // beginning got := map[blob.Ref]bool{} - dest := make(chan *blob.Blob, 16) + dest := make(chan blobserver.BlobAndToken, 16) done := make(chan bool) go func() { defer close(done) - for b := range dest { - got[b.Ref()] = true + for bt := range dest { + got[bt.Blob.Ref()] = true } }() - nextToken, err := s.StreamBlobs(ctx, dest, token) + err := s.StreamBlobs(ctx, dest, token) if err != nil { t.Fatalf("StreamBlobs = %v", err) } - if nextToken != "l:" { - t.Fatalf("nextToken = %q; want \"l:\"", nextToken) - } <-done if !reflect.DeepEqual(got, all) { t.Errorf("Got blobs %v; want %v", got, all) diff --git a/pkg/blobserver/blobpacked/stream.go b/pkg/blobserver/blobpacked/stream.go index bc71fd6bb..680afb969 100644 --- a/pkg/blobserver/blobpacked/stream.go +++ b/pkg/blobserver/blobpacked/stream.go @@ -17,59 +17,36 @@ limitations under the License. package blobpacked import ( - "errors" - "fmt" - "strings" - - "camlistore.org/pkg/blob" "camlistore.org/pkg/blobserver" "camlistore.org/pkg/context" - "camlistore.org/pkg/types" ) // StreamBlobs impl. -// Continuation token is: -// "s*" if we're in the small blobs, (or "" to start): -// "s:pt:" (pass through) -// "s:after:" (blob ref of already-sent item) -// "l*" if we're in the large blobs: -// "l::" (of blob data from beginning of zip) -// TODO: also care about whether large supports blob streamer? -// First it streams from small (if available, else enumerates) -// Then it streams from large (if available, else enumerates), -// and for each large, streams the contents of the zips. -func (s *storage) StreamBlobs(ctx *context.Context, dest chan<- *blob.Blob, contToken string) (nextContinueToken string, err error) { - defer close(dest) - switch { - case contToken == "" || strings.HasPrefix(contToken, "s:"): - return s.streamSmallBlobs(ctx, dest, strings.TrimPrefix(contToken, "s:")) - case strings.HasPrefix(contToken, "l:"): - return s.streamLargeBlobs(ctx, dest, strings.TrimPrefix(contToken, "l:")) - default: - return "", fmt.Errorf("invalid continue token %q", contToken) - } +func (s *storage) StreamBlobs(ctx *context.Context, dest chan<- blobserver.BlobAndToken, contToken string) (err error) { + panic("TODO") + return blobserver.NewMultiBlobStreamer( + smallBlobStreamer{s}, + largeBlobStreamer{s}, + ).StreamBlobs(ctx, dest, contToken) } -func (s *storage) streamSmallBlobs(ctx *context.Context, dest chan<- *blob.Blob, contToken string) (nextContinueToken string, err error) { - smallStream, ok := s.small.(blobserver.BlobStreamer) - if ok { - if contToken != "" || !strings.HasPrefix(contToken, "pt:") { - return "", errors.New("invalid pass-through stream token") - } - next, err := smallStream.StreamBlobs(ctx, dest, strings.TrimPrefix(contToken, "pt")) - if err == nil || next == "" { - next = "l:" // now do large - } - return next, err - } - if contToken != "" && !strings.HasPrefix(contToken, "after:") { - return "", fmt.Errorf("invalid small continue token %q", contToken) - } - enumCtx := ctx.New() - enumDone := enumCtx.Done() - defer enumCtx.Cancel() - sbc := make(chan blob.SizedRef) // unbuffered +type smallBlobStreamer struct{ sto *storage } +type largeBlobStreamer struct{ sto *storage } + +func (st smallBlobStreamer) StreamBlobs(ctx *context.Context, dest chan<- blobserver.BlobAndToken, contToken string) (err error) { + panic("TODO") +} + +func (st largeBlobStreamer) StreamBlobs(ctx *context.Context, dest chan<- blobserver.BlobAndToken, contToken string) (err error) { + panic("TODO") +} + +// TODO: move some ofthis old pre-NewMultiBlobStreamer code into +// blobserver. in particular, transparently using enumerate for +// BlobStreamer when the storage doesn't support it should be provided +// by the blobserver package. inevitably others will want that. +/* enumErrc := make(chan error, 1) go func() { defer close(sbc) @@ -114,7 +91,4 @@ func (s *storage) streamSmallBlobs(ctx *context.Context, dest chan<- *blob.Blob, } return "s:after:" + lastRef.String(), nil } - -func (s *storage) streamLargeBlobs(ctx *context.Context, dest chan<- *blob.Blob, contToken string) (nextContinueToken string, err error) { - panic("TODO") -} +*/ diff --git a/pkg/blobserver/diskpacked/diskpacked.go b/pkg/blobserver/diskpacked/diskpacked.go index bdd29bf9c..58c07f7d0 100644 --- a/pkg/blobserver/diskpacked/diskpacked.go +++ b/pkg/blobserver/diskpacked/diskpacked.go @@ -528,19 +528,19 @@ var deletedBlobRef = regexp.MustCompile(`^x+-0+$`) var _ blobserver.BlobStreamer = (*storage)(nil) // StreamBlobs Implements the blobserver.StreamBlobs interface. -func (s *storage) StreamBlobs(ctx *context.Context, dest chan<- *blob.Blob, contToken string) (nextContinueToken string, err error) { +func (s *storage) StreamBlobs(ctx *context.Context, dest chan<- blobserver.BlobAndToken, contToken string) error { defer close(dest) fileNum, offset, err := parseContToken(contToken) if err != nil { - return "", err + return errors.New("diskpacked: invalid continuation token") } debug.Printf("Continuing blob streaming from pack %s, offset %d", s.filename(fileNum), offset) fd, err := os.Open(s.filename(fileNum)) if err != nil { - return "", err + return err } // fd will change over time; Close whichever is current when we exit. defer func() { @@ -558,7 +558,7 @@ func (s *storage) StreamBlobs(ctx *context.Context, dest chan<- *blob.Blob, cont // size of the file diskpacked currently still writing. _, err = fd.Seek(offset, os.SEEK_SET) if err != nil { - return "", err + return err } const ioBufSize = 256 * 1024 @@ -566,15 +566,11 @@ func (s *storage) StreamBlobs(ctx *context.Context, dest chan<- *blob.Blob, cont // We'll use bufio to avoid read system call overhead. r := bufio.NewReaderSize(fd, ioBufSize) - var lastSent struct { - fileNum int - offset int64 - } for { // Are we at the EOF of this pack? if _, err := r.Peek(1); err != nil { if err != io.EOF { - return "", err + return err } // EOF case; continue to the next pack, if any. fileNum += 1 @@ -583,9 +579,9 @@ func (s *storage) StreamBlobs(ctx *context.Context, dest chan<- *blob.Blob, cont fd, err = os.Open(s.filename(fileNum)) if os.IsNotExist(err) { // We reached the end. - return "", nil + return nil } else if err != nil { - return "", err + return err } r.Reset(fd) continue @@ -594,14 +590,14 @@ func (s *storage) StreamBlobs(ctx *context.Context, dest chan<- *blob.Blob, cont thisOffset := offset // of current blob's header consumed, digest, size, err := readHeader(r) if err != nil { - return "", err + return err } offset += int64(consumed) if deletedBlobRef.Match(digest) { // Skip over deletion padding if _, err := io.CopyN(ioutil.Discard, r, int64(size)); err != nil { - return "", err + return err } offset += int64(size) continue @@ -617,23 +613,25 @@ func (s *storage) StreamBlobs(ctx *context.Context, dest chan<- *blob.Blob, cont // don't open any blobs. data := make([]byte, size) if _, err := io.ReadFull(r, data); err != nil { - return "", err + return err } offset += int64(size) ref, ok := blob.ParseBytes(digest) if !ok { - return "", fmt.Errorf("diskpacked: Invalid blobref %q", digest) + return fmt.Errorf("diskpacked: Invalid blobref %q", digest) } newReader := func() types.ReadSeekCloser { return newReadSeekNopCloser(bytes.NewReader(data)) } blob := blob.NewBlob(ref, size, newReader) select { - case dest <- blob: - lastSent.fileNum = fileNum - lastSent.offset = thisOffset + case dest <- blobserver.BlobAndToken{ + Blob: blob, + Token: fmt.Sprintf("%d %d", fileNum, thisOffset), + }: + // Nothing. case <-ctx.Done(): - return fmt.Sprintf("%d %d", lastSent.fileNum, lastSent.offset), context.ErrCanceled + return context.ErrCanceled } } } diff --git a/pkg/blobserver/diskpacked/stream_test.go b/pkg/blobserver/diskpacked/stream_test.go index 20f0b19a2..2ddfc2f88 100644 --- a/pkg/blobserver/diskpacked/stream_test.go +++ b/pkg/blobserver/diskpacked/stream_test.go @@ -26,10 +26,10 @@ import ( "os" "path/filepath" "testing" - "time" "camlistore.org/pkg/blob" "camlistore.org/pkg/blobserver" + "camlistore.org/pkg/blobserver/storagetest" "camlistore.org/pkg/context" "camlistore.org/pkg/test" ) @@ -125,51 +125,25 @@ func newTestStorage(t *testing.T, packs ...pack) (s *storage, clean func()) { return s, clean } -// nBlobs is the optional number of blobs after which to cancel the -// context. 0 means unlimited. -// // It verifies the size and hash of each // before returning and fails the test if any of the checks fail. It // also fails the test if StreamBlobs returns a non-nil error. -func getAllUpToLimit(t *testing.T, s *storage, tok string, nBlobs int) (blobs []*blob.Blob, contToken string) { +func streamAll(t *testing.T, s *storage) []*blob.Blob { + var blobs []*blob.Blob ctx := context.New() - ch := make(chan *blob.Blob) - nextCh := make(chan string, 1) + ch := make(chan blobserver.BlobAndToken) errCh := make(chan error, 1) - go func() { - next, err := s.StreamBlobs(ctx, ch, tok) - nextCh <- next - errCh <- err - }() + go func() { errCh <- s.StreamBlobs(ctx, ch, "") }() - nGot := 0 - var wantErr error - for blob := range ch { - verifySizeAndHash(t, blob) - blobs = append(blobs, blob) - nGot++ - if nGot == nBlobs { - ctx.Cancel() - wantErr = context.ErrCanceled - break - } + for bt := range ch { + verifySizeAndHash(t, bt.Blob) + blobs = append(blobs, bt.Blob) } - - if nGot < nBlobs { - t.Fatalf("only got %d blobs; wanted at least %d", nGot, nBlobs) + if err := <-errCh; err != nil { + t.Fatalf("StreamBlobs error = %v", err) } - - select { - case err := <-errCh: - if err != wantErr { - t.Fatalf("StreamBlobs error = %v; want %v", err, wantErr) - } - case <-time.After(5 * time.Second): - t.Fatal("timeout waiting for StreamBlobs to finish (ignored cancel") - } - - return blobs, <-nextCh + return blobs } // Tests the streaming of all blobs in a storage, with hash verification. @@ -178,15 +152,16 @@ func TestBasicStreaming(t *testing.T) { defer clean() expected := len(pool00001) - blobs, next := getAllUpToLimit(t, s, "", 0) - + blobs := streamAll(t, s) if len(blobs) != expected { t.Fatalf("Wrong blob count: Expected %d, got %d", expected, len(blobs)) } - if next != "" { - t.Fatalf("Got continuation token %q; want empty", next) + wantRefs := make([]blob.SizedRef, len(blobs)) + for i, b := range blobs { + wantRefs[i] = b.SizedRef() } + storagetest.TestStreamer(t, s, storagetest.WantSizedRefs(wantRefs)) } func verifySizeAndHash(t *testing.T, blob *blob.Blob) { @@ -207,61 +182,15 @@ func verifySizeAndHash(t *testing.T, blob *blob.Blob) { } } -// Tests that StreamBlobs returns a continuation token on cancel -func TestStreamBlobsContinuationToken(t *testing.T) { - s, clean := newTestStorage(t, pack{pool00001}) - defer clean() - - limit := 2 // get the first blob only - wantCount := 2 - blobs, next := getAllUpToLimit(t, s, "", limit) - - for i, b := range blobs { - t.Logf("blob[%d] = %v", i, b.Ref()) - } - if len(blobs) != wantCount { - t.Fatalf("got %d blobs; want %d", len(blobs), wantCount) - } - - // For pool00001, the header + data of the first blob is has len 50 - if wantContToken := "0 50"; next != wantContToken { - t.Fatalf("Got continuation token %q; want %q", next, wantContToken) - } -} - -func TestSeekToContToken(t *testing.T) { - s, clean := newTestStorage(t, pack{pool00001}) - defer clean() - - expected := len(pool00001) - 1 - blobs, next := getAllUpToLimit(t, s, "0 50", 0) - - if len(blobs) != expected { - t.Fatalf("Wrong blob count: Expected %d, got %d", expected, - len(blobs)) - } - - if next != "" { - t.Fatalf("Unexpected continuation token. Expected \"%s\", got \"%s\"", "", next) - } -} - // Tests that we can correctly switch over to the next pack if we // still need to stream more blobs when a pack reaches EOF. func TestStreamMultiplePacks(t *testing.T) { s, clean := newTestStorage(t, pack{pool00001}, pack{pool00001}) defer clean() - - expected := 2 * len(pool00001) - blobs, _ := getAllUpToLimit(t, s, "", 0) - - if len(blobs) != expected { - t.Fatalf("Wrong blob count: Expected %d, got %d", expected, - len(blobs)) - } + storagetest.TestStreamer(t, s, storagetest.WantN(len(pool00001)+len(pool00001))) } -func TestSkipRemovedBlobs(t *testing.T) { +func TestStreamSkipRemovedBlobs(t *testing.T) { // Note: This is the only streaming test that makes use of the // index (for RemoveBlobs() to succeed). The others do create // an indexed storage but they do not use the index to stream @@ -279,17 +208,10 @@ func TestSkipRemovedBlobs(t *testing.T) { err := s.RemoveBlobs([]blob.Ref{ref}) if err != nil { - t.Fatalf("blobserver.Storage.RemoveBlobs(): %v", err) + t.Fatalf("RemoveBlobs: %v", err) } diskpackedSto := s.(*storage) - expected := len(pool00001) - 1 // We've deleted 1 - blobs, _ := getAllUpToLimit(t, diskpackedSto, "", 0) - - if len(blobs) != expected { - t.Fatalf("Wrong blob count: Expected %d, got %d", expected, - len(blobs)) - } - + storagetest.TestStreamer(t, diskpackedSto, storagetest.WantN(expected)) } diff --git a/pkg/blobserver/interface.go b/pkg/blobserver/interface.go index 646778563..db3b4cfb7 100644 --- a/pkg/blobserver/interface.go +++ b/pkg/blobserver/interface.go @@ -96,6 +96,15 @@ type BlobEnumerator interface { limit int) error } +// BlobAndToken is the value used by the BlobStreamer interface, +// containing both a Blob and a continuation token. +type BlobAndToken struct { + *blob.Blob + // Token is the continuation token to resume streaming + // starting at this blob in the future. + Token string +} + type BlobStreamer interface { // BlobStream is an optional interface that may be implemented by // Storage implementations. @@ -105,28 +114,20 @@ type BlobStreamer interface { // BlobStreamer will send blobs to dest in the most efficient // order possible. // - // The provided continuation token resumes the stream from a - // point. To start from the beginning, send the empty string. + // The provided continuation token resumes the stream at a + // point. To start from the beginning, send the empty string. // The token is opaque and must never be interpreted; its // format may change between versions of the server. // // If the content is canceled, the error value is - // context.ErrCanceled and the nextContinueToken is a - // continuation token to resume exactly _at_ (not after) the - // last value sent. This lets callers receive a blob, decide - // its size crosses a threshold, and resume at that blob at a - // later point. Callers should thus usually pass an unbuffered - // channel, although it is not an error to do otherwise, if - // the caller is careful. + // context.ErrCanceled. // // StreamBlobs must unconditionally close dest before // returning, and it must return context.ErrCanceled if // ctx.Done() becomes readable. // - // When StreamBlobs reaches the end, the return value is ("", nil). - // The nextContinueToken must only ever be non-empty if err is - // context.ErrCanceled. - StreamBlobs(ctx *context.Context, dest chan<- *blob.Blob, contToken string) (nextContinueToken string, err error) + // When StreamBlobs reaches the end, the return value is nil. + StreamBlobs(ctx *context.Context, dest chan<- BlobAndToken, contToken string) error } // Cache is the minimal interface expected of a blob cache. diff --git a/pkg/blobserver/multistream.go b/pkg/blobserver/multistream.go new file mode 100644 index 000000000..79d7fbedc --- /dev/null +++ b/pkg/blobserver/multistream.go @@ -0,0 +1,76 @@ +/* +Copyright 2014 The Camlistore Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package blobserver + +import ( + "errors" + "regexp" + "strconv" + "strings" + + "camlistore.org/pkg/context" +) + +// NewMultiBlobStreamer concatenates multiple BlobStreamers into one. +func NewMultiBlobStreamer(streamers ...BlobStreamer) BlobStreamer { + return multiStreamer{s: streamers} +} + +type multiStreamer struct { + s []BlobStreamer +} + +var msTokenPrefixRx = regexp.MustCompile(`^(\d+):`) + +func (ms multiStreamer) StreamBlobs(ctx *context.Context, dest chan<- BlobAndToken, contToken string) error { + defer close(dest) + part := 0 + if contToken != "" { + pfx := msTokenPrefixRx.FindString(contToken) + var err error + part, err = strconv.Atoi(strings.TrimSuffix(pfx, ":")) + if pfx == "" || err != nil || part >= len(ms.s) { + return errors.New("invalid continuation token") + } + contToken = contToken[len(pfx):] + } + srcs := ms.s[part:] + for len(srcs) > 0 { + bs := srcs[0] + subDest := make(chan BlobAndToken, 16) + errc := make(chan error, 1) + go func() { + errc <- bs.StreamBlobs(ctx, subDest, contToken) + }() + partStr := strconv.Itoa(part) + for bt := range subDest { + select { + case <-ctx.Done(): + return context.ErrCanceled + case dest <- BlobAndToken{Blob: bt.Blob, Token: partStr + ":" + bt.Token}: + } + } + if err := <-errc; err != nil { + return err + } + // Advance to the next part: + part++ + srcs = srcs[1:] + contToken = "" + } + return nil +} diff --git a/pkg/blobserver/multistream_test.go b/pkg/blobserver/multistream_test.go new file mode 100644 index 000000000..544c2a11a --- /dev/null +++ b/pkg/blobserver/multistream_test.go @@ -0,0 +1,86 @@ +/* +Copyright 2014 The Camlistore Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package blobserver_test + +import ( + "errors" + "strconv" + "testing" + + "camlistore.org/pkg/blob" + "camlistore.org/pkg/blobserver" + "camlistore.org/pkg/blobserver/storagetest" + "camlistore.org/pkg/context" + "camlistore.org/pkg/test" +) + +type staticStreamer []*blob.Blob + +func (s staticStreamer) StreamBlobs(ctx *context.Context, dest chan<- blobserver.BlobAndToken, contToken string) error { + defer close(dest) + var pos int + if contToken != "" { + var err error + pos, err = strconv.Atoi(contToken) + if err != nil || pos < 0 || pos >= len(s) { + return errors.New("invalid token") + } + s = s[pos:] + } + for len(s) > 0 { + select { + case dest <- blobserver.BlobAndToken{Blob: s[0], Token: strconv.Itoa(pos)}: + pos++ + s = s[1:] + case <-ctx.Done(): + return context.ErrCanceled + } + } + return nil +} + +func TestStaticStreamer(t *testing.T) { + var blobs []*blob.Blob + var want []blob.SizedRef + for i := 0; i < 5; i++ { + tb := &test.Blob{strconv.Itoa(i)} + b := tb.Blob() + blobs = append(blobs, b) + want = append(want, b.SizedRef()) + } + bs := staticStreamer(blobs) + storagetest.TestStreamer(t, bs, storagetest.WantSizedRefs(want)) +} + +func TestMultiStreamer(t *testing.T) { + var streamers []blobserver.BlobStreamer + var want []blob.SizedRef + n := 0 + + for st := 0; st < 3; st++ { + var blobs []*blob.Blob + for i := 0; i < 3; i++ { + n++ + tb := &test.Blob{strconv.Itoa(n)} + b := tb.Blob() + want = append(want, b.SizedRef()) // overall + blobs = append(blobs, b) // this sub-streamer + } + streamers = append(streamers, staticStreamer(blobs)) + } + storagetest.TestStreamer(t, blobserver.NewMultiBlobStreamer(streamers...), storagetest.WantSizedRefs(want)) +} diff --git a/pkg/blobserver/storagetest/storagetest.go b/pkg/blobserver/storagetest/storagetest.go index 537196b3e..463d54e77 100644 --- a/pkg/blobserver/storagetest/storagetest.go +++ b/pkg/blobserver/storagetest/storagetest.go @@ -363,3 +363,109 @@ func testStat(t *testing.T, enum <-chan blob.SizedRef, want []blob.SizedRef) { } } } + +type StreamerTestOpt interface { + verify(got []blob.SizedRef) error +} + +// WantN is a wanted condition, that the caller wants N of the items. +type WantN int + +func (want WantN) verify(got []blob.SizedRef) error { + if int(want) != len(got) { + return fmt.Errorf("got %d streamed blobs; want %d", len(got), int(want)) + } + return nil +} + +type WantSizedRefs []blob.SizedRef + +func (s WantSizedRefs) verify(got []blob.SizedRef) error { + want := []blob.SizedRef(s) + if !reflect.DeepEqual(got, want) { + return fmt.Errorf("Mismatch:\n got %d blobs: %q\nwant %d blobs: %q\n", len(got), got, len(want), want) + } + return nil +} + +// TestStreamer tests that the BlobStreamer implements all of the +// promised interface behavior and ultimately yields the provided +// blobs. +func TestStreamer(t *testing.T, bs blobserver.BlobStreamer, opts ...StreamerTestOpt) { + // First see if, without cancelation, it yields the right + // result and without errors. + ch := make(chan blobserver.BlobAndToken) + errCh := make(chan error, 1) + go func() { + ctx := context.New() + defer ctx.Cancel() + errCh <- bs.StreamBlobs(ctx, ch, "") + }() + var gotRefs []blob.SizedRef + for b := range ch { + gotRefs = append(gotRefs, b.SizedRef()) + } + if err := <-errCh; err != nil { + t.Errorf("initial uninterrupted StreamBlobs error: %v", err) + } + for _, opt := range opts { + if err := opt.verify(gotRefs); err != nil { + t.Errorf("error after first uninterrupted StreamBlobs pass: %v", err) + } + } + if t.Failed() { + return + } + + // Next, the "complex pass": test a cancelation at each point, + // to test that resume works properly. + // + // Basic strategy: + // -- receive 1 blob, note the blobref, cancel. + // -- start again with that blobref, receive 2, cancel. first should be same, + // second should be new. note its blobref. + // Each iteration should yield 1 new unique blob and all but + // the first and last will return 2 blobs. + wantRefs := append([]blob.SizedRef(nil), gotRefs...) // copy + gotRefs = gotRefs[:0] + contToken := "" + for i := 0; i < len(wantRefs); i++ { + ctx := context.New() + ch := make(chan blobserver.BlobAndToken) + errc := make(chan error, 1) + go func() { + errc <- bs.StreamBlobs(ctx, ch, contToken) + }() + nrecv := 0 + nextToken := "" + for bt := range ch { + nrecv++ + sbr := bt.Blob.SizedRef() + isNew := len(gotRefs) == 0 || sbr != gotRefs[len(gotRefs)-1] + if isNew { + gotRefs = append(gotRefs, sbr) + nextToken = bt.Token + ctx.Cancel() + break + } else if i == 0 { + t.Fatalf("first iteration should receive a new value") + } else if nrecv == 2 { + t.Fatalf("at cut point %d of testStream, Streamer received 2 values, both not unique. Looping?", i) + } + } + err := <-errc + if err != nil && err != context.ErrCanceled { + t.Fatalf("StreamBlobs on iteration %d (token %q) returned error: %v", i, contToken, err) + } + if err == nil { + break + } + contToken = nextToken + } + if !reflect.DeepEqual(gotRefs, wantRefs) { + if len(gotRefs) != len(wantRefs) { + t.Errorf("With complex pass, got %d blobs; want %d", len(gotRefs), len(wantRefs)) + } + t.Fatalf("Mismatch on complex pass:\n got %q\nwant %q\n", gotRefs, wantRefs) + } +} diff --git a/pkg/test/blob.go b/pkg/test/blob.go index 99f14d238..9844d5deb 100644 --- a/pkg/test/blob.go +++ b/pkg/test/blob.go @@ -20,11 +20,13 @@ import ( "crypto/sha1" "fmt" "io" + "io/ioutil" "strings" "testing" "camlistore.org/pkg/blob" "camlistore.org/pkg/blobserver" + "camlistore.org/pkg/types" ) // Blob is a utility class for unit tests. @@ -32,6 +34,19 @@ type Blob struct { Contents string // the contents of the blob } +func (tb *Blob) Blob() *blob.Blob { + s := tb.Contents + return blob.NewBlob(tb.BlobRef(), tb.Size(), func() types.ReadSeekCloser { + return struct { + io.ReadSeeker + io.Closer + }{ + io.NewSectionReader(strings.NewReader(s), 0, int64(len(s))), + ioutil.NopCloser(nil), + } + }) +} + func (tb *Blob) BlobRef() blob.Ref { h := sha1.New() h.Write([]byte(tb.Contents))