1
0
mirror of https://github.com/VCMP-SqMod/SqMod.git synced 2025-06-16 07:07:13 +02:00

Allow CPR to use the thread pool.

This commit is contained in:
Sandu Liviu Catalin
2021-03-28 00:19:09 +02:00
parent 6a0b7f246e
commit cd55e51d62
6 changed files with 492 additions and 63 deletions

View File

@ -18,6 +18,291 @@ SQMOD_DECL_TYPENAME(SqCpPayload, _SC("SqCprPayload"))
SQMOD_DECL_TYPENAME(SqCpProxies, _SC("SqCprProxies"))
SQMOD_DECL_TYPENAME(SqCpSession, _SC("SqCprSession"))
/* ------------------------------------------------------------------------------------------------
* Common session action implementation.
*/
struct CpBaseAction : public ThreadPoolItem
{
// --------------------------------------------------------------------------------------------
CpSession * mInstance{nullptr}; // Associated session.
Function mCallback{}; // Function to call when completed.
LightObj mObject{}; // Prevent the session from being destroyed.
cpr::Response mResponse{};
/* --------------------------------------------------------------------------------------------
* Base constructor.
*/
CpBaseAction(CpSession * session, Function & cb, LightObj && obj)
: mInstance(session)
, mCallback(std::move(cb))
, mObject(std::move(obj))
, mResponse()
{
}
/* --------------------------------------------------------------------------------------------
* Destructor.
*/
~CpBaseAction() override = default;
/* --------------------------------------------------------------------------------------------
* Task completed callback.
*/
void OnCompleted() override
{
// Is there a callback?
if (!mCallback.IsNull())
{
mCallback(mObject, CpResponse(std::move(mResponse))); // Invoke it
}
// Unlock the session
mInstance->mPending = nullptr;
}
/* --------------------------------------------------------------------------------------------
* Task process callback.
*/
SQMOD_NODISCARD bool OnProcess() override { return false; }
};
/* ------------------------------------------------------------------------------------------------
* Delete action implementation.
*/
struct CpDeleteAction : public CpBaseAction
{
/* --------------------------------------------------------------------------------------------
* Base constructor.
*/
CpDeleteAction(CpSession * session, Function & cb, LightObj && obj)
: CpBaseAction(session, cb, std::move(obj))
{
}
/* --------------------------------------------------------------------------------------------
* Task process callback.
*/
SQMOD_NODISCARD bool OnProcess() override
{
mResponse = mInstance->Delete();
return false; // We do this once
}
};
// ------------------------------------------------------------------------------------------------
void CpSession::DoDelete_(Function & cb)
{
LockCheck();
// Create the task and lock session
mPending = new CpDeleteAction(this, cb, LightObj(1, SqVM()));
// Queue the task to be processed
ThreadPool::Get().Enqueue(mPending);
}
/* ------------------------------------------------------------------------------------------------
* Get action implementation.
*/
struct CpGetAction : public CpBaseAction
{
/* --------------------------------------------------------------------------------------------
* Base constructor.
*/
CpGetAction(CpSession * session, Function & cb, LightObj && obj)
: CpBaseAction(session, cb, std::move(obj))
{
}
/* --------------------------------------------------------------------------------------------
* Task process callback.
*/
SQMOD_NODISCARD bool OnProcess() override
{
mResponse = mInstance->Get();
return false; // We do this once
}
};
// ------------------------------------------------------------------------------------------------
void CpSession::DoGet_(Function & cb)
{
LockCheck();
// Create the task and lock session
mPending = new CpGetAction(this, cb, LightObj(1, SqVM()));
// Queue the task to be processed
ThreadPool::Get().Enqueue(mPending);
}
/* ------------------------------------------------------------------------------------------------
* Head action implementation.
*/
struct CpHeadAction : public CpBaseAction
{
/* --------------------------------------------------------------------------------------------
* Base constructor.
*/
CpHeadAction(CpSession * session, Function & cb, LightObj && obj)
: CpBaseAction(session, cb, std::move(obj))
{
}
/* --------------------------------------------------------------------------------------------
* Task process callback.
*/
SQMOD_NODISCARD bool OnProcess() override
{
mResponse = mInstance->Head();
return false; // We do this once
}
};
// ------------------------------------------------------------------------------------------------
void CpSession::DoHead_(Function & cb)
{
LockCheck();
// Create the task and lock session
mPending = new CpHeadAction(this, cb, LightObj(1, SqVM()));
// Queue the task to be processed
ThreadPool::Get().Enqueue(mPending);
}
/* ------------------------------------------------------------------------------------------------
* Options action implementation.
*/
struct CpOptionsAction : public CpBaseAction
{
/* --------------------------------------------------------------------------------------------
* Base constructor.
*/
CpOptionsAction(CpSession * session, Function & cb, LightObj && obj)
: CpBaseAction(session, cb, std::move(obj))
{
}
/* --------------------------------------------------------------------------------------------
* Task process callback.
*/
SQMOD_NODISCARD bool OnProcess() override
{
mResponse = mInstance->Options();
return false; // We do this once
}
};
// ------------------------------------------------------------------------------------------------
void CpSession::DoOptions_(Function & cb)
{
LockCheck();
// Create the task and lock session
mPending = new CpOptionsAction(this, cb, LightObj(1, SqVM()));
// Queue the task to be processed
ThreadPool::Get().Enqueue(mPending);
}
/* ------------------------------------------------------------------------------------------------
* Patch action implementation.
*/
struct CpPatchAction : public CpBaseAction
{
/* --------------------------------------------------------------------------------------------
* Base constructor.
*/
CpPatchAction(CpSession * session, Function & cb, LightObj && obj)
: CpBaseAction(session, cb, std::move(obj))
{
}
/* --------------------------------------------------------------------------------------------
* Task process callback.
*/
SQMOD_NODISCARD bool OnProcess() override
{
mResponse = mInstance->Patch();
return false; // We do this once
}
};
// ------------------------------------------------------------------------------------------------
void CpSession::DoPatch_(Function & cb)
{
LockCheck();
// Create the task and lock session
mPending = new CpPatchAction(this, cb, LightObj(1, SqVM()));
// Queue the task to be processed
ThreadPool::Get().Enqueue(mPending);
}
/* ------------------------------------------------------------------------------------------------
* Post action implementation.
*/
struct CpPostAction : public CpBaseAction
{
/* --------------------------------------------------------------------------------------------
* Base constructor.
*/
CpPostAction(CpSession * session, Function & cb, LightObj && obj)
: CpBaseAction(session, cb, std::move(obj))
{
}
/* --------------------------------------------------------------------------------------------
* Task process callback.
*/
SQMOD_NODISCARD bool OnProcess() override
{
mResponse = mInstance->Post();
return false; // We do this once
}
};
// ------------------------------------------------------------------------------------------------
void CpSession::DoPost_(Function & cb)
{
LockCheck();
// Create the task and lock session
mPending = new CpPostAction(this, cb, LightObj(1, SqVM()));
// Queue the task to be processed
ThreadPool::Get().Enqueue(mPending);
}
/* ------------------------------------------------------------------------------------------------
* Put action implementation.
*/
struct CpPutAction : public CpBaseAction
{
/* --------------------------------------------------------------------------------------------
* Base constructor.
*/
CpPutAction(CpSession * session, Function & cb, LightObj && obj)
: CpBaseAction(session, cb, std::move(obj))
{
}
/* --------------------------------------------------------------------------------------------
* Task process callback.
*/
SQMOD_NODISCARD bool OnProcess() override
{
mResponse = mInstance->Put();
return false; // We do this once
}
};
// ------------------------------------------------------------------------------------------------
void CpSession::DoPut_(Function & cb)
{
LockCheck();
// Create the task and lock session
mPending = new CpPutAction(this, cb, LightObj(1, SqVM()));
// Queue the task to be processed
ThreadPool::Get().Enqueue(mPending);
}
// ------------------------------------------------------------------------------------------------
static const EnumElement g_ErrorCodes[] = {
{_SC("OK"), SQInteger(cpr::ErrorCode::OK)},
@ -325,8 +610,11 @@ void Register_CURL(HSQUIRRELVM vm)
Class< CpSession, NoCopy< CpSession > >(vm, SqCpSession::Str)
// Constructors
.Ctor()
.Ctor< StackStrF & >()
// Meta-methods
.SquirrelFunc(_SC("_typename"), &SqCpSession::Fn)
// Member Properties
.Prop(_SC("Locked"), &CpSession::IsLocked)
// Member Methods
.FmtFunc(_SC("SetURL"), &CpSession::SetURL_)
.Func(_SC("SetParameters"), &CpSession::SetParameters_)
@ -358,6 +646,13 @@ void Register_CURL(HSQUIRRELVM vm)
.Func(_SC("Patch"), &CpSession::DoPatch)
.Func(_SC("Post"), &CpSession::DoPost)
.Func(_SC("Put"), &CpSession::DoPut)
.Func(_SC("AsyncDelete"), &CpSession::DoDelete_)
.Func(_SC("AsyncGet"), &CpSession::DoGet_)
.Func(_SC("AsyncHead"), &CpSession::DoHead_)
.Func(_SC("AsyncOptions"), &CpSession::DoOptions_)
.Func(_SC("AsyncPatch"), &CpSession::DoPatch_)
.Func(_SC("AsyncPost"), &CpSession::DoPost_)
.Func(_SC("AsyncPut"), &CpSession::DoPut_)
);
RootTable(vm).Bind(_SC("SqCPR"), cpns);

