1
0
mirror of https://github.com/VCMP-SqMod/SqMod.git synced 2024-11-08 00:37:15 +01:00

Extend worker pool.

Task exceptions are caught and they can provide custom information to identify the task that failed.
Take ownership of the task instance sooner.
This commit is contained in:
Sandu Liviu Catalin 2022-07-19 19:28:13 +03:00
parent ea63899c9a
commit 39524098f1
2 changed files with 67 additions and 13 deletions

View File

@ -125,7 +125,11 @@ void ThreadPool::Process()
// Is the item valid? // Is the item valid?
if (item) if (item)
{ {
try {
item->OnCompleted(); // Allow the item to finish itself item->OnCompleted(); // Allow the item to finish itself
} catch (const std::exception & e) {
LogErr("Exception occured in %s completion stage [%s] for [%s]", item->TypeName(), e.what(), item->IdentifiableInfo());
}
} }
} }
} }
@ -149,7 +153,11 @@ void ThreadPool::WorkerProc()
// Is there an item that requested to try again? // Is there an item that requested to try again?
if (item) if (item)
{ {
try {
item->OnAborted(true); // NOLINT(bugprone-use-after-move) There's an `if` condition above idiot! item->OnAborted(true); // NOLINT(bugprone-use-after-move) There's an `if` condition above idiot!
} catch (const std::exception & e) {
LogErr("Exception occured in %s cancelation stage [%s] for [%s]", item->TypeName(), e.what(), item->IdentifiableInfo());
}
} }
// Exit the loop // Exit the loop
break; break;
@ -175,15 +183,30 @@ void ThreadPool::WorkerProc()
// Is there an item to be processed? // Is there an item to be processed?
if (item) if (item)
{ {
try {
item->OnAborted(false); // It should mark itself as aborted somehow! item->OnAborted(false); // It should mark itself as aborted somehow!
} catch (const std::exception & e) {
LogErr("Exception occured in %s forced cancelation stage [%s] for [%s]", item->TypeName(), e.what(), item->IdentifiableInfo());
}
} }
// Exit the loop // Exit the loop
break; break;
} }
bool r;
// Attempt preparation
try {
r = item->OnPrepare();
} catch (const std::exception & e) {
LogErr("Exception occured in %s preparation stage [%s] for [%s]", item->TypeName(), e.what(), item->IdentifiableInfo());
}
// Perform the task // Perform the task
if (item->OnPrepare()) if (r)
{ {
try {
retry = item->OnProcess(); retry = item->OnProcess();
} catch (const std::exception & e) {
LogErr("Exception occured in %s processing stage [%s] for [%s]", item->TypeName(), e.what(), item->IdentifiableInfo());
}
} }
// The task was performed // The task was performed
if (!retry) if (!retry)

View File

@ -55,6 +55,16 @@ struct ThreadPoolItem
*/ */
ThreadPoolItem & operator = (ThreadPoolItem && o) = delete; ThreadPoolItem & operator = (ThreadPoolItem && o) = delete;
/* --------------------------------------------------------------------------------------------
* Provide a name to what type of task this is. Mainly for debugging purposes.
*/
SQMOD_NODISCARD virtual const char * TypeName() noexcept { return "worker item"; }
/* --------------------------------------------------------------------------------------------
* Provide unique information that may help identify the task. Mainly for debugging purposes.
*/
SQMOD_NODISCARD virtual const char * IdentifiableInfo() noexcept { return ""; }
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Invoked in worker thread by the thread pool after obtaining the task from the queue. * Invoked in worker thread by the thread pool after obtaining the task from the queue.
* Must return true to indicate that the task can be performed. False indicates failure. * Must return true to indicate that the task can be performed. False indicates failure.
@ -174,6 +184,14 @@ public:
* Queue an item to be processed. Will take ownership of the given pointer! * Queue an item to be processed. Will take ownership of the given pointer!
*/ */
void Enqueue(ThreadPoolItem * item) void Enqueue(ThreadPoolItem * item)
{
Enqueue(Item{item});
}
/* --------------------------------------------------------------------------------------------
* Queue an item to be processed. Will take ownership of the given pointer!
*/
void Enqueue(Item && item)
{ {
// Only queue valid items // Only queue valid items
if (!item || !m_Running) return; if (!item || !m_Running) return;
@ -183,7 +201,7 @@ public:
// Acquire a lock on the mutex // Acquire a lock on the mutex
std::unique_lock< std::mutex > lock(m_Mutex); std::unique_lock< std::mutex > lock(m_Mutex);
// Push the item in the queue // Push the item in the queue
m_Queue.push(Item(item)); m_Queue.push(std::forward< Item >(item));
// Release the mutex before notifying // Release the mutex before notifying
lock.unlock(); lock.unlock();
// Notify one thread that there's work // Notify one thread that there's work
@ -191,18 +209,32 @@ public:
} }
else else
{ {
// Take ownership bool r;
Item i{item}; // Attempt preparation
try {
r = item->OnPrepare();
} catch (const std::exception & e) {
LogErr("Exception occured in %s preparation stage [%s] for [%s]", item->TypeName(), e.what(), item->IdentifiableInfo());
}
// Perform the task in-place // Perform the task in-place
if (i->OnPrepare()) if (r)
{ {
if (i->OnProcess()) try {
r = item->OnProcess();
} catch (const std::exception & e) {
LogErr("Exception occured in %s processing stage [%s] for [%s]", item->TypeName(), e.what(), item->IdentifiableInfo());
}
if (r)
{ {
i->OnAborted(true); // Not accepted in single thread try {
item->OnAborted(true); // Not accepted in single thread
} catch (const std::exception & e) {
LogErr("Exception occured in %s cancelation stage [%s] for [%s]", item->TypeName(), e.what(), item->IdentifiableInfo());
}
} }
} }
// Task is completed in processing stage // Task is completed in processing stage
m_Finished.enqueue(std::move(i)); m_Finished.enqueue(std::forward< Item >(item));
} }
} }
@ -213,7 +245,6 @@ public:
{ {
return m_Threads.size(); return m_Threads.size();
} }
}; };
} // Namespace:: SqMod } // Namespace:: SqMod