Modify PgClientImpl and PgConnection

This commit is contained in:
antao 2018-11-16 12:58:50 +08:00
parent b077d1f308
commit 5514bfd32a
4 changed files with 54 additions and 75 deletions

View File

@ -38,12 +38,7 @@ PgConnectionPtr PgClientImpl::newConnection(trantor::EventLoop *loop)
}); });
connPtr->setOkCallback([=](const PgConnectionPtr &okConnPtr) { connPtr->setOkCallback([=](const PgConnectionPtr &okConnPtr) {
LOG_TRACE << "postgreSQL connected!"; LOG_TRACE << "postgreSQL connected!";
{ handleNewTask(okConnPtr);
std::lock_guard<std::mutex> guard(_connectionsMutex);
_readyConnections.insert(okConnPtr);
}
_condConnectionReady.notify_one();
}); });
//std::cout<<"newConn end"<<connPtr<<std::endl; //std::cout<<"newConn end"<<connPtr<<std::endl;
return connPtr; return connPtr;
@ -105,32 +100,7 @@ void PgClientImpl::execSql(const PgConnectionPtr &conn,
auto connPtr = weakConn.lock(); auto connPtr = weakConn.lock();
if (!connPtr) if (!connPtr)
return; return;
{ handleNewTask(connPtr);
std::lock_guard<std::mutex> guard(_bufferMutex);
if (_sqlCmdBuffer.size() > 0)
{
auto cmd = _sqlCmdBuffer.front();
_sqlCmdBuffer.pop_front();
_loopPtr->queueInLoop([=]() {
std::vector<const char *> paras;
std::vector<int> lens;
for (auto &p : cmd._parameters)
{
paras.push_back(p.c_str());
lens.push_back(p.length());
}
execSql(connPtr, cmd._sql, cmd._paraNum, paras, lens, cmd._format, cmd._cb, cmd._exceptCb);
});
return;
}
}
{
std::lock_guard<std::mutex> guard(_connectionsMutex);
_busyConnections.erase(connPtr);
_readyConnections.insert(connPtr);
}
_condConnectionReady.notify_one();
} }
}); });
} }
@ -243,21 +213,55 @@ std::shared_ptr<Transaction> PgClientImpl::newTransaction()
PgConnectionPtr conn; PgConnectionPtr conn;
{ {
std::unique_lock<std::mutex> lock(_connectionsMutex); std::unique_lock<std::mutex> lock(_connectionsMutex);
_transWaitNum++;
_condConnectionReady.wait(lock, [this]() { _condConnectionReady.wait(lock, [this]() {
return _readyConnections.size() > 0; return _readyConnections.size() > 0;
}); });
_transWaitNum--;
auto iter = _readyConnections.begin(); auto iter = _readyConnections.begin();
_busyConnections.insert(*iter); _busyConnections.insert(*iter);
conn = *iter; conn = *iter;
_readyConnections.erase(iter); _readyConnections.erase(iter);
} }
auto trans = std::shared_ptr<PgTransactionImpl>(new PgTransactionImpl(conn, [=]() { auto trans = std::shared_ptr<PgTransactionImpl>(new PgTransactionImpl(conn, [=]() {
if (conn->status() == ConnectStatus_Bad)
{
return;
}
{
std::lock_guard<std::mutex> guard(_connectionsMutex);
if (_connections.find(conn) == _connections.end() &&
_busyConnections.find(conn) == _busyConnections.find(conn))
{
//connection is broken and removed
return;
}
}
handleNewTask(conn);
}));
trans->doBegin();
return trans;
}
void PgClientImpl::handleNewTask(const PgConnectionPtr &connPtr)
{
std::lock_guard<std::mutex> guard(_connectionsMutex);
if (_transWaitNum > 0)
{
//Prioritize the needs of the transaction
_busyConnections.erase(connPtr);
_readyConnections.insert(connPtr);
_condConnectionReady.notify_one();
}
else
{
//Then check if there are some sql queries in the buffer
{ {
std::lock_guard<std::mutex> guard(_bufferMutex); std::lock_guard<std::mutex> guard(_bufferMutex);
if (_sqlCmdBuffer.size() > 0) if (_sqlCmdBuffer.size() > 0)
{ {
_busyConnections.insert(connPtr); //For new connections, this sentence is necessary
auto cmd = _sqlCmdBuffer.front(); auto cmd = _sqlCmdBuffer.front();
_sqlCmdBuffer.pop_front(); _sqlCmdBuffer.pop_front();
_loopPtr->queueInLoop([=]() { _loopPtr->queueInLoop([=]() {
@ -268,18 +272,14 @@ std::shared_ptr<Transaction> PgClientImpl::newTransaction()
paras.push_back(p.c_str()); paras.push_back(p.c_str());
lens.push_back(p.length()); lens.push_back(p.length());
} }
execSql(conn, cmd._sql, cmd._paraNum, paras, lens, cmd._format, cmd._cb, cmd._exceptCb); execSql(connPtr, cmd._sql, cmd._paraNum, paras, lens, cmd._format, cmd._cb, cmd._exceptCb);
}); });
return; return;
} }
} }
{ //Idle connection
std::lock_guard<std::mutex> lock(_connectionsMutex); _busyConnections.erase(connPtr);
_readyConnections.insert(conn); _readyConnections.insert(connPtr);
} }
_condConnectionReady.notify_one();
}));
trans->doBegin();
return trans;
} }

