camput: add an unbounded mode to chanworker.

Change-Id: I90040fc6a67c88ae54338ab9556295861170a871
This commit is contained in:
Brad Fitzpatrick 2013-09-10 20:07:20 -07:00
parent e731931edd
commit 058434449c
1 changed files with 27 additions and 2 deletions

View File

@ -21,6 +21,7 @@ package main
import ( import (
"container/list" "container/list"
"sync"
) )
type nodeWorker struct { type nodeWorker struct {
@ -35,9 +36,33 @@ type nodeWorker struct {
// NewNodeWorker starts nWorkers goroutines running fn on incoming // NewNodeWorker starts nWorkers goroutines running fn on incoming
// nodes sent on the returned channel. fn may block; writes to the // nodes sent on the returned channel. fn may block; writes to the
// channel will buffer. // channel will buffer.
// If nWorkers is negative, a new goroutine running fn is called for each
// item sent on the returned channel.
// When the returned channel is closed, fn is called with (nil, false)
// after all other calls to fn have completed.
func NewNodeWorker(nWorkers int, fn func(n *node, ok bool)) chan<- *node { func NewNodeWorker(nWorkers int, fn func(n *node, ok bool)) chan<- *node {
if nWorkers == 0 {
panic("invalid nWorkers valid of 0")
}
retc := make(chan *node, buffered)
if nWorkers < 0 {
// Unbounded number of workers.
go func() {
var wg sync.WaitGroup
for w := range retc {
wg.Add(1)
go func(w *node) {
fn(w, true)
wg.Done()
}(w)
}
wg.Wait()
fn(nil, false)
}()
return retc
}
w := &nodeWorker{ w := &nodeWorker{
c: make(chan *node, buffered), c: retc,
workc: make(chan *node, buffered), workc: make(chan *node, buffered),
donec: make(chan bool), // when workers finish donec: make(chan bool), // when workers finish
fn: fn, fn: fn,
@ -53,7 +78,7 @@ func NewNodeWorker(nWorkers int, fn func(n *node, ok bool)) chan<- *node {
} }
fn(nil, false) // final sentinel fn(nil, false) // final sentinel
}() }()
return w.c return retc
} }
func (w *nodeWorker) pump() { func (w *nodeWorker) pump() {