From bf2e1fa585179bffcbd4bbf2a9394097cdb2a84b Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Mon, 17 Mar 2014 23:21:53 -0700 Subject: [PATCH] sync: don't replicate a shard's missing blobs until enumeration is complete Prevents spurious replication of blobs on enumeration error. Change-Id: I38db7406f6ea52137cb757b32599b18eb7fcf3da --- pkg/server/sync.go | 79 ++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 65 insertions(+), 14 deletions(-) diff --git a/pkg/server/sync.go b/pkg/server/sync.go index 418949db5..02b6da546 100644 --- a/pkg/server/sync.go +++ b/pkg/server/sync.go @@ -705,24 +705,46 @@ func (sh *SyncHandler) validateShardPrefix(pfx string) (err error) { defer ctx.Cancel() src, serrc := sh.startValidatePrefix(ctx, pfx, false) dst, derrc := sh.startValidatePrefix(ctx, pfx, true) - - missing := make(chan blob.SizedRef, 8) - go blobserver.ListMissingDestinationBlobs(missing, func(blob.Ref) {}, src, dst) - for sb := range missing { - sh.mu.Lock() - sh.vmissing++ - sh.mu.Unlock() - // TODO: stats for missing blobs found. 
- sh.enqueue(sb) + srcErr := &chanError{ + C: serrc, + Wrap: func(err error) error { + return fmt.Errorf("Error enumerating source %s for validating shard %s: %v", sh.fromName, pfx, err) + }, + } + dstErr := &chanError{ + C: derrc, + Wrap: func(err error) error { + return fmt.Errorf("Error enumerating target %s for validating shard %s: %v", sh.toName, pfx, err) + }, } - if err := <-serrc; err != nil { - return fmt.Errorf("Error enumerating source %s for validating shard %s: %v", sh.fromName, pfx, err) + missingc := make(chan blob.SizedRef, 8) + go blobserver.ListMissingDestinationBlobs(missingc, func(blob.Ref) {}, src, dst) + + var missing []blob.SizedRef + for sb := range missingc { + missing = append(missing, sb) } - if err := <-derrc; err != nil { - return fmt.Errorf("Error enumerating target %s for validating shard %s: %v", sh.toName, pfx, err) + + if err := srcErr.Get(); err != nil { + return err } - return nil + if err := dstErr.Get(); err != nil { + return err + } + + for _, sb := range missing { + if enqErr := sh.enqueue(sb); enqErr != nil { + if err == nil { + err = enqErr + } + } else { + sh.mu.Lock() + sh.vmissing += 1 + sh.mu.Unlock() + } + } + return err } var errNotPrefix = errors.New("sentinel error: hit blob into the next shard") @@ -775,6 +797,10 @@ func (sh *SyncHandler) startValidatePrefix(ctx *context.Context, pfx string, doD if err == errNotPrefix { err = nil } + if err != nil { + // Send a zero value to shut down ListMissingDestinationBlobs. + c <- blob.SizedRef{} + } errc <- err }() return c, errc @@ -959,3 +985,28 @@ func (sh *SyncHandler) EnumerateBlobs(ctx *context.Context, dest chan<- blob.Siz func (sh *SyncHandler) RemoveBlobs(blobs []blob.Ref) error { panic("Unimplemeted RemoveBlobs") } + +// chanError is a Future around an incoming error channel of one item. +// It can also wrap its error in something more descriptive. 
// chanError is a one-shot future around a channel that delivers a single
// error value. The first call to Get receives from C and caches the
// outcome; later calls return the cached value without touching C again.
// If Wrap is set, non-nil errors are passed through it before being
// cached, so callers can attach context.
//
// A chanError performs no locking of its own; callers that share one
// between goroutines must synchronize access themselves.
type chanError struct {
	C    <-chan error      // source of the result; received from at most once by Get
	Wrap func(error) error // optional; applied to non-nil errors only

	err      error // cached result; meaningful once received is true
	received bool  // whether err has been recorded by Set
}

// Set records err as the result, first passing it through Wrap when err is
// non-nil and Wrap is configured. Subsequent calls to Get return the
// recorded value without reading from C.
func (ce *chanError) Set(err error) {
	if err != nil && ce.Wrap != nil {
		err = ce.Wrap(err)
	}
	ce.err = err
	ce.received = true
}

// Get returns the result. On the first call (absent a prior Set) it blocks
// until a value arrives on C, records it via Set, and returns it; every
// later call returns the same recorded value.
//
// If no result has been recorded and C is nil, Get returns a non-nil error
// instead of blocking: a receive from a nil channel never completes, and
// reporting success for a result that can never arrive would be misleading.
func (ce *chanError) Get() error {
	if ce.received {
		return ce.err
	}
	if ce.C == nil {
		// Fail loudly rather than deadlock the caller on an unwired future.
		ce.Set(errors.New("chanError: Get called with nil channel and no result set"))
		return ce.err
	}
	ce.Set(<-ce.C)
	return ce.err
}