blobpacked: RemoveBlobs work: deleteZipPack, foreachZipBlob, more tests

Change-Id: Ice35948a071703a2683f20f5ccdd710a746f9c7b
Brad Fitzpatrick 2014-10-28 19:31:06 -03:00
parent 5ac4e18a62
commit 3ee058675e
2 changed files with 265 additions and 4 deletions


@@ -123,6 +123,10 @@ const (
wholeMetaPrefix = "w:"
)
const (
zipManifestPath = "camlistore/camlistore-pack-manifest.json"
)
type subFetcherStorage interface {
blobserver.Storage
blob.SubFetcher
@@ -802,7 +806,7 @@ func (pk *packer) writeAZip(trunc blob.Ref) (err error) {
}
// Manifest file
fw, err = zw.Create("camlistore/camlistore-pack-manifest.json")
fw, err = zw.Create(zipManifestPath)
check(err)
enc, err := json.MarshalIndent(mf, "", " ")
check(err)
@@ -868,6 +872,130 @@ func (pk *packer) writeAZip(trunc blob.Ref) (err error) {
return nil
}
// foreachZipBlob calls fn for each blob in the zip pack blob
// identified by zipRef. If fn returns a non-nil error,
// foreachZipBlob stops enumerating with that error.
func (s *storage) foreachZipBlob(zipRef blob.Ref, fn func(BlobAndPos) error) error {
sb, err := blobserver.StatBlob(s.large, zipRef)
if err != nil {
return err
}
zr, err := zip.NewReader(blob.ReaderAt(s.large, zipRef), int64(sb.Size))
if err != nil {
return err
}
var maniFile *zip.File // or nil if not found
var firstOff int64 // offset of first file (the packed data chunks)
for i, f := range zr.File {
if i == 0 {
firstOff, err = f.DataOffset()
if err != nil {
return err
}
}
if f.Name == zipManifestPath {
maniFile = f
break
}
}
if maniFile == nil {
return errors.New("no camlistore manifest file found in zip")
}
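// Enumerate the blobs stored as individual camlistore/<blobref>.json
// entries in the zip: everything under camlistore/ except the
// manifest itself.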
for _, f := range zr.File {
if !strings.HasPrefix(f.Name, "camlistore/") || f.Name == zipManifestPath ||
!strings.HasSuffix(f.Name, ".json") {
continue
}
brStr := strings.TrimSuffix(strings.TrimPrefix(f.Name, "camlistore/"), ".json")
br, ok := blob.Parse(brStr)
if ok {
off, err := f.DataOffset()
if err != nil {
return err
}
if err := fn(BlobAndPos{
SizedRef: blob.SizedRef{Ref: br, Size: uint32(f.UncompressedSize64)},
Offset: off,
}); err != nil {
return err
}
}
}
maniRC, err := maniFile.Open()
if err != nil {
return err
}
defer maniRC.Close()
var mf Manifest
if err := json.NewDecoder(maniRC).Decode(&mf); err != nil {
return err
}
if !mf.WholeRef.Valid() || mf.WholeSize == 0 || !mf.DataBlobsOrigin.Valid() {
return errors.New("incomplete blobpack manifest JSON")
}
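// The manifest records each data blob's offset relative to the start
// of the packed data (the first file's data); add firstOff to make
// them absolute offsets within the zip blob.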
for _, bap := range mf.DataBlobs {
bap.Offset += firstOff
if err := fn(bap); err != nil {
return err
}
}
return nil
}
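
For context, a minimal usage sketch (hypothetical, not part of this change): foreachZipBlob makes it straightforward for a caller to tally a pack's contents.

// countZipBlobs uses foreachZipBlob to count the blobs in the zip
// pack zipRef and to sum their uncompressed sizes. Hypothetical
// helper, for illustration only.
func (s *storage) countZipBlobs(zipRef blob.Ref) (n int, size int64, err error) {
	err = s.foreachZipBlob(zipRef, func(bap BlobAndPos) error {
		n++
		size += int64(bap.Size)
		return nil
	})
	return
}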
// deleteZipPack deletes the zip pack file br, but only if that zip's
// parts have already been deleted from the meta index.
func (s *storage) deleteZipPack(br blob.Ref) error {
inUse, err := s.zipPartsInUse(br)
if err != nil {
return err
}
if len(inUse) > 0 {
return fmt.Errorf("can't delete zip pack %v: %d parts in use: %v", br, len(inUse), inUse)
}
if err := s.large.RemoveBlobs([]blob.Ref{br}); err != nil {
return err
}
return s.meta.Delete("d:" + br.String())
}
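
Since deleteZipPack refuses to remove a pack whose parts are still referenced, a cleaner can simply attempt the delete and let a later scan retry. A hypothetical caller (maybeDeleteZipPack and its log line are not part of this change):

// maybeDeleteZipPack attempts to delete the zip pack br and logs,
// rather than fails, when the pack can't be deleted yet (for example
// because some parts are still in use). Assumes the standard "log"
// package is imported.
func (s *storage) maybeDeleteZipPack(br blob.Ref) {
	if err := s.deleteZipPack(br); err != nil {
		log.Printf("blobpacked: not deleting zip pack %v: %v", br, err)
	}
}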
func (s *storage) zipPartsInUse(br blob.Ref) ([]blob.Ref, error) {
var (
mu sync.Mutex
inUse []blob.Ref
)
var grp syncutil.Group
gate := syncutil.NewGate(20) // arbitrary constant
err := s.foreachZipBlob(br, func(bap BlobAndPos) error {
gate.Start()
grp.Go(func() error {
defer gate.Done()
mr, err := s.getMetaRow(bap.Ref)
if err != nil {
return err
}
if mr.largeRef.Valid() {
mu.Lock()
inUse = append(inUse, mr.largeRef)
mu.Unlock()
}
return nil
})
return nil
})
if os.IsNotExist(err) {
// An already-deleted blob from large isn't considered
// to be in-use.
return nil, nil
}
if err != nil {
return nil, err
}
if err := grp.Err(); err != nil {
return nil, err
}
return inUse, nil
}
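
zipPartsInUse bounds its fan-out of meta lookups to 20 at a time with syncutil.Gate and collects the first error with syncutil.Group. For readers unfamiliar with those Camlistore helpers, a rough standard-library-only sketch of the same bounded-concurrency shape (illustration only; not used by this code):

// boundedLookups runs fn for each ref with at most max goroutines in
// flight, returning the first error seen. Standard-library analogue
// of the Gate/Group pattern above; the real code uses
// camlistore.org/pkg/syncutil.
func boundedLookups(refs []blob.Ref, max int, fn func(blob.Ref) error) error {
	sem := make(chan struct{}, max) // counting semaphore, like syncutil.Gate
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		firstErr error
	)
	for _, br := range refs {
		sem <- struct{}{} // acquire a slot, like gate.Start
		wg.Add(1)
		go func(br blob.Ref) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot, like gate.Done
			if err := fn(br); err != nil {
				mu.Lock()
				if firstErr == nil {
					firstErr = err
				}
				mu.Unlock()
			}
		}(br)
	}
	wg.Wait()
	return firstErr
}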
// A BlobAndPos is a blobref, its size, and where it is located within
// a larger group of bytes.
type BlobAndPos struct {


@@ -26,6 +26,7 @@ import (
"reflect"
"runtime"
"strconv"
"strings"
"testing"
"time"
@@ -477,6 +478,92 @@ func TestStreamBlobs(t *testing.T) {
}
}
func TestForeachZipBlob(t *testing.T) {
const fileSize = 2 << 20
const fileName = "foo.dat"
fileContents := randBytes(fileSize)
ctx := context.New()
defer ctx.Cancel()
var pt *packTest
testPack(t,
func(sto blobserver.Storage) error {
_, err := schema.WriteFileFromReader(sto, fileName, bytes.NewReader(fileContents))
return err
},
wantNumLargeBlobs(1),
wantNumSmallBlobs(0),
func(v *packTest) {
pt = v
},
)
zipBlob, err := singleBlob(pt.large)
if err != nil {
t.Fatal(err)
}
zipBytes := slurpBlob(t, pt.large, zipBlob.Ref)
all := map[blob.Ref]blob.SizedRef{}
if err := blobserver.EnumerateAll(ctx, pt.logical, func(sb blob.SizedRef) error {
all[sb.Ref] = sb
return nil
}); err != nil {
t.Fatal(err)
}
foreachSaw := 0
if err := pt.sto.foreachZipBlob(zipBlob.Ref, func(bap BlobAndPos) error {
foreachSaw++
want, ok := all[bap.Ref]
if !ok {
t.Error("unwanted blob ref returned from foreachZipBlob: %v", bap.Ref)
return nil
}
delete(all, bap.Ref)
if want.Size != bap.Size {
t.Error("for %v, foreachZipBlob size = %d; want %d", bap.Ref, bap.Size, want.Size)
return nil
}
// Verify the offset.
h := bap.Ref.Hash()
h.Write(zipBytes[bap.Offset : bap.Offset+int64(bap.Size)])
if !bap.Ref.HashMatches(h) {
return fmt.Errorf("foreachZipBlob returned blob %v at offset %d that failed validation", bap.Ref, bap.Offset)
}
return nil
}); err != nil {
t.Fatal(err)
}
t.Logf("foreachZipBlob enumerated %d blobs", foreachSaw)
if len(all) > 0 {
t.Errorf("foreachZipBlob forgot to enumerate %d blobs: %v", len(all), all)
}
}
// singleBlob returns the single blob in sto. It returns an error if
// sto contains zero blobs or more than one.
func singleBlob(sto blobserver.BlobEnumerator) (ret blob.SizedRef, err error) {
ctx := context.New()
defer ctx.Cancel()
n := 0
if err = blobserver.EnumerateAll(ctx, sto, func(sb blob.SizedRef) error {
ret = sb
n++
return nil
}); err != nil {
return blob.SizedRef{}, err
}
if n != 1 {
return blob.SizedRef{}, fmt.Errorf("saw %d blobs; want 1", n)
}
return
}
func TestRemoveBlobs(t *testing.T) {
ctx := context.New()
defer ctx.Cancel()
@@ -509,6 +596,19 @@ func TestRemoveBlobs(t *testing.T) {
}); err != nil {
t.Fatal(err)
}
// Find the zip
zipBlob, err := singleBlob(sto.large)
if err != nil {
t.Fatalf("failed to find packed zip: %v", err)
}
// The zip file is in use, so verify we can't delete it.
if err := sto.deleteZipPack(zipBlob.Ref); err == nil {
t.Fatalf("zip pack blob deleted but it should not have been allowed")
}
// Delete everything
for len(all) > 0 {
del := all[0].Ref
all = all[1:]
@@ -520,7 +620,40 @@ func TestRemoveBlobs(t *testing.T) {
}
}
// TODO: verify the metarow for "d:" is there.
// TODO: and then force a scan to delete zips, and verify 0 blobs everywhere
// TODO: and then verify the metarow for "d:" is gone.
dRows := func() (n int) {
if err := sorted.ForeachInRange(sto.meta, "d:", "", func(key, value string) error {
if strings.HasPrefix(key, "d:") {
n++
}
return nil
}); err != nil {
t.Fatal("meta iteration error: %v", err)
}
return
}
if n := dRows(); n == 0 {
t.Fatalf("expected a 'd:' row after deletes")
}
// TODO: test the background pack-deleter loop? figure out its design first.
if err := sto.deleteZipPack(zipBlob.Ref); err != nil {
t.Errorf("error deleting zip %v: %v", zipBlob.Ref, err)
}
if n := dRows(); n != 0 {
t.Errorf("expected the 'd:' row to be deleted")
}
}
func slurpBlob(t *testing.T, sto blob.Fetcher, br blob.Ref) []byte {
rc, _, err := sto.Fetch(br)
if err != nil {
t.Fatal(err)
}
defer rc.Close()
slurp, err := ioutil.ReadAll(rc)
if err != nil {
t.Fatal(err)
}
return slurp
}