View File

@ -35,14 +35,6 @@ class PgClientImpl : public DbClient
private: private:
void ioLoop(); void ioLoop();
std::unique_ptr<trantor::EventLoop> _loopPtr; std::unique_ptr<trantor::EventLoop> _loopPtr;
enum ConnectStatus
{
ConnectStatus_None = 0,
ConnectStatus_Connecting,
ConnectStatus_Ok,
ConnectStatus_Bad
};
void execSql(const PgConnectionPtr &conn, const std::string &sql, void execSql(const PgConnectionPtr &conn, const std::string &sql,
size_t paraNum, size_t paraNum,
const std::vector<const char *> &parameters, const std::vector<const char *> &parameters,
@ -59,6 +51,8 @@ class PgClientImpl : public DbClient
std::thread _loopThread; std::thread _loopThread;
std::mutex _connectionsMutex; std::mutex _connectionsMutex;
std::condition_variable _condConnectionReady; std::condition_variable _condConnectionReady;
size_t _transWaitNum = 0;
size_t _connectNum; size_t _connectNum;
bool _stop = false; bool _stop = false;
@ -73,6 +67,8 @@ class PgClientImpl : public DbClient
}; };
std::list<SqlCmd> _sqlCmdBuffer; std::list<SqlCmd> _sqlCmdBuffer;
std::mutex _bufferMutex; std::mutex _bufferMutex;
void handleNewTask(const PgConnectionPtr &conn);
}; };
} // namespace orm } // namespace orm

View File

