From ed8d05de3323b3de2c02d5f02c161f32ac5900d7 Mon Sep 17 00:00:00 2001 From: antao Date: Mon, 10 Dec 2018 18:33:14 +0800 Subject: [PATCH] Add the callback for a transaction commit --- orm_lib/inc/drogon/orm/DbClient.h | 19 ++++- orm_lib/src/DbClientImpl.cc | 4 +- orm_lib/src/DbClientImpl.h | 90 +++++++++++------------ orm_lib/src/TransactionImpl.cc | 40 +++++++--- orm_lib/src/TransactionImpl.h | 5 +- orm_lib/src/postgresql_impl/test/test2.cc | 6 +- 6 files changed, 100 insertions(+), 64 deletions(-) diff --git a/orm_lib/inc/drogon/orm/DbClient.h b/orm_lib/inc/drogon/orm/DbClient.h index 0cd4a5b8..3d72c5e0 100644 --- a/orm_lib/inc/drogon/orm/DbClient.h +++ b/orm_lib/inc/drogon/orm/DbClient.h @@ -70,6 +70,7 @@ class DbClient : public trantor::NonCopyable #if USE_MYSQL static std::shared_ptr newMysqlClient(const std::string &connInfo, const size_t connNum); #endif + //Async method, nonblocking by default; template < typename FUNCTION1, @@ -90,6 +91,7 @@ class DbClient : public trantor::NonCopyable binder >> rCallback; binder >> exceptCallback; } + //Async and nonblocking method template std::future execSqlAsync(const std::string &sql, @@ -107,6 +109,7 @@ class DbClient : public trantor::NonCopyable binder.exec(); return prom->get_future(); } + //Sync and blocking method template const Result execSqlSync(const std::string &sql, @@ -127,10 +130,22 @@ class DbClient : public trantor::NonCopyable return r; } + /// A stream-type method for sql execution internal::SqlBinder operator<<(const std::string &sql); - virtual std::shared_ptr newTransaction() = 0; - ClientType type() const { return _type; } + /// Create a transaction object. + /** + * @param commitCallback: the callback with which user can get the submitting result, + * The Boolean type parameter in the callback function indicates whether the + * transaction was submitted successfully. + * NOTE: + * The callback only indicates the result of the 'commit' command, which is the last + * step of the transaction. If the transaction has been automatically or manually rolled back, + * the callback will not be executed. + */ + virtual std::shared_ptr newTransaction(const std::function &commitCallback = std::function()) = 0; + + ClientType type() const { return _type; } private: friend internal::SqlBinder; diff --git a/orm_lib/src/DbClientImpl.cc b/orm_lib/src/DbClientImpl.cc index d2bf5565..4a1af5c2 100644 --- a/orm_lib/src/DbClientImpl.cc +++ b/orm_lib/src/DbClientImpl.cc @@ -182,7 +182,7 @@ void DbClientImpl::execSql(const std::string &sql, } } -std::shared_ptr DbClientImpl::newTransaction() +std::shared_ptr DbClientImpl::newTransaction(const std::function &commitCallback) { DbConnectionPtr conn; { @@ -197,7 +197,7 @@ std::shared_ptr DbClientImpl::newTransaction() conn = *iter; _readyConnections.erase(iter); } - auto trans = std::shared_ptr(new TransactionImpl(_type, conn, [=]() { + auto trans = std::shared_ptr(new TransactionImpl(_type, conn, commitCallback, [=]() { if (conn->status() == ConnectStatus_Bad) { return; diff --git a/orm_lib/src/DbClientImpl.h b/orm_lib/src/DbClientImpl.h index 01216423..d402566b 100644 --- a/orm_lib/src/DbClientImpl.h +++ b/orm_lib/src/DbClientImpl.h @@ -31,57 +31,57 @@ namespace orm class DbClientImpl : public DbClient, public std::enable_shared_from_this { - public: - DbClientImpl(const std::string &connInfo, const size_t connNum, ClientType type); - virtual ~DbClientImpl() noexcept; - virtual void execSql(const std::string &sql, - size_t paraNum, - const std::vector ¶meters, - const std::vector &length, - const std::vector &format, - const ResultCallback &rcb, - const std::function &exceptCallback) override; - virtual std::shared_ptr newTransaction() override; +public: + DbClientImpl(const std::string &connInfo, const size_t connNum, ClientType type); + virtual ~DbClientImpl() noexcept; + virtual void execSql(const std::string &sql, + size_t paraNum, + const std::vector ¶meters, + const std::vector &length, + const std::vector &format, + const ResultCallback &rcb, + const std::function &exceptCallback) override; + virtual std::shared_ptr newTransaction(const std::function &commitCallback = std::function()) override; - private: - void ioLoop(); - std::shared_ptr _loopPtr; - void execSql(const DbConnectionPtr &conn, const std::string &sql, - size_t paraNum, - const std::vector ¶meters, - const std::vector &length, - const std::vector &format, - const ResultCallback &rcb, - const std::function &exceptCallback); +private: + void ioLoop(); + std::shared_ptr _loopPtr; + void execSql(const DbConnectionPtr &conn, const std::string &sql, + size_t paraNum, + const std::vector ¶meters, + const std::vector &length, + const std::vector &format, + const ResultCallback &rcb, + const std::function &exceptCallback); - DbConnectionPtr newConnection(); + DbConnectionPtr newConnection(); - std::unordered_set _connections; - std::unordered_set _readyConnections; - std::unordered_set _busyConnections; - std::string _connInfo; - std::thread _loopThread; - std::mutex _connectionsMutex; - std::condition_variable _condConnectionReady; - size_t _transWaitNum = 0; + std::unordered_set _connections; + std::unordered_set _readyConnections; + std::unordered_set _busyConnections; + std::string _connInfo; + std::thread _loopThread; + std::mutex _connectionsMutex; + std::condition_variable _condConnectionReady; + size_t _transWaitNum = 0; - size_t _connectNum; - bool _stop = false; + size_t _connectNum; + bool _stop = false; - struct SqlCmd - { - std::string _sql; - size_t _paraNum; - std::vector _parameters; - std::vector _length; - std::vector _format; - QueryCallback _cb; - ExceptPtrCallback _exceptCb; - }; - std::list _sqlCmdBuffer; - std::mutex _bufferMutex; + struct SqlCmd + { + std::string _sql; + size_t _paraNum; + std::vector _parameters; + std::vector _length; + std::vector _format; + QueryCallback _cb; + ExceptPtrCallback _exceptCb; + }; + std::list _sqlCmdBuffer; + std::mutex _bufferMutex; - void handleNewTask(const DbConnectionPtr &conn); + void handleNewTask(const DbConnectionPtr &conn); }; } // namespace orm diff --git a/orm_lib/src/TransactionImpl.cc b/orm_lib/src/TransactionImpl.cc index c7945bfb..a2764f8b 100644 --- a/orm_lib/src/TransactionImpl.cc +++ b/orm_lib/src/TransactionImpl.cc @@ -18,10 +18,12 @@ using namespace drogon::orm; TransactionImpl::TransactionImpl(ClientType type, const DbConnectionPtr &connPtr, + const std::function &commitCallback, const std::function &usedUpCallback) : _connectionPtr(connPtr), _usedUpCallback(usedUpCallback), - _loop(connPtr->loop()) + _loop(connPtr->loop()), + _commitCallback(commitCallback) { _type = type; } @@ -31,25 +33,41 @@ TransactionImpl::~TransactionImpl() assert(!_isWorking); if (!_isCommitedOrRolledback) { - auto cb = _usedUpCallback; + auto ucb = std::move(_usedUpCallback); + auto commitCb = std::move(_commitCallback); auto loop = _connectionPtr->loop(); auto conn = _connectionPtr; - loop->queueInLoop([conn, cb]() { + loop->queueInLoop([conn, ucb, commitCb]() { conn->execSql("commit", 0, std::vector(), std::vector(), std::vector(), - [](const Result &r) { + [commitCb](const Result &r) { LOG_TRACE << "Transaction commited!"; - }, - [](const std::exception_ptr &ePtr) { - - }, - [cb]() { - if (cb) + if (commitCb) { - cb(); + commitCb(true); + } + }, + [commitCb](const std::exception_ptr &ePtr) { + if (commitCb) + { + try + { + std::rethrow_exception(ePtr); + } + catch (const DrogonDbException &e) + { + LOG_ERROR << "Transaction submission failed:" << e.base().what(); + commitCb(false); + } + } + }, + [ucb]() { + if (ucb) + { + ucb(); } }); }); diff --git a/orm_lib/src/TransactionImpl.h b/orm_lib/src/TransactionImpl.h index 85128055..2212db74 100644 --- a/orm_lib/src/TransactionImpl.h +++ b/orm_lib/src/TransactionImpl.h @@ -26,7 +26,7 @@ namespace orm class TransactionImpl : public Transaction, public std::enable_shared_from_this { public: - TransactionImpl(ClientType type, const DbConnectionPtr &connPtr, const std::function &usedUpCallback); + TransactionImpl(ClientType type, const DbConnectionPtr &connPtr, const std::function &commitCallback, const std::function &usedUpCallback); ~TransactionImpl(); void rollback() override; @@ -39,7 +39,7 @@ class TransactionImpl : public Transaction, public std::enable_shared_from_this< const std::vector &format, const ResultCallback &rcb, const std::function &exceptCallback) override; - virtual std::shared_ptr newTransaction() override + virtual std::shared_ptr newTransaction(const std::function&) override { return shared_from_this(); } @@ -63,6 +63,7 @@ class TransactionImpl : public Transaction, public std::enable_shared_from_this< friend class DbClientImpl; void doBegin(); trantor::EventLoop *_loop; + std::function _commitCallback; }; } // namespace orm } // namespace drogon diff --git a/orm_lib/src/postgresql_impl/test/test2.cc b/orm_lib/src/postgresql_impl/test/test2.cc index 399dec93..e3932b48 100644 --- a/orm_lib/src/postgresql_impl/test/test2.cc +++ b/orm_lib/src/postgresql_impl/test/test2.cc @@ -34,7 +34,9 @@ int main() sleep(1); LOG_DEBUG << "start!"; { - auto trans = client->newTransaction(); + auto trans = client->newTransaction([](bool committed) { + std::cout << "The transaction submission " << (committed ? "succeeded" : "failed") << std::endl; + }); *trans << "delete from users where user_uuid=201" >> [trans](const Result &r) { std::cout << "delete " << r.affectedRows() << "user!!!!!" << std::endl; @@ -72,7 +74,7 @@ int main() } >> [](const DrogonDbException &e) { std::cout << e.base().what() << std::endl; }; - for (int i = 0; i < 100;i++) + for (int i = 0; i < 100; i++) mapper.findByPrimaryKey(2, [](User u) { std::cout << "get a user by pk" << std::endl;