mirror of https://github.com/perkeep/perkeep.git
fs: use schema.DirReader, rather than doing it by hand
Change-Id: I7e79621d621314ed1f50219f0cafc44b1b82ab1f
This commit is contained in:
parent
2fd4a22694
commit
d2bc6f6cd3
61
pkg/fs/fs.go
61
pkg/fs/fs.go
|
@ -227,65 +227,24 @@ func (n *node) ReadDir(intr fuse.Intr) ([]fuse.Dirent, fuse.Error) {
|
|||
|
||||
ss, err := n.schema()
|
||||
if err != nil {
|
||||
log.Printf("camli.ReadDir error on %v: %v", n.blobref, err)
|
||||
return nil, fuse.EIO
|
||||
}
|
||||
setRef := ss.DirectoryEntries()
|
||||
if setRef == nil {
|
||||
return nil, nil
|
||||
}
|
||||
log.Printf("fetching setref: %v...", setRef)
|
||||
setss, err := n.fs.fetchSchemaMeta(setRef)
|
||||
dr, err := schema.NewDirReader(n.fs.fetcher, ss.BlobRef())
|
||||
if err != nil {
|
||||
log.Printf("fetching %v for readdir on %v: %v", setRef, n.blobref, err)
|
||||
log.Printf("camli.ReadDir error on %v: %v", n.blobref, err)
|
||||
return nil, fuse.EIO
|
||||
}
|
||||
if setss.Type() != "static-set" {
|
||||
log.Printf("%v is not a static-set in readdir; is a %q", setRef, setss.Type())
|
||||
schemaEnts, err := dr.Readdir(-1)
|
||||
if err != nil {
|
||||
log.Printf("camli.ReadDir error on %v: %v", n.blobref, err)
|
||||
return nil, fuse.EIO
|
||||
}
|
||||
|
||||
// TODO(bradfitz): push down information to the fetcher
|
||||
// (cachingfetcher -> remote client http) that we're going to load a
|
||||
// bunch, so the HTTP client (if not using SPDY) can do discovery
|
||||
// and see if the server supports a batch handler, then get them all
|
||||
// in one round-trip, rather than attacking the server with hundreds
|
||||
// of parallel TLS setups.
|
||||
|
||||
// res is the result of fetchSchemaMeta. the ssc slice of channels keeps them ordered
|
||||
// the same as they're listed in the schema's Members.
|
||||
type res struct {
|
||||
*blobref.BlobRef
|
||||
*schema.Blob
|
||||
error
|
||||
}
|
||||
var ssc []chan res
|
||||
for _, memberRef := range setss.StaticSetMembers() {
|
||||
ch := make(chan res, 1)
|
||||
ssc = append(ssc, ch)
|
||||
// TODO: move the cmd/camput/chanworker.go into its own package, and use it here. only
|
||||
// have 10 or so of these loading at once. for now we do them all.
|
||||
go func(memberRef *blobref.BlobRef) {
|
||||
mss, err := n.fs.fetchSchemaMeta(memberRef)
|
||||
if err != nil {
|
||||
log.Printf("error reading entry %v in readdir: %v", memberRef, err)
|
||||
}
|
||||
ch <- res{memberRef, mss, err}
|
||||
}(memberRef)
|
||||
}
|
||||
|
||||
n.dirents = make([]fuse.Dirent, 0)
|
||||
for i, ch := range ssc {
|
||||
log.Printf("CAMLI dir %v set %v, waiting on entry %d/%d", n.blobref, setRef, i+1, len(ssc))
|
||||
r := <-ch
|
||||
memberRef, mss, err := r.BlobRef, r.Blob, r.error
|
||||
if err != nil {
|
||||
return nil, fuse.EIO
|
||||
}
|
||||
if filename := mss.FileName(); filename != "" {
|
||||
n.addLookupEntry(filename, memberRef)
|
||||
n.dirents = append(n.dirents, fuse.Dirent{
|
||||
Name: mss.FileName(),
|
||||
})
|
||||
for _, sent := range schemaEnts {
|
||||
if name := sent.FileName(); name != "" {
|
||||
n.addLookupEntry(name, sent.BlobRef())
|
||||
n.dirents = append(n.dirents, fuse.Dirent{Name: name})
|
||||
}
|
||||
}
|
||||
return n.dirents, nil
|
||||
|
|
|
@ -126,12 +126,38 @@ func (dr *DirReader) Readdir(n int) (entries []DirectoryEntry, err error) {
|
|||
up = len(sts)
|
||||
}
|
||||
}
|
||||
for _, entryBref := range sts[dr.current:up] {
|
||||
entry, err := NewDirectoryEntryFromBlobRef(dr.fetcher, entryBref)
|
||||
if err != nil {
|
||||
|
||||
// TODO(bradfitz): push down information to the fetcher
|
||||
// (e.g. cachingfetcher -> remote client http) that we're
|
||||
// going to load a bunch, so the HTTP client (if not using
|
||||
// SPDY) can do discovery and see if the server supports a
|
||||
// batch handler, then get them all in one round-trip, rather
|
||||
// than attacking the server with hundreds of parallel TLS
|
||||
// setups.
|
||||
|
||||
type res struct {
|
||||
ent DirectoryEntry
|
||||
err error
|
||||
}
|
||||
var cs []chan res
|
||||
|
||||
// Kick off all directory entry loads.
|
||||
// TODO: bound this?
|
||||
for _, entRef := range sts[dr.current:up] {
|
||||
c := make(chan res, 1)
|
||||
cs = append(cs, c)
|
||||
go func(entRef *blobref.BlobRef) {
|
||||
entry, err := NewDirectoryEntryFromBlobRef(dr.fetcher, entRef)
|
||||
c <- res{entry, err}
|
||||
}(entRef)
|
||||
}
|
||||
|
||||
for _, c := range cs {
|
||||
res := <-c
|
||||
if res.err != nil {
|
||||
return nil, fmt.Errorf("schema/filereader: can't create dirEntry: %v\n", err)
|
||||
}
|
||||
entries = append(entries, entry)
|
||||
entries = append(entries, res.ent)
|
||||
}
|
||||
return entries, err
|
||||
return entries, nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue