From e03d923fe113197e84598ea77b6ff5358dbf9d45 Mon Sep 17 00:00:00 2001 From: mpl Date: Fri, 15 Nov 2013 00:49:52 +0100 Subject: [PATCH] pkg/index: use a map to populate the mutations When indexing upon a blob reception, we first populate all the mutations in a map instead of in a batch mutation. Then we transfer all the mutations in a batch and commit it immediately. This makes the window when the batch mutation is open much shorter, and will ease future indexing because it allows reading from the index while writing the mutations to the map. Change-Id: I276282388f59ca543835bfa5ec64986453b23fe1 --- pkg/index/receive.go | 102 ++++++++++++++++++++++++------------------- 1 file changed, 58 insertions(+), 44 deletions(-) diff --git a/pkg/index/receive.go b/pkg/index/receive.go index 19291f53a..10b94b8f8 100644 --- a/pkg/index/receive.go +++ b/pkg/index/receive.go @@ -71,6 +71,12 @@ func (ix *Index) reindex(br blob.Ref) { log.Printf("index: successfully reindexed %v", sb) } +type mutationMap map[string]string + +func (mm mutationMap) Set(k, v string) { + mm[k] = v +} + func (ix *Index) ReceiveBlob(blobRef blob.Ref, source io.Reader) (retsb blob.SizedRef, err error) { sniffer := NewBlobSniffer(blobRef) written, err := io.Copy(sniffer, source) @@ -80,16 +86,13 @@ func (ix *Index) ReceiveBlob(blobRef blob.Ref, source io.Reader) (retsb blob.Siz sniffer.Parse() - bm := ix.s.BeginBatch() - - err = ix.populateMutation(blobRef, sniffer, bm) + mm, err := ix.populateMutationMap(blobRef, sniffer) if err != nil { return } - err = ix.s.CommitBatch(bm) - if err != nil { - return + if err := ix.commit(mm); err != nil { + return retsb, err } // TODO(bradfitz): log levels? These are generally noisy @@ -102,35 +105,48 @@ func (ix *Index) ReceiveBlob(blobRef blob.Ref, source io.Reader) (retsb blob.Siz return blob.SizedRef{blobRef, written}, nil } -// populateMutation populates keys & values into the provided BatchMutation. +// commit writes the contents of the mutationMap on a batch +// mutation and commits that batch. +func (ix *Index) commit(mm mutationMap) error { + bm := ix.s.BeginBatch() + for k, v := range mm { + bm.Set(k, v) + } + return ix.s.CommitBatch(bm) +} + +// populateMutationMap populates keys & values that will be committed +// into the returned map. // // the blobref can be trusted at this point (it's been fully consumed // and verified to match), and the sniffer has been populated. -func (ix *Index) populateMutation(br blob.Ref, sniffer *BlobSniffer, bm BatchMutation) error { - bm.Set("have:"+br.String(), fmt.Sprintf("%d", sniffer.Size())) - bm.Set("meta:"+br.String(), fmt.Sprintf("%d|%s", sniffer.Size(), sniffer.MIMEType())) +func (ix *Index) populateMutationMap(br blob.Ref, sniffer *BlobSniffer) (mutationMap, error) { + mm := mutationMap{ + "have:" + br.String(): fmt.Sprintf("%d", sniffer.Size()), + "meta:" + br.String(): fmt.Sprintf("%d|%s", sniffer.Size(), sniffer.MIMEType()), + } if blob, ok := sniffer.SchemaBlob(); ok { switch blob.Type() { case "claim": - if err := ix.populateClaim(blob, bm); err != nil { - return err + if err := ix.populateClaim(blob, mm); err != nil { + return nil, err } case "permanode": - //if err := mi.populatePermanode(blobRef, camli, bm); err != nil { - //return err + //if err := mi.populatePermanode(blobRef, camli, mm); err != nil { + //return nil, err //} case "file": - if err := ix.populateFile(blob, bm); err != nil { - return err + if err := ix.populateFile(blob, mm); err != nil { + return nil, err } case "directory": - if err := ix.populateDir(blob, bm); err != nil { - return err + if err := ix.populateDir(blob, mm); err != nil { + return nil, err } } } - return nil + return mm, nil } // keepFirstN keeps the first N bytes written to it in Bytes. @@ -149,10 +165,9 @@ func (w *keepFirstN) Write(p []byte) (n int, err error) { return len(p), nil } -// blobref: of the file or schema blob -// blob: the parsed file schema blob -// bm: keys to populate -func (ix *Index) populateFile(b *schema.Blob, bm BatchMutation) error { +// b: the parsed file schema blob +// mm: keys to populate +func (ix *Index) populateFile(b *schema.Blob, mm mutationMap) error { var times []time.Time // all creation or mod times seen; may be zero times = append(times, b.ModTime()) @@ -191,7 +206,7 @@ func (ix *Index) populateFile(b *schema.Blob, bm BatchMutation) error { if imageBuf != nil { if conf, err := images.DecodeConfig(bytes.NewReader(imageBuf.Bytes)); err == nil { - bm.Set(keyImageSize.Key(blobRef), keyImageSize.Val(fmt.Sprint(conf.Width), fmt.Sprint(conf.Height))) + mm.Set(keyImageSize.Key(blobRef), keyImageSize.Val(fmt.Sprint(conf.Width), fmt.Sprint(conf.Height))) } if ft, err := schema.FileTime(bytes.NewReader(imageBuf.Bytes)); err == nil { log.Printf("filename %q exif = %v, %v", b.FileName(), ft, err) @@ -218,14 +233,14 @@ func (ix *Index) populateFile(b *schema.Blob, bm BatchMutation) error { } wholeRef := blob.RefFromHash(sha1) - bm.Set(keyWholeToFileRef.Key(wholeRef, blobRef), "1") - bm.Set(keyFileInfo.Key(blobRef), keyFileInfo.Val(size, b.FileName(), mime)) - bm.Set(keyFileTimes.Key(blobRef), keyFileTimes.Val(time3339s)) + mm.Set(keyWholeToFileRef.Key(wholeRef, blobRef), "1") + mm.Set(keyFileInfo.Key(blobRef), keyFileInfo.Val(size, b.FileName(), mime)) + mm.Set(keyFileTimes.Key(blobRef), keyFileTimes.Val(time3339s)) if strings.HasPrefix(mime, "audio/") { tag, err := taglib.Decode(fr, fr.Size()) if err == nil { - indexMusic(tag, wholeRef, bm) + indexMusic(tag, wholeRef, mm) } else { log.Print("index: error parsing tag: ", err) } @@ -236,7 +251,7 @@ func (ix *Index) populateFile(b *schema.Blob, bm BatchMutation) error { // indexMusic adds mutations to index the wholeRef by most of the // fields in gotaglib.GenericTag. -func indexMusic(tag taglib.GenericTag, wholeRef blob.Ref, bm BatchMutation) { +func indexMusic(tag taglib.GenericTag, wholeRef blob.Ref, mm mutationMap) { const justYearLayout = "2006" var yearStr, trackStr string @@ -258,15 +273,14 @@ func indexMusic(tag taglib.GenericTag, wholeRef blob.Ref, bm BatchMutation) { for tag, value := range tags { if value != "" { - bm.Set(keyAudioTag.Key(tag, strings.ToLower(value), wholeRef), "1") + mm.Set(keyAudioTag.Key(tag, strings.ToLower(value), wholeRef), "1") } } } -// blobref: of the file or schema blob -// ss: the parsed file schema blob -// bm: keys to populate -func (ix *Index) populateDir(b *schema.Blob, bm BatchMutation) error { +// b: the parsed file schema blob +// mm: keys to populate +func (ix *Index) populateDir(b *schema.Blob, mm mutationMap) error { blobRef := b.BlobRef() // TODO(bradfitz): move the NewDirReader and FileName method off *schema.Blob and onto @@ -285,14 +299,14 @@ func (ix *Index) populateDir(b *schema.Blob, bm BatchMutation) error { return nil } - bm.Set(keyFileInfo.Key(blobRef), keyFileInfo.Val(len(sts), b.FileName(), "")) + mm.Set(keyFileInfo.Key(blobRef), keyFileInfo.Val(len(sts), b.FileName(), "")) for _, br := range sts { - bm.Set(keyStaticDirChild.Key(blobRef, br.String()), "1") + mm.Set(keyStaticDirChild.Key(blobRef, br.String()), "1") } return nil } -func (ix *Index) populateClaim(b *schema.Blob, bm BatchMutation) error { +func (ix *Index) populateClaim(b *schema.Blob, mm mutationMap) error { br := b.BlobRef() claim, ok := b.AsClaim() @@ -319,13 +333,13 @@ func (ix *Index) populateClaim(b *schema.Blob, bm BatchMutation) error { } verifiedKeyId := vr.SignerKeyId - bm.Set("signerkeyid:"+vr.CamliSigner.String(), verifiedKeyId) + mm.Set("signerkeyid:"+vr.CamliSigner.String(), verifiedKeyId) recentKey := keyRecentPermanode.Key(verifiedKeyId, claim.ClaimDateString(), br) - bm.Set(recentKey, pnbr.String()) + mm.Set(recentKey, pnbr.String()) claimKey := keyPermanodeClaim.Key(pnbr, verifiedKeyId, claim.ClaimDateString(), br) - bm.Set(claimKey, keyPermanodeClaim.Val(claim.ClaimType(), attr, value)) + mm.Set(claimKey, keyPermanodeClaim.Val(claim.ClaimType(), attr, value)) if strings.HasPrefix(attr, "camliPath:") { targetRef, ok := blob.Parse(value) @@ -345,24 +359,24 @@ func (ix *Index) populateClaim(b *schema.Blob, bm BatchMutation) error { key := keyPathBackward.Key(verifiedKeyId, targetRef, claimRef) val := keyPathBackward.Val(claim.ClaimDateString(), baseRef, active, suffix) - bm.Set(key, val) + mm.Set(key, val) key = keyPathForward.Key(verifiedKeyId, baseRef, suffix, claim.ClaimDateString(), claimRef) val = keyPathForward.Val(active, targetRef) - bm.Set(key, val) + mm.Set(key, val) } } if search.IsIndexedAttribute(attr) { key := keySignerAttrValue.Key(verifiedKeyId, attr, value, claim.ClaimDateString(), br) - bm.Set(key, keySignerAttrValue.Val(pnbr)) + mm.Set(key, keySignerAttrValue.Val(pnbr)) } if search.IsBlobReferenceAttribute(attr) { targetRef, ok := blob.Parse(value) if ok { key := keyEdgeBackward.Key(targetRef, pnbr, br) - bm.Set(key, keyEdgeBackward.Val("permanode", "")) + mm.Set(key, keyEdgeBackward.Val("permanode", "")) } }