Modify the reverse proxy example (#319)

This commit is contained in:
An Tao 2019-12-16 00:34:40 +08:00 committed by GitHub
parent cc04a013be
commit 4809cc9508
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 175 additions and 123 deletions

View File

@ -180,4 +180,4 @@ As you can see, users can use the `HttpController` to map paths and parameters a
In addition, you can also find that all handler interfaces are in asynchronous mode, where the response is returned by a callback object. This design is for performance reasons because in asynchronous mode the drogon application can handle a large number of concurrent requests with a small number of threads.
After compiling all of the above source files, we get a very simple web application. This is a good start. **for more information, please visit the [wiki](https://github.com/an-tao/drogon/wiki/01-Overview) site**
After compiling all of the above source files, we get a very simple web application. This is a good start. **for more information, please visit the [wiki](https://github.com/an-tao/drogon/wiki/01-Overview) or the [doxiz](https://doxiz.com/drogon/master/overview/)**

View File

@ -18,7 +18,7 @@
"app": {
//threads_num: The number of IO threads, 1 by default, if the value is set to 0, the number of threads
//is the number of CPU cores
"threads_num": 0,
"threads_num": 2,
//enable_session: False by default
"enable_session": false,
"session_timeout": 0,
@ -141,14 +141,14 @@
//config: The configuration of the plugin. This json object is the parameter to initialize the plugin.
//It can be commented out
"config": {
// The pipelining depth of HTTP clients.
"pipelining": 16,
"backends": ["http://127.0.0.1:8848", "https://localhost:8849"],
"same_client_to_same_backend": {
"enabled": false,
"cache_timeout": 3600
}
"backends": ["http://127.0.0.1:8848"],
"same_client_to_same_backend": false,
//The number of connections created by proxy for each backend in very event loop (IO thread).
"connection_factor": 1
}
}],
//custom_config: custom configuration for users. This object can be get by the app().getCustomConfig() method.
"custom_config": {}
}
}

View File

