Add generation support for diskpacked.

Refactor the localdisk, diskpacked common code to pkg/blobserver/local
(only StorageGeneration, ResetStorageGeneration in this CL)

Change-Id: Ib04125805d5a1960bd29a474d3fc7ca985708d8d
This commit is contained in:
Tamás Gulácsi 2013-10-09 21:20:28 +02:00
parent 34823a1655
commit a624b6d3b2
4 changed files with 127 additions and 61 deletions

View File

@ -41,6 +41,7 @@ import (
"camlistore.org/pkg/blob"
"camlistore.org/pkg/blobserver"
"camlistore.org/pkg/blobserver/local"
"camlistore.org/pkg/index"
"camlistore.org/pkg/index/kvfile"
"camlistore.org/pkg/jsonconfig"
@ -48,7 +49,7 @@ import (
"camlistore.org/third_party/github.com/camlistore/lock"
)
const defaultMaxFileSize = 512 << 20 // 512MB
const defaultMaxFileSize = 512 << 20 // 512MB
type storage struct {
root string
@ -62,6 +63,8 @@ type storage struct {
currentO int64 // current offset
closed bool
closeErr error
*local.Generationer
}
// newStorage returns a new storage in path root with the given maxFileSize,
@ -81,10 +84,15 @@ func newStorage(root string, maxFileSize int64) (*storage, error) {
if maxFileSize <= 0 {
maxFileSize = defaultMaxFileSize
}
s := &storage{root: root, index: index, maxFileSize: maxFileSize}
s := &storage{root: root, index: index, maxFileSize: maxFileSize,
Generationer: local.NewGenerationer(root),
}
if err := s.openCurrent(); err != nil {
return nil, err
}
if _, _, err := s.StorageGeneration(); err != nil {
return nil, fmt.Errorf("Error initialization generation for %q: %v", root, err)
}
return s, nil
}

View File

@ -0,0 +1,105 @@
/*
Copyright 2012 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Package local implements functionality common to both the "localdisk" and
"diskpacked" storage mechanisms.
*/
package local
import (
"bytes"
"crypto/rand"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"time"
)
type Generationer struct {
root string
}
// NewGenerationer returns a struct for implementing blobserver.Generationer
func NewGenerationer(rootDir string) *Generationer {
return &Generationer{rootDir}
}
func (g Generationer) generationFile() string {
return filepath.Join(g.root, "GENERATION.dat")
}
// StorageGeneration returns the generation's initialization time,
// and the random string. If the GENERATION.dat file does not exist under
// root yet, then creates it now (by calling ResetGeneration).
func (g Generationer) StorageGeneration() (initTime time.Time, random string, err error) {
f, err := os.Open(g.generationFile())
if os.IsNotExist(err) {
if err = g.ResetStorageGeneration(); err != nil {
return
}
f, err = os.Open(g.generationFile())
}
if err != nil {
return
}
defer f.Close()
bs, err := ioutil.ReadAll(f)
if err != nil {
return
}
if i := bytes.IndexByte(bs, '\n'); i != -1 {
bs = bs[:i]
}
if fi, err := f.Stat(); err == nil {
initTime = fi.ModTime()
}
random = string(bs)
return
}
// ResetStorageGeneration reinitializes the generation by recreating the
// GENERATION.dat file with a new random string
func (g Generationer) ResetStorageGeneration() error {
var buf bytes.Buffer
if _, err := io.CopyN(&buf, rand.Reader, 20); err != nil {
return err
}
hex := fmt.Sprintf("%x", buf.Bytes())
buf.Reset()
buf.WriteString(hex)
buf.WriteString(`
This file's random string on the first line is an optimization and
paranoia facility for clients.
If the client sees the same random string in multiple upload sessions,
it assumes that the blobserver still has all the same blobs, and also
it's the same server. This mechanism is not fundamental to
Camlistore's operation: the client could also check each blob before
uploading, or enumerate all blobs from the server too. This is purely
an optimization so clients can mix this value into their "is this file
uploaded?" local cache keys.
If you deleted any blobs (or suspect any were corrupted), it's best to
delete this file so clients can safely re-upload them.
`)
return ioutil.WriteFile(g.generationFile(), buf.Bytes(), 0644)
}

View File

@ -17,82 +17,30 @@ limitations under the License.
package localdisk
import (
"bytes"
"crypto/rand"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"time"
"camlistore.org/pkg/blobserver"
)
// Compile-time check that *DiskStorage implements blobserver.Generationer
var _ blobserver.Generationer = (*DiskStorage)(nil)
func (ds *DiskStorage) generationFile() string {
return filepath.Join(ds.root, "GENERATION.dat")
}
// StorageGeneration returns the generation's initialization time,
// and the random string.
func (ds *DiskStorage) StorageGeneration() (initTime time.Time, random string, err error) {
if ds.partition != "" {
err = fmt.Errorf("localdisk: can't call StorageGeneration on queue partition %q", ds.partition)
return
}
f, err := os.Open(ds.generationFile())
if os.IsNotExist(err) {
if err = ds.ResetStorageGeneration(); err != nil {
return
}
f, err = os.Open(ds.generationFile())
}
if err != nil {
return
}
defer f.Close()
bs, err := ioutil.ReadAll(f)
if err != nil {
return
}
if i := bytes.IndexByte(bs, '\n'); i != -1 {
bs = bs[:i]
}
if fi, err := f.Stat(); err == nil {
initTime = fi.ModTime()
}
random = string(bs)
return
return ds.gen.StorageGeneration()
}
// ResetStorageGeneration reinitializes the generation by recreating the
// GENERATION.dat file with a new random string
func (ds *DiskStorage) ResetStorageGeneration() error {
if ds.partition != "" {
return fmt.Errorf("localdisk: can't call StorageGeneration on queue partition %q", ds.partition)
}
var buf bytes.Buffer
if _, err := io.CopyN(&buf, rand.Reader, 20); err != nil {
return err
}
hex := fmt.Sprintf("%x", buf.Bytes())
buf.Reset()
buf.WriteString(hex)
buf.WriteString(`
This file's random string on the first line is an optimization and
paranoia facility for clients.
If the client sees the same random string in multiple upload sessions,
it assumes that the blobserver still has all the same blobs, and also
it's the same server. This mechanism is not fundamental to
Camlistore's operation: the client could also check each blob before
uploading, or enumerate all blobs from the server too. This is purely
an optimization so clients can mix this value into their "is this file
uploaded?" local cache keys.
If you deleted any blobs (or suspect any were corrupted), it's best to
delete this file so clients can safely re-upload them.
`)
return ioutil.WriteFile(ds.generationFile(), buf.Bytes(), 0644)
return ds.gen.ResetStorageGeneration()
}

View File

@ -39,6 +39,7 @@ import (
"camlistore.org/pkg/blob"
"camlistore.org/pkg/blobserver"
"camlistore.org/pkg/blobserver/local"
"camlistore.org/pkg/jsonconfig"
"camlistore.org/pkg/types"
)
@ -60,6 +61,9 @@ type DiskStorage struct {
// The same lock is shared between queues created from a parent,
// since they interact with overlapping sets of directories.
dirLockMu *sync.RWMutex
// gen will be nil if partition != ""
gen *local.Generationer
}
// New returns a new local disk storage implementation at the provided
@ -79,6 +83,7 @@ func New(root string) (*DiskStorage, error) {
ds := &DiskStorage{
root: root,
dirLockMu: new(sync.RWMutex),
gen: local.NewGenerationer(root),
}
if _, _, err := ds.StorageGeneration(); err != nil {
return nil, fmt.Errorf("Error initialization generation for %q: %v", root, err)