Use persisted idle-callback in db connections

This commit is contained in:
antao 2019-02-21 20:00:22 +08:00
parent 15ab5e1558
commit 52f1888a14
12 changed files with 138 additions and 157 deletions

View File

@ -129,15 +129,7 @@ void DbClientImpl::execSql(const DbConnectionPtr &conn,
}
std::weak_ptr<DbConnection> weakConn = conn;
conn->execSql(std::move(sql), paraNum, std::move(parameters), std::move(length), std::move(format),
std::move(rcb), std::move(exceptCallback),
[=]() -> void {
{
auto connPtr = weakConn.lock();
if (!connPtr)
return;
handleNewTask(connPtr);
}
});
std::move(rcb), std::move(exceptCallback));
}
void DbClientImpl::execSql(std::string &&sql,
size_t paraNum,
@ -233,22 +225,40 @@ std::shared_ptr<Transaction> DbClientImpl::newTransaction(const std::function<vo
conn = *iter;
_readyConnections.erase(iter);
}
auto trans = std::shared_ptr<TransactionImpl>(new TransactionImpl(_type, conn, commitCallback, [=]() {
std::weak_ptr<DbClientImpl> weakThis = shared_from_this();
auto trans = std::shared_ptr<TransactionImpl>(new TransactionImpl(_type, conn, commitCallback, [weakThis, conn]() {
auto thisPtr = weakThis.lock();
if (!thisPtr)
return;
if (conn->status() == ConnectStatus_Bad)
{
return;
}
{
std::lock_guard<std::mutex> guard(_connectionsMutex);
if (_connections.find(conn) == _connections.end() &&
_busyConnections.find(conn) == _busyConnections.find(conn))
std::lock_guard<std::mutex> guard(thisPtr->_connectionsMutex);
if (thisPtr->_connections.find(conn) == thisPtr->_connections.end() &&
thisPtr->_busyConnections.find(conn) == thisPtr->_busyConnections.find(conn))
{
//connection is broken and removed
return;
}
}
handleNewTask(conn);
conn->loop()->queueInLoop([weakThis, conn]() {
auto thisPtr = weakThis.lock();
if(!thisPtr)
return;
std::weak_ptr<DbConnection> weakConn = conn;
conn->setIdleCallback([weakThis, weakConn]() {
auto thisPtr = weakThis.lock();
if (!thisPtr)
return;
auto connPtr = weakConn.lock();
if (!connPtr)
return;
thisPtr->handleNewTask(connPtr);
});
thisPtr->handleNewTask(conn);
});
}));
trans->doBegin();
return trans;
@ -356,6 +366,16 @@ DbConnectionPtr DbClientImpl::newConnection(trantor::EventLoop *loop)
}
thisPtr->handleNewTask(okConnPtr);
});
std::weak_ptr<DbConnection> weakConn = connPtr;
connPtr->setIdleCallback([weakPtr, weakConn]() {
auto thisPtr = weakPtr.lock();
if (!thisPtr)
return;
auto connPtr = weakConn.lock();
if (!connPtr)
return;
thisPtr->handleNewTask(connPtr);
});
//std::cout<<"newConn end"<<connPtr<<std::endl;
return connPtr;
}

View File

@ -82,15 +82,7 @@ void DbClientLockFree::execSql(const DbConnectionPtr &conn,
assert(conn);
std::weak_ptr<DbConnection> weakConn = conn;
conn->execSql(std::move(sql), paraNum, std::move(parameters), std::move(length), std::move(format),
std::move(rcb), std::move(exceptCallback),
[=]() -> void {
{
auto connPtr = weakConn.lock();
if (!connPtr)
return;
handleNewTask();
}
});
std::move(rcb), std::move(exceptCallback));
}
void DbClientLockFree::execSql(std::string &&sql,
size_t paraNum,
@ -121,7 +113,14 @@ void DbClientLockFree::execSql(std::string &&sql,
{
if (!_connection->isWorking())
{
execSql(_connection, std::move(sql), paraNum, std::move(parameters), std::move(length), std::move(format), std::move(rcb), std::move(exceptCallback));
execSql(_connection,
std::move(sql),
paraNum,
std::move(parameters),
std::move(length),
std::move(format),
std::move(rcb),
std::move(exceptCallback));
return;
}
}
@ -164,7 +163,14 @@ void DbClientLockFree::handleNewTask()
if (!_sqlCmdBuffer.empty())
{
auto &cmd = _sqlCmdBuffer.front();
execSql(_connection, std::move(cmd._sql), cmd._paraNum, std::move(cmd._parameters), std::move(cmd._length), std::move(cmd._format), std::move(cmd._cb), std::move(cmd._exceptCb));
execSql(_connection,
std::move(cmd._sql),
cmd._paraNum,
std::move(cmd._parameters),
std::move(cmd._length),
std::move(cmd._format),
std::move(cmd._cb),
std::move(cmd._exceptCb));
_sqlCmdBuffer.pop_front();
return;
}
@ -220,6 +226,12 @@ DbConnectionPtr DbClientLockFree::newConnection()
thisPtr->_connection = okConnPtr;
thisPtr->handleNewTask();
});
connPtr->setIdleCallback([weakPtr]() {
auto thisPtr = weakPtr.lock();
if (!thisPtr)
return;
thisPtr->handleNewTask();
});
//std::cout<<"newConn end"<<connPtr<<std::endl;
return connPtr;
}

