Add a mutex

This commit is contained in:
antao 2019-01-10 17:54:51 +08:00
parent 4e704d398d
commit 59a0431512
3 changed files with 21 additions and 8 deletions

View File

@ -44,7 +44,7 @@ using namespace drogon::orm;
DbClientImpl::DbClientImpl(const std::string &connInfo, const size_t connNum, ClientType type)
: _connInfo(connInfo),
_connectNum(connNum),
_loops((connNum / 100 > 0 ? connNum / 100 : 1),"DbLoop")
_loops((connNum / 100 > 0 ? connNum / 100 : 1), "DbLoop")
{
_type = type;
LOG_TRACE << "type=" << (int)type;
@ -56,10 +56,12 @@ DbClientImpl::DbClientImpl(const std::string &connInfo, const size_t connNum, Cl
{
auto loop = _loops.getNextLoop();
loop->runInLoop([this, loop]() {
std::lock_guard<std::mutex> lock(_connectionsMutex);
_connections.insert(newConnection(loop));
});
}
}).detach();
})
.detach();
}
DbClientImpl::~DbClientImpl() noexcept
@ -288,16 +290,23 @@ DbConnectionPtr DbClientImpl::newConnection(trantor::EventLoop *loop)
std::weak_ptr<DbClientImpl> weakPtr = shared_from_this();
connPtr->setCloseCallback([weakPtr, loop](const DbConnectionPtr &closeConnPtr) {
//Erase the connection
auto thisPtr = weakPtr.lock();
if (!thisPtr)
return;
{
std::lock_guard<std::mutex> guard(thisPtr->_connectionsMutex);
thisPtr->_readyConnections.erase(closeConnPtr);
thisPtr->_busyConnections.erase(closeConnPtr);
assert(thisPtr->_connections.find(closeConnPtr) != thisPtr->_connections.end());
thisPtr->_connections.erase(closeConnPtr);
}
//Reconnect after 1 second
loop->runAfter(1, [weakPtr, closeConnPtr, loop] {
auto thisPtr = weakPtr.lock();
if (!thisPtr)
return;
std::lock_guard<std::mutex> guard(thisPtr->_connectionsMutex);
thisPtr->_readyConnections.erase(closeConnPtr);
thisPtr->_busyConnections.erase(closeConnPtr);
assert(thisPtr->_connections.find(closeConnPtr) != thisPtr->_connections.end());
thisPtr->_connections.erase(closeConnPtr);
thisPtr->_connections.insert(thisPtr->newConnection(loop));
});
});

View File

@ -133,8 +133,10 @@ void MysqlConnection::setChannel()
void MysqlConnection::handleClosed()
{
_status = ConnectStatus_Bad;
_loop->assertInLoopThread();
if (_status == ConnectStatus_Bad)
return;
_status = ConnectStatus_Bad;
_channelPtr->disableAll();
_channelPtr->remove();
assert(_closeCb);

View File

@ -78,8 +78,10 @@ PgConnection::PgConnection(trantor::EventLoop *loop, const std::string &connInfo
// }
void PgConnection::handleClosed()
{
_status = ConnectStatus_Bad;
_loop->assertInLoopThread();
if (_status == ConnectStatus_Bad)
return;
_status = ConnectStatus_Bad;
_channel.disableAll();
_channel.remove();
assert(_closeCb);