From 053eb73b3db605dc9faee20e753477568b912dc7 Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Wed, 11 May 2011 08:49:17 -0700 Subject: [PATCH] sync: more stats, better goroutine/chan design --- server/go/camlistored/sync.go | 48 +++++++++++++++++++++-------------- 1 file changed, 29 insertions(+), 19 deletions(-) diff --git a/server/go/camlistored/sync.go b/server/go/camlistored/sync.go index 62ababcf9..51e536bfd 100644 --- a/server/go/camlistored/sync.go +++ b/server/go/camlistored/sync.go @@ -51,6 +51,7 @@ type SyncHandler struct { recentCopyTime *time.Time totalCopies int64 totalCopyBytes int64 + totalErrors int64 } type timestampedError struct { @@ -101,6 +102,7 @@ func (sh *SyncHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) { if sh.recentCopyTime != nil { fmt.Fprintf(rw, "
  • Most recent copy: %s
  • ", sh.recentCopyTime.Format(time.RFC3339)) } + fmt.Fprintf(rw, "
  • Copy errors: %d
  • ", sh.totalErrors) fmt.Fprintf(rw, "") if len(sh.blobStatus) > 0 { @@ -152,6 +154,11 @@ func (sh *SyncHandler) addErrorToLog(err os.Error) { } } +type copyResult struct { + sb blobref.SizedBlobRef + err os.Error +} + func (sh *SyncHandler) syncQueueLoop() { every(queueSyncInterval, func() { Enumerate: @@ -162,24 +169,35 @@ func (sh *SyncHandler) syncQueueLoop() { go func() { errch <- sh.fromq.EnumerateBlobs(enumch, "", 1000, int(queueSyncInterval.Seconds())) }() - nCopied := 0 + nCopied := 0 toCopy := 0 + workch := make(chan blobref.SizedBlobRef, 1000) - wg := new(sync.WaitGroup) + resch := make(chan copyResult, 8) for sb := range enumch { toCopy++ workch <- sb if toCopy <= sh.copierPoolSize { - wg.Add(1) - go func() { - nCopied += sh.copyWorker(workch) - wg.Done() - }() + go sh.copyWorker(resch, workch) } + sh.setStatus("Enumerating queued blobs: %d", toCopy) } close(workch) - wg.Wait() + for i := 0; i < toCopy; i++ { + sh.setStatus("Copied %d/%d of batch of queued blobs", nCopied, toCopy) + res := <-resch + nCopied++ + sh.lk.Lock() + if res.err == nil { + sh.totalCopies++ + sh.totalCopyBytes += res.sb.Size + sh.recentCopyTime = time.UTC() + } else { + sh.totalErrors++ + } + sh.lk.Unlock() + } if err := <-errch; err != nil { sh.addErrorToLog(fmt.Errorf("replication error for queue %q, enumerate from source: %v", err)) @@ -193,18 +211,10 @@ func (sh *SyncHandler) syncQueueLoop() { }) } -func (sh *SyncHandler) copyWorker(ch chan blobref.SizedBlobRef) (nCopied int) { - for sb := range ch { - if err := sh.copyBlob(sb); err == nil { - nCopied++ - sh.lk.Lock() - sh.totalCopies++ - sh.totalCopyBytes += sb.Size - sh.recentCopyTime = time.UTC() - sh.lk.Unlock() - } +func (sh *SyncHandler) copyWorker(res chan<- copyResult, work <-chan blobref.SizedBlobRef) { + for sb := range work { + res <- copyResult{sb, sh.copyBlob(sb)} } - return } type statusFunc func() string