mirror of https://github.com/perkeep/perkeep.git
namespace: implementation, and initial basic tests using storagetest.
Change-Id: Ib53822fb530f9a1bd2a3c0658120639d5bf4426a
This commit is contained in:
parent
b0ae1a55eb
commit
bb5059bafd
|
@ -23,18 +23,24 @@ limitations under the License.
|
|||
package namespace
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"strconv"
|
||||
|
||||
"camlistore.org/pkg/blob"
|
||||
"camlistore.org/pkg/blobserver"
|
||||
"camlistore.org/pkg/context"
|
||||
"camlistore.org/pkg/jsonconfig"
|
||||
"camlistore.org/pkg/sorted"
|
||||
"camlistore.org/pkg/strutil"
|
||||
)
|
||||
|
||||
type nsto struct {
|
||||
inventory sorted.KeyValue
|
||||
master blobserver.Storage
|
||||
|
||||
blobserver.NoImplStorage // TODO(bradfitz): remove this and finish implementing
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
@ -58,3 +64,107 @@ func newFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (storage blobser
|
|||
}
|
||||
return sto, nil
|
||||
}
|
||||
|
||||
func (ns *nsto) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
|
||||
defer close(dest)
|
||||
done := ctx.Done()
|
||||
|
||||
it := ns.inventory.Find(after, "")
|
||||
first := true
|
||||
for limit > 0 && it.Next() {
|
||||
if first {
|
||||
first = false
|
||||
if after != "" && it.Key() == after {
|
||||
continue
|
||||
}
|
||||
}
|
||||
br, ok := blob.ParseBytes(it.KeyBytes())
|
||||
size, err := strutil.ParseUintBytes(it.ValueBytes(), 10, 32)
|
||||
if !ok || err != nil {
|
||||
log.Printf("Bogus namespace key %q / value %q", it.Key(), it.Value())
|
||||
continue
|
||||
}
|
||||
select {
|
||||
case dest <- blob.SizedRef{br, int64(size)}:
|
||||
case <-done:
|
||||
return context.ErrCanceled
|
||||
}
|
||||
limit--
|
||||
}
|
||||
if err := it.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ns *nsto) FetchStreaming(br blob.Ref) (rc io.ReadCloser, size int64, err error) {
|
||||
invSizeStr, err := ns.inventory.Get(br.String())
|
||||
if err == sorted.ErrNotFound {
|
||||
err = os.ErrNotExist
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
invSize, err := strconv.ParseUint(invSizeStr, 10, 32)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
rc, size, err = ns.master.FetchStreaming(br)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if size != int64(invSize) {
|
||||
log.Printf("namespace: on blob %v, unexpected inventory size %d for master size %d", br, invSize, size)
|
||||
return nil, 0, os.ErrNotExist
|
||||
}
|
||||
return rc, size, nil
|
||||
}
|
||||
|
||||
func (ns *nsto) ReceiveBlob(br blob.Ref, src io.Reader) (sb blob.SizedRef, err error) {
|
||||
var buf bytes.Buffer
|
||||
size, err := io.Copy(&buf, src)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Check if a duplicate blob, already uploaded previously.
|
||||
if _, ierr := ns.inventory.Get(br.String()); ierr == nil {
|
||||
return blob.SizedRef{br, size}, nil
|
||||
}
|
||||
|
||||
sb, err = ns.master.ReceiveBlob(br, &buf)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
err = ns.inventory.Set(br.String(), strconv.Itoa(int(size)))
|
||||
return
|
||||
}
|
||||
|
||||
func (ns *nsto) RemoveBlobs(blobs []blob.Ref) error {
|
||||
for _, br := range blobs {
|
||||
if err := ns.inventory.Delete(br.String()); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ns *nsto) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.Ref) error {
|
||||
for _, br := range blobs {
|
||||
invSizeStr, err := ns.inventory.Get(br.String())
|
||||
if err == sorted.ErrNotFound {
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
invSize, err := strconv.ParseUint(invSizeStr, 10, 32)
|
||||
if err != nil {
|
||||
log.Printf("Bogus namespace key %q / value %q", br.String(), invSizeStr)
|
||||
}
|
||||
dest <- blob.SizedRef{br, int64(invSize)}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -0,0 +1,43 @@
|
|||
/*
|
||||
Copyright 2014 The Camlistore Authors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package namespace
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"camlistore.org/pkg/blobserver"
|
||||
"camlistore.org/pkg/blobserver/storagetest"
|
||||
"camlistore.org/pkg/test"
|
||||
)
|
||||
|
||||
func newTestNamespace(t *testing.T) (sto blobserver.Storage, cleanup func()) {
|
||||
ld := test.NewLoader()
|
||||
sto, err := newFromConfig(ld, map[string]interface{}{
|
||||
"storage": "/good-storage/",
|
||||
"inventory": map[string]interface{}{
|
||||
"type": "memory",
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return sto, func() {}
|
||||
}
|
||||
|
||||
func TestNamespace(t *testing.T) {
|
||||
storagetest.Test(t, newTestNamespace)
|
||||
}
|
Loading…
Reference in New Issue