From d97cfa86093fd44c108597c261b5541c84934e40 Mon Sep 17 00:00:00 2001 From: An Tao Date: Fri, 2 Aug 2019 14:30:37 +0800 Subject: [PATCH] Optimize the batch mode of libpq. (#205) --- CMakeLists.txt | 5 + orm_lib/src/DbClientLockFree.cc | 58 ++++++- orm_lib/src/DbClientLockFree.h | 1 + .../src/postgresql_impl/PgBatchConnection.cc | 148 ++++++++++-------- orm_lib/src/postgresql_impl/PgConnection.cc | 11 +- orm_lib/src/postgresql_impl/PgConnection.h | 5 +- 6 files changed, 152 insertions(+), 76 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index f9ef6796..46dcaef6 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -107,6 +107,9 @@ IF(NOT BUILD_ORM) SET(BUILD_ORM TRUE CACHE BOOL INTERNAL) ENDIF() +IF(NOT LIBPQ_BATCH_MODE) + SET(LIBPQ_BATCH_MODE TRUE CACHE BOOL INTERNAL) +ENDIF() IF(BUILD_ORM) @@ -119,11 +122,13 @@ IF(BUILD_ORM) LINK_LIBRARIES(${PostgreSQL_LIBRARIES}) SET(drogon_sources ${drogon_sources} orm_lib/src/postgresql_impl/PostgreSQLResultImpl.cc) + IF(LIBPQ_BATCH_MODE) TRY_COMPILE(libpq_supports_batch ${CMAKE_BINARY_DIR}/cmaketest ${PROJECT_SOURCE_DIR}/cmake/tests/test_libpq_batch_mode.cc LINK_LIBRARIES ${PostgreSQL_LIBRARIES} CMAKE_FLAGS "-DINCLUDE_DIRECTORIES=${PostgreSQL_INCLUDE_DIR}") + ENDIF() IF(libpq_supports_batch) MESSAGE(STATUS "The libpq supports fatch mode") OPTION(LIBPQ_SUPPORTS_BATCH_MODE "ibpq fatch mode" ON) diff --git a/orm_lib/src/DbClientLockFree.cc b/orm_lib/src/DbClientLockFree.cc index 7d02c601..0c9d550a 100644 --- a/orm_lib/src/DbClientLockFree.cc +++ b/orm_lib/src/DbClientLockFree.cc @@ -102,8 +102,9 @@ void DbClientLockFree::execSql( } return; } - else + else if (_sqlCmdBuffer.empty() && _transCallbacks.empty()) { +#if (!LIBPQ_SUPPORTS_BATCH_MODE) for (auto &conn : _connections) { if (!conn->isWorking() && @@ -130,6 +131,61 @@ void DbClientLockFree::execSql( return; } } +#else + if (_type != ClientType::PostgreSQL) + { + for (auto &conn : _connections) + { + if (!conn->isWorking() && + (_transSet.empty() || + _transSet.find(conn) == _transSet.end())) + { + conn->execSql( + std::move(sql), + paraNum, + std::move(parameters), + std::move(length), + std::move(format), + [rcb = std::move(rcb), this](const Result &r) { + if (_sqlCmdBuffer.empty()) + { + rcb(r); + } + else + { + _loop->queueInLoop( + [rcb = std::move(rcb), r]() { rcb(r); }); + } + }, + std::move(exceptCallback)); + return; + } + } + } + else + { + /// pg batch mode + for (size_t i = 0; i < _connections.size(); i++) + { + auto &conn = _connections[_connectionPos++]; + if (_connectionPos >= _connections.size()) + _connectionPos = 0; + if (_transSet.empty() || + _transSet.find(conn) == _transSet.end()) + { + conn->execSql(std::move(sql), + paraNum, + std::move(parameters), + std::move(length), + std::move(format), + std::move(rcb), + std::move(exceptCallback)); + return; + } + } + } + +#endif } if (_sqlCmdBuffer.size() > 20000) diff --git a/orm_lib/src/DbClientLockFree.h b/orm_lib/src/DbClientLockFree.h index 9daa2a7f..050a5dfd 100644 --- a/orm_lib/src/DbClientLockFree.h +++ b/orm_lib/src/DbClientLockFree.h @@ -70,6 +70,7 @@ class DbClientLockFree : public DbClient, std::function &)> &&callback); void handleNewTask(const DbConnectionPtr &conn); + size_t _connectionPos = 0;//Used for pg batch mode. }; } // namespace orm diff --git a/orm_lib/src/postgresql_impl/PgBatchConnection.cc b/orm_lib/src/postgresql_impl/PgBatchConnection.cc index bf83db2d..02fb61f4 100644 --- a/orm_lib/src/postgresql_impl/PgBatchConnection.cc +++ b/orm_lib/src/postgresql_impl/PgBatchConnection.cc @@ -16,9 +16,10 @@ #include "PostgreSQLResultImpl.h" #include #include -#include -#include #include +#include +#include +#include using namespace drogon::orm; @@ -196,6 +197,7 @@ void PgConnection::execSqlInLoop( std::function &&exceptCallback) { LOG_TRACE << sql; + _isWorking = true; _batchSqlCommands.emplace_back( std::make_shared(std::move(sql), paraNum, @@ -204,14 +206,45 @@ void PgConnection::execSqlInLoop( std::move(format), std::move(rcb), std::move(exceptCallback))); - sendBatchedSql(); + if (_batchSqlCommands.size() == 1 && !_channel.isWriting()) + { + _loop->queueInLoop([thisPtr = shared_from_this()](){ + thisPtr->sendBatchedSql(); + }); + } +} +int PgConnection::sendBatchEnd() +{ + if (!PQsendEndBatch(_connPtr.get())) + { + _isWorking = false; + handleFatalError(); + handleClosed(); + return 0; + } + return 1; } - void PgConnection::sendBatchedSql() { - assert(!_isWorking); - if (_batchSqlCommands.empty()) + if (_isWorking && _batchSqlCommands.empty()) { + if (_sendBatchEnd) + { + if (sendBatchEnd()) + { + _sendBatchEnd = false; + if (flush()) + { + return; + } + else + { + if (_channel.isWriting()) + _channel.disableWriting(); + } + } + return; + } if (_channel.isWriting()) _channel.disableWriting(); return; @@ -242,7 +275,7 @@ void PgConnection::sendBatchedSql() cmd->_preparingStatement = statName; if (flush()) { - break; + return; } } else @@ -250,6 +283,24 @@ void PgConnection::sendBatchedSql() statName = iter->second; } + if (_batchSqlCommands.size() == 1) + { + _sendBatchEnd = true; + } + else + { + auto sql = cmd->_sql; + std::transform(sql.begin(), sql.end(), sql.begin(), tolower); + if (sql.find("update") != std::string::npos || + sql.find("insert") != std::string::npos || + sql.find("delete") != std::string::npos || + sql.find("drop") != std::string::npos || + sql.find("truncate") != std::string::npos || + sql.find("lock") != std::string::npos) + { + _sendBatchEnd = true; + } + } if (PQsendQueryPrepared(_connPtr.get(), statName.c_str(), cmd->_paraNum, @@ -267,27 +318,23 @@ void PgConnection::sendBatchedSql() _batchSqlCommands.pop_front(); if (flush()) { - break; - } - } - if (_batchSqlCommands.empty()) - { - if (!PQsendEndBatch(_connPtr.get())) - { - _isWorking = false; - handleFatalError(); - handleClosed(); return; } - flush(); + if (_sendBatchEnd) + { + _sendBatchEnd = false; + if (!sendBatchEnd()) + { + return; + } + if (flush()) + { + return; + } + } } } -void PgConnection::batchSqlInLoop( - std::deque> &&sqlCommands) -{ - _batchSqlCommands = std::move(sqlCommands); - sendBatchedSql(); -} + void PgConnection::handleRead() { _loop->assertInLoopThread(); @@ -301,7 +348,6 @@ void PgConnection::handleRead() { _isWorking = false; handleFatalError(); - _cb = nullptr; } handleClosed(); return; @@ -326,8 +372,7 @@ void PgConnection::handleRead() */ if (!PQgetNextQuery(_connPtr.get())) { - handleFatalError(); - handleClosed(); + return; } continue; } @@ -340,11 +385,14 @@ void PgConnection::handleRead() } if (type == PGRES_BATCH_END) { - assert(_batchCommandsForWaitingResults.empty()); - assert(_batchSqlCommands.empty()); - _isWorking = false; - _idleCb(); - return; + if (_batchCommandsForWaitingResults.empty() && + _batchSqlCommands.empty()) + { + _isWorking = false; + _idleCb(); + return; + } + continue; } if (!_batchCommandsForWaitingResults.empty()) { @@ -376,27 +424,6 @@ void PgConnection::handleRead() void PgConnection::doAfterPreparing() { - _isRreparingStatement = false; - _preparedStatementMap[_sql] = _statementName; - if (PQsendQueryPrepared(_connPtr.get(), - _statementName.c_str(), - _paraNum, - _parameters.data(), - _length.data(), - _format.data(), - 0) == 0) - { - LOG_ERROR << "send query error: " << PQerrorMessage(_connPtr.get()); - if (_isWorking) - { - _isWorking = false; - handleFatalError(); - _cb = nullptr; - _idleCb(); - } - return; - } - flush(); } void PgConnection::handleFatalError() @@ -424,16 +451,7 @@ void PgConnection::handleFatalError() void PgConnection::batchSql(std::deque> &&sqlCommands) { - if (_loop->isInLoopThread()) - { - batchSqlInLoop(std::move(sqlCommands)); - } - else - { - auto thisPtr = shared_from_this(); - _loop->queueInLoop( - [thisPtr, sqlCommands = std::move(sqlCommands)]() mutable { - thisPtr->batchSqlInLoop(std::move(sqlCommands)); - }); - } + _loop->assertInLoopThread(); + _batchSqlCommands = std::move(sqlCommands); + sendBatchedSql(); } diff --git a/orm_lib/src/postgresql_impl/PgConnection.cc b/orm_lib/src/postgresql_impl/PgConnection.cc index 8e66239e..36504460 100644 --- a/orm_lib/src/postgresql_impl/PgConnection.cc +++ b/orm_lib/src/postgresql_impl/PgConnection.cc @@ -101,12 +101,8 @@ PgConnection::PgConnection(trantor::EventLoop *loop, pgPoll(); } }); - _channel.setCloseCallback([=]() { - handleClosed(); - }); - _channel.setErrorCallback([=]() { - handleClosed(); - }); + _channel.setCloseCallback([=]() { handleClosed(); }); + _channel.setErrorCallback([=]() { handleClosed(); }); _channel.enableReading(); _channel.enableWriting(); } @@ -385,8 +381,7 @@ void PgConnection::handleFatalError() } } -void PgConnection::batchSql( - std::deque> &&sqlCommands) +void PgConnection::batchSql(std::deque> &&sqlCommands) { assert(false); } \ No newline at end of file diff --git a/orm_lib/src/postgresql_impl/PgConnection.h b/orm_lib/src/postgresql_impl/PgConnection.h index fecc70a5..a1a7f134 100644 --- a/orm_lib/src/postgresql_impl/PgConnection.h +++ b/orm_lib/src/postgresql_impl/PgConnection.h @@ -111,11 +111,12 @@ class PgConnection : public DbConnection, std::vector _format; int flush(); void handleFatalError(); -#ifdef LIBPQ_SUPPORTS_BATCH_MODE +#if LIBPQ_SUPPORTS_BATCH_MODE std::list> _batchCommandsForWaitingResults; std::deque> _batchSqlCommands; - void batchSqlInLoop(std::deque> &&sqlCommands); void sendBatchedSql(); + int sendBatchEnd(); + bool _sendBatchEnd = false; #endif };