Merge pull request #25 from an-tao/DbClient

Modify the DbClientImpl class
This commit is contained in:
An Tao 2019-01-15 18:18:31 +08:00 committed by GitHub
commit 9156058c79
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 44 additions and 1 deletions

View File

@ -97,6 +97,10 @@ DbClientImpl::DbClientImpl(const std::string &connInfo, const size_t connNum, Cl
DbClientImpl::~DbClientImpl() noexcept
{
std::lock_guard<std::mutex> lock(_connectionsMutex);
for (auto &conn : _connections)
{
conn->disconnect();
}
_connections.clear();
_readyConnections.clear();
_busyConnections.clear();
@ -182,7 +186,7 @@ void DbClientImpl::execSql(std::string &&sql,
bool busy = false;
{
std::lock_guard<std::mutex> guard(_bufferMutex);
if (_sqlCmdBuffer.size() > 10000)
if (_sqlCmdBuffer.size() > 200000)
{
//too many queries in buffer;
busy = true;

View File

@ -65,6 +65,7 @@ class DbConnection : public trantor::NonCopyable
}
ConnectStatus status() const { return _status; }
trantor::EventLoop *loop() { return _loop; }
virtual void disconnect() = 0;
protected:
QueryCallback _cb;

View File

@ -149,6 +149,16 @@ void MysqlConnection::handleClosed()
auto thisPtr = shared_from_this();
_closeCb(thisPtr);
}
void MysqlConnection::disconnect()
{
auto thisPtr = shared_from_this();
_loop->runInLoop([thisPtr]() {
thisPtr->_status = ConnectStatus_Bad;
thisPtr->_channelPtr->disableAll();
thisPtr->_channelPtr->remove();
thisPtr->_mysqlPtr.reset();
});
}
void MysqlConnection::handleTimeout()
{
LOG_TRACE << "channel index:" << _channelPtr->index();

View File

@ -45,6 +45,7 @@ class MysqlConnection : public DbConnection, public std::enable_shared_from_this
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback,
std::function<void()> &&idleCb) override;
virtual void disconnect() override;
private:
std::unique_ptr<trantor::Channel> _channelPtr;

View File

@ -88,6 +88,20 @@ void PgConnection::handleClosed()
auto thisPtr = shared_from_this();
_closeCb(thisPtr);
}
void PgConnection::disconnect()
{
std::promise<int> pro;
auto f = pro.get_future();
auto thisPtr = shared_from_this();
_loop->runInLoop([thisPtr, &pro]() {
thisPtr->_status = ConnectStatus_Bad;
thisPtr->_channel.disableAll();
thisPtr->_channel.remove();
thisPtr->_connPtr.reset();
pro.set_value(1);
});
f.get();
}
void PgConnection::pgPoll()
{
_loop->assertInLoopThread();

View File

@ -45,6 +45,7 @@ class PgConnection : public DbConnection, public std::enable_shared_from_this<Pg
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback,
std::function<void()> &&idleCb) override;
virtual void disconnect() override;
private:
std::shared_ptr<PGconn> _connPtr;

View File

@ -253,3 +253,14 @@ int Sqlite3Connection::stmtStep(sqlite3_stmt *stmt, const std::shared_ptr<Sqlite
}
return r;
}
void Sqlite3Connection::disconnect()
{
std::promise<int> pro;
auto f = pro.get_future();
auto thisPtr = shared_from_this();
_loopThread.getLoop()->runInLoop([thisPtr, &pro]() {
thisPtr->_conn.reset();
pro.set_value(1);
});
f.get();
}

View File

@ -48,6 +48,7 @@ class Sqlite3Connection : public DbConnection, public std::enable_shared_from_th
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback,
std::function<void()> &&idleCb) override;
virtual void disconnect() override;
private:
static std::once_flag _once;