diff --git a/pkg/blobserver/localdisk/localdisk.go b/pkg/blobserver/localdisk/localdisk.go index bef651dd2..b18b7abb7 100644 --- a/pkg/blobserver/localdisk/localdisk.go +++ b/pkg/blobserver/localdisk/localdisk.go @@ -42,7 +42,9 @@ import ( "camlistore.org/pkg/blobserver" "camlistore.org/pkg/blobserver/local" "camlistore.org/pkg/osutil" + "go4.org/jsonconfig" + "go4.org/syncutil" ) // DiskStorage implements the blobserver.Storage interface using the @@ -56,6 +58,12 @@ type DiskStorage struct { // gen will be nil if partition != "" gen *local.Generationer + + // tmpFileGate limits the number of temporary files open at the same + // time, so we don't run into the max set by ulimit. It is nil on + // systems (Windows) where we don't know the maximum number of open + // file descriptors. + tmpFileGate *syncutil.Gate } func (ds *DiskStorage) String() string { @@ -70,6 +78,14 @@ func IsDir(root string) (bool, error) { return false, nil } +const ( + // We refuse to create a DiskStorage when the user's ulimit is lower than + // minFDLimit. 100 is ridiculously low, but the default value on OSX is 256, and we + // don't want to fail by default, so our min value has to be lower than 256. + minFDLimit = 100 + recommendedFDLimit = 1024 +) + // New returns a new local disk storage implementation at the provided // root directory, which must already exist. func New(root string) (*DiskStorage, error) { @@ -103,6 +119,20 @@ func New(root string) (*DiskStorage, error) { if _, _, err := ds.StorageGeneration(); err != nil { return nil, fmt.Errorf("Error initialization generation for %q: %v", root, err) } + ul, err := osutil.MaxFD() + if err != nil { + if err == osutil.ErrNotSupported { + // Do not set the gate on Windows, since we don't know the ulimit. + return ds, nil + } + return nil, err + } + if ul < minFDLimit { + return nil, fmt.Errorf("The max number of open file descriptors on your system (ulimit -n) is too low. Please fix it with 'ulimit -S -n X' with X being at least %d.", recommendedFDLimit) + } + // Setting the gate to 80% of the ulimit, to leave a bit of room for other file ops happening in Camlistore. + // TODO(mpl): make this used and enforced Camlistore-wide. Issue #837. + ds.tmpFileGate = syncutil.NewGate(int(ul * 80 / 100)) return ds, nil } diff --git a/pkg/blobserver/localdisk/receive.go b/pkg/blobserver/localdisk/receive.go index bc5dfbe65..d0b8ef511 100644 --- a/pkg/blobserver/localdisk/receive.go +++ b/pkg/blobserver/localdisk/receive.go @@ -27,6 +27,20 @@ import ( "camlistore.org/pkg/blob" ) +func (ds *DiskStorage) startGate() { + if ds.tmpFileGate == nil { + return + } + ds.tmpFileGate.Start() +} + +func (ds *DiskStorage) doneGate() { + if ds.tmpFileGate == nil { + return + } + ds.tmpFileGate.Done() +} + func (ds *DiskStorage) ReceiveBlob(blobRef blob.Ref, source io.Reader) (ref blob.SizedRef, err error) { ds.dirLockMu.RLock() defer ds.dirLockMu.RUnlock() @@ -37,8 +51,12 @@ func (ds *DiskStorage) ReceiveBlob(blobRef blob.Ref, source io.Reader) (ref blob return } + // TODO(mpl): warn when we hit the gate, and at a limited rate, like maximum once a minute. + // Deferring to another CL, since it requires modifications to syncutil.Gate first. + ds.startGate() tempFile, err := ioutil.TempFile(hashedDirectory, blobFileBaseName(blobRef)+".tmp") if err != nil { + ds.doneGate() return } @@ -48,6 +66,7 @@ func (ds *DiskStorage) ReceiveBlob(blobRef blob.Ref, source io.Reader) (ref blob log.Println("Removing temp file: ", tempFile.Name()) os.Remove(tempFile.Name()) } + ds.doneGate() }() written, err := io.Copy(tempFile, source) diff --git a/pkg/osutil/syscall.go b/pkg/osutil/syscall.go new file mode 100644 index 000000000..78ae18950 --- /dev/null +++ b/pkg/osutil/syscall.go @@ -0,0 +1,35 @@ +/* +Copyright 2016 The Camlistore Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package osutil + +// Mkfifo creates a FIFO file named path. It returns ErrNotSupported on +// unsupported systems. +func Mkfifo(path string, mode uint32) error { + return mkfifo(path, mode) +} + +// Mksocket creates a socket file (a Unix Domain Socket) named path. It returns +// ErrNotSupported on unsupported systems. +func Mksocket(path string) error { + return mksocket(path) +} + +// MaxFD returns the maximum number of open file descriptors allowed. It returns +// ErrNotSupported on unsupported systems. +func MaxFD() (uint64, error) { + return maxFD() +} diff --git a/pkg/osutil/syscall_appengine.go b/pkg/osutil/syscall_appengine.go index 2f0181aee..d9c211d67 100644 --- a/pkg/osutil/syscall_appengine.go +++ b/pkg/osutil/syscall_appengine.go @@ -20,3 +20,4 @@ package osutil func Mkfifo(path string, mode uint32) error { return ErrNotSupported } func Mksocket(path string) error { return ErrNotSupported } +func MaxFD() (uint64, error) { return 0, ErrNotSupported } diff --git a/pkg/osutil/syscall_posix.go b/pkg/osutil/syscall_posix.go index ab085d81d..9ccfa2d42 100644 --- a/pkg/osutil/syscall_posix.go +++ b/pkg/osutil/syscall_posix.go @@ -19,18 +19,18 @@ limitations under the License. package osutil import ( + "fmt" "net" "os" "path/filepath" "syscall" ) -func Mkfifo(path string, mode uint32) error { +func mkfifo(path string, mode uint32) error { return syscall.Mkfifo(path, mode) } -// Mksocket creates a socket file (a Unix Domain Socket) named path. -func Mksocket(path string) error { +func mksocket(path string) error { dir := filepath.Dir(path) base := filepath.Base(path) tmp := filepath.Join(dir, "."+base) @@ -50,3 +50,11 @@ func Mksocket(path string) error { return nil } + +func maxFD() (uint64, error) { + var rlim syscall.Rlimit + if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rlim); err != nil { + return 0, fmt.Errorf("ulimit error: %v", err) + } + return rlim.Cur, nil +} diff --git a/pkg/osutil/syscall_solaris.go b/pkg/osutil/syscall_solaris.go index db0593c81..882f854dd 100644 --- a/pkg/osutil/syscall_solaris.go +++ b/pkg/osutil/syscall_solaris.go @@ -19,19 +19,19 @@ limitations under the License. package osutil import ( + "fmt" "net" "os" "path/filepath" "syscall" ) -func Mkfifo(path string, mode uint32) error { +func mkfifo(path string, mode uint32) error { // Mkfifo is missing from syscall, thus call Mknod as it does on Linux. return syscall.Mknod(path, mode|syscall.S_IFIFO, 0) } -// Mksocket creates a socket file (a Unix Domain Socket) named path. -func Mksocket(path string) error { +func mksocket(path string) error { dir := filepath.Dir(path) base := filepath.Base(path) tmp := filepath.Join(dir, "."+base) @@ -51,3 +51,11 @@ func Mksocket(path string) error { return nil } + +func maxFD() (uint64, error) { + var rlim syscall.Rlimit + if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rlim); err != nil { + return 0, fmt.Errorf("ulimit error: %v", err) + } + return rlim.Cur, nil +} diff --git a/pkg/osutil/syscall_windows.go b/pkg/osutil/syscall_windows.go index 7eedb32d5..7508fbde8 100644 --- a/pkg/osutil/syscall_windows.go +++ b/pkg/osutil/syscall_windows.go @@ -16,5 +16,6 @@ limitations under the License. package osutil -func Mkfifo(path string, mode uint32) error { return ErrNotSupported } -func Mksocket(path string) error { return ErrNotSupported } +func mkfifo(path string, mode uint32) error { return ErrNotSupported } +func mksocket(path string) error { return ErrNotSupported } +func maxFD() (uint64, error) { return 0, ErrNotSupported }