From db50bae0c442d316cb2411ad05f25f7830cf5d5d Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Mon, 7 Nov 2016 21:56:15 -0800 Subject: [PATCH] pkg/index: read blob before acquiring index mutex For #878 Change-Id: I8abaf5d923fc6dee7e8a9a3e84f82d4cf7484329 --- pkg/index/kvfile_test.go | 6 ++++-- pkg/index/receive.go | 29 ++++++++++++++++------------- 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/pkg/index/kvfile_test.go b/pkg/index/kvfile_test.go index 691c344d9..69dfbab21 100644 --- a/pkg/index/kvfile_test.go +++ b/pkg/index/kvfile_test.go @@ -57,8 +57,10 @@ func indexTest(t *testing.T, sortedGenfn func(t *testing.T) (sorted.KeyValue, func()), tfn func(*testing.T, func() *index.Index)) { defer test.TLog(t)() - var mu sync.Mutex // guards cleanups - var cleanups []func() + var ( + mu sync.Mutex // guards cleanups + cleanups []func() + ) defer func() { mu.Lock() // never unlocked for _, fn := range cleanups { diff --git a/pkg/index/receive.go b/pkg/index/receive.go index c87a740ad..765b8b441 100644 --- a/pkg/index/receive.go +++ b/pkg/index/receive.go @@ -189,8 +189,14 @@ func (ix *Index) removeAllMissingEdges(br blob.Ref) { } } -func (ix *Index) ReceiveBlob(blobRef blob.Ref, source io.Reader) (retsb blob.SizedRef, err error) { - ctx := context.TODO() +func (ix *Index) ReceiveBlob(blobRef blob.Ref, source io.Reader) (blob.SizedRef, error) { + // Read from source before acquiring ix.Lock (Issue 878): + sniffer := NewBlobSniffer(blobRef) + written, err := io.Copy(sniffer, source) + if err != nil { + return blob.SizedRef{}, err + } + sbr := blob.SizedRef{Ref: blobRef, Size: uint32(written)} ix.Lock() defer ix.Unlock() @@ -204,14 +210,10 @@ func (ix *Index) ReceiveBlob(blobRef blob.Ref, source io.Reader) (retsb blob.Siz } } }() - sniffer := NewBlobSniffer(blobRef) - written, err := io.Copy(sniffer, source) - if err != nil { - return - } + if haveVal, haveErr := ix.s.Get("have:" + blobRef.String()); haveErr == nil { if strings.HasSuffix(haveVal, "|indexed") { - return blob.SizedRef{Ref: blobRef, Size: uint32(written)}, nil + return sbr, nil } } @@ -221,10 +223,11 @@ func (ix *Index) ReceiveBlob(blobRef blob.Ref, source io.Reader) (retsb blob.Siz fetcher: ix.blobSource, } + ctx := context.TODO() mm, err := ix.populateMutationMap(ctx, fetcher, blobRef, sniffer) if err != nil { if err != errMissingDep { - return + return blob.SizedRef{}, err } fetcher.mu.Lock() defer fetcher.mu.Unlock() @@ -243,18 +246,18 @@ func (ix *Index) ReceiveBlob(blobRef blob.Ref, source io.Reader) (retsb blob.Siz // successfully recorded that the blob isn't // indexed, but we'll reindex it later once // the dependent blobs arrive. - return blob.SizedRef{Ref: blobRef, Size: uint32(written)}, nil + return sbr, nil } - return + return blob.SizedRef{}, err } if err := ix.commit(mm); err != nil { - return retsb, err + return blob.SizedRef{}, err } if c := ix.corpus; c != nil { if err = c.addBlob(ctx, blobRef, mm); err != nil { - return + return blob.SizedRef{}, err } }