From fcbbf2a4df0194513630427af8aacd9b4de85dfe Mon Sep 17 00:00:00 2001 From: mpl Date: Thu, 12 Dec 2013 18:08:28 +0100 Subject: [PATCH] index: move mongo implementation to sorted Also devcam server -wipe wasn't wiping for mongo anymore, now fixed. Change-Id: Iecc9d8025ddfba8d8ae9417ee170baf02be1d52f --- dev/devcam/server.go | 5 +- pkg/index/mongo/export_test.go | 36 ---- pkg/index/mongo/mongoindex.go | 261 ++------------------------ pkg/index/mongo/mongoindex_test.go | 66 ++++--- pkg/sorted/kv.go | 5 + pkg/sorted/mem.go | 2 +- pkg/sorted/mongo/mongokv.go | 287 +++++++++++++++++++++++++++++ 7 files changed, 353 insertions(+), 309 deletions(-) delete mode 100644 pkg/index/mongo/export_test.go create mode 100644 pkg/sorted/mongo/mongokv.go diff --git a/dev/devcam/server.go b/dev/devcam/server.go index e705a7b67..ff79cf867 100644 --- a/dev/devcam/server.go +++ b/dev/devcam/server.go @@ -275,10 +275,13 @@ func (c *serverCmd) setupIndexer() error { args = append(args, "-dbtype=sqlite", "-dbname="+c.env.m["CAMLI_DBNAME"]) + case c.mongo: + // TODO(mpl): hack. add mongo support to dbinit instead. + c.env.Set("CAMLI_MONGO_WIPE", "true") + fallthrough default: return nil } - // TODO(mpl): I think we're forgetting to wipe mongo here. if c.wipe { args = append(args, "-wipe") } else { diff --git a/pkg/index/mongo/export_test.go b/pkg/index/mongo/export_test.go deleted file mode 100644 index 9cc70f260..000000000 --- a/pkg/index/mongo/export_test.go +++ /dev/null @@ -1,36 +0,0 @@ -/* -Copyright 2012 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-See the License for the specific language governing permissions and -limitations under the License. -*/ - -package mongo - -import ( - "camlistore.org/pkg/index" -) - -func NewMongoIndex(mgw *MongoWrapper) (*index.Index, error) { - return newMongoIndex(mgw) -} - -// AddUser creates a new user in mgw.Database so the mongo indexer -// tests can be run as authenticated with this user. -func AddUser(mgw *MongoWrapper, user, password string) error { - ses, err := mgw.getConnection() - if err != nil { - return err - } - defer ses.Close() - return ses.DB(mgw.Database).AddUser(user, password, false) -} diff --git a/pkg/index/mongo/mongoindex.go b/pkg/index/mongo/mongoindex.go index 32cddabac..8102aeb09 100644 --- a/pkg/index/mongo/mongoindex.go +++ b/pkg/index/mongo/mongoindex.go @@ -19,276 +19,49 @@ limitations under the License. package mongo import ( - "errors" "os" "strconv" - "strings" - "sync" - "time" "camlistore.org/pkg/blobserver" "camlistore.org/pkg/index" "camlistore.org/pkg/jsonconfig" "camlistore.org/pkg/sorted" - - "camlistore.org/third_party/labix.org/v2/mgo" - "camlistore.org/third_party/labix.org/v2/mgo/bson" + "camlistore.org/pkg/sorted/mongo" ) -// We explicitely separate the key and the value in a document, -// instead of simply storing as key:value, to avoid problems -// such as "." being an illegal char in a key name. Also because -// there is no way to do partial matching for key names (one can -// only check for their existence with bson.M{$exists: true}). -const ( - collectionName = "keys" - mgoKey = "k" - mgoValue = "v" -) - -type MongoWrapper struct { - Servers string - User string - Password string - Database string - Collection string -} - -func (mgw *MongoWrapper) url() string { - if mgw.User == "" || mgw.Password == "" { - return mgw.Servers - } - return mgw.User + ":" + mgw.Password + "@" + mgw.Servers + "/" + mgw.Database -} - -// Note that Ping won't work with old (1.2) mongo servers. 
-func (mgw *MongoWrapper) TestConnection(timeout time.Duration) bool { - session, err := mgo.DialWithTimeout(mgw.url(), timeout) - if err != nil { - return false - } - defer session.Close() - session.SetSyncTimeout(timeout) - if err = session.Ping(); err != nil { - return false - } - return true -} - -func (mgw *MongoWrapper) getConnection() (*mgo.Session, error) { - // TODO(mpl): do some "client caching" as in mysql, to avoid systematically dialing? - session, err := mgo.Dial(mgw.url()) - if err != nil { - return nil, err - } - session.SetMode(mgo.Monotonic, true) - session.SetSafe(&mgo.Safe{}) - return session, nil -} - -// TODO(mpl): I'm only calling getCollection at the beginning, and -// keeping the collection around and reusing it everywhere, instead -// of calling getCollection everytime, because that's the easiest. -// But I can easily change that. Gustavo says it does not make -// much difference either way. -// Brad, what do you think? -func (mgw *MongoWrapper) getCollection() (*mgo.Collection, error) { - session, err := mgw.getConnection() - if err != nil { - return nil, err - } - session.SetSafe(&mgo.Safe{}) - session.SetMode(mgo.Strong, true) - c := session.DB(mgw.Database).C(mgw.Collection) - return c, nil -} - func init() { blobserver.RegisterStorageConstructor("mongodbindexer", - blobserver.StorageConstructor(newMongoIndexFromConfig)) + blobserver.StorageConstructor(newFromConfig)) } -func newMongoIndex(mgw *MongoWrapper) (*index.Index, error) { - db, err := mgw.getCollection() +func newFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (blobserver.Storage, error) { + blobPrefix := config.RequiredString("blobSource") + kv, err := mongo.NewKeyValue(config) if err != nil { return nil, err } - mongoStorage := &mongoKeys{db: db} - return index.New(mongoStorage), nil -} - -func newMongoIndexFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (blobserver.Storage, error) { - blobPrefix := config.RequiredString("blobSource") - mgw := &MongoWrapper{ 
- Servers: config.OptionalString("host", "localhost"), - Database: config.RequiredString("database"), - User: config.OptionalString("user", ""), - Password: config.OptionalString("password", ""), - Collection: collectionName, - } - if err := config.Validate(); err != nil { - return nil, err + // TODO(mpl): hack. remove once dbinit supports mongo. + // https://camlistore-review.googlesource.com/1427 + if wipe, _ := strconv.ParseBool(os.Getenv("CAMLI_MONGO_WIPE")); wipe { + wiper, ok := kv.(sorted.Wiper) + if !ok { + panic("mongo KeyValue not a Wiper") + } + err = wiper.Wipe() + if err != nil { + return nil, err + } } + ix := index.New(kv) sto, err := ld.GetStorage(blobPrefix) if err != nil { return nil, err } - ix, err := newMongoIndex(mgw) - if err != nil { - return nil, err - } ix.BlobSource = sto // Good enough, for now: ix.KeyFetcher = ix.BlobSource - if wipe, _ := strconv.ParseBool(os.Getenv("CAMLI_MONGO_WIPE")); wipe { - err = ix.Storage().Delete("") - if err != nil { - return nil, err - } - } - return ix, err } - -// Implementation of index Iterator -type mongoStrIterator struct { - res bson.M - *mgo.Iter - end string -} - -func (s *mongoStrIterator) Next() bool { - if !s.Iter.Next(&s.res) { - return false - } - if s.end != "" { - if key, ok := (s.res[mgoKey]).(string); !ok || key >= s.end { - return false - } - } - return true -} - -func (s *mongoStrIterator) Key() string { - key, ok := (s.res[mgoKey]).(string) - if !ok { - return "" - } - return key -} - -func (s *mongoStrIterator) KeyBytes() []byte { - // TODO(bradfitz,mpl): this is less efficient than the string way. we should - // do better here, somehow, like all the other sorted.KeyValue iterators. - // For now: - return []byte(s.Key()) -} - -func (s *mongoStrIterator) Value() string { - value, ok := (s.res[mgoValue]).(string) - if !ok { - return "" - } - return value -} - -func (s *mongoStrIterator) ValueBytes() []byte { - // TODO(bradfitz,mpl): this is less efficient than the string way. 
we should - // do better here, somehow, like all the other sorted.KeyValue iterators. - // For now: - return []byte(s.Value()) -} - -func (s *mongoStrIterator) Close() error { - // TODO(mpl): think about anything more to be done here. - return nil -} - -// Implementation of sorted.KeyValue -type mongoKeys struct { - mu sync.Mutex // guards db - db *mgo.Collection -} - -func (mk *mongoKeys) Get(key string) (string, error) { - mk.mu.Lock() - defer mk.mu.Unlock() - res := bson.M{} - q := mk.db.Find(&bson.M{mgoKey: key}) - err := q.One(&res) - if err != nil { - if err == mgo.ErrNotFound { - return "", sorted.ErrNotFound - } else { - return "", err - } - } - return res[mgoValue].(string), err -} - -func (mk *mongoKeys) Find(start, end string) sorted.Iterator { - mk.mu.Lock() - defer mk.mu.Unlock() - // TODO(mpl): escape other special chars, or maybe replace $regex with something - // more suited if possible. - cleanedStart := strings.Replace(start, "|", `\|`, -1) - iter := mk.db.Find(&bson.M{mgoKey: &bson.M{"$regex": "^" + cleanedStart}}).Sort(mgoKey).Iter() - return &mongoStrIterator{res: bson.M{}, Iter: iter, end: end} -} - -func (mk *mongoKeys) Set(key, value string) error { - mk.mu.Lock() - defer mk.mu.Unlock() - _, err := mk.db.Upsert(&bson.M{mgoKey: key}, &bson.M{mgoKey: key, mgoValue: value}) - return err -} - -// Delete removes the document with the matching key. -// If key is "", it removes all documents. 
-func (mk *mongoKeys) Delete(key string) error { - mk.mu.Lock() - defer mk.mu.Unlock() - if key == "" { - _, err := mk.db.RemoveAll(nil) - return err - } - return mk.db.Remove(&bson.M{mgoKey: key}) -} - -func (mk *mongoKeys) BeginBatch() sorted.BatchMutation { - return sorted.NewBatchMutation() -} - -type batch interface { - Mutations() []sorted.Mutation -} - -func (mk *mongoKeys) CommitBatch(bm sorted.BatchMutation) error { - b, ok := bm.(batch) - if !ok { - return errors.New("invalid batch type") - } - - mk.mu.Lock() - defer mk.mu.Unlock() - for _, m := range b.Mutations() { - if m.IsDelete() { - if err := mk.db.Remove(bson.M{mgoKey: m.Key()}); err != nil { - return err - } - } else { - if _, err := mk.db.Upsert(&bson.M{mgoKey: m.Key()}, &bson.M{mgoKey: m.Key(), mgoValue: m.Value()}); err != nil { - return err - } - } - } - return nil -} - -func (mk *mongoKeys) Close() error { - // TODO(mpl): Close the Session? Connection? - return nil -} diff --git a/pkg/index/mongo/mongoindex_test.go b/pkg/index/mongo/mongoindex_test.go index 809a89ec7..a61a0a0c7 100644 --- a/pkg/index/mongo/mongoindex_test.go +++ b/pkg/index/mongo/mongoindex_test.go @@ -24,7 +24,10 @@ import ( "camlistore.org/pkg/index" "camlistore.org/pkg/index/indextest" - "camlistore.org/pkg/index/mongo" + "camlistore.org/pkg/jsonconfig" + "camlistore.org/pkg/sorted" + "camlistore.org/pkg/sorted/kvtest" + "camlistore.org/pkg/sorted/mongo" "camlistore.org/pkg/test" ) @@ -34,40 +37,37 @@ var ( ) func checkMongoUp() { - mgw := &mongo.MongoWrapper{ - Servers: "localhost", - } - mongoNotAvailable = !mgw.TestConnection(500 * time.Millisecond) + mongoNotAvailable = !mongo.Ping("localhost", 500*time.Millisecond) } -func initMongoIndex() *index.Index { +func newSorted(t *testing.T) (kv sorted.KeyValue, cleanup func()) { // connect without credentials and wipe the database - mgw := &mongo.MongoWrapper{ - Servers: "localhost", - Database: "camlitest", - Collection: "keys", + cfg := jsonconfig.Obj{ + "host": 
"localhost", + "database": "camlitest", } - idx, err := mongo.NewMongoIndex(mgw) + var err error + kv, err = mongo.NewKeyValue(cfg) if err != nil { - panic(err) + t.Fatal(err) } - err = idx.Storage().Delete("") + wiper, ok := kv.(sorted.Wiper) + if !ok { + panic("mongo KeyValue not a Wiper") + } + err = wiper.Wipe() if err != nil { - panic(err) + t.Fatal(err) } - // create user and connect with credentials - err = mongo.AddUser(mgw, "root", "root") - if err != nil { - panic(err) + return kv, func() { + kv.Close() } - mgw = &mongo.MongoWrapper{ - Servers: "localhost", - Database: "camlitest", - Collection: "keys", - User: "root", - Password: "root", - } - return idx +} + +func TestSortedKV(t *testing.T) { + kv, cleanup := newSorted(t) + defer cleanup() + kvtest.TestSorted(t, kv) } type mongoTester struct{} @@ -79,7 +79,19 @@ func (mongoTester) test(t *testing.T, tfn func(*testing.T, func() *index.Index)) test.DependencyErrorOrSkip(t) t.Fatalf("Mongo not available locally for testing: %v", err) } - tfn(t, initMongoIndex) + defer test.TLog(t)() + var cleanups []func() + defer func() { + for _, fn := range cleanups { + fn() + } + }() + initIndex := func() *index.Index { + kv, cleanup := newSorted(t) + cleanups = append(cleanups, cleanup) + return index.New(kv) + } + tfn(t, initIndex) } func TestIndex_Mongo(t *testing.T) { diff --git a/pkg/sorted/kv.go b/pkg/sorted/kv.go index 37b272bd8..57cc8d927 100644 --- a/pkg/sorted/kv.go +++ b/pkg/sorted/kv.go @@ -62,6 +62,11 @@ type KeyValue interface { Close() error } +type Wiper interface { + // Wipe removes all key/value pairs. + Wipe() error +} + // Iterator iterates over an index KeyValue's key/value pairs in key order. 
// // An iterator must be closed after use, but it is not necessary to read an diff --git a/pkg/sorted/mem.go b/pkg/sorted/mem.go index a70e36af9..4c14f9cb3 100644 --- a/pkg/sorted/mem.go +++ b/pkg/sorted/mem.go @@ -33,7 +33,7 @@ func NewMemoryKeyValue() KeyValue { return &memKeys{db: db} } -// memKeys is a naive in-memory implementation of index.Storage for test & development +// memKeys is a naive in-memory implementation of KeyValue for test & development // purposes only. type memKeys struct { mu sync.Mutex // guards db diff --git a/pkg/sorted/mongo/mongokv.go b/pkg/sorted/mongo/mongokv.go new file mode 100644 index 000000000..58c1cd8a9 --- /dev/null +++ b/pkg/sorted/mongo/mongokv.go @@ -0,0 +1,287 @@ +/* +Copyright 2013 The Camlistore Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package mongo provides an implementation of sorted.KeyValue +// using MongoDB. +package mongo + +import ( + "bytes" + "errors" + "sync" + "time" + + "camlistore.org/pkg/jsonconfig" + "camlistore.org/pkg/sorted" + + "camlistore.org/third_party/labix.org/v2/mgo" + "camlistore.org/third_party/labix.org/v2/mgo/bson" +) + +// We explicitly separate the key and the value in a document, +// instead of simply storing as key:value, to avoid problems +// such as "." being an illegal char in a key name. Also because +// there is no way to do partial matching for key names (one can +// only check for their existence with bson.M{$exists: true}).
+const ( + collectionName = "keys" + mgoKey = "k" + mgoValue = "v" +) + +func init() { + sorted.RegisterKeyValue("mongo", NewKeyValue) +} + +// TODO(mpl): automatically derive the keys doc from something else. +// a JSON annotated struct? maybe from instance. +// maybe something in common with pkg/serverconfig/genconfig.go + +// NewKeyValue returns a KeyValue implementation on top of MongoDB. +// cfg contains these keys: +// "host", optional string +// "database", string +// "user", optional string +// "password", optional string +func NewKeyValue(cfg jsonconfig.Obj) (sorted.KeyValue, error) { + ins := &instance{ + Servers: cfg.OptionalString("host", "localhost"), + Database: cfg.RequiredString("database"), + User: cfg.OptionalString("user", ""), + Password: cfg.OptionalString("password", ""), + Collection: collectionName, + } + if err := cfg.Validate(); err != nil { + return nil, err + } + db, err := ins.getCollection() + if err != nil { + return nil, err + } + return &keyValue{db: db, session: ins.session}, nil +} + +// Implementation of Iterator +type iter struct { + res bson.M + *mgo.Iter + end []byte +} + +func (it *iter) Next() bool { + if !it.Iter.Next(&it.res) { + return false + } + if len(it.end) > 0 && bytes.Compare(it.KeyBytes(), it.end) >= 0 { + return false + } + return true +} + +func (it *iter) Key() string { + key, ok := (it.res[mgoKey]).(string) + if !ok { + return "" + } + return key +} + +func (it *iter) KeyBytes() []byte { + // TODO(bradfitz,mpl): this is less efficient than the string way. we should + // do better here, somehow, like all the other KeyValue iterators. + // For now: + return []byte(it.Key()) +} + +func (it *iter) Value() string { + value, ok := (it.res[mgoValue]).(string) + if !ok { + return "" + } + return value +} + +func (it *iter) ValueBytes() []byte { + // TODO(bradfitz,mpl): this is less efficient than the string way. we should + // do better here, somehow, like all the other KeyValue iterators. 
+ // For now: + return []byte(it.Value()) +} + +func (it *iter) Close() error { + // TODO(mpl): update mongo in 3rd party and return it.Iter.Close() + return nil +} + +// Implementation of KeyValue +type keyValue struct { + session *mgo.Session // so we can close it + mu sync.Mutex // guards db + db *mgo.Collection +} + +func (kv *keyValue) Get(key string) (string, error) { + kv.mu.Lock() + defer kv.mu.Unlock() + res := bson.M{} + q := kv.db.Find(&bson.M{mgoKey: key}) + err := q.One(&res) + if err != nil { + if err == mgo.ErrNotFound { + return "", sorted.ErrNotFound + } else { + return "", err + } + } + return res[mgoValue].(string), err +} + +func (kv *keyValue) Find(start, end string) sorted.Iterator { + kv.mu.Lock() + defer kv.mu.Unlock() + it := kv.db.Find(&bson.M{mgoKey: &bson.M{"$gte": start}}).Sort(mgoKey).Iter() + return &iter{res: bson.M{}, Iter: it, end: []byte(end)} +} + +func (kv *keyValue) Set(key, value string) error { + kv.mu.Lock() + defer kv.mu.Unlock() + _, err := kv.db.Upsert(&bson.M{mgoKey: key}, &bson.M{mgoKey: key, mgoValue: value}) + return err +} + +// Delete removes the document with the matching key. +func (kv *keyValue) Delete(key string) error { + kv.mu.Lock() + defer kv.mu.Unlock() + err := kv.db.Remove(&bson.M{mgoKey: key}) + if err == mgo.ErrNotFound { + return nil + } + return err +} + +// Wipe removes all documents from the collection. 
+func (kv *keyValue) Wipe() error { + kv.mu.Lock() + defer kv.mu.Unlock() + _, err := kv.db.RemoveAll(nil) + return err +} + +type batch interface { + Mutations() []sorted.Mutation +} + +func (kv *keyValue) BeginBatch() sorted.BatchMutation { + return sorted.NewBatchMutation() +} + +func (kv *keyValue) CommitBatch(bm sorted.BatchMutation) error { + b, ok := bm.(batch) + if !ok { + return errors.New("invalid batch type") + } + + kv.mu.Lock() + defer kv.mu.Unlock() + for _, m := range b.Mutations() { + if m.IsDelete() { + if err := kv.db.Remove(bson.M{mgoKey: m.Key()}); err != nil { + return err + } + } else { + if _, err := kv.db.Upsert(&bson.M{mgoKey: m.Key()}, &bson.M{mgoKey: m.Key(), mgoValue: m.Value()}); err != nil { + return err + } + } + } + return nil +} + +func (kv *keyValue) Close() error { + kv.session.Close() + return nil +} + +// Ping tests if MongoDB on host can be dialed. +func Ping(host string, timeout time.Duration) bool { + return (&instance{Servers: host}).ping(timeout) +} + +// instance helps with the low level details about +// the connection to MongoDB. +type instance struct { + Servers string + User string + Password string + Database string + Collection string + session *mgo.Session +} + +func (ins *instance) url() string { + if ins.User == "" || ins.Password == "" { + return ins.Servers + } + return ins.User + ":" + ins.Password + "@" + ins.Servers + "/" + ins.Database +} + +// ping won't work with old (1.2) mongo servers. +func (ins *instance) ping(timeout time.Duration) bool { + session, err := mgo.DialWithTimeout(ins.url(), timeout) + if err != nil { + return false + } + defer session.Close() + session.SetSyncTimeout(timeout) + if err = session.Ping(); err != nil { + return false + } + return true +} + +func (ins *instance) getConnection() (*mgo.Session, error) { + if ins.session != nil { + return ins.session, nil + } + // TODO(mpl): do some "client caching" as in mysql, to avoid systematically dialing? 
+ session, err := mgo.Dial(ins.url()) + if err != nil { + return nil, err + } + session.SetMode(mgo.Monotonic, true) + session.SetSafe(&mgo.Safe{}) // so we get an ErrNotFound error when deleting an absent key + ins.session = session + return session, nil +} + +// TODO(mpl): I'm only calling getCollection at the beginning, and +// keeping the collection around and reusing it everywhere, instead +// of calling getCollection every time, because that's the easiest. +// But I can easily change that. Gustavo says it does not make +// much difference either way. +// Brad, what do you think? +func (ins *instance) getCollection() (*mgo.Collection, error) { + session, err := ins.getConnection() + if err != nil { + return nil, err + } + session.SetSafe(&mgo.Safe{}) + session.SetMode(mgo.Strong, true) + c := session.DB(ins.Database).C(ins.Collection) + return c, nil +}