mirror of https://github.com/perkeep/perkeep.git
Merge "pkg/blobserver: fix deadlock in MergedEnumerate"
This commit is contained in:
commit
1383f340db
|
@ -614,7 +614,11 @@ func (s enumerator) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRe
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
dest <- blob.SizedRef{Ref: br, Size: size}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case dest <- blob.SizedRef{Ref: br, Size: size}:
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -44,21 +44,21 @@ func mergedEnumerate(ctx context.Context, dest chan<- blob.SizedRef, nsrc int, g
|
|||
subctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
startEnum := func(source BlobEnumerator) (*blob.ChanPeeker, <-chan error) {
|
||||
errch := make(chan error, nsrc+1) // +1 for nil
|
||||
startEnum := func(source BlobEnumerator) *blob.ChanPeeker {
|
||||
ch := make(chan blob.SizedRef, buffered)
|
||||
errch := make(chan error, 1)
|
||||
go func() {
|
||||
errch <- source.EnumerateBlobs(subctx, ch, after, limit)
|
||||
err := source.EnumerateBlobs(subctx, ch, after, limit)
|
||||
if err != nil {
|
||||
errch <- err
|
||||
}
|
||||
}()
|
||||
return &blob.ChanPeeker{Ch: ch}, errch
|
||||
return &blob.ChanPeeker{Ch: ch}
|
||||
}
|
||||
|
||||
peekers := make([]*blob.ChanPeeker, 0, nsrc)
|
||||
errs := make([]<-chan error, 0, nsrc)
|
||||
for i := 0; i < nsrc; i++ {
|
||||
peeker, errch := startEnum(getSource(i))
|
||||
peekers = append(peekers, peeker)
|
||||
errs = append(errs, errch)
|
||||
peekers = append(peekers, startEnum(getSource(i)))
|
||||
}
|
||||
|
||||
nSent := 0
|
||||
|
@ -85,17 +85,18 @@ func mergedEnumerate(ctx context.Context, dest chan<- blob.SizedRef, nsrc int, g
|
|||
break
|
||||
}
|
||||
|
||||
dest <- lowest
|
||||
nSent++
|
||||
lastSent = lowest.Ref
|
||||
select {
|
||||
case dest <- lowest:
|
||||
nSent++
|
||||
lastSent = lowest.Ref
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case err := <-errch:
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// If any part returns an error, we return an error.
|
||||
var retErr error
|
||||
for _, errch := range errs {
|
||||
if err := <-errch; err != nil {
|
||||
retErr = err
|
||||
}
|
||||
}
|
||||
return retErr
|
||||
errch <- nil
|
||||
return <-errch
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||
package blobserver
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sort"
|
||||
"testing"
|
||||
|
@ -75,6 +76,17 @@ var mergedTests = []struct {
|
|||
limit: 3,
|
||||
want: []string{"foo-a", "foo-b", "foo-c"},
|
||||
},
|
||||
{
|
||||
// Illustrates deadlock bugfix from
|
||||
// https://camlistore-review.googlesource.com/5946
|
||||
name: "limit2",
|
||||
srcs: []BlobEnumerator{
|
||||
enumBlobRange("foo", 0, buffered+10),
|
||||
enumBlobRange("bar", 0, buffered+10),
|
||||
},
|
||||
limit: buffered + 10,
|
||||
want: strRange("bar", 0, buffered+10),
|
||||
},
|
||||
{
|
||||
name: "no sources",
|
||||
srcs: []BlobEnumerator{},
|
||||
|
@ -122,6 +134,18 @@ func enumBlobs(v ...string) BlobEnumerator {
|
|||
return testEnum{v}
|
||||
}
|
||||
|
||||
func enumBlobRange(base string, start, count int) BlobEnumerator {
|
||||
return testEnum{strRange(base, start, count)}
|
||||
}
|
||||
|
||||
func strRange(base string, start, count int) []string {
|
||||
v := make([]string, count)
|
||||
for i := 0; i < count; i++ {
|
||||
v[i] = fmt.Sprintf("%s-%04d", base, start+i)
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
||||
type testEnum struct {
|
||||
blobs []string
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue