sync: don't replicate a shard's missing blobs until enumeration is complete

Prevents spurious replication of blobs on enumeration error.

Change-Id: I38db7406f6ea52137cb757b32599b18eb7fcf3da
This commit is contained in:
Brad Fitzpatrick 2014-03-17 23:21:53 -07:00
parent bf2d86738e
commit bf2e1fa585
1 changed files with 65 additions and 14 deletions

View File

@ -705,24 +705,46 @@ func (sh *SyncHandler) validateShardPrefix(pfx string) (err error) {
defer ctx.Cancel()
src, serrc := sh.startValidatePrefix(ctx, pfx, false)
dst, derrc := sh.startValidatePrefix(ctx, pfx, true)
missing := make(chan blob.SizedRef, 8)
go blobserver.ListMissingDestinationBlobs(missing, func(blob.Ref) {}, src, dst)
for sb := range missing {
sh.mu.Lock()
sh.vmissing++
sh.mu.Unlock()
// TODO: stats for missing blobs found.
sh.enqueue(sb)
srcErr := &chanError{
C: serrc,
Wrap: func(err error) error {
return fmt.Errorf("Error enumerating source %s for validating shard %s: %v", sh.fromName, pfx, err)
},
}
dstErr := &chanError{
C: derrc,
Wrap: func(err error) error {
return fmt.Errorf("Error enumerating target %s for validating shard %s: %v", sh.toName, pfx, err)
},
}
if err := <-serrc; err != nil {
return fmt.Errorf("Error enumerating source %s for validating shard %s: %v", sh.fromName, pfx, err)
missingc := make(chan blob.SizedRef, 8)
go blobserver.ListMissingDestinationBlobs(missingc, func(blob.Ref) {}, src, dst)
var missing []blob.SizedRef
for sb := range missingc {
missing = append(missing, sb)
}
if err := <-derrc; err != nil {
return fmt.Errorf("Error enumerating target %s for validating shard %s: %v", sh.toName, pfx, err)
if err := srcErr.Get(); err != nil {
return err
}
return nil
if err := dstErr.Get(); err != nil {
return err
}
for _, sb := range missing {
if enqErr := sh.enqueue(sb); enqErr != nil {
if err == nil {
err = enqErr
}
} else {
sh.mu.Lock()
sh.vmissing += 1
sh.mu.Unlock()
}
}
return err
}
var errNotPrefix = errors.New("sentinel error: hit blob into the next shard")
@ -775,6 +797,10 @@ func (sh *SyncHandler) startValidatePrefix(ctx *context.Context, pfx string, doD
if err == errNotPrefix {
err = nil
}
if err != nil {
// Send a zero value to shut down ListMissingDestinationBlobs.
c <- blob.SizedRef{}
}
errc <- err
}()
return c, errc
@ -959,3 +985,28 @@ func (sh *SyncHandler) EnumerateBlobs(ctx *context.Context, dest chan<- blob.Siz
func (sh *SyncHandler) RemoveBlobs(blobs []blob.Ref) error {
panic("Unimplemeted RemoveBlobs")
}
// chanError is a Future around an incoming error channel of one item.
// It can also wrap its error in something more descriptive.
type chanError struct {
C <-chan error
Wrap func(error) error // optional
err error
received bool
}
func (ce *chanError) Set(err error) {
if ce.Wrap != nil && err != nil {
err = ce.Wrap(err)
}
ce.err = err
ce.received = true
}
func (ce *chanError) Get() error {
if ce.received {
return ce.err
}
ce.Set(<-ce.C)
return ce.err
}