Modify mysql client
This commit is contained in:
parent
894b634b22
commit
6adff3469c
|
@ -45,13 +45,20 @@ DbClientLockFree::DbClientLockFree(const std::string &connInfo, trantor::EventLo
|
|||
{
|
||||
_type = type;
|
||||
LOG_TRACE << "type=" << (int)type;
|
||||
if (type == ClientType::PostgreSQL || type == ClientType::Mysql)
|
||||
if (type == ClientType::PostgreSQL)
|
||||
{
|
||||
_loop->runInLoop([this]() {
|
||||
for (size_t i = 0; i < _connectionNum; i++)
|
||||
_connectionHolders.push_back(newConnection());
|
||||
});
|
||||
}
|
||||
else if (type == ClientType::Mysql)
|
||||
{
|
||||
for (size_t i = 0; i < _connectionNum; i++)
|
||||
_loop->runAfter(0.1 * (i + 1), [this]() {
|
||||
_connectionHolders.push_back(newConnection());
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_ERROR << "No supported database type:" << (int)type;
|
||||
|
|
|
@ -298,13 +298,13 @@ void MysqlConnection::handleEvent()
|
|||
}
|
||||
}
|
||||
|
||||
void MysqlConnection::execSql(std::string &&sql,
|
||||
size_t paraNum,
|
||||
std::vector<const char *> &¶meters,
|
||||
std::vector<int> &&length,
|
||||
std::vector<int> &&format,
|
||||
ResultCallback &&rcb,
|
||||
std::function<void(const std::exception_ptr &)> &&exceptCallback)
|
||||
void MysqlConnection::execSqlInLoop(std::string &&sql,
|
||||
size_t paraNum,
|
||||
std::vector<const char *> &¶meters,
|
||||
std::vector<int> &&length,
|
||||
std::vector<int> &&format,
|
||||
ResultCallback &&rcb,
|
||||
std::function<void(const std::exception_ptr &)> &&exceptCallback)
|
||||
{
|
||||
LOG_TRACE << sql;
|
||||
assert(paraNum == parameters.size());
|
||||
|
@ -377,39 +377,37 @@ void MysqlConnection::execSql(std::string &&sql,
|
|||
_sql = sql;
|
||||
}
|
||||
LOG_TRACE << _sql;
|
||||
_loop->runInLoop([=]() {
|
||||
int err;
|
||||
//int mysql_real_query_start(int *ret, MYSQL *mysql, const char *q, unsigned long length)
|
||||
_waitStatus = mysql_real_query_start(&err, _mysqlPtr.get(), _sql.c_str(), _sql.length());
|
||||
LOG_TRACE << "real_query:" << _waitStatus;
|
||||
_execStatus = ExecStatus_RealQuery;
|
||||
int err;
|
||||
//int mysql_real_query_start(int *ret, MYSQL *mysql, const char *q, unsigned long length)
|
||||
_waitStatus = mysql_real_query_start(&err, _mysqlPtr.get(), _sql.c_str(), _sql.length());
|
||||
LOG_TRACE << "real_query:" << _waitStatus;
|
||||
_execStatus = ExecStatus_RealQuery;
|
||||
if (_waitStatus == 0)
|
||||
{
|
||||
if (err)
|
||||
{
|
||||
LOG_ERROR << "error";
|
||||
outputError();
|
||||
return;
|
||||
}
|
||||
|
||||
MYSQL_RES *ret;
|
||||
_waitStatus = mysql_store_result_start(&ret, _mysqlPtr.get());
|
||||
LOG_TRACE << "store_result:" << _waitStatus;
|
||||
_execStatus = ExecStatus_StoreResult;
|
||||
if (_waitStatus == 0)
|
||||
{
|
||||
if (err)
|
||||
_execStatus = ExecStatus_None;
|
||||
if (!ret)
|
||||
{
|
||||
LOG_ERROR << "error";
|
||||
outputError();
|
||||
return;
|
||||
}
|
||||
|
||||
MYSQL_RES *ret;
|
||||
_waitStatus = mysql_store_result_start(&ret, _mysqlPtr.get());
|
||||
LOG_TRACE << "store_result:" << _waitStatus;
|
||||
_execStatus = ExecStatus_StoreResult;
|
||||
if (_waitStatus == 0)
|
||||
{
|
||||
_execStatus = ExecStatus_None;
|
||||
if (!ret)
|
||||
{
|
||||
LOG_ERROR << "error";
|
||||
outputError();
|
||||
return;
|
||||
}
|
||||
getResult(ret);
|
||||
}
|
||||
getResult(ret);
|
||||
}
|
||||
setChannel();
|
||||
});
|
||||
}
|
||||
setChannel();
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
@ -43,10 +43,50 @@ class MysqlConnection : public DbConnection, public std::enable_shared_from_this
|
|||
std::vector<int> &&length,
|
||||
std::vector<int> &&format,
|
||||
ResultCallback &&rcb,
|
||||
std::function<void(const std::exception_ptr &)> &&exceptCallback) override;
|
||||
std::function<void(const std::exception_ptr &)> &&exceptCallback) override
|
||||
{
|
||||
if (_loop->isInLoopThread())
|
||||
{
|
||||
execSqlInLoop(std::move(sql),
|
||||
paraNum,
|
||||
std::move(parameters),
|
||||
std::move(length),
|
||||
std::move(format),
|
||||
std::move(rcb),
|
||||
std::move(exceptCallback));
|
||||
}
|
||||
else
|
||||
{
|
||||
auto thisPtr = shared_from_this();
|
||||
_loop->queueInLoop([thisPtr,
|
||||
sql = std::move(sql),
|
||||
paraNum,
|
||||
parameters = std::move(parameters),
|
||||
length = std::move(length),
|
||||
format = std::move(format),
|
||||
rcb = std::move(rcb),
|
||||
exceptCallback = std::move(exceptCallback)]() mutable {
|
||||
thisPtr->execSqlInLoop(std::move(sql),
|
||||
paraNum,
|
||||
std::move(parameters),
|
||||
std::move(length),
|
||||
std::move(format),
|
||||
std::move(rcb),
|
||||
std::move(exceptCallback));
|
||||
});
|
||||
}
|
||||
}
|
||||
virtual void disconnect() override;
|
||||
|
||||
private:
|
||||
void execSqlInLoop(std::string &&sql,
|
||||
size_t paraNum,
|
||||
std::vector<const char *> &¶meters,
|
||||
std::vector<int> &&length,
|
||||
std::vector<int> &&format,
|
||||
ResultCallback &&rcb,
|
||||
std::function<void(const std::exception_ptr &)> &&exceptCallback);
|
||||
|
||||
std::unique_ptr<trantor::Channel> _channelPtr;
|
||||
std::shared_ptr<MYSQL> _mysqlPtr;
|
||||
|
||||
|
|
Loading…
Reference in New Issue