pkg/index: move sqlite implementation to sorted

Change-Id: I1e300ffa14547fabeac8d255fd694054ebcb9e53
This commit is contained in:
mpl 2013-12-13 22:14:14 +01:00
parent 3c5a6e27af
commit b62c94fdd1
8 changed files with 416 additions and 124 deletions

View File

@ -27,8 +27,8 @@ import (
"camlistore.org/pkg/cmdmain"
"camlistore.org/pkg/index/mysql"
"camlistore.org/pkg/index/postgres"
"camlistore.org/pkg/index/sqlite"
"camlistore.org/pkg/sorted/mongo"
"camlistore.org/pkg/sorted/sqlite"
_ "camlistore.org/third_party/github.com/lib/pq"
_ "camlistore.org/third_party/github.com/ziutek/mymysql/godrv"

View File

@ -1,5 +1,5 @@
/*
Copyright 2012 Google Inc.
Copyright 2012 The Camlistore Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -19,124 +19,36 @@ limitations under the License.
package sqlite
import (
"database/sql"
"errors"
"fmt"
"os"
"camlistore.org/pkg/blobserver"
"camlistore.org/pkg/index"
"camlistore.org/pkg/index/sqlindex"
"camlistore.org/pkg/jsonconfig"
"camlistore.org/pkg/sorted"
"camlistore.org/pkg/sorted/sqlite"
)
type storage struct {
*sqlindex.Storage
file string
db *sql.DB
}
var _ sorted.KeyValue = (*storage)(nil)
var compiled = false
// CompiledIn returns whether SQLite support is compiled in.
// If it returns false, the build tag "with_sqlite" was not specified.
func CompiledIn() bool {
return compiled
}
var ErrNotCompiled = errors.New("camlistored was not built with SQLite support. If you built with make.go, use go run make.go --sqlite=true. If you used go get or get install, use go {get,install} --tags=with_sqlite" + compileHint())
func compileHint() string {
if _, err := os.Stat("/etc/apt"); err == nil {
return " (Hint: apt-get install libsqlite3-dev)"
}
return ""
}
// NewStorage returns an sorted.KeyValue implementation of the described SQLite database.
// This exists mostly for testing and does not initialize the schema.
func NewStorage(file string) (sorted.KeyValue, error) {
if !compiled {
return nil, ErrNotCompiled
}
db, err := sql.Open("sqlite3", file)
if err != nil {
return nil, err
}
return &storage{
file: file,
db: db,
Storage: &sqlindex.Storage{
DB: db,
Serial: true,
},
}, nil
func init() {
blobserver.RegisterStorageConstructor("sqliteindexer", newFromConfig)
}
func newFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (blobserver.Storage, error) {
var (
blobPrefix = config.RequiredString("blobSource")
file = config.RequiredString("file")
)
blobPrefix := config.RequiredString("blobSource")
file := config.RequiredString("file")
if err := config.Validate(); err != nil {
return nil, err
}
kv, err := sqlite.NewKeyValue(file)
if err != nil {
return nil, err
}
ix := index.New(kv)
sto, err := ld.GetStorage(blobPrefix)
if err != nil {
ix.Close()
return nil, err
}
fi, err := os.Stat(file)
if os.IsNotExist(err) || (err == nil && fi.Size() == 0) {
return nil, fmt.Errorf(`You need to initialize your SQLite index database with: camtool dbinit --dbname=%s --dbtype=sqlite`, file)
}
isto, err := NewStorage(file)
if err != nil {
return nil, err
}
is := isto.(*storage)
version, err := is.SchemaVersion()
if err != nil {
return nil, fmt.Errorf("error getting schema version (need to init database with 'camtool dbinit %s'?): %v", file, err)
}
if err := is.ping(); err != nil {
return nil, err
}
if version != requiredSchemaVersion {
if os.Getenv("CAMLI_DEV_CAMLI_ROOT") != "" {
// Good signal that we're using the devcam server, so help out
// the user with a more useful tip:
return nil, fmt.Errorf("database schema version is %d; expect %d (run \"devcam server --wipe\" to wipe both your blobs and re-populate the database schema)", version, requiredSchemaVersion)
}
return nil, fmt.Errorf("database schema version is %d; expect %d (need to re-init/upgrade database?)",
version, requiredSchemaVersion)
}
ix := index.New(is)
ix.BlobSource = sto
// Good enough, for now:
ix.KeyFetcher = ix.BlobSource
return ix, nil
}
func init() {
blobserver.RegisterStorageConstructor("sqliteindexer", blobserver.StorageConstructor(newFromConfig))
}
func (mi *storage) ping() error {
// TODO(bradfitz): something more efficient here?
_, err := mi.SchemaVersion()
return err
}
func (mi *storage) SchemaVersion() (version int, err error) {
err = mi.db.QueryRow("SELECT value FROM meta WHERE metakey='version'").Scan(&version)
return
}

View File

@ -1,5 +1,5 @@
/*
Copyright 2012 Google Inc.
Copyright 2012 The Camlistore Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -28,9 +28,9 @@ import (
"camlistore.org/pkg/index"
"camlistore.org/pkg/index/indextest"
"camlistore.org/pkg/index/sqlite"
"camlistore.org/pkg/sorted"
"camlistore.org/pkg/sorted/kvtest"
"camlistore.org/pkg/sorted/sqlite"
_ "camlistore.org/third_party/github.com/mattn/go-sqlite3"
)
@ -49,14 +49,11 @@ func do(db *sql.DB, sql string) {
panic(fmt.Sprintf("Error %v running SQL: %s", err, sql))
}
func makeSorted(t *testing.T) (s sorted.KeyValue, clean func()) {
func newSorted(t *testing.T) (kv sorted.KeyValue, clean func()) {
f, err := ioutil.TempFile("", "sqlite-test")
if err != nil {
t.Fatal(err)
}
clean = func() {
os.Remove(f.Name())
}
db, err := sql.Open("sqlite3", f.Name())
if err != nil {
t.Fatalf("opening test database: %v", err)
@ -65,22 +62,26 @@ func makeSorted(t *testing.T) (s sorted.KeyValue, clean func()) {
do(db, tableSql)
}
do(db, fmt.Sprintf(`REPLACE INTO meta VALUES ('version', '%d')`, sqlite.SchemaVersion()))
s, err = sqlite.NewStorage(f.Name())
kv, err = sqlite.NewKeyValue(f.Name())
if err != nil {
panic(err)
t.Fatal(err)
}
return kv, func() {
kv.Close()
os.Remove(f.Name())
}
return s, clean
}
func TestSortedKV(t *testing.T) {
kv, clean := makeSorted(t)
kv, clean := newSorted(t)
defer clean()
kvtest.TestSorted(t, kv)
}
type sqliteTester struct{}
type tester struct{}
func (sqliteTester) test(t *testing.T, tfn func(*testing.T, func() *index.Index)) {
func (tester) test(t *testing.T, tfn func(*testing.T, func() *index.Index)) {
var mu sync.Mutex // guards cleanups
var cleanups []func()
defer func() {
@ -90,7 +91,7 @@ func (sqliteTester) test(t *testing.T, tfn func(*testing.T, func() *index.Index)
}
}()
makeIndex := func() *index.Index {
s, cleanup := makeSorted(t)
s, cleanup := newSorted(t)
mu.Lock()
cleanups = append(cleanups, cleanup)
mu.Unlock()
@ -100,23 +101,23 @@ func (sqliteTester) test(t *testing.T, tfn func(*testing.T, func() *index.Index)
}
func TestIndex_SQLite(t *testing.T) {
sqliteTester{}.test(t, indextest.Index)
tester{}.test(t, indextest.Index)
}
func TestPathsOfSignerTarget_SQLite(t *testing.T) {
sqliteTester{}.test(t, indextest.PathsOfSignerTarget)
tester{}.test(t, indextest.PathsOfSignerTarget)
}
func TestFiles_SQLite(t *testing.T) {
sqliteTester{}.test(t, indextest.Files)
tester{}.test(t, indextest.Files)
}
func TestEdgesTo_SQLite(t *testing.T) {
sqliteTester{}.test(t, indextest.EdgesTo)
tester{}.test(t, indextest.EdgesTo)
}
func TestDelete_SQLite(t *testing.T) {
sqliteTester{}.test(t, indextest.Delete)
tester{}.test(t, indextest.Delete)
}
func TestConcurrency(t *testing.T) {
@ -124,7 +125,7 @@ func TestConcurrency(t *testing.T) {
t.Logf("skipping for short mode")
return
}
s, clean := makeSorted(t)
s, clean := newSorted(t)
defer clean()
const n = 100
ch := make(chan error)
@ -163,7 +164,7 @@ func TestFDLeak(t *testing.T) {
fd0 := numFDs(t)
t.Logf("fd0 = %d", fd0)
s, clean := makeSorted(t)
s, clean := newSorted(t)
defer clean()
bm := s.BeginBatch()

View File

@ -1,5 +1,5 @@
/*
Copyright 2012 Google Inc.
Copyright 2012 The Camlistore Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.

View File

@ -0,0 +1,124 @@
/*
Copyright 2012 The Camlistore Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package sqlite provides an implementation of sorted.KeyValue
// using an SQLite database file.
package sqlite
import (
"database/sql"
"errors"
"fmt"
"os"
"camlistore.org/pkg/jsonconfig"
"camlistore.org/pkg/sorted"
"camlistore.org/pkg/sorted/sqlkv"
)
func init() {
sorted.RegisterKeyValue("sqlite", newKeyValueFromConfig)
}
func newKeyValueFromConfig(cfg jsonconfig.Obj) (sorted.KeyValue, error) {
file := cfg.RequiredString("file")
if err := cfg.Validate(); err != nil {
return nil, err
}
return NewKeyValue(file)
}
// NewKeyValue returns a KeyValue implementation on top of
// an SQLite database file.
func NewKeyValue(file string) (sorted.KeyValue, error) {
if !compiled {
return nil, ErrNotCompiled
}
fi, err := os.Stat(file)
if os.IsNotExist(err) || (err == nil && fi.Size() == 0) {
return nil, fmt.Errorf(`You need to initialize your SQLite database with: camtool dbinit --dbname=%s --dbtype=sqlite`, file)
}
db, err := sql.Open("sqlite3", file)
if err != nil {
return nil, err
}
kv := &keyValue{
file: file,
db: db,
KeyValue: &sqlkv.KeyValue{
DB: db,
Serial: true,
},
}
version, err := kv.SchemaVersion()
if err != nil {
return nil, fmt.Errorf("error getting schema version (need to init database with 'camtool dbinit %s'?): %v", file, err)
}
if err := kv.ping(); err != nil {
return nil, err
}
if version != requiredSchemaVersion {
if os.Getenv("CAMLI_DEV_CAMLI_ROOT") != "" {
// Good signal that we're using the devcam server, so help out
// the user with a more useful tip:
return nil, fmt.Errorf("database schema version is %d; expect %d (run \"devcam server --wipe\" to wipe both your blobs and re-populate the database schema)", version, requiredSchemaVersion)
}
return nil, fmt.Errorf("database schema version is %d; expect %d (need to re-init/upgrade database?)",
version, requiredSchemaVersion)
}
return kv, nil
}
type keyValue struct {
*sqlkv.KeyValue
file string
db *sql.DB
}
var compiled = false
// CompiledIn returns whether SQLite support is compiled in.
// If it returns false, the build tag "with_sqlite" was not specified.
func CompiledIn() bool {
return compiled
}
var ErrNotCompiled = errors.New("camlistored was not built with SQLite support. If you built with make.go, use go run make.go --sqlite=true. If you used go get or get install, use go {get,install} --tags=with_sqlite" + compileHint())
func compileHint() string {
if _, err := os.Stat("/etc/apt"); err == nil {
return " (Hint: apt-get install libsqlite3-dev)"
}
return ""
}
func (kv *keyValue) ping() error {
// TODO(bradfitz): something more efficient here?
_, err := kv.SchemaVersion()
return err
}
func (kv *keyValue) SchemaVersion() (version int, err error) {
err = kv.db.QueryRow("SELECT value FROM meta WHERE metakey='version'").Scan(&version)
return
}

250
pkg/sorted/sqlkv/sqlkv.go Normal file
View File

@ -0,0 +1,250 @@
/*
Copyright 2012 The Camlistore Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package sqlkv implements the sorted.KeyValue interface using an *sql.DB.
package sqlkv
import (
"database/sql"
"errors"
"fmt"
"log"
"regexp"
"sync"
"camlistore.org/pkg/leak"
"camlistore.org/pkg/sorted"
)
// KeyValue implements the sorted.KeyValue interface using an *sql.DB.
type KeyValue struct {
DB *sql.DB
// SetFunc is an optional func to use when REPLACE INTO does not exist
SetFunc func(*sql.DB, string, string) error
BatchSetFunc func(*sql.Tx, string, string) error
// PlaceHolderFunc optionally replaces ? placeholders with the right ones for the rdbms
// in use
PlaceHolderFunc func(string) string
// Serial determines whether a Go-level mutex protects DB from
// concurrent access. This isn't perfect and exists just for
// SQLite, whose driver likes to return "the database is
// locked" (camlistore.org/issue/114), so this keeps some
// pressure off. But we still trust SQLite to deal with
// concurrency in most cases.
Serial bool
mu sync.Mutex // the mutex used, if Serial is set
}
func (kv *KeyValue) sql(v string) string {
if f := kv.PlaceHolderFunc; f != nil {
return f(v)
}
return v
}
type batchTx struct {
tx *sql.Tx
err error // sticky
// SetFunc is an optional func to use when REPLACE INTO does not exist
SetFunc func(*sql.Tx, string, string) error
// PlaceHolderFunc optionally replaces ? placeholders with the right ones for the rdbms
// in use
PlaceHolderFunc func(string) string
}
func (b *batchTx) sql(v string) string {
if f := b.PlaceHolderFunc; f != nil {
return f(v)
}
return v
}
func (b *batchTx) Set(key, value string) {
if b.err != nil {
return
}
if b.SetFunc != nil {
b.err = b.SetFunc(b.tx, key, value)
return
}
_, b.err = b.tx.Exec(b.sql("REPLACE INTO rows (k, v) VALUES (?, ?)"), key, value)
}
func (b *batchTx) Delete(key string) {
if b.err != nil {
return
}
_, b.err = b.tx.Exec(b.sql("DELETE FROM rows WHERE k=?"), key)
}
func (kv *KeyValue) BeginBatch() sorted.BatchMutation {
if kv.Serial {
kv.mu.Lock()
}
tx, err := kv.DB.Begin()
return &batchTx{
tx: tx,
err: err,
SetFunc: kv.BatchSetFunc,
PlaceHolderFunc: kv.PlaceHolderFunc,
}
}
func (kv *KeyValue) CommitBatch(b sorted.BatchMutation) error {
if kv.Serial {
defer kv.mu.Unlock()
}
bt, ok := b.(*batchTx)
if !ok {
return fmt.Errorf("wrong BatchMutation type %T", b)
}
if bt.err != nil {
return bt.err
}
return bt.tx.Commit()
}
func (kv *KeyValue) Get(key string) (value string, err error) {
if kv.Serial {
kv.mu.Lock()
defer kv.mu.Unlock()
}
err = kv.DB.QueryRow(kv.sql("SELECT v FROM rows WHERE k=?"), key).Scan(&value)
if err == sql.ErrNoRows {
err = sorted.ErrNotFound
}
return
}
func (kv *KeyValue) Set(key, value string) error {
if kv.Serial {
kv.mu.Lock()
defer kv.mu.Unlock()
}
if kv.SetFunc != nil {
return kv.SetFunc(kv.DB, key, value)
}
_, err := kv.DB.Exec(kv.sql("REPLACE INTO rows (k, v) VALUES (?, ?)"), key, value)
return err
}
func (kv *KeyValue) Delete(key string) error {
if kv.Serial {
kv.mu.Lock()
defer kv.mu.Unlock()
}
_, err := kv.DB.Exec(kv.sql("DELETE FROM rows WHERE k=?"), key)
return err
}
func (kv *KeyValue) Close() error { return kv.DB.Close() }
func (kv *KeyValue) Find(start, end string) sorted.Iterator {
if kv.Serial {
kv.mu.Lock()
defer kv.mu.Unlock()
}
var rows *sql.Rows
var err error
if end == "" {
rows, err = kv.DB.Query(kv.sql("SELECT k, v FROM rows WHERE k >= ? ORDER BY k "), start)
} else {
rows, err = kv.DB.Query(kv.sql("SELECT k, v FROM rows WHERE k >= ? AND k < ? ORDER BY k "), start, end)
}
if err != nil {
log.Printf("unexpected query error: %v", err)
return &iter{err: err}
}
it := &iter{
kv: kv,
rows: rows,
closeCheck: leak.NewChecker(),
}
return it
}
var wordThenPunct = regexp.MustCompile(`^\w+\W$`)
// iter is a iterator over sorted key/value pairs in rows.
type iter struct {
kv *KeyValue
end string // optional end bound
err error // accumulated error, returned at Close
closeCheck *leak.Checker
rows *sql.Rows // if non-nil, the rows we're reading from
key sql.RawBytes
val sql.RawBytes
skey, sval *string // if non-nil, it's been stringified
}
var errClosed = errors.New("sqlkv: Iterator already closed")
func (t *iter) KeyBytes() []byte { return t.key }
func (t *iter) Key() string {
if t.skey != nil {
return *t.skey
}
str := string(t.key)
t.skey = &str
return str
}
func (t *iter) ValueBytes() []byte { return t.val }
func (t *iter) Value() string {
if t.sval != nil {
return *t.sval
}
str := string(t.val)
t.sval = &str
return str
}
func (t *iter) Close() error {
t.closeCheck.Close()
if t.rows != nil {
t.rows.Close()
t.rows = nil
}
err := t.err
t.err = errClosed
return err
}
func (t *iter) Next() bool {
if t.err != nil {
return false
}
t.skey, t.sval = nil, nil
if !t.rows.Next() {
return false
}
t.err = t.rows.Scan(&t.key, &t.val)
if t.err != nil {
log.Printf("unexpected Scan error: %v", t.err)
return false
}
return true
}

View File

@ -60,12 +60,17 @@ import (
_ "camlistore.org/pkg/blobserver/s3"
_ "camlistore.org/pkg/blobserver/shard"
// Indexers: (also present themselves as storage targets)
// sqlite is taken care of in option_sqlite.go
_ "camlistore.org/pkg/index" // base indexer + in-memory dev index
_ "camlistore.org/pkg/index/kvfile"
_ "camlistore.org/pkg/index/mongo"
_ "camlistore.org/pkg/index/mysql"
_ "camlistore.org/pkg/index/postgres"
"camlistore.org/pkg/index/sqlite"
// KeyValue implementations:
_ "camlistore.org/pkg/sorted"
_ "camlistore.org/pkg/sorted/kvfile"
_ "camlistore.org/pkg/sorted/mongo"
"camlistore.org/pkg/sorted/sqlite"
// Handlers:
_ "camlistore.org/pkg/search"