diff --git a/server/go/camlistored/sync.go b/server/go/camlistored/sync.go
index 62ababcf9..51e536bfd 100644
--- a/server/go/camlistored/sync.go
+++ b/server/go/camlistored/sync.go
@@ -51,6 +51,7 @@ type SyncHandler struct {
recentCopyTime *time.Time
totalCopies int64
totalCopyBytes int64
+ totalErrors int64
}
type timestampedError struct {
@@ -101,6 +102,7 @@ func (sh *SyncHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
if sh.recentCopyTime != nil {
fmt.Fprintf(rw, "
Most recent copy: %s", sh.recentCopyTime.Format(time.RFC3339))
}
+ fmt.Fprintf(rw, "Copy errors: %d", sh.totalErrors)
fmt.Fprintf(rw, "")
if len(sh.blobStatus) > 0 {
@@ -152,6 +154,11 @@ func (sh *SyncHandler) addErrorToLog(err os.Error) {
}
}
+type copyResult struct {
+ sb blobref.SizedBlobRef
+ err os.Error
+}
+
func (sh *SyncHandler) syncQueueLoop() {
every(queueSyncInterval, func() {
Enumerate:
@@ -162,24 +169,35 @@ func (sh *SyncHandler) syncQueueLoop() {
go func() {
errch <- sh.fromq.EnumerateBlobs(enumch, "", 1000, int(queueSyncInterval.Seconds()))
}()
- nCopied := 0
+ nCopied := 0
toCopy := 0
+
workch := make(chan blobref.SizedBlobRef, 1000)
- wg := new(sync.WaitGroup)
+ resch := make(chan copyResult, 8)
for sb := range enumch {
toCopy++
workch <- sb
if toCopy <= sh.copierPoolSize {
- wg.Add(1)
- go func() {
- nCopied += sh.copyWorker(workch)
- wg.Done()
- }()
+ go sh.copyWorker(resch, workch)
}
+ sh.setStatus("Enumerating queued blobs: %d", toCopy)
}
close(workch)
- wg.Wait()
+ for i := 0; i < toCopy; i++ {
+ sh.setStatus("Copied %d/%d of batch of queued blobs", nCopied, toCopy)
+ res := <-resch
+ nCopied++
+ sh.lk.Lock()
+ if res.err == nil {
+ sh.totalCopies++
+ sh.totalCopyBytes += res.sb.Size
+ sh.recentCopyTime = time.UTC()
+ } else {
+ sh.totalErrors++
+ }
+ sh.lk.Unlock()
+ }
if err := <-errch; err != nil {
sh.addErrorToLog(fmt.Errorf("replication error for queue %q, enumerate from source: %v", err))
@@ -193,18 +211,10 @@ func (sh *SyncHandler) syncQueueLoop() {
})
}
-func (sh *SyncHandler) copyWorker(ch chan blobref.SizedBlobRef) (nCopied int) {
- for sb := range ch {
- if err := sh.copyBlob(sb); err == nil {
- nCopied++
- sh.lk.Lock()
- sh.totalCopies++
- sh.totalCopyBytes += sb.Size
- sh.recentCopyTime = time.UTC()
- sh.lk.Unlock()
- }
+func (sh *SyncHandler) copyWorker(res chan<- copyResult, work <-chan blobref.SizedBlobRef) {
+ for sb := range work {
+ res <- copyResult{sb, sh.copyBlob(sb)}
}
- return
}
type statusFunc func() string