View File

@ -59,14 +59,17 @@ class DbConnection : public trantor::NonCopyable
{
_closeCb = cb;
}
void setIdleCallback(const std::function<void()> &cb)
{
_idleCb = cb;
}
virtual void execSql(std::string &&sql,
size_t paraNum,
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback,
std::function<void()> &&idleCb) = 0;
std::function<void(const std::exception_ptr &)> &&exceptCallback) = 0;
virtual ~DbConnection()
{
LOG_TRACE << "Destruct DbConn" << this;
@ -79,7 +82,7 @@ class DbConnection : public trantor::NonCopyable
protected:
QueryCallback _cb;
trantor::EventLoop *_loop;
std::shared_ptr<std::function<void()>> _idleCbPtr;
std::function<void()> _idleCb;
ConnectStatus _status = ConnectStatus_None;
DbConnectionCallback _closeCb = [](const DbConnectionPtr &) {};
DbConnectionCallback _okCb = [](const DbConnectionPtr &) {};

View File

@ -35,6 +35,10 @@ TransactionImpl::~TransactionImpl()
{
auto loop = _connectionPtr->loop();
loop->queueInLoop([conn = _connectionPtr, ucb = std::move(_usedUpCallback), commitCb = std::move(_commitCallback)]() {
conn->setIdleCallback([ucb = std::move(ucb)]() {
if (ucb)
ucb();
});
conn->execSql("commit",
0,
std::vector<const char *>(),
@ -60,12 +64,6 @@ TransactionImpl::~TransactionImpl()
commitCb(false);
}
}
},
[ucb]() {
if (ucb)
{
ucb();
}
});
});
}
@ -78,8 +76,7 @@ void TransactionImpl::execSql(std::string &&sql,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback)
{
auto thisPtr = shared_from_this();
_loop->queueInLoop([thisPtr, sql = std::move(sql), paraNum, parameters = std::move(parameters), length = std::move(length), format = std::move(format), rcb = std::move(rcb), exceptCallback = std::move(exceptCallback)]() mutable {
_loop->queueInLoop([thisPtr = shared_from_this(), sql = std::move(sql), paraNum, parameters = std::move(parameters), length = std::move(length), format = std::move(format), rcb = std::move(rcb), exceptCallback = std::move(exceptCallback)]() mutable {
if (!thisPtr->_isCommitedOrRolledback)
{
if (!thisPtr->_isWorking)
@ -95,9 +92,6 @@ void TransactionImpl::execSql(std::string &&sql,
thisPtr->rollback();
if (exceptCallback)
exceptCallback(ePtr);
},
[thisPtr]() {
thisPtr->execNewTask();
});
}
else
@ -111,6 +105,7 @@ void TransactionImpl::execSql(std::string &&sql,
cmd._format = std::move(format);
cmd._cb = std::move(rcb);
cmd._exceptCb = std::move(exceptCallback);
cmd._thisPtr = thisPtr;
thisPtr->_sqlCmdBuffer.push_back(std::move(cmd));
}
}
@ -136,29 +131,22 @@ void TransactionImpl::rollback()
_loop->runInLoop([thisPtr]() {
if (thisPtr->_isCommitedOrRolledback)
return;
auto clearupCb = [thisPtr]() {
thisPtr->_isCommitedOrRolledback = true;
if (thisPtr->_usedUpCallback)
{
thisPtr->_usedUpCallback();
thisPtr->_usedUpCallback = decltype(thisPtr->_usedUpCallback)();
}
};
if (thisPtr->_isWorking)
{
//push sql cmd to buffer;
SqlCmd cmd;
cmd._sql = "rollback";
cmd._paraNum = 0;
cmd._cb = [](const Result &r) {
LOG_TRACE << "Transaction roll back!";
//clearupCb();
cmd._cb = [thisPtr](const Result &r) {
LOG_DEBUG << "Transaction roll back!";
thisPtr->_isCommitedOrRolledback = true;
};
cmd._exceptCb = [](const std::exception_ptr &ePtr) {
cmd._exceptCb = [thisPtr](const std::exception_ptr &ePtr) {
//clearupCb();
thisPtr->_isCommitedOrRolledback = true;
LOG_ERROR << "Transaction rool back error";
};
cmd._idleCb = clearupCb;
cmd._isRollbackCmd = true;
//Rollback cmd should be executed firstly, so we push it in front of the list
thisPtr->_sqlCmdBuffer.push_front(std::move(cmd));
return;
@ -169,17 +157,15 @@ void TransactionImpl::rollback()
std::vector<const char *>(),
std::vector<int>(),
std::vector<int>(),
[](const Result &r) {
[thisPtr](const Result &r) {
LOG_TRACE << "Transaction roll back!";
thisPtr->_isCommitedOrRolledback = true;
//clearupCb();
},
[](const std::exception_ptr &ePtr) {
[thisPtr](const std::exception_ptr &ePtr) {
//clearupCb();
LOG_ERROR << "Transaction rool back error";
},
[thisPtr, clearupCb]() {
clearupCb();
thisPtr->execNewTask();
thisPtr->_isCommitedOrRolledback = true;
});
});
}
@ -203,15 +189,10 @@ void TransactionImpl::execNewTask()
std::move(cmd._format),
std::move(cmd._cb),
[cmd, thisPtr](const std::exception_ptr &ePtr) {
if (!cmd._idleCb)
if (!cmd._isRollbackCmd)
thisPtr->rollback();
if (cmd._exceptCb)
cmd._exceptCb(ePtr);
},
[cmd, thisPtr]() {
if (cmd._idleCb)
cmd._idleCb();
thisPtr->execNewTask();
});
return;
}
@ -238,13 +219,23 @@ void TransactionImpl::execNewTask()
}
_sqlCmdBuffer.clear();
}
if (_usedUpCallback)
{
_usedUpCallback();
}
}
}
void TransactionImpl::doBegin()
{
auto thisPtr = shared_from_this();
_loop->queueInLoop([thisPtr]() {
_loop->queueInLoop([thisPtr = shared_from_this()]() {
std::weak_ptr<TransactionImpl> weakPtr = thisPtr;
thisPtr->_connectionPtr->setIdleCallback([weakPtr]() {
auto thisPtr = weakPtr.lock();
if (!thisPtr)
return;
thisPtr->execNewTask();
});
assert(!thisPtr->_isWorking);
assert(!thisPtr->_isCommitedOrRolledback);
thisPtr->_isWorking = true;
@ -263,9 +254,6 @@ void TransactionImpl::doBegin()
{
thisPtr->_usedUpCallback();
}
},
[thisPtr]() {
thisPtr->execNewTask();
});
});
}

