Corpus: after meta rows, scan rest all in parallel.

This works well enough for MySQL at least. We'll see if it hurts the other sorted.KeyValue
implementations. If so, we might have to fix those.

Change-Id: I71e9f56f2dee9e7334c7a35a4960f55399ca306f
This commit is contained in:
Brad Fitzpatrick 2013-12-07 19:11:31 -08:00
parent 5f1e7de0d1
commit f806008d06
1 changed files with 21 additions and 4 deletions

View File

@ -18,6 +18,7 @@ import (
"camlistore.org/pkg/schema"
"camlistore.org/pkg/sorted"
"camlistore.org/pkg/strutil"
"camlistore.org/pkg/syncutil"
"camlistore.org/pkg/types/camtypes"
)
@ -157,13 +158,17 @@ func (c *Corpus) scanFromStorage(s sorted.KeyValue) error {
"filetimes|",
"imagesize|",
}
var grp syncutil.Group
for i, prefix := range prefixes {
if logCorpusStats {
log.Printf("Slurping corpus to memory from index... (%d/%d: prefix %q)", i+2, len(prefixes)+1, prefix)
}
if err := c.scanPrefix(s, prefix); err != nil {
return err
log.Printf("Slurping corpus to memory from index... (%d/%d: prefix %q)", i+2, len(prefixes)+1,
prefix[:len(prefix)-1])
}
prefix := prefix
grp.Go(func() error { return c.scanPrefix(s, prefix) })
}
if err := grp.Err(); err != nil {
return err
}
// Post-load optimizations and restoration of invariants.
@ -219,13 +224,25 @@ func (c *Corpus) scanPrefix(s sorted.KeyValue, prefix string) (err error) {
panic("No registered merge func for prefix " + prefix)
}
n, t0 := 0, time.Now()
it := queryPrefixString(s, prefix)
defer closeIterator(it, &err)
for it.Next() {
n++
if n == 1 {
// Let the query be sent off and responses start flowing in before
// we take the lock. And if no rows: no lock.
c.mu.Lock()
defer c.mu.Unlock()
}
if err := fn(c, it.KeyBytes(), it.ValueBytes()); err != nil {
return err
}
}
if logCorpusStats {
d := time.Since(t0)
log.Printf("Scanned prefix %q: %d rows, %v", prefix[:len(prefix)-1], n, d)
}
return nil
}