blobserver: simplify BlobStreamer interface, remove limitBytes

The caller can count on its own and cancel the context.

Also, I realize again that there's no HTTP interface for this yet, since
I didn't need to update one.

Change-Id: Ie3129bdb0dbc977c1803f18288e4d1e52e2d8478
This commit is contained in:
Brad Fitzpatrick 2014-12-21 21:53:53 +13:00
parent 4436452c68
commit 37001fa359
7 changed files with 234 additions and 149 deletions

21
TODO
View File

@ -4,18 +4,23 @@ There are two TODO lists. This file (good for airplanes) and the online bug trac
Offline list:
-- reindexing:
* review blob streaming interface & diskpacked implementation thereof.
notably: should the continuation token come per-blob? then we could
remove the limitBytes parameter too, and just let the caller cancel when
they want.
* add streaming interface to localdisk? maybe, even though not ideal, but
really: migrate my personal instance from localdisk to diskpacked
* add blobserver.EnumerateAllUnsorted.
-- add HTTP handler for blobstreamer. stream a tar file? where to put
continuation token? special file after each tar entry? special file
at the end? HTTP Trailers? (but nobody supports them)
-- reindexing:
* add streaming interface to localdisk? maybe, even though not ideal, but
really: migrate my personal instance from localdisk to blobpacked +
maybe diskpacked for loose blobs? start by migrating to blobpacked and
measuring size of loose.
* add blobserver.EnumerateAllUnsorted (which could use StreamBlobs
if available, else use EnumerateAll, else maybe even use a new
interface method that goes forever and can't resume at a point,
but can be canceled, and localdisk could implement that at least)
* add buffered sorted.KeyValue implementation: a memory one (of
configurable max size) in front of a real disk one. add a Flush method
to it. also Flush when memory gets big enough.
In progress: pkg/sorted/buffer
-- stop using the "cond" blob router storage type in genconfig, as
well as the /bs-and-index/ "replica" storage type, and just let the

View File

@ -474,7 +474,7 @@ func TestStreamBlobs(t *testing.T) {
got[b.Ref()] = true
}
}()
nextToken, err := s.StreamBlobs(ctx, dest, token, 1<<63-1)
nextToken, err := s.StreamBlobs(ctx, dest, token)
if err != nil {
t.Fatalf("StreamBlobs = %v", err)
}

View File

