From 4809cc950814d0a8160500fced9e0b311ea8fcfc Mon Sep 17 00:00:00 2001 From: An Tao Date: Mon, 16 Dec 2019 00:34:40 +0800 Subject: [PATCH] Modify the reverse proxy example (#319) --- README.md | 2 +- examples/simple_reverse_proxy/config.json | 14 +- .../plugins/SimpleReverseProxy.cc | 35 ++-- .../plugins/SimpleReverseProxy.h | 7 +- lib/inc/drogon/HttpRequest.h | 11 ++ lib/inc/drogon/HttpResponse.h | 11 ++ lib/src/HttpClientImpl.cc | 14 +- lib/src/HttpRequestImpl.cc | 17 +- lib/src/HttpRequestImpl.h | 11 ++ lib/src/HttpResponseImpl.cc | 171 ++++++++++-------- lib/src/HttpResponseImpl.h | 5 + 11 files changed, 175 insertions(+), 123 deletions(-) diff --git a/README.md b/README.md index 7e623919..343bd30a 100755 --- a/README.md +++ b/README.md @@ -180,4 +180,4 @@ As you can see, users can use the `HttpController` to map paths and parameters a In addition, you can also find that all handler interfaces are in asynchronous mode, where the response is returned by a callback object. This design is for performance reasons because in asynchronous mode the drogon application can handle a large number of concurrent requests with a small number of threads. -After compiling all of the above source files, we get a very simple web application. This is a good start. **for more information, please visit the [wiki](https://github.com/an-tao/drogon/wiki/01-Overview) site** +After compiling all of the above source files, we get a very simple web application. This is a good start. **for more information, please visit the [wiki](https://github.com/an-tao/drogon/wiki/01-Overview) or the [doxiz](https://doxiz.com/drogon/master/overview/)** diff --git a/examples/simple_reverse_proxy/config.json b/examples/simple_reverse_proxy/config.json index ae4dbd8d..eb7d383e 100644 --- a/examples/simple_reverse_proxy/config.json +++ b/examples/simple_reverse_proxy/config.json @@ -18,7 +18,7 @@ "app": { //threads_num: The number of IO threads, 1 by default, if the value is set to 0, the number of threads //is the number of CPU cores - "threads_num": 0, + "threads_num": 2, //enable_session: False by default "enable_session": false, "session_timeout": 0, @@ -141,14 +141,14 @@ //config: The configuration of the plugin. This json object is the parameter to initialize the plugin. //It can be commented out "config": { + // The pipelining depth of HTTP clients. "pipelining": 16, - "backends": ["http://127.0.0.1:8848", "https://localhost:8849"], - "same_client_to_same_backend": { - "enabled": false, - "cache_timeout": 3600 - } + "backends": ["http://127.0.0.1:8848"], + "same_client_to_same_backend": false, + //The number of connections created by proxy for each backend in very event loop (IO thread). + "connection_factor": 1 } }], //custom_config: custom configuration for users. This object can be get by the app().getCustomConfig() method. "custom_config": {} -} \ No newline at end of file +} diff --git a/examples/simple_reverse_proxy/plugins/SimpleReverseProxy.cc b/examples/simple_reverse_proxy/plugins/SimpleReverseProxy.cc index 63e0e2b2..50f4ee5a 100644 --- a/examples/simple_reverse_proxy/plugins/SimpleReverseProxy.cc +++ b/examples/simple_reverse_proxy/plugins/SimpleReverseProxy.cc @@ -30,23 +30,17 @@ void SimpleReverseProxy::initAndStart(const Json::Value &config) abort(); } pipeliningDepth_ = config.get("pipelining", 0).asInt(); - if (config.isMember("same_client_to_same_backend")) + sameClientToSameBackend_ = + config.get("same_client_to_same_backend", false).asBool(); + connectionFactor_ = config.get("connection_factor", 1).asInt(); + if (connectionFactor_ == 0 || connectionFactor_ > 100) { - sameClientToSameBackend_ = config["same_client_to_same_backend"] - .get("enabled", false) - .asBool(); - cacheTimeout_ = config["same_client_to_same_backend"] - .get("cache_timeout", 0) - .asInt(); - } - if (sameClientToSameBackend_) - { - clientMap_ = std::make_unique>( - app().getLoop()); + LOG_ERROR << "invalid number of connection factor"; + abort(); } clients_.init( [this](std::vector &clients, size_t ioLoopIndex) { - clients.resize(backendAddrs_.size()); + clients.resize(backendAddrs_.size() * connectionFactor_); }); clientIndex_.init( [this](size_t &index, size_t ioLoopIndex) { index = ioLoopIndex; }); @@ -69,13 +63,10 @@ void SimpleReverseProxy::preRouting(const HttpRequestPtr &req, auto &clientsVector = *clients_; if (sameClientToSameBackend_) { - std::lock_guard lock(mapMutex_); - auto ip = req->peerAddr().toIp(); - if (!clientMap_->findAndFetch(ip, index)) - { - index = (++*clientIndex_) % clientsVector.size(); - clientMap_->insert(ip, index, cacheTimeout_); - } + index = std::hash{}(req->getPeerAddr().ipNetEndian()) % + clientsVector.size(); + index = (index + (++(*clientIndex_)) * backendAddrs_.size()) % + clientsVector.size(); } else { @@ -84,17 +75,19 @@ void SimpleReverseProxy::preRouting(const HttpRequestPtr &req, auto &clientPtr = clientsVector[index]; if (!clientPtr) { - auto &addr = backendAddrs_[index]; + auto &addr = backendAddrs_[index % backendAddrs_.size()]; clientPtr = HttpClient::newHttpClient( addr, trantor::EventLoop::getEventLoopOfCurrentThread()); clientPtr->setPipeliningDepth(pipeliningDepth_); } + req->setPassThrough(true); clientPtr->sendRequest( req, [callback = std::move(callback)](ReqResult result, const HttpResponsePtr &resp) { if (result == ReqResult::Ok) { + resp->setPassThrough(true); callback(resp); } else diff --git a/examples/simple_reverse_proxy/plugins/SimpleReverseProxy.h b/examples/simple_reverse_proxy/plugins/SimpleReverseProxy.h index 3d4d4fb7..7900cec8 100644 --- a/examples/simple_reverse_proxy/plugins/SimpleReverseProxy.h +++ b/examples/simple_reverse_proxy/plugins/SimpleReverseProxy.h @@ -28,17 +28,16 @@ class SimpleReverseProxy : public drogon::Plugin virtual void shutdown() override; private: - // Create a HTTP client for every backend in every IO event loop. + // Create 'connectionFactor_' HTTP clients for every backend in every IO + // event loop. drogon::IOThreadStorage> clients_; drogon::IOThreadStorage clientIndex_{0}; std::vector backendAddrs_; bool sameClientToSameBackend_{false}; - size_t cacheTimeout_{0}; - std::mutex mapMutex_; size_t pipeliningDepth_{0}; void preRouting(const drogon::HttpRequestPtr &, drogon::AdviceCallback &&, drogon::AdviceChainCallback &&); - std::unique_ptr> clientMap_; + size_t connectionFactor_{1}; }; } // namespace my_plugin diff --git a/lib/inc/drogon/HttpRequest.h b/lib/inc/drogon/HttpRequest.h index e9fb6700..6e2dfbb0 100644 --- a/lib/inc/drogon/HttpRequest.h +++ b/lib/inc/drogon/HttpRequest.h @@ -321,6 +321,17 @@ class HttpRequest virtual void addCookie(const std::string &key, const std::string &value) = 0; + /** + * @brief Set the request object to the pass-through mode or not. It's not + * by default when a new request object is created. + * In pass-through mode, no addtional headers (including server, date, + * content-type and content-length, etc.) are added to the request. This + * mode is useful for some applications such as a proxy. + * + * @param flag + */ + virtual void setPassThrough(bool flag) = 0; + /// The following methods are a series of factory methods that help users /// create request objects. diff --git a/lib/inc/drogon/HttpResponse.h b/lib/inc/drogon/HttpResponse.h index 10b7143a..64ee7a44 100644 --- a/lib/inc/drogon/HttpResponse.h +++ b/lib/inc/drogon/HttpResponse.h @@ -278,6 +278,17 @@ class HttpResponse return jsonObject(); } + /** + * @brief Set the reponse object to the pass-through mode or not. It's not + * by default when a new response object is created. + * In pass-through mode, no addtional headers (including server, date, + * content-type and content-length, etc.) are added to the response. This + * mode is useful for some applications such as a proxy. + * + * @param flag + */ + virtual void setPassThrough(bool flag) = 0; + /* The following methods are a series of factory methods that help users * create response objects. */ diff --git a/lib/src/HttpClientImpl.cc b/lib/src/HttpClientImpl.cc index 81e67b5b..08c70145 100644 --- a/lib/src/HttpClientImpl.cc +++ b/lib/src/HttpClientImpl.cc @@ -213,14 +213,16 @@ void HttpClientImpl::sendRequestInLoop(const drogon::HttpRequestPtr &req, const drogon::HttpReqCallback &callback) { loop_->assertInLoopThread(); - req->addHeader("Connection", "Keep-Alive"); - // req->addHeader("Accept", "*/*"); - if (!domain_.empty()) + if (!static_cast(req.get())->passThrough()) { - req->addHeader("Host", domain_); + req->addHeader("Connection", "Keep-Alive"); + // req->addHeader("Accept", "*/*"); + if (!domain_.empty()) + { + req->addHeader("Host", domain_); + } + req->addHeader("User-Agent", "DrogonClient"); } - req->addHeader("User-Agent", "DrogonClient"); - for (auto &cookie : validCookies_) { if ((cookie.expiresDate().microSecondsSinceEpoch() == 0 || diff --git a/lib/src/HttpRequestImpl.cc b/lib/src/HttpRequestImpl.cc index 1d49a827..6c266516 100644 --- a/lib/src/HttpRequestImpl.cc +++ b/lib/src/HttpRequestImpl.cc @@ -185,7 +185,9 @@ void HttpRequestImpl::appendToBuffer(trantor::MsgBuffer *output) const } std::string content; - if (!parameters_.empty() && contentType_ != CT_MULTIPART_FORM_DATA) + + if (!passThrough_ && + (!parameters_.empty() && contentType_ != CT_MULTIPART_FORM_DATA)) { for (auto const &p : parameters_) { @@ -221,9 +223,9 @@ void HttpRequestImpl::appendToBuffer(trantor::MsgBuffer *output) const << "You can't set parameters in the query string when the " "request content type is JSON and http method " "is POST or PUT"; - LOG_ERROR - << "Please put these parameters into the path or into the json " - "string"; + LOG_ERROR << "Please put these parameters into the path or " + "into the json " + "string"; content.clear(); } } @@ -242,7 +244,8 @@ void HttpRequestImpl::appendToBuffer(trantor::MsgBuffer *output) const return; } output->append("\r\n"); - if (contentType_ == CT_MULTIPART_FORM_DATA) + + if (!passThrough_ && contentType_ == CT_MULTIPART_FORM_DATA) { auto mReq = dynamic_cast(this); if (mReq) @@ -291,7 +294,7 @@ void HttpRequestImpl::appendToBuffer(trantor::MsgBuffer *output) const } } assert(!(!content.empty() && !content_.empty())); - if (!content.empty() || !content_.empty()) + if (!passThrough_ && (!content.empty() || !content_.empty())) { char buf[64]; auto len = snprintf(buf, @@ -306,7 +309,7 @@ void HttpRequestImpl::appendToBuffer(trantor::MsgBuffer *output) const output->append(type.data(), type.length()); } } - if (!contentTypeString_.empty()) + if (!passThrough_ && !contentTypeString_.empty()) { output->append(contentTypeString_); } diff --git a/lib/src/HttpRequestImpl.h b/lib/src/HttpRequestImpl.h index 0c327173..cf8cb15e 100644 --- a/lib/src/HttpRequestImpl.h +++ b/lib/src/HttpRequestImpl.h @@ -330,6 +330,16 @@ class HttpRequestImpl : public HttpRequest cookies_[key] = value; } + virtual void setPassThrough(bool flag) override + { + passThrough_ = flag; + } + + bool passThrough() const + { + return passThrough_; + } + void appendToBuffer(trantor::MsgBuffer *output) const; virtual SessionPtr session() const override @@ -460,6 +470,7 @@ class HttpRequestImpl : public HttpRequest std::string expect_; bool keepAlive_{true}; bool isOnSecureConnection_{false}; + bool passThrough_{false}; protected: std::string content_; diff --git a/lib/src/HttpResponseImpl.cc b/lib/src/HttpResponseImpl.cc index bc0bfaed..438500ae 100644 --- a/lib/src/HttpResponseImpl.cc +++ b/lib/src/HttpResponseImpl.cc @@ -214,72 +214,8 @@ void HttpResponseImpl::makeHeaderString( headerStringPtr->append(statusMessage_.data(), statusMessage_.length()); headerStringPtr->append("\r\n"); generateBodyFromJson(); - if (sendfileName_.empty()) + if (!passThrough_) { - long unsigned int bodyLength = - bodyPtr_ ? bodyPtr_->length() - : (bodyViewPtr_ ? bodyViewPtr_->length() : 0); - len = snprintf(buf, sizeof buf, "Content-Length: %lu\r\n", bodyLength); - } - else - { - struct stat filestat; - if (stat(sendfileName_.c_str(), &filestat) < 0) - { - LOG_SYSERR << sendfileName_ << " stat error"; - return; - } - len = snprintf(buf, - sizeof buf, - "Content-Length: %llu\r\n", - static_cast(filestat.st_size)); - } - - headerStringPtr->append(buf, len); - if (headers_.find("Connection") == headers_.end()) - { - if (closeConnection_) - { - headerStringPtr->append("Connection: close\r\n"); - } - else - { - // output->append("Connection: Keep-Alive\r\n"); - } - } - headerStringPtr->append(contentTypeString_.data(), - contentTypeString_.length()); - for (auto it = headers_.begin(); it != headers_.end(); ++it) - { - headerStringPtr->append(it->first); - headerStringPtr->append(": "); - headerStringPtr->append(it->second); - headerStringPtr->append("\r\n"); - } - if (HttpAppFrameworkImpl::instance().sendServerHeader()) - { - headerStringPtr->append( - HttpAppFrameworkImpl::instance().getServerHeaderString()); - } -} -void HttpResponseImpl::renderToBuffer(trantor::MsgBuffer &buffer) -{ - if (expriedTime_ >= 0) - { - auto strPtr = renderToString(); - buffer.append(strPtr->data(), strPtr->length()); - return; - } - - if (!fullHeaderString_) - { - char buf[128]; - auto len = snprintf(buf, sizeof buf, "HTTP/1.1 %d ", statusCode_); - buffer.append(buf, len); - if (!statusMessage_.empty()) - buffer.append(statusMessage_.data(), statusMessage_.length()); - buffer.append("\r\n"); - generateBodyFromJson(); if (sendfileName_.empty()) { long unsigned int bodyLength = @@ -305,19 +241,101 @@ void HttpResponseImpl::renderToBuffer(trantor::MsgBuffer &buffer) static_cast(filestat.st_size)); } - buffer.append(buf, len); + headerStringPtr->append(buf, len); if (headers_.find("Connection") == headers_.end()) { if (closeConnection_) { - buffer.append("Connection: close\r\n"); + headerStringPtr->append("Connection: close\r\n"); } else { // output->append("Connection: Keep-Alive\r\n"); } } - buffer.append(contentTypeString_.data(), contentTypeString_.length()); + headerStringPtr->append(contentTypeString_.data(), + contentTypeString_.length()); + if (HttpAppFrameworkImpl::instance().sendServerHeader()) + { + headerStringPtr->append( + HttpAppFrameworkImpl::instance().getServerHeaderString()); + } + } + + for (auto it = headers_.begin(); it != headers_.end(); ++it) + { + headerStringPtr->append(it->first); + headerStringPtr->append(": "); + headerStringPtr->append(it->second); + headerStringPtr->append("\r\n"); + } +} +void HttpResponseImpl::renderToBuffer(trantor::MsgBuffer &buffer) +{ + if (expriedTime_ >= 0) + { + auto strPtr = renderToString(); + buffer.append(strPtr->data(), strPtr->length()); + return; + } + + if (!fullHeaderString_) + { + char buf[128]; + auto len = snprintf(buf, sizeof buf, "HTTP/1.1 %d ", statusCode_); + buffer.append(buf, len); + if (!statusMessage_.empty()) + buffer.append(statusMessage_.data(), statusMessage_.length()); + buffer.append("\r\n"); + generateBodyFromJson(); + if (!passThrough_) + { + if (sendfileName_.empty()) + { + long unsigned int bodyLength = + bodyPtr_ ? bodyPtr_->length() + : (bodyViewPtr_ ? bodyViewPtr_->length() : 0); + len = snprintf(buf, + sizeof buf, + "Content-Length: %lu\r\n", + bodyLength); + } + else + { + struct stat filestat; + if (stat(sendfileName_.c_str(), &filestat) < 0) + { + LOG_SYSERR << sendfileName_ << " stat error"; + return; + } + len = snprintf(buf, + sizeof buf, + "Content-Length: %llu\r\n", + static_cast( + filestat.st_size)); + } + + buffer.append(buf, len); + if (headers_.find("Connection") == headers_.end()) + { + if (closeConnection_) + { + buffer.append("Connection: close\r\n"); + } + else + { + // output->append("Connection: Keep-Alive\r\n"); + } + } + buffer.append(contentTypeString_.data(), + contentTypeString_.length()); + if (HttpAppFrameworkImpl::instance().sendServerHeader()) + { + buffer.append( + HttpAppFrameworkImpl::instance().getServerHeaderString()); + } + } + for (auto it = headers_.begin(); it != headers_.end(); ++it) { buffer.append(it->first); @@ -325,11 +343,6 @@ void HttpResponseImpl::renderToBuffer(trantor::MsgBuffer &buffer) buffer.append(it->second); buffer.append("\r\n"); } - if (HttpAppFrameworkImpl::instance().sendServerHeader()) - { - buffer.append( - HttpAppFrameworkImpl::instance().getServerHeaderString()); - } } else { @@ -346,7 +359,8 @@ void HttpResponseImpl::renderToBuffer(trantor::MsgBuffer &buffer) } // output Date header - if (drogon::HttpAppFrameworkImpl::instance().sendDateHeader()) + if (!passThrough_ && + drogon::HttpAppFrameworkImpl::instance().sendDateHeader()) { buffer.append("Date: "); buffer.append(utils::getHttpFullDate(trantor::Date::date()), @@ -366,7 +380,8 @@ std::shared_ptr HttpResponseImpl::renderToString() { if (expriedTime_ >= 0) { - if (drogon::HttpAppFrameworkImpl::instance().sendDateHeader()) + if (!passThrough_ && + drogon::HttpAppFrameworkImpl::instance().sendDateHeader()) { if (datePos_ != std::string::npos) { @@ -418,7 +433,8 @@ std::shared_ptr HttpResponseImpl::renderToString() } // output Date header - if (drogon::HttpAppFrameworkImpl::instance().sendDateHeader()) + if (!passThrough_ && + drogon::HttpAppFrameworkImpl::instance().sendDateHeader()) { httpString->append("Date: "); auto datePos = httpString->length(); @@ -467,7 +483,8 @@ std::shared_ptr HttpResponseImpl::renderHeaderForHeadMethod() } // output Date header - if (drogon::HttpAppFrameworkImpl::instance().sendDateHeader()) + if (!passThrough_ && + drogon::HttpAppFrameworkImpl::instance().sendDateHeader()) { httpString->append("Date: "); httpString->append(utils::getHttpFullDate(trantor::Date::date()), diff --git a/lib/src/HttpResponseImpl.h b/lib/src/HttpResponseImpl.h index 27570fe0..036dc12d 100644 --- a/lib/src/HttpResponseImpl.h +++ b/lib/src/HttpResponseImpl.h @@ -44,6 +44,10 @@ class HttpResponseImpl : public HttpResponse contentTypeString_(webContentTypeToString(type)) { } + virtual void setPassThrough(bool flag) override + { + passThrough_ = flag; + } virtual HttpStatusCode statusCode() const override { return statusCode_; @@ -363,6 +367,7 @@ class HttpResponseImpl : public HttpResponse ContentType contentType_{CT_TEXT_HTML}; string_view contentTypeString_{ "Content-Type: text/html; charset=utf-8\r\n"}; + bool passThrough_{false}; void setContentType(const string_view &contentType) { contentTypeString_ = contentType;