index: move mongo implementation to sorted

Also devcam server -wipe wasn't wiping for mongo anymore, now fixed.

Change-Id: Iecc9d8025ddfba8d8ae9417ee170baf02be1d52f
This commit is contained in:
mpl 2013-12-12 18:08:28 +01:00
parent 22101b61d8
commit fcbbf2a4df
7 changed files with 353 additions and 309 deletions

View File

@ -275,10 +275,13 @@ func (c *serverCmd) setupIndexer() error {
args = append(args,
"-dbtype=sqlite",
"-dbname="+c.env.m["CAMLI_DBNAME"])
case c.mongo:
// TODO(mpl): hack. add mongo support to dbinit instead.
c.env.Set("CAMLI_MONGO_WIPE", "true")
fallthrough
default:
return nil
}
// TODO(mpl): I think we're forgetting to wipe mongo here.
if c.wipe {
args = append(args, "-wipe")
} else {

View File

@ -1,36 +0,0 @@
/*
Copyright 2012 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mongo
import (
"camlistore.org/pkg/index"
)
func NewMongoIndex(mgw *MongoWrapper) (*index.Index, error) {
return newMongoIndex(mgw)
}
// AddUser creates a new user in mgw.Database so the mongo indexer
// tests can be run as authenticated with this user.
func AddUser(mgw *MongoWrapper, user, password string) error {
ses, err := mgw.getConnection()
if err != nil {
return err
}
defer ses.Close()
return ses.DB(mgw.Database).AddUser(user, password, false)
}

View File

@ -19,276 +19,49 @@ limitations under the License.
package mongo
import (
"errors"
"os"
"strconv"
"strings"
"sync"
"time"
"camlistore.org/pkg/blobserver"
"camlistore.org/pkg/index"
"camlistore.org/pkg/jsonconfig"
"camlistore.org/pkg/sorted"
"camlistore.org/third_party/labix.org/v2/mgo"
"camlistore.org/third_party/labix.org/v2/mgo/bson"
"camlistore.org/pkg/sorted/mongo"
)
// We explicitely separate the key and the value in a document,
// instead of simply storing as key:value, to avoid problems
// such as "." being an illegal char in a key name. Also because
// there is no way to do partial matching for key names (one can
// only check for their existence with bson.M{$exists: true}).
const (
collectionName = "keys"
mgoKey = "k"
mgoValue = "v"
)
type MongoWrapper struct {
Servers string
User string
Password string
Database string
Collection string
}
func (mgw *MongoWrapper) url() string {
if mgw.User == "" || mgw.Password == "" {
return mgw.Servers
}
return mgw.User + ":" + mgw.Password + "@" + mgw.Servers + "/" + mgw.Database
}
// Note that Ping won't work with old (1.2) mongo servers.
func (mgw *MongoWrapper) TestConnection(timeout time.Duration) bool {
session, err := mgo.DialWithTimeout(mgw.url(), timeout)
if err != nil {
return false
}
defer session.Close()
session.SetSyncTimeout(timeout)
if err = session.Ping(); err != nil {
return false
}
return true
}
func (mgw *MongoWrapper) getConnection() (*mgo.Session, error) {
// TODO(mpl): do some "client caching" as in mysql, to avoid systematically dialing?
session, err := mgo.Dial(mgw.url())
if err != nil {
return nil, err
}
session.SetMode(mgo.Monotonic, true)
session.SetSafe(&mgo.Safe{})
return session, nil
}
// TODO(mpl): I'm only calling getCollection at the beginning, and
// keeping the collection around and reusing it everywhere, instead
// of calling getCollection everytime, because that's the easiest.
// But I can easily change that. Gustavo says it does not make
// much difference either way.
// Brad, what do you think?
func (mgw *MongoWrapper) getCollection() (*mgo.Collection, error) {
session, err := mgw.getConnection()
if err != nil {
return nil, err
}
session.SetSafe(&mgo.Safe{})
session.SetMode(mgo.Strong, true)
c := session.DB(mgw.Database).C(mgw.Collection)
return c, nil
}
func init() {
blobserver.RegisterStorageConstructor("mongodbindexer",
blobserver.StorageConstructor(newMongoIndexFromConfig))
blobserver.StorageConstructor(newFromConfig))
}
func newMongoIndex(mgw *MongoWrapper) (*index.Index, error) {
db, err := mgw.getCollection()
func newFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (blobserver.Storage, error) {
blobPrefix := config.RequiredString("blobSource")
kv, err := mongo.NewKeyValue(config)
if err != nil {
return nil, err
}
mongoStorage := &mongoKeys{db: db}
return index.New(mongoStorage), nil
}
func newMongoIndexFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (blobserver.Storage, error) {
blobPrefix := config.RequiredString("blobSource")
mgw := &MongoWrapper{
Servers: config.OptionalString("host", "localhost"),
Database: config.RequiredString("database"),
User: config.OptionalString("user", ""),
Password: config.OptionalString("password", ""),
Collection: collectionName,
}
if err := config.Validate(); err != nil {
return nil, err
// TODO(mpl): hack. remove once dbinit supports mongo.
// https://camlistore-review.googlesource.com/1427
if wipe, _ := strconv.ParseBool(os.Getenv("CAMLI_MONGO_WIPE")); wipe {
wiper, ok := kv.(sorted.Wiper)
if !ok {
panic("mongo KeyValue not a Wiper")
}
err = wiper.Wipe()
if err != nil {
return nil, err
}
}
ix := index.New(kv)
sto, err := ld.GetStorage(blobPrefix)
if err != nil {
return nil, err
}
ix, err := newMongoIndex(mgw)
if err != nil {
return nil, err
}
ix.BlobSource = sto
// Good enough, for now:
ix.KeyFetcher = ix.BlobSource
if wipe, _ := strconv.ParseBool(os.Getenv("CAMLI_MONGO_WIPE")); wipe {
err = ix.Storage().Delete("")
if err != nil {
return nil, err
}
}
return ix, err
}
// Implementation of index Iterator
type mongoStrIterator struct {
res bson.M
*mgo.Iter
end string
}
func (s *mongoStrIterator) Next() bool {
if !s.Iter.Next(&s.res) {
return false
}
if s.end != "" {
if key, ok := (s.res[mgoKey]).(string); !ok || key >= s.end {
return false
}
}
return true
}
func (s *mongoStrIterator) Key() string {
key, ok := (s.res[mgoKey]).(string)
if !ok {
return ""
}
return key
}
func (s *mongoStrIterator) KeyBytes() []byte {
// TODO(bradfitz,mpl): this is less efficient than the string way. we should
// do better here, somehow, like all the other sorted.KeyValue iterators.
// For now:
return []byte(s.Key())
}
func (s *mongoStrIterator) Value() string {
value, ok := (s.res[mgoValue]).(string)
if !ok {
return ""
}
return value
}
func (s *mongoStrIterator) ValueBytes() []byte {
// TODO(bradfitz,mpl): this is less efficient than the string way. we should
// do better here, somehow, like all the other sorted.KeyValue iterators.
// For now:
return []byte(s.Value())
}
func (s *mongoStrIterator) Close() error {
// TODO(mpl): think about anything more to be done here.
return nil
}
// Implementation of sorted.KeyValue
type mongoKeys struct {
mu sync.Mutex // guards db
db *mgo.Collection
}
func (mk *mongoKeys) Get(key string) (string, error) {
mk.mu.Lock()
defer mk.mu.Unlock()
res := bson.M{}
q := mk.db.Find(&bson.M{mgoKey: key})
err := q.One(&res)
if err != nil {
if err == mgo.ErrNotFound {
return "", sorted.ErrNotFound
} else {
return "", err
}
}
return res[mgoValue].(string), err
}
func (mk *mongoKeys) Find(start, end string) sorted.Iterator {
mk.mu.Lock()
defer mk.mu.Unlock()
// TODO(mpl): escape other special chars, or maybe replace $regex with something
// more suited if possible.
cleanedStart := strings.Replace(start, "|", `\|`, -1)
iter := mk.db.Find(&bson.M{mgoKey: &bson.M{"$regex": "^" + cleanedStart}}).Sort(mgoKey).Iter()
return &mongoStrIterator{res: bson.M{}, Iter: iter, end: end}
}
func (mk *mongoKeys) Set(key, value string) error {
mk.mu.Lock()
defer mk.mu.Unlock()
_, err := mk.db.Upsert(&bson.M{mgoKey: key}, &bson.M{mgoKey: key, mgoValue: value})
return err
}
// Delete removes the document with the matching key.
// If key is "", it removes all documents.
func (mk *mongoKeys) Delete(key string) error {
mk.mu.Lock()
defer mk.mu.Unlock()
if key == "" {
_, err := mk.db.RemoveAll(nil)
return err
}
return mk.db.Remove(&bson.M{mgoKey: key})
}
func (mk *mongoKeys) BeginBatch() sorted.BatchMutation {
return sorted.NewBatchMutation()
}
type batch interface {
Mutations() []sorted.Mutation
}
func (mk *mongoKeys) CommitBatch(bm sorted.BatchMutation) error {
b, ok := bm.(batch)
if !ok {
return errors.New("invalid batch type")
}
mk.mu.Lock()
defer mk.mu.Unlock()
for _, m := range b.Mutations() {
if m.IsDelete() {
if err := mk.db.Remove(bson.M{mgoKey: m.Key()}); err != nil {
return err
}
} else {
if _, err := mk.db.Upsert(&bson.M{mgoKey: m.Key()}, &bson.M{mgoKey: m.Key(), mgoValue: m.Value()}); err != nil {
return err
}
}
}
return nil
}
func (mk *mongoKeys) Close() error {
// TODO(mpl): Close the Session? Connection?
return nil
}

View File

@ -24,7 +24,10 @@ import (
"camlistore.org/pkg/index"
"camlistore.org/pkg/index/indextest"
"camlistore.org/pkg/index/mongo"
"camlistore.org/pkg/jsonconfig"
"camlistore.org/pkg/sorted"
"camlistore.org/pkg/sorted/kvtest"
"camlistore.org/pkg/sorted/mongo"
"camlistore.org/pkg/test"
)
@ -34,40 +37,37 @@ var (
)
func checkMongoUp() {
mgw := &mongo.MongoWrapper{
Servers: "localhost",
}
mongoNotAvailable = !mgw.TestConnection(500 * time.Millisecond)
mongoNotAvailable = !mongo.Ping("localhost", 500*time.Millisecond)
}
func initMongoIndex() *index.Index {
func newSorted(t *testing.T) (kv sorted.KeyValue, cleanup func()) {
// connect without credentials and wipe the database
mgw := &mongo.MongoWrapper{
Servers: "localhost",
Database: "camlitest",
Collection: "keys",
cfg := jsonconfig.Obj{
"host": "localhost",
"database": "camlitest",
}
idx, err := mongo.NewMongoIndex(mgw)
var err error
kv, err = mongo.NewKeyValue(cfg)
if err != nil {
panic(err)
t.Fatal(err)
}
err = idx.Storage().Delete("")
wiper, ok := kv.(sorted.Wiper)
if !ok {
panic("mongo KeyValue not a Wiper")
}
err = wiper.Wipe()
if err != nil {
panic(err)
t.Fatal(err)
}
// create user and connect with credentials
err = mongo.AddUser(mgw, "root", "root")
if err != nil {
panic(err)
return kv, func() {
kv.Close()
}
mgw = &mongo.MongoWrapper{
Servers: "localhost",
Database: "camlitest",
Collection: "keys",
User: "root",
Password: "root",
}
return idx
}
func TestSortedKV(t *testing.T) {
kv, cleanup := newSorted(t)
defer cleanup()
kvtest.TestSorted(t, kv)
}
type mongoTester struct{}
@ -79,7 +79,19 @@ func (mongoTester) test(t *testing.T, tfn func(*testing.T, func() *index.Index))
test.DependencyErrorOrSkip(t)
t.Fatalf("Mongo not available locally for testing: %v", err)
}
tfn(t, initMongoIndex)
defer test.TLog(t)()
var cleanups []func()
defer func() {
for _, fn := range cleanups {
fn()
}
}()
initIndex := func() *index.Index {
kv, cleanup := newSorted(t)
cleanups = append(cleanups, cleanup)
return index.New(kv)
}
tfn(t, initIndex)
}
func TestIndex_Mongo(t *testing.T) {

View File

@ -62,6 +62,11 @@ type KeyValue interface {
Close() error
}
type Wiper interface {
// Wipe removes all key/value pairs.
Wipe() error
}
// Iterator iterates over an index KeyValue's key/value pairs in key order.
//
// An iterator must be closed after use, but it is not necessary to read an

View File

@ -33,7 +33,7 @@ func NewMemoryKeyValue() KeyValue {
return &memKeys{db: db}
}
// memKeys is a naive in-memory implementation of index.Storage for test & development
// memKeys is a naive in-memory implementation of KeyValue for test & development
// purposes only.
type memKeys struct {
mu sync.Mutex // guards db

287
pkg/sorted/mongo/mongokv.go Normal file
View File

@ -0,0 +1,287 @@
/*
Copyright 2013 The Camlistore Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package mongo provides an implementation of sorted.KeyValue
// using MongoDB.
package mongo
import (
"bytes"
"errors"
"sync"
"time"
"camlistore.org/pkg/jsonconfig"
"camlistore.org/pkg/sorted"
"camlistore.org/third_party/labix.org/v2/mgo"
"camlistore.org/third_party/labix.org/v2/mgo/bson"
)
// We explicitely separate the key and the value in a document,
// instead of simply storing as key:value, to avoid problems
// such as "." being an illegal char in a key name. Also because
// there is no way to do partial matching for key names (one can
// only check for their existence with bson.M{$exists: true}).
const (
collectionName = "keys"
mgoKey = "k"
mgoValue = "v"
)
func init() {
sorted.RegisterKeyValue("mongo", NewKeyValue)
}
// TODO(mpl): automatically derive the keys doc from something else.
// a JSON annotated struct? maybe from instance.
// maybe something in common with pkg/serverconfig/genconfig.go
// NewKeyValue returns a KeyValue implementation on top of MongoDB.
// cfg contains these keys:
// "host", optional string
// "database", string
// "user", optional string
// "password", optional string
func NewKeyValue(cfg jsonconfig.Obj) (sorted.KeyValue, error) {
ins := &instance{
Servers: cfg.OptionalString("host", "localhost"),
Database: cfg.RequiredString("database"),
User: cfg.OptionalString("user", ""),
Password: cfg.OptionalString("password", ""),
Collection: collectionName,
}
if err := cfg.Validate(); err != nil {
return nil, err
}
db, err := ins.getCollection()
if err != nil {
return nil, err
}
return &keyValue{db: db, session: ins.session}, nil
}
// Implementation of Iterator
type iter struct {
res bson.M
*mgo.Iter
end []byte
}
func (it *iter) Next() bool {
if !it.Iter.Next(&it.res) {
return false
}
if len(it.end) > 0 && bytes.Compare(it.KeyBytes(), it.end) >= 0 {
return false
}
return true
}
func (it *iter) Key() string {
key, ok := (it.res[mgoKey]).(string)
if !ok {
return ""
}
return key
}
func (it *iter) KeyBytes() []byte {
// TODO(bradfitz,mpl): this is less efficient than the string way. we should
// do better here, somehow, like all the other KeyValue iterators.
// For now:
return []byte(it.Key())
}
func (it *iter) Value() string {
value, ok := (it.res[mgoValue]).(string)
if !ok {
return ""
}
return value
}
func (it *iter) ValueBytes() []byte {
// TODO(bradfitz,mpl): this is less efficient than the string way. we should
// do better here, somehow, like all the other KeyValue iterators.
// For now:
return []byte(it.Value())
}
func (it *iter) Close() error {
// TODO(mpl): update mongo in 3rd party and return it.Iter.Close()
return nil
}
// Implementation of KeyValue
type keyValue struct {
session *mgo.Session // so we can close it
mu sync.Mutex // guards db
db *mgo.Collection
}
func (kv *keyValue) Get(key string) (string, error) {
kv.mu.Lock()
defer kv.mu.Unlock()
res := bson.M{}
q := kv.db.Find(&bson.M{mgoKey: key})
err := q.One(&res)
if err != nil {
if err == mgo.ErrNotFound {
return "", sorted.ErrNotFound
} else {
return "", err
}
}
return res[mgoValue].(string), err
}
func (kv *keyValue) Find(start, end string) sorted.Iterator {
kv.mu.Lock()
defer kv.mu.Unlock()
it := kv.db.Find(&bson.M{mgoKey: &bson.M{"$gte": start}}).Sort(mgoKey).Iter()
return &iter{res: bson.M{}, Iter: it, end: []byte(end)}
}
func (kv *keyValue) Set(key, value string) error {
kv.mu.Lock()
defer kv.mu.Unlock()
_, err := kv.db.Upsert(&bson.M{mgoKey: key}, &bson.M{mgoKey: key, mgoValue: value})
return err
}
// Delete removes the document with the matching key.
func (kv *keyValue) Delete(key string) error {
kv.mu.Lock()
defer kv.mu.Unlock()
err := kv.db.Remove(&bson.M{mgoKey: key})
if err == mgo.ErrNotFound {
return nil
}
return err
}
// Wipe removes all documents from the collection.
func (kv *keyValue) Wipe() error {
kv.mu.Lock()
defer kv.mu.Unlock()
_, err := kv.db.RemoveAll(nil)
return err
}
type batch interface {
Mutations() []sorted.Mutation
}
func (kv *keyValue) BeginBatch() sorted.BatchMutation {
return sorted.NewBatchMutation()
}
func (kv *keyValue) CommitBatch(bm sorted.BatchMutation) error {
b, ok := bm.(batch)
if !ok {
return errors.New("invalid batch type")
}
kv.mu.Lock()
defer kv.mu.Unlock()
for _, m := range b.Mutations() {
if m.IsDelete() {
if err := kv.db.Remove(bson.M{mgoKey: m.Key()}); err != nil {
return err
}
} else {
if _, err := kv.db.Upsert(&bson.M{mgoKey: m.Key()}, &bson.M{mgoKey: m.Key(), mgoValue: m.Value()}); err != nil {
return err
}
}
}
return nil
}
func (kv *keyValue) Close() error {
kv.session.Close()
return nil
}
// Ping tests if MongoDB on host can be dialed.
func Ping(host string, timeout time.Duration) bool {
return (&instance{Servers: host}).ping(timeout)
}
// instance helps with the low level details about
// the connection to MongoDB.
type instance struct {
Servers string
User string
Password string
Database string
Collection string
session *mgo.Session
}
func (ins *instance) url() string {
if ins.User == "" || ins.Password == "" {
return ins.Servers
}
return ins.User + ":" + ins.Password + "@" + ins.Servers + "/" + ins.Database
}
// ping won't work with old (1.2) mongo servers.
func (ins *instance) ping(timeout time.Duration) bool {
session, err := mgo.DialWithTimeout(ins.url(), timeout)
if err != nil {
return false
}
defer session.Close()
session.SetSyncTimeout(timeout)
if err = session.Ping(); err != nil {
return false
}
return true
}
func (ins *instance) getConnection() (*mgo.Session, error) {
if ins.session != nil {
return ins.session, nil
}
// TODO(mpl): do some "client caching" as in mysql, to avoid systematically dialing?
session, err := mgo.Dial(ins.url())
if err != nil {
return nil, err
}
session.SetMode(mgo.Monotonic, true)
session.SetSafe(&mgo.Safe{}) // so we get an ErrNotFound error when deleting an absent key
ins.session = session
return session, nil
}
// TODO(mpl): I'm only calling getCollection at the beginning, and
// keeping the collection around and reusing it everywhere, instead
// of calling getCollection everytime, because that's the easiest.
// But I can easily change that. Gustavo says it does not make
// much difference either way.
// Brad, what do you think?
func (ins *instance) getCollection() (*mgo.Collection, error) {
session, err := ins.getConnection()
if err != nil {
return nil, err
}
session.SetSafe(&mgo.Safe{})
session.SetMode(mgo.Strong, true)
c := session.DB(ins.Database).C(ins.Collection)
return c, nil
}