From 6a0b7f246e2399c64e47019e1b63b593983ed2f5 Mon Sep 17 00:00:00 2001 From: Sandu Liviu Catalin Date: Sat, 27 Mar 2021 19:53:49 +0200 Subject: [PATCH] Basic thread pool implementation for internal use. --- module/CMakeLists.txt | 1 + module/Core.cpp | 14 ++- module/Core/ThreadPool.cpp | 181 ++++++++++++++++++++++++++++++++ module/Core/ThreadPool.hpp | 209 +++++++++++++++++++++++++++++++++++++ 4 files changed, 402 insertions(+), 3 deletions(-) create mode 100644 module/Core/ThreadPool.cpp create mode 100644 module/Core/ThreadPool.hpp diff --git a/module/CMakeLists.txt b/module/CMakeLists.txt index 45ff5f4f..c5778e6a 100644 --- a/module/CMakeLists.txt +++ b/module/CMakeLists.txt @@ -47,6 +47,7 @@ add_library(SqModule MODULE SqBase.hpp Main.cpp Core/Script.cpp Core/Script.hpp Core/Signal.cpp Core/Signal.hpp Core/Tasks.cpp Core/Tasks.hpp + Core/ThreadPool.cpp Core/ThreadPool.hpp Core/Utility.cpp Core/Utility.hpp Core/VecMap.hpp # Entity diff --git a/module/Core.cpp b/module/Core.cpp index 646e185d..7eacc3db 100644 --- a/module/Core.cpp +++ b/module/Core.cpp @@ -4,6 +4,7 @@ #include "Core/Areas.hpp" #include "Core/Signal.hpp" #include "Core/Buffer.hpp" +#include "Core/ThreadPool.hpp" #include "Library/IO/Buffer.hpp" // ------------------------------------------------------------------------------------------------ @@ -37,7 +38,6 @@ namespace SqMod { extern bool RegisterAPI(HSQUIRRELVM vm); // ------------------------------------------------------------------------------------------------ -extern void ZmqProcess(); extern void ZmqTerminate(); extern void AnnounceTerminate(); extern void InitializeTasks(); @@ -236,7 +236,13 @@ bool Core::Initialize() // Failed to load the configuration file return false; } - + // Attempt to initialize the thread pool + if (!ThreadPool::Get().Initialize(conf.GetLongValue("General", "WorkerThreads", + static_cast< long >(std::thread::hardware_concurrency())))) + { + ThreadPool::Get().Terminate(); + return false; + } #ifdef VCMP_ENABLE_OFFICIAL // See if debugging options should be enabled m_Official = conf.GetBoolValue("Squirrel", "OfficialCompatibility", m_Official); @@ -531,6 +537,8 @@ void Core::Terminate(bool shutdown) TerminatePocoData(); // Release ZMQ sockets ZmqTerminate(); + // Terminate the thread pool + ThreadPool::Get().Terminate(); // In case there's a payload for reload m_ReloadPayload.Release(); // Release null objects in case any reference to valid objects is stored in them @@ -701,7 +709,7 @@ bool Core::LoadScript(const SQChar * filepath, bool delay) if (std::find_if(m_Scripts.cbegin(), m_Scripts.cend(), [&path](Scripts::const_reference s) { return (s.mPath == path); }) != m_Scripts.end()) - { + { // NOLINT(bugprone-branch-clone) LogWrn("Script was specified before: %s", path.c_str()); } // Also check the pending scripts container diff --git a/module/Core/ThreadPool.cpp b/module/Core/ThreadPool.cpp new file mode 100644 index 00000000..cb8c18e1 --- /dev/null +++ b/module/Core/ThreadPool.cpp @@ -0,0 +1,181 @@ +// ------------------------------------------------------------------------------------------------ +#include "Core/ThreadPool.hpp" + +// ------------------------------------------------------------------------------------------------ +#include + +// ------------------------------------------------------------------------------------------------ +namespace SqMod { + +// ------------------------------------------------------------------------------------------------ +ThreadPool ThreadPool::s_Inst; + +// ------------------------------------------------------------------------------------------------ +ThreadPool::ThreadPool() noexcept + : m_Running(false) + , m_Pending() + , m_Finished() + , m_Threads() +{ + m_Threads.reserve(MAX_WORKER_THREADS + 1); // Reserve thread memory in advance +} + +// ------------------------------------------------------------------------------------------------ +ThreadPool::~ThreadPool() +{ + // Desperate attempt to gracefully shutdown + for (auto & t : m_Threads) + { + if (t.joinable()) + { + t.join(); // Will block until work is finished! + } + } +} + +// ------------------------------------------------------------------------------------------------ +bool ThreadPool::Initialize(uint32_t count) +{ + // Are there any threads requested? + // Are we already running or have threads? + if (!count || m_Running || !m_Threads.empty()) + { + return true; // Nothing to do! + } + else if (count > MAX_WORKER_THREADS) + { + count = MAX_WORKER_THREADS; // Hard coded worker limit + } + // See if any of the threads are active + for (auto & t : m_Threads) + { + if (t.joinable()) + { + return false; // Something is not right! + } + } + // Make sure the threads don't stop after creation + m_Running = true; + // Create the specified amount of worker threads + for (uint32_t i = 0; i < count; ++i) + { + // Pass the container index to the worker thread so it knows to find itself + m_Threads.emplace_back(&ThreadPool::WorkerProc, this, i); + } + // Thread pool initialized + return m_Running; +} + +// ------------------------------------------------------------------------------------------------ +void ThreadPool::Terminate(bool SQ_UNUSED_ARG(shutdown)) +{ + // Are there threads running? + if (m_Threads.empty() || !m_Running) + { + return; // Don't bother! + } + // Tell the threads to stop + m_Running = false; + // Enqueue dummy items to wake the threads and allow them to stop + for (size_t n = 0; n < m_Threads.size(); ++n) + { + m_Pending.enqueue(Item()); + } + // Attempt to join the threads + for (auto & t : m_Threads) + { + if (t.joinable()) + { + t.join(); // Will block until work is finished! + } + } + // Retrieve each item individually and process it + for (Item item; m_Finished.try_dequeue(item);) + { + // Is the item valid? + if (item) + { + item->OnCompleted(); // Allow the item to finish itself + } + // Item processed + item.reset(); + } +} + +// ------------------------------------------------------------------------------------------------ +void ThreadPool::Process() +{ + // Process only what's currently in the queue + const size_t count = m_Finished.size_approx(); + // Retrieve each item individually and process it + for (size_t n = 0; n <= count; ++n) + { + Item item; + // Try to get an item from the queue + if (m_Finished.try_dequeue(item)) + { + // Is the item valid? + if (item) + { + item->OnCompleted(); // Allow the item to finish itself + } + } + } +} + +// ------------------------------------------------------------------------------------------------ +void ThreadPool::WorkerProc(uint32_t SQ_UNUSED_ARG(idx)) +{ + // Whether this item wants to try again + bool retry = false; + // Pointer to the dequeued item + Item item; + // Constantly process items from the queue + while (true) + { + // Do we have to stop? + if (!m_Running) + { + // Is there an item that requested to try again? + if (item) + { + item->OnAborted(true); // NOLINT(bugprone-use-after-move) There's an `if` condition above idiot! + } + // Exit the loop + break; + } + // Attempt to get an item from the queue + if (!retry) + { + m_Pending.wait_dequeue(item); + } + // Do we have to stop? + if (!m_Running) + { + // Is there an item to be processed? + if (item) + { + item->OnAborted(false); // It should mark itself as aborted somehow! + } + // Exit the loop + break; + } + // Perform the task + if (item->OnPrepare()) + { + retry = item->OnProcess(); + } + // The task was performed + if (!retry) + { + m_Finished.enqueue(std::move(item)); + } + } + // Did we take an item from the queue? + if (item) + { + m_Finished.enqueue(std::move(item)); // Return it, even if not completed + } +} + +} // Namespace:: SqMod diff --git a/module/Core/ThreadPool.hpp b/module/Core/ThreadPool.hpp new file mode 100644 index 00000000..f3dec822 --- /dev/null +++ b/module/Core/ThreadPool.hpp @@ -0,0 +1,209 @@ +#pragma once + +// ------------------------------------------------------------------------------------------------ +#include "Core/Common.hpp" + +// ------------------------------------------------------------------------------------------------ +#include +#include + +// ------------------------------------------------------------------------------------------------ +#include +#include +#include +#include +#include + +// ------------------------------------------------------------------------------------------------ +namespace SqMod { + +// ------------------------------------------------------------------------------------------------ +static constexpr uint32_t MAX_WORKER_THREADS = 32; // Hard coded worker threads limit. + +/* ------------------------------------------------------------------------------------------------ + * Item that can be given to the thread pool to process data in a separate thread. +*/ +struct ThreadPoolItem +{ + /* -------------------------------------------------------------------------------------------- + * Default constructor. + */ + ThreadPoolItem() noexcept = default; + + /* -------------------------------------------------------------------------------------------- + * Copy constructor. (disabled) + */ + ThreadPoolItem(const ThreadPoolItem & o) = delete; + + /* -------------------------------------------------------------------------------------------- + * Move constructor. (disabled) + */ + ThreadPoolItem(ThreadPoolItem && o) = delete; + + /* -------------------------------------------------------------------------------------------- + * Copy assignment operator. (disabled) + */ + ThreadPoolItem & operator = (const ThreadPoolItem & o) = delete; + + /* -------------------------------------------------------------------------------------------- + * Move assignment operator. (disabled) + */ + ThreadPoolItem & operator = (ThreadPoolItem && o) = delete; + + /* -------------------------------------------------------------------------------------------- + * Invoked in worker thread by the thread pool after obtaining the task from the queue. + * Must return true to indicate that the task can be performed. False indicates failure. + */ + virtual bool OnPrepare() { return true; } + + /* -------------------------------------------------------------------------------------------- + * Called in worker by the thread pool to performed by the associated tasks. + * Will be called continuously while the returned value is true. While false means it finished. + */ + virtual bool OnProcess() = 0; + + /* -------------------------------------------------------------------------------------------- + * Invoked in main thread by the thread pool after the task was completed. + * If true is returned, the task is added back to the queue to be processed again. + */ + virtual bool OnCompleted() { return false; } + + /* -------------------------------------------------------------------------------------------- + * Called in worker by the thread pool to let the task know that it will be aborted. + * Most likely due to a shutdown of the thread pool. + */ + virtual void OnAborted(bool SQ_UNUSED_ARG(retry)) { } +}; + +/* ------------------------------------------------------------------------------------------------ + * Common implementation of a basic item. +*/ +struct BasicThreadPoolItem +{ + // -------------------------------------------------------------------------------------------- + LightObj mCallback{}; + + /* -------------------------------------------------------------------------------------------- + * Default constructor. + */ + BasicThreadPoolItem() noexcept = default; +}; + +/* ------------------------------------------------------------------------------------------------ + * Internal thread pool used to reduce stuttering from the plug-in whenever necessary and/or possible. +*/ +class ThreadPool +{ +private: + + // -------------------------------------------------------------------------------------------- + static ThreadPool s_Inst; // ThreadPool instance. + + /* -------------------------------------------------------------------------------------------- + * Default constructor. + */ + ThreadPool() noexcept; + + /* -------------------------------------------------------------------------------------------- + * Destructor. + */ + ~ThreadPool(); + +private: + + // -------------------------------------------------------------------------------------------- + using Pool = std::vector< std::thread >; // Worker container. + using Item = std::unique_ptr< ThreadPoolItem >; // Owning pointer of an item. + // -------------------------------------------------------------------------------------------- + using Pending = moodycamel::BlockingConcurrentQueue< Item >; // Pending items. + using Finished = moodycamel::ConcurrentQueue< Item >; // Finished items. + + // -------------------------------------------------------------------------------------------- + std::atomic_bool m_Running; // Whether the threads are allowed to run. + // -------------------------------------------------------------------------------------------- + Pending m_Pending; // Blocking concurrent queue of pending items. + Finished m_Finished; // Non-blocking concurrent queue of finished items. + // -------------------------------------------------------------------------------------------- + Pool m_Threads; // Pool of worker threads. + +private: + + /* -------------------------------------------------------------------------------------------- + * Internal function used to process tasks. + */ + void WorkerProc(uint32_t idx); + +public: + + /* -------------------------------------------------------------------------------------------- + * Copy constructor. (disabled) + */ + ThreadPool(const ThreadPool & o) = delete; + + /* -------------------------------------------------------------------------------------------- + * Move constructor. (disabled) + */ + ThreadPool(ThreadPool && o) = delete; + + /* -------------------------------------------------------------------------------------------- + * Copy assignment operator. (disabled) + */ + ThreadPool & operator = (const ThreadPool & o) = delete; + + /* -------------------------------------------------------------------------------------------- + * Move assignment operator. (disabled) + */ + ThreadPool & operator = (ThreadPool && o) = delete; + + /* -------------------------------------------------------------------------------------------- + * Retrieve the thread pool instance. + */ + static ThreadPool & Get() + { + return s_Inst; + } + + /* -------------------------------------------------------------------------------------------- + * Initialize the thread pool. + */ + bool Initialize(uint32_t count); + + /* -------------------------------------------------------------------------------------------- + * Terminate the thread pool. + */ + void Terminate(bool shutdown = false); + + /* -------------------------------------------------------------------------------------------- + * Process finished items. + */ + void Process(); + + /* -------------------------------------------------------------------------------------------- + * Queue an item to be processed. + */ + void Enqueue(ThreadPoolItem * item) + { + // Only queue valid items + if (!item || !m_Running) return; + // Only queue if worker threads exist + if (!m_Threads.empty()) + { + m_Pending.enqueue(Item(item)); + } + else + { + // Perform the task in-place + if (item->OnPrepare()) + { + if (item->OnProcess()) + { + item->OnAborted(true); // Not accepted in single thread + } + } + // Item was finished in main thread + item->OnCompleted(); + } + } +}; + +} // Namespace:: SqMod