Merge "pkg/index: move postgresql to sorted + some fixes"

This commit is contained in:
mpl 2013-12-23 23:49:13 +00:00 committed by Gerrit Code Review
commit 4a87b954fb
7 changed files with 305 additions and 173 deletions

View File

@ -25,9 +25,9 @@ import (
"strings"
"camlistore.org/pkg/cmdmain"
"camlistore.org/pkg/index/postgres"
"camlistore.org/pkg/sorted/mongo"
"camlistore.org/pkg/sorted/mysql"
"camlistore.org/pkg/sorted/postgres"
"camlistore.org/pkg/sorted/sqlite"
_ "camlistore.org/third_party/github.com/lib/pq"
@ -137,6 +137,10 @@ func (c *dbinitCmd) RunCommand(args []string) error {
}
case "mongo":
return nil
case "postgres":
// because we want string comparison to work as on MySQL and SQLite.
// in particular we want: 'foo|bar' < 'foo}' (which is not the case with an utf8 collation apparently).
do(rootdb, "CREATE DATABASE "+dbname+" LC_COLLATE = 'C' TEMPLATE = template0")
default:
do(rootdb, "CREATE DATABASE "+dbname)
}

View File

@ -20,13 +20,17 @@ import (
"database/sql"
"errors"
"fmt"
"log"
"strings"
"sync"
"testing"
"camlistore.org/pkg/index"
"camlistore.org/pkg/index/indextest"
"camlistore.org/pkg/index/postgres"
"camlistore.org/pkg/osutil"
"camlistore.org/pkg/sorted"
"camlistore.org/pkg/sorted/kvtest"
"camlistore.org/pkg/sorted/postgres"
"camlistore.org/pkg/test"
_ "camlistore.org/third_party/github.com/lib/pq"
@ -36,58 +40,29 @@ var (
once sync.Once
dbAvailable bool
rootdb *sql.DB
testdb *sql.DB
)
func checkDB() {
var err error
if rootdb, err = sql.Open("postgres", "user=postgres password=postgres host=localhost dbname=postgres"); err == nil {
rootdb, err = sql.Open("postgres", "user=postgres password=postgres host=localhost dbname=postgres")
if err != nil {
log.Printf("Could not open postgres rootdb: %v", err)
return
}
var n int
err := rootdb.QueryRow("SELECT COUNT(*) FROM user").Scan(&n)
err = rootdb.QueryRow("SELECT COUNT(*) FROM user").Scan(&n)
if err == nil {
dbAvailable = true
}
}
}
// TODO(mpl): figure out why we run into that problem of sessions still open
// and then remove that hack.
func closeAllSessions(dbname string) {
query := `
SELECT
pg_terminate_backend(pg_stat_activity.pid)
FROM
pg_stat_activity
WHERE
pg_stat_activity.pid <> pg_backend_pid()
AND datname = '` + dbname + `'`
doQuery(rootdb, query)
func skipOrFailIfNoPostgreSQL(t *testing.T) {
once.Do(checkDB)
if !dbAvailable {
err := errors.New("Not running; start a postgres daemon on the standard port (5432) with password 'postgres' for postgres user")
test.DependencyErrorOrSkip(t)
t.Fatalf("PostGreSQL not available locally for testing: %v", err)
}
func makeIndex() *index.Index {
dbname := "camlitest_" + osutil.Username()
closeAllSessions(dbname)
do(rootdb, "DROP DATABASE IF EXISTS "+dbname)
do(rootdb, "CREATE DATABASE "+dbname)
var err error
testdb, err = sql.Open("postgres", "user=postgres password=postgres host=localhost sslmode=require dbname="+dbname)
if err != nil {
panic("opening test database: " + err.Error())
}
for _, tableSql := range postgres.SQLCreateTables() {
do(testdb, tableSql)
}
for _, statement := range postgres.SQLDefineReplace() {
do(testdb, statement)
}
doQuery(testdb, fmt.Sprintf(`SELECT replaceintometa('version', '%d')`, postgres.SchemaVersion()))
s, err := postgres.NewStorage("localhost", "postgres", "postgres", dbname, "require")
if err != nil {
panic(err)
}
return index.New(s)
}
func do(db *sql.DB, sql string) {
@ -95,7 +70,7 @@ func do(db *sql.DB, sql string) {
if err == nil {
return
}
panic(fmt.Sprintf("Error %v running SQL: %s", err, sql))
log.Fatalf("Error %v running SQL: %s", err, sql)
}
func doQuery(db *sql.DB, sql string) {
@ -104,17 +79,117 @@ func doQuery(db *sql.DB, sql string) {
r.Close()
return
}
panic(fmt.Sprintf("Error %v running SQL: %s", err, sql))
log.Fatalf("Error %v running SQL query: %s", err, sql)
}
// closeAllSessions connects to the "postgres" DB on cfg.Host, and closes all connections to cfg.Database.
func closeAllSessions(cfg postgres.Config) error {
conninfo := fmt.Sprintf("user=%s dbname=postgres host=%s password=%s sslmode=%s",
cfg.User, cfg.Host, cfg.Password, cfg.SSLMode)
rootdb, err := sql.Open("postgres", conninfo)
if err != nil {
return fmt.Errorf("Could not open root db: %v", err)
}
defer rootdb.Close()
query := `
SELECT
pg_terminate_backend(pg_stat_activity.pid)
FROM
pg_stat_activity
WHERE
datname = '` + cfg.Database +
`' AND pid <> pg_backend_pid()`
// They changed procpid to pid in 9.2
if version(rootdb) < "9.2" {
query = strings.Replace(query, ".pid", ".procpid", 1)
query = strings.Replace(query, "AND pid", "AND procpid", 1)
}
r, err := rootdb.Query(query)
if err != nil {
return fmt.Errorf("Error running SQL query\n %v\n: %s", query, err)
}
r.Close()
return nil
}
func version(db *sql.DB) string {
version := ""
if err := db.QueryRow("SELECT version()").Scan(&version); err != nil {
log.Fatalf("Could not get PostgreSQL version: %v", err)
}
fields := strings.Fields(version)
if len(fields) < 2 {
log.Fatalf("Could not get PostgreSQL version because bogus answer: %q", version)
}
return fields[1]
}
func newSorted(t *testing.T) (kv sorted.KeyValue, clean func()) {
skipOrFailIfNoPostgreSQL(t)
dbname := "camlitest_" + osutil.Username()
if err := closeAllSessions(postgres.Config{
User: "postgres",
Password: "postgres",
SSLMode: "require",
Database: dbname,
Host: "localhost",
}); err != nil {
t.Fatalf("Could not close all old sessions to %q: %v", dbname, err)
}
do(rootdb, "DROP DATABASE IF EXISTS "+dbname)
do(rootdb, "CREATE DATABASE "+dbname+" LC_COLLATE = 'C' TEMPLATE = template0")
testdb, err := sql.Open("postgres", "user=postgres password=postgres host=localhost sslmode=require dbname="+dbname)
if err != nil {
t.Fatalf("opening test database: " + err.Error())
}
for _, tableSql := range postgres.SQLCreateTables() {
do(testdb, tableSql)
}
for _, statement := range postgres.SQLDefineReplace() {
do(testdb, statement)
}
doQuery(testdb, fmt.Sprintf(`SELECT replaceintometa('version', '%d')`, postgres.SchemaVersion()))
kv, err = postgres.NewKeyValue(postgres.Config{
Host: "localhost",
Database: dbname,
User: "postgres",
Password: "postgres",
SSLMode: "require",
})
if err != nil {
t.Fatal(err)
}
return kv, func() {
kv.Close()
}
}
func TestSortedKV(t *testing.T) {
kv, clean := newSorted(t)
defer clean()
kvtest.TestSorted(t, kv)
}
type postgresTester struct{}
func (postgresTester) test(t *testing.T, tfn func(*testing.T, func() *index.Index)) {
once.Do(checkDB)
if !dbAvailable {
err := errors.New("Not running; start a postgres daemon on the standard port (5432) with password 'postgres' for postgres user")
test.DependencyErrorOrSkip(t)
t.Fatalf("PostGreSQL not available locally for testing: %v", err)
var mu sync.Mutex // guards cleanups
var cleanups []func()
defer func() {
mu.Lock() // never unlocked
for _, fn := range cleanups {
fn()
}
}()
makeIndex := func() *index.Index {
s, cleanup := newSorted(t)
mu.Lock()
cleanups = append(cleanups, cleanup)
mu.Unlock()
return index.New(s)
}
tfn(t, makeIndex)
}

View File

@ -15,145 +15,42 @@ limitations under the License.
*/
// Package postgres implements the Camlistore index storage abstraction
// on top of Postgres.
// on top of PostgreSQL.
package postgres
import (
"database/sql"
"fmt"
"os"
"regexp"
"camlistore.org/pkg/blobserver"
"camlistore.org/pkg/index"
"camlistore.org/pkg/index/sqlindex"
"camlistore.org/pkg/jsonconfig"
"camlistore.org/pkg/sorted"
"camlistore.org/pkg/sorted/postgres"
_ "camlistore.org/third_party/github.com/lib/pq"
)
type myIndexStorage struct {
*sqlindex.Storage
host, user, password, database string
db *sql.DB
}
var _ sorted.KeyValue = (*myIndexStorage)(nil)
// postgres does not have REPLACE INTO (upsert), so we use that custom
// one for Set operations instead
func altSet(db *sql.DB, key, value string) error {
r, err := db.Query("SELECT replaceinto($1, $2)", key, value)
if err != nil {
return err
}
return r.Close()
}
// postgres does not have REPLACE INTO (upsert), so we use that custom
// one for Set operations in batch instead
func altBatchSet(tx *sql.Tx, key, value string) error {
r, err := tx.Query("SELECT replaceinto($1, $2)", key, value)
if err != nil {
return err
}
return r.Close()
}
var qmark = regexp.MustCompile(`\?`)
// replace all ? placeholders into the corresponding $n in queries
var replacePlaceHolders = func(query string) string {
i := 0
dollarInc := func(b []byte) []byte {
i++
return []byte(fmt.Sprintf("$%d", i))
}
return string(qmark.ReplaceAllFunc([]byte(query), dollarInc))
}
// NewStorage returns an sorted.KeyValue implementation of the described PostgreSQL database.
// This exists mostly for testing and does not initialize the schema.
func NewStorage(host, user, password, dbname, sslmode string) (sorted.KeyValue, error) {
conninfo := fmt.Sprintf("user=%s dbname=%s host=%s password=%s sslmode=%s", user, dbname, host, password, sslmode)
db, err := sql.Open("postgres", conninfo)
if err != nil {
return nil, err
}
return &myIndexStorage{
db: db,
Storage: &sqlindex.Storage{
DB: db,
SetFunc: altSet,
BatchSetFunc: altBatchSet,
PlaceHolderFunc: replacePlaceHolders,
},
host: host,
user: user,
password: password,
database: dbname,
}, nil
func init() {
blobserver.RegisterStorageConstructor("postgresindexer", newFromConfig)
}
func newFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (blobserver.Storage, error) {
var (
blobPrefix = config.RequiredString("blobSource")
host = config.OptionalString("host", "localhost")
user = config.RequiredString("user")
password = config.OptionalString("password", "")
database = config.RequiredString("database")
sslmode = config.OptionalString("sslmode", "require")
)
if err := config.Validate(); err != nil {
blobPrefix := config.RequiredString("blobSource")
postgresConf, err := postgres.ConfigFromJSON(config)
if err != nil {
return nil, err
}
kv, err := postgres.NewKeyValue(postgresConf)
if err != nil {
return nil, err
}
ix := index.New(kv)
sto, err := ld.GetStorage(blobPrefix)
if err != nil {
ix.Close()
return nil, err
}
isto, err := NewStorage(host, user, password, database, sslmode)
if err != nil {
return nil, err
}
is := isto.(*myIndexStorage)
if err := is.ping(); err != nil {
return nil, err
}
version, err := is.SchemaVersion()
if err != nil {
return nil, fmt.Errorf("error getting schema version (need to init database?): %v", err)
}
if version != requiredSchemaVersion {
if os.Getenv("CAMLI_DEV_CAMLI_ROOT") != "" {
// Good signal that we're using the devcam server, so help out
// the user with a more useful tip:
return nil, fmt.Errorf("database schema version is %d; expect %d (run \"devcam server --wipe\" to wipe both your blobs and re-populate the database schema)", version, requiredSchemaVersion)
}
return nil, fmt.Errorf("database schema version is %d; expect %d (need to re-init/upgrade database?)",
version, requiredSchemaVersion)
}
ix := index.New(is)
ix.BlobSource = sto
// Good enough, for now:
ix.KeyFetcher = ix.BlobSource
return ix, nil
}
func init() {
blobserver.RegisterStorageConstructor("postgresindexer", blobserver.StorageConstructor(newFromConfig))
}
func (mi *myIndexStorage) ping() error {
// TODO(bradfitz): something more efficient here?
_, err := mi.SchemaVersion()
return err
}
func (mi *myIndexStorage) SchemaVersion() (version int, err error) {
err = mi.db.QueryRow("SELECT value FROM meta WHERE metakey='version'").Scan(&version)
return
}

View File

@ -1,5 +1,5 @@
/*
Copyright 2011 Google Inc.
Copyright 2011 The Camlistore Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.

View File

@ -16,13 +16,12 @@ limitations under the License.
package postgres
const requiredSchemaVersion = 1
const requiredSchemaVersion = 2
func SchemaVersion() int {
return requiredSchemaVersion
}
// TODO(mpl): use hstore
func SQLCreateTables() []string {
return []string{
`CREATE TABLE rows (

View File

@ -0,0 +1,156 @@
/*
Copyright 2012 The Camlistore Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package postgres provides an implementation of sorted.KeyValue
// on top of PostgreSQL.
package postgres
import (
"database/sql"
"fmt"
"os"
"regexp"
"camlistore.org/pkg/jsonconfig"
"camlistore.org/pkg/sorted"
"camlistore.org/pkg/sorted/sqlkv"
_ "camlistore.org/third_party/github.com/lib/pq"
)
func init() {
sorted.RegisterKeyValue("postgresql", newKeyValueFromJSONConfig)
}
// Config holds the parameters used to connect to the PostgreSQL db.
type Config struct {
Host string // Optional. Defaults to "localhost" in ConfigFromJSON.
Database string // Required.
User string // Required.
Password string // Optional.
SSLMode string // Optional. Defaults to "require" in ConfigFromJSON.
}
// ConfigFromJSON populates Config from config, and validates
// config. It returns an error if config fails to validate.
func ConfigFromJSON(config jsonconfig.Obj) (Config, error) {
conf := Config{
Host: config.OptionalString("host", "localhost"),
User: config.RequiredString("user"),
Password: config.OptionalString("password", ""),
Database: config.RequiredString("database"),
SSLMode: config.OptionalString("sslmode", "require"),
}
if err := config.Validate(); err != nil {
return Config{}, err
}
return conf, nil
}
func newKeyValueFromJSONConfig(cfg jsonconfig.Obj) (sorted.KeyValue, error) {
conf, err := ConfigFromJSON(cfg)
if err != nil {
return nil, err
}
return NewKeyValue(conf)
}
// NewKeyValue returns a sorted.KeyValue implementation of the described PostgreSQL database.
func NewKeyValue(cfg Config) (sorted.KeyValue, error) {
conninfo := fmt.Sprintf("user=%s dbname=%s host=%s password=%s sslmode=%s",
cfg.User, cfg.Database, cfg.Host, cfg.Password, cfg.SSLMode)
db, err := sql.Open("postgres", conninfo)
if err != nil {
return nil, err
}
kv := &keyValue{
db: db,
KeyValue: &sqlkv.KeyValue{
DB: db,
SetFunc: altSet,
BatchSetFunc: altBatchSet,
PlaceHolderFunc: replacePlaceHolders,
},
conf: cfg,
}
if err := kv.ping(); err != nil {
return nil, fmt.Errorf("PostgreSQL db unreachable: %v", err)
}
version, err := kv.SchemaVersion()
if err != nil {
return nil, fmt.Errorf("error getting schema version (need to init database?): %v", err)
}
if version != requiredSchemaVersion {
if os.Getenv("CAMLI_DEV_CAMLI_ROOT") != "" {
// Good signal that we're using the devcam server, so help out
// the user with a more useful tip:
return nil, fmt.Errorf("database schema version is %d; expect %d (run \"devcam server --wipe\" to wipe both your blobs and re-populate the database schema)", version, requiredSchemaVersion)
}
return nil, fmt.Errorf("database schema version is %d; expect %d (need to re-init/upgrade database?)",
version, requiredSchemaVersion)
}
return kv, nil
}
type keyValue struct {
*sqlkv.KeyValue
conf Config
db *sql.DB
}
// postgres does not have REPLACE INTO (upsert), so we use that custom
// one for Set operations instead
func altSet(db *sql.DB, key, value string) error {
r, err := db.Query("SELECT replaceinto($1, $2)", key, value)
if err != nil {
return err
}
return r.Close()
}
// postgres does not have REPLACE INTO (upsert), so we use that custom
// one for Set operations in batch instead
func altBatchSet(tx *sql.Tx, key, value string) error {
r, err := tx.Query("SELECT replaceinto($1, $2)", key, value)
if err != nil {
return err
}
return r.Close()
}
var qmark = regexp.MustCompile(`\?`)
// replace all ? placeholders into the corresponding $n in queries
var replacePlaceHolders = func(query string) string {
i := 0
dollarInc := func(b []byte) []byte {
i++
return []byte(fmt.Sprintf("$%d", i))
}
return string(qmark.ReplaceAllFunc([]byte(query), dollarInc))
}
func (kv *keyValue) ping() error {
_, err := kv.SchemaVersion()
return err
}
func (kv *keyValue) SchemaVersion() (version int, err error) {
err = kv.db.QueryRow("SELECT value FROM meta WHERE metakey='version'").Scan(&version)
return
}

View File

@ -71,6 +71,7 @@ import (
_ "camlistore.org/pkg/sorted/kvfile"
_ "camlistore.org/pkg/sorted/mongo"
_ "camlistore.org/pkg/sorted/mysql"
_ "camlistore.org/pkg/sorted/postgres"
"camlistore.org/pkg/sorted/sqlite"
// Handlers: