1
0
mirror of https://github.com/VCMP-SqMod/SqMod.git synced 2025-06-16 07:07:13 +02:00

Basic thread pool implementation for internal use.

This commit is contained in:
Sandu Liviu Catalin
2021-03-27 19:53:49 +02:00
parent 8a29e812b1
commit 6a0b7f246e
4 changed files with 402 additions and 3 deletions

181
module/Core/ThreadPool.cpp Normal file
View File

@ -0,0 +1,181 @@
// ------------------------------------------------------------------------------------------------
#include "Core/ThreadPool.hpp"
// ------------------------------------------------------------------------------------------------
#include <chrono>
// ------------------------------------------------------------------------------------------------
namespace SqMod {
// ------------------------------------------------------------------------------------------------
ThreadPool ThreadPool::s_Inst;
// ------------------------------------------------------------------------------------------------
ThreadPool::ThreadPool() noexcept
: m_Running(false)
, m_Pending()
, m_Finished()
, m_Threads()
{
m_Threads.reserve(MAX_WORKER_THREADS + 1); // Reserve thread memory in advance
}
// ------------------------------------------------------------------------------------------------
ThreadPool::~ThreadPool()
{
// Desperate attempt to gracefully shutdown
for (auto & t : m_Threads)
{
if (t.joinable())
{
t.join(); // Will block until work is finished!
}
}
}
// ------------------------------------------------------------------------------------------------
bool ThreadPool::Initialize(uint32_t count)
{
// Are there any threads requested?
// Are we already running or have threads?
if (!count || m_Running || !m_Threads.empty())
{
return true; // Nothing to do!
}
else if (count > MAX_WORKER_THREADS)
{
count = MAX_WORKER_THREADS; // Hard coded worker limit
}
// See if any of the threads are active
for (auto & t : m_Threads)
{
if (t.joinable())
{
return false; // Something is not right!
}
}
// Make sure the threads don't stop after creation
m_Running = true;
// Create the specified amount of worker threads
for (uint32_t i = 0; i < count; ++i)
{
// Pass the container index to the worker thread so it knows to find itself
m_Threads.emplace_back(&ThreadPool::WorkerProc, this, i);
}
// Thread pool initialized
return m_Running;
}
// ------------------------------------------------------------------------------------------------
void ThreadPool::Terminate(bool SQ_UNUSED_ARG(shutdown))
{
// Are there threads running?
if (m_Threads.empty() || !m_Running)
{
return; // Don't bother!
}
// Tell the threads to stop
m_Running = false;
// Enqueue dummy items to wake the threads and allow them to stop
for (size_t n = 0; n < m_Threads.size(); ++n)
{
m_Pending.enqueue(Item());
}
// Attempt to join the threads
for (auto & t : m_Threads)
{
if (t.joinable())
{
t.join(); // Will block until work is finished!
}
}
// Retrieve each item individually and process it
for (Item item; m_Finished.try_dequeue(item);)
{
// Is the item valid?
if (item)
{
item->OnCompleted(); // Allow the item to finish itself
}
// Item processed
item.reset();
}
}
// ------------------------------------------------------------------------------------------------
void ThreadPool::Process()
{
// Process only what's currently in the queue
const size_t count = m_Finished.size_approx();
// Retrieve each item individually and process it
for (size_t n = 0; n <= count; ++n)
{
Item item;
// Try to get an item from the queue
if (m_Finished.try_dequeue(item))
{
// Is the item valid?
if (item)
{
item->OnCompleted(); // Allow the item to finish itself
}
}
}
}
// ------------------------------------------------------------------------------------------------
void ThreadPool::WorkerProc(uint32_t SQ_UNUSED_ARG(idx))
{
// Whether this item wants to try again
bool retry = false;
// Pointer to the dequeued item
Item item;
// Constantly process items from the queue
while (true)
{
// Do we have to stop?
if (!m_Running)
{
// Is there an item that requested to try again?
if (item)
{
item->OnAborted(true); // NOLINT(bugprone-use-after-move) There's an `if` condition above idiot!
}
// Exit the loop
break;
}
// Attempt to get an item from the queue
if (!retry)
{
m_Pending.wait_dequeue(item);
}
// Do we have to stop?
if (!m_Running)
{
// Is there an item to be processed?
if (item)
{
item->OnAborted(false); // It should mark itself as aborted somehow!
}
// Exit the loop
break;
}
// Perform the task
if (item->OnPrepare())
{
retry = item->OnProcess();
}
// The task was performed
if (!retry)
{
m_Finished.enqueue(std::move(item));
}
}
// Did we take an item from the queue?
if (item)
{
m_Finished.enqueue(std::move(item)); // Return it, even if not completed
}
}
} // Namespace:: SqMod

209
module/Core/ThreadPool.hpp Normal file
View File

