Send query

This commit is contained in:
antao 2018-11-28 10:48:03 +08:00
parent a661d01755
commit d92ce4cf9f
3 changed files with 133 additions and 0 deletions

View File

@ -18,6 +18,18 @@
#include <poll.h>
using namespace drogon::orm;
namespace drogon
{
namespace orm
{
Result makeResult(const std::shared_ptr<MYSQL_RES> &r = std::shared_ptr<MYSQL_RES>(nullptr), const std::string &query = "")
{
return Result(std::shared_ptr<MysqlResultImpl>(new MysqlResultImpl(r, query)));
}
} // namespace orm
} // namespace drogon
MysqlConnection::MysqlConnection(trantor::EventLoop *loop, const std::string &connInfo)
: DbConnection(loop)
@ -210,7 +222,60 @@ void MysqlConnection::handleEvent()
setChannel();
break;
}
case ExecStatus_SendQuery:
{
int err;
_waitStatus = mysql_send_query_cont(&err, _stmtPtr.get(), status);
if (_waitStatus == 0)
{
if (err)
{
_execStatus = ExecStatus_None;
outputError();
//FIXME exception callback;
return;
}
_execStatus = ExecStatus_StoreResult;
MYSQL_RES *ret;
_waitStatus = mysql_store_result_start(&ret, MYSQL * mysql);
LOG_TRACE
<< "send_query completely!";
}
setChannel();
break;
}
case ExecStatus_StoreResult:
{
MYSQL_RES *ret;
_waitStatus = mysql_store_result_cont(&ret, MYSQL * mysql);
if (_waitStatus == 0)
{
if(!ret)
{
_execStatus = ExecStatus_None;
outputError();
//FIXME exception callback;
return;
}
//make result!
auto resultPtr = std::shared_ptr<MYSQL_RES>(ret, [](MYSQL_RES *r) {
mysql_free_result(r);
});
auto Result = makeResult(r, _sql);
if(_isWorking)
{
_cb(Result);
_cb = decltype(_cb)();
_exceptCb = decltype(_exceptCallback)();
_isWorking = false;
_idleCb();
_idleCb = decltype(_idleCb)();
}
}
setChannel();
break;
}
default:
return;
}
@ -240,7 +305,26 @@ void MysqlConnection::execSql(const std::string &sql,
_exceptCb = exceptCallback;
//_channel.enableWriting();
LOG_TRACE << sql;
if (paraNum == 0)
{
_loop->runInLoop([=]() {
int err;
//int mysql_send_query_start(int *ret, MYSQL *mysql, const char *q, unsigned long length)
_waitStatus = mysql_send_query_start(&err, _stmtPtr.get(), sql.c_str(), sql.length());
if (_waitStatus == 0)
{
if (err)
{
outputError();
return;
}
}
_execStatus = ExecStatus_SendQuery;
setChannel();
});
return;
}
_stmtPtr = std::shared_ptr<MYSQL_STMT>(mysql_stmt_init(_mysqlPtr.get()), [](MYSQL_STMT *stmt) {
//blocking method;
mysql_stmt_close(stmt);

View File

@ -58,6 +58,7 @@ class MysqlConnection : public DbConnection, public std::enable_shared_from_this
enum ExecStatus
{
ExecStatus_None = 0,
ExecStatus_SendQuery,
ExecStatus_StmtPrepare
};
ExecStatus _execStatus = ExecStatus_None;

View File

@ -0,0 +1,48 @@
/**
*
* MysqlResultImpl.h
* An Tao
*
* Copyright 2018, An Tao. All rights reserved.
* Use of this source code is governed by a MIT license
* that can be found in the License file.
*
*
*/
#pragma once
#include "../ResultImpl.h"
#include <mysql.h>
#include <memory>
#include <string>
namespace drogon
{
namespace orm
{
class MysqlResultImpl : public ResultImpl
{
public:
MysqlResultImpl(const std::shared_ptr<MYSQL_RES> &r, const std::string &query) noexcept
: _result(r),
_query(query)
{
}
virtual size_type size() const noexcept override;
virtual row_size_type columns() const noexcept override;
virtual const char *columnName(row_size_type Number) const override;
virtual size_type affectedRows() const noexcept override;
virtual row_size_type columnNumber(const char colName[]) const override;
virtual const char *getValue(size_type row, row_size_type column) const override;
virtual bool isNull(size_type row, row_size_type column) const override;
virtual field_size_type getLength(size_type row, row_size_type column) const override;
private:
std::shared_ptr<MYSQL_RES> _result;
std::string _query;
};
} // namespace orm
} // namespace drogon