View File

@ -39,7 +39,7 @@ class TransactionImpl : public Transaction, public std::enable_shared_from_this<
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback) override;
virtual std::shared_ptr<Transaction> newTransaction(const std::function<void(bool)>&) override
virtual std::shared_ptr<Transaction> newTransaction(const std::function<void(bool)> &) override
{
return shared_from_this();
}
@ -57,7 +57,8 @@ class TransactionImpl : public Transaction, public std::enable_shared_from_this<
std::vector<int> _format;
QueryCallback _cb;
ExceptPtrCallback _exceptCb;
std::function<void()> _idleCb;
bool _isRollbackCmd = false;
std::shared_ptr<TransactionImpl> _thisPtr;
};
std::list<SqlCmd> _sqlCmdBuffer;
// std::mutex _bufferMutex;

View File

@ -304,20 +304,17 @@ void MysqlConnection::execSql(std::string &&sql,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback,
std::function<void()> &&idleCb)
std::function<void(const std::exception_ptr &)> &&exceptCallback)
{
LOG_TRACE << sql;
assert(paraNum == parameters.size());
assert(paraNum == length.size());
assert(paraNum == format.size());
assert(rcb);
assert(idleCb);
assert(!_isWorking);
assert(!sql.empty());
_cb = std::move(rcb);
_idleCbPtr = std::make_shared<std::function<void()>>(std ::move(idleCb));
_isWorking = true;
_exceptCb = std::move(exceptCallback);
_sql.clear();
@ -439,12 +436,7 @@ void MysqlConnection::outputError()
_cb = nullptr;
_isWorking = false;
if (_idleCbPtr)
{
auto idle = std::move(_idleCbPtr);
_idleCbPtr.reset();
(*idle)();
}
_idleCb();
}
}
@ -460,11 +452,6 @@ void MysqlConnection::getResult(MYSQL_RES *res)
_cb = nullptr;
_exceptCb = nullptr;
_isWorking = false;
if (_idleCbPtr)
{
auto idle = std::move(_idleCbPtr);
_idleCbPtr.reset();
(*idle)();
}
_idleCb();
}
}

