Add the setTimeout() method to the ReidsClient class (#830)

This commit is contained in:
An Tao 2021-04-30 08:00:11 +08:00 committed by GitHub
parent 60c877f920
commit 51814b76da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 543 additions and 89 deletions

View File

@ -75,7 +75,10 @@
"is_fast": false,
//number_of_connections: 1 by default, if the 'is_fast' is true, the number is the number of
//connections per IO thread, otherwise it is the total number of all connections.
"number_of_connections": 1
"number_of_connections": 1,
//timeout: -1.0 by default, in seconds, the timeout for executing a command.
//zero or negative value means no timeout.
"timeout": -1.0
}
],*/
"app": {

View File

@ -75,7 +75,10 @@
"is_fast": false,
//number_of_connections: 1 by default, if the 'is_fast' is true, the number is the number of
//connections per IO thread, otherwise it is the total number of all connections.
"number_of_connections": 1
"number_of_connections": 1,
//timeout: -1.0 by default, in seconds, the timeout for executing a command.
//zero or negative value means no timeout.
"timeout": -1.0
}
],*/
"app": {

View File

@ -1261,7 +1261,8 @@ class DROGON_EXPORT HttpAppFramework : public trantor::NonCopyable
const std::string &name = "default",
const std::string &password = "",
size_t connectionNum = 1,
bool isFast = false) = 0;
bool isFast = false,
double timeout = -1.0) = 0;
/// Get the DNS resolver
/**

View File

@ -549,8 +549,9 @@ static void loadRedisClients(const Json::Value &redisClients)
}
auto name = client.get("name", "default").asString();
auto isFast = client.get("is_fast", false).asBool();
auto timeout = client.get("timeout", -1.0).asDouble();
drogon::app().createRedisClient(
host, port, name, password, connNum, isFast);
host, port, name, password, connNum, isFast, timeout);
}
}

View File

@ -983,11 +983,12 @@ HttpAppFramework &HttpAppFrameworkImpl::createRedisClient(
const std::string &name,
const std::string &password,
size_t connectionNum,
bool isFast)
bool isFast,
double timeout)
{
assert(!running_);
redisClientManagerPtr_->createRedisClient(
name, ip, port, password, connectionNum, isFast);
name, ip, port, password, connectionNum, isFast, timeout);
return *this;
}
void HttpAppFrameworkImpl::quit()

View File

@ -459,7 +459,8 @@ class HttpAppFrameworkImpl final : public HttpAppFramework
const std::string &name,
const std::string &password,
size_t connectionNum,
bool isFast) override;
bool isFast,
double timeout) override;
nosql::RedisClientPtr getRedisClient(const std::string &name) override;
nosql::RedisClientPtr getFastRedisClient(const std::string &name) override;
std::vector<trantor::InetAddress> getListeners() const override;

View File

@ -47,7 +47,8 @@ class RedisClientManager : public trantor::NonCopyable
unsigned short port,
const std::string &password,
size_t connectionNum,
bool isFast);
bool isFast,
double timeout);
// bool areAllRedisClientsAvailable() const noexcept;
private:
@ -61,6 +62,7 @@ class RedisClientManager : public trantor::NonCopyable
unsigned short port_;
bool isFast_;
size_t connectionNumber_;
double timeout_;
};
std::vector<RedisInfo> redisInfos_;
};

View File

@ -32,7 +32,8 @@ void RedisClientManager::createRedisClient(const std::string & /*name*/,
unsigned short /*port*/,
const std::string & /*password*/,
size_t /*connectionNum*/,
bool /*isFast*/)
bool /*isFast*/,
double /*timeout*/)
{
LOG_FATAL << "Redis is not supported by drogon, please install the "
"hiredis library first.";

View File

@ -125,17 +125,34 @@ class DROGON_EXPORT RedisClient
* @brief Create a redis transaction object.
*
* @return std::shared_ptr<RedisTransaction>
* @note An exception with kTimeout code is thrown if the operation is
* timed out. see RedisException.h
*/
virtual std::shared_ptr<RedisTransaction> newTransaction() = 0;
virtual std::shared_ptr<RedisTransaction> newTransaction() noexcept(
false) = 0;
/**
* @brief Create a transaction object in asynchronous mode.
*
* @return std::shared_ptr<RedisTransaction>
* @note An empty shared_ptr object is returned via the callback if the
* operation is timed out.
*/
virtual void newTransactionAsync(
const std::function<void(const std::shared_ptr<RedisTransaction> &)>
&callback) = 0;
/**
* @brief Set the Timeout value of execution of a command.
*
* @param timeout in seconds, if the result is not returned from the
* server within the timeout, a RedisException with "Command execution
* timeout" string is generated and returned to the caller.
* @note set the timeout value to zero or negative for no limit on time. The
* default value is -1.0, this means there is no time limit if this method
* is not called.
*/
virtual void setTimeout(double timeout) = 0;
virtual ~RedisClient() = default;
#ifdef __cpp_impl_coroutine
/**
@ -247,9 +264,9 @@ inline void internal::RedisTransactionAwaiter::await_suspend(
client_->newTransactionAsync(
[this, &handle](const std::shared_ptr<RedisTransaction> &transaction) {
if (transaction == nullptr)
setException(std::make_exception_ptr(
RedisException(RedisErrorCode::kInternalError,
"Failed to create transaction")));
setException(std::make_exception_ptr(RedisException(
RedisErrorCode::kTimeout,
"Timeout, no connection available for transaction")));
else
setValue(transaction);
handle.resume();

View File

@ -29,7 +29,8 @@ enum class RedisErrorCode
kRedisError,
kInternalError,
kTransactionCancelled,
kBadType
kBadType,
kTimeout
};
class RedisException final : public std::exception
{

View File

@ -14,6 +14,7 @@
#include "RedisClientImpl.h"
#include "RedisTransactionImpl.h"
#include "../../lib/src/TaskTimeoutFlag.h"
using namespace drogon::nosql;
std::shared_ptr<RedisClient> RedisClient::newRedisClient(
const trantor::InetAddress &serverAddress,
@ -102,6 +103,17 @@ void RedisClientImpl::execCommandAsync(
string_view command,
...) noexcept
{
if (timeout_ > 0.0)
{
va_list args;
va_start(args, command);
execCommandAsyncWithTimeout(command,
std::move(resultCallback),
std::move(exceptionCallback),
args);
va_end(args);
return;
}
RedisConnectionPtr connPtr;
{
std::lock_guard<std::mutex> lock(connectionsMutex_);
@ -131,21 +143,21 @@ void RedisClientImpl::execCommandAsync(
else
{
LOG_TRACE << "no connection available, push command to buffer";
std::weak_ptr<RedisClientImpl> thisWeakPtr = shared_from_this();
va_list args;
va_start(args, command);
auto formattedCmd = RedisConnection::getFormattedCommand(command, args);
va_end(args);
std::lock_guard<std::mutex> lock(connectionsMutex_);
tasks_.emplace([thisWeakPtr,
resultCallback = std::move(resultCallback),
exceptionCallback = std::move(exceptionCallback),
formattedCmd = std::move(formattedCmd)](
const RedisConnectionPtr &connPtr) mutable {
connPtr->sendFormattedCommand(std::move(formattedCmd),
std::move(resultCallback),
std::move(exceptionCallback));
});
tasks_.emplace_back(
std::make_shared<std::function<void(const RedisConnectionPtr &)>>(
[resultCallback = std::move(resultCallback),
exceptionCallback = std::move(exceptionCallback),
formattedCmd = std::move(formattedCmd)](
const RedisConnectionPtr &connPtr) mutable {
connPtr->sendFormattedCommand(std::move(formattedCmd),
std::move(resultCallback),
std::move(exceptionCallback));
}));
}
}
@ -179,16 +191,71 @@ void RedisClientImpl::newTransactionAsync(
}
else
{
std::weak_ptr<RedisClientImpl> thisWeakPtr = shared_from_this();
std::lock_guard<std::mutex> lock(connectionsMutex_);
tasks_.emplace(
[callback, thisWeakPtr](const RedisConnectionPtr & /*connPtr*/) {
auto thisPtr = thisWeakPtr.lock();
if (thisPtr)
{
thisPtr->newTransactionAsync(callback);
}
});
if (timeout_ <= 0.0)
{
std::weak_ptr<RedisClientImpl> thisWeakPtr = shared_from_this();
std::lock_guard<std::mutex> lock(connectionsMutex_);
tasks_.emplace_back(
std::make_shared<
std::function<void(const RedisConnectionPtr &)>>(
[callback,
thisWeakPtr](const RedisConnectionPtr & /*connPtr*/) {
auto thisPtr = thisWeakPtr.lock();
if (thisPtr)
{
thisPtr->newTransactionAsync(callback);
}
}));
}
else
{
auto callbackPtr = std::make_shared<
std::function<void(const std::shared_ptr<RedisTransaction> &)>>(
callback);
auto transCbPtr = std::make_shared<std::weak_ptr<
std::function<void(const RedisConnectionPtr &)>>>();
auto timeoutFlagPtr = std::make_shared<TaskTimeoutFlag>(
loops_.getNextLoop(),
std::chrono::duration<double>(timeout_),
[callbackPtr, transCbPtr, this]() {
auto cbPtr = (*transCbPtr).lock();
if (cbPtr)
{
std::lock_guard<std::mutex> lock(connectionsMutex_);
for (auto iter = tasks_.begin(); iter != tasks_.end();
++iter)
{
if (cbPtr == *iter)
{
tasks_.erase(iter);
break;
}
}
}
(*callbackPtr)(nullptr);
});
std::weak_ptr<RedisClientImpl> thisWeakPtr = shared_from_this();
auto bufferCbPtr = std::make_shared<
std::function<void(const RedisConnectionPtr &)>>(
[callbackPtr, timeoutFlagPtr, thisWeakPtr](
const RedisConnectionPtr & /*connPtr*/) {
auto thisPtr = thisWeakPtr.lock();
if (thisPtr)
{
if (timeoutFlagPtr->done())
{
return;
}
thisPtr->newTransactionAsync(*callbackPtr);
}
});
{
std::lock_guard<std::mutex> lock(connectionsMutex_);
tasks_.emplace_back(bufferCbPtr);
}
(*transCbPtr) = bufferCbPtr;
timeoutFlagPtr->runTimer();
}
}
}
@ -217,17 +284,115 @@ std::shared_ptr<RedisTransaction> RedisClientImpl::makeTransaction(
void RedisClientImpl::handleNextTask(const RedisConnectionPtr &connPtr)
{
std::function<void(const RedisConnectionPtr &)> task;
std::shared_ptr<std::function<void(const RedisConnectionPtr &)>> taskPtr;
{
std::lock_guard<std::mutex> lock(connectionsMutex_);
if (!tasks_.empty())
{
task = std::move(tasks_.front());
tasks_.pop();
taskPtr = std::move(tasks_.front());
tasks_.pop_front();
}
}
if (task)
if (taskPtr && (*taskPtr))
{
task(connPtr);
(*taskPtr)(connPtr);
}
}
void RedisClientImpl::execCommandAsyncWithTimeout(
string_view command,
RedisResultCallback &&resultCallback,
RedisExceptionCallback &&exceptionCallback,
va_list ap)
{
auto expCbPtr =
std::make_shared<RedisExceptionCallback>(std::move(exceptionCallback));
auto bufferCbPtr = std::make_shared<
std::weak_ptr<std::function<void(const RedisConnectionPtr &)>>>();
auto timeoutFlagPtr = std::make_shared<TaskTimeoutFlag>(
loops_.getNextLoop(),
std::chrono::duration<double>(timeout_),
[expCbPtr, bufferCbPtr, this]() {
auto bfCbPtr = (*bufferCbPtr).lock();
if (bfCbPtr)
{
std::lock_guard<std::mutex> lock(connectionsMutex_);
for (auto iter = tasks_.begin(); iter != tasks_.end(); ++iter)
{
if (bfCbPtr == *iter)
{
tasks_.erase(iter);
break;
}
}
}
if (*expCbPtr)
{
(*expCbPtr)(RedisException(RedisErrorCode::kTimeout,
"Command execution timeout"));
}
});
auto newResultCallback = [resultCallback = std::move(resultCallback),
timeoutFlagPtr](const RedisResult &result) {
if (timeoutFlagPtr->done())
{
return;
}
if (resultCallback)
{
resultCallback(result);
}
};
auto newExceptionCallback = [expCbPtr,
timeoutFlagPtr](const RedisException &err) {
if (timeoutFlagPtr->done())
{
return;
}
if (*expCbPtr)
{
(*expCbPtr)(err);
}
};
RedisConnectionPtr connPtr;
{
std::lock_guard<std::mutex> lock(connectionsMutex_);
if (!readyConnections_.empty())
{
if (connectionPos_ >= readyConnections_.size())
{
connPtr = readyConnections_[0];
connectionPos_ = 1;
}
else
{
connPtr = readyConnections_[connectionPos_++];
}
}
}
if (connPtr)
{
connPtr->sendvCommand(command,
std::move(newResultCallback),
std::move(newExceptionCallback),
ap);
}
else
{
LOG_TRACE << "no connection available, push command to buffer";
auto formattedCmd = RedisConnection::getFormattedCommand(command, ap);
auto bfCbPtr =
std::make_shared<std::function<void(const RedisConnectionPtr &)>>(
[resultCallback = std::move(newResultCallback),
exceptionCallback = std::move(newExceptionCallback),
formattedCmd = std::move(formattedCmd)](
const RedisConnectionPtr &connPtr) mutable {
connPtr->sendFormattedCommand(std::move(formattedCmd),
std::move(resultCallback),
std::move(exceptionCallback));
});
(*bufferCbPtr) = bfCbPtr;
std::lock_guard<std::mutex> lock(connectionsMutex_);
tasks_.emplace_back(bfCbPtr);
}
timeoutFlagPtr->runTimer();
}

View File

@ -19,7 +19,7 @@
#include <trantor/net/EventLoopThreadPool.h>
#include <vector>
#include <unordered_set>
#include <queue>
#include <list>
#include <future>
namespace drogon
@ -40,18 +40,29 @@ class RedisClientImpl final
string_view command,
...) noexcept override;
~RedisClientImpl() override;
RedisTransactionPtr newTransaction() override
RedisTransactionPtr newTransaction() noexcept(false) override
{
std::promise<RedisTransactionPtr> prom;
auto f = prom.get_future();
newTransactionAsync([&prom](const RedisTransactionPtr &transPtr) {
prom.set_value(transPtr);
});
return f.get();
auto trans = f.get();
if (!trans)
{
throw RedisException(
RedisErrorCode::kTimeout,
"Timeout, no connection available for transaction");
}
return trans;
}
void newTransactionAsync(
const std::function<void(const RedisTransactionPtr &)> &callback)
override;
void setTimeout(double timeout) override
{
timeout_ = timeout;
}
private:
trantor::EventLoopThreadPool loops_;
@ -63,10 +74,16 @@ class RedisClientImpl final
const trantor::InetAddress serverAddr_;
const std::string password_;
const size_t numberOfConnections_;
std::queue<std::function<void(const RedisConnectionPtr &)>> tasks_;
double timeout_{-1.0};
std::list<std::shared_ptr<std::function<void(const RedisConnectionPtr &)>>>
tasks_;
std::shared_ptr<RedisTransaction> makeTransaction(
const RedisConnectionPtr &connPtr);
void handleNextTask(const RedisConnectionPtr &connPtr);
void execCommandAsyncWithTimeout(string_view command,
RedisResultCallback &&resultCallback,
RedisExceptionCallback &&exceptionCallback,
va_list ap);
};
} // namespace nosql
} // namespace drogon

