From cfe55d5495fe65ef9d8f06df753f3ac0961e2e44 Mon Sep 17 00:00:00 2001 From: antao Date: Thu, 29 Nov 2018 15:16:35 +0800 Subject: [PATCH] Temporary commit --- orm_lib/src/mysql_impl/MysqlConnection.cc | 140 +++++++++++++++++++++- orm_lib/src/mysql_impl/MysqlConnection.h | 10 +- orm_lib/src/mysql_impl/test/test1.cc | 4 + 3 files changed, 148 insertions(+), 6 deletions(-) diff --git a/orm_lib/src/mysql_impl/MysqlConnection.cc b/orm_lib/src/mysql_impl/MysqlConnection.cc index b7a5df05..1a4b7644 100644 --- a/orm_lib/src/mysql_impl/MysqlConnection.cc +++ b/orm_lib/src/mysql_impl/MysqlConnection.cc @@ -216,11 +216,39 @@ void MysqlConnection::handleEvent() _execStatus = ExecStatus_None; if (err) { - outputError(); + outputStmtError(); //FIXME exception callback; return; } LOG_TRACE << "stmt ready!"; + if (mysql_stmt_bind_param(_stmtPtr.get(), _binds.data())) + { + outputStmtError(); + return; + } + _waitStatus = mysql_stmt_execute_start(&err, _stmtPtr.get()); + _execStatus = ExecStatus_StmtExec; + if (_waitStatus == 0) + { + _execStatus = ExecStatus_None; + if (err) + { + outputStmtError(); + return; + } + _waitStatus = mysql_stmt_store_result_start(&err, _stmtPtr.get()); + _execStatus = ExecStatus_StmtStoreResult; + if (_waitStatus == 0) + { + _execStatus = ExecStatus_None; + if (err) + { + outputStmtError(); + return; + } + getStmtResult(); + } + } } setChannel(); break; @@ -278,6 +306,51 @@ void MysqlConnection::handleEvent() setChannel(); break; } + case ExecStatus_StmtExec: + { + int err; + _waitStatus = mysql_stmt_execute_cont(&err, _stmtPtr.get(), status); + if (_waitStatus == 0) + { + _execStatus = ExecStatus_None; + if (err) + { + outputStmtError(); + return; + } + _waitStatus = mysql_stmt_store_result_start(&err, _stmtPtr.get()); + _execStatus = ExecStatus_StmtStoreResult; + if (_waitStatus == 0) + { + _execStatus = ExecStatus_None; + if (err) + { + outputStmtError(); + return; + } + getStmtResult(); + } + } + setChannel(); + break; + } + case ExecStatus_StmtStoreResult: + { + int err; + _waitStatus = mysql_stmt_store_result_cont(&err, _stmtPtr.get(), status); + if (_waitStatus == 0) + { + _execStatus = ExecStatus_None; + if (err) + { + outputStmtError(); + return; + } + getStmtResult(); + } + setChannel(); + break; + } default: return; } @@ -342,18 +415,28 @@ void MysqlConnection::execSql(const std::string &sql, }); return; } + _stmtPtr = std::shared_ptr(mysql_stmt_init(_mysqlPtr.get()), [](MYSQL_STMT *stmt) { - //blocking method; + //Is it a blocking method? mysql_stmt_close(stmt); }); - if (!_stmtPtr) { - LOG_ERROR << " mysql_stmt_init(), out of memory"; - //FIXME,exception callback + outputError(); return; } + _binds.resize(paraNum); + _lengths.resize(paraNum); + _isNulls.resize(paraNum); + for (size_t i = 0; i < paraNum; i++) + { + _binds[i].buffer = (void *)parameters[i]; + _binds[i].buffer_type = (enum_field_types)format[i]; + _binds[i].buffer_length = length[i]; + _binds[i].length = (length[i] == 0 ? 0 : (_lengths[i] = length[i], &_lengths[i])); + _binds[i].is_null = (parameters[i] == NULL ? 0 : (_isNulls[i] = true, &_isNulls[i])); + } _loop->runInLoop([=]() { int err; _waitStatus = mysql_stmt_prepare_start(&err, _stmtPtr.get(), sql.c_str(), sql.length()); @@ -420,6 +503,36 @@ void MysqlConnection::outputError() } } +void MysqlConnection::outputStmtError() +{ + _channelPtr->disableAll(); + LOG_ERROR << "Error(" + << mysql_stmt_errno(_stmtPtr.get()) << ") [" << mysql_stmt_sqlstate(_stmtPtr.get()) << "] \"" + << mysql_stmt_error(_stmtPtr.get()) << "\""; + if (_isWorking) + { + try + { + //FIXME exception type + throw SqlError(mysql_stmt_error(_stmtPtr.get()), + _sql); + } + catch (...) + { + _exceptCb(std::current_exception()); + _exceptCb = decltype(_exceptCb)(); + } + + _cb = decltype(_cb)(); + _isWorking = false; + if (_idleCb) + { + _idleCb(); + _idleCb = decltype(_idleCb)(); + } + } +} + void MysqlConnection::getResult(MYSQL_RES *res) { auto resultPtr = std::shared_ptr(res, [](MYSQL_RES *r) { @@ -435,4 +548,21 @@ void MysqlConnection::getResult(MYSQL_RES *res) _idleCb(); _idleCb = decltype(_idleCb)(); } +} + +void MysqlConnection::getStmtResult() +{ + auto resultPtr = std::shared_ptr(_stmtPtr->default_rset_handler(_stmtPtr.get()), [](MYSQL_RES *r) { + mysql_free_result(r); + }); + auto Result = makeResult(resultPtr, _sql, mysql_affected_rows(_mysqlPtr.get())); + if (_isWorking) + { + _cb(Result); + _cb = decltype(_cb)(); + _exceptCb = decltype(_exceptCb)(); + _isWorking = false; + _idleCb(); + _idleCb = decltype(_idleCb)(); + } } \ No newline at end of file diff --git a/orm_lib/src/mysql_impl/MysqlConnection.h b/orm_lib/src/mysql_impl/MysqlConnection.h index 548c29cf..227c4cc5 100644 --- a/orm_lib/src/mysql_impl/MysqlConnection.h +++ b/orm_lib/src/mysql_impl/MysqlConnection.h @@ -55,18 +55,26 @@ class MysqlConnection : public DbConnection, public std::enable_shared_from_this void handleEvent(); void setChannel(); void getResult(MYSQL_RES *res); + void getStmtResult(); int _waitStatus; enum ExecStatus { ExecStatus_None = 0, ExecStatus_RealQuery, ExecStatus_StmtPrepare, - ExecStatus_StoreResult + ExecStatus_StoreResult, + ExecStatus_StmtExec, + ExecStatus_StmtStoreResult }; ExecStatus _execStatus = ExecStatus_None; std::shared_ptr _stmtPtr; void outputError(); + void outputStmtError(); + std::vector + _binds; + std::vector _lengths; + std::vector _isNulls; }; } // namespace orm diff --git a/orm_lib/src/mysql_impl/test/test1.cc b/orm_lib/src/mysql_impl/test/test1.cc index b5ce35de..4d6d5ce8 100644 --- a/orm_lib/src/mysql_impl/test/test1.cc +++ b/orm_lib/src/mysql_impl/test/test1.cc @@ -24,6 +24,10 @@ int main() *clientPtr << "select * from users" >> [](const Result &r) { std::cout << "rows:" << r.size() << std::endl; std::cout << "column num:" << r.columns() << std::endl; + for(auto row:r) + { + std::cout << "user_id=" << row["user_id"].as() << std::endl; + } // for (auto row : r) // { // for (auto f : row)