sorted/mysql: don't put database name in the DSN, share connections

Change-Id: Ia4d399e72a1290f68dc2b0de71945176c6aac482
This commit is contained in:
Brad Fitzpatrick 2014-08-06 16:15:08 -07:00
parent d6a8ecce2d
commit 6513d525c6
2 changed files with 47 additions and 17 deletions

View File

@ -28,12 +28,12 @@ func SchemaVersion() int {
// which is purely about bytes.
func SQLCreateTables() []string {
return []string{
`CREATE TABLE IF NOT EXISTS rows (
`CREATE TABLE IF NOT EXISTS /*DB*/rows (
k VARCHAR(255) NOT NULL PRIMARY KEY,
v VARCHAR(255))
DEFAULT CHARACTER SET binary`,
`CREATE TABLE IF NOT EXISTS meta (
`CREATE TABLE IF NOT EXISTS /*DB*/meta (
metakey VARCHAR(255) NOT NULL PRIMARY KEY,
value VARCHAR(255) NOT NULL)
DEFAULT CHARACTER SET binary`,

View File

@ -23,6 +23,7 @@ import (
"fmt"
"os"
"strings"
"sync"
"camlistore.org/pkg/jsonconfig"
"camlistore.org/pkg/sorted"
@ -35,40 +36,49 @@ func init() {
}
func newKeyValueFromJSONConfig(cfg jsonconfig.Obj) (sorted.KeyValue, error) {
host := cfg.OptionalString("host", "")
dsn := fmt.Sprintf("%s:%s@/%s",
cfg.RequiredString("user"),
cfg.OptionalString("password", ""),
cfg.RequiredString("database"),
var (
user = cfg.RequiredString("user")
database = cfg.RequiredString("database")
host = cfg.OptionalString("host", "")
password = cfg.OptionalString("password", "")
)
if err := cfg.Validate(); err != nil {
return nil, err
}
var err error
if host != "" {
// TODO(mpl): document that somewhere
if !strings.Contains(host, ":") {
host = host + ":3306"
host += ":3306"
}
dsn = strings.Replace(dsn, "@", fmt.Sprintf("@tcp(%v)", host), 1)
host = "tcp(" + host + ")"
}
// The DSN does NOT have a database name in it so it's
// cacheable and can be shared between different queues & the
// index, all sharing the same database server, cutting down
// number of TCP connections required. We add the database
// name in queries instead.
dsn := fmt.Sprintf("%s:%s@%s/", user, password, host)
db, err := sql.Open("mysql", dsn)
db, err := openOrCachedDB(dsn)
if err != nil {
return nil, err
}
for _, tableSql := range SQLCreateTables() {
if _, err := db.Exec(tableSql); err != nil {
return nil, fmt.Errorf("error creating table with %q: %v", tableSql, err)
for _, tableSQL := range SQLCreateTables() {
tableSQL = strings.Replace(tableSQL, "/*DB*/", database+".", -1)
if _, err := db.Exec(tableSQL); err != nil {
return nil, fmt.Errorf("error creating table with %q: %v", tableSQL, err)
}
}
if _, err := db.Exec(fmt.Sprintf(`REPLACE INTO meta VALUES ('version', '%d')`, SchemaVersion())); err != nil {
if _, err := db.Exec(fmt.Sprintf(`REPLACE INTO %s.meta VALUES ('version', '%d')`, database, SchemaVersion())); err != nil {
return nil, fmt.Errorf("error setting schema version: %v", err)
}
kv := &keyValue{
db: db,
KeyValue: &sqlkv.KeyValue{
DB: db,
DB: db,
TablePrefix: database + ".",
},
}
if err := kv.ping(); err != nil {
@ -94,6 +104,26 @@ func newKeyValueFromJSONConfig(cfg jsonconfig.Obj) (sorted.KeyValue, error) {
return kv, nil
}
// We keep a cache of open database handles.
var (
dbsmu sync.Mutex
dbs = map[string]*sql.DB{} // DSN -> db
)
func openOrCachedDB(dsn string) (*sql.DB, error) {
dbsmu.Lock()
defer dbsmu.Unlock()
if db, ok := dbs[dsn]; ok {
return db, nil
}
db, err := sql.Open("mysql", dsn)
if err != nil {
return nil, err
}
dbs[dsn] = db
return db, nil
}
type keyValue struct {
*sqlkv.KeyValue
@ -107,7 +137,7 @@ func (kv *keyValue) ping() error {
}
func (kv *keyValue) SchemaVersion() (version int, err error) {
err = kv.db.QueryRow("SELECT value FROM meta WHERE metakey='version'").Scan(&version)
err = kv.db.QueryRow("SELECT value FROM " + kv.KeyValue.TablePrefix + "meta WHERE metakey='version'").Scan(&version)
return
}