Temporary commit

This commit is contained in:
antao 2018-11-29 15:16:35 +08:00
parent 17e765501b
commit cfe55d5495
3 changed files with 148 additions and 6 deletions

View File

@ -216,11 +216,39 @@ void MysqlConnection::handleEvent()
_execStatus = ExecStatus_None; _execStatus = ExecStatus_None;
if (err) if (err)
{ {
outputError(); outputStmtError();
//FIXME exception callback; //FIXME exception callback;
return; return;
} }
LOG_TRACE << "stmt ready!"; LOG_TRACE << "stmt ready!";
if (mysql_stmt_bind_param(_stmtPtr.get(), _binds.data()))
{
outputStmtError();
return;
}
_waitStatus = mysql_stmt_execute_start(&err, _stmtPtr.get());
_execStatus = ExecStatus_StmtExec;
if (_waitStatus == 0)
{
_execStatus = ExecStatus_None;
if (err)
{
outputStmtError();
return;
}
_waitStatus = mysql_stmt_store_result_start(&err, _stmtPtr.get());
_execStatus = ExecStatus_StmtStoreResult;
if (_waitStatus == 0)
{
_execStatus = ExecStatus_None;
if (err)
{
outputStmtError();
return;
}
getStmtResult();
}
}
} }
setChannel(); setChannel();
break; break;
@ -278,6 +306,51 @@ void MysqlConnection::handleEvent()
setChannel(); setChannel();
break; break;
} }
case ExecStatus_StmtExec:
{
int err;
_waitStatus = mysql_stmt_execute_cont(&err, _stmtPtr.get(), status);
if (_waitStatus == 0)
{
_execStatus = ExecStatus_None;
if (err)
{
outputStmtError();
return;
}
_waitStatus = mysql_stmt_store_result_start(&err, _stmtPtr.get());
_execStatus = ExecStatus_StmtStoreResult;
if (_waitStatus == 0)
{
_execStatus = ExecStatus_None;
if (err)
{
outputStmtError();
return;
}
getStmtResult();
}
}
setChannel();
break;
}
case ExecStatus_StmtStoreResult:
{
int err;
_waitStatus = mysql_stmt_store_result_cont(&err, _stmtPtr.get(), status);
if (_waitStatus == 0)
{
_execStatus = ExecStatus_None;
if (err)
{
outputStmtError();
return;
}
getStmtResult();
}
setChannel();
break;
}
default: default:
return; return;
} }
@ -342,18 +415,28 @@ void MysqlConnection::execSql(const std::string &sql,
}); });
return; return;
} }
_stmtPtr = std::shared_ptr<MYSQL_STMT>(mysql_stmt_init(_mysqlPtr.get()), [](MYSQL_STMT *stmt) { _stmtPtr = std::shared_ptr<MYSQL_STMT>(mysql_stmt_init(_mysqlPtr.get()), [](MYSQL_STMT *stmt) {
//blocking method; //Is it a blocking method?
mysql_stmt_close(stmt); mysql_stmt_close(stmt);
}); });
if (!_stmtPtr) if (!_stmtPtr)
{ {
LOG_ERROR << " mysql_stmt_init(), out of memory"; outputError();
//FIXME,exception callback
return; return;
} }
_binds.resize(paraNum);
_lengths.resize(paraNum);
_isNulls.resize(paraNum);
for (size_t i = 0; i < paraNum; i++)
{
_binds[i].buffer = (void *)parameters[i];
_binds[i].buffer_type = (enum_field_types)format[i];
_binds[i].buffer_length = length[i];
_binds[i].length = (length[i] == 0 ? 0 : (_lengths[i] = length[i], &_lengths[i]));
_binds[i].is_null = (parameters[i] == NULL ? 0 : (_isNulls[i] = true, &_isNulls[i]));
}
_loop->runInLoop([=]() { _loop->runInLoop([=]() {
int err; int err;
_waitStatus = mysql_stmt_prepare_start(&err, _stmtPtr.get(), sql.c_str(), sql.length()); _waitStatus = mysql_stmt_prepare_start(&err, _stmtPtr.get(), sql.c_str(), sql.length());
@ -420,6 +503,36 @@ void MysqlConnection::outputError()
} }
} }
void MysqlConnection::outputStmtError()
{
_channelPtr->disableAll();
LOG_ERROR << "Error("
<< mysql_stmt_errno(_stmtPtr.get()) << ") [" << mysql_stmt_sqlstate(_stmtPtr.get()) << "] \""
<< mysql_stmt_error(_stmtPtr.get()) << "\"";
if (_isWorking)
{
try
{
//FIXME exception type
throw SqlError(mysql_stmt_error(_stmtPtr.get()),
_sql);
}
catch (...)
{
_exceptCb(std::current_exception());
_exceptCb = decltype(_exceptCb)();
}
_cb = decltype(_cb)();
_isWorking = false;
if (_idleCb)
{
_idleCb();
_idleCb = decltype(_idleCb)();
}
}
}
void MysqlConnection::getResult(MYSQL_RES *res) void MysqlConnection::getResult(MYSQL_RES *res)
{ {
auto resultPtr = std::shared_ptr<MYSQL_RES>(res, [](MYSQL_RES *r) { auto resultPtr = std::shared_ptr<MYSQL_RES>(res, [](MYSQL_RES *r) {
@ -435,4 +548,21 @@ void MysqlConnection::getResult(MYSQL_RES *res)
_idleCb(); _idleCb();
_idleCb = decltype(_idleCb)(); _idleCb = decltype(_idleCb)();
} }
}
void MysqlConnection::getStmtResult()
{
auto resultPtr = std::shared_ptr<MYSQL_RES>(_stmtPtr->default_rset_handler(_stmtPtr.get()), [](MYSQL_RES *r) {
mysql_free_result(r);
});
auto Result = makeResult(resultPtr, _sql, mysql_affected_rows(_mysqlPtr.get()));
if (_isWorking)
{
_cb(Result);
_cb = decltype(_cb)();
_exceptCb = decltype(_exceptCb)();
_isWorking = false;
_idleCb();
_idleCb = decltype(_idleCb)();
}
} }

View File

@ -55,18 +55,26 @@ class MysqlConnection : public DbConnection, public std::enable_shared_from_this
void handleEvent(); void handleEvent();
void setChannel(); void setChannel();
void getResult(MYSQL_RES *res); void getResult(MYSQL_RES *res);
void getStmtResult();
int _waitStatus; int _waitStatus;
enum ExecStatus enum ExecStatus
{ {
ExecStatus_None = 0, ExecStatus_None = 0,
ExecStatus_RealQuery, ExecStatus_RealQuery,
ExecStatus_StmtPrepare, ExecStatus_StmtPrepare,
ExecStatus_StoreResult ExecStatus_StoreResult,
ExecStatus_StmtExec,
ExecStatus_StmtStoreResult
}; };
ExecStatus _execStatus = ExecStatus_None; ExecStatus _execStatus = ExecStatus_None;
std::shared_ptr<MYSQL_STMT> _stmtPtr; std::shared_ptr<MYSQL_STMT> _stmtPtr;
void outputError(); void outputError();
void outputStmtError();
std::vector<MYSQL_BIND>
_binds;
std::vector<unsigned long> _lengths;
std::vector<my_bool> _isNulls;
}; };
} // namespace orm } // namespace orm

View File

@ -24,6 +24,10 @@ int main()
*clientPtr << "select * from users" >> [](const Result &r) { *clientPtr << "select * from users" >> [](const Result &r) {
std::cout << "rows:" << r.size() << std::endl; std::cout << "rows:" << r.size() << std::endl;
std::cout << "column num:" << r.columns() << std::endl; std::cout << "column num:" << r.columns() << std::endl;
for(auto row:r)
{
std::cout << "user_id=" << row["user_id"].as<std::string>() << std::endl;
}
// for (auto row : r) // for (auto row : r)
// { // {
// for (auto f : row) // for (auto f : row)