View File

@ -2,6 +2,7 @@
// ------------------------------------------------------------------------------------------------
#include "Core/Common.hpp"
#include "Core/ThreadPool.hpp"
// ------------------------------------------------------------------------------------------------
#include <cpr/cpr.h>
@ -1608,11 +1609,23 @@ struct CpProxies : public cpr::Proxies
*/
struct CpSession : public cpr::Session
{
// Pointer to the pending action associated with this session, if any.
ThreadPoolItem * mPending{nullptr};
/* --------------------------------------------------------------------------------------------
* Default constructor.
*/
CpSession() = default;
/* --------------------------------------------------------------------------------------------
* URL constructor.
*/
CpSession(StackStrF & url)
: cpr::Session()
{
cpr::Session::SetUrl(cpr::Url(url.mPtr, url.GetSize()));
}
/* --------------------------------------------------------------------------------------------
* Copy constructor (disabled).
*/
@ -1638,11 +1651,31 @@ struct CpSession : public cpr::Session
*/
CpSession & operator = (CpSession &&) noexcept = default;
/* --------------------------------------------------------------------------------------------
* Throw exception if the session is locked.
*/
void LockCheck()
{
if (mPending)
{
STHROWF("Session is currently locked by pending action.");
}
}
/* --------------------------------------------------------------------------------------------
* Check if the session is locked.
*/
SQMOD_NODISCARD bool IsLocked() const
{
return mPending != nullptr;
}
/* --------------------------------------------------------------------------------------------
* Modify URL option.
*/
CpSession & SetURL_(StackStrF & url)
{
LockCheck();
cpr::Session::SetUrl(cpr::Url(url.mPtr, url.GetSize()));
return *this; // Allow chaining
}
@ -1652,6 +1685,7 @@ struct CpSession : public cpr::Session
*/
CpSession & SetParameters_(const CpParameters & parameters)
{
LockCheck();
cpr::Session::SetParameters(parameters);
return *this; // Allow chaining
}
@ -1661,6 +1695,7 @@ struct CpSession : public cpr::Session
*/
CpSession & YieldParameters(CpParameters & parameters)
{
LockCheck();
cpr::Session::SetParameters(std::move(static_cast< cpr::Parameters & >(parameters)));
return *this; // Allow chaining
}
@ -1670,6 +1705,7 @@ struct CpSession : public cpr::Session
*/
CpSession & SetHeader_(const CpHeader & header)
{
LockCheck();
cpr::Session::SetHeader(header.mMap);
return *this; // Allow chaining
}
@ -1679,6 +1715,7 @@ struct CpSession : public cpr::Session
*/
CpSession & SetTimeout_(SQInteger ms)
{
LockCheck();
cpr::Session::SetTimeout(cpr::Timeout(std::chrono::milliseconds{ms}));
return *this; // Allow chaining
}
@ -1688,6 +1725,7 @@ struct CpSession : public cpr::Session
*/
CpSession & SetConnectTimeout_(SQInteger ms)
{
LockCheck();
cpr::Session::SetConnectTimeout(cpr::ConnectTimeout(std::chrono::milliseconds{ms}));
return *this; // Allow chaining
}
@ -1697,6 +1735,7 @@ struct CpSession : public cpr::Session
*/
CpSession & SetAuth_(StackStrF & username, StackStrF & password)
{
LockCheck();
cpr::Session::SetAuth(cpr::Authentication(username.ToStr(), password.ToStr()));
return *this; // Allow chaining
}
@ -1706,6 +1745,7 @@ struct CpSession : public cpr::Session
*/
CpSession & SetDigest_(StackStrF & username, StackStrF & password)
{
LockCheck();
cpr::Session::SetAuth(cpr::Digest(username.ToStr(), password.ToStr()));
return *this; // Allow chaining
}
@ -1715,6 +1755,7 @@ struct CpSession : public cpr::Session
*/
CpSession & SetUserAgent_(StackStrF & agent)
{
LockCheck();
cpr::Session::SetUserAgent(cpr::UserAgent(agent.mPtr, agent.GetSize()));
return *this; // Allow chaining
}
@ -1724,6 +1765,7 @@ struct CpSession : public cpr::Session
*/
CpSession & SetPayload_(const CpPayload & payload)
{
LockCheck();
cpr::Session::SetPayload(payload);
return *this; // Allow chaining
}
@ -1733,6 +1775,7 @@ struct CpSession : public cpr::Session
*/
CpSession & YieldPayload(CpPayload & payload)
{
LockCheck();
cpr::Session::SetPayload(std::move(static_cast< cpr::Payload & >(payload)));
return *this; // Allow chaining
}
@ -1742,6 +1785,7 @@ struct CpSession : public cpr::Session
*/
CpSession & SetProxies_(const CpProxies & proxies)
{
LockCheck();
cpr::Session::SetProxies(proxies);
return *this; // Allow chaining
}
@ -1751,6 +1795,7 @@ struct CpSession : public cpr::Session
*/
CpSession & YieldProxies(CpProxies & proxies)
{
LockCheck();
cpr::Session::SetProxies(std::move(static_cast< cpr::Proxies & >(proxies)));
return *this; // Allow chaining
}
@ -1774,6 +1819,7 @@ struct CpSession : public cpr::Session
*/
CpSession & SetNTLM_(StackStrF & username, StackStrF & password)
{
LockCheck();
cpr::Session::SetNTLM(cpr::NTLM(username.ToStr(), password.ToStr()));
return *this; // Allow chaining
}
@ -1783,6 +1829,7 @@ struct CpSession : public cpr::Session
*/
CpSession & SetRedirect_(bool redirect)
{
LockCheck();
cpr::Session::SetRedirect(redirect);
return *this; // Allow chaining
}
@ -1792,6 +1839,7 @@ struct CpSession : public cpr::Session
*/
CpSession & SetMaxRedirects_(SQInteger max_redirects)
{
LockCheck();
cpr::Session::SetMaxRedirects(cpr::MaxRedirects(static_cast< int32_t >(max_redirects)));
return *this; // Allow chaining
}
@ -1801,6 +1849,7 @@ struct CpSession : public cpr::Session
*/
CpSession & SetCookies_(const CpCookies & cookies)
{
LockCheck();
cpr::Session::SetCookies(cookies);
return *this; // Allow chaining
}
@ -1810,6 +1859,7 @@ struct CpSession : public cpr::Session
*/
CpSession & SetBody_(StackStrF & body)
{
LockCheck();
cpr::Session::SetBody(cpr::Body(body.mPtr, body.GetSize()));
return *this; // Allow chaining
}
@ -1819,6 +1869,7 @@ struct CpSession : public cpr::Session
*/
CpSession & SetLowSpeed_(SQInteger limit, SQInteger time)
{
LockCheck();
cpr::Session::SetLowSpeed(cpr::LowSpeed(static_cast< int32_t >(limit), static_cast< int32_t >(time)));
return *this; // Allow chaining
}
@ -1828,6 +1879,7 @@ struct CpSession : public cpr::Session
*/
CpSession & SetVerifySsl_(bool verify)
{
LockCheck();
cpr::Session::SetVerifySsl(cpr::VerifySsl(verify));
return *this; // Allow chaining
}
@ -1837,6 +1889,7 @@ struct CpSession : public cpr::Session
*/
CpSession & SetUnixSocket_(StackStrF & socket)
{
LockCheck();
cpr::Session::SetUnixSocket(cpr::UnixSocket(socket.ToStr()));
return *this; // Allow chaining
}
@ -1846,6 +1899,7 @@ struct CpSession : public cpr::Session
*/
CpSession & SetSslOptions_(const CpSslOptions & options)
{
LockCheck();
cpr::Session::SetSslOptions(options);
return *this; // Allow chaining
}
@ -1890,6 +1944,7 @@ struct CpSession : public cpr::Session
*/
CpSession & SetVerbose_(bool verbose)
{
LockCheck();
cpr::Session::SetVerbose(cpr::Verbose(verbose));
return *this; // Allow chaining
}
@ -1899,6 +1954,7 @@ struct CpSession : public cpr::Session
*/
CpResponse DoDelete()
{
LockCheck();
return CpResponse(cpr::Session::Delete());
}
@ -1907,6 +1963,7 @@ struct CpSession : public cpr::Session
*/
CpResponse DoGet()
{
LockCheck();
return CpResponse(cpr::Session::Get());
}
@ -1915,6 +1972,7 @@ struct CpSession : public cpr::Session
*/
CpResponse DoHead()
{
LockCheck();
return CpResponse(cpr::Session::Head());
}
@ -1923,6 +1981,7 @@ struct CpSession : public cpr::Session
*/
CpResponse DoOptions()
{
LockCheck();
return CpResponse(cpr::Session::Options());
}
@ -1931,6 +1990,7 @@ struct CpSession : public cpr::Session
*/
CpResponse DoPatch()
{
LockCheck();
return CpResponse(cpr::Session::Patch());
}
@ -1939,6 +1999,7 @@ struct CpSession : public cpr::Session
*/
CpResponse DoPost()
{
LockCheck();
return CpResponse(cpr::Session::Post());
}
@ -1947,12 +2008,47 @@ struct CpSession : public cpr::Session
*/
CpResponse DoPut()
{
LockCheck();
return CpResponse(cpr::Session::Put());
}
//CpResponse Download(const WriteCallback& write);
//CpResponse Download(std::ofstream& file);
/* --------------------------------------------------------------------------------------------
* Delete async request.
*/
void DoDelete_(Function & cb);
/* --------------------------------------------------------------------------------------------
* Get async request.
*/
void DoGet_(Function & cb);
/* --------------------------------------------------------------------------------------------
* Head async request.
*/
void DoHead_(Function & cb);
/* --------------------------------------------------------------------------------------------
* Options async request.
*/
void DoOptions_(Function & cb);
/* --------------------------------------------------------------------------------------------
* Patch async request.
*/
void DoPatch_(Function & cb);
/* --------------------------------------------------------------------------------------------
* Post async request.
*/
void DoPost_(Function & cb);
/* --------------------------------------------------------------------------------------------
* Put async request.
*/
void DoPut_(Function & cb);
};
} // Namespace:: SqMod