Merge "implement camtool sync -thirdleg flag"

This commit is contained in:
Brad Fitzpatrick 2013-03-11 17:41:17 +00:00 committed by Gerrit Code Review
commit c0f7827ab9
1 changed files with 40 additions and 9 deletions

View File

@ -30,8 +30,9 @@ import (
)
type syncCmd struct {
src string
dest string
src string
dest string
third string
loop bool
verbose bool
@ -46,6 +47,7 @@ func init() {
cmd := new(syncCmd)
flags.StringVar(&cmd.src, "src", "", "Source blobserver is either a URL prefix (with optional path), a host[:port], or blank to use the Camlistore client config's default host.")
flags.StringVar(&cmd.dest, "dest", "", "Destination blobserver, or 'stdout' to just enumerate the --src blobs to stdout.")
flags.StringVar(&cmd.third, "thirdleg", "", "Copy blobs present in source but missing from destination to this 'third leg' blob store, instead of the destination.")
flags.BoolVar(&cmd.loop, "loop", false, "Create an associate a new permanode for the uploaded file or directory.")
flags.BoolVar(&cmd.verbose, "verbose", false, "Be verbose.")
@ -98,16 +100,23 @@ func (c *syncCmd) RunCommand(args []string) error {
sc := client.New(src)
sc.SetupAuth()
sc.SetLogger(c.logger)
dc := client.New(c.dest)
dc.SetupAuth()
sc.SetLogger(c.logger)
dc.SetLogger(c.logger)
var tc *client.Client
if c.third != "" {
tc = client.New(c.third)
tc.SetupAuth()
tc.SetLogger(c.logger)
}
passNum := 0
for {
passNum++
stats, err := c.doPass(sc, dc)
stats, err := c.doPass(sc, dc, tc)
if c.verbose {
log.Printf("sync stats - pass: %d, blobs: %d, bytes %d\n", passNum, stats.BlobsCopied, stats.BytesCopied)
}
@ -154,7 +163,7 @@ func (c *syncCmd) syncAll() error {
if c.verbose {
log.Printf("Now syncing: %v -> %v", sh.From, sh.To)
}
stats, err := c.doPass(from, to)
stats, err := c.doPass(from, to, nil)
if c.verbose {
log.Printf("sync stats, blobs: %d, bytes %d\n", stats.BlobsCopied, stats.BytesCopied)
}
@ -180,7 +189,7 @@ func (c *syncCmd) discoClient() *client.Client {
return cl
}
func (c *syncCmd) doPass(sc, dc *client.Client) (stats SyncStats, retErr error) {
func (c *syncCmd) doPass(sc, dc, tc *client.Client) (stats SyncStats, retErr error) {
srcBlobs := make(chan blobref.SizedBlobRef, 100)
destBlobs := make(chan blobref.SizedBlobRef, 100)
srcErr := make(chan error)
@ -220,6 +229,27 @@ func (c *syncCmd) doPass(sc, dc *client.Client) (stats SyncStats, retErr error)
}
mismatches := []*blobref.BlobRef{}
go client.ListMissingDestinationBlobs(destNotHaveBlobs, sizeMismatch, readSrcBlobs, destBlobs)
// Handle three-legged mode if tc is provided.
checkThirdError := func() {} // default nop
syncBlobs := destNotHaveBlobs
uploadClient := dc
if tc != nil {
thirdBlobs := make(chan blobref.SizedBlobRef, 100)
thirdErr := make(chan error)
go func() {
thirdErr <- tc.SimpleEnumerateBlobs(thirdBlobs)
}()
checkThirdError = func() {
if err := <-thirdErr; err != nil {
retErr = fmt.Errorf("Enumerate error from third leg: %v", err)
}
}
thirdNeedBlobs := make(chan blobref.SizedBlobRef)
go client.ListMissingDestinationBlobs(thirdNeedBlobs, sizeMismatch, destNotHaveBlobs, thirdBlobs)
syncBlobs = thirdNeedBlobs
uploadClient = tc
}
For:
for {
select {
@ -228,7 +258,7 @@ For:
log.Printf("WARNING: blobref %v has differing sizes on source and est", br)
stats.ErrorCount++
mismatches = append(mismatches, br)
case sb, ok := <-destNotHaveBlobs:
case sb, ok := <-syncBlobs:
if !ok {
break For
}
@ -247,7 +277,7 @@ For:
continue
}
uh := &client.UploadHandle{BlobRef: sb.BlobRef, Size: size, Contents: blobReader}
pr, err := dc.Upload(uh)
pr, err := uploadClient.Upload(uh)
if err != nil {
stats.ErrorCount++
log.Printf("Upload of %s to destination blobserver failed: %v", sb.BlobRef, err)
@ -268,6 +298,7 @@ For:
checkSourceError()
checkDestError()
checkThirdError()
if retErr == nil && stats.ErrorCount > 0 {
retErr = fmt.Errorf("%d errors during sync", stats.ErrorCount)
}