From 6adff3469c7e838aa6d75a60e3ce4a5edec14d47 Mon Sep 17 00:00:00 2001 From: antao Date: Sat, 23 Mar 2019 15:32:34 +0800 Subject: [PATCH] Modify mysql client --- orm_lib/src/DbClientLockFree.cc | 9 +++- orm_lib/src/mysql_impl/MysqlConnection.cc | 62 +++++++++++------------ orm_lib/src/mysql_impl/MysqlConnection.h | 42 ++++++++++++++- 3 files changed, 79 insertions(+), 34 deletions(-) diff --git a/orm_lib/src/DbClientLockFree.cc b/orm_lib/src/DbClientLockFree.cc index d398e395..35cfd14f 100644 --- a/orm_lib/src/DbClientLockFree.cc +++ b/orm_lib/src/DbClientLockFree.cc @@ -45,13 +45,20 @@ DbClientLockFree::DbClientLockFree(const std::string &connInfo, trantor::EventLo { _type = type; LOG_TRACE << "type=" << (int)type; - if (type == ClientType::PostgreSQL || type == ClientType::Mysql) + if (type == ClientType::PostgreSQL) { _loop->runInLoop([this]() { for (size_t i = 0; i < _connectionNum; i++) _connectionHolders.push_back(newConnection()); }); } + else if (type == ClientType::Mysql) + { + for (size_t i = 0; i < _connectionNum; i++) + _loop->runAfter(0.1 * (i + 1), [this]() { + _connectionHolders.push_back(newConnection()); + }); + } else { LOG_ERROR << "No supported database type:" << (int)type; diff --git a/orm_lib/src/mysql_impl/MysqlConnection.cc b/orm_lib/src/mysql_impl/MysqlConnection.cc index d370a954..ef2715b9 100644 --- a/orm_lib/src/mysql_impl/MysqlConnection.cc +++ b/orm_lib/src/mysql_impl/MysqlConnection.cc @@ -298,13 +298,13 @@ void MysqlConnection::handleEvent() } } -void MysqlConnection::execSql(std::string &&sql, - size_t paraNum, - std::vector &¶meters, - std::vector &&length, - std::vector &&format, - ResultCallback &&rcb, - std::function &&exceptCallback) +void MysqlConnection::execSqlInLoop(std::string &&sql, + size_t paraNum, + std::vector &¶meters, + std::vector &&length, + std::vector &&format, + ResultCallback &&rcb, + std::function &&exceptCallback) { LOG_TRACE << sql; assert(paraNum == parameters.size()); @@ -377,39 +377,37 @@ void MysqlConnection::execSql(std::string &&sql, _sql = sql; } LOG_TRACE << _sql; - _loop->runInLoop([=]() { - int err; - //int mysql_real_query_start(int *ret, MYSQL *mysql, const char *q, unsigned long length) - _waitStatus = mysql_real_query_start(&err, _mysqlPtr.get(), _sql.c_str(), _sql.length()); - LOG_TRACE << "real_query:" << _waitStatus; - _execStatus = ExecStatus_RealQuery; + int err; + //int mysql_real_query_start(int *ret, MYSQL *mysql, const char *q, unsigned long length) + _waitStatus = mysql_real_query_start(&err, _mysqlPtr.get(), _sql.c_str(), _sql.length()); + LOG_TRACE << "real_query:" << _waitStatus; + _execStatus = ExecStatus_RealQuery; + if (_waitStatus == 0) + { + if (err) + { + LOG_ERROR << "error"; + outputError(); + return; + } + + MYSQL_RES *ret; + _waitStatus = mysql_store_result_start(&ret, _mysqlPtr.get()); + LOG_TRACE << "store_result:" << _waitStatus; + _execStatus = ExecStatus_StoreResult; if (_waitStatus == 0) { - if (err) + _execStatus = ExecStatus_None; + if (!ret) { LOG_ERROR << "error"; outputError(); return; } - - MYSQL_RES *ret; - _waitStatus = mysql_store_result_start(&ret, _mysqlPtr.get()); - LOG_TRACE << "store_result:" << _waitStatus; - _execStatus = ExecStatus_StoreResult; - if (_waitStatus == 0) - { - _execStatus = ExecStatus_None; - if (!ret) - { - LOG_ERROR << "error"; - outputError(); - return; - } - getResult(ret); - } + getResult(ret); } - setChannel(); - }); + } + setChannel(); return; } diff --git a/orm_lib/src/mysql_impl/MysqlConnection.h b/orm_lib/src/mysql_impl/MysqlConnection.h index 4a2034f5..6846d40a 100644 --- a/orm_lib/src/mysql_impl/MysqlConnection.h +++ b/orm_lib/src/mysql_impl/MysqlConnection.h @@ -43,10 +43,50 @@ class MysqlConnection : public DbConnection, public std::enable_shared_from_this std::vector &&length, std::vector &&format, ResultCallback &&rcb, - std::function &&exceptCallback) override; + std::function &&exceptCallback) override + { + if (_loop->isInLoopThread()) + { + execSqlInLoop(std::move(sql), + paraNum, + std::move(parameters), + std::move(length), + std::move(format), + std::move(rcb), + std::move(exceptCallback)); + } + else + { + auto thisPtr = shared_from_this(); + _loop->queueInLoop([thisPtr, + sql = std::move(sql), + paraNum, + parameters = std::move(parameters), + length = std::move(length), + format = std::move(format), + rcb = std::move(rcb), + exceptCallback = std::move(exceptCallback)]() mutable { + thisPtr->execSqlInLoop(std::move(sql), + paraNum, + std::move(parameters), + std::move(length), + std::move(format), + std::move(rcb), + std::move(exceptCallback)); + }); + } + } virtual void disconnect() override; private: + void execSqlInLoop(std::string &&sql, + size_t paraNum, + std::vector &¶meters, + std::vector &&length, + std::vector &&format, + ResultCallback &&rcb, + std::function &&exceptCallback); + std::unique_ptr _channelPtr; std::shared_ptr _mysqlPtr;