diff --git a/pkg/index/receive.go b/pkg/index/receive.go
index 19291f53a..10b94b8f8 100644
--- a/pkg/index/receive.go
+++ b/pkg/index/receive.go
@@ -71,6 +71,12 @@ func (ix *Index) reindex(br blob.Ref) {
 	log.Printf("index: successfully reindexed %v", sb)
 }
 
+type mutationMap map[string]string
+
+func (mm mutationMap) Set(k, v string) {
+	mm[k] = v
+}
+
 func (ix *Index) ReceiveBlob(blobRef blob.Ref, source io.Reader) (retsb blob.SizedRef, err error) {
 	sniffer := NewBlobSniffer(blobRef)
 	written, err := io.Copy(sniffer, source)
@@ -80,16 +86,13 @@ func (ix *Index) ReceiveBlob(blobRef blob.Ref, source io.Reader) (retsb blob.Siz
 
 	sniffer.Parse()
 
-	bm := ix.s.BeginBatch()
-
-	err = ix.populateMutation(blobRef, sniffer, bm)
+	mm, err := ix.populateMutationMap(blobRef, sniffer)
 	if err != nil {
 		return
 	}
 
-	err = ix.s.CommitBatch(bm)
-	if err != nil {
-		return
+	if err := ix.commit(mm); err != nil {
+		return retsb, err
 	}
 
 	// TODO(bradfitz): log levels? These are generally noisy
@@ -102,35 +105,48 @@ func (ix *Index) ReceiveBlob(blobRef blob.Ref, source io.Reader) (retsb blob.Siz
 	return blob.SizedRef{blobRef, written}, nil
 }
 
-// populateMutation populates keys & values into the provided BatchMutation.
+// commit writes the contents of the mutationMap on a batch
+// mutation and commits that batch.
+func (ix *Index) commit(mm mutationMap) error {
+	bm := ix.s.BeginBatch()
+	for k, v := range mm {
+		bm.Set(k, v)
+	}
+	return ix.s.CommitBatch(bm)
+}
+
+// populateMutationMap populates keys & values that will be committed
+// into the returned map.
 //
 // the blobref can be trusted at this point (it's been fully consumed
 // and verified to match), and the sniffer has been populated.
-func (ix *Index) populateMutation(br blob.Ref, sniffer *BlobSniffer, bm BatchMutation) error {
-	bm.Set("have:"+br.String(), fmt.Sprintf("%d", sniffer.Size()))
-	bm.Set("meta:"+br.String(), fmt.Sprintf("%d|%s", sniffer.Size(), sniffer.MIMEType()))
+func (ix *Index) populateMutationMap(br blob.Ref, sniffer *BlobSniffer) (mutationMap, error) {
+	mm := mutationMap{
+		"have:" + br.String(): fmt.Sprintf("%d", sniffer.Size()),
+		"meta:" + br.String(): fmt.Sprintf("%d|%s", sniffer.Size(), sniffer.MIMEType()),
+	}
 
 	if blob, ok := sniffer.SchemaBlob(); ok {
 		switch blob.Type() {
 		case "claim":
-			if err := ix.populateClaim(blob, bm); err != nil {
-				return err
+			if err := ix.populateClaim(blob, mm); err != nil {
+				return nil, err
 			}
 		case "permanode":
-			//if err := mi.populatePermanode(blobRef, camli, bm); err != nil {
-			//return err
+			//if err := mi.populatePermanode(blobRef, camli, mm); err != nil {
+			//return nil, err
 			//}
 		case "file":
-			if err := ix.populateFile(blob, bm); err != nil {
-				return err
+			if err := ix.populateFile(blob, mm); err != nil {
+				return nil, err
 			}
 		case "directory":
-			if err := ix.populateDir(blob, bm); err != nil {
-				return err
+			if err := ix.populateDir(blob, mm); err != nil {
+				return nil, err
 			}
 		}
 	}
-	return nil
+	return mm, nil
 }
 
 // keepFirstN keeps the first N bytes written to it in Bytes.
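The hunks above replace the BatchMutation parameter threaded through the populate helpers with a plain map type, so key/value population becomes a pure data-building step and the single batch write happens in Index.commit. One practical effect, sketched below as a hypothetical test (not part of this patch; it assumes a _test.go file in package index with the standard testing package imported, and the blob ref and values are invented), is that populate output can now be inspected directly, without a storage backend:

func TestMutationMapSet(t *testing.T) {
	mm := mutationMap{}
	// Mirror the fixed "have:"/"meta:" entries built in populateMutationMap,
	// using made-up values.
	mm.Set("have:sha1-0000000000000000000000000000000000000000", "123")
	mm.Set("meta:sha1-0000000000000000000000000000000000000000", "123|image/png")

	if len(mm) != 2 {
		t.Fatalf("got %d mutations, want 2", len(mm))
	}
	if got, want := mm["have:sha1-0000000000000000000000000000000000000000"], "123"; got != want {
		t.Errorf("have key = %q, want %q", got, want)
	}
}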
@@ -149,10 +165,9 @@ func (w *keepFirstN) Write(p []byte) (n int, err error) {
 	return len(p), nil
 }
 
-// blobref: of the file or schema blob
-// blob: the parsed file schema blob
-// bm: keys to populate
-func (ix *Index) populateFile(b *schema.Blob, bm BatchMutation) error {
+// b: the parsed file schema blob
+// mm: keys to populate
+func (ix *Index) populateFile(b *schema.Blob, mm mutationMap) error {
 	var times []time.Time // all creation or mod times seen; may be zero
 	times = append(times, b.ModTime())
 
@@ -191,7 +206,7 @@ func (ix *Index) populateFile(b *schema.Blob, bm BatchMutation) error {
 
 	if imageBuf != nil {
 		if conf, err := images.DecodeConfig(bytes.NewReader(imageBuf.Bytes)); err == nil {
-			bm.Set(keyImageSize.Key(blobRef), keyImageSize.Val(fmt.Sprint(conf.Width), fmt.Sprint(conf.Height)))
+			mm.Set(keyImageSize.Key(blobRef), keyImageSize.Val(fmt.Sprint(conf.Width), fmt.Sprint(conf.Height)))
 		}
 		if ft, err := schema.FileTime(bytes.NewReader(imageBuf.Bytes)); err == nil {
 			log.Printf("filename %q exif = %v, %v", b.FileName(), ft, err)
@@ -218,14 +233,14 @@ func (ix *Index) populateFile(b *schema.Blob, bm BatchMutation) error {
 	}
 
 	wholeRef := blob.RefFromHash(sha1)
-	bm.Set(keyWholeToFileRef.Key(wholeRef, blobRef), "1")
-	bm.Set(keyFileInfo.Key(blobRef), keyFileInfo.Val(size, b.FileName(), mime))
-	bm.Set(keyFileTimes.Key(blobRef), keyFileTimes.Val(time3339s))
+	mm.Set(keyWholeToFileRef.Key(wholeRef, blobRef), "1")
+	mm.Set(keyFileInfo.Key(blobRef), keyFileInfo.Val(size, b.FileName(), mime))
+	mm.Set(keyFileTimes.Key(blobRef), keyFileTimes.Val(time3339s))
 
 	if strings.HasPrefix(mime, "audio/") {
 		tag, err := taglib.Decode(fr, fr.Size())
 		if err == nil {
-			indexMusic(tag, wholeRef, bm)
+			indexMusic(tag, wholeRef, mm)
 		} else {
 			log.Print("index: error parsing tag: ", err)
 		}
@@ -236,7 +251,7 @@ func (ix *Index) populateFile(b *schema.Blob, bm BatchMutation) error {
 
 // indexMusic adds mutations to index the wholeRef by most of the
 // fields in gotaglib.GenericTag.
-func indexMusic(tag taglib.GenericTag, wholeRef blob.Ref, bm BatchMutation) {
+func indexMusic(tag taglib.GenericTag, wholeRef blob.Ref, mm mutationMap) {
 	const justYearLayout = "2006"
 	var yearStr, trackStr string
 
@@ -258,15 +273,14 @@ func indexMusic(tag taglib.GenericTag, wholeRef blob.Ref, bm BatchMutation) {
 
 	for tag, value := range tags {
 		if value != "" {
-			bm.Set(keyAudioTag.Key(tag, strings.ToLower(value), wholeRef), "1")
+			mm.Set(keyAudioTag.Key(tag, strings.ToLower(value), wholeRef), "1")
 		}
 	}
 }
 
-// blobref: of the file or schema blob
-// ss: the parsed file schema blob
-// bm: keys to populate
-func (ix *Index) populateDir(b *schema.Blob, bm BatchMutation) error {
+// b: the parsed file schema blob
+// mm: keys to populate
+func (ix *Index) populateDir(b *schema.Blob, mm mutationMap) error {
 	blobRef := b.BlobRef()
 
 	// TODO(bradfitz): move the NewDirReader and FileName method off *schema.Blob and onto
@@ -285,14 +299,14 @@ func (ix *Index) populateDir(b *schema.Blob, bm BatchMutation) error {
 		return nil
 	}
 
-	bm.Set(keyFileInfo.Key(blobRef), keyFileInfo.Val(len(sts), b.FileName(), ""))
+	mm.Set(keyFileInfo.Key(blobRef), keyFileInfo.Val(len(sts), b.FileName(), ""))
 	for _, br := range sts {
-		bm.Set(keyStaticDirChild.Key(blobRef, br.String()), "1")
+		mm.Set(keyStaticDirChild.Key(blobRef, br.String()), "1")
 	}
 	return nil
 }
 
-func (ix *Index) populateClaim(b *schema.Blob, bm BatchMutation) error {
+func (ix *Index) populateClaim(b *schema.Blob, mm mutationMap) error {
 	br := b.BlobRef()
 
 	claim, ok := b.AsClaim()
@@ -319,13 +333,13 @@ func (ix *Index) populateClaim(b *schema.Blob, bm BatchMutation) error {
 	}
 	verifiedKeyId := vr.SignerKeyId
 
-	bm.Set("signerkeyid:"+vr.CamliSigner.String(), verifiedKeyId)
+	mm.Set("signerkeyid:"+vr.CamliSigner.String(), verifiedKeyId)
 
 	recentKey := keyRecentPermanode.Key(verifiedKeyId, claim.ClaimDateString(), br)
-	bm.Set(recentKey, pnbr.String())
+	mm.Set(recentKey, pnbr.String())
 
 	claimKey := keyPermanodeClaim.Key(pnbr, verifiedKeyId, claim.ClaimDateString(), br)
-	bm.Set(claimKey, keyPermanodeClaim.Val(claim.ClaimType(), attr, value))
+	mm.Set(claimKey, keyPermanodeClaim.Val(claim.ClaimType(), attr, value))
 
 	if strings.HasPrefix(attr, "camliPath:") {
 		targetRef, ok := blob.Parse(value)
@@ -345,24 +359,24 @@ func (ix *Index) populateClaim(b *schema.Blob, bm BatchMutation) error {
 
 			key := keyPathBackward.Key(verifiedKeyId, targetRef, claimRef)
 			val := keyPathBackward.Val(claim.ClaimDateString(), baseRef, active, suffix)
-			bm.Set(key, val)
+			mm.Set(key, val)
 
 			key = keyPathForward.Key(verifiedKeyId, baseRef, suffix, claim.ClaimDateString(), claimRef)
 			val = keyPathForward.Val(active, targetRef)
-			bm.Set(key, val)
+			mm.Set(key, val)
 		}
 	}
 
 	if search.IsIndexedAttribute(attr) {
 		key := keySignerAttrValue.Key(verifiedKeyId, attr, value, claim.ClaimDateString(), br)
-		bm.Set(key, keySignerAttrValue.Val(pnbr))
+		mm.Set(key, keySignerAttrValue.Val(pnbr))
 	}
 
 	if search.IsBlobReferenceAttribute(attr) {
 		targetRef, ok := blob.Parse(value)
 		if ok {
 			key := keyEdgeBackward.Key(targetRef, pnbr, br)
-			bm.Set(key, keyEdgeBackward.Val("permanode", ""))
+			mm.Set(key, keyEdgeBackward.Val("permanode", ""))
 		}
 	}
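Taken together, ReceiveBlob now builds a mutationMap first and only then commits it, so every indexed blob is written through exactly one batch. Below is a minimal, self-contained sketch of that pattern; the fakeBatch type, its Set method, and the sample keys are hypothetical stand-ins for the index's real BatchMutation and sorted key/value store, not code from this repository:

package main

import "fmt"

// mutationMap mirrors the type added in the patch: a plain map that
// buffers index keys and values until they are committed in one batch.
type mutationMap map[string]string

func (mm mutationMap) Set(k, v string) { mm[k] = v }

// fakeBatch is a made-up stand-in for the store's batch mutation.
type fakeBatch struct{ ops []string }

func (b *fakeBatch) Set(k, v string) { b.ops = append(b.ops, k+"="+v) }

// commit copies the buffered mutations onto a batch and "commits" it,
// the same shape as the new Index.commit above.
func commit(mm mutationMap) {
	b := &fakeBatch{}
	for k, v := range mm {
		b.Set(k, v)
	}
	fmt.Printf("committing %d mutations: %v\n", len(b.ops), b.ops)
}

func main() {
	mm := mutationMap{}
	mm.Set("have:sha1-0000000000000000000000000000000000000000", "123")           // invented size entry
	mm.Set("meta:sha1-0000000000000000000000000000000000000000", "123|image/png") // invented size|MIME entry
	commit(mm)
}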