Fix data race in reindexing.

See https://camlistore.org/issue/331

Change-Id: I210ae4e1779b7fe248196152a6acbc5728051249
This commit is contained in:
Brad Fitzpatrick 2014-01-08 20:36:17 -08:00
parent a00d964169
commit 4365f69330
2 changed files with 11 additions and 1 deletions

5
TESTS
View File

@ -1,5 +1,10 @@
Tests needed Tests needed
-- integration test of reindexing + race detector
-- support for running race detector on all tests. when in race mode, also run
integration test children in race mode.
-- test that server/camlistored still builds & starts even when sqlite isn't -- test that server/camlistored still builds & starts even when sqlite isn't
available (TODO: hide it from the test by running make.go in a child available (TODO: hide it from the test by running make.go in a child
process with a faked-out PKG_CONFIG environment or something, to make process with a faked-out PKG_CONFIG environment or something, to make

View File

@ -142,11 +142,11 @@ func (x *Index) Reindex() error {
nerr := 0 nerr := 0
blobc := make(chan blob.Ref, 32) blobc := make(chan blob.Ref, 32)
defer close(blobc)
enumCtx := ctx.New() enumCtx := ctx.New()
enumErr := make(chan error, 1) enumErr := make(chan error, 1)
go func() { go func() {
defer close(blobc)
donec := enumCtx.Done() donec := enumCtx.Done()
var lastTick time.Time var lastTick time.Time
enumErr <- blobserver.EnumerateAll(enumCtx, x.BlobSource, func(sb blob.SizedRef) error { enumErr <- blobserver.EnumerateAll(enumCtx, x.BlobSource, func(sb blob.SizedRef) error {
@ -167,8 +167,11 @@ func (x *Index) Reindex() error {
}) })
}() }()
const j = 4 // arbitrary concurrency level const j = 4 // arbitrary concurrency level
var wg sync.WaitGroup
for i := 0; i < j; i++ { for i := 0; i < j; i++ {
wg.Add(1)
go func() { go func() {
defer wg.Done()
for br := range blobc { for br := range blobc {
if err := x.reindex(br); err != nil { if err := x.reindex(br); err != nil {
log.Printf("Error reindexing %v: %v", br, err) log.Printf("Error reindexing %v: %v", br, err)
@ -184,6 +187,8 @@ func (x *Index) Reindex() error {
if err := <-enumErr; err != nil { if err := <-enumErr; err != nil {
return err return err
} }
wg.Wait()
log.Printf("Index rebuild complete.") log.Printf("Index rebuild complete.")
nerrmu.Lock() // no need to unlock nerrmu.Lock() // no need to unlock
if nerr != 0 { if nerr != 0 {