index: reschedule indexing a claim blob if public key blob isn't yet available

Change-Id: Ie0174bf830eb4790080b2b5e7cdc4ea0af25406f
This commit is contained in:
Brad Fitzpatrick 2014-04-02 13:39:36 -07:00
parent bfb13b62b9
commit bf2f09cab3
4 changed files with 78 additions and 7 deletions

View File

@ -28,6 +28,7 @@ import (
"testing"
"camlistore.org/pkg/blob"
"camlistore.org/pkg/blobserver"
"camlistore.org/pkg/index"
"camlistore.org/pkg/index/indextest"
"camlistore.org/pkg/sorted"
@ -358,3 +359,67 @@ func TestOutOfOrderIndexing(t *testing.T) {
}
})
}
func TestIndexingClaimMissingPubkey(t *testing.T) {
s := sorted.NewMemoryKeyValue()
idx, err := index.New(s)
if err != nil {
t.Fatal(err)
}
id := indextest.NewIndexDeps(idx)
id.Fataler = t
goodKeyFetcher := id.Index.KeyFetcher
emptyFetcher := new(test.Fetcher)
pn := id.NewPermanode()
// Prevent the index from being able to find the public key:
idx.KeyFetcher = emptyFetcher
// This previous failed to upload, since the signer's public key was
// unavailable.
claimRef := id.SetAttribute(pn, "tag", "foo")
t.Logf(" Claim is %v", claimRef)
t.Logf("Signer is %v", id.SignerBlobRef)
// Verify that populateClaim noted the missing public key blob:
{
key := fmt.Sprintf("missing|%s|%s", claimRef, id.SignerBlobRef)
if got, err := s.Get(key); got == "" || err != nil {
t.Errorf("key %q missing (err: %v); want 1", key, err)
}
}
// Now make it available again:
idx.KeyFetcher = idx.BlobSource
if err := copyBlob(id.SignerBlobRef, idx.BlobSource.(*test.Fetcher), goodKeyFetcher); err != nil {
t.Errorf("Error copying public key to BlobSource: %v", err)
}
if err := copyBlob(id.SignerBlobRef, idx, goodKeyFetcher); err != nil {
t.Errorf("Error uploading public key to indexer: %v", err)
}
idx.Exp_AwaitReindexing(t)
// Verify that populateClaim noted the missing public key blob:
{
key := fmt.Sprintf("missing|%s|%s", claimRef, id.SignerBlobRef)
if got, err := s.Get(key); got != "" || err == nil {
t.Errorf("row %q still exists", key)
}
}
}
func copyBlob(br blob.Ref, dst blobserver.BlobReceiver, src blob.Fetcher) error {
rc, _, err := src.Fetch(br)
if err != nil {
return err
}
defer rc.Close()
_, err = dst.ReceiveBlob(br, rc)
return err
}

View File

@ -104,6 +104,10 @@ func (id *IndexDeps) uploadAndSign(m *schema.Builder) blob.Ref {
id.Fatalf("problem signing: " + err.Error())
}
tb := &test.Blob{Contents: signed}
_, err = id.BlobSource.ReceiveBlob(tb.BlobRef(), tb.Reader())
if err != nil {
id.Fatalf("public uploading signed blob to blob source, pre-indexing: %v, %v", tb.BlobRef(), err)
}
_, err = id.Index.ReceiveBlob(tb.BlobRef(), tb.Reader())
if err != nil {
id.Fatalf("problem indexing blob: %v\nblob was:\n%s", err, signed)

View File

@ -284,7 +284,7 @@ func (ix *Index) populateMutationMap(fetcher *missTrackFetcher, br blob.Ref, sni
if blob, ok := sniffer.SchemaBlob(); ok {
switch blob.Type() {
case "claim":
if err := ix.populateClaim(blob, mm); err != nil {
if err := ix.populateClaim(fetcher, blob, mm); err != nil {
return nil, err
}
case "file":
@ -647,7 +647,7 @@ func (ix *Index) populateDeleteClaim(cl schema.Claim, vr *jsonsign.VerifyRequest
mm.Set(claimKey, keyPermanodeClaim.Val(cl.ClaimType(), attr, value, vr.CamliSigner))
}
func (ix *Index) populateClaim(b *schema.Blob, mm *mutationMap) error {
func (ix *Index) populateClaim(fetcher *missTrackFetcher, b *schema.Blob, mm *mutationMap) error {
br := b.BlobRef()
claim, ok := b.AsClaim()
@ -656,7 +656,7 @@ func (ix *Index) populateClaim(b *schema.Blob, mm *mutationMap) error {
return nil
}
vr := jsonsign.NewVerificationRequest(b.JSON(), ix.KeyFetcher)
vr := jsonsign.NewVerificationRequest(b.JSON(), blob.NewSerialFetcher(ix.KeyFetcher, fetcher))
if !vr.Verify() {
// TODO(bradfitz): ask if the vr.Err.(jsonsign.Error).IsPermanent() and retry
// later if it's not permanent? or maybe do this up a level?

View File

@ -23,6 +23,7 @@ import (
"errors"
"fmt"
"log"
"os"
"strings"
"camlistore.org/pkg/blob"
@ -137,12 +138,13 @@ func (vr *VerifyRequest) ParsePayloadMap() bool {
func (vr *VerifyRequest) FindAndParsePublicKeyBlob() bool {
reader, _, err := vr.fetcher.Fetch(vr.CamliSigner)
if err == os.ErrNotExist {
vr.Err = camerrors.ErrMissingKeyBlob
return false
}
if err != nil {
log.Printf("error fetching public key blob %v: %v", vr.CamliSigner, err)
// TODO(mpl): we're losing some info here, so maybe
// create an error type that contains the reason,
// instead of logging the reason.
vr.Err = camerrors.ErrMissingKeyBlob
vr.Err = err
return false
}
defer reader.Close()