From cafaec5dc8c957b6380a062cf1c1e0d71cfba790 Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Sat, 5 Mar 2011 09:53:51 -0800 Subject: [PATCH] more sync work; start of loop and queue removal --- clients/go/camsync/camsync.go | 124 ++++++++++++++++++++++------------ lib/go/client/enumerate.go | 21 ++++-- 2 files changed, 99 insertions(+), 46 deletions(-) diff --git a/clients/go/camsync/camsync.go b/clients/go/camsync/camsync.go index ba2976f58..e1be3eb4a 100644 --- a/clients/go/camsync/camsync.go +++ b/clients/go/camsync/camsync.go @@ -26,17 +26,20 @@ import ( ) // Things that can be uploaded. (at most one of these) -var flagLoop = flag.Bool("loop", false, "sync in a loop once done") +var flagLoop = flag.Bool("loop", false, "sync in a loop once done; requires --removesrc") var flagVerbose = flag.Bool("verbose", false, "be verbose") -var flagSrc = flag.String("src", "", "Source host") +var flagSrc = flag.String("src", "", "Source blobserver prefix (generally a mirrored queue partition)") var flagSrcPass = flag.String("srcpassword", "", "Source password") var flagDest = flag.String("dest", "", "Destination blobserver, or 'stdout' to just enumerate the --src blobs to stdout") var flagDestPass = flag.String("destpassword", "", "Destination password") +var flagRemoveSource = flag.Bool("removesrc", false, + "remove each blob from the source after syncing to the destination; for queue processing") + func usage(err string) { if err != "" { - fmt.Fprintf(os.Stderr, "Error: %s\n", err) + fmt.Fprintf(os.Stderr, "Error: %s\n\nUsage:\n", err) } flag.PrintDefaults() os.Exit(2) @@ -46,10 +49,13 @@ func main() { flag.Parse() if *flagSrc == "" { - usage("No srchost specified.") + usage("No --src specified.") } if *flagDest == "" { - usage("No desthost specified.") + usage("No --dest specified.") + } + if *flagLoop && !*flagRemoveSource { + usage("Can't use --loop without --removesrc") } sc := client.New(*flagSrc, *flagSrcPass) @@ -62,58 +68,92 @@ func main() { sc.SetLogger(logger) dc.SetLogger(logger) + passNum := 0 + for { + passNum++ + if err := doPass(sc, dc, passNum); err != nil { + log.Fatalf("sync failed: %v", err) + } + if !*flagLoop { + break + } + } +} + +func doPass(sc, dc *client.Client, passNum int) (retErr os.Error) { srcBlobs := make(chan *blobref.SizedBlobRef, 100) destBlobs := make(chan *blobref.SizedBlobRef, 100) srcErr := make(chan os.Error) destErr := make(chan os.Error) errorCount := 0 + go func() { srcErr <- sc.EnumerateBlobs(srcBlobs) }() + checkSourceError := func() { + if err := <-srcErr; err != nil { + retErr = os.NewError(fmt.Sprintf("Enumerate error from source: %v", err)) + } + } + if *flagDest == "stdout" { for sb := range srcBlobs { fmt.Printf("%s %d\n", sb.BlobRef, sb.Size) } - } else { - go func() { - destErr <- dc.EnumerateBlobs(destBlobs) - }() - - // Merge sort srcBlobs and destBlobs - destNotHaveBlobs := make(chan *blobref.SizedBlobRef, 100) - go client.ListMissingDestinationBlobs(destNotHaveBlobs, srcBlobs, destBlobs) - for sb := range destNotHaveBlobs { - fmt.Printf("Destination needs blob: %s\n", sb) - - blobReader, size, err := sc.Fetch(sb.BlobRef) - if err != nil { - errorCount++ - log.Printf("Error fetching %s: %v", sb.BlobRef, err) - continue - } - if size != sb.Size { - errorCount++ - log.Printf("Source blobserver's enumerate size of %d for blob %s doesn't match its Get size of %d", - sb.Size, sb.BlobRef, size) - continue - } - uh := &client.UploadHandle{BlobRef: sb.BlobRef, Size: size, Contents: blobReader} - pr, err := dc.Upload(uh) - if err != nil { - errorCount++ - log.Printf("Upload of %s to destination blobserver failed: %v", sb.BlobRef, err) - continue - } - log.Printf("Put: %v", pr) - } + checkSourceError() + return } - if err := <-srcErr; err != nil { - log.Fatalf("Enumerate error from source: %v", err) - } - if *flagDest != "stdout" { + go func() { + destErr <- dc.EnumerateBlobs(destBlobs) + }() + checkDestError := func() { if err := <-destErr; err != nil { - log.Fatalf("Enumerate error from destination: %v", err) + retErr = os.NewError(fmt.Sprintf("Enumerate error from destination: %v", err)) } } + + bytesCopied := int64(0) + blobsCopied := 0 + + destNotHaveBlobs := make(chan *blobref.SizedBlobRef, 100) + go client.ListMissingDestinationBlobs(destNotHaveBlobs, srcBlobs, destBlobs) + for sb := range destNotHaveBlobs { + fmt.Printf("Destination needs blob: %s\n", sb) + + blobReader, size, err := sc.Fetch(sb.BlobRef) + if err != nil { + errorCount++ + log.Printf("Error fetching %s: %v", sb.BlobRef, err) + continue + } + if size != sb.Size { + errorCount++ + log.Printf("Source blobserver's enumerate size of %d for blob %s doesn't match its Get size of %d", + sb.Size, sb.BlobRef, size) + continue + } + uh := &client.UploadHandle{BlobRef: sb.BlobRef, Size: size, Contents: blobReader} + pr, err := dc.Upload(uh) + if err != nil { + errorCount++ + log.Printf("Upload of %s to destination blobserver failed: %v", sb.BlobRef, err) + continue + } + if !pr.Skipped { + blobsCopied++ + bytesCopied += pr.Size + } + if *flagRemoveSource { + // TODO: remove from source + } + } + + // TODO: return blobsCopied & bytesCopied + checkSourceError() + checkDestError() + if retErr == nil && errorCount > 0 { + retErr = os.NewError(fmt.Sprintf("%d errors during sync", errorCount)) + } + return } diff --git a/lib/go/client/enumerate.go b/lib/go/client/enumerate.go index 166485b16..caa4c8371 100644 --- a/lib/go/client/enumerate.go +++ b/lib/go/client/enumerate.go @@ -23,16 +23,24 @@ import ( "os" ) +type EnumerateOpts struct { + After string + MaxWaitSec int // max seconds to long poll for, waiting for any blob +} + // Note: closes ch. func (c *Client) EnumerateBlobs(ch chan *blobref.SizedBlobRef) os.Error { - return c.EnumerateBlobsAfter(ch, "") + return c.EnumerateBlobsOpts(ch, EnumerateOpts{}) } const enumerateBatchSize = 10 // Note: closes ch. -func (c *Client) EnumerateBlobsAfter(ch chan *blobref.SizedBlobRef, after string) os.Error { +func (c *Client) EnumerateBlobsOpts(ch chan *blobref.SizedBlobRef, opts EnumerateOpts) os.Error { defer close(ch) + if opts.After != "" && opts.MaxWaitSec != 0 { + return os.NewError("client error: it's invalid to use enumerate After and MaxWaitSec together") + } error := func(msg string, e os.Error) os.Error { err := os.NewError(fmt.Sprintf("client enumerate error: %s: %v", msg, e)) @@ -41,9 +49,14 @@ func (c *Client) EnumerateBlobsAfter(ch chan *blobref.SizedBlobRef, after string } keepGoing := true + after := opts.After for keepGoing { - url := fmt.Sprintf("%s/camli/enumerate-blobs?after=%s&limit=%d", - c.server, http.URLEscape(after), enumerateBatchSize) + waitSec := 0 + if after == "" { + waitSec = opts.MaxWaitSec + } + url := fmt.Sprintf("%s/camli/enumerate-blobs?after=%s&limit=%d&maxwaitsec=%d", + c.server, http.URLEscape(after), enumerateBatchSize, waitSec) req := c.newRequest("GET", url) resp, err := http.DefaultClient.Do(req) if err != nil {