diff --git a/lib/inc/drogon/HttpAppFramework.h b/lib/inc/drogon/HttpAppFramework.h index ce106704..02677572 100755 --- a/lib/inc/drogon/HttpAppFramework.h +++ b/lib/inc/drogon/HttpAppFramework.h @@ -100,6 +100,7 @@ class HttpAppFramework : public trantor::NonCopyable virtual trantor::EventLoop *loop() = 0; virtual void setThreadNum(size_t threadNum) = 0; + virtual size_t getThreadNum() const = 0; virtual void setSSLFiles(const std::string &certPath, const std::string &keyPath) = 0; virtual void addListener(const std::string &ip, diff --git a/lib/src/HttpAppFrameworkImpl.h b/lib/src/HttpAppFrameworkImpl.h index 7526b07d..8cdcadeb 100644 --- a/lib/src/HttpAppFrameworkImpl.h +++ b/lib/src/HttpAppFrameworkImpl.h @@ -45,6 +45,7 @@ class HttpAppFrameworkImpl : public HttpAppFramework const std::string &certFile = "", const std::string &keyFile = "") override; virtual void setThreadNum(size_t threadNum) override; + virtual size_t getThreadNum() const override { return _threadNum; } virtual void setSSLFiles(const std::string &certPath, const std::string &keyPath) override; virtual void run() override; diff --git a/orm_lib/src/DbClientImpl.cc b/orm_lib/src/DbClientImpl.cc index 5818b036..c097fddf 100644 --- a/orm_lib/src/DbClientImpl.cc +++ b/orm_lib/src/DbClientImpl.cc @@ -26,6 +26,7 @@ #include "TransactionImpl.h" #include #include +#include #include #include #include @@ -42,34 +43,29 @@ using namespace drogon::orm; DbClientImpl::DbClientImpl(const std::string &connInfo, const size_t connNum, ClientType type) : _connInfo(connInfo), - _connectNum(connNum) + _connectNum(connNum), + _loops(connNum / 10 > 0 ? (connNum / 10 < drogon::app().getThreadNum() ? connNum / 10 : drogon::app().getThreadNum()) : 1) { _type = type; LOG_TRACE << "type=" << (int)type; + //LOG_DEBUG << _loops.getLoopNum(); assert(connNum > 0); - _loopThread = std::thread([=]() { - _loopPtr = std::shared_ptr(new trantor::EventLoop); - ioLoop(); - }); -} -void DbClientImpl::ioLoop() -{ - auto thisPtr = shared_from_this(); - _loopPtr->runAfter(0, [thisPtr]() { - for (size_t i = 0; i < thisPtr->_connectNum; i++) - { - thisPtr->_connections.insert(thisPtr->newConnection()); - } - }); - _loopPtr->loop(); + _loops.start(); + for (size_t i = 0; i < _connectNum; i++) + { + auto loop = _loops.getNextLoop(); + loop->runInLoop([this, loop]() { + _connections.insert(newConnection(loop)); + }); + } } DbClientImpl::~DbClientImpl() noexcept { - _stop = true; - _loopPtr->quit(); - if (_loopThread.joinable()) - _loopThread.join(); + std::lock_guard lock(_connectionsMutex); + _connections.clear(); + _readyConnections.clear(); + _busyConnections.clear(); } void DbClientImpl::execSql(const DbConnectionPtr &conn, @@ -240,7 +236,7 @@ void DbClientImpl::handleNewTask(const DbConnectionPtr &connPtr) { _busyConnections.insert(connPtr); //For new connections, this sentence is necessary auto &cmd = _sqlCmdBuffer.front(); - _loopPtr->queueInLoop([connPtr, cmd, this]() { + connPtr->loop()->queueInLoop([connPtr, cmd, this]() { execSql(connPtr, std::move(cmd->_sql), cmd->_paraNum, std::move(cmd->_parameters), std::move(cmd->_length), std::move(cmd->_format), std::move(cmd->_cb), std::move(cmd->_exceptCb)); }); _sqlCmdBuffer.pop_front(); @@ -248,7 +244,7 @@ void DbClientImpl::handleNewTask(const DbConnectionPtr &connPtr) } } //Idle connection - _loopPtr->queueInLoop([connPtr, this]() { + connPtr->loop()->queueInLoop([connPtr, this]() { std::lock_guard guard(_connectionsMutex); _busyConnections.erase(connPtr); _readyConnections.insert(connPtr); @@ -256,13 +252,13 @@ void DbClientImpl::handleNewTask(const DbConnectionPtr &connPtr) } } -DbConnectionPtr DbClientImpl::newConnection() +DbConnectionPtr DbClientImpl::newConnection(trantor::EventLoop *loop) { DbConnectionPtr connPtr; if (_type == ClientType::PostgreSQL) { #if USE_POSTGRESQL - connPtr = std::make_shared(_loopPtr.get(), _connInfo); + connPtr = std::make_shared(loop, _connInfo); #else return nullptr; #endif @@ -270,7 +266,7 @@ DbConnectionPtr DbClientImpl::newConnection() else if (_type == ClientType::Mysql) { #if USE_MYSQL - connPtr = std::make_shared(_loopPtr.get(), _connInfo); + connPtr = std::make_shared(loop, _connInfo); #else return nullptr; #endif @@ -278,7 +274,7 @@ DbConnectionPtr DbClientImpl::newConnection() else if (_type == ClientType::Sqlite3) { #if USE_SQLITE3 - connPtr = std::make_shared(_loopPtr.get(), _connInfo); + connPtr = std::make_shared(loop, _connInfo); #else return nullptr; #endif @@ -287,11 +283,11 @@ DbConnectionPtr DbClientImpl::newConnection() { return nullptr; } - auto loopPtr = _loopPtr; + std::weak_ptr weakPtr = shared_from_this(); - connPtr->setCloseCallback([weakPtr, loopPtr](const DbConnectionPtr &closeConnPtr) { + connPtr->setCloseCallback([weakPtr, loop](const DbConnectionPtr &closeConnPtr) { //Reconnect after 1 second - loopPtr->runAfter(1, [weakPtr, closeConnPtr] { + loop->runAfter(1, [weakPtr, closeConnPtr, loop] { auto thisPtr = weakPtr.lock(); if (!thisPtr) return; @@ -300,7 +296,7 @@ DbConnectionPtr DbClientImpl::newConnection() thisPtr->_busyConnections.erase(closeConnPtr); assert(thisPtr->_connections.find(closeConnPtr) != thisPtr->_connections.end()); thisPtr->_connections.erase(closeConnPtr); - thisPtr->_connections.insert(thisPtr->newConnection()); + thisPtr->_connections.insert(thisPtr->newConnection(loop)); }); }); connPtr->setOkCallback([weakPtr](const DbConnectionPtr &okConnPtr) { diff --git a/orm_lib/src/DbClientImpl.h b/orm_lib/src/DbClientImpl.h index ad28950c..947850bd 100644 --- a/orm_lib/src/DbClientImpl.h +++ b/orm_lib/src/DbClientImpl.h @@ -16,7 +16,7 @@ #include "DbConnection.h" #include -#include +#include #include #include #include @@ -44,8 +44,10 @@ class DbClientImpl : public DbClient, public std::enable_shared_from_this newTransaction(const std::function &commitCallback = std::function()) override; private: - void ioLoop(); - std::shared_ptr _loopPtr; + std::string _connInfo; + size_t _connectNum; + trantor::EventLoopThreadPool _loops; + void execSql(const DbConnectionPtr &conn, std::string &&sql, size_t paraNum, @@ -55,20 +57,16 @@ class DbClientImpl : public DbClient, public std::enable_shared_from_this &&exceptCallback); - DbConnectionPtr newConnection(); + DbConnectionPtr newConnection(trantor::EventLoop *loop); + std::mutex _connectionsMutex; std::unordered_set _connections; std::unordered_set _readyConnections; std::unordered_set _busyConnections; - std::string _connInfo; - std::thread _loopThread; - std::mutex _connectionsMutex; + std::condition_variable _condConnectionReady; size_t _transWaitNum = 0; - size_t _connectNum; - bool _stop = false; - struct SqlCmd { std::string _sql;