diff --git a/pkg/blobserver/blobpacked/blobpacked_test.go b/pkg/blobserver/blobpacked/blobpacked_test.go index 3d682fb4c..8fa9e5489 100644 --- a/pkg/blobserver/blobpacked/blobpacked_test.go +++ b/pkg/blobserver/blobpacked/blobpacked_test.go @@ -456,7 +456,8 @@ func TestStreamBlobs(t *testing.T) { s.init() all := map[blob.Ref]bool{} - for i := 0; i < 10; i++ { + const nBlobs = 10 + for i := 0; i < nBlobs; i++ { b := &test.Blob{strconv.Itoa(i)} b.MustUpload(t, small) all[b.BlobRef()] = true @@ -482,6 +483,36 @@ func TestStreamBlobs(t *testing.T) { if !reflect.DeepEqual(got, all) { t.Errorf("Got blobs %v; want %v", got, all) } + storagetest.TestStreamer(t, s, storagetest.WantN(nBlobs)) +} + +func TestStreamBlobs_Loose_Enumerate(t *testing.T) { + testStreamBlobsLoose(t, false) +} + +func TestStreamBlobs_Loose_Streamed(t *testing.T) { + testStreamBlobsLoose(t, true) +} + +func testStreamBlobsLoose(t *testing.T, streamed bool) { + var small blobserver.Storage = new(test.Fetcher) + if !streamed { + // Hide the BlobStreamer interface impl. + small = struct{ blobserver.Storage }{small} + + } + s := &storage{ + small: small, + large: new(test.Fetcher), // unused + meta: sorted.NewMemoryKeyValue(), + log: test.NewLogger(t, "blobpacked: "), + } + s.init() + const nBlobs = 10 + for i := 0; i < nBlobs; i++ { + (&test.Blob{strconv.Itoa(i)}).MustUpload(t, small) + } + storagetest.TestStreamer(t, s, storagetest.WantN(nBlobs)) } func TestForeachZipBlob(t *testing.T) { diff --git a/pkg/blobserver/blobpacked/stream.go b/pkg/blobserver/blobpacked/stream.go index 8bcc47be8..182ccc368 100644 --- a/pkg/blobserver/blobpacked/stream.go +++ b/pkg/blobserver/blobpacked/stream.go @@ -63,54 +63,3 @@ func (st largeBlobStreamer) StreamBlobs(ctx *context.Context, dest chan<- blobse // TODO(bradfitz): implement return nil } - -// TODO: move some ofthis old pre-NewMultiBlobStreamer code into -// blobserver. in particular, transparently using enumerate for -// BlobStreamer when the storage doesn't support it should be provided -// by the blobserver package. inevitably others will want that. -/* - enumErrc := make(chan error, 1) - go func() { - defer close(sbc) - enumErrc <- blobserver.EnumerateAllFrom(enumCtx, s.small, strings.TrimPrefix(contToken, "after:"), func(sb blob.SizedRef) error { - select { - case sbc <- sb: - return nil - case <-enumDone: - return context.ErrCanceled - } - }) - }() - var sent int64 - var lastRef blob.Ref - for { - sb, ok := <-sbc - if !ok { - break - } - opener := func() types.ReadSeekCloser { - return blob.NewLazyReadSeekCloser(s.small, sb.Ref) - } - select { - case dest <- blob.NewBlob(sb.Ref, sb.Size, opener): - lastRef = sb.Ref - sent += int64(sb.Size) - case <-ctx.Done(): - return "", context.ErrCanceled - } - } - - enumCtx.Cancel() // redundant if sbc was already closed, but harmless. - enumErr := <-enumErrc - if enumErr == nil { - return "l:", nil - } - - // See if we didn't send anything due to enumeration errors. - if sent == 0 { - enumCtx.Cancel() - return "l:", enumErr - } - return "s:after:" + lastRef.String(), nil -} -*/