mirror of https://github.com/perkeep/perkeep.git
pkg/index: read blob before acquiring index mutex
For #878 Change-Id: I8abaf5d923fc6dee7e8a9a3e84f82d4cf7484329
This commit is contained in:
parent
fe09b0a28b
commit
db50bae0c4
|
@ -57,8 +57,10 @@ func indexTest(t *testing.T,
|
|||
sortedGenfn func(t *testing.T) (sorted.KeyValue, func()),
|
||||
tfn func(*testing.T, func() *index.Index)) {
|
||||
defer test.TLog(t)()
|
||||
var mu sync.Mutex // guards cleanups
|
||||
var cleanups []func()
|
||||
var (
|
||||
mu sync.Mutex // guards cleanups
|
||||
cleanups []func()
|
||||
)
|
||||
defer func() {
|
||||
mu.Lock() // never unlocked
|
||||
for _, fn := range cleanups {
|
||||
|
|
|
@ -189,8 +189,14 @@ func (ix *Index) removeAllMissingEdges(br blob.Ref) {
|
|||
}
|
||||
}
|
||||
|
||||
func (ix *Index) ReceiveBlob(blobRef blob.Ref, source io.Reader) (retsb blob.SizedRef, err error) {
|
||||
ctx := context.TODO()
|
||||
func (ix *Index) ReceiveBlob(blobRef blob.Ref, source io.Reader) (blob.SizedRef, error) {
|
||||
// Read from source before acquiring ix.Lock (Issue 878):
|
||||
sniffer := NewBlobSniffer(blobRef)
|
||||
written, err := io.Copy(sniffer, source)
|
||||
if err != nil {
|
||||
return blob.SizedRef{}, err
|
||||
}
|
||||
sbr := blob.SizedRef{Ref: blobRef, Size: uint32(written)}
|
||||
|
||||
ix.Lock()
|
||||
defer ix.Unlock()
|
||||
|
@ -204,14 +210,10 @@ func (ix *Index) ReceiveBlob(blobRef blob.Ref, source io.Reader) (retsb blob.Siz
|
|||
}
|
||||
}
|
||||
}()
|
||||
sniffer := NewBlobSniffer(blobRef)
|
||||
written, err := io.Copy(sniffer, source)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if haveVal, haveErr := ix.s.Get("have:" + blobRef.String()); haveErr == nil {
|
||||
if strings.HasSuffix(haveVal, "|indexed") {
|
||||
return blob.SizedRef{Ref: blobRef, Size: uint32(written)}, nil
|
||||
return sbr, nil
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -221,10 +223,11 @@ func (ix *Index) ReceiveBlob(blobRef blob.Ref, source io.Reader) (retsb blob.Siz
|
|||
fetcher: ix.blobSource,
|
||||
}
|
||||
|
||||
ctx := context.TODO()
|
||||
mm, err := ix.populateMutationMap(ctx, fetcher, blobRef, sniffer)
|
||||
if err != nil {
|
||||
if err != errMissingDep {
|
||||
return
|
||||
return blob.SizedRef{}, err
|
||||
}
|
||||
fetcher.mu.Lock()
|
||||
defer fetcher.mu.Unlock()
|
||||
|
@ -243,18 +246,18 @@ func (ix *Index) ReceiveBlob(blobRef blob.Ref, source io.Reader) (retsb blob.Siz
|
|||
// successfully recorded that the blob isn't
|
||||
// indexed, but we'll reindex it later once
|
||||
// the dependent blobs arrive.
|
||||
return blob.SizedRef{Ref: blobRef, Size: uint32(written)}, nil
|
||||
return sbr, nil
|
||||
}
|
||||
return
|
||||
return blob.SizedRef{}, err
|
||||
}
|
||||
|
||||
if err := ix.commit(mm); err != nil {
|
||||
return retsb, err
|
||||
return blob.SizedRef{}, err
|
||||
}
|
||||
|
||||
if c := ix.corpus; c != nil {
|
||||
if err = c.addBlob(ctx, blobRef, mm); err != nil {
|
||||
return
|
||||
return blob.SizedRef{}, err
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue