pkg/sorted/mysql: fix detection of outdated database schema

Regardless of whether the database tables are newly created, we were
_always_ stamping the database schema version in the meta table BEFORE
doing the database version check. Which means said check was actually
useless.

Also add a small sanity check of the database name.

Issue #806

Change-Id: I85e19ef7583ebd5ef1043a6deb0fe61abaa4d190
This commit is contained in:
mpl 2016-06-07 02:09:26 +02:00
parent 76cf1a0900
commit fb6fe83535
1 changed files with 45 additions and 19 deletions

View File

@ -50,6 +50,9 @@ func newKeyValueFromJSONConfig(cfg jsonconfig.Obj) (sorted.KeyValue, error) {
if err := cfg.Validate(); err != nil {
return nil, err
}
if !validDatabaseName(database) {
return nil, fmt.Errorf("%q looks like an invalid database name", database)
}
var err error
if host != "" {
host, err = maybeRemapCloudSQL(host)
@ -76,24 +79,8 @@ func newKeyValueFromJSONConfig(cfg jsonconfig.Obj) (sorted.KeyValue, error) {
if err := CreateDB(db, database); err != nil {
return nil, err
}
for _, tableSQL := range SQLCreateTables() {
tableSQL = strings.Replace(tableSQL, "/*DB*/", database, -1)
if _, err := db.Exec(tableSQL); err != nil {
errMsg := "error creating table with %q: %v."
createError := err
sv, err := serverVersion(db)
if err != nil {
return nil, err
}
if !hasLargeVarchar(sv) {
errMsg += "\nYour MySQL server is too old (< 5.0.3) to support VARCHAR larger than 255."
}
return nil, fmt.Errorf(errMsg, tableSQL, createError)
}
}
if _, err := db.Exec(fmt.Sprintf(`REPLACE INTO %s.meta VALUES ('version', '%d')`, database, SchemaVersion())); err != nil {
return nil, fmt.Errorf("error setting schema version: %v", err)
if err := createTables(db, database); err != nil {
return nil, err
}
kv := &keyValue{
@ -108,9 +95,17 @@ func newKeyValueFromJSONConfig(cfg jsonconfig.Obj) (sorted.KeyValue, error) {
if err := kv.ping(); err != nil {
return nil, fmt.Errorf("MySQL db unreachable: %v", err)
}
version, err := kv.SchemaVersion()
if err != nil {
return nil, fmt.Errorf("error getting schema version (need to init database?): %v", err)
return nil, fmt.Errorf("error getting current database schema version: %v", err)
}
if version == 0 {
// Newly created table case
if _, err := db.Exec(fmt.Sprintf(`REPLACE INTO %s.meta VALUES ('version', ?)`, database), requiredSchemaVersion); err != nil {
return nil, fmt.Errorf("error setting schema version: %v", err)
}
return kv, nil
}
if version != requiredSchemaVersion {
if version == 20 && requiredSchemaVersion == 21 {
@ -128,6 +123,13 @@ func newKeyValueFromJSONConfig(cfg jsonconfig.Obj) (sorted.KeyValue, error) {
return kv, nil
}
var dbnameRx = regexp.MustCompile(`^[a-zA-Z0-9_]+$`)
// validDatabaseName reports whether dbname is a valid-looking database name.
func validDatabaseName(dbname string) bool {
return dbnameRx.MatchString(dbname)
}
// CreateDB creates the named database if it does not already exist.
func CreateDB(db *sql.DB, dbname string) error {
if dbname == "" {
@ -139,6 +141,25 @@ func CreateDB(db *sql.DB, dbname string) error {
return nil
}
func createTables(db *sql.DB, database string) error {
for _, tableSQL := range SQLCreateTables() {
tableSQL = strings.Replace(tableSQL, "/*DB*/", database, -1)
if _, err := db.Exec(tableSQL); err != nil {
errMsg := "error creating table with %q: %v."
createError := err
sv, err := serverVersion(db)
if err != nil {
return err
}
if !hasLargeVarchar(sv) {
errMsg += "\nYour MySQL server is too old (< 5.0.3) to support VARCHAR larger than 255."
}
return fmt.Errorf(errMsg, tableSQL, createError)
}
}
return nil
}
// We keep a cache of open database handles.
var (
dbsmu sync.Mutex
@ -181,8 +202,13 @@ func (kv *keyValue) ping() error {
return err
}
// SchemaVersion returns the schema version found in the meta table. If no
// version is found it returns (0, nil), as the table should be considered empty.
func (kv *keyValue) SchemaVersion() (version int, err error) {
err = kv.db.QueryRow("SELECT value FROM " + kv.KeyValue.TablePrefix + "meta WHERE metakey='version'").Scan(&version)
if err == sql.ErrNoRows {
return 0, nil
}
return
}