index: show corpus building CPU usage, disabled blob.Parse cache, size maps

Change-Id: I089053e8de46978573e4b5fe5cdc14ccac0d54a2
This commit is contained in:
Brad Fitzpatrick 2013-11-30 12:04:04 -08:00
parent e0e0c9fead
commit 778f5cc6c4
2 changed files with 53 additions and 15 deletions

View File

@ -13,6 +13,7 @@ import (
"time" "time"
"camlistore.org/pkg/blob" "camlistore.org/pkg/blob"
"camlistore.org/pkg/osutil"
"camlistore.org/pkg/schema" "camlistore.org/pkg/schema"
"camlistore.org/pkg/sorted" "camlistore.org/pkg/sorted"
"camlistore.org/pkg/strutil" "camlistore.org/pkg/strutil"
@ -34,7 +35,8 @@ type Corpus struct {
gen int64 gen int64
strs map[string]string // interned strings strs map[string]string // interned strings
brInterns int64 brOfStr map[string]blob.Ref // blob.Parse fast path
brInterns int64 // blob.Ref -> blob.Ref, via br method
blobs map[blob.Ref]*camtypes.BlobMeta blobs map[blob.Ref]*camtypes.BlobMeta
sumBlobBytes int64 sumBlobBytes int64
@ -70,6 +72,7 @@ func newCorpus() *Corpus {
imageInfo: make(map[blob.Ref]camtypes.ImageInfo), imageInfo: make(map[blob.Ref]camtypes.ImageInfo),
deletedBy: make(map[blob.Ref]blob.Ref), deletedBy: make(map[blob.Ref]blob.Ref),
keyId: make(map[blob.Ref]string), keyId: make(map[blob.Ref]string),
brOfStr: make(map[string]blob.Ref),
} }
} }
@ -128,19 +131,28 @@ func (c *Corpus) scanFromStorage(s sorted.KeyValue) error {
c.building = true c.building = true
ms0 := memstats() ms0 := memstats()
log.Printf("Slurping corpus to memory from index...") log.Printf("Slurping corpus to memory from index...")
// We scan the "meta" rows first, before the prefixes below, because
// doing so populates the blobs map (used for blobref interning) and the
// camBlobs map (used for hinting the size of other maps).
log.Printf("Slurping corpus to memory from index... (1/6: meta rows)")
if err := c.scanPrefix(s, "meta:"); err != nil {
return err
}
c.files = make(map[blob.Ref]camtypes.FileInfo, len(c.camBlobs["file"]))
c.permanodes = make(map[blob.Ref]*PermanodeMeta, len(c.camBlobs["permanode"]))
cpu0 := osutil.CPUUsage()
prefixes := []string{ prefixes := []string{
"meta:", // should be first, for blobref interning
"signerkeyid:", "signerkeyid:",
"claim|", "claim|",
"fileinfo|", "fileinfo|",
"filetimes|", "filetimes|",
"imagesize|", "imagesize|",
} }
for i, prefix := range prefixes { for i, prefix := range prefixes {
log.Printf("Slurping corpus to memory from index... (%d/%d: prefix %q)", i+1, len(prefixes), prefix) log.Printf("Slurping corpus to memory from index... (%d/%d: prefix %q)", i+2, len(prefixes)+1, prefix)
if err := c.scanPrefix(s, prefix); err != nil { if err := c.scanPrefix(s, prefix); err != nil {
return err return err
} }
@ -161,9 +173,12 @@ func (c *Corpus) scanFromStorage(s sorted.KeyValue) error {
} }
} }
c.brOfStr = nil // drop this now.
c.building = false c.building = false
// log.V(1).Printf("interned blob.Ref = %d", c.brInterns) // log.V(1).Printf("interned blob.Ref = %d", c.brInterns)
cpu := osutil.CPUUsage() - cpu0
ms1 := memstats() ms1 := memstats()
memUsed := ms1.Alloc - ms0.Alloc memUsed := ms1.Alloc - ms0.Alloc
if ms1.Alloc < ms0.Alloc { if ms1.Alloc < ms0.Alloc {
@ -177,6 +192,7 @@ func (c *Corpus) scanFromStorage(s sorted.KeyValue) error {
len(c.permanodes), len(c.permanodes),
len(c.files), len(c.files),
len(c.imageInfo)) len(c.imageInfo))
log.Printf("Corpus scanning CPU usage: %v", cpu)
return nil return nil
} }
@ -226,6 +242,10 @@ func (c *Corpus) mergeMetaRow(k, v string) error {
if !ok { if !ok {
return fmt.Errorf("bogus meta row: %q -> %q", k, v) return fmt.Errorf("bogus meta row: %q -> %q", k, v)
} }
if useBlobParseCache && c.brOfStr != nil {
brstr := k[len("meta:"):]
c.brOfStr[brstr] = bm.Ref
}
if _, dup := c.blobs[bm.Ref]; dup { if _, dup := c.blobs[bm.Ref]; dup {
// Um, shouldn't happen. TODO(bradfitz): is it // Um, shouldn't happen. TODO(bradfitz): is it
// guaranteed elsewhere that duplicate blobs are never // guaranteed elsewhere that duplicate blobs are never
@ -259,7 +279,7 @@ func (c *Corpus) mergeSignerKeyIdRow(k, v string) error {
} }
func (c *Corpus) mergeClaimRow(k, v string) error { func (c *Corpus) mergeClaimRow(k, v string) error {
cl, ok := kvClaim(k, v) cl, ok := kvClaim(k, v, c.blobParse)
if !ok || !cl.Permanode.Valid() { if !ok || !cl.Permanode.Valid() {
return fmt.Errorf("bogus claim row: %q -> %q", k, v) return fmt.Errorf("bogus claim row: %q -> %q", k, v)
} }
@ -288,7 +308,7 @@ func (c *Corpus) mergeFileInfoRow(k, v string) error {
if len(c.ss) != 2 { if len(c.ss) != 2 {
return fmt.Errorf("unexpected fileinfo key %q", k) return fmt.Errorf("unexpected fileinfo key %q", k)
} }
br, ok := blob.Parse(c.ss[1]) br, ok := c.blobParse(c.ss[1])
if !ok { if !ok {
return fmt.Errorf("unexpected fileinfo blobref in key %q", k) return fmt.Errorf("unexpected fileinfo blobref in key %q", k)
} }
@ -317,7 +337,7 @@ func (c *Corpus) mergeFileTimesRow(k, v string) error {
if len(c.ss) != 2 { if len(c.ss) != 2 {
return fmt.Errorf("unexpected filetimes key %q", k) return fmt.Errorf("unexpected filetimes key %q", k)
} }
br, ok := blob.Parse(c.ss[1]) br, ok := c.blobParse(c.ss[1])
if !ok { if !ok {
return fmt.Errorf("unexpected filetimes blobref in key %q", k) return fmt.Errorf("unexpected filetimes blobref in key %q", k)
} }
@ -337,7 +357,7 @@ func (c *Corpus) mutateFileInfo(br blob.Ref, fn func(*camtypes.FileInfo)) {
} }
func (c *Corpus) mergeImageSizeRow(k, v string) error { func (c *Corpus) mergeImageSizeRow(k, v string) error {
br, okk := blob.Parse(k[len("imagesize|"):]) br, okk := c.blobParse(k[len("imagesize|"):])
ii, okv := kvImageInfo(v) ii, okv := kvImageInfo(v)
if !okk || !okv { if !okk || !okv {
return fmt.Errorf("bogus row %q = %q", k, v) return fmt.Errorf("bogus row %q = %q", k, v)
@ -347,6 +367,24 @@ func (c *Corpus) mergeImageSizeRow(k, v string) error {
return nil return nil
} }
// useBlobParseCache controls whether the blob.Parse fast path cache
// (the Corpus brOfStr map) is used. Enabling it reduces CPU (via
// reduced GC from new garbage), but increases memory usage, even
// though it shouldn't. The GC should fully discard the brOfStr map
// (which we nil out at the end of parsing), but the Go GC doesn't
// seem to clear it all. The cache is therefore disabled for now.
// TODO: investigate / file bugs.
const useBlobParseCache = false
// blobParse parses v into a blob.Ref. When useBlobParseCache is
// enabled, it first consults c.brOfStr and returns the cached ref on a
// hit; in every other case it defers to blob.Parse.
func (c *Corpus) blobParse(v string) (br blob.Ref, ok bool) {
	if !useBlobParseCache {
		return blob.Parse(v)
	}
	// c.brOfStr may already have been set to nil once scanning has
	// finished; indexing a nil map is safe and simply reports a miss.
	if cached, hit := c.brOfStr[v]; hit {
		return cached, true
	}
	return blob.Parse(v)
}
// str returns s, interned. // str returns s, interned.
func (c *Corpus) str(s string) string { func (c *Corpus) str(s string) string {
if s == "" { if s == "" {

View File

@ -378,7 +378,7 @@ func (x *Index) AppendClaims(dst []camtypes.Claim, permaNode blob.Ref,
if mustHave != "" && !strings.Contains(val, mustHave) { if mustHave != "" && !strings.Contains(val, mustHave) {
continue continue
} }
cl, ok := kvClaim(it.Key(), val) cl, ok := kvClaim(it.Key(), val, blob.Parse)
if !ok { if !ok {
continue continue
} }
@ -396,22 +396,22 @@ func (x *Index) AppendClaims(dst []camtypes.Claim, permaNode blob.Ref,
return dst, nil return dst, nil
} }
func kvClaim(k, v string) (c camtypes.Claim, ok bool) { func kvClaim(k, v string, blobParse func(string) (blob.Ref, bool)) (c camtypes.Claim, ok bool) {
// TODO(bradfitz): remove the strings.Split calls to reduce allocations. // TODO(bradfitz): remove the strings.Split calls to reduce allocations.
keyPart := strings.Split(k, "|") keyPart := strings.Split(k, "|")
valPart := strings.Split(v, "|") valPart := strings.Split(v, "|")
if len(keyPart) < 5 || len(valPart) < 4 { if len(keyPart) < 5 || len(valPart) < 4 {
return return
} }
signerRef, ok := blob.Parse(valPart[3]) signerRef, ok := blobParse(valPart[3])
if !ok { if !ok {
return return
} }
permaNode, ok := blob.Parse(keyPart[1]) permaNode, ok := blobParse(keyPart[1])
if !ok { if !ok {
return return
} }
claimRef, ok := blob.Parse(keyPart[4]) claimRef, ok := blobParse(keyPart[4])
if !ok { if !ok {
return return
} }