Database connection pool refactor (#5274)

* Move optimise out of RunAllMigrations
* Separate read and write database connections
* Enforce readonly connection constraint
* Fix migrations not using tx
* #5155 - allow setting cache size from environment
* Document new environment variable
WithoutPants 2024-09-20 12:56:26 +10:00 committed by GitHub
parent 7152be6086
commit 476688c84d
10 changed files with 207 additions and 178 deletions

View File

@@ -28,7 +28,7 @@ type Anonymiser struct {
}
func NewAnonymiser(db *Database, outPath string) (*Anonymiser, error) {
if _, err := db.db.Exec(fmt.Sprintf(`VACUUM INTO "%s"`, outPath)); err != nil {
if _, err := db.writeDB.Exec(fmt.Sprintf(`VACUUM INTO "%s"`, outPath)); err != nil {
return nil, fmt.Errorf("vacuuming into %s: %w", outPath, err)
}
@@ -75,12 +75,12 @@ func (db *Anonymiser) Anonymise(ctx context.Context) error {
}
func (db *Anonymiser) truncateColumn(tableName string, column string) error {
_, err := db.db.Exec("UPDATE " + tableName + " SET " + column + " = NULL")
_, err := db.writeDB.Exec("UPDATE " + tableName + " SET " + column + " = NULL")
return err
}
func (db *Anonymiser) truncateTable(tableName string) error {
_, err := db.db.Exec("DELETE FROM " + tableName)
_, err := db.writeDB.Exec("DELETE FROM " + tableName)
return err
}

View File

@@ -17,17 +17,21 @@ import (
)
const (
// Number of database connections to use
maxWriteConnections = 1
// Number of database read connections to use
// The same value is used for both the maximum and idle limit,
// to prevent opening connections on the fly, which has a noticeable performance penalty.
// Fewer connections use less memory; more connections increase performance,
// but with diminishing returns.
// 10 was found to be a good tradeoff.
dbConns = 10
maxReadConnections = 10
// Idle connection timeout
// Closes a connection after a period of inactivity, which saves on memory and
// causes the sqlite -wal and -shm files to be automatically deleted.
dbConnTimeout = 30
dbConnTimeout = 30 * time.Second
// environment variable to set the cache size
cacheSizeEnv = "STASH_SQLITE_CACHE_SIZE"
)
var appSchemaVersion uint = 67
@@ -80,8 +84,9 @@ type storeRepository struct {
type Database struct {
*storeRepository
db *sqlx.DB
dbPath string
readDB *sqlx.DB
writeDB *sqlx.DB
dbPath string
schemaVersion uint
@@ -128,7 +133,7 @@ func (db *Database) SetBlobStoreOptions(options BlobStoreOptions) {
// Ready returns an error if the database is not ready to begin transactions.
func (db *Database) Ready() error {
if db.db == nil {
if db.readDB == nil || db.writeDB == nil {
return ErrDatabaseNotInitialized
}
@@ -140,7 +145,7 @@ func (db *Database) Ready() error {
// necessary migrations must be run separately using RunMigrations.
// Returns true if the database is new.
func (db *Database) Open(dbPath string) error {
db.lockNoCtx()
db.lock()
defer db.unlock()
db.dbPath = dbPath
@@ -152,7 +157,9 @@ func (db *Database) Open(dbPath string) error {
db.schemaVersion = databaseSchemaVersion
if databaseSchemaVersion == 0 {
isNew := databaseSchemaVersion == 0
if isNew {
// new database, just run the migrations
if err := db.RunAllMigrations(); err != nil {
return fmt.Errorf("error running initial schema migrations: %w", err)
@@ -174,31 +181,23 @@ func (db *Database) Open(dbPath string) error {
}
}
// RunMigrations may have opened a connection already
if db.db == nil {
const disableForeignKeys = false
db.db, err = db.open(disableForeignKeys)
if err := db.initialise(); err != nil {
return err
}
if isNew {
// optimize database after migration
err = db.Optimise(context.Background())
if err != nil {
return err
logger.Warnf("error while performing post-migration optimisation: %v", err)
}
}
return nil
}
// lock locks the database for writing.
// This method will block until the lock is acquired or the context is cancelled.
func (db *Database) lock(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case db.lockChan <- struct{}{}:
return nil
}
}
// lock locks the database for writing. This method will block until the lock is acquired.
func (db *Database) lockNoCtx() {
func (db *Database) lock() {
db.lockChan <- struct{}{}
}
@@ -214,31 +213,47 @@ func (db *Database) unlock() {
}
func (db *Database) Close() error {
db.lockNoCtx()
db.lock()
defer db.unlock()
if db.db != nil {
if err := db.db.Close(); err != nil {
if db.readDB != nil {
if err := db.readDB.Close(); err != nil {
return err
}
db.db = nil
db.readDB = nil
}
if db.writeDB != nil {
if err := db.writeDB.Close(); err != nil {
return err
}
db.writeDB = nil
}
return nil
}
func (db *Database) open(disableForeignKeys bool) (*sqlx.DB, error) {
func (db *Database) open(disableForeignKeys bool, writable bool) (*sqlx.DB, error) {
// https://github.com/mattn/go-sqlite3
url := "file:" + db.dbPath + "?_journal=WAL&_sync=NORMAL&_busy_timeout=50"
if !disableForeignKeys {
url += "&_fk=true"
}
if writable {
url += "&_txlock=immediate"
} else {
url += "&mode=ro"
}
// #5155 - set the cache size if the environment variable is set
// default is -2000, which is approximately 2MB
if cacheSize := os.Getenv(cacheSizeEnv); cacheSize != "" {
url += "&_cache_size=" + cacheSize
}
conn, err := sqlx.Open(sqlite3Driver, url)
conn.SetMaxOpenConns(dbConns)
conn.SetMaxIdleConns(dbConns)
conn.SetConnMaxIdleTime(dbConnTimeout * time.Second)
if err != nil {
return nil, fmt.Errorf("db.Open(): %w", err)
}
@@ -246,6 +261,43 @@ func (db *Database) open(disableForeignKeys bool) (*sqlx.DB, error) {
return conn, nil
}
func (db *Database) initialise() error {
if err := db.openReadDB(); err != nil {
return fmt.Errorf("opening read database: %w", err)
}
if err := db.openWriteDB(); err != nil {
return fmt.Errorf("opening write database: %w", err)
}
return nil
}
func (db *Database) openReadDB() error {
const (
disableForeignKeys = false
writable = false
)
var err error
db.readDB, err = db.open(disableForeignKeys, writable)
if err != nil {
return err
}
db.readDB.SetMaxOpenConns(maxReadConnections)
db.readDB.SetMaxIdleConns(maxReadConnections)
db.readDB.SetConnMaxIdleTime(dbConnTimeout)
return nil
}
func (db *Database) openWriteDB() error {
const (
disableForeignKeys = false
writable = true
)
var err error
db.writeDB, err = db.open(disableForeignKeys, writable)
if err != nil {
return err
}
db.writeDB.SetMaxOpenConns(maxWriteConnections)
db.writeDB.SetMaxIdleConns(maxWriteConnections)
db.writeDB.SetConnMaxIdleTime(dbConnTimeout)
return nil
}
func (db *Database) Remove() error {
databasePath := db.dbPath
err := db.Close()
@@ -289,7 +341,7 @@ func (db *Database) Reset() error {
// Backup the database. If db is nil, then uses the existing database
// connection.
func (db *Database) Backup(backupPath string) (err error) {
thisDB := db.db
thisDB := db.writeDB
if thisDB == nil {
thisDB, err = sqlx.Connect(sqlite3Driver, "file:"+db.dbPath+"?_fk=true")
if err != nil {
@@ -372,13 +424,13 @@ func (db *Database) Optimise(ctx context.Context) error {
// Vacuum runs a VACUUM on the database, rebuilding the database file into a minimal amount of disk space.
func (db *Database) Vacuum(ctx context.Context) error {
_, err := db.db.ExecContext(ctx, "VACUUM")
_, err := db.writeDB.ExecContext(ctx, "VACUUM")
return err
}
// Analyze runs an ANALYZE on the database to improve query performance.
func (db *Database) Analyze(ctx context.Context) error {
_, err := db.db.ExecContext(ctx, "ANALYZE")
_, err := db.writeDB.ExecContext(ctx, "ANALYZE")
return err
}
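
The database.go changes above replace the single pool with a read-only pool and a single-connection write pool built from the same DSN. The following is a minimal, self-contained sketch of that technique using sqlx and mattn/go-sqlite3; the DSN parameters mirror the ones built in open(), but the openPool helper and the example database path are illustrative assumptions, not code from this commit.

```go
package main

import (
	"fmt"
	"os"
	"time"

	"github.com/jmoiron/sqlx"
	_ "github.com/mattn/go-sqlite3" // registers the "sqlite3" driver
)

// openPool mirrors the shape of Database.open above: one DSN builder shared by
// a writable pool (single connection, immediate transactions) and a read-only
// pool (several connections). The helper itself is illustrative only.
func openPool(dbPath string, writable bool) (*sqlx.DB, error) {
	url := "file:" + dbPath + "?_journal=WAL&_sync=NORMAL&_busy_timeout=50&_fk=true"
	if writable {
		url += "&_txlock=immediate" // writers take the write lock when the transaction begins
	} else {
		url += "&mode=ro" // readers cannot modify the database
	}
	if cacheSize := os.Getenv("STASH_SQLITE_CACHE_SIZE"); cacheSize != "" {
		url += "&_cache_size=" + cacheSize
	}

	conn, err := sqlx.Open("sqlite3", url)
	if err != nil {
		return nil, fmt.Errorf("opening %s: %w", url, err)
	}

	if writable {
		conn.SetMaxOpenConns(1) // single writer, as with maxWriteConnections
		conn.SetMaxIdleConns(1)
	} else {
		conn.SetMaxOpenConns(10) // as with maxReadConnections
		conn.SetMaxIdleConns(10)
	}
	conn.SetConnMaxIdleTime(30 * time.Second)

	return conn, nil
}

func main() {
	// Example path only; connections are opened lazily, so no file is touched here.
	writeDB, err := openPool("stash-go.sqlite", true)
	if err != nil {
		panic(err)
	}
	defer writeDB.Close()

	readDB, err := openPool("stash-go.sqlite", false)
	if err != nil {
		panic(err)
	}
	defer readDB.Close()

	fmt.Println("read and write pools opened")
}
```

Capping the writable pool at one connection serialises writers at the pool level, so concurrent writers queue in Go rather than contend for SQLite's write lock, while readers fan out over the read-only pool.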

View File

@@ -7,6 +7,7 @@ import (
"github.com/golang-migrate/migrate/v4"
sqlite3mig "github.com/golang-migrate/migrate/v4/database/sqlite3"
"github.com/golang-migrate/migrate/v4/source/iofs"
"github.com/jmoiron/sqlx"
"github.com/stashapp/stash/pkg/logger"
)
@@ -15,8 +16,9 @@ func (db *Database) needsMigration() bool {
}
type Migrator struct {
db *Database
m *migrate.Migrate
db *Database
conn *sqlx.DB
m *migrate.Migrate
}
func NewMigrator(db *Database) (*Migrator, error) {
@@ -24,7 +26,18 @@ func NewMigrator(db *Database) (*Migrator, error) {
db: db,
}
const disableForeignKeys = true
const writable = true
var err error
m.conn, err = m.db.open(disableForeignKeys, writable)
if err != nil {
return nil, err
}
m.conn.SetMaxOpenConns(maxReadConnections)
m.conn.SetMaxIdleConns(maxReadConnections)
m.conn.SetConnMaxIdleTime(dbConnTimeout)
m.m, err = m.getMigrate()
return m, err
}
@@ -51,13 +64,7 @@ func (m *Migrator) getMigrate() (*migrate.Migrate, error) {
return nil, err
}
const disableForeignKeys = true
conn, err := m.db.open(disableForeignKeys)
if err != nil {
return nil, err
}
driver, err := sqlite3mig.WithInstance(conn.DB, &sqlite3mig.Config{})
driver, err := sqlite3mig.WithInstance(m.conn.DB, &sqlite3mig.Config{})
if err != nil {
return nil, err
}
@@ -110,14 +117,7 @@ func (m *Migrator) runCustomMigrations(ctx context.Context, fns []customMigratio
}
func (m *Migrator) runCustomMigration(ctx context.Context, fn customMigrationFunc) error {
const disableForeignKeys = false
d, err := m.db.open(disableForeignKeys)
if err != nil {
return err
}
defer d.Close()
if err := fn(ctx, d); err != nil {
if err := fn(ctx, m.conn); err != nil {
return err
}
@@ -136,14 +136,7 @@ func (db *Database) getDatabaseSchemaVersion() (uint, error) {
}
func (db *Database) ReInitialise() error {
const disableForeignKeys = false
var err error
db.db, err = db.open(disableForeignKeys)
if err != nil {
return fmt.Errorf("re-initializing the database: %w", err)
}
return nil
return db.initialise()
}
// RunAllMigrations runs all migrations to bring the database up to the current schema version
@@ -171,18 +164,5 @@ func (db *Database) RunAllMigrations() error {
}
}
// re-initialise the database
const disableForeignKeys = false
db.db, err = db.open(disableForeignKeys)
if err != nil {
return fmt.Errorf("re-initializing the database: %w", err)
}
// optimize database after migration
err = db.Optimise(ctx)
if err != nil {
logger.Warnf("error while performing post-migration optimisation: %v", err)
}
return nil
}
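
The reworked Migrator opens a single dedicated connection in NewMigrator (writable, foreign keys disabled) and reuses it for both the golang-migrate driver and the custom migration functions, rather than opening a new connection for each step. Below is a hedged sketch of that wiring in isolation; the embedded migrations filesystem and the runMigrations helper are assumptions for illustration, not code from this commit.

```go
package example

import (
	"database/sql"
	"embed"
	"errors"

	"github.com/golang-migrate/migrate/v4"
	sqlite3mig "github.com/golang-migrate/migrate/v4/database/sqlite3"
	"github.com/golang-migrate/migrate/v4/source/iofs"
)

//go:embed migrations/*.sql
var migrationsFS embed.FS // assumed embedded migration files

// runMigrations is a hypothetical helper: a single *sql.DB connection is
// handed to the migrate driver, mirroring how Migrator reuses m.conn above.
func runMigrations(conn *sql.DB) error {
	src, err := iofs.New(migrationsFS, "migrations")
	if err != nil {
		return err
	}

	driver, err := sqlite3mig.WithInstance(conn, &sqlite3mig.Config{})
	if err != nil {
		return err
	}

	m, err := migrate.NewWithInstance("iofs", src, "sqlite3", driver)
	if err != nil {
		return err
	}

	// Up applies all pending migrations; ErrNoChange simply means the
	// schema is already current.
	if err := m.Up(); err != nil && !errors.Is(err, migrate.ErrNoChange) {
		return err
	}
	return nil
}
```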

View File

@@ -247,7 +247,7 @@ func (m *schema45Migrator) insertImage(data []byte, id int, destTable string, de
func (m *schema45Migrator) dropTable(ctx context.Context, table string) error {
if err := m.withTxn(ctx, func(tx *sqlx.Tx) error {
logger.Debugf("Dropping %s", table)
_, err := m.db.Exec(fmt.Sprintf("DROP TABLE `%s`", table))
_, err := tx.Exec(fmt.Sprintf("DROP TABLE `%s`", table))
return err
}); err != nil {
return err

View File

@@ -52,7 +52,7 @@ func (m *schema48PreMigrator) validateScrapedItems(ctx context.Context) error {
func (m *schema48PreMigrator) fixStudioNames(ctx context.Context) error {
// First remove NULL names
if err := m.withTxn(ctx, func(tx *sqlx.Tx) error {
_, err := m.db.Exec("UPDATE studios SET name = 'NULL' WHERE name IS NULL")
_, err := tx.Exec("UPDATE studios SET name = 'NULL' WHERE name IS NULL")
return err
}); err != nil {
return err
@@ -64,7 +64,7 @@ func (m *schema48PreMigrator) fixStudioNames(ctx context.Context) error {
// collect names
if err := m.withTxn(ctx, func(tx *sqlx.Tx) error {
rows, err := m.db.Query("SELECT id, name FROM studios ORDER BY name, id")
rows, err := tx.Query("SELECT id, name FROM studios ORDER BY name, id")
if err != nil {
return err
}
@@ -114,7 +114,7 @@ func (m *schema48PreMigrator) fixStudioNames(ctx context.Context) error {
var count int
row := m.db.QueryRowx("SELECT COUNT(*) FROM studios WHERE name = ?", newName)
row := tx.QueryRowx("SELECT COUNT(*) FROM studios WHERE name = ?", newName)
err := row.Scan(&count)
if err != nil {
return err
@@ -131,7 +131,7 @@ func (m *schema48PreMigrator) fixStudioNames(ctx context.Context) error {
}
logger.Infof("Renaming duplicate studio id %d to %s", id, newName)
_, err := m.db.Exec("UPDATE studios SET name = ? WHERE id = ?", newName, id)
_, err := tx.Exec("UPDATE studios SET name = ? WHERE id = ?", newName, id)
if err != nil {
return err
}

View File

@@ -48,7 +48,7 @@ func (m *schema60Migrator) migrate(ctx context.Context) error {
if err := m.withTxn(ctx, func(tx *sqlx.Tx) error {
query := "SELECT id, mode, find_filter, object_filter, ui_options FROM `saved_filters` WHERE `name` = ''"
rows, err := m.db.Query(query)
rows, err := tx.Query(query)
if err != nil {
return err
}
@@ -98,7 +98,7 @@ func (m *schema60Migrator) migrate(ctx context.Context) error {
// remove the default filters from the database
query = "DELETE FROM `saved_filters` WHERE `name` = ''"
if _, err := m.db.Exec(query); err != nil {
if _, err := tx.Exec(query); err != nil {
return fmt.Errorf("deleting default filters: %w", err)
}
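
The three migration fixes above all apply the rule behind the "Fix migrations not using tx" bullet: statements issued inside withTxn must run on the tx handle it provides, not on the migrator's shared database handle, otherwise they execute on a different connection outside the transaction and can block or fail with a locked-database error while the transaction holds the write lock. Below is a minimal sketch of the pattern, with a plain sqlx helper standing in for the migrator's withTxn and a statement borrowed from the schema 48 hunk.

```go
package migrations

import (
	"context"

	"github.com/jmoiron/sqlx"
)

// withTxnSketch stands in for the migrator's withTxn helper: it begins a
// transaction, passes it to fn, and commits or rolls back.
func withTxnSketch(ctx context.Context, db *sqlx.DB, fn func(tx *sqlx.Tx) error) error {
	tx, err := db.BeginTxx(ctx, nil)
	if err != nil {
		return err
	}
	if err := fn(tx); err != nil {
		tx.Rollback()
		return err
	}
	return tx.Commit()
}

func fixNullNames(ctx context.Context, db *sqlx.DB) error {
	return withTxnSketch(ctx, db, func(tx *sqlx.Tx) error {
		// Correct: the statement runs on the transaction's own connection.
		_, err := tx.Exec("UPDATE studios SET name = 'NULL' WHERE name IS NULL")
		return err

		// Incorrect (the pre-fix pattern): db.Exec would check out a second
		// connection from the pool, so the statement would run outside the
		// transaction and could block or fail while the transaction holds
		// the write lock.
		// _, err := db.Exec("UPDATE studios SET name = 'NULL' WHERE name IS NULL")
	})
}
```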

View File

@@ -17,7 +17,7 @@ type key int
const (
txnKey key = iota + 1
dbKey
exclusiveKey
writableKey
)
func (db *Database) WithDatabase(ctx context.Context) (context.Context, error) {
@@ -26,10 +26,10 @@ func (db *Database) WithDatabase(ctx context.Context) (context.Context, error) {
return ctx, nil
}
return context.WithValue(ctx, dbKey, db.db), nil
return context.WithValue(ctx, dbKey, db.readDB), nil
}
func (db *Database) Begin(ctx context.Context, exclusive bool) (context.Context, error) {
func (db *Database) Begin(ctx context.Context, writable bool) (context.Context, error) {
if tx, _ := getTx(ctx); tx != nil {
// log the stack trace so we can see
logger.Error(string(debug.Stack()))
@@ -37,22 +37,17 @@ func (db *Database) Begin(ctx context.Context, exclusive bool) (context.Context,
return nil, fmt.Errorf("already in transaction")
}
if exclusive {
if err := db.lock(ctx); err != nil {
return nil, err
}
dbtx := db.readDB
if writable {
dbtx = db.writeDB
}
tx, err := db.db.BeginTxx(ctx, nil)
tx, err := dbtx.BeginTxx(ctx, nil)
if err != nil {
// begin failed, unlock
if exclusive {
db.unlock()
}
return nil, fmt.Errorf("beginning transaction: %w", err)
}
ctx = context.WithValue(ctx, exclusiveKey, exclusive)
ctx = context.WithValue(ctx, writableKey, writable)
return context.WithValue(ctx, txnKey, tx), nil
}
@@ -88,9 +83,6 @@ func (db *Database) Rollback(ctx context.Context) error {
}
func (db *Database) txnComplete(ctx context.Context) {
if exclusive := ctx.Value(exclusiveKey).(bool); exclusive {
db.unlock()
}
}
func getTx(ctx context.Context) (*sqlx.Tx, error) {

View File

@@ -77,80 +77,83 @@ func waitForOtherThread(c chan struct{}) error {
}
}
func TestConcurrentReadTxn(t *testing.T) {
var wg sync.WaitGroup
ctx := context.Background()
c := make(chan struct{})
// this test is left commented as it's no longer possible to write to the database
// with a read-only transaction.
// first thread
wg.Add(2)
go func() {
defer wg.Done()
if err := txn.WithReadTxn(ctx, db, func(ctx context.Context) error {
scene := &models.Scene{
Title: "test",
}
// func TestConcurrentReadTxn(t *testing.T) {
// var wg sync.WaitGroup
// ctx := context.Background()
// c := make(chan struct{})
if err := db.Scene.Create(ctx, scene, nil); err != nil {
return err
}
// // first thread
// wg.Add(2)
// go func() {
// defer wg.Done()
// if err := txn.WithReadTxn(ctx, db, func(ctx context.Context) error {
// scene := &models.Scene{
// Title: "test",
// }
// wait for other thread to start
if err := signalOtherThread(c); err != nil {
return err
}
if err := waitForOtherThread(c); err != nil {
return err
}
// if err := db.Scene.Create(ctx, scene, nil); err != nil {
// return err
// }
if err := db.Scene.Destroy(ctx, scene.ID); err != nil {
return err
}
// // wait for other thread to start
// if err := signalOtherThread(c); err != nil {
// return err
// }
// if err := waitForOtherThread(c); err != nil {
// return err
// }
return nil
}); err != nil {
t.Errorf("unexpected error in first thread: %v", err)
}
}()
// if err := db.Scene.Destroy(ctx, scene.ID); err != nil {
// return err
// }
// second thread
go func() {
defer wg.Done()
_ = txn.WithReadTxn(ctx, db, func(ctx context.Context) error {
// wait for first thread
if err := waitForOtherThread(c); err != nil {
t.Errorf(err.Error())
return err
}
// return nil
// }); err != nil {
// t.Errorf("unexpected error in first thread: %v", err)
// }
// }()
defer func() {
if err := signalOtherThread(c); err != nil {
t.Errorf(err.Error())
}
}()
// // second thread
// go func() {
// defer wg.Done()
// _ = txn.WithReadTxn(ctx, db, func(ctx context.Context) error {
// // wait for first thread
// if err := waitForOtherThread(c); err != nil {
// t.Errorf(err.Error())
// return err
// }
scene := &models.Scene{
Title: "test",
}
// defer func() {
// if err := signalOtherThread(c); err != nil {
// t.Errorf(err.Error())
// }
// }()
// expect error when we try to do this, as the other thread has already
// modified this table
// this takes time to fail, so we need to wait for it
if err := db.Scene.Create(ctx, scene, nil); err != nil {
if !db.IsLocked(err) {
t.Errorf("unexpected error: %v", err)
}
return err
} else {
t.Errorf("expected locked error in second thread")
}
// scene := &models.Scene{
// Title: "test",
// }
return nil
})
}()
// // expect error when we try to do this, as the other thread has already
// // modified this table
// // this takes time to fail, so we need to wait for it
// if err := db.Scene.Create(ctx, scene, nil); err != nil {
// if !db.IsLocked(err) {
// t.Errorf("unexpected error: %v", err)
// }
// return err
// } else {
// t.Errorf("expected locked error in second thread")
// }
wg.Wait()
}
// return nil
// })
// }()
// wg.Wait()
// }
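
The commented-out test above reflects the new constraint: the read pool is opened with mode=ro, so a write inside WithReadTxn can no longer succeed. A hypothetical replacement check, not part of this commit, could assert that directly, reusing the same package-level db, txn helpers, and Scene store as the surrounding tests:

```go
// Hypothetical sketch only: expect writes inside a read-only transaction to fail.
func TestReadTxnRejectsWrites(t *testing.T) {
	ctx := context.Background()

	err := txn.WithReadTxn(ctx, db, func(ctx context.Context) error {
		scene := &models.Scene{
			Title: "test",
		}
		// The read pool is opened read-only, so this create is expected to error.
		return db.Scene.Create(ctx, scene, nil)
	})

	if err == nil {
		t.Errorf("expected error when writing inside a read-only transaction")
	}
}
```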
func TestConcurrentExclusiveAndReadTxn(t *testing.T) {
var wg sync.WaitGroup

View File

@@ -7,7 +7,7 @@ import (
)
type Manager interface {
Begin(ctx context.Context, exclusive bool) (context.Context, error)
Begin(ctx context.Context, writable bool) (context.Context, error)
Commit(ctx context.Context) error
Rollback(ctx context.Context) error
@@ -28,34 +28,30 @@ type MustFunc func(ctx context.Context)
// WithTxn executes fn in a transaction. If fn returns an error then
// the transaction is rolled back. Otherwise it is committed.
// Transaction is exclusive. Only one thread may run a transaction
// using this function at a time. This function will wait until the
// lock is available before executing.
// This function will call m.Begin with writable = true.
// This function should be used for making changes to the database.
func WithTxn(ctx context.Context, m Manager, fn TxnFunc) error {
const (
execComplete = true
exclusive = true
writable = true
)
return withTxn(ctx, m, fn, exclusive, execComplete)
return withTxn(ctx, m, fn, writable, execComplete)
}
// WithReadTxn executes fn in a transaction. If fn returns an error then
// the transaction is rolled back. Otherwise it is committed.
// Transaction is not exclusive and does not enforce read-only restrictions.
// Multiple threads can run transactions using this function concurrently,
// but concurrent writes may result in a locked database error.
// This function will call m.Begin with writable = false.
func WithReadTxn(ctx context.Context, m Manager, fn TxnFunc) error {
const (
execComplete = true
exclusive = false
writable = false
)
return withTxn(ctx, m, fn, exclusive, execComplete)
return withTxn(ctx, m, fn, writable, execComplete)
}
func withTxn(ctx context.Context, m Manager, fn TxnFunc, exclusive bool, execCompleteOnLocked bool) error {
func withTxn(ctx context.Context, m Manager, fn TxnFunc, writable bool, execCompleteOnLocked bool) error {
// post-hooks should be executed with the outside context
txnCtx, err := begin(ctx, m, exclusive)
txnCtx, err := begin(ctx, m, writable)
if err != nil {
return err
}
@@ -94,9 +90,9 @@ func withTxn(ctx context.Context, m Manager, fn TxnFunc, exclusive bool, execCom
return err
}
func begin(ctx context.Context, m Manager, exclusive bool) (context.Context, error) {
func begin(ctx context.Context, m Manager, writable bool) (context.Context, error) {
var err error
ctx, err = m.Begin(ctx, exclusive)
ctx, err = m.Begin(ctx, writable)
if err != nil {
return nil, err
}
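
The reworded doc comments above define the contract: WithTxn begins a writable transaction (routed to the single write connection), while WithReadTxn begins a read-only one (routed to the read pool). A minimal usage sketch follows, assuming the pkg/txn import path, a Manager implementation such as the sqlite Database, and a hypothetical SceneRepo interface:

```go
package example

import (
	"context"

	"github.com/stashapp/stash/pkg/txn"
)

// SceneRepo is a hypothetical repository interface used only for this sketch.
type SceneRepo interface {
	UpdateTitle(ctx context.Context, id int, title string) error
	Count(ctx context.Context) (int, error)
}

func updateSceneTitle(ctx context.Context, mgr txn.Manager, repo SceneRepo, id int, title string) error {
	// Writable transaction: use for any change to the database.
	return txn.WithTxn(ctx, mgr, func(ctx context.Context) error {
		return repo.UpdateTitle(ctx, id, title)
	})
}

func countScenes(ctx context.Context, mgr txn.Manager, repo SceneRepo) (int, error) {
	var count int
	// Read-only transaction: writes attempted here would fail.
	err := txn.WithReadTxn(ctx, mgr, func(ctx context.Context) error {
		var err error
		count, err = repo.Count(ctx)
		return err
	})
	return count, err
}
```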

View File

@@ -149,6 +149,12 @@ These options are typically not exposed in the UI and must be changed manually i
| `no_proxy` | A list of domains for which the proxy must not be used. Default is all local LAN: localhost,127.0.0.1,192.168.0.0/16,10.0.0.0/8,172.16.0.0/12 |
| `sequential_scanning` | Modifies behaviour of the scanning functionality to generate support files (previews/sprites/phash) at the same time as fingerprinting/screenshotting. Useful when scanning cached remote files. |
The following environment variables are also supported:
| Environment variable | Remarks |
|----------------------|---------|
| `STASH_SQLITE_CACHE_SIZE` | Sets the SQLite cache size. See https://www.sqlite.org/pragma.html#pragma_cache_size. Default is `-2000`, which is approximately 2 MB. |
### Custom served folders
Custom served folders are served when the server handles a request with the `/custom` URL prefix. The following is an example configuration: