diff --git a/orm_lib/src/DbClientImpl.cc b/orm_lib/src/DbClientImpl.cc index 9b85d4e3..92d19c31 100644 --- a/orm_lib/src/DbClientImpl.cc +++ b/orm_lib/src/DbClientImpl.cc @@ -274,19 +274,13 @@ void DbClientImpl::handleNewTask(const DbConnectionPtr &connPtr) { _busyConnections.insert(connPtr); //For new connections, this sentence is necessary auto &cmd = _sqlCmdBuffer.front(); - connPtr->loop()->queueInLoop([connPtr, cmd, this]() { - execSql(connPtr, std::move(cmd->_sql), cmd->_paraNum, std::move(cmd->_parameters), std::move(cmd->_length), std::move(cmd->_format), std::move(cmd->_cb), std::move(cmd->_exceptCb)); - }); + execSql(connPtr, std::move(cmd->_sql), cmd->_paraNum, std::move(cmd->_parameters), std::move(cmd->_length), std::move(cmd->_format), std::move(cmd->_cb), std::move(cmd->_exceptCb)); _sqlCmdBuffer.pop_front(); return; } - } - //Idle connection - connPtr->loop()->queueInLoop([connPtr, this]() { - std::lock_guard guard(_connectionsMutex); _busyConnections.erase(connPtr); _readyConnections.insert(connPtr); - }); + } } } diff --git a/orm_lib/src/DbConnection.h b/orm_lib/src/DbConnection.h index 9fec52ef..6e256779 100644 --- a/orm_lib/src/DbConnection.h +++ b/orm_lib/src/DbConnection.h @@ -78,7 +78,7 @@ class DbConnection : public trantor::NonCopyable protected: QueryCallback _cb; trantor::EventLoop *_loop; - std::function _idleCb; + std::shared_ptr> _idleCbPtr; ConnectStatus _status = ConnectStatus_None; DbConnectionCallback _closeCb = [](const DbConnectionPtr &) {}; DbConnectionCallback _okCb = [](const DbConnectionPtr &) {}; diff --git a/orm_lib/src/TransactionImpl.cc b/orm_lib/src/TransactionImpl.cc index 549ebecc..d3e6a5d9 100644 --- a/orm_lib/src/TransactionImpl.cc +++ b/orm_lib/src/TransactionImpl.cc @@ -150,13 +150,15 @@ void TransactionImpl::rollback() SqlCmd cmd; cmd._sql = "rollback"; cmd._paraNum = 0; - cmd._cb = [clearupCb](const Result &r) { + cmd._cb = [](const Result &r) { LOG_TRACE << "Transaction roll back!"; - clearupCb(); + //clearupCb(); }; - cmd._exceptCb = [clearupCb](const std::exception_ptr &ePtr) { - clearupCb(); + cmd._exceptCb = [](const std::exception_ptr &ePtr) { + //clearupCb(); + LOG_ERROR << "Transaction rool back error"; }; + cmd._idleCb = clearupCb; //Rollback cmd should be executed firstly, so we push it in front of the list thisPtr->_sqlCmdBuffer.push_front(std::move(cmd)); return; @@ -169,14 +171,16 @@ void TransactionImpl::rollback() std::vector(), std::vector(), std::vector(), - [clearupCb](const Result &r) { + [](const Result &r) { LOG_TRACE << "Transaction roll back!"; - clearupCb(); + //clearupCb(); }, - [clearupCb](const std::exception_ptr &ePtr) { - clearupCb(); + [](const std::exception_ptr &ePtr) { + //clearupCb(); + LOG_ERROR << "Transaction rool back error"; }, - [thisPtr]() { + [thisPtr, clearupCb]() { + clearupCb(); thisPtr->execNewTask(); }); }); @@ -204,11 +208,14 @@ void TransactionImpl::execNewTask() std::move(cmd._format), std::move(cmd._cb), [cmd, thisPtr](const std::exception_ptr &ePtr) { - thisPtr->rollback(); + if (!cmd._idleCb) + thisPtr->rollback(); if (cmd._exceptCb) cmd._exceptCb(ePtr); }, - [thisPtr]() { + [cmd,thisPtr]() { + if(cmd._idleCb) + cmd._idleCb(); thisPtr->execNewTask(); }); }); diff --git a/orm_lib/src/TransactionImpl.h b/orm_lib/src/TransactionImpl.h index dc26fc34..cdaec271 100644 --- a/orm_lib/src/TransactionImpl.h +++ b/orm_lib/src/TransactionImpl.h @@ -57,6 +57,7 @@ class TransactionImpl : public Transaction, public std::enable_shared_from_this< std::vector _format; QueryCallback _cb; ExceptPtrCallback _exceptCb; + std::function _idleCb; }; std::list _sqlCmdBuffer; // std::mutex _bufferMutex; diff --git a/orm_lib/src/mysql_impl/MysqlConnection.cc b/orm_lib/src/mysql_impl/MysqlConnection.cc index 939b613e..ba8013af 100644 --- a/orm_lib/src/mysql_impl/MysqlConnection.cc +++ b/orm_lib/src/mysql_impl/MysqlConnection.cc @@ -311,7 +311,7 @@ void MysqlConnection::execSql(std::string &&sql, assert(!sql.empty()); _cb = std::move(rcb); - _idleCb = std::move(idleCb); + _idleCbPtr = std::make_shared>(std ::move(idleCb)); _isWorking = true; _exceptCb = std::move(exceptCallback); _sql.clear(); @@ -431,10 +431,11 @@ void MysqlConnection::outputError() _cb = decltype(_cb)(); _isWorking = false; - if (_idleCb) + if (_idleCbPtr) { - _idleCb(); - _idleCb = decltype(_idleCb)(); + auto idle = std::move(_idleCbPtr); + _idleCbPtr.reset(); + (*idle)(); } } } @@ -451,7 +452,11 @@ void MysqlConnection::getResult(MYSQL_RES *res) _cb = decltype(_cb)(); _exceptCb = decltype(_exceptCb)(); _isWorking = false; - _idleCb(); - _idleCb = decltype(_idleCb)(); + if (_idleCbPtr) + { + auto idle = std::move(_idleCbPtr); + _idleCbPtr.reset(); + (*idle)(); + } } } diff --git a/orm_lib/src/postgresql_impl/PgConnection.cc b/orm_lib/src/postgresql_impl/PgConnection.cc index 3466ae90..46d463e8 100644 --- a/orm_lib/src/postgresql_impl/PgConnection.cc +++ b/orm_lib/src/postgresql_impl/PgConnection.cc @@ -16,6 +16,7 @@ #include "PostgreSQLResultImpl.h" #include #include +#include #include using namespace drogon::orm; @@ -166,14 +167,55 @@ void PgConnection::execSql(std::string &&sql, assert(!sql.empty()); _sql = std::move(sql); _cb = std::move(rcb); - _idleCb = std::move(idleCb); + _idleCbPtr = std::make_shared>(std::move(idleCb)); _isWorking = true; _exceptCb = std::move(exceptCallback); auto thisPtr = shared_from_this(); - _loop->runInLoop([thisPtr, paraNum = std::move(paraNum), parameters = std::move(parameters), length = std::move(length), format = std::move(format)]() { + if (!_loop->isInLoopThread()) + { + _loop->queueInLoop([thisPtr, paraNum = std::move(paraNum), parameters = std::move(parameters), length = std::move(length), format = std::move(format)]() { + if (PQsendQueryParams( + thisPtr->_connPtr.get(), + thisPtr->_sql.c_str(), + paraNum, + NULL, + parameters.data(), + length.data(), + format.data(), + 0) == 0) + { + LOG_ERROR << "send query error: " << PQerrorMessage(thisPtr->_connPtr.get()); + if (thisPtr->_isWorking) + { + thisPtr->_isWorking = false; + try + { + throw Failure(PQerrorMessage(thisPtr->_connPtr.get())); + } + catch (...) + { + auto exceptPtr = std::current_exception(); + thisPtr->_exceptCb(exceptPtr); + thisPtr->_exceptCb = decltype(_exceptCb)(); + } + thisPtr->_cb = decltype(_cb)(); + if (thisPtr->_idleCbPtr) + { + auto idle = std::move(thisPtr->_idleCbPtr); + thisPtr->_idleCbPtr.reset(); + (*idle)(); + } + } + return; + } + thisPtr->pgPoll(); + }); + } + else + { if (PQsendQueryParams( - thisPtr->_connPtr.get(), - thisPtr->_sql.c_str(), + _connPtr.get(), + _sql.c_str(), paraNum, NULL, parameters.data(), @@ -182,9 +224,31 @@ void PgConnection::execSql(std::string &&sql, 0) == 0) { LOG_ERROR << "send query error: " << PQerrorMessage(thisPtr->_connPtr.get()); + if (_isWorking) + { + _isWorking = false; + try + { + throw Failure(PQerrorMessage(_connPtr.get())); + } + catch (...) + { + auto exceptPtr = std::current_exception(); + _exceptCb(exceptPtr); + _exceptCb = decltype(_exceptCb)(); + } + _cb = decltype(_cb)(); + if (_idleCbPtr) + { + auto idle = std::move(_idleCbPtr); + _idleCbPtr.reset(); + (*idle)(); + } + } + return; } thisPtr->pgPoll(); - }); + } } void PgConnection::handleRead() @@ -209,10 +273,11 @@ void PgConnection::handleRead() _exceptCb = decltype(_exceptCb)(); } _cb = decltype(_cb)(); - if (_idleCb) + if (_idleCbPtr) { - _idleCb(); - _idleCb = decltype(_idleCb)(); + auto idle = std::move(_idleCbPtr); + _idleCbPtr.reset(); + (*idle)(); } } handleClosed(); @@ -225,7 +290,6 @@ void PgConnection::handleRead() } if (_channel.isWriting()) _channel.disableWriting(); - // got query results? while ((res = std::shared_ptr(PQgetResult(_connPtr.get()), [](PGresult *p) { PQclear(p); }))) @@ -236,18 +300,15 @@ void PgConnection::handleRead() LOG_WARN << PQerrorMessage(_connPtr.get()); if (_isWorking) { + try { - try - { - //TODO: exception type - throw SqlError(PQerrorMessage(_connPtr.get()), - _sql); - } - catch (...) - { - _exceptCb(std::current_exception()); - _exceptCb = decltype(_exceptCb)(); - } + //TODO: exception type + throw SqlError(PQerrorMessage(_connPtr.get()), _sql); + } + catch (...) + { + _exceptCb(std::current_exception()); + _exceptCb = decltype(_exceptCb)(); } _cb = decltype(_cb)(); } @@ -266,10 +327,11 @@ void PgConnection::handleRead() if (_isWorking) { _isWorking = false; - if (_idleCb) + if (_idleCbPtr) { - _idleCb(); - _idleCb = decltype(_idleCb)(); + auto idle = std::move(_idleCbPtr); + _idleCbPtr.reset(); + (*idle)(); } } }