From cd55e51d62c83a58ca19e64e2d446c484035015a Mon Sep 17 00:00:00 2001 From: Sandu Liviu Catalin Date: Sun, 28 Mar 2021 00:19:09 +0200 Subject: [PATCH] Allow CPR to use the thread pool. --- module/Core.cpp | 49 +++--- module/Core/ThreadPool.cpp | 50 +++++-- module/Core/ThreadPool.hpp | 60 ++++---- module/Library/CURL.cpp | 295 +++++++++++++++++++++++++++++++++++++ module/Library/CURL.hpp | 96 ++++++++++++ module/Main.cpp | 5 +- 6 files changed, 492 insertions(+), 63 deletions(-) diff --git a/module/Core.cpp b/module/Core.cpp index 7eacc3db..10628ead 100644 --- a/module/Core.cpp +++ b/module/Core.cpp @@ -242,7 +242,7 @@ bool Core::Initialize() { ThreadPool::Get().Terminate(); return false; - } + } else cLogDbg(m_Verbosity >= 1, "Initialized %zu worker threads", ThreadPool::Get().GetThreadCount()); #ifdef VCMP_ENABLE_OFFICIAL // See if debugging options should be enabled m_Official = conf.GetBoolValue("Squirrel", "OfficialCompatibility", m_Official); @@ -517,28 +517,36 @@ void Core::Terminate(bool shutdown) const ContainerCleaner cc_checkpoints(m_Checkpoints, ENT_CHECKPOINT, !shutdown); const ContainerCleaner cc_blips(m_Blips, ENT_BLIP, !shutdown); const ContainerCleaner cc_keybinds(m_KeyBinds, ENT_KEYBIND, !shutdown); - cLogDbg(m_Verbosity >= 1, "Terminating routines an commands"); - // Release third-party - // Release all resources from routines and tasks - TerminateRoutines(); - TerminateTasks(); - // Release all resources from command managers - TerminateCommands(); - // Release all resources from signals - TerminateSignals(); - // Release all managed areas - TerminateAreas(); - // Release privilege managers - TerminatePrivileges(); - // Release announcers - AnnounceTerminate(); - // Release Poco statement results - TerminatePocoData(); - // Release ZMQ sockets - ZmqTerminate(); // Terminate the thread pool ThreadPool::Get().Terminate(); + cLogDbg(m_Verbosity >= 1, "Thread pool terminated"); + // Release all resources from routines and tasks + TerminateRoutines(); + cLogDbg(m_Verbosity >= 2, "Routines terminated"); + TerminateTasks(); + cLogDbg(m_Verbosity >= 2, "Tasks terminated"); + // Release all resources from command managers + TerminateCommands(); + cLogDbg(m_Verbosity >= 2, "Commands terminated"); + // Release all resources from signals + TerminateSignals(); + cLogDbg(m_Verbosity >= 2, "Signals terminated"); + // Release all managed areas + TerminateAreas(); + cLogDbg(m_Verbosity >= 2, "Areas terminated"); + // Release privilege managers + TerminatePrivileges(); + cLogDbg(m_Verbosity >= 2, "Privileges terminated"); + // Release announcers + AnnounceTerminate(); + cLogDbg(m_Verbosity >= 1, "Announcer terminated"); + // Release Poco statement results + TerminatePocoData(); + cLogDbg(m_Verbosity >= 1, "Poco terminated"); + // Release ZMQ sockets + ZmqTerminate(); + cLogDbg(m_Verbosity >= 1, "ZMQ terminated"); // In case there's a payload for reload m_ReloadPayload.Release(); // Release null objects in case any reference to valid objects is stored in them @@ -555,6 +563,7 @@ void Core::Terminate(bool shutdown) m_NullPickup.Release(); m_NullPlayer.Release(); m_NullVehicle.Release(); + cLogDbg(m_Verbosity >= 2, "Temporary script objects released"); // Is there a VM to close? if (m_VM) { diff --git a/module/Core/ThreadPool.cpp b/module/Core/ThreadPool.cpp index cb8c18e1..bd4e74d9 100644 --- a/module/Core/ThreadPool.cpp +++ b/module/Core/ThreadPool.cpp @@ -10,10 +10,18 @@ namespace SqMod { // ------------------------------------------------------------------------------------------------ ThreadPool ThreadPool::s_Inst; +// ------------------------------------------------------------------------------------------------ +void ProcessThreads() +{ + ThreadPool::Get().Process(); +} + // ------------------------------------------------------------------------------------------------ ThreadPool::ThreadPool() noexcept : m_Running(false) - , m_Pending() + , m_Mutex() + , m_CV() + , m_Queue() , m_Finished() , m_Threads() { @@ -31,6 +39,8 @@ ThreadPool::~ThreadPool() t.join(); // Will block until work is finished! } } + // Clear all thread instances + m_Threads.clear(); } // ------------------------------------------------------------------------------------------------ @@ -46,21 +56,12 @@ bool ThreadPool::Initialize(uint32_t count) { count = MAX_WORKER_THREADS; // Hard coded worker limit } - // See if any of the threads are active - for (auto & t : m_Threads) - { - if (t.joinable()) - { - return false; // Something is not right! - } - } // Make sure the threads don't stop after creation m_Running = true; // Create the specified amount of worker threads for (uint32_t i = 0; i < count; ++i) { - // Pass the container index to the worker thread so it knows to find itself - m_Threads.emplace_back(&ThreadPool::WorkerProc, this, i); + m_Threads.emplace_back(&ThreadPool::WorkerProc, this); } // Thread pool initialized return m_Running; @@ -76,10 +77,15 @@ void ThreadPool::Terminate(bool SQ_UNUSED_ARG(shutdown)) } // Tell the threads to stop m_Running = false; - // Enqueue dummy items to wake the threads and allow them to stop - for (size_t n = 0; n < m_Threads.size(); ++n) { - m_Pending.enqueue(Item()); + std::lock_guard< std::mutex > lg(m_Mutex); + // Enqueue dummy items to wake the threads and allow them to stop + for (size_t n = 0; n < m_Threads.size(); ++n) + { + m_Queue.push(Item()); + } + // Allow threads to process the dummy items and stop + m_CV.notify_all(); } // Attempt to join the threads for (auto & t : m_Threads) @@ -89,6 +95,8 @@ void ThreadPool::Terminate(bool SQ_UNUSED_ARG(shutdown)) t.join(); // Will block until work is finished! } } + // Clear all thread instances + m_Threads.clear(); // Retrieve each item individually and process it for (Item item; m_Finished.try_dequeue(item);) { @@ -124,7 +132,7 @@ void ThreadPool::Process() } // ------------------------------------------------------------------------------------------------ -void ThreadPool::WorkerProc(uint32_t SQ_UNUSED_ARG(idx)) +void ThreadPool::WorkerProc() { // Whether this item wants to try again bool retry = false; @@ -147,7 +155,17 @@ void ThreadPool::WorkerProc(uint32_t SQ_UNUSED_ARG(idx)) // Attempt to get an item from the queue if (!retry) { - m_Pending.wait_dequeue(item); + // Acquire a lock on the mutex + std::unique_lock< std::mutex > lock(m_Mutex); + // Wait until there are items in the queue + while (m_Queue.empty()) + { + m_CV.wait(lock); + } + // Retrieve top item the queue + item = std::move(m_Queue.front()); + // Remove it from the queue + m_Queue.pop(); } // Do we have to stop? if (!m_Running) diff --git a/module/Core/ThreadPool.hpp b/module/Core/ThreadPool.hpp index f3dec822..d9eaf448 100644 --- a/module/Core/ThreadPool.hpp +++ b/module/Core/ThreadPool.hpp @@ -5,7 +5,6 @@ // ------------------------------------------------------------------------------------------------ #include -#include // ------------------------------------------------------------------------------------------------ #include @@ -13,6 +12,7 @@ #include #include #include +#include // ------------------------------------------------------------------------------------------------ namespace SqMod { @@ -40,6 +40,11 @@ struct ThreadPoolItem */ ThreadPoolItem(ThreadPoolItem && o) = delete; + /* -------------------------------------------------------------------------------------------- + * Destructor. + */ + virtual ~ThreadPoolItem() = default; + /* -------------------------------------------------------------------------------------------- * Copy assignment operator. (disabled) */ @@ -54,19 +59,18 @@ struct ThreadPoolItem * Invoked in worker thread by the thread pool after obtaining the task from the queue. * Must return true to indicate that the task can be performed. False indicates failure. */ - virtual bool OnPrepare() { return true; } + SQMOD_NODISCARD virtual bool OnPrepare() { return true; } /* -------------------------------------------------------------------------------------------- * Called in worker by the thread pool to performed by the associated tasks. * Will be called continuously while the returned value is true. While false means it finished. */ - virtual bool OnProcess() = 0; + SQMOD_NODISCARD virtual bool OnProcess() { return false; }; /* -------------------------------------------------------------------------------------------- * Invoked in main thread by the thread pool after the task was completed. - * If true is returned, the task is added back to the queue to be processed again. */ - virtual bool OnCompleted() { return false; } + virtual void OnCompleted() { } /* -------------------------------------------------------------------------------------------- * Called in worker by the thread pool to let the task know that it will be aborted. @@ -75,20 +79,6 @@ struct ThreadPoolItem virtual void OnAborted(bool SQ_UNUSED_ARG(retry)) { } }; -/* ------------------------------------------------------------------------------------------------ - * Common implementation of a basic item. -*/ -struct BasicThreadPoolItem -{ - // -------------------------------------------------------------------------------------------- - LightObj mCallback{}; - - /* -------------------------------------------------------------------------------------------- - * Default constructor. - */ - BasicThreadPoolItem() noexcept = default; -}; - /* ------------------------------------------------------------------------------------------------ * Internal thread pool used to reduce stuttering from the plug-in whenever necessary and/or possible. */ @@ -115,23 +105,25 @@ private: using Pool = std::vector< std::thread >; // Worker container. using Item = std::unique_ptr< ThreadPoolItem >; // Owning pointer of an item. // -------------------------------------------------------------------------------------------- - using Pending = moodycamel::BlockingConcurrentQueue< Item >; // Pending items. using Finished = moodycamel::ConcurrentQueue< Item >; // Finished items. // -------------------------------------------------------------------------------------------- - std::atomic_bool m_Running; // Whether the threads are allowed to run. + std::atomic_bool m_Running; // Whether the threads are allowed to run. // -------------------------------------------------------------------------------------------- - Pending m_Pending; // Blocking concurrent queue of pending items. - Finished m_Finished; // Non-blocking concurrent queue of finished items. + std::mutex m_Mutex; + std::condition_variable m_CV; + std::queue< Item > m_Queue; // -------------------------------------------------------------------------------------------- - Pool m_Threads; // Pool of worker threads. + Finished m_Finished; // Non-blocking concurrent queue of finished items. + // -------------------------------------------------------------------------------------------- + Pool m_Threads; // Pool of worker threads. private: /* -------------------------------------------------------------------------------------------- * Internal function used to process tasks. */ - void WorkerProc(uint32_t idx); + void WorkerProc(); public: @@ -188,7 +180,14 @@ public: // Only queue if worker threads exist if (!m_Threads.empty()) { - m_Pending.enqueue(Item(item)); + // Acquire a lock on the mutex + std::unique_lock< std::mutex > lock(m_Mutex); + // Push the item in the queue + m_Queue.push(Item(item)); + // Release the mutex before notifying + lock.unlock(); + // Notify one thread that there's work + m_CV.notify_one(); } else { @@ -204,6 +203,15 @@ public: item->OnCompleted(); } } + + /* -------------------------------------------------------------------------------------------- + * Retrieve the number of worker threads. + */ + SQMOD_NODISCARD size_t GetThreadCount() + { + return m_Threads.size(); + } + }; } // Namespace:: SqMod diff --git a/module/Library/CURL.cpp b/module/Library/CURL.cpp index d7a38ff7..953be0ce 100644 --- a/module/Library/CURL.cpp +++ b/module/Library/CURL.cpp @@ -18,6 +18,291 @@ SQMOD_DECL_TYPENAME(SqCpPayload, _SC("SqCprPayload")) SQMOD_DECL_TYPENAME(SqCpProxies, _SC("SqCprProxies")) SQMOD_DECL_TYPENAME(SqCpSession, _SC("SqCprSession")) +/* ------------------------------------------------------------------------------------------------ + * Common session action implementation. +*/ +struct CpBaseAction : public ThreadPoolItem +{ + // -------------------------------------------------------------------------------------------- + CpSession * mInstance{nullptr}; // Associated session. + Function mCallback{}; // Function to call when completed. + LightObj mObject{}; // Prevent the session from being destroyed. + cpr::Response mResponse{}; + + /* -------------------------------------------------------------------------------------------- + * Base constructor. + */ + CpBaseAction(CpSession * session, Function & cb, LightObj && obj) + : mInstance(session) + , mCallback(std::move(cb)) + , mObject(std::move(obj)) + , mResponse() + { + } + + /* -------------------------------------------------------------------------------------------- + * Destructor. + */ + ~CpBaseAction() override = default; + + /* -------------------------------------------------------------------------------------------- + * Task completed callback. + */ + void OnCompleted() override + { + // Is there a callback? + if (!mCallback.IsNull()) + { + mCallback(mObject, CpResponse(std::move(mResponse))); // Invoke it + } + // Unlock the session + mInstance->mPending = nullptr; + } + + /* -------------------------------------------------------------------------------------------- + * Task process callback. + */ + SQMOD_NODISCARD bool OnProcess() override { return false; } +}; + +/* ------------------------------------------------------------------------------------------------ + * Delete action implementation. +*/ +struct CpDeleteAction : public CpBaseAction +{ + + /* -------------------------------------------------------------------------------------------- + * Base constructor. + */ + CpDeleteAction(CpSession * session, Function & cb, LightObj && obj) + : CpBaseAction(session, cb, std::move(obj)) + { + } + + /* -------------------------------------------------------------------------------------------- + * Task process callback. + */ + SQMOD_NODISCARD bool OnProcess() override + { + mResponse = mInstance->Delete(); + return false; // We do this once + } +}; + +// ------------------------------------------------------------------------------------------------ +void CpSession::DoDelete_(Function & cb) +{ + LockCheck(); + // Create the task and lock session + mPending = new CpDeleteAction(this, cb, LightObj(1, SqVM())); + // Queue the task to be processed + ThreadPool::Get().Enqueue(mPending); +} + +/* ------------------------------------------------------------------------------------------------ + * Get action implementation. +*/ +struct CpGetAction : public CpBaseAction +{ + + /* -------------------------------------------------------------------------------------------- + * Base constructor. + */ + CpGetAction(CpSession * session, Function & cb, LightObj && obj) + : CpBaseAction(session, cb, std::move(obj)) + { + } + + /* -------------------------------------------------------------------------------------------- + * Task process callback. + */ + SQMOD_NODISCARD bool OnProcess() override + { + mResponse = mInstance->Get(); + return false; // We do this once + } +}; + +// ------------------------------------------------------------------------------------------------ +void CpSession::DoGet_(Function & cb) +{ + LockCheck(); + // Create the task and lock session + mPending = new CpGetAction(this, cb, LightObj(1, SqVM())); + // Queue the task to be processed + ThreadPool::Get().Enqueue(mPending); +} + +/* ------------------------------------------------------------------------------------------------ + * Head action implementation. +*/ +struct CpHeadAction : public CpBaseAction +{ + + /* -------------------------------------------------------------------------------------------- + * Base constructor. + */ + CpHeadAction(CpSession * session, Function & cb, LightObj && obj) + : CpBaseAction(session, cb, std::move(obj)) + { + } + + /* -------------------------------------------------------------------------------------------- + * Task process callback. + */ + SQMOD_NODISCARD bool OnProcess() override + { + mResponse = mInstance->Head(); + return false; // We do this once + } +}; + +// ------------------------------------------------------------------------------------------------ +void CpSession::DoHead_(Function & cb) +{ + LockCheck(); + // Create the task and lock session + mPending = new CpHeadAction(this, cb, LightObj(1, SqVM())); + // Queue the task to be processed + ThreadPool::Get().Enqueue(mPending); +} + +/* ------------------------------------------------------------------------------------------------ + * Options action implementation. +*/ +struct CpOptionsAction : public CpBaseAction +{ + + /* -------------------------------------------------------------------------------------------- + * Base constructor. + */ + CpOptionsAction(CpSession * session, Function & cb, LightObj && obj) + : CpBaseAction(session, cb, std::move(obj)) + { + } + + /* -------------------------------------------------------------------------------------------- + * Task process callback. + */ + SQMOD_NODISCARD bool OnProcess() override + { + mResponse = mInstance->Options(); + return false; // We do this once + } +}; + +// ------------------------------------------------------------------------------------------------ +void CpSession::DoOptions_(Function & cb) +{ + LockCheck(); + // Create the task and lock session + mPending = new CpOptionsAction(this, cb, LightObj(1, SqVM())); + // Queue the task to be processed + ThreadPool::Get().Enqueue(mPending); +} + +/* ------------------------------------------------------------------------------------------------ + * Patch action implementation. +*/ +struct CpPatchAction : public CpBaseAction +{ + + /* -------------------------------------------------------------------------------------------- + * Base constructor. + */ + CpPatchAction(CpSession * session, Function & cb, LightObj && obj) + : CpBaseAction(session, cb, std::move(obj)) + { + } + + /* -------------------------------------------------------------------------------------------- + * Task process callback. + */ + SQMOD_NODISCARD bool OnProcess() override + { + mResponse = mInstance->Patch(); + return false; // We do this once + } +}; + +// ------------------------------------------------------------------------------------------------ +void CpSession::DoPatch_(Function & cb) +{ + LockCheck(); + // Create the task and lock session + mPending = new CpPatchAction(this, cb, LightObj(1, SqVM())); + // Queue the task to be processed + ThreadPool::Get().Enqueue(mPending); +} + +/* ------------------------------------------------------------------------------------------------ + * Post action implementation. +*/ +struct CpPostAction : public CpBaseAction +{ + + /* -------------------------------------------------------------------------------------------- + * Base constructor. + */ + CpPostAction(CpSession * session, Function & cb, LightObj && obj) + : CpBaseAction(session, cb, std::move(obj)) + { + } + + /* -------------------------------------------------------------------------------------------- + * Task process callback. + */ + SQMOD_NODISCARD bool OnProcess() override + { + mResponse = mInstance->Post(); + return false; // We do this once + } +}; + +// ------------------------------------------------------------------------------------------------ +void CpSession::DoPost_(Function & cb) +{ + LockCheck(); + // Create the task and lock session + mPending = new CpPostAction(this, cb, LightObj(1, SqVM())); + // Queue the task to be processed + ThreadPool::Get().Enqueue(mPending); +} + +/* ------------------------------------------------------------------------------------------------ + * Put action implementation. +*/ +struct CpPutAction : public CpBaseAction +{ + + /* -------------------------------------------------------------------------------------------- + * Base constructor. + */ + CpPutAction(CpSession * session, Function & cb, LightObj && obj) + : CpBaseAction(session, cb, std::move(obj)) + { + } + + /* -------------------------------------------------------------------------------------------- + * Task process callback. + */ + SQMOD_NODISCARD bool OnProcess() override + { + mResponse = mInstance->Put(); + return false; // We do this once + } +}; + +// ------------------------------------------------------------------------------------------------ +void CpSession::DoPut_(Function & cb) +{ + LockCheck(); + // Create the task and lock session + mPending = new CpPutAction(this, cb, LightObj(1, SqVM())); + // Queue the task to be processed + ThreadPool::Get().Enqueue(mPending); +} + // ------------------------------------------------------------------------------------------------ static const EnumElement g_ErrorCodes[] = { {_SC("OK"), SQInteger(cpr::ErrorCode::OK)}, @@ -325,8 +610,11 @@ void Register_CURL(HSQUIRRELVM vm) Class< CpSession, NoCopy< CpSession > >(vm, SqCpSession::Str) // Constructors .Ctor() + .Ctor< StackStrF & >() // Meta-methods .SquirrelFunc(_SC("_typename"), &SqCpSession::Fn) + // Member Properties + .Prop(_SC("Locked"), &CpSession::IsLocked) // Member Methods .FmtFunc(_SC("SetURL"), &CpSession::SetURL_) .Func(_SC("SetParameters"), &CpSession::SetParameters_) @@ -358,6 +646,13 @@ void Register_CURL(HSQUIRRELVM vm) .Func(_SC("Patch"), &CpSession::DoPatch) .Func(_SC("Post"), &CpSession::DoPost) .Func(_SC("Put"), &CpSession::DoPut) + .Func(_SC("AsyncDelete"), &CpSession::DoDelete_) + .Func(_SC("AsyncGet"), &CpSession::DoGet_) + .Func(_SC("AsyncHead"), &CpSession::DoHead_) + .Func(_SC("AsyncOptions"), &CpSession::DoOptions_) + .Func(_SC("AsyncPatch"), &CpSession::DoPatch_) + .Func(_SC("AsyncPost"), &CpSession::DoPost_) + .Func(_SC("AsyncPut"), &CpSession::DoPut_) ); RootTable(vm).Bind(_SC("SqCPR"), cpns); diff --git a/module/Library/CURL.hpp b/module/Library/CURL.hpp index 9c6edac7..0d6b495d 100644 --- a/module/Library/CURL.hpp +++ b/module/Library/CURL.hpp @@ -2,6 +2,7 @@ // ------------------------------------------------------------------------------------------------ #include "Core/Common.hpp" +#include "Core/ThreadPool.hpp" // ------------------------------------------------------------------------------------------------ #include @@ -1608,11 +1609,23 @@ struct CpProxies : public cpr::Proxies */ struct CpSession : public cpr::Session { + // Pointer to the pending action associated with this session, if any. + ThreadPoolItem * mPending{nullptr}; + /* -------------------------------------------------------------------------------------------- * Default constructor. */ CpSession() = default; + /* -------------------------------------------------------------------------------------------- + * URL constructor. + */ + CpSession(StackStrF & url) + : cpr::Session() + { + cpr::Session::SetUrl(cpr::Url(url.mPtr, url.GetSize())); + } + /* -------------------------------------------------------------------------------------------- * Copy constructor (disabled). */ @@ -1638,11 +1651,31 @@ struct CpSession : public cpr::Session */ CpSession & operator = (CpSession &&) noexcept = default; + /* -------------------------------------------------------------------------------------------- + * Throw exception if the session is locked. + */ + void LockCheck() + { + if (mPending) + { + STHROWF("Session is currently locked by pending action."); + } + } + + /* -------------------------------------------------------------------------------------------- + * Check if the session is locked. + */ + SQMOD_NODISCARD bool IsLocked() const + { + return mPending != nullptr; + } + /* -------------------------------------------------------------------------------------------- * Modify URL option. */ CpSession & SetURL_(StackStrF & url) { + LockCheck(); cpr::Session::SetUrl(cpr::Url(url.mPtr, url.GetSize())); return *this; // Allow chaining } @@ -1652,6 +1685,7 @@ struct CpSession : public cpr::Session */ CpSession & SetParameters_(const CpParameters & parameters) { + LockCheck(); cpr::Session::SetParameters(parameters); return *this; // Allow chaining } @@ -1661,6 +1695,7 @@ struct CpSession : public cpr::Session */ CpSession & YieldParameters(CpParameters & parameters) { + LockCheck(); cpr::Session::SetParameters(std::move(static_cast< cpr::Parameters & >(parameters))); return *this; // Allow chaining } @@ -1670,6 +1705,7 @@ struct CpSession : public cpr::Session */ CpSession & SetHeader_(const CpHeader & header) { + LockCheck(); cpr::Session::SetHeader(header.mMap); return *this; // Allow chaining } @@ -1679,6 +1715,7 @@ struct CpSession : public cpr::Session */ CpSession & SetTimeout_(SQInteger ms) { + LockCheck(); cpr::Session::SetTimeout(cpr::Timeout(std::chrono::milliseconds{ms})); return *this; // Allow chaining } @@ -1688,6 +1725,7 @@ struct CpSession : public cpr::Session */ CpSession & SetConnectTimeout_(SQInteger ms) { + LockCheck(); cpr::Session::SetConnectTimeout(cpr::ConnectTimeout(std::chrono::milliseconds{ms})); return *this; // Allow chaining } @@ -1697,6 +1735,7 @@ struct CpSession : public cpr::Session */ CpSession & SetAuth_(StackStrF & username, StackStrF & password) { + LockCheck(); cpr::Session::SetAuth(cpr::Authentication(username.ToStr(), password.ToStr())); return *this; // Allow chaining } @@ -1706,6 +1745,7 @@ struct CpSession : public cpr::Session */ CpSession & SetDigest_(StackStrF & username, StackStrF & password) { + LockCheck(); cpr::Session::SetAuth(cpr::Digest(username.ToStr(), password.ToStr())); return *this; // Allow chaining } @@ -1715,6 +1755,7 @@ struct CpSession : public cpr::Session */ CpSession & SetUserAgent_(StackStrF & agent) { + LockCheck(); cpr::Session::SetUserAgent(cpr::UserAgent(agent.mPtr, agent.GetSize())); return *this; // Allow chaining } @@ -1724,6 +1765,7 @@ struct CpSession : public cpr::Session */ CpSession & SetPayload_(const CpPayload & payload) { + LockCheck(); cpr::Session::SetPayload(payload); return *this; // Allow chaining } @@ -1733,6 +1775,7 @@ struct CpSession : public cpr::Session */ CpSession & YieldPayload(CpPayload & payload) { + LockCheck(); cpr::Session::SetPayload(std::move(static_cast< cpr::Payload & >(payload))); return *this; // Allow chaining } @@ -1742,6 +1785,7 @@ struct CpSession : public cpr::Session */ CpSession & SetProxies_(const CpProxies & proxies) { + LockCheck(); cpr::Session::SetProxies(proxies); return *this; // Allow chaining } @@ -1751,6 +1795,7 @@ struct CpSession : public cpr::Session */ CpSession & YieldProxies(CpProxies & proxies) { + LockCheck(); cpr::Session::SetProxies(std::move(static_cast< cpr::Proxies & >(proxies))); return *this; // Allow chaining } @@ -1774,6 +1819,7 @@ struct CpSession : public cpr::Session */ CpSession & SetNTLM_(StackStrF & username, StackStrF & password) { + LockCheck(); cpr::Session::SetNTLM(cpr::NTLM(username.ToStr(), password.ToStr())); return *this; // Allow chaining } @@ -1783,6 +1829,7 @@ struct CpSession : public cpr::Session */ CpSession & SetRedirect_(bool redirect) { + LockCheck(); cpr::Session::SetRedirect(redirect); return *this; // Allow chaining } @@ -1792,6 +1839,7 @@ struct CpSession : public cpr::Session */ CpSession & SetMaxRedirects_(SQInteger max_redirects) { + LockCheck(); cpr::Session::SetMaxRedirects(cpr::MaxRedirects(static_cast< int32_t >(max_redirects))); return *this; // Allow chaining } @@ -1801,6 +1849,7 @@ struct CpSession : public cpr::Session */ CpSession & SetCookies_(const CpCookies & cookies) { + LockCheck(); cpr::Session::SetCookies(cookies); return *this; // Allow chaining } @@ -1810,6 +1859,7 @@ struct CpSession : public cpr::Session */ CpSession & SetBody_(StackStrF & body) { + LockCheck(); cpr::Session::SetBody(cpr::Body(body.mPtr, body.GetSize())); return *this; // Allow chaining } @@ -1819,6 +1869,7 @@ struct CpSession : public cpr::Session */ CpSession & SetLowSpeed_(SQInteger limit, SQInteger time) { + LockCheck(); cpr::Session::SetLowSpeed(cpr::LowSpeed(static_cast< int32_t >(limit), static_cast< int32_t >(time))); return *this; // Allow chaining } @@ -1828,6 +1879,7 @@ struct CpSession : public cpr::Session */ CpSession & SetVerifySsl_(bool verify) { + LockCheck(); cpr::Session::SetVerifySsl(cpr::VerifySsl(verify)); return *this; // Allow chaining } @@ -1837,6 +1889,7 @@ struct CpSession : public cpr::Session */ CpSession & SetUnixSocket_(StackStrF & socket) { + LockCheck(); cpr::Session::SetUnixSocket(cpr::UnixSocket(socket.ToStr())); return *this; // Allow chaining } @@ -1846,6 +1899,7 @@ struct CpSession : public cpr::Session */ CpSession & SetSslOptions_(const CpSslOptions & options) { + LockCheck(); cpr::Session::SetSslOptions(options); return *this; // Allow chaining } @@ -1890,6 +1944,7 @@ struct CpSession : public cpr::Session */ CpSession & SetVerbose_(bool verbose) { + LockCheck(); cpr::Session::SetVerbose(cpr::Verbose(verbose)); return *this; // Allow chaining } @@ -1899,6 +1954,7 @@ struct CpSession : public cpr::Session */ CpResponse DoDelete() { + LockCheck(); return CpResponse(cpr::Session::Delete()); } @@ -1907,6 +1963,7 @@ struct CpSession : public cpr::Session */ CpResponse DoGet() { + LockCheck(); return CpResponse(cpr::Session::Get()); } @@ -1915,6 +1972,7 @@ struct CpSession : public cpr::Session */ CpResponse DoHead() { + LockCheck(); return CpResponse(cpr::Session::Head()); } @@ -1923,6 +1981,7 @@ struct CpSession : public cpr::Session */ CpResponse DoOptions() { + LockCheck(); return CpResponse(cpr::Session::Options()); } @@ -1931,6 +1990,7 @@ struct CpSession : public cpr::Session */ CpResponse DoPatch() { + LockCheck(); return CpResponse(cpr::Session::Patch()); } @@ -1939,6 +1999,7 @@ struct CpSession : public cpr::Session */ CpResponse DoPost() { + LockCheck(); return CpResponse(cpr::Session::Post()); } @@ -1947,12 +2008,47 @@ struct CpSession : public cpr::Session */ CpResponse DoPut() { + LockCheck(); return CpResponse(cpr::Session::Put()); } //CpResponse Download(const WriteCallback& write); //CpResponse Download(std::ofstream& file); + /* -------------------------------------------------------------------------------------------- + * Delete async request. + */ + void DoDelete_(Function & cb); + + /* -------------------------------------------------------------------------------------------- + * Get async request. + */ + void DoGet_(Function & cb); + + /* -------------------------------------------------------------------------------------------- + * Head async request. + */ + void DoHead_(Function & cb); + + /* -------------------------------------------------------------------------------------------- + * Options async request. + */ + void DoOptions_(Function & cb); + + /* -------------------------------------------------------------------------------------------- + * Patch async request. + */ + void DoPatch_(Function & cb); + + /* -------------------------------------------------------------------------------------------- + * Post async request. + */ + void DoPost_(Function & cb); + + /* -------------------------------------------------------------------------------------------- + * Put async request. + */ + void DoPut_(Function & cb); }; } // Namespace:: SqMod diff --git a/module/Main.cpp b/module/Main.cpp index 26c03295..731d0da5 100644 --- a/module/Main.cpp +++ b/module/Main.cpp @@ -19,8 +19,9 @@ static bool g_Reload = false; // ------------------------------------------------------------------------------------------------ //extern void InitExports(); extern void InitializePocoDataConnectors(); -extern void ProcessTasks(); extern void ProcessRoutines(); +extern void ProcessTasks(); +extern void ProcessThreads(); /* ------------------------------------------------------------------------------------------------ * Will the scripts be reloaded at the end of the current event? @@ -163,6 +164,8 @@ static void OnServerFrame(float elapsed_time) // Process routines and tasks, if any ProcessRoutines(); ProcessTasks(); + // Process threads + ProcessThreads(); // Process log messages from other threads Logger::Get().ProcessQueue(); // See if a reload was requested