From 34a78dc1667ed2849313abf4c3259e001ad055e1 Mon Sep 17 00:00:00 2001 From: Sandu Liviu Catalin Date: Sat, 23 Jul 2022 21:41:10 +0300 Subject: [PATCH] More work on async statements for SQLite. --- module/Library/MySQL.cpp | 9 +++ module/Library/MySQL.hpp | 5 ++ module/Library/SQLite.cpp | 6 +- module/PocoLib/Data.cpp | 164 +++++++++++++++++++++++++++++++------- module/PocoLib/Data.hpp | 17 +++- 5 files changed, 167 insertions(+), 34 deletions(-) diff --git a/module/Library/MySQL.cpp b/module/Library/MySQL.cpp index 9bf2ab1f..e21af2e1 100644 --- a/module/Library/MySQL.cpp +++ b/module/Library/MySQL.cpp @@ -606,6 +606,15 @@ MySQLConnHnd::MySQLConnHnd(Poco::Data::SessionImpl * session) mPtr = Poco::AnyCast< MYSQL * >(session->getProperty("handle")); } +// ------------------------------------------------------------------------------------------------ +MySQLConnHnd::MySQLConnHnd(Poco::AutoPtr< Poco::Data::SessionImpl > && session) + : MySQLConnHnd() +{ + mSession = std::move(session); + // Retrieve the internal handle property + mPtr = Poco::AnyCast< MYSQL * >(session->getProperty("handle")); +} + // ------------------------------------------------------------------------------------------------ MySQLConnHnd::~MySQLConnHnd() { diff --git a/module/Library/MySQL.hpp b/module/Library/MySQL.hpp index 2be3c28d..c4670049 100644 --- a/module/Library/MySQL.hpp +++ b/module/Library/MySQL.hpp @@ -268,6 +268,11 @@ public: */ explicit MySQLConnHnd(Poco::Data::SessionImpl * session); + /* -------------------------------------------------------------------------------------------- + * Explicit constructor. + */ + explicit MySQLConnHnd(Poco::AutoPtr< Poco::Data::SessionImpl > && session); + /* -------------------------------------------------------------------------------------------- * Destructor. */ diff --git a/module/Library/SQLite.cpp b/module/Library/SQLite.cpp index 0b5738d1..87999aad 100644 --- a/module/Library/SQLite.cpp +++ b/module/Library/SQLite.cpp @@ -625,16 +625,16 @@ SQLiteConnHnd::SQLiteConnHnd(Poco::Data::SessionImpl * session) { mSession.assign(session, true); // Retrieve the internal handle property - mPtr = Poco::AnyCast< sqlite3 * >(session->getProperty("handle")); + mPtr = Poco::AnyCast< sqlite3 * >(mSession->getProperty("handle")); } // ------------------------------------------------------------------------------------------------ SQLiteConnHnd::SQLiteConnHnd(Poco::AutoPtr< Poco::Data::SessionImpl > && session) : SQLiteConnHnd() { - mSession == std::forward< Poco::AutoPtr< Poco::Data::SessionImpl > >(session); + mSession = std::move(session); // Retrieve the internal handle property - mPtr = Poco::AnyCast< sqlite3 * >(session->getProperty("handle")); + mPtr = Poco::AnyCast< sqlite3 * >(mSession->getProperty("handle")); } // ------------------------------------------------------------------------------------------------ diff --git a/module/PocoLib/Data.cpp b/module/PocoLib/Data.cpp index d1711508..afee0d6f 100644 --- a/module/PocoLib/Data.cpp +++ b/module/PocoLib/Data.cpp @@ -541,6 +541,8 @@ struct SQLiteAsyncExec : public ThreadPoolItem const SQChar * mQueryStr{nullptr}; // The query string that will be executed. LightObj mQueryObj{}; // Strong reference to the query string object. // -------------------------------------------------------------------------------------------- + LightObj mCtx{}; // User specified context object, if any. + // -------------------------------------------------------------------------------------------- String mError{}; // Error message, if any. /* -------------------------------------------------------------------------------------------- @@ -609,13 +611,13 @@ struct SQLiteAsyncExec : public ThreadPoolItem if (!mResolved.IsNull()) { LightObj c{SqTypeIdentity< SQLiteConnection >{}, SqVM(), mConnection}; - mResolved.Execute(c, mChanges, mQueryObj); + mResolved.Execute(c, mCtx, mChanges, mQueryObj); } } else if (!mRejected.IsNull()) { LightObj c{SqTypeIdentity< SQLiteConnection >{}, SqVM(), mConnection}; - mRejected.Execute(c, mResult, mError, mQueryObj); + mRejected.Execute(c, mCtx, mResult, mError, mQueryObj); } // Finished return false; @@ -643,12 +645,19 @@ struct SQLiteAsyncStmtBase : public ThreadPoolItem // -------------------------------------------------------------------------------------------- Function mResolved{}; // Callback to invoke when the task was completed. Function mRejected{}; // Callback to invoke when the task was aborted. + Function mPrepared{}; // Callback to invoke when the task must be prepared. // -------------------------------------------------------------------------------------------- SQLiteStatement * mStatementPtr{nullptr}; // Pointer to the script statement instance. LightObj mStatementObj{}; // Strong reference to the statement instance object. // -------------------------------------------------------------------------------------------- const SQChar * mQueryStr{nullptr}; // The query string that will be executed. LightObj mQueryObj{}; // Strong reference to the query string object. + // -------------------------------------------------------------------------------------------- + LightObj mCtx{}; // User specified context object, if any. + // -------------------------------------------------------------------------------------------- + int32_t mChanges{0}; // Rows affected by this query. + bool mPrepped{false}; // Whether the statement was prepared. + bool mRow{false}; // Whether we still have rows to process. /* -------------------------------------------------------------------------------------------- * Base constructor. Members are supposed to be validated and filled by the builder/proxy. */ @@ -659,11 +668,6 @@ struct SQLiteAsyncStmtBase : public ThreadPoolItem */ ~SQLiteAsyncStmtBase() override = default; - /* -------------------------------------------------------------------------------------------- - * Provide a name to what type of task this is. Mainly for debugging purposes. - */ - SQMOD_NODISCARD const char * TypeName() noexcept override { return "sqlite async query"; } - /* -------------------------------------------------------------------------------------------- * Provide unique information that may help identify the task. Mainly for debugging purposes. */ @@ -678,9 +682,11 @@ struct SQLiteAsyncStmtBase : public ThreadPoolItem // Coincidentally, this also dirties the handle time-stamp so, it doesn't get collected if (mConnection->mSession->isConnected()) { - if (mStatement->mPtr != nullptr) + if (mStatement->mPtr == nullptr) { mStatement->Create(mQueryStr, static_cast< SQInteger >(strlen(mQueryStr))); + // Statement was not prepared/filled with information (yet) + mPrepped = mPrepared.IsNull(); } // Prepared return true; @@ -704,14 +710,22 @@ struct SQLiteAsyncStmtBase : public ThreadPoolItem */ struct SQLiteAsyncStmtExec : public SQLiteAsyncStmtBase { - int32_t mChanges{0}; // Rows affected by this query. + /* -------------------------------------------------------------------------------------------- + * Provide a name to what type of task this is. Mainly for debugging purposes. + */ + SQMOD_NODISCARD const char * TypeName() noexcept override { return "sqlite async query exec"; } + /* -------------------------------------------------------------------------------------------- * Called in worker by the thread pool to performed by the associated tasks. * Will be called continuously while the returned value is true. While false means it finished. */ SQMOD_NODISCARD bool OnProcess() override { - mChanges = mStatementPtr->Exec(); + // Was the statement prepared? + if (mPrepped) + { + mChanges = mStatementPtr->Exec(); + } // Don't retry return false; }; @@ -721,18 +735,49 @@ struct SQLiteAsyncStmtExec : public SQLiteAsyncStmtBase * If it returns true then it will be put back into the queue to be processed again. * If the boolean parameter is try then the thread-pool is in the process of shutting down. */ - SQMOD_NODISCARD bool OnCompleted(bool SQ_UNUSED_ARG(stop)) override + SQMOD_NODISCARD bool OnCompleted(bool SQ_UNUSED_ARG(stop)) override { - if (mStatement->mDone) + if (mPrepped && mStatement->mDone) { if (!mResolved.IsNull()) { - mResolved.Execute(mStatementObj, mChanges); + LightObj o = mResolved.Eval(mStatementObj, mCtx, mChanges); + // Should we abort the whole thing? + if (!o.IsNull() && o.Cast< bool >() == false) + { + return false; // Allow to abort itself + } + } + // No longer prepared + mPrepped = false; + } + // Allow to prepare itself, either on initial call or again after execution + if (!mPrepped && (mStatement->mStatus == SQLITE_OK || mStatement->mStatus == SQLITE_DONE)) + { + mPrepped = true; + // Is there a prepping callback? + if (!mPrepared.IsNull()) + { + // Should we reset? + if (mStatement->mStatus == SQLITE_DONE) + { + mStatementPtr->Reset(); + } + LightObj o = mPrepared.Eval(mStatementObj, mCtx); + // Should we abort the whole thing? + if (!o.IsNull()) + { + return o.Cast< bool >(); // Allow to abort itself + } + else + { + return true; // Re-queue the task by default + } } } - else if (!mRejected.IsNull()) + else if (!mRejected.IsNull() && (mStatement->mStatus != SQLITE_OK && mStatement->mStatus != SQLITE_DONE)) { - mRejected.Execute(mStatementObj, mQueryObj); + mRejected.Execute(mStatementObj, mCtx, mQueryObj); } // Finished return false; @@ -744,14 +789,22 @@ struct SQLiteAsyncStmtExec : public SQLiteAsyncStmtBase */ struct SQLiteAsyncStmtStep : public SQLiteAsyncStmtBase { - bool mRow{false}; // Whether we still have rows to process. + /* -------------------------------------------------------------------------------------------- + * Provide a name to what type of task this is. Mainly for debugging purposes. + */ + SQMOD_NODISCARD const char * TypeName() noexcept override { return "sqlite async query step"; } + /* -------------------------------------------------------------------------------------------- * Called in worker by the thread pool to performed by the associated tasks. * Will be called continuously while the returned value is true. While false means it finished. */ SQMOD_NODISCARD bool OnProcess() override { - mRow = mStatementPtr->Step(); + // Was the statement prepared? + if (mPrepped) + { + mRow = mStatementPtr->Step(); + } // Don't retry return false; }; @@ -761,19 +814,38 @@ struct SQLiteAsyncStmtStep : public SQLiteAsyncStmtBase * If it returns true then it will be put back into the queue to be processed again. * If the boolean parameter is try then the thread-pool is in the process of shutting down. */ - SQMOD_NODISCARD bool OnCompleted(bool SQ_UNUSED_ARG(stop)) override + SQMOD_NODISCARD bool OnCompleted(bool SQ_UNUSED_ARG(stop)) override { + // This is only done once, before performing any step + if (!mPrepped) + { + mPrepped = true; + // Is there a prepping callback? + if (!mPrepared.IsNull()) + { + LightObj o = mPrepared.Eval(mStatementObj, mCtx); + // Should we abort the whole thing? + if (!o.IsNull()) + { + return o.Cast< bool >(); // Allow to abort itself + } + else + { + return true; // Re-queue the task by default + } + } + } if (mStatement->mGood || mStatement->mDone) { if (!mResolved.IsNull()) { // You are expected to step the statement manually until the end - mResolved.Execute(mStatementObj); + mResolved.Execute(mStatementObj, mCtx); } } else if (!mRejected.IsNull()) { - mRejected.Execute(mStatementObj, mQueryObj); + mRejected.Execute(mStatementObj, mCtx, mQueryObj); } // Finished return false; @@ -786,15 +858,22 @@ struct SQLiteAsyncStmtStep : public SQLiteAsyncStmtBase */ struct SQLiteAsyncStmtIncStep : public SQLiteAsyncStmtBase { + /* -------------------------------------------------------------------------------------------- + * Provide a name to what type of task this is. Mainly for debugging purposes. + */ + SQMOD_NODISCARD const char * TypeName() noexcept override { return "sqlite incremental async query step"; } - bool mRow{false}; // Whether we still have rows to process. /* -------------------------------------------------------------------------------------------- * Called in worker by the thread pool to performed by the associated tasks. * Will be called continuously while the returned value is true. While false means it finished. */ SQMOD_NODISCARD bool OnProcess() override { - mRow = mStatementPtr->Step(); + // Was the statement prepared? + if (mPrepped) + { + mRow = mStatementPtr->Step(); + } // Don't retry return false; }; @@ -806,7 +885,26 @@ struct SQLiteAsyncStmtIncStep : public SQLiteAsyncStmtBase */ SQMOD_NODISCARD bool OnCompleted(bool SQ_UNUSED_ARG(stop)) override { - if (mStatement->mGood || mStatement->mDone) + // This is only done once, before performing any step + if (!mPrepped) + { + mPrepped = true; + // Is there a prepping callback? + if (!mPrepared.IsNull()) + { + LightObj o = mPrepared.Eval(mStatementObj, mCtx); + // Should we abort the whole thing? + if (!o.IsNull()) + { + return o.Cast< bool >(); // Allow to abort itself + } + else + { + return true; // Re-queue the task by default + } + } + } + if (mStatement->mGood && !mStatement->mDone) { if (!mResolved.IsNull()) { @@ -814,7 +912,7 @@ struct SQLiteAsyncStmtIncStep : public SQLiteAsyncStmtBase if (stop) { do { - LightObj o = mResolved.Eval(mStatementObj); + LightObj o = mResolved.Eval(mStatementObj, mCtx); // Should we abort the whole thing? if (!o.IsNull() && !o.Cast< bool >()) { @@ -828,7 +926,7 @@ struct SQLiteAsyncStmtIncStep : public SQLiteAsyncStmtBase } else { - LightObj o = mResolved.Eval(mStatementObj); + LightObj o = mResolved.Eval(mStatementObj, mCtx); // Should we abort the whole thing? if (!o.IsNull() && !o.Cast< bool >()) { @@ -837,9 +935,9 @@ struct SQLiteAsyncStmtIncStep : public SQLiteAsyncStmtBase } } } - else if (!mRejected.IsNull()) + else if (!mRejected.IsNull() && (mStatement->mStatus != SQLITE_OK && mStatement->mStatus != SQLITE_DONE)) { - mRejected.Execute(mStatementObj, mQueryObj); + mRejected.Execute(mStatementObj, mCtx, mQueryObj); } // Re-queue if we still have rows to process return mRow; @@ -857,7 +955,7 @@ SqDataAsyncBuilder::SqDataAsyncBuilder(Poco::Data::SessionImpl * session, StackS } // ------------------------------------------------------------------------------------------------ -void SqDataAsyncBuilder::Submit() +void SqDataAsyncBuilder::Submit_(LightObj & ctx) { if (mSession.isNull()) { @@ -879,7 +977,7 @@ void SqDataAsyncBuilder::Submit() item = static_cast< SQLiteAsyncStmtBase * >(new SQLiteAsyncStmtExec()); } // Is this incremental? - if (mInc) + else if (mInc) { item = static_cast< SQLiteAsyncStmtBase * >(new SQLiteAsyncStmtIncStep()); } @@ -894,10 +992,12 @@ void SqDataAsyncBuilder::Submit() item->mStatement = SQLiteStmtRef{new SQLiteStmtHnd(item->mConnection)}; item->mResolved = std::move(mResolved); item->mRejected = std::move(mRejected); + item->mPrepared = std::move(mPrepared); item->mStatementObj = LightObj{SqTypeIdentity< SQLiteStatement >{}, SqVM(), item->mStatement}; item->mStatementPtr = item->mStatementObj.CastI< SQLiteStatement >(); item->mQueryStr = mQueryStr; item->mQueryObj = std::move(mQueryObj); + item->mCtx = std::move(ctx); // Submit the task ThreadPool::Get().Enqueue(std::move(task)); } @@ -912,6 +1012,7 @@ void SqDataAsyncBuilder::Submit() item->mRejected = std::move(mRejected); item->mQueryStr = mQueryStr; item->mQueryObj = std::move(mQueryObj); + item->mCtx = std::move(ctx); // Submit the task ThreadPool::Get().Enqueue(std::move(task)); } @@ -1215,9 +1316,12 @@ void Register_POCO_Data(HSQUIRRELVM vm, Table &) // Meta-methods .SquirrelFunc(_SC("_typename"), &SqPcSqDataAsyncBuilder::Fn) // Member Methods - .Func(_SC("Submit"), &SqDataAsyncBuilder::Submit) .CbFunc(_SC("Resolved"), &SqDataAsyncBuilder::OnResolved) .CbFunc(_SC("Rejected"), &SqDataAsyncBuilder::OnRejected) + .CbFunc(_SC("Prepared"), &SqDataAsyncBuilder::OnPrepared) + // Overloaded methods + .Overload(_SC("Submit"), &SqDataAsyncBuilder::Submit) + .Overload(_SC("Submit"), &SqDataAsyncBuilder::Submit_) ); // -------------------------------------------------------------------------------------------- ns.Bind(_SC("SessionPool"), diff --git a/module/PocoLib/Data.hpp b/module/PocoLib/Data.hpp index 6772af0e..dbe731e9 100644 --- a/module/PocoLib/Data.hpp +++ b/module/PocoLib/Data.hpp @@ -2106,6 +2106,7 @@ struct SqDataAsyncBuilder // -------------------------------------------------------------------------------------------- Function mResolved{}; // Callback to invoke when the task was completed. Function mRejected{}; // Callback to invoke when the task was aborted. + Function mPrepared{}; // Callback to invoke when the task was must be prepared. // -------------------------------------------------------------------------------------------- const SQChar * mQueryStr{nullptr}; // The query string that will be executed. LightObj mQueryObj{}; // Strong reference to the query string object. @@ -2147,7 +2148,12 @@ struct SqDataAsyncBuilder /* -------------------------------------------------------------------------------------------- * Create the task with the suplied information and submit it to the worker pool. */ - void Submit(); + void Submit() { Submit_(NullLightObj()); } + + /* -------------------------------------------------------------------------------------------- + * Create the task with the suplied information and submit it to the worker pool. + */ + void Submit_(LightObj & ctx); /* -------------------------------------------------------------------------------------------- * Set the callback to be executed if the query was resolved. @@ -2166,6 +2172,15 @@ struct SqDataAsyncBuilder mRejected = std::move(cb); return *this; // Allow chaining } + + /* -------------------------------------------------------------------------------------------- + * Set the callback to be executed in order to compose the query/statement. + */ + SqDataAsyncBuilder & OnPrepared(Function & cb) + { + mPrepared = std::move(cb); + return *this; // Allow chaining + } }; } // Namespace:: SqMod