diff --git a/cmd/camput/chanworker.go b/cmd/camput/chanworker.go index ac5d7961a..d65dc30ef 100644 --- a/cmd/camput/chanworker.go +++ b/cmd/camput/chanworker.go @@ -21,6 +21,7 @@ package main import ( "container/list" + "sync" ) type nodeWorker struct { @@ -35,9 +36,33 @@ type nodeWorker struct { // NewNodeWorker starts nWorkers goroutines running fn on incoming // nodes sent on the returned channel. fn may block; writes to the // channel will buffer. +// If nWorkers is negative, a new goroutine running fn is called for each +// item sent on the returned channel. +// When the returned channel is closed, fn is called with (nil, false) +// after all other calls to fn have completed. func NewNodeWorker(nWorkers int, fn func(n *node, ok bool)) chan<- *node { + if nWorkers == 0 { + panic("invalid nWorkers valid of 0") + } + retc := make(chan *node, buffered) + if nWorkers < 0 { + // Unbounded number of workers. + go func() { + var wg sync.WaitGroup + for w := range retc { + wg.Add(1) + go func(w *node) { + fn(w, true) + wg.Done() + }(w) + } + wg.Wait() + fn(nil, false) + }() + return retc + } w := &nodeWorker{ - c: make(chan *node, buffered), + c: retc, workc: make(chan *node, buffered), donec: make(chan bool), // when workers finish fn: fn, @@ -53,7 +78,7 @@ func NewNodeWorker(nWorkers int, fn func(n *node, ok bool)) chan<- *node { } fn(nil, false) // final sentinel }() - return w.c + return retc } func (w *nodeWorker) pump() {