@ -39,25 +39,25 @@ import (
// First it streams from small (if available, else enumerates)
// Then it streams from large (if available, else enumerates),
// and for each large, streams the contents of the zips.
func (s *storage) StreamBlobs(ctx *context.Context, dest chan<- *blob.Blob, contToken string, limitBytes int64) (nextContinueToken string, err error) {
func (s *storage) StreamBlobs(ctx *context.Context, dest chan<- *blob.Blob, contToken string) (nextContinueToken string, err error) {
defer close(dest)
switch {
case contToken == "" || strings.HasPrefix(contToken, "s:"):
return s.streamSmallBlobs(ctx, dest, strings.TrimPrefix(contToken, "s:"), limitBytes)
return s.streamSmallBlobs(ctx, dest, strings.TrimPrefix(contToken, "s:"))
case strings.HasPrefix(contToken, "l:"):
return s.streamLargeBlobs(ctx, dest, strings.TrimPrefix(contToken, "l:"), limitBytes)
return s.streamLargeBlobs(ctx, dest, strings.TrimPrefix(contToken, "l:"))
default:
return "", fmt.Errorf("invalid continue token %q", contToken)
}
}
func (s *storage) streamSmallBlobs(ctx *context.Context, dest chan<- *blob.Blob, contToken string, limitBytes int64) (nextContinueToken string, err error) {
func (s *storage) streamSmallBlobs(ctx *context.Context, dest chan<- *blob.Blob, contToken string) (nextContinueToken string, err error) {
smallStream, ok := s.small.(blobserver.BlobStreamer)
if ok {
if contToken != "" || !strings.HasPrefix(contToken, "pt:") {
return "", errors.New("invalid pass-through stream token")
}
next, err := smallStream.StreamBlobs(ctx, dest, strings.TrimPrefix(contToken, "pt"), limitBytes)
next, err := smallStream.StreamBlobs(ctx, dest, strings.TrimPrefix(contToken, "pt"))
if err == nil || next == "" {
next = "l:" // now do large
}
@ -84,7 +84,7 @@ func (s *storage) streamSmallBlobs(ctx *context.Context, dest chan<- *blob.Blob,
}()
var sent int64
var lastRef blob.Ref
for sent < limitBytes {
for {
sb, ok := <-sbc
if !ok {
break
@ -115,6 +115,6 @@ func (s *storage) streamSmallBlobs(ctx *context.Context, dest chan<- *blob.Blob,
return "s:after:" + lastRef.String(), nil
}
func (s *storage) streamLargeBlobs(ctx *context.Context, dest chan<- *blob.Blob, contToken string, limitBytes int64) (nextContinueToken string, err error) {
func (s *storage) streamLargeBlobs(ctx *context.Context, dest chan<- *blob.Blob, contToken string) (nextContinueToken string, err error) {
panic("TODO")
}

View File

@ -53,6 +53,7 @@ import (
"camlistore.org/pkg/context"
"camlistore.org/pkg/jsonconfig"
"camlistore.org/pkg/sorted"
"camlistore.org/pkg/strutil"
"camlistore.org/pkg/syncutil"
"camlistore.org/pkg/types"
"camlistore.org/third_party/github.com/camlistore/lock"
@ -84,13 +85,13 @@ type storage struct {
writeLock io.Closer // Provided by lock.Lock, and guards other processes from accesing the file open for writes.
*local.Generationer
mu sync.Mutex // Guards all I/O state.
closed bool
writer *os.File
fds []*os.File
size int64
*local.Generationer
}
func (s *storage) String() string {
@ -489,17 +490,24 @@ func parseContToken(token string) (pack int, offset int64, err error) {
return
}
func readHeader(r io.Reader) (digest string, size uint32, err error) {
_, err = fmt.Fscanf(r, "[%s %d]", &digest, &size)
return
}
func headerLength(digest string, size uint32) int {
// Assumes that the size in the header is always in base-10
// format, and also that precisely one space separates the
// digest and the size.
return len(fmt.Sprintf("[%s %d]", digest, size))
// readHeader decodes one pack-file blob header of the form
// "[sha1-fooooo 1234]" from br.
//
// On success it returns the number of bytes consumed (counting both
// the opening '[' and the closing ']'), the raw bytes found between
// the '[' and the first space (not checked to be a valid blobref), and
// the base-10 size as a uint32.
//
// consumed is meaningful only when err is nil. The digest slice comes
// from br.ReadSlice and is valid only until the next read from br.
func readHeader(br *bufio.Reader) (consumed int, digest []byte, size uint32, err error) {
	hdr, err := br.ReadSlice(']')
	if err != nil {
		return 0, nil, 0, err
	}

	// Shortest header that can possibly be well-formed.
	const minSize = len("[b-c 0]")

	// ReadSlice succeeded, so hdr is non-empty and ends in ']'.
	last := len(hdr) - 1
	space := bytes.IndexByte(hdr, ' ')
	if len(hdr) < minSize || hdr[0] != '[' || hdr[last] != ']' || space < 0 {
		return 0, nil, 0, errors.New("diskpacked: invalid header reader")
	}

	// Everything between the first space and the closing ']' must be
	// a decimal number that fits in 32 bits.
	n, err := strutil.ParseUintBytes(hdr[space+1:last], 10, 32)
	if err != nil {
		return 0, nil, 0, errors.New("diskpacked: invalid header reader")
	}
	return len(hdr), hdr[1:space], uint32(n), nil
}
// Type readSeekNopCloser is an io.ReadSeeker with a no-op Close method.
@ -517,27 +525,40 @@ func newReadSeekNopCloser(rs io.ReadSeeker) types.ReadSeekCloser {
// set to all 'x', the hash value is all '0', and has the correct size.
var deletedBlobRef = regexp.MustCompile(`^x+-0+$`)
var _ blobserver.BlobStreamer = (*storage)(nil)
// StreamBlobs implements the blobserver.BlobStreamer interface.
func (s *storage) StreamBlobs(ctx *context.Context, dest chan<- *blob.Blob, contToken string, limitBytes int64) (nextContinueToken string, err error) {
func (s *storage) StreamBlobs(ctx *context.Context, dest chan<- *blob.Blob, contToken string) (nextContinueToken string, err error) {
defer close(dest)
i, offset, err := parseContToken(contToken)
fileNum, offset, err := parseContToken(contToken)
if err != nil {
return
return "", err
}
debug.Printf("Continuing blob streaming from pack %s, offset %d",
s.filename(i), offset)
s.filename(fileNum), offset)
fd, err := os.Open(s.filename(i))
fd, err := os.Open(s.filename(fileNum))
if err != nil {
return
return "", err
}
defer fd.Close()
// fd will change over time; Close whichever is current when we exit.
defer func() {
if fd != nil { // may be nil on os.Open error below
fd.Close()
}
}()
// ContToken always refers to the exact next place we will read from
// ContToken always refers to the exact next place we will read from.
// Note that seeking past the end is legal on Unix and for io.Seeker,
// but that will just result in a mostly harmless EOF.
//
// TODO: probably be stricter here and don't allow seek past
// the end, since we know the size of closed files and the
// size of the file diskpacked is currently still writing.
_, err = fd.Seek(offset, os.SEEK_SET)
if err != nil {
return
return "", err
}
const ioBufSize = 256 * 1024
@ -545,87 +566,76 @@ func (s *storage) StreamBlobs(ctx *context.Context, dest chan<- *blob.Blob, cont
// We'll use bufio to avoid read system call overhead.
r := bufio.NewReaderSize(fd, ioBufSize)
var offsetToAdd int64 = 0
var sent int64 = 0
setNextContToken := func() {
nextContinueToken = fmt.Sprintf("%d %d", i, offset+offsetToAdd)
var lastSent struct {
fileNum int
offset int64
}
for {
if sent >= limitBytes {
setNextContToken()
break
}
// Are we at the EOF of this pack?
_, err = r.Peek(1)
if err != nil {
if err == io.EOF {
// Continue to the next pack, if there's any
i += 1
offset = 0
offsetToAdd = 0
fd.Close() // Close the previous pack
fd, err = os.Open(s.filename(i))
if err != nil {
if os.IsNotExist(err) {
return "", nil
}
return
}
defer fd.Close()
r = bufio.NewReaderSize(fd, ioBufSize)
continue
if _, err := r.Peek(1); err != nil {
if err != io.EOF {
return "", err
}
return
}
var digest string
var size uint32
digest, size, err = readHeader(r)
if err != nil {
return
}
offsetToAdd += int64(headerLength(digest, size))
if deletedBlobRef.MatchString(digest) {
// Skip over deletion padding
_, err = io.CopyN(ioutil.Discard, r, int64(size))
if err != nil {
return
// EOF case; continue to the next pack, if any.
fileNum += 1
offset = 0
fd.Close() // Close the previous pack
fd, err = os.Open(s.filename(fileNum))
if os.IsNotExist(err) {
// We reached the end.
return "", nil
} else if err != nil {
return "", err
}
offsetToAdd += int64(size)
r.Reset(fd)
continue
}
// Finally, read and send the blob
data := make([]byte, size)
_, err = io.ReadFull(r, data)
thisOffset := offset // of current blob's header
consumed, digest, size, err := readHeader(r)
if err != nil {
return
return "", err
}
offset += int64(consumed)
if deletedBlobRef.Match(digest) {
// Skip over deletion padding
if _, err := io.CopyN(ioutil.Discard, r, int64(size)); err != nil {
return "", err
}
offset += int64(size)
continue
}
// Finally, read and send the blob.
// TODO: remove this allocation per blob. We can make one instead
// outside of the loop, guarded by a mutex, and re-use it, only to
// lock the mutex and clone it if somebody actually calls Open
// on the *blob.Blob. Otherwise callers just scanning all the blobs
// to see if they have everything incur lots of garbage if they
// don't open any blobs.
data := make([]byte, size)
if _, err := io.ReadFull(r, data); err != nil {
return "", err
}
offset += int64(size)
ref, ok := blob.ParseBytes(digest)
if !ok {
return "", fmt.Errorf("diskpacked: Invalid blobref %q", digest)
}
offsetToAdd += int64(size)
newReader := func() types.ReadSeekCloser {
return newReadSeekNopCloser(bytes.NewReader(data))
}
ref, ok := blob.Parse(digest)
if !ok {
err = fmt.Errorf("diskpacked: Invalid blobref %s",
digest)
return
}
blob := blob.NewBlob(ref, size, newReader)
select {
case dest <- blob:
sent += int64(size)
lastSent.fileNum = fileNum
lastSent.offset = thisOffset
case <-ctx.Done():
err = context.ErrCanceled
return
return fmt.Sprintf("%d %d", lastSent.fileNum, lastSent.offset), context.ErrCanceled
}
}
return
}
func (s *storage) ReceiveBlob(br blob.Ref, source io.Reader) (sbr blob.SizedRef, err error) {

View File

@ -17,6 +17,7 @@ limitations under the License.
package diskpacked
import (
"bufio"
"errors"
"fmt"
"io/ioutil"
@ -43,6 +44,7 @@ func newTempDiskpackedMemory(t *testing.T) (sto blobserver.Storage, cleanup func
}
func newTempDiskpackedWithIndex(t *testing.T, indexConf jsonconfig.Obj) (sto blobserver.Storage, cleanup func()) {
restoreLogging := test.TLog(t)
dir, err := ioutil.TempDir("", "diskpacked-test")
if err != nil {
t.Fatal(err)
@ -54,12 +56,12 @@ func newTempDiskpackedWithIndex(t *testing.T, indexConf jsonconfig.Obj) (sto blo
}
return s, func() {
s.Close()
if camliDebug {
t.Logf("CAMLI_DEBUG set, skipping cleanup of dir %q", dir)
} else {
os.RemoveAll(dir)
}
restoreLogging()
}
}
@ -252,3 +254,44 @@ func (idx *failingIndex) Set(key string, value string) error {
}
return idx.KeyValue.Set(key, value)
}
// TestReadHeader runs readHeader over one well-formed header and a
// set of malformed ones. For the well-formed input it checks the
// consumed byte count, digest and size; for every malformed input it
// only requires that an error is returned.
func TestReadHeader(t *testing.T) {
	type testCase struct {
		in           string
		wantConsumed int
		wantDigest   string
		wantSize     uint32
		wantErr      bool
	}
	cases := []testCase{
		{in: "[foo-123 234]", wantConsumed: 13, wantDigest: "foo-123", wantSize: 234},

		// Too short:
		{in: "", wantErr: true},
		{in: "[", wantErr: true},
		{in: "[]", wantErr: true},

		// Missing brackets:
		{in: "[foo-123 234", wantErr: true},
		{in: "foo-123 234]", wantErr: true},

		// Non-number in size:
		{in: "[foo-123 234x]", wantErr: true},

		// No space:
		{in: "[foo-abcd1234]", wantErr: true},
	}
	for _, tc := range cases {
		br := bufio.NewReader(strings.NewReader(tc.in))
		consumed, digest, size, err := readHeader(br)

		if tc.wantErr {
			if err == nil {
				t.Errorf("readHeader(%q) = %d, %q, %v with nil error; but wanted an error",
					tc.in, consumed, digest, size)
			}
			continue
		}

		matches := err == nil &&
			consumed == tc.wantConsumed &&
			string(digest) == tc.wantDigest &&
			size == tc.wantSize
		if !matches {
			t.Errorf("readHeader(%q) = %d, %q, %v, %v; want %d, %q, %v, nil",
				tc.in,
				consumed, digest, size, err,
				tc.wantConsumed, tc.wantDigest, tc.wantSize)
		}
	}
}

View File

@ -26,10 +26,12 @@ import (
"os"
"path/filepath"
"testing"
"time"
"camlistore.org/pkg/blob"
"camlistore.org/pkg/blobserver"
"camlistore.org/pkg/context"
"camlistore.org/pkg/test"
)
type blobDetails struct {
@ -41,6 +43,7 @@ type pack struct {
blobs []blobDetails
}
// TODO: why is this named pool00001? (--bradfitz)
var pool00001 = []blobDetails{
{"sha1-04f029feccd2c5c3d3ef87329eb85606bbdd2698", "94"},
{"sha1-db846319868cf27ecc444bcc34cf126c86bf9a07", "6396"},
@ -99,6 +102,7 @@ func writePack(t *testing.T, dir string, i int, p pack) {
}
func newTestStorage(t *testing.T, packs ...pack) (s *storage, clean func()) {
restoreLogging := test.TLog(t)
dir, err := ioutil.TempDir("", "diskpacked-test")
if err != nil {
t.Fatal(err)
@ -116,43 +120,56 @@ func newTestStorage(t *testing.T, packs ...pack) (s *storage, clean func()) {
clean = func() {
s.Close()
os.RemoveAll(dir)
restoreLogging()
}
return
return s, clean
}
// Streams all blobs until the total size of the blobs transfered
// equals or exceeds limit (in bytes) or the storage runs out of
// blobs, and returns them. It verifies the size and hash of each
// nBlobs is the optional number of blobs after which to cancel the
// context. 0 means unlimited.
//
// It verifies the size and hash of each
// before returning and fails the test if any of the checks fail. It
// also fails the test if StreamBlobs returns a non-nil error.
func getAllUpToLimit(t *testing.T, s *storage, tok string, limit int64) (blobs []*blob.Blob, contToken string) {
func getAllUpToLimit(t *testing.T, s *storage, tok string, nBlobs int) (blobs []*blob.Blob, contToken string) {
ctx := context.New()
ch := make(chan *blob.Blob)
nextCh := make(chan string, 1)
errCh := make(chan error, 1)
go func() {
next, err := s.StreamBlobs(ctx, ch, tok, limit)
next, err := s.StreamBlobs(ctx, ch, tok)
nextCh <- next
errCh <- err
}()
blobs = make([]*blob.Blob, 0, 32)
nGot := 0
var wantErr error
for blob := range ch {
verifySizeAndHash(t, blob)
blobs = append(blobs, blob)
nGot++
if nGot == nBlobs {
ctx.Cancel()
wantErr = context.ErrCanceled
break
}
}
contToken = <-nextCh
if err := <-errCh; err != nil {
t.Fatal(err)
if nGot < nBlobs {
t.Fatalf("only got %d blobs; wanted at least %d", nGot, nBlobs)
}
return
select {
case err := <-errCh:
if err != wantErr {
t.Fatalf("StreamBlobs error = %v; want %v", err, wantErr)
}
case <-time.After(5 * time.Second):
t.Fatal("timeout waiting for StreamBlobs to finish (ignored cancel")
}
return blobs, <-nextCh
}
// Tests the streaming of all blobs in a storage, with hash verification.
@ -160,19 +177,16 @@ func TestBasicStreaming(t *testing.T) {
s, clean := newTestStorage(t, pack{pool00001})
defer clean()
limit := int64(999999)
expected := len(pool00001)
blobs, next := getAllUpToLimit(t, s, "", limit)
blobs, next := getAllUpToLimit(t, s, "", 0)
if len(blobs) != expected {
t.Fatalf("Wrong blob count: Expected %d, got %d", expected,
len(blobs))
}
if next != "" {
t.Fatalf("Expected empty continuation token, got: %s", next)
t.Fatalf("Got continuation token %q; want empty", next)
}
}
func verifySizeAndHash(t *testing.T, blob *blob.Blob) {
@ -185,33 +199,33 @@ func verifySizeAndHash(t *testing.T, blob *blob.Blob) {
r.Close()
if uint32(n) != blob.Size() {
t.Fatalf("Wrong blob size. Expected %d, got %d",
blob.Size(), n)
t.Fatalf("read %d bytes from blob %v; want %v", n, blob.Ref(), blob.Size())
}
if !blob.SizedRef().HashMatches(hash) {
t.Fatal("Blob has wrong digest")
t.Fatalf("read wrong bytes from blobref %v (digest mismatch)", blob.Ref())
}
}
// Tests that StreamBlobs respects the byte limit (limitBytes)
func TestLimitBytes(t *testing.T) {
// Tests that StreamBlobs returns a continuation token on cancel
func TestStreamBlobsContinuationToken(t *testing.T) {
s, clean := newTestStorage(t, pack{pool00001})
defer clean()
limit := int64(1) // This should cause us to get only the 1st blob.
expected := 1
limit := 2 // get the first blob only
wantCount := 2
blobs, next := getAllUpToLimit(t, s, "", limit)
if len(blobs) != expected {
t.Fatalf("Wrong blob count: Expected %d, got %d", expected,
len(blobs))
for i, b := range blobs {
t.Logf("blob[%d] = %v", i, b.Ref())
}
if len(blobs) != wantCount {
t.Fatalf("got %d blobs; want %d", len(blobs), wantCount)
}
// For pool00001, the header + data of the first blob has length 50
expectedContToken := "0 50"
if next != expectedContToken {
t.Fatalf("Unexpected continuation token. Expected \"%s\", got \"%s\"", expectedContToken, next)
if wantContToken := "0 50"; next != wantContToken {
t.Fatalf("Got continuation token %q; want %q", next, wantContToken)
}
}
@ -219,9 +233,8 @@ func TestSeekToContToken(t *testing.T) {
s, clean := newTestStorage(t, pack{pool00001})
defer clean()
limit := int64(999999)
expected := len(pool00001) - 1
blobs, next := getAllUpToLimit(t, s, "0 50", limit)
blobs, next := getAllUpToLimit(t, s, "0 50", 0)
if len(blobs) != expected {
t.Fatalf("Wrong blob count: Expected %d, got %d", expected,
@ -239,9 +252,8 @@ func TestStreamMultiplePacks(t *testing.T) {
s, clean := newTestStorage(t, pack{pool00001}, pack{pool00001})
defer clean()
limit := int64(999999)
expected := 2 * len(pool00001)
blobs, _ := getAllUpToLimit(t, s, "", limit)
blobs, _ := getAllUpToLimit(t, s, "", 0)
if len(blobs) != expected {
t.Fatalf("Wrong blob count: Expected %d, got %d", expected,
@ -272,9 +284,8 @@ func TestSkipRemovedBlobs(t *testing.T) {
diskpackedSto := s.(*storage)
limit := int64(999999)
expected := len(pool00001) - 1 // We've deleted 1
blobs, _ := getAllUpToLimit(t, diskpackedSto, "", limit)
blobs, _ := getAllUpToLimit(t, diskpackedSto, "", 0)
if len(blobs) != expected {
t.Fatalf("Wrong blob count: Expected %d, got %d", expected,

View File

@ -97,20 +97,36 @@ type BlobEnumerator interface {
}
type BlobStreamer interface {
// StreamBlobs sends blobs to dest in unspecified order. It is
// BlobStreamer is an optional interface that may be implemented by
// Storage implementations.
//
// StreamBlobs sends blobs to dest in an unspecified order. It is
// expected that a Storage implementation implementing
// BlobStreamer will send blobs to dest in the most efficient
// order possible. StreamBlobs will stop sending blobs to dest
// and return an opaque continuation token (in the string
// return parameter) when the total size of the blobs it has
// sent equals or exceeds limit. A succeeding call to
// StreamBlobs should pass the string returned from the
// previous call in contToken, or an empty string if the
// caller wishes to receive blobs from "the
// start". StreamBlobs must unconditionally close dest before
// order possible.
//
// The provided continuation token resumes the stream from a
// point. To start from the beginning, send the empty string.
// The token is opaque and must never be interpreted; its
// format may change between versions of the server.
//
// If the context is canceled, the error value is
// context.ErrCanceled and the nextContinueToken is a
// continuation token to resume exactly _at_ (not after) the
// last value sent. This lets callers receive a blob, decide
// its size crosses a threshold, and resume at that blob at a
// later point. Callers should thus usually pass an unbuffered
// channel, although it is not an error to do otherwise, if
// the caller is careful.
//
// StreamBlobs must unconditionally close dest before
// returning, and it must return context.ErrCanceled if
// ctx.Done() becomes readable.
StreamBlobs(ctx *context.Context, dest chan<- *blob.Blob, contToken string, limitBytes int64) (nextContinueToken string, err error)
//
// When StreamBlobs reaches the end, the return value is ("", nil).
// The nextContinueToken must only ever be non-empty if err is
// context.ErrCanceled.
StreamBlobs(ctx *context.Context, dest chan<- *blob.Blob, contToken string) (nextContinueToken string, err error)
}
// Cache is the minimal interface expected of a blob cache.