pkg/blobserver/localdisk: simplify code, limit stat concurrency

Don't create an unbounded number of stat goroutines.

Change-Id: Ie66cc9c680bd83e649966258a8e7ef09c8af5c62
This commit is contained in:
Brad Fitzpatrick 2017-12-29 11:18:30 -08:00
parent 59ac14caa5
commit 956a0a810b
2 changed files with 71 additions and 45 deletions

View File

@ -24,12 +24,12 @@ import (
"path/filepath"
"sort"
"strings"
"sync"
"camlistore.org/pkg/blob"
)
type readBlobRequest struct {
done <-chan struct{}
ch chan<- blob.SizedRef
after string
remain *int // limit countdown
@ -45,14 +45,15 @@ type enumerateError struct {
}
func (ee *enumerateError) Error() string {
return fmt.Sprintf("Enumerate error: %s: %v", ee.msg, ee.err)
return fmt.Sprintf("localdisk enumerate error: %s: %v", ee.msg, ee.err)
}
func (ds *DiskStorage) readBlobs(opts readBlobRequest) error {
// readBlobs implements EnumerateBlobs. It calls itself recursively on subdirectories.
func (ds *DiskStorage) readBlobs(ctx context.Context, opts readBlobRequest) error {
dirFullPath := filepath.Join(opts.dirRoot, opts.pathInto)
dir, err := os.Open(dirFullPath)
if err != nil {
return &enumerateError{"localdisk: opening directory " + dirFullPath, err}
return &enumerateError{"opening directory " + dirFullPath, err}
}
names, err := dir.Readdirnames(-1)
dir.Close()
@ -65,27 +66,40 @@ func (ds *DiskStorage) readBlobs(opts readBlobRequest) error {
return nil
}
if err != nil {
return &enumerateError{"localdisk: readdirnames of " + dirFullPath, err}
return &enumerateError{"readdirnames of " + dirFullPath, err}
}
sort.Strings(names)
stat := make(map[string]chan interface{}) // gets sent error or os.FileInfo
stat := make(map[string]*future) // name -> future<os.FileInfo>
var toStat []func()
for _, name := range names {
if skipDir(name) || isShardDir(name) {
continue
}
ch := make(chan interface{}, 1) // 1 in case it's not read
name := name
stat[name] = ch
go func() {
fi, err := os.Stat(filepath.Join(dirFullPath, name))
fullFile := filepath.Join(dirFullPath, name)
f := newFuture(func() (os.FileInfo, error) {
fi, err := os.Stat(fullFile)
if err != nil {
ch <- err
} else {
ch <- fi
return nil, &enumerateError{"stat", err}
}
}()
return fi, nil
})
stat[name] = f
toStat = append(toStat, f.run)
}
// Start pre-statting things in the background so results are likely to be
// ready by the time the enumeration loop below asks for them. statGate
// bounds how many of these Stat calls are in flight at once.
go func() {
	for _, f := range toStat {
		// Block here until a slot is free, so that at most a bounded
		// number of stat goroutines exist at any time.
		ds.statGate.Start()
		f := f
		go func() {
			// Release the slot only after the stat has finished. Calling
			// Done before f() would free the slot immediately and leave
			// the number of concurrent stats effectively unbounded.
			defer ds.statGate.Done()
			f()
		}()
	}
}()
for _, name := range names {
if *opts.remain == 0 {
return nil
@ -93,30 +107,17 @@ func (ds *DiskStorage) readBlobs(opts readBlobRequest) error {
if skipDir(name) {
continue
}
var (
fi os.FileInfo
err error
didStat bool
)
stat := func() {
if didStat {
return
isDir := isShardDir(name)
if !isDir {
fi, err := stat[name].Get()
if err != nil {
return err
}
didStat = true
fiv := <-stat[name]
var ok bool
if err, ok = fiv.(error); ok {
err = &enumerateError{"localdisk: stat of file " + filepath.Join(dirFullPath, name), err}
} else {
fi = fiv.(os.FileInfo)
}
}
isDir := func() bool {
stat()
return fi != nil && fi.IsDir()
isDir = fi.IsDir()
}
if isShardDir(name) || isDir() {
if isDir {
var newBlobPrefix string
if opts.blobPrefix == "" {
newBlobPrefix = name + "-"
@ -135,18 +136,22 @@ func (ds *DiskStorage) readBlobs(opts readBlobRequest) error {
ropts := opts
ropts.blobPrefix = newBlobPrefix
ropts.pathInto = opts.pathInto + "/" + name
if err := ds.readBlobs(ropts); err != nil {
if err := ds.readBlobs(ctx, ropts); err != nil {
return err
}
continue
}
stat()
if !strings.HasSuffix(name, ".dat") {
continue
}
fi, err := stat[name].Get()
if err != nil {
return err
}
if !fi.IsDir() && strings.HasSuffix(name, ".dat") {
if !fi.IsDir() {
blobName := strings.TrimSuffix(name, ".dat")
if blobName <= opts.after {
continue
@ -154,12 +159,11 @@ func (ds *DiskStorage) readBlobs(opts readBlobRequest) error {
if blobRef, ok := blob.Parse(blobName); ok {
select {
case opts.ch <- blob.SizedRef{Ref: blobRef, Size: uint32(fi.Size())}:
case <-opts.done:
return context.Canceled
(*opts.remain)--
case <-ctx.Done():
return ctx.Err()
}
(*opts.remain)--
}
continue
}
}
@ -173,8 +177,7 @@ func (ds *DiskStorage) EnumerateBlobs(ctx context.Context, dest chan<- blob.Size
}
limitMutable := limit
return ds.readBlobs(readBlobRequest{
done: ctx.Done(),
return ds.readBlobs(ctx, readBlobRequest{
ch: dest,
dirRoot: ds.root,
after: after,
@ -197,3 +200,22 @@ func isShardDir(name string) bool {
// isHex reports whether b is an ASCII decimal digit or a lowercase
// hexadecimal letter ('a' through 'f'). Uppercase letters are not accepted.
func isHex(b byte) bool {
	switch {
	case b >= '0' && b <= '9':
		return true
	case b >= 'a' && b <= 'f':
		return true
	}
	return false
}
// future is an os.FileInfo future: it wraps a function producing an
// os.FileInfo (or an error) and guarantees that function is evaluated at
// most once, no matter how many goroutines call run or Get.
type future struct {
	once sync.Once                   // guards the single evaluation of f
	f    func() (os.FileInfo, error) // the deferred computation
	v    os.FileInfo                 // result of f; valid once once has fired
	err  error                       // error from f; valid once once has fired
}

// newFuture returns a future that will evaluate f the first time run or Get
// is called.
func newFuture(f func() (os.FileInfo, error)) *future {
	return &future{f: f}
}

// Get returns the future's result, evaluating the wrapped function first if
// that has not happened yet. If another goroutine is evaluating it, Get
// blocks until that evaluation finishes.
func (f *future) Get() (os.FileInfo, error) {
	f.run()
	return f.v, f.err
}

// run evaluates the wrapped function if it has not been evaluated yet.
//
// It goes through the sync.Once so that a background pre-loader calling run
// and a consumer calling Get never evaluate the function twice and never
// race on v and err.
func (f *future) run() {
	f.once.Do(func() { f.v, f.err = f.f() })
}

View File

@ -64,6 +64,9 @@ type DiskStorage struct {
// systems (Windows) where we don't know the maximum number of open
// file descriptors.
tmpFileGate *syncutil.Gate
// statGate limits how many pending Stat calls we have in flight.
statGate *syncutil.Gate
}
func (ds *DiskStorage) String() string {
@ -112,6 +115,7 @@ func New(root string) (*DiskStorage, error) {
root: root,
dirLockMu: new(sync.RWMutex),
gen: local.NewGenerationer(root),
statGate: syncutil.NewGate(10), // arbitrary, but bounded; be more clever later?
}
if err := ds.migrate3to2(); err != nil {
return nil, fmt.Errorf("Error updating localdisk format: %v", err)