diff --git a/pkg/sorted/mysql/dbschema.go b/pkg/sorted/mysql/dbschema.go index 6d6680d9a..ac52e125a 100644 --- a/pkg/sorted/mysql/dbschema.go +++ b/pkg/sorted/mysql/dbschema.go @@ -28,12 +28,12 @@ func SchemaVersion() int { // which is purely about bytes. func SQLCreateTables() []string { return []string{ - `CREATE TABLE IF NOT EXISTS rows ( + `CREATE TABLE IF NOT EXISTS /*DB*/rows ( k VARCHAR(255) NOT NULL PRIMARY KEY, v VARCHAR(255)) DEFAULT CHARACTER SET binary`, - `CREATE TABLE IF NOT EXISTS meta ( + `CREATE TABLE IF NOT EXISTS /*DB*/meta ( metakey VARCHAR(255) NOT NULL PRIMARY KEY, value VARCHAR(255) NOT NULL) DEFAULT CHARACTER SET binary`, diff --git a/pkg/sorted/mysql/mysqlkv.go b/pkg/sorted/mysql/mysqlkv.go index 5e0c8a91f..9a00f72f0 100644 --- a/pkg/sorted/mysql/mysqlkv.go +++ b/pkg/sorted/mysql/mysqlkv.go @@ -23,6 +23,7 @@ import ( "fmt" "os" "strings" + "sync" "camlistore.org/pkg/jsonconfig" "camlistore.org/pkg/sorted" @@ -35,40 +36,49 @@ func init() { } func newKeyValueFromJSONConfig(cfg jsonconfig.Obj) (sorted.KeyValue, error) { - host := cfg.OptionalString("host", "") - dsn := fmt.Sprintf("%s:%s@/%s", - cfg.RequiredString("user"), - cfg.OptionalString("password", ""), - cfg.RequiredString("database"), + var ( + user = cfg.RequiredString("user") + database = cfg.RequiredString("database") + host = cfg.OptionalString("host", "") + password = cfg.OptionalString("password", "") ) if err := cfg.Validate(); err != nil { return nil, err } + var err error if host != "" { - // TODO(mpl): document that somewhere if !strings.Contains(host, ":") { - host = host + ":3306" + host += ":3306" } - dsn = strings.Replace(dsn, "@", fmt.Sprintf("@tcp(%v)", host), 1) + host = "tcp(" + host + ")" } + // The DSN does NOT have a database name in it so it's + // cacheable and can be shared between different queues & the + // index, all sharing the same database server, cutting down + // number of TCP connections required. We add the database + // name in queries instead. + dsn := fmt.Sprintf("%s:%s@%s/", user, password, host) - db, err := sql.Open("mysql", dsn) + db, err := openOrCachedDB(dsn) if err != nil { return nil, err } - for _, tableSql := range SQLCreateTables() { - if _, err := db.Exec(tableSql); err != nil { - return nil, fmt.Errorf("error creating table with %q: %v", tableSql, err) + + for _, tableSQL := range SQLCreateTables() { + tableSQL = strings.Replace(tableSQL, "/*DB*/", database+".", -1) + if _, err := db.Exec(tableSQL); err != nil { + return nil, fmt.Errorf("error creating table with %q: %v", tableSQL, err) } } - if _, err := db.Exec(fmt.Sprintf(`REPLACE INTO meta VALUES ('version', '%d')`, SchemaVersion())); err != nil { + if _, err := db.Exec(fmt.Sprintf(`REPLACE INTO %s.meta VALUES ('version', '%d')`, database, SchemaVersion())); err != nil { return nil, fmt.Errorf("error setting schema version: %v", err) } kv := &keyValue{ db: db, KeyValue: &sqlkv.KeyValue{ - DB: db, + DB: db, + TablePrefix: database + ".", }, } if err := kv.ping(); err != nil { @@ -94,6 +104,26 @@ func newKeyValueFromJSONConfig(cfg jsonconfig.Obj) (sorted.KeyValue, error) { return kv, nil } +// We keep a cache of open database handles. +var ( + dbsmu sync.Mutex + dbs = map[string]*sql.DB{} // DSN -> db +) + +func openOrCachedDB(dsn string) (*sql.DB, error) { + dbsmu.Lock() + defer dbsmu.Unlock() + if db, ok := dbs[dsn]; ok { + return db, nil + } + db, err := sql.Open("mysql", dsn) + if err != nil { + return nil, err + } + dbs[dsn] = db + return db, nil +} + type keyValue struct { *sqlkv.KeyValue @@ -107,7 +137,7 @@ func (kv *keyValue) ping() error { } func (kv *keyValue) SchemaVersion() (version int, err error) { - err = kv.db.QueryRow("SELECT value FROM meta WHERE metakey='version'").Scan(&version) + err = kv.db.QueryRow("SELECT value FROM " + kv.KeyValue.TablePrefix + "meta WHERE metakey='version'").Scan(&version) return }