From 956a0a810b3acf194e0dd1664ef27f9b553a2ec5 Mon Sep 17 00:00:00 2001
From: Brad Fitzpatrick
Date: Fri, 29 Dec 2017 11:18:30 -0800
Subject: [PATCH] pkg/blobserver/localdisk: simplify code, limit stat concurrency

Don't create an unbounded number of stat goroutines.

Change-Id: Ie66cc9c680bd83e649966258a8e7ef09c8af5c62
---
 pkg/blobserver/localdisk/enumerate.go | 114 +++++++++++++++-----------
 pkg/blobserver/localdisk/localdisk.go |   4 +
 2 files changed, 73 insertions(+), 45 deletions(-)

diff --git a/pkg/blobserver/localdisk/enumerate.go b/pkg/blobserver/localdisk/enumerate.go
index 349e9d7a6..493fb74f4 100644
--- a/pkg/blobserver/localdisk/enumerate.go
+++ b/pkg/blobserver/localdisk/enumerate.go
@@ -24,12 +24,12 @@ import (
 	"path/filepath"
 	"sort"
 	"strings"
+	"sync"
 
 	"camlistore.org/pkg/blob"
 )
 
 type readBlobRequest struct {
-	done    <-chan struct{}
 	ch      chan<- blob.SizedRef
 	after   string
 	remain  *int // limit countdown
@@ -45,14 +45,15 @@ type enumerateError struct {
 }
 
 func (ee *enumerateError) Error() string {
-	return fmt.Sprintf("Enumerate error: %s: %v", ee.msg, ee.err)
+	return fmt.Sprintf("localdisk enumerate error: %s: %v", ee.msg, ee.err)
 }
 
-func (ds *DiskStorage) readBlobs(opts readBlobRequest) error {
+// readBlobs implements EnumerateBlobs. It calls itself recursively on subdirectories.
+func (ds *DiskStorage) readBlobs(ctx context.Context, opts readBlobRequest) error {
 	dirFullPath := filepath.Join(opts.dirRoot, opts.pathInto)
 	dir, err := os.Open(dirFullPath)
 	if err != nil {
-		return &enumerateError{"localdisk: opening directory " + dirFullPath, err}
+		return &enumerateError{"opening directory " + dirFullPath, err}
 	}
 	names, err := dir.Readdirnames(-1)
 	dir.Close()
@@ -65,27 +66,42 @@ func (ds *DiskStorage) readBlobs(opts readBlobRequest) error {
 		return nil
 	}
 	if err != nil {
-		return &enumerateError{"localdisk: readdirnames of " + dirFullPath, err}
+		return &enumerateError{"readdirnames of " + dirFullPath, err}
 	}
 	sort.Strings(names)
-	stat := make(map[string]chan interface{}) // gets sent error or os.FileInfo
+	stat := make(map[string]*future) // name -> future
+
+	var toStat []func()
 	for _, name := range names {
 		if skipDir(name) || isShardDir(name) {
 			continue
 		}
-		ch := make(chan interface{}, 1) // 1 in case it's not read
-		name := name
-		stat[name] = ch
-		go func() {
-			fi, err := os.Stat(filepath.Join(dirFullPath, name))
+		fullFile := filepath.Join(dirFullPath, name)
+		f := newFuture(func() (os.FileInfo, error) {
+			fi, err := os.Stat(fullFile)
 			if err != nil {
-				ch <- err
-			} else {
-				ch <- fi
+				return nil, &enumerateError{"stat", err}
 			}
-		}()
+			return fi, nil
+		})
+		stat[name] = f
+		// Pre-stat via Get so the sync.Once prevents a racing second run.
+		toStat = append(toStat, func() { f.Get() })
 	}
 
+	// Start pre-statting things.
+	go func() {
+		for _, f := range toStat {
+			ds.statGate.Start()
+			f := f
+			go func() {
+				// Hold the gate slot until the Stat call has finished.
+				defer ds.statGate.Done()
+				f()
+			}()
+		}
+	}()
+
 	for _, name := range names {
 		if *opts.remain == 0 {
 			return nil
@@ -93,30 +109,17 @@ func (ds *DiskStorage) readBlobs(opts readBlobRequest) error {
 		if skipDir(name) {
 			continue
 		}
-		var (
-			fi      os.FileInfo
-			err     error
-			didStat bool
-		)
-		stat := func() {
-			if didStat {
-				return
+
+		isDir := isShardDir(name)
+		if !isDir {
+			fi, err := stat[name].Get()
+			if err != nil {
+				return err
 			}
-			didStat = true
-			fiv := <-stat[name]
-			var ok bool
-			if err, ok = fiv.(error); ok {
-				err = &enumerateError{"localdisk: stat of file " + filepath.Join(dirFullPath, name), err}
-			} else {
-				fi = fiv.(os.FileInfo)
-			}
-		}
-		isDir := func() bool {
-			stat()
-			return fi != nil && fi.IsDir()
+			isDir = fi.IsDir()
 		}
 
-		if isShardDir(name) || isDir() {
+		if isDir {
 			var newBlobPrefix string
 			if opts.blobPrefix == "" {
 				newBlobPrefix = name + "-"
@@ -135,18 +138,22 @@ func (ds *DiskStorage) readBlobs(opts readBlobRequest) error {
 			ropts := opts
 			ropts.blobPrefix = newBlobPrefix
 			ropts.pathInto = opts.pathInto + "/" + name
-			if err := ds.readBlobs(ropts); err != nil {
+			if err := ds.readBlobs(ctx, ropts); err != nil {
 				return err
 			}
 			continue
 		}
 
-		stat()
+		if !strings.HasSuffix(name, ".dat") {
+			continue
+		}
+
+		fi, err := stat[name].Get()
 		if err != nil {
 			return err
 		}
 
-		if !fi.IsDir() && strings.HasSuffix(name, ".dat") {
+		if !fi.IsDir() {
 			blobName := strings.TrimSuffix(name, ".dat")
 			if blobName <= opts.after {
 				continue
@@ -154,12 +161,11 @@ func (ds *DiskStorage) readBlobs(opts readBlobRequest) error {
 			if blobRef, ok := blob.Parse(blobName); ok {
 				select {
 				case opts.ch <- blob.SizedRef{Ref: blobRef, Size: uint32(fi.Size())}:
-				case <-opts.done:
-					return context.Canceled
+					(*opts.remain)--
+				case <-ctx.Done():
+					return ctx.Err()
 				}
-				(*opts.remain)--
 			}
-			continue
 		}
 	}
 
@@ -173,8 +179,7 @@ func (ds *DiskStorage) EnumerateBlobs(ctx context.Context, dest chan<- blob.Size
 	}
 
 	limitMutable := limit
-	return ds.readBlobs(readBlobRequest{
-		done:    ctx.Done(),
+	return ds.readBlobs(ctx, readBlobRequest{
 		ch:      dest,
 		dirRoot: ds.root,
 		after:   after,
@@ -197,3 +202,22 @@ func isShardDir(name string) bool {
 func isHex(b byte) bool {
 	return ('0' <= b && b <= '9') || ('a' <= b && b <= 'f')
 }
+
+// future is an os.FileInfo future.
+type future struct {
+	once sync.Once
+	f    func() (os.FileInfo, error)
+	v    os.FileInfo
+	err  error
+}
+
+func newFuture(f func() (os.FileInfo, error)) *future {
+	return &future{f: f}
+}
+
+func (f *future) Get() (os.FileInfo, error) {
+	f.once.Do(f.run)
+	return f.v, f.err
+}
+
+func (f *future) run() { f.v, f.err = f.f() }
diff --git a/pkg/blobserver/localdisk/localdisk.go b/pkg/blobserver/localdisk/localdisk.go
index 3e5aadbdc..c15d376df 100644
--- a/pkg/blobserver/localdisk/localdisk.go
+++ b/pkg/blobserver/localdisk/localdisk.go
@@ -64,6 +64,9 @@ type DiskStorage struct {
 	// systems (Windows) where we don't know the maximum number of open
 	// file descriptors.
 	tmpFileGate *syncutil.Gate
+
+	// statGate limits how many pending Stat calls we have in flight.
+	statGate *syncutil.Gate
 }
 
 func (ds *DiskStorage) String() string {
@@ -112,6 +115,7 @@ func New(root string) (*DiskStorage, error) {
 		root:      root,
 		dirLockMu: new(sync.RWMutex),
 		gen:       local.NewGenerationer(root),
+		statGate:  syncutil.NewGate(10), // arbitrary, but bounded; be more clever later?
 	}
 	if err := ds.migrate3to2(); err != nil {
 		return nil, fmt.Errorf("Error updating localdisk format: %v", err)