search: more in-memory search work.

keep blob metadata in memory, and start of testing all search queries in three modes:
classic index.Storage scanning, all in-memory with corpus scanned from the index.Storage,
and the in-memory corpus built up over time as blobs arrive.

Change-Id: I40536e498a63bece5bd4897cdbbd0cef78085f44
This commit is contained in:
Brad Fitzpatrick 2013-11-16 17:24:02 -08:00
parent c573feb226
commit 2984897ac7
6 changed files with 123 additions and 28 deletions

View File

@ -2,7 +2,6 @@ package index
import (
"errors"
"log"
"sync"
"camlistore.org/pkg/blob"
@ -47,8 +46,15 @@ func NewCorpusFromStorage(s Storage) (*Corpus, error) {
return nil, errors.New("storage is nil")
}
c := newCorpus()
// TODO: slurp from storage
log.Printf("TODO: NewCorpusFromStorage should slurp from storage")
err := enumerateBlobMeta(s, func(bm camtypes.BlobMeta) error {
c.blobs[bm.Ref] = bm
// TODO: populate blobref intern table
return nil
})
if err != nil {
return nil, err
}
// TODO: slurp more from storage
return c, nil
}
@ -57,3 +63,13 @@ func (x *Index) KeepInMemory() (*Corpus, error) {
x.corpus, err = NewCorpusFromStorage(x.s)
return x.corpus, err
}
func (c *Corpus) EnumerateBlobMeta(ch chan<- camtypes.BlobMeta) error {
defer close(ch)
c.mu.RLock()
defer c.mu.RUnlock()
for _, bm := range c.blobs {
ch <- bm
}
return nil
}

View File

