From bb5059bafd7672f841d82f93759061b9c3f69bd1 Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Sun, 19 Jan 2014 22:47:35 -0800 Subject: [PATCH] namespace: implementation, and initial basic tests using storagetest. Change-Id: Ib53822fb530f9a1bd2a3c0658120639d5bf4426a --- pkg/blobserver/namespace/ns.go | 114 +++++++++++++++++++++++++++- pkg/blobserver/namespace/ns_test.go | 43 +++++++++++ 2 files changed, 155 insertions(+), 2 deletions(-) create mode 100644 pkg/blobserver/namespace/ns_test.go diff --git a/pkg/blobserver/namespace/ns.go b/pkg/blobserver/namespace/ns.go index f71c58283..fd4ddf4f1 100644 --- a/pkg/blobserver/namespace/ns.go +++ b/pkg/blobserver/namespace/ns.go @@ -23,18 +23,24 @@ limitations under the License. package namespace import ( + "bytes" "fmt" + "io" + "log" + "os" + "strconv" + "camlistore.org/pkg/blob" "camlistore.org/pkg/blobserver" + "camlistore.org/pkg/context" "camlistore.org/pkg/jsonconfig" "camlistore.org/pkg/sorted" + "camlistore.org/pkg/strutil" ) type nsto struct { inventory sorted.KeyValue master blobserver.Storage - - blobserver.NoImplStorage // TODO(bradfitz): remove this and finish implementing } func init() { @@ -58,3 +64,107 @@ func newFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (storage blobser } return sto, nil } + +func (ns *nsto) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) error { + defer close(dest) + done := ctx.Done() + + it := ns.inventory.Find(after, "") + first := true + for limit > 0 && it.Next() { + if first { + first = false + if after != "" && it.Key() == after { + continue + } + } + br, ok := blob.ParseBytes(it.KeyBytes()) + size, err := strutil.ParseUintBytes(it.ValueBytes(), 10, 32) + if !ok || err != nil { + log.Printf("Bogus namespace key %q / value %q", it.Key(), it.Value()) + continue + } + select { + case dest <- blob.SizedRef{br, int64(size)}: + case <-done: + return context.ErrCanceled + } + limit-- + } + if err := it.Close(); err != nil { + return err + } + return nil +} + +func (ns *nsto) FetchStreaming(br blob.Ref) (rc io.ReadCloser, size int64, err error) { + invSizeStr, err := ns.inventory.Get(br.String()) + if err == sorted.ErrNotFound { + err = os.ErrNotExist + return + } + if err != nil { + return + } + invSize, err := strconv.ParseUint(invSizeStr, 10, 32) + if err != nil { + return + } + rc, size, err = ns.master.FetchStreaming(br) + if err != nil { + return + } + if size != int64(invSize) { + log.Printf("namespace: on blob %v, unexpected inventory size %d for master size %d", br, invSize, size) + return nil, 0, os.ErrNotExist + } + return rc, size, nil +} + +func (ns *nsto) ReceiveBlob(br blob.Ref, src io.Reader) (sb blob.SizedRef, err error) { + var buf bytes.Buffer + size, err := io.Copy(&buf, src) + if err != nil { + return + } + + // Check if a duplicate blob, already uploaded previously. + if _, ierr := ns.inventory.Get(br.String()); ierr == nil { + return blob.SizedRef{br, size}, nil + } + + sb, err = ns.master.ReceiveBlob(br, &buf) + if err != nil { + return + } + + err = ns.inventory.Set(br.String(), strconv.Itoa(int(size))) + return +} + +func (ns *nsto) RemoveBlobs(blobs []blob.Ref) error { + for _, br := range blobs { + if err := ns.inventory.Delete(br.String()); err != nil { + return err + } + } + return nil +} + +func (ns *nsto) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.Ref) error { + for _, br := range blobs { + invSizeStr, err := ns.inventory.Get(br.String()) + if err == sorted.ErrNotFound { + continue + } + if err != nil { + return err + } + invSize, err := strconv.ParseUint(invSizeStr, 10, 32) + if err != nil { + log.Printf("Bogus namespace key %q / value %q", br.String(), invSizeStr) + } + dest <- blob.SizedRef{br, int64(invSize)} + } + return nil +} diff --git a/pkg/blobserver/namespace/ns_test.go b/pkg/blobserver/namespace/ns_test.go new file mode 100644 index 000000000..0ff78ec4d --- /dev/null +++ b/pkg/blobserver/namespace/ns_test.go @@ -0,0 +1,43 @@ +/* +Copyright 2014 The Camlistore Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package namespace + +import ( + "testing" + + "camlistore.org/pkg/blobserver" + "camlistore.org/pkg/blobserver/storagetest" + "camlistore.org/pkg/test" +) + +func newTestNamespace(t *testing.T) (sto blobserver.Storage, cleanup func()) { + ld := test.NewLoader() + sto, err := newFromConfig(ld, map[string]interface{}{ + "storage": "/good-storage/", + "inventory": map[string]interface{}{ + "type": "memory", + }, + }) + if err != nil { + t.Fatal(err) + } + return sto, func() {} +} + +func TestNamespace(t *testing.T) { + storagetest.Test(t, newTestNamespace) +}