readerutil: fix OpenSingle bug causing "bad file descriptor" in dispacked.

Simplifies the code a bit, even though locking is a bit more coarse
now. We can make it faster later if it matters, especially now that
there's a test (which reliably failed before this fix).

Fixes camlistore.org/issue/264

Change-Id: Ifac79728f6a105ba76a60997c55c5d7d818f6f71
This commit is contained in:
Brad Fitzpatrick 2013-12-24 10:57:06 -08:00
parent c9b0249c94
commit 2bc4b5075f
2 changed files with 141 additions and 27 deletions

View File

@ -19,7 +19,6 @@ package readerutil
import ( import (
"os" "os"
"sync" "sync"
"sync/atomic"
"camlistore.org/pkg/singleflight" "camlistore.org/pkg/singleflight"
"camlistore.org/pkg/types" "camlistore.org/pkg/types"
@ -28,53 +27,77 @@ import (
var ( var (
openerGroup singleflight.Group openerGroup singleflight.Group
openFileMu sync.RWMutex // guards openFiles openFileMu sync.Mutex // guards openFiles
openFiles = make(map[string]*openFile) openFiles = make(map[string]*openFile)
) )
type openFile struct { type openFile struct {
// refCount must be 64-bit aligned for 32-bit platforms.
refCount int64 // starts at 1; only valid if initial increment >= 2
*os.File *os.File
path string // map key of openFiles path string // map key of openFiles
refCount int
} }
func (f *openFile) Close() error { type openFileHandle struct {
if atomic.AddInt64(&f.refCount, -1) == 0 { closed bool
openFileMu.Lock() *openFile
if openFiles[f.path] == f { }
delete(openFiles, f.path)
} func (f *openFileHandle) Close() error {
openFileMu.Lock()
if f.closed {
openFileMu.Unlock() openFileMu.Unlock()
f.File.Close() return nil
} }
return nil f.closed = true
f.refCount--
if f.refCount < 0 {
panic("unexpected negative refcount")
}
zero := f.refCount == 0
if zero {
delete(openFiles, f.path)
}
openFileMu.Unlock()
if !zero {
return nil
}
return f.openFile.File.Close()
}
type openingFile struct {
path string
mu sync.RWMutex // write-locked until Open is done
// Results, once mu is unlocked:
of *openFile
err error
} }
// OpenSingle opens the given file path for reading, reusing existing file descriptors // OpenSingle opens the given file path for reading, reusing existing file descriptors
// when possible. // when possible.
func OpenSingle(path string) (types.ReaderAtCloser, error) { func OpenSingle(path string) (types.ReaderAtCloser, error) {
openFileMu.Lock()
of := openFiles[path]
if of != nil {
of.refCount++
openFileMu.Unlock()
return &openFileHandle{false, of}, nil
}
openFileMu.Unlock() // release the lock while we call os.Open
winner := false // this goroutine made it into Do's func
// Returns an *openFile // Returns an *openFile
resi, err := openerGroup.Do(path, func() (interface{}, error) { resi, err := openerGroup.Do(path, func() (interface{}, error) {
openFileMu.RLock() winner = true
of := openFiles[path]
openFileMu.RUnlock()
if of != nil {
if atomic.AddInt64(&of.refCount, 1) >= 2 {
return of, nil
}
of.Close()
}
f, err := os.Open(path) f, err := os.Open(path)
if err != nil { if err != nil {
return nil, err return nil, err
} }
of = &openFile{ of := &openFile{
File: f, File: f,
refCount: 1,
path: path, path: path,
refCount: 1,
} }
openFileMu.Lock() openFileMu.Lock()
openFiles[path] = of openFiles[path] = of
@ -84,5 +107,19 @@ func OpenSingle(path string) (types.ReaderAtCloser, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
return resi.(*openFile), nil of = resi.(*openFile)
// If our os.Open was dup-suppressed, we have to increment our
// reference count.
if !winner {
openFileMu.Lock()
if of.refCount == 0 {
// Winner already closed it. Try again (rare).
openFileMu.Unlock()
return OpenSingle(path)
}
of.refCount++
openFileMu.Unlock()
}
return &openFileHandle{false, of}, nil
} }

View File

@ -0,0 +1,77 @@
/*
Copyright 2013 The Camlistore Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package readerutil
import (
"bytes"
"fmt"
"io/ioutil"
"os"
"runtime"
"testing"
)
func TestOpenSingle(t *testing.T) {
if testing.Short() {
t.Skip("skipping in short mode")
}
defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(4))
f, err := ioutil.TempFile("", "foo")
if err != nil {
t.Fatal(err)
}
defer os.Remove(f.Name())
contents := []byte("Some file contents")
if _, err := f.Write(contents); err != nil {
t.Fatal(err)
}
f.Close()
const j = 4
errc := make(chan error, j)
for i := 1; i < j; i++ {
go func() {
buf := make([]byte, len(contents))
for i := 0; i < 400; i++ {
rac, err := OpenSingle(f.Name())
if err != nil {
errc <- err
return
}
n, err := rac.ReadAt(buf, 0)
if err != nil {
errc <- err
return
}
if n != len(contents) || !bytes.Equal(buf, contents) {
errc <- fmt.Errorf("read %d, %q; want %d, %q", n, buf, len(contents), contents)
return
}
if err := rac.Close(); err != nil {
errc <- err
return
}
}
errc <- nil
}()
}
for i := 1; i < j; i++ {
if err := <-errc; err != nil {
t.Error(err)
}
}
}