more sync work; start of loop and queue removal

This commit is contained in:
Brad Fitzpatrick 2011-03-05 09:53:51 -08:00
parent bc7340d7e7
commit cafaec5dc8
2 changed files with 99 additions and 46 deletions

View File

@ -26,17 +26,20 @@ import (
)
// Command-line flags. --src and --dest are both required; --loop requires --removesrc.
var flagLoop = flag.Bool("loop", false, "sync in a loop once done")
var flagLoop = flag.Bool("loop", false, "sync in a loop once done; requires --removesrc")
var flagVerbose = flag.Bool("verbose", false, "be verbose")
var flagSrc = flag.String("src", "", "Source host")
var flagSrc = flag.String("src", "", "Source blobserver prefix (generally a mirrored queue partition)")
var flagSrcPass = flag.String("srcpassword", "", "Source password")
var flagDest = flag.String("dest", "", "Destination blobserver, or 'stdout' to just enumerate the --src blobs to stdout")
var flagDestPass = flag.String("destpassword", "", "Destination password")
var flagRemoveSource = flag.Bool("removesrc", false,
"remove each blob from the source after syncing to the destination; for queue processing")
func usage(err string) {
if err != "" {
fmt.Fprintf(os.Stderr, "Error: %s\n", err)
fmt.Fprintf(os.Stderr, "Error: %s\n\nUsage:\n", err)
}
flag.PrintDefaults()
os.Exit(2)
@ -46,10 +49,13 @@ func main() {
flag.Parse()
if *flagSrc == "" {
usage("No srchost specified.")
usage("No --src specified.")
}
if *flagDest == "" {
usage("No desthost specified.")
usage("No --dest specified.")
}
if *flagLoop && !*flagRemoveSource {
usage("Can't use --loop without --removesrc")
}
sc := client.New(*flagSrc, *flagSrcPass)
@ -62,58 +68,92 @@ func main() {
sc.SetLogger(logger)
dc.SetLogger(logger)
passNum := 0
for {
passNum++
if err := doPass(sc, dc, passNum); err != nil {
log.Fatalf("sync failed: %v", err)
}
if !*flagLoop {
break
}
}
}
func doPass(sc, dc *client.Client, passNum int) (retErr os.Error) {
srcBlobs := make(chan *blobref.SizedBlobRef, 100)
destBlobs := make(chan *blobref.SizedBlobRef, 100)
srcErr := make(chan os.Error)
destErr := make(chan os.Error)
errorCount := 0
go func() {
srcErr <- sc.EnumerateBlobs(srcBlobs)
}()
checkSourceError := func() {
if err := <-srcErr; err != nil {
retErr = os.NewError(fmt.Sprintf("Enumerate error from source: %v", err))
}
}
if *flagDest == "stdout" {
for sb := range srcBlobs {
fmt.Printf("%s %d\n", sb.BlobRef, sb.Size)
}
} else {
go func() {
destErr <- dc.EnumerateBlobs(destBlobs)
}()
// Merge sort srcBlobs and destBlobs
destNotHaveBlobs := make(chan *blobref.SizedBlobRef, 100)
go client.ListMissingDestinationBlobs(destNotHaveBlobs, srcBlobs, destBlobs)
for sb := range destNotHaveBlobs {
fmt.Printf("Destination needs blob: %s\n", sb)
blobReader, size, err := sc.Fetch(sb.BlobRef)
if err != nil {
errorCount++
log.Printf("Error fetching %s: %v", sb.BlobRef, err)
continue
}
if size != sb.Size {
errorCount++
log.Printf("Source blobserver's enumerate size of %d for blob %s doesn't match its Get size of %d",
sb.Size, sb.BlobRef, size)
continue
}
uh := &client.UploadHandle{BlobRef: sb.BlobRef, Size: size, Contents: blobReader}
pr, err := dc.Upload(uh)
if err != nil {
errorCount++
log.Printf("Upload of %s to destination blobserver failed: %v", sb.BlobRef, err)
continue
}
log.Printf("Put: %v", pr)
}
checkSourceError()
return
}
if err := <-srcErr; err != nil {
log.Fatalf("Enumerate error from source: %v", err)
}
if *flagDest != "stdout" {
go func() {
destErr <- dc.EnumerateBlobs(destBlobs)
}()
checkDestError := func() {
if err := <-destErr; err != nil {
log.Fatalf("Enumerate error from destination: %v", err)
retErr = os.NewError(fmt.Sprintf("Enumerate error from destination: %v", err))
}
}
bytesCopied := int64(0)
blobsCopied := 0
destNotHaveBlobs := make(chan *blobref.SizedBlobRef, 100)
go client.ListMissingDestinationBlobs(destNotHaveBlobs, srcBlobs, destBlobs)
for sb := range destNotHaveBlobs {
fmt.Printf("Destination needs blob: %s\n", sb)
blobReader, size, err := sc.Fetch(sb.BlobRef)
if err != nil {
errorCount++
log.Printf("Error fetching %s: %v", sb.BlobRef, err)
continue
}
if size != sb.Size {
errorCount++
log.Printf("Source blobserver's enumerate size of %d for blob %s doesn't match its Get size of %d",
sb.Size, sb.BlobRef, size)
continue
}
uh := &client.UploadHandle{BlobRef: sb.BlobRef, Size: size, Contents: blobReader}
pr, err := dc.Upload(uh)
if err != nil {
errorCount++
log.Printf("Upload of %s to destination blobserver failed: %v", sb.BlobRef, err)
continue
}
if !pr.Skipped {
blobsCopied++
bytesCopied += pr.Size
}
if *flagRemoveSource {
// TODO: remove from source
}
}
// TODO: return blobsCopied & bytesCopied
checkSourceError()
checkDestError()
if retErr == nil && errorCount > 0 {
retErr = os.NewError(fmt.Sprintf("%d errors during sync", errorCount))
}
return
}

View File

@ -23,16 +23,24 @@ import (
"os"
)
// EnumerateOpts holds the optional parameters accepted by
// EnumerateBlobsOpts. After and MaxWaitSec may not be used together;
// EnumerateBlobsOpts returns an error if both are set.
type EnumerateOpts struct {
// After, if non-empty, is sent as the "after" parameter of the first
// enumerate-blobs request.
After string
// MaxWaitSec is sent as the "maxwaitsec" parameter, and only on
// requests whose "after" parameter is empty; otherwise 0 is sent.
MaxWaitSec int // max seconds to long poll for, waiting for any blob
}
// Note: closes ch.
func (c *Client) EnumerateBlobs(ch chan *blobref.SizedBlobRef) os.Error {
return c.EnumerateBlobsAfter(ch, "")
return c.EnumerateBlobsOpts(ch, EnumerateOpts{})
}
// enumerateBatchSize is the value sent as the "limit" query parameter
// of each enumerate-blobs request.
const enumerateBatchSize = 10
// Note: closes ch.
func (c *Client) EnumerateBlobsAfter(ch chan *blobref.SizedBlobRef, after string) os.Error {
func (c *Client) EnumerateBlobsOpts(ch chan *blobref.SizedBlobRef, opts EnumerateOpts) os.Error {
defer close(ch)
if opts.After != "" && opts.MaxWaitSec != 0 {
return os.NewError("client error: it's invalid to use enumerate After and MaxWaitSec together")
}
error := func(msg string, e os.Error) os.Error {
err := os.NewError(fmt.Sprintf("client enumerate error: %s: %v", msg, e))
@ -41,9 +49,14 @@ func (c *Client) EnumerateBlobsAfter(ch chan *blobref.SizedBlobRef, after string
}
keepGoing := true
after := opts.After
for keepGoing {
url := fmt.Sprintf("%s/camli/enumerate-blobs?after=%s&limit=%d",
c.server, http.URLEscape(after), enumerateBatchSize)
waitSec := 0
if after == "" {
waitSec = opts.MaxWaitSec
}
url := fmt.Sprintf("%s/camli/enumerate-blobs?after=%s&limit=%d&maxwaitsec=%d",
c.server, http.URLEscape(after), enumerateBatchSize, waitSec)
req := c.newRequest("GET", url)
resp, err := http.DefaultClient.Do(req)
if err != nil {