mirror of https://github.com/perkeep/perkeep.git
camput: add an unbounded mode to chanworker.
Change-Id: I90040fc6a67c88ae54338ab9556295861170a871
This commit is contained in:
parent
e731931edd
commit
058434449c
|
@ -21,6 +21,7 @@ package main
|
|||
|
||||
import (
|
||||
"container/list"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type nodeWorker struct {
|
||||
|
@ -35,9 +36,33 @@ type nodeWorker struct {
|
|||
// NewNodeWorker starts nWorkers goroutines running fn on incoming
|
||||
// nodes sent on the returned channel. fn may block; writes to the
|
||||
// channel will buffer.
|
||||
// If nWorkers is negative, a new goroutine running fn is called for each
|
||||
// item sent on the returned channel.
|
||||
// When the returned channel is closed, fn is called with (nil, false)
|
||||
// after all other calls to fn have completed.
|
||||
func NewNodeWorker(nWorkers int, fn func(n *node, ok bool)) chan<- *node {
|
||||
if nWorkers == 0 {
|
||||
panic("invalid nWorkers valid of 0")
|
||||
}
|
||||
retc := make(chan *node, buffered)
|
||||
if nWorkers < 0 {
|
||||
// Unbounded number of workers.
|
||||
go func() {
|
||||
var wg sync.WaitGroup
|
||||
for w := range retc {
|
||||
wg.Add(1)
|
||||
go func(w *node) {
|
||||
fn(w, true)
|
||||
wg.Done()
|
||||
}(w)
|
||||
}
|
||||
wg.Wait()
|
||||
fn(nil, false)
|
||||
}()
|
||||
return retc
|
||||
}
|
||||
w := &nodeWorker{
|
||||
c: make(chan *node, buffered),
|
||||
c: retc,
|
||||
workc: make(chan *node, buffered),
|
||||
donec: make(chan bool), // when workers finish
|
||||
fn: fn,
|
||||
|
@ -53,7 +78,7 @@ func NewNodeWorker(nWorkers int, fn func(n *node, ok bool)) chan<- *node {
|
|||
}
|
||||
fn(nil, false) // final sentinel
|
||||
}()
|
||||
return w.c
|
||||
return retc
|
||||
}
|
||||
|
||||
func (w *nodeWorker) pump() {
|
||||
|
|
Loading…
Reference in New Issue