From 1ca5e3cde6d0af8ac73e92b7066ddaf0067abf68 Mon Sep 17 00:00:00 2001 From: antao Date: Tue, 26 Feb 2019 09:50:51 +0800 Subject: [PATCH] Use multiple database connections in the DbClientLockFree class --- orm_lib/src/DbClientLockFree.cc | 84 +++++++++++++++++++++------------ orm_lib/src/DbClientLockFree.h | 8 ++-- 2 files changed, 58 insertions(+), 34 deletions(-) diff --git a/orm_lib/src/DbClientLockFree.cc b/orm_lib/src/DbClientLockFree.cc index 2d35b51e..657ae6fd 100644 --- a/orm_lib/src/DbClientLockFree.cc +++ b/orm_lib/src/DbClientLockFree.cc @@ -47,13 +47,15 @@ DbClientLockFree::DbClientLockFree(const std::string &connInfo, trantor::EventLo if (type == ClientType::PostgreSQL) { _loop->runInLoop([this]() { - _connectionHolder = newConnection(); + for (size_t i = 0; i < _connectionNum; i++) + _connectionHolders.push_back(newConnection()); }); } else if (type == ClientType::Mysql) { _loop->runInLoop([this]() { - _connectionHolder = newConnection(); + for (size_t i = 0; i < _connectionNum; i++) + _connectionHolders.push_back(newConnection()); }); } else @@ -64,9 +66,9 @@ DbClientLockFree::DbClientLockFree(const std::string &connInfo, trantor::EventLo DbClientLockFree::~DbClientLockFree() noexcept { - if (_connection) + for (auto &conn : _connections) { - _connection->disconnect(); + conn->disconnect(); } } @@ -83,7 +85,7 @@ void DbClientLockFree::execSql(std::string &&sql, assert(paraNum == format.size()); assert(rcb); _loop->assertInLoopThread(); - if (!_connection) + if (_connections.empty()) { try { @@ -97,16 +99,19 @@ void DbClientLockFree::execSql(std::string &&sql, } else { - if (!_connection->isWorking()) + for (auto &conn : _connections) { - _connection->execSql(std::move(sql), - paraNum, - std::move(parameters), - std::move(length), - std::move(format), - std::move(rcb), - std::move(exceptCallback)); - return; + if (!conn->isWorking()) + { + conn->execSql(std::move(sql), + paraNum, + std::move(parameters), + std::move(length), + std::move(format), + std::move(rcb), + std::move(exceptCallback)); + return; + } } } @@ -141,20 +146,20 @@ std::shared_ptr DbClientLockFree::newTransaction(const std::functio return nullptr; } -void DbClientLockFree::handleNewTask() +void DbClientLockFree::handleNewTask(const DbConnectionPtr &conn) { - assert(_connection); - assert(!_connection->isWorking()); + assert(conn); + assert(!conn->isWorking()); if (!_sqlCmdBuffer.empty()) { auto &cmd = _sqlCmdBuffer.front(); - _connection->execSql(std::move(cmd._sql), - cmd._paraNum, - std::move(cmd._parameters), - std::move(cmd._length), - std::move(cmd._format), - std::move(cmd._cb), - std::move(cmd._exceptCb)); + conn->execSql(std::move(cmd._sql), + cmd._paraNum, + std::move(cmd._parameters), + std::move(cmd._length), + std::move(cmd._format), + std::move(cmd._cb), + std::move(cmd._exceptCb)); _sqlCmdBuffer.pop_front(); return; } @@ -191,15 +196,29 @@ DbConnectionPtr DbClientLockFree::newConnection() if (!thisPtr) return; - assert(thisPtr->_connection); - thisPtr->_connection.reset(); + for (auto iter = thisPtr->_connections.begin(); iter != thisPtr->_connections.end(); iter++) + { + if (closeConnPtr == *iter) + { + thisPtr->_connections.erase(iter); + break; + } + } + for (auto iter = thisPtr->_connectionHolders.begin(); iter != thisPtr->_connectionHolders.end(); iter++) + { + if (closeConnPtr == *iter) + { + thisPtr->_connectionHolders.erase(iter); + break; + } + } //Reconnect after 1 second thisPtr->_loop->runAfter(1, [weakPtr] { auto thisPtr = weakPtr.lock(); if (!thisPtr) return; - thisPtr->_connectionHolder = thisPtr->newConnection(); + thisPtr->_connectionHolders.push_back(thisPtr->newConnection()); }); }); connPtr->setOkCallback([weakPtr](const DbConnectionPtr &okConnPtr) { @@ -207,14 +226,17 @@ DbConnectionPtr DbClientLockFree::newConnection() auto thisPtr = weakPtr.lock(); if (!thisPtr) return; - thisPtr->_connection = okConnPtr; - thisPtr->handleNewTask(); + thisPtr->_connections.push_back(okConnPtr); + thisPtr->handleNewTask(okConnPtr); }); - connPtr->setIdleCallback([weakPtr]() { + std::weak_ptr weakConnPtr = connPtr; + connPtr->setIdleCallback([weakPtr, weakConnPtr]() { auto thisPtr = weakPtr.lock(); if (!thisPtr) return; - thisPtr->handleNewTask(); + auto connPtr = weakConnPtr.lock(); + assert(connPtr); + thisPtr->handleNewTask(connPtr); }); //std::cout<<"newConn end"< #include + namespace drogon { namespace orm @@ -48,8 +49,9 @@ class DbClientLockFree : public DbClient, public std::enable_shared_from_this _connections; + std::vector _connectionHolders; struct SqlCmd { std::string _sql; @@ -78,7 +80,7 @@ class DbClientLockFree : public DbClient, public std::enable_shared_from_this _sqlCmdBuffer; - void handleNewTask(); + void handleNewTask(const DbConnectionPtr &conn); }; } // namespace orm