Modify the PgConnection class

This commit is contained in:
antao 2019-02-19 18:52:25 +08:00
parent 6c4ad73f7a
commit 84ab5d369e
1 changed files with 98 additions and 53 deletions

View File

@ -172,13 +172,14 @@ void PgConnection::execSqlInLoop(std::string &&sql,
_idleCbPtr = std::make_shared<std::function<void()>>(std::move(idleCb));
_isWorking = true;
_exceptCb = std::move(exceptCallback);
auto iter = _preparedStatementMap.find(_sql);
if (iter != _preparedStatementMap.end())
if (paraNum == 0)
{
if (PQsendQueryPrepared(
_isRreparingStatement = false;
if (PQsendQueryParams(
_connPtr.get(),
iter->second.c_str(),
_sql.c_str(),
paraNum,
nullptr,
parameters.data(),
length.data(),
format.data(),
@ -188,6 +189,7 @@ void PgConnection::execSqlInLoop(std::string &&sql,
if (_isWorking)
{
_isWorking = false;
_isRreparingStatement = false;
try
{
throw Failure(PQerrorMessage(_connPtr.get()));
@ -211,75 +213,117 @@ void PgConnection::execSqlInLoop(std::string &&sql,
}
else
{
_isRreparingStatement = true;
auto statementName = getuuid();
if (PQsendPrepare(_connPtr.get(), statementName.c_str(), _sql.c_str(), paraNum, NULL) == 0)
auto iter = _preparedStatementMap.find(_sql);
if (iter != _preparedStatementMap.end())
{
LOG_ERROR << "send query error: " << PQerrorMessage(_connPtr.get());
if (_isWorking)
{
_isWorking = false;
try
{
throw Failure(PQerrorMessage(_connPtr.get()));
}
catch (...)
{
auto exceptPtr = std::current_exception();
_exceptCb(exceptPtr);
_exceptCb = decltype(_exceptCb)();
}
_cb = decltype(_cb)();
if (_idleCbPtr)
{
auto idle = std::move(_idleCbPtr);
_idleCbPtr.reset();
(*idle)();
}
}
return;
}
std::weak_ptr<PgConnection> weakPtr = shared_from_this();
_preparingCallback = [weakPtr, statementName, paraNum, parameters = std::move(parameters), length = std::move(length), format = std::move(format)]() {
auto thisPtr = weakPtr.lock();
if (!thisPtr)
return;
thisPtr->_isRreparingStatement = false;
thisPtr->_preparedStatementMap[thisPtr->_sql] = statementName;
_isRreparingStatement = false;
if (PQsendQueryPrepared(
thisPtr->_connPtr.get(),
statementName.c_str(),
_connPtr.get(),
iter->second.c_str(),
paraNum,
parameters.data(),
length.data(),
format.data(),
0) == 0)
{
LOG_ERROR << "send query error: " << PQerrorMessage(thisPtr->_connPtr.get());
if (thisPtr->_isWorking)
LOG_ERROR << "send query error: " << PQerrorMessage(_connPtr.get());
if (_isWorking)
{
thisPtr->_isWorking = false;
_isWorking = false;
_isRreparingStatement = false;
try
{
throw Failure(PQerrorMessage(thisPtr->_connPtr.get()));
throw Failure(PQerrorMessage(_connPtr.get()));
}
catch (...)
{
auto exceptPtr = std::current_exception();
thisPtr->_exceptCb(exceptPtr);
thisPtr->_exceptCb = decltype(thisPtr->_exceptCb)();
_exceptCb(exceptPtr);
_exceptCb = decltype(_exceptCb)();
}
thisPtr->_cb = decltype(thisPtr->_cb)();
if (thisPtr->_idleCbPtr)
_cb = decltype(_cb)();
if (_idleCbPtr)
{
auto idle = std::move(thisPtr->_idleCbPtr);
thisPtr->_idleCbPtr.reset();
auto idle = std::move(_idleCbPtr);
_idleCbPtr.reset();
(*idle)();
}
}
return;
}
};
}
else
{
_isRreparingStatement = true;
auto statementName = getuuid();
if (PQsendPrepare(_connPtr.get(), statementName.c_str(), _sql.c_str(), paraNum, NULL) == 0)
{
LOG_ERROR << "send query error: " << PQerrorMessage(_connPtr.get());
if (_isWorking)
{
_isWorking = false;
try
{
throw Failure(PQerrorMessage(_connPtr.get()));
}
catch (...)
{
auto exceptPtr = std::current_exception();
_exceptCb(exceptPtr);
_exceptCb = decltype(_exceptCb)();
}
_cb = decltype(_cb)();
if (_idleCbPtr)
{
auto idle = std::move(_idleCbPtr);
_idleCbPtr.reset();
(*idle)();
}
}
return;
}
std::weak_ptr<PgConnection> weakPtr = shared_from_this();
_preparingCallback = [weakPtr, statementName, paraNum, parameters = std::move(parameters), length = std::move(length), format = std::move(format)]() {
auto thisPtr = weakPtr.lock();
if (!thisPtr)
return;
thisPtr->_isRreparingStatement = false;
thisPtr->_preparedStatementMap[thisPtr->_sql] = statementName;
if (PQsendQueryPrepared(
thisPtr->_connPtr.get(),
statementName.c_str(),
paraNum,
parameters.data(),
length.data(),
format.data(),
0) == 0)
{
LOG_ERROR << "send query error: " << PQerrorMessage(thisPtr->_connPtr.get());
if (thisPtr->_isWorking)
{
thisPtr->_isWorking = false;
try
{
throw Failure(PQerrorMessage(thisPtr->_connPtr.get()));
}
catch (...)
{
auto exceptPtr = std::current_exception();
thisPtr->_exceptCb(exceptPtr);
thisPtr->_exceptCb = decltype(thisPtr->_exceptCb)();
}
thisPtr->_cb = decltype(thisPtr->_cb)();
if (thisPtr->_idleCbPtr)
{
auto idle = std::move(thisPtr->_idleCbPtr);
thisPtr->_idleCbPtr.reset();
(*idle)();
}
}
return;
}
};
}
}
pgPoll();
}
@ -367,7 +411,7 @@ void PgConnection::handleRead()
}
if (_isWorking)
{
if(isPreparing)
if (isPreparing)
{
_preparingCallback();
_preparingCallback = std::function<void()>();
@ -375,12 +419,13 @@ void PgConnection::handleRead()
else
{
_isWorking = false;
_isRreparingStatement = false;
if (_idleCbPtr)
{
auto idle = std::move(_idleCbPtr);
_idleCbPtr.reset();
(*idle)();
}
}
}
}
}