Merge pull request #122 from an-tao/optimize-db
Optimize database operations
This commit is contained in:
commit
2d9ca5ae5f
|
@ -74,10 +74,26 @@ class DbClient : public trantor::NonCopyable
|
|||
#if USE_SQLITE3
|
||||
static std::shared_ptr<DbClient> newSqlite3Client(const std::string &connInfo, const size_t connNum);
|
||||
#endif
|
||||
|
||||
/// Async and nonblocking method
|
||||
/**
|
||||
* FUNCTION1 is usually the ResultCallback type;
|
||||
* FUNCTION2 is usually the ExceptionCallback type;
|
||||
* @param args are parameters that are bound to placeholders in the @param sql.
|
||||
* NOTE:
|
||||
*
|
||||
* If the number of @param args is not zero, make sure that all criteria
|
||||
* in @param sql are set by bind parameters, for example:
|
||||
*
|
||||
* 1. select * from users where user_id > 10 limit 10 offset 10; //Not bad, no bind parameters are used.
|
||||
* 2. select * from users where user_id > ? limit ? offset ?; //Good, fully use bind parameters.
|
||||
* 3. select * from users where user_id > ? limit ? offset 10; //Bad, partially use bind parameters.
|
||||
*
|
||||
* Strictly speaking, try not to splice SQL statements dynamically, Instead, use the constant sql string
|
||||
* with placeholders and the bind parameters to execute sql.
|
||||
* This rule makes the sql execute faster and more securely, and users should follow this rule when calling
|
||||
* all methods of DbClient.
|
||||
*
|
||||
*/
|
||||
template <typename FUNCTION1,
|
||||
typename FUNCTION2,
|
||||
|
@ -131,7 +147,7 @@ class DbClient : public trantor::NonCopyable
|
|||
return r;
|
||||
}
|
||||
|
||||
/// A stream-type method for sql execution
|
||||
/// Streaming-like method for sql execution. For more information, see the wiki page.
|
||||
internal::SqlBinder operator<<(const std::string &sql);
|
||||
internal::SqlBinder operator<<(std::string &&sql);
|
||||
|
||||
|
|
|
@ -119,14 +119,14 @@ class Mapper
|
|||
|
||||
private:
|
||||
DbClientPtr _client;
|
||||
std::string _limitString;
|
||||
std::string _offsetString;
|
||||
size_t _limit = 0;
|
||||
size_t _offset = 0;
|
||||
std::string _orderbyString;
|
||||
bool _forUpdate = false;
|
||||
void clear()
|
||||
{
|
||||
_limitString.clear();
|
||||
_offsetString.clear();
|
||||
_limit = 0;
|
||||
_offset = 0;
|
||||
_orderbyString.clear();
|
||||
_forUpdate = false;
|
||||
}
|
||||
|
@ -309,23 +309,40 @@ inline T Mapper<T>::findOne(const Criteria &criteria) noexcept(false)
|
|||
{
|
||||
std::string sql = "select * from ";
|
||||
sql += T::tableName;
|
||||
bool hasParameters = false;
|
||||
if (criteria)
|
||||
{
|
||||
sql += " where ";
|
||||
sql += criteria.criteriaString();
|
||||
sql = replaceSqlPlaceHolder(sql, "$?");
|
||||
hasParameters = true;
|
||||
}
|
||||
sql.append(_orderbyString).append(_limitString).append(_offsetString);
|
||||
sql.append(_orderbyString);
|
||||
if (_limit > 0)
|
||||
{
|
||||
hasParameters = true;
|
||||
sql.append(" limit $?");
|
||||
}
|
||||
if (_offset > 0)
|
||||
{
|
||||
hasParameters = true;
|
||||
sql.append(" offset $?");
|
||||
}
|
||||
if (hasParameters)
|
||||
sql = replaceSqlPlaceHolder(sql, "$?");
|
||||
if (_forUpdate)
|
||||
{
|
||||
sql += " for update";
|
||||
}
|
||||
clear();
|
||||
Result r(nullptr);
|
||||
{
|
||||
auto binder = *_client << std::move(sql);
|
||||
if (criteria)
|
||||
criteria.outputArgs(binder);
|
||||
if (_limit > 0)
|
||||
binder << _limit;
|
||||
if (_offset)
|
||||
binder << _offset;
|
||||
clear();
|
||||
binder << Mode::Blocking;
|
||||
binder >> [&r](const Result &result) {
|
||||
r = result;
|
||||
|
@ -351,21 +368,38 @@ inline void Mapper<T>::findOne(const Criteria &criteria,
|
|||
{
|
||||
std::string sql = "select * from ";
|
||||
sql += T::tableName;
|
||||
bool hasParameters = false;
|
||||
if (criteria)
|
||||
{
|
||||
sql += " where ";
|
||||
sql += criteria.criteriaString();
|
||||
sql = replaceSqlPlaceHolder(sql, "$?");
|
||||
hasParameters = true;
|
||||
}
|
||||
sql.append(_orderbyString).append(_limitString).append(_offsetString);
|
||||
sql.append(_orderbyString);
|
||||
if (_limit > 0)
|
||||
{
|
||||
hasParameters = true;
|
||||
sql.append(" limit $?");
|
||||
}
|
||||
if (_offset > 0)
|
||||
{
|
||||
hasParameters = true;
|
||||
sql.append(" offset $?");
|
||||
}
|
||||
if (hasParameters)
|
||||
sql = replaceSqlPlaceHolder(sql, "$?");
|
||||
if (_forUpdate)
|
||||
{
|
||||
sql += " for update";
|
||||
}
|
||||
clear();
|
||||
auto binder = *_client << std::move(sql);
|
||||
if (criteria)
|
||||
criteria.outputArgs(binder);
|
||||
if (_limit > 0)
|
||||
binder << _limit;
|
||||
if (_offset)
|
||||
binder << _offset;
|
||||
clear();
|
||||
binder >> [=](const Result &r) {
|
||||
if (r.size() == 0)
|
||||
{
|
||||
|
@ -388,22 +422,38 @@ inline std::future<T> Mapper<T>::findFutureOne(const Criteria &criteria) noexcep
|
|||
{
|
||||
std::string sql = "select * from ";
|
||||
sql += T::tableName;
|
||||
bool hasParameters = false;
|
||||
if (criteria)
|
||||
{
|
||||
sql += " where ";
|
||||
sql += criteria.criteriaString();
|
||||
sql = replaceSqlPlaceHolder(sql, "$?");
|
||||
hasParameters = true;
|
||||
}
|
||||
sql.append(_orderbyString).append(_limitString).append(_offsetString);
|
||||
sql.append(_orderbyString);
|
||||
if (_limit > 0)
|
||||
{
|
||||
hasParameters = true;
|
||||
sql.append(" limit $?");
|
||||
}
|
||||
if (_offset > 0)
|
||||
{
|
||||
hasParameters = true;
|
||||
sql.append(" offset $?");
|
||||
}
|
||||
if (hasParameters)
|
||||
sql = replaceSqlPlaceHolder(sql, "$?");
|
||||
if (_forUpdate)
|
||||
{
|
||||
sql += " for update";
|
||||
}
|
||||
clear();
|
||||
auto binder = *_client << std::move(sql);
|
||||
if (criteria)
|
||||
criteria.outputArgs(binder);
|
||||
|
||||
if (_limit > 0)
|
||||
binder << _limit;
|
||||
if (_offset)
|
||||
binder << _offset;
|
||||
clear();
|
||||
std::shared_ptr<std::promise<T>> prom = std::make_shared<std::promise<T>>();
|
||||
binder >> [=](const Result &r) {
|
||||
if (r.size() == 0)
|
||||
|
@ -444,23 +494,40 @@ inline std::vector<T> Mapper<T>::findBy(const Criteria &criteria) noexcept(false
|
|||
{
|
||||
std::string sql = "select * from ";
|
||||
sql += T::tableName;
|
||||
bool hasParameters = false;
|
||||
if (criteria)
|
||||
{
|
||||
hasParameters = true;
|
||||
sql += " where ";
|
||||
sql += criteria.criteriaString();
|
||||
sql = replaceSqlPlaceHolder(sql, "$?");
|
||||
}
|
||||
sql.append(_orderbyString).append(_limitString).append(_offsetString);
|
||||
sql.append(_orderbyString);
|
||||
if (_limit > 0)
|
||||
{
|
||||
hasParameters = true;
|
||||
sql.append(" limit $?");
|
||||
}
|
||||
if (_offset > 0)
|
||||
{
|
||||
hasParameters = true;
|
||||
sql.append(" offset $?");
|
||||
}
|
||||
if (hasParameters)
|
||||
sql = replaceSqlPlaceHolder(sql, "$?");
|
||||
if (_forUpdate)
|
||||
{
|
||||
sql += " for update";
|
||||
}
|
||||
clear();
|
||||
Result r(nullptr);
|
||||
{
|
||||
auto binder = *_client << std::move(sql);
|
||||
if (criteria)
|
||||
criteria.outputArgs(binder);
|
||||
if (_limit > 0)
|
||||
binder << _limit;
|
||||
if (_offset)
|
||||
binder << _offset;
|
||||
clear();
|
||||
binder << Mode::Blocking;
|
||||
binder >> [&r](const Result &result) {
|
||||
r = result;
|
||||
|
@ -481,21 +548,38 @@ inline void Mapper<T>::findBy(const Criteria &criteria,
|
|||
{
|
||||
std::string sql = "select * from ";
|
||||
sql += T::tableName;
|
||||
bool hasParameters = false;
|
||||
if (criteria)
|
||||
{
|
||||
hasParameters = true;
|
||||
sql += " where ";
|
||||
sql += criteria.criteriaString();
|
||||
sql = replaceSqlPlaceHolder(sql, "$?");
|
||||
}
|
||||
sql.append(_orderbyString).append(_limitString).append(_offsetString);
|
||||
sql.append(_orderbyString);
|
||||
if (_limit > 0)
|
||||
{
|
||||
hasParameters = true;
|
||||
sql.append(" limit $?");
|
||||
}
|
||||
if (_offset > 0)
|
||||
{
|
||||
hasParameters = true;
|
||||
sql.append(" offset $?");
|
||||
}
|
||||
if (hasParameters)
|
||||
sql = replaceSqlPlaceHolder(sql, "$?");
|
||||
if (_forUpdate)
|
||||
{
|
||||
sql += " for update";
|
||||
}
|
||||
clear();
|
||||
auto binder = *_client << std::move(sql);
|
||||
if (criteria)
|
||||
criteria.outputArgs(binder);
|
||||
if (_limit > 0)
|
||||
binder << _limit;
|
||||
if (_offset)
|
||||
binder << _offset;
|
||||
clear();
|
||||
binder >> [=](const Result &r) {
|
||||
std::vector<T> ret;
|
||||
for (auto const &row : r)
|
||||
|
@ -511,22 +595,38 @@ inline std::future<std::vector<T>> Mapper<T>::findFutureBy(const Criteria &crite
|
|||
{
|
||||
std::string sql = "select * from ";
|
||||
sql += T::tableName;
|
||||
bool hasParameters = false;
|
||||
if (criteria)
|
||||
{
|
||||
hasParameters = true;
|
||||
sql += " where ";
|
||||
sql += criteria.criteriaString();
|
||||
sql = replaceSqlPlaceHolder(sql, "$?");
|
||||
}
|
||||
sql.append(_orderbyString).append(_limitString).append(_offsetString);
|
||||
sql.append(_orderbyString);
|
||||
if (_limit > 0)
|
||||
{
|
||||
hasParameters = true;
|
||||
sql.append(" limit $?");
|
||||
}
|
||||
if (_offset > 0)
|
||||
{
|
||||
hasParameters = true;
|
||||
sql.append(" offset $?");
|
||||
}
|
||||
if (hasParameters)
|
||||
sql = replaceSqlPlaceHolder(sql, "$?");
|
||||
if (_forUpdate)
|
||||
{
|
||||
sql += " for update";
|
||||
}
|
||||
clear();
|
||||
auto binder = *_client << std::move(sql);
|
||||
if (criteria)
|
||||
criteria.outputArgs(binder);
|
||||
|
||||
if (_limit > 0)
|
||||
binder << _limit;
|
||||
if (_offset)
|
||||
binder << _offset;
|
||||
clear();
|
||||
std::shared_ptr<std::promise<std::vector<T>>> prom = std::make_shared<std::promise<std::vector<T>>>();
|
||||
binder >> [=](const Result &r) {
|
||||
std::vector<T> ret;
|
||||
|
@ -1028,16 +1128,13 @@ template <typename T>
|
|||
inline Mapper<T> &Mapper<T>::limit(size_t limit)
|
||||
{
|
||||
assert(limit > 0);
|
||||
if (limit > 0)
|
||||
{
|
||||
_limitString = utils::formattedString(" limit %u", limit);
|
||||
}
|
||||
_limit = limit;
|
||||
return *this;
|
||||
}
|
||||
template <typename T>
|
||||
inline Mapper<T> &Mapper<T>::offset(size_t offset)
|
||||
{
|
||||
_offsetString = utils::formattedString(" offset %u", offset);
|
||||
_offset = offset;
|
||||
return *this;
|
||||
}
|
||||
template <typename T>
|
||||
|
|
|
@ -75,8 +75,8 @@ int main()
|
|||
// pbuf->sgetn(&str[0], filesize);
|
||||
|
||||
{
|
||||
auto trans = clientPtr->newTransaction([](bool ret){
|
||||
if(ret)
|
||||
auto trans = clientPtr->newTransaction([](bool ret) {
|
||||
if (ret)
|
||||
{
|
||||
std::cout << "commited!!!!!!" << std::endl;
|
||||
}
|
||||
|
@ -97,6 +97,16 @@ int main()
|
|||
} >> [](const DrogonDbException &e) {
|
||||
std::cerr << e.base().what() << std::endl;
|
||||
};
|
||||
|
||||
*clientPtr << "select * from users limit ? offset ?"
|
||||
<< 2
|
||||
<< 2 >>
|
||||
[](const Result &r) {
|
||||
std::cout << "select " << r.size() << " records" << std::endl;
|
||||
} >>
|
||||
[](const DrogonDbException &e) {
|
||||
std::cerr << e.base().what() << std::endl;
|
||||
};
|
||||
LOG_TRACE << "end";
|
||||
getchar();
|
||||
}
|
|
@ -77,7 +77,7 @@ int main()
|
|||
// LOG_DEBUG << "async blocking except callback:" << e.base().what();
|
||||
// },
|
||||
// true);
|
||||
auto f = clientPtr->execSqlAsyncFuture("select * from users where user_uuid > $1 limit $2", 100, "5");
|
||||
auto f = clientPtr->execSqlAsyncFuture("select * from users where user_uuid > $1 limit $2 offset $3", 100, (size_t)2, (size_t)2);
|
||||
try
|
||||
{
|
||||
auto r = f.get();
|
||||
|
|
|
@ -117,33 +117,49 @@ void Sqlite3Connection::execSqlInQueue(const std::string &sql,
|
|||
const std::function<void(const std::exception_ptr &)> &exceptCallback)
|
||||
{
|
||||
LOG_TRACE << "sql:" << sql;
|
||||
sqlite3_stmt *stmt = nullptr;
|
||||
const char *remaining;
|
||||
std::shared_ptr<sqlite3_stmt> stmtPtr;
|
||||
bool newStmt = false;
|
||||
if (paraNum > 0)
|
||||
{
|
||||
|
||||
auto ret = sqlite3_prepare_v2(_conn.get(), sql.data(), -1, &stmt, &remaining);
|
||||
std::shared_ptr<sqlite3_stmt> stmtPtr = stmt ? std::shared_ptr<sqlite3_stmt>(stmt,
|
||||
[](sqlite3_stmt *p) {
|
||||
sqlite3_finalize(p);
|
||||
})
|
||||
: nullptr;
|
||||
if (ret != SQLITE_OK)
|
||||
{
|
||||
onError(sql, exceptCallback);
|
||||
return;
|
||||
}
|
||||
if (!std::all_of(remaining, sql.data() + sql.size(), [](char ch) { return std::isspace(ch); }))
|
||||
{
|
||||
try
|
||||
auto iter = _stmtMap.find(sql);
|
||||
if (iter != _stmtMap.end())
|
||||
{
|
||||
throw SqlError("Multiple semicolon separated statements are unsupported", sql);
|
||||
stmtPtr = iter->second;
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
auto exceptPtr = std::current_exception();
|
||||
exceptCallback(exceptPtr);
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (!stmtPtr)
|
||||
{
|
||||
sqlite3_stmt *stmt = nullptr;
|
||||
newStmt = true;
|
||||
const char *remaining;
|
||||
auto ret = sqlite3_prepare_v2(_conn.get(), sql.data(), -1, &stmt, &remaining);
|
||||
stmtPtr = stmt ? std::shared_ptr<sqlite3_stmt>(stmt,
|
||||
[](sqlite3_stmt *p) {
|
||||
sqlite3_finalize(p);
|
||||
})
|
||||
: nullptr;
|
||||
if (ret != SQLITE_OK || !stmtPtr)
|
||||
{
|
||||
onError(sql, exceptCallback);
|
||||
return;
|
||||
}
|
||||
if (!std::all_of(remaining, sql.data() + sql.size(), [](char ch) { return std::isspace(ch); }))
|
||||
{
|
||||
try
|
||||
{
|
||||
throw SqlError("Multiple semicolon separated statements are unsupported", sql);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
auto exceptPtr = std::current_exception();
|
||||
exceptCallback(exceptPtr);
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
assert(stmtPtr);
|
||||
auto stmt = stmtPtr.get();
|
||||
for (int i = 0; i < (int)parameters.size(); i++)
|
||||
{
|
||||
int bindRet;
|
||||
|
@ -177,6 +193,7 @@ void Sqlite3Connection::execSqlInQueue(const std::string &sql,
|
|||
if (bindRet != SQLITE_OK)
|
||||
{
|
||||
onError(sql, exceptCallback);
|
||||
sqlite3_reset(stmt);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
@ -197,7 +214,7 @@ void Sqlite3Connection::execSqlInQueue(const std::string &sql,
|
|||
//Readonly, hold read lock;
|
||||
std::shared_lock<SharedMutex> lock(*_sharedMutexPtr);
|
||||
r = stmtStep(stmt, resultPtr, columnNum);
|
||||
stmtPtr.reset();
|
||||
sqlite3_reset(stmt);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -209,15 +226,17 @@ void Sqlite3Connection::execSqlInQueue(const std::string &sql,
|
|||
resultPtr->_affectedRows = sqlite3_changes(_conn.get());
|
||||
resultPtr->_insertId = sqlite3_last_insert_rowid(_conn.get());
|
||||
}
|
||||
stmtPtr.reset();
|
||||
sqlite3_reset(stmt);
|
||||
}
|
||||
|
||||
if (r != SQLITE_DONE)
|
||||
{
|
||||
onError(sql, exceptCallback);
|
||||
sqlite3_reset(stmt);
|
||||
return;
|
||||
}
|
||||
|
||||
if (paraNum > 0 && newStmt)
|
||||
_stmtMap[sql] = stmtPtr;
|
||||
rcb(Result(resultPtr));
|
||||
_idleCb();
|
||||
}
|
||||
|
|
|
@ -64,6 +64,7 @@ class Sqlite3Connection : public DbConnection, public std::enable_shared_from_th
|
|||
trantor::EventLoopThread _loopThread;
|
||||
std::shared_ptr<sqlite3> _conn;
|
||||
std::shared_ptr<SharedMutex> _sharedMutexPtr;
|
||||
std::unordered_map<std::string,std::shared_ptr<sqlite3_stmt>> _stmtMap;
|
||||
};
|
||||
|
||||
} // namespace orm
|
||||
|
|
|
@ -11,7 +11,7 @@ int main()
|
|||
trantor::Logger::setLogLevel(trantor::Logger::TRACE);
|
||||
auto clientPtr = DbClient::newSqlite3Client("filename=test.db", 3);
|
||||
sleep(1);
|
||||
|
||||
|
||||
LOG_DEBUG << "start!";
|
||||
// *clientPtr << "Drop table groups;" << Mode::Blocking >>
|
||||
// [](const Result &r) {
|
||||
|
@ -44,6 +44,15 @@ int main()
|
|||
[](const DrogonDbException &e) {
|
||||
std::cout << e.base().what() << std::endl;
|
||||
};
|
||||
*clientPtr << "insert into GROUPS (group_name) values(?)"
|
||||
<< "test_group" << Mode::Blocking >>
|
||||
[](const Result &r) {
|
||||
LOG_DEBUG << "inserted:" << r.affectedRows();
|
||||
LOG_DEBUG << "id:" << r.insertId();
|
||||
} >>
|
||||
[](const DrogonDbException &e) {
|
||||
std::cout << e.base().what() << std::endl;
|
||||
};
|
||||
*clientPtr << "select * from GROUPS " >>
|
||||
[](const Result &r) {
|
||||
LOG_DEBUG << "affected rows:" << r.affectedRows();
|
||||
|
@ -62,8 +71,7 @@ int main()
|
|||
LOG_DEBUG << (success ? "commit success!" : "commit failed!");
|
||||
});
|
||||
Mapper<drogon_model::sqlite3::Groups> mapper(trans);
|
||||
mapper.findAll([trans](const std::vector<drogon_model::sqlite3::Groups> &v) {
|
||||
|
||||
mapper.limit(2).offset(1).findAll([trans](const std::vector<drogon_model::sqlite3::Groups> &v) {
|
||||
Mapper<drogon_model::sqlite3::Groups> mapper(trans);
|
||||
for(auto group:v)
|
||||
{
|
||||
|
@ -71,15 +79,24 @@ int main()
|
|||
std::cout << group.toJson() << std::endl;
|
||||
std::cout << "avatar:" << group.getValueOfAvatarAsString() << std::endl;
|
||||
group.setAvatarId("xixi");
|
||||
mapper.update(group, [=](const size_t count) { LOG_DEBUG << "update " << count << " rows";
|
||||
}, [](const DrogonDbException &e) { LOG_ERROR << e.base().what(); });
|
||||
} }, [](const DrogonDbException &e) { LOG_ERROR << e.base().what(); });
|
||||
mapper.update(group,
|
||||
[=](const size_t count)
|
||||
{
|
||||
LOG_DEBUG << "update " << count << " rows";
|
||||
},
|
||||
[](const DrogonDbException &e)
|
||||
{
|
||||
LOG_ERROR << e.base().what();
|
||||
});
|
||||
} },
|
||||
[](const DrogonDbException &e) { LOG_ERROR << e.base().what(); });
|
||||
drogon_model::sqlite3::Groups group;
|
||||
group.setAvatar("hahahaha,xixixixix");
|
||||
try{
|
||||
try
|
||||
{
|
||||
mapper.insert(group);
|
||||
}
|
||||
catch(const DrogonDbException &e)
|
||||
catch (const DrogonDbException &e)
|
||||
{
|
||||
std::cerr << e.base().what() << std::endl;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue