From 620d875646a0ab96842ade96ac58ff4116a6fae8 Mon Sep 17 00:00:00 2001 From: antao Date: Tue, 26 Feb 2019 15:15:23 +0800 Subject: [PATCH] Add an asynchronous interface for transaction creation --- orm_lib/inc/drogon/orm/DbClient.h | 7 ++- orm_lib/src/DbClientImpl.cc | 65 +++++++++++++++------- orm_lib/src/DbClientImpl.h | 9 ++- orm_lib/src/DbClientLockFree.cc | 82 +++++++++++++++++++++++++++- orm_lib/src/DbClientLockFree.h | 11 +++- orm_lib/src/TransactionImpl.h | 9 +++ orm_lib/src/mysql_impl/test/test1.cc | 7 ++- 7 files changed, 160 insertions(+), 30 deletions(-) diff --git a/orm_lib/inc/drogon/orm/DbClient.h b/orm_lib/inc/drogon/orm/DbClient.h index f5364180..ca064625 100644 --- a/orm_lib/inc/drogon/orm/DbClient.h +++ b/orm_lib/inc/drogon/orm/DbClient.h @@ -144,8 +144,12 @@ class DbClient : public trantor::NonCopyable * The callback only indicates the result of the 'commit' command, which is the last * step of the transaction. If the transaction has been automatically or manually rolled back, * the callback will not be executed. + * You can also use the setCommitCallback() method of a transaction object to set the callback. */ - virtual std::shared_ptr newTransaction(const std::function &commitCallback = std::function()) = 0; + virtual std::shared_ptr newTransaction(const std::function &commitCallback = nullptr) = 0; + + /// Create a transaction object in asynchronous mode. + virtual void newTransactionAsync(const std::function &)> &callback) = 0; ClientType type() const { return _type; } const std::string &connectionInfo() { return _connInfo; } @@ -171,6 +175,7 @@ class Transaction : public DbClient public: virtual void rollback() = 0; //virtual void commit() = 0; + virtual void setCommitCallback(const std::function &commitCallback) = 0; }; } // namespace orm diff --git a/orm_lib/src/DbClientImpl.cc b/orm_lib/src/DbClientImpl.cc index d47355d9..99b02230 100644 --- a/orm_lib/src/DbClientImpl.cc +++ b/orm_lib/src/DbClientImpl.cc @@ -213,24 +213,33 @@ void DbClientImpl::execSql(std::string &&sql, _sqlCmdBuffer.push_back(std::move(cmd)); } } - -std::shared_ptr DbClientImpl::newTransaction(const std::function &commitCallback) +void DbClientImpl::newTransactionAsync(const std::function &)> &callback) { DbConnectionPtr conn; { - std::unique_lock lock(_connectionsMutex); - _transWaitNum++; - _condConnectionReady.wait(lock, [this]() { - return _readyConnections.size() > 0; - }); - _transWaitNum--; - auto iter = _readyConnections.begin(); - _busyConnections.insert(*iter); - conn = *iter; - _readyConnections.erase(iter); + std::lock_guard lock(_connectionsMutex); + if (!_readyConnections.empty()) + { + auto iter = _readyConnections.begin(); + _busyConnections.insert(*iter); + conn = *iter; + _readyConnections.erase(iter); + } } + if (conn) + { + makeTrans(conn, std::function &)>(callback)); + } + else + { + std::lock_guard lock(_transMutex); + _transCallbacks.push(callback); + } +} +void DbClientImpl::makeTrans(const DbConnectionPtr &conn, std::function &)> &&callback) +{ std::weak_ptr weakThis = shared_from_this(); - auto trans = std::shared_ptr(new TransactionImpl(_type, conn, commitCallback, [weakThis, conn]() { + auto trans = std::shared_ptr(new TransactionImpl(_type, conn, std::function(), [weakThis, conn]() { auto thisPtr = weakThis.lock(); if (!thisPtr) return; @@ -265,22 +274,38 @@ std::shared_ptr DbClientImpl::newTransaction(const std::functiondoBegin(); + conn->loop()->queueInLoop([callback = std::move(callback), trans]() { + callback(trans); + }); +} +std::shared_ptr DbClientImpl::newTransaction(const std::function &commitCallback) +{ + std::promise> pro; + auto f = pro.get_future(); + newTransactionAsync([&pro](const std::shared_ptr &trans) { + pro.set_value(trans); + }); + auto trans = f.get(); + trans->setCommitCallback(commitCallback); return trans; } void DbClientImpl::handleNewTask(const DbConnectionPtr &connPtr) { + std::function &)> transCallback; { - std::lock_guard guard(_connectionsMutex); - if (_transWaitNum > 0) + std::lock_guard guard(_transMutex); + if (!_transCallbacks.empty()) { - //Prioritize the needs of the transaction - _busyConnections.erase(connPtr); - _readyConnections.insert(connPtr); - _condConnectionReady.notify_one(); - return; + transCallback = std::move(_transCallbacks.front()); + _transCallbacks.pop(); } } + if (transCallback) + { + makeTrans(connPtr, std::move(transCallback)); + return; + } //Then check if there are some sql queries in the buffer std::shared_ptr cmd; { diff --git a/orm_lib/src/DbClientImpl.h b/orm_lib/src/DbClientImpl.h index 2235174f..6b14da40 100644 --- a/orm_lib/src/DbClientImpl.h +++ b/orm_lib/src/DbClientImpl.h @@ -42,7 +42,8 @@ class DbClientImpl : public DbClient, public std::enable_shared_from_this &&format, ResultCallback &&rcb, std::function &&exceptCallback) override; - virtual std::shared_ptr newTransaction(const std::function &commitCallback = std::function()) override; + virtual std::shared_ptr newTransaction(const std::function &commitCallback = nullptr) override; + virtual void newTransactionAsync(const std::function &)> &callback) override; private: size_t _connectNum; @@ -60,13 +61,15 @@ class DbClientImpl : public DbClient, public std::enable_shared_from_this &)> &&callback); + std::mutex _connectionsMutex; std::unordered_set _connections; std::unordered_set _readyConnections; std::unordered_set _busyConnections; - std::condition_variable _condConnectionReady; - size_t _transWaitNum = 0; + std::mutex _transMutex; + std::queue &)>> _transCallbacks; struct SqlCmd { diff --git a/orm_lib/src/DbClientLockFree.cc b/orm_lib/src/DbClientLockFree.cc index 657ae6fd..9f9a44ff 100644 --- a/orm_lib/src/DbClientLockFree.cc +++ b/orm_lib/src/DbClientLockFree.cc @@ -13,6 +13,7 @@ */ #include "DbClientLockFree.h" +#include "TransactionImpl.h" #include "DbConnection.h" #if USE_POSTGRESQL #include "postgresql_impl/PgConnection.h" @@ -101,7 +102,7 @@ void DbClientLockFree::execSql(std::string &&sql, { for (auto &conn : _connections) { - if (!conn->isWorking()) + if (!conn->isWorking() && (_transSet.empty() || _transSet.find(conn) == _transSet.end())) { conn->execSql(std::move(sql), paraNum, @@ -146,10 +147,85 @@ std::shared_ptr DbClientLockFree::newTransaction(const std::functio return nullptr; } +void DbClientLockFree::newTransactionAsync(const std::function &)> &callback) +{ + _loop->assertInLoopThread(); + for (auto &conn : _connections) + { + if (!conn->isWorking() && _transSet.find(conn) == _transSet.end()) + { + makeTrans(conn, std::function &)>(callback)); + return; + } + } + _transCallbacks.push(callback); +} + +void DbClientLockFree::makeTrans(const DbConnectionPtr &conn, std::function &)> &&callback) +{ + std::weak_ptr weakThis = shared_from_this(); + auto trans = std::shared_ptr(new TransactionImpl(_type, conn, std::function(), [weakThis, conn]() { + auto thisPtr = weakThis.lock(); + if (!thisPtr) + return; + + if (conn->status() == ConnectStatus_Bad) + { + return; + } + if (!thisPtr->_transCallbacks.empty()) + { + auto callback = std::move(thisPtr->_transCallbacks.front()); + thisPtr->_transCallbacks.pop(); + thisPtr->makeTrans(conn, std::move(callback)); + return; + } + + for (auto &connPtr : thisPtr->_connections) + { + if (connPtr == conn) + { + conn->loop()->queueInLoop([weakThis, conn]() { + auto thisPtr = weakThis.lock(); + if (!thisPtr) + return; + std::weak_ptr weakConn = conn; + conn->setIdleCallback([weakThis, weakConn]() { + auto thisPtr = weakThis.lock(); + if (!thisPtr) + return; + auto connPtr = weakConn.lock(); + if (!connPtr) + return; + thisPtr->handleNewTask(connPtr); + }); + thisPtr->_transSet.erase(conn); + thisPtr->handleNewTask(conn); + }); + break; + } + } + })); + _transSet.insert(conn); + trans->doBegin(); + conn->loop()->queueInLoop([callback = std::move(callback), trans] { + callback(trans); + }); +} + void DbClientLockFree::handleNewTask(const DbConnectionPtr &conn) { assert(conn); assert(!conn->isWorking()); + + if (!_transCallbacks.empty()) + { + auto callback = std::move(_transCallbacks.front()); + _transCallbacks.pop(); + makeTrans(conn, std::move(callback)); + return; + } + if (!_sqlCmdBuffer.empty()) { auto &cmd = _sqlCmdBuffer.front(); @@ -213,6 +289,7 @@ DbConnectionPtr DbClientLockFree::newConnection() } } + thisPtr->_transSet.erase(closeConnPtr); //Reconnect after 1 second thisPtr->_loop->runAfter(1, [weakPtr] { auto thisPtr = weakPtr.lock(); @@ -235,7 +312,8 @@ DbConnectionPtr DbClientLockFree::newConnection() if (!thisPtr) return; auto connPtr = weakConnPtr.lock(); - assert(connPtr); + if (!connPtr) + return; thisPtr->handleNewTask(connPtr); }); //std::cout<<"newConn end"< #include #include +#include #include -#include - namespace drogon { @@ -43,7 +42,8 @@ class DbClientLockFree : public DbClient, public std::enable_shared_from_this &&format, ResultCallback &&rcb, std::function &&exceptCallback) override; - virtual std::shared_ptr newTransaction(const std::function &commitCallback = std::function()) override; + virtual std::shared_ptr newTransaction(const std::function &commitCallback = nullptr) override; + virtual void newTransactionAsync(const std::function &)> &callback) override; private: std::string _connInfo; @@ -52,6 +52,7 @@ class DbClientLockFree : public DbClient, public std::enable_shared_from_this _connections; std::vector _connectionHolders; + std::unordered_set _transSet; struct SqlCmd { std::string _sql; @@ -80,6 +81,10 @@ class DbClientLockFree : public DbClient, public std::enable_shared_from_this _sqlCmdBuffer; + std::queue &)>> _transCallbacks; + + void makeTrans(const DbConnectionPtr &conn, std::function &)> &&callback); + void handleNewTask(const DbConnectionPtr &conn); }; diff --git a/orm_lib/src/TransactionImpl.h b/orm_lib/src/TransactionImpl.h index c8b69bd2..b464ccf0 100644 --- a/orm_lib/src/TransactionImpl.h +++ b/orm_lib/src/TransactionImpl.h @@ -29,6 +29,10 @@ class TransactionImpl : public Transaction, public std::enable_shared_from_this< TransactionImpl(ClientType type, const DbConnectionPtr &connPtr, const std::function &commitCallback, const std::function &usedUpCallback); ~TransactionImpl(); void rollback() override; + virtual void setCommitCallback(const std::function &commitCallback) override + { + _commitCallback = commitCallback; + } private: DbConnectionPtr _connectionPtr; @@ -83,6 +87,10 @@ class TransactionImpl : public Transaction, public std::enable_shared_from_this< return shared_from_this(); } + virtual void newTransactionAsync(const std::function &)> &callback) override + { + callback(shared_from_this()); + } std::function _usedUpCallback; bool _isCommitedOrRolledback = false; bool _isWorking = false; @@ -102,6 +110,7 @@ class TransactionImpl : public Transaction, public std::enable_shared_from_this< std::list _sqlCmdBuffer; // std::mutex _bufferMutex; friend class DbClientImpl; + friend class DbClientLockFree; void doBegin(); trantor::EventLoop *_loop; std::function _commitCallback; diff --git a/orm_lib/src/mysql_impl/test/test1.cc b/orm_lib/src/mysql_impl/test/test1.cc index eeac609d..5795ab85 100644 --- a/orm_lib/src/mysql_impl/test/test1.cc +++ b/orm_lib/src/mysql_impl/test/test1.cc @@ -74,7 +74,12 @@ int main() // pbuf->sgetn(&str[0], filesize); { - auto trans = clientPtr->newTransaction(); + auto trans = clientPtr->newTransaction([](bool ret){ + if(ret) + { + std::cout << "commited!!!!!!" << std::endl; + } + }); *trans << "update users set file=? where id != ?" << "hehaha" << 1000 >> [](const Result &r) {