Merge pull request #88 from an-tao/dev
Optimize the app.run() method and the DbClientLockFree class
This commit is contained in:
commit
40464da6b2
|
@ -44,7 +44,10 @@
|
|||
"user": "",
|
||||
//passwd: '' by default
|
||||
"passwd": "",
|
||||
//connection_number: 1 by default
|
||||
//is_fast: false by default, if it is true, the client is faster but user can't call
|
||||
//any synchronous interface of it.
|
||||
is_fast: false,
|
||||
//connection_number: 1 by default, valid only if is_fast is false.
|
||||
"connection_number": 1
|
||||
}
|
||||
],*/
|
||||
|
@ -136,8 +139,6 @@
|
|||
//idle_connection_timeout: Defaults to 60 seconds, the lifetime
|
||||
//of the connection without read or write
|
||||
"idle_connection_timeout": 60,
|
||||
//enable_fast_db_client: Defaults to false
|
||||
"enable_fast_db_client": false,
|
||||
//server_header_field: Set the 'server' header field in each response sent by drogon,
|
||||
//empty string by default with which the 'server' header field is set to "Server: drogon/version string\r\n"
|
||||
"server_header_field": "",
|
||||
|
|
|
@ -44,7 +44,10 @@
|
|||
"user": "",
|
||||
//passwd: '' by default
|
||||
"passwd": "",
|
||||
//connection_number: 1 by default
|
||||
//is_fast: false by default, if it is true, the client is faster but user can't call
|
||||
//any synchronous interface of it.
|
||||
is_fast: false,
|
||||
//connection_number: 1 by default, valid only if is_fast is false.
|
||||
"connection_number": 1
|
||||
}
|
||||
],*/
|
||||
|
@ -136,8 +139,6 @@
|
|||
//idle_connection_timeout: Defaults to 60 seconds, the lifetime
|
||||
//of the connection without read or write
|
||||
"idle_connection_timeout": 60,
|
||||
//enable_fast_db_client: Defaults to false
|
||||
"enable_fast_db_client": false,
|
||||
//server_header_field: Set the 'server' header field in each response sent by drogon,
|
||||
//empty string by default with which the 'server' header field is set to "Server: drogon/version string\r\n"
|
||||
"server_header_field": "",
|
||||
|
|
|
@ -71,9 +71,7 @@ class HttpAppFramework : public trantor::NonCopyable
|
|||
/**
|
||||
* Calling this method starts the IO event loops and the main loop of the application;
|
||||
* Usually, the thread that calls this method is the main thread of the application;
|
||||
* If the loop() method is called before this method, it must be called from the thread
|
||||
* that first calls the loop () method.
|
||||
* If all loop() calls are after this method, it can be called from any thread.
|
||||
* This method blocks the calling thread until the main loop exits.
|
||||
*/
|
||||
virtual void run() = 0;
|
||||
|
||||
|
@ -188,7 +186,6 @@ class HttpAppFramework : public trantor::NonCopyable
|
|||
virtual void setPipelineRequestsNumber(const size_t number) = 0;
|
||||
#if USE_ORM
|
||||
virtual orm::DbClientPtr getDbClient(const std::string &name = "default") = 0;
|
||||
virtual void enableFastDbClient() = 0;
|
||||
virtual orm::DbClientPtr getFastDbClient(const std::string &name = "default") = 0;
|
||||
virtual void createDbClient(const std::string &dbType,
|
||||
const std::string &host,
|
||||
|
@ -198,7 +195,8 @@ class HttpAppFramework : public trantor::NonCopyable
|
|||
const std::string &password,
|
||||
const size_t connectionNum = 1,
|
||||
const std::string &filename = "",
|
||||
const std::string &name = "default") = 0;
|
||||
const std::string &name = "default",
|
||||
const bool isFast = false) = 0;
|
||||
#endif
|
||||
|
||||
private:
|
||||
|
|
|
@ -228,14 +228,8 @@ static void loadApp(const Json::Value &app)
|
|||
//Kick off idle connections
|
||||
auto kickOffTimeout = app.get("idle_connection_timeout", 60).asUInt64();
|
||||
drogon::app().setIdleConnectionTimeout(kickOffTimeout);
|
||||
#if USE_ORM
|
||||
//Fast db client
|
||||
auto fastDbClient = app.get("enable_fast_db_client", false).asBool();
|
||||
if (fastDbClient)
|
||||
drogon::app().enableFastDbClient();
|
||||
#endif
|
||||
auto server = app.get("server_header_field", "").asString();
|
||||
if(!server.empty())
|
||||
if (!server.empty())
|
||||
drogon::app().setServerHeaderField(server);
|
||||
auto keepaliveReqs = app.get("keepalive_requests", 0).asUInt64();
|
||||
drogon::app().setKeepaliveRequestsNumber(keepaliveReqs);
|
||||
|
@ -263,7 +257,8 @@ static void loadDbClients(const Json::Value &dbClients)
|
|||
auto connNum = client.get("connection_number", 1).asUInt();
|
||||
auto name = client.get("name", "default").asString();
|
||||
auto filename = client.get("filename", "").asString();
|
||||
drogon::app().createDbClient(type, host, (u_short)port, dbname, user, password, connNum, filename, name);
|
||||
auto isFast = client.get("is_fast", false).asBool();
|
||||
drogon::app().createDbClient(type, host, (u_short)port, dbname, user, password, connNum, filename, name, isFast);
|
||||
}
|
||||
#else
|
||||
std::cout << "No database is supported by drogon, please install the database development library first." << std::endl;
|
||||
|
|
|
@ -266,14 +266,6 @@ void HttpAppFrameworkImpl::run()
|
|||
|
||||
_running = true;
|
||||
|
||||
#if USE_ORM
|
||||
//Create db clients
|
||||
for (auto const &fun : _dbFuncs)
|
||||
{
|
||||
fun();
|
||||
}
|
||||
#endif
|
||||
|
||||
if (!_libFilePaths.empty())
|
||||
{
|
||||
_sharedLibManagerPtr = std::unique_ptr<SharedLibManager>(new SharedLibManager(loop(), _libFilePaths));
|
||||
|
@ -288,7 +280,7 @@ void HttpAppFrameworkImpl::run()
|
|||
{
|
||||
LOG_TRACE << "thread num=" << _threadNum;
|
||||
auto loopThreadPtr = std::make_shared<EventLoopThread>("DrogonIoLoop");
|
||||
loopThreadPtr->run();
|
||||
//loopThreadPtr->run();
|
||||
loopThreads.push_back(loopThreadPtr);
|
||||
ioLoops.push_back(loopThreadPtr->getLoop());
|
||||
for (auto const &listener : _listeners)
|
||||
|
@ -328,7 +320,7 @@ void HttpAppFrameworkImpl::run()
|
|||
{
|
||||
LOG_TRACE << "thread num=" << _threadNum;
|
||||
auto loopThreadPtr = std::make_shared<EventLoopThread>("DrogonListeningLoop");
|
||||
loopThreadPtr->run();
|
||||
//loopThreadPtr->run();
|
||||
loopThreads.push_back(loopThreadPtr);
|
||||
auto ip = std::get<0>(listener);
|
||||
bool isIpv6 = ip.find(":") == std::string::npos ? false : true;
|
||||
|
@ -362,12 +354,12 @@ void HttpAppFrameworkImpl::run()
|
|||
serverPtr->kickoffIdleConnections(_idleConnectionTimeout);
|
||||
serverPtr->start();
|
||||
/// Use std::promise to ensure that IO loops have been created
|
||||
std::promise<int> pro;
|
||||
auto f = pro.get_future();
|
||||
serverPtr->getLoop()->runInLoop([&pro]() {
|
||||
pro.set_value(1);
|
||||
});
|
||||
f.get();
|
||||
// std::promise<int> pro;
|
||||
// auto f = pro.get_future();
|
||||
// serverPtr->getLoop()->runInLoop([&pro]() {
|
||||
// pro.set_value(1);
|
||||
// });
|
||||
// f.get();
|
||||
auto serverIoLoops = serverPtr->getIoLoops();
|
||||
for (auto serverIoLoop : serverIoLoops)
|
||||
{
|
||||
|
@ -378,9 +370,7 @@ void HttpAppFrameworkImpl::run()
|
|||
#endif
|
||||
|
||||
#if USE_ORM
|
||||
// Create fast db clients for every io loop
|
||||
if (_enableFastDbClient)
|
||||
createFastDbClient(ioLoops);
|
||||
createDbClients(ioLoops);
|
||||
#endif
|
||||
_httpCtrlsRouter.init(ioLoops);
|
||||
_httpSimpleCtrlsRouter.init(ioLoops);
|
||||
|
@ -414,22 +404,56 @@ void HttpAppFrameworkImpl::run()
|
|||
}
|
||||
}
|
||||
_responseCachingMap = std::unique_ptr<CacheMap<std::string, HttpResponsePtr>>(new CacheMap<std::string, HttpResponsePtr>(loop(), 1.0, 4, 50)); //Max timeout up to about 70 days;
|
||||
loop()->loop();
|
||||
|
||||
// Let listener event loops run when everything is ready.
|
||||
for (auto &loopTh : loopThreads)
|
||||
{
|
||||
loopTh->run();
|
||||
}
|
||||
_mainLoopThread.run();
|
||||
_mainLoopThread.wait();
|
||||
}
|
||||
#if USE_ORM
|
||||
void HttpAppFrameworkImpl::createFastDbClient(const std::vector<trantor::EventLoop *> &ioloops)
|
||||
void HttpAppFrameworkImpl::createDbClients(const std::vector<trantor::EventLoop *> &ioloops)
|
||||
{
|
||||
for (auto &iter : _dbClientsMap)
|
||||
assert(_dbClientsMap.empty());
|
||||
assert(_dbFastClientsMap.empty());
|
||||
for (auto &dbInfo : _dbInfos)
|
||||
{
|
||||
for (auto *loop : ioloops)
|
||||
if (dbInfo._isFast)
|
||||
{
|
||||
if (iter.second->type() == drogon::orm::ClientType::Sqlite3)
|
||||
for (auto *loop : ioloops)
|
||||
{
|
||||
_dbFastClientsMap[iter.first][loop] = iter.second;
|
||||
if (dbInfo._dbType == drogon::orm::ClientType::Sqlite3)
|
||||
{
|
||||
LOG_ERROR << "Sqlite3 don't support fast mode";
|
||||
abort();
|
||||
}
|
||||
if (dbInfo._dbType == drogon::orm::ClientType::PostgreSQL || dbInfo._dbType == drogon::orm::ClientType::Mysql)
|
||||
{
|
||||
_dbFastClientsMap[dbInfo._name][loop] = std::shared_ptr<drogon::orm::DbClient>(new drogon::orm::DbClientLockFree(dbInfo._connectionInfo, loop, dbInfo._dbType));
|
||||
}
|
||||
}
|
||||
if (iter.second->type() == drogon::orm::ClientType::PostgreSQL || iter.second->type() == drogon::orm::ClientType::Mysql)
|
||||
}
|
||||
else
|
||||
{
|
||||
if (dbInfo._dbType == drogon::orm::ClientType::PostgreSQL)
|
||||
{
|
||||
_dbFastClientsMap[iter.first][loop] = std::shared_ptr<drogon::orm::DbClient>(new drogon::orm::DbClientLockFree(iter.second->connectionInfo(), loop, iter.second->type()));
|
||||
#if USE_POSTGRESQL
|
||||
_dbClientsMap[dbInfo._name] = drogon::orm::DbClient::newPgClient(dbInfo._connectionInfo, dbInfo._connectionNumber);
|
||||
#endif
|
||||
}
|
||||
else if (dbInfo._dbType == drogon::orm::ClientType::Mysql)
|
||||
{
|
||||
#if USE_MYSQL
|
||||
_dbClientsMap[dbInfo._name] = drogon::orm::DbClient::newMysqlClient(dbInfo._connectionInfo, dbInfo._connectionNumber);
|
||||
#endif
|
||||
}
|
||||
else if (dbInfo._dbType == drogon::orm::ClientType::Sqlite3)
|
||||
{
|
||||
#if USE_SQLITE3
|
||||
_dbClientsMap[dbInfo._name] = drogon::orm::DbClient::newSqlite3Client(dbInfo._connectionInfo, dbInfo._connectionNumber);
|
||||
#endif
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -721,8 +745,7 @@ void HttpAppFrameworkImpl::onAsyncRequest(const HttpRequestImplPtr &req, std::fu
|
|||
|
||||
trantor::EventLoop *HttpAppFrameworkImpl::loop()
|
||||
{
|
||||
static trantor::EventLoop loop;
|
||||
return &loop;
|
||||
return _mainLoopThread.getLoop();
|
||||
}
|
||||
|
||||
HttpAppFramework &HttpAppFramework::instance()
|
||||
|
@ -737,10 +760,13 @@ HttpAppFramework::~HttpAppFramework()
|
|||
#if USE_ORM
|
||||
orm::DbClientPtr HttpAppFrameworkImpl::getDbClient(const std::string &name)
|
||||
{
|
||||
assert(_dbClientsMap.find(name) != _dbClientsMap.end());
|
||||
return _dbClientsMap[name];
|
||||
}
|
||||
orm::DbClientPtr HttpAppFrameworkImpl::getFastDbClient(const std::string &name)
|
||||
{
|
||||
assert(_dbFastClientsMap[name].find(trantor::EventLoop::getEventLoopOfCurrentThread()) !=
|
||||
_dbFastClientsMap[name].end());
|
||||
return _dbFastClientsMap[name][trantor::EventLoop::getEventLoopOfCurrentThread()];
|
||||
}
|
||||
void HttpAppFrameworkImpl::createDbClient(const std::string &dbType,
|
||||
|
@ -751,7 +777,8 @@ void HttpAppFrameworkImpl::createDbClient(const std::string &dbType,
|
|||
const std::string &password,
|
||||
const size_t connectionNum,
|
||||
const std::string &filename,
|
||||
const std::string &name)
|
||||
const std::string &name,
|
||||
const bool isFast)
|
||||
{
|
||||
assert(!_running);
|
||||
auto connStr = utils::formattedString("host=%s port=%u dbname=%s user=%s", host.c_str(), port, databaseName.c_str(), userName.c_str());
|
||||
|
@ -762,13 +789,17 @@ void HttpAppFrameworkImpl::createDbClient(const std::string &dbType,
|
|||
}
|
||||
std::string type = dbType;
|
||||
std::transform(type.begin(), type.end(), type.begin(), tolower);
|
||||
DbInfo info;
|
||||
info._connectionInfo = connStr;
|
||||
info._connectionNumber = connectionNum;
|
||||
info._isFast = isFast;
|
||||
info._name = name;
|
||||
|
||||
if (type == "postgresql")
|
||||
{
|
||||
#if USE_POSTGRESQL
|
||||
_dbFuncs.push_back([this, connStr, connectionNum, name]() {
|
||||
auto client = drogon::orm::DbClient::newPgClient(connStr, connectionNum);
|
||||
_dbClientsMap[name] = client;
|
||||
});
|
||||
info._dbType = orm::ClientType::PostgreSQL;
|
||||
_dbInfos.push_back(info);
|
||||
#else
|
||||
std::cout << "The PostgreSQL is not supported by drogon, please install the development library first." << std::endl;
|
||||
exit(1);
|
||||
|
@ -777,10 +808,8 @@ void HttpAppFrameworkImpl::createDbClient(const std::string &dbType,
|
|||
else if (type == "mysql")
|
||||
{
|
||||
#if USE_MYSQL
|
||||
_dbFuncs.push_back([this, connStr, connectionNum, name]() {
|
||||
auto client = drogon::orm::DbClient::newMysqlClient(connStr, connectionNum);
|
||||
_dbClientsMap[name] = client;
|
||||
});
|
||||
info._dbType = orm::ClientType::Mysql;
|
||||
_dbInfos.push_back(info);
|
||||
#else
|
||||
std::cout << "The Mysql is not supported by drogon, please install the development library first." << std::endl;
|
||||
exit(1);
|
||||
|
@ -790,10 +819,9 @@ void HttpAppFrameworkImpl::createDbClient(const std::string &dbType,
|
|||
{
|
||||
#if USE_SQLITE3
|
||||
std::string sqlite3ConnStr = "filename=" + filename;
|
||||
_dbFuncs.push_back([this, sqlite3ConnStr, connectionNum, name]() {
|
||||
auto client = drogon::orm::DbClient::newSqlite3Client(sqlite3ConnStr, connectionNum);
|
||||
_dbClientsMap[name] = client;
|
||||
});
|
||||
info._connectionInfo = sqlite3ConnStr;
|
||||
info._dbType = orm::ClientType::Sqlite3;
|
||||
_dbInfos.push_back(info);
|
||||
#else
|
||||
std::cout << "The Sqlite3 is not supported by drogon, please install the development library first." << std::endl;
|
||||
exit(1);
|
||||
|
|
|
@ -27,6 +27,7 @@
|
|||
#include <drogon/HttpSimpleController.h>
|
||||
#include <drogon/version.h>
|
||||
#include <trantor/net/EventLoop.h>
|
||||
#include <trantor/net/EventLoopThread.h>
|
||||
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
@ -94,7 +95,7 @@ class HttpAppFrameworkImpl : public HttpAppFramework
|
|||
virtual void setPipelineRequestsNumber(const size_t number) override { _pipelineRequestsNumber = number; }
|
||||
size_t keepaliveRequestsNumber() const { return _keepaliveRequestsNumber; }
|
||||
size_t pipelineRequestsNumber() const { return _pipelineRequestsNumber; }
|
||||
|
||||
|
||||
virtual ~HttpAppFrameworkImpl() noexcept
|
||||
{
|
||||
//Destroy the following objects before _loop destruction
|
||||
|
@ -124,10 +125,6 @@ class HttpAppFrameworkImpl : public HttpAppFramework
|
|||
|
||||
#if USE_ORM
|
||||
virtual orm::DbClientPtr getDbClient(const std::string &name = "default") override;
|
||||
virtual void enableFastDbClient() override
|
||||
{
|
||||
_enableFastDbClient = true;
|
||||
}
|
||||
virtual orm::DbClientPtr getFastDbClient(const std::string &name = "default") override;
|
||||
virtual void createDbClient(const std::string &dbType,
|
||||
const std::string &host,
|
||||
|
@ -137,7 +134,8 @@ class HttpAppFrameworkImpl : public HttpAppFramework
|
|||
const std::string &password,
|
||||
const size_t connectionNum = 1,
|
||||
const std::string &filename = "",
|
||||
const std::string &name = "default") override;
|
||||
const std::string &name = "default",
|
||||
const bool isFast = false) override;
|
||||
#endif
|
||||
|
||||
inline static HttpAppFrameworkImpl &instance()
|
||||
|
@ -212,16 +210,24 @@ class HttpAppFrameworkImpl : public HttpAppFramework
|
|||
size_t _pipelineRequestsNumber = 0;
|
||||
bool _useSendfile = true;
|
||||
bool _useGzip = true;
|
||||
bool _enableFastDbClient = false;
|
||||
int _staticFilesCacheTime = 5;
|
||||
std::unordered_map<std::string, std::weak_ptr<HttpResponse>> _staticFilesCache;
|
||||
std::mutex _staticFilesCacheMutex;
|
||||
#if USE_ORM
|
||||
std::map<std::string, orm::DbClientPtr> _dbClientsMap;
|
||||
std::vector<std::function<void()>> _dbFuncs;
|
||||
struct DbInfo
|
||||
{
|
||||
std::string _name;
|
||||
std::string _connectionInfo;
|
||||
orm::ClientType _dbType;
|
||||
bool _isFast;
|
||||
size_t _connectionNumber;
|
||||
};
|
||||
std::vector<DbInfo> _dbInfos;
|
||||
std::map<std::string, std::map<trantor::EventLoop *, orm::DbClientPtr>> _dbFastClientsMap;
|
||||
void createFastDbClient(const std::vector<trantor::EventLoop *> &ioloops);
|
||||
void createDbClients(const std::vector<trantor::EventLoop *> &ioloops);
|
||||
#endif
|
||||
trantor::EventLoopThread _mainLoopThread;
|
||||
};
|
||||
|
||||
} // namespace drogon
|
||||
|
|
|
@ -54,14 +54,14 @@ DbClientLockFree::DbClientLockFree(const std::string &connInfo, trantor::EventLo
|
|||
}
|
||||
else if (type == ClientType::Mysql)
|
||||
{
|
||||
_loop->runInLoop([this]() {
|
||||
for (size_t i = 0; i < _connectionNum; i++)
|
||||
for (size_t i = 0; i < _connectionNum; i++)
|
||||
_loop->runAfter(0.1 * (i + 1), [this]() {
|
||||
_connectionHolders.push_back(newConnection());
|
||||
});
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_ERROR << "No supported database type!";
|
||||
LOG_ERROR << "No supported database type:" << (int)type;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -131,13 +131,13 @@ void DbClientLockFree::execSql(std::string &&sql,
|
|||
}
|
||||
|
||||
//LOG_TRACE << "Push query to buffer";
|
||||
_sqlCmdBuffer.emplace_back(std::move(sql),
|
||||
paraNum,
|
||||
std::move(parameters),
|
||||
std::move(length),
|
||||
std::move(format),
|
||||
std::move(rcb),
|
||||
std::move(exceptCallback));
|
||||
_sqlCmdBuffer.emplace_back(std::make_shared<SqlCmd>(std::move(sql),
|
||||
paraNum,
|
||||
std::move(parameters),
|
||||
std::move(length),
|
||||
std::move(format),
|
||||
std::move(rcb),
|
||||
std::move(exceptCallback)));
|
||||
}
|
||||
|
||||
std::shared_ptr<Transaction> DbClientLockFree::newTransaction(const std::function<void(bool)> &commitCallback)
|
||||
|
@ -228,15 +228,15 @@ void DbClientLockFree::handleNewTask(const DbConnectionPtr &conn)
|
|||
|
||||
if (!_sqlCmdBuffer.empty())
|
||||
{
|
||||
auto &cmd = _sqlCmdBuffer.front();
|
||||
conn->execSql(std::move(cmd._sql),
|
||||
cmd._paraNum,
|
||||
std::move(cmd._parameters),
|
||||
std::move(cmd._length),
|
||||
std::move(cmd._format),
|
||||
std::move(cmd._cb),
|
||||
std::move(cmd._exceptCb));
|
||||
auto cmdPtr = std::move(_sqlCmdBuffer.front());
|
||||
_sqlCmdBuffer.pop_front();
|
||||
conn->execSql(std::move(cmdPtr->_sql),
|
||||
cmdPtr->_paraNum,
|
||||
std::move(cmdPtr->_parameters),
|
||||
std::move(cmdPtr->_length),
|
||||
std::move(cmdPtr->_format),
|
||||
std::move(cmdPtr->_cb),
|
||||
std::move(cmdPtr->_exceptCb));
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -79,7 +79,8 @@ class DbClientLockFree : public DbClient, public std::enable_shared_from_this<Db
|
|||
{
|
||||
}
|
||||
};
|
||||
std::deque<SqlCmd> _sqlCmdBuffer;
|
||||
|
||||
std::deque<std::shared_ptr<SqlCmd>> _sqlCmdBuffer;
|
||||
|
||||
std::queue<std::function<void(const std::shared_ptr<Transaction> &)>> _transCallbacks;
|
||||
|
||||
|
|
|
@ -298,13 +298,13 @@ void MysqlConnection::handleEvent()
|
|||
}
|
||||
}
|
||||
|
||||
void MysqlConnection::execSql(std::string &&sql,
|
||||
size_t paraNum,
|
||||
std::vector<const char *> &¶meters,
|
||||
std::vector<int> &&length,
|
||||
std::vector<int> &&format,
|
||||
ResultCallback &&rcb,
|
||||
std::function<void(const std::exception_ptr &)> &&exceptCallback)
|
||||
void MysqlConnection::execSqlInLoop(std::string &&sql,
|
||||
size_t paraNum,
|
||||
std::vector<const char *> &¶meters,
|
||||
std::vector<int> &&length,
|
||||
std::vector<int> &&format,
|
||||
ResultCallback &&rcb,
|
||||
std::function<void(const std::exception_ptr &)> &&exceptCallback)
|
||||
{
|
||||
LOG_TRACE << sql;
|
||||
assert(paraNum == parameters.size());
|
||||
|
@ -377,39 +377,37 @@ void MysqlConnection::execSql(std::string &&sql,
|
|||
_sql = sql;
|
||||
}
|
||||
LOG_TRACE << _sql;
|
||||
_loop->runInLoop([=]() {
|
||||
int err;
|
||||
//int mysql_real_query_start(int *ret, MYSQL *mysql, const char *q, unsigned long length)
|
||||
_waitStatus = mysql_real_query_start(&err, _mysqlPtr.get(), _sql.c_str(), _sql.length());
|
||||
LOG_TRACE << "real_query:" << _waitStatus;
|
||||
_execStatus = ExecStatus_RealQuery;
|
||||
int err;
|
||||
//int mysql_real_query_start(int *ret, MYSQL *mysql, const char *q, unsigned long length)
|
||||
_waitStatus = mysql_real_query_start(&err, _mysqlPtr.get(), _sql.c_str(), _sql.length());
|
||||
LOG_TRACE << "real_query:" << _waitStatus;
|
||||
_execStatus = ExecStatus_RealQuery;
|
||||
if (_waitStatus == 0)
|
||||
{
|
||||
if (err)
|
||||
{
|
||||
LOG_ERROR << "error";
|
||||
outputError();
|
||||
return;
|
||||
}
|
||||
|
||||
MYSQL_RES *ret;
|
||||
_waitStatus = mysql_store_result_start(&ret, _mysqlPtr.get());
|
||||
LOG_TRACE << "store_result:" << _waitStatus;
|
||||
_execStatus = ExecStatus_StoreResult;
|
||||
if (_waitStatus == 0)
|
||||
{
|
||||
if (err)
|
||||
_execStatus = ExecStatus_None;
|
||||
if (!ret)
|
||||
{
|
||||
LOG_ERROR << "error";
|
||||
outputError();
|
||||
return;
|
||||
}
|
||||
|
||||
MYSQL_RES *ret;
|
||||
_waitStatus = mysql_store_result_start(&ret, _mysqlPtr.get());
|
||||
LOG_TRACE << "store_result:" << _waitStatus;
|
||||
_execStatus = ExecStatus_StoreResult;
|
||||
if (_waitStatus == 0)
|
||||
{
|
||||
_execStatus = ExecStatus_None;
|
||||
if (!ret)
|
||||
{
|
||||
LOG_ERROR << "error";
|
||||
outputError();
|
||||
return;
|
||||
}
|
||||
getResult(ret);
|
||||
}
|
||||
getResult(ret);
|
||||
}
|
||||
setChannel();
|
||||
});
|
||||
}
|
||||
setChannel();
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
@ -43,10 +43,50 @@ class MysqlConnection : public DbConnection, public std::enable_shared_from_this
|
|||
std::vector<int> &&length,
|
||||
std::vector<int> &&format,
|
||||
ResultCallback &&rcb,
|
||||
std::function<void(const std::exception_ptr &)> &&exceptCallback) override;
|
||||
std::function<void(const std::exception_ptr &)> &&exceptCallback) override
|
||||
{
|
||||
if (_loop->isInLoopThread())
|
||||
{
|
||||
execSqlInLoop(std::move(sql),
|
||||
paraNum,
|
||||
std::move(parameters),
|
||||
std::move(length),
|
||||
std::move(format),
|
||||
std::move(rcb),
|
||||
std::move(exceptCallback));
|
||||
}
|
||||
else
|
||||
{
|
||||
auto thisPtr = shared_from_this();
|
||||
_loop->queueInLoop([thisPtr,
|
||||
sql = std::move(sql),
|
||||
paraNum,
|
||||
parameters = std::move(parameters),
|
||||
length = std::move(length),
|
||||
format = std::move(format),
|
||||
rcb = std::move(rcb),
|
||||
exceptCallback = std::move(exceptCallback)]() mutable {
|
||||
thisPtr->execSqlInLoop(std::move(sql),
|
||||
paraNum,
|
||||
std::move(parameters),
|
||||
std::move(length),
|
||||
std::move(format),
|
||||
std::move(rcb),
|
||||
std::move(exceptCallback));
|
||||
});
|
||||
}
|
||||
}
|
||||
virtual void disconnect() override;
|
||||
|
||||
private:
|
||||
void execSqlInLoop(std::string &&sql,
|
||||
size_t paraNum,
|
||||
std::vector<const char *> &¶meters,
|
||||
std::vector<int> &&length,
|
||||
std::vector<int> &&format,
|
||||
ResultCallback &&rcb,
|
||||
std::function<void(const std::exception_ptr &)> &&exceptCallback);
|
||||
|
||||
std::unique_ptr<trantor::Channel> _channelPtr;
|
||||
std::shared_ptr<MYSQL> _mysqlPtr;
|
||||
|
||||
|
|
Loading…
Reference in New Issue