@ -1,5 +1,6 @@
#include "PgConnection.h" #include "PgConnection.h"
#include "PostgreSQLResultImpl.h" #include "PostgreSQLResultImpl.h"
#include <trantor/utils/Logger.h>
#include <drogon/orm/Exception.h> #include <drogon/orm/Exception.h>
#include <stdio.h> #include <stdio.h>
@ -23,9 +24,7 @@ PgConnection::PgConnection(trantor::EventLoop *loop, const std::string &connInfo
})), })),
_loop(loop), _channel(_loop, PQsocket(_connPtr.get())) _loop(loop), _channel(_loop, PQsocket(_connPtr.get()))
{ {
//std::cout<<"sock="<<sock()<<std::endl;
_channel.setReadCallback([=]() { _channel.setReadCallback([=]() {
//std::cout<<"reading callback"<<std::endl;
if (_status != ConnectStatus_Ok) if (_status != ConnectStatus_Ok)
{ {
pgPoll(); pgPoll();
@ -36,7 +35,6 @@ PgConnection::PgConnection(trantor::EventLoop *loop, const std::string &connInfo
} }
}); });
_channel.setWriteCallback([=]() { _channel.setWriteCallback([=]() {
//std::cout<<"writing callback"<<std::endl;
if (_status != ConnectStatus_Ok) if (_status != ConnectStatus_Ok)
{ {
pgPoll(); pgPoll();
@ -63,7 +61,7 @@ int PgConnection::sock()
} }
void PgConnection::handleClosed() void PgConnection::handleClosed()
{ {
std::cout << "handleClosed!" << this << std::endl; _status = ConnectStatus_Bad;
_loop->assertInLoopThread(); _loop->assertInLoopThread();
_channel.disableAll(); _channel.disableAll();
_channel.remove(); _channel.remove();
@ -79,20 +77,7 @@ void PgConnection::pgPoll()
switch (connStatus) switch (connStatus)
{ {
case PGRES_POLLING_FAILED: case PGRES_POLLING_FAILED:
/* fprintf(stderr, "!!!Pg connection failed: %s", LOG_ERROR << "!!!Pg connection failed: " << PQerrorMessage(_connPtr.get());
PQerrorMessage(_connPtr.get()));
if(_isWorking){
_isWorking=false;
auto r=makeResult(SqlStatus::NetworkError, nullptr,_sql);
r.setError(PQerrorMessage(_connPtr.get()));
assert(_cb);
_cb(r);
}
handleClosed();
*/
fprintf(stderr, "!!!Pg connection failed: %s",
PQerrorMessage(_connPtr.get()));
break; break;
case PGRES_POLLING_WRITING: case PGRES_POLLING_WRITING:
_channel.enableWriting(); _channel.enableWriting();
@ -115,7 +100,6 @@ void PgConnection::pgPoll()
break; break;
case PGRES_POLLING_ACTIVE: case PGRES_POLLING_ACTIVE:
//unused! //unused!
printf("active\n");
break; break;
default: default:
break; break;
@ -154,7 +138,7 @@ void PgConnection::execSql(const std::string &sql,
format.data(), format.data(),
0) == 0) 0) == 0)
{ {
fprintf(stderr, "send query error:%s\n", PQerrorMessage(_connPtr.get())); LOG_ERROR << "send query error: " << PQerrorMessage(_connPtr.get());
// connection broken! will be handled in handleRead() // connection broken! will be handled in handleRead()
// _loop->queueInLoop([=]() { // _loop->queueInLoop([=]() {
// try // try
@ -187,8 +171,7 @@ void PgConnection::handleRead()
if (!PQconsumeInput(_connPtr.get())) if (!PQconsumeInput(_connPtr.get()))
{ {
fprintf(stderr, "Failed to consume pg input: %s\n", LOG_ERROR << "Failed to consume pg input:" << PQerrorMessage(_connPtr.get());
PQerrorMessage(_connPtr.get()));
if (_isWorking) if (_isWorking)
{ {
_isWorking = false; _isWorking = false;
@ -216,7 +199,6 @@ void PgConnection::handleRead()
if (PQisBusy(_connPtr.get())) if (PQisBusy(_connPtr.get()))
{ {
//need read more data from socket; //need read more data from socket;
printf("need read more data from socket!\n");
return; return;
} }
@ -229,7 +211,7 @@ void PgConnection::handleRead()
auto type = PQresultStatus(res.get()); auto type = PQresultStatus(res.get());
if (type == PGRES_BAD_RESPONSE || type == PGRES_FATAL_ERROR) if (type == PGRES_BAD_RESPONSE || type == PGRES_FATAL_ERROR)
{ {
fprintf(stderr, "Result error: %s", PQerrorMessage(_connPtr.get())); LOG_ERROR << "Result error: %s" << PQerrorMessage(_connPtr.get());
if (_isWorking) if (_isWorking)
{ {
{ {

View File

@ -53,6 +53,7 @@ class PgConnection : public trantor::NonCopyable, public std::enable_shared_from
} }
int sock(); int sock();
trantor::EventLoop *loop() { return _loop; } trantor::EventLoop *loop() { return _loop; }
ConnectStatus status() const { return _status; }
private: private:
std::shared_ptr<PGconn> _connPtr; std::shared_ptr<PGconn> _connPtr;