Add an asynchronous interface for transaction creation
This commit is contained in:
parent
1ca5e3cde6
commit
620d875646
|
@ -144,8 +144,12 @@ class DbClient : public trantor::NonCopyable
|
|||
* The callback only indicates the result of the 'commit' command, which is the last
|
||||
* step of the transaction. If the transaction has been automatically or manually rolled back,
|
||||
* the callback will not be executed.
|
||||
* You can also use the setCommitCallback() method of a transaction object to set the callback.
|
||||
*/
|
||||
virtual std::shared_ptr<Transaction> newTransaction(const std::function<void(bool)> &commitCallback = std::function<void(bool)>()) = 0;
|
||||
virtual std::shared_ptr<Transaction> newTransaction(const std::function<void(bool)> &commitCallback = nullptr) = 0;
|
||||
|
||||
/// Create a transaction object in asynchronous mode.
|
||||
virtual void newTransactionAsync(const std::function<void(const std::shared_ptr<Transaction> &)> &callback) = 0;
|
||||
|
||||
ClientType type() const { return _type; }
|
||||
const std::string &connectionInfo() { return _connInfo; }
|
||||
|
@ -171,6 +175,7 @@ class Transaction : public DbClient
|
|||
public:
|
||||
virtual void rollback() = 0;
|
||||
//virtual void commit() = 0;
|
||||
virtual void setCommitCallback(const std::function<void(bool)> &commitCallback) = 0;
|
||||
};
|
||||
|
||||
} // namespace orm
|
||||
|
|
|
@ -213,24 +213,33 @@ void DbClientImpl::execSql(std::string &&sql,
|
|||
_sqlCmdBuffer.push_back(std::move(cmd));
|
||||
}
|
||||
}
|
||||
|
||||
std::shared_ptr<Transaction> DbClientImpl::newTransaction(const std::function<void(bool)> &commitCallback)
|
||||
void DbClientImpl::newTransactionAsync(const std::function<void(const std::shared_ptr<Transaction> &)> &callback)
|
||||
{
|
||||
DbConnectionPtr conn;
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(_connectionsMutex);
|
||||
_transWaitNum++;
|
||||
_condConnectionReady.wait(lock, [this]() {
|
||||
return _readyConnections.size() > 0;
|
||||
});
|
||||
_transWaitNum--;
|
||||
auto iter = _readyConnections.begin();
|
||||
_busyConnections.insert(*iter);
|
||||
conn = *iter;
|
||||
_readyConnections.erase(iter);
|
||||
std::lock_guard<std::mutex> lock(_connectionsMutex);
|
||||
if (!_readyConnections.empty())
|
||||
{
|
||||
auto iter = _readyConnections.begin();
|
||||
_busyConnections.insert(*iter);
|
||||
conn = *iter;
|
||||
_readyConnections.erase(iter);
|
||||
}
|
||||
}
|
||||
if (conn)
|
||||
{
|
||||
makeTrans(conn, std::function<void(const std::shared_ptr<Transaction> &)>(callback));
|
||||
}
|
||||
else
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(_transMutex);
|
||||
_transCallbacks.push(callback);
|
||||
}
|
||||
}
|
||||
void DbClientImpl::makeTrans(const DbConnectionPtr &conn, std::function<void(const std::shared_ptr<Transaction> &)> &&callback)
|
||||
{
|
||||
std::weak_ptr<DbClientImpl> weakThis = shared_from_this();
|
||||
auto trans = std::shared_ptr<TransactionImpl>(new TransactionImpl(_type, conn, commitCallback, [weakThis, conn]() {
|
||||
auto trans = std::shared_ptr<TransactionImpl>(new TransactionImpl(_type, conn, std::function<void(bool)>(), [weakThis, conn]() {
|
||||
auto thisPtr = weakThis.lock();
|
||||
if (!thisPtr)
|
||||
return;
|
||||
|
@ -265,22 +274,38 @@ std::shared_ptr<Transaction> DbClientImpl::newTransaction(const std::function<vo
|
|||
});
|
||||
}));
|
||||
trans->doBegin();
|
||||
conn->loop()->queueInLoop([callback = std::move(callback), trans]() {
|
||||
callback(trans);
|
||||
});
|
||||
}
|
||||
std::shared_ptr<Transaction> DbClientImpl::newTransaction(const std::function<void(bool)> &commitCallback)
|
||||
{
|
||||
std::promise<std::shared_ptr<Transaction>> pro;
|
||||
auto f = pro.get_future();
|
||||
newTransactionAsync([&pro](const std::shared_ptr<Transaction> &trans) {
|
||||
pro.set_value(trans);
|
||||
});
|
||||
auto trans = f.get();
|
||||
trans->setCommitCallback(commitCallback);
|
||||
return trans;
|
||||
}
|
||||
|
||||
void DbClientImpl::handleNewTask(const DbConnectionPtr &connPtr)
|
||||
{
|
||||
std::function<void(const std::shared_ptr<Transaction> &)> transCallback;
|
||||
{
|
||||
std::lock_guard<std::mutex> guard(_connectionsMutex);
|
||||
if (_transWaitNum > 0)
|
||||
std::lock_guard<std::mutex> guard(_transMutex);
|
||||
if (!_transCallbacks.empty())
|
||||
{
|
||||
//Prioritize the needs of the transaction
|
||||
_busyConnections.erase(connPtr);
|
||||
_readyConnections.insert(connPtr);
|
||||
_condConnectionReady.notify_one();
|
||||
return;
|
||||
transCallback = std::move(_transCallbacks.front());
|
||||
_transCallbacks.pop();
|
||||
}
|
||||
}
|
||||
if (transCallback)
|
||||
{
|
||||
makeTrans(connPtr, std::move(transCallback));
|
||||
return;
|
||||
}
|
||||
//Then check if there are some sql queries in the buffer
|
||||
std::shared_ptr<SqlCmd> cmd;
|
||||
{
|
||||
|
|
|
@ -42,7 +42,8 @@ class DbClientImpl : public DbClient, public std::enable_shared_from_this<DbClie
|
|||
std::vector<int> &&format,
|
||||
ResultCallback &&rcb,
|
||||
std::function<void(const std::exception_ptr &)> &&exceptCallback) override;
|
||||
virtual std::shared_ptr<Transaction> newTransaction(const std::function<void(bool)> &commitCallback = std::function<void(bool)>()) override;
|
||||
virtual std::shared_ptr<Transaction> newTransaction(const std::function<void(bool)> &commitCallback = nullptr) override;
|
||||
virtual void newTransactionAsync(const std::function<void(const std::shared_ptr<Transaction> &)> &callback) override;
|
||||
|
||||
private:
|
||||
size_t _connectNum;
|
||||
|
@ -60,13 +61,15 @@ class DbClientImpl : public DbClient, public std::enable_shared_from_this<DbClie
|
|||
|
||||
DbConnectionPtr newConnection(trantor::EventLoop *loop);
|
||||
|
||||
void makeTrans(const DbConnectionPtr &conn, std::function<void(const std::shared_ptr<Transaction> &)> &&callback);
|
||||
|
||||
std::mutex _connectionsMutex;
|
||||
std::unordered_set<DbConnectionPtr> _connections;
|
||||
std::unordered_set<DbConnectionPtr> _readyConnections;
|
||||
std::unordered_set<DbConnectionPtr> _busyConnections;
|
||||
|
||||
std::condition_variable _condConnectionReady;
|
||||
size_t _transWaitNum = 0;
|
||||
std::mutex _transMutex;
|
||||
std::queue<std::function<void(const std::shared_ptr<Transaction> &)>> _transCallbacks;
|
||||
|
||||
struct SqlCmd
|
||||
{
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
*/
|
||||
|
||||
#include "DbClientLockFree.h"
|
||||
#include "TransactionImpl.h"
|
||||
#include "DbConnection.h"
|
||||
#if USE_POSTGRESQL
|
||||
#include "postgresql_impl/PgConnection.h"
|
||||
|
@ -101,7 +102,7 @@ void DbClientLockFree::execSql(std::string &&sql,
|
|||
{
|
||||
for (auto &conn : _connections)
|
||||
{
|
||||
if (!conn->isWorking())
|
||||
if (!conn->isWorking() && (_transSet.empty() || _transSet.find(conn) == _transSet.end()))
|
||||
{
|
||||
conn->execSql(std::move(sql),
|
||||
paraNum,
|
||||
|
@ -146,10 +147,85 @@ std::shared_ptr<Transaction> DbClientLockFree::newTransaction(const std::functio
|
|||
return nullptr;
|
||||
}
|
||||
|
||||
void DbClientLockFree::newTransactionAsync(const std::function<void(const std::shared_ptr<Transaction> &)> &callback)
|
||||
{
|
||||
_loop->assertInLoopThread();
|
||||
for (auto &conn : _connections)
|
||||
{
|
||||
if (!conn->isWorking() && _transSet.find(conn) == _transSet.end())
|
||||
{
|
||||
makeTrans(conn, std::function<void(const std::shared_ptr<Transaction> &)>(callback));
|
||||
return;
|
||||
}
|
||||
}
|
||||
_transCallbacks.push(callback);
|
||||
}
|
||||
|
||||
void DbClientLockFree::makeTrans(const DbConnectionPtr &conn, std::function<void(const std::shared_ptr<Transaction> &)> &&callback)
|
||||
{
|
||||
std::weak_ptr<DbClientLockFree> weakThis = shared_from_this();
|
||||
auto trans = std::shared_ptr<TransactionImpl>(new TransactionImpl(_type, conn, std::function<void(bool)>(), [weakThis, conn]() {
|
||||
auto thisPtr = weakThis.lock();
|
||||
if (!thisPtr)
|
||||
return;
|
||||
|
||||
if (conn->status() == ConnectStatus_Bad)
|
||||
{
|
||||
return;
|
||||
}
|
||||
if (!thisPtr->_transCallbacks.empty())
|
||||
{
|
||||
auto callback = std::move(thisPtr->_transCallbacks.front());
|
||||
thisPtr->_transCallbacks.pop();
|
||||
thisPtr->makeTrans(conn, std::move(callback));
|
||||
return;
|
||||
}
|
||||
|
||||
for (auto &connPtr : thisPtr->_connections)
|
||||
{
|
||||
if (connPtr == conn)
|
||||
{
|
||||
conn->loop()->queueInLoop([weakThis, conn]() {
|
||||
auto thisPtr = weakThis.lock();
|
||||
if (!thisPtr)
|
||||
return;
|
||||
std::weak_ptr<DbConnection> weakConn = conn;
|
||||
conn->setIdleCallback([weakThis, weakConn]() {
|
||||
auto thisPtr = weakThis.lock();
|
||||
if (!thisPtr)
|
||||
return;
|
||||
auto connPtr = weakConn.lock();
|
||||
if (!connPtr)
|
||||
return;
|
||||
thisPtr->handleNewTask(connPtr);
|
||||
});
|
||||
thisPtr->_transSet.erase(conn);
|
||||
thisPtr->handleNewTask(conn);
|
||||
});
|
||||
break;
|
||||
}
|
||||
}
|
||||
}));
|
||||
_transSet.insert(conn);
|
||||
trans->doBegin();
|
||||
conn->loop()->queueInLoop([callback = std::move(callback), trans] {
|
||||
callback(trans);
|
||||
});
|
||||
}
|
||||
|
||||
void DbClientLockFree::handleNewTask(const DbConnectionPtr &conn)
|
||||
{
|
||||
assert(conn);
|
||||
assert(!conn->isWorking());
|
||||
|
||||
if (!_transCallbacks.empty())
|
||||
{
|
||||
auto callback = std::move(_transCallbacks.front());
|
||||
_transCallbacks.pop();
|
||||
makeTrans(conn, std::move(callback));
|
||||
return;
|
||||
}
|
||||
|
||||
if (!_sqlCmdBuffer.empty())
|
||||
{
|
||||
auto &cmd = _sqlCmdBuffer.front();
|
||||
|
@ -213,6 +289,7 @@ DbConnectionPtr DbClientLockFree::newConnection()
|
|||
}
|
||||
}
|
||||
|
||||
thisPtr->_transSet.erase(closeConnPtr);
|
||||
//Reconnect after 1 second
|
||||
thisPtr->_loop->runAfter(1, [weakPtr] {
|
||||
auto thisPtr = weakPtr.lock();
|
||||
|
@ -235,7 +312,8 @@ DbConnectionPtr DbClientLockFree::newConnection()
|
|||
if (!thisPtr)
|
||||
return;
|
||||
auto connPtr = weakConnPtr.lock();
|
||||
assert(connPtr);
|
||||
if (!connPtr)
|
||||
return;
|
||||
thisPtr->handleNewTask(connPtr);
|
||||
});
|
||||
//std::cout<<"newConn end"<<connPtr<<std::endl;
|
||||
|
|
|
@ -22,9 +22,8 @@
|
|||
#include <thread>
|
||||
#include <functional>
|
||||
#include <string>
|
||||
#include <queue>
|
||||
#include <unordered_set>
|
||||
#include <list>
|
||||
|
||||
|
||||
namespace drogon
|
||||
{
|
||||
|
@ -43,7 +42,8 @@ class DbClientLockFree : public DbClient, public std::enable_shared_from_this<Db
|
|||
std::vector<int> &&format,
|
||||
ResultCallback &&rcb,
|
||||
std::function<void(const std::exception_ptr &)> &&exceptCallback) override;
|
||||
virtual std::shared_ptr<Transaction> newTransaction(const std::function<void(bool)> &commitCallback = std::function<void(bool)>()) override;
|
||||
virtual std::shared_ptr<Transaction> newTransaction(const std::function<void(bool)> &commitCallback = nullptr) override;
|
||||
virtual void newTransactionAsync(const std::function<void(const std::shared_ptr<Transaction> &)> &callback) override;
|
||||
|
||||
private:
|
||||
std::string _connInfo;
|
||||
|
@ -52,6 +52,7 @@ class DbClientLockFree : public DbClient, public std::enable_shared_from_this<Db
|
|||
const size_t _connectionNum = 4;
|
||||
std::vector<DbConnectionPtr> _connections;
|
||||
std::vector<DbConnectionPtr> _connectionHolders;
|
||||
std::unordered_set<DbConnectionPtr> _transSet;
|
||||
struct SqlCmd
|
||||
{
|
||||
std::string _sql;
|
||||
|
@ -80,6 +81,10 @@ class DbClientLockFree : public DbClient, public std::enable_shared_from_this<Db
|
|||
};
|
||||
std::deque<SqlCmd> _sqlCmdBuffer;
|
||||
|
||||
std::queue<std::function<void(const std::shared_ptr<Transaction> &)>> _transCallbacks;
|
||||
|
||||
void makeTrans(const DbConnectionPtr &conn, std::function<void(const std::shared_ptr<Transaction> &)> &&callback);
|
||||
|
||||
void handleNewTask(const DbConnectionPtr &conn);
|
||||
};
|
||||
|
||||
|
|
|
@ -29,6 +29,10 @@ class TransactionImpl : public Transaction, public std::enable_shared_from_this<
|
|||
TransactionImpl(ClientType type, const DbConnectionPtr &connPtr, const std::function<void(bool)> &commitCallback, const std::function<void()> &usedUpCallback);
|
||||
~TransactionImpl();
|
||||
void rollback() override;
|
||||
virtual void setCommitCallback(const std::function<void(bool)> &commitCallback) override
|
||||
{
|
||||
_commitCallback = commitCallback;
|
||||
}
|
||||
|
||||
private:
|
||||
DbConnectionPtr _connectionPtr;
|
||||
|
@ -83,6 +87,10 @@ class TransactionImpl : public Transaction, public std::enable_shared_from_this<
|
|||
return shared_from_this();
|
||||
}
|
||||
|
||||
virtual void newTransactionAsync(const std::function<void(const std::shared_ptr<Transaction> &)> &callback) override
|
||||
{
|
||||
callback(shared_from_this());
|
||||
}
|
||||
std::function<void()> _usedUpCallback;
|
||||
bool _isCommitedOrRolledback = false;
|
||||
bool _isWorking = false;
|
||||
|
@ -102,6 +110,7 @@ class TransactionImpl : public Transaction, public std::enable_shared_from_this<
|
|||
std::list<SqlCmd> _sqlCmdBuffer;
|
||||
// std::mutex _bufferMutex;
|
||||
friend class DbClientImpl;
|
||||
friend class DbClientLockFree;
|
||||
void doBegin();
|
||||
trantor::EventLoop *_loop;
|
||||
std::function<void(bool)> _commitCallback;
|
||||
|
|
|
@ -74,7 +74,12 @@ int main()
|
|||
// pbuf->sgetn(&str[0], filesize);
|
||||
|
||||
{
|
||||
auto trans = clientPtr->newTransaction();
|
||||
auto trans = clientPtr->newTransaction([](bool ret){
|
||||
if(ret)
|
||||
{
|
||||
std::cout << "commited!!!!!!" << std::endl;
|
||||
}
|
||||
});
|
||||
*trans << "update users set file=? where id != ?"
|
||||
<< "hehaha" << 1000 >>
|
||||
[](const Result &r) {
|
||||
|
|
Loading…
Reference in New Issue