diff --git a/lib/go/camli/db/db.go b/lib/go/camli/db/db.go index 2310c41b7..d622d3d02 100644 --- a/lib/go/camli/db/db.go +++ b/lib/go/camli/db/db.go @@ -22,6 +22,7 @@ import ( "fmt" "os" "runtime" + "sync" "camli/db/dbimpl" ) @@ -42,6 +43,7 @@ type DB struct { driver dbimpl.Driver dbarg string + mu sync.Mutex freeConn []dbimpl.Conn } @@ -62,14 +64,30 @@ func (db *DB) maxIdleConns() int { // conn returns a newly-opened or cached dbimpl.Conn func (db *DB) conn() (dbimpl.Conn, os.Error) { + db.mu.Lock() if n := len(db.freeConn); n > 0 { conn := db.freeConn[n-1] db.freeConn = db.freeConn[:n-1] + db.mu.Unlock() return conn, nil } + db.mu.Unlock() return db.driver.Open(db.dbarg) } +func (db *DB) connIfFree(wanted dbimpl.Conn) (conn dbimpl.Conn, ok bool) { + db.mu.Lock() + defer db.mu.Unlock() + for n, conn := range db.freeConn { + if conn == wanted { + db.freeConn[n] = db.freeConn[len(db.freeConn)-1] + db.freeConn = db.freeConn[:n-1] + return wanted, true + } + } + return nil, false +} + func (db *DB) putConn(c dbimpl.Conn) { if n := len(db.freeConn); n < db.maxIdleConns() { db.freeConn = append(db.freeConn, c) @@ -102,8 +120,7 @@ func (db *DB) Prepare(query string) (*Stmt, os.Error) { stmt := &Stmt{ db: db, query: query, - ci: ci, - si: si, + css: []connStmt{{ci, si}}, } return stmt, nil } @@ -183,13 +200,19 @@ func (tx *Tx) QueryRow(query string, args ...interface{}) *Row { panic(todo()) } -type Stmt struct { - db *DB // where we came from - ci dbimpl.Conn // the Conn that we're bound to. to execute, we need to wait for this Conn to be free. - si dbimpl.Stmt // owned +// connStmt is a prepared statement on a particular connection. +type connStmt struct { + ci dbimpl.Conn + si dbimpl.Stmt +} - // query is the query that created the Stmt - query string +type Stmt struct { + // Immutable: + db *DB // where we came from + query string // that created the Sttm + + mu sync.Mutex + css []connStmt // can use any that have idle connections } func todo() string { @@ -198,7 +221,53 @@ func todo() string { } func (s *Stmt) Exec(args ...interface{}) os.Error { - panic(todo()) + ci, si, err := s.connStmt() + if err != nil { + return err + } + defer s.db.putConn(ci) + // TODO(bradfitz): convert args from full set (package db) to + // restricted set (package dbimpl) + resi, err := si.Exec(args) + if err != nil { + return err + } + _ = resi // TODO(bradfitz): return these stats, converted to pkg db type + return nil +} + +func (s *Stmt) connStmt(args ...interface{}) (dbimpl.Conn, dbimpl.Stmt, os.Error) { + s.mu.Lock() + var cs connStmt + match := false + for _, v := range s.css { + // TODO(bradfitz): lazily clean up entries in this + // list with dead conns while enumerating + if _, match = s.db.connIfFree(cs.ci); match { + cs = v + break + } + } + s.mu.Unlock() + + // Make a new conn if all are busy. + // TODO(bradfitz): or wait for one? make configurable later? + if !match { + ci, err := s.db.conn() + if err != nil { + return nil, nil, err + } + si, err := ci.Prepare(s.query) + if err != nil { + return nil, nil, err + } + s.mu.Lock() + cs = connStmt{ci, si} + s.css = append(s.css, cs) + s.mu.Unlock() + } + + return cs.ci, cs.si, nil } func (s *Stmt) Query(args ...interface{}) (*Rows, os.Error) { diff --git a/lib/go/camli/db/db_test.go b/lib/go/camli/db/db_test.go index ea81b33db..e28491d01 100644 --- a/lib/go/camli/db/db_test.go +++ b/lib/go/camli/db/db_test.go @@ -21,7 +21,10 @@ import ( ) func TestDb(t *testing.T) { - db, err := Open("test", "foo;wipe") + db, err := Open("test", "foo") + if err := db.Exec("WIPE"); err != nil { + t.Fatalf("exec wipe: %v", err) + } if err != nil { t.Fatalf("Open: %v", err) } @@ -33,4 +36,35 @@ func TestDb(t *testing.T) { if err != nil { t.Errorf("Stmt, err = %v, %v", stmt, err) } + + type execTest struct { + args []interface{} + wantErr string + } + execTests := []execTest{ + // Okay: + {[]interface{}{"Brad", 31}, ""}, + {[]interface{}{"Brad", int64(31)}, ""}, + {[]interface{}{"Bob", "32"}, ""}, + {[]interface{}{7, 9}, ""}, + + // Invalid conversions: + //{[]interface{}{"Brad", int64(0xFFFFFFFF)}, "conversion"}, + //{[]interface{}{"Brad", "strconv fail"}, "conversion"}, + + // Wrong number of args: + //{[]interface{}{}, "wrong number of args"}, + //{[]interface{}{1, 2, 3}, "wrong number of args"}, + } + for n, et := range execTests { + err := stmt.Exec(et.args...) + errStr := "" + if err != nil { + errStr = err.String() + } + if errStr != et.wantErr { + t.Errorf("stmt.Execute #%d: for %v, got error %q, want error %q", + n, et.args, errStr, et.wantErr) + } + } } diff --git a/lib/go/camli/db/dbimpl/dbimpl.go b/lib/go/camli/db/dbimpl/dbimpl.go index 1c5335658..324a744f0 100644 --- a/lib/go/camli/db/dbimpl/dbimpl.go +++ b/lib/go/camli/db/dbimpl/dbimpl.go @@ -57,7 +57,8 @@ type Result interface { RowsAffected() (int64, os.Error) } -// Stmt is a prepared statement. It is bound to a Conn. +// Stmt is a prepared statement. It is bound to a Conn and not +// used by multiple goroutines concurrently. type Stmt interface { Close() os.Error NumInput() int @@ -78,6 +79,16 @@ type Tx interface { Rollback() os.Error } +type RowsAffected int64 + +func (RowsAffected) AutoIncrementId() (int64, os.Error) { + return 0, os.NewError("no AutoIncrementId available") +} + +func (v RowsAffected) RowsAffected() (int64, os.Error) { + return int64(v), nil +} + type ddlSuccess struct{} var DDLSuccess Result = ddlSuccess{} diff --git a/lib/go/camli/db/fakedb_test.go b/lib/go/camli/db/fakedb_test.go index bfc76b9ac..7e121ee49 100644 --- a/lib/go/camli/db/fakedb_test.go +++ b/lib/go/camli/db/fakedb_test.go @@ -56,8 +56,14 @@ type fakeDB struct { } type table struct { + mu sync.Mutex colname []string coltype []string + rows []*row +} + +type row struct { + cols []interface{} // must be same size as its table colname + coltype } type fakeConn struct { @@ -108,15 +114,15 @@ func (d *fakeDriver) Open(dsn string) (dbimpl.Conn, os.Error) { db = &fakeDB{name: name} d.dbs[name] = db } - if len(parts) > 1 && parts[1] == "wipe" { - db.wipe() - } return &fakeConn{db: db}, nil } func (db *fakeDB) wipe() { db.mu.Lock() defer db.mu.Unlock() + if len(db.tables) != 0 { + print("wiped db", db) + } db.tables = nil } @@ -137,16 +143,33 @@ func (db *fakeDB) createTable(name string, columnNames, columnTypes []string) os return nil } +func (db *fakeDB) insert(table string, vals []interface{}) (dbimpl.Result, os.Error) { + db.mu.Lock() + t, ok := db.table(table) + db.mu.Unlock() + if !ok { + return nil, fmt.Errorf("fakedb: table %q doesn't exist", table) + } + t.mu.Lock() + defer t.mu.Unlock() + t.rows = append(t.rows, &row{cols: vals}) + return dbimpl.RowsAffected(1), nil +} + +// must be called with db.mu lock held +func (db *fakeDB) table(table string) (*table, bool) { + if db.tables == nil { + return nil, false + } + t, ok := db.tables[table] + return t, ok +} + func (db *fakeDB) columnType(table, column string) (typ string, ok bool) { db.mu.Lock() defer db.mu.Unlock() - if db.tables == nil { - println("no tables exist") - return - } - t, ok := db.tables[table] + t, ok := db.table(table) if !ok { - println("table no exist") return } for n, cname := range t.colname { @@ -191,6 +214,8 @@ func (c *fakeConn) Prepare(query string) (dbimpl.Stmt, os.Error) { cmd := parts[0] stmt := &fakeStmt{q: query, c: c, cmd: cmd} switch cmd { + case "WIPE": + // Nothing case "CREATE": if len(parts) != 3 { return nil, errf("invalid %q syntax with %d parts; want 3", cmd, len(parts)) @@ -239,12 +264,18 @@ func (s *fakeStmt) Close() os.Error { } func (s *fakeStmt) Exec(args []interface{}) (dbimpl.Result, os.Error) { + db := s.c.db switch s.cmd { + case "WIPE": + db.wipe() + return dbimpl.DDLSuccess, nil case "CREATE": - if err := s.c.db.createTable(s.table, s.colName, s.colType); err != nil { + if err := db.createTable(s.table, s.colName, s.colType); err != nil { return nil, err } return dbimpl.DDLSuccess, nil + case "INSERT": + return db.insert(s.table, args) } fmt.Printf("EXEC statement, cmd=%q: %#v\n", s.cmd, s) return nil, fmt.Errorf("unimplemented statement Exec command type of %q", s.cmd)