@ -30,23 +30,17 @@ void SimpleReverseProxy::initAndStart(const Json::Value &config)
abort();
}
pipeliningDepth_ = config.get("pipelining", 0).asInt();
if (config.isMember("same_client_to_same_backend"))
sameClientToSameBackend_ =
config.get("same_client_to_same_backend", false).asBool();
connectionFactor_ = config.get("connection_factor", 1).asInt();
if (connectionFactor_ == 0 || connectionFactor_ > 100)
{
sameClientToSameBackend_ = config["same_client_to_same_backend"]
.get("enabled", false)
.asBool();
cacheTimeout_ = config["same_client_to_same_backend"]
.get("cache_timeout", 0)
.asInt();
}
if (sameClientToSameBackend_)
{
clientMap_ = std::make_unique<drogon::CacheMap<std::string, size_t>>(
app().getLoop());
LOG_ERROR << "invalid number of connection factor";
abort();
}
clients_.init(
[this](std::vector<HttpClientPtr> &clients, size_t ioLoopIndex) {
clients.resize(backendAddrs_.size());
clients.resize(backendAddrs_.size() * connectionFactor_);
});
clientIndex_.init(
[this](size_t &index, size_t ioLoopIndex) { index = ioLoopIndex; });
@ -69,13 +63,10 @@ void SimpleReverseProxy::preRouting(const HttpRequestPtr &req,
auto &clientsVector = *clients_;
if (sameClientToSameBackend_)
{
std::lock_guard<std::mutex> lock(mapMutex_);
auto ip = req->peerAddr().toIp();
if (!clientMap_->findAndFetch(ip, index))
{
index = (++*clientIndex_) % clientsVector.size();
clientMap_->insert(ip, index, cacheTimeout_);
}
index = std::hash<uint32_t>{}(req->getPeerAddr().ipNetEndian()) %
clientsVector.size();
index = (index + (++(*clientIndex_)) * backendAddrs_.size()) %
clientsVector.size();
}
else
{
@ -84,17 +75,19 @@ void SimpleReverseProxy::preRouting(const HttpRequestPtr &req,
auto &clientPtr = clientsVector[index];
if (!clientPtr)
{
auto &addr = backendAddrs_[index];
auto &addr = backendAddrs_[index % backendAddrs_.size()];
clientPtr = HttpClient::newHttpClient(
addr, trantor::EventLoop::getEventLoopOfCurrentThread());
clientPtr->setPipeliningDepth(pipeliningDepth_);
}
req->setPassThrough(true);
clientPtr->sendRequest(
req,
[callback = std::move(callback)](ReqResult result,
const HttpResponsePtr &resp) {
if (result == ReqResult::Ok)
{
resp->setPassThrough(true);
callback(resp);
}
else

View File

@ -28,17 +28,16 @@ class SimpleReverseProxy : public drogon::Plugin<SimpleReverseProxy>
virtual void shutdown() override;
private:
// Create a HTTP client for every backend in every IO event loop.
// Create 'connectionFactor_' HTTP clients for every backend in every IO
// event loop.
drogon::IOThreadStorage<std::vector<drogon::HttpClientPtr>> clients_;
drogon::IOThreadStorage<size_t> clientIndex_{0};
std::vector<std::string> backendAddrs_;
bool sameClientToSameBackend_{false};
size_t cacheTimeout_{0};
std::mutex mapMutex_;
size_t pipeliningDepth_{0};
void preRouting(const drogon::HttpRequestPtr &,
drogon::AdviceCallback &&,
drogon::AdviceChainCallback &&);
std::unique_ptr<drogon::CacheMap<std::string, size_t>> clientMap_;
size_t connectionFactor_{1};
};
} // namespace my_plugin

View File

@ -321,6 +321,17 @@ class HttpRequest
virtual void addCookie(const std::string &key,
const std::string &value) = 0;
/**
* @brief Set the request object to the pass-through mode or not. It's not
* by default when a new request object is created.
* In pass-through mode, no addtional headers (including server, date,
* content-type and content-length, etc.) are added to the request. This
* mode is useful for some applications such as a proxy.
*
* @param flag
*/
virtual void setPassThrough(bool flag) = 0;
/// The following methods are a series of factory methods that help users
/// create request objects.

View File

@ -278,6 +278,17 @@ class HttpResponse
return jsonObject();
}
/**
* @brief Set the reponse object to the pass-through mode or not. It's not
* by default when a new response object is created.
* In pass-through mode, no addtional headers (including server, date,
* content-type and content-length, etc.) are added to the response. This
* mode is useful for some applications such as a proxy.
*
* @param flag
*/
virtual void setPassThrough(bool flag) = 0;
/* The following methods are a series of factory methods that help users
* create response objects. */

View File

@ -213,14 +213,16 @@ void HttpClientImpl::sendRequestInLoop(const drogon::HttpRequestPtr &req,
const drogon::HttpReqCallback &callback)
{
loop_->assertInLoopThread();
req->addHeader("Connection", "Keep-Alive");
// req->addHeader("Accept", "*/*");
if (!domain_.empty())
if (!static_cast<drogon::HttpRequestImpl *>(req.get())->passThrough())
{
req->addHeader("Host", domain_);
req->addHeader("Connection", "Keep-Alive");
// req->addHeader("Accept", "*/*");
if (!domain_.empty())
{
req->addHeader("Host", domain_);
}
req->addHeader("User-Agent", "DrogonClient");
}
req->addHeader("User-Agent", "DrogonClient");
for (auto &cookie : validCookies_)
{
if ((cookie.expiresDate().microSecondsSinceEpoch() == 0 ||

View File

@ -185,7 +185,9 @@ void HttpRequestImpl::appendToBuffer(trantor::MsgBuffer *output) const
}
std::string content;
if (!parameters_.empty() && contentType_ != CT_MULTIPART_FORM_DATA)
if (!passThrough_ &&
(!parameters_.empty() && contentType_ != CT_MULTIPART_FORM_DATA))
{
for (auto const &p : parameters_)
{
@ -221,9 +223,9 @@ void HttpRequestImpl::appendToBuffer(trantor::MsgBuffer *output) const
<< "You can't set parameters in the query string when the "
"request content type is JSON and http method "
"is POST or PUT";
LOG_ERROR
<< "Please put these parameters into the path or into the json "
"string";
LOG_ERROR << "Please put these parameters into the path or "
"into the json "
"string";
content.clear();
}
}
@ -242,7 +244,8 @@ void HttpRequestImpl::appendToBuffer(trantor::MsgBuffer *output) const
return;
}
output->append("\r\n");
if (contentType_ == CT_MULTIPART_FORM_DATA)
if (!passThrough_ && contentType_ == CT_MULTIPART_FORM_DATA)
{
auto mReq = dynamic_cast<const HttpFileUploadRequest *>(this);
if (mReq)
@ -291,7 +294,7 @@ void HttpRequestImpl::appendToBuffer(trantor::MsgBuffer *output) const
}
}
assert(!(!content.empty() && !content_.empty()));
if (!content.empty() || !content_.empty())
if (!passThrough_ && (!content.empty() || !content_.empty()))
{
char buf[64];
auto len = snprintf(buf,
@ -306,7 +309,7 @@ void HttpRequestImpl::appendToBuffer(trantor::MsgBuffer *output) const
output->append(type.data(), type.length());
}
}
if (!contentTypeString_.empty())
if (!passThrough_ && !contentTypeString_.empty())
{
output->append(contentTypeString_);
}

View File

@ -330,6 +330,16 @@ class HttpRequestImpl : public HttpRequest
cookies_[key] = value;
}
virtual void setPassThrough(bool flag) override
{
passThrough_ = flag;
}
bool passThrough() const
{
return passThrough_;
}
void appendToBuffer(trantor::MsgBuffer *output) const;
virtual SessionPtr session() const override
@ -460,6 +470,7 @@ class HttpRequestImpl : public HttpRequest
std::string expect_;
bool keepAlive_{true};
bool isOnSecureConnection_{false};
bool passThrough_{false};
protected:
std::string content_;

View File

@ -214,72 +214,8 @@ void HttpResponseImpl::makeHeaderString(
headerStringPtr->append(statusMessage_.data(), statusMessage_.length());
headerStringPtr->append("\r\n");
generateBodyFromJson();
if (sendfileName_.empty())
if (!passThrough_)
{
long unsigned int bodyLength =
bodyPtr_ ? bodyPtr_->length()
: (bodyViewPtr_ ? bodyViewPtr_->length() : 0);
len = snprintf(buf, sizeof buf, "Content-Length: %lu\r\n", bodyLength);
}
else
{
struct stat filestat;
if (stat(sendfileName_.c_str(), &filestat) < 0)
{
LOG_SYSERR << sendfileName_ << " stat error";
return;
}
len = snprintf(buf,
sizeof buf,
"Content-Length: %llu\r\n",
static_cast<long long unsigned int>(filestat.st_size));
}
headerStringPtr->append(buf, len);
if (headers_.find("Connection") == headers_.end())
{
if (closeConnection_)
{
headerStringPtr->append("Connection: close\r\n");
}
else
{
// output->append("Connection: Keep-Alive\r\n");
}
}
headerStringPtr->append(contentTypeString_.data(),
contentTypeString_.length());
for (auto it = headers_.begin(); it != headers_.end(); ++it)
{
headerStringPtr->append(it->first);
headerStringPtr->append(": ");
headerStringPtr->append(it->second);
headerStringPtr->append("\r\n");
}
if (HttpAppFrameworkImpl::instance().sendServerHeader())
{
headerStringPtr->append(
HttpAppFrameworkImpl::instance().getServerHeaderString());
}
}
void HttpResponseImpl::renderToBuffer(trantor::MsgBuffer &buffer)
{
if (expriedTime_ >= 0)
{
auto strPtr = renderToString();
buffer.append(strPtr->data(), strPtr->length());
return;
}
if (!fullHeaderString_)
{
char buf[128];
auto len = snprintf(buf, sizeof buf, "HTTP/1.1 %d ", statusCode_);
buffer.append(buf, len);
if (!statusMessage_.empty())
buffer.append(statusMessage_.data(), statusMessage_.length());
buffer.append("\r\n");
generateBodyFromJson();
if (sendfileName_.empty())
{
long unsigned int bodyLength =
@ -305,19 +241,101 @@ void HttpResponseImpl::renderToBuffer(trantor::MsgBuffer &buffer)
static_cast<long long unsigned int>(filestat.st_size));
}
buffer.append(buf, len);
headerStringPtr->append(buf, len);
if (headers_.find("Connection") == headers_.end())
{
if (closeConnection_)
{
buffer.append("Connection: close\r\n");
headerStringPtr->append("Connection: close\r\n");
}
else
{
// output->append("Connection: Keep-Alive\r\n");
}
}
buffer.append(contentTypeString_.data(), contentTypeString_.length());
headerStringPtr->append(contentTypeString_.data(),
contentTypeString_.length());
if (HttpAppFrameworkImpl::instance().sendServerHeader())
{
headerStringPtr->append(
HttpAppFrameworkImpl::instance().getServerHeaderString());
}
}
for (auto it = headers_.begin(); it != headers_.end(); ++it)
{
headerStringPtr->append(it->first);
headerStringPtr->append(": ");
headerStringPtr->append(it->second);
headerStringPtr->append("\r\n");
}
}
void HttpResponseImpl::renderToBuffer(trantor::MsgBuffer &buffer)
{
if (expriedTime_ >= 0)
{
auto strPtr = renderToString();
buffer.append(strPtr->data(), strPtr->length());
return;
}
if (!fullHeaderString_)
{
char buf[128];
auto len = snprintf(buf, sizeof buf, "HTTP/1.1 %d ", statusCode_);
buffer.append(buf, len);
if (!statusMessage_.empty())
buffer.append(statusMessage_.data(), statusMessage_.length());
buffer.append("\r\n");
generateBodyFromJson();
if (!passThrough_)
{
if (sendfileName_.empty())
{
long unsigned int bodyLength =
bodyPtr_ ? bodyPtr_->length()
: (bodyViewPtr_ ? bodyViewPtr_->length() : 0);
len = snprintf(buf,
sizeof buf,
"Content-Length: %lu\r\n",
bodyLength);
}
else
{
struct stat filestat;
if (stat(sendfileName_.c_str(), &filestat) < 0)
{
LOG_SYSERR << sendfileName_ << " stat error";
return;
}
len = snprintf(buf,
sizeof buf,
"Content-Length: %llu\r\n",
static_cast<long long unsigned int>(
filestat.st_size));
}
buffer.append(buf, len);
if (headers_.find("Connection") == headers_.end())
{
if (closeConnection_)
{
buffer.append("Connection: close\r\n");
}
else
{
// output->append("Connection: Keep-Alive\r\n");
}
}
buffer.append(contentTypeString_.data(),
contentTypeString_.length());
if (HttpAppFrameworkImpl::instance().sendServerHeader())
{
buffer.append(
HttpAppFrameworkImpl::instance().getServerHeaderString());
}
}
for (auto it = headers_.begin(); it != headers_.end(); ++it)
{
buffer.append(it->first);
@ -325,11 +343,6 @@ void HttpResponseImpl::renderToBuffer(trantor::MsgBuffer &buffer)
buffer.append(it->second);
buffer.append("\r\n");
}
if (HttpAppFrameworkImpl::instance().sendServerHeader())
{
buffer.append(
HttpAppFrameworkImpl::instance().getServerHeaderString());
}
}
else
{
@ -346,7 +359,8 @@ void HttpResponseImpl::renderToBuffer(trantor::MsgBuffer &buffer)
}
// output Date header
if (drogon::HttpAppFrameworkImpl::instance().sendDateHeader())
if (!passThrough_ &&
drogon::HttpAppFrameworkImpl::instance().sendDateHeader())
{
buffer.append("Date: ");
buffer.append(utils::getHttpFullDate(trantor::Date::date()),
@ -366,7 +380,8 @@ std::shared_ptr<std::string> HttpResponseImpl::renderToString()
{
if (expriedTime_ >= 0)
{
if (drogon::HttpAppFrameworkImpl::instance().sendDateHeader())
if (!passThrough_ &&
drogon::HttpAppFrameworkImpl::instance().sendDateHeader())
{
if (datePos_ != std::string::npos)
{
@ -418,7 +433,8 @@ std::shared_ptr<std::string> HttpResponseImpl::renderToString()
}
// output Date header
if (drogon::HttpAppFrameworkImpl::instance().sendDateHeader())
if (!passThrough_ &&
drogon::HttpAppFrameworkImpl::instance().sendDateHeader())
{
httpString->append("Date: ");
auto datePos = httpString->length();
@ -467,7 +483,8 @@ std::shared_ptr<std::string> HttpResponseImpl::renderHeaderForHeadMethod()
}
// output Date header
if (drogon::HttpAppFrameworkImpl::instance().sendDateHeader())
if (!passThrough_ &&
drogon::HttpAppFrameworkImpl::instance().sendDateHeader())
{
httpString->append("Date: ");
httpString->append(utils::getHttpFullDate(trantor::Date::date()),

View File

@ -44,6 +44,10 @@ class HttpResponseImpl : public HttpResponse
contentTypeString_(webContentTypeToString(type))
{
}
virtual void setPassThrough(bool flag) override
{
passThrough_ = flag;
}
virtual HttpStatusCode statusCode() const override
{
return statusCode_;
@ -363,6 +367,7 @@ class HttpResponseImpl : public HttpResponse
ContentType contentType_{CT_TEXT_HTML};
string_view contentTypeString_{
"Content-Type: text/html; charset=utf-8\r\n"};
bool passThrough_{false};
void setContentType(const string_view &contentType)
{
contentTypeString_ = contentType;