View File

@ -43,8 +43,7 @@ class MysqlConnection : public DbConnection, public std::enable_shared_from_this
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback,
std::function<void()> &&idleCb) override;
std::function<void(const std::exception_ptr &)> &&exceptCallback) override;
virtual void disconnect() override;
private:

View File

@ -73,12 +73,19 @@ int main()
// str.resize(filesize);
// pbuf->sgetn(&str[0], filesize);
// *clientPtr << "update users set file=? where id=?" << str << 1000 << Mode::Blocking >> [](const Result &r) {
// std::cout << "update " << r.affectedRows() << " rows" << std::endl;
// } >> [](const DrogonDbException &e) {
// std::cerr << e.base().what() << std::endl;
// };
{
auto trans = clientPtr->newTransaction();
*trans << "update users set file=? where id != ?"
<< "hehaha" << 1000 >>
[](const Result &r) {
std::cout << "hahaha update " << r.affectedRows() << " rows" << std::endl;
//trans->rollback();
} >>
[](const DrogonDbException &e) {
std::cerr << e.base().what() << std::endl;
};
}
LOG_DEBUG << "out of transaction block";
*clientPtr << "select * from users where id=1000" >> [](const Result &r) {
std::cout << "file:" << r[0]["file"].as<std::string>() << std::endl;
} >> [](const DrogonDbException &e) {

View File

@ -155,8 +155,7 @@ void PgConnection::execSqlInLoop(std::string &&sql,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback,
std::function<void()> &&idleCb)
std::function<void(const std::exception_ptr &)> &&exceptCallback)
{
LOG_TRACE << sql;
_loop->assertInLoopThread();
@ -164,12 +163,10 @@ void PgConnection::execSqlInLoop(std::string &&sql,
assert(paraNum == length.size());
assert(paraNum == format.size());
assert(rcb);
assert(idleCb);
assert(!_isWorking);
assert(!sql.empty());
_sql = std::move(sql);
_cb = std::move(rcb);
_idleCbPtr = std::make_shared<std::function<void()>>(std::move(idleCb));
_isWorking = true;
_exceptCb = std::move(exceptCallback);
if (paraNum == 0)
@ -200,12 +197,7 @@ void PgConnection::execSqlInLoop(std::string &&sql,
_exceptCb = nullptr;
}
_cb = nullptr;
if (_idleCbPtr)
{
auto idle = std::move(_idleCbPtr);
_idleCbPtr.reset();
(*idle)();
}
_idleCb();
}
return;
}
@ -240,12 +232,7 @@ void PgConnection::execSqlInLoop(std::string &&sql,
_exceptCb = nullptr;
}
_cb = nullptr;
if (_idleCbPtr)
{
auto idle = std::move(_idleCbPtr);
_idleCbPtr.reset();
(*idle)();
}
_idleCb();
}
return;
}
@ -271,12 +258,7 @@ void PgConnection::execSqlInLoop(std::string &&sql,
_exceptCb = nullptr;
}
_cb = nullptr;
if (_idleCbPtr)
{
auto idle = std::move(_idleCbPtr);
_idleCbPtr.reset();
(*idle)();
}
_idleCb();
}
return;
}
@ -315,12 +297,7 @@ void PgConnection::execSqlInLoop(std::string &&sql,
thisPtr->_exceptCb = nullptr;
}
thisPtr->_cb = nullptr;
if (thisPtr->_idleCbPtr)
{
auto idle = std::move(thisPtr->_idleCbPtr);
thisPtr->_idleCbPtr.reset();
(*idle)();
}
thisPtr->_idleCb();
}
return;
}
@ -352,12 +329,6 @@ void PgConnection::handleRead()
_exceptCb = nullptr;
}
_cb = nullptr;
if (_idleCbPtr)
{
//auto idle = std::move(_idleCbPtr);
_idleCbPtr.reset();
//(*idle)();
}
}
handleClosed();
return;
@ -422,12 +393,7 @@ void PgConnection::handleRead()
{
_isWorking = false;
_isRreparingStatement = false;
if (_idleCbPtr)
{
auto idle = std::move(_idleCbPtr);
_idleCbPtr.reset();
(*idle)();
}
_idleCb();
}
}
}

