From bf2f09cab34b38970f6808cd535a769ed3d36777 Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Wed, 2 Apr 2014 13:39:36 -0700 Subject: [PATCH] index: reschedule indexing a claim blob if public key blob isn't yet available Change-Id: Ie0174bf830eb4790080b2b5e7cdc4ea0af25406f --- pkg/index/index_test.go | 65 ++++++++++++++++++++++++++++++++++++ pkg/index/indextest/tests.go | 4 +++ pkg/index/receive.go | 6 ++-- pkg/jsonsign/verify.go | 10 +++--- 4 files changed, 78 insertions(+), 7 deletions(-) diff --git a/pkg/index/index_test.go b/pkg/index/index_test.go index 287193ab0..9ad021593 100644 --- a/pkg/index/index_test.go +++ b/pkg/index/index_test.go @@ -28,6 +28,7 @@ import ( "testing" "camlistore.org/pkg/blob" + "camlistore.org/pkg/blobserver" "camlistore.org/pkg/index" "camlistore.org/pkg/index/indextest" "camlistore.org/pkg/sorted" @@ -358,3 +359,67 @@ func TestOutOfOrderIndexing(t *testing.T) { } }) } + +func TestIndexingClaimMissingPubkey(t *testing.T) { + s := sorted.NewMemoryKeyValue() + idx, err := index.New(s) + if err != nil { + t.Fatal(err) + } + + id := indextest.NewIndexDeps(idx) + id.Fataler = t + + goodKeyFetcher := id.Index.KeyFetcher + emptyFetcher := new(test.Fetcher) + + pn := id.NewPermanode() + + // Prevent the index from being able to find the public key: + idx.KeyFetcher = emptyFetcher + + // This previous failed to upload, since the signer's public key was + // unavailable. + claimRef := id.SetAttribute(pn, "tag", "foo") + + t.Logf(" Claim is %v", claimRef) + t.Logf("Signer is %v", id.SignerBlobRef) + + // Verify that populateClaim noted the missing public key blob: + { + key := fmt.Sprintf("missing|%s|%s", claimRef, id.SignerBlobRef) + if got, err := s.Get(key); got == "" || err != nil { + t.Errorf("key %q missing (err: %v); want 1", key, err) + } + } + + // Now make it available again: + idx.KeyFetcher = idx.BlobSource + + if err := copyBlob(id.SignerBlobRef, idx.BlobSource.(*test.Fetcher), goodKeyFetcher); err != nil { + t.Errorf("Error copying public key to BlobSource: %v", err) + } + if err := copyBlob(id.SignerBlobRef, idx, goodKeyFetcher); err != nil { + t.Errorf("Error uploading public key to indexer: %v", err) + } + + idx.Exp_AwaitReindexing(t) + + // Verify that populateClaim noted the missing public key blob: + { + key := fmt.Sprintf("missing|%s|%s", claimRef, id.SignerBlobRef) + if got, err := s.Get(key); got != "" || err == nil { + t.Errorf("row %q still exists", key) + } + } +} + +func copyBlob(br blob.Ref, dst blobserver.BlobReceiver, src blob.Fetcher) error { + rc, _, err := src.Fetch(br) + if err != nil { + return err + } + defer rc.Close() + _, err = dst.ReceiveBlob(br, rc) + return err +} diff --git a/pkg/index/indextest/tests.go b/pkg/index/indextest/tests.go index 46b1dc751..64ff74d8b 100644 --- a/pkg/index/indextest/tests.go +++ b/pkg/index/indextest/tests.go @@ -104,6 +104,10 @@ func (id *IndexDeps) uploadAndSign(m *schema.Builder) blob.Ref { id.Fatalf("problem signing: " + err.Error()) } tb := &test.Blob{Contents: signed} + _, err = id.BlobSource.ReceiveBlob(tb.BlobRef(), tb.Reader()) + if err != nil { + id.Fatalf("public uploading signed blob to blob source, pre-indexing: %v, %v", tb.BlobRef(), err) + } _, err = id.Index.ReceiveBlob(tb.BlobRef(), tb.Reader()) if err != nil { id.Fatalf("problem indexing blob: %v\nblob was:\n%s", err, signed) diff --git a/pkg/index/receive.go b/pkg/index/receive.go index 6d682ac27..9339f1a7c 100644 --- a/pkg/index/receive.go +++ b/pkg/index/receive.go @@ -284,7 +284,7 @@ func (ix *Index) populateMutationMap(fetcher *missTrackFetcher, br blob.Ref, sni if blob, ok := sniffer.SchemaBlob(); ok { switch blob.Type() { case "claim": - if err := ix.populateClaim(blob, mm); err != nil { + if err := ix.populateClaim(fetcher, blob, mm); err != nil { return nil, err } case "file": @@ -647,7 +647,7 @@ func (ix *Index) populateDeleteClaim(cl schema.Claim, vr *jsonsign.VerifyRequest mm.Set(claimKey, keyPermanodeClaim.Val(cl.ClaimType(), attr, value, vr.CamliSigner)) } -func (ix *Index) populateClaim(b *schema.Blob, mm *mutationMap) error { +func (ix *Index) populateClaim(fetcher *missTrackFetcher, b *schema.Blob, mm *mutationMap) error { br := b.BlobRef() claim, ok := b.AsClaim() @@ -656,7 +656,7 @@ func (ix *Index) populateClaim(b *schema.Blob, mm *mutationMap) error { return nil } - vr := jsonsign.NewVerificationRequest(b.JSON(), ix.KeyFetcher) + vr := jsonsign.NewVerificationRequest(b.JSON(), blob.NewSerialFetcher(ix.KeyFetcher, fetcher)) if !vr.Verify() { // TODO(bradfitz): ask if the vr.Err.(jsonsign.Error).IsPermanent() and retry // later if it's not permanent? or maybe do this up a level? diff --git a/pkg/jsonsign/verify.go b/pkg/jsonsign/verify.go index 62ad9d5f5..460f33bbe 100644 --- a/pkg/jsonsign/verify.go +++ b/pkg/jsonsign/verify.go @@ -23,6 +23,7 @@ import ( "errors" "fmt" "log" + "os" "strings" "camlistore.org/pkg/blob" @@ -137,12 +138,13 @@ func (vr *VerifyRequest) ParsePayloadMap() bool { func (vr *VerifyRequest) FindAndParsePublicKeyBlob() bool { reader, _, err := vr.fetcher.Fetch(vr.CamliSigner) + if err == os.ErrNotExist { + vr.Err = camerrors.ErrMissingKeyBlob + return false + } if err != nil { log.Printf("error fetching public key blob %v: %v", vr.CamliSigner, err) - // TODO(mpl): we're losing some info here, so maybe - // create an error type that contains the reason, - // instead of logging the reason. - vr.Err = camerrors.ErrMissingKeyBlob + vr.Err = err return false } defer reader.Close()