Add pkg/blobserver/archiver for making Glacier archives.

From the package docs:

  Package archiver zips lots of little blobs into bigger zip files
  and stores them somewhere. While generic, it was designed to
  incrementally create Amazon Glacier archives from many little
  blobs, rather than creating millions of Glacier archives.
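
For illustration only (not part of this change), a caller might drive
it like this; uploadToGlacier and recordContents are hypothetical
helpers standing in for a real Glacier client and tracking database:

  a := &archiver.Archiver{
      Source:                 src, // some blobserver.Storage
      MinZipSize:             archiver.DefaultMinZipSize,
      DeleteSourceAfterStore: true,
      Store: func(zipData []byte, blobs []blob.SizedRef) error {
          if err := uploadToGlacier(zipData); err != nil {
              return err
          }
          return recordContents(blobs) // note which blobs each zip holds
      },
  }
  for {
      err := a.RunOnce()
      if err == archiver.ErrSourceTooSmall {
          break // not enough blob data yet; try again later
      }
      if err != nil {
          log.Fatal(err)
      }
  }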

Change-Id: If304b2d4bf144bfab073c61c148bb34fa0be2f2d
Brad Fitzpatrick 2014-02-07 14:32:36 -08:00
parent c137cfc32d
commit 1b1087a830
2 changed files with 375 additions and 0 deletions


@@ -0,0 +1,180 @@
/*
Copyright 2014 The Camlistore Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package archiver zips lots of little blobs into bigger zip files
// and stores them somewhere. While generic, it was designed to
// incrementally create Amazon Glacier archives from many little
// blobs, rather than creating millions of Glacier archives.
package archiver

import (
	"archive/zip"
	"bytes"
	"errors"
	"io"

	"camlistore.org/pkg/blob"
	"camlistore.org/pkg/blobserver"
	"camlistore.org/pkg/context"
)

// DefaultMinZipSize is the default value of Archiver.MinZipSize.
const DefaultMinZipSize = 16 << 20

// An Archiver specifies the parameters of the job that copies from
// one blobserver Storage (the Source) to long-term storage.
type Archiver struct {
	// Source is where the blobs should come from.
	// (and be deleted from, if DeleteSourceAfterStore)
	Source blobserver.Storage

	// MinZipSize is the minimum size of zip files to create.
	// If zero, DefaultMinZipSize is used.
	MinZipSize int64

	// Store specifies a function that writes the zip file
	// (encoded in the byte slice) to permanent storage
	// (e.g. Amazon Glacier) and notes somewhere (a database) that
	// it contains the listed blobs. The blobs are redundant with
	// the filenames in the zip file, which will be named by
	// their blobref string, with no extension.
	Store func(zip []byte, blobs []blob.SizedRef) error

	// DeleteSourceAfterStore, if true, deletes the blobs from Source
	// after Store returns success.
	// This should pretty much always be set true, otherwise subsequent
	// calls to Run/RunOnce will generate the same archives. Wrap
	// the Source in a "namespace" storage if you don't actually
	// want to delete things locally.
	DeleteSourceAfterStore bool
}

// ErrSourceTooSmall is returned by RunOnce if there aren't enough blobs on Source
// to warrant a new zip archive.
var ErrSourceTooSmall = errors.New("archiver: not enough blob data on source to warrant a new zip archive")

func (a *Archiver) zipSize() int64 {
	if a.MinZipSize > 0 {
		return a.MinZipSize
	}
	return DefaultMinZipSize
}

var errStopEnumerate = errors.New("sentinel return value")

// RunOnce scans a.Source and conditionally creates a new zip.
// It returns ErrSourceTooSmall if there aren't enough blobs on Source.
func (a *Archiver) RunOnce() error {
	if a.Source == nil {
		return errors.New("archiver: nil Source")
	}
	if a.Store == nil {
		return errors.New("archiver: nil Store func")
	}
	pz := &potentialZip{a: a}
	err := blobserver.EnumerateAll(context.New(), a.Source, func(sb blob.SizedRef) error {
		if err := pz.addBlob(sb); err != nil {
			return err
		}
		if pz.bigEnough() {
			return errStopEnumerate
		}
		return nil
	})
	if err == errStopEnumerate {
		err = nil
	}
	if err != nil {
		return err
	}
	if err := pz.condClose(); err != nil {
		return err
	}
	if !pz.bigEnough() {
		return ErrSourceTooSmall
	}
	if err := a.Store(pz.buf.Bytes(), pz.blobs); err != nil {
		return err
	}
	if a.DeleteSourceAfterStore {
		blobs := make([]blob.Ref, 0, len(pz.blobs))
		for _, sb := range pz.blobs {
			blobs = append(blobs, sb.Ref)
		}
		if err := a.Source.RemoveBlobs(blobs); err != nil {
			return err
		}
	}
	return nil
}

type potentialZip struct {
	a       *Archiver
	blobs   []blob.SizedRef
	zw      *zip.Writer  // nil until actually writing
	buf     bytes.Buffer // of the zip file
	sumSize int64        // of uncompressed bytes of blobs
	closed  bool
}

func (z *potentialZip) bigEnough() bool {
	return int64(z.buf.Len()) > z.a.zipSize()
}

func (z *potentialZip) condClose() error {
	if z.closed || z.zw == nil {
		return nil
	}
	z.closed = true
	return z.zw.Close()
}

func (z *potentialZip) addBlob(sb blob.SizedRef) error {
	if z.bigEnough() {
		return nil
	}
	z.sumSize += int64(sb.Size)
	if z.zw == nil && z.sumSize > z.a.zipSize() {
		z.zw = zip.NewWriter(&z.buf)
		for _, sb := range z.blobs {
			if err := z.writeZipBlob(sb); err != nil {
				return err
			}
		}
	}
	z.blobs = append(z.blobs, sb)
	if z.zw != nil {
		return z.writeZipBlob(sb)
	}
	return nil
}

func (z *potentialZip) writeZipBlob(sb blob.SizedRef) error {
	w, err := z.zw.CreateHeader(&zip.FileHeader{
		Name:   sb.Ref.String(),
		Method: zip.Deflate,
	})
	if err != nil {
		return err
	}
	blobSrc, _, err := z.a.Source.FetchStreaming(sb.Ref)
	if err != nil {
		return err
	}
	defer blobSrc.Close()
	_, err = io.Copy(w, blobSrc)
	return err
}
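
Not part of this commit: below is a minimal sketch of a Store
implementation, using a local directory in place of Glacier. The
fileStore name, the archive filename pattern, and the manifest format
are invented for illustration.

  package example

  import (
      "fmt"
      "io/ioutil"
      "os"
      "path/filepath"

      "camlistore.org/pkg/blob"
  )

  // fileStore returns an Archiver.Store func that writes each zip to a
  // numbered file under dir and appends one line per contained blob to a
  // manifest file, so each archive's contents can be looked up later.
  // A real deployment would upload to Glacier here instead.
  func fileStore(dir string) func(zipData []byte, blobs []blob.SizedRef) error {
      n := 0
      return func(zipData []byte, blobs []blob.SizedRef) error {
          n++
          name := filepath.Join(dir, fmt.Sprintf("archive-%04d.zip", n))
          if err := ioutil.WriteFile(name, zipData, 0600); err != nil {
              return err
          }
          mf, err := os.OpenFile(filepath.Join(dir, "manifest"),
              os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
          if err != nil {
              return err
          }
          for _, sb := range blobs {
              if _, err := fmt.Fprintf(mf, "%s %s %d\n", name, sb.Ref, sb.Size); err != nil {
                  mf.Close()
                  return err
              }
          }
          return mf.Close()
      }
  }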


@@ -0,0 +1,195 @@
/*
Copyright 2014 The Camlistore Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package archiver

import (
	"archive/zip"
	"bytes"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"math/rand"
	"reflect"
	"strings"
	"testing"

	"camlistore.org/pkg/blob"
	"camlistore.org/pkg/schema"
	"camlistore.org/pkg/test"
)
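
// TestArchiver exercises RunOnce end to end: with too little data it
// must return ErrSourceTooSmall without calling Store; with enough data
// it must store one zip holding both blobs and empty the source.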
func TestArchiver(t *testing.T) {
	src := new(test.Fetcher)
	blobHello := &test.Blob{Contents: "Hello"}
	blobWorld := &test.Blob{Contents: "World" + strings.Repeat("!", 1024)}
	golden := map[blob.Ref]string{
		blobHello.BlobRef(): blobHello.Contents,
		blobWorld.BlobRef(): blobWorld.Contents,
	}

	a := &Archiver{
		Source:                 src,
		DeleteSourceAfterStore: true,
	}
	src.AddBlob(blobHello)
	a.Store = func([]byte, []blob.SizedRef) error {
		return errors.New("Store shouldn't be called")
	}
	a.MinZipSize = 400 // empirically: the zip will be 416 bytes
	if err := a.RunOnce(); err != ErrSourceTooSmall {
		t.Fatalf("RunOnce with just Hello = %v; want ErrSourceTooSmall", err)
	}

	src.AddBlob(blobWorld)
	var zipData []byte
	var inZip []blob.SizedRef
	a.Store = func(zip []byte, brs []blob.SizedRef) error {
		zipData = zip
		inZip = brs
		return nil
	}
	if err := a.RunOnce(); err != nil {
		t.Fatalf("RunOnce with Hello and World = %v", err)
	}
	if zipData == nil {
		t.Error("no zip data stored")
	}
	if len(src.BlobrefStrings()) != 0 {
		t.Errorf("source still has blobs = %q; want none", src.BlobrefStrings())
	}
	if len(inZip) != 2 {
		t.Errorf("expected 2 blobs reported as in zip to Store; got %v", inZip)
	}
	got := map[blob.Ref]string{}
	if err := foreachZipEntry(zipData, func(br blob.Ref, all []byte) {
		got[br] = string(all)
	}); err != nil {
		t.Fatal(err)
	}
	if !reflect.DeepEqual(golden, got) {
		t.Errorf("zip contents didn't match. got: %v; want %v", got, golden)
	}
}

// Tests a bunch of rounds on a bunch of data.
func TestArchiverStress(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping in short mode")
	}
	src := new(test.Fetcher)
	fileRef, err := schema.WriteFileFromReader(src, "random", io.LimitReader(randReader{}, 10<<20))
	if err != nil {
		t.Fatal(err)
	}
	n0 := src.NumBlobs()
	t.Logf("Wrote %v in %d blobs", fileRef, n0)

	refs0 := src.BlobrefStrings()
	var zips [][]byte
	archived := map[blob.Ref]bool{}
	a := &Archiver{
		Source:                 src,
		MinZipSize:             1 << 20,
		DeleteSourceAfterStore: true,
		Store: func(zipd []byte, brs []blob.SizedRef) error {
			zips = append(zips, zipd)
			for _, sbr := range brs {
				if archived[sbr.Ref] {
					t.Errorf("duplicate archive of %v", sbr.Ref)
				}
				archived[sbr.Ref] = true
			}
			return nil
		},
	}
	for {
		err := a.RunOnce()
		if err == ErrSourceTooSmall {
			break
		}
		if err != nil {
			t.Fatal(err)
		}
	}

	if len(archived) == 0 {
		t.Error("no blobs were archived")
	}
	if len(zips) < 2 {
		t.Errorf("unexpectedly small number of zip files = %d", len(zips))
	}
	if n1 := src.NumBlobs() + len(archived); n0 != n1 {
		t.Errorf("original %d blobs != %d remaining + %d archived (%d)", n0, src.NumBlobs(), len(archived), n1)
	}

	// And restore:
	for _, zipd := range zips {
		if err := foreachZipEntry(zipd, func(br blob.Ref, contents []byte) {
			tb := &test.Blob{Contents: string(contents)}
			if tb.BlobRef() != br {
				t.Fatal("corrupt zip callback")
			}
			src.AddBlob(tb)
		}); err != nil {
			t.Fatal(err)
		}
	}
	refs1 := src.BlobrefStrings()
	if !reflect.DeepEqual(refs0, refs1) {
		t.Error("Restore error.")
	}
}
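
// randReader is an io.Reader yielding an endless stream of
// pseudo-random bytes; the stress test reads 10 MB of it through
// schema.WriteFileFromReader to populate the source.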
type randReader struct{}

func (randReader) Read(p []byte) (n int, err error) {
	for i := range p {
		p[i] = byte(rand.Intn(256))
	}
	return len(p), nil
}
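
// foreachZipEntry opens the zip archive encoded in zipData and calls
// fn for each file in it, passing the blobref parsed from the file's
// name and the file's uncompressed contents.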
func foreachZipEntry(zipData []byte, fn func(blob.Ref, []byte)) error {
	zipr, err := zip.NewReader(bytes.NewReader(zipData), int64(len(zipData)))
	if err != nil {
		return err
	}
	for _, f := range zipr.File {
		br, ok := blob.Parse(f.Name)
		if !ok {
			return fmt.Errorf("Bogus zip filename %q", f.Name)
		}
		rc, err := f.Open()
		if err != nil {
			return err
		}
		all, err := ioutil.ReadAll(rc)
		rc.Close()
		if err != nil {
			return err
		}
		fn(br, all)
	}
	return nil
}