@ -0,0 +1,209 @@
#pragma once
// ------------------------------------------------------------------------------------------------
#include "Core/Common.hpp"
// ------------------------------------------------------------------------------------------------
#include <concurrentqueue.h>
#include <blockingconcurrentqueue.h>
// ------------------------------------------------------------------------------------------------
#include <queue>
#include <mutex>
#include <vector>
#include <atomic>
#include <thread>
// ------------------------------------------------------------------------------------------------
namespace SqMod {
// ------------------------------------------------------------------------------------------------
static constexpr uint32_t MAX_WORKER_THREADS = 32; // Hard coded worker threads limit.
/* ------------------------------------------------------------------------------------------------
* Item that can be given to the thread pool to process data in a separate thread.
*/
struct ThreadPoolItem
{
/* --------------------------------------------------------------------------------------------
* Default constructor.
*/
ThreadPoolItem() noexcept = default;
/* --------------------------------------------------------------------------------------------
* Copy constructor. (disabled)
*/
ThreadPoolItem(const ThreadPoolItem & o) = delete;
/* --------------------------------------------------------------------------------------------
* Move constructor. (disabled)
*/
ThreadPoolItem(ThreadPoolItem && o) = delete;
/* --------------------------------------------------------------------------------------------
* Copy assignment operator. (disabled)
*/
ThreadPoolItem & operator = (const ThreadPoolItem & o) = delete;
/* --------------------------------------------------------------------------------------------
* Move assignment operator. (disabled)
*/
ThreadPoolItem & operator = (ThreadPoolItem && o) = delete;
/* --------------------------------------------------------------------------------------------
* Invoked in worker thread by the thread pool after obtaining the task from the queue.
* Must return true to indicate that the task can be performed. False indicates failure.
*/
virtual bool OnPrepare() { return true; }
/* --------------------------------------------------------------------------------------------
* Called in worker by the thread pool to performed by the associated tasks.
* Will be called continuously while the returned value is true. While false means it finished.
*/
virtual bool OnProcess() = 0;
/* --------------------------------------------------------------------------------------------
* Invoked in main thread by the thread pool after the task was completed.
* If true is returned, the task is added back to the queue to be processed again.
*/
virtual bool OnCompleted() { return false; }
/* --------------------------------------------------------------------------------------------
* Called in worker by the thread pool to let the task know that it will be aborted.
* Most likely due to a shutdown of the thread pool.
*/
virtual void OnAborted(bool SQ_UNUSED_ARG(retry)) { }
};
/* ------------------------------------------------------------------------------------------------
* Common implementation of a basic item.
*/
struct BasicThreadPoolItem
{
// --------------------------------------------------------------------------------------------
LightObj mCallback{};
/* --------------------------------------------------------------------------------------------
* Default constructor.
*/
BasicThreadPoolItem() noexcept = default;
};
/* ------------------------------------------------------------------------------------------------
* Internal thread pool used to reduce stuttering from the plug-in whenever necessary and/or possible.
*/
class ThreadPool
{
private:
// --------------------------------------------------------------------------------------------
static ThreadPool s_Inst; // ThreadPool instance.
/* --------------------------------------------------------------------------------------------
* Default constructor.
*/
ThreadPool() noexcept;
/* --------------------------------------------------------------------------------------------
* Destructor.
*/
~ThreadPool();
private:
// --------------------------------------------------------------------------------------------
using Pool = std::vector< std::thread >; // Worker container.
using Item = std::unique_ptr< ThreadPoolItem >; // Owning pointer of an item.
// --------------------------------------------------------------------------------------------
using Pending = moodycamel::BlockingConcurrentQueue< Item >; // Pending items.
using Finished = moodycamel::ConcurrentQueue< Item >; // Finished items.
// --------------------------------------------------------------------------------------------
std::atomic_bool m_Running; // Whether the threads are allowed to run.
// --------------------------------------------------------------------------------------------
Pending m_Pending; // Blocking concurrent queue of pending items.
Finished m_Finished; // Non-blocking concurrent queue of finished items.
// --------------------------------------------------------------------------------------------
Pool m_Threads; // Pool of worker threads.
private:
/* --------------------------------------------------------------------------------------------
* Internal function used to process tasks.
*/
void WorkerProc(uint32_t idx);
public:
/* --------------------------------------------------------------------------------------------
* Copy constructor. (disabled)
*/
ThreadPool(const ThreadPool & o) = delete;
/* --------------------------------------------------------------------------------------------
* Move constructor. (disabled)
*/
ThreadPool(ThreadPool && o) = delete;
/* --------------------------------------------------------------------------------------------
* Copy assignment operator. (disabled)
*/
ThreadPool & operator = (const ThreadPool & o) = delete;
/* --------------------------------------------------------------------------------------------
* Move assignment operator. (disabled)
*/
ThreadPool & operator = (ThreadPool && o) = delete;
/* --------------------------------------------------------------------------------------------
* Retrieve the thread pool instance.
*/
static ThreadPool & Get()
{
return s_Inst;
}
/* --------------------------------------------------------------------------------------------
* Initialize the thread pool.
*/
bool Initialize(uint32_t count);
/* --------------------------------------------------------------------------------------------
* Terminate the thread pool.
*/
void Terminate(bool shutdown = false);
/* --------------------------------------------------------------------------------------------
* Process finished items.
*/
void Process();
/* --------------------------------------------------------------------------------------------
* Queue an item to be processed.
*/
void Enqueue(ThreadPoolItem * item)
{
// Only queue valid items
if (!item || !m_Running) return;
// Only queue if worker threads exist
if (!m_Threads.empty())
{
m_Pending.enqueue(Item(item));
}
else
{
// Perform the task in-place
if (item->OnPrepare())
{
if (item->OnProcess())
{
item->OnAborted(true); // Not accepted in single thread
}
}
// Item was finished in main thread
item->OnCompleted();
}
}
};
} // Namespace:: SqMod