more camli/db work.

Change-Id: I9c86df6a9e528432305d6042a59dd171f1b67109
This commit is contained in:
Brad Fitzpatrick 2011-08-24 18:10:05 +04:00
parent 5fae3cf86a
commit 80b91d5ba8
4 changed files with 166 additions and 21 deletions

View File

@ -22,6 +22,7 @@ import (
"fmt"
"os"
"runtime"
"sync"
"camli/db/dbimpl"
)
@ -42,6 +43,7 @@ type DB struct {
driver dbimpl.Driver
dbarg string
mu sync.Mutex
freeConn []dbimpl.Conn
}
@ -62,14 +64,30 @@ func (db *DB) maxIdleConns() int {
// conn returns a newly-opened or cached dbimpl.Conn
func (db *DB) conn() (dbimpl.Conn, os.Error) {
db.mu.Lock()
if n := len(db.freeConn); n > 0 {
conn := db.freeConn[n-1]
db.freeConn = db.freeConn[:n-1]
db.mu.Unlock()
return conn, nil
}
db.mu.Unlock()
return db.driver.Open(db.dbarg)
}
func (db *DB) connIfFree(wanted dbimpl.Conn) (conn dbimpl.Conn, ok bool) {
db.mu.Lock()
defer db.mu.Unlock()
for n, conn := range db.freeConn {
if conn == wanted {
db.freeConn[n] = db.freeConn[len(db.freeConn)-1]
db.freeConn = db.freeConn[:n-1]
return wanted, true
}
}
return nil, false
}
func (db *DB) putConn(c dbimpl.Conn) {
if n := len(db.freeConn); n < db.maxIdleConns() {
db.freeConn = append(db.freeConn, c)
@ -102,8 +120,7 @@ func (db *DB) Prepare(query string) (*Stmt, os.Error) {
stmt := &Stmt{
db: db,
query: query,
ci: ci,
si: si,
css: []connStmt{{ci, si}},
}
return stmt, nil
}
@ -183,13 +200,19 @@ func (tx *Tx) QueryRow(query string, args ...interface{}) *Row {
panic(todo())
}
type Stmt struct {
db *DB // where we came from
ci dbimpl.Conn // the Conn that we're bound to. to execute, we need to wait for this Conn to be free.
si dbimpl.Stmt // owned
// connStmt is a prepared statement on a particular connection.
type connStmt struct {
ci dbimpl.Conn
si dbimpl.Stmt
}
// query is the query that created the Stmt
query string
type Stmt struct {
// Immutable:
db *DB // where we came from
query string // that created the Sttm
mu sync.Mutex
css []connStmt // can use any that have idle connections
}
func todo() string {
@ -198,7 +221,53 @@ func todo() string {
}
func (s *Stmt) Exec(args ...interface{}) os.Error {
panic(todo())
ci, si, err := s.connStmt()
if err != nil {
return err
}
defer s.db.putConn(ci)
// TODO(bradfitz): convert args from full set (package db) to
// restricted set (package dbimpl)
resi, err := si.Exec(args)
if err != nil {
return err
}
_ = resi // TODO(bradfitz): return these stats, converted to pkg db type
return nil
}
func (s *Stmt) connStmt(args ...interface{}) (dbimpl.Conn, dbimpl.Stmt, os.Error) {
s.mu.Lock()
var cs connStmt
match := false
for _, v := range s.css {
// TODO(bradfitz): lazily clean up entries in this
// list with dead conns while enumerating
if _, match = s.db.connIfFree(cs.ci); match {
cs = v
break
}
}
s.mu.Unlock()
// Make a new conn if all are busy.
// TODO(bradfitz): or wait for one? make configurable later?
if !match {
ci, err := s.db.conn()
if err != nil {
return nil, nil, err
}
si, err := ci.Prepare(s.query)
if err != nil {
return nil, nil, err
}
s.mu.Lock()
cs = connStmt{ci, si}
s.css = append(s.css, cs)
s.mu.Unlock()
}
return cs.ci, cs.si, nil
}
func (s *Stmt) Query(args ...interface{}) (*Rows, os.Error) {

View File

@ -21,7 +21,10 @@ import (
)
func TestDb(t *testing.T) {
db, err := Open("test", "foo;wipe")
db, err := Open("test", "foo")
if err := db.Exec("WIPE"); err != nil {
t.Fatalf("exec wipe: %v", err)
}
if err != nil {
t.Fatalf("Open: %v", err)
}
@ -33,4 +36,35 @@ func TestDb(t *testing.T) {
if err != nil {
t.Errorf("Stmt, err = %v, %v", stmt, err)
}
type execTest struct {
args []interface{}
wantErr string
}
execTests := []execTest{
// Okay:
{[]interface{}{"Brad", 31}, ""},
{[]interface{}{"Brad", int64(31)}, ""},
{[]interface{}{"Bob", "32"}, ""},
{[]interface{}{7, 9}, ""},
// Invalid conversions:
//{[]interface{}{"Brad", int64(0xFFFFFFFF)}, "conversion"},
//{[]interface{}{"Brad", "strconv fail"}, "conversion"},
// Wrong number of args:
//{[]interface{}{}, "wrong number of args"},
//{[]interface{}{1, 2, 3}, "wrong number of args"},
}
for n, et := range execTests {
err := stmt.Exec(et.args...)
errStr := ""
if err != nil {
errStr = err.String()
}
if errStr != et.wantErr {
t.Errorf("stmt.Execute #%d: for %v, got error %q, want error %q",
n, et.args, errStr, et.wantErr)
}
}
}

View File

@ -57,7 +57,8 @@ type Result interface {
RowsAffected() (int64, os.Error)
}
// Stmt is a prepared statement. It is bound to a Conn.
// Stmt is a prepared statement. It is bound to a Conn and not
// used by multiple goroutines concurrently.
type Stmt interface {
Close() os.Error
NumInput() int
@ -78,6 +79,16 @@ type Tx interface {
Rollback() os.Error
}
type RowsAffected int64
func (RowsAffected) AutoIncrementId() (int64, os.Error) {
return 0, os.NewError("no AutoIncrementId available")
}
func (v RowsAffected) RowsAffected() (int64, os.Error) {
return int64(v), nil
}
type ddlSuccess struct{}
var DDLSuccess Result = ddlSuccess{}

View File

@ -56,8 +56,14 @@ type fakeDB struct {
}
type table struct {
mu sync.Mutex
colname []string
coltype []string
rows []*row
}
type row struct {
cols []interface{} // must be same size as its table colname + coltype
}
type fakeConn struct {
@ -108,15 +114,15 @@ func (d *fakeDriver) Open(dsn string) (dbimpl.Conn, os.Error) {
db = &fakeDB{name: name}
d.dbs[name] = db
}
if len(parts) > 1 && parts[1] == "wipe" {
db.wipe()
}
return &fakeConn{db: db}, nil
}
func (db *fakeDB) wipe() {
db.mu.Lock()
defer db.mu.Unlock()
if len(db.tables) != 0 {
print("wiped db", db)
}
db.tables = nil
}
@ -137,16 +143,33 @@ func (db *fakeDB) createTable(name string, columnNames, columnTypes []string) os
return nil
}
func (db *fakeDB) insert(table string, vals []interface{}) (dbimpl.Result, os.Error) {
db.mu.Lock()
t, ok := db.table(table)
db.mu.Unlock()
if !ok {
return nil, fmt.Errorf("fakedb: table %q doesn't exist", table)
}
t.mu.Lock()
defer t.mu.Unlock()
t.rows = append(t.rows, &row{cols: vals})
return dbimpl.RowsAffected(1), nil
}
// must be called with db.mu lock held
func (db *fakeDB) table(table string) (*table, bool) {
if db.tables == nil {
return nil, false
}
t, ok := db.tables[table]
return t, ok
}
func (db *fakeDB) columnType(table, column string) (typ string, ok bool) {
db.mu.Lock()
defer db.mu.Unlock()
if db.tables == nil {
println("no tables exist")
return
}
t, ok := db.tables[table]
t, ok := db.table(table)
if !ok {
println("table no exist")
return
}
for n, cname := range t.colname {
@ -191,6 +214,8 @@ func (c *fakeConn) Prepare(query string) (dbimpl.Stmt, os.Error) {
cmd := parts[0]
stmt := &fakeStmt{q: query, c: c, cmd: cmd}
switch cmd {
case "WIPE":
// Nothing
case "CREATE":
if len(parts) != 3 {
return nil, errf("invalid %q syntax with %d parts; want 3", cmd, len(parts))
@ -239,12 +264,18 @@ func (s *fakeStmt) Close() os.Error {
}
func (s *fakeStmt) Exec(args []interface{}) (dbimpl.Result, os.Error) {
db := s.c.db
switch s.cmd {
case "WIPE":
db.wipe()
return dbimpl.DDLSuccess, nil
case "CREATE":
if err := s.c.db.createTable(s.table, s.colName, s.colType); err != nil {
if err := db.createTable(s.table, s.colName, s.colType); err != nil {
return nil, err
}
return dbimpl.DDLSuccess, nil
case "INSERT":
return db.insert(s.table, args)
}
fmt.Printf("EXEC statement, cmd=%q: %#v\n", s.cmd, s)
return nil, fmt.Errorf("unimplemented statement Exec command type of %q", s.cmd)