From 62fad73bc91285cb1de4a3bce4a8084416d86cb7 Mon Sep 17 00:00:00 2001 From: antao Date: Fri, 5 Apr 2019 01:50:29 +0800 Subject: [PATCH] Enhance HttpClient to support pipelining --- drogon_ctl/create_controller.cc | 9 +- .../simple_example_test/HttpPipelineTest.cc | 16 +- examples/simple_example_test/main.cc | 54 ++++++- lib/inc/drogon/HttpClient.h | 10 +- lib/src/HttpClientImpl.cc | 153 +++++++++++------- lib/src/HttpClientImpl.h | 10 +- 6 files changed, 180 insertions(+), 72 deletions(-) diff --git a/drogon_ctl/create_controller.cc b/drogon_ctl/create_controller.cc index 82234462..3f2a615e 100755 --- a/drogon_ctl/create_controller.cc +++ b/drogon_ctl/create_controller.cc @@ -77,7 +77,6 @@ void create_controller::newSimpleControllerHeaderFile(std::ofstream &file, const file << indent << "class " << class_name << ":public drogon::HttpSimpleController<" << class_name << ">\n"; file << indent << "{\n"; file << indent << "public:\n"; - //TestController(){} file << indent << " virtual void asyncHandleHttpRequest(const HttpRequestPtr& req,const std::function & callback) override;\n"; file << indent << " PATH_LIST_BEGIN\n"; @@ -132,11 +131,6 @@ void create_controller::newWebsockControllerHeaderFile(std::ofstream &file, cons file << indent << "class " << class_name << ":public drogon::WebSocketController<" << class_name << ">\n"; file << indent << "{\n"; file << indent << "public:\n"; - //TestController(){} - // virtual void handleNewMessage(const WebSocketConnectionPtr&, - // trantor::MsgBuffer*)=0; - // //on new connection or after disconnect - // virtual void handleConnection(const WebSocketConnectionPtr&)=0; file << indent << " virtual void handleNewMessage(const WebSocketConnectionPtr&,\n"; file << indent << " std::string &&)override;\n"; file << indent << " virtual void handleNewConnection(const HttpRequestPtr &,\n"; @@ -211,6 +205,9 @@ void create_controller::newHttpControllerHeaderFile(std::ofstream &file, const s file << indent << "//METHOD_ADD(" << class_name << "::your_method_name,\"/{1}/{2}/list\",Get);" "//path is " << namepace_path << class_name << "/{arg1}/{arg2}/list\n"; + file << indent << "//ADD_METHOD_TO(" << class_name << "::your_method_name,\"/absolute/path/{1}/{2}/list\",Get);" + "//path is " + << namepace_path << "/absolute/path/{arg1}/{arg2}/list\n"; file << indent << "\n"; file << indent << "METHOD_LIST_END\n"; file << indent << "//your declaration of processing function maybe like this:\n"; diff --git a/examples/simple_example_test/HttpPipelineTest.cc b/examples/simple_example_test/HttpPipelineTest.cc index e737acb6..346cfca4 100755 --- a/examples/simple_example_test/HttpPipelineTest.cc +++ b/examples/simple_example_test/HttpPipelineTest.cc @@ -16,8 +16,22 @@ int main() if (conn->connected()) { std::string str = "GET /pipe HTTP/1.1\r\n" + "\r\n" + "GET /pipe HTTP/1.1\r\n" + "\r\n" + "GET /pipe HTTP/1.1\r\n" + "\r\n" + "GET /pipe HTTP/1.1\r\n" + "\r\n" + "GET /pipe HTTP/1.1\r\n" + "\r\n" + "GET /pipe HTTP/1.1\r\n" + "\r\n" + "GET /pipe HTTP/1.1\r\n" + "\r\n" + "GET /pipe HTTP/1.1\r\n" "\r\n"; - for (int i = 0; i < 32; i++) + for (int i = 0; i < 4; i++) conn->send(str); } else diff --git a/examples/simple_example_test/main.cc b/examples/simple_example_test/main.cc index f55c4932..3cb54f93 100644 --- a/examples/simple_example_test/main.cc +++ b/examples/simple_example_test/main.cc @@ -276,7 +276,7 @@ void doTest(const HttpClientPtr &client, std::promise &pro, bool isHttps = req = HttpRequest::newHttpRequest(); req->setMethod(drogon::Get); req->setPath("/api/v1/apitest/3.14/List"); - req->setParameter("P2","1234"); + req->setParameter("P2", "1234"); client->sendRequest(req, [=](ReqResult result, const HttpResponsePtr &resp) { if (result == ReqResult::Ok) { @@ -377,7 +377,7 @@ void doTest(const HttpClientPtr &client, std::promise &pro, bool isHttps = req = HttpRequest::newHttpRequest(); req->setMethod(drogon::Get); req->setPath("/api/v1/handle11/11/22/?p3=33"); - req->setParameter("p4","44"); + req->setParameter("p4", "44"); client->sendRequest(req, [=](ReqResult result, const HttpResponsePtr &resp) { if (result == ReqResult::Ok) { @@ -582,6 +582,54 @@ void doTest(const HttpClientPtr &client, std::promise &pro, bool isHttps = exit(1); } }); + /// Test pipelining + // req = HttpRequest::newHttpRequest(); + // req->setPath("/pipe"); + // auto flag = std::make_shared(0); + // client->sendRequest(req, [=](ReqResult result, const HttpResponsePtr &resp) { + // if (result == ReqResult::Ok) + // { + // if (resp->statusCode() == k200OK && *flag == 0) + // { + // ++(*flag); + // outputGood(req, isHttps); + // } + // else + // { + // LOG_DEBUG << resp->getBody(); + // LOG_ERROR << "Error!"; + // exit(1); + // } + // } + // else + // { + // LOG_ERROR << "Error!"; + // exit(1); + // } + // }); + // req = HttpRequest::newHttpRequest(); + // req->setPath("/pipe"); + // client->sendRequest(req, [=](ReqResult result, const HttpResponsePtr &resp) { + // if (result == ReqResult::Ok) + // { + // if (resp->statusCode() == k200OK && *flag == 1) + // { + // ++(*flag); + // outputGood(req, isHttps); + // } + // else + // { + // LOG_DEBUG << resp->getBody(); + // LOG_ERROR << "Error!"; + // exit(1); + // } + // } + // else + // { + // LOG_ERROR << "Error!"; + // exit(1); + // } + // }); /// Test form post req = HttpRequest::newHttpFormPostRequest(); req->setPath("/api/v1/apitest/form"); @@ -653,6 +701,7 @@ void doTest(const HttpClientPtr &client, std::promise &pro, bool isHttps = else { LOG_DEBUG << resp->getBody().length(); + LOG_DEBUG << resp->getBody(); LOG_ERROR << "Error!"; exit(1); } @@ -679,6 +728,7 @@ int main(int argc, char *argv[]) { std::promise pro1; auto client = HttpClient::newHttpClient("127.0.0.1", 8848, false, loop[0].getLoop()); + client->setPipeliningDepth(10); doTest(client, pro1); #ifdef USE_OPENSSL std::promise pro2; diff --git a/lib/inc/drogon/HttpClient.h b/lib/inc/drogon/HttpClient.h index 30334ee3..fa2808c1 100644 --- a/lib/inc/drogon/HttpClient.h +++ b/lib/inc/drogon/HttpClient.h @@ -60,6 +60,14 @@ class HttpClient : public trantor::NonCopyable */ virtual void sendRequest(const HttpRequestPtr &req, const HttpReqCallback &callback) = 0; virtual void sendRequest(const HttpRequestPtr &req, HttpReqCallback &&callback) = 0; + + /// Set the pipelining depth, which is the number of requests that are not responding. + /** + * If this method is not called, the default depth value is 0 which means the pipelining is disabled. + * For details about pipelining, see rfc2616-8.1.2.2 + */ + virtual void setPipeliningDepth(size_t depth) = 0; + /// Use ip and port to connect to server /** * If useSSL is set to true, the client @@ -75,7 +83,7 @@ class HttpClient : public trantor::NonCopyable uint16_t port, bool useSSL = false, trantor::EventLoop *loop = nullptr); - + /// Get the event loop of the client; virtual trantor::EventLoop *loop() = 0; diff --git a/lib/src/HttpClientImpl.cc b/lib/src/HttpClientImpl.cc index 1e50f462..f0196fe8 100644 --- a/lib/src/HttpClientImpl.cc +++ b/lib/src/HttpClientImpl.cc @@ -162,9 +162,12 @@ void HttpClientImpl::sendRequestInLoop(const drogon::HttpRequestPtr &req, _tcpClient->enableSSL(); } #endif - std::weak_ptr weakPtr = shared_from_this(); - assert(_reqAndCallbacks.empty()); - _reqAndCallbacks.push(std::make_pair(req, callback)); + auto thisPtr = shared_from_this(); + std::weak_ptr weakPtr = thisPtr; + assert(_requestsBuffer.empty()); + _requestsBuffer.push({req, [thisPtr, callback](ReqResult result, const HttpResponsePtr &response) { + callback(result, response); + }}); _tcpClient->setConnectionCallback([weakPtr](const trantor::TcpConnectionPtr &connPtr) { auto thisPtr = weakPtr.lock(); if (!thisPtr) @@ -174,19 +177,18 @@ void HttpClientImpl::sendRequestInLoop(const drogon::HttpRequestPtr &req, connPtr->setContext(HttpResponseParser(connPtr)); //send request; LOG_TRACE << "Connection established!"; - auto req = thisPtr->_reqAndCallbacks.front().first; - thisPtr->sendReq(connPtr, req); + while (thisPtr->_pipeliningCallbacks.size() <= thisPtr->_pipeliningDepth && + !thisPtr->_requestsBuffer.empty()) + { + thisPtr->sendReq(connPtr, thisPtr->_requestsBuffer.front().first); + thisPtr->_pipeliningCallbacks.push(std::move(thisPtr->_requestsBuffer.front().second)); + thisPtr->_requestsBuffer.pop(); + } } else { LOG_TRACE << "connection disconnect"; - while (!(thisPtr->_reqAndCallbacks.empty())) - { - auto reqCallback = thisPtr->_reqAndCallbacks.front().second; - thisPtr->_reqAndCallbacks.pop(); - reqCallback(ReqResult::NetworkFailure, HttpResponse::newHttpResponse()); - } - thisPtr->_tcpClient.reset(); + thisPtr->onError(ReqResult::NetworkFailure); } }); _tcpClient->setConnectionErrorCallback([weakPtr]() { @@ -194,13 +196,7 @@ void HttpClientImpl::sendRequestInLoop(const drogon::HttpRequestPtr &req, if (!thisPtr) return; //can't connect to server - while (!(thisPtr->_reqAndCallbacks.empty())) - { - auto reqCallback = thisPtr->_reqAndCallbacks.front().second; - thisPtr->_reqAndCallbacks.pop(); - reqCallback(ReqResult::BadServerAddress, HttpResponse::newHttpResponse()); - } - thisPtr->_tcpClient.reset(); + thisPtr->onError(ReqResult::BadServerAddress); }); _tcpClient->setMessageCallback([weakPtr](const trantor::TcpConnectionPtr &connPtr, trantor::MsgBuffer *msg) { auto thisPtr = weakPtr.lock(); @@ -222,18 +218,29 @@ void HttpClientImpl::sendRequestInLoop(const drogon::HttpRequestPtr &req, { //send request; auto connPtr = _tcpClient->connection(); + auto thisPtr = shared_from_this(); if (connPtr && connPtr->connected()) { - if (_reqAndCallbacks.empty()) + if (_pipeliningCallbacks.size() <= _pipeliningDepth && _requestsBuffer.empty()) { sendReq(connPtr, req); + _pipeliningCallbacks.push([thisPtr, callback](ReqResult result, const HttpResponsePtr &response) { + callback(result, response); + }); + } + else + { + _requestsBuffer.push({req, [thisPtr, callback](ReqResult result, const HttpResponsePtr &response) { + callback(result, response); + }}); } } - auto thisPtr = shared_from_this(); - _reqAndCallbacks.push(std::make_pair(req, [thisPtr, callback](ReqResult result, const HttpResponsePtr &response) { - //thisPtr.reset(); - callback(result, response); - })); + else + { + _requestsBuffer.push({req, [thisPtr, callback](ReqResult result, const HttpResponsePtr &response) { + callback(result, response); + }}); + } } } @@ -253,50 +260,57 @@ void HttpClientImpl::onRecvMessage(const trantor::TcpConnectionPtr &connPtr, tra HttpResponseParser *responseParser = any_cast(connPtr->getMutableContext()); //LOG_TRACE << "###:" << msg->readableBytes(); - if (!responseParser->parseResponse(msg)) + while (msg->readableBytes() > 0) { - assert(!_reqAndCallbacks.empty()); - auto cb = _reqAndCallbacks.front().second; - cb(ReqResult::BadResponse, HttpResponse::newHttpResponse()); - _reqAndCallbacks.pop(); - - _tcpClient.reset(); - return; - } - - if (responseParser->gotAll()) - { - auto resp = responseParser->responseImpl(); - responseParser->reset(); - - assert(!_reqAndCallbacks.empty()); - - auto &type = resp->getHeaderBy("content-type"); - if (type.find("application/json") != std::string::npos) + if (!responseParser->parseResponse(msg)) { - resp->parseJson(); + assert(!_pipeliningCallbacks.empty()); + onError(ReqResult::BadResponse); + return; } - - if (resp->getHeaderBy("content-encoding") == "gzip") + if (responseParser->gotAll()) { - resp->gunzip(); - } - auto &cb = _reqAndCallbacks.front().second; - cb(ReqResult::Ok, resp); - _reqAndCallbacks.pop(); + auto resp = responseParser->responseImpl(); + responseParser->reset(); - LOG_TRACE << "req buffer size=" << _reqAndCallbacks.size(); - if (!_reqAndCallbacks.empty()) - { - auto req = _reqAndCallbacks.front().first; - sendReq(connPtr, req); + assert(!_pipeliningCallbacks.empty()); + + auto &type = resp->getHeaderBy("content-type"); + if (type.find("application/json") != std::string::npos) + { + resp->parseJson(); + } + + if (resp->getHeaderBy("content-encoding") == "gzip") + { + resp->gunzip(); + } + + auto cb = std::move(_pipeliningCallbacks.front()); + _pipeliningCallbacks.pop(); + cb(ReqResult::Ok, resp); + + // LOG_TRACE << "pipelining buffer size=" << _pipeliningCallbacks.size(); + // LOG_TRACE << "requests buffer size=" << _requestsBuffer.size(); + + if (!_requestsBuffer.empty()) + { + auto &reqAndCb = _requestsBuffer.front(); + sendReq(connPtr, reqAndCb.first); + _pipeliningCallbacks.push(std::move(reqAndCb.second)); + _requestsBuffer.pop(); + } + else + { + if (resp->closeConnection() && _pipeliningCallbacks.empty()) + { + _tcpClient.reset(); + } + } } else { - if (resp->closeConnection()) - { - _tcpClient.reset(); - } + break; } } } @@ -311,3 +325,20 @@ HttpClientPtr HttpClient::newHttpClient(const std::string &hostString, trantor:: { return std::make_shared(loop == nullptr ? HttpAppFrameworkImpl::instance().getLoop() : loop, hostString); } + +void HttpClientImpl::onError(ReqResult result) +{ + while (!_pipeliningCallbacks.empty()) + { + auto cb = std::move(_pipeliningCallbacks.front()); + _pipeliningCallbacks.pop(); + cb(result, HttpResponse::newHttpResponse()); + } + while (!_requestsBuffer.empty()) + { + auto cb = std::move(_requestsBuffer.front().second); + _requestsBuffer.pop(); + cb(result, HttpResponse::newHttpResponse()); + } + _tcpClient.reset(); +} \ No newline at end of file diff --git a/lib/src/HttpClientImpl.h b/lib/src/HttpClientImpl.h index fab55b9a..5e0a1eea 100644 --- a/lib/src/HttpClientImpl.h +++ b/lib/src/HttpClientImpl.h @@ -19,6 +19,7 @@ #include #include #include + namespace drogon { class HttpClientImpl : public HttpClient, public std::enable_shared_from_this @@ -29,6 +30,10 @@ class HttpClientImpl : public HttpClient, public std::enable_shared_from_this> _reqAndCallbacks; + std::queue _pipeliningCallbacks; + std::queue> _requestsBuffer; void onRecvMessage(const trantor::TcpConnectionPtr &, trantor::MsgBuffer *); + void onError(ReqResult result); std::string _domain; + size_t _pipeliningDepth = 0; }; } // namespace drogon