mirror of
https://github.com/VCMP-SqMod/SqMod.git
synced 2025-01-19 12:07:13 +01:00
49df7b75ee
Extend worker threads to allow tasks to re-queue themselves during completion.
229 lines
7.2 KiB
C++
229 lines
7.2 KiB
C++
// ------------------------------------------------------------------------------------------------
|
|
#include "Core/ThreadPool.hpp"
|
|
|
|
// ------------------------------------------------------------------------------------------------
|
|
#include <chrono>
|
|
|
|
// ------------------------------------------------------------------------------------------------
|
|
namespace SqMod {
|
|
|
|
// ------------------------------------------------------------------------------------------------
|
|
ThreadPool ThreadPool::s_Inst;
|
|
|
|
// ------------------------------------------------------------------------------------------------
|
|
void ProcessThreads()
|
|
{
|
|
ThreadPool::Get().Process();
|
|
}
|
|
|
|
// ------------------------------------------------------------------------------------------------
|
|
ThreadPool::ThreadPool() noexcept
|
|
: m_Running(false)
|
|
, m_Mutex()
|
|
, m_CV()
|
|
, m_Queue()
|
|
, m_Finished()
|
|
, m_Threads()
|
|
{
|
|
m_Threads.reserve(MAX_WORKER_THREADS + 1); // Reserve thread memory in advance
|
|
}
|
|
|
|
// ------------------------------------------------------------------------------------------------
|
|
ThreadPool::~ThreadPool()
|
|
{
|
|
// Desperate attempt to gracefully shutdown
|
|
for (auto & t : m_Threads)
|
|
{
|
|
if (t.joinable())
|
|
{
|
|
t.join(); // Will block until work is finished!
|
|
}
|
|
}
|
|
// Clear all thread instances
|
|
m_Threads.clear();
|
|
}
|
|
|
|
// ------------------------------------------------------------------------------------------------
|
|
bool ThreadPool::Initialize(uint32_t count)
|
|
{
|
|
// Are there any threads requested?
|
|
// Are we already running or have threads?
|
|
if (!count || m_Running || !m_Threads.empty())
|
|
{
|
|
return true; // Nothing to do!
|
|
}
|
|
else if (count > MAX_WORKER_THREADS)
|
|
{
|
|
count = MAX_WORKER_THREADS; // Hard coded worker limit
|
|
}
|
|
// Make sure the threads don't stop after creation
|
|
m_Running = true;
|
|
// Create the specified amount of worker threads
|
|
for (uint32_t i = 0; i < count; ++i)
|
|
{
|
|
m_Threads.emplace_back(&ThreadPool::WorkerProc, this);
|
|
}
|
|
// Thread pool initialized
|
|
return m_Running;
|
|
}
|
|
|
|
// ------------------------------------------------------------------------------------------------
|
|
void ThreadPool::Terminate(bool SQ_UNUSED_ARG(shutdown))
|
|
{
|
|
// Are there threads running?
|
|
if (m_Threads.empty() || !m_Running)
|
|
{
|
|
return; // Don't bother!
|
|
}
|
|
// Tell the threads to stop
|
|
m_Running = false;
|
|
{
|
|
std::lock_guard< std::mutex > lg(m_Mutex);
|
|
// Enqueue dummy items to wake the threads and allow them to stop
|
|
for (size_t n = 0; n < m_Threads.size(); ++n)
|
|
{
|
|
m_Queue.push(Item());
|
|
}
|
|
// Allow threads to process the dummy items and stop
|
|
m_CV.notify_all();
|
|
}
|
|
// Attempt to join the threads
|
|
for (auto & t : m_Threads)
|
|
{
|
|
if (t.joinable())
|
|
{
|
|
t.join(); // Will block until work is finished!
|
|
}
|
|
}
|
|
// Clear all thread instances
|
|
m_Threads.clear();
|
|
// Retrieve each item individually and process it
|
|
for (Item item; m_Finished.try_dequeue(item);)
|
|
{
|
|
// Is the item valid?
|
|
if (item)
|
|
{
|
|
[[maybe_unused]] auto _ = item->OnCompleted(true); // Allow the item to finish itself
|
|
}
|
|
// Item processed
|
|
item.reset();
|
|
}
|
|
}
|
|
|
|
// ------------------------------------------------------------------------------------------------
|
|
void ThreadPool::Process()
|
|
{
|
|
// Process only what's currently in the queue
|
|
const size_t count = m_Finished.size_approx();
|
|
// Retrieve each item individually and process it
|
|
for (size_t n = 0; n <= count; ++n)
|
|
{
|
|
Item item;
|
|
// Try to get an item from the queue
|
|
if (m_Finished.try_dequeue(item))
|
|
{
|
|
// Is the item valid?
|
|
if (item)
|
|
{
|
|
try {
|
|
// Allow the item to finish itself
|
|
if (item->OnCompleted(false))
|
|
{
|
|
Enqueue(std::move(item)); // Queue again
|
|
}
|
|
} catch (const std::exception & e) {
|
|
LogErr("Exception occured in %s completion stage [%s] for [%s]", item->TypeName(), e.what(), item->IdentifiableInfo());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// ------------------------------------------------------------------------------------------------
|
|
void ThreadPool::WorkerProc()
|
|
{
|
|
// Whether this item wants to try again
|
|
bool retry = false;
|
|
// Pointer to the dequeued item
|
|
Item item;
|
|
// Initialize third-party allocator for this thread
|
|
auto rpmallocinit = std::make_unique< RPMallocThreadInit >();
|
|
// Constantly process items from the queue
|
|
while (true)
|
|
{
|
|
// Do we have to stop?
|
|
if (!m_Running)
|
|
{
|
|
// Is there an item that requested to try again?
|
|
if (item)
|
|
{
|
|
try {
|
|
item->OnAborted(true); // NOLINT(bugprone-use-after-move) There's an `if` condition above idiot!
|
|
} catch (const std::exception & e) {
|
|
LogErr("Exception occured in %s cancelation stage [%s] for [%s]", item->TypeName(), e.what(), item->IdentifiableInfo());
|
|
}
|
|
}
|
|
// Exit the loop
|
|
break;
|
|
}
|
|
// Attempt to get an item from the queue
|
|
if (!retry)
|
|
{
|
|
// Acquire a lock on the mutex
|
|
std::unique_lock< std::mutex > lock(m_Mutex);
|
|
// Wait until there are items in the queue
|
|
while (m_Queue.empty())
|
|
{
|
|
m_CV.wait(lock);
|
|
}
|
|
// Retrieve top item the queue
|
|
item = std::move(m_Queue.front());
|
|
// Remove it from the queue
|
|
m_Queue.pop();
|
|
}
|
|
// Do we have to stop?
|
|
if (!m_Running)
|
|
{
|
|
// Is there an item to be processed?
|
|
if (item)
|
|
{
|
|
try {
|
|
item->OnAborted(false); // It should mark itself as aborted somehow!
|
|
} catch (const std::exception & e) {
|
|
LogErr("Exception occured in %s forced cancelation stage [%s] for [%s]", item->TypeName(), e.what(), item->IdentifiableInfo());
|
|
}
|
|
}
|
|
// Exit the loop
|
|
break;
|
|
}
|
|
bool r;
|
|
// Attempt preparation
|
|
try {
|
|
r = item->OnPrepare();
|
|
} catch (const std::exception & e) {
|
|
LogErr("Exception occured in %s preparation stage [%s] for [%s]", item->TypeName(), e.what(), item->IdentifiableInfo());
|
|
}
|
|
// Perform the task
|
|
if (r)
|
|
{
|
|
try {
|
|
retry = item->OnProcess();
|
|
} catch (const std::exception & e) {
|
|
LogErr("Exception occured in %s processing stage [%s] for [%s]", item->TypeName(), e.what(), item->IdentifiableInfo());
|
|
}
|
|
}
|
|
// The task was performed
|
|
if (!retry)
|
|
{
|
|
m_Finished.enqueue(std::move(item));
|
|
}
|
|
}
|
|
// Did we take an item from the queue?
|
|
if (item)
|
|
{
|
|
m_Finished.enqueue(std::move(item)); // Return it, even if not completed
|
|
}
|
|
}
|
|
|
|
} // Namespace:: SqMod
|