From 1b1087a830c719655fb1ba72f9c6271661d33927 Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Fri, 7 Feb 2014 14:32:36 -0800 Subject: [PATCH] Add pkg/blobserver/archiver for making Glacier archives. From the package docs: Package archiver zips lots of little blobs into bigger zip files and stores them somewhere. While generic, it was designed to incrementally create Amazon Glacier archives from many little blobs, rather than creating millions of Glacier archives. Change-Id: If304b2d4bf144bfab073c61c148bb34fa0be2f2d --- pkg/blobserver/archiver/archiver.go | 180 +++++++++++++++++++++ pkg/blobserver/archiver/archiver_test.go | 195 +++++++++++++++++++++++ 2 files changed, 375 insertions(+) create mode 100644 pkg/blobserver/archiver/archiver.go create mode 100644 pkg/blobserver/archiver/archiver_test.go diff --git a/pkg/blobserver/archiver/archiver.go b/pkg/blobserver/archiver/archiver.go new file mode 100644 index 000000000..d03423038 --- /dev/null +++ b/pkg/blobserver/archiver/archiver.go @@ -0,0 +1,180 @@ +/* +Copyright 2014 The Camlistore Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package archiver zips lots of little blobs into bigger zip files +// and stores them somewhere. While generic, it was designed to +// incrementally create Amazon Glacier archives from many little +// blobs, rather than creating millions of Glacier archives. 
+package archiver
+
+import (
+	"archive/zip"
+	"bytes"
+	"errors"
+	"io"
+
+	"camlistore.org/pkg/blob"
+	"camlistore.org/pkg/blobserver"
+	"camlistore.org/pkg/context"
+)
+
+// DefaultMinZipSize is the default value of Archiver.MinZipSize.
+const DefaultMinZipSize = 16 << 20 // 16 MiB
+
+// An Archiver specifies the parameters of the job that copies from
+// one blobserver Storage (the Source) to long-term storage.
+type Archiver struct {
+	// Source is where the blobs should come from.
+	// (and be deleted from, if DeleteSourceAfterStore)
+	Source blobserver.Storage
+
+	// MinZipSize is the minimum size of zip files to create.
+	// If zero or negative, DefaultMinZipSize is used.
+	MinZipSize int64
+
+	// Store specifies a function that writes the zip file
+	// (encoded in the byte slice) to permanent storage
+	// (e.g. Amazon Glacier) and notes somewhere (a database) that
+	// it contains the listed blobs. The blobs are redundant with
+	// the filenames in the zip file, which will be named by
+	// their blobref string, with no extension.
+	Store func(zip []byte, blobs []blob.SizedRef) error
+
+	// DeleteSourceAfterStore, if true, deletes the blobs from Source
+	// after Store returns success.
+	// This should pretty much always be set true, otherwise subsequent
+	// calls to Run/RunOnce will generate the same archives. Wrap
+	// the Source in a "namespace" storage if you don't actually
+	// want to delete things locally.
+	DeleteSourceAfterStore bool
+}
+
+// ErrSourceTooSmall is returned by RunOnce if there aren't enough blobs on Source
+// to warrant a new zip archive.
+var ErrSourceTooSmall = errors.New("archiver: not enough blob data on source to warrant a new zip archive")
+
+func (a *Archiver) zipSize() int64 { // effective minimum: MinZipSize if positive, else DefaultMinZipSize
+	if a.MinZipSize > 0 {
+		return a.MinZipSize
+	}
+	return DefaultMinZipSize
+}
+
+var errStopEnumerate = errors.New("sentinel return value") // returned by RunOnce's enumerate callback to stop early
+
+// RunOnce scans a.Source and conditionally creates a new zip.
+// It returns ErrSourceTooSmall if there aren't enough blobs on Source.
+func (a *Archiver) RunOnce() error {
+	if a.Source == nil {
+		return errors.New("archiver: nil Source")
+	}
+	if a.Store == nil {
+		return errors.New("archiver: nil Store func")
+	}
+	pz := &potentialZip{a: a}
+	err := blobserver.EnumerateAll(context.New(), a.Source, func(sb blob.SizedRef) error {
+		if err := pz.addBlob(sb); err != nil {
+			return err
+		}
+		if pz.bigEnough() {
+			return errStopEnumerate
+		}
+		return nil
+	})
+	if err == errStopEnumerate { // stopped early because the zip is big enough: not a failure
+		err = nil
+	}
+	if err != nil {
+		return err
+	}
+	if err := pz.condClose(); err != nil {
+		return err
+	}
+	if !pz.bigEnough() {
+		return ErrSourceTooSmall
+	}
+	if err := a.Store(pz.buf.Bytes(), pz.blobs); err != nil {
+		return err
+	}
+	if a.DeleteSourceAfterStore { // only reached after Store returned nil
+		blobs := make([]blob.Ref, 0, len(pz.blobs))
+		for _, sb := range pz.blobs {
+			blobs = append(blobs, sb.Ref)
+		}
+		if err := a.Source.RemoveBlobs(blobs); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+type potentialZip struct { // the zip being considered by a single RunOnce call
+	a       *Archiver
+	blobs   []blob.SizedRef
+	zw      *zip.Writer  // nil until actually writing
+	buf     bytes.Buffer // of the zip file
+	sumSize int64        // of uncompressed bytes of blobs
+	closed  bool
+}
+
+func (z *potentialZip) bigEnough() bool { // true once more than zipSize bytes have reached buf
+	return int64(z.buf.Len()) > z.a.zipSize()
+}
+
+func (z *potentialZip) condClose() error { // closes zw at most once; no-op if zw was never created
+	if z.closed || z.zw == nil {
+		return nil
+	}
+	z.closed = true
+	return z.zw.Close()
+}
+
+func (z *potentialZip) addBlob(sb blob.SizedRef) error { // records sb; starts the zip once sumSize exceeds zipSize
+	if z.bigEnough() {
+		return nil // zip already big enough; sb is ignored
+	}
+	z.sumSize += int64(sb.Size)
+	if z.zw == nil && z.sumSize > z.a.zipSize() {
+		z.zw = zip.NewWriter(&z.buf)
+		for _, sb := range z.blobs { // write the blobs recorded before the writer existed
+			if err := z.writeZipBlob(sb); err != nil {
+				return err
+			}
+		}
+	}
+	z.blobs = append(z.blobs, sb)
+	if z.zw != nil {
+		return z.writeZipBlob(sb)
+	}
+	return nil
+}
+
+func (z *potentialZip) writeZipBlob(sb blob.SizedRef) error { // adds sb as a Deflate entry named by its blobref string
+	w, err := z.zw.CreateHeader(&zip.FileHeader{
+		Name:   sb.Ref.String(),
+		Method: zip.Deflate,
+	})
+	if err != nil {
+		return err
+	}
+	blobSrc, _, err := z.a.Source.FetchStreaming(sb.Ref)
+	if err != nil {
+		return err
+	}
+	defer blobSrc.Close()
+	_, err = io.Copy(w, blobSrc)
+	return err
+}
diff --git a/pkg/blobserver/archiver/archiver_test.go b/pkg/blobserver/archiver/archiver_test.go
new file mode 100644
index 000000000..c769b2053
--- /dev/null
+++ b/pkg/blobserver/archiver/archiver_test.go
@@ -0,0 +1,195 @@
+/*
+Copyright 2014 The Camlistore Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package archiver
+
+import (
+	"archive/zip"
+	"bytes"
+	"errors"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"math/rand"
+	"reflect"
+	"strings"
+	"testing"
+
+	"camlistore.org/pkg/blob"
+	"camlistore.org/pkg/schema"
+	"camlistore.org/pkg/test"
+)
+
+func TestArchiver(t *testing.T) {
+	src := new(test.Fetcher)
+	blobHello := &test.Blob{Contents: "Hello"}
+	blobWorld := &test.Blob{Contents: "World" + strings.Repeat("!", 1024)}
+	// golden is what the stored zip is expected to contain, keyed by blobref.
+	golden := map[blob.Ref]string{
+		blobHello.BlobRef(): blobHello.Contents,
+		blobWorld.BlobRef(): blobWorld.Contents,
+	}
+
+	a := &Archiver{
+		Source:                 src,
+		DeleteSourceAfterStore: true,
+	}
+	// With only the 5-byte Hello blob, RunOnce must return ErrSourceTooSmall and never call Store.
+	src.AddBlob(blobHello)
+	a.Store = func([]byte, []blob.SizedRef) error {
+		return errors.New("Store shouldn't be called")
+	}
+	a.MinZipSize = 400 // empirically: the zip will be 416 bytes
+	if err := a.RunOnce(); err != ErrSourceTooSmall {
+		t.Fatalf("RunOnce with just Hello = %v; want ErrSourceTooSmall", err)
+	}
+	// Adding the larger World blob should make RunOnce store a zip and empty the source.
+	src.AddBlob(blobWorld)
+	var zipData []byte
+	var inZip []blob.SizedRef
+	a.Store = func(zip []byte, brs []blob.SizedRef) error {
+		zipData = zip
+		inZip = brs
+		return nil
+	}
+	if err := a.RunOnce(); err != nil {
+		t.Fatalf("RunOnce with Hello and World = %v", err)
+	}
+	if zipData == nil {
+		t.Error("no zip data stored")
+	}
+	if len(src.BlobrefStrings()) != 0 {
+		t.Errorf("source still has blobs = %q; want none", src.BlobrefStrings()) // call the method: %q needs the strings, not the func value
+	}
+	if len(inZip) != 2 {
+		t.Errorf("expected 2 blobs reported as in zip to Store; got %v", inZip)
+	}
+	// Unzip what Store received and compare it with golden.
+	got := map[blob.Ref]string{}
+	if err := foreachZipEntry(zipData, func(br blob.Ref, all []byte) {
+		got[br] = string(all)
+	}); err != nil {
+		t.Fatal(err)
+	}
+	if !reflect.DeepEqual(golden, got) {
+		t.Errorf("zip contents didn't match. got: %v; want %v", got, golden)
+	}
+}
+
+// Tests a bunch of rounds on a bunch of data.
+func TestArchiverStress(t *testing.T) {
+	if testing.Short() {
+		t.Skip("Skipping in short mode")
+	}
+	src := new(test.Fetcher)
+	fileRef, err := schema.WriteFileFromReader(src, "random", io.LimitReader(randReader{}, 10<<20))
+	if err != nil {
+		t.Fatal(err)
+	}
+	n0 := src.NumBlobs()
+	t.Logf("Wrote %v in %d blobs", fileRef, n0)
+	// Remember the full blob list so the restore at the end can be checked against it.
+	refs0 := src.BlobrefStrings()
+
+	var zips [][]byte
+	archived := map[blob.Ref]bool{}
+	a := &Archiver{
+		Source:                 src,
+		MinZipSize:             1 << 20,
+		DeleteSourceAfterStore: true,
+		Store: func(zipd []byte, brs []blob.SizedRef) error {
+			zips = append(zips, zipd)
+			for _, sbr := range brs {
+				if archived[sbr.Ref] {
+					t.Errorf("duplicate archive of %v", sbr.Ref) // Errorf, not Error: the message has a format verb
+				}
+				archived[sbr.Ref] = true
+			}
+			return nil
+		},
+	}
+	for { // keep archiving until the source has too little data left for another zip
+		err := a.RunOnce()
+		if err == ErrSourceTooSmall {
+			break
+		}
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+	// Blobs left on src plus blobs archived must add up to the original count.
+	if len(archived) == 0 {
+		t.Errorf("unexpected small number of archived blobs = %d", len(archived))
+	}
+	if len(zips) < 2 {
+		t.Errorf("unexpected small number of zip files = %d", len(zips))
+	}
+	if n1 := src.NumBlobs() + len(archived); n0 != n1 {
+		t.Errorf("original %d blobs != %d after + %d archived (%d)", n0, src.NumBlobs(), len(archived), n1)
+	}
+
+	// And restore:
+	for _, zipd := range zips {
+		if err := foreachZipEntry(zipd, func(br blob.Ref, contents []byte) {
+			tb := &test.Blob{Contents: string(contents)}
+			if tb.BlobRef() != br {
+				t.Fatal("corrupt zip callback")
+			}
+			src.AddBlob(tb)
+		}); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	refs1 := src.BlobrefStrings()
+	if !reflect.DeepEqual(refs0, refs1) {
+		t.Error("Restore error.")
+	}
+}
+
+type randReader struct{} // reader that fills every buffer with math/rand bytes and never returns an error
+
+func (randReader) Read(p []byte) (n int, err error) {
+	for i := range p {
+		p[i] = byte(rand.Intn(256))
+	}
+	return len(p), nil
+}
+
+func foreachZipEntry(zipData []byte, fn func(blob.Ref, []byte)) error {
+	zipr, err := zip.NewReader(bytes.NewReader(zipData), int64(len(zipData)))
+	if err != nil {
+		return err
+	}
+	// Each entry name is parsed as a blobref; fn gets the ref and the entry's full contents.
+	for _, f := range zipr.File {
+		br, ok := blob.Parse(f.Name)
+		if !ok {
+			return fmt.Errorf("Bogus zip filename %q", f.Name)
+		}
+		rc, err := f.Open()
+		if err != nil {
+			return err
+		}
+		all, err := ioutil.ReadAll(rc)
+		rc.Close()
+		if err != nil {
+			return err
+		}
+		fn(br, all)
+	}
+	return nil
+}