From 52f1888a149c9b50a263f131fe0c54ea338bea5f Mon Sep 17 00:00:00 2001 From: antao Date: Thu, 21 Feb 2019 20:00:22 +0800 Subject: [PATCH] Use persisted idle-callback in db connections --- orm_lib/src/DbClientImpl.cc | 50 +++++++++---- orm_lib/src/DbClientLockFree.cc | 34 ++++++--- orm_lib/src/DbConnection.h | 9 ++- orm_lib/src/TransactionImpl.cc | 70 ++++++++----------- orm_lib/src/TransactionImpl.h | 5 +- orm_lib/src/mysql_impl/MysqlConnection.cc | 19 +---- orm_lib/src/mysql_impl/MysqlConnection.h | 3 +- orm_lib/src/mysql_impl/test/test1.cc | 19 +++-- orm_lib/src/postgresql_impl/PgConnection.cc | 46 ++---------- orm_lib/src/postgresql_impl/PgConnection.h | 15 ++-- orm_lib/src/sqlite3_impl/Sqlite3Connection.cc | 19 +++-- orm_lib/src/sqlite3_impl/Sqlite3Connection.h | 6 +- 12 files changed, 138 insertions(+), 157 deletions(-) diff --git a/orm_lib/src/DbClientImpl.cc b/orm_lib/src/DbClientImpl.cc index 98a2c4bb..8dccbfaa 100644 --- a/orm_lib/src/DbClientImpl.cc +++ b/orm_lib/src/DbClientImpl.cc @@ -129,15 +129,7 @@ void DbClientImpl::execSql(const DbConnectionPtr &conn, } std::weak_ptr weakConn = conn; conn->execSql(std::move(sql), paraNum, std::move(parameters), std::move(length), std::move(format), - std::move(rcb), std::move(exceptCallback), - [=]() -> void { - { - auto connPtr = weakConn.lock(); - if (!connPtr) - return; - handleNewTask(connPtr); - } - }); + std::move(rcb), std::move(exceptCallback)); } void DbClientImpl::execSql(std::string &&sql, size_t paraNum, @@ -233,22 +225,40 @@ std::shared_ptr DbClientImpl::newTransaction(const std::function(new TransactionImpl(_type, conn, commitCallback, [=]() { + std::weak_ptr weakThis = shared_from_this(); + auto trans = std::shared_ptr(new TransactionImpl(_type, conn, commitCallback, [weakThis, conn]() { + auto thisPtr = weakThis.lock(); + if (!thisPtr) + return; if (conn->status() == ConnectStatus_Bad) { return; } { - std::lock_guard guard(_connectionsMutex); - - if (_connections.find(conn) == _connections.end() && - _busyConnections.find(conn) == _busyConnections.find(conn)) + std::lock_guard guard(thisPtr->_connectionsMutex); + if (thisPtr->_connections.find(conn) == thisPtr->_connections.end() && + thisPtr->_busyConnections.find(conn) == thisPtr->_busyConnections.find(conn)) { //connection is broken and removed return; } } - handleNewTask(conn); + conn->loop()->queueInLoop([weakThis, conn]() { + auto thisPtr = weakThis.lock(); + if(!thisPtr) + return; + std::weak_ptr weakConn = conn; + conn->setIdleCallback([weakThis, weakConn]() { + auto thisPtr = weakThis.lock(); + if (!thisPtr) + return; + auto connPtr = weakConn.lock(); + if (!connPtr) + return; + thisPtr->handleNewTask(connPtr); + }); + thisPtr->handleNewTask(conn); + }); })); trans->doBegin(); return trans; @@ -356,6 +366,16 @@ DbConnectionPtr DbClientImpl::newConnection(trantor::EventLoop *loop) } thisPtr->handleNewTask(okConnPtr); }); + std::weak_ptr weakConn = connPtr; + connPtr->setIdleCallback([weakPtr, weakConn]() { + auto thisPtr = weakPtr.lock(); + if (!thisPtr) + return; + auto connPtr = weakConn.lock(); + if (!connPtr) + return; + thisPtr->handleNewTask(connPtr); + }); //std::cout<<"newConn end"< weakConn = conn; conn->execSql(std::move(sql), paraNum, std::move(parameters), std::move(length), std::move(format), - std::move(rcb), std::move(exceptCallback), - [=]() -> void { - { - auto connPtr = weakConn.lock(); - if (!connPtr) - return; - handleNewTask(); - } - }); + std::move(rcb), std::move(exceptCallback)); } void DbClientLockFree::execSql(std::string &&sql, size_t paraNum, @@ -121,7 +113,14 @@ void DbClientLockFree::execSql(std::string &&sql, { if (!_connection->isWorking()) { - execSql(_connection, std::move(sql), paraNum, std::move(parameters), std::move(length), std::move(format), std::move(rcb), std::move(exceptCallback)); + execSql(_connection, + std::move(sql), + paraNum, + std::move(parameters), + std::move(length), + std::move(format), + std::move(rcb), + std::move(exceptCallback)); return; } } @@ -164,7 +163,14 @@ void DbClientLockFree::handleNewTask() if (!_sqlCmdBuffer.empty()) { auto &cmd = _sqlCmdBuffer.front(); - execSql(_connection, std::move(cmd._sql), cmd._paraNum, std::move(cmd._parameters), std::move(cmd._length), std::move(cmd._format), std::move(cmd._cb), std::move(cmd._exceptCb)); + execSql(_connection, + std::move(cmd._sql), + cmd._paraNum, + std::move(cmd._parameters), + std::move(cmd._length), + std::move(cmd._format), + std::move(cmd._cb), + std::move(cmd._exceptCb)); _sqlCmdBuffer.pop_front(); return; } @@ -220,6 +226,12 @@ DbConnectionPtr DbClientLockFree::newConnection() thisPtr->_connection = okConnPtr; thisPtr->handleNewTask(); }); + connPtr->setIdleCallback([weakPtr]() { + auto thisPtr = weakPtr.lock(); + if (!thisPtr) + return; + thisPtr->handleNewTask(); + }); //std::cout<<"newConn end"< &cb) + { + _idleCb = cb; + } virtual void execSql(std::string &&sql, size_t paraNum, std::vector &¶meters, std::vector &&length, std::vector &&format, ResultCallback &&rcb, - std::function &&exceptCallback, - std::function &&idleCb) = 0; + std::function &&exceptCallback) = 0; virtual ~DbConnection() { LOG_TRACE << "Destruct DbConn" << this; @@ -79,7 +82,7 @@ class DbConnection : public trantor::NonCopyable protected: QueryCallback _cb; trantor::EventLoop *_loop; - std::shared_ptr> _idleCbPtr; + std::function _idleCb; ConnectStatus _status = ConnectStatus_None; DbConnectionCallback _closeCb = [](const DbConnectionPtr &) {}; DbConnectionCallback _okCb = [](const DbConnectionPtr &) {}; diff --git a/orm_lib/src/TransactionImpl.cc b/orm_lib/src/TransactionImpl.cc index fd98ca89..8cbec4dc 100644 --- a/orm_lib/src/TransactionImpl.cc +++ b/orm_lib/src/TransactionImpl.cc @@ -35,6 +35,10 @@ TransactionImpl::~TransactionImpl() { auto loop = _connectionPtr->loop(); loop->queueInLoop([conn = _connectionPtr, ucb = std::move(_usedUpCallback), commitCb = std::move(_commitCallback)]() { + conn->setIdleCallback([ucb = std::move(ucb)]() { + if (ucb) + ucb(); + }); conn->execSql("commit", 0, std::vector(), @@ -60,12 +64,6 @@ TransactionImpl::~TransactionImpl() commitCb(false); } } - }, - [ucb]() { - if (ucb) - { - ucb(); - } }); }); } @@ -78,8 +76,7 @@ void TransactionImpl::execSql(std::string &&sql, ResultCallback &&rcb, std::function &&exceptCallback) { - auto thisPtr = shared_from_this(); - _loop->queueInLoop([thisPtr, sql = std::move(sql), paraNum, parameters = std::move(parameters), length = std::move(length), format = std::move(format), rcb = std::move(rcb), exceptCallback = std::move(exceptCallback)]() mutable { + _loop->queueInLoop([thisPtr = shared_from_this(), sql = std::move(sql), paraNum, parameters = std::move(parameters), length = std::move(length), format = std::move(format), rcb = std::move(rcb), exceptCallback = std::move(exceptCallback)]() mutable { if (!thisPtr->_isCommitedOrRolledback) { if (!thisPtr->_isWorking) @@ -95,9 +92,6 @@ void TransactionImpl::execSql(std::string &&sql, thisPtr->rollback(); if (exceptCallback) exceptCallback(ePtr); - }, - [thisPtr]() { - thisPtr->execNewTask(); }); } else @@ -111,6 +105,7 @@ void TransactionImpl::execSql(std::string &&sql, cmd._format = std::move(format); cmd._cb = std::move(rcb); cmd._exceptCb = std::move(exceptCallback); + cmd._thisPtr = thisPtr; thisPtr->_sqlCmdBuffer.push_back(std::move(cmd)); } } @@ -136,29 +131,22 @@ void TransactionImpl::rollback() _loop->runInLoop([thisPtr]() { if (thisPtr->_isCommitedOrRolledback) return; - auto clearupCb = [thisPtr]() { - thisPtr->_isCommitedOrRolledback = true; - if (thisPtr->_usedUpCallback) - { - thisPtr->_usedUpCallback(); - thisPtr->_usedUpCallback = decltype(thisPtr->_usedUpCallback)(); - } - }; if (thisPtr->_isWorking) { //push sql cmd to buffer; SqlCmd cmd; cmd._sql = "rollback"; cmd._paraNum = 0; - cmd._cb = [](const Result &r) { - LOG_TRACE << "Transaction roll back!"; - //clearupCb(); + cmd._cb = [thisPtr](const Result &r) { + LOG_DEBUG << "Transaction roll back!"; + thisPtr->_isCommitedOrRolledback = true; }; - cmd._exceptCb = [](const std::exception_ptr &ePtr) { + cmd._exceptCb = [thisPtr](const std::exception_ptr &ePtr) { //clearupCb(); + thisPtr->_isCommitedOrRolledback = true; LOG_ERROR << "Transaction rool back error"; }; - cmd._idleCb = clearupCb; + cmd._isRollbackCmd = true; //Rollback cmd should be executed firstly, so we push it in front of the list thisPtr->_sqlCmdBuffer.push_front(std::move(cmd)); return; @@ -169,17 +157,15 @@ void TransactionImpl::rollback() std::vector(), std::vector(), std::vector(), - [](const Result &r) { + [thisPtr](const Result &r) { LOG_TRACE << "Transaction roll back!"; + thisPtr->_isCommitedOrRolledback = true; //clearupCb(); }, - [](const std::exception_ptr &ePtr) { + [thisPtr](const std::exception_ptr &ePtr) { //clearupCb(); LOG_ERROR << "Transaction rool back error"; - }, - [thisPtr, clearupCb]() { - clearupCb(); - thisPtr->execNewTask(); + thisPtr->_isCommitedOrRolledback = true; }); }); } @@ -203,15 +189,10 @@ void TransactionImpl::execNewTask() std::move(cmd._format), std::move(cmd._cb), [cmd, thisPtr](const std::exception_ptr &ePtr) { - if (!cmd._idleCb) + if (!cmd._isRollbackCmd) thisPtr->rollback(); if (cmd._exceptCb) cmd._exceptCb(ePtr); - }, - [cmd, thisPtr]() { - if (cmd._idleCb) - cmd._idleCb(); - thisPtr->execNewTask(); }); return; } @@ -238,13 +219,23 @@ void TransactionImpl::execNewTask() } _sqlCmdBuffer.clear(); } + if (_usedUpCallback) + { + _usedUpCallback(); + } } } void TransactionImpl::doBegin() { - auto thisPtr = shared_from_this(); - _loop->queueInLoop([thisPtr]() { + _loop->queueInLoop([thisPtr = shared_from_this()]() { + std::weak_ptr weakPtr = thisPtr; + thisPtr->_connectionPtr->setIdleCallback([weakPtr]() { + auto thisPtr = weakPtr.lock(); + if (!thisPtr) + return; + thisPtr->execNewTask(); + }); assert(!thisPtr->_isWorking); assert(!thisPtr->_isCommitedOrRolledback); thisPtr->_isWorking = true; @@ -263,9 +254,6 @@ void TransactionImpl::doBegin() { thisPtr->_usedUpCallback(); } - }, - [thisPtr]() { - thisPtr->execNewTask(); }); }); } diff --git a/orm_lib/src/TransactionImpl.h b/orm_lib/src/TransactionImpl.h index cdaec271..5fc13d52 100644 --- a/orm_lib/src/TransactionImpl.h +++ b/orm_lib/src/TransactionImpl.h @@ -39,7 +39,7 @@ class TransactionImpl : public Transaction, public std::enable_shared_from_this< std::vector &&format, ResultCallback &&rcb, std::function &&exceptCallback) override; - virtual std::shared_ptr newTransaction(const std::function&) override + virtual std::shared_ptr newTransaction(const std::function &) override { return shared_from_this(); } @@ -57,7 +57,8 @@ class TransactionImpl : public Transaction, public std::enable_shared_from_this< std::vector _format; QueryCallback _cb; ExceptPtrCallback _exceptCb; - std::function _idleCb; + bool _isRollbackCmd = false; + std::shared_ptr _thisPtr; }; std::list _sqlCmdBuffer; // std::mutex _bufferMutex; diff --git a/orm_lib/src/mysql_impl/MysqlConnection.cc b/orm_lib/src/mysql_impl/MysqlConnection.cc index 6681926d..db8ca764 100644 --- a/orm_lib/src/mysql_impl/MysqlConnection.cc +++ b/orm_lib/src/mysql_impl/MysqlConnection.cc @@ -304,20 +304,17 @@ void MysqlConnection::execSql(std::string &&sql, std::vector &&length, std::vector &&format, ResultCallback &&rcb, - std::function &&exceptCallback, - std::function &&idleCb) + std::function &&exceptCallback) { LOG_TRACE << sql; assert(paraNum == parameters.size()); assert(paraNum == length.size()); assert(paraNum == format.size()); assert(rcb); - assert(idleCb); assert(!_isWorking); assert(!sql.empty()); _cb = std::move(rcb); - _idleCbPtr = std::make_shared>(std ::move(idleCb)); _isWorking = true; _exceptCb = std::move(exceptCallback); _sql.clear(); @@ -439,12 +436,7 @@ void MysqlConnection::outputError() _cb = nullptr; _isWorking = false; - if (_idleCbPtr) - { - auto idle = std::move(_idleCbPtr); - _idleCbPtr.reset(); - (*idle)(); - } + _idleCb(); } } @@ -460,11 +452,6 @@ void MysqlConnection::getResult(MYSQL_RES *res) _cb = nullptr; _exceptCb = nullptr; _isWorking = false; - if (_idleCbPtr) - { - auto idle = std::move(_idleCbPtr); - _idleCbPtr.reset(); - (*idle)(); - } + _idleCb(); } } diff --git a/orm_lib/src/mysql_impl/MysqlConnection.h b/orm_lib/src/mysql_impl/MysqlConnection.h index fd291d7c..4a2034f5 100644 --- a/orm_lib/src/mysql_impl/MysqlConnection.h +++ b/orm_lib/src/mysql_impl/MysqlConnection.h @@ -43,8 +43,7 @@ class MysqlConnection : public DbConnection, public std::enable_shared_from_this std::vector &&length, std::vector &&format, ResultCallback &&rcb, - std::function &&exceptCallback, - std::function &&idleCb) override; + std::function &&exceptCallback) override; virtual void disconnect() override; private: diff --git a/orm_lib/src/mysql_impl/test/test1.cc b/orm_lib/src/mysql_impl/test/test1.cc index e7f88aa5..eeac609d 100644 --- a/orm_lib/src/mysql_impl/test/test1.cc +++ b/orm_lib/src/mysql_impl/test/test1.cc @@ -73,12 +73,19 @@ int main() // str.resize(filesize); // pbuf->sgetn(&str[0], filesize); - // *clientPtr << "update users set file=? where id=?" << str << 1000 << Mode::Blocking >> [](const Result &r) { - // std::cout << "update " << r.affectedRows() << " rows" << std::endl; - // } >> [](const DrogonDbException &e) { - // std::cerr << e.base().what() << std::endl; - // }; - + { + auto trans = clientPtr->newTransaction(); + *trans << "update users set file=? where id != ?" + << "hehaha" << 1000 >> + [](const Result &r) { + std::cout << "hahaha update " << r.affectedRows() << " rows" << std::endl; + //trans->rollback(); + } >> + [](const DrogonDbException &e) { + std::cerr << e.base().what() << std::endl; + }; + } + LOG_DEBUG << "out of transaction block"; *clientPtr << "select * from users where id=1000" >> [](const Result &r) { std::cout << "file:" << r[0]["file"].as() << std::endl; } >> [](const DrogonDbException &e) { diff --git a/orm_lib/src/postgresql_impl/PgConnection.cc b/orm_lib/src/postgresql_impl/PgConnection.cc index bfbe5794..7beca50c 100644 --- a/orm_lib/src/postgresql_impl/PgConnection.cc +++ b/orm_lib/src/postgresql_impl/PgConnection.cc @@ -155,8 +155,7 @@ void PgConnection::execSqlInLoop(std::string &&sql, std::vector &&length, std::vector &&format, ResultCallback &&rcb, - std::function &&exceptCallback, - std::function &&idleCb) + std::function &&exceptCallback) { LOG_TRACE << sql; _loop->assertInLoopThread(); @@ -164,12 +163,10 @@ void PgConnection::execSqlInLoop(std::string &&sql, assert(paraNum == length.size()); assert(paraNum == format.size()); assert(rcb); - assert(idleCb); assert(!_isWorking); assert(!sql.empty()); _sql = std::move(sql); _cb = std::move(rcb); - _idleCbPtr = std::make_shared>(std::move(idleCb)); _isWorking = true; _exceptCb = std::move(exceptCallback); if (paraNum == 0) @@ -200,12 +197,7 @@ void PgConnection::execSqlInLoop(std::string &&sql, _exceptCb = nullptr; } _cb = nullptr; - if (_idleCbPtr) - { - auto idle = std::move(_idleCbPtr); - _idleCbPtr.reset(); - (*idle)(); - } + _idleCb(); } return; } @@ -240,12 +232,7 @@ void PgConnection::execSqlInLoop(std::string &&sql, _exceptCb = nullptr; } _cb = nullptr; - if (_idleCbPtr) - { - auto idle = std::move(_idleCbPtr); - _idleCbPtr.reset(); - (*idle)(); - } + _idleCb(); } return; } @@ -271,12 +258,7 @@ void PgConnection::execSqlInLoop(std::string &&sql, _exceptCb = nullptr; } _cb = nullptr; - if (_idleCbPtr) - { - auto idle = std::move(_idleCbPtr); - _idleCbPtr.reset(); - (*idle)(); - } + _idleCb(); } return; } @@ -315,12 +297,7 @@ void PgConnection::execSqlInLoop(std::string &&sql, thisPtr->_exceptCb = nullptr; } thisPtr->_cb = nullptr; - if (thisPtr->_idleCbPtr) - { - auto idle = std::move(thisPtr->_idleCbPtr); - thisPtr->_idleCbPtr.reset(); - (*idle)(); - } + thisPtr->_idleCb(); } return; } @@ -352,12 +329,6 @@ void PgConnection::handleRead() _exceptCb = nullptr; } _cb = nullptr; - if (_idleCbPtr) - { - //auto idle = std::move(_idleCbPtr); - _idleCbPtr.reset(); - //(*idle)(); - } } handleClosed(); return; @@ -422,12 +393,7 @@ void PgConnection::handleRead() { _isWorking = false; _isRreparingStatement = false; - if (_idleCbPtr) - { - auto idle = std::move(_idleCbPtr); - _idleCbPtr.reset(); - (*idle)(); - } + _idleCb(); } } } diff --git a/orm_lib/src/postgresql_impl/PgConnection.h b/orm_lib/src/postgresql_impl/PgConnection.h index 4fda720d..9713567e 100644 --- a/orm_lib/src/postgresql_impl/PgConnection.h +++ b/orm_lib/src/postgresql_impl/PgConnection.h @@ -44,8 +44,7 @@ class PgConnection : public DbConnection, public std::enable_shared_from_this &&length, std::vector &&format, ResultCallback &&rcb, - std::function &&exceptCallback, - std::function &&idleCb) override + std::function &&exceptCallback) override { if (_loop->isInLoopThread()) { @@ -55,8 +54,7 @@ class PgConnection : public DbConnection, public std::enable_shared_from_thisexecSqlInLoop(std::move(sql), paraNum, std::move(parameters), std::move(length), std::move(format), std::move(rcb), - std::move(exceptCallback), - std::move(idleCb)); + std::move(exceptCallback)); }); } } @@ -98,8 +94,7 @@ class PgConnection : public DbConnection, public std::enable_shared_from_this &&length, std::vector &&format, ResultCallback &&rcb, - std::function &&exceptCallback, - std::function &&idleCb); + std::function &&exceptCallback); std::function _preparingCallback; }; diff --git a/orm_lib/src/sqlite3_impl/Sqlite3Connection.cc b/orm_lib/src/sqlite3_impl/Sqlite3Connection.cc index f1cba3d3..2c831cf1 100644 --- a/orm_lib/src/sqlite3_impl/Sqlite3Connection.cc +++ b/orm_lib/src/sqlite3_impl/Sqlite3Connection.cc @@ -93,12 +93,18 @@ void Sqlite3Connection::execSql(std::string &&sql, std::vector &&length, std::vector &&format, ResultCallback &&rcb, - std::function &&exceptCallback, - std::function &&idleCb) + std::function &&exceptCallback) { auto thisPtr = shared_from_this(); - _loopThread.getLoop()->runInLoop([thisPtr, sql = std::move(sql), paraNum, parameters = std::move(parameters), length = std::move(length), format = std::move(format), rcb = std::move(rcb), exceptCallback = std::move(exceptCallback), idleCb = std::move(idleCb)]() mutable { - thisPtr->execSqlInQueue(sql, paraNum, parameters, length, format, rcb, exceptCallback, idleCb); + _loopThread.getLoop()->runInLoop([thisPtr, + sql = std::move(sql), + paraNum, + parameters = std::move(parameters), + length = std::move(length), + format = std::move(format), + rcb = std::move(rcb), + exceptCallback = std::move(exceptCallback)]() mutable { + thisPtr->execSqlInQueue(sql, paraNum, parameters, length, format, rcb, exceptCallback); }); } @@ -108,8 +114,7 @@ void Sqlite3Connection::execSqlInQueue(const std::string &sql, const std::vector &length, const std::vector &format, const ResultCallback &rcb, - const std::function &exceptCallback, - const std::function &idleCb) + const std::function &exceptCallback) { LOG_TRACE << "sql:" << sql; sqlite3_stmt *stmt = nullptr; @@ -214,7 +219,7 @@ void Sqlite3Connection::execSqlInQueue(const std::string &sql, } rcb(Result(resultPtr)); - idleCb(); + _idleCb(); } int Sqlite3Connection::stmtStep(sqlite3_stmt *stmt, const std::shared_ptr &resultPtr, int columnNum) diff --git a/orm_lib/src/sqlite3_impl/Sqlite3Connection.h b/orm_lib/src/sqlite3_impl/Sqlite3Connection.h index e7125572..07a8f3c5 100644 --- a/orm_lib/src/sqlite3_impl/Sqlite3Connection.h +++ b/orm_lib/src/sqlite3_impl/Sqlite3Connection.h @@ -47,8 +47,7 @@ class Sqlite3Connection : public DbConnection, public std::enable_shared_from_th std::vector &&length, std::vector &&format, ResultCallback &&rcb, - std::function &&exceptCallback, - std::function &&idleCb) override; + std::function &&exceptCallback) override; virtual void disconnect() override; private: @@ -59,8 +58,7 @@ class Sqlite3Connection : public DbConnection, public std::enable_shared_from_th const std::vector &length, const std::vector &format, const ResultCallback &rcb, - const std::function &exceptCallback, - const std::function &idleCb); + const std::function &exceptCallback); void onError(const std::string &sql, const std::function &exceptCallback); int stmtStep(sqlite3_stmt *stmt, const std::shared_ptr &resultPtr, int columnNum); trantor::EventLoopThread _loopThread;