sorted/sqlkv: support for adding optional 'db.' prefix before table names

This is so configurations using the same database server for many
databases (multiple queues + the index) can all share the same
underlying *sql.DB

Change-Id: Iae7d4835fe9045008709c953e36b729729b1b6f4
This commit is contained in:
Brad Fitzpatrick 2014-08-06 16:13:13 -07:00
parent b65868d754
commit d6a8ecce2d
1 changed files with 17 additions and 10 deletions

View File

@ -23,6 +23,7 @@ import (
"fmt"
"log"
"regexp"
"strings"
"sync"
"camlistore.org/pkg/leak"
@ -49,14 +50,20 @@ type KeyValue struct {
// concurrency in most cases.
Serial bool
// TablePrefix optionally provides a prefix for SQL table
// names. This is typically "dbname.", ending in a period.
TablePrefix string
mu sync.Mutex // the mutex used, if Serial is set
}
func (kv *KeyValue) sql(v string) string {
// TODO(bradfitz): all this string manipulation is redundant at runtime.
// We should do it once at the beginning and keep the strings around.
if f := kv.PlaceHolderFunc; f != nil {
return f(v)
v = f(v)
}
return v
return strings.Replace(v, "/*TPRE*/", kv.TablePrefix, -1)
}
type batchTx struct {
@ -86,14 +93,14 @@ func (b *batchTx) Set(key, value string) {
b.err = b.SetFunc(b.tx, key, value)
return
}
_, b.err = b.tx.Exec(b.sql("REPLACE INTO rows (k, v) VALUES (?, ?)"), key, value)
_, b.err = b.tx.Exec(b.sql("REPLACE INTO /*TPRE*/rows (k, v) VALUES (?, ?)"), key, value)
}
func (b *batchTx) Delete(key string) {
if b.err != nil {
return
}
_, b.err = b.tx.Exec(b.sql("DELETE FROM rows WHERE k=?"), key)
_, b.err = b.tx.Exec(b.sql("DELETE FROM /*TPRE*/rows WHERE k=?"), key)
}
func (kv *KeyValue) BeginBatch() sorted.BatchMutation {
@ -128,7 +135,7 @@ func (kv *KeyValue) Get(key string) (value string, err error) {
kv.mu.Lock()
defer kv.mu.Unlock()
}
err = kv.DB.QueryRow(kv.sql("SELECT v FROM rows WHERE k=?"), key).Scan(&value)
err = kv.DB.QueryRow(kv.sql("SELECT v FROM /*TPRE*/rows WHERE k=?"), key).Scan(&value)
if err == sql.ErrNoRows {
err = sorted.ErrNotFound
}
@ -143,7 +150,7 @@ func (kv *KeyValue) Set(key, value string) error {
if kv.SetFunc != nil {
return kv.SetFunc(kv.DB, key, value)
}
_, err := kv.DB.Exec(kv.sql("REPLACE INTO rows (k, v) VALUES (?, ?)"), key, value)
_, err := kv.DB.Exec(kv.sql("REPLACE INTO /*TPRE*/rows (k, v) VALUES (?, ?)"), key, value)
return err
}
@ -152,7 +159,7 @@ func (kv *KeyValue) Delete(key string) error {
kv.mu.Lock()
defer kv.mu.Unlock()
}
_, err := kv.DB.Exec(kv.sql("DELETE FROM rows WHERE k=?"), key)
_, err := kv.DB.Exec(kv.sql("DELETE FROM /*TPRE*/rows WHERE k=?"), key)
return err
}
@ -161,7 +168,7 @@ func (kv *KeyValue) Wipe() error {
kv.mu.Lock()
defer kv.mu.Unlock()
}
_, err := kv.DB.Exec(kv.sql("DELETE FROM rows"))
_, err := kv.DB.Exec(kv.sql("DELETE FROM /*TPRE*/rows"))
return err
}
@ -175,9 +182,9 @@ func (kv *KeyValue) Find(start, end string) sorted.Iterator {
var rows *sql.Rows
var err error
if end == "" {
rows, err = kv.DB.Query(kv.sql("SELECT k, v FROM rows WHERE k >= ? ORDER BY k "), start)
rows, err = kv.DB.Query(kv.sql("SELECT k, v FROM /*TPRE*/rows WHERE k >= ? ORDER BY k "), start)
} else {
rows, err = kv.DB.Query(kv.sql("SELECT k, v FROM rows WHERE k >= ? AND k < ? ORDER BY k "), start, end)
rows, err = kv.DB.Query(kv.sql("SELECT k, v FROM /*TPRE*/rows WHERE k >= ? AND k < ? ORDER BY k "), start, end)
}
if err != nil {
log.Printf("unexpected query error: %v", err)