Fix some race conditions (#280)

* Fix a race condition of static files

* Fix a race condition of 404 pages
This commit is contained in:
An Tao 2019-10-17 09:23:14 +08:00 committed by GitHub
parent ddc41f7907
commit d830c4f057
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 154 additions and 85 deletions

View File

@ -278,6 +278,8 @@ int main()
}
std::cout << std::get<2>(info) << std::endl;
}
auto resp = HttpResponse::newFileResponse("index.html");
resp->setExpiredTime(0);
app().setCustom404Page(resp);
app().run();
}

View File

@ -24,6 +24,7 @@
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include <future>
#include <assert.h>
#define WHEELS_NUM 4
@ -127,17 +128,17 @@ class CacheMap
};
~CacheMap()
{
_loop->invalidateTimer(_timerId);
{
std::lock_guard<std::mutex> guard(_mtx);
_map.clear();
}
for (int i = _wheels.size() - 1; i >= 0; i--)
{
_wheels[i].clear();
std::lock_guard<std::mutex> lock(_bucketMutex);
for (int i = _wheels.size() - 1; i >= 0; i--)
{
_wheels[i].clear();
}
}
LOG_TRACE << "CacheMap destruct!";
}
typedef struct MapValue
@ -293,6 +294,15 @@ class CacheMap
std::lock_guard<std::mutex> lock(_mtx);
_map.erase(key);
}
/**
* @brief Get the event loop object
*
* @return trantor::EventLoop*
*/
trantor::EventLoop *getLoop()
{
return _loop;
}
private:
std::unordered_map<T1, MapValue> _map;

View File

