mirror of https://github.com/perkeep/perkeep.git
pkg/search: change corpus enumeration signatures for speed
Avoid select overhead in hot paths; just use funcs. Also, for sort-by-map searches, don't do a describe and then pass over all the results doing location lookups a second time: remember the location from the initial matching and cache it on the search value. Reduces some sort-by-map searches from 10 seconds to 3 seconds for me. (Still too slow, but a good start.)

Change-Id: I632954738df9accd802f28364ed11e48ddba0d14
This commit is contained in:
parent ee8960864b
commit 93c6d682d2
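The change is easiest to see side by side. Below is a minimal, self-contained Go sketch (invented types, not the actual pkg/index API) of the two enumeration shapes the commit message refers to: the old channel-plus-select style pays for a select on every element, while the callback style is a plain function call and stops as soon as the callback returns false.

package main

import "fmt"

type blobMeta struct{ ref string }

type corpus struct{ blobs []blobMeta }

// Old style (simplified): one select per element, plus channel and goroutine
// bookkeeping on the caller side.
func (c *corpus) enumerateChan(done <-chan struct{}, ch chan<- blobMeta) error {
	defer close(ch)
	for _, bm := range c.blobs {
		select {
		case ch <- bm:
		case <-done:
			return fmt.Errorf("canceled")
		}
	}
	return nil
}

// New style (simplified): a plain function call per element; returning false
// ends iteration early.
func (c *corpus) enumerateFunc(fn func(blobMeta) bool) {
	for _, bm := range c.blobs {
		if !fn(bm) {
			return
		}
	}
}

func main() {
	c := &corpus{blobs: []blobMeta{{"a"}, {"b"}, {"c"}}}
	n := 0
	c.enumerateFunc(func(bm blobMeta) bool {
		n++
		return n < 2 // stop early after two items
	})
	fmt.Println("visited", n, "blobs")
}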
@@ -780,46 +780,39 @@ func (c *Corpus) br(br blob.Ref) blob.Ref {
// *********** Reading from the corpus

// EnumerateCamliBlobs sends just camlistore meta blobs to ch.
// EnumerateCamliBlobs calls fn for all known meta blobs.
//
// If camType is empty, all camlistore blobs are sent, otherwise it specifies
// the camliType to send.
// ch is closed at the end. The err will either be nil or context.Canceled.
func (c *Corpus) EnumerateCamliBlobs(ctx context.Context, camType string, ch chan<- camtypes.BlobMeta) error {
defer close(ch)
// If camType is not empty, it specifies a filter for which meta blob
// types to call fn for. If empty, all are emitted.
//
// If fn returns false, iteration ends.
func (c *Corpus) EnumerateCamliBlobs(camType string, fn func(camtypes.BlobMeta) bool) {
if camType != "" {
for _, bm := range c.camBlobs[camType] {
select {
case ch <- *bm:
case <-ctx.Done():
return ctx.Err()
if !fn(*bm) {
return
}
}
return nil
return
}
for _, m := range c.camBlobs {
for _, bm := range m {
select {
case ch <- *bm:
case <-ctx.Done():
return ctx.Err()
if !fn(*bm) {
return
}
}
}
return nil
}

// EnumerateBlobMeta sends all known blobs to ch, or until the context is canceled.
func (c *Corpus) EnumerateBlobMeta(ctx context.Context, ch chan<- camtypes.BlobMeta) error {
defer close(ch)
// EnumerateBlobMeta calls fn for all known meta blobs in an undefined
// order.
// If fn returns false, iteration ends.
func (c *Corpus) EnumerateBlobMeta(fn func(camtypes.BlobMeta) bool) {
for _, bm := range c.blobs {
select {
case ch <- *bm:
case <-ctx.Done():
return ctx.Err()
if !fn(*bm) {
return
}
}
return nil
}

// pnAndTime is a value type wrapping a permanode blobref and its modtime.
@@ -909,36 +902,30 @@ func (lsp *lazySortedPermanodes) sorted(reverse bool) []pnAndTime {
return pns
}

func (c *Corpus) sendPermanodes(ctx context.Context, ch chan<- camtypes.BlobMeta, pns []pnAndTime) error {
func (c *Corpus) enumeratePermanodes(fn func(camtypes.BlobMeta) bool, pns []pnAndTime) {
for _, cand := range pns {
bm := c.blobs[cand.pn]
if bm == nil {
continue
}
select {
case ch <- *bm:
continue
case <-ctx.Done():
return ctx.Err()
if !fn(*bm) {
return
}
}
return nil
}

// EnumeratePermanodesLastModified sends all permanodes, sorted by most recently modified first, to ch,
// or until ctx is done.
func (c *Corpus) EnumeratePermanodesLastModified(ctx context.Context, ch chan<- camtypes.BlobMeta) error {
defer close(ch)
return c.sendPermanodes(ctx, ch, c.permanodesByModtime.sorted(true))
// EnumeratePermanodesLastModified calls fn for all permanodes, sorted by most recently modified first.
// Iteration ends prematurely if fn returns false.
func (c *Corpus) EnumeratePermanodesLastModified(fn func(camtypes.BlobMeta) bool) {
c.enumeratePermanodes(fn, c.permanodesByModtime.sorted(true))
}

// EnumeratePermanodesCreated sends all permanodes to ch, or until ctx is done.
// EnumeratePermanodesCreated calls fn for all permanodes.
// They are sorted using the contents creation date if any, the permanode modtime
// otherwise, and in the order specified by newestFirst.
func (c *Corpus) EnumeratePermanodesCreated(ctx context.Context, ch chan<- camtypes.BlobMeta, newestFirst bool) error {
defer close(ch)

return c.sendPermanodes(ctx, ch, c.permanodesByTime.sorted(newestFirst))
// Iteration ends prematurely if fn returns false.
func (c *Corpus) EnumeratePermanodesCreated(fn func(camtypes.BlobMeta) bool, newestFirst bool) {
c.enumeratePermanodes(fn, c.permanodesByTime.sorted(newestFirst))
}

func (c *Corpus) GetBlobMeta(ctx context.Context, br blob.Ref) (camtypes.BlobMeta, error) {
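Callers that still want channel delivery, such as the test helpers updated in the hunks below, can wrap the callback themselves. A hedged sketch of that adapter, with invented names (item, toChannel), assuming the enumerator takes a func returning bool:

package main

import "context"

// item stands in for camtypes.BlobMeta; all names here are illustrative,
// not real perkeep identifiers.
type item struct{ ref string }

// toChannel drives a callback-style enumerator and forwards each item to ch,
// stopping (and reporting ctx.Err()) if the context is canceled first.
func toChannel(ctx context.Context, ch chan<- item, enumerate func(func(item) bool)) error {
	defer close(ch)
	var err error
	enumerate(func(it item) bool {
		select {
		case <-ctx.Done():
			err = ctx.Err()
			return false // stop iteration on cancellation
		case ch <- it:
			return true
		}
	})
	return err
}

func main() {
	src := []item{{"a"}, {"b"}}
	enumerate := func(fn func(item) bool) {
		for _, it := range src {
			if !fn(it) {
				return
			}
		}
	}
	ch := make(chan item, len(src))
	_ = toChannel(context.Background(), ch, enumerate)
	for range ch {
	}
}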
@@ -305,7 +305,17 @@ func TestKVClaim(t *testing.T) {
func TestDeletePermanode_Modtime(t *testing.T) {
testDeletePermanodes(t,
func(c *index.Corpus, ctx context.Context, ch chan<- camtypes.BlobMeta) error {
return c.EnumeratePermanodesLastModified(ctx, ch)
var err error
c.EnumeratePermanodesLastModified(func(m camtypes.BlobMeta) bool {
select {
case <-ctx.Done():
err = ctx.Err()
return false
case ch <- m:
return true
}
})
return err
},
)
}
@@ -313,7 +323,17 @@ func TestDeletePermanode_Modtime(t *testing.T) {
func TestDeletePermanode_CreateTime(t *testing.T) {
testDeletePermanodes(t,
func(c *index.Corpus, ctx context.Context, ch chan<- camtypes.BlobMeta) error {
return c.EnumeratePermanodesCreated(ctx, ch, true)
var err error
c.EnumeratePermanodesCreated(func(m camtypes.BlobMeta) bool {
select {
case <-ctx.Done():
err = ctx.Err()
return false
case ch <- m:
return true
}
}, true)
return err
},
)
}
@@ -419,7 +439,17 @@ func testDeletePermanodes(t *testing.T,
func TestEnumerateOrder_Modtime(t *testing.T) {
testEnumerateOrder(t,
func(c *index.Corpus, ctx context.Context, ch chan<- camtypes.BlobMeta) error {
return c.EnumeratePermanodesLastModified(ctx, ch)
var err error
c.EnumeratePermanodesLastModified(func(m camtypes.BlobMeta) bool {
select {
case <-ctx.Done():
err = ctx.Err()
return false
case ch <- m:
return true
}
})
return err
},
modtimeOrder,
)
@@ -428,7 +458,17 @@ func TestEnumerateOrder_Modtime(t *testing.T) {
func TestEnumerateOrder_CreateTime(t *testing.T) {
testEnumerateOrder(t,
func(c *index.Corpus, ctx context.Context, ch chan<- camtypes.BlobMeta) error {
return c.EnumeratePermanodesCreated(ctx, ch, true)
var err error
c.EnumeratePermanodesCreated(func(m camtypes.BlobMeta) bool {
select {
case <-ctx.Done():
err = ctx.Err()
return false
case ch <- m:
return true
}
}, true)
return err
},
createOrder,
)
@@ -497,7 +537,17 @@ func testEnumerateOrder(t *testing.T,
func TestCacheSortedPermanodes_ModtimeRace(t *testing.T) {
testCacheSortedPermanodesRace(t,
func(c *index.Corpus, ctx context.Context, ch chan<- camtypes.BlobMeta) error {
return c.EnumeratePermanodesLastModified(ctx, ch)
var err error
c.EnumeratePermanodesLastModified(func(m camtypes.BlobMeta) bool {
select {
case <-ctx.Done():
err = ctx.Err()
return false
case ch <- m:
return true
}
})
return err
},
)
}
@@ -506,7 +556,17 @@ func TestCacheSortedPermanodes_ModtimeRace(t *testing.T) {
func TestCacheSortedPermanodes_CreateTimeRace(t *testing.T) {
testCacheSortedPermanodesRace(t,
func(c *index.Corpus, ctx context.Context, ch chan<- camtypes.BlobMeta) error {
return c.EnumeratePermanodesCreated(ctx, ch, true)
var err error
c.EnumeratePermanodesCreated(func(m camtypes.BlobMeta) bool {
select {
case <-ctx.Done():
err = ctx.Err()
return false
case ch <- m:
return true
}
}, true)
return err
},
)
}
@@ -570,18 +630,8 @@ func TestLazySortedPermanodes(t *testing.T) {
pn := idxd.NewPlannedPermanode("one")
idxd.SetAttribute(pn, "tag", "one")

ctx := context.Background()
enum := func(reverse bool) {
ch := make(chan camtypes.BlobMeta, 10)
errc := make(chan error, 1)
go func() { errc <- c.EnumeratePermanodesCreated(ctx, ch, reverse) }()
for range ch {
}
err := <-errc

if err != nil {
t.Fatalf("Could not enumerate permanodes: %v", err)
}
c.EnumeratePermanodesCreated(func(m camtypes.BlobMeta) bool { return true }, reverse)
}
enum(false)
lsp = c.Exp_LSPByTime(false)
@@ -1470,20 +1470,48 @@ func enumerateBlobMeta(s sorted.KeyValue, cb func(camtypes.BlobMeta) error) (err
return nil
}

// EnumerateBlobMeta sends all metadata about all known blobs to ch and then closes ch.
func (x *Index) EnumerateBlobMeta(ctx context.Context, ch chan<- camtypes.BlobMeta) (err error) {
var errStopIteration = errors.New("stop iteration") // local error, doesn't escape this package

// EnumerateBlobMeta calls fn for all known meta blobs.
// If fn returns false, iteration stops and a nil error is returned.
// If ctx becomes done, iteration stops and ctx.Err() is returned.
func (x *Index) EnumerateBlobMeta(ctx context.Context, fn func(camtypes.BlobMeta) bool) error {
if x.corpus != nil {
return x.corpus.EnumerateBlobMeta(ctx, ch)
var err error
var n int
done := ctx.Done()
x.corpus.EnumerateBlobMeta(func(m camtypes.BlobMeta) bool {
// Every so often, check context.
n++
if n%256 == 0 {
select {
case <-done:
err = ctx.Err()
return false
default:

}
}
return fn(m)
})
return err
}
defer close(ch)
return enumerateBlobMeta(x.s, func(bm camtypes.BlobMeta) error {
done := ctx.Done()
err := enumerateBlobMeta(x.s, func(bm camtypes.BlobMeta) error {
select {
case ch <- bm:
case <-ctx.Done():
case <-done:
return ctx.Err()
default:
if !fn(bm) {
return errStopIteration
}
return nil
}
return nil
})
if err == errStopIteration {
err = nil
}
return err
}

// Storage returns the index's underlying Storage implementation.
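The corpus-backed path above keeps cancellation responsive without a per-element select by polling the context only every 256 callbacks. A standalone sketch of that stride pattern (the 256 figure comes from the diff; everything else is illustrative):

package main

import (
	"context"
	"fmt"
)

// enumerate visits 0..count-1, checking ctx only every 256 iterations so the
// common path is a plain function call with no channel or select involved.
func enumerate(ctx context.Context, count int, fn func(int) bool) error {
	done := ctx.Done()
	for i := 0; i < count; i++ {
		if i%256 == 0 {
			select {
			case <-done:
				return ctx.Err()
			default:
			}
		}
		if !fn(i) {
			return nil // callback asked to stop; not an error
		}
	}
	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	n := 0
	_ = enumerate(ctx, 10000, func(int) bool { n++; return true })
	fmt.Println("visited", n)
}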
@@ -143,11 +143,12 @@ type Interface interface {
// opts may be nil to accept the defaults.
EdgesTo(ref blob.Ref, opts *camtypes.EdgesToOpts) ([]*camtypes.Edge, error)

// EnumerateBlobMeta sends ch information about all blobs
// known to the indexer (which may be a subset of all total
// blobs, since the indexer is typically configured to not see
// non-metadata blobs) and then closes ch. When it returns an
// error, it also closes ch. The blobs may be sent in any order.
// If the context finishes, the return error is ctx.Err().
EnumerateBlobMeta(context.Context, chan<- camtypes.BlobMeta) error
// EnumerateBlobMeta calls fn for each blob known to the
// indexer (which may be a subset of all total blobs, since
// the indexer is typically configured to not see non-metadata
// blobs). The blobs may be sent in any order. If the context
// finishes, the return error is ctx.Err().
// If the provided function returns false, iteration ends with a nil
// return value.
EnumerateBlobMeta(context.Context, func(camtypes.BlobMeta) bool) error
}
@@ -18,6 +18,7 @@ package stress

import (
"bytes"
"context"
"crypto/sha1"
"flag"
"fmt"
@@ -442,15 +443,13 @@ func enumerateMeta(b *testing.B, dbfile string,
idx.InitBlobSource(bs)
defer idx.Close()

ch := make(chan camtypes.BlobMeta, 100)
go func() {
if err := idx.EnumerateBlobMeta(nil, ch); err != nil {
b.Fatal(err)
}
}()
n := 0
for range ch {
fn := func(m camtypes.BlobMeta) bool {
n++
return true
}
if err := idx.EnumerateBlobMeta(context.Background(), fn); err != nil {
b.Fatal(err)
}
b.Logf("Enumerated %d meta blobs", n)
return n
@@ -852,6 +852,13 @@ type search struct {
// We assume (at least so far) that only 1 goroutine is used
// for a given search, so anything can use this.
ss []string // scratch

// loc is a cache of calculated locations.
//
// TODO: if location-of-permanode were cheaper and cached in
// the corpus instead, then we wouldn't need this. And then
// searches would be faster anyway. This is a hack.
loc map[blob.Ref]camtypes.Location
}

func (s *search) blobMeta(ctx context.Context, br blob.Ref) (camtypes.BlobMeta, error) {
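This loc cache is what lets the sort-by-map path further down skip the second describe pass: locations computed while matching constraints are remembered on the search value and reused when computing the result bounds. A rough sketch of the idea with placeholder types (ref, location), not the real search code:

package main

import "fmt"

// ref and location stand in for blob.Ref and camtypes.Location.
type ref string

type location struct{ Lat, Long float64 }

// search caches any location computed while matching, so a later
// sort-by-map step can reuse it instead of describing every result again.
type search struct {
	loc map[ref]location
}

// matchLocation pretends to evaluate a location constraint and, on success,
// remembers the location it already had in hand.
func (s *search) matchLocation(r ref, l location) bool {
	s.loc[r] = l
	return true
}

// cachedLocations returns the locations remembered for the given results.
func (s *search) cachedLocations(results []ref) []location {
	locs := make([]location, 0, len(results))
	for _, r := range results {
		if l, ok := s.loc[r]; ok {
			locs = append(locs, l)
		}
	}
	return locs
}

func main() {
	s := &search{loc: make(map[ref]location)}
	s.matchLocation("pn-123", location{Lat: 48.85, Long: 2.35})
	fmt.Println(s.cachedLocations([]ref{"pn-123", "pn-456"}))
}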
@@ -877,7 +884,22 @@ func optimizePlan(c *Constraint) *Constraint {
return c
}

func (h *Handler) Query(rawq *SearchQuery) (*SearchResult, error) {
var debugQuerySpeed, _ = strconv.ParseBool(os.Getenv("CAMLI_DEBUG_QUERY_SPEED"))

func (h *Handler) Query(rawq *SearchQuery) (ret_ *SearchResult, _ error) {
if debugQuerySpeed {
t0 := time.Now()
jq, _ := json.Marshal(rawq)
log.Printf("Start %v, Doing search %s... ", t0.Format(time.RFC3339), jq)
defer func() {
d := time.Since(t0)
if ret_ != nil {
log.Printf("Start %v + %v = %v results", t0.Format(time.RFC3339), d, len(ret_.Blobs))
} else {
log.Printf("Start %v + %v = error", t0.Format(time.RFC3339), d)
}
}()
}
ctx := context.TODO() // TODO: set from rawq
exprResult, err := rawq.checkValid(ctx)
if err != nil {
@@ -889,6 +911,7 @@ func (h *Handler) Query(rawq *SearchQuery) (*SearchResult, error) {
h: h,
q: q,
res: res,
loc: make(map[blob.Ref]camtypes.Location),
}

h.index.RLock()
@@ -899,27 +922,23 @@ func (h *Handler) Query(rawq *SearchQuery) (*SearchResult, error) {

corpus := h.corpus

ch := make(chan camtypes.BlobMeta, buffered)
errc := make(chan error, 1)

cands := q.pickCandidateSource(s)
if candSourceHook != nil {
candSourceHook(cands.name)
}

sendCtx, cancelSend := context.WithCancel(ctx)
defer cancelSend()
go func() { errc <- cands.send(sendCtx, s, ch) }()

wantAround, foundAround := false, false
if q.Around.Valid() {
wantAround = true
}
blobMatches := q.Constraint.matcher()
for meta := range ch {

var enumErr error
cands.send(ctx, s, func(meta camtypes.BlobMeta) bool {
match, err := blobMatches(ctx, s, meta.Ref, meta)
if err != nil {
return nil, err
enumErr = err
return false
}
if match {
res.Blobs = append(res.Blobs, &SearchResultBlob{
@@ -930,20 +949,19 @@ func (h *Handler) Query(rawq *SearchQuery) (*SearchResult, error) {
// we temporarily ignore the limit.
// TODO(mpl): the above means that we also ignore Continue and Around here. I
// don't think we need them for the map aspect for now though.
continue
return true
}
if q.Limit <= 0 || !cands.sorted {
if wantAround && !foundAround && q.Around == meta.Ref {
foundAround = true
}
continue
return true
}
if !wantAround || foundAround {
if len(res.Blobs) == q.Limit {
cancelSend()
break
return false
}
continue
return true
}
if q.Around == meta.Ref {
foundAround = true
@@ -960,19 +978,19 @@ func (h *Handler) Query(rawq *SearchQuery) (*SearchResult, error) {
res.Blobs = res.Blobs[discard:]
}
if len(res.Blobs) == q.Limit {
cancelSend()
break
return false
}
continue
return true
}
if len(res.Blobs) == q.Limit {
n := copy(res.Blobs, res.Blobs[len(res.Blobs)/2:])
res.Blobs = res.Blobs[:n]
}
}
}
if err := <-errc; err != nil && err != context.Canceled {
return nil, err
return true
})
if enumErr != nil {
return nil, enumErr
}
if wantAround && !foundAround {
// results are ignored if Around was not found
@@ -1055,45 +1073,42 @@ func (h *Handler) Query(rawq *SearchQuery) (*SearchResult, error) {
}
}

// describeReq is used as a backup of q.Describe, because DescribeLocked renders
// q.Describe useless for a second describe call, which we need to do after
// bestByLocation.
var describeReq DescribeRequest
if q.Describe != nil {
if q.Sort == MapSort {
describeReq = *(q.Describe)
// Populate s.res.LocationArea
{
var la camtypes.LocationBounds
for _, v := range res.Blobs {
br := v.Blob
loc, ok := s.loc[br]
if !ok {
continue
}
la = la.Expand(loc)
}
if la != (camtypes.LocationBounds{}) {
s.res.LocationArea = &la
}
}

if q.Sort == MapSort {
bestByLocation(s.res, s.loc, q.Limit)
}

if q.Describe != nil {
q.Describe.BlobRef = blob.Ref{} // zero this out, if caller set it
blobs := make([]blob.Ref, 0, len(res.Blobs))
for _, srb := range res.Blobs {
blobs = append(blobs, srb.Blob)
}
q.Describe.BlobRefs = blobs
t0 := time.Now()
res, err := s.h.DescribeLocked(ctx, q.Describe)
log.Printf("Describe of %d blobs = %v", len(blobs), time.Since(t0))
if err != nil {
return nil, err
}
s.res.Describe = res
}

if q.Sort == MapSort {
bestByLocation(s.res, q.Limit)
// The first describe was about all the blobs that were matching the constraint,
// without any limit, so it could be uselessly pretty large. So we describe
// again, this time only on the blobs that were selected by bestByLocation.
q.Describe = &describeReq
q.Describe.BlobRef = blob.Ref{} // zero this out, if caller set it
blobs := make([]blob.Ref, 0, len(s.res.Blobs))
for _, srb := range res.Blobs {
blobs = append(blobs, srb.Blob)
}
q.Describe.BlobRefs = blobs
res, err := s.h.DescribeLocked(ctx, q.Describe)
if err != nil {
return nil, err
}
s.res.Describe = res
}
return s.res, nil
}
@@ -1113,7 +1128,8 @@ func (h *Handler) Query(rawq *SearchQuery) (*SearchResult, error) {
// N = (number of non empty cells) / limit
// 5) for each cell, append to the set of selected nodes the first N nodes of
// the cell.
func bestByLocation(res *SearchResult, limit int) {
func bestByLocation(res *SearchResult, locm map[blob.Ref]camtypes.Location, limit int) {
// Calculate res.LocationArea.
if len(res.Blobs) <= limit {
return
}
@@ -1138,17 +1154,12 @@ func bestByLocation(res *SearchResult, limit int) {
latZero := area.North
longZero := area.West

meta := res.Describe.Meta
for _, v := range res.Blobs {
br := v.Blob
dbr, ok := meta[br.String()]
loc, ok := locm[br]
if !ok {
continue
}
loc := dbr.Location
if loc == nil {
continue
}

relLat := latZero - loc.Latitude
relLong := loc.Longitude - longZero
@@ -1263,7 +1274,7 @@ type candidateSource struct {

// send sends to the channel and must close it, regardless of error
// or interruption from context.Done().
send func(context.Context, *search, chan<- camtypes.BlobMeta) error
send func(context.Context, *search, func(camtypes.BlobMeta) bool) error
}

func (q *SearchQuery) pickCandidateSource(s *search) (src candidateSource) {
@@ -1275,14 +1286,16 @@ func (q *SearchQuery) pickCandidateSource(s *search) (src candidateSource) {
switch q.Sort {
case LastModifiedDesc:
src.name = "corpus_permanode_lastmod"
src.send = func(ctx context.Context, s *search, dst chan<- camtypes.BlobMeta) error {
return corpus.EnumeratePermanodesLastModified(ctx, dst)
src.send = func(ctx context.Context, s *search, fn func(camtypes.BlobMeta) bool) error {
corpus.EnumeratePermanodesLastModified(fn)
return nil
}
return
case CreatedDesc:
src.name = "corpus_permanode_created"
src.send = func(ctx context.Context, s *search, dst chan<- camtypes.BlobMeta) error {
return corpus.EnumeratePermanodesCreated(ctx, dst, true)
src.send = func(ctx context.Context, s *search, fn func(camtypes.BlobMeta) bool) error {
corpus.EnumeratePermanodesCreated(fn, true)
return nil
}
return
default:
@@ -1292,23 +1305,25 @@ func (q *SearchQuery) pickCandidateSource(s *search) (src candidateSource) {
// fastpath for files
if c.matchesFileByWholeRef() {
src.name = "corpus_file_meta"
src.send = func(ctx context.Context, s *search, dst chan<- camtypes.BlobMeta) error {
return corpus.EnumerateCamliBlobs(ctx, "file", dst)
src.send = func(ctx context.Context, s *search, fn func(camtypes.BlobMeta) bool) error {
corpus.EnumerateCamliBlobs("file", fn)
return nil
}
return
}
if c.AnyCamliType || c.CamliType != "" {
camType := c.CamliType // empty means all
src.name = "corpus_blob_meta"
src.send = func(ctx context.Context, s *search, dst chan<- camtypes.BlobMeta) error {
return corpus.EnumerateCamliBlobs(ctx, camType, dst)
src.send = func(ctx context.Context, s *search, fn func(camtypes.BlobMeta) bool) error {
corpus.EnumerateCamliBlobs(camType, fn)
return nil
}
return
}
}
src.name = "index_blob_meta"
src.send = func(ctx context.Context, s *search, dst chan<- camtypes.BlobMeta) error {
return s.h.index.EnumerateBlobMeta(ctx, dst)
src.send = func(ctx context.Context, s *search, fn func(camtypes.BlobMeta) bool) error {
return s.h.index.EnumerateBlobMeta(ctx, fn)
}
return
}
@@ -1606,16 +1621,7 @@ func (c *PermanodeConstraint) blobMatches(ctx context.Context, s *search, br blo
if !c.Location.matchesLatLong(l.Latitude, l.Longitude) {
return false, nil
}
// If location was successfully matched, and all the rest matched as well, add
// the location to the global location area of results.
defer func() {
if err == nil && ok {
s.res.LocationArea = s.res.LocationArea.Expand(camtypes.Location{
Latitude: l.Latitude,
Longitude: l.Longitude,
})
}
}()
s.loc[br] = l
}

if cc := c.Continue; cc != nil {
@@ -1798,16 +1804,14 @@ func (c *FileConstraint) blobMatches(ctx context.Context, s *search, br blob.Ref
if !found || !c.Location.matchesLatLong(lat, long) {
return false, nil
}
// If location was successfully matched, and all the rest matched as well, add
// the location to the global location area of results.
defer func() {
if err == nil && ok {
s.res.LocationArea = s.res.LocationArea.Expand(camtypes.Location{
Latitude: lat,
Longitude: long,
})
}
}()
// If location was successfully matched, add the
// location to the global location area of results so
// a sort-by-map doesn't need to look it up again
// later.
s.loc[br] = camtypes.Location{
Latitude: lat,
Longitude: long,
}
}
// this makes sure, in conjunction with TestQueryFileLocation, that we only
// expand the location iff the location matched AND the whole constraint matched as
@@ -226,6 +226,6 @@ func (fi *FakeIndex) EdgesTo(ref blob.Ref, opts *camtypes.EdgesToOpts) ([]*camty
panic("NOIMPL")
}

func (fi *FakeIndex) EnumerateBlobMeta(ctx context.Context, ch chan<- camtypes.BlobMeta) error {
func (fi *FakeIndex) EnumerateBlobMeta(ctx context.Context, fn func(camtypes.BlobMeta) bool) error {
panic("NOIMPL")
}
@@ -265,9 +265,9 @@ func (l Longitude) WrapTo180() float64 {
return lf
}
if lf > 0 {
return math.Mod(lf+180., 360.) - 180.
return math.Mod(lf+180, 360) - 180
}
return math.Mod(lf-180., 360.) + 180.
return math.Mod(lf-180, 360) + 180
}

// LocationBounds is a location area delimited by its fields. See Location for
@@ -298,16 +298,16 @@ func (l *LocationBounds) isWithinLongitude(loc Location) bool {
// Expand returns a new LocationBounds nb. If either of loc coordinates is
// outside of b, nb is the dimensions of b expanded as little as possible in
// order to include loc. Otherwise, nb is just a copy of b.
func (b *LocationBounds) Expand(loc Location) *LocationBounds {
func (b LocationBounds) Expand(loc Location) LocationBounds {
if b.isEmpty() {
return &LocationBounds{
return LocationBounds{
North: loc.Latitude,
South: loc.Latitude,
West: loc.Longitude,
East: loc.Longitude,
}
}
nb := &LocationBounds{
nb := LocationBounds{
North: b.North,
South: b.South,
West: b.West,
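Expand moving from pointer to value semantics is what makes the accumulation loop in the Query hunk tidy: the zero value is a usable empty bounds and the result can be compared with ==. A simplified sketch of that design (it ignores the real longitude-wrapping logic and treats an all-zero bounds as empty):

package main

import "fmt"

// location and bounds are simplified stand-ins for camtypes.Location and
// camtypes.LocationBounds.
type location struct{ Lat, Long float64 }

type bounds struct{ North, South, West, East float64 }

func (b bounds) isEmpty() bool { return b == (bounds{}) }

// expand returns b grown just enough to include loc (value in, value out).
func (b bounds) expand(loc location) bounds {
	if b.isEmpty() {
		return bounds{North: loc.Lat, South: loc.Lat, West: loc.Long, East: loc.Long}
	}
	if loc.Lat > b.North {
		b.North = loc.Lat
	}
	if loc.Lat < b.South {
		b.South = loc.Lat
	}
	if loc.Long < b.West {
		b.West = loc.Long
	}
	if loc.Long > b.East {
		b.East = loc.Long
	}
	return b
}

func main() {
	var b bounds
	for _, l := range []location{{48.85, 2.35}, {40.7, -74.0}} {
		b = b.expand(l)
	}
	if b != (bounds{}) {
		fmt.Printf("%+v\n", b)
	}
}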