// ------------------------------------------------------------------------------------------------ #include "Core/ThreadPool.hpp" // ------------------------------------------------------------------------------------------------ #include // ------------------------------------------------------------------------------------------------ namespace SqMod { // ------------------------------------------------------------------------------------------------ ThreadPool ThreadPool::s_Inst; // ------------------------------------------------------------------------------------------------ void ProcessThreads() { ThreadPool::Get().Process(); } // ------------------------------------------------------------------------------------------------ ThreadPool::ThreadPool() noexcept : m_Running(false) , m_Mutex() , m_CV() , m_Queue() , m_Finished() , m_Threads() { m_Threads.reserve(MAX_WORKER_THREADS + 1); // Reserve thread memory in advance } // ------------------------------------------------------------------------------------------------ ThreadPool::~ThreadPool() { // Desperate attempt to gracefully shutdown for (auto & t : m_Threads) { if (t.joinable()) { t.join(); // Will block until work is finished! } } // Clear all thread instances m_Threads.clear(); } // ------------------------------------------------------------------------------------------------ bool ThreadPool::Initialize(uint32_t count) { // Are there any threads requested? // Are we already running or have threads? if (!count || m_Running || !m_Threads.empty()) { return true; // Nothing to do! } else if (count > MAX_WORKER_THREADS) { count = MAX_WORKER_THREADS; // Hard coded worker limit } // Make sure the threads don't stop after creation m_Running = true; // Create the specified amount of worker threads for (uint32_t i = 0; i < count; ++i) { m_Threads.emplace_back(&ThreadPool::WorkerProc, this); } // Thread pool initialized return m_Running; } // ------------------------------------------------------------------------------------------------ void ThreadPool::Terminate(bool SQ_UNUSED_ARG(shutdown)) { // Are there threads running? if (m_Threads.empty() || !m_Running) { return; // Don't bother! } // Tell the threads to stop m_Running = false; { std::lock_guard< std::mutex > lg(m_Mutex); // Enqueue dummy items to wake the threads and allow them to stop for (size_t n = 0; n < m_Threads.size(); ++n) { m_Queue.push(Item()); } // Allow threads to process the dummy items and stop m_CV.notify_all(); } // Attempt to join the threads for (auto & t : m_Threads) { if (t.joinable()) { t.join(); // Will block until work is finished! } } // Clear all thread instances m_Threads.clear(); // Retrieve each item individually and process it for (Item item; m_Finished.try_dequeue(item);) { // Is the item valid? if (item) { [[maybe_unused]] auto _ = item->OnCompleted(true); // Allow the item to finish itself } // Item processed item.reset(); } } // ------------------------------------------------------------------------------------------------ void ThreadPool::Process() { // Process only what's currently in the queue const size_t count = m_Finished.size_approx(); // Retrieve each item individually and process it for (size_t n = 0; n <= count; ++n) { Item item; // Try to get an item from the queue if (m_Finished.try_dequeue(item)) { // Is the item valid? if (item) { try { // Allow the item to finish itself if (item->OnCompleted(false)) { Enqueue(std::move(item)); // Queue again } } catch (const std::exception & e) { LogErr("Exception occured in %s completion stage [%s] for [%s]", item->TypeName(), e.what(), item->IdentifiableInfo()); } } } } } // ------------------------------------------------------------------------------------------------ void ThreadPool::WorkerProc() { // Whether this item wants to try again bool retry = false; // Pointer to the dequeued item Item item; // Initialize third-party allocator for this thread auto rpmallocinit = std::make_unique< RPMallocThreadInit >(); // Constantly process items from the queue while (true) { // Do we have to stop? if (!m_Running) { // Is there an item that requested to try again? if (item) { try { item->OnAborted(true); // NOLINT(bugprone-use-after-move) There's an `if` condition above idiot! } catch (const std::exception & e) { LogErr("Exception occured in %s cancelation stage [%s] for [%s]", item->TypeName(), e.what(), item->IdentifiableInfo()); } } // Exit the loop break; } // Attempt to get an item from the queue if (!retry) { // Acquire a lock on the mutex std::unique_lock< std::mutex > lock(m_Mutex); // Wait until there are items in the queue while (m_Queue.empty()) { m_CV.wait(lock); } // Retrieve top item the queue item = std::move(m_Queue.front()); // Remove it from the queue m_Queue.pop(); } // Do we have to stop? if (!m_Running) { // Is there an item to be processed? if (item) { try { item->OnAborted(false); // It should mark itself as aborted somehow! } catch (const std::exception & e) { LogErr("Exception occured in %s forced cancelation stage [%s] for [%s]", item->TypeName(), e.what(), item->IdentifiableInfo()); } } // Exit the loop break; } bool r; // Attempt preparation try { r = item->OnPrepare(); } catch (const std::exception & e) { LogErr("Exception occured in %s preparation stage [%s] for [%s]", item->TypeName(), e.what(), item->IdentifiableInfo()); } // Perform the task if (r) { try { retry = item->OnProcess(); } catch (const std::exception & e) { LogErr("Exception occured in %s processing stage [%s] for [%s]", item->TypeName(), e.what(), item->IdentifiableInfo()); } } // The task was performed if (!retry) { m_Finished.enqueue(std::move(item)); } } // Did we take an item from the queue? if (item) { m_Finished.enqueue(std::move(item)); // Return it, even if not completed } } } // Namespace:: SqMod