View File

@ -14,6 +14,7 @@
#include "RedisClientLockFree.h"
#include "RedisTransactionImpl.h"
#include "../../lib/src/TaskTimeoutFlag.h"
using namespace drogon::nosql;
RedisClientLockFree::RedisClientLockFree(
@ -85,6 +86,17 @@ void RedisClientLockFree::execCommandAsync(
...) noexcept
{
loop_->assertInLoopThread();
if (timeout_ > 0.0)
{
va_list args;
va_start(args, command);
execCommandAsyncWithTimeout(command,
std::move(resultCallback),
std::move(exceptionCallback),
args);
va_end(args);
return;
}
RedisConnectionPtr connPtr;
{
if (!readyConnections_.empty())
@ -118,15 +130,17 @@ void RedisClientLockFree::execCommandAsync(
va_start(args, command);
auto formattedCmd = RedisConnection::getFormattedCommand(command, args);
va_end(args);
tasks_.emplace([thisWeakPtr,
resultCallback = std::move(resultCallback),
exceptionCallback = std::move(exceptionCallback),
formattedCmd = std::move(formattedCmd)](
const RedisConnectionPtr &connPtr) mutable {
connPtr->sendFormattedCommand(std::move(formattedCmd),
std::move(resultCallback),
std::move(exceptionCallback));
});
tasks_.emplace_back(
std::make_shared<std::function<void(const RedisConnectionPtr &)>>(
[thisWeakPtr,
resultCallback = std::move(resultCallback),
exceptionCallback = std::move(exceptionCallback),
formattedCmd = std::move(formattedCmd)](
const RedisConnectionPtr &connPtr) mutable {
connPtr->sendFormattedCommand(std::move(formattedCmd),
std::move(resultCallback),
std::move(exceptionCallback));
}));
}
}
@ -159,15 +173,67 @@ void RedisClientLockFree::newTransactionAsync(
}
else
{
std::weak_ptr<RedisClientLockFree> thisWeakPtr = shared_from_this();
tasks_.emplace(
[callback, thisWeakPtr](const RedisConnectionPtr & /*connPtr*/) {
auto thisPtr = thisWeakPtr.lock();
if (thisPtr)
{
thisPtr->newTransactionAsync(callback);
}
});
if (timeout_ <= 0.0)
{
std::weak_ptr<RedisClientLockFree> thisWeakPtr = shared_from_this();
tasks_.emplace_back(
std::make_shared<
std::function<void(const RedisConnectionPtr &)>>(
[callback,
thisWeakPtr](const RedisConnectionPtr & /*connPtr*/) {
auto thisPtr = thisWeakPtr.lock();
if (thisPtr)
{
thisPtr->newTransactionAsync(callback);
}
}));
}
else
{
auto callbackPtr = std::make_shared<
std::function<void(const std::shared_ptr<RedisTransaction> &)>>(
callback);
auto transCbPtr = std::make_shared<std::weak_ptr<
std::function<void(const RedisConnectionPtr &)>>>();
auto timeoutFlagPtr = std::make_shared<TaskTimeoutFlag>(
loop_,
std::chrono::duration<double>(timeout_),
[callbackPtr, transCbPtr, this]() {
auto cbPtr = (*transCbPtr).lock();
if (cbPtr)
{
for (auto iter = tasks_.begin(); iter != tasks_.end();
++iter)
{
if (cbPtr == *iter)
{
tasks_.erase(iter);
break;
}
}
}
(*callbackPtr)(nullptr);
});
std::weak_ptr<RedisClientLockFree> thisWeakPtr = shared_from_this();
auto bufferCbPtr = std::make_shared<
std::function<void(const RedisConnectionPtr &)>>(
[callbackPtr, timeoutFlagPtr, thisWeakPtr](
const RedisConnectionPtr & /*connPtr*/) {
auto thisPtr = thisWeakPtr.lock();
if (thisPtr)
{
if (timeoutFlagPtr->done())
{
return;
}
thisPtr->newTransactionAsync(*callbackPtr);
}
});
tasks_.emplace_back(bufferCbPtr);
(*transCbPtr) = bufferCbPtr;
timeoutFlagPtr->runTimer();
}
}
}
@ -193,16 +259,112 @@ std::shared_ptr<RedisTransaction> RedisClientLockFree::makeTransaction(
void RedisClientLockFree::handleNextTask(const RedisConnectionPtr &connPtr)
{
loop_->assertInLoopThread();
std::function<void(const RedisConnectionPtr &)> task;
std::shared_ptr<std::function<void(const RedisConnectionPtr &)>> taskPtr;
if (!tasks_.empty())
{
task = std::move(tasks_.front());
tasks_.pop();
taskPtr = std::move(tasks_.front());
tasks_.pop_front();
}
if (task)
if (taskPtr && (*taskPtr))
{
task(connPtr);
(*taskPtr)(connPtr);
}
}
void RedisClientLockFree::execCommandAsyncWithTimeout(
string_view command,
RedisResultCallback &&resultCallback,
RedisExceptionCallback &&exceptionCallback,
va_list ap)
{
auto expCbPtr =
std::make_shared<RedisExceptionCallback>(std::move(exceptionCallback));
auto bufferCbPtr = std::make_shared<
std::weak_ptr<std::function<void(const RedisConnectionPtr &)>>>();
auto timeoutFlagPtr = std::make_shared<TaskTimeoutFlag>(
loop_,
std::chrono::duration<double>(timeout_),
[expCbPtr, bufferCbPtr, this]() {
auto bfCbPtr = (*bufferCbPtr).lock();
if (bfCbPtr)
{
for (auto iter = tasks_.begin(); iter != tasks_.end(); ++iter)
{
if (bfCbPtr == *iter)
{
tasks_.erase(iter);
break;
}
}
}
if (*expCbPtr)
{
(*expCbPtr)(RedisException(RedisErrorCode::kTimeout,
"Command execution timeout"));
}
});
auto newResultCallback = [resultCallback = std::move(resultCallback),
timeoutFlagPtr](const RedisResult &result) {
if (timeoutFlagPtr->done())
{
return;
}
if (resultCallback)
{
resultCallback(result);
}
};
auto newExceptionCallback = [expCbPtr,
timeoutFlagPtr](const RedisException &err) {
if (timeoutFlagPtr->done())
{
return;
}
if (*expCbPtr)
{
(*expCbPtr)(err);
}
};
RedisConnectionPtr connPtr;
{
if (!readyConnections_.empty())
{
if (connectionPos_ >= readyConnections_.size())
{
connPtr = readyConnections_[0];
connectionPos_ = 1;
}
else
{
connPtr = readyConnections_[connectionPos_++];
}
}
}
if (connPtr)
{
connPtr->sendvCommand(command,
std::move(newResultCallback),
std::move(newExceptionCallback),
ap);
}
else
{
LOG_TRACE << "no connection available, push command to buffer";
auto formattedCmd = RedisConnection::getFormattedCommand(command, ap);
auto bfCbPtr =
std::make_shared<std::function<void(const RedisConnectionPtr &)>>(
[resultCallback = std::move(newResultCallback),
exceptionCallback = std::move(newExceptionCallback),
formattedCmd = std::move(formattedCmd)](
const RedisConnectionPtr &connPtr) mutable {
connPtr->sendFormattedCommand(std::move(formattedCmd),
std::move(resultCallback),
std::move(exceptionCallback));
});
(*bufferCbPtr) = bfCbPtr;
tasks_.emplace_back(bfCbPtr);
}
timeoutFlagPtr->runTimer();
}

View File

@ -19,7 +19,7 @@
#include <trantor/net/EventLoopThreadPool.h>
#include <vector>
#include <unordered_set>
#include <queue>
#include <list>
#include <future>
namespace drogon
@ -54,6 +54,11 @@ class RedisClientLockFree final
const std::function<void(const RedisTransactionPtr &)> &callback)
override;
void setTimeout(double timeout) override
{
timeout_ = timeout;
}
private:
trantor::EventLoop *loop_;
std::unordered_set<RedisConnectionPtr> connections_;
@ -63,10 +68,16 @@ class RedisClientLockFree final
const trantor::InetAddress serverAddr_;
const std::string password_;
const size_t numberOfConnections_;
std::queue<std::function<void(const RedisConnectionPtr &)>> tasks_;
std::list<std::shared_ptr<std::function<void(const RedisConnectionPtr &)>>>
tasks_;
double timeout_{-1.0};
std::shared_ptr<RedisTransaction> makeTransaction(
const RedisConnectionPtr &connPtr);
void handleNextTask(const RedisConnectionPtr &connPtr);
void execCommandAsyncWithTimeout(string_view command,
RedisResultCallback &&resultCallback,
RedisExceptionCallback &&exceptionCallback,
va_list ap);
};
} // namespace nosql
} // namespace drogon

