blobserver: give ListMissingDestinationBlobs the ability to exit earlier

If there's an enumeration error for src or dst, sending a zero value
will shut down ListMissingDestinationBlobs.

Change-Id: I95598e9dc2607610436faa06f40485d1abd2342f
This commit is contained in:
Brad Fitzpatrick 2014-03-17 23:20:37 -07:00
parent bf1ec32e39
commit bf2d86738e
1 changed files with 18 additions and 7 deletions

View File

@ -21,7 +21,12 @@ import "camlistore.org/pkg/blob"
// ListMissingDestinationBlobs reads from 'srcch' and 'dstch' (sorted // ListMissingDestinationBlobs reads from 'srcch' and 'dstch' (sorted
// enumerations of blobs from two blob servers) and sends to // enumerations of blobs from two blob servers) and sends to
// 'destMissing' any blobs which appear on the source but not at the // 'destMissing' any blobs which appear on the source but not at the
// destination. destMissing is closed at the end. // destination.
//
// destMissing is closed at the end.
//
// If an invalid (zero) blob from srcch or dstch arrives,
// ListMissingDestinationBlobs stops.
func ListMissingDestinationBlobs(destMissing chan<- blob.SizedRef, sizeMismatch func(blob.Ref), srcch, dstch <-chan blob.SizedRef) { func ListMissingDestinationBlobs(destMissing chan<- blob.SizedRef, sizeMismatch func(blob.Ref), srcch, dstch <-chan blob.SizedRef) {
defer close(destMissing) defer close(destMissing)
@ -29,10 +34,13 @@ func ListMissingDestinationBlobs(destMissing chan<- blob.SizedRef, sizeMismatch
dst := &blob.ChanPeeker{Ch: dstch} dst := &blob.ChanPeeker{Ch: dstch}
for { for {
_, ok := src.Peek() srcSized, ok := src.Peek()
if !ok { if !ok {
break break
} }
if !srcSized.Ref.Valid() {
return
}
// If the destination has reached its end, anything // If the destination has reached its end, anything
// remaining in the source is needed. // remaining in the source is needed.
@ -41,20 +49,23 @@ func ListMissingDestinationBlobs(destMissing chan<- blob.SizedRef, sizeMismatch
continue continue
} }
srcStr := src.MustPeek().Ref srcBlob := src.MustPeek().Ref
dstStr := dst.MustPeek().Ref dstRef := dst.MustPeek().Ref
if !dstRef.Valid() {
return
}
switch { switch {
case srcStr == dstStr: case srcBlob == dstRef:
// Skip both // Skip both
sb := src.MustTake() sb := src.MustTake()
db := dst.MustTake() db := dst.MustTake()
if sb.Size != db.Size { if sb.Size != db.Size {
sizeMismatch(sb.Ref) sizeMismatch(sb.Ref)
} }
case srcStr.Less(dstStr): case srcBlob.Less(dstRef):
destMissing <- src.MustTake() destMissing <- src.MustTake()
case dstStr.Less(srcStr): case dstRef.Less(srcBlob):
dst.Take() dst.Take()
} }
} }