blobserver/{localdisk,files}: move generic localdisk code to the files package

Just code movement.

Except I did delete some 5-year-old localdisk migration code that's no
longer relevant.

Updates #1111 (this is prep for SFTP support)

Change-Id: Ibe1de1d4d804a6c86919a9df454ab125027e4c33
This commit is contained in:
Brad Fitzpatrick 2018-04-29 20:58:37 -07:00
parent 87d22ea432
commit a4d0cc6ab7
13 changed files with 290 additions and 404 deletions

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package localdisk
package files
import (
"context"
@ -49,7 +49,7 @@ func (ee *enumerateError) Error() string {
}
// readBlobs implements EnumerateBlobs. It calls itself recursively on subdirectories.
func (ds *DiskStorage) readBlobs(ctx context.Context, opts readBlobRequest) error {
func (ds *Storage) readBlobs(ctx context.Context, opts readBlobRequest) error {
dirFullPath := filepath.Join(opts.dirRoot, opts.pathInto)
names, err := ds.fs.ReadDirNames(dirFullPath)
if err != nil {
@ -165,7 +165,7 @@ func (ds *DiskStorage) readBlobs(ctx context.Context, opts readBlobRequest) erro
return nil
}
func (ds *DiskStorage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
func (ds *Storage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
defer close(dest)
if limit == 0 {
log.Printf("Warning: localdisk.EnumerateBlobs called with a limit of 0")

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package localdisk
package files_test
import (
"context"
@ -22,16 +22,35 @@ import (
"io/ioutil"
"os"
"sort"
"sync"
"testing"
"perkeep.org/pkg/blob"
"perkeep.org/pkg/blobserver"
"perkeep.org/pkg/blobserver/files"
"perkeep.org/pkg/test"
. "perkeep.org/pkg/test/asserts"
. "perkeep.org/pkg/test/asserts" // delete this grossness
)
var (
epochLock sync.Mutex
rootEpoch = 0
)
// NewTestStorage creates a files.Storage rooted at a fresh, unique
// temp directory and returns it along with that directory's path.
// The caller is responsible for removing the directory when done.
func NewTestStorage(t *testing.T) (sto blobserver.Storage, root string) {
	epochLock.Lock()
	rootEpoch++
	dir := fmt.Sprintf("%s/camli-testroot-%d-%d", os.TempDir(), os.Getpid(), rootEpoch)
	epochLock.Unlock()
	if err := os.MkdirAll(dir, 0755); err != nil {
		t.Fatalf("Failed to create temp directory %q: %v", dir, err)
	}
	return files.NewStorage(files.OSFS(), dir), dir
}
func TestEnumerate(t *testing.T) {
ds := NewStorage(t)
defer cleanUp(ds)
ds, root := NewTestStorage(t)
defer os.RemoveAll(root)
// For test simplicity foo, bar, and baz all have ascending
// sha1s and lengths.
@ -86,8 +105,8 @@ func TestEnumerate(t *testing.T) {
}
func TestEnumerateEmpty(t *testing.T) {
ds := NewStorage(t)
defer cleanUp(ds)
ds, root := NewTestStorage(t)
defer os.RemoveAll(root)
limit := 5000
ch := make(chan blob.SizedRef)
@ -116,8 +135,8 @@ func (sb SortedSizedBlobs) Swap(i, j int) {
}
func TestEnumerateIsSorted(t *testing.T) {
ds := NewStorage(t)
defer cleanUp(ds)
ds, root := NewTestStorage(t)
defer os.RemoveAll(root)
const blobsToMake = 250
t.Logf("Uploading test blobs...")
@ -129,13 +148,13 @@ func TestEnumerateIsSorted(t *testing.T) {
// Make some fake blobs in other partitions to confuse the
// enumerate code.
// TODO(bradfitz): remove this eventually.
fakeDir := ds.root + "/partition/queue-indexer/sha1/1f0/710"
fakeDir := root + "/partition/queue-indexer/sha1/1f0/710"
ExpectNil(t, os.MkdirAll(fakeDir, 0755), "creating fakeDir")
ExpectNil(t, ioutil.WriteFile(fakeDir+"/sha1-1f07105465650aa243cfc1b1bbb1c68ea95c6812.dat",
[]byte("fake file"), 0644), "writing fake blob")
// And the same for a "cache" directory, used by the default configuration.
fakeDir = ds.root + "/cache/sha1/1f0/710"
fakeDir = root + "/cache/sha1/1f0/710"
ExpectNil(t, os.MkdirAll(fakeDir, 0755), "creating cache fakeDir")
ExpectNil(t, ioutil.WriteFile(fakeDir+"/sha1-1f07105465650aa243cfc1b1bbb1c68ea95c6812.dat",
[]byte("fake file"), 0644), "writing fake blob")

View File

@ -21,9 +21,20 @@ type directly; it's used by "localdisk" and in the future "sftp" and
"webdav".
*/
package files // import "perkeep.org/pkg/blobserver/files"
import (
"context"
"fmt"
"io"
"math"
"os"
"path/filepath"
"sync"
"perkeep.org/pkg/blob"
"perkeep.org/pkg/blobserver"
"go4.org/syncutil"
)
// VFS describes the virtual filesystem needed by this package.
@ -66,3 +77,153 @@ type ReadableFile interface {
io.Seeker
io.Closer
}
// Storage implements the blobserver operations (Fetch, SubFetch,
// ReceiveBlob, StatBlobs, EnumerateBlobs, RemoveBlobs) on top of a
// virtual filesystem (VFS), storing blobs as files under root.
type Storage struct {
	fs   VFS    // filesystem implementation all I/O goes through
	root string // root directory under which blobs are stored

	// dirLockMu must be held for writing when deleting an empty directory
	// and for read when receiving blobs.
	dirLockMu *sync.RWMutex

	// statGate limits how many pending Stat calls we have in flight.
	statGate *syncutil.Gate

	// tmpFileGate limits the number of temporary files open at the same
	// time, so we don't run into the max set by ulimit. It is nil on
	// systems (Windows) where we don't know the maximum number of open
	// file descriptors.
	tmpFileGate *syncutil.Gate
}
// SetNewFileGate sets a gate (counting semaphore) on the number of new files
// that may be opened for writing at a time.
// A nil gate (the default) means the number of open temp files is unbounded.
func (s *Storage) SetNewFileGate(g *syncutil.Gate) { s.tmpFileGate = g }
// NewStorage returns a new Storage that stores blobs under root,
// performing all filesystem access through fs.
// The returned Storage has no temp-file gate; see SetNewFileGate.
func NewStorage(fs VFS, root string) *Storage {
	return &Storage{
		fs:        fs,
		root:      root,
		dirLockMu: new(sync.RWMutex),
		statGate:  syncutil.NewGate(10), // arbitrary, but bounded; be more clever later?
	}
}
// tryRemoveDir makes a best-effort attempt to remove dir, ignoring any
// error. It holds the directory-deletion lock exclusively so it cannot
// race with concurrent blob receives (which hold it for read).
func (ds *Storage) tryRemoveDir(dir string) {
	ds.dirLockMu.Lock()
	defer ds.dirLockMu.Unlock()
	ds.fs.RemoveDir(dir) // ignore error
}
// Fetch opens the blob br for reading, returning a ReadCloser over its
// entire contents along with its size.
func (ds *Storage) Fetch(ctx context.Context, br blob.Ref) (io.ReadCloser, uint32, error) {
	return ds.fetch(ctx, br, 0, -1)
}
// SubFetch returns a reader over the byte range [offset, offset+length)
// of blob br. Negative offsets or lengths are rejected up front with
// blob.ErrNegativeSubFetch.
func (ds *Storage) SubFetch(ctx context.Context, br blob.Ref, offset, length int64) (io.ReadCloser, error) {
	if offset < 0 {
		return nil, blob.ErrNegativeSubFetch
	}
	if length < 0 {
		return nil, blob.ErrNegativeSubFetch
	}
	r, _, err := ds.fetch(ctx, br, offset, length)
	return r, err
}
// u32 converts n to an uint32, or panics if n is out of range
func u32(n int64) uint32 {
if n < 0 || n > math.MaxUint32 {
panic("bad size " + fmt.Sprint(n))
}
return uint32(n)
}
// fetch opens blob br and returns a reader over the byte range
// [offset, offset+length). length == -1 (with offset == 0) means the
// entire file, in which case the returned size is the blob's full size.
// For sub-fetch results the returned size is 0 and unused by callers.
func (ds *Storage) fetch(ctx context.Context, br blob.Ref, offset, length int64) (rc io.ReadCloser, size uint32, err error) {
	// TODO: use ctx, if the os package ever supports that.
	fileName := ds.blobPath(br)
	stat, err := ds.fs.Stat(fileName)
	if err != nil {
		if os.IsNotExist(err) {
			return nil, 0, os.ErrNotExist
		}
		// Previously a non-NotExist stat error fell through and
		// dereferenced a nil FileInfo below; return it instead.
		return nil, 0, err
	}
	size = u32(stat.Size())
	file, err := ds.fs.Open(fileName)
	if err != nil {
		if os.IsNotExist(err) {
			err = os.ErrNotExist
		}
		return nil, 0, err
	}
	// normal Fetch: the whole blob from the beginning.
	if length < 0 && offset == 0 {
		return file, size, nil
	}
	// SubFetch:
	if offset < 0 || offset > stat.Size() {
		file.Close() // don't leak the handle on the error paths
		if offset < 0 {
			return nil, 0, blob.ErrNegativeSubFetch
		}
		return nil, 0, blob.ErrOutOfRangeOffsetSubFetch
	}
	if offset != 0 {
		if at, err := file.Seek(offset, io.SeekStart); err != nil || at != offset {
			file.Close()
			return nil, 0, fmt.Errorf("files: error seeking to %d: got %v, %v", offset, at, err)
		}
	}
	return struct {
		io.Reader
		io.Closer
	}{
		Reader: io.LimitReader(file, length),
		Closer: file,
	}, 0 /* unused */, nil
}
// RemoveBlobs deletes the named blobs from the filesystem. Removing a
// blob that is already absent is not an error.
func (ds *Storage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
	for _, br := range blobs {
		err := ds.fs.Remove(ds.blobPath(br))
		if err != nil && !os.IsNotExist(err) {
			return err
		}
		// nil or already-deleted: harmless; keep going.
	}
	return nil
}
// blobFileBaseName returns the file name (without directory) storing
// blob b, of the form "<hashname>-<digest>.dat".
func blobFileBaseName(b blob.Ref) string {
	return b.HashName() + "-" + b.Digest() + ".dat"
}
// blobDirectory returns the directory holding blob b: two levels of
// fan-out under root keyed by the first four characters of the digest.
// Digests shorter than four characters are padded with underscores.
func (ds *Storage) blobDirectory(b blob.Ref) string {
	digest := b.Digest()
	if len(digest) < 4 {
		digest += "____"
	}
	return filepath.Join(ds.root, b.HashName(), digest[:2], digest[2:4])
}
// blobPath returns the full path of the file storing blob b.
func (ds *Storage) blobPath(b blob.Ref) string {
	return filepath.Join(ds.blobDirectory(b), blobFileBaseName(b))
}
// maxParallelStats bounds the package-level statGate below.
const maxParallelStats = 20

// statGate limits concurrent Stat calls across all Storage instances.
// NOTE(review): Storage also carries a per-instance statGate field set
// by NewStorage; confirm which gate is intended to bound StatBlobs.
var statGate = syncutil.NewGate(maxParallelStats)
// StatBlobs stats the named blobs in parallel, calling fn with the ref
// and size of each blob found on disk. Missing or non-regular files are
// skipped silently; any other stat error aborts the operation.
func (ds *Storage) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
	// Prefer the per-Storage gate configured by NewStorage; the Storage
	// field was previously initialized but never consulted here. The
	// package-level gate remains as a fallback for a zero-value Storage.
	gate := statGate
	if ds.statGate != nil {
		gate = ds.statGate
	}
	return blobserver.StatBlobsParallelHelper(ctx, blobs, fn, gate, func(ref blob.Ref) (sb blob.SizedRef, err error) {
		fi, err := ds.fs.Stat(ds.blobPath(ref))
		switch {
		case err == nil && fi.Mode().IsRegular():
			return blob.SizedRef{Ref: ref, Size: u32(fi.Size())}, nil
		case err != nil && !os.IsNotExist(err):
			return sb, err
		}
		// Not found (or not a regular file): skip, not an error.
		return sb, nil
	})
}

View File

@ -14,27 +14,4 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package localdisk
import (
"fmt"
"path/filepath"
"perkeep.org/pkg/blob"
)
func blobFileBaseName(b blob.Ref) string {
return fmt.Sprintf("%s-%s.dat", b.HashName(), b.Digest())
}
func (ds *DiskStorage) blobDirectory(b blob.Ref) string {
d := b.Digest()
if len(d) < 4 {
d = d + "____"
}
return filepath.Join(ds.root, b.HashName(), d[0:2], d[2:4])
}
func (ds *DiskStorage) blobPath(b blob.Ref) string {
return filepath.Join(ds.blobDirectory(b), blobFileBaseName(b))
}
package files_test

View File

@ -0,0 +1,63 @@
/*
Copyright 2018 The Perkeep Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package files
import (
"io/ioutil"
"os"
)
// OSFS returns an implementation of VFS interface using the host filesystem.
func OSFS() VFS {
	return osFS{}
}

// osFS implements the VFS interface using the os package and
// the host filesystem.
type osFS struct{}

func (osFS) Remove(path string) error { return os.Remove(path) }

// RemoveDir uses os.Remove, which fails on non-empty directories, so
// only empty directories are ever removed.
func (osFS) RemoveDir(path string) error { return os.Remove(path) }

func (osFS) Stat(path string) (os.FileInfo, error) { return os.Stat(path) }
func (osFS) Lstat(path string) (os.FileInfo, error) { return os.Lstat(path) }
func (osFS) Open(path string) (ReadableFile, error) { return os.Open(path) }
func (osFS) MkdirAll(path string, perm os.FileMode) error { return os.MkdirAll(path, perm) }
// Rename renames oldname to newname, passing any failure through
// mapRenameError for platform-specific translation (a no-op except on
// Windows, where a separate implementation exists).
func (osFS) Rename(oldname, newname string) error {
	if err := os.Rename(oldname, newname); err != nil {
		return mapRenameError(err, oldname, newname)
	}
	return nil
}
// TempFile creates a new temporary file in dir with the given name
// prefix, open for writing.
func (osFS) TempFile(dir, prefix string) (WritableFile, error) {
	f, err := ioutil.TempFile(dir, prefix)
	if err != nil {
		return nil, err
	}
	// Return an explicit nil interface on error (above) rather than
	// boxing the typed-nil *os.File into a non-nil WritableFile.
	return f, nil
}
// ReadDirNames returns the names of the entries in dir, in directory
// order (unsorted).
func (osFS) ReadDirNames(dir string) ([]string, error) {
	f, err := os.Open(dir)
	if err != nil {
		return nil, err
	}
	names, err := f.Readdirnames(-1)
	f.Close()
	return names, err
}

View File

@ -16,7 +16,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package localdisk
package files
func mapRenameError(err error, oldfile, newfile string) error {
return err

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package localdisk
package files
import (
"fmt"

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package localdisk
package files
import (
"path/filepath"
@ -25,7 +25,7 @@ import (
func TestPaths(t *testing.T) {
br := blob.MustParse("digalg-abc")
ds := &DiskStorage{root: "/tmp/dir"}
ds := &Storage{root: "/tmp/dir"}
slash := filepath.ToSlash
if e, g := "/tmp/dir/digalg/ab/c_", slash(ds.blobDirectory(br)); e != g {

View File

@ -14,11 +14,10 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package localdisk
package files
import (
"context"
"errors"
"fmt"
"io"
"log"
@ -26,28 +25,28 @@ import (
"perkeep.org/pkg/blob"
)
func (ds *DiskStorage) startGate() {
func (ds *Storage) startGate() {
if ds.tmpFileGate == nil {
return
}
ds.tmpFileGate.Start()
}
func (ds *DiskStorage) doneGate() {
func (ds *Storage) doneGate() {
if ds.tmpFileGate == nil {
return
}
ds.tmpFileGate.Done()
}
func (ds *DiskStorage) ReceiveBlob(ctx context.Context, blobRef blob.Ref, source io.Reader) (ref blob.SizedRef, err error) {
func (ds *Storage) ReceiveBlob(ctx context.Context, blobRef blob.Ref, source io.Reader) (blob.SizedRef, error) {
ds.dirLockMu.RLock()
defer ds.dirLockMu.RUnlock()
hashedDirectory := ds.blobDirectory(blobRef)
err = ds.fs.MkdirAll(hashedDirectory, 0700)
err := ds.fs.MkdirAll(hashedDirectory, 0700)
if err != nil {
return
return blob.SizedRef{}, err
}
// TODO(mpl): warn when we hit the gate, and at a limited rate, like maximum once a minute.
@ -56,7 +55,7 @@ func (ds *DiskStorage) ReceiveBlob(ctx context.Context, blobRef blob.Ref, source
tempFile, err := ds.fs.TempFile(hashedDirectory, blobFileBaseName(blobRef)+".tmp")
if err != nil {
ds.doneGate()
return
return blob.SizedRef{}, err
}
success := false // set true later
@ -70,37 +69,33 @@ func (ds *DiskStorage) ReceiveBlob(ctx context.Context, blobRef blob.Ref, source
written, err := io.Copy(tempFile, source)
if err != nil {
return
return blob.SizedRef{}, err
}
if err = tempFile.Sync(); err != nil {
return
return blob.SizedRef{}, err
}
if err = tempFile.Close(); err != nil {
return
return blob.SizedRef{}, err
}
stat, err := ds.fs.Lstat(tempFile.Name())
if err != nil {
return
return blob.SizedRef{}, err
}
if stat.Size() != written {
err = fmt.Errorf("temp file %q size %d didn't match written size %d", tempFile.Name(), stat.Size(), written)
return
return blob.SizedRef{}, fmt.Errorf("temp file %q size %d didn't match written size %d", tempFile.Name(), stat.Size(), written)
}
fileName := ds.blobPath(blobRef)
if err = ds.fs.Rename(tempFile.Name(), fileName); err != nil {
if err = mapRenameError(err, tempFile.Name(), fileName); err != nil {
return
}
if err := ds.fs.Rename(tempFile.Name(), fileName); err != nil {
return blob.SizedRef{}, err
}
stat, err = ds.fs.Lstat(fileName)
if err != nil {
return
return blob.SizedRef{}, err
}
if stat.Size() != written {
err = errors.New("written size didn't match")
return
return blob.SizedRef{}, fmt.Errorf("files: wrote %d bytes but stat after said %d bytes", written, stat.Size())
}
success = true // used in defer above

View File

@ -32,15 +32,11 @@ package localdisk // import "perkeep.org/pkg/blobserver/localdisk"
import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"log"
"math"
"os"
"path/filepath"
"sync"
"perkeep.org/internal/osutil"
"perkeep.org/pkg/blob"
@ -55,25 +51,12 @@ import (
// DiskStorage implements the blobserver.Storage interface using the
// local filesystem.
type DiskStorage struct {
blobserver.Storage
root string
fs files.VFS
// dirLockMu must be held for writing when deleting an empty directory
// and for read when receiving blobs.
dirLockMu *sync.RWMutex
// gen will be nil if partition != ""
gen *local.Generationer
// tmpFileGate limits the number of temporary files open at the same
// time, so we don't run into the max set by ulimit. It is nil on
// systems (Windows) where we don't know the maximum number of open
// file descriptors.
tmpFileGate *syncutil.Gate
// statGate limits how many pending Stat calls we have in flight.
statGate *syncutil.Gate
}
func (ds *DiskStorage) String() string {
@ -121,15 +104,11 @@ func New(root string) (*DiskStorage, error) {
if !fi.IsDir() {
return nil, fmt.Errorf("storage root %q exists but is not a directory", root)
}
fileSto := files.NewStorage(files.OSFS(), root)
ds := &DiskStorage{
fs: osFS{},
root: root,
dirLockMu: new(sync.RWMutex),
gen: local.NewGenerationer(root),
statGate: syncutil.NewGate(10), // arbitrary, but bounded; be more clever later?
}
if err := ds.migrate3to2(); err != nil {
return nil, fmt.Errorf("Error updating localdisk format: %v", err)
Storage: fileSto,
root: root,
gen: local.NewGenerationer(root),
}
if _, _, err := ds.StorageGeneration(); err != nil {
return nil, fmt.Errorf("Error initialization generation for %q: %v", root, err)
@ -147,7 +126,8 @@ func New(root string) (*DiskStorage, error) {
}
// Setting the gate to 80% of the ulimit, to leave a bit of room for other file ops happening in Perkeep.
// TODO(mpl): make this used and enforced Perkeep-wide. Issue #837.
ds.tmpFileGate = syncutil.NewGate(int(ul * 80 / 100))
fileSto.SetNewFileGate(syncutil.NewGate(int(ul * 80 / 100)))
err = ds.checkFS()
if err != nil {
return nil, err
@ -167,93 +147,10 @@ func init() {
blobserver.RegisterStorageConstructor("filesystem", blobserver.StorageConstructor(newFromConfig))
}
func (ds *DiskStorage) tryRemoveDir(dir string) {
ds.dirLockMu.Lock()
defer ds.dirLockMu.Unlock()
ds.fs.RemoveDir(dir) // ignore error
}
func (ds *DiskStorage) Fetch(ctx context.Context, br blob.Ref) (io.ReadCloser, uint32, error) {
return ds.fetch(ctx, br, 0, -1)
}
func (ds *DiskStorage) SubFetch(ctx context.Context, br blob.Ref, offset, length int64) (io.ReadCloser, error) {
if offset < 0 || length < 0 {
return nil, blob.ErrNegativeSubFetch
}
rc, _, err := ds.fetch(ctx, br, offset, length)
return rc, err
}
// u32 converts n to an uint32, or panics if n is out of range
func u32(n int64) uint32 {
if n < 0 || n > math.MaxUint32 {
panic("bad size " + fmt.Sprint(n))
}
return uint32(n)
}
// length -1 means entire file
func (ds *DiskStorage) fetch(ctx context.Context, br blob.Ref, offset, length int64) (rc io.ReadCloser, size uint32, err error) {
// TODO: use ctx, if the os package ever supports that.
fileName := ds.blobPath(br)
stat, err := ds.fs.Stat(fileName)
if os.IsNotExist(err) {
return nil, 0, os.ErrNotExist
}
size = u32(stat.Size())
file, err := ds.fs.Open(fileName)
if err != nil {
if os.IsNotExist(err) {
err = os.ErrNotExist
}
return nil, 0, err
}
// normal Fetch
if length < 0 && offset == 0 {
return file, size, nil
}
// SubFetch:
if offset < 0 || offset > stat.Size() {
if offset < 0 {
return nil, 0, blob.ErrNegativeSubFetch
}
return nil, 0, blob.ErrOutOfRangeOffsetSubFetch
}
if offset != 0 {
if at, err := file.Seek(offset, io.SeekStart); err != nil || at != offset {
file.Close()
return nil, 0, fmt.Errorf("localdisk: error seeking to %d: got %v, %v", offset, at, err)
}
}
return struct {
io.Reader
io.Closer
}{
Reader: io.LimitReader(file, length),
Closer: file,
}, 0 /* unused */, nil
}
func (ds *DiskStorage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
for _, blob := range blobs {
fileName := ds.blobPath(blob)
err := ds.fs.Remove(fileName)
switch {
case err == nil:
continue
case os.IsNotExist(err):
// deleting already-deleted file; harmless.
continue
default:
return err
}
}
return nil
}
// checkFS verifies the DiskStorage root storage path
// operations include: stat, read/write file, mkdir, delete (files and directories)
//
// TODO: move this into the files package too?
func (ds *DiskStorage) checkFS() (ret error) {
tempdir, err := ioutil.TempDir(ds.root, "")
if err != nil {
@ -297,32 +194,3 @@ func (ds *DiskStorage) checkFS() (ret error) {
}
return nil
}
// osFS implements the files.VFS interface using the os package and
// the host filesystem.
type osFS struct{}
func (osFS) Remove(path string) error { return os.Remove(path) }
func (osFS) RemoveDir(path string) error { return os.Remove(path) }
func (osFS) Stat(path string) (os.FileInfo, error) { return os.Stat(path) }
func (osFS) Lstat(path string) (os.FileInfo, error) { return os.Lstat(path) }
func (osFS) Open(path string) (files.ReadableFile, error) { return os.Open(path) }
func (osFS) MkdirAll(path string, perm os.FileMode) error { return os.MkdirAll(path, perm) }
func (osFS) Rename(oldname, newname string) error { return os.Rename(oldname, newname) }
func (osFS) TempFile(dir, prefix string) (files.WritableFile, error) {
f, err := ioutil.TempFile(dir, prefix)
if err != nil {
return nil, err
}
return f, nil
}
func (osFS) ReadDirNames(dir string) ([]string, error) {
d, err := os.Open(dir)
if err != nil {
return nil, err
}
defer d.Close()
return d.Readdirnames(-1)
}

View File

@ -29,6 +29,7 @@ import (
"perkeep.org/pkg/blob"
"perkeep.org/pkg/blobserver"
"perkeep.org/pkg/blobserver/files"
"perkeep.org/pkg/blobserver/storagetest"
"perkeep.org/pkg/test"
)
@ -105,6 +106,7 @@ func TestMultiStat(t *testing.T) {
// In addition to the two "foo" and "bar" blobs, add
// maxParallelStats other dummy blobs, to exercise the stat
// rate-limiting (which had a deadlock once after a cleanup)
const maxParallelStats = 20
for i := 0; i < maxParallelStats; i++ {
blobs = append(blobs, blob.RefFromString(strconv.Itoa(i)))
}
@ -144,15 +146,6 @@ func TestMissingGetReturnsNoEnt(t *testing.T) {
}
}
func rename(old, new string) error {
if err := os.Rename(old, new); err != nil {
if renameErr := mapRenameError(err, old, new); renameErr != nil {
return err
}
}
return nil
}
type file struct {
name string
contents string
@ -162,6 +155,7 @@ func TestRename(t *testing.T) {
if runtime.GOOS != "windows" {
t.Skip("Skipping test if not on windows")
}
var rename = files.OSFS().Rename
files := []file{
{name: filepath.Join(os.TempDir(), "foo"), contents: "foo"},
{name: filepath.Join(os.TempDir(), "bar"), contents: "barr"},

View File

@ -1,45 +0,0 @@
/*
Copyright 2011 The Perkeep Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package localdisk
import (
"context"
"os"
"perkeep.org/pkg/blob"
"perkeep.org/pkg/blobserver"
"go4.org/syncutil"
)
// maxParallelStats bounds the statGate below.
const maxParallelStats = 20

// statGate limits how many Stat calls may be in flight at once.
var statGate = syncutil.NewGate(maxParallelStats)

// StatBlobs stats the named blobs in parallel (bounded by statGate),
// calling fn with the ref and size of each blob found on disk.
// Missing or non-regular files are skipped silently.
func (ds *DiskStorage) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
	return blobserver.StatBlobsParallelHelper(ctx, blobs, fn, statGate, func(ref blob.Ref) (sb blob.SizedRef, err error) {
		fi, err := ds.fs.Stat(ds.blobPath(ref))
		switch {
		case err == nil && fi.Mode().IsRegular():
			return blob.SizedRef{Ref: ref, Size: u32(fi.Size())}, nil
		case err != nil && !os.IsNotExist(err):
			return sb, err
		}
		// Not found (or not a regular file): skip, not an error.
		return sb, nil
	})
}

View File

@ -1,146 +0,0 @@
/*
Copyright 2013 The Perkeep Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// This file deals with the migration of stuff from the old xxx/yyy/sha1-xxxyyyzzz.dat
// files to the xx/yy format.
package localdisk
import (
"fmt"
"log"
"os"
"path/filepath"
"sort"
"strings"
"time"
"perkeep.org/pkg/blob"
)
// migrate3to2 rewrites the old on-disk layout, where blobs lived under
// three-hex-digit fan-out directories (sha1/xxx/yyy/sha1-*.dat), into
// the current two-digit layout (sha1/xx/yy/). It is a no-op when no
// three-character directories remain. While walking it also deletes
// stale temp files (*.dat.tmpNNN) older than a day, and removes each
// emptied old directory after its blobs have been moved.
func (ds *DiskStorage) migrate3to2() error {
	sha1root := filepath.Join(ds.root, "sha1")
	f, err := os.Open(sha1root)
	if os.IsNotExist(err) {
		return nil // nothing to migrate
	} else if err != nil {
		return err
	}
	names, err := f.Readdirnames(-1)
	if err != nil {
		return err
	}
	f.Close()
	// Only directories with 3-character names are in the old format.
	var three []string
	for _, name := range names {
		if len(name) == 3 {
			three = append(three, name)
		}
	}
	if len(three) == 0 {
		return nil
	}
	sort.Strings(three)
	made := make(map[string]bool) // dirs made
	for i, dir := range three {
		log.Printf("Migrating structure of %d/%d directories in %s; doing %q", i+1, len(three), sha1root, dir)
		fullDir := filepath.Join(sha1root, dir)
		err := filepath.Walk(fullDir, func(path string, fi os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			baseName := filepath.Base(path)
			// Cases like "sha1-7ea231f1bd008e04c0629666ba695c399db76b8a.dat.tmp546786166"
			if strings.Contains(baseName, ".dat.tmp") && fi.Mode().IsRegular() &&
				fi.ModTime().Before(time.Now().Add(-24*time.Hour)) {
				return ds.cleanupTempFile(path)
			}
			// Skip anything that isn't a regular blob data file.
			if !(fi.Mode().IsRegular() && strings.HasSuffix(baseName, ".dat")) {
				return nil
			}
			br, ok := blob.Parse(strings.TrimSuffix(baseName, ".dat"))
			if !ok {
				return nil // unparseable name; leave it in place
			}
			dir := ds.blobDirectory(br)
			if !made[dir] {
				if err := os.MkdirAll(dir, 0700); err != nil {
					return err
				}
				made[dir] = true
			}
			dst := ds.blobPath(br)
			// A blob existing at both the old and new locations is
			// unexpected; fail rather than guess which copy to keep.
			if fi, err := os.Stat(dst); !os.IsNotExist(err) {
				return fmt.Errorf("Expected %s to not exist; got stat %v, %v", dst, fi, err)
			}
			if err := os.Rename(path, dst); err != nil {
				return err
			}
			return nil
		})
		if err != nil {
			return err
		}
		// Best effort: failure to delete the old directory only logs.
		if err := removeEmptyDirOrDirs(fullDir); err != nil {
			log.Printf("Failed to remove old dir %s: %v", fullDir, err)
		}
	}
	return nil
}
func removeEmptyDirOrDirs(dir string) error {
err := filepath.Walk(dir, func(subdir string, fi os.FileInfo, err error) error {
if subdir == dir {
// root.
return nil
}
if err != nil {
return err
}
if fi.Mode().IsDir() {
removeEmptyDirOrDirs(subdir)
return filepath.SkipDir
}
return nil
})
if err != nil {
return err
}
return os.Remove(dir)
}
// cleanupTempFile handles a stale temp file (name containing ".dat.tmp")
// found during migration: if the blob it names already exists at its
// final path, the temp file is deleted; otherwise it is left in place.
func (ds *DiskStorage) cleanupTempFile(path string) error {
	base := filepath.Base(path)
	i := strings.Index(base, ".dat.tmp")
	if i < 0 {
		return nil // not a temp-file name we recognize
	}
	br, ok := blob.Parse(base[:i])
	if !ok {
		return nil // prefix isn't a parseable blob ref; leave it
	}
	// If it already exists at the good path, delete it.
	goodPath := ds.blobPath(br)
	if _, err := os.Stat(goodPath); err == nil {
		return os.Remove(path)
	}
	// TODO(bradfitz): care whether it's correct digest or not?
	return nil
}