Use read-write lock to avoid the race condition of sqlite3 database operation

This commit is contained in:
antao 2019-01-11 15:01:25 +08:00
parent 52dceb45c2
commit cd257203f7
4 changed files with 49 additions and 18 deletions

View File

@ -62,7 +62,8 @@ DbClientImpl::DbClientImpl(const std::string &connInfo, const size_t connNum, Cl
_connections.insert(newConnection(loop));
});
}
}).detach();
})
.detach();
}
else if (type == ClientType::Mysql)
{
@ -75,10 +76,13 @@ DbClientImpl::DbClientImpl(const std::string &connInfo, const size_t connNum, Cl
_connections.insert(newConnection(loop));
});
}
}).detach();
})
.detach();
}
else if (type == ClientType::Sqlite3)
{
_sharedMutexPtr = std::make_shared<std::shared_mutex>();
assert(_sharedMutexPtr);
auto loop = _loops.getNextLoop();
loop->runInLoop([this]() {
std::lock_guard<std::mutex> lock(_connectionsMutex);
@ -304,7 +308,7 @@ DbConnectionPtr DbClientImpl::newConnection(trantor::EventLoop *loop)
else if (_type == ClientType::Sqlite3)
{
#if USE_SQLITE3
connPtr = std::make_shared<Sqlite3Connection>(loop, _connInfo);
connPtr = std::make_shared<Sqlite3Connection>(loop, _connInfo, _sharedMutexPtr);
#else
return nullptr;
#endif
@ -329,7 +333,7 @@ DbConnectionPtr DbClientImpl::newConnection(trantor::EventLoop *loop)
}
//Reconnect after 1 second
auto loop = closeConnPtr->loop();
loop->runAfter(1, [weakPtr, closeConnPtr, loop] {
loop->runAfter(1, [weakPtr, loop] {
auto thisPtr = weakPtr.lock();
if (!thisPtr)
return;

View File

@ -23,6 +23,7 @@
#include <string>
#include <unordered_set>
#include <list>
#include <shared_mutex>
namespace drogon
{
@ -47,6 +48,7 @@ class DbClientImpl : public DbClient, public std::enable_shared_from_this<DbClie
std::string _connInfo;
size_t _connectNum;
trantor::EventLoopThreadPool _loops;
std::shared_ptr<std::shared_mutex> _sharedMutexPtr;
void execSql(const DbConnectionPtr &conn,
std::string &&sql,

View File

@ -34,8 +34,9 @@ void Sqlite3Connection::onError(const std::string &sql, const std::function<void
}
}
Sqlite3Connection::Sqlite3Connection(trantor::EventLoop *loop, const std::string &connInfo)
: DbConnection(loop)
Sqlite3Connection::Sqlite3Connection(trantor::EventLoop *loop, const std::string &connInfo, const std::shared_ptr<std::shared_mutex> &sharedMutex)
: DbConnection(loop),
_sharedMutexPtr(sharedMutex)
{
_loopThread.run();
_loop = _loopThread.getLoop();
@ -186,6 +187,36 @@ void Sqlite3Connection::execSqlInQueue(const std::string &sql,
resultPtr->_columnNames.push_back(name);
resultPtr->_columnNameMap.insert({name, i});
}
if (sqlite3_stmt_readonly(stmt))
{
//Readonly, hold read lock;
std::shared_lock<std::shared_mutex> lock(*_sharedMutexPtr);
r = stmtStep(stmt, resultPtr, columnNum);
}
else
{
//Hold write lock
std::unique_lock<std::shared_mutex> lock(*_sharedMutexPtr);
r = stmtStep(stmt, resultPtr, columnNum);
}
if (r != SQLITE_DONE)
{
onError(sql, exceptCallback);
return;
}
// If the sql is a select statement? FIXME
resultPtr->_affectedRows = sqlite3_changes(_conn.get());
resultPtr->_insertId = sqlite3_last_insert_rowid(_conn.get());
// sqlite3_set_last_insert_rowid(_conn.get(), 0);
rcb(Result(resultPtr));
idleCb();
}
int Sqlite3Connection::stmtStep(sqlite3_stmt *stmt, const std::shared_ptr<Sqlite3ResultImpl> &resultPtr, int columnNum)
{
int r;
while ((r = sqlite3_step(stmt)) == SQLITE_ROW)
{
std::vector<std::shared_ptr<std::string>> row;
@ -217,15 +248,5 @@ void Sqlite3Connection::execSqlInQueue(const std::string &sql,
}
resultPtr->_result.push_back(std::move(row));
}
if (r != SQLITE_DONE)
{
onError(sql, exceptCallback);
return;
}
// If the sql is a select statement? FIXME
resultPtr->_affectedRows = sqlite3_changes(_conn.get());
resultPtr->_insertId = sqlite3_last_insert_rowid(_conn.get());
// sqlite3_set_last_insert_rowid(_conn.get(), 0);
rcb(Result(resultPtr));
idleCb();
return r;
}

View File

@ -15,6 +15,7 @@
#pragma once
#include "../DbConnection.h"
#include "Sqlite3ResultImpl.h"
#include <drogon/orm/DbClient.h>
#include <trantor/utils/NonCopyable.h>
#include <trantor/utils/SerialTaskQueue.h>
@ -25,6 +26,7 @@
#include <functional>
#include <iostream>
#include <thread>
#include <shared_mutex>
namespace drogon
{
@ -36,7 +38,7 @@ typedef std::shared_ptr<Sqlite3Connection> Sqlite3ConnectionPtr;
class Sqlite3Connection : public DbConnection, public std::enable_shared_from_this<Sqlite3Connection>
{
public:
Sqlite3Connection(trantor::EventLoop *loop, const std::string &connInfo);
Sqlite3Connection(trantor::EventLoop *loop, const std::string &connInfo, const std::shared_ptr<std::shared_mutex> &sharedMutex);
virtual void execSql(std::string &&sql,
size_t paraNum,
@ -58,8 +60,10 @@ class Sqlite3Connection : public DbConnection, public std::enable_shared_from_th
const std::function<void(const std::exception_ptr &)> &exceptCallback,
const std::function<void()> &idleCb);
void onError(const std::string &sql, const std::function<void(const std::exception_ptr &)> &exceptCallback);
int stmtStep(sqlite3_stmt *stmt, const std::shared_ptr<Sqlite3ResultImpl> &resultPtr, int columnNum);
trantor::EventLoopThread _loopThread;
std::shared_ptr<sqlite3> _conn;
std::shared_ptr<std::shared_mutex> _sharedMutexPtr;
};
} // namespace orm