View File

@ -41,15 +41,23 @@ void RedisClientManager::createRedisClients(
redisInfo.connectionNumber_,
ioLoops[idx],
redisInfo.password_);
if (redisInfo.timeout_ > 0.0)
{
c->setTimeout(redisInfo.timeout_);
}
});
}
else
{
redisClientsMap_[redisInfo.name_] =
std::make_shared<RedisClientImpl>(
trantor::InetAddress(redisInfo.addr_, redisInfo.port_),
redisInfo.connectionNumber_,
redisInfo.password_);
auto clientPtr = std::make_shared<RedisClientImpl>(
trantor::InetAddress(redisInfo.addr_, redisInfo.port_),
redisInfo.connectionNumber_,
redisInfo.password_);
if (redisInfo.timeout_ > 0.0)
{
clientPtr->setTimeout(redisInfo.timeout_);
}
redisClientsMap_[redisInfo.name_] = std::move(clientPtr);
}
}
}
@ -59,7 +67,8 @@ void RedisClientManager::createRedisClient(const std::string &name,
unsigned short port,
const std::string &password,
const size_t connectionNum,
const bool isFast)
const bool isFast,
double timeout)
{
RedisInfo info;
info.name_ = name;
@ -68,6 +77,7 @@ void RedisClientManager::createRedisClient(const std::string &name,
info.password_ = password;
info.connectionNumber_ = connectionNum;
info.isFast_ = isFast;
info.timeout_ = timeout;
redisInfos_.emplace_back(std::move(info));
}

View File

@ -162,6 +162,10 @@ class RedisConnection : public trantor::NonCopyable,
args);
va_end(args);
}
trantor::EventLoop *getLoop() const
{
return loop_;
}
private:
redisAsyncContext *redisContext_{nullptr};

