diff --git a/pkg/index/corpus.go b/pkg/index/corpus.go index f809549ec..7941814a7 100644 --- a/pkg/index/corpus.go +++ b/pkg/index/corpus.go @@ -41,6 +41,7 @@ import ( // Corpus is an in-memory summary of all of a user's blobs' metadata. type Corpus struct { mu sync.RWMutex + //mu syncutil.RWMutexTracker // when debugging // building is true at start while scanning all rows in the // index. While building, certain invariants (like things @@ -97,6 +98,12 @@ type Corpus struct { ss []string } +// RLock locks the Corpus for reads. It must be used for any "Locked" methods. +func (c *Corpus) RLock() { c.mu.RLock() } + +// RUnlock unlocks the Corpus for reads. +func (c *Corpus) RUnlock() { c.mu.RUnlock() } + type edge struct { edgeType string peer blob.Ref @@ -485,14 +492,15 @@ func (c *Corpus) br(br blob.Ref) blob.Ref { // *********** Reading from the corpus -// EnumerateCamliBlobs sends just camlistore meta blobs to ch. +// EnumerateCamliBlobsLocked sends just camlistore meta blobs to ch. +// +// The Corpus must already be locked with RLock. +// // If camType is empty, all camlistore blobs are sent, otherwise it specifies // the camliType to send. -// ch is closed at the end. It never returns an error. -func (c *Corpus) EnumerateCamliBlobs(ctx *context.Context, camType string, ch chan<- camtypes.BlobMeta) error { +// ch is closed at the end. The err will either be nil or context.ErrCanceled. +func (c *Corpus) EnumerateCamliBlobsLocked(ctx *context.Context, camType string, ch chan<- camtypes.BlobMeta) error { defer close(ch) - c.mu.RLock() - defer c.mu.RUnlock() for t, m := range c.camBlobs { if camType != "" && camType != t { continue @@ -508,10 +516,11 @@ func (c *Corpus) EnumerateCamliBlobs(ctx *context.Context, camType string, ch ch return nil } -func (c *Corpus) EnumerateBlobMeta(ctx *context.Context, ch chan<- camtypes.BlobMeta) error { +// EnumerateBlobMetaLocked sends all known blobs to ch, or until the context is canceled. +// +// The Corpus must already be locked with RLock. +func (c *Corpus) EnumerateBlobMetaLocked(ctx *context.Context, ch chan<- camtypes.BlobMeta) error { defer close(ch) - c.mu.RLock() - defer c.mu.RUnlock() for _, bm := range c.blobs { select { case ch <- *bm: @@ -537,13 +546,15 @@ func (s byPermanodeModtime) Swap(i, j int) { s[i], s[j] = s[j], s[i] } // EnumeratePermanodesLastModified sends all permanodes, sorted by most recently modified first, to ch, // or until ctx is done. -func (c *Corpus) EnumeratePermanodesLastModified(ctx *context.Context, ch chan<- camtypes.BlobMeta) error { +// +// The Corpus must already be locked with RLock. +func (c *Corpus) EnumeratePermanodesLastModifiedLocked(ctx *context.Context, ch chan<- camtypes.BlobMeta) error { defer close(ch) - c.mu.RLock() - defer c.mu.RUnlock() + + // TODO: keep these sorted in memory pns := make([]pnAndTime, 0, len(c.permanodes)) for pn := range c.permanodes { - if modt, ok := c.permanodeModtimeLocked(pn); ok { + if modt, ok := c.PermanodeModtimeLocked(pn); ok { pns = append(pns, pnAndTime{pn, modt}) } } @@ -555,6 +566,7 @@ func (c *Corpus) EnumeratePermanodesLastModified(ctx *context.Context, ch chan<- } select { case ch <- *bm: + continue case <-ctx.Done(): return context.ErrCanceled } @@ -597,10 +609,12 @@ func (c *Corpus) PermanodeModtime(pn blob.Ref) (t time.Time, ok bool) { // TODO: figure out behavior wrt mutations by different people c.mu.RLock() defer c.mu.RUnlock() - return c.permanodeModtimeLocked(pn) + return c.PermanodeModtimeLocked(pn) } -func (c *Corpus) permanodeModtimeLocked(pn blob.Ref) (t time.Time, ok bool) { +// PermanodeModtimeLocked is like PermanodeModtime but for when the Corpus is +// already locked via RLock. +func (c *Corpus) PermanodeModtimeLocked(pn blob.Ref) (t time.Time, ok bool) { pm, ok := c.permanodes[pn] if !ok { return @@ -623,6 +637,18 @@ func (c *Corpus) permanodeModtimeLocked(pn blob.Ref) (t time.Time, ok bool) { // signerFilter is optional. // dst must start with length 0 (laziness, mostly) func (c *Corpus) AppendPermanodeAttrValues(dst []string, + permaNode blob.Ref, + attr string, + at time.Time, + signerFilter blob.Ref) []string { + c.mu.RLock() + defer c.mu.RUnlock() + return c.AppendPermanodeAttrValuesLocked(dst, permaNode, attr, at, signerFilter) +} + +// AppendPermanodeAttrValuesLocked is the version of AppendPermanodeAttrValues that assumes +// the Corpus is already locked with RLock. +func (c *Corpus) AppendPermanodeAttrValuesLocked(dst []string, permaNode blob.Ref, attr string, at time.Time, @@ -630,8 +656,6 @@ func (c *Corpus) AppendPermanodeAttrValues(dst []string, if len(dst) > 0 { panic("len(dst) must be 0") } - c.mu.RLock() - defer c.mu.RUnlock() pm, ok := c.permanodes[permaNode] if !ok { return dst diff --git a/pkg/index/index.go b/pkg/index/index.go index b75cc54ab..98c679e7b 100644 --- a/pkg/index/index.go +++ b/pkg/index/index.go @@ -1048,7 +1048,9 @@ func enumerateSignerKeyId(s sorted.KeyValue, cb func(blob.Ref, string)) (err err // EnumerateBlobMeta sends all metadata about all known blobs to ch and then closes ch. func (x *Index) EnumerateBlobMeta(ctx *context.Context, ch chan<- camtypes.BlobMeta) (err error) { if x.corpus != nil { - return x.corpus.EnumerateBlobMeta(ctx, ch) + x.corpus.RLock() + defer x.corpus.RUnlock() + return x.corpus.EnumerateBlobMetaLocked(ctx, ch) } defer close(ch) return enumerateBlobMeta(x.s, func(bm camtypes.BlobMeta) error { diff --git a/pkg/search/export_test.go b/pkg/search/export_test.go index a24f340f2..569570447 100644 --- a/pkg/search/export_test.go +++ b/pkg/search/export_test.go @@ -20,7 +20,7 @@ func SetTestHookBug121(hook func()) { testHookBug121 = hook } -func ExportCandSource() string { return candSource } +func ExportSetCandidateSourceHook(fn func(string)) { candSourceHook = fn } func ExportBufferedConst() int { return buffered } diff --git a/pkg/search/query.go b/pkg/search/query.go index 921499062..fcaefc00a 100644 --- a/pkg/search/query.go +++ b/pkg/search/query.go @@ -24,11 +24,11 @@ import ( "net/http" "os" "strings" + "sync" "time" "camlistore.org/pkg/blob" "camlistore.org/pkg/context" - "camlistore.org/pkg/syncutil" "camlistore.org/pkg/types/camtypes" ) @@ -383,6 +383,7 @@ type search struct { h *Handler q *SearchQuery res *SearchResult + ctx *context.Context // ss is a scratch string slice to avoid allocations. // We assume (at least so far) that only 1 goroutine is used @@ -411,14 +412,21 @@ func (h *Handler) Query(rawq *SearchQuery) (*SearchResult, error) { h: h, q: q, res: res, + ctx: context.TODO(), } + defer s.ctx.Cancel() - ctx := context.TODO() + corpus := h.corpus + var unlockOnce sync.Once + if corpus != nil { + corpus.RLock() + defer unlockOnce.Do(corpus.RUnlock) + } ch := make(chan camtypes.BlobMeta, buffered) errc := make(chan error, 1) - sendCtx := ctx.New() + sendCtx := s.ctx.New() defer sendCtx.Cancel() go func() { errc <- q.sendAllCandidates(sendCtx, s, ch) @@ -454,6 +462,10 @@ func (h *Handler) Query(rawq *SearchQuery) (*SearchResult, error) { } } + if corpus != nil { + unlockOnce.Do(corpus.RUnlock) + } + if q.Describe != nil { q.Describe.BlobRef = blob.Ref{} // zero this out, if caller set it blobs := make([]blob.Ref, 0, len(res.Blobs)) @@ -482,12 +494,8 @@ func anyCamliType(s *search, br blob.Ref, bm camtypes.BlobMeta) (bool, error) { return bm.CamliType != "", nil } -// For testing only. -// Not thread-safe. -var ( - // candSource is the most recent strategy that sendAllCandidates used. - candSource string -) +// Test hook. +var candSourceHook func(string) // sendAllCandidates sends all possible matches to dst. // dst must be closed, regardless of error. @@ -496,16 +504,22 @@ func (q *SearchQuery) sendAllCandidates(ctx *context.Context, s *search, dst cha corpus := s.h.corpus if corpus != nil { if c.onlyMatchesPermanode() && q.Sort == LastModifiedDesc { - candSource = "corpus_permanode_desc" - return corpus.EnumeratePermanodesLastModified(ctx, dst) + if candSourceHook != nil { + candSourceHook("corpus_permanode_desc") + } + return corpus.EnumeratePermanodesLastModifiedLocked(ctx, dst) } if c.AnyCamliType || c.CamliType != "" { camType := c.CamliType // empty means all - candSource = "camli_blob_meta" - return corpus.EnumerateCamliBlobs(ctx, camType, dst) + if candSourceHook != nil { + candSourceHook("camli_blob_meta") + } + return corpus.EnumerateCamliBlobsLocked(ctx, camType, dst) } } - candSource = "all_blob_meta" + if candSourceHook != nil { + candSourceHook("all_blob_meta") + } return s.h.index.EnumerateBlobMeta(ctx, dst) } @@ -601,40 +615,41 @@ func (c *LogicalConstraint) checkValid() error { } func (c *LogicalConstraint) blobMatches(s *search, br blob.Ref, bm camtypes.BlobMeta) (bool, error) { + // Note: not using multiple goroutines here, because + // so far the *search type assumes it's + // single-threaded. (e.g. the .ss scratch type). + // Also, not using multiple goroutines means we can + // short-circuit when Op == "and" and av is false. + + av, err := c.A.blobMatches(s, br, bm) + if err != nil { + return false, err + } switch c.Op { - case "and", "xor": - var g syncutil.Group - var av, bv bool - g.Go(func() (err error) { - av, err = c.A.blobMatches(s, br, bm) - return - }) - g.Go(func() (err error) { - bv, err = c.B.blobMatches(s, br, bm) - return - }) - if err := g.Err(); err != nil { - return false, err - } - switch c.Op { - case "and": - return av && bv, nil - case "xor": - return av != bv, nil + case "not": + return !av, nil + case "and": + if !av { + // Short-circuit. + return false, nil } case "or": - av, err := c.A.blobMatches(s, br, bm) - if err != nil { - return false, err - } if av { // Short-circuit. return true, nil } - return c.B.blobMatches(s, br, bm) - case "not": - v, err := c.A.blobMatches(s, br, bm) - return !v, err + } + + bv, err := c.B.blobMatches(s, br, bm) + if err != nil { + return false, err + } + + switch c.Op { + case "and", "or": + return bv, nil + case "xor": + return av != bv, nil } panic("unreachable") } @@ -689,7 +704,7 @@ func (c *PermanodeConstraint) blobMatches(s *search, br blob.Ref, bm camtypes.Bl if corpus == nil { vals = dp.Attr[c.Attr] } else { - s.ss = corpus.AppendPermanodeAttrValues( + s.ss = corpus.AppendPermanodeAttrValuesLocked( s.ss[:0], br, c.Attr, c.At, s.h.owner) vals = s.ss } @@ -700,7 +715,7 @@ func (c *PermanodeConstraint) blobMatches(s *search, br blob.Ref, bm camtypes.Bl } if c.ModTime != nil { if corpus != nil { - mt, ok := corpus.PermanodeModtime(br) + mt, ok := corpus.PermanodeModtimeLocked(br) if !ok || !c.ModTime.timeMatches(mt) { return false, nil } diff --git a/pkg/search/query_test.go b/pkg/search/query_test.go index e5a46e9a1..175104a50 100644 --- a/pkg/search/query_test.go +++ b/pkg/search/query_test.go @@ -553,6 +553,11 @@ func TestQueryRecentPermanodes(t *testing.T) { p3 := id.NewPlannedPermanode("3") id.SetAttribute(p3, "foo", "p3") + var usedSource string + ExportSetCandidateSourceHook(func(s string) { + usedSource = s + }) + req := &SearchQuery{ Constraint: &Constraint{ Permanode: &PermanodeConstraint{}, @@ -565,8 +570,8 @@ func TestQueryRecentPermanodes(t *testing.T) { if err != nil { qt.t.Fatal(err) } - if s := ExportCandSource(); s != "corpus_permanode_desc" { - t.Errorf("used candidate source strategy %q; want corpus_permanode_desc", s) + if usedSource != "corpus_permanode_desc" { + t.Errorf("used candidate source strategy %q; want corpus_permanode_desc", usedSource) } wantBlobs := []*SearchResultBlob{ {Blob: p3},