pkg/search: don't run a speculative search more than once a time

When new blobs arrive we re-run open search queries (those requested
by an active websocket client) in case they changed, and then push the
new result to the browser if they did change.

But when uploading lots of data if the browser has a big search query
active, it was possible to start running the same query in many
goroutines over the same data. Don't do that.

Verified that this was the case with some temporary logging in the
already-refreshing case.

Updates camlistore/camlistore#519
This commit is contained in:
Brad Fitzpatrick 2015-12-29 13:33:45 -08:00
parent adbd439a14
commit c36a3a087e
1 changed files with 29 additions and 4 deletions

View File

@ -99,7 +99,7 @@ func (h *wsHub) run() {
// New blob was received. Kick off standing search queries to see if any changed.
for conn := range h.conns {
for _, wq := range conn.queries {
go h.doSearch(wq)
go h.redoSearch(wq)
}
}
case wr := <-h.watchReq:
@ -140,6 +140,29 @@ func (h *wsHub) run() {
}
}
// redoSearch is called (in its own goroutine) when a new schema blob
// arrives to note that wq might now have new results and we should
// re-run it. But because a search can take awhile, don't run more
// than one refresh at a time.
func (h *wsHub) redoSearch(wq *watchedQuery) {
wq.mu.Lock()
defer wq.mu.Unlock()
wq.dirty = true
if wq.refreshing {
// Somebody else is already refreshing.
// One's enough.
return
}
for wq.dirty {
wq.refreshing = true
wq.dirty = false
wq.mu.Unlock() // release lock while running query; might become dirty meanwhile
h.doSearch(wq)
wq.mu.Lock() // before checking wq.dirty
}
wq.refreshing = false
}
func (h *wsHub) doSearch(wq *watchedQuery) {
// Make our own copy, in case
q := new(SearchQuery)
@ -182,9 +205,11 @@ type watchedQuery struct {
tag string
q *SearchQuery
mu sync.Mutex // guards lastRes
lastres *SearchResult
lastresj []byte // as JSON
mu sync.Mutex // guards following
refreshing bool // search is currently running
dirty bool // new schema blob arrived while refreshing; another refresh due
lastres *SearchResult
lastresj []byte // as JSON
}
// watchReq is a (un)subscribe request.