diff --git a/cmd/camtool/dbinit.go b/cmd/camtool/dbinit.go index ad5627869..bf0bef769 100644 --- a/cmd/camtool/dbinit.go +++ b/cmd/camtool/dbinit.go @@ -27,8 +27,8 @@ import ( "camlistore.org/pkg/cmdmain" "camlistore.org/pkg/index/mysql" "camlistore.org/pkg/index/postgres" - "camlistore.org/pkg/index/sqlite" "camlistore.org/pkg/sorted/mongo" + "camlistore.org/pkg/sorted/sqlite" _ "camlistore.org/third_party/github.com/lib/pq" _ "camlistore.org/third_party/github.com/ziutek/mymysql/godrv" diff --git a/pkg/index/sqlite/sqlite.go b/pkg/index/sqlite/sqlite.go index 95d8626ff..00b3b1788 100644 --- a/pkg/index/sqlite/sqlite.go +++ b/pkg/index/sqlite/sqlite.go @@ -1,5 +1,5 @@ /* -Copyright 2012 Google Inc. +Copyright 2012 The Camlistore Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -19,124 +19,36 @@ limitations under the License. package sqlite import ( - "database/sql" - "errors" - "fmt" - "os" - "camlistore.org/pkg/blobserver" "camlistore.org/pkg/index" - "camlistore.org/pkg/index/sqlindex" "camlistore.org/pkg/jsonconfig" - "camlistore.org/pkg/sorted" + "camlistore.org/pkg/sorted/sqlite" ) -type storage struct { - *sqlindex.Storage - - file string - db *sql.DB -} - -var _ sorted.KeyValue = (*storage)(nil) - -var compiled = false - -// CompiledIn returns whether SQLite support is compiled in. -// If it returns false, the build tag "with_sqlite" was not specified. -func CompiledIn() bool { - return compiled -} - -var ErrNotCompiled = errors.New("camlistored was not built with SQLite support. If you built with make.go, use go run make.go --sqlite=true. If you used go get or get install, use go {get,install} --tags=with_sqlite" + compileHint()) - -func compileHint() string { - if _, err := os.Stat("/etc/apt"); err == nil { - return " (Hint: apt-get install libsqlite3-dev)" - } - return "" -} - -// NewStorage returns an sorted.KeyValue implementation of the described SQLite database. -// This exists mostly for testing and does not initialize the schema. -func NewStorage(file string) (sorted.KeyValue, error) { - if !compiled { - return nil, ErrNotCompiled - } - db, err := sql.Open("sqlite3", file) - if err != nil { - return nil, err - } - return &storage{ - file: file, - db: db, - Storage: &sqlindex.Storage{ - DB: db, - Serial: true, - }, - }, nil +func init() { + blobserver.RegisterStorageConstructor("sqliteindexer", newFromConfig) } func newFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (blobserver.Storage, error) { - var ( - blobPrefix = config.RequiredString("blobSource") - file = config.RequiredString("file") - ) + blobPrefix := config.RequiredString("blobSource") + file := config.RequiredString("file") if err := config.Validate(); err != nil { return nil, err } + kv, err := sqlite.NewKeyValue(file) + if err != nil { + return nil, err + } + + ix := index.New(kv) sto, err := ld.GetStorage(blobPrefix) if err != nil { + ix.Close() return nil, err } - fi, err := os.Stat(file) - if os.IsNotExist(err) || (err == nil && fi.Size() == 0) { - return nil, fmt.Errorf(`You need to initialize your SQLite index database with: camtool dbinit --dbname=%s --dbtype=sqlite`, file) - } - isto, err := NewStorage(file) - if err != nil { - return nil, err - } - is := isto.(*storage) - - version, err := is.SchemaVersion() - if err != nil { - return nil, fmt.Errorf("error getting schema version (need to init database with 'camtool dbinit %s'?): %v", file, err) - } - - if err := is.ping(); err != nil { - return nil, err - } - - if version != requiredSchemaVersion { - if os.Getenv("CAMLI_DEV_CAMLI_ROOT") != "" { - // Good signal that we're using the devcam server, so help out - // the user with a more useful tip: - return nil, fmt.Errorf("database schema version is %d; expect %d (run \"devcam server --wipe\" to wipe both your blobs and re-populate the database schema)", version, requiredSchemaVersion) - } - return nil, fmt.Errorf("database schema version is %d; expect %d (need to re-init/upgrade database?)", - version, requiredSchemaVersion) - } - - ix := index.New(is) ix.BlobSource = sto // Good enough, for now: ix.KeyFetcher = ix.BlobSource return ix, nil } - -func init() { - blobserver.RegisterStorageConstructor("sqliteindexer", blobserver.StorageConstructor(newFromConfig)) -} - -func (mi *storage) ping() error { - // TODO(bradfitz): something more efficient here? - _, err := mi.SchemaVersion() - return err -} - -func (mi *storage) SchemaVersion() (version int, err error) { - err = mi.db.QueryRow("SELECT value FROM meta WHERE metakey='version'").Scan(&version) - return -} diff --git a/pkg/index/sqlite/sqlite_test.go b/pkg/index/sqlite/sqlite_test.go index f54bde3a3..8ed3dd84c 100644 --- a/pkg/index/sqlite/sqlite_test.go +++ b/pkg/index/sqlite/sqlite_test.go @@ -1,5 +1,5 @@ /* -Copyright 2012 Google Inc. +Copyright 2012 The Camlistore Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -28,9 +28,9 @@ import ( "camlistore.org/pkg/index" "camlistore.org/pkg/index/indextest" - "camlistore.org/pkg/index/sqlite" "camlistore.org/pkg/sorted" "camlistore.org/pkg/sorted/kvtest" + "camlistore.org/pkg/sorted/sqlite" _ "camlistore.org/third_party/github.com/mattn/go-sqlite3" ) @@ -49,14 +49,11 @@ func do(db *sql.DB, sql string) { panic(fmt.Sprintf("Error %v running SQL: %s", err, sql)) } -func makeSorted(t *testing.T) (s sorted.KeyValue, clean func()) { +func newSorted(t *testing.T) (kv sorted.KeyValue, clean func()) { f, err := ioutil.TempFile("", "sqlite-test") if err != nil { t.Fatal(err) } - clean = func() { - os.Remove(f.Name()) - } db, err := sql.Open("sqlite3", f.Name()) if err != nil { t.Fatalf("opening test database: %v", err) @@ -65,22 +62,26 @@ func makeSorted(t *testing.T) (s sorted.KeyValue, clean func()) { do(db, tableSql) } do(db, fmt.Sprintf(`REPLACE INTO meta VALUES ('version', '%d')`, sqlite.SchemaVersion())) - s, err = sqlite.NewStorage(f.Name()) + + kv, err = sqlite.NewKeyValue(f.Name()) if err != nil { - panic(err) + t.Fatal(err) + } + return kv, func() { + kv.Close() + os.Remove(f.Name()) } - return s, clean } func TestSortedKV(t *testing.T) { - kv, clean := makeSorted(t) + kv, clean := newSorted(t) defer clean() kvtest.TestSorted(t, kv) } -type sqliteTester struct{} +type tester struct{} -func (sqliteTester) test(t *testing.T, tfn func(*testing.T, func() *index.Index)) { +func (tester) test(t *testing.T, tfn func(*testing.T, func() *index.Index)) { var mu sync.Mutex // guards cleanups var cleanups []func() defer func() { @@ -90,7 +91,7 @@ func (sqliteTester) test(t *testing.T, tfn func(*testing.T, func() *index.Index) } }() makeIndex := func() *index.Index { - s, cleanup := makeSorted(t) + s, cleanup := newSorted(t) mu.Lock() cleanups = append(cleanups, cleanup) mu.Unlock() @@ -100,23 +101,23 @@ func (sqliteTester) test(t *testing.T, tfn func(*testing.T, func() *index.Index) } func TestIndex_SQLite(t *testing.T) { - sqliteTester{}.test(t, indextest.Index) + tester{}.test(t, indextest.Index) } func TestPathsOfSignerTarget_SQLite(t *testing.T) { - sqliteTester{}.test(t, indextest.PathsOfSignerTarget) + tester{}.test(t, indextest.PathsOfSignerTarget) } func TestFiles_SQLite(t *testing.T) { - sqliteTester{}.test(t, indextest.Files) + tester{}.test(t, indextest.Files) } func TestEdgesTo_SQLite(t *testing.T) { - sqliteTester{}.test(t, indextest.EdgesTo) + tester{}.test(t, indextest.EdgesTo) } func TestDelete_SQLite(t *testing.T) { - sqliteTester{}.test(t, indextest.Delete) + tester{}.test(t, indextest.Delete) } func TestConcurrency(t *testing.T) { @@ -124,7 +125,7 @@ func TestConcurrency(t *testing.T) { t.Logf("skipping for short mode") return } - s, clean := makeSorted(t) + s, clean := newSorted(t) defer clean() const n = 100 ch := make(chan error) @@ -163,7 +164,7 @@ func TestFDLeak(t *testing.T) { fd0 := numFDs(t) t.Logf("fd0 = %d", fd0) - s, clean := makeSorted(t) + s, clean := newSorted(t) defer clean() bm := s.BeginBatch() diff --git a/pkg/index/sqlite/dbschema.go b/pkg/sorted/sqlite/dbschema.go similarity index 98% rename from pkg/index/sqlite/dbschema.go rename to pkg/sorted/sqlite/dbschema.go index 40ccb3c63..8b220c441 100644 --- a/pkg/index/sqlite/dbschema.go +++ b/pkg/sorted/sqlite/dbschema.go @@ -1,5 +1,5 @@ /* -Copyright 2012 Google Inc. +Copyright 2012 The Camlistore Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/pkg/index/sqlite/sqlite_cond.go b/pkg/sorted/sqlite/sqlite_cond.go similarity index 100% rename from pkg/index/sqlite/sqlite_cond.go rename to pkg/sorted/sqlite/sqlite_cond.go diff --git a/pkg/sorted/sqlite/sqlitekv.go b/pkg/sorted/sqlite/sqlitekv.go new file mode 100644 index 000000000..4951f65ba --- /dev/null +++ b/pkg/sorted/sqlite/sqlitekv.go @@ -0,0 +1,124 @@ +/* +Copyright 2012 The Camlistore Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package sqlite provides an implementation of sorted.KeyValue +// using an SQLite database file. +package sqlite + +import ( + "database/sql" + "errors" + "fmt" + "os" + + "camlistore.org/pkg/jsonconfig" + "camlistore.org/pkg/sorted" + "camlistore.org/pkg/sorted/sqlkv" +) + +func init() { + sorted.RegisterKeyValue("sqlite", newKeyValueFromConfig) +} + +func newKeyValueFromConfig(cfg jsonconfig.Obj) (sorted.KeyValue, error) { + file := cfg.RequiredString("file") + if err := cfg.Validate(); err != nil { + return nil, err + } + return NewKeyValue(file) +} + +// NewKeyValue returns a KeyValue implementation on top of +// an SQLite database file. +func NewKeyValue(file string) (sorted.KeyValue, error) { + if !compiled { + return nil, ErrNotCompiled + } + + fi, err := os.Stat(file) + if os.IsNotExist(err) || (err == nil && fi.Size() == 0) { + return nil, fmt.Errorf(`You need to initialize your SQLite database with: camtool dbinit --dbname=%s --dbtype=sqlite`, file) + } + db, err := sql.Open("sqlite3", file) + if err != nil { + return nil, err + } + kv := &keyValue{ + file: file, + db: db, + KeyValue: &sqlkv.KeyValue{ + DB: db, + Serial: true, + }, + } + + version, err := kv.SchemaVersion() + if err != nil { + return nil, fmt.Errorf("error getting schema version (need to init database with 'camtool dbinit %s'?): %v", file, err) + } + + if err := kv.ping(); err != nil { + return nil, err + } + + if version != requiredSchemaVersion { + if os.Getenv("CAMLI_DEV_CAMLI_ROOT") != "" { + // Good signal that we're using the devcam server, so help out + // the user with a more useful tip: + return nil, fmt.Errorf("database schema version is %d; expect %d (run \"devcam server --wipe\" to wipe both your blobs and re-populate the database schema)", version, requiredSchemaVersion) + } + return nil, fmt.Errorf("database schema version is %d; expect %d (need to re-init/upgrade database?)", + version, requiredSchemaVersion) + } + + return kv, nil + +} + +type keyValue struct { + *sqlkv.KeyValue + + file string + db *sql.DB +} + +var compiled = false + +// CompiledIn returns whether SQLite support is compiled in. +// If it returns false, the build tag "with_sqlite" was not specified. +func CompiledIn() bool { + return compiled +} + +var ErrNotCompiled = errors.New("camlistored was not built with SQLite support. If you built with make.go, use go run make.go --sqlite=true. If you used go get or get install, use go {get,install} --tags=with_sqlite" + compileHint()) + +func compileHint() string { + if _, err := os.Stat("/etc/apt"); err == nil { + return " (Hint: apt-get install libsqlite3-dev)" + } + return "" +} + +func (kv *keyValue) ping() error { + // TODO(bradfitz): something more efficient here? + _, err := kv.SchemaVersion() + return err +} + +func (kv *keyValue) SchemaVersion() (version int, err error) { + err = kv.db.QueryRow("SELECT value FROM meta WHERE metakey='version'").Scan(&version) + return +} diff --git a/pkg/sorted/sqlkv/sqlkv.go b/pkg/sorted/sqlkv/sqlkv.go new file mode 100644 index 000000000..5a59249da --- /dev/null +++ b/pkg/sorted/sqlkv/sqlkv.go @@ -0,0 +1,250 @@ +/* +Copyright 2012 The Camlistore Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package sqlkv implements the sorted.KeyValue interface using an *sql.DB. +package sqlkv + +import ( + "database/sql" + "errors" + "fmt" + "log" + "regexp" + "sync" + + "camlistore.org/pkg/leak" + "camlistore.org/pkg/sorted" +) + +// KeyValue implements the sorted.KeyValue interface using an *sql.DB. +type KeyValue struct { + DB *sql.DB + + // SetFunc is an optional func to use when REPLACE INTO does not exist + SetFunc func(*sql.DB, string, string) error + BatchSetFunc func(*sql.Tx, string, string) error + + // PlaceHolderFunc optionally replaces ? placeholders with the right ones for the rdbms + // in use + PlaceHolderFunc func(string) string + + // Serial determines whether a Go-level mutex protects DB from + // concurrent access. This isn't perfect and exists just for + // SQLite, whose driver likes to return "the database is + // locked" (camlistore.org/issue/114), so this keeps some + // pressure off. But we still trust SQLite to deal with + // concurrency in most cases. + Serial bool + + mu sync.Mutex // the mutex used, if Serial is set +} + +func (kv *KeyValue) sql(v string) string { + if f := kv.PlaceHolderFunc; f != nil { + return f(v) + } + return v +} + +type batchTx struct { + tx *sql.Tx + err error // sticky + + // SetFunc is an optional func to use when REPLACE INTO does not exist + SetFunc func(*sql.Tx, string, string) error + + // PlaceHolderFunc optionally replaces ? placeholders with the right ones for the rdbms + // in use + PlaceHolderFunc func(string) string +} + +func (b *batchTx) sql(v string) string { + if f := b.PlaceHolderFunc; f != nil { + return f(v) + } + return v +} + +func (b *batchTx) Set(key, value string) { + if b.err != nil { + return + } + if b.SetFunc != nil { + b.err = b.SetFunc(b.tx, key, value) + return + } + _, b.err = b.tx.Exec(b.sql("REPLACE INTO rows (k, v) VALUES (?, ?)"), key, value) +} + +func (b *batchTx) Delete(key string) { + if b.err != nil { + return + } + _, b.err = b.tx.Exec(b.sql("DELETE FROM rows WHERE k=?"), key) +} + +func (kv *KeyValue) BeginBatch() sorted.BatchMutation { + if kv.Serial { + kv.mu.Lock() + } + tx, err := kv.DB.Begin() + return &batchTx{ + tx: tx, + err: err, + SetFunc: kv.BatchSetFunc, + PlaceHolderFunc: kv.PlaceHolderFunc, + } +} + +func (kv *KeyValue) CommitBatch(b sorted.BatchMutation) error { + if kv.Serial { + defer kv.mu.Unlock() + } + bt, ok := b.(*batchTx) + if !ok { + return fmt.Errorf("wrong BatchMutation type %T", b) + } + if bt.err != nil { + return bt.err + } + return bt.tx.Commit() +} + +func (kv *KeyValue) Get(key string) (value string, err error) { + if kv.Serial { + kv.mu.Lock() + defer kv.mu.Unlock() + } + err = kv.DB.QueryRow(kv.sql("SELECT v FROM rows WHERE k=?"), key).Scan(&value) + if err == sql.ErrNoRows { + err = sorted.ErrNotFound + } + return +} + +func (kv *KeyValue) Set(key, value string) error { + if kv.Serial { + kv.mu.Lock() + defer kv.mu.Unlock() + } + if kv.SetFunc != nil { + return kv.SetFunc(kv.DB, key, value) + } + _, err := kv.DB.Exec(kv.sql("REPLACE INTO rows (k, v) VALUES (?, ?)"), key, value) + return err +} + +func (kv *KeyValue) Delete(key string) error { + if kv.Serial { + kv.mu.Lock() + defer kv.mu.Unlock() + } + _, err := kv.DB.Exec(kv.sql("DELETE FROM rows WHERE k=?"), key) + return err +} + +func (kv *KeyValue) Close() error { return kv.DB.Close() } + +func (kv *KeyValue) Find(start, end string) sorted.Iterator { + if kv.Serial { + kv.mu.Lock() + defer kv.mu.Unlock() + } + var rows *sql.Rows + var err error + if end == "" { + rows, err = kv.DB.Query(kv.sql("SELECT k, v FROM rows WHERE k >= ? ORDER BY k "), start) + } else { + rows, err = kv.DB.Query(kv.sql("SELECT k, v FROM rows WHERE k >= ? AND k < ? ORDER BY k "), start, end) + } + if err != nil { + log.Printf("unexpected query error: %v", err) + return &iter{err: err} + } + + it := &iter{ + kv: kv, + rows: rows, + closeCheck: leak.NewChecker(), + } + return it +} + +var wordThenPunct = regexp.MustCompile(`^\w+\W$`) + +// iter is a iterator over sorted key/value pairs in rows. +type iter struct { + kv *KeyValue + end string // optional end bound + err error // accumulated error, returned at Close + + closeCheck *leak.Checker + + rows *sql.Rows // if non-nil, the rows we're reading from + + key sql.RawBytes + val sql.RawBytes + skey, sval *string // if non-nil, it's been stringified +} + +var errClosed = errors.New("sqlkv: Iterator already closed") + +func (t *iter) KeyBytes() []byte { return t.key } +func (t *iter) Key() string { + if t.skey != nil { + return *t.skey + } + str := string(t.key) + t.skey = &str + return str +} + +func (t *iter) ValueBytes() []byte { return t.val } +func (t *iter) Value() string { + if t.sval != nil { + return *t.sval + } + str := string(t.val) + t.sval = &str + return str +} + +func (t *iter) Close() error { + t.closeCheck.Close() + if t.rows != nil { + t.rows.Close() + t.rows = nil + } + err := t.err + t.err = errClosed + return err +} + +func (t *iter) Next() bool { + if t.err != nil { + return false + } + t.skey, t.sval = nil, nil + if !t.rows.Next() { + return false + } + t.err = t.rows.Scan(&t.key, &t.val) + if t.err != nil { + log.Printf("unexpected Scan error: %v", t.err) + return false + } + return true +} diff --git a/server/camlistored/camlistored.go b/server/camlistored/camlistored.go index a8e590d97..f48b8ff06 100644 --- a/server/camlistored/camlistored.go +++ b/server/camlistored/camlistored.go @@ -60,12 +60,17 @@ import ( _ "camlistore.org/pkg/blobserver/s3" _ "camlistore.org/pkg/blobserver/shard" // Indexers: (also present themselves as storage targets) + // sqlite is taken care of in option_sqlite.go _ "camlistore.org/pkg/index" // base indexer + in-memory dev index _ "camlistore.org/pkg/index/kvfile" _ "camlistore.org/pkg/index/mongo" _ "camlistore.org/pkg/index/mysql" _ "camlistore.org/pkg/index/postgres" - "camlistore.org/pkg/index/sqlite" + // KeyValue implementations: + _ "camlistore.org/pkg/sorted" + _ "camlistore.org/pkg/sorted/kvfile" + _ "camlistore.org/pkg/sorted/mongo" + "camlistore.org/pkg/sorted/sqlite" // Handlers: _ "camlistore.org/pkg/search"