diff --git a/orm_lib/src/postgresql_impl/PgClientImpl.cc b/orm_lib/src/postgresql_impl/PgClientImpl.cc index 87a04e12..491d3f2d 100644 --- a/orm_lib/src/postgresql_impl/PgClientImpl.cc +++ b/orm_lib/src/postgresql_impl/PgClientImpl.cc @@ -38,12 +38,7 @@ PgConnectionPtr PgClientImpl::newConnection(trantor::EventLoop *loop) }); connPtr->setOkCallback([=](const PgConnectionPtr &okConnPtr) { LOG_TRACE << "postgreSQL connected!"; - { - std::lock_guard guard(_connectionsMutex); - _readyConnections.insert(okConnPtr); - } - - _condConnectionReady.notify_one(); + handleNewTask(okConnPtr); }); //std::cout<<"newConn end"< guard(_bufferMutex); - if (_sqlCmdBuffer.size() > 0) - { - auto cmd = _sqlCmdBuffer.front(); - _sqlCmdBuffer.pop_front(); - _loopPtr->queueInLoop([=]() { - std::vector paras; - std::vector lens; - for (auto &p : cmd._parameters) - { - paras.push_back(p.c_str()); - lens.push_back(p.length()); - } - execSql(connPtr, cmd._sql, cmd._paraNum, paras, lens, cmd._format, cmd._cb, cmd._exceptCb); - }); - - return; - } - } - { - std::lock_guard guard(_connectionsMutex); - _busyConnections.erase(connPtr); - _readyConnections.insert(connPtr); - } - _condConnectionReady.notify_one(); + handleNewTask(connPtr); } }); } @@ -243,21 +213,55 @@ std::shared_ptr PgClientImpl::newTransaction() PgConnectionPtr conn; { std::unique_lock lock(_connectionsMutex); - + _transWaitNum++; _condConnectionReady.wait(lock, [this]() { return _readyConnections.size() > 0; }); - + _transWaitNum--; auto iter = _readyConnections.begin(); _busyConnections.insert(*iter); conn = *iter; _readyConnections.erase(iter); } auto trans = std::shared_ptr(new PgTransactionImpl(conn, [=]() { + if (conn->status() == ConnectStatus_Bad) + { + return; + } + { + std::lock_guard guard(_connectionsMutex); + + if (_connections.find(conn) == _connections.end() && + _busyConnections.find(conn) == _busyConnections.find(conn)) + { + //connection is broken and removed + return; + } + } + handleNewTask(conn); + })); + trans->doBegin(); + return trans; +} + +void PgClientImpl::handleNewTask(const PgConnectionPtr &connPtr) +{ + std::lock_guard guard(_connectionsMutex); + if (_transWaitNum > 0) + { + //Prioritize the needs of the transaction + _busyConnections.erase(connPtr); + _readyConnections.insert(connPtr); + _condConnectionReady.notify_one(); + } + else + { + //Then check if there are some sql queries in the buffer { std::lock_guard guard(_bufferMutex); if (_sqlCmdBuffer.size() > 0) { + _busyConnections.insert(connPtr); //For new connections, this sentence is necessary auto cmd = _sqlCmdBuffer.front(); _sqlCmdBuffer.pop_front(); _loopPtr->queueInLoop([=]() { @@ -268,18 +272,14 @@ std::shared_ptr PgClientImpl::newTransaction() paras.push_back(p.c_str()); lens.push_back(p.length()); } - execSql(conn, cmd._sql, cmd._paraNum, paras, lens, cmd._format, cmd._cb, cmd._exceptCb); + execSql(connPtr, cmd._sql, cmd._paraNum, paras, lens, cmd._format, cmd._cb, cmd._exceptCb); }); return; } } - { - std::lock_guard lock(_connectionsMutex); - _readyConnections.insert(conn); - } - _condConnectionReady.notify_one(); - })); - trans->doBegin(); - return trans; + //Idle connection + _busyConnections.erase(connPtr); + _readyConnections.insert(connPtr); + } } diff --git a/orm_lib/src/postgresql_impl/PgClientImpl.h b/orm_lib/src/postgresql_impl/PgClientImpl.h index 2cd23e65..85cc2024 100644 --- a/orm_lib/src/postgresql_impl/PgClientImpl.h +++ b/orm_lib/src/postgresql_impl/PgClientImpl.h @@ -35,14 +35,6 @@ class PgClientImpl : public DbClient private: void ioLoop(); std::unique_ptr _loopPtr; - enum ConnectStatus - { - ConnectStatus_None = 0, - ConnectStatus_Connecting, - ConnectStatus_Ok, - ConnectStatus_Bad - }; - void execSql(const PgConnectionPtr &conn, const std::string &sql, size_t paraNum, const std::vector ¶meters, @@ -59,6 +51,8 @@ class PgClientImpl : public DbClient std::thread _loopThread; std::mutex _connectionsMutex; std::condition_variable _condConnectionReady; + size_t _transWaitNum = 0; + size_t _connectNum; bool _stop = false; @@ -73,6 +67,8 @@ class PgClientImpl : public DbClient }; std::list _sqlCmdBuffer; std::mutex _bufferMutex; + + void handleNewTask(const PgConnectionPtr &conn); }; } // namespace orm diff --git a/orm_lib/src/postgresql_impl/PgConnection.cc b/orm_lib/src/postgresql_impl/PgConnection.cc index 67d83c9c..ecaab348 100644 --- a/orm_lib/src/postgresql_impl/PgConnection.cc +++ b/orm_lib/src/postgresql_impl/PgConnection.cc @@ -1,5 +1,6 @@ #include "PgConnection.h" #include "PostgreSQLResultImpl.h" +#include #include #include @@ -23,9 +24,7 @@ PgConnection::PgConnection(trantor::EventLoop *loop, const std::string &connInfo })), _loop(loop), _channel(_loop, PQsocket(_connPtr.get())) { - //std::cout<<"sock="<assertInLoopThread(); _channel.disableAll(); _channel.remove(); @@ -79,20 +77,7 @@ void PgConnection::pgPoll() switch (connStatus) { case PGRES_POLLING_FAILED: - /* fprintf(stderr, "!!!Pg connection failed: %s", - PQerrorMessage(_connPtr.get())); - if(_isWorking){ - _isWorking=false; - auto r=makeResult(SqlStatus::NetworkError, nullptr,_sql); - r.setError(PQerrorMessage(_connPtr.get())); - assert(_cb); - _cb(r); - } - handleClosed(); -*/ - fprintf(stderr, "!!!Pg connection failed: %s", - PQerrorMessage(_connPtr.get())); - + LOG_ERROR << "!!!Pg connection failed: " << PQerrorMessage(_connPtr.get()); break; case PGRES_POLLING_WRITING: _channel.enableWriting(); @@ -115,7 +100,6 @@ void PgConnection::pgPoll() break; case PGRES_POLLING_ACTIVE: //unused! - printf("active\n"); break; default: break; @@ -154,7 +138,7 @@ void PgConnection::execSql(const std::string &sql, format.data(), 0) == 0) { - fprintf(stderr, "send query error:%s\n", PQerrorMessage(_connPtr.get())); + LOG_ERROR << "send query error: " << PQerrorMessage(_connPtr.get()); // connection broken! will be handled in handleRead() // _loop->queueInLoop([=]() { // try @@ -187,8 +171,7 @@ void PgConnection::handleRead() if (!PQconsumeInput(_connPtr.get())) { - fprintf(stderr, "Failed to consume pg input: %s\n", - PQerrorMessage(_connPtr.get())); + LOG_ERROR << "Failed to consume pg input:" << PQerrorMessage(_connPtr.get()); if (_isWorking) { _isWorking = false; @@ -216,7 +199,6 @@ void PgConnection::handleRead() if (PQisBusy(_connPtr.get())) { //need read more data from socket; - printf("need read more data from socket!\n"); return; } @@ -229,7 +211,7 @@ void PgConnection::handleRead() auto type = PQresultStatus(res.get()); if (type == PGRES_BAD_RESPONSE || type == PGRES_FATAL_ERROR) { - fprintf(stderr, "Result error: %s", PQerrorMessage(_connPtr.get())); + LOG_ERROR << "Result error: %s" << PQerrorMessage(_connPtr.get()); if (_isWorking) { { diff --git a/orm_lib/src/postgresql_impl/PgConnection.h b/orm_lib/src/postgresql_impl/PgConnection.h index 5b62ab6a..7900f26d 100644 --- a/orm_lib/src/postgresql_impl/PgConnection.h +++ b/orm_lib/src/postgresql_impl/PgConnection.h @@ -53,6 +53,7 @@ class PgConnection : public trantor::NonCopyable, public std::enable_shared_from } int sock(); trantor::EventLoop *loop() { return _loop; } + ConnectStatus status() const { return _status; } private: std::shared_ptr _connPtr;