mirror of https://github.com/perkeep/perkeep.git
pkg/index: use a map to populate the mutations
When indexing upon a blob reception, we first populate all the mutations in a map instead of in a batch mutation. Then we transfer all the mutations in a batch and commit it immediately. This makes the window when the batch mutation is open much shorter, and will ease future indexing because it allows reading from the index while writing the mutations to the map. Change-Id: I276282388f59ca543835bfa5ec64986453b23fe1
This commit is contained in:
parent
e5eade1bd6
commit
e03d923fe1
|
@ -71,6 +71,12 @@ func (ix *Index) reindex(br blob.Ref) {
|
|||
log.Printf("index: successfully reindexed %v", sb)
|
||||
}
|
||||
|
||||
// mutationMap accumulates index mutations (key/value rows) in memory
// before they are written out to storage in a single batch commit.
type mutationMap map[string]string

// Set records value under key, overwriting any previous entry for key.
func (mm mutationMap) Set(key, value string) {
	mm[key] = value
}
|
||||
|
||||
func (ix *Index) ReceiveBlob(blobRef blob.Ref, source io.Reader) (retsb blob.SizedRef, err error) {
|
||||
sniffer := NewBlobSniffer(blobRef)
|
||||
written, err := io.Copy(sniffer, source)
|
||||
|
@ -80,16 +86,13 @@ func (ix *Index) ReceiveBlob(blobRef blob.Ref, source io.Reader) (retsb blob.Siz
|
|||
|
||||
sniffer.Parse()
|
||||
|
||||
bm := ix.s.BeginBatch()
|
||||
|
||||
err = ix.populateMutation(blobRef, sniffer, bm)
|
||||
mm, err := ix.populateMutationMap(blobRef, sniffer)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
err = ix.s.CommitBatch(bm)
|
||||
if err != nil {
|
||||
return
|
||||
if err := ix.commit(mm); err != nil {
|
||||
return retsb, err
|
||||
}
|
||||
|
||||
// TODO(bradfitz): log levels? These are generally noisy
|
||||
|
@ -102,35 +105,48 @@ func (ix *Index) ReceiveBlob(blobRef blob.Ref, source io.Reader) (retsb blob.Siz
|
|||
return blob.SizedRef{blobRef, written}, nil
|
||||
}
|
||||
|
||||
// populateMutation populates keys & values into the provided BatchMutation.
|
||||
// commit writes the contents of the mutationMap on a batch
|
||||
// mutation and commits that batch.
|
||||
func (ix *Index) commit(mm mutationMap) error {
|
||||
bm := ix.s.BeginBatch()
|
||||
for k, v := range mm {
|
||||
bm.Set(k, v)
|
||||
}
|
||||
return ix.s.CommitBatch(bm)
|
||||
}
|
||||
|
||||
// populateMutationMap populates keys & values that will be committed
|
||||
// into the returned map.
|
||||
//
|
||||
// the blobref can be trusted at this point (it's been fully consumed
|
||||
// and verified to match), and the sniffer has been populated.
|
||||
func (ix *Index) populateMutation(br blob.Ref, sniffer *BlobSniffer, bm BatchMutation) error {
|
||||
bm.Set("have:"+br.String(), fmt.Sprintf("%d", sniffer.Size()))
|
||||
bm.Set("meta:"+br.String(), fmt.Sprintf("%d|%s", sniffer.Size(), sniffer.MIMEType()))
|
||||
func (ix *Index) populateMutationMap(br blob.Ref, sniffer *BlobSniffer) (mutationMap, error) {
|
||||
mm := mutationMap{
|
||||
"have:" + br.String(): fmt.Sprintf("%d", sniffer.Size()),
|
||||
"meta:" + br.String(): fmt.Sprintf("%d|%s", sniffer.Size(), sniffer.MIMEType()),
|
||||
}
|
||||
|
||||
if blob, ok := sniffer.SchemaBlob(); ok {
|
||||
switch blob.Type() {
|
||||
case "claim":
|
||||
if err := ix.populateClaim(blob, bm); err != nil {
|
||||
return err
|
||||
if err := ix.populateClaim(blob, mm); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
case "permanode":
|
||||
//if err := mi.populatePermanode(blobRef, camli, bm); err != nil {
|
||||
//return err
|
||||
//if err := mi.populatePermanode(blobRef, camli, mm); err != nil {
|
||||
//return nil, err
|
||||
//}
|
||||
case "file":
|
||||
if err := ix.populateFile(blob, bm); err != nil {
|
||||
return err
|
||||
if err := ix.populateFile(blob, mm); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
case "directory":
|
||||
if err := ix.populateDir(blob, bm); err != nil {
|
||||
return err
|
||||
if err := ix.populateDir(blob, mm); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
return mm, nil
|
||||
}
|
||||
|
||||
// keepFirstN keeps the first N bytes written to it in Bytes.
|
||||
|
@ -149,10 +165,9 @@ func (w *keepFirstN) Write(p []byte) (n int, err error) {
|
|||
return len(p), nil
|
||||
}
|
||||
|
||||
// blobref: of the file or schema blob
|
||||
// blob: the parsed file schema blob
|
||||
// bm: keys to populate
|
||||
func (ix *Index) populateFile(b *schema.Blob, bm BatchMutation) error {
|
||||
// b: the parsed file schema blob
|
||||
// mm: keys to populate
|
||||
func (ix *Index) populateFile(b *schema.Blob, mm mutationMap) error {
|
||||
var times []time.Time // all creation or mod times seen; may be zero
|
||||
times = append(times, b.ModTime())
|
||||
|
||||
|
@ -191,7 +206,7 @@ func (ix *Index) populateFile(b *schema.Blob, bm BatchMutation) error {
|
|||
|
||||
if imageBuf != nil {
|
||||
if conf, err := images.DecodeConfig(bytes.NewReader(imageBuf.Bytes)); err == nil {
|
||||
bm.Set(keyImageSize.Key(blobRef), keyImageSize.Val(fmt.Sprint(conf.Width), fmt.Sprint(conf.Height)))
|
||||
mm.Set(keyImageSize.Key(blobRef), keyImageSize.Val(fmt.Sprint(conf.Width), fmt.Sprint(conf.Height)))
|
||||
}
|
||||
if ft, err := schema.FileTime(bytes.NewReader(imageBuf.Bytes)); err == nil {
|
||||
log.Printf("filename %q exif = %v, %v", b.FileName(), ft, err)
|
||||
|
@ -218,14 +233,14 @@ func (ix *Index) populateFile(b *schema.Blob, bm BatchMutation) error {
|
|||
}
|
||||
|
||||
wholeRef := blob.RefFromHash(sha1)
|
||||
bm.Set(keyWholeToFileRef.Key(wholeRef, blobRef), "1")
|
||||
bm.Set(keyFileInfo.Key(blobRef), keyFileInfo.Val(size, b.FileName(), mime))
|
||||
bm.Set(keyFileTimes.Key(blobRef), keyFileTimes.Val(time3339s))
|
||||
mm.Set(keyWholeToFileRef.Key(wholeRef, blobRef), "1")
|
||||
mm.Set(keyFileInfo.Key(blobRef), keyFileInfo.Val(size, b.FileName(), mime))
|
||||
mm.Set(keyFileTimes.Key(blobRef), keyFileTimes.Val(time3339s))
|
||||
|
||||
if strings.HasPrefix(mime, "audio/") {
|
||||
tag, err := taglib.Decode(fr, fr.Size())
|
||||
if err == nil {
|
||||
indexMusic(tag, wholeRef, bm)
|
||||
indexMusic(tag, wholeRef, mm)
|
||||
} else {
|
||||
log.Print("index: error parsing tag: ", err)
|
||||
}
|
||||
|
@ -236,7 +251,7 @@ func (ix *Index) populateFile(b *schema.Blob, bm BatchMutation) error {
|
|||
|
||||
// indexMusic adds mutations to index the wholeRef by most of the
|
||||
// fields in gotaglib.GenericTag.
|
||||
func indexMusic(tag taglib.GenericTag, wholeRef blob.Ref, bm BatchMutation) {
|
||||
func indexMusic(tag taglib.GenericTag, wholeRef blob.Ref, mm mutationMap) {
|
||||
const justYearLayout = "2006"
|
||||
|
||||
var yearStr, trackStr string
|
||||
|
@ -258,15 +273,14 @@ func indexMusic(tag taglib.GenericTag, wholeRef blob.Ref, bm BatchMutation) {
|
|||
|
||||
for tag, value := range tags {
|
||||
if value != "" {
|
||||
bm.Set(keyAudioTag.Key(tag, strings.ToLower(value), wholeRef), "1")
|
||||
mm.Set(keyAudioTag.Key(tag, strings.ToLower(value), wholeRef), "1")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// blobref: of the file or schema blob
|
||||
// ss: the parsed file schema blob
|
||||
// bm: keys to populate
|
||||
func (ix *Index) populateDir(b *schema.Blob, bm BatchMutation) error {
|
||||
// b: the parsed file schema blob
|
||||
// mm: keys to populate
|
||||
func (ix *Index) populateDir(b *schema.Blob, mm mutationMap) error {
|
||||
blobRef := b.BlobRef()
|
||||
// TODO(bradfitz): move the NewDirReader and FileName method off *schema.Blob and onto
|
||||
|
||||
|
@ -285,14 +299,14 @@ func (ix *Index) populateDir(b *schema.Blob, bm BatchMutation) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
bm.Set(keyFileInfo.Key(blobRef), keyFileInfo.Val(len(sts), b.FileName(), ""))
|
||||
mm.Set(keyFileInfo.Key(blobRef), keyFileInfo.Val(len(sts), b.FileName(), ""))
|
||||
for _, br := range sts {
|
||||
bm.Set(keyStaticDirChild.Key(blobRef, br.String()), "1")
|
||||
mm.Set(keyStaticDirChild.Key(blobRef, br.String()), "1")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ix *Index) populateClaim(b *schema.Blob, bm BatchMutation) error {
|
||||
func (ix *Index) populateClaim(b *schema.Blob, mm mutationMap) error {
|
||||
br := b.BlobRef()
|
||||
|
||||
claim, ok := b.AsClaim()
|
||||
|
@ -319,13 +333,13 @@ func (ix *Index) populateClaim(b *schema.Blob, bm BatchMutation) error {
|
|||
}
|
||||
verifiedKeyId := vr.SignerKeyId
|
||||
|
||||
bm.Set("signerkeyid:"+vr.CamliSigner.String(), verifiedKeyId)
|
||||
mm.Set("signerkeyid:"+vr.CamliSigner.String(), verifiedKeyId)
|
||||
|
||||
recentKey := keyRecentPermanode.Key(verifiedKeyId, claim.ClaimDateString(), br)
|
||||
bm.Set(recentKey, pnbr.String())
|
||||
mm.Set(recentKey, pnbr.String())
|
||||
|
||||
claimKey := keyPermanodeClaim.Key(pnbr, verifiedKeyId, claim.ClaimDateString(), br)
|
||||
bm.Set(claimKey, keyPermanodeClaim.Val(claim.ClaimType(), attr, value))
|
||||
mm.Set(claimKey, keyPermanodeClaim.Val(claim.ClaimType(), attr, value))
|
||||
|
||||
if strings.HasPrefix(attr, "camliPath:") {
|
||||
targetRef, ok := blob.Parse(value)
|
||||
|
@ -345,24 +359,24 @@ func (ix *Index) populateClaim(b *schema.Blob, bm BatchMutation) error {
|
|||
|
||||
key := keyPathBackward.Key(verifiedKeyId, targetRef, claimRef)
|
||||
val := keyPathBackward.Val(claim.ClaimDateString(), baseRef, active, suffix)
|
||||
bm.Set(key, val)
|
||||
mm.Set(key, val)
|
||||
|
||||
key = keyPathForward.Key(verifiedKeyId, baseRef, suffix, claim.ClaimDateString(), claimRef)
|
||||
val = keyPathForward.Val(active, targetRef)
|
||||
bm.Set(key, val)
|
||||
mm.Set(key, val)
|
||||
}
|
||||
}
|
||||
|
||||
if search.IsIndexedAttribute(attr) {
|
||||
key := keySignerAttrValue.Key(verifiedKeyId, attr, value, claim.ClaimDateString(), br)
|
||||
bm.Set(key, keySignerAttrValue.Val(pnbr))
|
||||
mm.Set(key, keySignerAttrValue.Val(pnbr))
|
||||
}
|
||||
|
||||
if search.IsBlobReferenceAttribute(attr) {
|
||||
targetRef, ok := blob.Parse(value)
|
||||
if ok {
|
||||
key := keyEdgeBackward.Key(targetRef, pnbr, br)
|
||||
bm.Set(key, keyEdgeBackward.Val("permanode", ""))
|
||||
mm.Set(key, keyEdgeBackward.Val("permanode", ""))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue