Optimize the batch mode of libpq. (#205)

This commit is contained in:
An Tao 2019-08-02 14:30:37 +08:00 committed by GitHub
parent 2817253eb4
commit d97cfa8609
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 152 additions and 76 deletions

View File

@ -107,6 +107,9 @@ IF(NOT BUILD_ORM)
SET(BUILD_ORM TRUE CACHE BOOL INTERNAL)
ENDIF()
IF(NOT LIBPQ_BATCH_MODE)
SET(LIBPQ_BATCH_MODE TRUE CACHE BOOL INTERNAL)
ENDIF()
IF(BUILD_ORM)
@ -119,11 +122,13 @@ IF(BUILD_ORM)
LINK_LIBRARIES(${PostgreSQL_LIBRARIES})
SET(drogon_sources ${drogon_sources}
orm_lib/src/postgresql_impl/PostgreSQLResultImpl.cc)
IF(LIBPQ_BATCH_MODE)
TRY_COMPILE(libpq_supports_batch
${CMAKE_BINARY_DIR}/cmaketest
${PROJECT_SOURCE_DIR}/cmake/tests/test_libpq_batch_mode.cc
LINK_LIBRARIES ${PostgreSQL_LIBRARIES}
CMAKE_FLAGS "-DINCLUDE_DIRECTORIES=${PostgreSQL_INCLUDE_DIR}")
ENDIF()
IF(libpq_supports_batch)
MESSAGE(STATUS "The libpq supports fatch mode")
OPTION(LIBPQ_SUPPORTS_BATCH_MODE "ibpq fatch mode" ON)

View File

@ -102,8 +102,9 @@ void DbClientLockFree::execSql(
}
return;
}
else
else if (_sqlCmdBuffer.empty() && _transCallbacks.empty())
{
#if (!LIBPQ_SUPPORTS_BATCH_MODE)
for (auto &conn : _connections)
{
if (!conn->isWorking() &&
@ -130,6 +131,61 @@ void DbClientLockFree::execSql(
return;
}
}
#else
if (_type != ClientType::PostgreSQL)
{
for (auto &conn : _connections)
{
if (!conn->isWorking() &&
(_transSet.empty() ||
_transSet.find(conn) == _transSet.end()))
{
conn->execSql(
std::move(sql),
paraNum,
std::move(parameters),
std::move(length),
std::move(format),
[rcb = std::move(rcb), this](const Result &r) {
if (_sqlCmdBuffer.empty())
{
rcb(r);
}
else
{
_loop->queueInLoop(
[rcb = std::move(rcb), r]() { rcb(r); });
}
},
std::move(exceptCallback));
return;
}
}
}
else
{
/// pg batch mode
for (size_t i = 0; i < _connections.size(); i++)
{
auto &conn = _connections[_connectionPos++];
if (_connectionPos >= _connections.size())
_connectionPos = 0;
if (_transSet.empty() ||
_transSet.find(conn) == _transSet.end())
{
conn->execSql(std::move(sql),
paraNum,
std::move(parameters),
std::move(length),
std::move(format),
std::move(rcb),
std::move(exceptCallback));
return;
}
}
}
#endif
}
if (_sqlCmdBuffer.size() > 20000)

View File

@ -70,6 +70,7 @@ class DbClientLockFree : public DbClient,
std::function<void(const std::shared_ptr<Transaction> &)> &&callback);
void handleNewTask(const DbConnectionPtr &conn);
size_t _connectionPos = 0;//Used for pg batch mode.
};
} // namespace orm

View File

