From 37001fa359421be280ebb812036e08a0c8d49510 Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Sun, 21 Dec 2014 21:53:53 +1300 Subject: [PATCH] blobserver: simplify BlobStreamer interface, remove limitBytes The caller can count on its own and cancel the context. Also, I realize again that there's no HTTP interface for this yet, since I didn't need to update one. Change-Id: Ie3129bdb0dbc977c1803f18288e4d1e52e2d8478 --- TODO | 21 ++- pkg/blobserver/blobpacked/blobpacked_test.go | 2 +- pkg/blobserver/blobpacked/stream.go | 14 +- pkg/blobserver/diskpacked/diskpacked.go | 178 ++++++++++--------- pkg/blobserver/diskpacked/diskpacked_test.go | 45 ++++- pkg/blobserver/diskpacked/stream_test.go | 87 +++++---- pkg/blobserver/interface.go | 36 ++-- 7 files changed, 234 insertions(+), 149 deletions(-) diff --git a/TODO b/TODO index f8ceac431..419260844 100644 --- a/TODO +++ b/TODO @@ -4,18 +4,23 @@ There are two TODO lists. This file (good for airplanes) and the online bug trac Offline list: --- reindexing: - * review blob streaming interface & diskpacked implementation thereof. - notably: should the continuation token come per-blob? then we could - remove the limitBytes parameter too, and just let the caller cancel when - they want. - * add streaming interface to localdisk? maybe, even though not ideal, but - really: migrate my personal instance from localdisk to diskpacked - * add blobserver.EnumerateAllUnsorted. +-- add HTTP handler for blobstreamer. stream a tar file? where to put + continuation token? special file after each tar entry? special file + at the end? HTTP Trailers? (but nobody supports them) +-- reindexing: + * add streaming interface to localdisk? maybe, even though not ideal, but + really: migrate my personal instance from localdisk to blobpacked + + maybe diskpacked for loose blobs? start by migrating to blobpacked and + measuring size of loose. + * add blobserver.EnumerateAllUnsorted (which could use StreamBlobs + if available, else use EnumerateAll, else maybe even use a new + interface method that goes forever and can't resume at a point, + but can be canceled, and localdisk could implement that at least) * add buffered sorted.KeyValue implementation: a memory one (of configurable max size) in front of a real disk one. add a Flush method to it. also Flush when memory gets big enough. + In progress: pkg/sorted/buffer -- stop using the "cond" blob router storage type in genconfig, as well as the /bs-and-index/ "replica" storage type, and just let the diff --git a/pkg/blobserver/blobpacked/blobpacked_test.go b/pkg/blobserver/blobpacked/blobpacked_test.go index f11d69e1f..09a867b94 100644 --- a/pkg/blobserver/blobpacked/blobpacked_test.go +++ b/pkg/blobserver/blobpacked/blobpacked_test.go @@ -474,7 +474,7 @@ func TestStreamBlobs(t *testing.T) { got[b.Ref()] = true } }() - nextToken, err := s.StreamBlobs(ctx, dest, token, 1<<63-1) + nextToken, err := s.StreamBlobs(ctx, dest, token) if err != nil { t.Fatalf("StreamBlobs = %v", err) } diff --git a/pkg/blobserver/blobpacked/stream.go b/pkg/blobserver/blobpacked/stream.go index 869f66511..bc71fd6bb 100644 --- a/pkg/blobserver/blobpacked/stream.go +++ b/pkg/blobserver/blobpacked/stream.go @@ -39,25 +39,25 @@ import ( // First it streams from small (if available, else enumerates) // Then it streams from large (if available, else enumerates), // and for each large, streams the contents of the zips. -func (s *storage) StreamBlobs(ctx *context.Context, dest chan<- *blob.Blob, contToken string, limitBytes int64) (nextContinueToken string, err error) { +func (s *storage) StreamBlobs(ctx *context.Context, dest chan<- *blob.Blob, contToken string) (nextContinueToken string, err error) { defer close(dest) switch { case contToken == "" || strings.HasPrefix(contToken, "s:"): - return s.streamSmallBlobs(ctx, dest, strings.TrimPrefix(contToken, "s:"), limitBytes) + return s.streamSmallBlobs(ctx, dest, strings.TrimPrefix(contToken, "s:")) case strings.HasPrefix(contToken, "l:"): - return s.streamLargeBlobs(ctx, dest, strings.TrimPrefix(contToken, "l:"), limitBytes) + return s.streamLargeBlobs(ctx, dest, strings.TrimPrefix(contToken, "l:")) default: return "", fmt.Errorf("invalid continue token %q", contToken) } } -func (s *storage) streamSmallBlobs(ctx *context.Context, dest chan<- *blob.Blob, contToken string, limitBytes int64) (nextContinueToken string, err error) { +func (s *storage) streamSmallBlobs(ctx *context.Context, dest chan<- *blob.Blob, contToken string) (nextContinueToken string, err error) { smallStream, ok := s.small.(blobserver.BlobStreamer) if ok { if contToken != "" || !strings.HasPrefix(contToken, "pt:") { return "", errors.New("invalid pass-through stream token") } - next, err := smallStream.StreamBlobs(ctx, dest, strings.TrimPrefix(contToken, "pt"), limitBytes) + next, err := smallStream.StreamBlobs(ctx, dest, strings.TrimPrefix(contToken, "pt")) if err == nil || next == "" { next = "l:" // now do large } @@ -84,7 +84,7 @@ func (s *storage) streamSmallBlobs(ctx *context.Context, dest chan<- *blob.Blob, }() var sent int64 var lastRef blob.Ref - for sent < limitBytes { + for { sb, ok := <-sbc if !ok { break @@ -115,6 +115,6 @@ func (s *storage) streamSmallBlobs(ctx *context.Context, dest chan<- *blob.Blob, return "s:after:" + lastRef.String(), nil } -func (s *storage) streamLargeBlobs(ctx *context.Context, dest chan<- *blob.Blob, contToken string, limitBytes int64) (nextContinueToken string, err error) { +func (s *storage) streamLargeBlobs(ctx *context.Context, dest chan<- *blob.Blob, contToken string) (nextContinueToken string, err error) { panic("TODO") } diff --git a/pkg/blobserver/diskpacked/diskpacked.go b/pkg/blobserver/diskpacked/diskpacked.go index 4b5b9941c..bdd29bf9c 100644 --- a/pkg/blobserver/diskpacked/diskpacked.go +++ b/pkg/blobserver/diskpacked/diskpacked.go @@ -53,6 +53,7 @@ import ( "camlistore.org/pkg/context" "camlistore.org/pkg/jsonconfig" "camlistore.org/pkg/sorted" + "camlistore.org/pkg/strutil" "camlistore.org/pkg/syncutil" "camlistore.org/pkg/types" "camlistore.org/third_party/github.com/camlistore/lock" @@ -84,13 +85,13 @@ type storage struct { writeLock io.Closer // Provided by lock.Lock, and guards other processes from accesing the file open for writes. + *local.Generationer + mu sync.Mutex // Guards all I/O state. closed bool writer *os.File fds []*os.File size int64 - - *local.Generationer } func (s *storage) String() string { @@ -489,17 +490,24 @@ func parseContToken(token string) (pack int, offset int64, err error) { return } -func readHeader(r io.Reader) (digest string, size uint32, err error) { - _, err = fmt.Fscanf(r, "[%s %d]", &digest, &size) - - return -} - -func headerLength(digest string, size uint32) int { - // Assumes that the size in the header is always in base-10 - // format, and also that precisely one space separates the - // digest and the size. - return len(fmt.Sprintf("[%s %d]", digest, size)) +// readHeader parses "[sha1-fooooo 1234]" from r and returns the +// number of bytes read (including the starting '[' and ending ']'), +// the blobref bytes (not necessarily valid) and the number as a +// uint32. +// The consumed count returned is only valid if err == nil. +// The returned digest slice is only valid until the next read from br. +func readHeader(br *bufio.Reader) (consumed int, digest []byte, size uint32, err error) { + line, err := br.ReadSlice(']') + if err != nil { + return + } + const minSize = len("[b-c 0]") + sp := bytes.IndexByte(line, ' ') + size64, err := strutil.ParseUintBytes(line[sp+1:len(line)-1], 10, 32) + if len(line) < minSize || line[0] != '[' || line[len(line)-1] != ']' || sp < 0 || err != nil { + return 0, nil, 0, errors.New("diskpacked: invalid header reader") + } + return len(line), line[1:sp], uint32(size64), nil } // Type readSeekNopCloser is an io.ReadSeeker with a no-op Close method. @@ -517,27 +525,40 @@ func newReadSeekNopCloser(rs io.ReadSeeker) types.ReadSeekCloser { // set to all 'x', the hash value is all '0', and has the correct size. var deletedBlobRef = regexp.MustCompile(`^x+-0+$`) +var _ blobserver.BlobStreamer = (*storage)(nil) + // StreamBlobs Implements the blobserver.StreamBlobs interface. -func (s *storage) StreamBlobs(ctx *context.Context, dest chan<- *blob.Blob, contToken string, limitBytes int64) (nextContinueToken string, err error) { +func (s *storage) StreamBlobs(ctx *context.Context, dest chan<- *blob.Blob, contToken string) (nextContinueToken string, err error) { defer close(dest) - i, offset, err := parseContToken(contToken) + fileNum, offset, err := parseContToken(contToken) if err != nil { - return + return "", err } debug.Printf("Continuing blob streaming from pack %s, offset %d", - s.filename(i), offset) + s.filename(fileNum), offset) - fd, err := os.Open(s.filename(i)) + fd, err := os.Open(s.filename(fileNum)) if err != nil { - return + return "", err } - defer fd.Close() + // fd will change over time; Close whichever is current when we exit. + defer func() { + if fd != nil { // may be nil on os.Open error below + fd.Close() + } + }() - // ContToken always refers to the exact next place we will read from + // ContToken always refers to the exact next place we will read from. + // Note that seeking past the end is legal on Unix and for io.Seeker, + // but that will just result in a mostly harmless EOF. + // + // TODO: probably be stricter here and don't allow seek past + // the end, since we know the size of closed files and the + // size of the file diskpacked currently still writing. _, err = fd.Seek(offset, os.SEEK_SET) if err != nil { - return + return "", err } const ioBufSize = 256 * 1024 @@ -545,87 +566,76 @@ func (s *storage) StreamBlobs(ctx *context.Context, dest chan<- *blob.Blob, cont // We'll use bufio to avoid read system call overhead. r := bufio.NewReaderSize(fd, ioBufSize) - var offsetToAdd int64 = 0 - var sent int64 = 0 - setNextContToken := func() { - nextContinueToken = fmt.Sprintf("%d %d", i, offset+offsetToAdd) + var lastSent struct { + fileNum int + offset int64 } for { - if sent >= limitBytes { - setNextContToken() - break - } - // Are we at the EOF of this pack? - _, err = r.Peek(1) - if err != nil { - if err == io.EOF { - // Continue to the next pack, if there's any - i += 1 - offset = 0 - offsetToAdd = 0 - fd.Close() // Close the previous pack - fd, err = os.Open(s.filename(i)) - if err != nil { - if os.IsNotExist(err) { - return "", nil - } - return - } - defer fd.Close() - r = bufio.NewReaderSize(fd, ioBufSize) - continue + if _, err := r.Peek(1); err != nil { + if err != io.EOF { + return "", err } - - return - } - - var digest string - var size uint32 - digest, size, err = readHeader(r) - if err != nil { - return - } - - offsetToAdd += int64(headerLength(digest, size)) - if deletedBlobRef.MatchString(digest) { - // Skip over deletion padding - _, err = io.CopyN(ioutil.Discard, r, int64(size)) - if err != nil { - return + // EOF case; continue to the next pack, if any. + fileNum += 1 + offset = 0 + fd.Close() // Close the previous pack + fd, err = os.Open(s.filename(fileNum)) + if os.IsNotExist(err) { + // We reached the end. + return "", nil + } else if err != nil { + return "", err } - offsetToAdd += int64(size) + r.Reset(fd) continue } - // Finally, read and send the blob - data := make([]byte, size) - _, err = io.ReadFull(r, data) + thisOffset := offset // of current blob's header + consumed, digest, size, err := readHeader(r) if err != nil { - return + return "", err + } + + offset += int64(consumed) + if deletedBlobRef.Match(digest) { + // Skip over deletion padding + if _, err := io.CopyN(ioutil.Discard, r, int64(size)); err != nil { + return "", err + } + offset += int64(size) + continue + } + + // Finally, read and send the blob. + + // TODO: remove this allocation per blob. We can make one instead + // outside of the loop, guarded by a mutex, and re-use it, only to + // lock the mutex and clone it if somebody actually calls Open + // on the *blob.Blob. Otherwise callers just scanning all the blobs + // to see if they have everything incur lots of garbage if they + // don't open any blobs. + data := make([]byte, size) + if _, err := io.ReadFull(r, data); err != nil { + return "", err + } + offset += int64(size) + ref, ok := blob.ParseBytes(digest) + if !ok { + return "", fmt.Errorf("diskpacked: Invalid blobref %q", digest) } - offsetToAdd += int64(size) newReader := func() types.ReadSeekCloser { return newReadSeekNopCloser(bytes.NewReader(data)) } - ref, ok := blob.Parse(digest) - if !ok { - err = fmt.Errorf("diskpacked: Invalid blobref %s", - digest) - return - } blob := blob.NewBlob(ref, size, newReader) - select { case dest <- blob: - sent += int64(size) + lastSent.fileNum = fileNum + lastSent.offset = thisOffset case <-ctx.Done(): - err = context.ErrCanceled - return + return fmt.Sprintf("%d %d", lastSent.fileNum, lastSent.offset), context.ErrCanceled } } - - return } func (s *storage) ReceiveBlob(br blob.Ref, source io.Reader) (sbr blob.SizedRef, err error) { diff --git a/pkg/blobserver/diskpacked/diskpacked_test.go b/pkg/blobserver/diskpacked/diskpacked_test.go index 5d22303ff..5661aff9f 100644 --- a/pkg/blobserver/diskpacked/diskpacked_test.go +++ b/pkg/blobserver/diskpacked/diskpacked_test.go @@ -17,6 +17,7 @@ limitations under the License. package diskpacked import ( + "bufio" "errors" "fmt" "io/ioutil" @@ -43,6 +44,7 @@ func newTempDiskpackedMemory(t *testing.T) (sto blobserver.Storage, cleanup func } func newTempDiskpackedWithIndex(t *testing.T, indexConf jsonconfig.Obj) (sto blobserver.Storage, cleanup func()) { + restoreLogging := test.TLog(t) dir, err := ioutil.TempDir("", "diskpacked-test") if err != nil { t.Fatal(err) @@ -54,12 +56,12 @@ func newTempDiskpackedWithIndex(t *testing.T, indexConf jsonconfig.Obj) (sto blo } return s, func() { s.Close() - if camliDebug { t.Logf("CAMLI_DEBUG set, skipping cleanup of dir %q", dir) } else { os.RemoveAll(dir) } + restoreLogging() } } @@ -252,3 +254,44 @@ func (idx *failingIndex) Set(key string, value string) error { } return idx.KeyValue.Set(key, value) } + +func TestReadHeader(t *testing.T) { + tests := []struct { + in string + wantConsumed int + wantDigest string + wantSize uint32 + wantErr bool + }{ + {"[foo-123 234]", 13, "foo-123", 234, false}, + + // Too short: + {in: "", wantErr: true}, + {in: "[", wantErr: true}, + {in: "[]", wantErr: true}, + // Missing brackets: + {in: "[foo-123 234", wantErr: true}, + {in: "foo-123 234]", wantErr: true}, + // non-number in size: + {in: "[foo-123 234x]", wantErr: true}, + // No spce: + {in: "[foo-abcd1234]", wantErr: true}, + } + for _, tt := range tests { + consumed, digest, size, err := readHeader(bufio.NewReader(strings.NewReader(tt.in))) + if tt.wantErr { + if err == nil { + t.Errorf("readHeader(%q) = %d, %q, %v with nil error; but wanted an error", + tt.in, consumed, digest, size) + } + } else if consumed != tt.wantConsumed || + string(digest) != tt.wantDigest || + size != tt.wantSize || + err != nil { + t.Errorf("readHeader(%q) = %d, %q, %v, %v; want %d, %q, %v, nil", + tt.in, + consumed, digest, size, err, + tt.wantConsumed, tt.wantDigest, tt.wantSize) + } + } +} diff --git a/pkg/blobserver/diskpacked/stream_test.go b/pkg/blobserver/diskpacked/stream_test.go index 0d08a1944..20f0b19a2 100644 --- a/pkg/blobserver/diskpacked/stream_test.go +++ b/pkg/blobserver/diskpacked/stream_test.go @@ -26,10 +26,12 @@ import ( "os" "path/filepath" "testing" + "time" "camlistore.org/pkg/blob" "camlistore.org/pkg/blobserver" "camlistore.org/pkg/context" + "camlistore.org/pkg/test" ) type blobDetails struct { @@ -41,6 +43,7 @@ type pack struct { blobs []blobDetails } +// TODO: why is this named pool00001? (--bradfitz) var pool00001 = []blobDetails{ {"sha1-04f029feccd2c5c3d3ef87329eb85606bbdd2698", "94"}, {"sha1-db846319868cf27ecc444bcc34cf126c86bf9a07", "6396"}, @@ -99,6 +102,7 @@ func writePack(t *testing.T, dir string, i int, p pack) { } func newTestStorage(t *testing.T, packs ...pack) (s *storage, clean func()) { + restoreLogging := test.TLog(t) dir, err := ioutil.TempDir("", "diskpacked-test") if err != nil { t.Fatal(err) @@ -116,43 +120,56 @@ func newTestStorage(t *testing.T, packs ...pack) (s *storage, clean func()) { clean = func() { s.Close() os.RemoveAll(dir) + restoreLogging() } - - return + return s, clean } -// Streams all blobs until the total size of the blobs transfered -// equals or exceeds limit (in bytes) or the storage runs out of -// blobs, and returns them. It verifies the size and hash of each +// nBlobs is the optional number of blobs after which to cancel the +// context. 0 means unlimited. +// +// It verifies the size and hash of each // before returning and fails the test if any of the checks fail. It // also fails the test if StreamBlobs returns a non-nil error. -func getAllUpToLimit(t *testing.T, s *storage, tok string, limit int64) (blobs []*blob.Blob, contToken string) { +func getAllUpToLimit(t *testing.T, s *storage, tok string, nBlobs int) (blobs []*blob.Blob, contToken string) { ctx := context.New() ch := make(chan *blob.Blob) nextCh := make(chan string, 1) errCh := make(chan error, 1) go func() { - next, err := s.StreamBlobs(ctx, ch, tok, limit) - + next, err := s.StreamBlobs(ctx, ch, tok) nextCh <- next errCh <- err }() - blobs = make([]*blob.Blob, 0, 32) + nGot := 0 + var wantErr error for blob := range ch { verifySizeAndHash(t, blob) blobs = append(blobs, blob) + nGot++ + if nGot == nBlobs { + ctx.Cancel() + wantErr = context.ErrCanceled + break + } } - contToken = <-nextCh - - if err := <-errCh; err != nil { - t.Fatal(err) + if nGot < nBlobs { + t.Fatalf("only got %d blobs; wanted at least %d", nGot, nBlobs) } - return + select { + case err := <-errCh: + if err != wantErr { + t.Fatalf("StreamBlobs error = %v; want %v", err, wantErr) + } + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for StreamBlobs to finish (ignored cancel") + } + return blobs, <-nextCh } // Tests the streaming of all blobs in a storage, with hash verification. @@ -160,19 +177,16 @@ func TestBasicStreaming(t *testing.T) { s, clean := newTestStorage(t, pack{pool00001}) defer clean() - limit := int64(999999) expected := len(pool00001) - blobs, next := getAllUpToLimit(t, s, "", limit) + blobs, next := getAllUpToLimit(t, s, "", 0) if len(blobs) != expected { t.Fatalf("Wrong blob count: Expected %d, got %d", expected, len(blobs)) } - if next != "" { - t.Fatalf("Expected empty continuation token, got: %s", next) + t.Fatalf("Got continuation token %q; want empty", next) } - } func verifySizeAndHash(t *testing.T, blob *blob.Blob) { @@ -185,33 +199,33 @@ func verifySizeAndHash(t *testing.T, blob *blob.Blob) { r.Close() if uint32(n) != blob.Size() { - t.Fatalf("Wrong blob size. Expected %d, got %d", - blob.Size(), n) + t.Fatalf("read %d bytes from blob %v; want %v", n, blob.Ref(), blob.Size()) } if !blob.SizedRef().HashMatches(hash) { - t.Fatal("Blob has wrong digest") + t.Fatalf("read wrong bytes from blobref %v (digest mismatch)", blob.Ref()) } } -// Tests that StreamBlobs respects the byte limit (limitBytes) -func TestLimitBytes(t *testing.T) { +// Tests that StreamBlobs returns a continuation token on cancel +func TestStreamBlobsContinuationToken(t *testing.T) { s, clean := newTestStorage(t, pack{pool00001}) defer clean() - limit := int64(1) // This should cause us to get only the 1st blob. - expected := 1 + limit := 2 // get the first blob only + wantCount := 2 blobs, next := getAllUpToLimit(t, s, "", limit) - if len(blobs) != expected { - t.Fatalf("Wrong blob count: Expected %d, got %d", expected, - len(blobs)) + for i, b := range blobs { + t.Logf("blob[%d] = %v", i, b.Ref()) + } + if len(blobs) != wantCount { + t.Fatalf("got %d blobs; want %d", len(blobs), wantCount) } // For pool00001, the header + data of the first blob is has len 50 - expectedContToken := "0 50" - if next != expectedContToken { - t.Fatalf("Unexpected continuation token. Expected \"%s\", got \"%s\"", expectedContToken, next) + if wantContToken := "0 50"; next != wantContToken { + t.Fatalf("Got continuation token %q; want %q", next, wantContToken) } } @@ -219,9 +233,8 @@ func TestSeekToContToken(t *testing.T) { s, clean := newTestStorage(t, pack{pool00001}) defer clean() - limit := int64(999999) expected := len(pool00001) - 1 - blobs, next := getAllUpToLimit(t, s, "0 50", limit) + blobs, next := getAllUpToLimit(t, s, "0 50", 0) if len(blobs) != expected { t.Fatalf("Wrong blob count: Expected %d, got %d", expected, @@ -239,9 +252,8 @@ func TestStreamMultiplePacks(t *testing.T) { s, clean := newTestStorage(t, pack{pool00001}, pack{pool00001}) defer clean() - limit := int64(999999) expected := 2 * len(pool00001) - blobs, _ := getAllUpToLimit(t, s, "", limit) + blobs, _ := getAllUpToLimit(t, s, "", 0) if len(blobs) != expected { t.Fatalf("Wrong blob count: Expected %d, got %d", expected, @@ -272,9 +284,8 @@ func TestSkipRemovedBlobs(t *testing.T) { diskpackedSto := s.(*storage) - limit := int64(999999) expected := len(pool00001) - 1 // We've deleted 1 - blobs, _ := getAllUpToLimit(t, diskpackedSto, "", limit) + blobs, _ := getAllUpToLimit(t, diskpackedSto, "", 0) if len(blobs) != expected { t.Fatalf("Wrong blob count: Expected %d, got %d", expected, diff --git a/pkg/blobserver/interface.go b/pkg/blobserver/interface.go index 6f730b740..646778563 100644 --- a/pkg/blobserver/interface.go +++ b/pkg/blobserver/interface.go @@ -97,20 +97,36 @@ type BlobEnumerator interface { } type BlobStreamer interface { - // StreamBlobs sends blobs to dest in unspecified order. It is + // BlobStream is an optional interface that may be implemented by + // Storage implementations. + // + // StreamBlobs sends blobs to dest in an unspecified order. It is // expected that a Storage implementation implementing // BlobStreamer will send blobs to dest in the most efficient - // order possible. StreamBlobs will stop sending blobs to dest - // and return an opaque continuation token (in the string - // return parameter) when the total size of the blobs it has - // sent equals or exceeds limit. A succeeding call to - // StreamBlobs should pass the string returned from the - // previous call in contToken, or an empty string if the - // caller wishes to receive blobs from "the - // start". StreamBlobs must unconditionally close dest before + // order possible. + // + // The provided continuation token resumes the stream from a + // point. To start from the beginning, send the empty string. + // The token is opaque and must never be interpreted; its + // format may change between versions of the server. + // + // If the content is canceled, the error value is + // context.ErrCanceled and the nextContinueToken is a + // continuation token to resume exactly _at_ (not after) the + // last value sent. This lets callers receive a blob, decide + // its size crosses a threshold, and resume at that blob at a + // later point. Callers should thus usually pass an unbuffered + // channel, although it is not an error to do otherwise, if + // the caller is careful. + // + // StreamBlobs must unconditionally close dest before // returning, and it must return context.ErrCanceled if // ctx.Done() becomes readable. - StreamBlobs(ctx *context.Context, dest chan<- *blob.Blob, contToken string, limitBytes int64) (nextContinueToken string, err error) + // + // When StreamBlobs reaches the end, the return value is ("", nil). + // The nextContinueToken must only ever be non-empty if err is + // context.ErrCanceled. + StreamBlobs(ctx *context.Context, dest chan<- *blob.Blob, contToken string) (nextContinueToken string, err error) } // Cache is the minimal interface expected of a blob cache.