diff --git a/pkg/readerutil/opener.go b/pkg/readerutil/opener.go index d8a8ffedd..7e7122eac 100644 --- a/pkg/readerutil/opener.go +++ b/pkg/readerutil/opener.go @@ -19,7 +19,6 @@ package readerutil import ( "os" "sync" - "sync/atomic" "camlistore.org/pkg/singleflight" "camlistore.org/pkg/types" @@ -28,53 +27,77 @@ import ( var ( openerGroup singleflight.Group - openFileMu sync.RWMutex // guards openFiles + openFileMu sync.Mutex // guards openFiles openFiles = make(map[string]*openFile) ) type openFile struct { - // refCount must be 64-bit aligned for 32-bit platforms. - refCount int64 // starts at 1; only valid if initial increment >= 2 - *os.File - path string // map key of openFiles + path string // map key of openFiles + refCount int } -func (f *openFile) Close() error { - if atomic.AddInt64(&f.refCount, -1) == 0 { - openFileMu.Lock() - if openFiles[f.path] == f { - delete(openFiles, f.path) - } +type openFileHandle struct { + closed bool + *openFile +} + +func (f *openFileHandle) Close() error { + openFileMu.Lock() + if f.closed { openFileMu.Unlock() - f.File.Close() + return nil } - return nil + f.closed = true + f.refCount-- + if f.refCount < 0 { + panic("unexpected negative refcount") + } + zero := f.refCount == 0 + if zero { + delete(openFiles, f.path) + } + openFileMu.Unlock() + if !zero { + return nil + } + return f.openFile.File.Close() +} + +type openingFile struct { + path string + mu sync.RWMutex // write-locked until Open is done + + // Results, once mu is unlocked: + of *openFile + err error } // OpenSingle opens the given file path for reading, reusing existing file descriptors // when possible. func OpenSingle(path string) (types.ReaderAtCloser, error) { + openFileMu.Lock() + of := openFiles[path] + if of != nil { + of.refCount++ + openFileMu.Unlock() + return &openFileHandle{false, of}, nil + } + openFileMu.Unlock() // release the lock while we call os.Open + + winner := false // this goroutine made it into Do's func + // Returns an *openFile resi, err := openerGroup.Do(path, func() (interface{}, error) { - openFileMu.RLock() - of := openFiles[path] - openFileMu.RUnlock() - if of != nil { - if atomic.AddInt64(&of.refCount, 1) >= 2 { - return of, nil - } - of.Close() - } - + winner = true f, err := os.Open(path) if err != nil { return nil, err } - of = &openFile{ + of := &openFile{ File: f, - refCount: 1, path: path, + refCount: 1, } openFileMu.Lock() openFiles[path] = of @@ -84,5 +107,19 @@ func OpenSingle(path string) (types.ReaderAtCloser, error) { if err != nil { return nil, err } - return resi.(*openFile), nil + of = resi.(*openFile) + + // If our os.Open was dup-suppressed, we have to increment our + // reference count. + if !winner { + openFileMu.Lock() + if of.refCount == 0 { + // Winner already closed it. Try again (rare). + openFileMu.Unlock() + return OpenSingle(path) + } + of.refCount++ + openFileMu.Unlock() + } + return &openFileHandle{false, of}, nil } diff --git a/pkg/readerutil/opener_test.go b/pkg/readerutil/opener_test.go new file mode 100644 index 000000000..41fa37b5b --- /dev/null +++ b/pkg/readerutil/opener_test.go @@ -0,0 +1,77 @@ +/* +Copyright 2013 The Camlistore Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package readerutil + +import ( + "bytes" + "fmt" + "io/ioutil" + "os" + "runtime" + "testing" +) + +func TestOpenSingle(t *testing.T) { + if testing.Short() { + t.Skip("skipping in short mode") + } + defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(4)) + f, err := ioutil.TempFile("", "foo") + if err != nil { + t.Fatal(err) + } + defer os.Remove(f.Name()) + contents := []byte("Some file contents") + if _, err := f.Write(contents); err != nil { + t.Fatal(err) + } + f.Close() + + const j = 4 + errc := make(chan error, j) + for i := 1; i < j; i++ { + go func() { + buf := make([]byte, len(contents)) + for i := 0; i < 400; i++ { + rac, err := OpenSingle(f.Name()) + if err != nil { + errc <- err + return + } + n, err := rac.ReadAt(buf, 0) + if err != nil { + errc <- err + return + } + if n != len(contents) || !bytes.Equal(buf, contents) { + errc <- fmt.Errorf("read %d, %q; want %d, %q", n, buf, len(contents), contents) + return + } + if err := rac.Close(); err != nil { + errc <- err + return + } + } + errc <- nil + }() + } + for i := 1; i < j; i++ { + if err := <-errc; err != nil { + t.Error(err) + } + } +}