mirror of https://github.com/perkeep/perkeep.git
Merge "clients/go/camsync: return stats struct and use them in verbose mode"
This commit is contained in:
commit
8ac1935c9c
|
@ -37,6 +37,12 @@ var flagDestPass = flag.String("destpassword", "", "Destination password")
|
|||
var flagRemoveSource = flag.Bool("removesrc", false,
|
||||
"remove each blob from the source after syncing to the destination; for queue processing")
|
||||
|
||||
type SyncStats struct {
|
||||
BlobsCopied int
|
||||
BytesCopied int64
|
||||
ErrorCount int
|
||||
}
|
||||
|
||||
func usage(err string) {
|
||||
if err != "" {
|
||||
fmt.Fprintf(os.Stderr, "Error: %s\n\nUsage:\n", err)
|
||||
|
@ -71,21 +77,24 @@ func main() {
|
|||
passNum := 0
|
||||
for {
|
||||
passNum++
|
||||
if err := doPass(sc, dc, passNum); err != nil {
|
||||
stats, err := doPass(sc, dc, passNum)
|
||||
if err != nil {
|
||||
log.Fatalf("sync failed: %v", err)
|
||||
}
|
||||
if *flagVerbose {
|
||||
log.Printf("sync stats - pass: %d, blobs: %d, bytes %d\n", passNum, stats.BlobsCopied, stats.BytesCopied)
|
||||
}
|
||||
if !*flagLoop {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func doPass(sc, dc *client.Client, passNum int) (retErr os.Error) {
|
||||
func doPass(sc, dc *client.Client, passNum int) (stats SyncStats, retErr os.Error) {
|
||||
srcBlobs := make(chan blobref.SizedBlobRef, 100)
|
||||
destBlobs := make(chan blobref.SizedBlobRef, 100)
|
||||
srcErr := make(chan os.Error)
|
||||
destErr := make(chan os.Error)
|
||||
errorCount := 0
|
||||
|
||||
go func() {
|
||||
srcErr <- sc.EnumerateBlobs(srcBlobs)
|
||||
|
@ -113,9 +122,6 @@ func doPass(sc, dc *client.Client, passNum int) (retErr os.Error) {
|
|||
}
|
||||
}
|
||||
|
||||
bytesCopied := int64(0)
|
||||
blobsCopied := 0
|
||||
|
||||
destNotHaveBlobs := make(chan blobref.SizedBlobRef, 100)
|
||||
go client.ListMissingDestinationBlobs(destNotHaveBlobs, srcBlobs, destBlobs)
|
||||
for sb := range destNotHaveBlobs {
|
||||
|
@ -123,12 +129,12 @@ func doPass(sc, dc *client.Client, passNum int) (retErr os.Error) {
|
|||
|
||||
blobReader, size, err := sc.FetchStreaming(sb.BlobRef)
|
||||
if err != nil {
|
||||
errorCount++
|
||||
stats.ErrorCount++
|
||||
log.Printf("Error fetching %s: %v", sb.BlobRef, err)
|
||||
continue
|
||||
}
|
||||
if size != sb.Size {
|
||||
errorCount++
|
||||
stats.ErrorCount++
|
||||
log.Printf("Source blobserver's enumerate size of %d for blob %s doesn't match its Get size of %d",
|
||||
sb.Size, sb.BlobRef, size)
|
||||
continue
|
||||
|
@ -136,27 +142,26 @@ func doPass(sc, dc *client.Client, passNum int) (retErr os.Error) {
|
|||
uh := &client.UploadHandle{BlobRef: sb.BlobRef, Size: size, Contents: blobReader}
|
||||
pr, err := dc.Upload(uh)
|
||||
if err != nil {
|
||||
errorCount++
|
||||
stats.ErrorCount++
|
||||
log.Printf("Upload of %s to destination blobserver failed: %v", sb.BlobRef, err)
|
||||
continue
|
||||
}
|
||||
if !pr.Skipped {
|
||||
blobsCopied++
|
||||
bytesCopied += pr.Size
|
||||
stats.BlobsCopied++
|
||||
stats.BytesCopied += pr.Size
|
||||
}
|
||||
if *flagRemoveSource {
|
||||
if err = sc.RemoveBlob(sb.BlobRef); err != nil {
|
||||
errorCount++
|
||||
stats.ErrorCount++
|
||||
log.Printf("Failed to delete %s from source: %v", sb.BlobRef, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: return blobsCopied & bytesCopied
|
||||
checkSourceError()
|
||||
checkDestError()
|
||||
if retErr == nil && errorCount > 0 {
|
||||
retErr = os.NewError(fmt.Sprintf("%d errors during sync", errorCount))
|
||||
if retErr == nil && stats.ErrorCount > 0 {
|
||||
retErr = os.NewError(fmt.Sprintf("%d errors during sync", stats.ErrorCount))
|
||||
}
|
||||
return
|
||||
return stats, retErr
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue