mirror of https://github.com/perkeep/perkeep.git
camput: add an unbounded mode to chanworker.
Change-Id: I90040fc6a67c88ae54338ab9556295861170a871
This commit is contained in:
parent
e731931edd
commit
058434449c
|
@ -21,6 +21,7 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"container/list"
|
"container/list"
|
||||||
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
type nodeWorker struct {
|
type nodeWorker struct {
|
||||||
|
@ -35,9 +36,33 @@ type nodeWorker struct {
|
||||||
// NewNodeWorker starts nWorkers goroutines running fn on incoming
|
// NewNodeWorker starts nWorkers goroutines running fn on incoming
|
||||||
// nodes sent on the returned channel. fn may block; writes to the
|
// nodes sent on the returned channel. fn may block; writes to the
|
||||||
// channel will buffer.
|
// channel will buffer.
|
||||||
|
// If nWorkers is negative, a new goroutine running fn is called for each
|
||||||
|
// item sent on the returned channel.
|
||||||
|
// When the returned channel is closed, fn is called with (nil, false)
|
||||||
|
// after all other calls to fn have completed.
|
||||||
func NewNodeWorker(nWorkers int, fn func(n *node, ok bool)) chan<- *node {
|
func NewNodeWorker(nWorkers int, fn func(n *node, ok bool)) chan<- *node {
|
||||||
|
if nWorkers == 0 {
|
||||||
|
panic("invalid nWorkers valid of 0")
|
||||||
|
}
|
||||||
|
retc := make(chan *node, buffered)
|
||||||
|
if nWorkers < 0 {
|
||||||
|
// Unbounded number of workers.
|
||||||
|
go func() {
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for w := range retc {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(w *node) {
|
||||||
|
fn(w, true)
|
||||||
|
wg.Done()
|
||||||
|
}(w)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
fn(nil, false)
|
||||||
|
}()
|
||||||
|
return retc
|
||||||
|
}
|
||||||
w := &nodeWorker{
|
w := &nodeWorker{
|
||||||
c: make(chan *node, buffered),
|
c: retc,
|
||||||
workc: make(chan *node, buffered),
|
workc: make(chan *node, buffered),
|
||||||
donec: make(chan bool), // when workers finish
|
donec: make(chan bool), // when workers finish
|
||||||
fn: fn,
|
fn: fn,
|
||||||
|
@ -53,7 +78,7 @@ func NewNodeWorker(nWorkers int, fn func(n *node, ok bool)) chan<- *node {
|
||||||
}
|
}
|
||||||
fn(nil, false) // final sentinel
|
fn(nil, false) // final sentinel
|
||||||
}()
|
}()
|
||||||
return w.c
|
return retc
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *nodeWorker) pump() {
|
func (w *nodeWorker) pump() {
|
||||||
|
|
Loading…
Reference in New Issue