From 51814b76da40b7c595a6768b83b42ef1f891bf88 Mon Sep 17 00:00:00 2001 From: An Tao Date: Fri, 30 Apr 2021 08:00:11 +0800 Subject: [PATCH] Add the setTimeout() method to the ReidsClient class (#830) --- config.example.json | 5 +- drogon_ctl/templates/config.csp | 5 +- lib/inc/drogon/HttpAppFramework.h | 3 +- lib/src/ConfigLoader.cc | 3 +- lib/src/HttpAppFrameworkImpl.cc | 5 +- lib/src/HttpAppFrameworkImpl.h | 3 +- lib/src/RedisClientManager.h | 4 +- lib/src/RedisClientManagerSkipped.cc | 3 +- .../redis/inc/drogon/nosql/RedisClient.h | 25 +- .../redis/inc/drogon/nosql/RedisException.h | 3 +- nosql_lib/redis/src/RedisClientImpl.cc | 215 ++++++++++++++++-- nosql_lib/redis/src/RedisClientImpl.h | 25 +- nosql_lib/redis/src/RedisClientLockFree.cc | 208 +++++++++++++++-- nosql_lib/redis/src/RedisClientLockFree.h | 15 +- nosql_lib/redis/src/RedisClientManager.cc | 22 +- nosql_lib/redis/src/RedisConnection.h | 4 + nosql_lib/redis/src/RedisTransactionImpl.cc | 74 ++++-- nosql_lib/redis/src/RedisTransactionImpl.h | 5 + orm_lib/inc/drogon/orm/DbClient.h | 5 + 19 files changed, 543 insertions(+), 89 deletions(-) diff --git a/config.example.json b/config.example.json index 68558f75..bdb9caf8 100644 --- a/config.example.json +++ b/config.example.json @@ -75,7 +75,10 @@ "is_fast": false, //number_of_connections: 1 by default, if the 'is_fast' is true, the number is the number of //connections per IO thread, otherwise it is the total number of all connections. - "number_of_connections": 1 + "number_of_connections": 1, + //timeout: -1.0 by default, in seconds, the timeout for executing a command. + //zero or negative value means no timeout. + "timeout": -1.0 } ],*/ "app": { diff --git a/drogon_ctl/templates/config.csp b/drogon_ctl/templates/config.csp index 2b9d738e..86046c42 100644 --- a/drogon_ctl/templates/config.csp +++ b/drogon_ctl/templates/config.csp @@ -75,7 +75,10 @@ "is_fast": false, //number_of_connections: 1 by default, if the 'is_fast' is true, the number is the number of //connections per IO thread, otherwise it is the total number of all connections. - "number_of_connections": 1 + "number_of_connections": 1, + //timeout: -1.0 by default, in seconds, the timeout for executing a command. + //zero or negative value means no timeout. + "timeout": -1.0 } ],*/ "app": { diff --git a/lib/inc/drogon/HttpAppFramework.h b/lib/inc/drogon/HttpAppFramework.h index 2f4175fa..253c8c1d 100644 --- a/lib/inc/drogon/HttpAppFramework.h +++ b/lib/inc/drogon/HttpAppFramework.h @@ -1261,7 +1261,8 @@ class DROGON_EXPORT HttpAppFramework : public trantor::NonCopyable const std::string &name = "default", const std::string &password = "", size_t connectionNum = 1, - bool isFast = false) = 0; + bool isFast = false, + double timeout = -1.0) = 0; /// Get the DNS resolver /** diff --git a/lib/src/ConfigLoader.cc b/lib/src/ConfigLoader.cc index 12b3d5c8..269b5e74 100644 --- a/lib/src/ConfigLoader.cc +++ b/lib/src/ConfigLoader.cc @@ -549,8 +549,9 @@ static void loadRedisClients(const Json::Value &redisClients) } auto name = client.get("name", "default").asString(); auto isFast = client.get("is_fast", false).asBool(); + auto timeout = client.get("timeout", -1.0).asDouble(); drogon::app().createRedisClient( - host, port, name, password, connNum, isFast); + host, port, name, password, connNum, isFast, timeout); } } diff --git a/lib/src/HttpAppFrameworkImpl.cc b/lib/src/HttpAppFrameworkImpl.cc index 03bd232a..dd23dd27 100644 --- a/lib/src/HttpAppFrameworkImpl.cc +++ b/lib/src/HttpAppFrameworkImpl.cc @@ -983,11 +983,12 @@ HttpAppFramework &HttpAppFrameworkImpl::createRedisClient( const std::string &name, const std::string &password, size_t connectionNum, - bool isFast) + bool isFast, + double timeout) { assert(!running_); redisClientManagerPtr_->createRedisClient( - name, ip, port, password, connectionNum, isFast); + name, ip, port, password, connectionNum, isFast, timeout); return *this; } void HttpAppFrameworkImpl::quit() diff --git a/lib/src/HttpAppFrameworkImpl.h b/lib/src/HttpAppFrameworkImpl.h index 7923649a..28dfcab5 100644 --- a/lib/src/HttpAppFrameworkImpl.h +++ b/lib/src/HttpAppFrameworkImpl.h @@ -459,7 +459,8 @@ class HttpAppFrameworkImpl final : public HttpAppFramework const std::string &name, const std::string &password, size_t connectionNum, - bool isFast) override; + bool isFast, + double timeout) override; nosql::RedisClientPtr getRedisClient(const std::string &name) override; nosql::RedisClientPtr getFastRedisClient(const std::string &name) override; std::vector getListeners() const override; diff --git a/lib/src/RedisClientManager.h b/lib/src/RedisClientManager.h index ae509646..10264c6c 100644 --- a/lib/src/RedisClientManager.h +++ b/lib/src/RedisClientManager.h @@ -47,7 +47,8 @@ class RedisClientManager : public trantor::NonCopyable unsigned short port, const std::string &password, size_t connectionNum, - bool isFast); + bool isFast, + double timeout); // bool areAllRedisClientsAvailable() const noexcept; private: @@ -61,6 +62,7 @@ class RedisClientManager : public trantor::NonCopyable unsigned short port_; bool isFast_; size_t connectionNumber_; + double timeout_; }; std::vector redisInfos_; }; diff --git a/lib/src/RedisClientManagerSkipped.cc b/lib/src/RedisClientManagerSkipped.cc index 8558a42a..f1c7547e 100644 --- a/lib/src/RedisClientManagerSkipped.cc +++ b/lib/src/RedisClientManagerSkipped.cc @@ -32,7 +32,8 @@ void RedisClientManager::createRedisClient(const std::string & /*name*/, unsigned short /*port*/, const std::string & /*password*/, size_t /*connectionNum*/, - bool /*isFast*/) + bool /*isFast*/, + double /*timeout*/) { LOG_FATAL << "Redis is not supported by drogon, please install the " "hiredis library first."; diff --git a/nosql_lib/redis/inc/drogon/nosql/RedisClient.h b/nosql_lib/redis/inc/drogon/nosql/RedisClient.h index 85431163..64047420 100644 --- a/nosql_lib/redis/inc/drogon/nosql/RedisClient.h +++ b/nosql_lib/redis/inc/drogon/nosql/RedisClient.h @@ -125,17 +125,34 @@ class DROGON_EXPORT RedisClient * @brief Create a redis transaction object. * * @return std::shared_ptr + * @note An exception with kTimeout code is thrown if the operation is + * timed out. see RedisException.h */ - virtual std::shared_ptr newTransaction() = 0; + virtual std::shared_ptr newTransaction() noexcept( + false) = 0; /** * @brief Create a transaction object in asynchronous mode. * * @return std::shared_ptr + * @note An empty shared_ptr object is returned via the callback if the + * operation is timed out. */ virtual void newTransactionAsync( const std::function &)> &callback) = 0; + /** + * @brief Set the Timeout value of execution of a command. + * + * @param timeout in seconds, if the result is not returned from the + * server within the timeout, a RedisException with "Command execution + * timeout" string is generated and returned to the caller. + * @note set the timeout value to zero or negative for no limit on time. The + * default value is -1.0, this means there is no time limit if this method + * is not called. + */ + virtual void setTimeout(double timeout) = 0; + virtual ~RedisClient() = default; #ifdef __cpp_impl_coroutine /** @@ -247,9 +264,9 @@ inline void internal::RedisTransactionAwaiter::await_suspend( client_->newTransactionAsync( [this, &handle](const std::shared_ptr &transaction) { if (transaction == nullptr) - setException(std::make_exception_ptr( - RedisException(RedisErrorCode::kInternalError, - "Failed to create transaction"))); + setException(std::make_exception_ptr(RedisException( + RedisErrorCode::kTimeout, + "Timeout, no connection available for transaction"))); else setValue(transaction); handle.resume(); diff --git a/nosql_lib/redis/inc/drogon/nosql/RedisException.h b/nosql_lib/redis/inc/drogon/nosql/RedisException.h index 72b2ac51..3cfbf0ae 100644 --- a/nosql_lib/redis/inc/drogon/nosql/RedisException.h +++ b/nosql_lib/redis/inc/drogon/nosql/RedisException.h @@ -29,7 +29,8 @@ enum class RedisErrorCode kRedisError, kInternalError, kTransactionCancelled, - kBadType + kBadType, + kTimeout }; class RedisException final : public std::exception { diff --git a/nosql_lib/redis/src/RedisClientImpl.cc b/nosql_lib/redis/src/RedisClientImpl.cc index efddee33..47c3f4e3 100644 --- a/nosql_lib/redis/src/RedisClientImpl.cc +++ b/nosql_lib/redis/src/RedisClientImpl.cc @@ -14,6 +14,7 @@ #include "RedisClientImpl.h" #include "RedisTransactionImpl.h" +#include "../../lib/src/TaskTimeoutFlag.h" using namespace drogon::nosql; std::shared_ptr RedisClient::newRedisClient( const trantor::InetAddress &serverAddress, @@ -102,6 +103,17 @@ void RedisClientImpl::execCommandAsync( string_view command, ...) noexcept { + if (timeout_ > 0.0) + { + va_list args; + va_start(args, command); + execCommandAsyncWithTimeout(command, + std::move(resultCallback), + std::move(exceptionCallback), + args); + va_end(args); + return; + } RedisConnectionPtr connPtr; { std::lock_guard lock(connectionsMutex_); @@ -131,21 +143,21 @@ void RedisClientImpl::execCommandAsync( else { LOG_TRACE << "no connection available, push command to buffer"; - std::weak_ptr thisWeakPtr = shared_from_this(); va_list args; va_start(args, command); auto formattedCmd = RedisConnection::getFormattedCommand(command, args); va_end(args); std::lock_guard lock(connectionsMutex_); - tasks_.emplace([thisWeakPtr, - resultCallback = std::move(resultCallback), - exceptionCallback = std::move(exceptionCallback), - formattedCmd = std::move(formattedCmd)]( - const RedisConnectionPtr &connPtr) mutable { - connPtr->sendFormattedCommand(std::move(formattedCmd), - std::move(resultCallback), - std::move(exceptionCallback)); - }); + tasks_.emplace_back( + std::make_shared>( + [resultCallback = std::move(resultCallback), + exceptionCallback = std::move(exceptionCallback), + formattedCmd = std::move(formattedCmd)]( + const RedisConnectionPtr &connPtr) mutable { + connPtr->sendFormattedCommand(std::move(formattedCmd), + std::move(resultCallback), + std::move(exceptionCallback)); + })); } } @@ -179,16 +191,71 @@ void RedisClientImpl::newTransactionAsync( } else { - std::weak_ptr thisWeakPtr = shared_from_this(); - std::lock_guard lock(connectionsMutex_); - tasks_.emplace( - [callback, thisWeakPtr](const RedisConnectionPtr & /*connPtr*/) { - auto thisPtr = thisWeakPtr.lock(); - if (thisPtr) - { - thisPtr->newTransactionAsync(callback); - } - }); + if (timeout_ <= 0.0) + { + std::weak_ptr thisWeakPtr = shared_from_this(); + std::lock_guard lock(connectionsMutex_); + tasks_.emplace_back( + std::make_shared< + std::function>( + [callback, + thisWeakPtr](const RedisConnectionPtr & /*connPtr*/) { + auto thisPtr = thisWeakPtr.lock(); + if (thisPtr) + { + thisPtr->newTransactionAsync(callback); + } + })); + } + else + { + auto callbackPtr = std::make_shared< + std::function &)>>( + callback); + auto transCbPtr = std::make_shared>>(); + auto timeoutFlagPtr = std::make_shared( + loops_.getNextLoop(), + std::chrono::duration(timeout_), + [callbackPtr, transCbPtr, this]() { + auto cbPtr = (*transCbPtr).lock(); + if (cbPtr) + { + std::lock_guard lock(connectionsMutex_); + for (auto iter = tasks_.begin(); iter != tasks_.end(); + ++iter) + { + if (cbPtr == *iter) + { + tasks_.erase(iter); + break; + } + } + } + (*callbackPtr)(nullptr); + }); + std::weak_ptr thisWeakPtr = shared_from_this(); + auto bufferCbPtr = std::make_shared< + std::function>( + [callbackPtr, timeoutFlagPtr, thisWeakPtr]( + const RedisConnectionPtr & /*connPtr*/) { + auto thisPtr = thisWeakPtr.lock(); + if (thisPtr) + { + if (timeoutFlagPtr->done()) + { + return; + } + thisPtr->newTransactionAsync(*callbackPtr); + } + }); + { + std::lock_guard lock(connectionsMutex_); + tasks_.emplace_back(bufferCbPtr); + } + (*transCbPtr) = bufferCbPtr; + timeoutFlagPtr->runTimer(); + } } } @@ -217,17 +284,115 @@ std::shared_ptr RedisClientImpl::makeTransaction( void RedisClientImpl::handleNextTask(const RedisConnectionPtr &connPtr) { - std::function task; + std::shared_ptr> taskPtr; { std::lock_guard lock(connectionsMutex_); if (!tasks_.empty()) { - task = std::move(tasks_.front()); - tasks_.pop(); + taskPtr = std::move(tasks_.front()); + tasks_.pop_front(); } } - if (task) + if (taskPtr && (*taskPtr)) { - task(connPtr); + (*taskPtr)(connPtr); } } +void RedisClientImpl::execCommandAsyncWithTimeout( + string_view command, + RedisResultCallback &&resultCallback, + RedisExceptionCallback &&exceptionCallback, + va_list ap) +{ + auto expCbPtr = + std::make_shared(std::move(exceptionCallback)); + auto bufferCbPtr = std::make_shared< + std::weak_ptr>>(); + auto timeoutFlagPtr = std::make_shared( + loops_.getNextLoop(), + std::chrono::duration(timeout_), + [expCbPtr, bufferCbPtr, this]() { + auto bfCbPtr = (*bufferCbPtr).lock(); + if (bfCbPtr) + { + std::lock_guard lock(connectionsMutex_); + for (auto iter = tasks_.begin(); iter != tasks_.end(); ++iter) + { + if (bfCbPtr == *iter) + { + tasks_.erase(iter); + break; + } + } + } + if (*expCbPtr) + { + (*expCbPtr)(RedisException(RedisErrorCode::kTimeout, + "Command execution timeout")); + } + }); + auto newResultCallback = [resultCallback = std::move(resultCallback), + timeoutFlagPtr](const RedisResult &result) { + if (timeoutFlagPtr->done()) + { + return; + } + if (resultCallback) + { + resultCallback(result); + } + }; + auto newExceptionCallback = [expCbPtr, + timeoutFlagPtr](const RedisException &err) { + if (timeoutFlagPtr->done()) + { + return; + } + if (*expCbPtr) + { + (*expCbPtr)(err); + } + }; + RedisConnectionPtr connPtr; + { + std::lock_guard lock(connectionsMutex_); + if (!readyConnections_.empty()) + { + if (connectionPos_ >= readyConnections_.size()) + { + connPtr = readyConnections_[0]; + connectionPos_ = 1; + } + else + { + connPtr = readyConnections_[connectionPos_++]; + } + } + } + if (connPtr) + { + connPtr->sendvCommand(command, + std::move(newResultCallback), + std::move(newExceptionCallback), + ap); + } + else + { + LOG_TRACE << "no connection available, push command to buffer"; + auto formattedCmd = RedisConnection::getFormattedCommand(command, ap); + auto bfCbPtr = + std::make_shared>( + [resultCallback = std::move(newResultCallback), + exceptionCallback = std::move(newExceptionCallback), + formattedCmd = std::move(formattedCmd)]( + const RedisConnectionPtr &connPtr) mutable { + connPtr->sendFormattedCommand(std::move(formattedCmd), + std::move(resultCallback), + std::move(exceptionCallback)); + }); + (*bufferCbPtr) = bfCbPtr; + std::lock_guard lock(connectionsMutex_); + tasks_.emplace_back(bfCbPtr); + } + timeoutFlagPtr->runTimer(); +} \ No newline at end of file diff --git a/nosql_lib/redis/src/RedisClientImpl.h b/nosql_lib/redis/src/RedisClientImpl.h index a4f29cbe..a8d8b9d8 100644 --- a/nosql_lib/redis/src/RedisClientImpl.h +++ b/nosql_lib/redis/src/RedisClientImpl.h @@ -19,7 +19,7 @@ #include #include #include -#include +#include #include namespace drogon @@ -40,18 +40,29 @@ class RedisClientImpl final string_view command, ...) noexcept override; ~RedisClientImpl() override; - RedisTransactionPtr newTransaction() override + RedisTransactionPtr newTransaction() noexcept(false) override { std::promise prom; auto f = prom.get_future(); newTransactionAsync([&prom](const RedisTransactionPtr &transPtr) { prom.set_value(transPtr); }); - return f.get(); + auto trans = f.get(); + if (!trans) + { + throw RedisException( + RedisErrorCode::kTimeout, + "Timeout, no connection available for transaction"); + } + return trans; } void newTransactionAsync( const std::function &callback) override; + void setTimeout(double timeout) override + { + timeout_ = timeout; + } private: trantor::EventLoopThreadPool loops_; @@ -63,10 +74,16 @@ class RedisClientImpl final const trantor::InetAddress serverAddr_; const std::string password_; const size_t numberOfConnections_; - std::queue> tasks_; + double timeout_{-1.0}; + std::list>> + tasks_; std::shared_ptr makeTransaction( const RedisConnectionPtr &connPtr); void handleNextTask(const RedisConnectionPtr &connPtr); + void execCommandAsyncWithTimeout(string_view command, + RedisResultCallback &&resultCallback, + RedisExceptionCallback &&exceptionCallback, + va_list ap); }; } // namespace nosql } // namespace drogon \ No newline at end of file diff --git a/nosql_lib/redis/src/RedisClientLockFree.cc b/nosql_lib/redis/src/RedisClientLockFree.cc index d88e2889..66f39fd2 100644 --- a/nosql_lib/redis/src/RedisClientLockFree.cc +++ b/nosql_lib/redis/src/RedisClientLockFree.cc @@ -14,6 +14,7 @@ #include "RedisClientLockFree.h" #include "RedisTransactionImpl.h" +#include "../../lib/src/TaskTimeoutFlag.h" using namespace drogon::nosql; RedisClientLockFree::RedisClientLockFree( @@ -85,6 +86,17 @@ void RedisClientLockFree::execCommandAsync( ...) noexcept { loop_->assertInLoopThread(); + if (timeout_ > 0.0) + { + va_list args; + va_start(args, command); + execCommandAsyncWithTimeout(command, + std::move(resultCallback), + std::move(exceptionCallback), + args); + va_end(args); + return; + } RedisConnectionPtr connPtr; { if (!readyConnections_.empty()) @@ -118,15 +130,17 @@ void RedisClientLockFree::execCommandAsync( va_start(args, command); auto formattedCmd = RedisConnection::getFormattedCommand(command, args); va_end(args); - tasks_.emplace([thisWeakPtr, - resultCallback = std::move(resultCallback), - exceptionCallback = std::move(exceptionCallback), - formattedCmd = std::move(formattedCmd)]( - const RedisConnectionPtr &connPtr) mutable { - connPtr->sendFormattedCommand(std::move(formattedCmd), - std::move(resultCallback), - std::move(exceptionCallback)); - }); + tasks_.emplace_back( + std::make_shared>( + [thisWeakPtr, + resultCallback = std::move(resultCallback), + exceptionCallback = std::move(exceptionCallback), + formattedCmd = std::move(formattedCmd)]( + const RedisConnectionPtr &connPtr) mutable { + connPtr->sendFormattedCommand(std::move(formattedCmd), + std::move(resultCallback), + std::move(exceptionCallback)); + })); } } @@ -159,15 +173,67 @@ void RedisClientLockFree::newTransactionAsync( } else { - std::weak_ptr thisWeakPtr = shared_from_this(); - tasks_.emplace( - [callback, thisWeakPtr](const RedisConnectionPtr & /*connPtr*/) { - auto thisPtr = thisWeakPtr.lock(); - if (thisPtr) - { - thisPtr->newTransactionAsync(callback); - } - }); + if (timeout_ <= 0.0) + { + std::weak_ptr thisWeakPtr = shared_from_this(); + tasks_.emplace_back( + std::make_shared< + std::function>( + [callback, + thisWeakPtr](const RedisConnectionPtr & /*connPtr*/) { + auto thisPtr = thisWeakPtr.lock(); + if (thisPtr) + { + thisPtr->newTransactionAsync(callback); + } + })); + } + else + { + auto callbackPtr = std::make_shared< + std::function &)>>( + callback); + auto transCbPtr = std::make_shared>>(); + auto timeoutFlagPtr = std::make_shared( + loop_, + std::chrono::duration(timeout_), + [callbackPtr, transCbPtr, this]() { + auto cbPtr = (*transCbPtr).lock(); + if (cbPtr) + { + for (auto iter = tasks_.begin(); iter != tasks_.end(); + ++iter) + { + if (cbPtr == *iter) + { + tasks_.erase(iter); + break; + } + } + } + (*callbackPtr)(nullptr); + }); + std::weak_ptr thisWeakPtr = shared_from_this(); + auto bufferCbPtr = std::make_shared< + std::function>( + [callbackPtr, timeoutFlagPtr, thisWeakPtr]( + const RedisConnectionPtr & /*connPtr*/) { + auto thisPtr = thisWeakPtr.lock(); + if (thisPtr) + { + if (timeoutFlagPtr->done()) + { + return; + } + thisPtr->newTransactionAsync(*callbackPtr); + } + }); + tasks_.emplace_back(bufferCbPtr); + + (*transCbPtr) = bufferCbPtr; + timeoutFlagPtr->runTimer(); + } } } @@ -193,16 +259,112 @@ std::shared_ptr RedisClientLockFree::makeTransaction( void RedisClientLockFree::handleNextTask(const RedisConnectionPtr &connPtr) { loop_->assertInLoopThread(); - std::function task; + std::shared_ptr> taskPtr; if (!tasks_.empty()) { - task = std::move(tasks_.front()); - tasks_.pop(); + taskPtr = std::move(tasks_.front()); + tasks_.pop_front(); } - if (task) + if (taskPtr && (*taskPtr)) { - task(connPtr); + (*taskPtr)(connPtr); } } + +void RedisClientLockFree::execCommandAsyncWithTimeout( + string_view command, + RedisResultCallback &&resultCallback, + RedisExceptionCallback &&exceptionCallback, + va_list ap) +{ + auto expCbPtr = + std::make_shared(std::move(exceptionCallback)); + auto bufferCbPtr = std::make_shared< + std::weak_ptr>>(); + auto timeoutFlagPtr = std::make_shared( + loop_, + std::chrono::duration(timeout_), + [expCbPtr, bufferCbPtr, this]() { + auto bfCbPtr = (*bufferCbPtr).lock(); + if (bfCbPtr) + { + for (auto iter = tasks_.begin(); iter != tasks_.end(); ++iter) + { + if (bfCbPtr == *iter) + { + tasks_.erase(iter); + break; + } + } + } + if (*expCbPtr) + { + (*expCbPtr)(RedisException(RedisErrorCode::kTimeout, + "Command execution timeout")); + } + }); + auto newResultCallback = [resultCallback = std::move(resultCallback), + timeoutFlagPtr](const RedisResult &result) { + if (timeoutFlagPtr->done()) + { + return; + } + if (resultCallback) + { + resultCallback(result); + } + }; + auto newExceptionCallback = [expCbPtr, + timeoutFlagPtr](const RedisException &err) { + if (timeoutFlagPtr->done()) + { + return; + } + if (*expCbPtr) + { + (*expCbPtr)(err); + } + }; + RedisConnectionPtr connPtr; + { + if (!readyConnections_.empty()) + { + if (connectionPos_ >= readyConnections_.size()) + { + connPtr = readyConnections_[0]; + connectionPos_ = 1; + } + else + { + connPtr = readyConnections_[connectionPos_++]; + } + } + } + if (connPtr) + { + connPtr->sendvCommand(command, + std::move(newResultCallback), + std::move(newExceptionCallback), + ap); + } + else + { + LOG_TRACE << "no connection available, push command to buffer"; + auto formattedCmd = RedisConnection::getFormattedCommand(command, ap); + auto bfCbPtr = + std::make_shared>( + [resultCallback = std::move(newResultCallback), + exceptionCallback = std::move(newExceptionCallback), + formattedCmd = std::move(formattedCmd)]( + const RedisConnectionPtr &connPtr) mutable { + connPtr->sendFormattedCommand(std::move(formattedCmd), + std::move(resultCallback), + std::move(exceptionCallback)); + }); + (*bufferCbPtr) = bfCbPtr; + tasks_.emplace_back(bfCbPtr); + } + timeoutFlagPtr->runTimer(); +} diff --git a/nosql_lib/redis/src/RedisClientLockFree.h b/nosql_lib/redis/src/RedisClientLockFree.h index 86b2854b..46119258 100644 --- a/nosql_lib/redis/src/RedisClientLockFree.h +++ b/nosql_lib/redis/src/RedisClientLockFree.h @@ -19,7 +19,7 @@ #include #include #include -#include +#include #include namespace drogon @@ -54,6 +54,11 @@ class RedisClientLockFree final const std::function &callback) override; + void setTimeout(double timeout) override + { + timeout_ = timeout; + } + private: trantor::EventLoop *loop_; std::unordered_set connections_; @@ -63,10 +68,16 @@ class RedisClientLockFree final const trantor::InetAddress serverAddr_; const std::string password_; const size_t numberOfConnections_; - std::queue> tasks_; + std::list>> + tasks_; + double timeout_{-1.0}; std::shared_ptr makeTransaction( const RedisConnectionPtr &connPtr); void handleNextTask(const RedisConnectionPtr &connPtr); + void execCommandAsyncWithTimeout(string_view command, + RedisResultCallback &&resultCallback, + RedisExceptionCallback &&exceptionCallback, + va_list ap); }; } // namespace nosql } // namespace drogon \ No newline at end of file diff --git a/nosql_lib/redis/src/RedisClientManager.cc b/nosql_lib/redis/src/RedisClientManager.cc index 6800a094..2104b3d7 100644 --- a/nosql_lib/redis/src/RedisClientManager.cc +++ b/nosql_lib/redis/src/RedisClientManager.cc @@ -41,15 +41,23 @@ void RedisClientManager::createRedisClients( redisInfo.connectionNumber_, ioLoops[idx], redisInfo.password_); + if (redisInfo.timeout_ > 0.0) + { + c->setTimeout(redisInfo.timeout_); + } }); } else { - redisClientsMap_[redisInfo.name_] = - std::make_shared( - trantor::InetAddress(redisInfo.addr_, redisInfo.port_), - redisInfo.connectionNumber_, - redisInfo.password_); + auto clientPtr = std::make_shared( + trantor::InetAddress(redisInfo.addr_, redisInfo.port_), + redisInfo.connectionNumber_, + redisInfo.password_); + if (redisInfo.timeout_ > 0.0) + { + clientPtr->setTimeout(redisInfo.timeout_); + } + redisClientsMap_[redisInfo.name_] = std::move(clientPtr); } } } @@ -59,7 +67,8 @@ void RedisClientManager::createRedisClient(const std::string &name, unsigned short port, const std::string &password, const size_t connectionNum, - const bool isFast) + const bool isFast, + double timeout) { RedisInfo info; info.name_ = name; @@ -68,6 +77,7 @@ void RedisClientManager::createRedisClient(const std::string &name, info.password_ = password; info.connectionNumber_ = connectionNum; info.isFast_ = isFast; + info.timeout_ = timeout; redisInfos_.emplace_back(std::move(info)); } diff --git a/nosql_lib/redis/src/RedisConnection.h b/nosql_lib/redis/src/RedisConnection.h index 01bc11be..05f246dc 100644 --- a/nosql_lib/redis/src/RedisConnection.h +++ b/nosql_lib/redis/src/RedisConnection.h @@ -162,6 +162,10 @@ class RedisConnection : public trantor::NonCopyable, args); va_end(args); } + trantor::EventLoop *getLoop() const + { + return loop_; + } private: redisAsyncContext *redisContext_{nullptr}; diff --git a/nosql_lib/redis/src/RedisTransactionImpl.cc b/nosql_lib/redis/src/RedisTransactionImpl.cc index a1a2c3bb..2c8081ba 100644 --- a/nosql_lib/redis/src/RedisTransactionImpl.cc +++ b/nosql_lib/redis/src/RedisTransactionImpl.cc @@ -13,7 +13,7 @@ */ #include "RedisTransactionImpl.h" - +#include "../../lib/src/TaskTimeoutFlag.h" using namespace drogon::nosql; RedisTransactionImpl::RedisTransactionImpl(RedisConnectionPtr connPtr) noexcept @@ -46,20 +46,64 @@ void RedisTransactionImpl::execCommandAsync( "Transaction was cancelled")); return; } - va_list args; - va_start(args, command); - connPtr_->sendvCommand( - command, - std::move(resultCallback), - [thisPtr = shared_from_this(), - exceptionCallback = - std::move(exceptionCallback)](const RedisException &err) { - LOG_ERROR << err.what(); - thisPtr->isExecutedOrCancelled_ = true; - exceptionCallback(err); - }, - args); - va_end(args); + if (timeout_ <= 0.0) + { + va_list args; + va_start(args, command); + connPtr_->sendvCommand( + command, + std::move(resultCallback), + [thisPtr = shared_from_this(), + exceptionCallback = + std::move(exceptionCallback)](const RedisException &err) { + LOG_ERROR << err.what(); + thisPtr->isExecutedOrCancelled_ = true; + exceptionCallback(err); + }, + args); + va_end(args); + } + else + { + auto expCbPtr = std::make_shared( + std::move(exceptionCallback)); + auto timeoutFlagPtr = std::make_shared( + connPtr_->getLoop(), + std::chrono::duration(timeout_), + [expCbPtr]() { + if (*expCbPtr) + { + (*expCbPtr)(RedisException(RedisErrorCode::kTimeout, + "Command execution timeout")); + } + }); + va_list args; + va_start(args, command); + connPtr_->sendvCommand( + command, + [resultCallback = std::move(resultCallback), + timeoutFlagPtr](const RedisResult &result) { + if (timeoutFlagPtr->done()) + { + return; + } + resultCallback(result); + }, + [thisPtr = shared_from_this(), expCbPtr, timeoutFlagPtr]( + const RedisException &err) { + if (timeoutFlagPtr->done()) + { + return; + } + LOG_ERROR << err.what(); + thisPtr->isExecutedOrCancelled_ = true; + if (*expCbPtr) + (*expCbPtr)(err); + }, + args); + va_end(args); + timeoutFlagPtr->runTimer(); + } } void RedisTransactionImpl::doBegin() diff --git a/nosql_lib/redis/src/RedisTransactionImpl.h b/nosql_lib/redis/src/RedisTransactionImpl.h index 947c3dfd..08614f32 100644 --- a/nosql_lib/redis/src/RedisTransactionImpl.h +++ b/nosql_lib/redis/src/RedisTransactionImpl.h @@ -44,12 +44,17 @@ class RedisTransactionImpl final { callback(shared_from_this()); } + void setTimeout(double timeout) override + { + timeout_ = timeout; + } void doBegin(); ~RedisTransactionImpl() override; private: bool isExecutedOrCancelled_{false}; RedisConnectionPtr connPtr_; + double timeout_{-1.0}; }; } // namespace nosql } // namespace drogon \ No newline at end of file diff --git a/orm_lib/inc/drogon/orm/DbClient.h b/orm_lib/inc/drogon/orm/DbClient.h index c82cf059..85f036e4 100644 --- a/orm_lib/inc/drogon/orm/DbClient.h +++ b/orm_lib/inc/drogon/orm/DbClient.h @@ -236,12 +236,17 @@ class DROGON_EXPORT DbClient : public trantor::NonCopyable * automatically or manually rolled back, the callback will never be * executed. You can also use the setCommitCallback() method of a * transaction object to set the callback. + * @note A TimeoutError exception is thrown if the operation is timed out. */ virtual std::shared_ptr newTransaction( const std::function &commitCallback = std::function()) noexcept(false) = 0; /// Create a transaction object in asynchronous mode. + /** + * @note An empty shared_ptr object is returned via the callback if the + * operation is timed out. + */ virtual void newTransactionAsync( const std::function &)> &callback) = 0;