Fix corpus/search locking bugs.

The WebSockets live queries exposed a latent locking bug: the search
code was acquiring the Corpus read lock multiple times.  If a write
came in during the middle of a search, between two read locks, the
second RLock would hang (since a writer lock was pending) and then the
search and write would deadlock.

Instead, just acquire the read lock once at the beginning of the query and
never re-acquire it.  For now: new "Locked" versions of all the Corpus APIs.
In the future: will probably just add the locked-or-not into the context, and make
sure all Corpus methods require a *context.Context.

Also fix two potential data races: one with LogicalCondition (which
might've raced on the scratch ss buffer, depending on the query), and
a test-only race that didn't matter in production, but was annoying in
tsan output regardless, so fixed.

More tests to follow later. Unblocking Aaron for now.

Change-Id: Ie1edcbd061235f1ef04f9b4c68b86814c84361af
This commit is contained in:
Brad Fitzpatrick 2013-12-12 11:34:07 +04:00
parent 3da18bc2da
commit a0576f2c0d
5 changed files with 109 additions and 63 deletions

View File

@ -41,6 +41,7 @@ import (
// Corpus is an in-memory summary of all of a user's blobs' metadata.
type Corpus struct {
mu sync.RWMutex
//mu syncutil.RWMutexTracker // when debugging
// building is true at start while scanning all rows in the
// index. While building, certain invariants (like things
@ -97,6 +98,12 @@ type Corpus struct {
ss []string
}
// RLock locks the Corpus for reads. It must be used for any "Locked" methods.
func (c *Corpus) RLock() { c.mu.RLock() }
// RUnlock unlocks the Corpus for reads.
func (c *Corpus) RUnlock() { c.mu.RUnlock() }
type edge struct {
edgeType string
peer blob.Ref
@ -485,14 +492,15 @@ func (c *Corpus) br(br blob.Ref) blob.Ref {
// *********** Reading from the corpus
// EnumerateCamliBlobs sends just camlistore meta blobs to ch.
// EnumerateCamliBlobsLocked sends just camlistore meta blobs to ch.
//
// The Corpus must already be locked with RLock.
//
// If camType is empty, all camlistore blobs are sent, otherwise it specifies
// the camliType to send.
// ch is closed at the end. It never returns an error.
func (c *Corpus) EnumerateCamliBlobs(ctx *context.Context, camType string, ch chan<- camtypes.BlobMeta) error {
// ch is closed at the end. The err will either be nil or context.ErrCanceled.
func (c *Corpus) EnumerateCamliBlobsLocked(ctx *context.Context, camType string, ch chan<- camtypes.BlobMeta) error {
defer close(ch)
c.mu.RLock()
defer c.mu.RUnlock()
for t, m := range c.camBlobs {
if camType != "" && camType != t {
continue
@ -508,10 +516,11 @@ func (c *Corpus) EnumerateCamliBlobs(ctx *context.Context, camType string, ch ch
return nil
}
func (c *Corpus) EnumerateBlobMeta(ctx *context.Context, ch chan<- camtypes.BlobMeta) error {
// EnumerateBlobMetaLocked sends all known blobs to ch, or until the context is canceled.
//
// The Corpus must already be locked with RLock.
func (c *Corpus) EnumerateBlobMetaLocked(ctx *context.Context, ch chan<- camtypes.BlobMeta) error {
defer close(ch)
c.mu.RLock()
defer c.mu.RUnlock()
for _, bm := range c.blobs {
select {
case ch <- *bm:
@ -537,13 +546,15 @@ func (s byPermanodeModtime) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
// EnumeratePermanodesLastModified sends all permanodes, sorted by most recently modified first, to ch,
// or until ctx is done.
func (c *Corpus) EnumeratePermanodesLastModified(ctx *context.Context, ch chan<- camtypes.BlobMeta) error {
//
// The Corpus must already be locked with RLock.
func (c *Corpus) EnumeratePermanodesLastModifiedLocked(ctx *context.Context, ch chan<- camtypes.BlobMeta) error {
defer close(ch)
c.mu.RLock()
defer c.mu.RUnlock()
// TODO: keep these sorted in memory
pns := make([]pnAndTime, 0, len(c.permanodes))
for pn := range c.permanodes {
if modt, ok := c.permanodeModtimeLocked(pn); ok {
if modt, ok := c.PermanodeModtimeLocked(pn); ok {
pns = append(pns, pnAndTime{pn, modt})
}
}
@ -555,6 +566,7 @@ func (c *Corpus) EnumeratePermanodesLastModified(ctx *context.Context, ch chan<-
}
select {
case ch <- *bm:
continue
case <-ctx.Done():
return context.ErrCanceled
}
@ -597,10 +609,12 @@ func (c *Corpus) PermanodeModtime(pn blob.Ref) (t time.Time, ok bool) {
// TODO: figure out behavior wrt mutations by different people
c.mu.RLock()
defer c.mu.RUnlock()
return c.permanodeModtimeLocked(pn)
return c.PermanodeModtimeLocked(pn)
}
func (c *Corpus) permanodeModtimeLocked(pn blob.Ref) (t time.Time, ok bool) {
// PermanodeModtimeLocked is like PermanodeModtime but for when the Corpus is
// already locked via RLock.
func (c *Corpus) PermanodeModtimeLocked(pn blob.Ref) (t time.Time, ok bool) {
pm, ok := c.permanodes[pn]
if !ok {
return
@ -623,6 +637,18 @@ func (c *Corpus) permanodeModtimeLocked(pn blob.Ref) (t time.Time, ok bool) {
// signerFilter is optional.
// dst must start with length 0 (laziness, mostly)
func (c *Corpus) AppendPermanodeAttrValues(dst []string,
permaNode blob.Ref,
attr string,
at time.Time,
signerFilter blob.Ref) []string {
c.mu.RLock()
defer c.mu.RUnlock()
return c.AppendPermanodeAttrValuesLocked(dst, permaNode, attr, at, signerFilter)
}
// AppendPermanodeAttrValuesLocked is the version of AppendPermanodeAttrValues that assumes
// the Corpus is already locked with RLock.
func (c *Corpus) AppendPermanodeAttrValuesLocked(dst []string,
permaNode blob.Ref,
attr string,
at time.Time,
@ -630,8 +656,6 @@ func (c *Corpus) AppendPermanodeAttrValues(dst []string,
if len(dst) > 0 {
panic("len(dst) must be 0")
}
c.mu.RLock()
defer c.mu.RUnlock()
pm, ok := c.permanodes[permaNode]
if !ok {
return dst

View File

@ -1048,7 +1048,9 @@ func enumerateSignerKeyId(s sorted.KeyValue, cb func(blob.Ref, string)) (err err
// EnumerateBlobMeta sends all metadata about all known blobs to ch and then closes ch.
func (x *Index) EnumerateBlobMeta(ctx *context.Context, ch chan<- camtypes.BlobMeta) (err error) {
if x.corpus != nil {
return x.corpus.EnumerateBlobMeta(ctx, ch)
x.corpus.RLock()
defer x.corpus.RUnlock()
return x.corpus.EnumerateBlobMetaLocked(ctx, ch)
}
defer close(ch)
return enumerateBlobMeta(x.s, func(bm camtypes.BlobMeta) error {

View File

@ -20,7 +20,7 @@ func SetTestHookBug121(hook func()) {
testHookBug121 = hook
}
func ExportCandSource() string { return candSource }
func ExportSetCandidateSourceHook(fn func(string)) { candSourceHook = fn }
func ExportBufferedConst() int { return buffered }

View File

@ -24,11 +24,11 @@ import (
"net/http"
"os"
"strings"
"sync"
"time"
"camlistore.org/pkg/blob"
"camlistore.org/pkg/context"
"camlistore.org/pkg/syncutil"
"camlistore.org/pkg/types/camtypes"
)
@ -383,6 +383,7 @@ type search struct {
h *Handler
q *SearchQuery
res *SearchResult
ctx *context.Context
// ss is a scratch string slice to avoid allocations.
// We assume (at least so far) that only 1 goroutine is used
@ -411,14 +412,21 @@ func (h *Handler) Query(rawq *SearchQuery) (*SearchResult, error) {
h: h,
q: q,
res: res,
ctx: context.TODO(),
}
defer s.ctx.Cancel()
ctx := context.TODO()
corpus := h.corpus
var unlockOnce sync.Once
if corpus != nil {
corpus.RLock()
defer unlockOnce.Do(corpus.RUnlock)
}
ch := make(chan camtypes.BlobMeta, buffered)
errc := make(chan error, 1)
sendCtx := ctx.New()
sendCtx := s.ctx.New()
defer sendCtx.Cancel()
go func() {
errc <- q.sendAllCandidates(sendCtx, s, ch)
@ -454,6 +462,10 @@ func (h *Handler) Query(rawq *SearchQuery) (*SearchResult, error) {
}
}
if corpus != nil {
unlockOnce.Do(corpus.RUnlock)
}
if q.Describe != nil {
q.Describe.BlobRef = blob.Ref{} // zero this out, if caller set it
blobs := make([]blob.Ref, 0, len(res.Blobs))
@ -482,12 +494,8 @@ func anyCamliType(s *search, br blob.Ref, bm camtypes.BlobMeta) (bool, error) {
return bm.CamliType != "", nil
}
// For testing only.
// Not thread-safe.
var (
// candSource is the most recent strategy that sendAllCandidates used.
candSource string
)
// Test hook.
var candSourceHook func(string)
// sendAllCandidates sends all possible matches to dst.
// dst must be closed, regardless of error.
@ -496,16 +504,22 @@ func (q *SearchQuery) sendAllCandidates(ctx *context.Context, s *search, dst cha
corpus := s.h.corpus
if corpus != nil {
if c.onlyMatchesPermanode() && q.Sort == LastModifiedDesc {
candSource = "corpus_permanode_desc"
return corpus.EnumeratePermanodesLastModified(ctx, dst)
if candSourceHook != nil {
candSourceHook("corpus_permanode_desc")
}
return corpus.EnumeratePermanodesLastModifiedLocked(ctx, dst)
}
if c.AnyCamliType || c.CamliType != "" {
camType := c.CamliType // empty means all
candSource = "camli_blob_meta"
return corpus.EnumerateCamliBlobs(ctx, camType, dst)
if candSourceHook != nil {
candSourceHook("camli_blob_meta")
}
return corpus.EnumerateCamliBlobsLocked(ctx, camType, dst)
}
}
candSource = "all_blob_meta"
if candSourceHook != nil {
candSourceHook("all_blob_meta")
}
return s.h.index.EnumerateBlobMeta(ctx, dst)
}
@ -601,40 +615,41 @@ func (c *LogicalConstraint) checkValid() error {
}
func (c *LogicalConstraint) blobMatches(s *search, br blob.Ref, bm camtypes.BlobMeta) (bool, error) {
// Note: not using multiple goroutines here, because
// so far the *search type assumes it's
// single-threaded. (e.g. the .ss scratch type).
// Also, not using multiple goroutines means we can
// short-circuit when Op == "and" and av is false.
av, err := c.A.blobMatches(s, br, bm)
if err != nil {
return false, err
}
switch c.Op {
case "and", "xor":
var g syncutil.Group
var av, bv bool
g.Go(func() (err error) {
av, err = c.A.blobMatches(s, br, bm)
return
})
g.Go(func() (err error) {
bv, err = c.B.blobMatches(s, br, bm)
return
})
if err := g.Err(); err != nil {
return false, err
}
switch c.Op {
case "and":
return av && bv, nil
case "xor":
return av != bv, nil
case "not":
return !av, nil
case "and":
if !av {
// Short-circuit.
return false, nil
}
case "or":
av, err := c.A.blobMatches(s, br, bm)
if err != nil {
return false, err
}
if av {
// Short-circuit.
return true, nil
}
return c.B.blobMatches(s, br, bm)
case "not":
v, err := c.A.blobMatches(s, br, bm)
return !v, err
}
bv, err := c.B.blobMatches(s, br, bm)
if err != nil {
return false, err
}
switch c.Op {
case "and", "or":
return bv, nil
case "xor":
return av != bv, nil
}
panic("unreachable")
}
@ -689,7 +704,7 @@ func (c *PermanodeConstraint) blobMatches(s *search, br blob.Ref, bm camtypes.Bl
if corpus == nil {
vals = dp.Attr[c.Attr]
} else {
s.ss = corpus.AppendPermanodeAttrValues(
s.ss = corpus.AppendPermanodeAttrValuesLocked(
s.ss[:0], br, c.Attr, c.At, s.h.owner)
vals = s.ss
}
@ -700,7 +715,7 @@ func (c *PermanodeConstraint) blobMatches(s *search, br blob.Ref, bm camtypes.Bl
}
if c.ModTime != nil {
if corpus != nil {
mt, ok := corpus.PermanodeModtime(br)
mt, ok := corpus.PermanodeModtimeLocked(br)
if !ok || !c.ModTime.timeMatches(mt) {
return false, nil
}

View File

@ -553,6 +553,11 @@ func TestQueryRecentPermanodes(t *testing.T) {
p3 := id.NewPlannedPermanode("3")
id.SetAttribute(p3, "foo", "p3")
var usedSource string
ExportSetCandidateSourceHook(func(s string) {
usedSource = s
})
req := &SearchQuery{
Constraint: &Constraint{
Permanode: &PermanodeConstraint{},
@ -565,8 +570,8 @@ func TestQueryRecentPermanodes(t *testing.T) {
if err != nil {
qt.t.Fatal(err)
}
if s := ExportCandSource(); s != "corpus_permanode_desc" {
t.Errorf("used candidate source strategy %q; want corpus_permanode_desc", s)
if usedSource != "corpus_permanode_desc" {
t.Errorf("used candidate source strategy %q; want corpus_permanode_desc", usedSource)
}
wantBlobs := []*SearchResultBlob{
{Blob: p3},