@ -16,9 +16,10 @@
#include "PostgreSQLResultImpl.h"
#include <drogon/orm/Exception.h>
#include <drogon/utils/Utilities.h>
#include <memory>
#include <stdio.h>
#include <trantor/utils/Logger.h>
#include <memory>
#include <algorithm>
#include <stdio.h>
using namespace drogon::orm;
@ -196,6 +197,7 @@ void PgConnection::execSqlInLoop(
std::function<void(const std::exception_ptr &)> &&exceptCallback)
{
LOG_TRACE << sql;
_isWorking = true;
_batchSqlCommands.emplace_back(
std::make_shared<SqlCmd>(std::move(sql),
paraNum,
@ -204,14 +206,45 @@ void PgConnection::execSqlInLoop(
std::move(format),
std::move(rcb),
std::move(exceptCallback)));
sendBatchedSql();
if (_batchSqlCommands.size() == 1 && !_channel.isWriting())
{
_loop->queueInLoop([thisPtr = shared_from_this()](){
thisPtr->sendBatchedSql();
});
}
}
int PgConnection::sendBatchEnd()
{
if (!PQsendEndBatch(_connPtr.get()))
{
_isWorking = false;
handleFatalError();
handleClosed();
return 0;
}
return 1;
}
void PgConnection::sendBatchedSql()
{
assert(!_isWorking);
if (_batchSqlCommands.empty())
if (_isWorking && _batchSqlCommands.empty())
{
if (_sendBatchEnd)
{
if (sendBatchEnd())
{
_sendBatchEnd = false;
if (flush())
{
return;
}
else
{
if (_channel.isWriting())
_channel.disableWriting();
}
}
return;
}
if (_channel.isWriting())
_channel.disableWriting();
return;
@ -242,7 +275,7 @@ void PgConnection::sendBatchedSql()
cmd->_preparingStatement = statName;
if (flush())
{
break;
return;
}
}
else
@ -250,6 +283,24 @@ void PgConnection::sendBatchedSql()
statName = iter->second;
}
if (_batchSqlCommands.size() == 1)
{
_sendBatchEnd = true;
}
else
{
auto sql = cmd->_sql;
std::transform(sql.begin(), sql.end(), sql.begin(), tolower);
if (sql.find("update") != std::string::npos ||
sql.find("insert") != std::string::npos ||
sql.find("delete") != std::string::npos ||
sql.find("drop") != std::string::npos ||
sql.find("truncate") != std::string::npos ||
sql.find("lock") != std::string::npos)
{
_sendBatchEnd = true;
}
}
if (PQsendQueryPrepared(_connPtr.get(),
statName.c_str(),
cmd->_paraNum,
@ -267,27 +318,23 @@ void PgConnection::sendBatchedSql()
_batchSqlCommands.pop_front();
if (flush())
{
break;
}
}
if (_batchSqlCommands.empty())
{
if (!PQsendEndBatch(_connPtr.get()))
{
_isWorking = false;
handleFatalError();
handleClosed();
return;
}
flush();
if (_sendBatchEnd)
{
_sendBatchEnd = false;
if (!sendBatchEnd())
{
return;
}
if (flush())
{
return;
}
}
}
}
void PgConnection::batchSqlInLoop(
std::deque<std::shared_ptr<SqlCmd>> &&sqlCommands)
{
_batchSqlCommands = std::move(sqlCommands);
sendBatchedSql();
}
void PgConnection::handleRead()
{
_loop->assertInLoopThread();
@ -301,7 +348,6 @@ void PgConnection::handleRead()
{
_isWorking = false;
handleFatalError();
_cb = nullptr;
}
handleClosed();
return;
@ -326,8 +372,7 @@ void PgConnection::handleRead()
*/
if (!PQgetNextQuery(_connPtr.get()))
{
handleFatalError();
handleClosed();
return;
}
continue;
}
@ -340,11 +385,14 @@ void PgConnection::handleRead()
}
if (type == PGRES_BATCH_END)
{
assert(_batchCommandsForWaitingResults.empty());
assert(_batchSqlCommands.empty());
_isWorking = false;
_idleCb();
return;
if (_batchCommandsForWaitingResults.empty() &&
_batchSqlCommands.empty())
{
_isWorking = false;
_idleCb();
return;
}
continue;
}
if (!_batchCommandsForWaitingResults.empty())
{
@ -376,27 +424,6 @@ void PgConnection::handleRead()
void PgConnection::doAfterPreparing()
{
_isRreparingStatement = false;
_preparedStatementMap[_sql] = _statementName;
if (PQsendQueryPrepared(_connPtr.get(),
_statementName.c_str(),
_paraNum,
_parameters.data(),
_length.data(),
_format.data(),
0) == 0)
{
LOG_ERROR << "send query error: " << PQerrorMessage(_connPtr.get());
if (_isWorking)
{
_isWorking = false;
handleFatalError();
_cb = nullptr;
_idleCb();
}
return;
}
flush();
}
void PgConnection::handleFatalError()
@ -424,16 +451,7 @@ void PgConnection::handleFatalError()
void PgConnection::batchSql(std::deque<std::shared_ptr<SqlCmd>> &&sqlCommands)
{
if (_loop->isInLoopThread())
{
batchSqlInLoop(std::move(sqlCommands));
}
else
{
auto thisPtr = shared_from_this();
_loop->queueInLoop(
[thisPtr, sqlCommands = std::move(sqlCommands)]() mutable {
thisPtr->batchSqlInLoop(std::move(sqlCommands));
});
}
_loop->assertInLoopThread();
_batchSqlCommands = std::move(sqlCommands);
sendBatchedSql();
}

View File

@ -101,12 +101,8 @@ PgConnection::PgConnection(trantor::EventLoop *loop,
pgPoll();
}
});
_channel.setCloseCallback([=]() {
handleClosed();
});
_channel.setErrorCallback([=]() {
handleClosed();
});
_channel.setCloseCallback([=]() { handleClosed(); });
_channel.setErrorCallback([=]() { handleClosed(); });
_channel.enableReading();
_channel.enableWriting();
}
@ -385,8 +381,7 @@ void PgConnection::handleFatalError()
}
}
void PgConnection::batchSql(
std::deque<std::shared_ptr<SqlCmd>> &&sqlCommands)
void PgConnection::batchSql(std::deque<std::shared_ptr<SqlCmd>> &&sqlCommands)
{
assert(false);
}

View File

@ -111,11 +111,12 @@ class PgConnection : public DbConnection,
std::vector<int> _format;
int flush();
void handleFatalError();
#ifdef LIBPQ_SUPPORTS_BATCH_MODE
#if LIBPQ_SUPPORTS_BATCH_MODE
std::list<std::shared_ptr<SqlCmd>> _batchCommandsForWaitingResults;
std::deque<std::shared_ptr<SqlCmd>> _batchSqlCommands;
void batchSqlInLoop(std::deque<std::shared_ptr<SqlCmd>> &&sqlCommands);
void sendBatchedSql();
int sendBatchEnd();
bool _sendBatchEnd = false;
#endif
};