From 2ba0c43003ef4134e7e911ef55462a152a8b3eee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tam=C3=A1s=20Gul=C3=A1csi?= Date: Tue, 2 Jan 2018 22:33:42 +0100 Subject: [PATCH] pkg/blobserver/union: add "union" blobserver type This blobserver just "cat"s together the storages listed in its "subsets" config. It is read-only, so you should pair it with some other storage for writing and removing - for example, the "cond" storage is perfect for this. My use case is to use blobpacked with large=diskpacked, small=filesystem, but to consolidate the small blob storage into a diskpacked + filesystem once the filesystem becomes huge. Another use case is joining separately built camlistore servers into one. (For me, they have to be separated again later, so I've built them separately, but I have to use them joined for a month.) Change-Id: I4e7e42cd59286f0f34da2f6ff01e44439771d53c --- pkg/blobserver/receive.go | 4 + pkg/blobserver/union/union.go | 177 +++++++++++++++++++++++++++++ pkg/blobserver/union/union_test.go | 78 +++++++++++++ server/camlistored/camlistored.go | 1 + 4 files changed, 260 insertions(+) create mode 100644 pkg/blobserver/union/union.go create mode 100644 pkg/blobserver/union/union_test.go diff --git a/pkg/blobserver/receive.go b/pkg/blobserver/receive.go index 978d64801..487f6bceb 100644 --- a/pkg/blobserver/receive.go +++ b/pkg/blobserver/receive.go @@ -17,6 +17,7 @@ limitations under the License. package blobserver import ( + "errors" "fmt" "hash" "io" @@ -25,6 +26,9 @@ import ( "perkeep.org/pkg/blob" ) +// ErrReadonly is the error value returned by read-only blobservers. +var ErrReadonly = errors.New("this blobserver is read only") + // ReceiveString uploads the blob given by the string s to dst // and returns its blobref and size. 
func ReceiveString(dst BlobReceiver, s string) (blob.SizedRef, error) { diff --git a/pkg/blobserver/union/union.go b/pkg/blobserver/union/union.go new file mode 100644 index 000000000..64efd1f0a --- /dev/null +++ b/pkg/blobserver/union/union.go @@ -0,0 +1,177 @@ +/* +Copyright 2017 The Perkeep Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package union registers the "union" read-only blobserver storage type, +// which serves reads from the union of its configured "subsets" storages. +package union // import "perkeep.org/pkg/blobserver/union" + +import ( + "context" + "errors" + "io" + "sync" + + "go4.org/jsonconfig" + "perkeep.org/pkg/blob" + "perkeep.org/pkg/blobserver" +) + +type unionStorage struct { + subsets []blobserver.Storage +} + +func newFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (blobserver.Storage, error) { + sto := &unionStorage{} + + reads := conf.RequiredList("subsets") + if err := conf.Validate(); err != nil { + return nil, err + } + + for _, s := range reads { + rs, err := ld.GetStorage(s) + if err != nil { + return nil, err + } + sto.subsets = append(sto.subsets, rs) + } + + return sto, nil +} + +// ReceiveBlob always fails with blobserver.ErrReadonly: the union storage is read-only. +func (sto *unionStorage) ReceiveBlob(br blob.Ref, src io.Reader) (sb blob.SizedRef, err error) { + return blob.SizedRef{}, blobserver.ErrReadonly +} + +// RemoveBlobs always fails with blobserver.ErrReadonly: the union storage is read-only. 
+func (sto *unionStorage) RemoveBlobs(blobs []blob.Ref) error { + return blobserver.ErrReadonly +} + +// Fetch asks every subset for the blob concurrently and waits for all of them to answer, +// keeping the first success received (closing later ones), else returning the first error. NOTE(review): with zero subsets this returns (nil, 0, nil) - confirm callers cope. +func (sto *unionStorage) Fetch(b blob.Ref) (file io.ReadCloser, size uint32, err error) { + type result struct { + file io.ReadCloser + size uint32 + err error + } + results := make(chan result, len(sto.subsets)) + var wg sync.WaitGroup + for _, bs := range sto.subsets { + bs := bs + wg.Add(1) + go func() { + defer wg.Done() + var res result + res.file, res.size, res.err = bs.Fetch(b) + results <- res + }() + } + go func() { + wg.Wait() + close(results) + }() + var firstErr error + var firstRes result + for r := range results { + if r.err != nil { + if firstErr == nil { + firstErr = r.err + } + continue + } + if firstRes.file != nil { + if r.file != nil { + r.file.Close() // not needed: an earlier successful Fetch is already kept + } + continue + } + + firstRes = r + } + if firstRes.file != nil { + return firstRes.file, firstRes.size, nil + } + return nil, 0, firstErr +} + +// StatBlobs stats the blobs on every subset concurrently and calls f once per distinct ref, returning the first error encountered. 
+func (sto *unionStorage) StatBlobs(ctx context.Context, blobs []blob.Ref, f func(blob.SizedRef) error) error { + if err := ctx.Err(); err != nil { + return err + } + // Several subsets may report the same blob, so results are funneled through maybeDup and deduplicated via seen before f is called. NOTE(review): the errCh case closes maybeDup while other goroutines may still be sending on it (possible "send on closed channel" panic), and early returns leave senders blocked on maybeDup - confirm and fix. + maybeDup := make(chan blob.SizedRef) + errCh := make(chan error, 1) + var wg sync.WaitGroup + var any bool + for _, s := range sto.subsets { + if bs, ok := s.(blobserver.BlobStatter); ok { + any = true + wg.Add(1) + go func() { + defer wg.Done() + if err := bs.StatBlobs(ctx, blobs, func(sr blob.SizedRef) error { + maybeDup <- sr + return nil + }); err != nil { + errCh <- err + } + }() + } + } + if !any { + return errors.New("union: No BlobStatter reader configured") + } + + var closeChanOnce sync.Once + go func() { + wg.Wait() + closeChanOnce.Do(func() { close(maybeDup) }) + }() + + seen := make(map[blob.Ref]struct{}, len(blobs)) + for { + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-errCh: + closeChanOnce.Do(func() { close(maybeDup) }) + return err + case sr, ok := <-maybeDup: + if !ok { + return nil + } + if _, ok = seen[sr.Ref]; !ok { + seen[sr.Ref] = struct{}{} + if err := f(sr); err != nil { + return err + } + } + } + } +} + +// EnumerateBlobs delegates to blobserver.MergedEnumerateStorage over all subsets. +func (sto *unionStorage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error { + return blobserver.MergedEnumerateStorage(ctx, dest, sto.subsets, after, limit) +} + +func init() { + blobserver.RegisterStorageConstructor("union", blobserver.StorageConstructor(newFromConfig)) +} diff --git a/pkg/blobserver/union/union_test.go b/pkg/blobserver/union/union_test.go new file mode 100644 index 000000000..d8a397466 --- /dev/null +++ b/pkg/blobserver/union/union_test.go @@ -0,0 +1,78 @@ +/* +Copyright 2017 The Perkeep Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package union + +import ( + "testing" + + "go4.org/jsonconfig" + "perkeep.org/pkg/blob" + "perkeep.org/pkg/blobserver" + _ "perkeep.org/pkg/blobserver/cond" + "perkeep.org/pkg/blobserver/replica" + "perkeep.org/pkg/blobserver/storagetest" + "perkeep.org/pkg/test" +) + +func newUnion(t *testing.T, ld *test.Loader, config jsonconfig.Obj) *unionStorage { + sto, err := newFromConfig(ld, config) + if err != nil { + t.Fatalf("Invalid config: %v", err) + } + return sto.(*unionStorage) +} + +func mustReceive(t *testing.T, dst blobserver.Storage, tb *test.Blob) blob.SizedRef { + tbRef := tb.BlobRef() + sb, err := blobserver.Receive(dst, tbRef, tb.Reader()) + if err != nil { + t.Fatalf("Receive: %v", err) + } + if int(sb.Size) != len(tb.Contents) { + t.Fatalf("size = %d; want %d", sb.Size, len(tb.Contents)) + } + if sb.Ref != tbRef { + t.Fatal("wrong blob received") + } + return sb +} + +func TestStorageTest(t *testing.T) { + storagetest.Test(t, func(t *testing.T) (_ blobserver.Storage, cleanup func()) { + ld := test.NewLoader() + s1, _ := ld.GetStorage("/good-schema/") + s2, _ := ld.GetStorage("/good-other/") + ld.SetStorage("/replica-all/", replica.NewForTest([]blobserver.Storage{s1, s2})) + uni := newUnion(t, ld, map[string]interface{}{ + "subsets": []interface{}{"/good-schema/", "/good-other/"}, + }) + ld.SetStorage("/union/", uni) + cnd := newCond(t, ld, map[string]interface{}{ + "write": "/good-schema/", + "read": "/union/", + "remove": "/replica-all/", + }) + return cnd, func() {} + }) +} +func newCond(t *testing.T, ld *test.Loader, config jsonconfig.Obj) 
blobserver.Storage { + sto, err := blobserver.CreateStorage("cond", ld, config) + if err != nil { + t.Fatalf("Invalid config: %v", err) + } + return sto +} diff --git a/server/camlistored/camlistored.go b/server/camlistored/camlistored.go index ff6e26b7b..26a94a3e9 100644 --- a/server/camlistored/camlistored.go +++ b/server/camlistored/camlistored.go @@ -65,6 +65,7 @@ import ( _ "perkeep.org/pkg/blobserver/replica" _ "perkeep.org/pkg/blobserver/s3" _ "perkeep.org/pkg/blobserver/shard" + _ "perkeep.org/pkg/blobserver/union" // Indexers: (also present themselves as storage targets) // KeyValue implementations: _ "perkeep.org/pkg/sorted/kvfile"