mirror of https://github.com/perkeep/perkeep.git
pkg/blobserver/localdisk: gate tmp file creations
To avoid tmp file creation errors due to ulimit. A different, more flexible, approach was discussed on https://github.com/camlistore/camlistore/issues/812 , and could be implemented later on if the current CL is too naive. As a follow-up, issue #837 should be then fixed. Fixes #812 Change-Id: I2590fdac137b0e8711a6a1bf4ba8a32259496515
This commit is contained in:
parent
ae7b4b5d77
commit
2d87c79abb
|
@ -42,7 +42,9 @@ import (
|
|||
"camlistore.org/pkg/blobserver"
|
||||
"camlistore.org/pkg/blobserver/local"
|
||||
"camlistore.org/pkg/osutil"
|
||||
|
||||
"go4.org/jsonconfig"
|
||||
"go4.org/syncutil"
|
||||
)
|
||||
|
||||
// DiskStorage implements the blobserver.Storage interface using the
|
||||
|
@ -56,6 +58,12 @@ type DiskStorage struct {
|
|||
|
||||
// gen will be nil if partition != ""
|
||||
gen *local.Generationer
|
||||
|
||||
// tmpFileGate limits the number of temporary files open at the same
|
||||
// time, so we don't run into the max set by ulimit. It is nil on
|
||||
// systems (Windows) where we don't know the maximum number of open
|
||||
// file descriptors.
|
||||
tmpFileGate *syncutil.Gate
|
||||
}
|
||||
|
||||
func (ds *DiskStorage) String() string {
|
||||
|
@ -70,6 +78,14 @@ func IsDir(root string) (bool, error) {
|
|||
return false, nil
|
||||
}
|
||||
|
||||
const (
|
||||
// We refuse to create a DiskStorage when the user's ulimit is lower than
|
||||
// minFDLimit. 100 is ridiculously low, but the default value on OSX is 256, and we
|
||||
// don't want to fail by default, so our min value has to be lower than 256.
|
||||
minFDLimit = 100
|
||||
recommendedFDLimit = 1024
|
||||
)
|
||||
|
||||
// New returns a new local disk storage implementation at the provided
|
||||
// root directory, which must already exist.
|
||||
func New(root string) (*DiskStorage, error) {
|
||||
|
@ -103,6 +119,20 @@ func New(root string) (*DiskStorage, error) {
|
|||
if _, _, err := ds.StorageGeneration(); err != nil {
|
||||
return nil, fmt.Errorf("Error initialization generation for %q: %v", root, err)
|
||||
}
|
||||
ul, err := osutil.MaxFD()
|
||||
if err != nil {
|
||||
if err == osutil.ErrNotSupported {
|
||||
// Do not set the gate on Windows, since we don't know the ulimit.
|
||||
return ds, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
if ul < minFDLimit {
|
||||
return nil, fmt.Errorf("The max number of open file descriptors on your system (ulimit -n) is too low. Please fix it with 'ulimit -S -n X' with X being at least %d.", recommendedFDLimit)
|
||||
}
|
||||
// Setting the gate to 80% of the ulimit, to leave a bit of room for other file ops happening in Camlistore.
|
||||
// TODO(mpl): make this used and enforced Camlistore-wide. Issue #837.
|
||||
ds.tmpFileGate = syncutil.NewGate(int(ul * 80 / 100))
|
||||
return ds, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -27,6 +27,20 @@ import (
|
|||
"camlistore.org/pkg/blob"
|
||||
)
|
||||
|
||||
func (ds *DiskStorage) startGate() {
|
||||
if ds.tmpFileGate == nil {
|
||||
return
|
||||
}
|
||||
ds.tmpFileGate.Start()
|
||||
}
|
||||
|
||||
func (ds *DiskStorage) doneGate() {
|
||||
if ds.tmpFileGate == nil {
|
||||
return
|
||||
}
|
||||
ds.tmpFileGate.Done()
|
||||
}
|
||||
|
||||
func (ds *DiskStorage) ReceiveBlob(blobRef blob.Ref, source io.Reader) (ref blob.SizedRef, err error) {
|
||||
ds.dirLockMu.RLock()
|
||||
defer ds.dirLockMu.RUnlock()
|
||||
|
@ -37,8 +51,12 @@ func (ds *DiskStorage) ReceiveBlob(blobRef blob.Ref, source io.Reader) (ref blob
|
|||
return
|
||||
}
|
||||
|
||||
// TODO(mpl): warn when we hit the gate, and at a limited rate, like maximum once a minute.
|
||||
// Deferring to another CL, since it requires modifications to syncutil.Gate first.
|
||||
ds.startGate()
|
||||
tempFile, err := ioutil.TempFile(hashedDirectory, blobFileBaseName(blobRef)+".tmp")
|
||||
if err != nil {
|
||||
ds.doneGate()
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -48,6 +66,7 @@ func (ds *DiskStorage) ReceiveBlob(blobRef blob.Ref, source io.Reader) (ref blob
|
|||
log.Println("Removing temp file: ", tempFile.Name())
|
||||
os.Remove(tempFile.Name())
|
||||
}
|
||||
ds.doneGate()
|
||||
}()
|
||||
|
||||
written, err := io.Copy(tempFile, source)
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
/*
|
||||
Copyright 2016 The Camlistore Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package osutil
|
||||
|
||||
// Mkfifo creates a FIFO file named path. It returns ErrNotSupported on
|
||||
// unsupported systems.
|
||||
func Mkfifo(path string, mode uint32) error {
|
||||
return mkfifo(path, mode)
|
||||
}
|
||||
|
||||
// Mksocket creates a socket file (a Unix Domain Socket) named path. It returns
|
||||
// ErrNotSupported on unsupported systems.
|
||||
func Mksocket(path string) error {
|
||||
return mksocket(path)
|
||||
}
|
||||
|
||||
// MaxFD returns the maximum number of open file descriptors allowed. It returns
|
||||
// ErrNotSupported on unsupported systems.
|
||||
func MaxFD() (uint64, error) {
|
||||
return maxFD()
|
||||
}
|
|
@ -20,3 +20,4 @@ package osutil
|
|||
|
||||
func Mkfifo(path string, mode uint32) error { return ErrNotSupported }
|
||||
func Mksocket(path string) error { return ErrNotSupported }
|
||||
func MaxFD() (uint64, error) { return 0, ErrNotSupported }
|
||||
|
|
|
@ -19,18 +19,18 @@ limitations under the License.
|
|||
package osutil
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
func Mkfifo(path string, mode uint32) error {
|
||||
func mkfifo(path string, mode uint32) error {
|
||||
return syscall.Mkfifo(path, mode)
|
||||
}
|
||||
|
||||
// Mksocket creates a socket file (a Unix Domain Socket) named path.
|
||||
func Mksocket(path string) error {
|
||||
func mksocket(path string) error {
|
||||
dir := filepath.Dir(path)
|
||||
base := filepath.Base(path)
|
||||
tmp := filepath.Join(dir, "."+base)
|
||||
|
@ -50,3 +50,11 @@ func Mksocket(path string) error {
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
func maxFD() (uint64, error) {
|
||||
var rlim syscall.Rlimit
|
||||
if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rlim); err != nil {
|
||||
return 0, fmt.Errorf("ulimit error: %v", err)
|
||||
}
|
||||
return rlim.Cur, nil
|
||||
}
|
||||
|
|
|
@ -19,19 +19,19 @@ limitations under the License.
|
|||
package osutil
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
func Mkfifo(path string, mode uint32) error {
|
||||
func mkfifo(path string, mode uint32) error {
|
||||
// Mkfifo is missing from syscall, thus call Mknod as it does on Linux.
|
||||
return syscall.Mknod(path, mode|syscall.S_IFIFO, 0)
|
||||
}
|
||||
|
||||
// Mksocket creates a socket file (a Unix Domain Socket) named path.
|
||||
func Mksocket(path string) error {
|
||||
func mksocket(path string) error {
|
||||
dir := filepath.Dir(path)
|
||||
base := filepath.Base(path)
|
||||
tmp := filepath.Join(dir, "."+base)
|
||||
|
@ -51,3 +51,11 @@ func Mksocket(path string) error {
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
func maxFD() (uint64, error) {
|
||||
var rlim syscall.Rlimit
|
||||
if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rlim); err != nil {
|
||||
return 0, fmt.Errorf("ulimit error: %v", err)
|
||||
}
|
||||
return rlim.Cur, nil
|
||||
}
|
||||
|
|
|
@ -16,5 +16,6 @@ limitations under the License.
|
|||
|
||||
package osutil
|
||||
|
||||
func Mkfifo(path string, mode uint32) error { return ErrNotSupported }
|
||||
func Mksocket(path string) error { return ErrNotSupported }
|
||||
func mkfifo(path string, mode uint32) error { return ErrNotSupported }
|
||||
func mksocket(path string) error { return ErrNotSupported }
|
||||
func maxFD() (uint64, error) { return 0, ErrNotSupported }
|
||||
|
|
Loading…
Reference in New Issue