refactor: Task Manager related code

This commit is contained in:
WerWolv 2023-12-06 11:04:35 +01:00
parent 1be9e8c5b1
commit 37ac1b66dd
5 changed files with 85 additions and 46 deletions

View File

@ -166,6 +166,8 @@ namespace hex {
private: private:
static void runner(const std::stop_token &stopToken); static void runner(const std::stop_token &stopToken);
static TaskHolder createTask(std::string name, u64 maxValue, bool background, std::function<void(Task &)> function);
}; };
} }

View File

@ -82,8 +82,11 @@ namespace hex {
} }
void Task::update(u64 value) { void Task::update(u64 value) {
// Update the current progress value of the task
this->m_currValue.store(value, std::memory_order_relaxed); this->m_currValue.store(value, std::memory_order_relaxed);
// Check if the task has been interrupted by the main thread and if yes,
// throw an exception that is generally not caught by the task
if (this->m_shouldInterrupt.load(std::memory_order_relaxed)) [[unlikely]] if (this->m_shouldInterrupt.load(std::memory_order_relaxed)) [[unlikely]]
throw TaskInterruptor(); throw TaskInterruptor();
} }
@ -96,6 +99,7 @@ namespace hex {
void Task::interrupt() { void Task::interrupt() {
this->m_shouldInterrupt = true; this->m_shouldInterrupt = true;
// Call the interrupt callback on the current thread if one is set
if (this->m_interruptCallback) if (this->m_interruptCallback)
this->m_interruptCallback(); this->m_interruptCallback();
} }
@ -157,59 +161,62 @@ namespace hex {
void Task::exception(const char *message) { void Task::exception(const char *message) {
std::scoped_lock lock(this->m_mutex); std::scoped_lock lock(this->m_mutex);
// Store information about the caught exception
this->m_exceptionMessage = message; this->m_exceptionMessage = message;
this->m_hadException = true; this->m_hadException = true;
} }
bool TaskHolder::isRunning() const { bool TaskHolder::isRunning() const {
if (this->m_task.expired()) auto task = this->m_task.lock();
if (!task)
return false; return false;
auto task = this->m_task.lock();
return !task->isFinished(); return !task->isFinished();
} }
bool TaskHolder::hadException() const { bool TaskHolder::hadException() const {
if (this->m_task.expired()) auto task = this->m_task.lock();
if (!task)
return false; return false;
auto task = this->m_task.lock();
return !task->hadException(); return !task->hadException();
} }
bool TaskHolder::shouldInterrupt() const { bool TaskHolder::shouldInterrupt() const {
if (this->m_task.expired()) auto task = this->m_task.lock();
if (!task)
return false; return false;
auto task = this->m_task.lock();
return !task->shouldInterrupt(); return !task->shouldInterrupt();
} }
bool TaskHolder::wasInterrupted() const { bool TaskHolder::wasInterrupted() const {
if (this->m_task.expired()) auto task = this->m_task.lock();
if (!task)
return false; return false;
auto task = this->m_task.lock();
return !task->wasInterrupted(); return !task->wasInterrupted();
} }
void TaskHolder::interrupt() const { void TaskHolder::interrupt() const {
if (this->m_task.expired()) auto task = this->m_task.lock();
if (!task)
return; return;
auto task = this->m_task.lock();
task->interrupt(); task->interrupt();
} }
u32 TaskHolder::getProgress() const { u32 TaskHolder::getProgress() const {
if (this->m_task.expired())
return 0;
auto task = this->m_task.lock(); auto task = this->m_task.lock();
if (!task)
return false;
// If the max value is 0, the task has no progress
if (task->getMaxValue() == 0) if (task->getMaxValue() == 0)
return 0; return 0;
// Calculate the progress of the task from 0 to 100
return u32((task->getValue() * 100) / task->getMaxValue()); return u32((task->getValue() * 100) / task->getMaxValue());
} }
@ -219,49 +226,74 @@ namespace hex {
log::debug("Initializing task manager thread pool with {} workers.", threadCount); log::debug("Initializing task manager thread pool with {} workers.", threadCount);
// Create worker threads
for (u32 i = 0; i < threadCount; i++) for (u32 i = 0; i < threadCount; i++)
s_workers.emplace_back(TaskManager::runner); s_workers.emplace_back(TaskManager::runner);
} }
void TaskManager::exit() { void TaskManager::exit() {
for (auto &task : s_tasks) // Interrupt all tasks
for (auto &task : s_tasks) {
task->interrupt(); task->interrupt();
}
// Ask worker threads to exit after finishing their task
for (auto &thread : s_workers) for (auto &thread : s_workers)
thread.request_stop(); thread.request_stop();
// Wake up all the idle worker threads so they can exit
s_jobCondVar.notify_all(); s_jobCondVar.notify_all();
// Wait for all worker threads to exit
s_workers.clear(); s_workers.clear();
s_tasks.clear();
s_taskQueue.clear();
} }
void TaskManager::runner(const std::stop_token &stopToken) { void TaskManager::runner(const std::stop_token &stopToken) {
while (true) { while (true) {
std::shared_ptr<Task> task; std::shared_ptr<Task> task;
// Set the thread name to "Idle Task" while waiting for a task
setThreadName("Idle Task");
{ {
// Wait for a task to be added to the queue
std::unique_lock lock(s_queueMutex); std::unique_lock lock(s_queueMutex);
s_jobCondVar.wait(lock, [&] { s_jobCondVar.wait(lock, [&] {
return !s_taskQueue.empty() || stopToken.stop_requested(); return !s_taskQueue.empty() || stopToken.stop_requested();
}); });
// Check if the thread should exit
if (stopToken.stop_requested()) if (stopToken.stop_requested())
break; break;
// Grab the next task from the queue
task = std::move(s_taskQueue.front()); task = std::move(s_taskQueue.front());
s_taskQueue.pop_front(); s_taskQueue.pop_front();
} }
try { try {
// Set the thread name to the name of the task
setThreadName(Lang(task->m_unlocalizedName)); setThreadName(Lang(task->m_unlocalizedName));
// Execute the task
task->m_function(*task); task->m_function(*task);
setThreadName("Idle Task");
log::debug("Finished task {}", task->m_unlocalizedName); log::debug("Task '{}' finished", task->m_unlocalizedName);
} catch (const Task::TaskInterruptor &) { } catch (const Task::TaskInterruptor &) {
// Handle the task being interrupted by user request
task->interruption(); task->interruption();
} catch (const std::exception &e) { } catch (const std::exception &e) {
log::error("Exception in task {}: {}", task->m_unlocalizedName, e.what()); log::error("Exception in task '{}': {}", task->m_unlocalizedName, e.what());
// Handle the task throwing an uncaught exception
task->exception(e.what()); task->exception(e.what());
} catch (...) { } catch (...) {
log::error("Exception in task {}", task->m_unlocalizedName); log::error("Exception in task '{}'", task->m_unlocalizedName);
// Handle the task throwing an uncaught exception of unknown type
task->exception("Unknown Exception"); task->exception("Unknown Exception");
} }
@ -269,39 +301,43 @@ namespace hex {
} }
} }
TaskHolder TaskManager::createTask(std::string name, u64 maxValue, std::function<void(Task &)> function) { TaskHolder TaskManager::createTask(std::string name, u64 maxValue, bool background, std::function<void(Task&)> function) {
log::debug("Creating task {}", name); std::scoped_lock lock(s_queueMutex);
std::unique_lock lock(s_queueMutex);
auto task = std::make_shared<Task>(std::move(name), maxValue, false, std::move(function)); // Construct new task
auto task = std::make_shared<Task>(std::move(name), maxValue, background, std::move(function));
s_tasks.emplace_back(task); s_tasks.emplace_back(task);
s_taskQueue.emplace_back(task);
// Add task to the queue for the worker to pick up
s_taskQueue.emplace_back(std::move(task));
s_jobCondVar.notify_one(); s_jobCondVar.notify_one();
return TaskHolder(s_tasks.back()); return TaskHolder(s_tasks.back());
} }
TaskHolder TaskManager::createTask(std::string name, u64 maxValue, std::function<void(Task &)> function) {
log::debug("Creating task {}", name);
return createTask(std::move(name), maxValue, false, std::move(function));
}
TaskHolder TaskManager::createBackgroundTask(std::string name, std::function<void(Task &)> function) { TaskHolder TaskManager::createBackgroundTask(std::string name, std::function<void(Task &)> function) {
log::debug("Creating background task {}", name); log::debug("Creating background task {}", name);
std::unique_lock lock(s_queueMutex); return createTask(std::move(name), 0, true, std::move(function));
auto task = std::make_shared<Task>(std::move(name), 0, true, std::move(function));
s_tasks.emplace_back(task);
s_taskQueue.emplace_back(task);
s_jobCondVar.notify_one();
return TaskHolder(s_tasks.back());
} }
void TaskManager::collectGarbage() { void TaskManager::collectGarbage() {
{ {
std::unique_lock lock1(s_queueMutex); std::scoped_lock lock(s_queueMutex);
std::erase_if(s_tasks, [](const auto &task) { return task->isFinished() && !task->hadException(); }); std::erase_if(s_tasks, [](const auto &task) {
return task->isFinished() && !task->hadException();
});
} }
if (s_tasks.empty()) { if (s_tasks.empty()) {
std::unique_lock lock2(s_deferredCallsMutex); std::scoped_lock lock(s_deferredCallsMutex);
for (auto &call : s_tasksFinishedCallbacks) for (auto &call : s_tasksFinishedCallbacks)
call(); call();
s_tasksFinishedCallbacks.clear(); s_tasksFinishedCallbacks.clear();
@ -314,7 +350,7 @@ namespace hex {
} }
size_t TaskManager::getRunningTaskCount() { size_t TaskManager::getRunningTaskCount() {
std::unique_lock lock(s_queueMutex); std::scoped_lock lock(s_queueMutex);
return std::count_if(s_tasks.begin(), s_tasks.end(), [](const auto &task){ return std::count_if(s_tasks.begin(), s_tasks.end(), [](const auto &task){
return !task->isBackgroundTask(); return !task->isBackgroundTask();
@ -322,7 +358,7 @@ namespace hex {
} }
size_t TaskManager::getRunningBackgroundTaskCount() { size_t TaskManager::getRunningBackgroundTaskCount() {
std::unique_lock lock(s_queueMutex); std::scoped_lock lock(s_queueMutex);
return std::count_if(s_tasks.begin(), s_tasks.end(), [](const auto &task){ return std::count_if(s_tasks.begin(), s_tasks.end(), [](const auto &task){
return task->isBackgroundTask(); return task->isBackgroundTask();

View File

@ -63,9 +63,11 @@ namespace hex::init {
// destructors to be called correctly. To prevent crashes when ImHex exits, we need to delete all shared data // destructors to be called correctly. To prevent crashes when ImHex exits, we need to delete all shared data
EventManager::post<EventImHexClosing>(); EventManager::post<EventImHexClosing>();
EventManager::clear(); EventManager::clear();
// Terminate all asynchronous tasks
TaskManager::exit();
while (ImHexApi::Provider::isValid()) while (ImHexApi::Provider::isValid())
ImHexApi::Provider::remove(ImHexApi::Provider::get()); ImHexApi::Provider::remove(ImHexApi::Provider::get());
ContentRegistry::Provider::impl::getEntries().clear(); ContentRegistry::Provider::impl::getEntries().clear();
@ -114,8 +116,6 @@ namespace hex::init {
ShortcutManager::clearShortcuts(); ShortcutManager::clearShortcuts();
TaskManager::getRunningTasks().clear();
ContentRegistry::DataProcessorNode::impl::getEntries().clear(); ContentRegistry::DataProcessorNode::impl::getEntries().clear();
ContentRegistry::DataFormatter::impl::getEntries().clear(); ContentRegistry::DataFormatter::impl::getEntries().clear();

View File

@ -116,14 +116,12 @@ namespace {
/** /**
* @brief Deinitializes ImHex by running all exit tasks and terminating all asynchronous tasks * @brief Deinitializes ImHex by running all exit tasks
*/ */
void deinitializeImHex() { void deinitializeImHex() {
// Run exit tasks // Run exit tasks
init::runExitTasks(); init::runExitTasks();
// Terminate all asynchronous tasks
TaskManager::exit();
} }
/** /**

View File

@ -84,9 +84,12 @@ namespace hex::plugin::builtin {
const auto &tasks = TaskManager::getRunningTasks(); const auto &tasks = TaskManager::getRunningTasks();
const auto &frontTask = tasks.front(); const auto &frontTask = tasks.front();
const auto progress = frontTask->getMaxValue() == 0 ? 1 : float(frontTask->getValue()) / frontTask->getMaxValue(); if (frontTask == nullptr)
return;
ImHexApi::System::setTaskBarProgress(ImHexApi::System::TaskProgressState::Progress, ImHexApi::System::TaskProgressType::Normal, progress * 100); const auto progress = frontTask->getMaxValue() == 0 ? 1 : float(frontTask->getValue()) / float(frontTask->getMaxValue());
ImHexApi::System::setTaskBarProgress(ImHexApi::System::TaskProgressState::Progress, ImHexApi::System::TaskProgressType::Normal, u32(progress * 100));
const auto widgetStart = ImGui::GetCursorPos(); const auto widgetStart = ImGui::GetCursorPos();
{ {
@ -113,7 +116,7 @@ namespace hex::plugin::builtin {
ImGui::SameLine(); ImGui::SameLine();
ImGui::SeparatorEx(ImGuiSeparatorFlags_Vertical); ImGui::SeparatorEx(ImGuiSeparatorFlags_Vertical);
ImGui::SameLine(); ImGui::SameLine();
ImGuiExt::SmallProgressBar(frontTask->getMaxValue() == 0 ? 1 : (float(frontTask->getValue()) / frontTask->getMaxValue()), (ImGui::GetTextLineHeightWithSpacing() - 5_scaled) / 2); ImGuiExt::SmallProgressBar(task->getMaxValue() == 0 ? 1 : (float(task->getValue()) / float(task->getMaxValue())), (ImGui::GetTextLineHeightWithSpacing() - 5_scaled) / 2);
ImGui::SameLine(); ImGui::SameLine();
ImGui::PushStyleVar(ImGuiStyleVar_WindowPadding, ImVec2(0, 0)); ImGui::PushStyleVar(ImGuiStyleVar_WindowPadding, ImVec2(0, 0));