From 84ab5d369e0eddcbc938c42ae7819655d3eb823c Mon Sep 17 00:00:00 2001 From: antao Date: Tue, 19 Feb 2019 18:52:25 +0800 Subject: [PATCH] Modify the PgConnection class --- orm_lib/src/postgresql_impl/PgConnection.cc | 151 +++++++++++++------- 1 file changed, 98 insertions(+), 53 deletions(-) diff --git a/orm_lib/src/postgresql_impl/PgConnection.cc b/orm_lib/src/postgresql_impl/PgConnection.cc index 03212ea1..bbeeda3f 100644 --- a/orm_lib/src/postgresql_impl/PgConnection.cc +++ b/orm_lib/src/postgresql_impl/PgConnection.cc @@ -172,13 +172,14 @@ void PgConnection::execSqlInLoop(std::string &&sql, _idleCbPtr = std::make_shared>(std::move(idleCb)); _isWorking = true; _exceptCb = std::move(exceptCallback); - auto iter = _preparedStatementMap.find(_sql); - if (iter != _preparedStatementMap.end()) + if (paraNum == 0) { - if (PQsendQueryPrepared( + _isRreparingStatement = false; + if (PQsendQueryParams( _connPtr.get(), - iter->second.c_str(), + _sql.c_str(), paraNum, + nullptr, parameters.data(), length.data(), format.data(), @@ -188,6 +189,7 @@ void PgConnection::execSqlInLoop(std::string &&sql, if (_isWorking) { _isWorking = false; + _isRreparingStatement = false; try { throw Failure(PQerrorMessage(_connPtr.get())); @@ -211,75 +213,117 @@ void PgConnection::execSqlInLoop(std::string &&sql, } else { - _isRreparingStatement = true; - auto statementName = getuuid(); - if (PQsendPrepare(_connPtr.get(), statementName.c_str(), _sql.c_str(), paraNum, NULL) == 0) + auto iter = _preparedStatementMap.find(_sql); + if (iter != _preparedStatementMap.end()) { - LOG_ERROR << "send query error: " << PQerrorMessage(_connPtr.get()); - if (_isWorking) - { - _isWorking = false; - try - { - throw Failure(PQerrorMessage(_connPtr.get())); - } - catch (...) - { - auto exceptPtr = std::current_exception(); - _exceptCb(exceptPtr); - _exceptCb = decltype(_exceptCb)(); - } - _cb = decltype(_cb)(); - if (_idleCbPtr) - { - auto idle = std::move(_idleCbPtr); - _idleCbPtr.reset(); - (*idle)(); - } - } - return; - } - std::weak_ptr weakPtr = shared_from_this(); - _preparingCallback = [weakPtr, statementName, paraNum, parameters = std::move(parameters), length = std::move(length), format = std::move(format)]() { - auto thisPtr = weakPtr.lock(); - if (!thisPtr) - return; - thisPtr->_isRreparingStatement = false; - thisPtr->_preparedStatementMap[thisPtr->_sql] = statementName; + _isRreparingStatement = false; if (PQsendQueryPrepared( - thisPtr->_connPtr.get(), - statementName.c_str(), + _connPtr.get(), + iter->second.c_str(), paraNum, parameters.data(), length.data(), format.data(), 0) == 0) { - LOG_ERROR << "send query error: " << PQerrorMessage(thisPtr->_connPtr.get()); - if (thisPtr->_isWorking) + LOG_ERROR << "send query error: " << PQerrorMessage(_connPtr.get()); + if (_isWorking) { - thisPtr->_isWorking = false; + _isWorking = false; + _isRreparingStatement = false; try { - throw Failure(PQerrorMessage(thisPtr->_connPtr.get())); + throw Failure(PQerrorMessage(_connPtr.get())); } catch (...) { auto exceptPtr = std::current_exception(); - thisPtr->_exceptCb(exceptPtr); - thisPtr->_exceptCb = decltype(thisPtr->_exceptCb)(); + _exceptCb(exceptPtr); + _exceptCb = decltype(_exceptCb)(); } - thisPtr->_cb = decltype(thisPtr->_cb)(); - if (thisPtr->_idleCbPtr) + _cb = decltype(_cb)(); + if (_idleCbPtr) { - auto idle = std::move(thisPtr->_idleCbPtr); - thisPtr->_idleCbPtr.reset(); + auto idle = std::move(_idleCbPtr); + _idleCbPtr.reset(); (*idle)(); } } return; } - }; + } + else + { + _isRreparingStatement = true; + auto statementName = getuuid(); + if (PQsendPrepare(_connPtr.get(), statementName.c_str(), _sql.c_str(), paraNum, NULL) == 0) + { + LOG_ERROR << "send query error: " << PQerrorMessage(_connPtr.get()); + if (_isWorking) + { + _isWorking = false; + try + { + throw Failure(PQerrorMessage(_connPtr.get())); + } + catch (...) + { + auto exceptPtr = std::current_exception(); + _exceptCb(exceptPtr); + _exceptCb = decltype(_exceptCb)(); + } + _cb = decltype(_cb)(); + if (_idleCbPtr) + { + auto idle = std::move(_idleCbPtr); + _idleCbPtr.reset(); + (*idle)(); + } + } + return; + } + std::weak_ptr weakPtr = shared_from_this(); + _preparingCallback = [weakPtr, statementName, paraNum, parameters = std::move(parameters), length = std::move(length), format = std::move(format)]() { + auto thisPtr = weakPtr.lock(); + if (!thisPtr) + return; + thisPtr->_isRreparingStatement = false; + thisPtr->_preparedStatementMap[thisPtr->_sql] = statementName; + if (PQsendQueryPrepared( + thisPtr->_connPtr.get(), + statementName.c_str(), + paraNum, + parameters.data(), + length.data(), + format.data(), + 0) == 0) + { + LOG_ERROR << "send query error: " << PQerrorMessage(thisPtr->_connPtr.get()); + if (thisPtr->_isWorking) + { + thisPtr->_isWorking = false; + try + { + throw Failure(PQerrorMessage(thisPtr->_connPtr.get())); + } + catch (...) + { + auto exceptPtr = std::current_exception(); + thisPtr->_exceptCb(exceptPtr); + thisPtr->_exceptCb = decltype(thisPtr->_exceptCb)(); + } + thisPtr->_cb = decltype(thisPtr->_cb)(); + if (thisPtr->_idleCbPtr) + { + auto idle = std::move(thisPtr->_idleCbPtr); + thisPtr->_idleCbPtr.reset(); + (*idle)(); + } + } + return; + } + }; + } } pgPoll(); } @@ -367,7 +411,7 @@ void PgConnection::handleRead() } if (_isWorking) { - if(isPreparing) + if (isPreparing) { _preparingCallback(); _preparingCallback = std::function(); @@ -375,12 +419,13 @@ void PgConnection::handleRead() else { _isWorking = false; + _isRreparingStatement = false; if (_idleCbPtr) { auto idle = std::move(_idleCbPtr); _idleCbPtr.reset(); (*idle)(); } - } + } } }