From 49df7b75ee626c3d93257c4129f737c1f5a71798 Mon Sep 17 00:00:00 2001 From: Sandu Liviu Catalin Date: Sat, 23 Jul 2022 19:27:40 +0300 Subject: [PATCH] Asynchronous statements implementation for SQLite. Extend worker threads to allow tasks to re-queue themselves during completion. --- module/Core/ThreadPool.cpp | 8 +- module/Core/ThreadPool.hpp | 4 +- module/Library/CURL.cpp | 8 +- module/Library/SQLite.hpp | 11 +- module/PocoLib/Data.cpp | 310 +++++++++++++++++++++++++++++++++---- module/PocoLib/Data.hpp | 20 ++- 6 files changed, 324 insertions(+), 37 deletions(-) diff --git a/module/Core/ThreadPool.cpp b/module/Core/ThreadPool.cpp index 9f8aa8e0..fcd163fe 100644 --- a/module/Core/ThreadPool.cpp +++ b/module/Core/ThreadPool.cpp @@ -103,7 +103,7 @@ void ThreadPool::Terminate(bool SQ_UNUSED_ARG(shutdown)) // Is the item valid? if (item) { - item->OnCompleted(); // Allow the item to finish itself + [[maybe_unused]] auto _ = item->OnCompleted(true); // Allow the item to finish itself } // Item processed item.reset(); @@ -126,7 +126,11 @@ void ThreadPool::Process() if (item) { try { - item->OnCompleted(); // Allow the item to finish itself + // Allow the item to finish itself + if (item->OnCompleted(false)) + { + Enqueue(std::move(item)); // Queue again + } } catch (const std::exception & e) { LogErr("Exception occured in %s completion stage [%s] for [%s]", item->TypeName(), e.what(), item->IdentifiableInfo()); } diff --git a/module/Core/ThreadPool.hpp b/module/Core/ThreadPool.hpp index 4767d836..f56348ba 100644 --- a/module/Core/ThreadPool.hpp +++ b/module/Core/ThreadPool.hpp @@ -79,8 +79,10 @@ struct ThreadPoolItem /* -------------------------------------------------------------------------------------------- * Invoked in main thread by the thread pool after the task was completed. + * If it returns true then it will be put back into the queue to be processed again. + * If the boolean parameter is trye then the thread-pool is in the process of shutting down. */ - virtual void OnCompleted() { } + SQMOD_NODISCARD virtual bool OnCompleted(bool SQ_UNUSED_ARG(stop)) { return false; } /* -------------------------------------------------------------------------------------------- * Called in worker by the thread pool to let the task know that it will be aborted. diff --git a/module/Library/CURL.cpp b/module/Library/CURL.cpp index 91af6992..18643f84 100644 --- a/module/Library/CURL.cpp +++ b/module/Library/CURL.cpp @@ -47,9 +47,11 @@ struct CpBaseAction : public ThreadPoolItem ~CpBaseAction() override = default; /* -------------------------------------------------------------------------------------------- - * Task completed callback. + * Invoked in main thread by the thread pool after the task was completed. + * If it returns true then it will be put back into the queue to be processed again. + * If the boolean parameter is trye then the thread-pool is in the process of shutting down. */ - void OnCompleted() override + SQMOD_NODISCARD bool OnCompleted(bool SQ_UNUSED_ARG(stop)) override { // Is there a callback? if (!mCallback.IsNull()) @@ -58,6 +60,8 @@ struct CpBaseAction : public ThreadPoolItem } // Unlock the session mInstance->mPending = nullptr; + // Don't re-queue + return false; } /* -------------------------------------------------------------------------------------------- diff --git a/module/Library/SQLite.hpp b/module/Library/SQLite.hpp index dc0f8130..04f68b8a 100644 --- a/module/Library/SQLite.hpp +++ b/module/Library/SQLite.hpp @@ -1716,6 +1716,15 @@ public: SQMOD_GET_VALID(*this)->Create(query.mPtr, query.mLen); } + /* -------------------------------------------------------------------------------------------- + * Construct a statement under the specified connection using the specified string. + */ + SQLiteStatement(const SQLiteConnRef & connection, const SQChar * query, SQInteger length) + : m_Handle(new SQLiteStmtHnd(connection)) + { + SQMOD_GET_VALID(*this)->Create(query, length); + } + /* -------------------------------------------------------------------------------------------- * Construct a statement under the specified connection using the specified string. */ @@ -1724,7 +1733,7 @@ public: /* -------------------------------------------------------------------------------------------- * Direct handle constructor. */ - explicit SQLiteStatement(SQLiteStmtRef s) + explicit SQLiteStatement(SQLiteStmtRef s) : m_Handle(std::move(s)) { /* ... */ diff --git a/module/PocoLib/Data.cpp b/module/PocoLib/Data.cpp index 9ed65a07..d1711508 100644 --- a/module/PocoLib/Data.cpp +++ b/module/PocoLib/Data.cpp @@ -71,7 +71,7 @@ static LightObj SQLiteEscapeString(StackStrF & str) // Allocate a memory buffer Buffer b(static_cast< Buffer::SzType >(str.mLen * 2 + 1)); // Attempt to escape the specified string - sqlite3_snprintf(b.Capacity(), b.Get< char >(), "%q", str.mPtr); + sqlite3_snprintf(static_cast< int >(b.Capacity()), b.Get< char >(), "%q", str.mPtr); // Return the resulted string return LightObj(b.Get< SQChar >(), -1); } @@ -95,7 +95,7 @@ static LightObj SQLiteEscapeStringEx(SQChar spec, StackStrF & str) // Allocate a memory buffer Buffer b(static_cast< Buffer::SzType >(str.mLen * 2 + 1)); // Attempt to escape the specified string - sqlite3_snprintf(b.Capacity(), b.Get< char >(), fs, str.mPtr); + sqlite3_snprintf(static_cast< int >(b.Capacity()), b.Get< char >(), fs, str.mPtr); // Return the resulted string return LightObj(b.Get< SQChar >(), -1); } @@ -201,7 +201,7 @@ SqDataStatement SqDataSession::GetStatement(StackStrF & data) // ------------------------------------------------------------------------------------------------ SqDataRecordSet SqDataSession::GetRecordSet(StackStrF & data) { - return SqDataRecordSet(*this, data); + return {*this, data}; } // ------------------------------------------------------------------------------------------------ @@ -502,24 +502,35 @@ LightObj SqDataSessionPool::GetSq() // ------------------------------------------------------------------------------------------------ LightObj SqDataSessionPool::AsyncExec(StackStrF & sql) { - return LightObj{SqTypeIdentity< SqDataAsyncBuilder >{}, SqVM(), get().impl(), sql, false}; + return LightObj{SqTypeIdentity< SqDataAsyncBuilder >{}, SqVM(), get().impl(), sql, true, false, false}; } // ------------------------------------------------------------------------------------------------ LightObj SqDataSessionPool::AsyncQuery(StackStrF & sql) { - return LightObj{SqTypeIdentity< SqDataAsyncBuilder >{}, SqVM(), get().impl(), sql, true}; + return LightObj{SqTypeIdentity< SqDataAsyncBuilder >{}, SqVM(), get().impl(), sql, false, true, false}; +} + +// ------------------------------------------------------------------------------------------------ +LightObj SqDataSessionPool::IncAsyncQuery(StackStrF & sql) +{ + return LightObj{SqTypeIdentity< SqDataAsyncBuilder >{}, SqVM(), get().impl(), sql, false, true, true}; +} + +// ------------------------------------------------------------------------------------------------ +LightObj SqDataSessionPool::ExecAsyncQuery(StackStrF & sql) +{ + return LightObj{SqTypeIdentity< SqDataAsyncBuilder >{}, SqVM(), get().impl(), sql, true, true, false}; } /* ------------------------------------------------------------------------------------------------ -* Common session action implementation. +* Asynchronous SQLite query execution implementation. */ struct SQLiteAsyncExec : public ThreadPoolItem { using SessionRef = Poco::AutoPtr< Poco::Data::SessionImpl >; // -------------------------------------------------------------------------------------------- - SessionRef mSession{}; // The connection that will be used by this task. - sqlite3 * mConnection{nullptr}; // Raw connection handle. + SQLiteConnRef mConnection{}; // Internal connection handle. // -------------------------------------------------------------------------------------------- Function mResolved{}; // Callback to invoke when the task was completed. Function mRejected{}; // Callback to invoke when the task was aborted. @@ -558,8 +569,8 @@ struct SQLiteAsyncExec : public ThreadPoolItem */ SQMOD_NODISCARD bool OnPrepare() override { - // Coincidentally, this also dirties the handle time-stamp so it doesn't get collected - return mSession->isConnected(); + // Coincidentally, this also dirties the handle time-stamp so, it doesn't get collected + return mConnection->mSession->isConnected(); } /* -------------------------------------------------------------------------------------------- @@ -570,11 +581,11 @@ struct SQLiteAsyncExec : public ThreadPoolItem { char * err_msg = nullptr; // Attempt to execute the specified query - mResult = sqlite3_exec(mConnection, mQueryStr, nullptr, nullptr, &err_msg); + mResult = sqlite3_exec(mConnection->Access(), mQueryStr, nullptr, nullptr, &err_msg); // Store changes count if (mResult == SQLITE_OK) { - mChanges = sqlite3_changes(mConnection); + mChanges = sqlite3_changes(mConnection->Access()); } // Check for error message if (err_msg != nullptr) @@ -588,25 +599,26 @@ struct SQLiteAsyncExec : public ThreadPoolItem /* -------------------------------------------------------------------------------------------- * Invoked in main thread by the thread pool after the task was completed. + * If it returns true then it will be put back into the queue to be processed again. + * If the boolean parameter is try then the thread-pool is in the process of shutting down. */ - void OnCompleted() override + SQMOD_NODISCARD bool OnCompleted(bool SQ_UNUSED_ARG(stop)) override { if (mResult == SQLITE_OK) { if (!mResolved.IsNull()) { - SQLiteConnRef conn_ref{new SQLiteConnHnd(std::move(mSession))}; - LightObj connection{SqTypeIdentity< SQLiteConnection >{}, SqVM(), conn_ref}; - mResolved.Execute(connection, mChanges, mQueryObj); + LightObj c{SqTypeIdentity< SQLiteConnection >{}, SqVM(), mConnection}; + mResolved.Execute(c, mChanges, mQueryObj); } } else if (!mRejected.IsNull()) { - SQLiteConnRef conn_ref{new SQLiteConnHnd(std::move(mSession))}; - LightObj connection{SqTypeIdentity< SQLiteConnection >{}, SqVM(), conn_ref}; - mRejected.Execute(connection, mResult, mError, mQueryObj); - + LightObj c{SqTypeIdentity< SQLiteConnection >{}, SqVM(), mConnection}; + mRejected.Execute(c, mResult, mError, mQueryObj); } + // Finished + return false; } /* -------------------------------------------------------------------------------------------- @@ -619,12 +631,228 @@ struct SQLiteAsyncExec : public ThreadPoolItem } }; +/* ------------------------------------------------------------------------------------------------ +* Asynchronous SQLite statement implementation. +*/ +struct SQLiteAsyncStmtBase : public ThreadPoolItem +{ + using SessionRef = Poco::AutoPtr< Poco::Data::SessionImpl >; + // -------------------------------------------------------------------------------------------- + SQLiteConnRef mConnection{}; // Internal connection handle. + SQLiteStmtRef mStatement{}; // Internal statement handle. + // -------------------------------------------------------------------------------------------- + Function mResolved{}; // Callback to invoke when the task was completed. + Function mRejected{}; // Callback to invoke when the task was aborted. + // -------------------------------------------------------------------------------------------- + SQLiteStatement * mStatementPtr{nullptr}; // Pointer to the script statement instance. + LightObj mStatementObj{}; // Strong reference to the statement instance object. + // -------------------------------------------------------------------------------------------- + const SQChar * mQueryStr{nullptr}; // The query string that will be executed. + LightObj mQueryObj{}; // Strong reference to the query string object. + /* -------------------------------------------------------------------------------------------- + * Base constructor. Members are supposed to be validated and filled by the builder/proxy. + */ + SQLiteAsyncStmtBase() = default; + + /* -------------------------------------------------------------------------------------------- + * Destructor. + */ + ~SQLiteAsyncStmtBase() override = default; + + /* -------------------------------------------------------------------------------------------- + * Provide a name to what type of task this is. Mainly for debugging purposes. + */ + SQMOD_NODISCARD const char * TypeName() noexcept override { return "sqlite async query"; } + + /* -------------------------------------------------------------------------------------------- + * Provide unique information that may help identify the task. Mainly for debugging purposes. + */ + SQMOD_NODISCARD const char * IdentifiableInfo() noexcept override { return mQueryStr; } + + /* -------------------------------------------------------------------------------------------- + * Invoked in worker thread by the thread pool after obtaining the task from the queue. + * Must return true to indicate that the task can be performed. False indicates failure. + */ + SQMOD_NODISCARD bool OnPrepare() override + { + // Coincidentally, this also dirties the handle time-stamp so, it doesn't get collected + if (mConnection->mSession->isConnected()) + { + if (mStatement->mPtr != nullptr) + { + mStatement->Create(mQueryStr, static_cast< SQInteger >(strlen(mQueryStr))); + } + // Prepared + return true; + } + // Can't prepare + return false; + } + + /* -------------------------------------------------------------------------------------------- + * Called in worker by the thread pool to let the task know that it will be aborted. + * Most likely due to a shutdown of the thread pool. + */ + void OnAborted(bool SQ_UNUSED_ARG(retry)) override + { + // We don't really have to do anything for now + } +}; + +/* ------------------------------------------------------------------------------------------------ +* Asynchronous SQLite statement execution implementation. +*/ +struct SQLiteAsyncStmtExec : public SQLiteAsyncStmtBase +{ + int32_t mChanges{0}; // Rows affected by this query. + /* -------------------------------------------------------------------------------------------- + * Called in worker by the thread pool to performed by the associated tasks. + * Will be called continuously while the returned value is true. While false means it finished. + */ + SQMOD_NODISCARD bool OnProcess() override + { + mChanges = mStatementPtr->Exec(); + // Don't retry + return false; + }; + + /* -------------------------------------------------------------------------------------------- + * Invoked in main thread by the thread pool after the task was completed. + * If it returns true then it will be put back into the queue to be processed again. + * If the boolean parameter is try then the thread-pool is in the process of shutting down. + */ + SQMOD_NODISCARD bool OnCompleted(bool SQ_UNUSED_ARG(stop)) override + { + if (mStatement->mDone) + { + if (!mResolved.IsNull()) + { + mResolved.Execute(mStatementObj, mChanges); + } + } + else if (!mRejected.IsNull()) + { + mRejected.Execute(mStatementObj, mQueryObj); + } + // Finished + return false; + } +}; + +/* ------------------------------------------------------------------------------------------------ +* Asynchronous SQLite statement stepping implementation. +*/ +struct SQLiteAsyncStmtStep : public SQLiteAsyncStmtBase +{ + bool mRow{false}; // Whether we still have rows to process. + /* -------------------------------------------------------------------------------------------- + * Called in worker by the thread pool to performed by the associated tasks. + * Will be called continuously while the returned value is true. While false means it finished. + */ + SQMOD_NODISCARD bool OnProcess() override + { + mRow = mStatementPtr->Step(); + // Don't retry + return false; + }; + + /* -------------------------------------------------------------------------------------------- + * Invoked in main thread by the thread pool after the task was completed. + * If it returns true then it will be put back into the queue to be processed again. + * If the boolean parameter is try then the thread-pool is in the process of shutting down. + */ + SQMOD_NODISCARD bool OnCompleted(bool SQ_UNUSED_ARG(stop)) override + { + if (mStatement->mGood || mStatement->mDone) + { + if (!mResolved.IsNull()) + { + // You are expected to step the statement manually until the end + mResolved.Execute(mStatementObj); + } + } + else if (!mRejected.IsNull()) + { + mRejected.Execute(mStatementObj, mQueryObj); + } + // Finished + return false; + } + +}; + +/* ------------------------------------------------------------------------------------------------ +* Asynchronous SQLite incremental statement stepping implementation. +*/ +struct SQLiteAsyncStmtIncStep : public SQLiteAsyncStmtBase +{ + + bool mRow{false}; // Whether we still have rows to process. + /* -------------------------------------------------------------------------------------------- + * Called in worker by the thread pool to performed by the associated tasks. + * Will be called continuously while the returned value is true. While false means it finished. + */ + SQMOD_NODISCARD bool OnProcess() override + { + mRow = mStatementPtr->Step(); + // Don't retry + return false; + }; + + /* -------------------------------------------------------------------------------------------- + * Invoked in main thread by the thread pool after the task was completed. + * If it returns true then it will be put back into the queue to be processed again. + * If the boolean parameter is try then the thread-pool is in the process of shutting down. + */ + SQMOD_NODISCARD bool OnCompleted(bool SQ_UNUSED_ARG(stop)) override + { + if (mStatement->mGood || mStatement->mDone) + { + if (!mResolved.IsNull()) + { + // Should all steps be completed here? + if (stop) + { + do { + LightObj o = mResolved.Eval(mStatementObj); + // Should we abort the whole thing? + if (!o.IsNull() && !o.Cast< bool >()) + { + return false; + } + // Don't let exceptions be stupid + mRow = false; + // Force process whole statement + if (mStatement->mGood) OnProcess(); + } while (mRow); + } + else + { + LightObj o = mResolved.Eval(mStatementObj); + // Should we abort the whole thing? + if (!o.IsNull() && !o.Cast< bool >()) + { + return false; + } + } + } + } + else if (!mRejected.IsNull()) + { + mRejected.Execute(mStatementObj, mQueryObj); + } + // Re-queue if we still have rows to process + return mRow; + } + +}; + // ------------------------------------------------------------------------------------------------ -SqDataAsyncBuilder::SqDataAsyncBuilder(Poco::Data::SessionImpl * session, StackStrF & sql, bool stmt) noexcept +SqDataAsyncBuilder::SqDataAsyncBuilder(Poco::Data::SessionImpl * session, StackStrF & sql, bool exec, bool stmt, bool inc) noexcept : mSession(session, true) , mResolved(), mRejected() , mQueryStr(sql.mPtr), mQueryObj(sql.mObj) - , mStmt(stmt) + , mExec(exec), mStmt(stmt), mInc(inc) { } @@ -644,7 +872,34 @@ void SqDataAsyncBuilder::Submit() // Is this a statement? if (mStmt) { - //... + SQLiteAsyncStmtBase * item = nullptr; + // Is this just for executing? + if (mExec) + { + item = static_cast< SQLiteAsyncStmtBase * >(new SQLiteAsyncStmtExec()); + } + // Is this incremental? + if (mInc) + { + item = static_cast< SQLiteAsyncStmtBase * >(new SQLiteAsyncStmtIncStep()); + } + else + { + item = static_cast< SQLiteAsyncStmtBase * >(new SQLiteAsyncStmtStep()); + } + // Take ownership before any exception can be thrown + std::unique_ptr< ThreadPoolItem > task{static_cast< ThreadPoolItem * >(item)}; + // Populate task information + item->mConnection = SQLiteConnRef{new SQLiteConnHnd(std::move(mSession))}; + item->mStatement = SQLiteStmtRef{new SQLiteStmtHnd(item->mConnection)}; + item->mResolved = std::move(mResolved); + item->mRejected = std::move(mRejected); + item->mStatementObj = LightObj{SqTypeIdentity< SQLiteStatement >{}, SqVM(), item->mStatement}; + item->mStatementPtr = item->mStatementObj.CastI< SQLiteStatement >(); + item->mQueryStr = mQueryStr; + item->mQueryObj = std::move(mQueryObj); + // Submit the task + ThreadPool::Get().Enqueue(std::move(task)); } else { @@ -652,12 +907,11 @@ void SqDataAsyncBuilder::Submit() // Take ownership before any exception can be thrown std::unique_ptr< ThreadPoolItem > task{static_cast< ThreadPoolItem * >(item)}; // Populate task information - item->mConnection = connection; + item->mConnection = SQLiteConnRef{new SQLiteConnHnd(std::move(mSession))}; item->mResolved = std::move(mResolved); item->mRejected = std::move(mRejected); item->mQueryStr = mQueryStr; item->mQueryObj = std::move(mQueryObj); - item->mSession = std::move(mSession); // Submit the task ThreadPool::Get().Enqueue(std::move(task)); } @@ -985,8 +1239,10 @@ void Register_POCO_Data(HSQUIRRELVM vm, Table &) // Member Methods .Func(_SC("Get"), &SqDataSessionPool::Get) .Func(_SC("GetSq"), &SqDataSessionPool::GetSq) - .Func(_SC("AsyncExec"), &SqDataSessionPool::AsyncExec) - .Func(_SC("AsyncQuery"), &SqDataSessionPool::AsyncQuery) + .FmtFunc(_SC("AsyncExec"), &SqDataSessionPool::AsyncExec) + .FmtFunc(_SC("AsyncQuery"), &SqDataSessionPool::AsyncQuery) + .FmtFunc(_SC("IncAsyncQuery"), &SqDataSessionPool::IncAsyncQuery) + .FmtFunc(_SC("ExecAsyncQuery"), &SqDataSessionPool::ExecAsyncQuery) .FmtFunc(_SC("GetWithProperty"), &SqDataSessionPool::GetWithProperty) .FmtFunc(_SC("GetWithFeature"), &SqDataSessionPool::GetWithFeature) .FmtFunc(_SC("SetFeature"), &SqDataSessionPool::SetFeature) diff --git a/module/PocoLib/Data.hpp b/module/PocoLib/Data.hpp index a2a06105..6772af0e 100644 --- a/module/PocoLib/Data.hpp +++ b/module/PocoLib/Data.hpp @@ -1772,15 +1772,25 @@ struct SqDataSessionPool : public SessionPool LightObj GetSq(); /* -------------------------------------------------------------------------------------------- - * Create an asynchronus query execution builder. + * Create an asynchronous query execution builder. */ LightObj AsyncExec(StackStrF & sql); /* -------------------------------------------------------------------------------------------- - * Create an asynchronus query execution builder. + * Create an asynchronous query execution builder. */ LightObj AsyncQuery(StackStrF & sql); + /* -------------------------------------------------------------------------------------------- + * Create an asynchronous query execution builder. + */ + LightObj IncAsyncQuery(StackStrF & sql); + + /* -------------------------------------------------------------------------------------------- + * Create an asynchronous query execution builder. + */ + LightObj ExecAsyncQuery(StackStrF & sql); + /* -------------------------------------------------------------------------------------------- * Retrieve a Session with requested property set. */ @@ -2100,12 +2110,14 @@ struct SqDataAsyncBuilder const SQChar * mQueryStr{nullptr}; // The query string that will be executed. LightObj mQueryObj{}; // Strong reference to the query string object. // -------------------------------------------------------------------------------------------- - bool mStmt{false}; // Whether this is a query statement or a simple query execution. + bool mExec{false}; // Whether this is a query execution. + bool mStmt{false}; // Whether this is a query statement. + bool mInc{false}; // Whether this is an incremental query statement. /* -------------------------------------------------------------------------------------------- * Default constructor. */ - SqDataAsyncBuilder(Poco::Data::SessionImpl * session, StackStrF & sql, bool stmt) noexcept; + SqDataAsyncBuilder(Poco::Data::SessionImpl * session, StackStrF & sql, bool exec, bool stmt, bool inc) noexcept; /* -------------------------------------------------------------------------------------------- * Copy constructor. (disabled)