mirror of https://github.com/perkeep/perkeep.git
pkg/blobserver/union: add "union" blobserver type
This blobserver simply "cat"s the given "read" storages together. It is read-only, so you should pair it with some other storage for writing and removing; the "cond" storage is a perfect fit for that. My use case is blobpacked with large=diskpacked and small=filesystem, consolidating the small blob storage into diskpacked + filesystem once the filesystem grows huge. Another use case is joining separately built Camlistore servers into one. (In my case they have to be separated again later, so I built them separately, but I need to use them joined for a month.)

Change-Id: I4e7e42cd59286f0f34da2f6ff01e44439771d53c
parent 32e37c926d
commit 2ba0c43003
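For illustration, a minimal sketch (not part of this commit) of the wiring the message describes: a read-only "union" over two storages, fronted by a writable "cond". It borrows the test loader and its "/good-schema/" and "/good-other/" prefixes from the test file below; the "/union/" prefix, the choice to write and remove on "/good-schema/", and the standalone main are assumptions for illustration only. A real server would point the subsets at its own storages (e.g. a diskpacked and a filesystem prefix) in its JSON config.

// Hypothetical standalone sketch; prefix names and routing are assumptions.
package main

import (
	"log"

	"go4.org/jsonconfig"

	"perkeep.org/pkg/blobserver"
	_ "perkeep.org/pkg/blobserver/cond"  // registers the "cond" storage type
	_ "perkeep.org/pkg/blobserver/union" // registers the "union" storage type
	"perkeep.org/pkg/test"
)

func main() {
	// The test loader resolves "/good-schema/" and "/good-other/" to
	// in-memory storages, as in union_test.go below.
	ld := test.NewLoader()

	// Read-only view over both subsets; fetches are tried concurrently.
	uni, err := blobserver.CreateStorage("union", ld, jsonconfig.Obj{
		"subsets": []interface{}{"/good-schema/", "/good-other/"},
	})
	if err != nil {
		log.Fatal(err)
	}
	ld.SetStorage("/union/", uni)

	// Writes and removes go to one storage; reads go through the union.
	cnd, err := blobserver.CreateStorage("cond", ld, jsonconfig.Obj{
		"write":  "/good-schema/",
		"read":   "/union/",
		"remove": "/good-schema/",
	})
	if err != nil {
		log.Fatal(err)
	}
	_ = cnd // use cnd as an ordinary blobserver.Storage
}

In the commit message's scenario, the packed (diskpacked) and loose (filesystem) storages would become the union's subsets, while the cond routes new writes and removes to whichever of them should keep growing.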
@@ -17,6 +17,7 @@ limitations under the License.
package blobserver

import (
	"errors"
	"fmt"
	"hash"
	"io"
@@ -25,6 +26,9 @@ import (
	"perkeep.org/pkg/blob"
)

// ErrReadonly is the error value returned by read-only blobservers.
var ErrReadonly = errors.New("this blobserver is read only")

// ReceiveString uploads the blob given by the string s to dst
// and returns its blobref and size.
func ReceiveString(dst BlobReceiver, s string) (blob.SizedRef, error) {
@@ -0,0 +1,177 @@
/*
Copyright 2017 The Perkeep Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package union registers the "union" read-only blobserver storage type
// to read from the given subsets, serving the first responding.
package union // import "perkeep.org/pkg/blobserver/union"

import (
	"context"
	"errors"
	"io"
	"sync"

	"go4.org/jsonconfig"
	"perkeep.org/pkg/blob"
	"perkeep.org/pkg/blobserver"
)

type unionStorage struct {
	subsets []blobserver.Storage
}

func newFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (blobserver.Storage, error) {
	sto := &unionStorage{}

	reads := conf.RequiredList("subsets")
	if err := conf.Validate(); err != nil {
		return nil, err
	}

	for _, s := range reads {
		rs, err := ld.GetStorage(s)
		if err != nil {
			return nil, err
		}
		sto.subsets = append(sto.subsets, rs)
	}

	return sto, nil
}

// ReceiveBlob would receive the blob, but this storage is read-only, so it just returns ErrReadonly.
func (sto *unionStorage) ReceiveBlob(br blob.Ref, src io.Reader) (sb blob.SizedRef, err error) {
	return blob.SizedRef{}, blobserver.ErrReadonly
}

// RemoveBlobs would remove the given blobs, but this storage is read-only, so it just returns ErrReadonly.
func (sto *unionStorage) RemoveBlobs(blobs []blob.Ref) error {
	return blobserver.ErrReadonly
}

// Fetch fetches the blob by trying all the configured subsets concurrently,
// returning the first successful response, or the first error if no subset has it.
func (sto *unionStorage) Fetch(b blob.Ref) (file io.ReadCloser, size uint32, err error) {
	type result struct {
		file io.ReadCloser
		size uint32
		err  error
	}
	results := make(chan result, len(sto.subsets))
	var wg sync.WaitGroup
	for _, bs := range sto.subsets {
		bs := bs
		wg.Add(1)
		go func() {
			defer wg.Done()
			var res result
			res.file, res.size, res.err = bs.Fetch(b)
			results <- res
		}()
	}
	go func() {
		wg.Wait()
		close(results)
	}()
	var firstErr error
	var firstRes result
	for r := range results {
		if r.err != nil {
			if firstErr == nil {
				firstErr = r.err
			}
			continue
		}
		if firstRes.file != nil {
			if r.file != nil {
				r.file.Close() // not needed; we already have a successful Fetch
			}
			continue
		}

		firstRes = r
	}
	if firstRes.file != nil {
		return firstRes.file, firstRes.size, nil
	}
	return nil, 0, firstErr
}

// StatBlobs stats the given blobs on every subset that implements blobserver.BlobStatter,
// concurrently, deduplicating the results and returning the first error encountered.
func (sto *unionStorage) StatBlobs(ctx context.Context, blobs []blob.Ref, f func(blob.SizedRef) error) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	// The same blob may be reported by several subsets, so dedup before calling f.
	maybeDup := make(chan blob.SizedRef)
	errCh := make(chan error, 1)
	var wg sync.WaitGroup
	var any bool
	for _, s := range sto.subsets {
		if bs, ok := s.(blobserver.BlobStatter); ok {
			any = true
			wg.Add(1)
			go func() {
				defer wg.Done()
				if err := bs.StatBlobs(ctx, blobs, func(sr blob.SizedRef) error {
					maybeDup <- sr
					return nil
				}); err != nil {
					errCh <- err
				}
			}()
		}
	}
	if !any {
		return errors.New("union: No BlobStatter reader configured")
	}

	var closeChanOnce sync.Once
	go func() {
		wg.Wait()
		closeChanOnce.Do(func() { close(maybeDup) })
	}()

	seen := make(map[blob.Ref]struct{}, len(blobs))
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case err := <-errCh:
			closeChanOnce.Do(func() { close(maybeDup) })
			return err
		case sr, ok := <-maybeDup:
			if !ok {
				return nil
			}
			if _, ok = seen[sr.Ref]; !ok {
				seen[sr.Ref] = struct{}{}
				if err := f(sr); err != nil {
					return err
				}
			}
		}
	}
}

// EnumerateBlobs merge-enumerates the blobs of all the subsets, returning one of the errors encountered (if any).
func (sto *unionStorage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
	return blobserver.MergedEnumerateStorage(ctx, dest, sto.subsets, after, limit)
}

func init() {
	blobserver.RegisterStorageConstructor("union", blobserver.StorageConstructor(newFromConfig))
}
@@ -0,0 +1,78 @@
/*
Copyright 2017 The Perkeep Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package union

import (
	"testing"

	"go4.org/jsonconfig"
	"perkeep.org/pkg/blob"
	"perkeep.org/pkg/blobserver"
	_ "perkeep.org/pkg/blobserver/cond"
	"perkeep.org/pkg/blobserver/replica"
	"perkeep.org/pkg/blobserver/storagetest"
	"perkeep.org/pkg/test"
)

func newUnion(t *testing.T, ld *test.Loader, config jsonconfig.Obj) *unionStorage {
	sto, err := newFromConfig(ld, config)
	if err != nil {
		t.Fatalf("Invalid config: %v", err)
	}
	return sto.(*unionStorage)
}

func mustReceive(t *testing.T, dst blobserver.Storage, tb *test.Blob) blob.SizedRef {
	tbRef := tb.BlobRef()
	sb, err := blobserver.Receive(dst, tbRef, tb.Reader())
	if err != nil {
		t.Fatalf("Receive: %v", err)
	}
	if int(sb.Size) != len(tb.Contents) {
		t.Fatalf("size = %d; want %d", sb.Size, len(tb.Contents))
	}
	if sb.Ref != tbRef {
		t.Fatal("wrong blob received")
	}
	return sb
}

func TestStorageTest(t *testing.T) {
	storagetest.Test(t, func(t *testing.T) (_ blobserver.Storage, cleanup func()) {
		ld := test.NewLoader()
		s1, _ := ld.GetStorage("/good-schema/")
		s2, _ := ld.GetStorage("/good-other/")
		ld.SetStorage("/replica-all/", replica.NewForTest([]blobserver.Storage{s1, s2}))
		uni := newUnion(t, ld, map[string]interface{}{
			"subsets": []interface{}{"/good-schema/", "/good-other/"},
		})
		ld.SetStorage("/union/", uni)
		cnd := newCond(t, ld, map[string]interface{}{
			"write":  "/good-schema/",
			"read":   "/union/",
			"remove": "/replica-all/",
		})
		return cnd, func() {}
	})
}

func newCond(t *testing.T, ld *test.Loader, config jsonconfig.Obj) blobserver.Storage {
	sto, err := blobserver.CreateStorage("cond", ld, config)
	if err != nil {
		t.Fatalf("Invalid config: %v", err)
	}
	return sto
}
@@ -65,6 +65,7 @@ import (
	_ "perkeep.org/pkg/blobserver/replica"
	_ "perkeep.org/pkg/blobserver/s3"
	_ "perkeep.org/pkg/blobserver/shard"
	_ "perkeep.org/pkg/blobserver/union"
	// Indexers: (also present themselves as storage targets)
	// KeyValue implementations:
	_ "perkeep.org/pkg/sorted/kvfile"