From 39524098f1b3ac33e50397e751c20c000dbf737d Mon Sep 17 00:00:00 2001 From: Sandu Liviu Catalin Date: Tue, 19 Jul 2022 19:28:13 +0300 Subject: [PATCH] Extend worker pool. Task exceptions are caught and they can provide custom information to identify the task that failed. Take ownership of the task instance sooner. --- module/Core/ThreadPool.cpp | 33 ++++++++++++++++++++++---- module/Core/ThreadPool.hpp | 47 +++++++++++++++++++++++++++++++------- 2 files changed, 67 insertions(+), 13 deletions(-) diff --git a/module/Core/ThreadPool.cpp b/module/Core/ThreadPool.cpp index 622ea884..9f8aa8e0 100644 --- a/module/Core/ThreadPool.cpp +++ b/module/Core/ThreadPool.cpp @@ -125,7 +125,11 @@ void ThreadPool::Process() // Is the item valid? if (item) { - item->OnCompleted(); // Allow the item to finish itself + try { + item->OnCompleted(); // Allow the item to finish itself + } catch (const std::exception & e) { + LogErr("Exception occured in %s completion stage [%s] for [%s]", item->TypeName(), e.what(), item->IdentifiableInfo()); + } } } } @@ -149,7 +153,11 @@ void ThreadPool::WorkerProc() // Is there an item that requested to try again? if (item) { - item->OnAborted(true); // NOLINT(bugprone-use-after-move) There's an `if` condition above idiot! + try { + item->OnAborted(true); // NOLINT(bugprone-use-after-move) There's an `if` condition above idiot! + } catch (const std::exception & e) { + LogErr("Exception occured in %s cancelation stage [%s] for [%s]", item->TypeName(), e.what(), item->IdentifiableInfo()); + } } // Exit the loop break; @@ -175,15 +183,30 @@ void ThreadPool::WorkerProc() // Is there an item to be processed? if (item) { - item->OnAborted(false); // It should mark itself as aborted somehow! + try { + item->OnAborted(false); // It should mark itself as aborted somehow! + } catch (const std::exception & e) { + LogErr("Exception occured in %s forced cancelation stage [%s] for [%s]", item->TypeName(), e.what(), item->IdentifiableInfo()); + } } // Exit the loop break; } + bool r; + // Attempt preparation + try { + r = item->OnPrepare(); + } catch (const std::exception & e) { + LogErr("Exception occured in %s preparation stage [%s] for [%s]", item->TypeName(), e.what(), item->IdentifiableInfo()); + } // Perform the task - if (item->OnPrepare()) + if (r) { - retry = item->OnProcess(); + try { + retry = item->OnProcess(); + } catch (const std::exception & e) { + LogErr("Exception occured in %s processing stage [%s] for [%s]", item->TypeName(), e.what(), item->IdentifiableInfo()); + } } // The task was performed if (!retry) diff --git a/module/Core/ThreadPool.hpp b/module/Core/ThreadPool.hpp index f5ee1b10..4767d836 100644 --- a/module/Core/ThreadPool.hpp +++ b/module/Core/ThreadPool.hpp @@ -55,6 +55,16 @@ struct ThreadPoolItem */ ThreadPoolItem & operator = (ThreadPoolItem && o) = delete; + /* -------------------------------------------------------------------------------------------- + * Provide a name to what type of task this is. Mainly for debugging purposes. + */ + SQMOD_NODISCARD virtual const char * TypeName() noexcept { return "worker item"; } + + /* -------------------------------------------------------------------------------------------- + * Provide unique information that may help identify the task. Mainly for debugging purposes. + */ + SQMOD_NODISCARD virtual const char * IdentifiableInfo() noexcept { return ""; } + /* -------------------------------------------------------------------------------------------- * Invoked in worker thread by the thread pool after obtaining the task from the queue. * Must return true to indicate that the task can be performed. False indicates failure. @@ -174,6 +184,14 @@ public: * Queue an item to be processed. Will take ownership of the given pointer! */ void Enqueue(ThreadPoolItem * item) + { + Enqueue(Item{item}); + } + + /* -------------------------------------------------------------------------------------------- + * Queue an item to be processed. Will take ownership of the given pointer! + */ + void Enqueue(Item && item) { // Only queue valid items if (!item || !m_Running) return; @@ -183,7 +201,7 @@ public: // Acquire a lock on the mutex std::unique_lock< std::mutex > lock(m_Mutex); // Push the item in the queue - m_Queue.push(Item(item)); + m_Queue.push(std::forward< Item >(item)); // Release the mutex before notifying lock.unlock(); // Notify one thread that there's work @@ -191,18 +209,32 @@ public: } else { - // Take ownership - Item i{item}; + bool r; + // Attempt preparation + try { + r = item->OnPrepare(); + } catch (const std::exception & e) { + LogErr("Exception occured in %s preparation stage [%s] for [%s]", item->TypeName(), e.what(), item->IdentifiableInfo()); + } // Perform the task in-place - if (i->OnPrepare()) + if (r) { - if (i->OnProcess()) + try { + r = item->OnProcess(); + } catch (const std::exception & e) { + LogErr("Exception occured in %s processing stage [%s] for [%s]", item->TypeName(), e.what(), item->IdentifiableInfo()); + } + if (r) { - i->OnAborted(true); // Not accepted in single thread + try { + item->OnAborted(true); // Not accepted in single thread + } catch (const std::exception & e) { + LogErr("Exception occured in %s cancelation stage [%s] for [%s]", item->TypeName(), e.what(), item->IdentifiableInfo()); + } } } // Task is completed in processing stage - m_Finished.enqueue(std::move(i)); + m_Finished.enqueue(std::forward< Item >(item)); } } @@ -213,7 +245,6 @@ public: { return m_Threads.size(); } - }; } // Namespace:: SqMod