1
0
mirror of https://github.com/VCMP-SqMod/SqMod.git synced 2025-01-18 11:37:15 +01:00

More work on async statements for SQLite.

This commit is contained in:
Sandu Liviu Catalin 2022-07-23 21:41:10 +03:00
parent 49df7b75ee
commit 34a78dc166
5 changed files with 167 additions and 34 deletions

View File

@ -606,6 +606,15 @@ MySQLConnHnd::MySQLConnHnd(Poco::Data::SessionImpl * session)
mPtr = Poco::AnyCast< MYSQL * >(session->getProperty("handle")); mPtr = Poco::AnyCast< MYSQL * >(session->getProperty("handle"));
} }
// ------------------------------------------------------------------------------------------------
MySQLConnHnd::MySQLConnHnd(Poco::AutoPtr< Poco::Data::SessionImpl > && session)
: MySQLConnHnd()
{
mSession = std::move(session);
// Retrieve the internal handle property
mPtr = Poco::AnyCast< MYSQL * >(session->getProperty("handle"));
}
// ------------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------------
MySQLConnHnd::~MySQLConnHnd() MySQLConnHnd::~MySQLConnHnd()
{ {

View File

@ -268,6 +268,11 @@ public:
*/ */
explicit MySQLConnHnd(Poco::Data::SessionImpl * session); explicit MySQLConnHnd(Poco::Data::SessionImpl * session);
/* --------------------------------------------------------------------------------------------
* Explicit constructor.
*/
explicit MySQLConnHnd(Poco::AutoPtr< Poco::Data::SessionImpl > && session);
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Destructor. * Destructor.
*/ */

View File

@ -625,16 +625,16 @@ SQLiteConnHnd::SQLiteConnHnd(Poco::Data::SessionImpl * session)
{ {
mSession.assign(session, true); mSession.assign(session, true);
// Retrieve the internal handle property // Retrieve the internal handle property
mPtr = Poco::AnyCast< sqlite3 * >(session->getProperty("handle")); mPtr = Poco::AnyCast< sqlite3 * >(mSession->getProperty("handle"));
} }
// ------------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------------
SQLiteConnHnd::SQLiteConnHnd(Poco::AutoPtr< Poco::Data::SessionImpl > && session) SQLiteConnHnd::SQLiteConnHnd(Poco::AutoPtr< Poco::Data::SessionImpl > && session)
: SQLiteConnHnd() : SQLiteConnHnd()
{ {
mSession == std::forward< Poco::AutoPtr< Poco::Data::SessionImpl > >(session); mSession = std::move(session);
// Retrieve the internal handle property // Retrieve the internal handle property
mPtr = Poco::AnyCast< sqlite3 * >(session->getProperty("handle")); mPtr = Poco::AnyCast< sqlite3 * >(mSession->getProperty("handle"));
} }
// ------------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------------

View File

@ -541,6 +541,8 @@ struct SQLiteAsyncExec : public ThreadPoolItem
const SQChar * mQueryStr{nullptr}; // The query string that will be executed. const SQChar * mQueryStr{nullptr}; // The query string that will be executed.
LightObj mQueryObj{}; // Strong reference to the query string object. LightObj mQueryObj{}; // Strong reference to the query string object.
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
LightObj mCtx{}; // User specified context object, if any.
// --------------------------------------------------------------------------------------------
String mError{}; // Error message, if any. String mError{}; // Error message, if any.
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
@ -609,13 +611,13 @@ struct SQLiteAsyncExec : public ThreadPoolItem
if (!mResolved.IsNull()) if (!mResolved.IsNull())
{ {
LightObj c{SqTypeIdentity< SQLiteConnection >{}, SqVM(), mConnection}; LightObj c{SqTypeIdentity< SQLiteConnection >{}, SqVM(), mConnection};
mResolved.Execute(c, mChanges, mQueryObj); mResolved.Execute(c, mCtx, mChanges, mQueryObj);
} }
} }
else if (!mRejected.IsNull()) else if (!mRejected.IsNull())
{ {
LightObj c{SqTypeIdentity< SQLiteConnection >{}, SqVM(), mConnection}; LightObj c{SqTypeIdentity< SQLiteConnection >{}, SqVM(), mConnection};
mRejected.Execute(c, mResult, mError, mQueryObj); mRejected.Execute(c, mCtx, mResult, mError, mQueryObj);
} }
// Finished // Finished
return false; return false;
@ -643,12 +645,19 @@ struct SQLiteAsyncStmtBase : public ThreadPoolItem
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
Function mResolved{}; // Callback to invoke when the task was completed. Function mResolved{}; // Callback to invoke when the task was completed.
Function mRejected{}; // Callback to invoke when the task was aborted. Function mRejected{}; // Callback to invoke when the task was aborted.
Function mPrepared{}; // Callback to invoke when the task must be prepared.
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
SQLiteStatement * mStatementPtr{nullptr}; // Pointer to the script statement instance. SQLiteStatement * mStatementPtr{nullptr}; // Pointer to the script statement instance.
LightObj mStatementObj{}; // Strong reference to the statement instance object. LightObj mStatementObj{}; // Strong reference to the statement instance object.
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
const SQChar * mQueryStr{nullptr}; // The query string that will be executed. const SQChar * mQueryStr{nullptr}; // The query string that will be executed.
LightObj mQueryObj{}; // Strong reference to the query string object. LightObj mQueryObj{}; // Strong reference to the query string object.
// --------------------------------------------------------------------------------------------
LightObj mCtx{}; // User specified context object, if any.
// --------------------------------------------------------------------------------------------
int32_t mChanges{0}; // Rows affected by this query.
bool mPrepped{false}; // Whether the statement was prepared.
bool mRow{false}; // Whether we still have rows to process.
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Base constructor. Members are supposed to be validated and filled by the builder/proxy. * Base constructor. Members are supposed to be validated and filled by the builder/proxy.
*/ */
@ -659,11 +668,6 @@ struct SQLiteAsyncStmtBase : public ThreadPoolItem
*/ */
~SQLiteAsyncStmtBase() override = default; ~SQLiteAsyncStmtBase() override = default;
/* --------------------------------------------------------------------------------------------
* Provide a name to what type of task this is. Mainly for debugging purposes.
*/
SQMOD_NODISCARD const char * TypeName() noexcept override { return "sqlite async query"; }
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Provide unique information that may help identify the task. Mainly for debugging purposes. * Provide unique information that may help identify the task. Mainly for debugging purposes.
*/ */
@ -678,9 +682,11 @@ struct SQLiteAsyncStmtBase : public ThreadPoolItem
// Coincidentally, this also dirties the handle time-stamp so, it doesn't get collected // Coincidentally, this also dirties the handle time-stamp so, it doesn't get collected
if (mConnection->mSession->isConnected()) if (mConnection->mSession->isConnected())
{ {
if (mStatement->mPtr != nullptr) if (mStatement->mPtr == nullptr)
{ {
mStatement->Create(mQueryStr, static_cast< SQInteger >(strlen(mQueryStr))); mStatement->Create(mQueryStr, static_cast< SQInteger >(strlen(mQueryStr)));
// Statement was not prepared/filled with information (yet)
mPrepped = mPrepared.IsNull();
} }
// Prepared // Prepared
return true; return true;
@ -704,14 +710,22 @@ struct SQLiteAsyncStmtBase : public ThreadPoolItem
*/ */
struct SQLiteAsyncStmtExec : public SQLiteAsyncStmtBase struct SQLiteAsyncStmtExec : public SQLiteAsyncStmtBase
{ {
int32_t mChanges{0}; // Rows affected by this query. /* --------------------------------------------------------------------------------------------
* Provide a name to what type of task this is. Mainly for debugging purposes.
*/
SQMOD_NODISCARD const char * TypeName() noexcept override { return "sqlite async query exec"; }
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Called in worker by the thread pool to performed by the associated tasks. * Called in worker by the thread pool to performed by the associated tasks.
* Will be called continuously while the returned value is true. While false means it finished. * Will be called continuously while the returned value is true. While false means it finished.
*/ */
SQMOD_NODISCARD bool OnProcess() override SQMOD_NODISCARD bool OnProcess() override
{ {
mChanges = mStatementPtr->Exec(); // Was the statement prepared?
if (mPrepped)
{
mChanges = mStatementPtr->Exec();
}
// Don't retry // Don't retry
return false; return false;
}; };
@ -721,18 +735,49 @@ struct SQLiteAsyncStmtExec : public SQLiteAsyncStmtBase
* If it returns true then it will be put back into the queue to be processed again. * If it returns true then it will be put back into the queue to be processed again.
* If the boolean parameter is try then the thread-pool is in the process of shutting down. * If the boolean parameter is try then the thread-pool is in the process of shutting down.
*/ */
SQMOD_NODISCARD bool OnCompleted(bool SQ_UNUSED_ARG(stop)) override SQMOD_NODISCARD bool OnCompleted(bool SQ_UNUSED_ARG(stop)) override
{ {
if (mStatement->mDone) if (mPrepped && mStatement->mDone)
{ {
if (!mResolved.IsNull()) if (!mResolved.IsNull())
{ {
mResolved.Execute(mStatementObj, mChanges); LightObj o = mResolved.Eval(mStatementObj, mCtx, mChanges);
// Should we abort the whole thing?
if (!o.IsNull() && o.Cast< bool >() == false)
{
return false; // Allow to abort itself
}
}
// No longer prepared
mPrepped = false;
}
// Allow to prepare itself, either on initial call or again after execution
if (!mPrepped && (mStatement->mStatus == SQLITE_OK || mStatement->mStatus == SQLITE_DONE))
{
mPrepped = true;
// Is there a prepping callback?
if (!mPrepared.IsNull())
{
// Should we reset?
if (mStatement->mStatus == SQLITE_DONE)
{
mStatementPtr->Reset();
}
LightObj o = mPrepared.Eval(mStatementObj, mCtx);
// Should we abort the whole thing?
if (!o.IsNull())
{
return o.Cast< bool >(); // Allow to abort itself
}
else
{
return true; // Re-queue the task by default
}
} }
} }
else if (!mRejected.IsNull()) else if (!mRejected.IsNull() && (mStatement->mStatus != SQLITE_OK && mStatement->mStatus != SQLITE_DONE))
{ {
mRejected.Execute(mStatementObj, mQueryObj); mRejected.Execute(mStatementObj, mCtx, mQueryObj);
} }
// Finished // Finished
return false; return false;
@ -744,14 +789,22 @@ struct SQLiteAsyncStmtExec : public SQLiteAsyncStmtBase
*/ */
struct SQLiteAsyncStmtStep : public SQLiteAsyncStmtBase struct SQLiteAsyncStmtStep : public SQLiteAsyncStmtBase
{ {
bool mRow{false}; // Whether we still have rows to process. /* --------------------------------------------------------------------------------------------
* Provide a name to what type of task this is. Mainly for debugging purposes.
*/
SQMOD_NODISCARD const char * TypeName() noexcept override { return "sqlite async query step"; }
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Called in worker by the thread pool to performed by the associated tasks. * Called in worker by the thread pool to performed by the associated tasks.
* Will be called continuously while the returned value is true. While false means it finished. * Will be called continuously while the returned value is true. While false means it finished.
*/ */
SQMOD_NODISCARD bool OnProcess() override SQMOD_NODISCARD bool OnProcess() override
{ {
mRow = mStatementPtr->Step(); // Was the statement prepared?
if (mPrepped)
{
mRow = mStatementPtr->Step();
}
// Don't retry // Don't retry
return false; return false;
}; };
@ -761,19 +814,38 @@ struct SQLiteAsyncStmtStep : public SQLiteAsyncStmtBase
* If it returns true then it will be put back into the queue to be processed again. * If it returns true then it will be put back into the queue to be processed again.
* If the boolean parameter is try then the thread-pool is in the process of shutting down. * If the boolean parameter is try then the thread-pool is in the process of shutting down.
*/ */
SQMOD_NODISCARD bool OnCompleted(bool SQ_UNUSED_ARG(stop)) override SQMOD_NODISCARD bool OnCompleted(bool SQ_UNUSED_ARG(stop)) override
{ {
// This is only done once, before performing any step
if (!mPrepped)
{
mPrepped = true;
// Is there a prepping callback?
if (!mPrepared.IsNull())
{
LightObj o = mPrepared.Eval(mStatementObj, mCtx);
// Should we abort the whole thing?
if (!o.IsNull())
{
return o.Cast< bool >(); // Allow to abort itself
}
else
{
return true; // Re-queue the task by default
}
}
}
if (mStatement->mGood || mStatement->mDone) if (mStatement->mGood || mStatement->mDone)
{ {
if (!mResolved.IsNull()) if (!mResolved.IsNull())
{ {
// You are expected to step the statement manually until the end // You are expected to step the statement manually until the end
mResolved.Execute(mStatementObj); mResolved.Execute(mStatementObj, mCtx);
} }
} }
else if (!mRejected.IsNull()) else if (!mRejected.IsNull())
{ {
mRejected.Execute(mStatementObj, mQueryObj); mRejected.Execute(mStatementObj, mCtx, mQueryObj);
} }
// Finished // Finished
return false; return false;
@ -786,15 +858,22 @@ struct SQLiteAsyncStmtStep : public SQLiteAsyncStmtBase
*/ */
struct SQLiteAsyncStmtIncStep : public SQLiteAsyncStmtBase struct SQLiteAsyncStmtIncStep : public SQLiteAsyncStmtBase
{ {
/* --------------------------------------------------------------------------------------------
* Provide a name to what type of task this is. Mainly for debugging purposes.
*/
SQMOD_NODISCARD const char * TypeName() noexcept override { return "sqlite incremental async query step"; }
bool mRow{false}; // Whether we still have rows to process.
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Called in worker by the thread pool to performed by the associated tasks. * Called in worker by the thread pool to performed by the associated tasks.
* Will be called continuously while the returned value is true. While false means it finished. * Will be called continuously while the returned value is true. While false means it finished.
*/ */
SQMOD_NODISCARD bool OnProcess() override SQMOD_NODISCARD bool OnProcess() override
{ {
mRow = mStatementPtr->Step(); // Was the statement prepared?
if (mPrepped)
{
mRow = mStatementPtr->Step();
}
// Don't retry // Don't retry
return false; return false;
}; };
@ -806,7 +885,26 @@ struct SQLiteAsyncStmtIncStep : public SQLiteAsyncStmtBase
*/ */
SQMOD_NODISCARD bool OnCompleted(bool SQ_UNUSED_ARG(stop)) override SQMOD_NODISCARD bool OnCompleted(bool SQ_UNUSED_ARG(stop)) override
{ {
if (mStatement->mGood || mStatement->mDone) // This is only done once, before performing any step
if (!mPrepped)
{
mPrepped = true;
// Is there a prepping callback?
if (!mPrepared.IsNull())
{
LightObj o = mPrepared.Eval(mStatementObj, mCtx);
// Should we abort the whole thing?
if (!o.IsNull())
{
return o.Cast< bool >(); // Allow to abort itself
}
else
{
return true; // Re-queue the task by default
}
}
}
if (mStatement->mGood && !mStatement->mDone)
{ {
if (!mResolved.IsNull()) if (!mResolved.IsNull())
{ {
@ -814,7 +912,7 @@ struct SQLiteAsyncStmtIncStep : public SQLiteAsyncStmtBase
if (stop) if (stop)
{ {
do { do {
LightObj o = mResolved.Eval(mStatementObj); LightObj o = mResolved.Eval(mStatementObj, mCtx);
// Should we abort the whole thing? // Should we abort the whole thing?
if (!o.IsNull() && !o.Cast< bool >()) if (!o.IsNull() && !o.Cast< bool >())
{ {
@ -828,7 +926,7 @@ struct SQLiteAsyncStmtIncStep : public SQLiteAsyncStmtBase
} }
else else
{ {
LightObj o = mResolved.Eval(mStatementObj); LightObj o = mResolved.Eval(mStatementObj, mCtx);
// Should we abort the whole thing? // Should we abort the whole thing?
if (!o.IsNull() && !o.Cast< bool >()) if (!o.IsNull() && !o.Cast< bool >())
{ {
@ -837,9 +935,9 @@ struct SQLiteAsyncStmtIncStep : public SQLiteAsyncStmtBase
} }
} }
} }
else if (!mRejected.IsNull()) else if (!mRejected.IsNull() && (mStatement->mStatus != SQLITE_OK && mStatement->mStatus != SQLITE_DONE))
{ {
mRejected.Execute(mStatementObj, mQueryObj); mRejected.Execute(mStatementObj, mCtx, mQueryObj);
} }
// Re-queue if we still have rows to process // Re-queue if we still have rows to process
return mRow; return mRow;
@ -857,7 +955,7 @@ SqDataAsyncBuilder::SqDataAsyncBuilder(Poco::Data::SessionImpl * session, StackS
} }
// ------------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------------
void SqDataAsyncBuilder::Submit() void SqDataAsyncBuilder::Submit_(LightObj & ctx)
{ {
if (mSession.isNull()) if (mSession.isNull())
{ {
@ -879,7 +977,7 @@ void SqDataAsyncBuilder::Submit()
item = static_cast< SQLiteAsyncStmtBase * >(new SQLiteAsyncStmtExec()); item = static_cast< SQLiteAsyncStmtBase * >(new SQLiteAsyncStmtExec());
} }
// Is this incremental? // Is this incremental?
if (mInc) else if (mInc)
{ {
item = static_cast< SQLiteAsyncStmtBase * >(new SQLiteAsyncStmtIncStep()); item = static_cast< SQLiteAsyncStmtBase * >(new SQLiteAsyncStmtIncStep());
} }
@ -894,10 +992,12 @@ void SqDataAsyncBuilder::Submit()
item->mStatement = SQLiteStmtRef{new SQLiteStmtHnd(item->mConnection)}; item->mStatement = SQLiteStmtRef{new SQLiteStmtHnd(item->mConnection)};
item->mResolved = std::move(mResolved); item->mResolved = std::move(mResolved);
item->mRejected = std::move(mRejected); item->mRejected = std::move(mRejected);
item->mPrepared = std::move(mPrepared);
item->mStatementObj = LightObj{SqTypeIdentity< SQLiteStatement >{}, SqVM(), item->mStatement}; item->mStatementObj = LightObj{SqTypeIdentity< SQLiteStatement >{}, SqVM(), item->mStatement};
item->mStatementPtr = item->mStatementObj.CastI< SQLiteStatement >(); item->mStatementPtr = item->mStatementObj.CastI< SQLiteStatement >();
item->mQueryStr = mQueryStr; item->mQueryStr = mQueryStr;
item->mQueryObj = std::move(mQueryObj); item->mQueryObj = std::move(mQueryObj);
item->mCtx = std::move(ctx);
// Submit the task // Submit the task
ThreadPool::Get().Enqueue(std::move(task)); ThreadPool::Get().Enqueue(std::move(task));
} }
@ -912,6 +1012,7 @@ void SqDataAsyncBuilder::Submit()
item->mRejected = std::move(mRejected); item->mRejected = std::move(mRejected);
item->mQueryStr = mQueryStr; item->mQueryStr = mQueryStr;
item->mQueryObj = std::move(mQueryObj); item->mQueryObj = std::move(mQueryObj);
item->mCtx = std::move(ctx);
// Submit the task // Submit the task
ThreadPool::Get().Enqueue(std::move(task)); ThreadPool::Get().Enqueue(std::move(task));
} }
@ -1215,9 +1316,12 @@ void Register_POCO_Data(HSQUIRRELVM vm, Table &)
// Meta-methods // Meta-methods
.SquirrelFunc(_SC("_typename"), &SqPcSqDataAsyncBuilder::Fn) .SquirrelFunc(_SC("_typename"), &SqPcSqDataAsyncBuilder::Fn)
// Member Methods // Member Methods
.Func(_SC("Submit"), &SqDataAsyncBuilder::Submit)
.CbFunc(_SC("Resolved"), &SqDataAsyncBuilder::OnResolved) .CbFunc(_SC("Resolved"), &SqDataAsyncBuilder::OnResolved)
.CbFunc(_SC("Rejected"), &SqDataAsyncBuilder::OnRejected) .CbFunc(_SC("Rejected"), &SqDataAsyncBuilder::OnRejected)
.CbFunc(_SC("Prepared"), &SqDataAsyncBuilder::OnPrepared)
// Overloaded methods
.Overload(_SC("Submit"), &SqDataAsyncBuilder::Submit)
.Overload(_SC("Submit"), &SqDataAsyncBuilder::Submit_)
); );
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
ns.Bind(_SC("SessionPool"), ns.Bind(_SC("SessionPool"),

View File

@ -2106,6 +2106,7 @@ struct SqDataAsyncBuilder
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
Function mResolved{}; // Callback to invoke when the task was completed. Function mResolved{}; // Callback to invoke when the task was completed.
Function mRejected{}; // Callback to invoke when the task was aborted. Function mRejected{}; // Callback to invoke when the task was aborted.
Function mPrepared{}; // Callback to invoke when the task was must be prepared.
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
const SQChar * mQueryStr{nullptr}; // The query string that will be executed. const SQChar * mQueryStr{nullptr}; // The query string that will be executed.
LightObj mQueryObj{}; // Strong reference to the query string object. LightObj mQueryObj{}; // Strong reference to the query string object.
@ -2147,7 +2148,12 @@ struct SqDataAsyncBuilder
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Create the task with the suplied information and submit it to the worker pool. * Create the task with the suplied information and submit it to the worker pool.
*/ */
void Submit(); void Submit() { Submit_(NullLightObj()); }
/* --------------------------------------------------------------------------------------------
* Create the task with the suplied information and submit it to the worker pool.
*/
void Submit_(LightObj & ctx);
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Set the callback to be executed if the query was resolved. * Set the callback to be executed if the query was resolved.
@ -2166,6 +2172,15 @@ struct SqDataAsyncBuilder
mRejected = std::move(cb); mRejected = std::move(cb);
return *this; // Allow chaining return *this; // Allow chaining
} }
/* --------------------------------------------------------------------------------------------
* Set the callback to be executed in order to compose the query/statement.
*/
SqDataAsyncBuilder & OnPrepared(Function & cb)
{
mPrepared = std::move(cb);
return *this; // Allow chaining
}
}; };
} // Namespace:: SqMod } // Namespace:: SqMod