View File

@ -13,7 +13,7 @@
*/
#include "RedisTransactionImpl.h"
#include "../../lib/src/TaskTimeoutFlag.h"
using namespace drogon::nosql;
RedisTransactionImpl::RedisTransactionImpl(RedisConnectionPtr connPtr) noexcept
@ -46,20 +46,64 @@ void RedisTransactionImpl::execCommandAsync(
"Transaction was cancelled"));
return;
}
va_list args;
va_start(args, command);
connPtr_->sendvCommand(
command,
std::move(resultCallback),
[thisPtr = shared_from_this(),
exceptionCallback =
std::move(exceptionCallback)](const RedisException &err) {
LOG_ERROR << err.what();
thisPtr->isExecutedOrCancelled_ = true;
exceptionCallback(err);
},
args);
va_end(args);
if (timeout_ <= 0.0)
{
va_list args;
va_start(args, command);
connPtr_->sendvCommand(
command,
std::move(resultCallback),
[thisPtr = shared_from_this(),
exceptionCallback =
std::move(exceptionCallback)](const RedisException &err) {
LOG_ERROR << err.what();
thisPtr->isExecutedOrCancelled_ = true;
exceptionCallback(err);
},
args);
va_end(args);
}
else
{
auto expCbPtr = std::make_shared<RedisExceptionCallback>(
std::move(exceptionCallback));
auto timeoutFlagPtr = std::make_shared<TaskTimeoutFlag>(
connPtr_->getLoop(),
std::chrono::duration<double>(timeout_),
[expCbPtr]() {
if (*expCbPtr)
{
(*expCbPtr)(RedisException(RedisErrorCode::kTimeout,
"Command execution timeout"));
}
});
va_list args;
va_start(args, command);
connPtr_->sendvCommand(
command,
[resultCallback = std::move(resultCallback),
timeoutFlagPtr](const RedisResult &result) {
if (timeoutFlagPtr->done())
{
return;
}
resultCallback(result);
},
[thisPtr = shared_from_this(), expCbPtr, timeoutFlagPtr](
const RedisException &err) {
if (timeoutFlagPtr->done())
{
return;
}
LOG_ERROR << err.what();
thisPtr->isExecutedOrCancelled_ = true;
if (*expCbPtr)
(*expCbPtr)(err);
},
args);
va_end(args);
timeoutFlagPtr->runTimer();
}
}
void RedisTransactionImpl::doBegin()

View File

@ -44,12 +44,17 @@ class RedisTransactionImpl final
{
callback(shared_from_this());
}
void setTimeout(double timeout) override
{
timeout_ = timeout;
}
void doBegin();
~RedisTransactionImpl() override;
private:
bool isExecutedOrCancelled_{false};
RedisConnectionPtr connPtr_;
double timeout_{-1.0};
};
} // namespace nosql
} // namespace drogon

View File

@ -236,12 +236,17 @@ class DROGON_EXPORT DbClient : public trantor::NonCopyable
* automatically or manually rolled back, the callback will never be
* executed. You can also use the setCommitCallback() method of a
* transaction object to set the callback.
* @note A TimeoutError exception is thrown if the operation is timed out.
*/
virtual std::shared_ptr<Transaction> newTransaction(
const std::function<void(bool)> &commitCallback =
std::function<void(bool)>()) noexcept(false) = 0;
/// Create a transaction object in asynchronous mode.
/**
* @note An empty shared_ptr object is returned via the callback if the
* operation is timed out.
*/
virtual void newTransactionAsync(
const std::function<void(const std::shared_ptr<Transaction> &)>
&callback) = 0;