Add lock-free db client class

This commit is contained in:
antao 2019-02-18 18:42:10 +08:00
parent 8e16228458
commit 358e45598b
10 changed files with 345 additions and 8 deletions

View File

@ -185,6 +185,7 @@ class HttpAppFramework : public trantor::NonCopyable
virtual void setIdleConnectionTimeout(size_t timeout) = 0;
#if USE_ORM
virtual orm::DbClientPtr getDbClient(const std::string &name = "default") = 0;
virtual orm::DbClientPtr getFastDbClient(const std::string &name = "default") = 0;
virtual void createDbClient(const std::string &dbType,
const std::string &host,
const u_short port,

View File

@ -15,6 +15,9 @@
#include "HttpAppFrameworkImpl.h"
#include "ConfigLoader.h"
#include "HttpServer.h"
#ifdef USE_ORM
#include "../../orm_lib/src/DbClientLockFree.h"
#endif
#include <drogon/HttpTypes.h>
#include <drogon/utils/Utilities.h>
#include <drogon/DrClassMap.h>
@ -361,6 +364,10 @@ void HttpAppFrameworkImpl::run()
servers.push_back(serverPtr);
#endif
}
#if USE_ORM
// Create fast db clients for every io loop
createFastDbClient(ioLoops);
#endif
_httpCtrlsRouter.init(ioLoops);
_httpSimpleCtrlsRouter.init(ioLoops);
_websockCtrlsRouter.init();
@ -395,7 +402,25 @@ void HttpAppFrameworkImpl::run()
_responseCachingMap = std::unique_ptr<CacheMap<std::string, HttpResponsePtr>>(new CacheMap<std::string, HttpResponsePtr>(loop(), 1.0, 4, 50)); //Max timeout up to about 70 days;
loop()->loop();
}
#if USE_ORM
void HttpAppFrameworkImpl::createFastDbClient(const std::vector<trantor::EventLoop *> &ioloops)
{
for (auto &iter : _dbClientsMap)
{
for (auto *loop : ioloops)
{
if (iter.second->type() == drogon::orm::ClientType::Sqlite3)
{
_dbFastClientsMap[iter.first][loop] = iter.second;
}
if (iter.second->type() == drogon::orm::ClientType::PostgreSQL || iter.second->type() == drogon::orm::ClientType::Mysql)
{
_dbFastClientsMap[iter.first][loop] = std::shared_ptr<drogon::orm::DbClient>(new drogon::orm::DbClientLockFree(iter.second->connectionInfo(), loop, iter.second->type()));
}
}
}
}
#endif
void HttpAppFrameworkImpl::onWebsockDisconnect(const WebSocketConnectionPtr &wsConnPtr)
{
auto wsConnImplPtr = std::dynamic_pointer_cast<WebSocketConnectionImpl>(wsConnPtr);
@ -775,7 +800,10 @@ orm::DbClientPtr HttpAppFrameworkImpl::getDbClient(const std::string &name)
{
return _dbClientsMap[name];
}
orm::DbClientPtr HttpAppFrameworkImpl::getFastDbClient(const std::string &name)
{
return _dbFastClientsMap[name][trantor::EventLoop::getEventLoopOfCurrentThread()];
}
void HttpAppFrameworkImpl::createDbClient(const std::string &dbType,
const std::string &host,
const u_short port,

View File

@ -108,6 +108,7 @@ class HttpAppFrameworkImpl : public HttpAppFramework
}
#if USE_ORM
virtual orm::DbClientPtr getDbClient(const std::string &name = "default") override;
virtual orm::DbClientPtr getFastDbClient(const std::string &name = "default") override;
virtual void createDbClient(const std::string &dbType,
const std::string &host,
const u_short port,
@ -192,7 +193,9 @@ class HttpAppFrameworkImpl : public HttpAppFramework
std::mutex _staticFilesCacheMutex;
#if USE_ORM
std::map<std::string, orm::DbClientPtr> _dbClientsMap;
std::map<std::string, std::map<trantor::EventLoop *, orm::DbClientPtr>> _dbFastClientsMap;
std::vector<std::function<void()>> _dbFuncs;
void createFastDbClient(const std::vector<trantor::EventLoop *> &ioloops);
#endif
};

View File

@ -147,6 +147,7 @@ class DbClient : public trantor::NonCopyable
virtual std::shared_ptr<Transaction> newTransaction(const std::function<void(bool)> &commitCallback = std::function<void(bool)>()) = 0;
ClientType type() const { return _type; }
const std::string &connectionInfo() { return _connInfo; }
private:
friend internal::SqlBinder;
@ -157,9 +158,10 @@ class DbClient : public trantor::NonCopyable
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback) = 0;
protected:
ClientType _type;
std::string _connInfo;
};
typedef std::shared_ptr<DbClient> DbClientPtr;

