From 7d137362bdce17266e2fdfb1be85658620f19881 Mon Sep 17 00:00:00 2001 From: An Tao Date: Sun, 28 Nov 2021 16:26:23 +0800 Subject: [PATCH] Fix the error with multiple results when calling a procedure in mysql (#1091) --- orm_lib/src/mysql_impl/MysqlConnection.cc | 291 +++++++++++++--------- orm_lib/src/mysql_impl/MysqlConnection.h | 7 +- 2 files changed, 172 insertions(+), 126 deletions(-) diff --git a/orm_lib/src/mysql_impl/MysqlConnection.cc b/orm_lib/src/mysql_impl/MysqlConnection.cc index af0f768c..4b1b9d5b 100644 --- a/orm_lib/src/mysql_impl/MysqlConnection.cc +++ b/orm_lib/src/mysql_impl/MysqlConnection.cc @@ -40,8 +40,7 @@ Result makeResult( Result::SizeType affectedRows = 0, unsigned long long insertId = 0) { - return Result(std::shared_ptr( - new MysqlResultImpl(r, affectedRows, insertId))); + return Result{std::make_shared(r, affectedRows, insertId)}; } } // namespace orm @@ -56,7 +55,7 @@ MysqlConnection::MysqlConnection(trantor::EventLoop *loop, })) { mysql_init(mysqlPtr_.get()); - mysql_options(mysqlPtr_.get(), MYSQL_OPT_NONBLOCK, 0); + mysql_options(mysqlPtr_.get(), MYSQL_OPT_NONBLOCK, nullptr); // Get the key and value auto connParams = parseConnString(connInfo); @@ -99,12 +98,14 @@ MysqlConnection::MysqlConnection(trantor::EventLoop *loop, waitStatus_ = mysql_real_connect_start(&ret, mysqlPtr_.get(), - host_.empty() ? NULL : host_.c_str(), - user_.empty() ? NULL : user_.c_str(), - passwd_.empty() ? NULL : passwd_.c_str(), - dbname_.empty() ? NULL : dbname_.c_str(), + host_.empty() ? nullptr : host_.c_str(), + user_.empty() ? nullptr : user_.c_str(), + passwd_.empty() ? nullptr + : passwd_.c_str(), + dbname_.empty() ? nullptr + : dbname_.c_str(), port_.empty() ? 3306 : atol(port_.c_str()), - NULL, + nullptr, 0); // LOG_DEBUG << ret; auto fd = mysql_get_socket(mysqlPtr_.get()); @@ -115,8 +116,7 @@ MysqlConnection::MysqlConnection(trantor::EventLoop *loop, "limit. Please use the ulimit command to check."; exit(1); } - channelPtr_ = - std::unique_ptr(new trantor::Channel(loop_, fd)); + channelPtr_ = std::make_unique(loop_, fd); channelPtr_->setEventCallback([this]() { handleEvent(); }); setChannel(); }); @@ -218,6 +218,78 @@ void MysqlConnection::handleTimeout() { } } +void MysqlConnection::handleCmd(int status) +{ + switch (execStatus_) + { + case ExecStatus::RealQuery: + { + int err = 0; + waitStatus_ = mysql_real_query_cont(&err, mysqlPtr_.get(), status); + LOG_TRACE << "real_query:" << waitStatus_; + if (waitStatus_ == 0) + { + if (err) + { + execStatus_ = ExecStatus::None; + LOG_ERROR << "error:" << err << " status:" << status; + outputError(); + return; + } + startStoreResult(false); + } + setChannel(); + break; + } + case ExecStatus::StoreResult: + { + MYSQL_RES *ret; + waitStatus_ = + mysql_store_result_cont(&ret, mysqlPtr_.get(), status); + LOG_TRACE << "store_result:" << waitStatus_; + if (waitStatus_ == 0) + { + if (!ret && mysql_errno(mysqlPtr_.get())) + { + execStatus_ = ExecStatus::None; + LOG_ERROR << "error"; + outputError(); + return; + } + getResult(ret); + } + setChannel(); + break; + } + case ExecStatus::NextResult: + { + int err; + waitStatus_ = mysql_next_result_cont(&err, mysqlPtr_.get(), status); + if (waitStatus_ == 0) + { + if (err) + { + execStatus_ = ExecStatus::None; + LOG_ERROR << "error:" << err << " status:" << status; + outputError(); + return; + } + startStoreResult(false); + } + setChannel(); + break; + } + case ExecStatus::None: + { + // Connection closed! + if (waitStatus_ == 0) + handleClosed(); + break; + } + default: + return; + } +} void MysqlConnection::handleEvent() { int status = 0; @@ -263,73 +335,7 @@ void MysqlConnection::handleEvent() } else if (status_ == ConnectStatus::Ok) { - switch (execStatus_) - { - case ExecStatus::RealQuery: - { - int err = 0; - waitStatus_ = - mysql_real_query_cont(&err, mysqlPtr_.get(), status); - LOG_TRACE << "real_query:" << waitStatus_; - if (waitStatus_ == 0) - { - if (err) - { - execStatus_ = ExecStatus::None; - LOG_ERROR << "error:" << err << " status:" << status; - outputError(); - return; - } - execStatus_ = ExecStatus::StoreResult; - MYSQL_RES *ret; - waitStatus_ = - mysql_store_result_start(&ret, mysqlPtr_.get()); - LOG_TRACE << "store_result_start:" << waitStatus_; - if (waitStatus_ == 0) - { - execStatus_ = ExecStatus::None; - if (!ret && mysql_errno(mysqlPtr_.get())) - { - LOG_ERROR << "error in: " << sql_; - outputError(); - return; - } - getResult(ret); - } - } - setChannel(); - break; - } - case ExecStatus::StoreResult: - { - MYSQL_RES *ret; - waitStatus_ = - mysql_store_result_cont(&ret, mysqlPtr_.get(), status); - LOG_TRACE << "store_result:" << waitStatus_; - if (waitStatus_ == 0) - { - if (!ret && mysql_errno(mysqlPtr_.get())) - { - execStatus_ = ExecStatus::None; - LOG_ERROR << "error"; - outputError(); - return; - } - getResult(ret); - } - setChannel(); - break; - } - case ExecStatus::None: - { - // Connection closed! - if (waitStatus_ == 0) - handleClosed(); - break; - } - default: - return; - } + handleCmd(status); } else if (status_ == ConnectStatus::SettingCharacterSet) { @@ -415,7 +421,7 @@ void MysqlConnection::execSqlInLoop( std::string::size_type seekPos = std::string::npos; for (size_t i = 0; i < paraNum; ++i) { - seekPos = sql.find("?", pos); + seekPos = sql.find('?', pos); if (seekPos == std::string::npos) { auto sub = sql.substr(pos); @@ -481,47 +487,8 @@ void MysqlConnection::execSqlInLoop( { sql_ = std::string(sql.data(), sql.length()); } - LOG_TRACE << sql_; - int err; - // int mysql_real_query_start(int *ret, MYSQL *mysql, const char *q, - // unsigned long length) - waitStatus_ = mysql_real_query_start(&err, - mysqlPtr_.get(), - sql_.c_str(), - sql_.length()); - LOG_TRACE << "real_query:" << waitStatus_; - execStatus_ = ExecStatus::RealQuery; - if (waitStatus_ == 0) - { - if (err) - { - LOG_ERROR << "error"; - loop_->queueInLoop( - [thisPtr = shared_from_this()] { thisPtr->outputError(); }); - return; - } - - MYSQL_RES *ret; - waitStatus_ = mysql_store_result_start(&ret, mysqlPtr_.get()); - LOG_TRACE << "store_result:" << waitStatus_; - execStatus_ = ExecStatus::StoreResult; - if (waitStatus_ == 0) - { - execStatus_ = ExecStatus::None; - if (!ret && mysql_errno(mysqlPtr_.get())) - { - LOG_ERROR << "error in: " << sql_; - loop_->queueInLoop( - [thisPtr = shared_from_this()] { thisPtr->outputError(); }); - return; - } - loop_->queueInLoop([thisPtr = shared_from_this(), ret] { - thisPtr->getResult(ret); - }); - } - } + startQuery(); setChannel(); - return; } void MysqlConnection::outputError() @@ -530,7 +497,7 @@ void MysqlConnection::outputError() auto errorNo = mysql_errno(mysqlPtr_.get()); LOG_ERROR << "Error(" << errorNo << ") [" << mysql_sqlstate(mysqlPtr_.get()) << "] \"" << mysql_error(mysqlPtr_.get()) << "\""; - + LOG_ERROR << "sql:" << sql_; if (isWorking_) { // TODO: exception type @@ -551,7 +518,63 @@ void MysqlConnection::outputError() handleClosed(); } } - +void MysqlConnection::startQuery() +{ + int err; + // int mysql_real_query_start(int *ret, MYSQL *mysql, const char *q, + // unsigned long length) + waitStatus_ = mysql_real_query_start(&err, + mysqlPtr_.get(), + sql_.c_str(), + sql_.length()); + LOG_TRACE << "real_query:" << waitStatus_; + execStatus_ = ExecStatus::RealQuery; + if (waitStatus_ == 0) + { + if (err) + { + LOG_ERROR << "error"; + loop_->queueInLoop( + [thisPtr = shared_from_this()] { thisPtr->outputError(); }); + return; + } + startStoreResult(true); + } +} +void MysqlConnection::startStoreResult(bool queueInLoop) +{ + MYSQL_RES *ret; + execStatus_ = ExecStatus::StoreResult; + waitStatus_ = mysql_store_result_start(&ret, mysqlPtr_.get()); + LOG_TRACE << "store_result:" << waitStatus_; + if (waitStatus_ == 0) + { + execStatus_ = ExecStatus::None; + if (!ret && mysql_errno(mysqlPtr_.get())) + { + if (queueInLoop) + { + loop_->queueInLoop( + [thisPtr = shared_from_this()] { thisPtr->outputError(); }); + } + else + { + outputError(); + } + return; + } + if (queueInLoop) + { + loop_->queueInLoop([thisPtr = shared_from_this(), ret] { + thisPtr->getResult(ret); + }); + } + else + { + getResult(ret); + } + } +} void MysqlConnection::getResult(MYSQL_RES *res) { auto resultPtr = std::shared_ptr(res, [](MYSQL_RES *r) { @@ -563,9 +586,29 @@ void MysqlConnection::getResult(MYSQL_RES *res) if (isWorking_) { callback_(Result); - callback_ = nullptr; - exceptionCallback_ = nullptr; - isWorking_ = false; - idleCb_(); + if (!mysql_more_results(mysqlPtr_.get())) + { + callback_ = nullptr; + exceptionCallback_ = nullptr; + isWorking_ = false; + idleCb_(); + } + else + { + execStatus_ = ExecStatus::NextResult; + int err; + waitStatus_ = mysql_next_result_start(&err, mysqlPtr_.get()); + if (waitStatus_ == 0) + { + if (err) + { + execStatus_ = ExecStatus::None; + LOG_ERROR << "error:" << err; + outputError(); + return; + } + startStoreResult(false); + } + } } } diff --git a/orm_lib/src/mysql_impl/MysqlConnection.h b/orm_lib/src/mysql_impl/MysqlConnection.h index a870bf29..a4907887 100644 --- a/orm_lib/src/mysql_impl/MysqlConnection.h +++ b/orm_lib/src/mysql_impl/MysqlConnection.h @@ -102,17 +102,20 @@ class MysqlConnection : public DbConnection, std::shared_ptr mysqlPtr_; std::string characterSet_; void handleTimeout(); - + void handleCmd(int status); void handleClosed(); void handleEvent(); void setChannel(); void getResult(MYSQL_RES *res); + void startQuery(); + void startStoreResult(bool queueInLoop); int waitStatus_; enum class ExecStatus { None = 0, RealQuery, - StoreResult + StoreResult, + NextResult }; ExecStatus execStatus_{ExecStatus::None};