From 2984897ac7204b1b16f9958f3d6b1f0d32d1f016 Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Sat, 16 Nov 2013 17:24:02 -0800 Subject: [PATCH] search: more in-memory search work. keep blob metadata in memory, and start of testing all search queries in three modes: classic index.Storage scanning, all in-memory with corpus scanned from the index.Storage, and the in-memory corpus built up over time as blobs arrive. Change-Id: I40536e498a63bece5bd4897cdbbd0cef78085f44 --- pkg/index/corpus.go | 22 ++++++++++++--- pkg/index/index.go | 44 +++++++++++++++++++++--------- pkg/index/interface.go | 2 +- pkg/index/receive.go | 11 ++++++++ pkg/index/sniff.go | 14 ++++++---- pkg/search/query_test.go | 58 +++++++++++++++++++++++++++++++++++----- 6 files changed, 123 insertions(+), 28 deletions(-) diff --git a/pkg/index/corpus.go b/pkg/index/corpus.go index f455d64fb..9beab9016 100644 --- a/pkg/index/corpus.go +++ b/pkg/index/corpus.go @@ -2,7 +2,6 @@ package index import ( "errors" - "log" "sync" "camlistore.org/pkg/blob" @@ -47,8 +46,15 @@ func NewCorpusFromStorage(s Storage) (*Corpus, error) { return nil, errors.New("storage is nil") } c := newCorpus() - // TODO: slurp from storage - log.Printf("TODO: NewCorpusFromStorage should slurp from storage") + err := enumerateBlobMeta(s, func(bm camtypes.BlobMeta) error { + c.blobs[bm.Ref] = bm + // TODO: populate blobref intern table + return nil + }) + if err != nil { + return nil, err + } + // TODO: slurp more from storage return c, nil } @@ -57,3 +63,13 @@ func (x *Index) KeepInMemory() (*Corpus, error) { x.corpus, err = NewCorpusFromStorage(x.s) return x.corpus, err } + +func (c *Corpus) EnumerateBlobMeta(ch chan<- camtypes.BlobMeta) error { + defer close(ch) + c.mu.RLock() + defer c.mu.RUnlock() + for _, bm := range c.blobs { + ch <- bm + } + return nil +} diff --git a/pkg/index/index.go b/pkg/index/index.go index f2884f3e3..32384fbea 100644 --- a/pkg/index/index.go +++ b/pkg/index/index.go @@ -195,15 +195,23 @@ func (p *prefixIter) Next() bool { return v } -func (x *Index) queryPrefix(key *keyType, args ...interface{}) *prefixIter { - return x.queryPrefixString(key.Prefix(args...)) +func queryPrefixString(s Storage, prefix string) *prefixIter { + return &prefixIter{ + prefix: prefix, + Iterator: s.Find(prefix), + } } func (x *Index) queryPrefixString(prefix string) *prefixIter { - return &prefixIter{ - prefix: prefix, - Iterator: x.s.Find(prefix), - } + return queryPrefixString(x.s, prefix) +} + +func queryPrefix(s Storage, key *keyType, args ...interface{}) *prefixIter { + return queryPrefixString(s, key.Prefix(args...)) +} + +func (x *Index) queryPrefix(key *keyType, args ...interface{}) *prefixIter { + return x.queryPrefixString(key.Prefix(args...)) } func closeIterator(it Iterator, perr *error) { @@ -922,10 +930,8 @@ func (x *Index) GetDirMembers(dir blob.Ref, dest chan<- blob.Ref, limit int) (er return nil } -// EnumerateBlobMeta sends all metadata about all known blobs to ch and then closes ch. -func (x *Index) EnumerateBlobMeta(ch chan<- camtypes.BlobMeta) (err error) { - defer close(ch) - it := x.queryPrefixString("meta:") +func enumerateBlobMeta(s Storage, cb func(camtypes.BlobMeta) error) (err error) { + it := queryPrefixString(s, "meta:") defer closeIterator(it, &err) for it.Next() { refStr := strings.TrimPrefix(it.Key(), "meta:") @@ -942,13 +948,27 @@ func (x *Index) EnumerateBlobMeta(ch chan<- camtypes.BlobMeta) (err error) { if err != nil { continue } - ch <- camtypes.BlobMeta{ + if err := cb(camtypes.BlobMeta{ Ref: br, Size: size, CamliType: camliTypeFromMIME(v[pipe+1:]), + }); err != nil { + return err } } - return err + return nil +} + +// EnumerateBlobMeta sends all metadata about all known blobs to ch and then closes ch. +func (x *Index) EnumerateBlobMeta(ch chan<- camtypes.BlobMeta) (err error) { + if x.corpus != nil { + return x.corpus.EnumerateBlobMeta(ch) + } + defer close(ch) + return enumerateBlobMeta(x.s, func(bm camtypes.BlobMeta) error { + ch <- bm + return nil + }) } // Storage returns the index's underlying Storage implementation. diff --git a/pkg/index/interface.go b/pkg/index/interface.go index cd90b79b6..c945bc427 100644 --- a/pkg/index/interface.go +++ b/pkg/index/interface.go @@ -109,6 +109,6 @@ type Interface interface { // known to the indexer (which may be a subset of all total // blobs, since the indexer is typically configured to not see // non-metadata blobs) and then closes ch. When it returns an - // error, it also closes ch. + // error, it also closes ch. The blobs may be sent in any order. EnumerateBlobMeta(ch chan<- camtypes.BlobMeta) error } diff --git a/pkg/index/receive.go b/pkg/index/receive.go index 9b2e0586b..eb104ffea 100644 --- a/pkg/index/receive.go +++ b/pkg/index/receive.go @@ -38,6 +38,7 @@ import ( "camlistore.org/pkg/magic" "camlistore.org/pkg/schema" "camlistore.org/pkg/types" + "camlistore.org/pkg/types/camtypes" "camlistore.org/third_party/taglib" ) @@ -94,6 +95,16 @@ func (ix *Index) ReceiveBlob(blobRef blob.Ref, source io.Reader) (retsb blob.Siz return retsb, err } + if c := ix.corpus; c != nil { + c.mu.Lock() + defer c.mu.Unlock() + c.blobs[blobRef] = camtypes.BlobMeta{ + Ref: blobRef, + Size: int(written), + CamliType: sniffer.CamliType(), + } + } + // TODO(bradfitz): log levels? These are generally noisy // (especially in tests, like search/handler_test), but I // could see it being useful in production. For now, disabled: diff --git a/pkg/index/sniff.go b/pkg/index/sniff.go index f20734e90..071768685 100644 --- a/pkg/index/sniff.go +++ b/pkg/index/sniff.go @@ -28,10 +28,11 @@ import ( type BlobSniffer struct { br blob.Ref - header []byte - written int64 - meta *schema.Blob // or nil - mimeType string + header []byte + written int64 + meta *schema.Blob // or nil + mimeType string + camliType string } func NewBlobSniffer(ref blob.Ref) *BlobSniffer { @@ -80,9 +81,12 @@ func (sn *BlobSniffer) Body() ([]byte, error) { // the form "application/json; camliType=foo". func (sn *BlobSniffer) MIMEType() string { return sn.mimeType } +func (sn *BlobSniffer) CamliType() string { return sn.mimeType } + func (sn *BlobSniffer) Parse() { if sn.bufferIsCamliJSON() { - sn.mimeType = "application/json; camliType=" + sn.meta.Type() + sn.camliType = sn.meta.Type() + sn.mimeType = "application/json; camliType=" + sn.camliType } else { sn.mimeType = magic.MIMEType(sn.header) } diff --git a/pkg/search/query_test.go b/pkg/search/query_test.go index 397fe52c4..71cd68ec2 100644 --- a/pkg/search/query_test.go +++ b/pkg/search/query_test.go @@ -11,8 +11,24 @@ import ( . "camlistore.org/pkg/search" ) +// indexType is one of the three ways we test the query handler code. +type indexType int + +const ( + indexClassic indexType = iota // sorted key/value pairs from index.Storage + indexCorpusScan // *Corpus scanned from key/value pairs on start + indexCorpusBuild // empty *Corpus, built iteratively as blob received. +) + +type queryTest struct { + t *testing.T + id *indextest.IndexDeps + + Handler func() *Handler +} + func querySetup(t *testing.T) (*indextest.IndexDeps, *Handler) { - idx := index.NewMemoryIndex() + idx := index.NewMemoryIndex() // string key-value pairs in memory, as if they were on disk id := indextest.NewIndexDeps(idx) id.Fataler = t h := NewHandler(idx, id.SignerBlobRef) @@ -26,6 +42,10 @@ func dumpRes(t *testing.T, res *SearchResult) { } } +func (qt *queryTest) wantRes(res *SearchResult, wanted ...blob.Ref) { + wantRes(qt.t, res, wanted...) +} + func wantRes(t *testing.T, res *SearchResult, wanted ...blob.Ref) { need := make(map[blob.Ref]bool) for _, br := range wanted { @@ -78,21 +98,45 @@ func TestQueryCamliType(t *testing.T) { wantRes(t, sres, fileRef) } -func TestQueryAnyCamliType(t *testing.T) { - id, h := querySetup(t) +func testQuery(t *testing.T, fn func(*queryTest), itype indexType) { + idx := index.NewMemoryIndex() // string key-value pairs in memory, as if they were on disk + if itype == indexCorpusBuild { + if _, err := idx.KeepInMemory(); err != nil { + t.Fatal(err) + } + } + qt := &queryTest{ + t: t, + id: indextest.NewIndexDeps(idx), + } + qt.id.Fataler = t + qt.Handler = func() *Handler { + if itype == indexCorpusScan { + if _, err := idx.KeepInMemory(); err != nil { + t.Fatal(err) + } + } + return NewHandler(idx, qt.id.SignerBlobRef) + } + fn(qt) +} - fileRef, _ := id.UploadFile("file.txt", "foo", time.Unix(1382073153, 0)) +func TestQueryAnyCamliType(t *testing.T) { testQuery(t, testQueryAnyCamliType, indexClassic) } +func TestQueryAnyCamliType_Scan(t *testing.T) { testQuery(t, testQueryAnyCamliType, indexCorpusScan) } +func TestQueryAnyCamliType_Build(t *testing.T) { testQuery(t, testQueryAnyCamliType, indexCorpusBuild) } +func testQueryAnyCamliType(qt *queryTest) { + fileRef, _ := qt.id.UploadFile("file.txt", "foo", time.Unix(1382073153, 0)) sq := &SearchQuery{ Constraint: &Constraint{ AnyCamliType: true, }, } - sres, err := h.Query(sq) + sres, err := qt.Handler().Query(sq) if err != nil { - t.Fatal(err) + qt.t.Fatal(err) } - wantRes(t, sres, fileRef) + qt.wantRes(sres, fileRef) } func TestQueryBlobSize(t *testing.T) {