From b3ba9aa7cd4372977a875de941b6787e7029e7d1 Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Sat, 13 Feb 2016 13:01:46 -0800 Subject: [PATCH] sorted/sqlkv, sorted/sqlite: replace Serial bool with a *syncutil.Gate Change-Id: I820220d49b1d316d2be0ab7fe5bf302d204108b0 --- pkg/sorted/sqlite/sqlitekv.go | 5 +-- pkg/sorted/sqlkv/sqlkv.go | 58 ++++++++++++++++++----------------- 2 files changed, 33 insertions(+), 30 deletions(-) diff --git a/pkg/sorted/sqlite/sqlitekv.go b/pkg/sorted/sqlite/sqlitekv.go index 06ea41f35..737353534 100644 --- a/pkg/sorted/sqlite/sqlitekv.go +++ b/pkg/sorted/sqlite/sqlitekv.go @@ -28,6 +28,7 @@ import ( "camlistore.org/pkg/sorted" "camlistore.org/pkg/sorted/sqlkv" "go4.org/jsonconfig" + "go4.org/syncutil" ) func init() { @@ -64,8 +65,8 @@ func newKeyValueFromConfig(cfg jsonconfig.Obj) (sorted.KeyValue, error) { file: file, db: db, KeyValue: &sqlkv.KeyValue{ - DB: db, - Serial: true, + DB: db, + Gate: syncutil.NewGate(1), }, } diff --git a/pkg/sorted/sqlkv/sqlkv.go b/pkg/sorted/sqlkv/sqlkv.go index 93b51abba..1acf4a5c9 100644 --- a/pkg/sorted/sqlkv/sqlkv.go +++ b/pkg/sorted/sqlkv/sqlkv.go @@ -28,6 +28,7 @@ import ( "camlistore.org/pkg/leak" "camlistore.org/pkg/sorted" + "go4.org/syncutil" ) // KeyValue implements the sorted.KeyValue interface using an *sql.DB. @@ -42,20 +43,21 @@ type KeyValue struct { // with the right ones for the rdbms in use. PlaceHolderFunc func(string) string - // Serial determines whether a Go-level mutex protects DB from - // concurrent access. This isn't perfect and exists just for - // SQLite, whose driver likes to return "the database is - // locked" (camlistore.org/issue/114), so this keeps some - // pressure off. But we still trust SQLite to deal with - // concurrency in most cases. - Serial bool + // Gate optionally limits concurrent access. + // + // This originally existed just for SQLite, whose driver likes + // to return "the database is locked" + // (camlistore.org/issue/114), so this keeps some pressure + // off. But we still trust SQLite to deal with concurrency in + // most cases. + // + // It's also used to limit the number of MySQL connections. + Gate *syncutil.Gate // TablePrefix optionally provides a prefix for SQL table // names. This is typically "dbname.", ending in a period. TablePrefix string - mu sync.Mutex // the mutex used, if Serial is set - queriesInitOnce sync.Once // guards initialization of both queries and replacer replacer *strings.Replacer @@ -126,8 +128,8 @@ func (b *batchTx) Delete(key string) { } func (kv *KeyValue) BeginBatch() sorted.BatchMutation { - if kv.Serial { - kv.mu.Lock() + if kv.Gate != nil { + kv.Gate.Start() } tx, err := kv.DB.Begin() if err != nil { @@ -141,8 +143,8 @@ func (kv *KeyValue) BeginBatch() sorted.BatchMutation { } func (kv *KeyValue) CommitBatch(b sorted.BatchMutation) error { - if kv.Serial { - defer kv.mu.Unlock() + if kv.Gate != nil { + defer kv.Gate.Done() } bt, ok := b.(*batchTx) if !ok { @@ -155,9 +157,9 @@ func (kv *KeyValue) CommitBatch(b sorted.BatchMutation) error { } func (kv *KeyValue) Get(key string) (value string, err error) { - if kv.Serial { - kv.mu.Lock() - defer kv.mu.Unlock() + if kv.Gate != nil { + kv.Gate.Start() + defer kv.Gate.Done() } err = kv.DB.QueryRow(kv.sql("SELECT v FROM /*TPRE*/rows WHERE k=?"), key).Scan(&value) if err == sql.ErrNoRows { @@ -170,9 +172,9 @@ func (kv *KeyValue) Set(key, value string) error { if err := sorted.CheckSizes(key, value); err != nil { return err } - if kv.Serial { - kv.mu.Lock() - defer kv.mu.Unlock() + if kv.Gate != nil { + kv.Gate.Start() + defer kv.Gate.Done() } if kv.SetFunc != nil { return kv.SetFunc(kv.DB, key, value) @@ -182,18 +184,18 @@ func (kv *KeyValue) Set(key, value string) error { } func (kv *KeyValue) Delete(key string) error { - if kv.Serial { - kv.mu.Lock() - defer kv.mu.Unlock() + if kv.Gate != nil { + kv.Gate.Start() + defer kv.Gate.Done() } _, err := kv.DB.Exec(kv.sql("DELETE FROM /*TPRE*/rows WHERE k=?"), key) return err } func (kv *KeyValue) Wipe() error { - if kv.Serial { - kv.mu.Lock() - defer kv.mu.Unlock() + if kv.Gate != nil { + kv.Gate.Start() + defer kv.Gate.Done() } _, err := kv.DB.Exec(kv.sql("DELETE FROM /*TPRE*/rows")) return err @@ -202,12 +204,12 @@ func (kv *KeyValue) Wipe() error { func (kv *KeyValue) Close() error { return kv.DB.Close() } func (kv *KeyValue) Find(start, end string) sorted.Iterator { - if kv.Serial { - kv.mu.Lock() + if kv.Gate != nil { + kv.Gate.Start() // TODO(mpl): looks like sqlite considers the db locked until we've closed // the iterator, so we can't do anything else until then. We should probably // move that Unlock to the closing of the iterator. Investigating. - defer kv.mu.Unlock() + defer kv.Gate.Done() } var rows *sql.Rows var err error