@ -14,13 +14,13 @@
#pragma once
#include <drogon/HttpAppFramework.h>
#include <trantor/utils/NonCopyable.h>
#include <memory>
#include <vector>
#include <limits>
#include <functional>
#include <drogon/HttpAppFramework.h>
namespace drogon
{
/**
@ -56,22 +56,17 @@ namespace drogon
* };
* @endcode
*/
template <typename C, typename... Args>
class IOThreadStorage
template <typename C>
class IOThreadStorage : public trantor::NonCopyable
{
static_assert(std::is_constructible<C, Args &&...>::value,
"Unable to construct storage with given signature");
public:
using InitCallback = std::function<void(C &, size_t)>;
template <typename... Args>
IOThreadStorage(Args &&... args)
: IOThreadStorage(std::forward(args)..., [](C &, size_t) {})
{
}
IOThreadStorage(Args &&... args, const InitCallback &initCB)
{
static_assert(std::is_constructible<C, Args &&...>::value,
"Unable to construct storage with given signature");
size_t numThreads = app().getThreadNum();
assert(numThreads > 0 &&
numThreads != std::numeric_limits<size_t>::max());
@ -81,7 +76,14 @@ class IOThreadStorage
for (size_t i = 0; i <= numThreads; ++i)
{
_storage.emplace_back(std::forward(args)...);
_storage.emplace_back(std::forward<Args>(args)...);
}
}
void init(const InitCallback &initCB)
{
for (size_t i = 0; i < _storage.size(); ++i)
{
initCB(_storage[i], i);
}
}

View File

@ -410,7 +410,7 @@ void HttpAppFrameworkImpl::run()
ioLoops.pop_back();
_httpCtrlsRouterPtr->init(ioLoops);
_httpSimpleCtrlsRouterPtr->init(ioLoops);
_staticFileRouterPtr->init();
_staticFileRouterPtr->init(ioLoops);
_websockCtrlsRouterPtr->init();
if (_useSession)
@ -764,4 +764,30 @@ void HttpAppFrameworkImpl::quit()
{
getLoop()->queueInLoop([this]() { getLoop()->quit(); });
}
}
const HttpResponsePtr &HttpAppFrameworkImpl::getCustom404Page()
{
if (!_custom404)
{
return _custom404;
}
auto loop = trantor::EventLoop::getEventLoopOfCurrentThread();
if (loop && loop->index() < app().getThreadNum())
{
// If the current thread is an IO thread
static IOThreadStorage<HttpResponsePtr> thread404Pages;
static std::once_flag once;
std::call_once(once, [this] {
thread404Pages.init([this](HttpResponsePtr &resp, size_t index) {
resp = std::make_shared<HttpResponseImpl>(
*static_cast<HttpResponseImpl *>(_custom404.get()));
});
});
return thread404Pages.getThreadData();
}
else
{
return _custom404;
}
}

View File

@ -81,10 +81,7 @@ class HttpAppFrameworkImpl : public HttpAppFramework
return *this;
}
const HttpResponsePtr &getCustom404Page()
{
return _custom404;
}
const HttpResponsePtr &getCustom404Page();
virtual void forward(
const HttpRequestPtr &req,

View File

@ -398,7 +398,7 @@ void HttpControllersRouter::doControllerHandler(
paraList,
req,
[=, callback = std::move(callback)](const HttpResponsePtr &resp) {
if (resp->expiredTime() >= 0)
if (resp->expiredTime() >= 0 && resp->statusCode() != k404NotFound)
{
// cache the response;
static_cast<HttpResponseImpl *>(resp.get())->makeHeaderString();

View File

@ -16,6 +16,7 @@
#include "HttpAppFrameworkImpl.h"
#include "HttpUtils.h"
#include <drogon/HttpViewData.h>
#include <drogon/IOThreadStorage.h>
#include <fstream>
#include <memory>
#include <stdio.h>
@ -82,27 +83,50 @@ void HttpResponseImpl::generateBodyFromJson()
HttpResponsePtr HttpResponse::newNotFoundResponse()
{
auto loop = trantor::EventLoop::getEventLoopOfCurrentThread();
auto &resp = HttpAppFrameworkImpl::instance().getCustom404Page();
if (resp)
{
return resp;
if (loop && loop->index() < app().getThreadNum())
{
return resp;
}
else
{
return HttpResponsePtr{new HttpResponseImpl(
*static_cast<HttpResponseImpl *>(resp.get()))};
}
}
else
{
static std::once_flag once;
static HttpResponsePtr notFoundResp;
std::call_once(once, []() {
if (loop && loop->index() < app().getThreadNum())
{
// If the current thread is an IO thread
static std::once_flag threadOnce;
static IOThreadStorage<HttpResponsePtr> thread404Pages;
std::call_once(threadOnce, [] {
thread404Pages.init([](drogon::HttpResponsePtr &resp,
size_t index) {
HttpViewData data;
data.insert("version", getVersion());
resp = HttpResponse::newHttpViewResponse("drogon::NotFound",
data);
resp->setStatusCode(k404NotFound);
resp->setExpiredTime(0);
});
});
LOG_TRACE << "Use cached 404 response";
return thread404Pages.getThreadData();
}
else
{
HttpViewData data;
data.insert("version", getVersion());
notFoundResp =
auto notFoundResp =
HttpResponse::newHttpViewResponse("drogon::NotFound", data);
notFoundResp->setStatusCode(k404NotFound);
notFoundResp->setExpiredTime(0);
auto str = static_cast<HttpResponseImpl *>(notFoundResp.get())
->renderToString();
LOG_TRACE << *str;
});
return notFoundResp;
return notFoundResp;
}
}
}
HttpResponsePtr HttpResponse::newRedirectionResponse(
@ -584,6 +608,8 @@ void HttpResponseImpl::clear()
_leftBodyLength = 0;
_currentChunkLength = 0;
_jsonPtr.reset();
_expriedTime = -1;
_datePos = std::string::npos;
}
void HttpResponseImpl::parseJson() const

View File

@ -240,6 +240,7 @@ class HttpResponseImpl : public HttpResponse
virtual void setExpiredTime(ssize_t expiredTime) override
{
_expriedTime = expiredTime;
_datePos = std::string::npos;
}
virtual ssize_t expiredTime() const override

View File

@ -221,7 +221,8 @@ void HttpSimpleControllersRouter::doControllerHandler(
[this, req, callback = std::move(callback), &ctrlBinderPtr](
const HttpResponsePtr &resp) {
auto newResp = resp;
if (resp->expiredTime() >= 0)
if (resp->expiredTime() >= 0 &&
resp->statusCode() != k404NotFound)
{
// cache the response;
static_cast<HttpResponseImpl *>(resp.get())

View File

@ -186,6 +186,8 @@ std::vector<trantor::EventLoop *> ListenerManager::createListeners(
void ListenerManager::startListening()
{
if (_listeners.size() == 0)
return;
for (auto &loopThread : _listeningloopThreads)
{
loopThread->run();

View File

@ -26,15 +26,21 @@
using namespace drogon;
void StaticFileRouter::init()
void StaticFileRouter::init(const std::vector<trantor::EventLoop *> &ioloops)
{
_responseCachingMap =
std::unique_ptr<CacheMap<std::string, HttpResponsePtr>>(
new CacheMap<std::string, HttpResponsePtr>(
HttpAppFrameworkImpl::instance().getLoop(),
1.0,
4,
50)); // Max timeout up to about 70 days;
// Max timeout up to about 70 days;
_staticFilesCacheMap = decltype(_staticFilesCacheMap)(
new IOThreadStorage<std::unique_ptr<CacheMap<std::string, char>>>);
_staticFilesCacheMap->init(
[&ioloops](std::unique_ptr<CacheMap<std::string, char>> &mapPtr,
size_t i) {
assert(i == ioloops[i]->index());
mapPtr = std::unique_ptr<CacheMap<std::string, char>>(
new CacheMap<std::string, char>(ioloops[i], 1.0, 4, 50));
});
_staticFilesCache = decltype(_staticFilesCache)(
new IOThreadStorage<
std::unordered_map<std::string, HttpResponsePtr>>{});
}
void StaticFileRouter::route(
@ -62,16 +68,11 @@ void StaticFileRouter::route(
}
// find cached response
HttpResponsePtr cachedResp;
auto &cacheMap = _staticFilesCache->getThreadData();
auto iter = cacheMap.find(filePath);
if (iter != cacheMap.end())
{
std::lock_guard<std::mutex> guard(_staticFilesCacheMutex);
if (_staticFilesCache.find(filePath) != _staticFilesCache.end())
{
cachedResp = _staticFilesCache[filePath].lock();
if (!cachedResp)
{
_staticFilesCache.erase(filePath);
}
}
cachedResp = iter->second;
}
// check last modified time,rfc2616-14.25
@ -128,6 +129,7 @@ void StaticFileRouter::route(
if (cachedResp)
{
LOG_TRACE << "Using file cache";
HttpAppFrameworkImpl::instance().callCallback(req,
cachedResp,
callback);
@ -160,18 +162,18 @@ void StaticFileRouter::route(
// cache the response for 5 seconds by default
if (_staticFilesCacheTime >= 0)
{
LOG_TRACE << "Save in cache for " << _staticFilesCacheTime
<< " seconds";
resp->setExpiredTime(_staticFilesCacheTime);
_responseCachingMap->insert(
filePath, resp, resp->expiredTime(), [=]() {
std::lock_guard<std::mutex> guard(
_staticFilesCacheMutex);
_staticFilesCache.erase(filePath);
_staticFilesCache->getThreadData()[filePath] = resp;
_staticFilesCacheMap->getThreadData()->insert(
filePath, 0, _staticFilesCacheTime, [this, filePath]() {
LOG_TRACE << "Erase cache";
assert(_staticFilesCache->getThreadData().find(
filePath) !=
_staticFilesCache->getThreadData().end());
_staticFilesCache->getThreadData().erase(filePath);
});
{
std::lock_guard<std::mutex> guard(
_staticFilesCacheMutex);
_staticFilesCache[filePath] = resp;
}
}
HttpAppFrameworkImpl::instance().callCallback(req,
resp,

View File

@ -16,6 +16,7 @@
#include "impl_forwards.h"
#include <drogon/CacheMap.h>
#include <drogon/IOThreadStorage.h>
#include <functional>
#include <set>
#include <string>
@ -41,7 +42,7 @@ class StaticFileRouter
{
_gzipStaticFlag = useGzipStatic;
}
void init();
void init(const std::vector<trantor::EventLoop *> &ioloops);
private:
std::set<std::string> _fileTypeSet = {"html",
@ -64,14 +65,14 @@ class StaticFileRouter
"ico",
"icns"};
std::unique_ptr<drogon::CacheMap<std::string, HttpResponsePtr>>
_responseCachingMap;
int _staticFilesCacheTime = 5;
bool _enableLastModify = true;
bool _gzipStaticFlag = true;
std::unordered_map<std::string, std::weak_ptr<HttpResponse>>
std::unique_ptr<
IOThreadStorage<std::unique_ptr<CacheMap<std::string, char>>>>
_staticFilesCacheMap;
std::unique_ptr<
IOThreadStorage<std::unordered_map<std::string, HttpResponsePtr>>>
_staticFilesCache;
std::mutex _staticFilesCacheMutex;
};
} // namespace drogon

View File

@ -39,21 +39,20 @@ void DbClientManager::createDbClients(
if (dbInfo._dbType == drogon::orm::ClientType::PostgreSQL ||
dbInfo._dbType == drogon::orm::ClientType::Mysql)
{
_dbFastClientsMap.insert(
{dbInfo._name,
IOThreadStorage<orm::DbClientPtr>([&](orm::DbClientPtr &c,
size_t idx) {
assert(idx == ioloops[idx]->index());
LOG_TRACE
<< "create fast database client for the thread "
<< idx;
c = std::shared_ptr<orm::DbClient>(
new drogon::orm::DbClientLockFree(
dbInfo._connectionInfo,
ioloops[idx],
dbInfo._dbType,
dbInfo._connectionNumber));
})});
_dbFastClientsMap[dbInfo._name] =
IOThreadStorage<orm::DbClientPtr>();
_dbFastClientsMap[dbInfo._name].init([&](orm::DbClientPtr &c,
size_t idx) {
assert(idx == ioloops[idx]->index());
LOG_TRACE << "create fast database client for the thread "
<< idx;
c = std::shared_ptr<orm::DbClient>(
new drogon::orm::DbClientLockFree(
dbInfo._connectionInfo,
ioloops[idx],
dbInfo._dbType,
dbInfo._connectionNumber));
});
}
}
else