mirror of https://github.com/perkeep/perkeep.git
sync: more stats, better goroutine/chan design
This commit is contained in:
parent
408710a4e7
commit
053eb73b3d
|
@ -51,6 +51,7 @@ type SyncHandler struct {
|
|||
recentCopyTime *time.Time
|
||||
totalCopies int64
|
||||
totalCopyBytes int64
|
||||
totalErrors int64
|
||||
}
|
||||
|
||||
type timestampedError struct {
|
||||
|
@ -101,6 +102,7 @@ func (sh *SyncHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
|
|||
if sh.recentCopyTime != nil {
|
||||
fmt.Fprintf(rw, "<li>Most recent copy: %s</li>", sh.recentCopyTime.Format(time.RFC3339))
|
||||
}
|
||||
fmt.Fprintf(rw, "<li>Copy errors: %d</li>", sh.totalErrors)
|
||||
fmt.Fprintf(rw, "</ul>")
|
||||
|
||||
if len(sh.blobStatus) > 0 {
|
||||
|
@ -152,6 +154,11 @@ func (sh *SyncHandler) addErrorToLog(err os.Error) {
|
|||
}
|
||||
}
|
||||
|
||||
type copyResult struct {
|
||||
sb blobref.SizedBlobRef
|
||||
err os.Error
|
||||
}
|
||||
|
||||
func (sh *SyncHandler) syncQueueLoop() {
|
||||
every(queueSyncInterval, func() {
|
||||
Enumerate:
|
||||
|
@ -162,24 +169,35 @@ func (sh *SyncHandler) syncQueueLoop() {
|
|||
go func() {
|
||||
errch <- sh.fromq.EnumerateBlobs(enumch, "", 1000, int(queueSyncInterval.Seconds()))
|
||||
}()
|
||||
nCopied := 0
|
||||
|
||||
nCopied := 0
|
||||
toCopy := 0
|
||||
|
||||
workch := make(chan blobref.SizedBlobRef, 1000)
|
||||
wg := new(sync.WaitGroup)
|
||||
resch := make(chan copyResult, 8)
|
||||
for sb := range enumch {
|
||||
toCopy++
|
||||
workch <- sb
|
||||
if toCopy <= sh.copierPoolSize {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
nCopied += sh.copyWorker(workch)
|
||||
wg.Done()
|
||||
}()
|
||||
go sh.copyWorker(resch, workch)
|
||||
}
|
||||
sh.setStatus("Enumerating queued blobs: %d", toCopy)
|
||||
}
|
||||
close(workch)
|
||||
wg.Wait()
|
||||
for i := 0; i < toCopy; i++ {
|
||||
sh.setStatus("Copied %d/%d of batch of queued blobs", nCopied, toCopy)
|
||||
res := <-resch
|
||||
nCopied++
|
||||
sh.lk.Lock()
|
||||
if res.err == nil {
|
||||
sh.totalCopies++
|
||||
sh.totalCopyBytes += res.sb.Size
|
||||
sh.recentCopyTime = time.UTC()
|
||||
} else {
|
||||
sh.totalErrors++
|
||||
}
|
||||
sh.lk.Unlock()
|
||||
}
|
||||
|
||||
if err := <-errch; err != nil {
|
||||
sh.addErrorToLog(fmt.Errorf("replication error for queue %q, enumerate from source: %v", err))
|
||||
|
@ -193,18 +211,10 @@ func (sh *SyncHandler) syncQueueLoop() {
|
|||
})
|
||||
}
|
||||
|
||||
func (sh *SyncHandler) copyWorker(ch chan blobref.SizedBlobRef) (nCopied int) {
|
||||
for sb := range ch {
|
||||
if err := sh.copyBlob(sb); err == nil {
|
||||
nCopied++
|
||||
sh.lk.Lock()
|
||||
sh.totalCopies++
|
||||
sh.totalCopyBytes += sb.Size
|
||||
sh.recentCopyTime = time.UTC()
|
||||
sh.lk.Unlock()
|
||||
}
|
||||
func (sh *SyncHandler) copyWorker(res chan<- copyResult, work <-chan blobref.SizedBlobRef) {
|
||||
for sb := range work {
|
||||
res <- copyResult{sb, sh.copyBlob(sb)}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
type statusFunc func() string
|
||||
|
|
Loading…
Reference in New Issue