diff --git a/pkg/search/websocket.go b/pkg/search/websocket.go index bb6aec006..85d63fae6 100644 --- a/pkg/search/websocket.go +++ b/pkg/search/websocket.go @@ -99,7 +99,7 @@ func (h *wsHub) run() { // New blob was received. Kick off standing search queries to see if any changed. for conn := range h.conns { for _, wq := range conn.queries { - go h.doSearch(wq) + go h.redoSearch(wq) } } case wr := <-h.watchReq: @@ -140,6 +140,29 @@ func (h *wsHub) run() { } } +// redoSearch is called (in its own goroutine) when a new schema blob +// arrives to note that wq might now have new results and we should +// re-run it. But because a search can take awhile, don't run more +// than one refresh at a time. +func (h *wsHub) redoSearch(wq *watchedQuery) { + wq.mu.Lock() + defer wq.mu.Unlock() + wq.dirty = true + if wq.refreshing { + // Somebody else is already refreshing. + // One's enough. + return + } + for wq.dirty { + wq.refreshing = true + wq.dirty = false + wq.mu.Unlock() // release lock while running query; might become dirty meanwhile + h.doSearch(wq) + wq.mu.Lock() // before checking wq.dirty + } + wq.refreshing = false +} + func (h *wsHub) doSearch(wq *watchedQuery) { // Make our own copy, in case q := new(SearchQuery) @@ -182,9 +205,11 @@ type watchedQuery struct { tag string q *SearchQuery - mu sync.Mutex // guards lastRes - lastres *SearchResult - lastresj []byte // as JSON + mu sync.Mutex // guards following + refreshing bool // search is currently running + dirty bool // new schema blob arrived while refreshing; another refresh due + lastres *SearchResult + lastresj []byte // as JSON } // watchReq is a (un)subscribe request.