1
0
mirror of https://github.com/VCMP-SqMod/SqMod.git synced 2025-01-19 03:57:14 +01:00
SqMod/module/Core/ThreadPool.cpp
2021-03-28 00:19:09 +02:00

200 lines
5.7 KiB
C++

// ------------------------------------------------------------------------------------------------
#include "Core/ThreadPool.hpp"
// ------------------------------------------------------------------------------------------------
#include <chrono>
// ------------------------------------------------------------------------------------------------
namespace SqMod {
// ------------------------------------------------------------------------------------------------
// Out-of-class definition of the static `s_Inst` member of ThreadPool (default-constructed).
ThreadPool ThreadPool::s_Inst;
// ------------------------------------------------------------------------------------------------
// Free-function entry point that forwards to ThreadPool::Process() on the
// pool instance returned by ThreadPool::Get().
void ProcessThreads()
{
    auto && pool = ThreadPool::Get();
    pool.Process();
}
// ------------------------------------------------------------------------------------------------
ThreadPool::ThreadPool() noexcept
: m_Running(false)
, m_Mutex()
, m_CV()
, m_Queue()
, m_Finished()
, m_Threads()
{
m_Threads.reserve(MAX_WORKER_THREADS + 1); // Reserve thread memory in advance
}
// ------------------------------------------------------------------------------------------------
// Stop and join any worker that is still alive when the pool is destroyed.
//
// Workers only leave their loop after observing `m_Running == false`, and an
// idle worker only re-checks that flag after being woken with something in the
// queue. Joining without signalling them first would therefore block forever,
// so the stop request is issued here the same way Terminate() issues it.
// Finished items are NOT completed here (no OnCompleted() calls); they are
// simply released together with the rest of the members.
ThreadPool::~ThreadPool()
{
    // Threads still present means Terminate() did not run (it clears the list)
    if (!m_Threads.empty())
    {
        // Tell the threads to stop
        m_Running = false;
        // Wake every idle worker so it can notice the stop request
        std::lock_guard< std::mutex > lg(m_Mutex);
        // One empty placeholder per worker is enough: each worker pops at most one item after the stop
        for (size_t n = 0; n < m_Threads.size(); ++n)
        {
            m_Queue.push(Item());
        }
        m_CV.notify_all();
    }
    // Desperate attempt to gracefully shutdown
    for (auto & t : m_Threads)
    {
        if (t.joinable())
        {
            t.join(); // Will block until the task currently being processed is finished!
        }
    }
    // Clear all thread instances
    m_Threads.clear();
}
// ------------------------------------------------------------------------------------------------
// Start the worker threads.
//
// `count` is the requested number of workers; it is clamped to
// MAX_WORKER_THREADS. The call is a no-op that reports success when no workers
// are requested, when the pool is already running, or when thread objects
// still exist. Returns the state of the running flag after start-up.
bool ThreadPool::Initialize(uint32_t count)
{
    // No workers requested: nothing to do
    if (count == 0)
    {
        return true;
    }
    // Already running, or thread objects are still around: nothing to do
    if (m_Running || !m_Threads.empty())
    {
        return true;
    }
    // Enforce the hard coded worker limit
    if (count > MAX_WORKER_THREADS)
    {
        count = MAX_WORKER_THREADS;
    }
    // The flag must be raised before any worker starts, otherwise it would exit right away
    m_Running = true;
    // Spawn the workers, each one running WorkerProc() on this pool
    for (uint32_t remaining = count; remaining != 0; --remaining)
    {
        m_Threads.emplace_back(&ThreadPool::WorkerProc, this);
    }
    // Report the pool state
    return m_Running;
}
// ------------------------------------------------------------------------------------------------
void ThreadPool::Terminate(bool SQ_UNUSED_ARG(shutdown))
{
// Are there threads running?
if (m_Threads.empty() || !m_Running)
{
return; // Don't bother!
}
// Tell the threads to stop
m_Running = false;
{
std::lock_guard< std::mutex > lg(m_Mutex);
// Enqueue dummy items to wake the threads and allow them to stop
for (size_t n = 0; n < m_Threads.size(); ++n)
{
m_Queue.push(Item());
}
// Allow threads to process the dummy items and stop
m_CV.notify_all();
}
// Attempt to join the threads
for (auto & t : m_Threads)
{
if (t.joinable())
{
t.join(); // Will block until work is finished!
}
}
// Clear all thread instances
m_Threads.clear();
// Retrieve each item individually and process it
for (Item item; m_Finished.try_dequeue(item);)
{
// Is the item valid?
if (item)
{
item->OnCompleted(); // Allow the item to finish itself
}
// Item processed
item.reset();
}
}
// ------------------------------------------------------------------------------------------------
void ThreadPool::Process()
{
// Process only what's currently in the queue
const size_t count = m_Finished.size_approx();
// Retrieve each item individually and process it
for (size_t n = 0; n <= count; ++n)
{
Item item;
// Try to get an item from the queue
if (m_Finished.try_dequeue(item))
{
// Is the item valid?
if (item)
{
item->OnCompleted(); // Allow the item to finish itself
}
}
}
}
// ------------------------------------------------------------------------------------------------
// Body of every worker thread created by Initialize().
//
// Repeatedly takes an item from the work queue, runs OnPrepare() and (if that
// succeeds) OnProcess() on it, then moves it to the finished queue. An item
// whose OnProcess() returns true is kept and attempted again on the next pass
// instead of fetching a new one. The loop ends as soon as the running flag is
// found cleared; an item held at that point is notified through OnAborted()
// and still handed to the finished queue.
void ThreadPool::WorkerProc()
{
    // Whether the current item wants to try again
    bool retry = false;
    // The item currently owned by this worker (empty when there is none)
    Item item;
    // Constantly process items from the queue
    while (true)
    {
        // Do we have to stop?
        if (!m_Running)
        {
            // Is there an item that requested to try again?
            if (item)
            {
                item->OnAborted(true); // NOLINT(bugprone-use-after-move) Guarded by the check above
            }
            // Exit the loop
            break;
        }
        // Fetch a new item unless the previous one asked for another attempt
        if (!retry)
        {
            // Acquire a lock on the mutex
            std::unique_lock< std::mutex > lock(m_Mutex);
            // Wait until there are items in the queue
            while (m_Queue.empty())
            {
                m_CV.wait(lock);
            }
            // Retrieve the front item of the queue
            item = std::move(m_Queue.front());
            // Remove it from the queue
            m_Queue.pop();
        }
        // Do we have to stop? (the wait above may have lasted a long time)
        if (!m_Running)
        {
            // Is there an item to be processed?
            if (item)
            {
                item->OnAborted(false); // It should mark itself as aborted somehow!
            }
            // Exit the loop
            break;
        }
        // Empty items are only wake-up placeholders; never dereference them
        if (!item)
        {
            continue;
        }
        // Perform the task
        if (item->OnPrepare())
        {
            retry = item->OnProcess();
        }
        else
        {
            // A failed preparation must not inherit a stale retry request from the
            // previous pass, otherwise the worker would spin on this item forever
            retry = false;
        }
        // The task was performed
        if (!retry)
        {
            m_Finished.enqueue(std::move(item));
        }
    }
    // Did we take an item from the queue?
    if (item)
    {
        m_Finished.enqueue(std::move(item)); // Return it, even if not completed
    }
}
} // Namespace:: SqMod