View File

@ -44,8 +44,7 @@ class PgConnection : public DbConnection, public std::enable_shared_from_this<Pg
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback,
std::function<void()> &&idleCb) override
std::function<void(const std::exception_ptr &)> &&exceptCallback) override
{
if (_loop->isInLoopThread())
{
@ -55,8 +54,7 @@ class PgConnection : public DbConnection, public std::enable_shared_from_this<Pg
std::move(length),
std::move(format),
std::move(rcb),
std::move(exceptCallback),
std::move(idleCb));
std::move(exceptCallback));
}
else
{
@ -68,16 +66,14 @@ class PgConnection : public DbConnection, public std::enable_shared_from_this<Pg
length = std::move(length),
format = std::move(format),
rcb = std::move(rcb),
exceptCallback = std::move(exceptCallback),
idleCb = std::move(idleCb)]() mutable {
exceptCallback = std::move(exceptCallback)]() mutable {
thisPtr->execSqlInLoop(std::move(sql),
paraNum,
std::move(parameters),
std::move(length),
std::move(format),
std::move(rcb),
std::move(exceptCallback),
std::move(idleCb));
std::move(exceptCallback));
});
}
}
@ -98,8 +94,7 @@ class PgConnection : public DbConnection, public std::enable_shared_from_this<Pg
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback,
std::function<void()> &&idleCb);
std::function<void(const std::exception_ptr &)> &&exceptCallback);
std::function<void()> _preparingCallback;
};

View File

@ -93,12 +93,18 @@ void Sqlite3Connection::execSql(std::string &&sql,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback,
std::function<void()> &&idleCb)
std::function<void(const std::exception_ptr &)> &&exceptCallback)
{
auto thisPtr = shared_from_this();
_loopThread.getLoop()->runInLoop([thisPtr, sql = std::move(sql), paraNum, parameters = std::move(parameters), length = std::move(length), format = std::move(format), rcb = std::move(rcb), exceptCallback = std::move(exceptCallback), idleCb = std::move(idleCb)]() mutable {
thisPtr->execSqlInQueue(sql, paraNum, parameters, length, format, rcb, exceptCallback, idleCb);
_loopThread.getLoop()->runInLoop([thisPtr,
sql = std::move(sql),
paraNum,
parameters = std::move(parameters),
length = std::move(length),
format = std::move(format),
rcb = std::move(rcb),
exceptCallback = std::move(exceptCallback)]() mutable {
thisPtr->execSqlInQueue(sql, paraNum, parameters, length, format, rcb, exceptCallback);
});
}
@ -108,8 +114,7 @@ void Sqlite3Connection::execSqlInQueue(const std::string &sql,
const std::vector<int> &length,
const std::vector<int> &format,
const ResultCallback &rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback,
const std::function<void()> &idleCb)
const std::function<void(const std::exception_ptr &)> &exceptCallback)
{
LOG_TRACE << "sql:" << sql;
sqlite3_stmt *stmt = nullptr;
@ -214,7 +219,7 @@ void Sqlite3Connection::execSqlInQueue(const std::string &sql,
}
rcb(Result(resultPtr));
idleCb();
_idleCb();
}
int Sqlite3Connection::stmtStep(sqlite3_stmt *stmt, const std::shared_ptr<Sqlite3ResultImpl> &resultPtr, int columnNum)

View File

@ -47,8 +47,7 @@ class Sqlite3Connection : public DbConnection, public std::enable_shared_from_th
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback,
std::function<void()> &&idleCb) override;
std::function<void(const std::exception_ptr &)> &&exceptCallback) override;
virtual void disconnect() override;
private:
@ -59,8 +58,7 @@ class Sqlite3Connection : public DbConnection, public std::enable_shared_from_th
const std::vector<int> &length,
const std::vector<int> &format,
const ResultCallback &rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback,
const std::function<void()> &idleCb);
const std::function<void(const std::exception_ptr &)> &exceptCallback);
void onError(const std::string &sql, const std::function<void(const std::exception_ptr &)> &exceptCallback);
int stmtStep(sqlite3_stmt *stmt, const std::shared_ptr<Sqlite3ResultImpl> &resultPtr, int columnNum);
trantor::EventLoopThread _loopThread;