diff --git a/pkg/blobserver/localdisk/enumerate.go b/pkg/blobserver/files/enumerate.go similarity index 95% rename from pkg/blobserver/localdisk/enumerate.go rename to pkg/blobserver/files/enumerate.go index 445642636..81af23eb3 100644 --- a/pkg/blobserver/localdisk/enumerate.go +++ b/pkg/blobserver/files/enumerate.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package localdisk +package files import ( "context" @@ -49,7 +49,7 @@ func (ee *enumerateError) Error() string { } // readBlobs implements EnumerateBlobs. It calls itself recursively on subdirectories. -func (ds *DiskStorage) readBlobs(ctx context.Context, opts readBlobRequest) error { +func (ds *Storage) readBlobs(ctx context.Context, opts readBlobRequest) error { dirFullPath := filepath.Join(opts.dirRoot, opts.pathInto) names, err := ds.fs.ReadDirNames(dirFullPath) if err != nil { @@ -165,7 +165,7 @@ func (ds *DiskStorage) readBlobs(ctx context.Context, opts readBlobRequest) erro return nil } -func (ds *DiskStorage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error { +func (ds *Storage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error { defer close(dest) if limit == 0 { log.Printf("Warning: localdisk.EnumerateBlobs called with a limit of 0") diff --git a/pkg/blobserver/localdisk/enumerate_test.go b/pkg/blobserver/files/enumerate_test.go similarity index 83% rename from pkg/blobserver/localdisk/enumerate_test.go rename to pkg/blobserver/files/enumerate_test.go index fe4ad00f1..0afa1e1fe 100644 --- a/pkg/blobserver/localdisk/enumerate_test.go +++ b/pkg/blobserver/files/enumerate_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package localdisk +package files_test import ( "context" @@ -22,16 +22,35 @@ import ( "io/ioutil" "os" "sort" + "sync" "testing" "perkeep.org/pkg/blob" + "perkeep.org/pkg/blobserver" + "perkeep.org/pkg/blobserver/files" "perkeep.org/pkg/test" - . "perkeep.org/pkg/test/asserts" + . "perkeep.org/pkg/test/asserts" // delete this grossness ) +var ( + epochLock sync.Mutex + rootEpoch = 0 +) + +func NewTestStorage(t *testing.T) (sto blobserver.Storage, root string) { + epochLock.Lock() + rootEpoch++ + path := fmt.Sprintf("%s/camli-testroot-%d-%d", os.TempDir(), os.Getpid(), rootEpoch) + epochLock.Unlock() + if err := os.MkdirAll(path, 0755); err != nil { + t.Fatalf("Failed to create temp directory %q: %v", path, err) + } + return files.NewStorage(files.OSFS(), path), path +} + func TestEnumerate(t *testing.T) { - ds := NewStorage(t) - defer cleanUp(ds) + ds, root := NewTestStorage(t) + defer os.RemoveAll(root) // For test simplicity foo, bar, and baz all have ascending // sha1s and lengths. @@ -86,8 +105,8 @@ func TestEnumerate(t *testing.T) { } func TestEnumerateEmpty(t *testing.T) { - ds := NewStorage(t) - defer cleanUp(ds) + ds, root := NewTestStorage(t) + defer os.RemoveAll(root) limit := 5000 ch := make(chan blob.SizedRef) @@ -116,8 +135,8 @@ func (sb SortedSizedBlobs) Swap(i, j int) { } func TestEnumerateIsSorted(t *testing.T) { - ds := NewStorage(t) - defer cleanUp(ds) + ds, root := NewTestStorage(t) + defer os.RemoveAll(root) const blobsToMake = 250 t.Logf("Uploading test blobs...") @@ -129,13 +148,13 @@ func TestEnumerateIsSorted(t *testing.T) { // Make some fake blobs in other partitions to confuse the // enumerate code. // TODO(bradfitz): remove this eventually. - fakeDir := ds.root + "/partition/queue-indexer/sha1/1f0/710" + fakeDir := root + "/partition/queue-indexer/sha1/1f0/710" ExpectNil(t, os.MkdirAll(fakeDir, 0755), "creating fakeDir") ExpectNil(t, ioutil.WriteFile(fakeDir+"/sha1-1f07105465650aa243cfc1b1bbb1c68ea95c6812.dat", []byte("fake file"), 0644), "writing fake blob") // And the same for a "cache" directory, used by the default configuration. - fakeDir = ds.root + "/cache/sha1/1f0/710" + fakeDir = root + "/cache/sha1/1f0/710" ExpectNil(t, os.MkdirAll(fakeDir, 0755), "creating cache fakeDir") ExpectNil(t, ioutil.WriteFile(fakeDir+"/sha1-1f07105465650aa243cfc1b1bbb1c68ea95c6812.dat", []byte("fake file"), 0644), "writing fake blob") diff --git a/pkg/blobserver/files/files.go b/pkg/blobserver/files/files.go index b0d2a1a1b..4f7aa2ed1 100644 --- a/pkg/blobserver/files/files.go +++ b/pkg/blobserver/files/files.go @@ -21,9 +21,20 @@ type directly; it's used by "localdisk" and in the future "sftp" and "webdav". */ package files // import "perkeep.org/pkg/blobserver/files" + import ( + "context" + "fmt" "io" + "math" "os" + "path/filepath" + "sync" + + "perkeep.org/pkg/blob" + "perkeep.org/pkg/blobserver" + + "go4.org/syncutil" ) // VFS describes the virtual filesystem needed by this package. @@ -66,3 +77,153 @@ type ReadableFile interface { io.Seeker io.Closer } + +type Storage struct { + fs VFS + root string + + // dirLockMu must be held for writing when deleting an empty directory + // and for read when receiving blobs. + dirLockMu *sync.RWMutex + + // statGate limits how many pending Stat calls we have in flight. + statGate *syncutil.Gate + + // tmpFileGate limits the number of temporary files open at the same + // time, so we don't run into the max set by ulimit. It is nil on + // systems (Windows) where we don't know the maximum number of open + // file descriptors. + tmpFileGate *syncutil.Gate +} + +// SetNewFileGate sets a gate (counting semaphore) on the number of new files +// that may be opened for writing at a time. +func (s *Storage) SetNewFileGate(g *syncutil.Gate) { s.tmpFileGate = g } + +func NewStorage(fs VFS, root string) *Storage { + return &Storage{ + fs: fs, + root: root, + dirLockMu: new(sync.RWMutex), + statGate: syncutil.NewGate(10), // arbitrary, but bounded; be more clever later? + } +} + +func (ds *Storage) tryRemoveDir(dir string) { + ds.dirLockMu.Lock() + defer ds.dirLockMu.Unlock() + ds.fs.RemoveDir(dir) // ignore error +} + +func (ds *Storage) Fetch(ctx context.Context, br blob.Ref) (io.ReadCloser, uint32, error) { + return ds.fetch(ctx, br, 0, -1) +} + +func (ds *Storage) SubFetch(ctx context.Context, br blob.Ref, offset, length int64) (io.ReadCloser, error) { + if offset < 0 || length < 0 { + return nil, blob.ErrNegativeSubFetch + } + rc, _, err := ds.fetch(ctx, br, offset, length) + return rc, err +} + +// u32 converts n to an uint32, or panics if n is out of range +func u32(n int64) uint32 { + if n < 0 || n > math.MaxUint32 { + panic("bad size " + fmt.Sprint(n)) + } + return uint32(n) +} + +// length -1 means entire file +func (ds *Storage) fetch(ctx context.Context, br blob.Ref, offset, length int64) (rc io.ReadCloser, size uint32, err error) { + // TODO: use ctx, if the os package ever supports that. + fileName := ds.blobPath(br) + stat, err := ds.fs.Stat(fileName) + if os.IsNotExist(err) { + return nil, 0, os.ErrNotExist + } + size = u32(stat.Size()) + file, err := ds.fs.Open(fileName) + if err != nil { + if os.IsNotExist(err) { + err = os.ErrNotExist + } + return nil, 0, err + } + // normal Fetch + if length < 0 && offset == 0 { + return file, size, nil + } + // SubFetch: + if offset < 0 || offset > stat.Size() { + if offset < 0 { + return nil, 0, blob.ErrNegativeSubFetch + } + return nil, 0, blob.ErrOutOfRangeOffsetSubFetch + } + if offset != 0 { + if at, err := file.Seek(offset, io.SeekStart); err != nil || at != offset { + file.Close() + return nil, 0, fmt.Errorf("localdisk: error seeking to %d: got %v, %v", offset, at, err) + } + } + return struct { + io.Reader + io.Closer + }{ + Reader: io.LimitReader(file, length), + Closer: file, + }, 0 /* unused */, nil +} + +func (ds *Storage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error { + for _, blob := range blobs { + fileName := ds.blobPath(blob) + err := ds.fs.Remove(fileName) + switch { + case err == nil: + continue + case os.IsNotExist(err): + // deleting already-deleted file; harmless. + continue + default: + return err + } + } + return nil +} + +func blobFileBaseName(b blob.Ref) string { + return fmt.Sprintf("%s-%s.dat", b.HashName(), b.Digest()) +} + +func (ds *Storage) blobDirectory(b blob.Ref) string { + d := b.Digest() + if len(d) < 4 { + d = d + "____" + } + return filepath.Join(ds.root, b.HashName(), d[0:2], d[2:4]) +} + +func (ds *Storage) blobPath(b blob.Ref) string { + return filepath.Join(ds.blobDirectory(b), blobFileBaseName(b)) +} + +const maxParallelStats = 20 + +var statGate = syncutil.NewGate(maxParallelStats) + +func (ds *Storage) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error { + return blobserver.StatBlobsParallelHelper(ctx, blobs, fn, statGate, func(ref blob.Ref) (sb blob.SizedRef, err error) { + fi, err := ds.fs.Stat(ds.blobPath(ref)) + switch { + case err == nil && fi.Mode().IsRegular(): + return blob.SizedRef{Ref: ref, Size: u32(fi.Size())}, nil + case err != nil && !os.IsNotExist(err): + return sb, err + } + return sb, nil + + }) +} diff --git a/pkg/blobserver/localdisk/path.go b/pkg/blobserver/files/files_test.go similarity index 54% rename from pkg/blobserver/localdisk/path.go rename to pkg/blobserver/files/files_test.go index 1f7b6fd91..454f591b1 100644 --- a/pkg/blobserver/localdisk/path.go +++ b/pkg/blobserver/files/files_test.go @@ -14,27 +14,4 @@ See the License for the specific language governing permissions and limitations under the License. */ -package localdisk - -import ( - "fmt" - "path/filepath" - - "perkeep.org/pkg/blob" -) - -func blobFileBaseName(b blob.Ref) string { - return fmt.Sprintf("%s-%s.dat", b.HashName(), b.Digest()) -} - -func (ds *DiskStorage) blobDirectory(b blob.Ref) string { - d := b.Digest() - if len(d) < 4 { - d = d + "____" - } - return filepath.Join(ds.root, b.HashName(), d[0:2], d[2:4]) -} - -func (ds *DiskStorage) blobPath(b blob.Ref) string { - return filepath.Join(ds.blobDirectory(b), blobFileBaseName(b)) -} +package files_test diff --git a/pkg/blobserver/files/osfs.go b/pkg/blobserver/files/osfs.go new file mode 100644 index 000000000..a068de250 --- /dev/null +++ b/pkg/blobserver/files/osfs.go @@ -0,0 +1,63 @@ +/* +Copyright 2018 The Perkeep Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package files + +import ( + "io/ioutil" + "os" +) + +// OSFS returns an implementation of VFS interface using the host filesystem. +func OSFS() VFS { + return osFS{} +} + +// osFS implements the VFS interface using the os package and +// the host filesystem. +type osFS struct{} + +func (osFS) Remove(path string) error { return os.Remove(path) } +func (osFS) RemoveDir(path string) error { return os.Remove(path) } +func (osFS) Stat(path string) (os.FileInfo, error) { return os.Stat(path) } +func (osFS) Lstat(path string) (os.FileInfo, error) { return os.Lstat(path) } +func (osFS) Open(path string) (ReadableFile, error) { return os.Open(path) } +func (osFS) MkdirAll(path string, perm os.FileMode) error { return os.MkdirAll(path, perm) } + +func (osFS) Rename(oldname, newname string) error { + err := os.Rename(oldname, newname) + if err != nil { + err = mapRenameError(err, oldname, newname) + } + return err +} + +func (osFS) TempFile(dir, prefix string) (WritableFile, error) { + f, err := ioutil.TempFile(dir, prefix) + if err != nil { + return nil, err + } + return f, nil +} + +func (osFS) ReadDirNames(dir string) ([]string, error) { + d, err := os.Open(dir) + if err != nil { + return nil, err + } + defer d.Close() + return d.Readdirnames(-1) +} diff --git a/pkg/blobserver/localdisk/receive_posix.go b/pkg/blobserver/files/osfs_posix.go similarity index 97% rename from pkg/blobserver/localdisk/receive_posix.go rename to pkg/blobserver/files/osfs_posix.go index e3a245696..45e690b1a 100644 --- a/pkg/blobserver/localdisk/receive_posix.go +++ b/pkg/blobserver/files/osfs_posix.go @@ -16,7 +16,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package localdisk +package files func mapRenameError(err error, oldfile, newfile string) error { return err diff --git a/pkg/blobserver/localdisk/receive_windows.go b/pkg/blobserver/files/osfs_windows.go similarity index 98% rename from pkg/blobserver/localdisk/receive_windows.go rename to pkg/blobserver/files/osfs_windows.go index 74c264049..1ec2ea575 100644 --- a/pkg/blobserver/localdisk/receive_windows.go +++ b/pkg/blobserver/files/osfs_windows.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package localdisk +package files import ( "fmt" diff --git a/pkg/blobserver/localdisk/path_test.go b/pkg/blobserver/files/path_test.go similarity index 94% rename from pkg/blobserver/localdisk/path_test.go rename to pkg/blobserver/files/path_test.go index 7d37a2efc..f8ca97522 100644 --- a/pkg/blobserver/localdisk/path_test.go +++ b/pkg/blobserver/files/path_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package localdisk +package files import ( "path/filepath" @@ -25,7 +25,7 @@ import ( func TestPaths(t *testing.T) { br := blob.MustParse("digalg-abc") - ds := &DiskStorage{root: "/tmp/dir"} + ds := &Storage{root: "/tmp/dir"} slash := filepath.ToSlash if e, g := "/tmp/dir/digalg/ab/c_", slash(ds.blobDirectory(br)); e != g { diff --git a/pkg/blobserver/localdisk/receive.go b/pkg/blobserver/files/receive.go similarity index 70% rename from pkg/blobserver/localdisk/receive.go rename to pkg/blobserver/files/receive.go index 6cfc0c358..d833e0e89 100644 --- a/pkg/blobserver/localdisk/receive.go +++ b/pkg/blobserver/files/receive.go @@ -14,11 +14,10 @@ See the License for the specific language governing permissions and limitations under the License. */ -package localdisk +package files import ( "context" - "errors" "fmt" "io" "log" @@ -26,28 +25,28 @@ import ( "perkeep.org/pkg/blob" ) -func (ds *DiskStorage) startGate() { +func (ds *Storage) startGate() { if ds.tmpFileGate == nil { return } ds.tmpFileGate.Start() } -func (ds *DiskStorage) doneGate() { +func (ds *Storage) doneGate() { if ds.tmpFileGate == nil { return } ds.tmpFileGate.Done() } -func (ds *DiskStorage) ReceiveBlob(ctx context.Context, blobRef blob.Ref, source io.Reader) (ref blob.SizedRef, err error) { +func (ds *Storage) ReceiveBlob(ctx context.Context, blobRef blob.Ref, source io.Reader) (blob.SizedRef, error) { ds.dirLockMu.RLock() defer ds.dirLockMu.RUnlock() hashedDirectory := ds.blobDirectory(blobRef) - err = ds.fs.MkdirAll(hashedDirectory, 0700) + err := ds.fs.MkdirAll(hashedDirectory, 0700) if err != nil { - return + return blob.SizedRef{}, err } // TODO(mpl): warn when we hit the gate, and at a limited rate, like maximum once a minute. @@ -56,7 +55,7 @@ func (ds *DiskStorage) ReceiveBlob(ctx context.Context, blobRef blob.Ref, source tempFile, err := ds.fs.TempFile(hashedDirectory, blobFileBaseName(blobRef)+".tmp") if err != nil { ds.doneGate() - return + return blob.SizedRef{}, err } success := false // set true later @@ -70,37 +69,33 @@ func (ds *DiskStorage) ReceiveBlob(ctx context.Context, blobRef blob.Ref, source written, err := io.Copy(tempFile, source) if err != nil { - return + return blob.SizedRef{}, err } if err = tempFile.Sync(); err != nil { - return + return blob.SizedRef{}, err } if err = tempFile.Close(); err != nil { - return + return blob.SizedRef{}, err } stat, err := ds.fs.Lstat(tempFile.Name()) if err != nil { - return + return blob.SizedRef{}, err } if stat.Size() != written { - err = fmt.Errorf("temp file %q size %d didn't match written size %d", tempFile.Name(), stat.Size(), written) - return + return blob.SizedRef{}, fmt.Errorf("temp file %q size %d didn't match written size %d", tempFile.Name(), stat.Size(), written) } fileName := ds.blobPath(blobRef) - if err = ds.fs.Rename(tempFile.Name(), fileName); err != nil { - if err = mapRenameError(err, tempFile.Name(), fileName); err != nil { - return - } + if err := ds.fs.Rename(tempFile.Name(), fileName); err != nil { + return blob.SizedRef{}, err } stat, err = ds.fs.Lstat(fileName) if err != nil { - return + return blob.SizedRef{}, err } if stat.Size() != written { - err = errors.New("written size didn't match") - return + return blob.SizedRef{}, fmt.Errorf("files: wrote %d bytes but stat after said %d bytes", written, stat.Size()) } success = true // used in defer above diff --git a/pkg/blobserver/localdisk/localdisk.go b/pkg/blobserver/localdisk/localdisk.go index 875c9128e..98e94229c 100644 --- a/pkg/blobserver/localdisk/localdisk.go +++ b/pkg/blobserver/localdisk/localdisk.go @@ -32,15 +32,11 @@ package localdisk // import "perkeep.org/pkg/blobserver/localdisk" import ( "bytes" - "context" "fmt" - "io" "io/ioutil" "log" - "math" "os" "path/filepath" - "sync" "perkeep.org/internal/osutil" "perkeep.org/pkg/blob" @@ -55,25 +51,12 @@ import ( // DiskStorage implements the blobserver.Storage interface using the // local filesystem. type DiskStorage struct { + blobserver.Storage + root string - fs files.VFS - - // dirLockMu must be held for writing when deleting an empty directory - // and for read when receiving blobs. - dirLockMu *sync.RWMutex - // gen will be nil if partition != "" gen *local.Generationer - - // tmpFileGate limits the number of temporary files open at the same - // time, so we don't run into the max set by ulimit. It is nil on - // systems (Windows) where we don't know the maximum number of open - // file descriptors. - tmpFileGate *syncutil.Gate - - // statGate limits how many pending Stat calls we have in flight. - statGate *syncutil.Gate } func (ds *DiskStorage) String() string { @@ -121,15 +104,11 @@ func New(root string) (*DiskStorage, error) { if !fi.IsDir() { return nil, fmt.Errorf("storage root %q exists but is not a directory", root) } + fileSto := files.NewStorage(files.OSFS(), root) ds := &DiskStorage{ - fs: osFS{}, - root: root, - dirLockMu: new(sync.RWMutex), - gen: local.NewGenerationer(root), - statGate: syncutil.NewGate(10), // arbitrary, but bounded; be more clever later? - } - if err := ds.migrate3to2(); err != nil { - return nil, fmt.Errorf("Error updating localdisk format: %v", err) + Storage: fileSto, + root: root, + gen: local.NewGenerationer(root), } if _, _, err := ds.StorageGeneration(); err != nil { return nil, fmt.Errorf("Error initialization generation for %q: %v", root, err) @@ -147,7 +126,8 @@ func New(root string) (*DiskStorage, error) { } // Setting the gate to 80% of the ulimit, to leave a bit of room for other file ops happening in Perkeep. // TODO(mpl): make this used and enforced Perkeep-wide. Issue #837. - ds.tmpFileGate = syncutil.NewGate(int(ul * 80 / 100)) + fileSto.SetNewFileGate(syncutil.NewGate(int(ul * 80 / 100))) + err = ds.checkFS() if err != nil { return nil, err @@ -167,93 +147,10 @@ func init() { blobserver.RegisterStorageConstructor("filesystem", blobserver.StorageConstructor(newFromConfig)) } -func (ds *DiskStorage) tryRemoveDir(dir string) { - ds.dirLockMu.Lock() - defer ds.dirLockMu.Unlock() - ds.fs.RemoveDir(dir) // ignore error -} - -func (ds *DiskStorage) Fetch(ctx context.Context, br blob.Ref) (io.ReadCloser, uint32, error) { - return ds.fetch(ctx, br, 0, -1) -} - -func (ds *DiskStorage) SubFetch(ctx context.Context, br blob.Ref, offset, length int64) (io.ReadCloser, error) { - if offset < 0 || length < 0 { - return nil, blob.ErrNegativeSubFetch - } - rc, _, err := ds.fetch(ctx, br, offset, length) - return rc, err -} - -// u32 converts n to an uint32, or panics if n is out of range -func u32(n int64) uint32 { - if n < 0 || n > math.MaxUint32 { - panic("bad size " + fmt.Sprint(n)) - } - return uint32(n) -} - -// length -1 means entire file -func (ds *DiskStorage) fetch(ctx context.Context, br blob.Ref, offset, length int64) (rc io.ReadCloser, size uint32, err error) { - // TODO: use ctx, if the os package ever supports that. - fileName := ds.blobPath(br) - stat, err := ds.fs.Stat(fileName) - if os.IsNotExist(err) { - return nil, 0, os.ErrNotExist - } - size = u32(stat.Size()) - file, err := ds.fs.Open(fileName) - if err != nil { - if os.IsNotExist(err) { - err = os.ErrNotExist - } - return nil, 0, err - } - // normal Fetch - if length < 0 && offset == 0 { - return file, size, nil - } - // SubFetch: - if offset < 0 || offset > stat.Size() { - if offset < 0 { - return nil, 0, blob.ErrNegativeSubFetch - } - return nil, 0, blob.ErrOutOfRangeOffsetSubFetch - } - if offset != 0 { - if at, err := file.Seek(offset, io.SeekStart); err != nil || at != offset { - file.Close() - return nil, 0, fmt.Errorf("localdisk: error seeking to %d: got %v, %v", offset, at, err) - } - } - return struct { - io.Reader - io.Closer - }{ - Reader: io.LimitReader(file, length), - Closer: file, - }, 0 /* unused */, nil -} - -func (ds *DiskStorage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error { - for _, blob := range blobs { - fileName := ds.blobPath(blob) - err := ds.fs.Remove(fileName) - switch { - case err == nil: - continue - case os.IsNotExist(err): - // deleting already-deleted file; harmless. - continue - default: - return err - } - } - return nil -} - // checkFS verifies the DiskStorage root storage path // operations include: stat, read/write file, mkdir, delete (files and directories) +// +// TODO: move this into the files package too? func (ds *DiskStorage) checkFS() (ret error) { tempdir, err := ioutil.TempDir(ds.root, "") if err != nil { @@ -297,32 +194,3 @@ func (ds *DiskStorage) checkFS() (ret error) { } return nil } - -// osFS implements the files.VFS interface using the os package and -// the host filesystem. -type osFS struct{} - -func (osFS) Remove(path string) error { return os.Remove(path) } -func (osFS) RemoveDir(path string) error { return os.Remove(path) } -func (osFS) Stat(path string) (os.FileInfo, error) { return os.Stat(path) } -func (osFS) Lstat(path string) (os.FileInfo, error) { return os.Lstat(path) } -func (osFS) Open(path string) (files.ReadableFile, error) { return os.Open(path) } -func (osFS) MkdirAll(path string, perm os.FileMode) error { return os.MkdirAll(path, perm) } -func (osFS) Rename(oldname, newname string) error { return os.Rename(oldname, newname) } - -func (osFS) TempFile(dir, prefix string) (files.WritableFile, error) { - f, err := ioutil.TempFile(dir, prefix) - if err != nil { - return nil, err - } - return f, nil -} - -func (osFS) ReadDirNames(dir string) ([]string, error) { - d, err := os.Open(dir) - if err != nil { - return nil, err - } - defer d.Close() - return d.Readdirnames(-1) -} diff --git a/pkg/blobserver/localdisk/localdisk_test.go b/pkg/blobserver/localdisk/localdisk_test.go index 29b7cba99..a0f616263 100644 --- a/pkg/blobserver/localdisk/localdisk_test.go +++ b/pkg/blobserver/localdisk/localdisk_test.go @@ -29,6 +29,7 @@ import ( "perkeep.org/pkg/blob" "perkeep.org/pkg/blobserver" + "perkeep.org/pkg/blobserver/files" "perkeep.org/pkg/blobserver/storagetest" "perkeep.org/pkg/test" ) @@ -105,6 +106,7 @@ func TestMultiStat(t *testing.T) { // In addition to the two "foo" and "bar" blobs, add // maxParallelStats other dummy blobs, to exercise the stat // rate-limiting (which had a deadlock once after a cleanup) + const maxParallelStats = 20 for i := 0; i < maxParallelStats; i++ { blobs = append(blobs, blob.RefFromString(strconv.Itoa(i))) } @@ -144,15 +146,6 @@ func TestMissingGetReturnsNoEnt(t *testing.T) { } } -func rename(old, new string) error { - if err := os.Rename(old, new); err != nil { - if renameErr := mapRenameError(err, old, new); renameErr != nil { - return err - } - } - return nil -} - type file struct { name string contents string @@ -162,6 +155,7 @@ func TestRename(t *testing.T) { if runtime.GOOS != "windows" { t.Skip("Skipping test if not on windows") } + var rename = files.OSFS().Rename files := []file{ {name: filepath.Join(os.TempDir(), "foo"), contents: "foo"}, {name: filepath.Join(os.TempDir(), "bar"), contents: "barr"}, diff --git a/pkg/blobserver/localdisk/stat.go b/pkg/blobserver/localdisk/stat.go deleted file mode 100644 index f8eca77a2..000000000 --- a/pkg/blobserver/localdisk/stat.go +++ /dev/null @@ -1,45 +0,0 @@ -/* -Copyright 2011 The Perkeep Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package localdisk - -import ( - "context" - "os" - - "perkeep.org/pkg/blob" - "perkeep.org/pkg/blobserver" - - "go4.org/syncutil" -) - -const maxParallelStats = 20 - -var statGate = syncutil.NewGate(maxParallelStats) - -func (ds *DiskStorage) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error { - return blobserver.StatBlobsParallelHelper(ctx, blobs, fn, statGate, func(ref blob.Ref) (sb blob.SizedRef, err error) { - fi, err := ds.fs.Stat(ds.blobPath(ref)) - switch { - case err == nil && fi.Mode().IsRegular(): - return blob.SizedRef{Ref: ref, Size: u32(fi.Size())}, nil - case err != nil && !os.IsNotExist(err): - return sb, err - } - return sb, nil - - }) -} diff --git a/pkg/blobserver/localdisk/upgrade32.go b/pkg/blobserver/localdisk/upgrade32.go deleted file mode 100644 index b8dc7db96..000000000 --- a/pkg/blobserver/localdisk/upgrade32.go +++ /dev/null @@ -1,146 +0,0 @@ -/* -Copyright 2013 The Perkeep Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// This file deals with the migration of stuff from the old xxx/yyy/sha1-xxxyyyzzz.dat -// files to the xx/yy format. - -package localdisk - -import ( - "fmt" - "log" - "os" - "path/filepath" - "sort" - "strings" - "time" - - "perkeep.org/pkg/blob" -) - -func (ds *DiskStorage) migrate3to2() error { - sha1root := filepath.Join(ds.root, "sha1") - f, err := os.Open(sha1root) - if os.IsNotExist(err) { - return nil - } else if err != nil { - return err - } - names, err := f.Readdirnames(-1) - if err != nil { - return err - } - f.Close() - var three []string - for _, name := range names { - if len(name) == 3 { - three = append(three, name) - } - } - if len(three) == 0 { - return nil - } - sort.Strings(three) - made := make(map[string]bool) // dirs made - for i, dir := range three { - log.Printf("Migrating structure of %d/%d directories in %s; doing %q", i+1, len(three), sha1root, dir) - fullDir := filepath.Join(sha1root, dir) - err := filepath.Walk(fullDir, func(path string, fi os.FileInfo, err error) error { - if err != nil { - return err - } - baseName := filepath.Base(path) - - // Cases like "sha1-7ea231f1bd008e04c0629666ba695c399db76b8a.dat.tmp546786166" - if strings.Contains(baseName, ".dat.tmp") && fi.Mode().IsRegular() && - fi.ModTime().Before(time.Now().Add(-24*time.Hour)) { - return ds.cleanupTempFile(path) - } - - if !(fi.Mode().IsRegular() && strings.HasSuffix(baseName, ".dat")) { - return nil - } - br, ok := blob.Parse(strings.TrimSuffix(baseName, ".dat")) - if !ok { - return nil - } - dir := ds.blobDirectory(br) - if !made[dir] { - if err := os.MkdirAll(dir, 0700); err != nil { - return err - } - made[dir] = true - } - dst := ds.blobPath(br) - if fi, err := os.Stat(dst); !os.IsNotExist(err) { - return fmt.Errorf("Expected %s to not exist; got stat %v, %v", dst, fi, err) - } - if err := os.Rename(path, dst); err != nil { - return err - } - return nil - }) - if err != nil { - return err - } - if err := removeEmptyDirOrDirs(fullDir); err != nil { - log.Printf("Failed to remove old dir %s: %v", fullDir, err) - } - } - return nil -} - -func removeEmptyDirOrDirs(dir string) error { - err := filepath.Walk(dir, func(subdir string, fi os.FileInfo, err error) error { - if subdir == dir { - // root. - return nil - } - if err != nil { - return err - } - if fi.Mode().IsDir() { - removeEmptyDirOrDirs(subdir) - return filepath.SkipDir - } - return nil - }) - if err != nil { - return err - } - return os.Remove(dir) -} - -func (ds *DiskStorage) cleanupTempFile(path string) error { - base := filepath.Base(path) - i := strings.Index(base, ".dat.tmp") - if i < 0 { - return nil - } - br, ok := blob.Parse(base[:i]) - if !ok { - return nil - } - - // If it already exists at the good path, delete it. - goodPath := ds.blobPath(br) - if _, err := os.Stat(goodPath); err == nil { - return os.Remove(path) - } - - // TODO(bradfitz): care whether it's correct digest or not? - return nil -}