diff --git a/pkg/blobserver/blobpacked/blobpacked.go b/pkg/blobserver/blobpacked/blobpacked.go old mode 100644 new mode 100755 index 9a18ae386..fb492e0c9 --- a/pkg/blobserver/blobpacked/blobpacked.go +++ b/pkg/blobserver/blobpacked/blobpacked.go @@ -614,7 +614,11 @@ func (s enumerator) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRe if err != nil { return err } - dest <- blob.SizedRef{Ref: br, Size: size} + select { + case <-ctx.Done(): + return ctx.Err() + case dest <- blob.SizedRef{Ref: br, Size: size}: + } } return nil } diff --git a/pkg/blobserver/mergedenum.go b/pkg/blobserver/mergedenum.go old mode 100644 new mode 100755 index c2e04010a..0f735b796 --- a/pkg/blobserver/mergedenum.go +++ b/pkg/blobserver/mergedenum.go @@ -44,21 +44,21 @@ func mergedEnumerate(ctx context.Context, dest chan<- blob.SizedRef, nsrc int, g subctx, cancel := context.WithCancel(ctx) defer cancel() - startEnum := func(source BlobEnumerator) (*blob.ChanPeeker, <-chan error) { + errch := make(chan error, nsrc+1) // +1 for nil + startEnum := func(source BlobEnumerator) *blob.ChanPeeker { ch := make(chan blob.SizedRef, buffered) - errch := make(chan error, 1) go func() { - errch <- source.EnumerateBlobs(subctx, ch, after, limit) + err := source.EnumerateBlobs(subctx, ch, after, limit) + if err != nil { + errch <- err + } }() - return &blob.ChanPeeker{Ch: ch}, errch + return &blob.ChanPeeker{Ch: ch} } peekers := make([]*blob.ChanPeeker, 0, nsrc) - errs := make([]<-chan error, 0, nsrc) for i := 0; i < nsrc; i++ { - peeker, errch := startEnum(getSource(i)) - peekers = append(peekers, peeker) - errs = append(errs, errch) + peekers = append(peekers, startEnum(getSource(i))) } nSent := 0 @@ -85,17 +85,18 @@ func mergedEnumerate(ctx context.Context, dest chan<- blob.SizedRef, nsrc int, g break } - dest <- lowest - nSent++ - lastSent = lowest.Ref + select { + case dest <- lowest: + nSent++ + lastSent = lowest.Ref + case <-ctx.Done(): + return ctx.Err() + case err := <-errch: + return err + } } // If any part returns an error, we return an error. - var retErr error - for _, errch := range errs { - if err := <-errch; err != nil { - retErr = err - } - } - return retErr + errch <- nil + return <-errch } diff --git a/pkg/blobserver/mergedenum_test.go b/pkg/blobserver/mergedenum_test.go index 3b27e5d27..7fe3a26a6 100644 --- a/pkg/blobserver/mergedenum_test.go +++ b/pkg/blobserver/mergedenum_test.go @@ -17,6 +17,7 @@ limitations under the License. package blobserver import ( + "fmt" "reflect" "sort" "testing" @@ -75,6 +76,17 @@ var mergedTests = []struct { limit: 3, want: []string{"foo-a", "foo-b", "foo-c"}, }, + { + // Illustrates deadlock bugfix from + // https://camlistore-review.googlesource.com/5946 + name: "limit2", + srcs: []BlobEnumerator{ + enumBlobRange("foo", 0, buffered+10), + enumBlobRange("bar", 0, buffered+10), + }, + limit: buffered + 10, + want: strRange("bar", 0, buffered+10), + }, { name: "no sources", srcs: []BlobEnumerator{}, @@ -122,6 +134,18 @@ func enumBlobs(v ...string) BlobEnumerator { return testEnum{v} } +func enumBlobRange(base string, start, count int) BlobEnumerator { + return testEnum{strRange(base, start, count)} +} + +func strRange(base string, start, count int) []string { + v := make([]string, count) + for i := 0; i < count; i++ { + v[i] = fmt.Sprintf("%s-%04d", base, start+i) + } + return v +} + type testEnum struct { blobs []string }