Use multiple event loops in the db client
This commit is contained in:
parent
937a2fd136
commit
ac90c80710
|
@ -100,6 +100,7 @@ class HttpAppFramework : public trantor::NonCopyable
|
|||
virtual trantor::EventLoop *loop() = 0;
|
||||
|
||||
virtual void setThreadNum(size_t threadNum) = 0;
|
||||
virtual size_t getThreadNum() const = 0;
|
||||
virtual void setSSLFiles(const std::string &certPath,
|
||||
const std::string &keyPath) = 0;
|
||||
virtual void addListener(const std::string &ip,
|
||||
|
|
|
@ -45,6 +45,7 @@ class HttpAppFrameworkImpl : public HttpAppFramework
|
|||
const std::string &certFile = "",
|
||||
const std::string &keyFile = "") override;
|
||||
virtual void setThreadNum(size_t threadNum) override;
|
||||
virtual size_t getThreadNum() const override { return _threadNum; }
|
||||
virtual void setSSLFiles(const std::string &certPath,
|
||||
const std::string &keyPath) override;
|
||||
virtual void run() override;
|
||||
|
|
|
@ -26,6 +26,7 @@
|
|||
#include "TransactionImpl.h"
|
||||
#include <trantor/net/EventLoop.h>
|
||||
#include <trantor/net/inner/Channel.h>
|
||||
#include <drogon/drogon.h>
|
||||
#include <drogon/orm/Exception.h>
|
||||
#include <drogon/orm/DbClient.h>
|
||||
#include <sys/select.h>
|
||||
|
@ -42,34 +43,29 @@ using namespace drogon::orm;
|
|||
|
||||
DbClientImpl::DbClientImpl(const std::string &connInfo, const size_t connNum, ClientType type)
|
||||
: _connInfo(connInfo),
|
||||
_connectNum(connNum)
|
||||
_connectNum(connNum),
|
||||
_loops(connNum / 10 > 0 ? (connNum / 10 < drogon::app().getThreadNum() ? connNum / 10 : drogon::app().getThreadNum()) : 1)
|
||||
{
|
||||
_type = type;
|
||||
LOG_TRACE << "type=" << (int)type;
|
||||
//LOG_DEBUG << _loops.getLoopNum();
|
||||
assert(connNum > 0);
|
||||
_loopThread = std::thread([=]() {
|
||||
_loopPtr = std::shared_ptr<trantor::EventLoop>(new trantor::EventLoop);
|
||||
ioLoop();
|
||||
});
|
||||
}
|
||||
void DbClientImpl::ioLoop()
|
||||
{
|
||||
auto thisPtr = shared_from_this();
|
||||
_loopPtr->runAfter(0, [thisPtr]() {
|
||||
for (size_t i = 0; i < thisPtr->_connectNum; i++)
|
||||
{
|
||||
thisPtr->_connections.insert(thisPtr->newConnection());
|
||||
}
|
||||
});
|
||||
_loopPtr->loop();
|
||||
_loops.start();
|
||||
for (size_t i = 0; i < _connectNum; i++)
|
||||
{
|
||||
auto loop = _loops.getNextLoop();
|
||||
loop->runInLoop([this, loop]() {
|
||||
_connections.insert(newConnection(loop));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
DbClientImpl::~DbClientImpl() noexcept
|
||||
{
|
||||
_stop = true;
|
||||
_loopPtr->quit();
|
||||
if (_loopThread.joinable())
|
||||
_loopThread.join();
|
||||
std::lock_guard<std::mutex> lock(_connectionsMutex);
|
||||
_connections.clear();
|
||||
_readyConnections.clear();
|
||||
_busyConnections.clear();
|
||||
}
|
||||
|
||||
void DbClientImpl::execSql(const DbConnectionPtr &conn,
|
||||
|
@ -240,7 +236,7 @@ void DbClientImpl::handleNewTask(const DbConnectionPtr &connPtr)
|
|||
{
|
||||
_busyConnections.insert(connPtr); //For new connections, this sentence is necessary
|
||||
auto &cmd = _sqlCmdBuffer.front();
|
||||
_loopPtr->queueInLoop([connPtr, cmd, this]() {
|
||||
connPtr->loop()->queueInLoop([connPtr, cmd, this]() {
|
||||
execSql(connPtr, std::move(cmd->_sql), cmd->_paraNum, std::move(cmd->_parameters), std::move(cmd->_length), std::move(cmd->_format), std::move(cmd->_cb), std::move(cmd->_exceptCb));
|
||||
});
|
||||
_sqlCmdBuffer.pop_front();
|
||||
|
@ -248,7 +244,7 @@ void DbClientImpl::handleNewTask(const DbConnectionPtr &connPtr)
|
|||
}
|
||||
}
|
||||
//Idle connection
|
||||
_loopPtr->queueInLoop([connPtr, this]() {
|
||||
connPtr->loop()->queueInLoop([connPtr, this]() {
|
||||
std::lock_guard<std::mutex> guard(_connectionsMutex);
|
||||
_busyConnections.erase(connPtr);
|
||||
_readyConnections.insert(connPtr);
|
||||
|
@ -256,13 +252,13 @@ void DbClientImpl::handleNewTask(const DbConnectionPtr &connPtr)
|
|||
}
|
||||
}
|
||||
|
||||
DbConnectionPtr DbClientImpl::newConnection()
|
||||
DbConnectionPtr DbClientImpl::newConnection(trantor::EventLoop *loop)
|
||||
{
|
||||
DbConnectionPtr connPtr;
|
||||
if (_type == ClientType::PostgreSQL)
|
||||
{
|
||||
#if USE_POSTGRESQL
|
||||
connPtr = std::make_shared<PgConnection>(_loopPtr.get(), _connInfo);
|
||||
connPtr = std::make_shared<PgConnection>(loop, _connInfo);
|
||||
#else
|
||||
return nullptr;
|
||||
#endif
|
||||
|
@ -270,7 +266,7 @@ DbConnectionPtr DbClientImpl::newConnection()
|
|||
else if (_type == ClientType::Mysql)
|
||||
{
|
||||
#if USE_MYSQL
|
||||
connPtr = std::make_shared<MysqlConnection>(_loopPtr.get(), _connInfo);
|
||||
connPtr = std::make_shared<MysqlConnection>(loop, _connInfo);
|
||||
#else
|
||||
return nullptr;
|
||||
#endif
|
||||
|
@ -278,7 +274,7 @@ DbConnectionPtr DbClientImpl::newConnection()
|
|||
else if (_type == ClientType::Sqlite3)
|
||||
{
|
||||
#if USE_SQLITE3
|
||||
connPtr = std::make_shared<Sqlite3Connection>(_loopPtr.get(), _connInfo);
|
||||
connPtr = std::make_shared<Sqlite3Connection>(loop, _connInfo);
|
||||
#else
|
||||
return nullptr;
|
||||
#endif
|
||||
|
@ -287,11 +283,11 @@ DbConnectionPtr DbClientImpl::newConnection()
|
|||
{
|
||||
return nullptr;
|
||||
}
|
||||
auto loopPtr = _loopPtr;
|
||||
|
||||
std::weak_ptr<DbClientImpl> weakPtr = shared_from_this();
|
||||
connPtr->setCloseCallback([weakPtr, loopPtr](const DbConnectionPtr &closeConnPtr) {
|
||||
connPtr->setCloseCallback([weakPtr, loop](const DbConnectionPtr &closeConnPtr) {
|
||||
//Reconnect after 1 second
|
||||
loopPtr->runAfter(1, [weakPtr, closeConnPtr] {
|
||||
loop->runAfter(1, [weakPtr, closeConnPtr, loop] {
|
||||
auto thisPtr = weakPtr.lock();
|
||||
if (!thisPtr)
|
||||
return;
|
||||
|
@ -300,7 +296,7 @@ DbConnectionPtr DbClientImpl::newConnection()
|
|||
thisPtr->_busyConnections.erase(closeConnPtr);
|
||||
assert(thisPtr->_connections.find(closeConnPtr) != thisPtr->_connections.end());
|
||||
thisPtr->_connections.erase(closeConnPtr);
|
||||
thisPtr->_connections.insert(thisPtr->newConnection());
|
||||
thisPtr->_connections.insert(thisPtr->newConnection(loop));
|
||||
});
|
||||
});
|
||||
connPtr->setOkCallback([weakPtr](const DbConnectionPtr &okConnPtr) {
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
|
||||
#include "DbConnection.h"
|
||||
#include <drogon/orm/DbClient.h>
|
||||
#include <trantor/net/EventLoop.h>
|
||||
#include <trantor/net/EventLoopThreadPool.h>
|
||||
#include <memory>
|
||||
#include <thread>
|
||||
#include <functional>
|
||||
|
@ -44,8 +44,10 @@ class DbClientImpl : public DbClient, public std::enable_shared_from_this<DbClie
|
|||
virtual std::shared_ptr<Transaction> newTransaction(const std::function<void(bool)> &commitCallback = std::function<void(bool)>()) override;
|
||||
|
||||
private:
|
||||
void ioLoop();
|
||||
std::shared_ptr<trantor::EventLoop> _loopPtr;
|
||||
std::string _connInfo;
|
||||
size_t _connectNum;
|
||||
trantor::EventLoopThreadPool _loops;
|
||||
|
||||
void execSql(const DbConnectionPtr &conn,
|
||||
std::string &&sql,
|
||||
size_t paraNum,
|
||||
|
@ -55,20 +57,16 @@ class DbClientImpl : public DbClient, public std::enable_shared_from_this<DbClie
|
|||
ResultCallback &&rcb,
|
||||
std::function<void(const std::exception_ptr &)> &&exceptCallback);
|
||||
|
||||
DbConnectionPtr newConnection();
|
||||
DbConnectionPtr newConnection(trantor::EventLoop *loop);
|
||||
|
||||
std::mutex _connectionsMutex;
|
||||
std::unordered_set<DbConnectionPtr> _connections;
|
||||
std::unordered_set<DbConnectionPtr> _readyConnections;
|
||||
std::unordered_set<DbConnectionPtr> _busyConnections;
|
||||
std::string _connInfo;
|
||||
std::thread _loopThread;
|
||||
std::mutex _connectionsMutex;
|
||||
|
||||
std::condition_variable _condConnectionReady;
|
||||
size_t _transWaitNum = 0;
|
||||
|
||||
size_t _connectNum;
|
||||
bool _stop = false;
|
||||
|
||||
struct SqlCmd
|
||||
{
|
||||
std::string _sql;
|
||||
|
|
Loading…
Reference in New Issue