diff --git a/pkg/blobserver/archiver/archiver.go b/pkg/blobserver/archiver/archiver.go
new file mode 100644
index 000000000..d03423038
--- /dev/null
+++ b/pkg/blobserver/archiver/archiver.go
@@ -0,0 +1,180 @@
+/*
+Copyright 2014 The Camlistore Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package archiver zips lots of little blobs into bigger zip files
+// and stores them somewhere. While generic, it was designed to
+// incrementally create Amazon Glacier archives from many little
+// blobs, rather than creating millions of Glacier archives.
+package archiver
+
+import (
+	"archive/zip"
+	"bytes"
+	"errors"
+	"io"
+
+	"camlistore.org/pkg/blob"
+	"camlistore.org/pkg/blobserver"
+	"camlistore.org/pkg/context"
+)
+
+// DefaultMinZipSize is the default value of Archiver.MinZipSize.
+const DefaultMinZipSize = 16 << 20
+
+// An Archiver specifies the parameters of the job that copies from
+// one blobserver Storage (the Source) to long-term storage.
+type Archiver struct {
+	// Source is where the blobs should come from
+	// (and be deleted from, if DeleteSourceAfterStore is set).
+	Source blobserver.Storage
+
+	// MinZipSize is the minimum size of zip files to create.
+	// If zero, DefaultMinZipSize is used.
+	MinZipSize int64
+
+	// Store specifies a function that writes the zip file
+	// (encoded in the byte slice) to permanent storage
+	// (e.g. Amazon Glacier) and notes somewhere (a database) that
+	// it contains the listed blobs. The blob list is redundant with
+	// the filenames in the zip file, which are named by their
+	// blobref strings, with no extension.
+	Store func(zip []byte, blobs []blob.SizedRef) error
+
+	// DeleteSourceAfterStore, if true, deletes the blobs from Source
+	// after Store returns success.
+	// This should almost always be set to true; otherwise subsequent
+	// calls to Run/RunOnce will generate the same archives. Wrap
+	// the Source in a "namespace" storage if you don't actually
+	// want to delete things locally.
+	DeleteSourceAfterStore bool
+}
+
+// ErrSourceTooSmall is returned by RunOnce if there aren't enough blobs on Source
+// to warrant a new zip archive.
+var ErrSourceTooSmall = errors.New("archiver: not enough blob data on source to warrant a new zip archive")
+
+func (a *Archiver) zipSize() int64 {
+	if a.MinZipSize > 0 {
+		return a.MinZipSize
+	}
+	return DefaultMinZipSize
+}
+
+var errStopEnumerate = errors.New("sentinel return value")
+
+// RunOnce scans a.Source and conditionally creates a new zip.
+// It returns ErrSourceTooSmall if there aren't enough blobs on Source.
+func (a *Archiver) RunOnce() error {
+	if a.Source == nil {
+		return errors.New("archiver: nil Source")
+	}
+	if a.Store == nil {
+		return errors.New("archiver: nil Store func")
+	}
+	pz := &potentialZip{a: a}
+	err := blobserver.EnumerateAll(context.New(), a.Source, func(sb blob.SizedRef) error {
+		if err := pz.addBlob(sb); err != nil {
+			return err
+		}
+		if pz.bigEnough() {
+			return errStopEnumerate
+		}
+		return nil
+	})
+	if err == errStopEnumerate {
+		err = nil
+	}
+	if err != nil {
+		return err
+	}
+	if err := pz.condClose(); err != nil {
+		return err
+	}
+	if !pz.bigEnough() {
+		return ErrSourceTooSmall
+	}
+	if err := a.Store(pz.buf.Bytes(), pz.blobs); err != nil {
+		return err
+	}
+	if a.DeleteSourceAfterStore {
+		blobs := make([]blob.Ref, 0, len(pz.blobs))
+		for _, sb := range pz.blobs {
+			blobs = append(blobs, sb.Ref)
+		}
+		if err := a.Source.RemoveBlobs(blobs); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// A potentialZip accumulates enumerated blobs in memory and doesn't
+// start writing an actual zip until their total uncompressed size
+// crosses the archiver's zip threshold.
+type potentialZip struct {
+	a       *Archiver
+	blobs   []blob.SizedRef
+	zw      *zip.Writer  // nil until actually writing
+	buf     bytes.Buffer // of the zip file
+	sumSize int64        // of uncompressed bytes of blobs
+	closed  bool
+}
+
+func (z *potentialZip) bigEnough() bool {
+	return int64(z.buf.Len()) > z.a.zipSize()
+}
+
+// condClose closes the zip writer, if writing has started.
+func (z *potentialZip) condClose() error {
+	if z.closed || z.zw == nil {
+		return nil
+	}
+	z.closed = true
+	return z.zw.Close()
+}
+
+func (z *potentialZip) addBlob(sb blob.SizedRef) error {
+	if z.bigEnough() {
+		return nil
+	}
+	z.sumSize += int64(sb.Size)
+	if z.zw == nil && z.sumSize > z.a.zipSize() {
+		z.zw = zip.NewWriter(&z.buf)
+		for _, sb := range z.blobs {
+			if err := z.writeZipBlob(sb); err != nil {
+				return err
+			}
+		}
+	}
+	z.blobs = append(z.blobs, sb)
+	if z.zw != nil {
+		return z.writeZipBlob(sb)
+	}
+	return nil
+}
+
+func (z *potentialZip) writeZipBlob(sb blob.SizedRef) error {
+	w, err := z.zw.CreateHeader(&zip.FileHeader{
+		Name:   sb.Ref.String(),
+		Method: zip.Deflate,
+	})
+	if err != nil {
+		return err
+	}
+	blobSrc, _, err := z.a.Source.FetchStreaming(sb.Ref)
+	if err != nil {
+		return err
+	}
+	defer blobSrc.Close()
+	_, err = io.Copy(w, blobSrc)
+	return err
+}
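diff --git a/pkg/blobserver/archiver/example_test.go b/pkg/blobserver/archiver/example_test.go
new file mode 100644
--- /dev/null
+++ b/pkg/blobserver/archiver/example_test.go
@@ -0,0 +1,45 @@
+package archiver
+
+import (
+	"log"
+	"math/rand"
+
+	"camlistore.org/pkg/blob"
+	"camlistore.org/pkg/test"
+)
+
+// Example is a minimal sketch of driving an Archiver end to end,
+// using only the API added in this change. The in-memory zips slice
+// stands in for durable storage; a real caller's Store func would
+// upload each zip to Glacier and note its contents in a database.
+func Example() {
+	// Fill a source with 64 KB of incompressible data, so the
+	// deflated zip actually crosses the (tiny) threshold below.
+	rnd := rand.New(rand.NewSource(1))
+	data := make([]byte, 64<<10)
+	for i := range data {
+		data[i] = byte(rnd.Intn(256))
+	}
+	src := new(test.Fetcher) // any blobserver.Storage can be a Source
+	src.AddBlob(&test.Blob{Contents: string(data)})
+
+	var zips [][]byte
+	a := &Archiver{
+		Source:      src,
+		MinZipSize:  1 << 10, // tiny threshold, for the sketch
+		DeleteSourceAfterStore: true,
+		Store: func(zipData []byte, blobs []blob.SizedRef) error {
+			zips = append(zips, zipData)
+			return nil
+		},
+	}
+	// Cut archives until the remaining data is too small to zip.
+	for {
+		err := a.RunOnce()
+		if err == ErrSourceTooSmall {
+			break
+		}
+		if err != nil {
+			log.Fatal(err)
+		}
+	}
+}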
diff --git a/pkg/blobserver/archiver/archiver_test.go b/pkg/blobserver/archiver/archiver_test.go
new file mode 100644
index 000000000..c769b2053
--- /dev/null
+++ b/pkg/blobserver/archiver/archiver_test.go
@@ -0,0 +1,195 @@
+/*
+Copyright 2014 The Camlistore Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package archiver
+
+import (
+	"archive/zip"
+	"bytes"
+	"errors"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"math/rand"
+	"reflect"
+	"strings"
+	"testing"
+
+	"camlistore.org/pkg/blob"
+	"camlistore.org/pkg/schema"
+	"camlistore.org/pkg/test"
+)
+
+func TestArchiver(t *testing.T) {
+	src := new(test.Fetcher)
+	blobHello := &test.Blob{Contents: "Hello"}
+	blobWorld := &test.Blob{Contents: "World" + strings.Repeat("!", 1024)}
+
+	golden := map[blob.Ref]string{
+		blobHello.BlobRef(): blobHello.Contents,
+		blobWorld.BlobRef(): blobWorld.Contents,
+	}
+
+	a := &Archiver{
+		Source: src,
+		DeleteSourceAfterStore: true,
+	}
+
+	src.AddBlob(blobHello)
+	a.Store = func([]byte, []blob.SizedRef) error {
+		return errors.New("Store shouldn't be called")
+	}
+	a.MinZipSize = 400 // empirically: the zip will be 416 bytes
+	if err := a.RunOnce(); err != ErrSourceTooSmall {
+		t.Fatalf("RunOnce with just Hello = %v; want ErrSourceTooSmall", err)
+	}
+
+	src.AddBlob(blobWorld)
+	var zipData []byte
+	var inZip []blob.SizedRef
+	a.Store = func(zip []byte, brs []blob.SizedRef) error {
+		zipData = zip
+		inZip = brs
+		return nil
+	}
+	if err := a.RunOnce(); err != nil {
+		t.Fatalf("RunOnce with Hello and World = %v", err)
+	}
+	if zipData == nil {
+		t.Error("no zip data stored")
+	}
+	if len(src.BlobrefStrings()) != 0 {
+		t.Errorf("source still has blobs = %q; want none", src.BlobrefStrings())
+	}
+	if len(inZip) != 2 {
+		t.Errorf("expected 2 blobs reported as in zip to Store; got %v", inZip)
+	}
+
+	got := map[blob.Ref]string{}
+	if err := foreachZipEntry(zipData, func(br blob.Ref, all []byte) {
+		got[br] = string(all)
+	}); err != nil {
+		t.Fatal(err)
+	}
+	if !reflect.DeepEqual(golden, got) {
+		t.Errorf("zip contents didn't match. got: %v; want %v", got, golden)
+	}
+}
+
+// TestArchiverStress tests a bunch of rounds on a bunch of data.
+func TestArchiverStress(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping in short mode")
+	}
+	src := new(test.Fetcher)
+	fileRef, err := schema.WriteFileFromReader(src, "random", io.LimitReader(randReader{}, 10<<20))
+	if err != nil {
+		t.Fatal(err)
+	}
+	n0 := src.NumBlobs()
+	t.Logf("Wrote %v in %d blobs", fileRef, n0)
+
+	refs0 := src.BlobrefStrings()
+
+	var zips [][]byte
+	archived := map[blob.Ref]bool{}
+	a := &Archiver{
+		Source:     src,
+		MinZipSize: 1 << 20,
+		DeleteSourceAfterStore: true,
+		Store: func(zipd []byte, brs []blob.SizedRef) error {
+			zips = append(zips, zipd)
+			for _, sbr := range brs {
+				if archived[sbr.Ref] {
+					t.Errorf("duplicate archive of %v", sbr.Ref)
+				}
+				archived[sbr.Ref] = true
+			}
+			return nil
+		},
+	}
+	for {
+		err := a.RunOnce()
+		if err == ErrSourceTooSmall {
+			break
+		}
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	if len(archived) == 0 {
+		t.Error("no blobs were archived")
+	}
+	if len(zips) < 2 {
+		t.Errorf("unexpectedly small number of zip files = %d", len(zips))
+	}
+	if n1 := src.NumBlobs() + len(archived); n0 != n1 {
+		t.Errorf("started with %d blobs; now %d on source + %d archived = %d", n0, src.NumBlobs(), len(archived), n1)
+	}
+
+	// And restore:
+	for _, zipd := range zips {
+		if err := foreachZipEntry(zipd, func(br blob.Ref, contents []byte) {
+			tb := &test.Blob{Contents: string(contents)}
+			if tb.BlobRef() != br {
+				t.Fatalf("zip entry %v doesn't match the hash of its contents", br)
+			}
+			src.AddBlob(tb)
+		}); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	refs1 := src.BlobrefStrings()
+	if !reflect.DeepEqual(refs0, refs1) {
+		t.Error("restored blobs don't match the original source blobs")
+	}
+}
+
+// randReader is an io.Reader of pseudo-random bytes.
+type randReader struct{}
+
+func (randReader) Read(p []byte) (n int, err error) {
+	for i := range p {
+		p[i] = byte(rand.Intn(256))
+	}
+	return len(p), nil
+}
+
+// foreachZipEntry parses each filename in zipData as a blobref and
+// calls fn with that blobref and the entry's contents.
+func foreachZipEntry(zipData []byte, fn func(blob.Ref, []byte)) error {
+	zipr, err := zip.NewReader(bytes.NewReader(zipData), int64(len(zipData)))
+	if err != nil {
+		return err
+	}
+
+	for _, f := range zipr.File {
+		br, ok := blob.Parse(f.Name)
+		if !ok {
+			return fmt.Errorf("bogus zip filename %q", f.Name)
+		}
+		rc, err := f.Open()
+		if err != nil {
+			return err
+		}
+		all, err := ioutil.ReadAll(rc)
+		rc.Close()
+		if err != nil {
+			return err
+		}
+		fn(br, all)
+	}
+	return nil
+}
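diff --git a/pkg/blobserver/archiver/examplestore_test.go b/pkg/blobserver/archiver/examplestore_test.go
new file mode 100644
--- /dev/null
+++ b/pkg/blobserver/archiver/examplestore_test.go
@@ -0,0 +1,40 @@
+package archiver
+
+import (
+	"fmt"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+
+	"camlistore.org/pkg/blob"
+)
+
+// diskStore is a hypothetical helper, not used by the tests above,
+// sketching one way to satisfy the Store contract: persist the zip
+// somewhere durable, then record which blobs it contains. Here both
+// go to local files; a real deployment would upload the zip to
+// Glacier and record the blob list in a database.
+//
+// Usage: a.Store = diskStore(dir)
+func diskStore(dir string) func(zipData []byte, blobs []blob.SizedRef) error {
+	n := 0
+	return func(zipData []byte, blobs []blob.SizedRef) error {
+		n++
+		name := filepath.Join(dir, fmt.Sprintf("archive-%04d.zip", n))
+		if err := ioutil.WriteFile(name, zipData, 0644); err != nil {
+			return err
+		}
+		// Append one line per blob to a manifest, so a blob can be
+		// located later without opening every zip.
+		mf, err := os.OpenFile(filepath.Join(dir, "manifest"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
+		if err != nil {
+			return err
+		}
+		defer mf.Close()
+		for _, sb := range blobs {
+			if _, err := fmt.Fprintf(mf, "%v %d %s\n", sb.Ref, sb.Size, name); err != nil {
+				return err
+			}
+		}
+		return nil
+	}
+}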