fs: implement fuse nodeReader.Read in terms of FileReader.ReadAt

Change-Id: I136543554ca5337568ec94d8888528220d5f88cc
This commit is contained in:
Brad Fitzpatrick 2012-12-26 10:02:22 -08:00
parent 08f09ebf2b
commit b8dc625560
2 changed files with 37 additions and 16 deletions

View File

@ -22,6 +22,8 @@ import (
"io/ioutil"
"log"
"os"
"os/signal"
"syscall"
"camlistore.org/pkg/blobref"
"camlistore.org/pkg/blobserver/localdisk" // used for the blob cache
@ -82,6 +84,13 @@ func main() {
// TODO: set fs's logger
}
// This doesn't appear to work on OS X:
sigc := make(chan os.Signal, 1)
go func() {
log.Fatalf("Signal %s received, shutting down.", <-sigc)
}()
signal.Notify(sigc, syscall.SIGQUIT, syscall.SIGTERM)
conn, err := fuse.Mount(mountPoint)
if err != nil {
log.Fatalf("Mount: %v", err)

View File

@ -173,30 +173,35 @@ func (n *node) Open(req *fuse.OpenRequest, res *fuse.OpenResponse, intr fuse.Int
if ss.Type == "directory" {
return n, nil
}
return &nodeReader{n: n, ss: ss}, nil
fr, err := ss.NewFileReader(n.fs.fetcher)
if err != nil {
// Will only happen if ss.Type != "file" or "bytes"
log.Printf("NewFileReader(%s) = %v", n.blobref, err)
return nil, fuse.EIO
}
return &nodeReader{n: n, fr: fr}, nil
}
type nodeReader struct {
n *node
ss *schema.Superset // not nil, always of type "file" or "bytes"
fr *schema.FileReader
}
func (nr *nodeReader) Read(req *fuse.ReadRequest, res *fuse.ReadResponse, intr fuse.Intr) fuse.Error {
log.Printf("CAMLI nodeReader READ on %v: %#v", nr.n.blobref, req)
// TODO: this isn't incredibly efficient, creating a new
// FileReader for each read chunk. We could do better here
// and re-use a locked pool of readers, trying to find the one
// with a current offset <= req.Offset first.
fr, err := nr.ss.NewFileReader(nr.n.fs.fetcher)
if err != nil {
panic(err)
if req.Offset >= nr.fr.Size() {
return nil
}
defer fr.Close()
fr.Skip(uint64(req.Offset))
buf := make([]byte, req.Size)
n, err := io.ReadFull(fr, buf)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
size := req.Size
if int64(size) + req.Offset >= nr.fr.Size() {
size -= int((int64(size) + req.Offset) - nr.fr.Size())
}
buf := make([]byte, size)
n, err := nr.fr.ReadAt(buf, req.Offset)
if err == io.EOF {
err = nil
}
if err != nil {
log.Printf("camli read on %v at %d: %v", nr.n.blobref, req.Offset, err)
return fuse.EIO
}
@ -204,6 +209,12 @@ func (nr *nodeReader) Read(req *fuse.ReadRequest, res *fuse.ReadResponse, intr f
return nil
}
func (nr *nodeReader) Release(req *fuse.ReleaseRequest, intr fuse.Intr) fuse.Error {
log.Printf("CAMLI nodeReader RELEASE on %v", nr.n.blobref)
nr.fr.Close()
return nil
}
func (n *node) ReadDir(intr fuse.Intr) ([]fuse.Dirent, fuse.Error) {
log.Printf("CAMLI ReadDir on %v", n.blobref)
n.dmu.Lock()
@ -259,7 +270,8 @@ func (n *node) ReadDir(intr fuse.Intr) ([]fuse.Dirent, fuse.Error) {
}
n.dirents = make([]fuse.Dirent, 0)
for _, ch := range ssc {
for i, ch := range ssc {
log.Printf("CAMLI dir %v set %v, waiting on entry %d/%d", n.blobref, setRef, i+1, len(ssc))
r := <-ch
memberRef, mss, err := r.BlobRef, r.Superset, r.error
if err != nil {