diff --git a/pkg/index/corpus.go b/pkg/index/corpus.go index bfd0d13f6..7b6771905 100644 --- a/pkg/index/corpus.go +++ b/pkg/index/corpus.go @@ -18,6 +18,7 @@ import ( "camlistore.org/pkg/schema" "camlistore.org/pkg/sorted" "camlistore.org/pkg/strutil" + "camlistore.org/pkg/syncutil" "camlistore.org/pkg/types/camtypes" ) @@ -157,13 +158,17 @@ func (c *Corpus) scanFromStorage(s sorted.KeyValue) error { "filetimes|", "imagesize|", } + var grp syncutil.Group for i, prefix := range prefixes { if logCorpusStats { - log.Printf("Slurping corpus to memory from index... (%d/%d: prefix %q)", i+2, len(prefixes)+1, prefix) - } - if err := c.scanPrefix(s, prefix); err != nil { - return err + log.Printf("Slurping corpus to memory from index... (%d/%d: prefix %q)", i+2, len(prefixes)+1, + prefix[:len(prefix)-1]) } + prefix := prefix + grp.Go(func() error { return c.scanPrefix(s, prefix) }) + } + if err := grp.Err(); err != nil { + return err } // Post-load optimizations and restoration of invariants. @@ -219,13 +224,25 @@ func (c *Corpus) scanPrefix(s sorted.KeyValue, prefix string) (err error) { panic("No registered merge func for prefix " + prefix) } + n, t0 := 0, time.Now() it := queryPrefixString(s, prefix) defer closeIterator(it, &err) for it.Next() { + n++ + if n == 1 { + // Let the query be sent off and responses start flowing in before + // we take the lock. And if no rows: no lock. + c.mu.Lock() + defer c.mu.Unlock() + } if err := fn(c, it.KeyBytes(), it.ValueBytes()); err != nil { return err } } + if logCorpusStats { + d := time.Since(t0) + log.Printf("Scanned prefix %q: %d rows, %v", prefix[:len(prefix)-1], n, d) + } return nil }