blobserver/blobpacked, storagetest: fix context goroutine leak

Change-Id: I09abd7ef203fe267219a52a57f8627ce7e28d80d
Brad Fitzpatrick 2014-10-15 12:28:49 +02:00
parent e0edcdeba3
commit 4f9c8aea72
3 changed files with 30 additions and 7 deletions
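The fix applies one pattern throughout: create a single cancellable context up front and defer its Cancel, instead of passing a fresh context.New() inline and never tearing it down. A minimal sketch of that pattern in the style of the tests below; the camlistore.org/pkg/context import path and the countBlobs helper are assumptions for illustration, not part of this commit:

import (
	"testing"

	"camlistore.org/pkg/blob"
	"camlistore.org/pkg/blobserver"
	"camlistore.org/pkg/context" // assumed import path for the context package used in this diff
)

// countBlobs is a hypothetical helper showing the shape of the fix: the
// context is created once and canceled on return, so goroutines tied to it
// (or to the enumeration it drives) can exit instead of leaking.
func countBlobs(t *testing.T, src blobserver.Storage) int {
	ctx := context.New()
	defer ctx.Cancel() // the step that was missing before this commit
	n := 0
	if err := blobserver.EnumerateAll(ctx, src, func(sb blob.SizedRef) error {
		n++
		return nil
	}); err != nil {
		t.Fatalf("EnumerateAll: %v", err)
	}
	return n
}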

@@ -82,7 +82,6 @@ file will have a different 'wholePartIndex' number, starting at index
package blobpacked
// TODO: BlobStreamer using the zip manifests, for recovery.
// TODO: add test for context goroutine leaks
import (
"bytes"

@@ -23,7 +23,9 @@ import (
"fmt"
"io/ioutil"
"math/rand"
"runtime"
"testing"
"time"
"camlistore.org/pkg/blob"
"camlistore.org/pkg/blobserver"
@@ -200,6 +202,9 @@ func testPack(t *testing.T,
write func(sto blobserver.Storage) error,
checks ...func(*packTest),
) {
ctx := context.New()
defer ctx.Cancel()
logical := new(test.Fetcher)
small, large := new(test.Fetcher), new(test.Fetcher)
pt := &packTest{
@@ -254,7 +259,7 @@ func testPack(t *testing.T,
var zipRefs []blob.Ref
var zipSeen = map[blob.Ref]bool{}
blobserver.EnumerateAll(context.New(), large, func(sb blob.SizedRef) error {
blobserver.EnumerateAll(ctx, large, func(sb blob.SizedRef) error {
zipRefs = append(zipRefs, sb.Ref)
zipSeen[sb.Ref] = true
return nil
@@ -325,7 +330,7 @@ func testPack(t *testing.T,
// Verify that each chunk in the logical mapping is in the meta.
logBlobs := 0
if err := blobserver.EnumerateAll(context.New(), logical, func(sb blob.SizedRef) error {
if err := blobserver.EnumerateAll(ctx, logical, func(sb blob.SizedRef) error {
logBlobs++
v, err := pt.sto.meta.Get(blobMetaPrefix + sb.Ref.String())
if err == sorted.ErrNotFound && pt.okayNoMeta[sb.Ref] {
@@ -355,8 +360,10 @@ func testPack(t *testing.T,
}
// TODO: so many more tests:
// -- that uploading an identical-but-different-named file doesn't make a new large
// -- that uploading a 49% identical one does.
// -- like TestPackTwoIdenticalfiles, but instead of testing
// no dup for 100% identical file bytes, test that uploading a
// 49% identical one does denormalize and repack.
// -- verify deleting from the source
// -- verify we can reconstruct it all from the zip
// -- verify the meta before & after
@@ -396,7 +403,9 @@ func TestSmallFallback(t *testing.T) {
// Enumerate
saw := false
if err := blobserver.EnumerateAll(context.New(), s, func(sb blob.SizedRef) error {
ctx := context.New()
defer ctx.Cancel()
if err := blobserver.EnumerateAll(ctx, s, func(sb blob.SizedRef) error {
if sb != wantSB {
return fmt.Errorf("saw blob %v; want %v", sb, wantSB)
}
@@ -409,3 +418,16 @@ func TestSmallFallback(t *testing.T) {
t.Error("didn't see blob in Enumerate")
}
}
func TestZ_LeakCheck(t *testing.T) {
if testing.Short() {
return
}
time.Sleep(50 * time.Millisecond) // let goroutines schedule & die off
buf := make([]byte, 1<<20)
buf = buf[:runtime.Stack(buf, true)]
n := bytes.Count(buf, []byte("[chan receive]:"))
if n > 1 {
t.Errorf("%d goroutines in chan receive: %s", n, buf)
}
}
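The new TestZ_LeakCheck works by asking runtime.Stack for a dump of all goroutines (the second argument, true, includes every goroutine rather than just the caller) and counting stacks whose header reports the "[chan receive]:" wait state. The threshold is n > 1 rather than n > 0, presumably because the testing harness itself keeps one goroutine parked in a channel receive while a test runs; anything beyond that suggests a receiver left behind by an un-canceled context.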

@@ -254,7 +254,9 @@ func CheckEnumerate(sto blobserver.Storage, wantUnsorted []blob.SizedRef, opts .
var grp syncutil.Group
sawEnd := make(chan bool, 1)
grp.Go(func() error {
if err := sto.EnumerateBlobs(context.New(), sbc, after, n); err != nil {
ctx := context.New()
defer ctx.Cancel()
if err := sto.EnumerateBlobs(ctx, sbc, after, n); err != nil {
return fmt.Errorf("EnumerateBlobs(%q, %d): %v", after, n, err)
}
return nil
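In storagetest's CheckEnumerate, the same pattern is applied inside the syncutil.Group closure that drives EnumerateBlobs: the context is created and canceled within that goroutine, tying its lifetime to the enumeration itself, whereas the inline context.New() it replaces was never canceled.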