Implement storage generations, init and discovery

This commit is contained in:
Brad Fitzpatrick 2012-11-07 20:55:37 +01:00
parent a6d3151645
commit 71d7cf9e88
5 changed files with 177 additions and 8 deletions

View File

@ -58,6 +58,21 @@ func (sto *condStorage) WrapContext(req *http.Request) blobserver.Storage {
return s2
}
func (sto *condStorage) StorageGeneration() (initTime time.Time, random string, err error) {
if gener, ok := sto.read.(blobserver.Generationer); ok {
return gener.StorageGeneration()
}
err = blobserver.GenerationNotSupportedError(fmt.Sprintf("blobserver.Generationer not implemented on %T", sto.read))
return
}
func (sto *condStorage) ResetStorageGeneration() error {
if gener, ok := sto.read.(blobserver.Generationer); ok {
return gener.ResetStorageGeneration()
}
return blobserver.GenerationNotSupportedError(fmt.Sprintf("blobserver.Generationer not implemented on %T", sto.read))
}
func newFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (storage blobserver.Storage, err error) {
sto := &condStorage{
SimpleBlobHubPartitionMap: &blobserver.SimpleBlobHubPartitionMap{},

View File

@ -121,6 +121,40 @@ type Configer interface {
Config() *Config
}
// A GenerationNotSupportedError explains why a Storage
// value implemented the Generationer interface but failed due
// to a wrapped Storage value not implementing the interface.
type GenerationNotSupportedError string
func (s GenerationNotSupportedError) Error() string { return string(s) }
/*
The optional Generationer interface is an optimization and paranoia
facility for clients which can be implemented by Storage
implementations.
If the client sees the same random string in multiple upload sessions,
it assumes that the blobserver still has all the same blobs, and also
it's the same server. This mechanism is not fundamental to
Camlistore's operation: the client could also check each blob before
uploading, or enumerate all blobs from the server too. This is purely
an optimization so clients can mix this value into their "is this file
uploaded?" local cache keys.
*/
type Generationer interface {
// Generation returns a Storage's initialization time and
// and unique random string (or UUID). Implementations
// should call ResetStorageGeneration on demand if no
// information is known.
// The error will be of type GenerationNotSupportedError if an underlying
// storage target doesn't support the Generationer interface.
StorageGeneration() (initTime time.Time, random string, err error)
// ResetGeneration deletes the information returned by Generation
// and re-generates it.
ResetStorageGeneration() error
}
type Storage interface {
blobref.StreamingFetcher
BlobReceiver

View File

@ -0,0 +1,98 @@
/*
Copyright 2012 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package localdisk
import (
"bytes"
"crypto/rand"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"time"
"camlistore.org/pkg/blobserver"
)
var _ blobserver.Generationer = (*DiskStorage)(nil)
func (ds *DiskStorage) generationFile() string {
return filepath.Join(ds.root, "GENERATION.dat")
}
func (ds *DiskStorage) StorageGeneration() (initTime time.Time, random string, err error) {
if ds.partition != "" {
err = fmt.Errorf("localdisk: can't call StorageGeneration on queue partition %q", ds.partition)
return
}
f, err := os.Open(ds.generationFile())
if os.IsNotExist(err) {
if err = ds.ResetStorageGeneration(); err != nil {
return
}
f, err = os.Open(ds.generationFile())
}
if err != nil {
return
}
defer f.Close()
bs, err := ioutil.ReadAll(f)
if err != nil {
return
}
if i := bytes.IndexByte(bs, '\n'); i != -1 {
bs = bs[:i]
}
if fi, err := f.Stat(); err == nil {
initTime = fi.ModTime()
}
random = string(bs)
return
}
func (ds *DiskStorage) ResetStorageGeneration() error {
if ds.partition != "" {
return fmt.Errorf("localdisk: can't call StorageGeneration on queue partition %q", ds.partition)
}
var buf bytes.Buffer
if _, err := io.CopyN(&buf, rand.Reader, 20); err != nil {
return err
}
hex := fmt.Sprintf("%x", buf.Bytes())
buf.Reset()
buf.WriteString(hex)
buf.WriteString(`
This file's random string on the first line is an optimization and
paranoia facility for clients.
If the client sees the same random string in multiple upload sessions,
it assumes that the blobserver still has all the same blobs, and also
it's the same server. This mechanism is not fundamental to
Camlistore's operation: the client could also check each blob before
uploading, or enumerate all blobs from the server too. This is purely
an optimization so clients can mix this value into their "is this file
uploaded?" local cache keys.
If you deleted any blobs (or suspect any were corrupted), it's best to
delete this file so clients can safely re-upload them.
`)
return ioutil.WriteFile(ds.generationFile(), buf.Bytes(), 0644)
}

View File

@ -17,7 +17,6 @@ limitations under the License.
package localdisk
import (
"errors"
"fmt"
"io"
"log"
@ -41,18 +40,28 @@ type DiskStorage struct {
mirrorPartitions []*DiskStorage
}
func New(root string) (storage *DiskStorage, err error) {
// New returns a new local disk storage implementation, rooted at the provided
// directory, which must already exist.
func New(root string) (*DiskStorage, error) {
// Local disk.
fi, staterr := os.Stat(root)
if staterr != nil || !fi.IsDir() {
err = errors.New(fmt.Sprintf("Storage root %q doesn't exist or is not a directory.", root))
return
fi, err := os.Stat(root)
if os.IsNotExist(err) {
return nil, fmt.Errorf("Storage root %q doesn't exist", root)
}
storage = &DiskStorage{
if err != nil {
return nil, err
}
if !fi.IsDir() {
return nil, fmt.Errorf("Storage root %q exists but is not a directory.", root)
}
ds := &DiskStorage{
SimpleBlobHubPartitionMap: &blobserver.SimpleBlobHubPartitionMap{},
root: root,
}
return
if _, _, err := ds.StorageGeneration(); err != nil {
return nil, fmt.Errorf("Error initialization generation for %q: %v", root, err)
}
return ds, nil
}
func newFromConfig(_ blobserver.Loader, config jsonconfig.Obj) (storage blobserver.Storage, err error) {
@ -70,6 +79,9 @@ func newFromConfig(_ blobserver.Loader, config jsonconfig.Obj) (storage blobserv
if !fi.IsDir() {
return nil, fmt.Errorf("Path %q isn't a directory", sto.root)
}
if _, _, err := sto.StorageGeneration(); err != nil {
return nil, fmt.Errorf("Error initialization generation for %q: %v", sto.root, err)
}
return sto, nil
}

View File

@ -20,6 +20,7 @@ import (
"encoding/json"
"fmt"
"net/http"
"time"
"camlistore.org/pkg/auth"
"camlistore.org/pkg/blobserver"
@ -108,6 +109,15 @@ func (rh *RootHandler) serveDiscovery(rw http.ResponseWriter, req *http.Request)
"blobRoot": rh.BlobRoot,
"searchRoot": rh.SearchRoot,
}
if gener, ok := rh.Storage.(blobserver.Generationer); ok {
initTime, gen, err := gener.StorageGeneration()
if err != nil {
m["storageGenerationError"] = err.Error()
} else {
m["storageInitTime"] = initTime.UTC().Format(time.RFC3339)
m["storageGeneration"] = gen
}
}
if rh.ui != nil {
rh.ui.populateDiscoveryMap(m)
}