diff --git a/cmd/camtool/sync.go b/cmd/camtool/sync.go index ef2cb56d2..502f0a6aa 100644 --- a/cmd/camtool/sync.go +++ b/cmd/camtool/sync.go @@ -30,8 +30,9 @@ import ( ) type syncCmd struct { - src string - dest string + src string + dest string + third string loop bool verbose bool @@ -46,6 +47,7 @@ func init() { cmd := new(syncCmd) flags.StringVar(&cmd.src, "src", "", "Source blobserver is either a URL prefix (with optional path), a host[:port], or blank to use the Camlistore client config's default host.") flags.StringVar(&cmd.dest, "dest", "", "Destination blobserver, or 'stdout' to just enumerate the --src blobs to stdout.") + flags.StringVar(&cmd.third, "thirdleg", "", "Copy blobs present in source but missing from destination to this 'third leg' blob store, instead of the destination.") flags.BoolVar(&cmd.loop, "loop", false, "Create an associate a new permanode for the uploaded file or directory.") flags.BoolVar(&cmd.verbose, "verbose", false, "Be verbose.") @@ -98,16 +100,23 @@ func (c *syncCmd) RunCommand(args []string) error { sc := client.New(src) sc.SetupAuth() + sc.SetLogger(c.logger) + dc := client.New(c.dest) dc.SetupAuth() - - sc.SetLogger(c.logger) dc.SetLogger(c.logger) + var tc *client.Client + if c.third != "" { + tc = client.New(c.third) + tc.SetupAuth() + tc.SetLogger(c.logger) + } + passNum := 0 for { passNum++ - stats, err := c.doPass(sc, dc) + stats, err := c.doPass(sc, dc, tc) if c.verbose { log.Printf("sync stats - pass: %d, blobs: %d, bytes %d\n", passNum, stats.BlobsCopied, stats.BytesCopied) } @@ -154,7 +163,7 @@ func (c *syncCmd) syncAll() error { if c.verbose { log.Printf("Now syncing: %v -> %v", sh.From, sh.To) } - stats, err := c.doPass(from, to) + stats, err := c.doPass(from, to, nil) if c.verbose { log.Printf("sync stats, blobs: %d, bytes %d\n", stats.BlobsCopied, stats.BytesCopied) } @@ -180,7 +189,7 @@ func (c *syncCmd) discoClient() *client.Client { return cl } -func (c *syncCmd) doPass(sc, dc *client.Client) (stats SyncStats, retErr error) { +func (c *syncCmd) doPass(sc, dc, tc *client.Client) (stats SyncStats, retErr error) { srcBlobs := make(chan blobref.SizedBlobRef, 100) destBlobs := make(chan blobref.SizedBlobRef, 100) srcErr := make(chan error) @@ -220,6 +229,27 @@ func (c *syncCmd) doPass(sc, dc *client.Client) (stats SyncStats, retErr error) } mismatches := []*blobref.BlobRef{} go client.ListMissingDestinationBlobs(destNotHaveBlobs, sizeMismatch, readSrcBlobs, destBlobs) + + // Handle three-legged mode if tc is provided. + checkThirdError := func() {} // default nop + syncBlobs := destNotHaveBlobs + uploadClient := dc + if tc != nil { + thirdBlobs := make(chan blobref.SizedBlobRef, 100) + thirdErr := make(chan error) + go func() { + thirdErr <- tc.SimpleEnumerateBlobs(thirdBlobs) + }() + checkThirdError = func() { + if err := <-thirdErr; err != nil { + retErr = fmt.Errorf("Enumerate error from third leg: %v", err) + } + } + thirdNeedBlobs := make(chan blobref.SizedBlobRef) + go client.ListMissingDestinationBlobs(thirdNeedBlobs, sizeMismatch, destNotHaveBlobs, thirdBlobs) + syncBlobs = thirdNeedBlobs + uploadClient = tc + } For: for { select { @@ -228,7 +258,7 @@ For: log.Printf("WARNING: blobref %v has differing sizes on source and est", br) stats.ErrorCount++ mismatches = append(mismatches, br) - case sb, ok := <-destNotHaveBlobs: + case sb, ok := <-syncBlobs: if !ok { break For } @@ -247,7 +277,7 @@ For: continue } uh := &client.UploadHandle{BlobRef: sb.BlobRef, Size: size, Contents: blobReader} - pr, err := dc.Upload(uh) + pr, err := uploadClient.Upload(uh) if err != nil { stats.ErrorCount++ log.Printf("Upload of %s to destination blobserver failed: %v", sb.BlobRef, err) @@ -268,6 +298,7 @@ For: checkSourceError() checkDestError() + checkThirdError() if retErr == nil && stats.ErrorCount > 0 { retErr = fmt.Errorf("%d errors during sync", stats.ErrorCount) }