View File

@ -42,11 +42,11 @@
using namespace drogon::orm;
DbClientImpl::DbClientImpl(const std::string &connInfo, const size_t connNum, ClientType type)
: _connInfo(connInfo),
_connectNum(connNum),
: _connectNum(connNum),
_loops(type == ClientType::Sqlite3 ? 1 : (connNum < std::thread::hardware_concurrency() ? connNum : std::thread::hardware_concurrency()), "DbLoop")
{
_type = type;
_connInfo = connInfo;
LOG_TRACE << "type=" << (int)type;
//LOG_DEBUG << _loops.getLoopNum();
assert(connNum > 0);

View File

@ -45,7 +45,6 @@ class DbClientImpl : public DbClient, public std::enable_shared_from_this<DbClie
virtual std::shared_ptr<Transaction> newTransaction(const std::function<void(bool)> &commitCallback = std::function<void(bool)>()) override;
private:
std::string _connInfo;
size_t _connectNum;
trantor::EventLoopThreadPool _loops;
std::shared_ptr<SharedMutex> _sharedMutexPtr;

View File

@ -0,0 +1,223 @@
/**
*
* DbClientLockFree.cc
* An Tao
*
* Copyright 2018, An Tao. All rights reserved.
* https://github.com/an-tao/drogon
* Use of this source code is governed by a MIT license
* that can be found in the License file.
*
* Drogon
*
*/
#include "DbClientLockFree.h"
#include "DbConnection.h"
#if USE_POSTGRESQL
#include "postgresql_impl/PgConnection.h"
#endif
#if USE_MYSQL
#include "mysql_impl/MysqlConnection.h"
#endif
#include "TransactionImpl.h"
#include <trantor/net/EventLoop.h>
#include <trantor/net/inner/Channel.h>
#include <drogon/drogon.h>
#include <drogon/orm/Exception.h>
#include <drogon/orm/DbClient.h>
#include <sys/select.h>
#include <iostream>
#include <thread>
#include <vector>
#include <unordered_set>
#include <memory>
#include <stdio.h>
#include <unistd.h>
#include <sstream>
using namespace drogon::orm;
DbClientLockFree::DbClientLockFree(const std::string &connInfo, trantor::EventLoop *loop, ClientType type)
: _connInfo(connInfo),
_loop(loop)
{
_type = type;
LOG_TRACE << "type=" << (int)type;
if (type == ClientType::PostgreSQL)
{
newConnection();
}
else if (type == ClientType::Mysql)
{
newConnection();
}
else
{
LOG_ERROR << "No supported database type!";
}
}
DbClientLockFree::~DbClientLockFree() noexcept
{
if (_connection)
{
_connection->disconnect();
}
}
void DbClientLockFree::execSql(const DbConnectionPtr &conn,
std::string &&sql,
size_t paraNum,
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback)
{
assert(conn);
std::weak_ptr<DbConnection> weakConn = conn;
conn->execSql(std::move(sql), paraNum, std::move(parameters), std::move(length), std::move(format),
std::move(rcb), std::move(exceptCallback),
[=]() -> void {
{
auto connPtr = weakConn.lock();
if (!connPtr)
return;
handleNewTask();
}
});
}
void DbClientLockFree::execSql(std::string &&sql,
size_t paraNum,
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback)
{
assert(paraNum == parameters.size());
assert(paraNum == length.size());
assert(paraNum == format.size());
assert(rcb);
_loop->assertInLoopThread();
if (!_connection)
{
try
{
throw BrokenConnection("No connection to database server");
}
catch (...)
{
exceptCallback(std::current_exception());
}
return;
}
else
{
if (!_connection->isWorking())
{
execSql(_connection, std::move(sql), paraNum, std::move(parameters), std::move(length), std::move(format), std::move(rcb), std::move(exceptCallback));
return;
}
}
if (_sqlCmdBuffer.size() > 20000)
{
//too many queries in buffer;
try
{
throw Failure("Too many queries in buffer");
}
catch (...)
{
exceptCallback(std::current_exception());
}
return;
}
//LOG_TRACE << "Push query to buffer";
std::shared_ptr<SqlCmd> cmd = std::make_shared<SqlCmd>();
cmd->_sql = std::move(sql);
cmd->_paraNum = paraNum;
cmd->_parameters = std::move(parameters);
cmd->_length = std::move(length);
cmd->_format = std::move(format);
cmd->_cb = std::move(rcb);
cmd->_exceptCb = std::move(exceptCallback);
_sqlCmdBuffer.push_back(std::move(cmd));
}
std::shared_ptr<Transaction> DbClientLockFree::newTransaction(const std::function<void(bool)> &commitCallback)
{
// Don't support transaction;
assert(0);
return nullptr;
}
void DbClientLockFree::handleNewTask()
{
assert(_connection);
assert(!_connection->isWorking());
if (!_sqlCmdBuffer.empty())
{
auto cmd = _sqlCmdBuffer.front();
_sqlCmdBuffer.pop_front();
execSql(_connection, std::move(cmd->_sql), cmd->_paraNum, std::move(cmd->_parameters), std::move(cmd->_length), std::move(cmd->_format), std::move(cmd->_cb), std::move(cmd->_exceptCb));
return;
}
}
DbConnectionPtr DbClientLockFree::newConnection()
{
DbConnectionPtr connPtr;
if (_type == ClientType::PostgreSQL)
{
#if USE_POSTGRESQL
connPtr = std::make_shared<PgConnection>(_loop, _connInfo);
#else
return nullptr;
#endif
}
else if (_type == ClientType::Mysql)
{
#if USE_MYSQL
connPtr = std::make_shared<MysqlConnection>(_loop, _connInfo);
#else
return nullptr;
#endif
}
else
{
return nullptr;
}
std::weak_ptr<DbClientLockFree> weakPtr = shared_from_this();
connPtr->setCloseCallback([weakPtr](const DbConnectionPtr &closeConnPtr) {
//Erase the connection
auto thisPtr = weakPtr.lock();
if (!thisPtr)
return;
assert(thisPtr->_connection);
thisPtr->_connection.reset();
//Reconnect after 1 second
thisPtr->_loop->runAfter(1, [weakPtr] {
auto thisPtr = weakPtr.lock();
if (!thisPtr)
return;
thisPtr->newConnection();
});
});
connPtr->setOkCallback([weakPtr](const DbConnectionPtr &okConnPtr) {
LOG_TRACE << "connected!";
auto thisPtr = weakPtr.lock();
if (!thisPtr)
return;
thisPtr->_connection = okConnPtr;
thisPtr->handleNewTask();
});
//std::cout<<"newConn end"<<connPtr<<std::endl;
return connPtr;
}

View File

@ -0,0 +1,80 @@
/**
*
* DbClientLockFree.h
* An Tao
*
* Copyright 2018, An Tao. All rights reserved.
* https://github.com/an-tao/drogon
* Use of this source code is governed by a MIT license
* that can be found in the License file.
*
* Drogon
*
*/
#pragma once
#include "DbConnection.h"
#include <drogon/HttpTypes.h>
#include <drogon/orm/DbClient.h>
#include <trantor/net/EventLoopThreadPool.h>
#include <memory>
#include <thread>
#include <functional>
#include <string>
#include <unordered_set>
#include <list>
namespace drogon
{
namespace orm
{
class DbClientLockFree : public DbClient, public std::enable_shared_from_this<DbClientLockFree>
{
public:
DbClientLockFree(const std::string &connInfo, trantor::EventLoop *loop, ClientType type);
virtual ~DbClientLockFree() noexcept;
virtual void execSql(std::string &&sql,
size_t paraNum,
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback) override;
virtual std::shared_ptr<Transaction> newTransaction(const std::function<void(bool)> &commitCallback = std::function<void(bool)>()) override;
private:
std::string _connInfo;
trantor::EventLoop *_loop;
void execSql(const DbConnectionPtr &conn,
std::string &&sql,
size_t paraNum,
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback);
DbConnectionPtr newConnection();
DbConnectionPtr _connection;
struct SqlCmd
{
std::string _sql;
size_t _paraNum;
std::vector<const char *> _parameters;
std::vector<int> _length;
std::vector<int> _format;
QueryCallback _cb;
ExceptPtrCallback _exceptCb;
};
std::deque<std::shared_ptr<SqlCmd>> _sqlCmdBuffer;
void handleNewTask();
};
} // namespace orm
} // namespace drogon

View File

@ -74,6 +74,7 @@ class DbConnection : public trantor::NonCopyable
ConnectStatus status() const { return _status; }
trantor::EventLoop *loop() { return _loop; }
virtual void disconnect() = 0;
bool isWorking() { return _isWorking; }
protected:
QueryCallback _cb;

View File

@ -275,9 +275,9 @@ void PgConnection::handleRead()
_cb = decltype(_cb)();
if (_idleCbPtr)
{
auto idle = std::move(_idleCbPtr);
//auto idle = std::move(_idleCbPtr);
_idleCbPtr.reset();
(*idle)();
//(*idle)();
}
}
handleClosed();