More camsync work.

This commit is contained in:
Brad Fitzpatrick 2011-03-02 20:03:09 -08:00
parent d9bf9a3e14
commit 7381cbf4d1
3 changed files with 96 additions and 18 deletions

View File

@ -21,7 +21,6 @@ import (
"camli/client"
"flag"
"fmt"
// "io"
"log"
"os"
)
@ -30,10 +29,10 @@ import (
var flagLoop = flag.Bool("loop", false, "sync in a loop once done")
var flagVerbose = flag.Bool("verbose", false, "be verbose")
var flagSrcHost = flag.String("srchost", "", "Source host")
var flagSrcPass = flag.String("srcpass", "", "Source password")
var flagDestHost = flag.String("desthost", "", "Destination host")
var flagDestPass = flag.String("destpass", "", "Destination password")
var flagSrc = flag.String("src", "", "Source host")
var flagSrcPass = flag.String("srcpassword", "", "Source password")
var flagDest = flag.String("dest", "", "Destination blobserver, or 'stdout' to just enumerate the --src blobs to stdout")
var flagDestPass = flag.String("destpassword", "", "Destination password")
func usage(err string) {
if err != "" {
@ -43,18 +42,75 @@ func usage(err string) {
os.Exit(2)
}
// TODO: use Generics if/when available
type chanPeeker struct {
ch chan *blobref.SizedBlobRef
peek *blobref.SizedBlobRef
Closed bool
}
func (cp *chanPeeker) Peek() *blobref.SizedBlobRef {
if cp.Closed {
return nil
}
if cp.peek != nil {
return cp.peek
}
cp.peek = <-cp.ch
if closed(cp.ch) {
cp.Closed = true
return nil
}
return cp.peek
}
func (cp *chanPeeker) Take() *blobref.SizedBlobRef {
v := cp.Peek()
cp.peek = nil
return v
}
func yieldMissingDestinationBlobs(destMissing, srcch, dstch chan *blobref.SizedBlobRef) {
defer close(destMissing)
src := &chanPeeker{ch: srcch}
dst := &chanPeeker{ch: dstch}
for src.Peek() != nil {
// If the destination has reached its end, anything
// remaining in the source is needed.
if dst.Peek() == nil {
destMissing <- src.Take()
continue
}
srcStr := src.Peek().BlobRef.String()
dstStr := dst.Peek().BlobRef.String()
switch {
case srcStr == dstStr:
// Skip both
src.Take()
dst.Take()
case srcStr < dstStr:
src.Take()
case srcStr > dstStr:
destMissing <- src.Take()
}
}
}
func main() {
flag.Parse()
if *flagSrcHost == "" {
if *flagSrc == "" {
usage("No srchost specified.")
}
if *flagDestHost == "" {
if *flagDest == "" {
usage("No desthost specified.")
}
sc := client.New(*flagSrcHost, *flagSrcPass)
dc := client.New(*flagDestHost, *flagDestPass)
sc := client.New(*flagSrc, *flagSrcPass)
dc := client.New(*flagDest, *flagDestPass)
var logger *log.Logger = nil
if *flagVerbose {
@ -63,17 +119,36 @@ func main() {
sc.SetLogger(logger)
dc.SetLogger(logger)
ch := make(chan *blobref.SizedBlobRef, 100)
enumErrCh := make(chan os.Error)
srcBlobs := make(chan *blobref.SizedBlobRef, 100)
destBlobs := make(chan *blobref.SizedBlobRef, 100)
srcErr := make(chan os.Error)
destErr := make(chan os.Error)
go func() {
enumErrCh <- sc.EnumerateBlobs(ch)
srcErr <- sc.EnumerateBlobs(srcBlobs)
}()
if *flagDest == "stdout" {
for sb := range srcBlobs {
fmt.Printf("%s %d\n", sb.BlobRef, sb.Size)
}
} else {
go func() {
destErr <- sc.EnumerateBlobs(destBlobs)
}()
for sb := range ch {
fmt.Printf("Got blob: %s\n", sb)
// Merge sort srcBlobs and destBlobs
destNotHaveBlobs := make(chan *blobref.SizedBlobRef, 100)
go yieldMissingDestinationBlobs(destNotHaveBlobs, srcBlobs, destBlobs)
for sb := range destNotHaveBlobs {
fmt.Printf("Destination needs blob: %s\n", sb)
}
}
if err := <-enumErrCh; err != nil {
if err := <-srcErr; err != nil {
log.Fatalf("Enumerate error from source: %v", err)
}
if *flagDest != "stdout" {
if err := <-destErr; err != nil {
log.Fatalf("Enumerate error from destination: %v", err)
}
}
}

4
dev-camsync Executable file
View File

@ -0,0 +1,4 @@
#!/bin/sh
./build.pl camsync && \
clients/go/camsync/camsync --verbose --src=http://localhost:3179 --srcpassword=foo $@

View File

@ -32,6 +32,8 @@ const enumerateBatchSize = 10
// Note: closes ch.
func (c *Client) EnumerateBlobsAfter(ch chan *blobref.SizedBlobRef, after string) os.Error {
defer close(ch)
error := func(msg string, e os.Error) os.Error {
err := os.NewError(fmt.Sprintf("client enumerate error: %s: %v", msg, e))
c.log.Print(err.String())
@ -42,7 +44,6 @@ func (c *Client) EnumerateBlobsAfter(ch chan *blobref.SizedBlobRef, after string
for keepGoing {
url := fmt.Sprintf("%s/camli/enumerate-blobs?after=%s&limit=%d",
c.server, http.URLEscape(after), enumerateBatchSize)
c.log.Print("Fetching " + url)
req := c.newRequest("GET", url)
resp, err := http.DefaultClient.Do(req)
if err != nil {
@ -80,8 +81,6 @@ func (c *Client) EnumerateBlobsAfter(ch chan *blobref.SizedBlobRef, after string
after, keepGoing = getJsonMapString(json, "continueAfter")
}
close(ch)
return nil
}