diff --git a/lib/go/camli/blobref/chanpeek.go b/lib/go/camli/blobref/chanpeek.go new file mode 100644 index 000000000..854276a8c --- /dev/null +++ b/lib/go/camli/blobref/chanpeek.go @@ -0,0 +1,60 @@ +/* +Copyright 2011 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package blobref + +// TODO: use Generics if/when available +type ChanPeeker struct { + Ch <-chan SizedBlobRef + + // A channel should either have a peek value or be closed: + peek *SizedBlobRef + closed bool +} + +func (cp *ChanPeeker) Peek() *SizedBlobRef { + if cp.closed { + return nil + } + if cp.peek != nil { + return cp.peek + } + v, ok := <-cp.Ch + if !ok { + cp.closed = true + return nil + } + cp.peek = &v + return cp.peek +} + +func (cp *ChanPeeker) Closed() bool { + cp.Peek() + return cp.closed +} + +func (cp *ChanPeeker) Take() *SizedBlobRef { + v := cp.Peek() + cp.peek = nil + return v +} + +func (cp *ChanPeeker) ConsumeAll() { + for !cp.Closed() { + cp.Take() + } +} + diff --git a/lib/go/camli/client/sync.go b/lib/go/camli/client/sync.go index b377bd05b..81399c038 100644 --- a/lib/go/camli/client/sync.go +++ b/lib/go/camli/client/sync.go @@ -20,35 +20,6 @@ import ( "camli/blobref" ) -// TODO: use Generics if/when available -type chanPeeker struct { - ch chan blobref.SizedBlobRef - peek *blobref.SizedBlobRef - Closed bool -} - -func (cp *chanPeeker) Peek() *blobref.SizedBlobRef { - if cp.Closed { - return nil - } - if cp.peek != nil { - return cp.peek - } - v, ok := <-cp.ch - if !ok { - cp.Closed = true - return nil - } - cp.peek = &v - return cp.peek -} - -func (cp *chanPeeker) Take() *blobref.SizedBlobRef { - v := cp.Peek() - cp.peek = nil - return v -} - // ListMissingDestinationBlobs reads from 'srcch' and 'dstch' (sorted // enumerations of blobs from two blob servers) and sends to // 'destMissing' any blobs which appear on the source but not at the @@ -56,8 +27,8 @@ func (cp *chanPeeker) Take() *blobref.SizedBlobRef { func ListMissingDestinationBlobs(destMissing, srcch, dstch chan blobref.SizedBlobRef) { defer close(destMissing) - src := &chanPeeker{ch: srcch} - dst := &chanPeeker{ch: dstch} + src := &blobref.ChanPeeker{Ch: srcch} + dst := &blobref.ChanPeeker{Ch: dstch} for src.Peek() != nil { // If the destination has reached its end, anything