blobserver/storagetest: make TestStreamer stricter, compares against Enumerator

Also then fix up diskpacked to work under these stricter rules.

Noticed this while working on blobpacked's tests and noticed them
surprisingly pass at one point. Now they no longer pass (as they
shouldn't yet, since parts are still TODO).

Updates #532

Change-Id: Ie05d19823453594486caf921cabbd149c43df221
This commit is contained in:
Brad Fitzpatrick 2014-12-25 16:15:46 -08:00
parent 04bba70cc1
commit 6e3fc37ae5
3 changed files with 54 additions and 15 deletions

View File

@ -128,8 +128,7 @@ func (s *storage) reindexOne(ctx *context.Context, index sorted.KeyValue, overwr
}
if overwrite && batch != nil {
log.Printf("overwriting %s from %d", index, packID)
if err = index.CommitBatch(batch); err != nil {
if err := index.CommitBatch(batch); err != nil {
return err
}
} else if !allOk {

View File

@ -43,8 +43,7 @@ type pack struct {
blobs []blobDetails
}
// TODO: why is this named pool00001? (--bradfitz)
var pool00001 = []blobDetails{
var testPack1 = []blobDetails{
{"sha1-04f029feccd2c5c3d3ef87329eb85606bbdd2698", "94"},
{"sha1-db846319868cf27ecc444bcc34cf126c86bf9a07", "6396"},
{"sha1-4316a49fc962f627350ca0a01532421b8b93831d", "b782e7a6"},
@ -52,6 +51,11 @@ var pool00001 = []blobDetails{
{"sha1-bd2a193deeb56aa2554a53eda95d69a95e7bf642", "104c00d6cf9f486f277e8f0493759a21"},
}
var testPack2 = []blobDetails{
{"sha1-0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33", fmt.Sprintf("%x", []byte("foo"))},
{"sha1-62cdb7020ff920e5aa642c3d4066950dd1f01f4d", fmt.Sprintf("%x", []byte("bar"))},
}
func uploadTestBlobs(t *testing.T, s blobserver.Storage, blobs []blobDetails) {
for _, b := range blobs {
ref, ok := blob.Parse(b.digest)
@ -112,6 +116,9 @@ func newTestStorage(t *testing.T, packs ...pack) (s *storage, clean func()) {
writePack(t, dir, i, p)
}
if err := Reindex(dir, true, nil); err != nil {
t.Fatalf("Reindexing after writing pack files: %v", err)
}
s, err = newStorage(dir, 0, nil)
if err != nil {
t.Fatal(err)
@ -148,10 +155,10 @@ func streamAll(t *testing.T, s *storage) []*blob.Blob {
// Tests the streaming of all blobs in a storage, with hash verification.
func TestBasicStreaming(t *testing.T) {
s, clean := newTestStorage(t, pack{pool00001})
s, clean := newTestStorage(t, pack{testPack1})
defer clean()
expected := len(pool00001)
expected := len(testPack1)
blobs := streamAll(t, s)
if len(blobs) != expected {
t.Fatalf("Wrong blob count: Expected %d, got %d", expected,
@ -185,9 +192,9 @@ func verifySizeAndHash(t *testing.T, blob *blob.Blob) {
// Tests that we can correctly switch over to the next pack if we
// still need to stream more blobs when a pack reaches EOF.
func TestStreamMultiplePacks(t *testing.T) {
s, clean := newTestStorage(t, pack{pool00001}, pack{pool00001})
s, clean := newTestStorage(t, pack{testPack1}, pack{testPack2})
defer clean()
storagetest.TestStreamer(t, s, storagetest.WantN(len(pool00001)+len(pool00001)))
storagetest.TestStreamer(t, s, storagetest.WantN(len(testPack1)+len(testPack2)))
}
func TestStreamSkipRemovedBlobs(t *testing.T) {
@ -199,11 +206,11 @@ func TestStreamSkipRemovedBlobs(t *testing.T) {
s, cleanup := newTempDiskpacked(t)
defer cleanup()
uploadTestBlobs(t, s, pool00001)
uploadTestBlobs(t, s, testPack1)
ref, ok := blob.Parse(pool00001[0].digest)
ref, ok := blob.Parse(testPack1[0].digest)
if !ok {
t.Fatalf("blob.Parse: %s", pool00001[0].digest)
t.Fatalf("blob.Parse: %s", testPack1[0].digest)
}
err := s.RemoveBlobs([]blob.Ref{ref})
@ -212,6 +219,6 @@ func TestStreamSkipRemovedBlobs(t *testing.T) {
}
diskpackedSto := s.(*storage)
expected := len(pool00001) - 1 // We've deleted 1
expected := len(testPack1) - 1 // We've deleted 1
storagetest.TestStreamer(t, diskpackedSto, storagetest.WantN(expected))
}

View File

@ -388,11 +388,29 @@ func (s WantSizedRefs) verify(got []blob.SizedRef) error {
return nil
}
type StreamEnumerator interface {
blobserver.BlobStreamer
blobserver.BlobEnumerator
}
// TestStreamer tests that the BlobStreamer implements all of the
// promised interface behavior and ultimately yields the provided
// blobs.
func TestStreamer(t *testing.T, bs blobserver.BlobStreamer, opts ...StreamerTestOpt) {
// First see if, without cancelation, it yields the right
func TestStreamer(t *testing.T, bs StreamEnumerator, opts ...StreamerTestOpt) {
// First do an enumerate over all blobs as a baseline. The Streamer should
// yield the same blobs, even if it's in a different order.
sawEnum := make(map[blob.SizedRef]bool)
enumCtx := context.New()
defer enumCtx.Cancel()
if err := blobserver.EnumerateAll(enumCtx, bs, func(sb blob.SizedRef) error {
sawEnum[sb] = true
return nil
}); err != nil {
t.Fatalf("Enumerate: %v", err)
}
t.Logf("enumerated %d blobs in stream test", len(sawEnum))
// See if, without cancelation, it yields the right
// result and without errors.
ch := make(chan blobserver.BlobAndToken)
errCh := make(chan error, 1)
@ -403,11 +421,26 @@ func TestStreamer(t *testing.T, bs blobserver.BlobStreamer, opts ...StreamerTest
}()
var gotRefs []blob.SizedRef
for b := range ch {
gotRefs = append(gotRefs, b.SizedRef())
sbr := b.SizedRef()
if _, ok := sawEnum[sbr]; ok {
delete(sawEnum, sbr)
} else {
t.Errorf("Streamer yielded blob not returned by Enumerate: %v", sbr)
}
gotRefs = append(gotRefs, sbr)
}
if err := <-errCh; err != nil {
t.Errorf("initial uninterrupted StreamBlobs error: %v", err)
}
nMissing := 0
for sbr := range sawEnum {
t.Errorf("Enumerate found %v but Streamer didn't return it", sbr)
nMissing++
if nMissing == 10 && len(sawEnum) > 10 {
t.Errorf("... etc ...")
break
}
}
for _, opt := range opts {
if err := opt.verify(gotRefs); err != nil {
t.Errorf("error after first uninterrupted StreamBlobs pass: %v", err)