diff --git a/pkg/blobserver/blobpacked/blobpacked.go b/pkg/blobserver/blobpacked/blobpacked.go index c6ac97c45..12940d479 100644 --- a/pkg/blobserver/blobpacked/blobpacked.go +++ b/pkg/blobserver/blobpacked/blobpacked.go @@ -123,6 +123,10 @@ const ( wholeMetaPrefix = "w:" ) +const ( + zipManifestPath = "camlistore/camlistore-pack-manifest.json" +) + type subFetcherStorage interface { blobserver.Storage blob.SubFetcher @@ -802,7 +806,7 @@ func (pk *packer) writeAZip(trunc blob.Ref) (err error) { } // Manifest file - fw, err = zw.Create("camlistore/camlistore-pack-manifest.json") + fw, err = zw.Create(zipManifestPath) check(err) enc, err := json.MarshalIndent(mf, "", " ") check(err) @@ -868,6 +872,130 @@ func (pk *packer) writeAZip(trunc blob.Ref) (err error) { return nil } +// foreachZipBlob calls fn for each blob in the zip pack blob +// identified by zipRef. If fn returns a non-nil error, +// foreachZipBlob stops enumerating with that error. +func (s *storage) foreachZipBlob(zipRef blob.Ref, fn func(BlobAndPos) error) error { + sb, err := blobserver.StatBlob(s.large, zipRef) + if err != nil { + return err + } + zr, err := zip.NewReader(blob.ReaderAt(s.large, zipRef), int64(sb.Size)) + if err != nil { + return err + } + var maniFile *zip.File // or nil if not found + var firstOff int64 // offset of first file (the packed data chunks) + for i, f := range zr.File { + if i == 0 { + firstOff, err = f.DataOffset() + if err != nil { + return err + } + } + if f.Name == zipManifestPath { + maniFile = f + break + } + } + if maniFile == nil { + return errors.New("no camlistore manifest file found in zip") + } + for _, f := range zr.File { + if !strings.HasPrefix(f.Name, "camlistore/") || f.Name == zipManifestPath || + !strings.HasSuffix(f.Name, ".json") { + continue + } + brStr := strings.TrimSuffix(strings.TrimPrefix(f.Name, "camlistore/"), ".json") + br, ok := blob.Parse(brStr) + if ok { + off, err := f.DataOffset() + if err != nil { + return err + } + if err := 
fn(BlobAndPos{ + SizedRef: blob.SizedRef{br, uint32(f.UncompressedSize64)}, + Offset: off, + }); err != nil { + return err + } + } + } + maniRC, err := maniFile.Open() + if err != nil { + return err + } + defer maniRC.Close() + var mf Manifest + if err := json.NewDecoder(maniRC).Decode(&mf); err != nil { + return err + } + if !mf.WholeRef.Valid() || mf.WholeSize == 0 || !mf.DataBlobsOrigin.Valid() { + return errors.New("incomplete blobpack manifest JSON") + } + for _, bap := range mf.DataBlobs { + bap.Offset += firstOff + if err := fn(bap); err != nil { + return err + } + } + return nil +} + +// deleteZipPack deletes the zip pack file br, but only if that zip +// file's parts are deleted already from the meta index. +func (s *storage) deleteZipPack(br blob.Ref) error { + inUse, err := s.zipPartsInUse(br) + if err != nil { + return err + } + if len(inUse) > 0 { + return fmt.Errorf("can't delete zip pack %v: %d parts in use: %v", br, len(inUse), inUse) + } + if err := s.large.RemoveBlobs([]blob.Ref{br}); err != nil { + return err + } + return s.meta.Delete("d:" + br.String()) +} + +func (s *storage) zipPartsInUse(br blob.Ref) ([]blob.Ref, error) { + var ( + mu sync.Mutex + inUse []blob.Ref + ) + var grp syncutil.Group + gate := syncutil.NewGate(20) // arbitrary constant + err := s.foreachZipBlob(br, func(bap BlobAndPos) error { + gate.Start() + grp.Go(func() error { + defer gate.Done() + mr, err := s.getMetaRow(bap.Ref) + if err != nil { + return err + } + if mr.largeRef.Valid() { + mu.Lock() + inUse = append(inUse, mr.largeRef) + mu.Unlock() + } + return nil + }) + return nil + }) + if os.IsNotExist(err) { + // An already-deleted blob from large isn't considered + // to be in-use. + return nil, nil + } + if err != nil { + return nil, err + } + if err := grp.Err(); err != nil { + return nil, err + } + return inUse, nil +} + // A BlobAndPos is a blobref, its size, and where it is located within // a larger group of bytes. 
type BlobAndPos struct { diff --git a/pkg/blobserver/blobpacked/blobpacked_test.go b/pkg/blobserver/blobpacked/blobpacked_test.go index 238cf4a29..186ea32a1 100644 --- a/pkg/blobserver/blobpacked/blobpacked_test.go +++ b/pkg/blobserver/blobpacked/blobpacked_test.go @@ -26,6 +26,7 @@ import ( "reflect" "runtime" "strconv" + "strings" "testing" "time" @@ -477,6 +478,92 @@ func TestStreamBlobs(t *testing.T) { } } +func TestForeachZipBlob(t *testing.T) { + const fileSize = 2 << 20 + const fileName = "foo.dat" + fileContents := randBytes(fileSize) + + ctx := context.New() + defer ctx.Cancel() + + var pt *packTest + testPack(t, + func(sto blobserver.Storage) error { + _, err := schema.WriteFileFromReader(sto, fileName, bytes.NewReader(fileContents)) + return err + }, + wantNumLargeBlobs(1), + wantNumSmallBlobs(0), + func(v *packTest) { + pt = v + }, + ) + + zipBlob, err := singleBlob(pt.large) + if err != nil { + t.Fatal(err) + } + zipBytes := slurpBlob(t, pt.large, zipBlob.Ref) + + all := map[blob.Ref]blob.SizedRef{} + if err := blobserver.EnumerateAll(ctx, pt.logical, func(sb blob.SizedRef) error { + all[sb.Ref] = sb + return nil + }); err != nil { + t.Fatal(err) + } + foreachSaw := 0 + if err := pt.sto.foreachZipBlob(zipBlob.Ref, func(bap BlobAndPos) error { + foreachSaw++ + want, ok := all[bap.Ref] + if !ok { + t.Errorf("unwanted blob ref returned from foreachZipBlob: %v", bap.Ref) + return nil + } + delete(all, bap.Ref) + if want.Size != bap.Size { + t.Errorf("for %v, foreachZipBlob size = %d; want %d", bap.Ref, bap.Size, want.Size) + return nil + } + + // Verify the offset. 
+ h := bap.Ref.Hash() + h.Write(zipBytes[bap.Offset : bap.Offset+int64(bap.Size)]) + if !bap.Ref.HashMatches(h) { + return fmt.Errorf("foreachZipBlob returned blob %v at offset %d that failed validation", bap.Ref, bap.Offset) + } + + return nil + }); err != nil { + t.Fatal(err) + } + + t.Logf("foreachZipBlob enumerated %d blobs", foreachSaw) + if len(all) > 0 { + t.Errorf("foreachZipBlob forgot to enumerate %d blobs: %v", len(all), all) + } +} + +// singleBlob assumes that sto contains a single blob and returns it. +// If there are more or fewer than one blob, it's an error. +func singleBlob(sto blobserver.BlobEnumerator) (ret blob.SizedRef, err error) { + ctx := context.New() + defer ctx.Cancel() + + n := 0 + if err = blobserver.EnumerateAll(ctx, sto, func(sb blob.SizedRef) error { + ret = sb + n++ + return nil + }); err != nil { + return blob.SizedRef{}, err + } + if n != 1 { + return blob.SizedRef{}, fmt.Errorf("saw %d blobs; want 1", n) + } + return +} + func TestRemoveBlobs(t *testing.T) { ctx := context.New() defer ctx.Cancel() @@ -509,6 +596,19 @@ func TestRemoveBlobs(t *testing.T) { }); err != nil { t.Fatal(err) } + + // Find the zip + zipBlob, err := singleBlob(sto.large) + if err != nil { + t.Fatalf("failed to find packed zip: %v", err) + } + + // The zip file is in use, so verify we can't delete it. + if err := sto.deleteZipPack(zipBlob.Ref); err == nil { + t.Fatalf("zip pack blob deleted but it should not have been allowed") + } + + // Delete everything for len(all) > 0 { del := all[0].Ref all = all[1:] @@ -520,7 +620,40 @@ func TestRemoveBlobs(t *testing.T) { } } - // TODO: verify the metarow for "d:" is there. - // TODO: and then force a scan to delete zips, and verify 0 blobs everywhere - // TODO: and then verify the metarow for "d:" is gone. 
+ dRows := func() (n int) { + if err := sorted.ForeachInRange(sto.meta, "d:", "", func(key, value string) error { + if strings.HasPrefix(key, "d:") { + n++ + } + return nil + }); err != nil { + t.Fatalf("meta iteration error: %v", err) + } + return + } + + if n := dRows(); n == 0 { + t.Fatalf("expected a 'd:' row after deletes") + } + + // TODO: test the background pack-deleter loop? figure out its design first. + if err := sto.deleteZipPack(zipBlob.Ref); err != nil { + t.Errorf("error deleting zip %v: %v", zipBlob.Ref, err) + } + if n := dRows(); n != 0 { + t.Errorf("expected the 'd:' row to be deleted") + } +} + +func slurpBlob(t *testing.T, sto blob.Fetcher, br blob.Ref) []byte { + rc, _, err := sto.Fetch(br) + if err != nil { + t.Fatal(err) + } + defer rc.Close() + slurp, err := ioutil.ReadAll(rc) + if err != nil { + t.Fatal(err) + } + return slurp }