diff --git a/pkg/blobserver/diskpacked/reindex.go b/pkg/blobserver/diskpacked/reindex.go
index 93bd24cf7..bd6a0276c 100644
--- a/pkg/blobserver/diskpacked/reindex.go
+++ b/pkg/blobserver/diskpacked/reindex.go
@@ -128,8 +128,7 @@ func (s *storage) reindexOne(ctx *context.Context, index sorted.KeyValue, overwr
 	}
 
 	if overwrite && batch != nil {
-		log.Printf("overwriting %s from %d", index, packID)
-		if err = index.CommitBatch(batch); err != nil {
+		if err := index.CommitBatch(batch); err != nil {
 			return err
 		}
 	} else if !allOk {
diff --git a/pkg/blobserver/diskpacked/stream_test.go b/pkg/blobserver/diskpacked/stream_test.go
index 2ddfc2f88..924d65afb 100644
--- a/pkg/blobserver/diskpacked/stream_test.go
+++ b/pkg/blobserver/diskpacked/stream_test.go
@@ -43,8 +43,7 @@ type pack struct {
 	blobs []blobDetails
 }
 
-// TODO: why is this named pool00001? (--bradfitz)
-var pool00001 = []blobDetails{
+var testPack1 = []blobDetails{
 	{"sha1-04f029feccd2c5c3d3ef87329eb85606bbdd2698", "94"},
 	{"sha1-db846319868cf27ecc444bcc34cf126c86bf9a07", "6396"},
 	{"sha1-4316a49fc962f627350ca0a01532421b8b93831d", "b782e7a6"},
@@ -52,6 +51,11 @@ var pool00001 = []blobDetails{
 	{"sha1-bd2a193deeb56aa2554a53eda95d69a95e7bf642", "104c00d6cf9f486f277e8f0493759a21"},
 }
 
+var testPack2 = []blobDetails{
+	{"sha1-0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33", fmt.Sprintf("%x", []byte("foo"))},
+	{"sha1-62cdb7020ff920e5aa642c3d4066950dd1f01f4d", fmt.Sprintf("%x", []byte("bar"))},
+}
+
 func uploadTestBlobs(t *testing.T, s blobserver.Storage, blobs []blobDetails) {
 	for _, b := range blobs {
 		ref, ok := blob.Parse(b.digest)
@@ -112,6 +116,9 @@ func newTestStorage(t *testing.T, packs ...pack) (s *storage, clean func()) {
 		writePack(t, dir, i, p)
 	}
+	if err := Reindex(dir, true, nil); err != nil {
+		t.Fatalf("Reindexing after writing pack files: %v", err)
+	}
 	s, err = newStorage(dir, 0, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -148,10 +155,10 @@ func streamAll(t *testing.T, s *storage) []*blob.Blob {
 
 // Tests the streaming of all blobs in a storage, with hash verification.
 func TestBasicStreaming(t *testing.T) {
-	s, clean := newTestStorage(t, pack{pool00001})
+	s, clean := newTestStorage(t, pack{testPack1})
 	defer clean()
-	expected := len(pool00001)
+	expected := len(testPack1)
 	blobs := streamAll(t, s)
 	if len(blobs) != expected {
 		t.Fatalf("Wrong blob count: Expected %d, got %d", expected,
@@ -185,9 +192,9 @@ func verifySizeAndHash(t *testing.T, blob *blob.Blob) {
 // Tests that we can correctly switch over to the next pack if we
 // still need to stream more blobs when a pack reaches EOF.
 func TestStreamMultiplePacks(t *testing.T) {
-	s, clean := newTestStorage(t, pack{pool00001}, pack{pool00001})
+	s, clean := newTestStorage(t, pack{testPack1}, pack{testPack2})
 	defer clean()
-	storagetest.TestStreamer(t, s, storagetest.WantN(len(pool00001)+len(pool00001)))
+	storagetest.TestStreamer(t, s, storagetest.WantN(len(testPack1)+len(testPack2)))
 }
 
 func TestStreamSkipRemovedBlobs(t *testing.T) {
@@ -199,11 +206,11 @@ func TestStreamSkipRemovedBlobs(t *testing.T) {
 	s, cleanup := newTempDiskpacked(t)
 	defer cleanup()
 
-	uploadTestBlobs(t, s, pool00001)
+	uploadTestBlobs(t, s, testPack1)
 
-	ref, ok := blob.Parse(pool00001[0].digest)
+	ref, ok := blob.Parse(testPack1[0].digest)
 	if !ok {
-		t.Fatalf("blob.Parse: %s", pool00001[0].digest)
+		t.Fatalf("blob.Parse: %s", testPack1[0].digest)
 	}
 
 	err := s.RemoveBlobs([]blob.Ref{ref})
@@ -212,6 +219,6 @@ func TestStreamSkipRemovedBlobs(t *testing.T) {
 	}
 	diskpackedSto := s.(*storage)
 
-	expected := len(pool00001) - 1 // We've deleted 1
+	expected := len(testPack1) - 1 // We've deleted 1
 	storagetest.TestStreamer(t, diskpackedSto, storagetest.WantN(expected))
 }
diff --git a/pkg/blobserver/storagetest/storagetest.go b/pkg/blobserver/storagetest/storagetest.go
index 463d54e77..aa1c0c84c 100644
--- a/pkg/blobserver/storagetest/storagetest.go
+++ b/pkg/blobserver/storagetest/storagetest.go
@@ -388,11 +388,29 @@ func (s WantSizedRefs) verify(got []blob.SizedRef) error {
 	return nil
 }
 
+type StreamEnumerator interface {
+	blobserver.BlobStreamer
+	blobserver.BlobEnumerator
+}
+
 // TestStreamer tests that the BlobStreamer implements all of the
 // promised interface behavior and ultimately yields the provided
 // blobs.
-func TestStreamer(t *testing.T, bs blobserver.BlobStreamer, opts ...StreamerTestOpt) {
-	// First see if, without cancelation, it yields the right
+func TestStreamer(t *testing.T, bs StreamEnumerator, opts ...StreamerTestOpt) {
+	// First do an enumerate over all blobs as a baseline. The Streamer should
+	// yield the same blobs, even if it's in a different order.
+	sawEnum := make(map[blob.SizedRef]bool)
+	enumCtx := context.New()
+	defer enumCtx.Cancel()
+	if err := blobserver.EnumerateAll(enumCtx, bs, func(sb blob.SizedRef) error {
+		sawEnum[sb] = true
+		return nil
+	}); err != nil {
+		t.Fatalf("Enumerate: %v", err)
+	}
+	t.Logf("enumerated %d blobs in stream test", len(sawEnum))
+
+	// See if, without cancelation, it yields the right
 	// result and without errors.
 	ch := make(chan blobserver.BlobAndToken)
 	errCh := make(chan error, 1)
@@ -403,11 +421,26 @@ func TestStreamer(t *testing.T, bs blobserver.BlobStreamer, opts ...StreamerTest
 	}()
 	var gotRefs []blob.SizedRef
 	for b := range ch {
-		gotRefs = append(gotRefs, b.SizedRef())
+		sbr := b.SizedRef()
+		if _, ok := sawEnum[sbr]; ok {
+			delete(sawEnum, sbr)
+		} else {
+			t.Errorf("Streamer yielded blob not returned by Enumerate: %v", sbr)
+		}
+		gotRefs = append(gotRefs, sbr)
 	}
	if err := <-errCh; err != nil {
 		t.Errorf("initial uninterrupted StreamBlobs error: %v", err)
 	}
+	nMissing := 0
+	for sbr := range sawEnum {
+		t.Errorf("Enumerate found %v but Streamer didn't return it", sbr)
+		nMissing++
+		if nMissing == 10 && len(sawEnum) > 10 {
+			t.Errorf("... etc ...")
+			break
+		}
+	}
 	for _, opt := range opts {
 		if err := opt.verify(gotRefs); err != nil {
 			t.Errorf("error after first uninterrupted StreamBlobs pass: %v", err)
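For context, a minimal sketch of what a caller of the updated `storagetest.TestStreamer` now looks like: the argument must satisfy the new `StreamEnumerator` interface, i.e. implement both `blobserver.BlobStreamer` and `blobserver.BlobEnumerator`. The package name, the `newTestMyStorage` helper, and the blob count are hypothetical placeholders (not part of this change), and the import path assumes the Camlistore-era `camlistore.org/pkg/...` layout used by the files above.

```go
package mystorage_test

import (
	"testing"

	"camlistore.org/pkg/blobserver/storagetest"
)

// Hypothetical test: newTestMyStorage must return a value implementing
// both blobserver.BlobStreamer and blobserver.BlobEnumerator, or the
// call to storagetest.TestStreamer no longer compiles.
func TestMyStorageStreaming(t *testing.T) {
	s, cleanup := newTestMyStorage(t)
	defer cleanup()

	// TestStreamer first enumerates all blobs as a baseline, then
	// verifies that streaming yields exactly the same set of blobs
	// (order may differ), reporting any extras or omissions.
	storagetest.TestStreamer(t, s, storagetest.WantN(5))
}
```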