mirror of https://github.com/perkeep/perkeep.git
blobserver/{localdisk,files}: move generic localdisk code to the files package
Just code movement. Except I did delete some 5 year old localdisk migration code that's no longer relevant. Updates #1111 (this is prep for SFTP support) Change-Id: Ibe1de1d4d804a6c86919a9df454ab125027e4c33
This commit is contained in:
parent
87d22ea432
commit
a4d0cc6ab7
|
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
|
|||
limitations under the License.
|
||||
*/
|
||||
|
||||
package localdisk
|
||||
package files
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@ -49,7 +49,7 @@ func (ee *enumerateError) Error() string {
|
|||
}
|
||||
|
||||
// readBlobs implements EnumerateBlobs. It calls itself recursively on subdirectories.
|
||||
func (ds *DiskStorage) readBlobs(ctx context.Context, opts readBlobRequest) error {
|
||||
func (ds *Storage) readBlobs(ctx context.Context, opts readBlobRequest) error {
|
||||
dirFullPath := filepath.Join(opts.dirRoot, opts.pathInto)
|
||||
names, err := ds.fs.ReadDirNames(dirFullPath)
|
||||
if err != nil {
|
||||
|
@ -165,7 +165,7 @@ func (ds *DiskStorage) readBlobs(ctx context.Context, opts readBlobRequest) erro
|
|||
return nil
|
||||
}
|
||||
|
||||
func (ds *DiskStorage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
|
||||
func (ds *Storage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
|
||||
defer close(dest)
|
||||
if limit == 0 {
|
||||
log.Printf("Warning: localdisk.EnumerateBlobs called with a limit of 0")
|
|
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
|
|||
limitations under the License.
|
||||
*/
|
||||
|
||||
package localdisk
|
||||
package files_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@ -22,16 +22,35 @@ import (
|
|||
"io/ioutil"
|
||||
"os"
|
||||
"sort"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"perkeep.org/pkg/blob"
|
||||
"perkeep.org/pkg/blobserver"
|
||||
"perkeep.org/pkg/blobserver/files"
|
||||
"perkeep.org/pkg/test"
|
||||
. "perkeep.org/pkg/test/asserts"
|
||||
. "perkeep.org/pkg/test/asserts" // delete this grossness
|
||||
)
|
||||
|
||||
var (
|
||||
epochLock sync.Mutex
|
||||
rootEpoch = 0
|
||||
)
|
||||
|
||||
func NewTestStorage(t *testing.T) (sto blobserver.Storage, root string) {
|
||||
epochLock.Lock()
|
||||
rootEpoch++
|
||||
path := fmt.Sprintf("%s/camli-testroot-%d-%d", os.TempDir(), os.Getpid(), rootEpoch)
|
||||
epochLock.Unlock()
|
||||
if err := os.MkdirAll(path, 0755); err != nil {
|
||||
t.Fatalf("Failed to create temp directory %q: %v", path, err)
|
||||
}
|
||||
return files.NewStorage(files.OSFS(), path), path
|
||||
}
|
||||
|
||||
func TestEnumerate(t *testing.T) {
|
||||
ds := NewStorage(t)
|
||||
defer cleanUp(ds)
|
||||
ds, root := NewTestStorage(t)
|
||||
defer os.RemoveAll(root)
|
||||
|
||||
// For test simplicity foo, bar, and baz all have ascending
|
||||
// sha1s and lengths.
|
||||
|
@ -86,8 +105,8 @@ func TestEnumerate(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestEnumerateEmpty(t *testing.T) {
|
||||
ds := NewStorage(t)
|
||||
defer cleanUp(ds)
|
||||
ds, root := NewTestStorage(t)
|
||||
defer os.RemoveAll(root)
|
||||
|
||||
limit := 5000
|
||||
ch := make(chan blob.SizedRef)
|
||||
|
@ -116,8 +135,8 @@ func (sb SortedSizedBlobs) Swap(i, j int) {
|
|||
}
|
||||
|
||||
func TestEnumerateIsSorted(t *testing.T) {
|
||||
ds := NewStorage(t)
|
||||
defer cleanUp(ds)
|
||||
ds, root := NewTestStorage(t)
|
||||
defer os.RemoveAll(root)
|
||||
|
||||
const blobsToMake = 250
|
||||
t.Logf("Uploading test blobs...")
|
||||
|
@ -129,13 +148,13 @@ func TestEnumerateIsSorted(t *testing.T) {
|
|||
// Make some fake blobs in other partitions to confuse the
|
||||
// enumerate code.
|
||||
// TODO(bradfitz): remove this eventually.
|
||||
fakeDir := ds.root + "/partition/queue-indexer/sha1/1f0/710"
|
||||
fakeDir := root + "/partition/queue-indexer/sha1/1f0/710"
|
||||
ExpectNil(t, os.MkdirAll(fakeDir, 0755), "creating fakeDir")
|
||||
ExpectNil(t, ioutil.WriteFile(fakeDir+"/sha1-1f07105465650aa243cfc1b1bbb1c68ea95c6812.dat",
|
||||
[]byte("fake file"), 0644), "writing fake blob")
|
||||
|
||||
// And the same for a "cache" directory, used by the default configuration.
|
||||
fakeDir = ds.root + "/cache/sha1/1f0/710"
|
||||
fakeDir = root + "/cache/sha1/1f0/710"
|
||||
ExpectNil(t, os.MkdirAll(fakeDir, 0755), "creating cache fakeDir")
|
||||
ExpectNil(t, ioutil.WriteFile(fakeDir+"/sha1-1f07105465650aa243cfc1b1bbb1c68ea95c6812.dat",
|
||||
[]byte("fake file"), 0644), "writing fake blob")
|
|
@ -21,9 +21,20 @@ type directly; it's used by "localdisk" and in the future "sftp" and
|
|||
"webdav".
|
||||
*/
|
||||
package files // import "perkeep.org/pkg/blobserver/files"
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"perkeep.org/pkg/blob"
|
||||
"perkeep.org/pkg/blobserver"
|
||||
|
||||
"go4.org/syncutil"
|
||||
)
|
||||
|
||||
// VFS describes the virtual filesystem needed by this package.
|
||||
|
@ -66,3 +77,153 @@ type ReadableFile interface {
|
|||
io.Seeker
|
||||
io.Closer
|
||||
}
|
||||
|
||||
type Storage struct {
|
||||
fs VFS
|
||||
root string
|
||||
|
||||
// dirLockMu must be held for writing when deleting an empty directory
|
||||
// and for read when receiving blobs.
|
||||
dirLockMu *sync.RWMutex
|
||||
|
||||
// statGate limits how many pending Stat calls we have in flight.
|
||||
statGate *syncutil.Gate
|
||||
|
||||
// tmpFileGate limits the number of temporary files open at the same
|
||||
// time, so we don't run into the max set by ulimit. It is nil on
|
||||
// systems (Windows) where we don't know the maximum number of open
|
||||
// file descriptors.
|
||||
tmpFileGate *syncutil.Gate
|
||||
}
|
||||
|
||||
// SetNewFileGate sets a gate (counting semaphore) on the number of new files
|
||||
// that may be opened for writing at a time.
|
||||
func (s *Storage) SetNewFileGate(g *syncutil.Gate) { s.tmpFileGate = g }
|
||||
|
||||
func NewStorage(fs VFS, root string) *Storage {
|
||||
return &Storage{
|
||||
fs: fs,
|
||||
root: root,
|
||||
dirLockMu: new(sync.RWMutex),
|
||||
statGate: syncutil.NewGate(10), // arbitrary, but bounded; be more clever later?
|
||||
}
|
||||
}
|
||||
|
||||
func (ds *Storage) tryRemoveDir(dir string) {
|
||||
ds.dirLockMu.Lock()
|
||||
defer ds.dirLockMu.Unlock()
|
||||
ds.fs.RemoveDir(dir) // ignore error
|
||||
}
|
||||
|
||||
func (ds *Storage) Fetch(ctx context.Context, br blob.Ref) (io.ReadCloser, uint32, error) {
|
||||
return ds.fetch(ctx, br, 0, -1)
|
||||
}
|
||||
|
||||
func (ds *Storage) SubFetch(ctx context.Context, br blob.Ref, offset, length int64) (io.ReadCloser, error) {
|
||||
if offset < 0 || length < 0 {
|
||||
return nil, blob.ErrNegativeSubFetch
|
||||
}
|
||||
rc, _, err := ds.fetch(ctx, br, offset, length)
|
||||
return rc, err
|
||||
}
|
||||
|
||||
// u32 converts n to an uint32, or panics if n is out of range
|
||||
func u32(n int64) uint32 {
|
||||
if n < 0 || n > math.MaxUint32 {
|
||||
panic("bad size " + fmt.Sprint(n))
|
||||
}
|
||||
return uint32(n)
|
||||
}
|
||||
|
||||
// length -1 means entire file
|
||||
func (ds *Storage) fetch(ctx context.Context, br blob.Ref, offset, length int64) (rc io.ReadCloser, size uint32, err error) {
|
||||
// TODO: use ctx, if the os package ever supports that.
|
||||
fileName := ds.blobPath(br)
|
||||
stat, err := ds.fs.Stat(fileName)
|
||||
if os.IsNotExist(err) {
|
||||
return nil, 0, os.ErrNotExist
|
||||
}
|
||||
size = u32(stat.Size())
|
||||
file, err := ds.fs.Open(fileName)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
err = os.ErrNotExist
|
||||
}
|
||||
return nil, 0, err
|
||||
}
|
||||
// normal Fetch
|
||||
if length < 0 && offset == 0 {
|
||||
return file, size, nil
|
||||
}
|
||||
// SubFetch:
|
||||
if offset < 0 || offset > stat.Size() {
|
||||
if offset < 0 {
|
||||
return nil, 0, blob.ErrNegativeSubFetch
|
||||
}
|
||||
return nil, 0, blob.ErrOutOfRangeOffsetSubFetch
|
||||
}
|
||||
if offset != 0 {
|
||||
if at, err := file.Seek(offset, io.SeekStart); err != nil || at != offset {
|
||||
file.Close()
|
||||
return nil, 0, fmt.Errorf("localdisk: error seeking to %d: got %v, %v", offset, at, err)
|
||||
}
|
||||
}
|
||||
return struct {
|
||||
io.Reader
|
||||
io.Closer
|
||||
}{
|
||||
Reader: io.LimitReader(file, length),
|
||||
Closer: file,
|
||||
}, 0 /* unused */, nil
|
||||
}
|
||||
|
||||
func (ds *Storage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
|
||||
for _, blob := range blobs {
|
||||
fileName := ds.blobPath(blob)
|
||||
err := ds.fs.Remove(fileName)
|
||||
switch {
|
||||
case err == nil:
|
||||
continue
|
||||
case os.IsNotExist(err):
|
||||
// deleting already-deleted file; harmless.
|
||||
continue
|
||||
default:
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func blobFileBaseName(b blob.Ref) string {
|
||||
return fmt.Sprintf("%s-%s.dat", b.HashName(), b.Digest())
|
||||
}
|
||||
|
||||
func (ds *Storage) blobDirectory(b blob.Ref) string {
|
||||
d := b.Digest()
|
||||
if len(d) < 4 {
|
||||
d = d + "____"
|
||||
}
|
||||
return filepath.Join(ds.root, b.HashName(), d[0:2], d[2:4])
|
||||
}
|
||||
|
||||
func (ds *Storage) blobPath(b blob.Ref) string {
|
||||
return filepath.Join(ds.blobDirectory(b), blobFileBaseName(b))
|
||||
}
|
||||
|
||||
const maxParallelStats = 20
|
||||
|
||||
var statGate = syncutil.NewGate(maxParallelStats)
|
||||
|
||||
func (ds *Storage) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
|
||||
return blobserver.StatBlobsParallelHelper(ctx, blobs, fn, statGate, func(ref blob.Ref) (sb blob.SizedRef, err error) {
|
||||
fi, err := ds.fs.Stat(ds.blobPath(ref))
|
||||
switch {
|
||||
case err == nil && fi.Mode().IsRegular():
|
||||
return blob.SizedRef{Ref: ref, Size: u32(fi.Size())}, nil
|
||||
case err != nil && !os.IsNotExist(err):
|
||||
return sb, err
|
||||
}
|
||||
return sb, nil
|
||||
|
||||
})
|
||||
}
|
||||
|
|
|
@ -14,27 +14,4 @@ See the License for the specific language governing permissions and
|
|||
limitations under the License.
|
||||
*/
|
||||
|
||||
package localdisk
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
|
||||
"perkeep.org/pkg/blob"
|
||||
)
|
||||
|
||||
func blobFileBaseName(b blob.Ref) string {
|
||||
return fmt.Sprintf("%s-%s.dat", b.HashName(), b.Digest())
|
||||
}
|
||||
|
||||
func (ds *DiskStorage) blobDirectory(b blob.Ref) string {
|
||||
d := b.Digest()
|
||||
if len(d) < 4 {
|
||||
d = d + "____"
|
||||
}
|
||||
return filepath.Join(ds.root, b.HashName(), d[0:2], d[2:4])
|
||||
}
|
||||
|
||||
func (ds *DiskStorage) blobPath(b blob.Ref) string {
|
||||
return filepath.Join(ds.blobDirectory(b), blobFileBaseName(b))
|
||||
}
|
||||
package files_test
|
|
@ -0,0 +1,63 @@
|
|||
/*
|
||||
Copyright 2018 The Perkeep Authors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package files
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
)
|
||||
|
||||
// OSFS returns an implementation of VFS interface using the host filesystem.
|
||||
func OSFS() VFS {
|
||||
return osFS{}
|
||||
}
|
||||
|
||||
// osFS implements the VFS interface using the os package and
|
||||
// the host filesystem.
|
||||
type osFS struct{}
|
||||
|
||||
func (osFS) Remove(path string) error { return os.Remove(path) }
|
||||
func (osFS) RemoveDir(path string) error { return os.Remove(path) }
|
||||
func (osFS) Stat(path string) (os.FileInfo, error) { return os.Stat(path) }
|
||||
func (osFS) Lstat(path string) (os.FileInfo, error) { return os.Lstat(path) }
|
||||
func (osFS) Open(path string) (ReadableFile, error) { return os.Open(path) }
|
||||
func (osFS) MkdirAll(path string, perm os.FileMode) error { return os.MkdirAll(path, perm) }
|
||||
|
||||
func (osFS) Rename(oldname, newname string) error {
|
||||
err := os.Rename(oldname, newname)
|
||||
if err != nil {
|
||||
err = mapRenameError(err, oldname, newname)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (osFS) TempFile(dir, prefix string) (WritableFile, error) {
|
||||
f, err := ioutil.TempFile(dir, prefix)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return f, nil
|
||||
}
|
||||
|
||||
func (osFS) ReadDirNames(dir string) ([]string, error) {
|
||||
d, err := os.Open(dir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer d.Close()
|
||||
return d.Readdirnames(-1)
|
||||
}
|
|
@ -16,7 +16,7 @@ See the License for the specific language governing permissions and
|
|||
limitations under the License.
|
||||
*/
|
||||
|
||||
package localdisk
|
||||
package files
|
||||
|
||||
func mapRenameError(err error, oldfile, newfile string) error {
|
||||
return err
|
|
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
|
|||
limitations under the License.
|
||||
*/
|
||||
|
||||
package localdisk
|
||||
package files
|
||||
|
||||
import (
|
||||
"fmt"
|
|
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
|
|||
limitations under the License.
|
||||
*/
|
||||
|
||||
package localdisk
|
||||
package files
|
||||
|
||||
import (
|
||||
"path/filepath"
|
||||
|
@ -25,7 +25,7 @@ import (
|
|||
|
||||
func TestPaths(t *testing.T) {
|
||||
br := blob.MustParse("digalg-abc")
|
||||
ds := &DiskStorage{root: "/tmp/dir"}
|
||||
ds := &Storage{root: "/tmp/dir"}
|
||||
|
||||
slash := filepath.ToSlash
|
||||
if e, g := "/tmp/dir/digalg/ab/c_", slash(ds.blobDirectory(br)); e != g {
|
|
@ -14,11 +14,10 @@ See the License for the specific language governing permissions and
|
|||
limitations under the License.
|
||||
*/
|
||||
|
||||
package localdisk
|
||||
package files
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
|
@ -26,28 +25,28 @@ import (
|
|||
"perkeep.org/pkg/blob"
|
||||
)
|
||||
|
||||
func (ds *DiskStorage) startGate() {
|
||||
func (ds *Storage) startGate() {
|
||||
if ds.tmpFileGate == nil {
|
||||
return
|
||||
}
|
||||
ds.tmpFileGate.Start()
|
||||
}
|
||||
|
||||
func (ds *DiskStorage) doneGate() {
|
||||
func (ds *Storage) doneGate() {
|
||||
if ds.tmpFileGate == nil {
|
||||
return
|
||||
}
|
||||
ds.tmpFileGate.Done()
|
||||
}
|
||||
|
||||
func (ds *DiskStorage) ReceiveBlob(ctx context.Context, blobRef blob.Ref, source io.Reader) (ref blob.SizedRef, err error) {
|
||||
func (ds *Storage) ReceiveBlob(ctx context.Context, blobRef blob.Ref, source io.Reader) (blob.SizedRef, error) {
|
||||
ds.dirLockMu.RLock()
|
||||
defer ds.dirLockMu.RUnlock()
|
||||
|
||||
hashedDirectory := ds.blobDirectory(blobRef)
|
||||
err = ds.fs.MkdirAll(hashedDirectory, 0700)
|
||||
err := ds.fs.MkdirAll(hashedDirectory, 0700)
|
||||
if err != nil {
|
||||
return
|
||||
return blob.SizedRef{}, err
|
||||
}
|
||||
|
||||
// TODO(mpl): warn when we hit the gate, and at a limited rate, like maximum once a minute.
|
||||
|
@ -56,7 +55,7 @@ func (ds *DiskStorage) ReceiveBlob(ctx context.Context, blobRef blob.Ref, source
|
|||
tempFile, err := ds.fs.TempFile(hashedDirectory, blobFileBaseName(blobRef)+".tmp")
|
||||
if err != nil {
|
||||
ds.doneGate()
|
||||
return
|
||||
return blob.SizedRef{}, err
|
||||
}
|
||||
|
||||
success := false // set true later
|
||||
|
@ -70,37 +69,33 @@ func (ds *DiskStorage) ReceiveBlob(ctx context.Context, blobRef blob.Ref, source
|
|||
|
||||
written, err := io.Copy(tempFile, source)
|
||||
if err != nil {
|
||||
return
|
||||
return blob.SizedRef{}, err
|
||||
}
|
||||
if err = tempFile.Sync(); err != nil {
|
||||
return
|
||||
return blob.SizedRef{}, err
|
||||
}
|
||||
if err = tempFile.Close(); err != nil {
|
||||
return
|
||||
return blob.SizedRef{}, err
|
||||
}
|
||||
stat, err := ds.fs.Lstat(tempFile.Name())
|
||||
if err != nil {
|
||||
return
|
||||
return blob.SizedRef{}, err
|
||||
}
|
||||
if stat.Size() != written {
|
||||
err = fmt.Errorf("temp file %q size %d didn't match written size %d", tempFile.Name(), stat.Size(), written)
|
||||
return
|
||||
return blob.SizedRef{}, fmt.Errorf("temp file %q size %d didn't match written size %d", tempFile.Name(), stat.Size(), written)
|
||||
}
|
||||
|
||||
fileName := ds.blobPath(blobRef)
|
||||
if err = ds.fs.Rename(tempFile.Name(), fileName); err != nil {
|
||||
if err = mapRenameError(err, tempFile.Name(), fileName); err != nil {
|
||||
return
|
||||
}
|
||||
if err := ds.fs.Rename(tempFile.Name(), fileName); err != nil {
|
||||
return blob.SizedRef{}, err
|
||||
}
|
||||
|
||||
stat, err = ds.fs.Lstat(fileName)
|
||||
if err != nil {
|
||||
return
|
||||
return blob.SizedRef{}, err
|
||||
}
|
||||
if stat.Size() != written {
|
||||
err = errors.New("written size didn't match")
|
||||
return
|
||||
return blob.SizedRef{}, fmt.Errorf("files: wrote %d bytes but stat after said %d bytes", written, stat.Size())
|
||||
}
|
||||
|
||||
success = true // used in defer above
|
|
@ -32,15 +32,11 @@ package localdisk // import "perkeep.org/pkg/blobserver/localdisk"
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"perkeep.org/internal/osutil"
|
||||
"perkeep.org/pkg/blob"
|
||||
|
@ -55,25 +51,12 @@ import (
|
|||
// DiskStorage implements the blobserver.Storage interface using the
|
||||
// local filesystem.
|
||||
type DiskStorage struct {
|
||||
blobserver.Storage
|
||||
|
||||
root string
|
||||
|
||||
fs files.VFS
|
||||
|
||||
// dirLockMu must be held for writing when deleting an empty directory
|
||||
// and for read when receiving blobs.
|
||||
dirLockMu *sync.RWMutex
|
||||
|
||||
// gen will be nil if partition != ""
|
||||
gen *local.Generationer
|
||||
|
||||
// tmpFileGate limits the number of temporary files open at the same
|
||||
// time, so we don't run into the max set by ulimit. It is nil on
|
||||
// systems (Windows) where we don't know the maximum number of open
|
||||
// file descriptors.
|
||||
tmpFileGate *syncutil.Gate
|
||||
|
||||
// statGate limits how many pending Stat calls we have in flight.
|
||||
statGate *syncutil.Gate
|
||||
}
|
||||
|
||||
func (ds *DiskStorage) String() string {
|
||||
|
@ -121,15 +104,11 @@ func New(root string) (*DiskStorage, error) {
|
|||
if !fi.IsDir() {
|
||||
return nil, fmt.Errorf("storage root %q exists but is not a directory", root)
|
||||
}
|
||||
fileSto := files.NewStorage(files.OSFS(), root)
|
||||
ds := &DiskStorage{
|
||||
fs: osFS{},
|
||||
root: root,
|
||||
dirLockMu: new(sync.RWMutex),
|
||||
gen: local.NewGenerationer(root),
|
||||
statGate: syncutil.NewGate(10), // arbitrary, but bounded; be more clever later?
|
||||
}
|
||||
if err := ds.migrate3to2(); err != nil {
|
||||
return nil, fmt.Errorf("Error updating localdisk format: %v", err)
|
||||
Storage: fileSto,
|
||||
root: root,
|
||||
gen: local.NewGenerationer(root),
|
||||
}
|
||||
if _, _, err := ds.StorageGeneration(); err != nil {
|
||||
return nil, fmt.Errorf("Error initialization generation for %q: %v", root, err)
|
||||
|
@ -147,7 +126,8 @@ func New(root string) (*DiskStorage, error) {
|
|||
}
|
||||
// Setting the gate to 80% of the ulimit, to leave a bit of room for other file ops happening in Perkeep.
|
||||
// TODO(mpl): make this used and enforced Perkeep-wide. Issue #837.
|
||||
ds.tmpFileGate = syncutil.NewGate(int(ul * 80 / 100))
|
||||
fileSto.SetNewFileGate(syncutil.NewGate(int(ul * 80 / 100)))
|
||||
|
||||
err = ds.checkFS()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -167,93 +147,10 @@ func init() {
|
|||
blobserver.RegisterStorageConstructor("filesystem", blobserver.StorageConstructor(newFromConfig))
|
||||
}
|
||||
|
||||
func (ds *DiskStorage) tryRemoveDir(dir string) {
|
||||
ds.dirLockMu.Lock()
|
||||
defer ds.dirLockMu.Unlock()
|
||||
ds.fs.RemoveDir(dir) // ignore error
|
||||
}
|
||||
|
||||
func (ds *DiskStorage) Fetch(ctx context.Context, br blob.Ref) (io.ReadCloser, uint32, error) {
|
||||
return ds.fetch(ctx, br, 0, -1)
|
||||
}
|
||||
|
||||
func (ds *DiskStorage) SubFetch(ctx context.Context, br blob.Ref, offset, length int64) (io.ReadCloser, error) {
|
||||
if offset < 0 || length < 0 {
|
||||
return nil, blob.ErrNegativeSubFetch
|
||||
}
|
||||
rc, _, err := ds.fetch(ctx, br, offset, length)
|
||||
return rc, err
|
||||
}
|
||||
|
||||
// u32 converts n to an uint32, or panics if n is out of range
|
||||
func u32(n int64) uint32 {
|
||||
if n < 0 || n > math.MaxUint32 {
|
||||
panic("bad size " + fmt.Sprint(n))
|
||||
}
|
||||
return uint32(n)
|
||||
}
|
||||
|
||||
// length -1 means entire file
|
||||
func (ds *DiskStorage) fetch(ctx context.Context, br blob.Ref, offset, length int64) (rc io.ReadCloser, size uint32, err error) {
|
||||
// TODO: use ctx, if the os package ever supports that.
|
||||
fileName := ds.blobPath(br)
|
||||
stat, err := ds.fs.Stat(fileName)
|
||||
if os.IsNotExist(err) {
|
||||
return nil, 0, os.ErrNotExist
|
||||
}
|
||||
size = u32(stat.Size())
|
||||
file, err := ds.fs.Open(fileName)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
err = os.ErrNotExist
|
||||
}
|
||||
return nil, 0, err
|
||||
}
|
||||
// normal Fetch
|
||||
if length < 0 && offset == 0 {
|
||||
return file, size, nil
|
||||
}
|
||||
// SubFetch:
|
||||
if offset < 0 || offset > stat.Size() {
|
||||
if offset < 0 {
|
||||
return nil, 0, blob.ErrNegativeSubFetch
|
||||
}
|
||||
return nil, 0, blob.ErrOutOfRangeOffsetSubFetch
|
||||
}
|
||||
if offset != 0 {
|
||||
if at, err := file.Seek(offset, io.SeekStart); err != nil || at != offset {
|
||||
file.Close()
|
||||
return nil, 0, fmt.Errorf("localdisk: error seeking to %d: got %v, %v", offset, at, err)
|
||||
}
|
||||
}
|
||||
return struct {
|
||||
io.Reader
|
||||
io.Closer
|
||||
}{
|
||||
Reader: io.LimitReader(file, length),
|
||||
Closer: file,
|
||||
}, 0 /* unused */, nil
|
||||
}
|
||||
|
||||
func (ds *DiskStorage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
|
||||
for _, blob := range blobs {
|
||||
fileName := ds.blobPath(blob)
|
||||
err := ds.fs.Remove(fileName)
|
||||
switch {
|
||||
case err == nil:
|
||||
continue
|
||||
case os.IsNotExist(err):
|
||||
// deleting already-deleted file; harmless.
|
||||
continue
|
||||
default:
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// checkFS verifies the DiskStorage root storage path
|
||||
// operations include: stat, read/write file, mkdir, delete (files and directories)
|
||||
//
|
||||
// TODO: move this into the files package too?
|
||||
func (ds *DiskStorage) checkFS() (ret error) {
|
||||
tempdir, err := ioutil.TempDir(ds.root, "")
|
||||
if err != nil {
|
||||
|
@ -297,32 +194,3 @@ func (ds *DiskStorage) checkFS() (ret error) {
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// osFS implements the files.VFS interface using the os package and
|
||||
// the host filesystem.
|
||||
type osFS struct{}
|
||||
|
||||
func (osFS) Remove(path string) error { return os.Remove(path) }
|
||||
func (osFS) RemoveDir(path string) error { return os.Remove(path) }
|
||||
func (osFS) Stat(path string) (os.FileInfo, error) { return os.Stat(path) }
|
||||
func (osFS) Lstat(path string) (os.FileInfo, error) { return os.Lstat(path) }
|
||||
func (osFS) Open(path string) (files.ReadableFile, error) { return os.Open(path) }
|
||||
func (osFS) MkdirAll(path string, perm os.FileMode) error { return os.MkdirAll(path, perm) }
|
||||
func (osFS) Rename(oldname, newname string) error { return os.Rename(oldname, newname) }
|
||||
|
||||
func (osFS) TempFile(dir, prefix string) (files.WritableFile, error) {
|
||||
f, err := ioutil.TempFile(dir, prefix)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return f, nil
|
||||
}
|
||||
|
||||
func (osFS) ReadDirNames(dir string) ([]string, error) {
|
||||
d, err := os.Open(dir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer d.Close()
|
||||
return d.Readdirnames(-1)
|
||||
}
|
||||
|
|
|
@ -29,6 +29,7 @@ import (
|
|||
|
||||
"perkeep.org/pkg/blob"
|
||||
"perkeep.org/pkg/blobserver"
|
||||
"perkeep.org/pkg/blobserver/files"
|
||||
"perkeep.org/pkg/blobserver/storagetest"
|
||||
"perkeep.org/pkg/test"
|
||||
)
|
||||
|
@ -105,6 +106,7 @@ func TestMultiStat(t *testing.T) {
|
|||
// In addition to the two "foo" and "bar" blobs, add
|
||||
// maxParallelStats other dummy blobs, to exercise the stat
|
||||
// rate-limiting (which had a deadlock once after a cleanup)
|
||||
const maxParallelStats = 20
|
||||
for i := 0; i < maxParallelStats; i++ {
|
||||
blobs = append(blobs, blob.RefFromString(strconv.Itoa(i)))
|
||||
}
|
||||
|
@ -144,15 +146,6 @@ func TestMissingGetReturnsNoEnt(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func rename(old, new string) error {
|
||||
if err := os.Rename(old, new); err != nil {
|
||||
if renameErr := mapRenameError(err, old, new); renameErr != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type file struct {
|
||||
name string
|
||||
contents string
|
||||
|
@ -162,6 +155,7 @@ func TestRename(t *testing.T) {
|
|||
if runtime.GOOS != "windows" {
|
||||
t.Skip("Skipping test if not on windows")
|
||||
}
|
||||
var rename = files.OSFS().Rename
|
||||
files := []file{
|
||||
{name: filepath.Join(os.TempDir(), "foo"), contents: "foo"},
|
||||
{name: filepath.Join(os.TempDir(), "bar"), contents: "barr"},
|
||||
|
|
|
@ -1,45 +0,0 @@
|
|||
/*
|
||||
Copyright 2011 The Perkeep Authors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package localdisk
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
|
||||
"perkeep.org/pkg/blob"
|
||||
"perkeep.org/pkg/blobserver"
|
||||
|
||||
"go4.org/syncutil"
|
||||
)
|
||||
|
||||
const maxParallelStats = 20
|
||||
|
||||
var statGate = syncutil.NewGate(maxParallelStats)
|
||||
|
||||
func (ds *DiskStorage) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
|
||||
return blobserver.StatBlobsParallelHelper(ctx, blobs, fn, statGate, func(ref blob.Ref) (sb blob.SizedRef, err error) {
|
||||
fi, err := ds.fs.Stat(ds.blobPath(ref))
|
||||
switch {
|
||||
case err == nil && fi.Mode().IsRegular():
|
||||
return blob.SizedRef{Ref: ref, Size: u32(fi.Size())}, nil
|
||||
case err != nil && !os.IsNotExist(err):
|
||||
return sb, err
|
||||
}
|
||||
return sb, nil
|
||||
|
||||
})
|
||||
}
|
|
@ -1,146 +0,0 @@
|
|||
/*
|
||||
Copyright 2013 The Perkeep Authors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// This file deals with the migration of stuff from the old xxx/yyy/sha1-xxxyyyzzz.dat
|
||||
// files to the xx/yy format.
|
||||
|
||||
package localdisk
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"perkeep.org/pkg/blob"
|
||||
)
|
||||
|
||||
func (ds *DiskStorage) migrate3to2() error {
|
||||
sha1root := filepath.Join(ds.root, "sha1")
|
||||
f, err := os.Open(sha1root)
|
||||
if os.IsNotExist(err) {
|
||||
return nil
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
names, err := f.Readdirnames(-1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
f.Close()
|
||||
var three []string
|
||||
for _, name := range names {
|
||||
if len(name) == 3 {
|
||||
three = append(three, name)
|
||||
}
|
||||
}
|
||||
if len(three) == 0 {
|
||||
return nil
|
||||
}
|
||||
sort.Strings(three)
|
||||
made := make(map[string]bool) // dirs made
|
||||
for i, dir := range three {
|
||||
log.Printf("Migrating structure of %d/%d directories in %s; doing %q", i+1, len(three), sha1root, dir)
|
||||
fullDir := filepath.Join(sha1root, dir)
|
||||
err := filepath.Walk(fullDir, func(path string, fi os.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
baseName := filepath.Base(path)
|
||||
|
||||
// Cases like "sha1-7ea231f1bd008e04c0629666ba695c399db76b8a.dat.tmp546786166"
|
||||
if strings.Contains(baseName, ".dat.tmp") && fi.Mode().IsRegular() &&
|
||||
fi.ModTime().Before(time.Now().Add(-24*time.Hour)) {
|
||||
return ds.cleanupTempFile(path)
|
||||
}
|
||||
|
||||
if !(fi.Mode().IsRegular() && strings.HasSuffix(baseName, ".dat")) {
|
||||
return nil
|
||||
}
|
||||
br, ok := blob.Parse(strings.TrimSuffix(baseName, ".dat"))
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
dir := ds.blobDirectory(br)
|
||||
if !made[dir] {
|
||||
if err := os.MkdirAll(dir, 0700); err != nil {
|
||||
return err
|
||||
}
|
||||
made[dir] = true
|
||||
}
|
||||
dst := ds.blobPath(br)
|
||||
if fi, err := os.Stat(dst); !os.IsNotExist(err) {
|
||||
return fmt.Errorf("Expected %s to not exist; got stat %v, %v", dst, fi, err)
|
||||
}
|
||||
if err := os.Rename(path, dst); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := removeEmptyDirOrDirs(fullDir); err != nil {
|
||||
log.Printf("Failed to remove old dir %s: %v", fullDir, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func removeEmptyDirOrDirs(dir string) error {
|
||||
err := filepath.Walk(dir, func(subdir string, fi os.FileInfo, err error) error {
|
||||
if subdir == dir {
|
||||
// root.
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if fi.Mode().IsDir() {
|
||||
removeEmptyDirOrDirs(subdir)
|
||||
return filepath.SkipDir
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return os.Remove(dir)
|
||||
}
|
||||
|
||||
func (ds *DiskStorage) cleanupTempFile(path string) error {
|
||||
base := filepath.Base(path)
|
||||
i := strings.Index(base, ".dat.tmp")
|
||||
if i < 0 {
|
||||
return nil
|
||||
}
|
||||
br, ok := blob.Parse(base[:i])
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
// If it already exists at the good path, delete it.
|
||||
goodPath := ds.blobPath(br)
|
||||
if _, err := os.Stat(goodPath); err == nil {
|
||||
return os.Remove(path)
|
||||
}
|
||||
|
||||
// TODO(bradfitz): care whether it's correct digest or not?
|
||||
return nil
|
||||
}
|
Loading…
Reference in New Issue