From 6e3fc37ae59f1e924c707980a7fe6f5e0e3e9cba Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Thu, 25 Dec 2014 16:15:46 -0800 Subject: [PATCH] blobserver/storagetest: make TestStreamer stricter, compares against Enumerator Also then fix up diskpacked to work under these stricter rules. Noticed this while working on blobpacked's tests and noticed them surprisingly pass at one point. Now they no longer pass (as they shouldn't yet, since parts are still TODO). Updates #532 Change-Id: Ie05d19823453594486caf921cabbd149c43df221 --- pkg/blobserver/diskpacked/reindex.go | 3 +- pkg/blobserver/diskpacked/stream_test.go | 27 ++++++++++------ pkg/blobserver/storagetest/storagetest.go | 39 +++++++++++++++++++++-- 3 files changed, 54 insertions(+), 15 deletions(-) diff --git a/pkg/blobserver/diskpacked/reindex.go b/pkg/blobserver/diskpacked/reindex.go index 93bd24cf7..bd6a0276c 100644 --- a/pkg/blobserver/diskpacked/reindex.go +++ b/pkg/blobserver/diskpacked/reindex.go @@ -128,8 +128,7 @@ func (s *storage) reindexOne(ctx *context.Context, index sorted.KeyValue, overwr } if overwrite && batch != nil { - log.Printf("overwriting %s from %d", index, packID) - if err = index.CommitBatch(batch); err != nil { + if err := index.CommitBatch(batch); err != nil { return err } } else if !allOk { diff --git a/pkg/blobserver/diskpacked/stream_test.go b/pkg/blobserver/diskpacked/stream_test.go index 2ddfc2f88..924d65afb 100644 --- a/pkg/blobserver/diskpacked/stream_test.go +++ b/pkg/blobserver/diskpacked/stream_test.go @@ -43,8 +43,7 @@ type pack struct { blobs []blobDetails } -// TODO: why is this named pool00001? (--bradfitz) -var pool00001 = []blobDetails{ +var testPack1 = []blobDetails{ {"sha1-04f029feccd2c5c3d3ef87329eb85606bbdd2698", "94"}, {"sha1-db846319868cf27ecc444bcc34cf126c86bf9a07", "6396"}, {"sha1-4316a49fc962f627350ca0a01532421b8b93831d", "b782e7a6"}, @@ -52,6 +51,11 @@ var pool00001 = []blobDetails{ {"sha1-bd2a193deeb56aa2554a53eda95d69a95e7bf642", "104c00d6cf9f486f277e8f0493759a21"}, } +var testPack2 = []blobDetails{ + {"sha1-0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33", fmt.Sprintf("%x", []byte("foo"))}, + {"sha1-62cdb7020ff920e5aa642c3d4066950dd1f01f4d", fmt.Sprintf("%x", []byte("bar"))}, +} + func uploadTestBlobs(t *testing.T, s blobserver.Storage, blobs []blobDetails) { for _, b := range blobs { ref, ok := blob.Parse(b.digest) @@ -112,6 +116,9 @@ func newTestStorage(t *testing.T, packs ...pack) (s *storage, clean func()) { writePack(t, dir, i, p) } + if err := Reindex(dir, true, nil); err != nil { + t.Fatalf("Reindexing after writing pack files: %v", err) + } s, err = newStorage(dir, 0, nil) if err != nil { t.Fatal(err) @@ -148,10 +155,10 @@ func streamAll(t *testing.T, s *storage) []*blob.Blob { // Tests the streaming of all blobs in a storage, with hash verification. func TestBasicStreaming(t *testing.T) { - s, clean := newTestStorage(t, pack{pool00001}) + s, clean := newTestStorage(t, pack{testPack1}) defer clean() - expected := len(pool00001) + expected := len(testPack1) blobs := streamAll(t, s) if len(blobs) != expected { t.Fatalf("Wrong blob count: Expected %d, got %d", expected, @@ -185,9 +192,9 @@ func verifySizeAndHash(t *testing.T, blob *blob.Blob) { // Tests that we can correctly switch over to the next pack if we // still need to stream more blobs when a pack reaches EOF. func TestStreamMultiplePacks(t *testing.T) { - s, clean := newTestStorage(t, pack{pool00001}, pack{pool00001}) + s, clean := newTestStorage(t, pack{testPack1}, pack{testPack2}) defer clean() - storagetest.TestStreamer(t, s, storagetest.WantN(len(pool00001)+len(pool00001))) + storagetest.TestStreamer(t, s, storagetest.WantN(len(testPack1)+len(testPack2))) } func TestStreamSkipRemovedBlobs(t *testing.T) { @@ -199,11 +206,11 @@ func TestStreamSkipRemovedBlobs(t *testing.T) { s, cleanup := newTempDiskpacked(t) defer cleanup() - uploadTestBlobs(t, s, pool00001) + uploadTestBlobs(t, s, testPack1) - ref, ok := blob.Parse(pool00001[0].digest) + ref, ok := blob.Parse(testPack1[0].digest) if !ok { - t.Fatalf("blob.Parse: %s", pool00001[0].digest) + t.Fatalf("blob.Parse: %s", testPack1[0].digest) } err := s.RemoveBlobs([]blob.Ref{ref}) @@ -212,6 +219,6 @@ func TestStreamSkipRemovedBlobs(t *testing.T) { } diskpackedSto := s.(*storage) - expected := len(pool00001) - 1 // We've deleted 1 + expected := len(testPack1) - 1 // We've deleted 1 storagetest.TestStreamer(t, diskpackedSto, storagetest.WantN(expected)) } diff --git a/pkg/blobserver/storagetest/storagetest.go b/pkg/blobserver/storagetest/storagetest.go index 463d54e77..aa1c0c84c 100644 --- a/pkg/blobserver/storagetest/storagetest.go +++ b/pkg/blobserver/storagetest/storagetest.go @@ -388,11 +388,29 @@ func (s WantSizedRefs) verify(got []blob.SizedRef) error { return nil } +type StreamEnumerator interface { + blobserver.BlobStreamer + blobserver.BlobEnumerator +} + // TestStreamer tests that the BlobStreamer implements all of the // promised interface behavior and ultimately yields the provided // blobs. -func TestStreamer(t *testing.T, bs blobserver.BlobStreamer, opts ...StreamerTestOpt) { - // First see if, without cancelation, it yields the right +func TestStreamer(t *testing.T, bs StreamEnumerator, opts ...StreamerTestOpt) { + // First do an enumerate over all blobs as a baseline. The Streamer should + // yield the same blobs, even if it's in a different order. + sawEnum := make(map[blob.SizedRef]bool) + enumCtx := context.New() + defer enumCtx.Cancel() + if err := blobserver.EnumerateAll(enumCtx, bs, func(sb blob.SizedRef) error { + sawEnum[sb] = true + return nil + }); err != nil { + t.Fatalf("Enumerate: %v", err) + } + t.Logf("enumerated %d blobs in stream test", len(sawEnum)) + + // See if, without cancelation, it yields the right // result and without errors. ch := make(chan blobserver.BlobAndToken) errCh := make(chan error, 1) @@ -403,11 +421,26 @@ func TestStreamer(t *testing.T, bs blobserver.BlobStreamer, opts ...StreamerTest }() var gotRefs []blob.SizedRef for b := range ch { - gotRefs = append(gotRefs, b.SizedRef()) + sbr := b.SizedRef() + if _, ok := sawEnum[sbr]; ok { + delete(sawEnum, sbr) + } else { + t.Errorf("Streamer yielded blob not returned by Enumerate: %v", sbr) + } + gotRefs = append(gotRefs, sbr) } if err := <-errCh; err != nil { t.Errorf("initial uninterrupted StreamBlobs error: %v", err) } + nMissing := 0 + for sbr := range sawEnum { + t.Errorf("Enumerate found %v but Streamer didn't return it", sbr) + nMissing++ + if nMissing == 10 && len(sawEnum) > 10 { + t.Errorf("... etc ...") + break + } + } for _, opt := range opts { if err := opt.verify(gotRefs); err != nil { t.Errorf("error after first uninterrupted StreamBlobs pass: %v", err)