From 937a2fd13621943dcb676f12d1476546b5418128 Mon Sep 17 00:00:00 2001 From: antao Date: Tue, 8 Jan 2019 19:37:19 +0800 Subject: [PATCH 1/4] Optimize PostgreSQL client --- orm_lib/inc/drogon/orm/DbClient.h | 12 ++-- orm_lib/inc/drogon/orm/Result.h | 4 +- orm_lib/src/DbClientImpl.cc | 68 ++++++++++--------- orm_lib/src/DbClientImpl.h | 27 ++++---- orm_lib/src/DbConnection.h | 14 ++-- orm_lib/src/Result.cc | 5 +- orm_lib/src/ResultImpl.h | 5 ++ orm_lib/src/SqlBinder.cc | 4 +- orm_lib/src/TransactionImpl.cc | 48 ++++++------- orm_lib/src/TransactionImpl.h | 12 ++-- orm_lib/src/mysql_impl/MysqlConnection.cc | 20 +++--- orm_lib/src/mysql_impl/MysqlConnection.h | 14 ++-- orm_lib/src/mysql_impl/MysqlResultImpl.h | 5 +- orm_lib/src/postgresql_impl/PgConnection.cc | 46 +++++++------ orm_lib/src/postgresql_impl/PgConnection.h | 14 ++-- .../postgresql_impl/PostgreSQLResultImpl.h | 6 +- orm_lib/src/sqlite3_impl/Sqlite3Connection.cc | 16 ++--- orm_lib/src/sqlite3_impl/Sqlite3Connection.h | 14 ++-- orm_lib/src/sqlite3_impl/Sqlite3ResultImpl.h | 1 + 19 files changed, 175 insertions(+), 160 deletions(-) diff --git a/orm_lib/inc/drogon/orm/DbClient.h b/orm_lib/inc/drogon/orm/DbClient.h index a47ff3ff..59ac0314 100644 --- a/orm_lib/inc/drogon/orm/DbClient.h +++ b/orm_lib/inc/drogon/orm/DbClient.h @@ -152,13 +152,13 @@ class DbClient : public trantor::NonCopyable private: friend internal::SqlBinder; - virtual void execSql(const std::string &sql, + virtual void execSql(std::string &&sql, size_t paraNum, - const std::vector ¶meters, - const std::vector &length, - const std::vector &format, - const ResultCallback &rcb, - const std::function &exptCallback) = 0; + std::vector &¶meters, + std::vector &&length, + std::vector &&format, + ResultCallback &&rcb, + std::function &&exceptCallback) = 0; protected: ClientType _type; diff --git a/orm_lib/inc/drogon/orm/Result.h b/orm_lib/inc/drogon/orm/Result.h index c11cc51e..615bae45 100644 --- a/orm_lib/inc/drogon/orm/Result.h +++ b/orm_lib/inc/drogon/orm/Result.h @@ -84,9 +84,11 @@ class Result /// For Mysql, Sqlite3 database unsigned long long insertId() const noexcept; + const std::string &sql() const noexcept; + private: ResultImplPtr _resultPtr; - std::string _query; + std::string _errString; friend class Field; friend class Row; diff --git a/orm_lib/src/DbClientImpl.cc b/orm_lib/src/DbClientImpl.cc index fad9d048..5818b036 100644 --- a/orm_lib/src/DbClientImpl.cc +++ b/orm_lib/src/DbClientImpl.cc @@ -73,13 +73,13 @@ DbClientImpl::~DbClientImpl() noexcept } void DbClientImpl::execSql(const DbConnectionPtr &conn, - const std::string &sql, + std::string &&sql, size_t paraNum, - const std::vector ¶meters, - const std::vector &length, - const std::vector &format, - const ResultCallback &cb, - const std::function &exceptCallback) + std::vector &¶meters, + std::vector &&length, + std::vector &&format, + ResultCallback &&rcb, + std::function &&exceptCallback) { if (!conn) { @@ -94,8 +94,8 @@ void DbClientImpl::execSql(const DbConnectionPtr &conn, return; } std::weak_ptr weakConn = conn; - conn->execSql(sql, paraNum, parameters, length, format, - cb, exceptCallback, + conn->execSql(std::move(sql), paraNum, std::move(parameters), std::move(length), std::move(format), + std::move(rcb), std::move(exceptCallback), [=]() -> void { { auto connPtr = weakConn.lock(); @@ -105,18 +105,18 @@ void DbClientImpl::execSql(const DbConnectionPtr &conn, } }); } -void DbClientImpl::execSql(const std::string &sql, +void DbClientImpl::execSql(std::string &&sql, size_t paraNum, - const std::vector ¶meters, - const std::vector &length, - const std::vector &format, - const QueryCallback &cb, - const ExceptPtrCallback &exceptCb) + std::vector &¶meters, + std::vector &&length, + std::vector &&format, + ResultCallback &&rcb, + std::function &&exceptCallback) { assert(paraNum == parameters.size()); assert(paraNum == length.size()); assert(paraNum == format.size()); - assert(cb); + assert(rcb); DbConnectionPtr conn; { std::lock_guard guard(_connectionsMutex); @@ -131,7 +131,7 @@ void DbClientImpl::execSql(const std::string &sql, } catch (...) { - exceptCb(std::current_exception()); + exceptCallback(std::current_exception()); } return; } @@ -146,7 +146,7 @@ void DbClientImpl::execSql(const std::string &sql, } if (conn) { - execSql(conn, sql, paraNum, parameters, length, format, cb, exceptCb); + execSql(conn, std::move(sql), paraNum, std::move(parameters), std::move(length), std::move(format), std::move(rcb), std::move(exceptCallback)); return; } bool busy = false; @@ -166,19 +166,19 @@ void DbClientImpl::execSql(const std::string &sql, } catch (...) { - exceptCb(std::current_exception()); + exceptCallback(std::current_exception()); } return; } //LOG_TRACE << "Push query to buffer"; - SqlCmd cmd; - cmd._sql = sql; - cmd._paraNum = paraNum; - cmd._parameters = parameters; - cmd._length = length; - cmd._format = format; - cmd._cb = cb; - cmd._exceptCb = exceptCb; + std::shared_ptr cmd = std::make_shared(); + cmd->_sql = std::move(sql); + cmd->_paraNum = paraNum; + cmd->_parameters = std::move(parameters); + cmd->_length = std::move(length); + cmd->_format = std::move(format); + cmd->_cb = std::move(rcb); + cmd->_exceptCb = std::move(exceptCallback); { std::lock_guard guard(_bufferMutex); _sqlCmdBuffer.push_back(std::move(cmd)); @@ -239,18 +239,20 @@ void DbClientImpl::handleNewTask(const DbConnectionPtr &connPtr) if (!_sqlCmdBuffer.empty()) { _busyConnections.insert(connPtr); //For new connections, this sentence is necessary - auto cmd = _sqlCmdBuffer.front(); - _sqlCmdBuffer.pop_front(); - _loopPtr->queueInLoop([=]() { - execSql(connPtr, cmd._sql, cmd._paraNum, cmd._parameters, cmd._length, cmd._format, cmd._cb, cmd._exceptCb); + auto &cmd = _sqlCmdBuffer.front(); + _loopPtr->queueInLoop([connPtr, cmd, this]() { + execSql(connPtr, std::move(cmd->_sql), cmd->_paraNum, std::move(cmd->_parameters), std::move(cmd->_length), std::move(cmd->_format), std::move(cmd->_cb), std::move(cmd->_exceptCb)); }); - + _sqlCmdBuffer.pop_front(); return; } } //Idle connection - _busyConnections.erase(connPtr); - _readyConnections.insert(connPtr); + _loopPtr->queueInLoop([connPtr, this]() { + std::lock_guard guard(_connectionsMutex); + _busyConnections.erase(connPtr); + _readyConnections.insert(connPtr); + }); } } diff --git a/orm_lib/src/DbClientImpl.h b/orm_lib/src/DbClientImpl.h index 3149ad2c..ad28950c 100644 --- a/orm_lib/src/DbClientImpl.h +++ b/orm_lib/src/DbClientImpl.h @@ -34,25 +34,26 @@ class DbClientImpl : public DbClient, public std::enable_shared_from_this ¶meters, - const std::vector &length, - const std::vector &format, - const ResultCallback &rcb, - const std::function &exceptCallback) override; + std::vector &¶meters, + std::vector &&length, + std::vector &&format, + ResultCallback &&rcb, + std::function &&exceptCallback) override; virtual std::shared_ptr newTransaction(const std::function &commitCallback = std::function()) override; private: void ioLoop(); std::shared_ptr _loopPtr; - void execSql(const DbConnectionPtr &conn, const std::string &sql, + void execSql(const DbConnectionPtr &conn, + std::string &&sql, size_t paraNum, - const std::vector ¶meters, - const std::vector &length, - const std::vector &format, - const ResultCallback &rcb, - const std::function &exceptCallback); + std::vector &¶meters, + std::vector &&length, + std::vector &&format, + ResultCallback &&rcb, + std::function &&exceptCallback); DbConnectionPtr newConnection(); @@ -78,7 +79,7 @@ class DbClientImpl : public DbClient, public std::enable_shared_from_this _sqlCmdBuffer; + std::deque> _sqlCmdBuffer; std::mutex _bufferMutex; void handleNewTask(const DbConnectionPtr &conn); diff --git a/orm_lib/src/DbConnection.h b/orm_lib/src/DbConnection.h index 3f3ece48..389a1871 100644 --- a/orm_lib/src/DbConnection.h +++ b/orm_lib/src/DbConnection.h @@ -51,14 +51,14 @@ class DbConnection : public trantor::NonCopyable { _closeCb = cb; } - virtual void execSql(const std::string &sql, + virtual void execSql(std::string &&sql, size_t paraNum, - const std::vector ¶meters, - const std::vector &length, - const std::vector &format, - const ResultCallback &rcb, - const std::function &exceptCallback, - const std::function &idleCb) = 0; + std::vector &¶meters, + std::vector &&length, + std::vector &&format, + ResultCallback &&rcb, + std::function &&exceptCallback, + std::function &&idleCb) = 0; virtual ~DbConnection() { LOG_TRACE << "Destruct DbConn" << this; diff --git a/orm_lib/src/Result.cc b/orm_lib/src/Result.cc index d91108af..6eeea770 100644 --- a/orm_lib/src/Result.cc +++ b/orm_lib/src/Result.cc @@ -114,7 +114,6 @@ Result::size_type Result::size() const noexcept void Result::swap(Result &other) noexcept { _resultPtr.swap(other._resultPtr); - _query.swap(other._query); _errString.swap(other._errString); } Result::row_size_type Result::columns() const noexcept @@ -148,4 +147,8 @@ Result::field_size_type Result::getLength(Result::size_type row, Result::row_siz unsigned long long Result::insertId() const noexcept { return _resultPtr->insertId(); +} +const std::string &Result::sql() const noexcept +{ + return _resultPtr->sql(); } \ No newline at end of file diff --git a/orm_lib/src/ResultImpl.h b/orm_lib/src/ResultImpl.h index 6e1e5f46..f13e7fa7 100644 --- a/orm_lib/src/ResultImpl.h +++ b/orm_lib/src/ResultImpl.h @@ -24,6 +24,7 @@ namespace orm class ResultImpl : public trantor::NonCopyable, public Result { public: + ResultImpl(const std::string &query) : _query(query) {} virtual size_type size() const noexcept = 0; virtual row_size_type columns() const noexcept = 0; virtual const char *columnName(row_size_type Number) const = 0; @@ -32,8 +33,12 @@ class ResultImpl : public trantor::NonCopyable, public Result virtual const char *getValue(size_type row, row_size_type column) const = 0; virtual bool isNull(size_type row, row_size_type column) const = 0; virtual field_size_type getLength(size_type row, row_size_type column) const = 0; + virtual const std::string &sql() const { return _query; } virtual unsigned long long insertId() const noexcept { return 0; } virtual ~ResultImpl() {} + + private: + std::string _query; }; } // namespace orm diff --git a/orm_lib/src/SqlBinder.cc b/orm_lib/src/SqlBinder.cc index bb5a8214..e27f840e 100644 --- a/orm_lib/src/SqlBinder.cc +++ b/orm_lib/src/SqlBinder.cc @@ -28,7 +28,7 @@ void SqlBinder::exec() //nonblocking mode,default mode //Retain shared_ptrs of parameters until we get the result; std::shared_ptr objs = std::make_shared(std::move(_objs)); - _client.execSql(_sql, _paraNum, _parameters, _length, _format, + _client.execSql(std::move(_sql), _paraNum, std::move(_parameters), std::move(_length), std::move(_format), [holder = std::move(_callbackHolder), objs](const Result &r) { objs->clear(); if (holder) @@ -67,7 +67,7 @@ void SqlBinder::exec() std::shared_ptr> pro(new std::promise); auto f = pro->get_future(); - _client.execSql(_sql, _paraNum, _parameters, _length, _format, + _client.execSql(std::move(_sql), _paraNum, std::move(_parameters), std::move(_length), std::move(_format), [pro](const Result &r) { pro->set_value(r); }, diff --git a/orm_lib/src/TransactionImpl.cc b/orm_lib/src/TransactionImpl.cc index ddab86a4..9e8c9536 100644 --- a/orm_lib/src/TransactionImpl.cc +++ b/orm_lib/src/TransactionImpl.cc @@ -70,27 +70,27 @@ TransactionImpl::~TransactionImpl() }); } } -void TransactionImpl::execSql(const std::string &sql, +void TransactionImpl::execSql(std::string &&sql, size_t paraNum, - const std::vector ¶meters, - const std::vector &length, - const std::vector &format, - const ResultCallback &rcb, - const std::function &exceptCallback) + std::vector &¶meters, + std::vector &&length, + std::vector &&format, + ResultCallback &&rcb, + std::function &&exceptCallback) { auto thisPtr = shared_from_this(); - _loop->queueInLoop([thisPtr, sql, paraNum, parameters, length, format, rcb, exceptCallback]() { + _loop->queueInLoop([thisPtr, sql = std::move(sql), paraNum, parameters = std::move(parameters), length = std::move(length), format = std::move(format), rcb = std::move(rcb), exceptCallback = std::move(exceptCallback)]() mutable { if (!thisPtr->_isCommitedOrRolledback) { if (!thisPtr->_isWorking) { thisPtr->_isWorking = true; - thisPtr->_connectionPtr->execSql(sql, + thisPtr->_connectionPtr->execSql(std::move(sql), paraNum, - parameters, - length, - format, - rcb, + std::move(parameters), + std::move(length), + std::move(format), + std::move(rcb), [exceptCallback, thisPtr](const std::exception_ptr &ePtr) { thisPtr->rollback(); if (exceptCallback) @@ -104,13 +104,13 @@ void TransactionImpl::execSql(const std::string &sql, { //push sql cmd to buffer; SqlCmd cmd; - cmd._sql = sql; + cmd._sql = std::move(sql); cmd._paraNum = paraNum; - cmd._parameters = parameters; - cmd._length = length; - cmd._format = format; - cmd._cb = rcb; - cmd._exceptCb = exceptCallback; + cmd._parameters = std::move(parameters); + cmd._length = std::move(length); + cmd._format = std::move(format); + cmd._cb = std::move(rcb); + cmd._exceptCb = std::move(exceptCallback); thisPtr->_sqlCmdBuffer.push_back(std::move(cmd)); } } @@ -196,13 +196,13 @@ void TransactionImpl::execNewTask() auto conn = _connectionPtr; - _loop->queueInLoop([=]() { - conn->execSql(cmd._sql, + _loop->queueInLoop([=]() mutable { + conn->execSql(std::move(cmd._sql), cmd._paraNum, - cmd._parameters, - cmd._length, - cmd._format, - cmd._cb, + std::move(cmd._parameters), + std::move(cmd._length), + std::move(cmd._format), + std::move(cmd._cb), [cmd, thisPtr](const std::exception_ptr &ePtr) { thisPtr->rollback(); if (cmd._exceptCb) diff --git a/orm_lib/src/TransactionImpl.h b/orm_lib/src/TransactionImpl.h index 2212db74..dc26fc34 100644 --- a/orm_lib/src/TransactionImpl.h +++ b/orm_lib/src/TransactionImpl.h @@ -32,13 +32,13 @@ class TransactionImpl : public Transaction, public std::enable_shared_from_this< private: DbConnectionPtr _connectionPtr; - virtual void execSql(const std::string &sql, + virtual void execSql(std::string &&sql, size_t paraNum, - const std::vector ¶meters, - const std::vector &length, - const std::vector &format, - const ResultCallback &rcb, - const std::function &exceptCallback) override; + std::vector &¶meters, + std::vector &&length, + std::vector &&format, + ResultCallback &&rcb, + std::function &&exceptCallback) override; virtual std::shared_ptr newTransaction(const std::function&) override { return shared_from_this(); diff --git a/orm_lib/src/mysql_impl/MysqlConnection.cc b/orm_lib/src/mysql_impl/MysqlConnection.cc index 2e04c4dd..fd62a8c4 100644 --- a/orm_lib/src/mysql_impl/MysqlConnection.cc +++ b/orm_lib/src/mysql_impl/MysqlConnection.cc @@ -268,14 +268,14 @@ void MysqlConnection::handleEvent() } } -void MysqlConnection::execSql(const std::string &sql, +void MysqlConnection::execSql(std::string &&sql, size_t paraNum, - const std::vector ¶meters, - const std::vector &length, - const std::vector &format, - const ResultCallback &rcb, - const std::function &exceptCallback, - const std::function &idleCb) + std::vector &¶meters, + std::vector &&length, + std::vector &&format, + ResultCallback &&rcb, + std::function &&exceptCallback, + std::function &&idleCb) { LOG_TRACE << sql; assert(paraNum == parameters.size()); @@ -286,10 +286,10 @@ void MysqlConnection::execSql(const std::string &sql, assert(!_isWorking); assert(!sql.empty()); - _cb = rcb; - _idleCb = idleCb; + _cb = std::move(rcb); + _idleCb = std::move(idleCb); _isWorking = true; - _exceptCb = exceptCallback; + _exceptCb = std::move(exceptCallback); _sql.clear(); if (paraNum > 0) { diff --git a/orm_lib/src/mysql_impl/MysqlConnection.h b/orm_lib/src/mysql_impl/MysqlConnection.h index 6182bda7..60f300cf 100644 --- a/orm_lib/src/mysql_impl/MysqlConnection.h +++ b/orm_lib/src/mysql_impl/MysqlConnection.h @@ -37,14 +37,14 @@ class MysqlConnection : public DbConnection, public std::enable_shared_from_this public: MysqlConnection(trantor::EventLoop *loop, const std::string &connInfo); ~MysqlConnection() {} - virtual void execSql(const std::string &sql, + virtual void execSql(std::string &&sql, size_t paraNum, - const std::vector ¶meters, - const std::vector &length, - const std::vector &format, - const ResultCallback &rcb, - const std::function &exceptCallback, - const std::function &idleCb) override; + std::vector &¶meters, + std::vector &&length, + std::vector &&format, + ResultCallback &&rcb, + std::function &&exceptCallback, + std::function &&idleCb) override; private: std::unique_ptr _channelPtr; diff --git a/orm_lib/src/mysql_impl/MysqlResultImpl.h b/orm_lib/src/mysql_impl/MysqlResultImpl.h index 0c132fc7..7f031e3f 100644 --- a/orm_lib/src/mysql_impl/MysqlResultImpl.h +++ b/orm_lib/src/mysql_impl/MysqlResultImpl.h @@ -34,8 +34,8 @@ class MysqlResultImpl : public ResultImpl const std::string &query, size_type affectedRows, unsigned long long insertId) noexcept - : _result(r), - _query(query), + : ResultImpl(query), + _result(r), _rowsNum(_result ? mysql_num_rows(_result.get()) : 0), _fieldArray(r ? mysql_fetch_fields(r.get()) : nullptr), _fieldNum(r ? mysql_num_fields(r.get()) : 0), @@ -78,7 +78,6 @@ class MysqlResultImpl : public ResultImpl private: const std::shared_ptr _result; - const std::string _query; const Result::size_type _rowsNum; const MYSQL_FIELD *_fieldArray; const Result::row_size_type _fieldNum; diff --git a/orm_lib/src/postgresql_impl/PgConnection.cc b/orm_lib/src/postgresql_impl/PgConnection.cc index 545374f3..f6780d1e 100644 --- a/orm_lib/src/postgresql_impl/PgConnection.cc +++ b/orm_lib/src/postgresql_impl/PgConnection.cc @@ -97,12 +97,14 @@ void PgConnection::pgPoll() LOG_ERROR << "!!!Pg connection failed: " << PQerrorMessage(_connPtr.get()); break; case PGRES_POLLING_WRITING: - _channel.enableWriting(); - _channel.disableReading(); + if (!_channel.isWriting()) + _channel.enableWriting(); break; case PGRES_POLLING_READING: - _channel.enableReading(); - _channel.disableWriting(); + if (!_channel.isReading()) + _channel.enableReading(); + if (_channel.isWriting()) + _channel.disableWriting(); break; case PGRES_POLLING_OK: @@ -112,8 +114,10 @@ void PgConnection::pgPoll() assert(_okCb); _okCb(shared_from_this()); } - _channel.enableReading(); - _channel.disableWriting(); + if (!_channel.isReading()) + _channel.enableReading(); + if (_channel.isWriting()) + _channel.disableWriting(); break; case PGRES_POLLING_ACTIVE: //unused! @@ -122,14 +126,14 @@ void PgConnection::pgPoll() break; } } -void PgConnection::execSql(const std::string &sql, +void PgConnection::execSql(std::string &&sql, size_t paraNum, - const std::vector ¶meters, - const std::vector &length, - const std::vector &format, - const ResultCallback &rcb, - const std::function &exceptCallback, - const std::function &idleCb) + std::vector &¶meters, + std::vector &&length, + std::vector &&format, + ResultCallback &&rcb, + std::function &&exceptCallback, + std::function &&idleCb) { LOG_TRACE << sql; assert(paraNum == parameters.size()); @@ -139,16 +143,16 @@ void PgConnection::execSql(const std::string &sql, assert(idleCb); assert(!_isWorking); assert(!sql.empty()); - _sql = sql; - _cb = rcb; - _idleCb = idleCb; + _sql = std::move(sql); + _cb = std::move(rcb); + _idleCb = std::move(idleCb); _isWorking = true; - _exceptCb = exceptCallback; + _exceptCb = std::move(exceptCallback); auto thisPtr = shared_from_this(); - _loop->runInLoop([thisPtr, sql, paraNum, parameters, length, format]() { + _loop->runInLoop([thisPtr, paraNum=std::move(paraNum), parameters=std::move(parameters), length=std::move(length), format=std::move(format)]() { if (PQsendQueryParams( thisPtr->_connPtr.get(), - sql.c_str(), + thisPtr->_sql.c_str(), paraNum, NULL, parameters.data(), @@ -198,8 +202,8 @@ void PgConnection::handleRead() //need read more data from socket; return; } - - _channel.disableWriting(); + if (_channel.isWriting()) + _channel.disableWriting(); // got query results? while ((res = std::shared_ptr(PQgetResult(_connPtr.get()), [](PGresult *p) { PQclear(p); diff --git a/orm_lib/src/postgresql_impl/PgConnection.h b/orm_lib/src/postgresql_impl/PgConnection.h index 4d8feb55..79ef8ec0 100644 --- a/orm_lib/src/postgresql_impl/PgConnection.h +++ b/orm_lib/src/postgresql_impl/PgConnection.h @@ -37,14 +37,14 @@ class PgConnection : public DbConnection, public std::enable_shared_from_this ¶meters, - const std::vector &length, - const std::vector &format, - const ResultCallback &rcb, - const std::function &exceptCallback, - const std::function &idleCb) override; + std::vector &¶meters, + std::vector &&length, + std::vector &&format, + ResultCallback &&rcb, + std::function &&exceptCallback, + std::function &&idleCb) override; private: std::shared_ptr _connPtr; diff --git a/orm_lib/src/postgresql_impl/PostgreSQLResultImpl.h b/orm_lib/src/postgresql_impl/PostgreSQLResultImpl.h index 1cdf49f8..dcd4795e 100644 --- a/orm_lib/src/postgresql_impl/PostgreSQLResultImpl.h +++ b/orm_lib/src/postgresql_impl/PostgreSQLResultImpl.h @@ -29,8 +29,8 @@ class PostgreSQLResultImpl : public ResultImpl { public: PostgreSQLResultImpl(const std::shared_ptr &r, const std::string &query) noexcept - : _result(r), - _query(query) + :ResultImpl(query), + _result(r) { } virtual size_type size() const noexcept override; @@ -41,10 +41,8 @@ class PostgreSQLResultImpl : public ResultImpl virtual const char *getValue(size_type row, row_size_type column) const override; virtual bool isNull(size_type row, row_size_type column) const override; virtual field_size_type getLength(size_type row, row_size_type column) const override; - private: std::shared_ptr _result; - std::string _query; }; } // namespace orm diff --git a/orm_lib/src/sqlite3_impl/Sqlite3Connection.cc b/orm_lib/src/sqlite3_impl/Sqlite3Connection.cc index 0759c961..a5e092a4 100644 --- a/orm_lib/src/sqlite3_impl/Sqlite3Connection.cc +++ b/orm_lib/src/sqlite3_impl/Sqlite3Connection.cc @@ -86,17 +86,17 @@ Sqlite3Connection::Sqlite3Connection(trantor::EventLoop *loop, const std::string }); } -void Sqlite3Connection::execSql(const std::string &sql, +void Sqlite3Connection::execSql(std::string &&sql, size_t paraNum, - const std::vector ¶meters, - const std::vector &length, - const std::vector &format, - const ResultCallback &rcb, - const std::function &exceptCallback, - const std::function &idleCb) + std::vector &¶meters, + std::vector &&length, + std::vector &&format, + ResultCallback &&rcb, + std::function &&exceptCallback, + std::function &&idleCb) { auto thisPtr = shared_from_this(); - _loopThread.getLoop()->runInLoop([=]() { + _loopThread.getLoop()->runInLoop([thisPtr, sql = std::move(sql), paraNum, parameters = std::move(parameters), length = std::move(length), format = std::move(format), rcb = std::move(rcb), exceptCallback = std::move(exceptCallback), idleCb = std::move(idleCb)]() mutable { thisPtr->execSqlInQueue(sql, paraNum, parameters, length, format, rcb, exceptCallback, idleCb); }); } diff --git a/orm_lib/src/sqlite3_impl/Sqlite3Connection.h b/orm_lib/src/sqlite3_impl/Sqlite3Connection.h index b196d5f9..0bdcf419 100644 --- a/orm_lib/src/sqlite3_impl/Sqlite3Connection.h +++ b/orm_lib/src/sqlite3_impl/Sqlite3Connection.h @@ -38,14 +38,14 @@ class Sqlite3Connection : public DbConnection, public std::enable_shared_from_th public: Sqlite3Connection(trantor::EventLoop *loop, const std::string &connInfo); - virtual void execSql(const std::string &sql, + virtual void execSql(std::string &&sql, size_t paraNum, - const std::vector ¶meters, - const std::vector &length, - const std::vector &format, - const ResultCallback &rcb, - const std::function &exceptCallback, - const std::function &idleCb) override; + std::vector &¶meters, + std::vector &&length, + std::vector &&format, + ResultCallback &&rcb, + std::function &&exceptCallback, + std::function &&idleCb) override; private: static std::once_flag _once; diff --git a/orm_lib/src/sqlite3_impl/Sqlite3ResultImpl.h b/orm_lib/src/sqlite3_impl/Sqlite3ResultImpl.h index 55fc00b7..715cb3f1 100644 --- a/orm_lib/src/sqlite3_impl/Sqlite3ResultImpl.h +++ b/orm_lib/src/sqlite3_impl/Sqlite3ResultImpl.h @@ -30,6 +30,7 @@ class Sqlite3ResultImpl : public ResultImpl { public: Sqlite3ResultImpl(const std::string &query) noexcept + : ResultImpl(query) { } virtual size_type size() const noexcept override; From ac90c8071079c57a361f667e6064447f6c66a84c Mon Sep 17 00:00:00 2001 From: antao Date: Wed, 9 Jan 2019 10:46:58 +0800 Subject: [PATCH 2/4] Use multiple event loops in the db client --- lib/inc/drogon/HttpAppFramework.h | 1 + lib/src/HttpAppFrameworkImpl.h | 1 + orm_lib/src/DbClientImpl.cc | 56 ++++++++++++++----------------- orm_lib/src/DbClientImpl.h | 18 +++++----- 4 files changed, 36 insertions(+), 40 deletions(-) diff --git a/lib/inc/drogon/HttpAppFramework.h b/lib/inc/drogon/HttpAppFramework.h index ce106704..02677572 100755 --- a/lib/inc/drogon/HttpAppFramework.h +++ b/lib/inc/drogon/HttpAppFramework.h @@ -100,6 +100,7 @@ class HttpAppFramework : public trantor::NonCopyable virtual trantor::EventLoop *loop() = 0; virtual void setThreadNum(size_t threadNum) = 0; + virtual size_t getThreadNum() const = 0; virtual void setSSLFiles(const std::string &certPath, const std::string &keyPath) = 0; virtual void addListener(const std::string &ip, diff --git a/lib/src/HttpAppFrameworkImpl.h b/lib/src/HttpAppFrameworkImpl.h index 7526b07d..8cdcadeb 100644 --- a/lib/src/HttpAppFrameworkImpl.h +++ b/lib/src/HttpAppFrameworkImpl.h @@ -45,6 +45,7 @@ class HttpAppFrameworkImpl : public HttpAppFramework const std::string &certFile = "", const std::string &keyFile = "") override; virtual void setThreadNum(size_t threadNum) override; + virtual size_t getThreadNum() const override { return _threadNum; } virtual void setSSLFiles(const std::string &certPath, const std::string &keyPath) override; virtual void run() override; diff --git a/orm_lib/src/DbClientImpl.cc b/orm_lib/src/DbClientImpl.cc index 5818b036..c097fddf 100644 --- a/orm_lib/src/DbClientImpl.cc +++ b/orm_lib/src/DbClientImpl.cc @@ -26,6 +26,7 @@ #include "TransactionImpl.h" #include #include +#include #include #include #include @@ -42,34 +43,29 @@ using namespace drogon::orm; DbClientImpl::DbClientImpl(const std::string &connInfo, const size_t connNum, ClientType type) : _connInfo(connInfo), - _connectNum(connNum) + _connectNum(connNum), + _loops(connNum / 10 > 0 ? (connNum / 10 < drogon::app().getThreadNum() ? connNum / 10 : drogon::app().getThreadNum()) : 1) { _type = type; LOG_TRACE << "type=" << (int)type; + //LOG_DEBUG << _loops.getLoopNum(); assert(connNum > 0); - _loopThread = std::thread([=]() { - _loopPtr = std::shared_ptr(new trantor::EventLoop); - ioLoop(); - }); -} -void DbClientImpl::ioLoop() -{ - auto thisPtr = shared_from_this(); - _loopPtr->runAfter(0, [thisPtr]() { - for (size_t i = 0; i < thisPtr->_connectNum; i++) - { - thisPtr->_connections.insert(thisPtr->newConnection()); - } - }); - _loopPtr->loop(); + _loops.start(); + for (size_t i = 0; i < _connectNum; i++) + { + auto loop = _loops.getNextLoop(); + loop->runInLoop([this, loop]() { + _connections.insert(newConnection(loop)); + }); + } } DbClientImpl::~DbClientImpl() noexcept { - _stop = true; - _loopPtr->quit(); - if (_loopThread.joinable()) - _loopThread.join(); + std::lock_guard lock(_connectionsMutex); + _connections.clear(); + _readyConnections.clear(); + _busyConnections.clear(); } void DbClientImpl::execSql(const DbConnectionPtr &conn, @@ -240,7 +236,7 @@ void DbClientImpl::handleNewTask(const DbConnectionPtr &connPtr) { _busyConnections.insert(connPtr); //For new connections, this sentence is necessary auto &cmd = _sqlCmdBuffer.front(); - _loopPtr->queueInLoop([connPtr, cmd, this]() { + connPtr->loop()->queueInLoop([connPtr, cmd, this]() { execSql(connPtr, std::move(cmd->_sql), cmd->_paraNum, std::move(cmd->_parameters), std::move(cmd->_length), std::move(cmd->_format), std::move(cmd->_cb), std::move(cmd->_exceptCb)); }); _sqlCmdBuffer.pop_front(); @@ -248,7 +244,7 @@ void DbClientImpl::handleNewTask(const DbConnectionPtr &connPtr) } } //Idle connection - _loopPtr->queueInLoop([connPtr, this]() { + connPtr->loop()->queueInLoop([connPtr, this]() { std::lock_guard guard(_connectionsMutex); _busyConnections.erase(connPtr); _readyConnections.insert(connPtr); @@ -256,13 +252,13 @@ void DbClientImpl::handleNewTask(const DbConnectionPtr &connPtr) } } -DbConnectionPtr DbClientImpl::newConnection() +DbConnectionPtr DbClientImpl::newConnection(trantor::EventLoop *loop) { DbConnectionPtr connPtr; if (_type == ClientType::PostgreSQL) { #if USE_POSTGRESQL - connPtr = std::make_shared(_loopPtr.get(), _connInfo); + connPtr = std::make_shared(loop, _connInfo); #else return nullptr; #endif @@ -270,7 +266,7 @@ DbConnectionPtr DbClientImpl::newConnection() else if (_type == ClientType::Mysql) { #if USE_MYSQL - connPtr = std::make_shared(_loopPtr.get(), _connInfo); + connPtr = std::make_shared(loop, _connInfo); #else return nullptr; #endif @@ -278,7 +274,7 @@ DbConnectionPtr DbClientImpl::newConnection() else if (_type == ClientType::Sqlite3) { #if USE_SQLITE3 - connPtr = std::make_shared(_loopPtr.get(), _connInfo); + connPtr = std::make_shared(loop, _connInfo); #else return nullptr; #endif @@ -287,11 +283,11 @@ DbConnectionPtr DbClientImpl::newConnection() { return nullptr; } - auto loopPtr = _loopPtr; + std::weak_ptr weakPtr = shared_from_this(); - connPtr->setCloseCallback([weakPtr, loopPtr](const DbConnectionPtr &closeConnPtr) { + connPtr->setCloseCallback([weakPtr, loop](const DbConnectionPtr &closeConnPtr) { //Reconnect after 1 second - loopPtr->runAfter(1, [weakPtr, closeConnPtr] { + loop->runAfter(1, [weakPtr, closeConnPtr, loop] { auto thisPtr = weakPtr.lock(); if (!thisPtr) return; @@ -300,7 +296,7 @@ DbConnectionPtr DbClientImpl::newConnection() thisPtr->_busyConnections.erase(closeConnPtr); assert(thisPtr->_connections.find(closeConnPtr) != thisPtr->_connections.end()); thisPtr->_connections.erase(closeConnPtr); - thisPtr->_connections.insert(thisPtr->newConnection()); + thisPtr->_connections.insert(thisPtr->newConnection(loop)); }); }); connPtr->setOkCallback([weakPtr](const DbConnectionPtr &okConnPtr) { diff --git a/orm_lib/src/DbClientImpl.h b/orm_lib/src/DbClientImpl.h index ad28950c..947850bd 100644 --- a/orm_lib/src/DbClientImpl.h +++ b/orm_lib/src/DbClientImpl.h @@ -16,7 +16,7 @@ #include "DbConnection.h" #include -#include +#include #include #include #include @@ -44,8 +44,10 @@ class DbClientImpl : public DbClient, public std::enable_shared_from_this newTransaction(const std::function &commitCallback = std::function()) override; private: - void ioLoop(); - std::shared_ptr _loopPtr; + std::string _connInfo; + size_t _connectNum; + trantor::EventLoopThreadPool _loops; + void execSql(const DbConnectionPtr &conn, std::string &&sql, size_t paraNum, @@ -55,20 +57,16 @@ class DbClientImpl : public DbClient, public std::enable_shared_from_this &&exceptCallback); - DbConnectionPtr newConnection(); + DbConnectionPtr newConnection(trantor::EventLoop *loop); + std::mutex _connectionsMutex; std::unordered_set _connections; std::unordered_set _readyConnections; std::unordered_set _busyConnections; - std::string _connInfo; - std::thread _loopThread; - std::mutex _connectionsMutex; + std::condition_variable _condConnectionReady; size_t _transWaitNum = 0; - size_t _connectNum; - bool _stop = false; - struct SqlCmd { std::string _sql; From e1b873ba01c3fb8fde8d585f88f083a555afce73 Mon Sep 17 00:00:00 2001 From: antao Date: Wed, 9 Jan 2019 13:12:07 +0800 Subject: [PATCH 3/4] Modify the DbClientImpl class --- orm_lib/src/DbClientImpl.cc | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/orm_lib/src/DbClientImpl.cc b/orm_lib/src/DbClientImpl.cc index c097fddf..07a72a51 100644 --- a/orm_lib/src/DbClientImpl.cc +++ b/orm_lib/src/DbClientImpl.cc @@ -51,13 +51,15 @@ DbClientImpl::DbClientImpl(const std::string &connInfo, const size_t connNum, Cl //LOG_DEBUG << _loops.getLoopNum(); assert(connNum > 0); _loops.start(); - for (size_t i = 0; i < _connectNum; i++) - { - auto loop = _loops.getNextLoop(); - loop->runInLoop([this, loop]() { - _connections.insert(newConnection(loop)); - }); - } + std::thread([this]() { + for (size_t i = 0; i < _connectNum; i++) + { + auto loop = _loops.getNextLoop(); + loop->runInLoop([this, loop]() { + _connections.insert(newConnection(loop)); + }); + } + }).detach(); } DbClientImpl::~DbClientImpl() noexcept From 08300f93c252dfa9e3f0627300fcc68da0edf83d Mon Sep 17 00:00:00 2001 From: an-tao <20741618@qq.com> Date: Wed, 9 Jan 2019 14:47:07 +0800 Subject: [PATCH 4/4] Modify the constructor of the DbClientImpl class --- orm_lib/src/DbClientImpl.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orm_lib/src/DbClientImpl.cc b/orm_lib/src/DbClientImpl.cc index 07a72a51..2213f7e9 100644 --- a/orm_lib/src/DbClientImpl.cc +++ b/orm_lib/src/DbClientImpl.cc @@ -44,7 +44,7 @@ using namespace drogon::orm; DbClientImpl::DbClientImpl(const std::string &connInfo, const size_t connNum, ClientType type) : _connInfo(connInfo), _connectNum(connNum), - _loops(connNum / 10 > 0 ? (connNum / 10 < drogon::app().getThreadNum() ? connNum / 10 : drogon::app().getThreadNum()) : 1) + _loops(connNum / 100 > 0 ? connNum / 100 : 1) { _type = type; LOG_TRACE << "type=" << (int)type;