diff --git a/pkg/blobserver/overlay/overlay.go b/pkg/blobserver/overlay/overlay.go index ee3234ddc..99bd014bf 100644 --- a/pkg/blobserver/overlay/overlay.go +++ b/pkg/blobserver/overlay/overlay.go @@ -16,16 +16,20 @@ limitations under the License. /* Package overlay registers the "overlay" blobserver storage type -that acts as a staging area which is read-write layer on top of a -base storage. The base storage is never changed. +that presents storage that is the result of overlaying a +storage ("upper") on top of another storage ("lower"). +All changes go to the upper storage. The lower storage is never changed. + +The optional "deleted" KeyValue store may be provided to keep track of +deleted blobs. When "deleted" is missing, deletion returns an error. Example usage: "/bs/": { "handler": "storage-overlay", "handlerArgs": { - "base": "/sto-base/", - "overlay": "/bs-local-changes/", + "lower": "/sto-base/", + "upper": "/bs-local-changes/", "deleted": { "file": "/volume1/camlistore/home/var/camlistore/blobs/deleted.leveldb", "type": "leveldb" @@ -61,41 +65,44 @@ type readOnlyStorage interface { } type overlayStorage struct { - base readOnlyStorage + lower readOnlyStorage - // deleted keeps refs deleted from base + // deleted stores refs deleted from lower deleted sorted.KeyValue // read-write storage for changes - stage blobserver.Storage + upper blobserver.Storage } func newFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (blobserver.Storage, error) { var ( - sourcePrefix = conf.RequiredString("base") - stagePrefix = conf.RequiredString("stage") - deletedConf = conf.RequiredObject("deleted") + lowerPrefix = conf.RequiredString("lower") + upperPrefix = conf.RequiredString("upper") + deletedConf = conf.OptionalObject("deleted") ) if err := conf.Validate(); err != nil { return nil, err } - base, err := ld.GetStorage(sourcePrefix) + lower, err := ld.GetStorage(lowerPrefix) if err != nil { - return nil, fmt.Errorf("failed to load base at %s: %v", sourcePrefix, err) + return nil, fmt.Errorf("failed to load lower at %s: %v", lowerPrefix, err) } - stage, err := ld.GetStorage(stagePrefix) + upper, err := ld.GetStorage(upperPrefix) if err != nil { - return nil, fmt.Errorf("failed to load stage at %s: %v", stagePrefix, err) + return nil, fmt.Errorf("failed to load upper at %s: %v", upperPrefix, err) } - deleted, err := sorted.NewKeyValueMaybeWipe(deletedConf) - if err != nil { - return nil, fmt.Errorf("failed to setup deleted: %v", err) + var deleted sorted.KeyValue + if len(deletedConf) != 0 { + deleted, err = sorted.NewKeyValueMaybeWipe(deletedConf) + if err != nil { + return nil, fmt.Errorf("failed to setup deleted: %v", err) + } } sto := &overlayStorage{ - base: base, - stage: stage, + lower: lower, + upper: upper, deleted: deleted, } @@ -103,21 +110,28 @@ func newFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (blobserver.Storag } func (sto *overlayStorage) Close() error { + if sto.deleted == nil { + return nil + } return sto.deleted.Close() } -// ReceiveBlob stores received blobs on the stage layer. +// ReceiveBlob stores received blobs on the upper layer. func (sto *overlayStorage) ReceiveBlob(ctx context.Context, br blob.Ref, src io.Reader) (sb blob.SizedRef, err error) { - sb, err = sto.stage.ReceiveBlob(ctx, br, src) - if err == nil { + sb, err = sto.upper.ReceiveBlob(ctx, br, src) + if err == nil && sto.deleted != nil { err = sto.deleted.Delete(br.String()) } return sb, err } -// RemoveBlobs marks the given blobs as deleted, and removes them if they are in the stage layer. +// RemoveBlobs marks the given blobs as deleted, and removes them if they are in the upper layer. func (sto *overlayStorage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error { - err := sto.stage.RemoveBlobs(ctx, blobs) + if sto.deleted == nil { + return blobserver.ErrNotImplemented + } + + err := sto.upper.RemoveBlobs(ctx, blobs) if err != nil { return err } @@ -130,6 +144,10 @@ func (sto *overlayStorage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) er } func (sto *overlayStorage) isDeleted(br blob.Ref) bool { + if sto.deleted == nil { + return false + } + _, err := sto.deleted.Get(br.String()) if err == nil { return true @@ -142,19 +160,19 @@ func (sto *overlayStorage) isDeleted(br blob.Ref) bool { return false } -// Fetch the blob by trying first the stage and then base. -// The base storage is checked only if the blob was not deleleted in sto itself. +// Fetch the blob by trying first the upper and then lower. +// The lower storage is checked only if the blob was not deleleted in sto itself. func (sto *overlayStorage) Fetch(ctx context.Context, br blob.Ref) (file io.ReadCloser, size uint32, err error) { if sto.isDeleted(br) { return nil, 0, os.ErrNotExist } - file, size, err = sto.stage.Fetch(ctx, br) + file, size, err = sto.upper.Fetch(ctx, br) if err != os.ErrNotExist { return file, size, err } - return sto.base.Fetch(ctx, br) + return sto.lower.Fetch(ctx, br) } // StatBlobs on all BlobStatter reads sequentially, returning the first error. @@ -168,7 +186,7 @@ func (sto *overlayStorage) StatBlobs(ctx context.Context, blobs []blob.Ref, f fu seen := make(map[blob.Ref]struct{}, len(exists)) - err := sto.stage.StatBlobs(ctx, exists, func(sbr blob.SizedRef) error { + err := sto.upper.StatBlobs(ctx, exists, func(sbr blob.SizedRef) error { seen[sbr.Ref] = struct{}{} return f(sbr) }) @@ -177,21 +195,21 @@ func (sto *overlayStorage) StatBlobs(ctx context.Context, blobs []blob.Ref, f fu return err } - baseBlobs := exists[:0] + lowerBlobs := make([]blob.Ref, 0, len(exists)) for _, br := range exists { if _, s := seen[br]; !s { - baseBlobs = append(baseBlobs, br) + lowerBlobs = append(lowerBlobs, br) } } - return sto.base.StatBlobs(ctx, baseBlobs, f) + return sto.lower.StatBlobs(ctx, lowerBlobs, f) } -// EnumerateBlobs enumerates blobs of the base and stage layers. +// EnumerateBlobs enumerates blobs of the lower and upper layers. func (sto *overlayStorage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error { defer close(dest) - enums := []blobserver.BlobEnumerator{sto.base, sto.stage} + enums := []blobserver.BlobEnumerator{sto.lower, sto.upper} // Ensure that we send limit blobs if possible. sent := 0 @@ -233,16 +251,16 @@ func (sto *overlayStorage) EnumerateBlobs(ctx context.Context, dest chan<- blob. } func (sto *overlayStorage) StorageGeneration() (initTime time.Time, random string, err error) { - if gener, ok := sto.stage.(blobserver.Generationer); ok { + if gener, ok := sto.upper.(blobserver.Generationer); ok { return gener.StorageGeneration() } - err = blobserver.GenerationNotSupportedError(fmt.Sprintf("blobserver.Generationer not implemented on %T", sto.stage)) + err = blobserver.GenerationNotSupportedError(fmt.Sprintf("blobserver.Generationer not implemented on %T", sto.upper)) return } func (sto *overlayStorage) ResetStorageGeneration() error { - if gener, ok := sto.stage.(blobserver.Generationer); ok { + if gener, ok := sto.upper.(blobserver.Generationer); ok { return gener.ResetStorageGeneration() } - return blobserver.GenerationNotSupportedError(fmt.Sprintf("blobserver.Generationer not implemented on %T", sto.stage)) + return blobserver.GenerationNotSupportedError(fmt.Sprintf("blobserver.Generationer not implemented on %T", sto.upper)) } diff --git a/pkg/blobserver/overlay/overlay_test.go b/pkg/blobserver/overlay/overlay_test.go index 141758a7e..98aaae3b3 100644 --- a/pkg/blobserver/overlay/overlay_test.go +++ b/pkg/blobserver/overlay/overlay_test.go @@ -30,52 +30,58 @@ import ( var ctxbg = context.Background() -func newStage(t *testing.T) blobserver.Storage { - stage, _ := newStageWithBase(t) - return stage +func newOverlay(t *testing.T, withDel bool) blobserver.Storage { + overlay, _ := newOverlayWithLower(t, withDel) + return overlay } -func newStageWithBase(t *testing.T) (stage, base blobserver.Storage) { +func newOverlayWithLower(t *testing.T, withDel bool) (sto, lower blobserver.Storage) { ld := test.NewLoader() - base, _ = ld.GetStorage("/good-base/") - ld.GetStorage("/good-stage/") - sto, err := newFromConfig(ld, map[string]interface{}{ - "base": "/good-base/", - "stage": "/good-stage/", - "deleted": map[string]interface{}{ + lower, _ = ld.GetStorage("/good-lower/") + ld.GetStorage("/good-upper/") + conf := map[string]interface{}{ + "lower": "/good-lower/", + "upper": "/good-upper/", + } + if withDel { + conf["deleted"] = map[string]interface{}{ "type": "memory", - }, - }) + } + } + sto, err := newFromConfig(ld, conf) if err != nil { t.Fatalf("Invalid config: %v", err) } - return sto, base + return sto, lower } func TestStorageTest(t *testing.T) { storagetest.Test(t, func(t *testing.T) (_ blobserver.Storage, cleanup func()) { - return newStage(t), func() {} + return newOverlay(t, true), func() {} + }) + storagetest.Test(t, func(t *testing.T) (_ blobserver.Storage, cleanup func()) { + return newOverlay(t, false), func() {} }) } func TestDelete(t *testing.T) { ctx := context.Background() - stage, base := newStageWithBase(t) + sto, lower := newOverlayWithLower(t, true) var ( - // blobs that go into base - S0 = &test.Blob{Contents: "source blob 0"} - S1 = &test.Blob{Contents: "source blob 1"} + // blobs that go into lower + S0 = &test.Blob{Contents: "lower blob 0"} + S1 = &test.Blob{Contents: "lower blob 1"} - // blobs that go into stage + // blobs that go into sto A = &test.Blob{Contents: "some small blob"} B = &test.Blob{Contents: strings.Repeat("some middle blob", 100)} C = &test.Blob{Contents: strings.Repeat("A 8192 bytes length largish blob", 8192/32)} ) - // add S0 and S1 to the underlying source + // add S0 and S1 to lower for _, tb := range []*test.Blob{S0, S1} { - sb, err := base.ReceiveBlob(ctxbg, tb.BlobRef(), tb.Reader()) + sb, err := lower.ReceiveBlob(ctxbg, tb.BlobRef(), tb.Reader()) if err != nil { t.Fatalf("ReceiveBlob of %s: %v", sb, err) } @@ -84,16 +90,16 @@ func TestDelete(t *testing.T) { } } - baseRefs := []blob.SizedRef{ + lowerRefs := []blob.SizedRef{ S0.SizedRef(), S1.SizedRef(), } type step func() error - stepAdd := func(tb *test.Blob) step { // add the blob to stage + stepAdd := func(tb *test.Blob) step { // add the blob to sto return func() error { - sb, err := stage.ReceiveBlob(ctxbg, tb.BlobRef(), tb.Reader()) + sb, err := sto.ReceiveBlob(ctxbg, tb.BlobRef(), tb.Reader()) if err != nil { return fmt.Errorf("ReceiveBlob of %s: %v", sb, err) } @@ -110,17 +116,17 @@ func TestDelete(t *testing.T) { wantRefs[i] = tb.SizedRef() } return func() error { - // ensure base was not modified - if err := storagetest.CheckEnumerate(base, baseRefs); err != nil { + // ensure lower was not modified + if err := storagetest.CheckEnumerate(lower, lowerRefs); err != nil { return err } - return storagetest.CheckEnumerate(stage, wantRefs) + return storagetest.CheckEnumerate(sto, wantRefs) } } - stepDelete := func(tb *test.Blob) step { // delete the blob in stage + stepDelete := func(tb *test.Blob) step { // delete the blob in sto return func() error { - if err := stage.RemoveBlobs(ctx, []blob.Ref{tb.BlobRef()}); err != nil { + if err := sto.RemoveBlobs(ctx, []blob.Ref{tb.BlobRef()}); err != nil { return fmt.Errorf("RemoveBlob(%s): %v", tb.BlobRef(), err) } return nil