mirror of https://github.com/perkeep/perkeep.git
Merge "Add generation support for diskpacked."
This commit is contained in:
commit
736f5d0d21
|
@ -41,6 +41,7 @@ import (
|
||||||
|
|
||||||
"camlistore.org/pkg/blob"
|
"camlistore.org/pkg/blob"
|
||||||
"camlistore.org/pkg/blobserver"
|
"camlistore.org/pkg/blobserver"
|
||||||
|
"camlistore.org/pkg/blobserver/local"
|
||||||
"camlistore.org/pkg/index"
|
"camlistore.org/pkg/index"
|
||||||
"camlistore.org/pkg/index/kvfile"
|
"camlistore.org/pkg/index/kvfile"
|
||||||
"camlistore.org/pkg/jsonconfig"
|
"camlistore.org/pkg/jsonconfig"
|
||||||
|
@ -63,6 +64,8 @@ type storage struct {
|
||||||
currentO int64 // current offset
|
currentO int64 // current offset
|
||||||
closed bool
|
closed bool
|
||||||
closeErr error
|
closeErr error
|
||||||
|
|
||||||
|
*local.Generationer
|
||||||
}
|
}
|
||||||
|
|
||||||
// newStorage returns a new storage in path root with the given maxFileSize,
|
// newStorage returns a new storage in path root with the given maxFileSize,
|
||||||
|
@ -82,10 +85,15 @@ func newStorage(root string, maxFileSize int64) (*storage, error) {
|
||||||
if maxFileSize <= 0 {
|
if maxFileSize <= 0 {
|
||||||
maxFileSize = defaultMaxFileSize
|
maxFileSize = defaultMaxFileSize
|
||||||
}
|
}
|
||||||
s := &storage{root: root, index: index, maxFileSize: maxFileSize}
|
s := &storage{root: root, index: index, maxFileSize: maxFileSize,
|
||||||
|
Generationer: local.NewGenerationer(root),
|
||||||
|
}
|
||||||
if err := s.openCurrent(); err != nil {
|
if err := s.openCurrent(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if _, _, err := s.StorageGeneration(); err != nil {
|
||||||
|
return nil, fmt.Errorf("Error initialization generation for %q: %v", root, err)
|
||||||
|
}
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,105 @@
|
||||||
|
/*
|
||||||
|
Copyright 2012 Google Inc.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/*
|
||||||
|
Package local implements functionality common to both the "localdisk" and
|
||||||
|
"diskpacked" storage mechanisms.
|
||||||
|
*/
|
||||||
|
package local
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"crypto/rand"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Generationer struct {
|
||||||
|
root string
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewGenerationer returns a struct for implementing blobserver.Generationer
|
||||||
|
func NewGenerationer(rootDir string) *Generationer {
|
||||||
|
return &Generationer{rootDir}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g Generationer) generationFile() string {
|
||||||
|
return filepath.Join(g.root, "GENERATION.dat")
|
||||||
|
}
|
||||||
|
|
||||||
|
// StorageGeneration returns the generation's initialization time,
|
||||||
|
// and the random string. If the GENERATION.dat file does not exist under
|
||||||
|
// root yet, then creates it now (by calling ResetGeneration).
|
||||||
|
func (g Generationer) StorageGeneration() (initTime time.Time, random string, err error) {
|
||||||
|
f, err := os.Open(g.generationFile())
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
if err = g.ResetStorageGeneration(); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
f, err = os.Open(g.generationFile())
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
bs, err := ioutil.ReadAll(f)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if i := bytes.IndexByte(bs, '\n'); i != -1 {
|
||||||
|
bs = bs[:i]
|
||||||
|
}
|
||||||
|
if fi, err := f.Stat(); err == nil {
|
||||||
|
initTime = fi.ModTime()
|
||||||
|
}
|
||||||
|
random = string(bs)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// ResetStorageGeneration reinitializes the generation by recreating the
|
||||||
|
// GENERATION.dat file with a new random string
|
||||||
|
func (g Generationer) ResetStorageGeneration() error {
|
||||||
|
var buf bytes.Buffer
|
||||||
|
if _, err := io.CopyN(&buf, rand.Reader, 20); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
hex := fmt.Sprintf("%x", buf.Bytes())
|
||||||
|
buf.Reset()
|
||||||
|
buf.WriteString(hex)
|
||||||
|
buf.WriteString(`
|
||||||
|
|
||||||
|
This file's random string on the first line is an optimization and
|
||||||
|
paranoia facility for clients.
|
||||||
|
|
||||||
|
If the client sees the same random string in multiple upload sessions,
|
||||||
|
it assumes that the blobserver still has all the same blobs, and also
|
||||||
|
it's the same server. This mechanism is not fundamental to
|
||||||
|
Camlistore's operation: the client could also check each blob before
|
||||||
|
uploading, or enumerate all blobs from the server too. This is purely
|
||||||
|
an optimization so clients can mix this value into their "is this file
|
||||||
|
uploaded?" local cache keys.
|
||||||
|
|
||||||
|
If you deleted any blobs (or suspect any were corrupted), it's best to
|
||||||
|
delete this file so clients can safely re-upload them.
|
||||||
|
|
||||||
|
`)
|
||||||
|
|
||||||
|
return ioutil.WriteFile(g.generationFile(), buf.Bytes(), 0644)
|
||||||
|
}
|
|
@ -17,82 +17,30 @@ limitations under the License.
|
||||||
package localdisk
|
package localdisk
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"crypto/rand"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"io/ioutil"
|
|
||||||
"os"
|
|
||||||
"path/filepath"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"camlistore.org/pkg/blobserver"
|
"camlistore.org/pkg/blobserver"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Compile-time check that *DiskStorage implements blobserver.Generationer
|
||||||
var _ blobserver.Generationer = (*DiskStorage)(nil)
|
var _ blobserver.Generationer = (*DiskStorage)(nil)
|
||||||
|
|
||||||
func (ds *DiskStorage) generationFile() string {
|
// StorageGeneration returns the generation's initialization time,
|
||||||
return filepath.Join(ds.root, "GENERATION.dat")
|
// and the random string.
|
||||||
}
|
|
||||||
|
|
||||||
func (ds *DiskStorage) StorageGeneration() (initTime time.Time, random string, err error) {
|
func (ds *DiskStorage) StorageGeneration() (initTime time.Time, random string, err error) {
|
||||||
if ds.partition != "" {
|
if ds.partition != "" {
|
||||||
err = fmt.Errorf("localdisk: can't call StorageGeneration on queue partition %q", ds.partition)
|
err = fmt.Errorf("localdisk: can't call StorageGeneration on queue partition %q", ds.partition)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
f, err := os.Open(ds.generationFile())
|
return ds.gen.StorageGeneration()
|
||||||
if os.IsNotExist(err) {
|
|
||||||
if err = ds.ResetStorageGeneration(); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
f, err = os.Open(ds.generationFile())
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer f.Close()
|
|
||||||
bs, err := ioutil.ReadAll(f)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if i := bytes.IndexByte(bs, '\n'); i != -1 {
|
|
||||||
bs = bs[:i]
|
|
||||||
}
|
|
||||||
if fi, err := f.Stat(); err == nil {
|
|
||||||
initTime = fi.ModTime()
|
|
||||||
}
|
|
||||||
random = string(bs)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ResetStorageGeneration reinitializes the generation by recreating the
|
||||||
|
// GENERATION.dat file with a new random string
|
||||||
func (ds *DiskStorage) ResetStorageGeneration() error {
|
func (ds *DiskStorage) ResetStorageGeneration() error {
|
||||||
if ds.partition != "" {
|
if ds.partition != "" {
|
||||||
return fmt.Errorf("localdisk: can't call StorageGeneration on queue partition %q", ds.partition)
|
return fmt.Errorf("localdisk: can't call StorageGeneration on queue partition %q", ds.partition)
|
||||||
}
|
}
|
||||||
var buf bytes.Buffer
|
return ds.gen.ResetStorageGeneration()
|
||||||
if _, err := io.CopyN(&buf, rand.Reader, 20); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
hex := fmt.Sprintf("%x", buf.Bytes())
|
|
||||||
buf.Reset()
|
|
||||||
buf.WriteString(hex)
|
|
||||||
buf.WriteString(`
|
|
||||||
|
|
||||||
This file's random string on the first line is an optimization and
|
|
||||||
paranoia facility for clients.
|
|
||||||
|
|
||||||
If the client sees the same random string in multiple upload sessions,
|
|
||||||
it assumes that the blobserver still has all the same blobs, and also
|
|
||||||
it's the same server. This mechanism is not fundamental to
|
|
||||||
Camlistore's operation: the client could also check each blob before
|
|
||||||
uploading, or enumerate all blobs from the server too. This is purely
|
|
||||||
an optimization so clients can mix this value into their "is this file
|
|
||||||
uploaded?" local cache keys.
|
|
||||||
|
|
||||||
If you deleted any blobs (or suspect any were corrupted), it's best to
|
|
||||||
delete this file so clients can safely re-upload them.
|
|
||||||
|
|
||||||
`)
|
|
||||||
|
|
||||||
return ioutil.WriteFile(ds.generationFile(), buf.Bytes(), 0644)
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,6 +39,7 @@ import (
|
||||||
|
|
||||||
"camlistore.org/pkg/blob"
|
"camlistore.org/pkg/blob"
|
||||||
"camlistore.org/pkg/blobserver"
|
"camlistore.org/pkg/blobserver"
|
||||||
|
"camlistore.org/pkg/blobserver/local"
|
||||||
"camlistore.org/pkg/jsonconfig"
|
"camlistore.org/pkg/jsonconfig"
|
||||||
"camlistore.org/pkg/types"
|
"camlistore.org/pkg/types"
|
||||||
)
|
)
|
||||||
|
@ -60,6 +61,9 @@ type DiskStorage struct {
|
||||||
// The same lock is shared between queues created from a parent,
|
// The same lock is shared between queues created from a parent,
|
||||||
// since they interact with overlapping sets of directories.
|
// since they interact with overlapping sets of directories.
|
||||||
dirLockMu *sync.RWMutex
|
dirLockMu *sync.RWMutex
|
||||||
|
|
||||||
|
// gen will be nil if partition != ""
|
||||||
|
gen *local.Generationer
|
||||||
}
|
}
|
||||||
|
|
||||||
// New returns a new local disk storage implementation at the provided
|
// New returns a new local disk storage implementation at the provided
|
||||||
|
@ -79,6 +83,7 @@ func New(root string) (*DiskStorage, error) {
|
||||||
ds := &DiskStorage{
|
ds := &DiskStorage{
|
||||||
root: root,
|
root: root,
|
||||||
dirLockMu: new(sync.RWMutex),
|
dirLockMu: new(sync.RWMutex),
|
||||||
|
gen: local.NewGenerationer(root),
|
||||||
}
|
}
|
||||||
if _, _, err := ds.StorageGeneration(); err != nil {
|
if _, _, err := ds.StorageGeneration(); err != nil {
|
||||||
return nil, fmt.Errorf("Error initialization generation for %q: %v", root, err)
|
return nil, fmt.Errorf("Error initialization generation for %q: %v", root, err)
|
||||||
|
|
Loading…
Reference in New Issue