mirror of https://github.com/perkeep/perkeep.git
sorted/sqlkv, sorted/sqlite: replace Serial bool with a *syncutil.Gate
Change-Id: I820220d49b1d316d2be0ab7fe5bf302d204108b0
This commit is contained in:
parent
6fd35bd4de
commit
b3ba9aa7cd
|
@ -28,6 +28,7 @@ import (
|
|||
"camlistore.org/pkg/sorted"
|
||||
"camlistore.org/pkg/sorted/sqlkv"
|
||||
"go4.org/jsonconfig"
|
||||
"go4.org/syncutil"
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
@ -64,8 +65,8 @@ func newKeyValueFromConfig(cfg jsonconfig.Obj) (sorted.KeyValue, error) {
|
|||
file: file,
|
||||
db: db,
|
||||
KeyValue: &sqlkv.KeyValue{
|
||||
DB: db,
|
||||
Serial: true,
|
||||
DB: db,
|
||||
Gate: syncutil.NewGate(1),
|
||||
},
|
||||
}
|
||||
|
||||
|
|
|
@ -28,6 +28,7 @@ import (
|
|||
|
||||
"camlistore.org/pkg/leak"
|
||||
"camlistore.org/pkg/sorted"
|
||||
"go4.org/syncutil"
|
||||
)
|
||||
|
||||
// KeyValue implements the sorted.KeyValue interface using an *sql.DB.
|
||||
|
@ -42,20 +43,21 @@ type KeyValue struct {
|
|||
// with the right ones for the rdbms in use.
|
||||
PlaceHolderFunc func(string) string
|
||||
|
||||
// Serial determines whether a Go-level mutex protects DB from
|
||||
// concurrent access. This isn't perfect and exists just for
|
||||
// SQLite, whose driver likes to return "the database is
|
||||
// locked" (camlistore.org/issue/114), so this keeps some
|
||||
// pressure off. But we still trust SQLite to deal with
|
||||
// concurrency in most cases.
|
||||
Serial bool
|
||||
// Gate optionally limits concurrent access.
|
||||
//
|
||||
// This originally existed just for SQLite, whose driver likes
|
||||
// to return "the database is locked"
|
||||
// (camlistore.org/issue/114), so this keeps some pressure
|
||||
// off. But we still trust SQLite to deal with concurrency in
|
||||
// most cases.
|
||||
//
|
||||
// It's also used to limit the number of MySQL connections.
|
||||
Gate *syncutil.Gate
|
||||
|
||||
// TablePrefix optionally provides a prefix for SQL table
|
||||
// names. This is typically "dbname.", ending in a period.
|
||||
TablePrefix string
|
||||
|
||||
mu sync.Mutex // the mutex used, if Serial is set
|
||||
|
||||
queriesInitOnce sync.Once // guards initialization of both queries and replacer
|
||||
replacer *strings.Replacer
|
||||
|
||||
|
@ -126,8 +128,8 @@ func (b *batchTx) Delete(key string) {
|
|||
}
|
||||
|
||||
func (kv *KeyValue) BeginBatch() sorted.BatchMutation {
|
||||
if kv.Serial {
|
||||
kv.mu.Lock()
|
||||
if kv.Gate != nil {
|
||||
kv.Gate.Start()
|
||||
}
|
||||
tx, err := kv.DB.Begin()
|
||||
if err != nil {
|
||||
|
@ -141,8 +143,8 @@ func (kv *KeyValue) BeginBatch() sorted.BatchMutation {
|
|||
}
|
||||
|
||||
func (kv *KeyValue) CommitBatch(b sorted.BatchMutation) error {
|
||||
if kv.Serial {
|
||||
defer kv.mu.Unlock()
|
||||
if kv.Gate != nil {
|
||||
defer kv.Gate.Done()
|
||||
}
|
||||
bt, ok := b.(*batchTx)
|
||||
if !ok {
|
||||
|
@ -155,9 +157,9 @@ func (kv *KeyValue) CommitBatch(b sorted.BatchMutation) error {
|
|||
}
|
||||
|
||||
func (kv *KeyValue) Get(key string) (value string, err error) {
|
||||
if kv.Serial {
|
||||
kv.mu.Lock()
|
||||
defer kv.mu.Unlock()
|
||||
if kv.Gate != nil {
|
||||
kv.Gate.Start()
|
||||
defer kv.Gate.Done()
|
||||
}
|
||||
err = kv.DB.QueryRow(kv.sql("SELECT v FROM /*TPRE*/rows WHERE k=?"), key).Scan(&value)
|
||||
if err == sql.ErrNoRows {
|
||||
|
@ -170,9 +172,9 @@ func (kv *KeyValue) Set(key, value string) error {
|
|||
if err := sorted.CheckSizes(key, value); err != nil {
|
||||
return err
|
||||
}
|
||||
if kv.Serial {
|
||||
kv.mu.Lock()
|
||||
defer kv.mu.Unlock()
|
||||
if kv.Gate != nil {
|
||||
kv.Gate.Start()
|
||||
defer kv.Gate.Done()
|
||||
}
|
||||
if kv.SetFunc != nil {
|
||||
return kv.SetFunc(kv.DB, key, value)
|
||||
|
@ -182,18 +184,18 @@ func (kv *KeyValue) Set(key, value string) error {
|
|||
}
|
||||
|
||||
func (kv *KeyValue) Delete(key string) error {
|
||||
if kv.Serial {
|
||||
kv.mu.Lock()
|
||||
defer kv.mu.Unlock()
|
||||
if kv.Gate != nil {
|
||||
kv.Gate.Start()
|
||||
defer kv.Gate.Done()
|
||||
}
|
||||
_, err := kv.DB.Exec(kv.sql("DELETE FROM /*TPRE*/rows WHERE k=?"), key)
|
||||
return err
|
||||
}
|
||||
|
||||
func (kv *KeyValue) Wipe() error {
|
||||
if kv.Serial {
|
||||
kv.mu.Lock()
|
||||
defer kv.mu.Unlock()
|
||||
if kv.Gate != nil {
|
||||
kv.Gate.Start()
|
||||
defer kv.Gate.Done()
|
||||
}
|
||||
_, err := kv.DB.Exec(kv.sql("DELETE FROM /*TPRE*/rows"))
|
||||
return err
|
||||
|
@ -202,12 +204,12 @@ func (kv *KeyValue) Wipe() error {
|
|||
func (kv *KeyValue) Close() error { return kv.DB.Close() }
|
||||
|
||||
func (kv *KeyValue) Find(start, end string) sorted.Iterator {
|
||||
if kv.Serial {
|
||||
kv.mu.Lock()
|
||||
if kv.Gate != nil {
|
||||
kv.Gate.Start()
|
||||
// TODO(mpl): looks like sqlite considers the db locked until we've closed
|
||||
// the iterator, so we can't do anything else until then. We should probably
|
||||
// move that Unlock to the closing of the iterator. Investigating.
|
||||
defer kv.mu.Unlock()
|
||||
defer kv.Gate.Done()
|
||||
}
|
||||
var rows *sql.Rows
|
||||
var err error
|
||||
|
|
Loading…
Reference in New Issue