pkg/index: move mysql to sorted

also minor cleanups in sqlite and mongo

Change-Id: I1f371358997d929c72a8f63d2630a1c3fa4a0240
This commit is contained in:
mpl 2013-12-18 19:01:13 +01:00
parent 114f522bf0
commit 62f8a252ff
8 changed files with 225 additions and 133 deletions

View File

@ -25,9 +25,9 @@ import (
"strings" "strings"
"camlistore.org/pkg/cmdmain" "camlistore.org/pkg/cmdmain"
"camlistore.org/pkg/index/mysql"
"camlistore.org/pkg/index/postgres" "camlistore.org/pkg/index/postgres"
"camlistore.org/pkg/sorted/mongo" "camlistore.org/pkg/sorted/mongo"
"camlistore.org/pkg/sorted/mysql"
"camlistore.org/pkg/sorted/sqlite" "camlistore.org/pkg/sorted/sqlite"
_ "camlistore.org/third_party/github.com/lib/pq" _ "camlistore.org/third_party/github.com/lib/pq"

View File

@ -83,17 +83,20 @@ func TestSortedKV(t *testing.T) {
type mongoTester struct{} type mongoTester struct{}
func (mongoTester) test(t *testing.T, tfn func(*testing.T, func() *index.Index)) { func (mongoTester) test(t *testing.T, tfn func(*testing.T, func() *index.Index)) {
skipOrFailIfNoMongo(t)
defer test.TLog(t)() defer test.TLog(t)()
var mu sync.Mutex // guards cleanups
var cleanups []func() var cleanups []func()
defer func() { defer func() {
mu.Lock() // never unlocked
for _, fn := range cleanups { for _, fn := range cleanups {
fn() fn()
} }
}() }()
initIndex := func() *index.Index { initIndex := func() *index.Index {
kv, cleanup := newSorted(t) kv, cleanup := newSorted(t)
mu.Lock()
cleanups = append(cleanups, cleanup) cleanups = append(cleanups, cleanup)
mu.Unlock()
return index.New(kv) return index.New(kv)
} }
tfn(t, initIndex) tfn(t, initIndex)

View File

@ -1,5 +1,5 @@
/* /*
Copyright 2012 Google Inc. Copyright 2012 The Camlistore Authors.
Licensed under the Apache License, Version 2.0 (the "License"); Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. you may not use this file except in compliance with the License.
@ -20,13 +20,16 @@ import (
"database/sql" "database/sql"
"errors" "errors"
"fmt" "fmt"
"log"
"sync" "sync"
"testing" "testing"
"camlistore.org/pkg/index" "camlistore.org/pkg/index"
"camlistore.org/pkg/index/indextest" "camlistore.org/pkg/index/indextest"
"camlistore.org/pkg/index/mysql"
"camlistore.org/pkg/osutil" "camlistore.org/pkg/osutil"
"camlistore.org/pkg/sorted"
"camlistore.org/pkg/sorted/kvtest"
"camlistore.org/pkg/sorted/mysql"
"camlistore.org/pkg/test" "camlistore.org/pkg/test"
_ "camlistore.org/third_party/github.com/ziutek/mymysql/godrv" _ "camlistore.org/third_party/github.com/ziutek/mymysql/godrv"
@ -40,47 +43,19 @@ var (
func checkDB() { func checkDB() {
var err error var err error
if rootdb, err = sql.Open("mymysql", "mysql/root/root"); err == nil { rootdb, err = sql.Open("mymysql", "mysql/root/root")
var n int
err := rootdb.QueryRow("SELECT COUNT(*) FROM user").Scan(&n)
if err == nil {
dbAvailable = true
}
}
}
func makeIndex() *index.Index {
dbname := "camlitest_" + osutil.Username()
do(rootdb, "DROP DATABASE IF EXISTS "+dbname)
do(rootdb, "CREATE DATABASE "+dbname)
db, err := sql.Open("mymysql", dbname+"/root/root")
if err != nil { if err != nil {
panic("opening test database: " + err.Error()) log.Printf("Could not open rootdb: %v", err)
}
for _, tableSql := range mysql.SQLCreateTables() {
do(db, tableSql)
}
do(db, fmt.Sprintf(`REPLACE INTO meta VALUES ('version', '%d')`, mysql.SchemaVersion()))
s, err := mysql.NewStorage("localhost", "root", "root", dbname)
if err != nil {
panic(err)
}
return index.New(s)
}
func do(db *sql.DB, sql string) {
_, err := db.Exec(sql)
if err == nil {
return return
} }
panic(fmt.Sprintf("Error %v running SQL: %s", err, sql)) var n int
err = rootdb.QueryRow("SELECT COUNT(*) FROM user").Scan(&n)
if err == nil {
dbAvailable = true
}
} }
type mysqlTester struct{} func skipOrFailIfNoMySQL(t *testing.T) {
func (mysqlTester) test(t *testing.T, tfn func(*testing.T, func() *index.Index)) {
once.Do(checkDB) once.Do(checkDB)
if !dbAvailable { if !dbAvailable {
// TODO(bradfitz): accept somehow other passwords than // TODO(bradfitz): accept somehow other passwords than
@ -90,6 +65,68 @@ func (mysqlTester) test(t *testing.T, tfn func(*testing.T, func() *index.Index))
test.DependencyErrorOrSkip(t) test.DependencyErrorOrSkip(t)
t.Fatalf("MySQL not available locally for testing: %v", err) t.Fatalf("MySQL not available locally for testing: %v", err)
} }
}
func do(db *sql.DB, sql string) {
_, err := db.Exec(sql)
if err == nil {
return
}
log.Fatalf("Error %v running SQL: %s", err, sql)
}
func newSorted(t *testing.T) (kv sorted.KeyValue, clean func()) {
skipOrFailIfNoMySQL(t)
dbname := "camlitest_" + osutil.Username()
do(rootdb, "DROP DATABASE IF EXISTS "+dbname)
do(rootdb, "CREATE DATABASE "+dbname)
db, err := sql.Open("mymysql", dbname+"/root/root")
if err != nil {
t.Fatalf("opening test database: " + err.Error())
}
for _, tableSql := range mysql.SQLCreateTables() {
do(db, tableSql)
}
do(db, fmt.Sprintf(`REPLACE INTO meta VALUES ('version', '%d')`, mysql.SchemaVersion()))
kv, err = mysql.NewKeyValue(mysql.Config{
Database: dbname,
User: "root",
Password: "root",
})
if err != nil {
t.Fatal(err)
}
return kv, func() {
kv.Close()
}
}
func TestSortedKV(t *testing.T) {
kv, clean := newSorted(t)
defer clean()
kvtest.TestSorted(t, kv)
}
type mysqlTester struct{}
func (mysqlTester) test(t *testing.T, tfn func(*testing.T, func() *index.Index)) {
var mu sync.Mutex // guards cleanups
var cleanups []func()
defer func() {
mu.Lock() // never unlocked
for _, fn := range cleanups {
fn()
}
}()
makeIndex := func() *index.Index {
s, cleanup := newSorted(t)
mu.Lock()
cleanups = append(cleanups, cleanup)
mu.Unlock()
return index.New(s)
}
tfn(t, makeIndex) tfn(t, makeIndex)
} }

View File

@ -1,5 +1,5 @@
/* /*
Copyright 2011 Google Inc. Copyright 2011 The Camlistore Authors.
Licensed under the Apache License, Version 2.0 (the "License"); Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. you may not use this file except in compliance with the License.
@ -19,116 +19,38 @@ limitations under the License.
package mysql package mysql
import ( import (
"database/sql"
"fmt"
"os"
"camlistore.org/pkg/blobserver" "camlistore.org/pkg/blobserver"
"camlistore.org/pkg/index" "camlistore.org/pkg/index"
"camlistore.org/pkg/index/sqlindex"
"camlistore.org/pkg/jsonconfig" "camlistore.org/pkg/jsonconfig"
"camlistore.org/pkg/sorted" "camlistore.org/pkg/sorted/mysql"
_ "camlistore.org/third_party/github.com/ziutek/mymysql/godrv" _ "camlistore.org/third_party/github.com/ziutek/mymysql/godrv"
) )
type myIndexStorage struct { func init() {
*sqlindex.Storage blobserver.RegisterStorageConstructor("mysqlindexer", newFromConfig)
host, user, password, database string
db *sql.DB
} }
var _ sorted.KeyValue = (*myIndexStorage)(nil)
// NewStorage returns an sorted.KeyValue implementation of the described MySQL database.
// This exists mostly for testing and does not initialize the schema.
func NewStorage(host, user, password, dbname string) (sorted.KeyValue, error) {
// TODO(bradfitz): host is ignored; how to plumb it through with mymysql?
dsn := dbname + "/" + user + "/" + password
db, err := sql.Open("mymysql", dsn)
if err != nil {
return nil, err
}
// TODO(bradfitz): ping db, check that it's reachable.
return &myIndexStorage{
db: db,
Storage: &sqlindex.Storage{
DB: db,
},
host: host,
user: user,
password: password,
database: dbname,
}, nil
}
const fixSchema20to21 = `Character set in tables changed to binary, you can fix your tables with:
ALTER TABLE rows CONVERT TO CHARACTER SET binary;
ALTER TABLE meta CONVERT TO CHARACTER SET binary;
UPDATE meta SET value=21 WHERE metakey='version' AND value=20;
`
func newFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (blobserver.Storage, error) { func newFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (blobserver.Storage, error) {
var ( blobPrefix := config.RequiredString("blobSource")
blobPrefix = config.RequiredString("blobSource") mysqlConf, err := mysql.ConfigFromJSON(config)
host = config.OptionalString("host", "localhost") if err != nil {
user = config.RequiredString("user")
password = config.OptionalString("password", "")
database = config.RequiredString("database")
)
if err := config.Validate(); err != nil {
return nil, err return nil, err
} }
kv, err := mysql.NewKeyValue(mysqlConf)
if err != nil {
return nil, err
}
ix := index.New(kv)
sto, err := ld.GetStorage(blobPrefix) sto, err := ld.GetStorage(blobPrefix)
if err != nil { if err != nil {
ix.Close()
return nil, err return nil, err
} }
isto, err := NewStorage(host, user, password, database)
if err != nil {
return nil, err
}
is := isto.(*myIndexStorage)
if err := is.ping(); err != nil {
return nil, err
}
version, err := is.SchemaVersion()
if err != nil {
return nil, fmt.Errorf("error getting schema version (need to init database?): %v", err)
}
if version != requiredSchemaVersion {
if version == 20 && requiredSchemaVersion == 21 {
fmt.Fprintf(os.Stderr, fixSchema20to21)
}
if os.Getenv("CAMLI_DEV_CAMLI_ROOT") != "" {
// Good signal that we're using the devcam server, so help out
// the user with a more useful tip:
return nil, fmt.Errorf("database schema version is %d; expect %d (run \"devcam server --wipe\" to wipe both your blobs and re-populate the database schema)", version, requiredSchemaVersion)
}
return nil, fmt.Errorf("database schema version is %d; expect %d (need to re-init/upgrade database?)",
version, requiredSchemaVersion)
}
ix := index.New(is)
ix.BlobSource = sto ix.BlobSource = sto
// Good enough, for now: // Good enough, for now:
ix.KeyFetcher = ix.BlobSource ix.KeyFetcher = ix.BlobSource
return ix, nil return ix, nil
} }
func init() {
blobserver.RegisterStorageConstructor("mysqlindexer", blobserver.StorageConstructor(newFromConfig))
}
func (mi *myIndexStorage) ping() error {
// TODO(bradfitz): something more efficient here?
_, err := mi.SchemaVersion()
return err
}
func (mi *myIndexStorage) SchemaVersion() (version int, err error) {
err = mi.db.QueryRow("SELECT value FROM meta WHERE metakey='version'").Scan(&version)
return
}

View File

@ -38,7 +38,6 @@ import (
var ( var (
once sync.Once once sync.Once
dbAvailable bool dbAvailable bool
rootdb *sql.DB
) )
func do(db *sql.DB, sql string) { func do(db *sql.DB, sql string) {

130
pkg/sorted/mysql/mysqlkv.go Normal file
View File

@ -0,0 +1,130 @@
/*
Copyright 2011 The Camlistore Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package mysql provides an implementation of sorted.KeyValue
// on top of MySQL.
package mysql
import (
"database/sql"
"fmt"
"os"
"camlistore.org/pkg/jsonconfig"
"camlistore.org/pkg/sorted"
"camlistore.org/pkg/sorted/sqlkv"
_ "camlistore.org/third_party/github.com/ziutek/mymysql/godrv"
)
func init() {
sorted.RegisterKeyValue("mysql", newKeyValueFromJSONConfig)
}
// Config holds the parameters used to connect to the MySQL db.
type Config struct {
Host string // Optional. Defaults to "localhost" in ConfigFromJSON.
Database string // Required.
User string // Required.
Password string // Optional.
}
// ConfigFromJSON populates Config from config, and validates
// config. It returns an error if config fails to validate.
func ConfigFromJSON(config jsonconfig.Obj) (Config, error) {
conf := Config{
Host: config.OptionalString("host", "localhost"),
User: config.RequiredString("user"),
Password: config.OptionalString("password", ""),
Database: config.RequiredString("database"),
}
if err := config.Validate(); err != nil {
return Config{}, err
}
return conf, nil
}
func newKeyValueFromJSONConfig(cfg jsonconfig.Obj) (sorted.KeyValue, error) {
conf, err := ConfigFromJSON(cfg)
if err != nil {
return nil, err
}
return NewKeyValue(conf)
}
// NewKeyValue returns a sorted.KeyValue implementation of the described MySQL database.
func NewKeyValue(cfg Config) (sorted.KeyValue, error) {
// TODO(bradfitz,mpl): host is ignored for now. I think we can connect to host with:
// tcp:ADDR*DBNAME/USER/PASSWD (http://godoc.org/github.com/ziutek/mymysql/godrv#Driver.Open)
// I suppose we'll have to do a lookup first.
dsn := cfg.Database + "/" + cfg.User + "/" + cfg.Password
db, err := sql.Open("mymysql", dsn)
if err != nil {
return nil, err
}
kv := &keyValue{
db: db,
KeyValue: &sqlkv.KeyValue{
DB: db,
},
conf: cfg,
}
if err := kv.ping(); err != nil {
return nil, fmt.Errorf("MySQL db unreachable: %v", err)
}
version, err := kv.SchemaVersion()
if err != nil {
return nil, fmt.Errorf("error getting schema version (need to init database?): %v", err)
}
if version != requiredSchemaVersion {
if version == 20 && requiredSchemaVersion == 21 {
fmt.Fprintf(os.Stderr, fixSchema20to21)
}
if os.Getenv("CAMLI_DEV_CAMLI_ROOT") != "" {
// Good signal that we're using the devcam server, so help out
// the user with a more useful tip:
return nil, fmt.Errorf("database schema version is %d; expect %d (run \"devcam server --wipe\" to wipe both your blobs and re-populate the database schema)", version, requiredSchemaVersion)
}
return nil, fmt.Errorf("database schema version is %d; expect %d (need to re-init/upgrade database?)",
version, requiredSchemaVersion)
}
return kv, nil
}
type keyValue struct {
*sqlkv.KeyValue
conf Config
db *sql.DB
}
func (kv *keyValue) ping() error {
// TODO(bradfitz): something more efficient here?
_, err := kv.SchemaVersion()
return err
}
func (kv *keyValue) SchemaVersion() (version int, err error) {
err = kv.db.QueryRow("SELECT value FROM meta WHERE metakey='version'").Scan(&version)
return
}
const fixSchema20to21 = `Character set in tables changed to binary, you can fix your tables with:
ALTER TABLE rows CONVERT TO CHARACTER SET binary;
ALTER TABLE meta CONVERT TO CHARACTER SET binary;
UPDATE meta SET value=21 WHERE metakey='version' AND value=20;
`

View File

@ -70,6 +70,7 @@ import (
_ "camlistore.org/pkg/sorted" _ "camlistore.org/pkg/sorted"
_ "camlistore.org/pkg/sorted/kvfile" _ "camlistore.org/pkg/sorted/kvfile"
_ "camlistore.org/pkg/sorted/mongo" _ "camlistore.org/pkg/sorted/mongo"
_ "camlistore.org/pkg/sorted/mysql"
"camlistore.org/pkg/sorted/sqlite" "camlistore.org/pkg/sorted/sqlite"
// Handlers: // Handlers: