Merge pull request #15 from an-tao/transaction

Add the callback for a transaction commit
This commit is contained in:
an-tao 2018-12-10 18:37:26 +08:00 committed by GitHub
commit 7513089055
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 100 additions and 64 deletions

View File

@ -70,6 +70,7 @@ class DbClient : public trantor::NonCopyable
#if USE_MYSQL
static std::shared_ptr<DbClient> newMysqlClient(const std::string &connInfo, const size_t connNum);
#endif
//Async method, nonblocking by default;
template <
typename FUNCTION1,
@ -90,6 +91,7 @@ class DbClient : public trantor::NonCopyable
binder >> rCallback;
binder >> exceptCallback;
}
//Async and nonblocking method
template <typename... Arguments>
std::future<const Result> execSqlAsync(const std::string &sql,
@ -107,6 +109,7 @@ class DbClient : public trantor::NonCopyable
binder.exec();
return prom->get_future();
}
//Sync and blocking method
template <typename... Arguments>
const Result execSqlSync(const std::string &sql,
@ -127,10 +130,22 @@ class DbClient : public trantor::NonCopyable
return r;
}
/// A stream-type method for sql execution
internal::SqlBinder operator<<(const std::string &sql);
virtual std::shared_ptr<Transaction> newTransaction() = 0;
ClientType type() const { return _type; }
/// Create a transaction object.
/**
* @param commitCallback: the callback with which user can get the submitting result,
* The Boolean type parameter in the callback function indicates whether the
* transaction was submitted successfully.
* NOTE:
* The callback only indicates the result of the 'commit' command, which is the last
* step of the transaction. If the transaction has been automatically or manually rolled back,
* the callback will not be executed.
*/
virtual std::shared_ptr<Transaction> newTransaction(const std::function<void(bool)> &commitCallback = std::function<void(bool)>()) = 0;
ClientType type() const { return _type; }
private:
friend internal::SqlBinder;

View File

@ -182,7 +182,7 @@ void DbClientImpl::execSql(const std::string &sql,
}
}
std::shared_ptr<Transaction> DbClientImpl::newTransaction()
std::shared_ptr<Transaction> DbClientImpl::newTransaction(const std::function<void(bool)> &commitCallback)
{
DbConnectionPtr conn;
{
@ -197,7 +197,7 @@ std::shared_ptr<Transaction> DbClientImpl::newTransaction()
conn = *iter;
_readyConnections.erase(iter);
}
auto trans = std::shared_ptr<TransactionImpl>(new TransactionImpl(_type, conn, [=]() {
auto trans = std::shared_ptr<TransactionImpl>(new TransactionImpl(_type, conn, commitCallback, [=]() {
if (conn->status() == ConnectStatus_Bad)
{
return;

View File

@ -31,57 +31,57 @@ namespace orm
class DbClientImpl : public DbClient, public std::enable_shared_from_this<DbClientImpl>
{
public:
DbClientImpl(const std::string &connInfo, const size_t connNum, ClientType type);
virtual ~DbClientImpl() noexcept;
virtual void execSql(const std::string &sql,
size_t paraNum,
const std::vector<const char *> &parameters,
const std::vector<int> &length,
const std::vector<int> &format,
const ResultCallback &rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback) override;
virtual std::shared_ptr<Transaction> newTransaction() override;
public:
DbClientImpl(const std::string &connInfo, const size_t connNum, ClientType type);
virtual ~DbClientImpl() noexcept;
virtual void execSql(const std::string &sql,
size_t paraNum,
const std::vector<const char *> &parameters,
const std::vector<int> &length,
const std::vector<int> &format,
const ResultCallback &rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback) override;
virtual std::shared_ptr<Transaction> newTransaction(const std::function<void(bool)> &commitCallback = std::function<void(bool)>()) override;
private:
void ioLoop();
std::shared_ptr<trantor::EventLoop> _loopPtr;
void execSql(const DbConnectionPtr &conn, const std::string &sql,
size_t paraNum,
const std::vector<const char *> &parameters,
const std::vector<int> &length,
const std::vector<int> &format,
const ResultCallback &rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback);
private:
void ioLoop();
std::shared_ptr<trantor::EventLoop> _loopPtr;
void execSql(const DbConnectionPtr &conn, const std::string &sql,
size_t paraNum,
const std::vector<const char *> &parameters,
const std::vector<int> &length,
const std::vector<int> &format,
const ResultCallback &rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback);
DbConnectionPtr newConnection();
DbConnectionPtr newConnection();
std::unordered_set<DbConnectionPtr> _connections;
std::unordered_set<DbConnectionPtr> _readyConnections;
std::unordered_set<DbConnectionPtr> _busyConnections;
std::string _connInfo;
std::thread _loopThread;
std::mutex _connectionsMutex;
std::condition_variable _condConnectionReady;
size_t _transWaitNum = 0;
std::unordered_set<DbConnectionPtr> _connections;
std::unordered_set<DbConnectionPtr> _readyConnections;
std::unordered_set<DbConnectionPtr> _busyConnections;
std::string _connInfo;
std::thread _loopThread;
std::mutex _connectionsMutex;
std::condition_variable _condConnectionReady;
size_t _transWaitNum = 0;
size_t _connectNum;
bool _stop = false;
size_t _connectNum;
bool _stop = false;
struct SqlCmd
{
std::string _sql;
size_t _paraNum;
std::vector<const char *> _parameters;
std::vector<int> _length;
std::vector<int> _format;
QueryCallback _cb;
ExceptPtrCallback _exceptCb;
};
std::list<SqlCmd> _sqlCmdBuffer;
std::mutex _bufferMutex;
struct SqlCmd
{
std::string _sql;
size_t _paraNum;
std::vector<const char *> _parameters;
std::vector<int> _length;
std::vector<int> _format;
QueryCallback _cb;
ExceptPtrCallback _exceptCb;
};
std::list<SqlCmd> _sqlCmdBuffer;
std::mutex _bufferMutex;
void handleNewTask(const DbConnectionPtr &conn);
void handleNewTask(const DbConnectionPtr &conn);
};
} // namespace orm

View File

@ -18,10 +18,12 @@
using namespace drogon::orm;
TransactionImpl::TransactionImpl(ClientType type, const DbConnectionPtr &connPtr,
const std::function<void(bool)> &commitCallback,
const std::function<void()> &usedUpCallback)
: _connectionPtr(connPtr),
_usedUpCallback(usedUpCallback),
_loop(connPtr->loop())
_loop(connPtr->loop()),
_commitCallback(commitCallback)
{
_type = type;
}
@ -31,25 +33,41 @@ TransactionImpl::~TransactionImpl()
assert(!_isWorking);
if (!_isCommitedOrRolledback)
{
auto cb = _usedUpCallback;
auto ucb = std::move(_usedUpCallback);
auto commitCb = std::move(_commitCallback);
auto loop = _connectionPtr->loop();
auto conn = _connectionPtr;
loop->queueInLoop([conn, cb]() {
loop->queueInLoop([conn, ucb, commitCb]() {
conn->execSql("commit",
0,
std::vector<const char *>(),
std::vector<int>(),
std::vector<int>(),
[](const Result &r) {
[commitCb](const Result &r) {
LOG_TRACE << "Transaction commited!";
},
[](const std::exception_ptr &ePtr) {
},
[cb]() {
if (cb)
if (commitCb)
{
cb();
commitCb(true);
}
},
[commitCb](const std::exception_ptr &ePtr) {
if (commitCb)
{
try
{
std::rethrow_exception(ePtr);
}
catch (const DrogonDbException &e)
{
LOG_ERROR << "Transaction submission failed:" << e.base().what();
commitCb(false);
}
}
},
[ucb]() {
if (ucb)
{
ucb();
}
});
});

View File

@ -26,7 +26,7 @@ namespace orm
class TransactionImpl : public Transaction, public std::enable_shared_from_this<TransactionImpl>
{
public:
TransactionImpl(ClientType type, const DbConnectionPtr &connPtr, const std::function<void()> &usedUpCallback);
TransactionImpl(ClientType type, const DbConnectionPtr &connPtr, const std::function<void(bool)> &commitCallback, const std::function<void()> &usedUpCallback);
~TransactionImpl();
void rollback() override;
@ -39,7 +39,7 @@ class TransactionImpl : public Transaction, public std::enable_shared_from_this<
const std::vector<int> &format,
const ResultCallback &rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback) override;
virtual std::shared_ptr<Transaction> newTransaction() override
virtual std::shared_ptr<Transaction> newTransaction(const std::function<void(bool)>&) override
{
return shared_from_this();
}
@ -63,6 +63,7 @@ class TransactionImpl : public Transaction, public std::enable_shared_from_this<
friend class DbClientImpl;
void doBegin();
trantor::EventLoop *_loop;
std::function<void(bool)> _commitCallback;
};
} // namespace orm
} // namespace drogon

View File

@ -34,7 +34,9 @@ int main()
sleep(1);
LOG_DEBUG << "start!";
{
auto trans = client->newTransaction();
auto trans = client->newTransaction([](bool committed) {
std::cout << "The transaction submission " << (committed ? "succeeded" : "failed") << std::endl;
});
*trans << "delete from users where user_uuid=201" >>
[trans](const Result &r) {
std::cout << "delete " << r.affectedRows() << "user!!!!!" << std::endl;
@ -72,7 +74,7 @@ int main()
} >> [](const DrogonDbException &e) {
std::cout << e.base().what() << std::endl;
};
for (int i = 0; i < 100;i++)
for (int i = 0; i < 100; i++)
mapper.findByPrimaryKey(2,
[](User u) {
std::cout << "get a user by pk" << std::endl;