Enhance HttpClient to support pipelining

This commit is contained in:
antao 2019-04-05 01:50:29 +08:00
parent d07d1ee7db
commit 62fad73bc9
6 changed files with 180 additions and 72 deletions

View File

@ -77,7 +77,6 @@ void create_controller::newSimpleControllerHeaderFile(std::ofstream &file, const
file << indent << "class " << class_name << ":public drogon::HttpSimpleController<" << class_name << ">\n";
file << indent << "{\n";
file << indent << "public:\n";
//TestController(){}
file << indent << " virtual void asyncHandleHttpRequest(const HttpRequestPtr& req,const std::function<void (const HttpResponsePtr &)> & callback) override;\n";
file << indent << " PATH_LIST_BEGIN\n";
@ -132,11 +131,6 @@ void create_controller::newWebsockControllerHeaderFile(std::ofstream &file, cons
file << indent << "class " << class_name << ":public drogon::WebSocketController<" << class_name << ">\n";
file << indent << "{\n";
file << indent << "public:\n";
//TestController(){}
// virtual void handleNewMessage(const WebSocketConnectionPtr&,
// trantor::MsgBuffer*)=0;
// //on new connection or after disconnect
// virtual void handleConnection(const WebSocketConnectionPtr&)=0;
file << indent << " virtual void handleNewMessage(const WebSocketConnectionPtr&,\n";
file << indent << " std::string &&)override;\n";
file << indent << " virtual void handleNewConnection(const HttpRequestPtr &,\n";
@ -211,6 +205,9 @@ void create_controller::newHttpControllerHeaderFile(std::ofstream &file, const s
file << indent << "//METHOD_ADD(" << class_name << "::your_method_name,\"/{1}/{2}/list\",Get);"
"//path is "
<< namepace_path << class_name << "/{arg1}/{arg2}/list\n";
file << indent << "//ADD_METHOD_TO(" << class_name << "::your_method_name,\"/absolute/path/{1}/{2}/list\",Get);"
"//path is "
<< namepace_path << "/absolute/path/{arg1}/{arg2}/list\n";
file << indent << "\n";
file << indent << "METHOD_LIST_END\n";
file << indent << "//your declaration of processing function maybe like this:\n";

View File

@ -16,8 +16,22 @@ int main()
if (conn->connected())
{
std::string str = "GET /pipe HTTP/1.1\r\n"
"\r\n"
"GET /pipe HTTP/1.1\r\n"
"\r\n"
"GET /pipe HTTP/1.1\r\n"
"\r\n"
"GET /pipe HTTP/1.1\r\n"
"\r\n"
"GET /pipe HTTP/1.1\r\n"
"\r\n"
"GET /pipe HTTP/1.1\r\n"
"\r\n"
"GET /pipe HTTP/1.1\r\n"
"\r\n"
"GET /pipe HTTP/1.1\r\n"
"\r\n";
for (int i = 0; i < 32; i++)
for (int i = 0; i < 4; i++)
conn->send(str);
}
else

View File

@ -276,7 +276,7 @@ void doTest(const HttpClientPtr &client, std::promise<int> &pro, bool isHttps =
req = HttpRequest::newHttpRequest();
req->setMethod(drogon::Get);
req->setPath("/api/v1/apitest/3.14/List");
req->setParameter("P2","1234");
req->setParameter("P2", "1234");
client->sendRequest(req, [=](ReqResult result, const HttpResponsePtr &resp) {
if (result == ReqResult::Ok)
{
@ -377,7 +377,7 @@ void doTest(const HttpClientPtr &client, std::promise<int> &pro, bool isHttps =
req = HttpRequest::newHttpRequest();
req->setMethod(drogon::Get);
req->setPath("/api/v1/handle11/11/22/?p3=33");
req->setParameter("p4","44");
req->setParameter("p4", "44");
client->sendRequest(req, [=](ReqResult result, const HttpResponsePtr &resp) {
if (result == ReqResult::Ok)
{
@ -582,6 +582,54 @@ void doTest(const HttpClientPtr &client, std::promise<int> &pro, bool isHttps =
exit(1);
}
});
/// Test pipelining
// req = HttpRequest::newHttpRequest();
// req->setPath("/pipe");
// auto flag = std::make_shared<int>(0);
// client->sendRequest(req, [=](ReqResult result, const HttpResponsePtr &resp) {
// if (result == ReqResult::Ok)
// {
// if (resp->statusCode() == k200OK && *flag == 0)
// {
// ++(*flag);
// outputGood(req, isHttps);
// }
// else
// {
// LOG_DEBUG << resp->getBody();
// LOG_ERROR << "Error!";
// exit(1);
// }
// }
// else
// {
// LOG_ERROR << "Error!";
// exit(1);
// }
// });
// req = HttpRequest::newHttpRequest();
// req->setPath("/pipe");
// client->sendRequest(req, [=](ReqResult result, const HttpResponsePtr &resp) {
// if (result == ReqResult::Ok)
// {
// if (resp->statusCode() == k200OK && *flag == 1)
// {
// ++(*flag);
// outputGood(req, isHttps);
// }
// else
// {
// LOG_DEBUG << resp->getBody();
// LOG_ERROR << "Error!";
// exit(1);
// }
// }
// else
// {
// LOG_ERROR << "Error!";
// exit(1);
// }
// });
/// Test form post
req = HttpRequest::newHttpFormPostRequest();
req->setPath("/api/v1/apitest/form");
@ -653,6 +701,7 @@ void doTest(const HttpClientPtr &client, std::promise<int> &pro, bool isHttps =
else
{
LOG_DEBUG << resp->getBody().length();
LOG_DEBUG << resp->getBody();
LOG_ERROR << "Error!";
exit(1);
}
@ -679,6 +728,7 @@ int main(int argc, char *argv[])
{
std::promise<int> pro1;
auto client = HttpClient::newHttpClient("127.0.0.1", 8848, false, loop[0].getLoop());
client->setPipeliningDepth(10);
doTest(client, pro1);
#ifdef USE_OPENSSL
std::promise<int> pro2;

View File

@ -60,6 +60,14 @@ class HttpClient : public trantor::NonCopyable
*/
virtual void sendRequest(const HttpRequestPtr &req, const HttpReqCallback &callback) = 0;
virtual void sendRequest(const HttpRequestPtr &req, HttpReqCallback &&callback) = 0;
/// Set the pipelining depth, which is the number of requests that are not responding.
/**
* If this method is not called, the default depth value is 0 which means the pipelining is disabled.
* For details about pipelining, see rfc2616-8.1.2.2
*/
virtual void setPipeliningDepth(size_t depth) = 0;
/// Use ip and port to connect to server
/**
* If useSSL is set to true, the client
@ -75,7 +83,7 @@ class HttpClient : public trantor::NonCopyable
uint16_t port,
bool useSSL = false,
trantor::EventLoop *loop = nullptr);
/// Get the event loop of the client;
virtual trantor::EventLoop *loop() = 0;

View File

@ -162,9 +162,12 @@ void HttpClientImpl::sendRequestInLoop(const drogon::HttpRequestPtr &req,
_tcpClient->enableSSL();
}
#endif
std::weak_ptr<HttpClientImpl> weakPtr = shared_from_this();
assert(_reqAndCallbacks.empty());
_reqAndCallbacks.push(std::make_pair(req, callback));
auto thisPtr = shared_from_this();
std::weak_ptr<HttpClientImpl> weakPtr = thisPtr;
assert(_requestsBuffer.empty());
_requestsBuffer.push({req, [thisPtr, callback](ReqResult result, const HttpResponsePtr &response) {
callback(result, response);
}});
_tcpClient->setConnectionCallback([weakPtr](const trantor::TcpConnectionPtr &connPtr) {
auto thisPtr = weakPtr.lock();
if (!thisPtr)
@ -174,19 +177,18 @@ void HttpClientImpl::sendRequestInLoop(const drogon::HttpRequestPtr &req,
connPtr->setContext(HttpResponseParser(connPtr));
//send request;
LOG_TRACE << "Connection established!";
auto req = thisPtr->_reqAndCallbacks.front().first;
thisPtr->sendReq(connPtr, req);
while (thisPtr->_pipeliningCallbacks.size() <= thisPtr->_pipeliningDepth &&
!thisPtr->_requestsBuffer.empty())
{
thisPtr->sendReq(connPtr, thisPtr->_requestsBuffer.front().first);
thisPtr->_pipeliningCallbacks.push(std::move(thisPtr->_requestsBuffer.front().second));
thisPtr->_requestsBuffer.pop();
}
}
else
{
LOG_TRACE << "connection disconnect";
while (!(thisPtr->_reqAndCallbacks.empty()))
{
auto reqCallback = thisPtr->_reqAndCallbacks.front().second;
thisPtr->_reqAndCallbacks.pop();
reqCallback(ReqResult::NetworkFailure, HttpResponse::newHttpResponse());
}
thisPtr->_tcpClient.reset();
thisPtr->onError(ReqResult::NetworkFailure);
}
});
_tcpClient->setConnectionErrorCallback([weakPtr]() {
@ -194,13 +196,7 @@ void HttpClientImpl::sendRequestInLoop(const drogon::HttpRequestPtr &req,
if (!thisPtr)
return;
//can't connect to server
while (!(thisPtr->_reqAndCallbacks.empty()))
{
auto reqCallback = thisPtr->_reqAndCallbacks.front().second;
thisPtr->_reqAndCallbacks.pop();
reqCallback(ReqResult::BadServerAddress, HttpResponse::newHttpResponse());
}
thisPtr->_tcpClient.reset();
thisPtr->onError(ReqResult::BadServerAddress);
});
_tcpClient->setMessageCallback([weakPtr](const trantor::TcpConnectionPtr &connPtr, trantor::MsgBuffer *msg) {
auto thisPtr = weakPtr.lock();
@ -222,18 +218,29 @@ void HttpClientImpl::sendRequestInLoop(const drogon::HttpRequestPtr &req,
{
//send request;
auto connPtr = _tcpClient->connection();
auto thisPtr = shared_from_this();
if (connPtr && connPtr->connected())
{
if (_reqAndCallbacks.empty())
if (_pipeliningCallbacks.size() <= _pipeliningDepth && _requestsBuffer.empty())
{
sendReq(connPtr, req);
_pipeliningCallbacks.push([thisPtr, callback](ReqResult result, const HttpResponsePtr &response) {
callback(result, response);
});
}
else
{
_requestsBuffer.push({req, [thisPtr, callback](ReqResult result, const HttpResponsePtr &response) {
callback(result, response);
}});
}
}
auto thisPtr = shared_from_this();
_reqAndCallbacks.push(std::make_pair(req, [thisPtr, callback](ReqResult result, const HttpResponsePtr &response) {
//thisPtr.reset();
callback(result, response);
}));
else
{
_requestsBuffer.push({req, [thisPtr, callback](ReqResult result, const HttpResponsePtr &response) {
callback(result, response);
}});
}
}
}
@ -253,50 +260,57 @@ void HttpClientImpl::onRecvMessage(const trantor::TcpConnectionPtr &connPtr, tra
HttpResponseParser *responseParser = any_cast<HttpResponseParser>(connPtr->getMutableContext());
//LOG_TRACE << "###:" << msg->readableBytes();
if (!responseParser->parseResponse(msg))
while (msg->readableBytes() > 0)
{
assert(!_reqAndCallbacks.empty());
auto cb = _reqAndCallbacks.front().second;
cb(ReqResult::BadResponse, HttpResponse::newHttpResponse());
_reqAndCallbacks.pop();
_tcpClient.reset();
return;
}
if (responseParser->gotAll())
{
auto resp = responseParser->responseImpl();
responseParser->reset();
assert(!_reqAndCallbacks.empty());
auto &type = resp->getHeaderBy("content-type");
if (type.find("application/json") != std::string::npos)
if (!responseParser->parseResponse(msg))
{
resp->parseJson();
assert(!_pipeliningCallbacks.empty());
onError(ReqResult::BadResponse);
return;
}
if (resp->getHeaderBy("content-encoding") == "gzip")
if (responseParser->gotAll())
{
resp->gunzip();
}
auto &cb = _reqAndCallbacks.front().second;
cb(ReqResult::Ok, resp);
_reqAndCallbacks.pop();
auto resp = responseParser->responseImpl();
responseParser->reset();
LOG_TRACE << "req buffer size=" << _reqAndCallbacks.size();
if (!_reqAndCallbacks.empty())
{
auto req = _reqAndCallbacks.front().first;
sendReq(connPtr, req);
assert(!_pipeliningCallbacks.empty());
auto &type = resp->getHeaderBy("content-type");
if (type.find("application/json") != std::string::npos)
{
resp->parseJson();
}
if (resp->getHeaderBy("content-encoding") == "gzip")
{
resp->gunzip();
}
auto cb = std::move(_pipeliningCallbacks.front());
_pipeliningCallbacks.pop();
cb(ReqResult::Ok, resp);
// LOG_TRACE << "pipelining buffer size=" << _pipeliningCallbacks.size();
// LOG_TRACE << "requests buffer size=" << _requestsBuffer.size();
if (!_requestsBuffer.empty())
{
auto &reqAndCb = _requestsBuffer.front();
sendReq(connPtr, reqAndCb.first);
_pipeliningCallbacks.push(std::move(reqAndCb.second));
_requestsBuffer.pop();
}
else
{
if (resp->closeConnection() && _pipeliningCallbacks.empty())
{
_tcpClient.reset();
}
}
}
else
{
if (resp->closeConnection())
{
_tcpClient.reset();
}
break;
}
}
}
@ -311,3 +325,20 @@ HttpClientPtr HttpClient::newHttpClient(const std::string &hostString, trantor::
{
return std::make_shared<HttpClientImpl>(loop == nullptr ? HttpAppFrameworkImpl::instance().getLoop() : loop, hostString);
}
void HttpClientImpl::onError(ReqResult result)
{
while (!_pipeliningCallbacks.empty())
{
auto cb = std::move(_pipeliningCallbacks.front());
_pipeliningCallbacks.pop();
cb(result, HttpResponse::newHttpResponse());
}
while (!_requestsBuffer.empty())
{
auto cb = std::move(_requestsBuffer.front().second);
_requestsBuffer.pop();
cb(result, HttpResponse::newHttpResponse());
}
_tcpClient.reset();
}

View File

@ -19,6 +19,7 @@
#include <trantor/net/TcpClient.h>
#include <mutex>
#include <queue>
namespace drogon
{
class HttpClientImpl : public HttpClient, public std::enable_shared_from_this<HttpClientImpl>
@ -29,6 +30,10 @@ class HttpClientImpl : public HttpClient, public std::enable_shared_from_this<Ht
virtual void sendRequest(const HttpRequestPtr &req, const HttpReqCallback &callback) override;
virtual void sendRequest(const HttpRequestPtr &req, HttpReqCallback &&callback) override;
virtual trantor::EventLoop *loop() override { return _loop; }
virtual void setPipeliningDepth(size_t depth) override
{
_pipeliningDepth = depth;
}
~HttpClientImpl();
private:
@ -38,8 +43,11 @@ class HttpClientImpl : public HttpClient, public std::enable_shared_from_this<Ht
bool _useSSL;
void sendReq(const trantor::TcpConnectionPtr &connPtr, const HttpRequestPtr &req);
void sendRequestInLoop(const HttpRequestPtr &req, const HttpReqCallback &callback);
std::queue<std::pair<HttpRequestPtr, HttpReqCallback>> _reqAndCallbacks;
std::queue<HttpReqCallback> _pipeliningCallbacks;
std::queue<std::pair<HttpRequestPtr, HttpReqCallback>> _requestsBuffer;
void onRecvMessage(const trantor::TcpConnectionPtr &, trantor::MsgBuffer *);
void onError(ReqResult result);
std::string _domain;
size_t _pipeliningDepth = 0;
};
} // namespace drogon