// // ThreadPool.cpp // // Library: Foundation // Package: Threading // Module: ThreadPool // // Copyright (c) 2004-2006, Applied Informatics Software Engineering GmbH. // and Contributors. // // SPDX-License-Identifier: BSL-1.0 // #include "Poco/ThreadPool.h" #include "Poco/Runnable.h" #include "Poco/Thread.h" #include "Poco/Event.h" #include "Poco/ThreadLocal.h" #include "Poco/ErrorHandler.h" #include #include #if defined(_WIN32_WCE) && _WIN32_WCE < 0x800 #include "wce_time.h" #endif namespace Poco { class PooledThread: public Runnable { public: PooledThread(const std::string& name, int stackSize = POCO_THREAD_STACK_SIZE); ~PooledThread(); void start(); void start(Thread::Priority priority, Runnable& target); void start(Thread::Priority priority, Runnable& target, const std::string& name); bool idle(); int idleTime(); void join(); void activate(); void release(); void run(); private: volatile bool _idle; volatile std::time_t _idleTime; Runnable* _pTarget; std::string _name; Thread _thread; Event _targetReady; Event _targetCompleted; Event _started; FastMutex _mutex; }; PooledThread::PooledThread(const std::string& name, int stackSize): _idle(true), _idleTime(0), _pTarget(0), _name(name), _thread(name), _targetCompleted(false) { poco_assert_dbg (stackSize >= 0); _thread.setStackSize(stackSize); #if defined(_WIN32_WCE) && _WIN32_WCE < 0x800 _idleTime = wceex_time(NULL); #else _idleTime = std::time(NULL); #endif } PooledThread::~PooledThread() { } void PooledThread::start() { _thread.start(*this); _started.wait(); } void PooledThread::start(Thread::Priority priority, Runnable& target) { FastMutex::ScopedLock lock(_mutex); poco_assert (_pTarget == 0); _pTarget = ⌖ _thread.setPriority(priority); _targetReady.set(); } void PooledThread::start(Thread::Priority priority, Runnable& target, const std::string& name) { FastMutex::ScopedLock lock(_mutex); std::string fullName(name); if (name.empty()) { fullName = _name; } else { fullName.append(" ("); fullName.append(_name); fullName.append(")"); } _thread.setName(fullName); _thread.setPriority(priority); poco_assert (_pTarget == 0); _pTarget = ⌖ _targetReady.set(); } inline bool PooledThread::idle() { FastMutex::ScopedLock lock(_mutex); return _idle; } int PooledThread::idleTime() { FastMutex::ScopedLock lock(_mutex); #if defined(_WIN32_WCE) && _WIN32_WCE < 0x800 return (int) (wceex_time(NULL) - _idleTime); #else return (int) (time(NULL) - _idleTime); #endif } void PooledThread::join() { _mutex.lock(); Runnable* pTarget = _pTarget; _mutex.unlock(); if (pTarget) _targetCompleted.wait(); } void PooledThread::activate() { FastMutex::ScopedLock lock(_mutex); poco_assert (_idle); _idle = false; _targetCompleted.reset(); } void PooledThread::release() { const long JOIN_TIMEOUT = 10000; _mutex.lock(); _pTarget = 0; _mutex.unlock(); // In case of a statically allocated thread pool (such // as the default thread pool), Windows may have already // terminated the thread before we got here. if (_thread.isRunning()) _targetReady.set(); if (_thread.tryJoin(JOIN_TIMEOUT)) { delete this; } } void PooledThread::run() { _started.set(); for (;;) { _targetReady.wait(); _mutex.lock(); if (_pTarget) // a NULL target means kill yourself { Runnable* pTarget = _pTarget; _mutex.unlock(); try { pTarget->run(); } catch (Exception& exc) { ErrorHandler::handle(exc); } catch (std::exception& exc) { ErrorHandler::handle(exc); } catch (...) { ErrorHandler::handle(); } FastMutex::ScopedLock lock(_mutex); _pTarget = 0; #if defined(_WIN32_WCE) && _WIN32_WCE < 0x800 _idleTime = wceex_time(NULL); #else _idleTime = time(NULL); #endif _idle = true; _targetCompleted.set(); ThreadLocalStorage::clear(); _thread.setName(_name); _thread.setPriority(Thread::PRIO_NORMAL); } else { _mutex.unlock(); break; } } } ThreadPool::ThreadPool(int minCapacity, int maxCapacity, int idleTime, int stackSize): _minCapacity(minCapacity), _maxCapacity(maxCapacity), _idleTime(idleTime), _serial(0), _age(0), _stackSize(stackSize) { poco_assert (minCapacity >= 1 && maxCapacity >= minCapacity && idleTime > 0); for (int i = 0; i < _minCapacity; i++) { PooledThread* pThread = createThread(); _threads.push_back(pThread); pThread->start(); } } ThreadPool::ThreadPool(const std::string& name, int minCapacity, int maxCapacity, int idleTime, int stackSize): _name(name), _minCapacity(minCapacity), _maxCapacity(maxCapacity), _idleTime(idleTime), _serial(0), _age(0), _stackSize(stackSize) { poco_assert (minCapacity >= 1 && maxCapacity >= minCapacity && idleTime > 0); for (int i = 0; i < _minCapacity; i++) { PooledThread* pThread = createThread(); _threads.push_back(pThread); pThread->start(); } } ThreadPool::~ThreadPool() { try { stopAll(); } catch (...) { poco_unexpected(); } } void ThreadPool::addCapacity(int n) { FastMutex::ScopedLock lock(_mutex); poco_assert (_maxCapacity + n >= _minCapacity); _maxCapacity += n; housekeep(); } int ThreadPool::capacity() const { FastMutex::ScopedLock lock(_mutex); return _maxCapacity; } int ThreadPool::available() const { FastMutex::ScopedLock lock(_mutex); int count = 0; for (auto pThread: _threads) { if (pThread->idle()) ++count; } return (int) (count + _maxCapacity - _threads.size()); } int ThreadPool::used() const { FastMutex::ScopedLock lock(_mutex); int count = 0; for (auto pThread: _threads) { if (!pThread->idle()) ++count; } return count; } int ThreadPool::allocated() const { FastMutex::ScopedLock lock(_mutex); return int(_threads.size()); } void ThreadPool::start(Runnable& target) { getThread()->start(Thread::PRIO_NORMAL, target); } void ThreadPool::start(Runnable& target, const std::string& name) { getThread()->start(Thread::PRIO_NORMAL, target, name); } void ThreadPool::startWithPriority(Thread::Priority priority, Runnable& target) { getThread()->start(priority, target); } void ThreadPool::startWithPriority(Thread::Priority priority, Runnable& target, const std::string& name) { getThread()->start(priority, target, name); } void ThreadPool::stopAll() { FastMutex::ScopedLock lock(_mutex); for (auto pThread: _threads) { pThread->release(); } _threads.clear(); } void ThreadPool::joinAll() { FastMutex::ScopedLock lock(_mutex); for (auto pThread: _threads) { pThread->join(); } housekeep(); } void ThreadPool::collect() { FastMutex::ScopedLock lock(_mutex); housekeep(); } void ThreadPool::housekeep() { _age = 0; if (_threads.size() <= _minCapacity) return; ThreadVec idleThreads; ThreadVec expiredThreads; ThreadVec activeThreads; idleThreads.reserve(_threads.size()); activeThreads.reserve(_threads.size()); for (auto pThread: _threads) { if (pThread->idle()) { if (pThread->idleTime() < _idleTime) idleThreads.push_back(pThread); else expiredThreads.push_back(pThread); } else activeThreads.push_back(pThread); } int n = (int) activeThreads.size(); int limit = (int) idleThreads.size() + n; if (limit < _minCapacity) limit = _minCapacity; idleThreads.insert(idleThreads.end(), expiredThreads.begin(), expiredThreads.end()); _threads.clear(); for (auto pIdle: idleThreads) { if (n < limit) { _threads.push_back(pIdle); ++n; } else pIdle->release(); } _threads.insert(_threads.end(), activeThreads.begin(), activeThreads.end()); } PooledThread* ThreadPool::getThread() { FastMutex::ScopedLock lock(_mutex); if (++_age == 32) housekeep(); PooledThread* pThread = 0; for (ThreadVec::iterator it = _threads.begin(); !pThread && it != _threads.end(); ++it) { if ((*it)->idle()) pThread = *it; } if (!pThread) { if (_threads.size() < _maxCapacity) { pThread = createThread(); try { pThread->start(); _threads.push_back(pThread); } catch (...) { delete pThread; throw; } } else throw NoThreadAvailableException(); } pThread->activate(); return pThread; } PooledThread* ThreadPool::createThread() { std::ostringstream name; name << _name << "[#" << ++_serial << "]"; return new PooledThread(name.str(), _stackSize); } class ThreadPoolSingletonHolder { public: ThreadPoolSingletonHolder() { _pPool = 0; } ~ThreadPoolSingletonHolder() { delete _pPool; } ThreadPool* pool() { FastMutex::ScopedLock lock(_mutex); if (!_pPool) { _pPool = new ThreadPool("default"); if (POCO_THREAD_STACK_SIZE > 0) _pPool->setStackSize(POCO_THREAD_STACK_SIZE); } return _pPool; } private: ThreadPool* _pPool; FastMutex _mutex; }; namespace { static ThreadPoolSingletonHolder sh; } ThreadPool& ThreadPool::defaultPool() { return *sh.pool(); } } // namespace Poco