perkeep/pkg/blobserver/diskpacked/stream_test.go
/*
Copyright 2013 The Perkeep Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

	http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package diskpacked

import (
	"bytes"
	"context"
	"crypto/sha1"
	"encoding/hex"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"testing"

	"perkeep.org/pkg/blob"
	"perkeep.org/pkg/blobserver"
	"perkeep.org/pkg/blobserver/storagetest"
	"perkeep.org/pkg/test"
)
type blobDetails struct {
	digest string
	data   string // hex-encoded
}

type pack struct {
	blobs []blobDetails
}
var testPack1 = []blobDetails{
	{"sha1-04f029feccd2c5c3d3ef87329eb85606bbdd2698", "94"},
	{"sha1-db846319868cf27ecc444bcc34cf126c86bf9a07", "6396"},
	{"sha1-4316a49fc962f627350ca0a01532421b8b93831d", "b782e7a6"},
	{"sha1-74801cba6ffe31238f9995cc759b823aed8bd78c", "eedc50aebfa58de1"},
	{"sha1-bd2a193deeb56aa2554a53eda95d69a95e7bf642", "104c00d6cf9f486f277e8f0493759a21"},
}

var testPack2 = []blobDetails{
	{"sha1-0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33", fmt.Sprintf("%x", []byte("foo"))},
	{"sha1-62cdb7020ff920e5aa642c3d4066950dd1f01f4d", fmt.Sprintf("%x", []byte("bar"))},
}
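
// TestFixtureDigests is an illustrative sanity check added here (it is
// not part of the upstream suite): it recomputes each fixture's SHA-1
// from its hex-encoded data and compares it with the declared digest,
// so a typo in the tables above fails fast instead of confusing the
// streaming tests.
func TestFixtureDigests(t *testing.T) {
	for _, p := range [][]blobDetails{testPack1, testPack2} {
		for _, b := range p {
			data, err := hex.DecodeString(b.data)
			if err != nil {
				t.Fatalf("hex.DecodeString(%q): %v", b.data, err)
			}
			if got := fmt.Sprintf("sha1-%x", sha1.Sum(data)); got != b.digest {
				t.Errorf("fixture digest = %s; want %s", got, b.digest)
			}
		}
	}
}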
// uploadTestBlobs stores the given blobs in s, failing the test on any
// parse, decode, or receive error. (ctxbg is a background context
// defined elsewhere in this package's tests.)
func uploadTestBlobs(t *testing.T, s blobserver.Storage, blobs []blobDetails) {
	for _, b := range blobs {
		ref, ok := blob.Parse(b.digest)
		if !ok {
			t.Fatalf("Invalid blob ref: %s", b.digest)
		}
		data, err := hex.DecodeString(b.data)
		if err != nil {
			t.Fatalf("hex.DecodeString(): %v", err)
		}
		if _, err := blobserver.Receive(ctxbg, s, ref, bytes.NewBuffer(data)); err != nil {
			t.Fatalf("blobserver.Receive(): %v", err)
		}
	}
}
// basename returns the file name of the i'th pack file.
func basename(i int) string {
	return fmt.Sprintf("pack-%05d.blobs", i)
}
// writePack writes the i'th pack file in dir, containing the blobs of
// p. Each blob is stored as a "[<digest> <size>]" header followed
// immediately by the raw blob bytes, which is the diskpacked on-disk
// layout that Reindex parses.
func writePack(t *testing.T, dir string, i int, p pack) {
	fd, err := os.Create(filepath.Join(dir, basename(i)))
	if err != nil {
		t.Fatal(err)
	}
	defer fd.Close()
	for _, b := range p.blobs {
		data, err := hex.DecodeString(b.data)
		if err != nil {
			t.Fatal(err)
		}
		if _, err := fmt.Fprintf(fd, "[%s %d]", b.digest, len(data)); err != nil {
			t.Fatal(err)
		}
		if _, err := fd.Write(data); err != nil {
			t.Fatal(err)
		}
	}
}
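
// TestPackFileLayout is an illustrative addition (not in the upstream
// suite): it writes testPack2 to a pack file and verifies the raw
// on-disk bytes, making the "[<digest> <size>]<data>" layout that
// writePack produces explicit.
func TestPackFileLayout(t *testing.T) {
	dir := t.TempDir()
	writePack(t, dir, 0, pack{testPack2})
	got, err := os.ReadFile(filepath.Join(dir, basename(0)))
	if err != nil {
		t.Fatal(err)
	}
	want := "[sha1-0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33 3]foo" +
		"[sha1-62cdb7020ff920e5aa642c3d4066950dd1f01f4d 3]bar"
	if string(got) != want {
		t.Errorf("pack file layout = %q; want %q", got, want)
	}
}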
// newTestStorage writes the given packs to a temporary directory,
// reindexes them, and opens a diskpacked storage over the result. The
// returned clean func closes the storage and restores logging.
func newTestStorage(t *testing.T, packs ...pack) (s *storage, clean func()) {
	restoreLogging := test.TLog(t)
	dir := t.TempDir()
	for i, p := range packs {
		writePack(t, dir, i, p)
	}
	ctx, cancel := context.WithCancel(context.Background())
	err := Reindex(ctx, dir, true, nil)
	cancel()
	if err != nil {
		t.Fatalf("Reindexing after writing pack files: %v", err)
	}
	s, err = newStorage(dir, 0, nil)
	if err != nil {
		t.Fatal(err)
	}
	clean = func() {
		s.Close()
		restoreLogging()
	}
	return s, clean
}
// streamAll streams all blobs from s until the stream completes, and
// returns them. It verifies the size and hash of each blob before
// returning and fails the test if any of the checks fail. It also
// fails the test if StreamBlobs returns a non-nil error.
func streamAll(t *testing.T, s *storage) []*blob.Blob {
	blobs := make([]*blob.Blob, 0, 1024)
	ch := make(chan blobserver.BlobAndToken)
	errCh := make(chan error, 1)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go func() { errCh <- s.StreamBlobs(ctx, ch, "") }()
	for bt := range ch {
		verifySizeAndHash(t, bt.Blob)
		blobs = append(blobs, bt.Blob)
	}
	if err := <-errCh; err != nil {
		t.Fatalf("StreamBlobs error = %v", err)
	}
	return blobs
}
// Tests the streaming of all blobs in a storage, with hash verification.
func TestBasicStreaming(t *testing.T) {
	s, clean := newTestStorage(t, pack{testPack1})
	defer clean()
	expected := len(testPack1)
	blobs := streamAll(t, s)
	if len(blobs) != expected {
		t.Fatalf("Wrong blob count: Expected %d, got %d", expected, len(blobs))
	}
	wantRefs := make([]blob.SizedRef, len(blobs))
	for i, b := range blobs {
		wantRefs[i] = b.SizedRef()
	}
	storagetest.TestStreamer(t, s, storagetest.WantSizedRefs(wantRefs))
}
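
// TestStreamedContents is an illustrative addition (not in the
// upstream suite): beyond counting blobs, it checks that the set of
// streamed refs matches the testPack1 fixtures exactly, with nothing
// missing and nothing extra.
func TestStreamedContents(t *testing.T) {
	s, clean := newTestStorage(t, pack{testPack1})
	defer clean()
	want := make(map[blob.Ref]bool, len(testPack1))
	for _, b := range testPack1 {
		ref, ok := blob.Parse(b.digest)
		if !ok {
			t.Fatalf("blob.Parse: %s", b.digest)
		}
		want[ref] = true
	}
	for _, b := range streamAll(t, s) {
		if !want[b.Ref()] {
			t.Errorf("streamed unexpected blob %v", b.Ref())
		}
		delete(want, b.Ref())
	}
	for ref := range want {
		t.Errorf("blob %v was never streamed", ref)
	}
}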
// verifySizeAndHash reads the blob's full contents and fails the test
// if the byte count or SHA-1 digest disagrees with its ref.
func verifySizeAndHash(t *testing.T, blob *blob.Blob) {
	hash := sha1.New()
	r, err := blob.ReadAll(ctxbg)
	if err != nil {
		t.Fatal(err)
	}
	n, err := io.Copy(hash, r)
	if err != nil {
		t.Fatal(err)
	}
	if uint32(n) != blob.Size() {
		t.Fatalf("read %d bytes from blob %v; want %v", n, blob.Ref(), blob.Size())
	}
	if !blob.SizedRef().HashMatches(hash) {
		t.Fatalf("read wrong bytes from blobref %v (digest mismatch)", blob.Ref())
	}
}
// Tests that we can correctly switch over to the next pack if we
// still need to stream more blobs when a pack reaches EOF.
func TestStreamMultiplePacks(t *testing.T) {
	s, clean := newTestStorage(t, pack{testPack1}, pack{testPack2})
	defer clean()
	storagetest.TestStreamer(t, s, storagetest.WantN(len(testPack1)+len(testPack2)))
}
func TestStreamSkipRemovedBlobs(t *testing.T) {
	ctx := context.Background()
	// Note: This is the only streaming test that makes use of the
	// index (for RemoveBlobs() to succeed). The others do create
	// an indexed storage but they do not use the index to stream
	// (nor should they use it). The streaming in this test is
	// done by reading the underlying diskpacks.
	s := newTempDiskpacked(t)
	uploadTestBlobs(t, s, testPack1)
	ref, ok := blob.Parse(testPack1[0].digest)
	if !ok {
		t.Fatalf("blob.Parse: %s", testPack1[0].digest)
	}
	if err := s.RemoveBlobs(ctx, []blob.Ref{ref}); err != nil {
		t.Fatalf("RemoveBlobs: %v", err)
	}
	diskpackedSto := s.(*storage)
	expected := len(testPack1) - 1 // We've deleted 1.
	storagetest.TestStreamer(t, diskpackedSto, storagetest.WantN(expected))
}
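
// TestRemovedBlobNotStreamed is an illustrative companion to
// TestStreamSkipRemovedBlobs (not in the upstream suite): rather than
// counting the remaining blobs, it checks that the removed ref itself
// never appears in the stream.
func TestRemovedBlobNotStreamed(t *testing.T) {
	ctx := context.Background()
	s := newTempDiskpacked(t)
	uploadTestBlobs(t, s, testPack1)
	ref, ok := blob.Parse(testPack1[0].digest)
	if !ok {
		t.Fatalf("blob.Parse: %s", testPack1[0].digest)
	}
	if err := s.RemoveBlobs(ctx, []blob.Ref{ref}); err != nil {
		t.Fatalf("RemoveBlobs: %v", err)
	}
	for _, b := range streamAll(t, s.(*storage)) {
		if b.Ref() == ref {
			t.Errorf("removed blob %v was still streamed", ref)
		}
	}
}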