From 5021a45ef5ae3041fc0f4acae3ba210ec471d5bf Mon Sep 17 00:00:00 2001
From: Brian Gitonga Marete
Date: Fri, 14 Feb 2014 03:54:40 +0300
Subject: [PATCH] pkg/blobserver/diskpacked: Implement blob streaming.

Change-Id: Ia5336c9556d9f9f02dc87d992a53415d5489a298
---
 pkg/blobserver/diskpacked/diskpacked.go  | 156 +++++++++++++
 pkg/blobserver/diskpacked/stream_test.go | 284 +++++++++++++++++++++++
 2 files changed, 440 insertions(+)
 create mode 100644 pkg/blobserver/diskpacked/stream_test.go

diff --git a/pkg/blobserver/diskpacked/diskpacked.go b/pkg/blobserver/diskpacked/diskpacked.go
index e2c8bca64..e16071594 100644
--- a/pkg/blobserver/diskpacked/diskpacked.go
+++ b/pkg/blobserver/diskpacked/diskpacked.go
@@ -31,11 +31,13 @@ Example low-level config:
 package diskpacked
 
 import (
+	"bufio"
 	"bytes"
 	"errors"
 	"expvar"
 	"fmt"
 	"io"
+	"io/ioutil"
 	"log"
 	"os"
 	"path/filepath"
@@ -440,6 +442,160 @@ func (s *storage) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef
 	return nil
 }
 
+// The continuation token will be in the form: "<pack index> <offset>"
+func parseContToken(token string) (pack int, offset int64, err error) {
+	// Special case
+	if token == "" {
+		pack = 0
+		offset = 0
+		return
+	}
+	_, err = fmt.Sscan(token, &pack, &offset)
+
+	return
+}
+
+func readHeader(r io.Reader) (digest string, size uint32, err error) {
+	_, err = fmt.Fscanf(r, "[%s %d]", &digest, &size)
+
+	return
+}
+
+func headerLength(digest string, size uint32) int {
+	// Assumes that the size in the header is always in base-10
+	// format, and also that precisely one space separates the
+	// digest and the size.
+	return len(fmt.Sprintf("[%s %d]", digest, size))
+}
+
+// The header of a deleted blob has a digest in which the hash type is
+// set to all 'x', but the correct size.
+func isDeletedRef(digest string) bool {
+	return strings.HasPrefix(digest, "x")
+}
+
+// readSeekNopCloser is an io.ReadSeeker with a no-op Close method.
+type readSeekNopCloser struct {
+	io.ReadSeeker
+}
+
+func (readSeekNopCloser) Close() error { return nil }
+
+func newReadSeekNopCloser(rs io.ReadSeeker) types.ReadSeekCloser {
+	return readSeekNopCloser{rs}
+}
+
+// StreamBlobs implements the blobserver.StreamBlobs interface.
+func (s *storage) StreamBlobs(ctx *context.Context, dest chan<- *blob.Blob, contToken string, limitBytes int64) (nextContinueToken string, err error) {
+	defer close(dest)
+
+	i, offset, err := parseContToken(contToken)
+	if err != nil {
+		return
+	}
+	debug.Printf("Continuing blob streaming from pack %s, offset %d",
+		s.filename(i), offset)
+
+	fd, err := os.Open(s.filename(i))
+	if err != nil {
+		return
+	}
+	defer fd.Close()
+
+	// contToken always refers to the exact next place we will read from.
+	_, err = fd.Seek(offset, os.SEEK_SET)
+	if err != nil {
+		return
+	}
+
+	const ioBufSize = 256 * 1024
+
+	// We'll use bufio to avoid read system call overhead.
+	r := bufio.NewReaderSize(fd, ioBufSize)
+
+	var offsetToAdd int64 = 0
+	var sent int64 = 0
+	setNextContToken := func() {
+		nextContinueToken = fmt.Sprintf("%d %d", i, offset+offsetToAdd)
+	}
+	for {
+		if sent >= limitBytes {
+			setNextContToken()
+			break
+		}
+
+		// Are we at the EOF of this pack?
+		_, err = r.Peek(1)
+		if err != nil {
+			if err == io.EOF {
+				// Continue to the next pack, if there is one.
+				i++
+				offset = 0
+				offsetToAdd = 0
+				fd.Close() // Close the previous pack
+				fd, err = os.Open(s.filename(i))
+				if err != nil {
+					if os.IsNotExist(err) {
+						return "", nil
+					}
+					return
+				}
+				defer fd.Close()
+				r = bufio.NewReaderSize(fd, ioBufSize)
+				continue
+			}
+
+			return
+		}
+
+		var digest string
+		var size uint32
+		digest, size, err = readHeader(r)
+		if err != nil {
+			return
+		}
+
+		offsetToAdd += int64(headerLength(digest, size))
+		if isDeletedRef(digest) {
+			// Skip over deletion padding
+			_, err = io.CopyN(ioutil.Discard, r, int64(size))
+			if err != nil {
+				return
+			}
+			offsetToAdd += int64(size)
+			continue
+		}
+
+		// Finally, read and send the blob
+		data := make([]byte, size)
+		_, err = io.ReadFull(r, data)
+		if err != nil {
+			return
+		}
+		offsetToAdd += int64(size)
+		newReader := func() types.ReadSeekCloser {
+			return newReadSeekNopCloser(bytes.NewReader(data))
+		}
+		ref, ok := blob.Parse(digest)
+		if !ok {
+			err = fmt.Errorf("diskpacked: invalid blobref %s", digest)
+			return
+		}
+		blob := blob.NewBlob(ref, size, newReader)
+
+		select {
+		case dest <- blob:
+			sent += int64(size)
+		case <-ctx.Done():
+			err = context.ErrCanceled
+			return
+		}
+	}
+
+	return
+}
+
 func (s *storage) ReceiveBlob(br blob.Ref, source io.Reader) (sbr blob.SizedRef, err error) {
 	var b bytes.Buffer
 	n, err := b.ReadFrom(source)
diff --git a/pkg/blobserver/diskpacked/stream_test.go b/pkg/blobserver/diskpacked/stream_test.go
new file mode 100644
index 000000000..0d08a1944
--- /dev/null
+++ b/pkg/blobserver/diskpacked/stream_test.go
@@ -0,0 +1,284 @@
+/*
+Copyright 2013 The Camlistore Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package diskpacked
+
+import (
+	"bytes"
+	"crypto/sha1"
+	"encoding/hex"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"testing"
+
+	"camlistore.org/pkg/blob"
+	"camlistore.org/pkg/blobserver"
+	"camlistore.org/pkg/context"
+)
+
+type blobDetails struct {
+	digest string
+	data   string // hex-encoded
+}
+
+type pack struct {
+	blobs []blobDetails
+}
+
+var pool00001 = []blobDetails{
+	{"sha1-04f029feccd2c5c3d3ef87329eb85606bbdd2698", "94"},
+	{"sha1-db846319868cf27ecc444bcc34cf126c86bf9a07", "6396"},
+	{"sha1-4316a49fc962f627350ca0a01532421b8b93831d", "b782e7a6"},
+	{"sha1-74801cba6ffe31238f9995cc759b823aed8bd78c", "eedc50aebfa58de1"},
+	{"sha1-bd2a193deeb56aa2554a53eda95d69a95e7bf642", "104c00d6cf9f486f277e8f0493759a21"},
+}
+
+func uploadTestBlobs(t *testing.T, s blobserver.Storage, blobs []blobDetails) {
+	for _, b := range blobs {
+		ref, ok := blob.Parse(b.digest)
+		if !ok {
+			t.Fatalf("Invalid blob ref: %s", b.digest)
+		}
+		data, err := hex.DecodeString(b.data)
+		if err != nil {
+			t.Fatalf("hex.DecodeString(): %v", err)
+		}
+
+		_, err = blobserver.Receive(s, ref, bytes.NewBuffer(data))
+		if err != nil {
+			t.Fatalf("blobserver.Receive(): %v", err)
+		}
+	}
+}
+
+func basename(i int) string {
+	return fmt.Sprintf("pack-%05d.blobs", i)
+}
+
+func writePack(t *testing.T, dir string, i int, p pack) {
+	fd, err := os.Create(filepath.Join(dir, basename(i)))
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer fd.Close()
+
+	for _, b := range p.blobs {
+		data, err := hex.DecodeString(b.data)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		_, err = io.WriteString(fd, fmt.Sprintf("[%s %d]", b.digest, len(data)))
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		_, err = fd.Write(data)
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+}
+
+func newTestStorage(t *testing.T, packs ...pack) (s *storage, clean func()) {
+	dir, err := ioutil.TempDir("", "diskpacked-test")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for i, p := range packs {
+		writePack(t, dir, i, p)
+	}
+
+	s, err = newStorage(dir, 0, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	clean = func() {
+		s.Close()
+		os.RemoveAll(dir)
+	}
+
+	return
+}
+
+// getAllUpToLimit streams all blobs until the total size of the blobs
+// transferred equals or exceeds limit (in bytes) or the storage runs out
+// of blobs, and returns them. It verifies the size and hash of each blob
+// before returning and fails the test if any of the checks fail. It also
+// fails the test if StreamBlobs returns a non-nil error.
+func getAllUpToLimit(t *testing.T, s *storage, tok string, limit int64) (blobs []*blob.Blob, contToken string) {
+	ctx := context.New()
+	ch := make(chan *blob.Blob)
+	nextCh := make(chan string, 1)
+	errCh := make(chan error, 1)
+
+	go func() {
+		next, err := s.StreamBlobs(ctx, ch, tok, limit)
+
+		nextCh <- next
+		errCh <- err
+	}()
+
+	blobs = make([]*blob.Blob, 0, 32)
+	for blob := range ch {
+		verifySizeAndHash(t, blob)
+		blobs = append(blobs, blob)
+	}
+
+	contToken = <-nextCh
+
+	if err := <-errCh; err != nil {
+		t.Fatal(err)
+	}
+
+	return
+}
+
+// TestBasicStreaming tests the streaming of all blobs in a storage, with
+// hash verification.
+func TestBasicStreaming(t *testing.T) {
+	s, clean := newTestStorage(t, pack{pool00001})
+	defer clean()
+
+	limit := int64(999999)
+	expected := len(pool00001)
+	blobs, next := getAllUpToLimit(t, s, "", limit)
+
+	if len(blobs) != expected {
+		t.Fatalf("Wrong blob count: Expected %d, got %d", expected, len(blobs))
+	}
+
+	if next != "" {
+		t.Fatalf("Expected empty continuation token, got: %s", next)
+	}
+}
+
+func verifySizeAndHash(t *testing.T, blob *blob.Blob) {
+	hash := sha1.New()
+	r := blob.Open()
+	n, err := io.Copy(hash, r)
+	if err != nil {
+		t.Fatal(err)
+	}
+	r.Close()
+
+	if uint32(n) != blob.Size() {
+		t.Fatalf("Wrong blob size. Expected %d, got %d", blob.Size(), n)
+	}
+
+	if !blob.SizedRef().HashMatches(hash) {
+		t.Fatal("Blob has wrong digest")
+	}
+}
+
+// TestLimitBytes tests that StreamBlobs respects the byte limit (limitBytes).
+func TestLimitBytes(t *testing.T) {
+	s, clean := newTestStorage(t, pack{pool00001})
+	defer clean()
+
+	limit := int64(1) // This should cause us to get only the 1st blob.
+	expected := 1
+	blobs, next := getAllUpToLimit(t, s, "", limit)
+
+	if len(blobs) != expected {
+		t.Fatalf("Wrong blob count: Expected %d, got %d", expected, len(blobs))
+	}
+
+	// For pool00001, the header + data of the first blob has length 50.
+	expectedContToken := "0 50"
+	if next != expectedContToken {
+		t.Fatalf("Unexpected continuation token. Expected %q, got %q", expectedContToken, next)
+	}
+}
+
+func TestSeekToContToken(t *testing.T) {
+	s, clean := newTestStorage(t, pack{pool00001})
+	defer clean()
+
+	limit := int64(999999)
+	expected := len(pool00001) - 1
+	blobs, next := getAllUpToLimit(t, s, "0 50", limit)
+
+	if len(blobs) != expected {
+		t.Fatalf("Wrong blob count: Expected %d, got %d", expected, len(blobs))
+	}
+
+	if next != "" {
+		t.Fatalf("Unexpected continuation token. Expected %q, got %q", "", next)
+	}
+}
+
+// TestStreamMultiplePacks tests that we correctly switch over to the next
+// pack if we still need to stream more blobs when a pack reaches EOF.
+func TestStreamMultiplePacks(t *testing.T) {
+	s, clean := newTestStorage(t, pack{pool00001}, pack{pool00001})
+	defer clean()
+
+	limit := int64(999999)
+	expected := 2 * len(pool00001)
+	blobs, _ := getAllUpToLimit(t, s, "", limit)
+
+	if len(blobs) != expected {
+		t.Fatalf("Wrong blob count: Expected %d, got %d", expected, len(blobs))
+	}
+}
+
+func TestSkipRemovedBlobs(t *testing.T) {
+	// Note: This is the only streaming test that makes use of the
+	// index (for RemoveBlobs() to succeed). The others do create
+	// an indexed storage but they do not use the index to stream
+	// (nor should they use it). The streaming in this test is
+	// done by reading the underlying diskpacks.
+	s, cleanup := newTempDiskpacked(t)
+	defer cleanup()
+
+	uploadTestBlobs(t, s, pool00001)
+
+	ref, ok := blob.Parse(pool00001[0].digest)
+	if !ok {
+		t.Fatalf("blob.Parse: %s", pool00001[0].digest)
+	}
+
+	err := s.RemoveBlobs([]blob.Ref{ref})
+	if err != nil {
+		t.Fatalf("blobserver.Storage.RemoveBlobs(): %v", err)
+	}
+
+	diskpackedSto := s.(*storage)
+
+	limit := int64(999999)
+	expected := len(pool00001) - 1 // We've deleted 1.
+	blobs, _ := getAllUpToLimit(t, diskpackedSto, "", limit)
+
+	if len(blobs) != expected {
+		t.Fatalf("Wrong blob count: Expected %d, got %d", expected, len(blobs))
+	}
+}
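
Note (appended alongside the patch, not part of it): a rough sketch of how a caller
inside the diskpacked package might drive the new StreamBlobs method, feeding each
returned continuation token into the next call until the storage is exhausted. The
helper name streamAll and the 1 MiB per-call byte budget are invented for illustration;
only the storage type, blob.Blob, and context.New from this patch and its tests are
assumed.

// streamAll drains one StreamBlobs call at a time, resuming from the
// continuation token returned by the previous call.
package diskpacked

import (
	"log"

	"camlistore.org/pkg/blob"
	"camlistore.org/pkg/context"
)

func streamAll(s *storage) error {
	ctx := context.New()
	contToken := ""       // "" means: start at pack 0, offset 0
	const limit = 1 << 20 // arbitrary per-call byte budget, for illustration
	for {
		ch := make(chan *blob.Blob)
		done := make(chan struct{})
		go func() {
			// StreamBlobs closes ch when it returns, so this loop terminates.
			for b := range ch {
				log.Printf("streamed %v", b.SizedRef())
			}
			close(done)
		}()
		next, err := s.StreamBlobs(ctx, ch, contToken, limit)
		<-done
		if err != nil {
			return err
		}
		if next == "" {
			return nil // no more packs to read
		}
		contToken = next
	}
}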