pkg/blobserver/overlay: use better names for underlying storage

Use the names "lower" and "upper" from OverlayFS
instead of "base" and "stage".

Also make the deleted keyvalue (and thus support for deletion) optional,
and improve readability for the StatBlobs method.

Change-Id: Ic3f36609bf4599251f9ba7c648f513b788550298
This commit is contained in:
Attila Tajti 2018-02-23 08:32:44 +01:00
parent 2681557121
commit b6baba23b0
2 changed files with 91 additions and 67 deletions

View File

@ -16,16 +16,20 @@ limitations under the License.
/* /*
Package overlay registers the "overlay" blobserver storage type Package overlay registers the "overlay" blobserver storage type
that acts as a staging area which is read-write layer on top of a that presents storage that is the result of overlaying a
base storage. The base storage is never changed. storage ("upper") on top of another storage ("lower").
All changes go to the upper storage. The lower storage is never changed.
The optional "deleted" KeyValue store may be provided to keep track of
deleted blobs. When "deleted" is missing, deletion returns an error.
Example usage: Example usage:
"/bs/": { "/bs/": {
"handler": "storage-overlay", "handler": "storage-overlay",
"handlerArgs": { "handlerArgs": {
"base": "/sto-base/", "lower": "/sto-base/",
"overlay": "/bs-local-changes/", "upper": "/bs-local-changes/",
"deleted": { "deleted": {
"file": "/volume1/camlistore/home/var/camlistore/blobs/deleted.leveldb", "file": "/volume1/camlistore/home/var/camlistore/blobs/deleted.leveldb",
"type": "leveldb" "type": "leveldb"
@ -61,41 +65,44 @@ type readOnlyStorage interface {
} }
type overlayStorage struct { type overlayStorage struct {
base readOnlyStorage lower readOnlyStorage
// deleted keeps refs deleted from base // deleted stores refs deleted from lower
deleted sorted.KeyValue deleted sorted.KeyValue
// read-write storage for changes // read-write storage for changes
stage blobserver.Storage upper blobserver.Storage
} }
func newFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (blobserver.Storage, error) { func newFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (blobserver.Storage, error) {
var ( var (
sourcePrefix = conf.RequiredString("base") lowerPrefix = conf.RequiredString("lower")
stagePrefix = conf.RequiredString("stage") upperPrefix = conf.RequiredString("upper")
deletedConf = conf.RequiredObject("deleted") deletedConf = conf.OptionalObject("deleted")
) )
if err := conf.Validate(); err != nil { if err := conf.Validate(); err != nil {
return nil, err return nil, err
} }
base, err := ld.GetStorage(sourcePrefix) lower, err := ld.GetStorage(lowerPrefix)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to load base at %s: %v", sourcePrefix, err) return nil, fmt.Errorf("failed to load lower at %s: %v", lowerPrefix, err)
} }
stage, err := ld.GetStorage(stagePrefix) upper, err := ld.GetStorage(upperPrefix)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to load stage at %s: %v", stagePrefix, err) return nil, fmt.Errorf("failed to load upper at %s: %v", upperPrefix, err)
} }
deleted, err := sorted.NewKeyValueMaybeWipe(deletedConf) var deleted sorted.KeyValue
if err != nil { if len(deletedConf) != 0 {
return nil, fmt.Errorf("failed to setup deleted: %v", err) deleted, err = sorted.NewKeyValueMaybeWipe(deletedConf)
if err != nil {
return nil, fmt.Errorf("failed to setup deleted: %v", err)
}
} }
sto := &overlayStorage{ sto := &overlayStorage{
base: base, lower: lower,
stage: stage, upper: upper,
deleted: deleted, deleted: deleted,
} }
@ -103,21 +110,28 @@ func newFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (blobserver.Storag
} }
func (sto *overlayStorage) Close() error { func (sto *overlayStorage) Close() error {
if sto.deleted == nil {
return nil
}
return sto.deleted.Close() return sto.deleted.Close()
} }
// ReceiveBlob stores received blobs on the stage layer. // ReceiveBlob stores received blobs on the upper layer.
func (sto *overlayStorage) ReceiveBlob(ctx context.Context, br blob.Ref, src io.Reader) (sb blob.SizedRef, err error) { func (sto *overlayStorage) ReceiveBlob(ctx context.Context, br blob.Ref, src io.Reader) (sb blob.SizedRef, err error) {
sb, err = sto.stage.ReceiveBlob(ctx, br, src) sb, err = sto.upper.ReceiveBlob(ctx, br, src)
if err == nil { if err == nil && sto.deleted != nil {
err = sto.deleted.Delete(br.String()) err = sto.deleted.Delete(br.String())
} }
return sb, err return sb, err
} }
// RemoveBlobs marks the given blobs as deleted, and removes them if they are in the stage layer. // RemoveBlobs marks the given blobs as deleted, and removes them if they are in the upper layer.
func (sto *overlayStorage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error { func (sto *overlayStorage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
err := sto.stage.RemoveBlobs(ctx, blobs) if sto.deleted == nil {
return blobserver.ErrNotImplemented
}
err := sto.upper.RemoveBlobs(ctx, blobs)
if err != nil { if err != nil {
return err return err
} }
@ -130,6 +144,10 @@ func (sto *overlayStorage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) er
} }
func (sto *overlayStorage) isDeleted(br blob.Ref) bool { func (sto *overlayStorage) isDeleted(br blob.Ref) bool {
if sto.deleted == nil {
return false
}
_, err := sto.deleted.Get(br.String()) _, err := sto.deleted.Get(br.String())
if err == nil { if err == nil {
return true return true
@ -142,19 +160,19 @@ func (sto *overlayStorage) isDeleted(br blob.Ref) bool {
return false return false
} }
// Fetch the blob by trying first the stage and then base. // Fetch the blob by trying first the upper and then lower.
// The base storage is checked only if the blob was not deleleted in sto itself. // The lower storage is checked only if the blob was not deleleted in sto itself.
func (sto *overlayStorage) Fetch(ctx context.Context, br blob.Ref) (file io.ReadCloser, size uint32, err error) { func (sto *overlayStorage) Fetch(ctx context.Context, br blob.Ref) (file io.ReadCloser, size uint32, err error) {
if sto.isDeleted(br) { if sto.isDeleted(br) {
return nil, 0, os.ErrNotExist return nil, 0, os.ErrNotExist
} }
file, size, err = sto.stage.Fetch(ctx, br) file, size, err = sto.upper.Fetch(ctx, br)
if err != os.ErrNotExist { if err != os.ErrNotExist {
return file, size, err return file, size, err
} }
return sto.base.Fetch(ctx, br) return sto.lower.Fetch(ctx, br)
} }
// StatBlobs on all BlobStatter reads sequentially, returning the first error. // StatBlobs on all BlobStatter reads sequentially, returning the first error.
@ -168,7 +186,7 @@ func (sto *overlayStorage) StatBlobs(ctx context.Context, blobs []blob.Ref, f fu
seen := make(map[blob.Ref]struct{}, len(exists)) seen := make(map[blob.Ref]struct{}, len(exists))
err := sto.stage.StatBlobs(ctx, exists, func(sbr blob.SizedRef) error { err := sto.upper.StatBlobs(ctx, exists, func(sbr blob.SizedRef) error {
seen[sbr.Ref] = struct{}{} seen[sbr.Ref] = struct{}{}
return f(sbr) return f(sbr)
}) })
@ -177,21 +195,21 @@ func (sto *overlayStorage) StatBlobs(ctx context.Context, blobs []blob.Ref, f fu
return err return err
} }
baseBlobs := exists[:0] lowerBlobs := make([]blob.Ref, 0, len(exists))
for _, br := range exists { for _, br := range exists {
if _, s := seen[br]; !s { if _, s := seen[br]; !s {
baseBlobs = append(baseBlobs, br) lowerBlobs = append(lowerBlobs, br)
} }
} }
return sto.base.StatBlobs(ctx, baseBlobs, f) return sto.lower.StatBlobs(ctx, lowerBlobs, f)
} }
// EnumerateBlobs enumerates blobs of the base and stage layers. // EnumerateBlobs enumerates blobs of the lower and upper layers.
func (sto *overlayStorage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error { func (sto *overlayStorage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
defer close(dest) defer close(dest)
enums := []blobserver.BlobEnumerator{sto.base, sto.stage} enums := []blobserver.BlobEnumerator{sto.lower, sto.upper}
// Ensure that we send limit blobs if possible. // Ensure that we send limit blobs if possible.
sent := 0 sent := 0
@ -233,16 +251,16 @@ func (sto *overlayStorage) EnumerateBlobs(ctx context.Context, dest chan<- blob.
} }
func (sto *overlayStorage) StorageGeneration() (initTime time.Time, random string, err error) { func (sto *overlayStorage) StorageGeneration() (initTime time.Time, random string, err error) {
if gener, ok := sto.stage.(blobserver.Generationer); ok { if gener, ok := sto.upper.(blobserver.Generationer); ok {
return gener.StorageGeneration() return gener.StorageGeneration()
} }
err = blobserver.GenerationNotSupportedError(fmt.Sprintf("blobserver.Generationer not implemented on %T", sto.stage)) err = blobserver.GenerationNotSupportedError(fmt.Sprintf("blobserver.Generationer not implemented on %T", sto.upper))
return return
} }
func (sto *overlayStorage) ResetStorageGeneration() error { func (sto *overlayStorage) ResetStorageGeneration() error {
if gener, ok := sto.stage.(blobserver.Generationer); ok { if gener, ok := sto.upper.(blobserver.Generationer); ok {
return gener.ResetStorageGeneration() return gener.ResetStorageGeneration()
} }
return blobserver.GenerationNotSupportedError(fmt.Sprintf("blobserver.Generationer not implemented on %T", sto.stage)) return blobserver.GenerationNotSupportedError(fmt.Sprintf("blobserver.Generationer not implemented on %T", sto.upper))
} }

View File

@ -30,52 +30,58 @@ import (
var ctxbg = context.Background() var ctxbg = context.Background()
func newStage(t *testing.T) blobserver.Storage { func newOverlay(t *testing.T, withDel bool) blobserver.Storage {
stage, _ := newStageWithBase(t) overlay, _ := newOverlayWithLower(t, withDel)
return stage return overlay
} }
func newStageWithBase(t *testing.T) (stage, base blobserver.Storage) { func newOverlayWithLower(t *testing.T, withDel bool) (sto, lower blobserver.Storage) {
ld := test.NewLoader() ld := test.NewLoader()
base, _ = ld.GetStorage("/good-base/") lower, _ = ld.GetStorage("/good-lower/")
ld.GetStorage("/good-stage/") ld.GetStorage("/good-upper/")
sto, err := newFromConfig(ld, map[string]interface{}{ conf := map[string]interface{}{
"base": "/good-base/", "lower": "/good-lower/",
"stage": "/good-stage/", "upper": "/good-upper/",
"deleted": map[string]interface{}{ }
if withDel {
conf["deleted"] = map[string]interface{}{
"type": "memory", "type": "memory",
}, }
}) }
sto, err := newFromConfig(ld, conf)
if err != nil { if err != nil {
t.Fatalf("Invalid config: %v", err) t.Fatalf("Invalid config: %v", err)
} }
return sto, base return sto, lower
} }
func TestStorageTest(t *testing.T) { func TestStorageTest(t *testing.T) {
storagetest.Test(t, func(t *testing.T) (_ blobserver.Storage, cleanup func()) { storagetest.Test(t, func(t *testing.T) (_ blobserver.Storage, cleanup func()) {
return newStage(t), func() {} return newOverlay(t, true), func() {}
})
storagetest.Test(t, func(t *testing.T) (_ blobserver.Storage, cleanup func()) {
return newOverlay(t, false), func() {}
}) })
} }
func TestDelete(t *testing.T) { func TestDelete(t *testing.T) {
ctx := context.Background() ctx := context.Background()
stage, base := newStageWithBase(t) sto, lower := newOverlayWithLower(t, true)
var ( var (
// blobs that go into base // blobs that go into lower
S0 = &test.Blob{Contents: "source blob 0"} S0 = &test.Blob{Contents: "lower blob 0"}
S1 = &test.Blob{Contents: "source blob 1"} S1 = &test.Blob{Contents: "lower blob 1"}
// blobs that go into stage // blobs that go into sto
A = &test.Blob{Contents: "some small blob"} A = &test.Blob{Contents: "some small blob"}
B = &test.Blob{Contents: strings.Repeat("some middle blob", 100)} B = &test.Blob{Contents: strings.Repeat("some middle blob", 100)}
C = &test.Blob{Contents: strings.Repeat("A 8192 bytes length largish blob", 8192/32)} C = &test.Blob{Contents: strings.Repeat("A 8192 bytes length largish blob", 8192/32)}
) )
// add S0 and S1 to the underlying source // add S0 and S1 to lower
for _, tb := range []*test.Blob{S0, S1} { for _, tb := range []*test.Blob{S0, S1} {
sb, err := base.ReceiveBlob(ctxbg, tb.BlobRef(), tb.Reader()) sb, err := lower.ReceiveBlob(ctxbg, tb.BlobRef(), tb.Reader())
if err != nil { if err != nil {
t.Fatalf("ReceiveBlob of %s: %v", sb, err) t.Fatalf("ReceiveBlob of %s: %v", sb, err)
} }
@ -84,16 +90,16 @@ func TestDelete(t *testing.T) {
} }
} }
baseRefs := []blob.SizedRef{ lowerRefs := []blob.SizedRef{
S0.SizedRef(), S0.SizedRef(),
S1.SizedRef(), S1.SizedRef(),
} }
type step func() error type step func() error
stepAdd := func(tb *test.Blob) step { // add the blob to stage stepAdd := func(tb *test.Blob) step { // add the blob to sto
return func() error { return func() error {
sb, err := stage.ReceiveBlob(ctxbg, tb.BlobRef(), tb.Reader()) sb, err := sto.ReceiveBlob(ctxbg, tb.BlobRef(), tb.Reader())
if err != nil { if err != nil {
return fmt.Errorf("ReceiveBlob of %s: %v", sb, err) return fmt.Errorf("ReceiveBlob of %s: %v", sb, err)
} }
@ -110,17 +116,17 @@ func TestDelete(t *testing.T) {
wantRefs[i] = tb.SizedRef() wantRefs[i] = tb.SizedRef()
} }
return func() error { return func() error {
// ensure base was not modified // ensure lower was not modified
if err := storagetest.CheckEnumerate(base, baseRefs); err != nil { if err := storagetest.CheckEnumerate(lower, lowerRefs); err != nil {
return err return err
} }
return storagetest.CheckEnumerate(stage, wantRefs) return storagetest.CheckEnumerate(sto, wantRefs)
} }
} }
stepDelete := func(tb *test.Blob) step { // delete the blob in stage stepDelete := func(tb *test.Blob) step { // delete the blob in sto
return func() error { return func() error {
if err := stage.RemoveBlobs(ctx, []blob.Ref{tb.BlobRef()}); err != nil { if err := sto.RemoveBlobs(ctx, []blob.Ref{tb.BlobRef()}); err != nil {
return fmt.Errorf("RemoveBlob(%s): %v", tb.BlobRef(), err) return fmt.Errorf("RemoveBlob(%s): %v", tb.BlobRef(), err)
} }
return nil return nil