From 71d7cf9e88a5e2a72036ec8a4295599ebd83de16 Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Wed, 7 Nov 2012 20:55:37 +0100 Subject: [PATCH] Implement storage generations, init and discovery --- pkg/blobserver/cond/cond.go | 15 ++++ pkg/blobserver/interface.go | 34 +++++++++ pkg/blobserver/localdisk/generation.go | 98 ++++++++++++++++++++++++++ pkg/blobserver/localdisk/localdisk.go | 28 +++++--- pkg/server/root.go | 10 +++ 5 files changed, 177 insertions(+), 8 deletions(-) create mode 100644 pkg/blobserver/localdisk/generation.go diff --git a/pkg/blobserver/cond/cond.go b/pkg/blobserver/cond/cond.go index 2ea37b108..d5ec055a7 100644 --- a/pkg/blobserver/cond/cond.go +++ b/pkg/blobserver/cond/cond.go @@ -58,6 +58,21 @@ func (sto *condStorage) WrapContext(req *http.Request) blobserver.Storage { return s2 } +func (sto *condStorage) StorageGeneration() (initTime time.Time, random string, err error) { + if gener, ok := sto.read.(blobserver.Generationer); ok { + return gener.StorageGeneration() + } + err = blobserver.GenerationNotSupportedError(fmt.Sprintf("blobserver.Generationer not implemented on %T", sto.read)) + return +} + +func (sto *condStorage) ResetStorageGeneration() error { + if gener, ok := sto.read.(blobserver.Generationer); ok { + return gener.ResetStorageGeneration() + } + return blobserver.GenerationNotSupportedError(fmt.Sprintf("blobserver.Generationer not implemented on %T", sto.read)) +} + func newFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (storage blobserver.Storage, err error) { sto := &condStorage{ SimpleBlobHubPartitionMap: &blobserver.SimpleBlobHubPartitionMap{}, diff --git a/pkg/blobserver/interface.go b/pkg/blobserver/interface.go index 9f2a8ba49..921b02ccc 100644 --- a/pkg/blobserver/interface.go +++ b/pkg/blobserver/interface.go @@ -121,6 +121,40 @@ type Configer interface { Config() *Config } +// A GenerationNotSupportedError explains why a Storage +// value implemented the Generationer interface but failed due +// to a wrapped Storage value not implementing the interface. +type GenerationNotSupportedError string + +func (s GenerationNotSupportedError) Error() string { return string(s) } + +/* +The optional Generationer interface is an optimization and paranoia +facility for clients which can be implemented by Storage +implementations. + +If the client sees the same random string in multiple upload sessions, +it assumes that the blobserver still has all the same blobs, and also +it's the same server. This mechanism is not fundamental to +Camlistore's operation: the client could also check each blob before +uploading, or enumerate all blobs from the server too. This is purely +an optimization so clients can mix this value into their "is this file +uploaded?" local cache keys. +*/ +type Generationer interface { + // Generation returns a Storage's initialization time and + // and unique random string (or UUID). Implementations + // should call ResetStorageGeneration on demand if no + // information is known. + // The error will be of type GenerationNotSupportedError if an underlying + // storage target doesn't support the Generationer interface. + StorageGeneration() (initTime time.Time, random string, err error) + + // ResetGeneration deletes the information returned by Generation + // and re-generates it. + ResetStorageGeneration() error +} + type Storage interface { blobref.StreamingFetcher BlobReceiver diff --git a/pkg/blobserver/localdisk/generation.go b/pkg/blobserver/localdisk/generation.go new file mode 100644 index 000000000..1389bfa20 --- /dev/null +++ b/pkg/blobserver/localdisk/generation.go @@ -0,0 +1,98 @@ +/* +Copyright 2012 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package localdisk + +import ( + "bytes" + "crypto/rand" + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" + "time" + + "camlistore.org/pkg/blobserver" +) + +var _ blobserver.Generationer = (*DiskStorage)(nil) + +func (ds *DiskStorage) generationFile() string { + return filepath.Join(ds.root, "GENERATION.dat") +} + +func (ds *DiskStorage) StorageGeneration() (initTime time.Time, random string, err error) { + if ds.partition != "" { + err = fmt.Errorf("localdisk: can't call StorageGeneration on queue partition %q", ds.partition) + return + } + f, err := os.Open(ds.generationFile()) + if os.IsNotExist(err) { + if err = ds.ResetStorageGeneration(); err != nil { + return + } + f, err = os.Open(ds.generationFile()) + } + if err != nil { + return + } + defer f.Close() + bs, err := ioutil.ReadAll(f) + if err != nil { + return + } + if i := bytes.IndexByte(bs, '\n'); i != -1 { + bs = bs[:i] + } + if fi, err := f.Stat(); err == nil { + initTime = fi.ModTime() + } + random = string(bs) + return +} + +func (ds *DiskStorage) ResetStorageGeneration() error { + if ds.partition != "" { + return fmt.Errorf("localdisk: can't call StorageGeneration on queue partition %q", ds.partition) + } + var buf bytes.Buffer + if _, err := io.CopyN(&buf, rand.Reader, 20); err != nil { + return err + } + hex := fmt.Sprintf("%x", buf.Bytes()) + buf.Reset() + buf.WriteString(hex) + buf.WriteString(` + +This file's random string on the first line is an optimization and +paranoia facility for clients. + +If the client sees the same random string in multiple upload sessions, +it assumes that the blobserver still has all the same blobs, and also +it's the same server. This mechanism is not fundamental to +Camlistore's operation: the client could also check each blob before +uploading, or enumerate all blobs from the server too. This is purely +an optimization so clients can mix this value into their "is this file +uploaded?" local cache keys. + +If you deleted any blobs (or suspect any were corrupted), it's best to +delete this file so clients can safely re-upload them. + +`) + + return ioutil.WriteFile(ds.generationFile(), buf.Bytes(), 0644) +} diff --git a/pkg/blobserver/localdisk/localdisk.go b/pkg/blobserver/localdisk/localdisk.go index 53741847e..682c44e08 100644 --- a/pkg/blobserver/localdisk/localdisk.go +++ b/pkg/blobserver/localdisk/localdisk.go @@ -17,7 +17,6 @@ limitations under the License. package localdisk import ( - "errors" "fmt" "io" "log" @@ -41,18 +40,28 @@ type DiskStorage struct { mirrorPartitions []*DiskStorage } -func New(root string) (storage *DiskStorage, err error) { +// New returns a new local disk storage implementation, rooted at the provided +// directory, which must already exist. +func New(root string) (*DiskStorage, error) { // Local disk. - fi, staterr := os.Stat(root) - if staterr != nil || !fi.IsDir() { - err = errors.New(fmt.Sprintf("Storage root %q doesn't exist or is not a directory.", root)) - return + fi, err := os.Stat(root) + if os.IsNotExist(err) { + return nil, fmt.Errorf("Storage root %q doesn't exist", root) } - storage = &DiskStorage{ + if err != nil { + return nil, err + } + if !fi.IsDir() { + return nil, fmt.Errorf("Storage root %q exists but is not a directory.", root) + } + ds := &DiskStorage{ SimpleBlobHubPartitionMap: &blobserver.SimpleBlobHubPartitionMap{}, root: root, } - return + if _, _, err := ds.StorageGeneration(); err != nil { + return nil, fmt.Errorf("Error initialization generation for %q: %v", root, err) + } + return ds, nil } func newFromConfig(_ blobserver.Loader, config jsonconfig.Obj) (storage blobserver.Storage, err error) { @@ -70,6 +79,9 @@ func newFromConfig(_ blobserver.Loader, config jsonconfig.Obj) (storage blobserv if !fi.IsDir() { return nil, fmt.Errorf("Path %q isn't a directory", sto.root) } + if _, _, err := sto.StorageGeneration(); err != nil { + return nil, fmt.Errorf("Error initialization generation for %q: %v", sto.root, err) + } return sto, nil } diff --git a/pkg/server/root.go b/pkg/server/root.go index 0f7b2a188..0be78fb0c 100644 --- a/pkg/server/root.go +++ b/pkg/server/root.go @@ -20,6 +20,7 @@ import ( "encoding/json" "fmt" "net/http" + "time" "camlistore.org/pkg/auth" "camlistore.org/pkg/blobserver" @@ -108,6 +109,15 @@ func (rh *RootHandler) serveDiscovery(rw http.ResponseWriter, req *http.Request) "blobRoot": rh.BlobRoot, "searchRoot": rh.SearchRoot, } + if gener, ok := rh.Storage.(blobserver.Generationer); ok { + initTime, gen, err := gener.StorageGeneration() + if err != nil { + m["storageGenerationError"] = err.Error() + } else { + m["storageInitTime"] = initTime.UTC().Format(time.RFC3339) + m["storageGeneration"] = gen + } + } if rh.ui != nil { rh.ui.populateDiscoveryMap(m) }