diff --git a/lib/inc/drogon/HttpAppFramework.h b/lib/inc/drogon/HttpAppFramework.h index deeb2be9..6948ca6f 100755 --- a/lib/inc/drogon/HttpAppFramework.h +++ b/lib/inc/drogon/HttpAppFramework.h @@ -185,6 +185,7 @@ class HttpAppFramework : public trantor::NonCopyable virtual void setIdleConnectionTimeout(size_t timeout) = 0; #if USE_ORM virtual orm::DbClientPtr getDbClient(const std::string &name = "default") = 0; + virtual orm::DbClientPtr getFastDbClient(const std::string &name = "default") = 0; virtual void createDbClient(const std::string &dbType, const std::string &host, const u_short port, diff --git a/lib/src/HttpAppFrameworkImpl.cc b/lib/src/HttpAppFrameworkImpl.cc index d2b2d7b6..0d01f368 100755 --- a/lib/src/HttpAppFrameworkImpl.cc +++ b/lib/src/HttpAppFrameworkImpl.cc @@ -15,6 +15,9 @@ #include "HttpAppFrameworkImpl.h" #include "ConfigLoader.h" #include "HttpServer.h" +#ifdef USE_ORM +#include "../../orm_lib/src/DbClientLockFree.h" +#endif #include #include #include @@ -361,6 +364,10 @@ void HttpAppFrameworkImpl::run() servers.push_back(serverPtr); #endif } +#if USE_ORM + // Create fast db clients for every io loop + createFastDbClient(ioLoops); +#endif _httpCtrlsRouter.init(ioLoops); _httpSimpleCtrlsRouter.init(ioLoops); _websockCtrlsRouter.init(); @@ -395,7 +402,25 @@ void HttpAppFrameworkImpl::run() _responseCachingMap = std::unique_ptr>(new CacheMap(loop(), 1.0, 4, 50)); //Max timeout up to about 70 days; loop()->loop(); } - +#if USE_ORM +void HttpAppFrameworkImpl::createFastDbClient(const std::vector &ioloops) +{ + for (auto &iter : _dbClientsMap) + { + for (auto *loop : ioloops) + { + if (iter.second->type() == drogon::orm::ClientType::Sqlite3) + { + _dbFastClientsMap[iter.first][loop] = iter.second; + } + if (iter.second->type() == drogon::orm::ClientType::PostgreSQL || iter.second->type() == drogon::orm::ClientType::Mysql) + { + _dbFastClientsMap[iter.first][loop] = std::shared_ptr(new drogon::orm::DbClientLockFree(iter.second->connectionInfo(), loop, iter.second->type())); + } + } + } +} +#endif void HttpAppFrameworkImpl::onWebsockDisconnect(const WebSocketConnectionPtr &wsConnPtr) { auto wsConnImplPtr = std::dynamic_pointer_cast(wsConnPtr); @@ -775,7 +800,10 @@ orm::DbClientPtr HttpAppFrameworkImpl::getDbClient(const std::string &name) { return _dbClientsMap[name]; } - +orm::DbClientPtr HttpAppFrameworkImpl::getFastDbClient(const std::string &name) +{ + return _dbFastClientsMap[name][trantor::EventLoop::getEventLoopOfCurrentThread()]; +} void HttpAppFrameworkImpl::createDbClient(const std::string &dbType, const std::string &host, const u_short port, diff --git a/lib/src/HttpAppFrameworkImpl.h b/lib/src/HttpAppFrameworkImpl.h index 681b6399..995e7a2a 100644 --- a/lib/src/HttpAppFrameworkImpl.h +++ b/lib/src/HttpAppFrameworkImpl.h @@ -108,6 +108,7 @@ class HttpAppFrameworkImpl : public HttpAppFramework } #if USE_ORM virtual orm::DbClientPtr getDbClient(const std::string &name = "default") override; + virtual orm::DbClientPtr getFastDbClient(const std::string &name = "default") override; virtual void createDbClient(const std::string &dbType, const std::string &host, const u_short port, @@ -192,7 +193,9 @@ class HttpAppFrameworkImpl : public HttpAppFramework std::mutex _staticFilesCacheMutex; #if USE_ORM std::map _dbClientsMap; + std::map> _dbFastClientsMap; std::vector> _dbFuncs; + void createFastDbClient(const std::vector &ioloops); #endif }; diff --git a/orm_lib/inc/drogon/orm/DbClient.h b/orm_lib/inc/drogon/orm/DbClient.h index 3d1f48ca..19b1c707 100644 --- a/orm_lib/inc/drogon/orm/DbClient.h +++ b/orm_lib/inc/drogon/orm/DbClient.h @@ -147,6 +147,7 @@ class DbClient : public trantor::NonCopyable virtual std::shared_ptr newTransaction(const std::function &commitCallback = std::function()) = 0; ClientType type() const { return _type; } + const std::string &connectionInfo() { return _connInfo; } private: friend internal::SqlBinder; @@ -157,9 +158,10 @@ class DbClient : public trantor::NonCopyable std::vector &&format, ResultCallback &&rcb, std::function &&exceptCallback) = 0; - + protected: ClientType _type; + std::string _connInfo; }; typedef std::shared_ptr DbClientPtr; diff --git a/orm_lib/src/DbClientImpl.cc b/orm_lib/src/DbClientImpl.cc index f00f55e6..8c01b119 100644 --- a/orm_lib/src/DbClientImpl.cc +++ b/orm_lib/src/DbClientImpl.cc @@ -42,11 +42,11 @@ using namespace drogon::orm; DbClientImpl::DbClientImpl(const std::string &connInfo, const size_t connNum, ClientType type) - : _connInfo(connInfo), - _connectNum(connNum), + : _connectNum(connNum), _loops(type == ClientType::Sqlite3 ? 1 : (connNum < std::thread::hardware_concurrency() ? connNum : std::thread::hardware_concurrency()), "DbLoop") { _type = type; + _connInfo = connInfo; LOG_TRACE << "type=" << (int)type; //LOG_DEBUG << _loops.getLoopNum(); assert(connNum > 0); diff --git a/orm_lib/src/DbClientImpl.h b/orm_lib/src/DbClientImpl.h index 3e0199d3..3664b294 100644 --- a/orm_lib/src/DbClientImpl.h +++ b/orm_lib/src/DbClientImpl.h @@ -45,7 +45,6 @@ class DbClientImpl : public DbClient, public std::enable_shared_from_this newTransaction(const std::function &commitCallback = std::function()) override; private: - std::string _connInfo; size_t _connectNum; trantor::EventLoopThreadPool _loops; std::shared_ptr _sharedMutexPtr; diff --git a/orm_lib/src/DbClientLockFree.cc b/orm_lib/src/DbClientLockFree.cc new file mode 100644 index 00000000..62edc30f --- /dev/null +++ b/orm_lib/src/DbClientLockFree.cc @@ -0,0 +1,223 @@ +/** + * + * DbClientLockFree.cc + * An Tao + * + * Copyright 2018, An Tao. All rights reserved. + * https://github.com/an-tao/drogon + * Use of this source code is governed by a MIT license + * that can be found in the License file. + * + * Drogon + * + */ + +#include "DbClientLockFree.h" +#include "DbConnection.h" +#if USE_POSTGRESQL +#include "postgresql_impl/PgConnection.h" +#endif +#if USE_MYSQL +#include "mysql_impl/MysqlConnection.h" +#endif +#include "TransactionImpl.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace drogon::orm; + +DbClientLockFree::DbClientLockFree(const std::string &connInfo, trantor::EventLoop *loop, ClientType type) + : _connInfo(connInfo), + _loop(loop) +{ + _type = type; + LOG_TRACE << "type=" << (int)type; + if (type == ClientType::PostgreSQL) + { + newConnection(); + } + else if (type == ClientType::Mysql) + { + newConnection(); + } + else + { + LOG_ERROR << "No supported database type!"; + } +} + +DbClientLockFree::~DbClientLockFree() noexcept +{ + if (_connection) + { + _connection->disconnect(); + } +} + +void DbClientLockFree::execSql(const DbConnectionPtr &conn, + std::string &&sql, + size_t paraNum, + std::vector &¶meters, + std::vector &&length, + std::vector &&format, + ResultCallback &&rcb, + std::function &&exceptCallback) +{ + assert(conn); + std::weak_ptr weakConn = conn; + conn->execSql(std::move(sql), paraNum, std::move(parameters), std::move(length), std::move(format), + std::move(rcb), std::move(exceptCallback), + [=]() -> void { + { + auto connPtr = weakConn.lock(); + if (!connPtr) + return; + handleNewTask(); + } + }); +} +void DbClientLockFree::execSql(std::string &&sql, + size_t paraNum, + std::vector &¶meters, + std::vector &&length, + std::vector &&format, + ResultCallback &&rcb, + std::function &&exceptCallback) +{ + assert(paraNum == parameters.size()); + assert(paraNum == length.size()); + assert(paraNum == format.size()); + assert(rcb); + _loop->assertInLoopThread(); + if (!_connection) + { + try + { + throw BrokenConnection("No connection to database server"); + } + catch (...) + { + exceptCallback(std::current_exception()); + } + return; + } + else + { + if (!_connection->isWorking()) + { + execSql(_connection, std::move(sql), paraNum, std::move(parameters), std::move(length), std::move(format), std::move(rcb), std::move(exceptCallback)); + return; + } + } + + if (_sqlCmdBuffer.size() > 20000) + { + //too many queries in buffer; + try + { + throw Failure("Too many queries in buffer"); + } + catch (...) + { + exceptCallback(std::current_exception()); + } + return; + } + + //LOG_TRACE << "Push query to buffer"; + std::shared_ptr cmd = std::make_shared(); + cmd->_sql = std::move(sql); + cmd->_paraNum = paraNum; + cmd->_parameters = std::move(parameters); + cmd->_length = std::move(length); + cmd->_format = std::move(format); + cmd->_cb = std::move(rcb); + cmd->_exceptCb = std::move(exceptCallback); + _sqlCmdBuffer.push_back(std::move(cmd)); +} + +std::shared_ptr DbClientLockFree::newTransaction(const std::function &commitCallback) +{ + // Don't support transaction; + assert(0); + return nullptr; +} + +void DbClientLockFree::handleNewTask() +{ + assert(_connection); + assert(!_connection->isWorking()); + if (!_sqlCmdBuffer.empty()) + { + auto cmd = _sqlCmdBuffer.front(); + _sqlCmdBuffer.pop_front(); + execSql(_connection, std::move(cmd->_sql), cmd->_paraNum, std::move(cmd->_parameters), std::move(cmd->_length), std::move(cmd->_format), std::move(cmd->_cb), std::move(cmd->_exceptCb)); + return; + } +} + +DbConnectionPtr DbClientLockFree::newConnection() +{ + DbConnectionPtr connPtr; + if (_type == ClientType::PostgreSQL) + { +#if USE_POSTGRESQL + connPtr = std::make_shared(_loop, _connInfo); +#else + return nullptr; +#endif + } + else if (_type == ClientType::Mysql) + { +#if USE_MYSQL + connPtr = std::make_shared(_loop, _connInfo); +#else + return nullptr; +#endif + } + else + { + return nullptr; + } + + std::weak_ptr weakPtr = shared_from_this(); + connPtr->setCloseCallback([weakPtr](const DbConnectionPtr &closeConnPtr) { + //Erase the connection + auto thisPtr = weakPtr.lock(); + if (!thisPtr) + return; + + assert(thisPtr->_connection); + thisPtr->_connection.reset(); + + //Reconnect after 1 second + thisPtr->_loop->runAfter(1, [weakPtr] { + auto thisPtr = weakPtr.lock(); + if (!thisPtr) + return; + thisPtr->newConnection(); + }); + }); + connPtr->setOkCallback([weakPtr](const DbConnectionPtr &okConnPtr) { + LOG_TRACE << "connected!"; + auto thisPtr = weakPtr.lock(); + if (!thisPtr) + return; + thisPtr->_connection = okConnPtr; + thisPtr->handleNewTask(); + }); + //std::cout<<"newConn end"< +#include +#include +#include +#include +#include +#include +#include +#include + +namespace drogon +{ +namespace orm +{ + +class DbClientLockFree : public DbClient, public std::enable_shared_from_this +{ + public: + DbClientLockFree(const std::string &connInfo, trantor::EventLoop *loop, ClientType type); + virtual ~DbClientLockFree() noexcept; + virtual void execSql(std::string &&sql, + size_t paraNum, + std::vector &¶meters, + std::vector &&length, + std::vector &&format, + ResultCallback &&rcb, + std::function &&exceptCallback) override; + virtual std::shared_ptr newTransaction(const std::function &commitCallback = std::function()) override; + + private: + std::string _connInfo; + trantor::EventLoop *_loop; + + void execSql(const DbConnectionPtr &conn, + std::string &&sql, + size_t paraNum, + std::vector &¶meters, + std::vector &&length, + std::vector &&format, + ResultCallback &&rcb, + std::function &&exceptCallback); + + DbConnectionPtr newConnection(); + + DbConnectionPtr _connection; + + struct SqlCmd + { + std::string _sql; + size_t _paraNum; + std::vector _parameters; + std::vector _length; + std::vector _format; + QueryCallback _cb; + ExceptPtrCallback _exceptCb; + }; + std::deque> _sqlCmdBuffer; + + void handleNewTask(); +}; + +} // namespace orm +} // namespace drogon diff --git a/orm_lib/src/DbConnection.h b/orm_lib/src/DbConnection.h index 6e256779..c5ccd5e9 100644 --- a/orm_lib/src/DbConnection.h +++ b/orm_lib/src/DbConnection.h @@ -74,6 +74,7 @@ class DbConnection : public trantor::NonCopyable ConnectStatus status() const { return _status; } trantor::EventLoop *loop() { return _loop; } virtual void disconnect() = 0; + bool isWorking() { return _isWorking; } protected: QueryCallback _cb; diff --git a/orm_lib/src/postgresql_impl/PgConnection.cc b/orm_lib/src/postgresql_impl/PgConnection.cc index 46d463e8..0716ba3b 100644 --- a/orm_lib/src/postgresql_impl/PgConnection.cc +++ b/orm_lib/src/postgresql_impl/PgConnection.cc @@ -275,9 +275,9 @@ void PgConnection::handleRead() _cb = decltype(_cb)(); if (_idleCbPtr) { - auto idle = std::move(_idleCbPtr); + //auto idle = std::move(_idleCbPtr); _idleCbPtr.reset(); - (*idle)(); + //(*idle)(); } } handleClosed();