@ -195,15 +195,23 @@ func (p *prefixIter) Next() bool {
return v
}
func (x *Index) queryPrefix(key *keyType, args ...interface{}) *prefixIter {
return x.queryPrefixString(key.Prefix(args...))
func queryPrefixString(s Storage, prefix string) *prefixIter {
return &prefixIter{
prefix: prefix,
Iterator: s.Find(prefix),
}
}
func (x *Index) queryPrefixString(prefix string) *prefixIter {
return &prefixIter{
prefix: prefix,
Iterator: x.s.Find(prefix),
}
return queryPrefixString(x.s, prefix)
}
func queryPrefix(s Storage, key *keyType, args ...interface{}) *prefixIter {
return queryPrefixString(s, key.Prefix(args...))
}
func (x *Index) queryPrefix(key *keyType, args ...interface{}) *prefixIter {
return x.queryPrefixString(key.Prefix(args...))
}
func closeIterator(it Iterator, perr *error) {
@ -922,10 +930,8 @@ func (x *Index) GetDirMembers(dir blob.Ref, dest chan<- blob.Ref, limit int) (er
return nil
}
// EnumerateBlobMeta sends all metadata about all known blobs to ch and then closes ch.
func (x *Index) EnumerateBlobMeta(ch chan<- camtypes.BlobMeta) (err error) {
defer close(ch)
it := x.queryPrefixString("meta:")
func enumerateBlobMeta(s Storage, cb func(camtypes.BlobMeta) error) (err error) {
it := queryPrefixString(s, "meta:")
defer closeIterator(it, &err)
for it.Next() {
refStr := strings.TrimPrefix(it.Key(), "meta:")
@ -942,13 +948,27 @@ func (x *Index) EnumerateBlobMeta(ch chan<- camtypes.BlobMeta) (err error) {
if err != nil {
continue
}
ch <- camtypes.BlobMeta{
if err := cb(camtypes.BlobMeta{
Ref: br,
Size: size,
CamliType: camliTypeFromMIME(v[pipe+1:]),
}); err != nil {
return err
}
}
return err
return nil
}
// EnumerateBlobMeta sends all metadata about all known blobs to ch and then closes ch.
func (x *Index) EnumerateBlobMeta(ch chan<- camtypes.BlobMeta) (err error) {
if x.corpus != nil {
return x.corpus.EnumerateBlobMeta(ch)
}
defer close(ch)
return enumerateBlobMeta(x.s, func(bm camtypes.BlobMeta) error {
ch <- bm
return nil
})
}
// Storage returns the index's underlying Storage implementation.

View File

@ -109,6 +109,6 @@ type Interface interface {
// known to the indexer (which may be a subset of all total
// blobs, since the indexer is typically configured to not see
// non-metadata blobs) and then closes ch. When it returns an
// error, it also closes ch.
// error, it also closes ch. The blobs may be sent in any order.
EnumerateBlobMeta(ch chan<- camtypes.BlobMeta) error
}

View File

@ -38,6 +38,7 @@ import (
"camlistore.org/pkg/magic"
"camlistore.org/pkg/schema"
"camlistore.org/pkg/types"
"camlistore.org/pkg/types/camtypes"
"camlistore.org/third_party/taglib"
)
@ -94,6 +95,16 @@ func (ix *Index) ReceiveBlob(blobRef blob.Ref, source io.Reader) (retsb blob.Siz
return retsb, err
}
if c := ix.corpus; c != nil {
c.mu.Lock()
defer c.mu.Unlock()
c.blobs[blobRef] = camtypes.BlobMeta{
Ref: blobRef,
Size: int(written),
CamliType: sniffer.CamliType(),
}
}
// TODO(bradfitz): log levels? These are generally noisy
// (especially in tests, like search/handler_test), but I
// could see it being useful in production. For now, disabled:

View File

@ -28,10 +28,11 @@ import (
type BlobSniffer struct {
br blob.Ref
header []byte
written int64
meta *schema.Blob // or nil
mimeType string
header []byte
written int64
meta *schema.Blob // or nil
mimeType string
camliType string
}
func NewBlobSniffer(ref blob.Ref) *BlobSniffer {
@ -80,9 +81,12 @@ func (sn *BlobSniffer) Body() ([]byte, error) {
// the form "application/json; camliType=foo".
func (sn *BlobSniffer) MIMEType() string { return sn.mimeType }
func (sn *BlobSniffer) CamliType() string { return sn.mimeType }
func (sn *BlobSniffer) Parse() {
if sn.bufferIsCamliJSON() {
sn.mimeType = "application/json; camliType=" + sn.meta.Type()
sn.camliType = sn.meta.Type()
sn.mimeType = "application/json; camliType=" + sn.camliType
} else {
sn.mimeType = magic.MIMEType(sn.header)
}

View File

@ -11,8 +11,24 @@ import (
. "camlistore.org/pkg/search"
)
// indexType is one of the three ways we test the query handler code.
type indexType int
const (
indexClassic indexType = iota // sorted key/value pairs from index.Storage
indexCorpusScan // *Corpus scanned from key/value pairs on start
indexCorpusBuild // empty *Corpus, built iteratively as blob received.
)
type queryTest struct {
t *testing.T
id *indextest.IndexDeps
Handler func() *Handler
}
func querySetup(t *testing.T) (*indextest.IndexDeps, *Handler) {
idx := index.NewMemoryIndex()
idx := index.NewMemoryIndex() // string key-value pairs in memory, as if they were on disk
id := indextest.NewIndexDeps(idx)
id.Fataler = t
h := NewHandler(idx, id.SignerBlobRef)
@ -26,6 +42,10 @@ func dumpRes(t *testing.T, res *SearchResult) {
}
}
func (qt *queryTest) wantRes(res *SearchResult, wanted ...blob.Ref) {
wantRes(qt.t, res, wanted...)
}
func wantRes(t *testing.T, res *SearchResult, wanted ...blob.Ref) {
need := make(map[blob.Ref]bool)
for _, br := range wanted {
@ -78,21 +98,45 @@ func TestQueryCamliType(t *testing.T) {
wantRes(t, sres, fileRef)
}
func TestQueryAnyCamliType(t *testing.T) {
id, h := querySetup(t)
func testQuery(t *testing.T, fn func(*queryTest), itype indexType) {
idx := index.NewMemoryIndex() // string key-value pairs in memory, as if they were on disk
if itype == indexCorpusBuild {
if _, err := idx.KeepInMemory(); err != nil {
t.Fatal(err)
}
}
qt := &queryTest{
t: t,
id: indextest.NewIndexDeps(idx),
}
qt.id.Fataler = t
qt.Handler = func() *Handler {
if itype == indexCorpusScan {
if _, err := idx.KeepInMemory(); err != nil {
t.Fatal(err)
}
}
return NewHandler(idx, qt.id.SignerBlobRef)
}
fn(qt)
}
fileRef, _ := id.UploadFile("file.txt", "foo", time.Unix(1382073153, 0))
func TestQueryAnyCamliType(t *testing.T) { testQuery(t, testQueryAnyCamliType, indexClassic) }
func TestQueryAnyCamliType_Scan(t *testing.T) { testQuery(t, testQueryAnyCamliType, indexCorpusScan) }
func TestQueryAnyCamliType_Build(t *testing.T) { testQuery(t, testQueryAnyCamliType, indexCorpusBuild) }
func testQueryAnyCamliType(qt *queryTest) {
fileRef, _ := qt.id.UploadFile("file.txt", "foo", time.Unix(1382073153, 0))
sq := &SearchQuery{
Constraint: &Constraint{
AnyCamliType: true,
},
}
sres, err := h.Query(sq)
sres, err := qt.Handler().Query(sq)
if err != nil {
t.Fatal(err)
qt.t.Fatal(err)
}
wantRes(t, sres, fileRef)
qt.wantRes(sres, fileRef)
}
func TestQueryBlobSize(t *testing.T) {