diff --git a/clients/go/camsync/camsync.go b/clients/go/camsync/camsync.go index 89823e070..363e41b08 100644 --- a/clients/go/camsync/camsync.go +++ b/clients/go/camsync/camsync.go @@ -21,7 +21,6 @@ import ( "camli/client" "flag" "fmt" -// "io" "log" "os" ) @@ -30,10 +29,10 @@ import ( var flagLoop = flag.Bool("loop", false, "sync in a loop once done") var flagVerbose = flag.Bool("verbose", false, "be verbose") -var flagSrcHost = flag.String("srchost", "", "Source host") -var flagSrcPass = flag.String("srcpass", "", "Source password") -var flagDestHost = flag.String("desthost", "", "Destination host") -var flagDestPass = flag.String("destpass", "", "Destination password") +var flagSrc = flag.String("src", "", "Source host") +var flagSrcPass = flag.String("srcpassword", "", "Source password") +var flagDest = flag.String("dest", "", "Destination blobserver, or 'stdout' to just enumerate the --src blobs to stdout") +var flagDestPass = flag.String("destpassword", "", "Destination password") func usage(err string) { if err != "" { @@ -43,18 +42,75 @@ func usage(err string) { os.Exit(2) } +// TODO: use Generics if/when available +type chanPeeker struct { + ch chan *blobref.SizedBlobRef + peek *blobref.SizedBlobRef + Closed bool +} + +func (cp *chanPeeker) Peek() *blobref.SizedBlobRef { + if cp.Closed { + return nil + } + if cp.peek != nil { + return cp.peek + } + cp.peek = <-cp.ch + if closed(cp.ch) { + cp.Closed = true + return nil + } + return cp.peek +} + +func (cp *chanPeeker) Take() *blobref.SizedBlobRef { + v := cp.Peek() + cp.peek = nil + return v +} + +func yieldMissingDestinationBlobs(destMissing, srcch, dstch chan *blobref.SizedBlobRef) { + defer close(destMissing) + + src := &chanPeeker{ch: srcch} + dst := &chanPeeker{ch: dstch} + + for src.Peek() != nil { + // If the destination has reached its end, anything + // remaining in the source is needed. + if dst.Peek() == nil { + destMissing <- src.Take() + continue + } + + srcStr := src.Peek().BlobRef.String() + dstStr := dst.Peek().BlobRef.String() + switch { + case srcStr == dstStr: + // Skip both + src.Take() + dst.Take() + case srcStr < dstStr: + src.Take() + case srcStr > dstStr: + destMissing <- src.Take() + } + } +} + func main() { flag.Parse() - if *flagSrcHost == "" { + if *flagSrc == "" { usage("No srchost specified.") } - if *flagDestHost == "" { + if *flagDest == "" { usage("No desthost specified.") } - sc := client.New(*flagSrcHost, *flagSrcPass) - dc := client.New(*flagDestHost, *flagDestPass) + sc := client.New(*flagSrc, *flagSrcPass) + dc := client.New(*flagDest, *flagDestPass) var logger *log.Logger = nil if *flagVerbose { @@ -63,17 +119,36 @@ func main() { sc.SetLogger(logger) dc.SetLogger(logger) - ch := make(chan *blobref.SizedBlobRef, 100) - enumErrCh := make(chan os.Error) + srcBlobs := make(chan *blobref.SizedBlobRef, 100) + destBlobs := make(chan *blobref.SizedBlobRef, 100) + srcErr := make(chan os.Error) + destErr := make(chan os.Error) go func() { - enumErrCh <- sc.EnumerateBlobs(ch) + srcErr <- sc.EnumerateBlobs(srcBlobs) }() + if *flagDest == "stdout" { + for sb := range srcBlobs { + fmt.Printf("%s %d\n", sb.BlobRef, sb.Size) + } + } else { + go func() { + destErr <- sc.EnumerateBlobs(destBlobs) + }() - for sb := range ch { - fmt.Printf("Got blob: %s\n", sb) + // Merge sort srcBlobs and destBlobs + destNotHaveBlobs := make(chan *blobref.SizedBlobRef, 100) + go yieldMissingDestinationBlobs(destNotHaveBlobs, srcBlobs, destBlobs) + for sb := range destNotHaveBlobs { + fmt.Printf("Destination needs blob: %s\n", sb) + } } - if err := <-enumErrCh; err != nil { + if err := <-srcErr; err != nil { log.Fatalf("Enumerate error from source: %v", err) } + if *flagDest != "stdout" { + if err := <-destErr; err != nil { + log.Fatalf("Enumerate error from destination: %v", err) + } + } } diff --git a/dev-camsync b/dev-camsync new file mode 100755 index 000000000..a449ef4aa --- /dev/null +++ b/dev-camsync @@ -0,0 +1,4 @@ +#!/bin/sh + +./build.pl camsync && \ + clients/go/camsync/camsync --verbose --src=http://localhost:3179 --srcpassword=foo $@ diff --git a/lib/go/client/enumerate.go b/lib/go/client/enumerate.go index 5fc349ff9..166485b16 100644 --- a/lib/go/client/enumerate.go +++ b/lib/go/client/enumerate.go @@ -32,6 +32,8 @@ const enumerateBatchSize = 10 // Note: closes ch. func (c *Client) EnumerateBlobsAfter(ch chan *blobref.SizedBlobRef, after string) os.Error { + defer close(ch) + error := func(msg string, e os.Error) os.Error { err := os.NewError(fmt.Sprintf("client enumerate error: %s: %v", msg, e)) c.log.Print(err.String()) @@ -42,7 +44,6 @@ func (c *Client) EnumerateBlobsAfter(ch chan *blobref.SizedBlobRef, after string for keepGoing { url := fmt.Sprintf("%s/camli/enumerate-blobs?after=%s&limit=%d", c.server, http.URLEscape(after), enumerateBatchSize) - c.log.Print("Fetching " + url) req := c.newRequest("GET", url) resp, err := http.DefaultClient.Do(req) if err != nil { @@ -80,8 +81,6 @@ func (c *Client) EnumerateBlobsAfter(ch chan *blobref.SizedBlobRef, after string after, keepGoing = getJsonMapString(json, "continueAfter") } - - close(ch) return nil }