Fix the error with multiple results when calling a procedure in mysql (#1091)

This commit is contained in:
An Tao 2021-11-28 16:26:23 +08:00 committed by GitHub
parent 6e6493299e
commit 7d137362bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 172 additions and 126 deletions

View File

@ -40,8 +40,7 @@ Result makeResult(
Result::SizeType affectedRows = 0,
unsigned long long insertId = 0)
{
return Result(std::shared_ptr<MysqlResultImpl>(
new MysqlResultImpl(r, affectedRows, insertId)));
return Result{std::make_shared<MysqlResultImpl>(r, affectedRows, insertId)};
}
} // namespace orm
@ -56,7 +55,7 @@ MysqlConnection::MysqlConnection(trantor::EventLoop *loop,
}))
{
mysql_init(mysqlPtr_.get());
mysql_options(mysqlPtr_.get(), MYSQL_OPT_NONBLOCK, 0);
mysql_options(mysqlPtr_.get(), MYSQL_OPT_NONBLOCK, nullptr);
// Get the key and value
auto connParams = parseConnString(connInfo);
@ -99,12 +98,14 @@ MysqlConnection::MysqlConnection(trantor::EventLoop *loop,
waitStatus_ =
mysql_real_connect_start(&ret,
mysqlPtr_.get(),
host_.empty() ? NULL : host_.c_str(),
user_.empty() ? NULL : user_.c_str(),
passwd_.empty() ? NULL : passwd_.c_str(),
dbname_.empty() ? NULL : dbname_.c_str(),
host_.empty() ? nullptr : host_.c_str(),
user_.empty() ? nullptr : user_.c_str(),
passwd_.empty() ? nullptr
: passwd_.c_str(),
dbname_.empty() ? nullptr
: dbname_.c_str(),
port_.empty() ? 3306 : atol(port_.c_str()),
NULL,
nullptr,
0);
// LOG_DEBUG << ret;
auto fd = mysql_get_socket(mysqlPtr_.get());
@ -115,8 +116,7 @@ MysqlConnection::MysqlConnection(trantor::EventLoop *loop,
"limit. Please use the ulimit command to check.";
exit(1);
}
channelPtr_ =
std::unique_ptr<trantor::Channel>(new trantor::Channel(loop_, fd));
channelPtr_ = std::make_unique<trantor::Channel>(loop_, fd);
channelPtr_->setEventCallback([this]() { handleEvent(); });
setChannel();
});
@ -218,6 +218,78 @@ void MysqlConnection::handleTimeout()
{
}
}
void MysqlConnection::handleCmd(int status)
{
switch (execStatus_)
{
case ExecStatus::RealQuery:
{
int err = 0;
waitStatus_ = mysql_real_query_cont(&err, mysqlPtr_.get(), status);
LOG_TRACE << "real_query:" << waitStatus_;
if (waitStatus_ == 0)
{
if (err)
{
execStatus_ = ExecStatus::None;
LOG_ERROR << "error:" << err << " status:" << status;
outputError();
return;
}
startStoreResult(false);
}
setChannel();
break;
}
case ExecStatus::StoreResult:
{
MYSQL_RES *ret;
waitStatus_ =
mysql_store_result_cont(&ret, mysqlPtr_.get(), status);
LOG_TRACE << "store_result:" << waitStatus_;
if (waitStatus_ == 0)
{
if (!ret && mysql_errno(mysqlPtr_.get()))
{
execStatus_ = ExecStatus::None;
LOG_ERROR << "error";
outputError();
return;
}
getResult(ret);
}
setChannel();
break;
}
case ExecStatus::NextResult:
{
int err;
waitStatus_ = mysql_next_result_cont(&err, mysqlPtr_.get(), status);
if (waitStatus_ == 0)
{
if (err)
{
execStatus_ = ExecStatus::None;
LOG_ERROR << "error:" << err << " status:" << status;
outputError();
return;
}
startStoreResult(false);
}
setChannel();
break;
}
case ExecStatus::None:
{
// Connection closed!
if (waitStatus_ == 0)
handleClosed();
break;
}
default:
return;
}
}
void MysqlConnection::handleEvent()
{
int status = 0;
@ -263,73 +335,7 @@ void MysqlConnection::handleEvent()
}
else if (status_ == ConnectStatus::Ok)
{
switch (execStatus_)
{
case ExecStatus::RealQuery:
{
int err = 0;
waitStatus_ =
mysql_real_query_cont(&err, mysqlPtr_.get(), status);
LOG_TRACE << "real_query:" << waitStatus_;
if (waitStatus_ == 0)
{
if (err)
{
execStatus_ = ExecStatus::None;
LOG_ERROR << "error:" << err << " status:" << status;
outputError();
return;
}
execStatus_ = ExecStatus::StoreResult;
MYSQL_RES *ret;
waitStatus_ =
mysql_store_result_start(&ret, mysqlPtr_.get());
LOG_TRACE << "store_result_start:" << waitStatus_;
if (waitStatus_ == 0)
{
execStatus_ = ExecStatus::None;
if (!ret && mysql_errno(mysqlPtr_.get()))
{
LOG_ERROR << "error in: " << sql_;
outputError();
return;
}
getResult(ret);
}
}
setChannel();
break;
}
case ExecStatus::StoreResult:
{
MYSQL_RES *ret;
waitStatus_ =
mysql_store_result_cont(&ret, mysqlPtr_.get(), status);
LOG_TRACE << "store_result:" << waitStatus_;
if (waitStatus_ == 0)
{
if (!ret && mysql_errno(mysqlPtr_.get()))
{
execStatus_ = ExecStatus::None;
LOG_ERROR << "error";
outputError();
return;
}
getResult(ret);
}
setChannel();
break;
}
case ExecStatus::None:
{
// Connection closed!
if (waitStatus_ == 0)
handleClosed();
break;
}
default:
return;
}
handleCmd(status);
}
else if (status_ == ConnectStatus::SettingCharacterSet)
{
@ -415,7 +421,7 @@ void MysqlConnection::execSqlInLoop(
std::string::size_type seekPos = std::string::npos;
for (size_t i = 0; i < paraNum; ++i)
{
seekPos = sql.find("?", pos);
seekPos = sql.find('?', pos);
if (seekPos == std::string::npos)
{
auto sub = sql.substr(pos);
@ -481,47 +487,8 @@ void MysqlConnection::execSqlInLoop(
{
sql_ = std::string(sql.data(), sql.length());
}
LOG_TRACE << sql_;
int err;
// int mysql_real_query_start(int *ret, MYSQL *mysql, const char *q,
// unsigned long length)
waitStatus_ = mysql_real_query_start(&err,
mysqlPtr_.get(),
sql_.c_str(),
sql_.length());
LOG_TRACE << "real_query:" << waitStatus_;
execStatus_ = ExecStatus::RealQuery;
if (waitStatus_ == 0)
{
if (err)
{
LOG_ERROR << "error";
loop_->queueInLoop(
[thisPtr = shared_from_this()] { thisPtr->outputError(); });
return;
}
MYSQL_RES *ret;
waitStatus_ = mysql_store_result_start(&ret, mysqlPtr_.get());
LOG_TRACE << "store_result:" << waitStatus_;
execStatus_ = ExecStatus::StoreResult;
if (waitStatus_ == 0)
{
execStatus_ = ExecStatus::None;
if (!ret && mysql_errno(mysqlPtr_.get()))
{
LOG_ERROR << "error in: " << sql_;
loop_->queueInLoop(
[thisPtr = shared_from_this()] { thisPtr->outputError(); });
return;
}
loop_->queueInLoop([thisPtr = shared_from_this(), ret] {
thisPtr->getResult(ret);
});
}
}
startQuery();
setChannel();
return;
}
void MysqlConnection::outputError()
@ -530,7 +497,7 @@ void MysqlConnection::outputError()
auto errorNo = mysql_errno(mysqlPtr_.get());
LOG_ERROR << "Error(" << errorNo << ") [" << mysql_sqlstate(mysqlPtr_.get())
<< "] \"" << mysql_error(mysqlPtr_.get()) << "\"";
LOG_ERROR << "sql:" << sql_;
if (isWorking_)
{
// TODO: exception type
@ -551,7 +518,63 @@ void MysqlConnection::outputError()
handleClosed();
}
}
void MysqlConnection::startQuery()
{
int err;
// int mysql_real_query_start(int *ret, MYSQL *mysql, const char *q,
// unsigned long length)
waitStatus_ = mysql_real_query_start(&err,
mysqlPtr_.get(),
sql_.c_str(),
sql_.length());
LOG_TRACE << "real_query:" << waitStatus_;
execStatus_ = ExecStatus::RealQuery;
if (waitStatus_ == 0)
{
if (err)
{
LOG_ERROR << "error";
loop_->queueInLoop(
[thisPtr = shared_from_this()] { thisPtr->outputError(); });
return;
}
startStoreResult(true);
}
}
void MysqlConnection::startStoreResult(bool queueInLoop)
{
MYSQL_RES *ret;
execStatus_ = ExecStatus::StoreResult;
waitStatus_ = mysql_store_result_start(&ret, mysqlPtr_.get());
LOG_TRACE << "store_result:" << waitStatus_;
if (waitStatus_ == 0)
{
execStatus_ = ExecStatus::None;
if (!ret && mysql_errno(mysqlPtr_.get()))
{
if (queueInLoop)
{
loop_->queueInLoop(
[thisPtr = shared_from_this()] { thisPtr->outputError(); });
}
else
{
outputError();
}
return;
}
if (queueInLoop)
{
loop_->queueInLoop([thisPtr = shared_from_this(), ret] {
thisPtr->getResult(ret);
});
}
else
{
getResult(ret);
}
}
}
void MysqlConnection::getResult(MYSQL_RES *res)
{
auto resultPtr = std::shared_ptr<MYSQL_RES>(res, [](MYSQL_RES *r) {
@ -563,9 +586,29 @@ void MysqlConnection::getResult(MYSQL_RES *res)
if (isWorking_)
{
callback_(Result);
callback_ = nullptr;
exceptionCallback_ = nullptr;
isWorking_ = false;
idleCb_();
if (!mysql_more_results(mysqlPtr_.get()))
{
callback_ = nullptr;
exceptionCallback_ = nullptr;
isWorking_ = false;
idleCb_();
}
else
{
execStatus_ = ExecStatus::NextResult;
int err;
waitStatus_ = mysql_next_result_start(&err, mysqlPtr_.get());
if (waitStatus_ == 0)
{
if (err)
{
execStatus_ = ExecStatus::None;
LOG_ERROR << "error:" << err;
outputError();
return;
}
startStoreResult(false);
}
}
}
}

View File

@ -102,17 +102,20 @@ class MysqlConnection : public DbConnection,
std::shared_ptr<MYSQL> mysqlPtr_;
std::string characterSet_;
void handleTimeout();
void handleCmd(int status);
void handleClosed();
void handleEvent();
void setChannel();
void getResult(MYSQL_RES *res);
void startQuery();
void startStoreResult(bool queueInLoop);
int waitStatus_;
enum class ExecStatus
{
None = 0,
RealQuery,
StoreResult
StoreResult,
NextResult
};
ExecStatus execStatus_{ExecStatus::None};