mirror of
https://github.com/VCMP-SqMod/SqMod.git
synced 2025-06-15 22:57:12 +02:00
More work on async sql.
Current implementation only provides execution for sqlite queries in worker threads. Statements and MySQL is on the TODO list.
This commit is contained in:
@ -1,5 +1,8 @@
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
#include "PocoLib/Data.hpp"
|
||||
#include "Core/ThreadPool.hpp"
|
||||
#include "Library/SQLite.hpp"
|
||||
#include "Library/MySQL.hpp"
|
||||
#include "Poco/Data/SessionImpl.h"
|
||||
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
@ -34,6 +37,7 @@ SQMOD_DECL_TYPENAME(SqPcDataStatement, _SC("SqDataStatement"))
|
||||
SQMOD_DECL_TYPENAME(SqPcDataRecordSet, _SC("SqDataRecordSet"))
|
||||
SQMOD_DECL_TYPENAME(SqPcDataTransaction, _SC("SqDataTransaction"))
|
||||
SQMOD_DECL_TYPENAME(SqPcDataSessionPool, _SC("SqDataSessionPool"))
|
||||
SQMOD_DECL_TYPENAME(SqPcSqDataAsyncBuilder, _SC("SqSqDataAsyncBuilder"))
|
||||
SQMOD_DECL_TYPENAME(SqPcDataStatementResult, _SC("SqDataStatementResult"))
|
||||
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
@ -472,8 +476,8 @@ SqDataStatement & SqDataStatement::Into_(LightObj & obj, LightObj & def)
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
extern LightObj GteSQLiteFromSession(Poco::Data::SessionImpl * session);
|
||||
extern LightObj GteMySQLFromSession(Poco::Data::SessionImpl * session);
|
||||
extern LightObj GetSQLiteFromSession(Poco::Data::SessionImpl * session);
|
||||
extern LightObj GetMySQLFromSession(Poco::Data::SessionImpl * session);
|
||||
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
LightObj SqDataSessionPool::GetSq()
|
||||
@ -484,17 +488,202 @@ LightObj SqDataSessionPool::GetSq()
|
||||
// Is this a SQLite session?
|
||||
if (connector == "sqlite")
|
||||
{
|
||||
return GteSQLiteFromSession(session_impl);
|
||||
return GetSQLiteFromSession(session_impl);
|
||||
}
|
||||
// Is this a MySQL session?
|
||||
else if (connector == "mysql")
|
||||
{
|
||||
return GteMySQLFromSession(session_impl);
|
||||
return GetMySQLFromSession(session_impl);
|
||||
}
|
||||
STHROWF("Unknown connector type {}", connector);
|
||||
SQ_UNREACHABLE
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
LightObj SqDataSessionPool::AsyncExec(StackStrF & sql)
|
||||
{
|
||||
return LightObj{SqTypeIdentity< SqDataAsyncBuilder >{}, SqVM(), get().impl(), sql, false};
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
LightObj SqDataSessionPool::AsyncQuery(StackStrF & sql)
|
||||
{
|
||||
return LightObj{SqTypeIdentity< SqDataAsyncBuilder >{}, SqVM(), get().impl(), sql, true};
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------------------------------------------
|
||||
* Common session action implementation.
|
||||
*/
|
||||
struct SQLiteAsyncExec : public ThreadPoolItem
|
||||
{
|
||||
using SessionRef = Poco::AutoPtr< Poco::Data::SessionImpl >;
|
||||
// --------------------------------------------------------------------------------------------
|
||||
SessionRef mSession{}; // The connection that will be used by this task.
|
||||
sqlite3 * mConnection{nullptr}; // Raw connection handle.
|
||||
// --------------------------------------------------------------------------------------------
|
||||
Function mResolved{}; // Callback to invoke when the task was completed.
|
||||
Function mRejected{}; // Callback to invoke when the task was aborted.
|
||||
// --------------------------------------------------------------------------------------------
|
||||
int32_t mResult{SQLITE_OK}; // Execution result code.
|
||||
int32_t mChanges{0}; // Rows affected by this query.
|
||||
// --------------------------------------------------------------------------------------------
|
||||
const SQChar * mQueryStr{nullptr}; // The query string that will be executed.
|
||||
LightObj mQueryObj{}; // Strong reference to the query string object.
|
||||
// --------------------------------------------------------------------------------------------
|
||||
String mError{}; // Error message, if any.
|
||||
|
||||
/* --------------------------------------------------------------------------------------------
|
||||
* Base constructor. Members are supposed to be validated and filled by the builder/proxy.
|
||||
*/
|
||||
SQLiteAsyncExec() = default;
|
||||
|
||||
/* --------------------------------------------------------------------------------------------
|
||||
* Destructor.
|
||||
*/
|
||||
~SQLiteAsyncExec() override = default;
|
||||
|
||||
/* --------------------------------------------------------------------------------------------
|
||||
* Provide a name to what type of task this is. Mainly for debugging purposes.
|
||||
*/
|
||||
SQMOD_NODISCARD const char * TypeName() noexcept override { return "sqlite async execute"; }
|
||||
|
||||
/* --------------------------------------------------------------------------------------------
|
||||
* Provide unique information that may help identify the task. Mainly for debugging purposes.
|
||||
*/
|
||||
SQMOD_NODISCARD const char * IdentifiableInfo() noexcept override { return mQueryStr; }
|
||||
|
||||
/* --------------------------------------------------------------------------------------------
|
||||
* Invoked in worker thread by the thread pool after obtaining the task from the queue.
|
||||
* Must return true to indicate that the task can be performed. False indicates failure.
|
||||
*/
|
||||
SQMOD_NODISCARD bool OnPrepare() override
|
||||
{
|
||||
// Coincidentally, this also dirties the handle time-stamp so it doesn't get collected
|
||||
return mSession->isConnected();
|
||||
}
|
||||
|
||||
/* --------------------------------------------------------------------------------------------
|
||||
* Called in worker by the thread pool to performed by the associated tasks.
|
||||
* Will be called continuously while the returned value is true. While false means it finished.
|
||||
*/
|
||||
SQMOD_NODISCARD bool OnProcess() override
|
||||
{
|
||||
OutputError("query process begin");
|
||||
char * err_msg = nullptr;
|
||||
// Attempt to execute the specified query
|
||||
mResult = sqlite3_exec(mConnection, mQueryStr, nullptr, nullptr, &err_msg);
|
||||
OutputError("query process check");
|
||||
// Store changes count
|
||||
if (mResult == SQLITE_OK)
|
||||
{
|
||||
mChanges = sqlite3_changes(mConnection);
|
||||
}
|
||||
OutputError("query process remember");
|
||||
// Check for error message
|
||||
if (err_msg != nullptr)
|
||||
{
|
||||
mError.assign(err_msg);
|
||||
sqlite3_free(err_msg);
|
||||
}
|
||||
OutputError("query process completed");
|
||||
// Don't retry
|
||||
return false;
|
||||
};
|
||||
|
||||
/* --------------------------------------------------------------------------------------------
|
||||
* Invoked in main thread by the thread pool after the task was completed.
|
||||
*/
|
||||
void OnCompleted() override
|
||||
{
|
||||
if (mResult == SQLITE_OK)
|
||||
{
|
||||
OutputError("result was ok");
|
||||
if (!mResolved.IsNull())
|
||||
{
|
||||
SQLiteConnRef conn_ref{new SQLiteConnHnd(std::move(mSession))};
|
||||
LightObj connection{SqTypeIdentity< SQLiteConnection >{}, SqVM(), conn_ref};
|
||||
mResolved.Execute(connection, mChanges, mQueryObj);
|
||||
}
|
||||
}
|
||||
else if (!mRejected.IsNull())
|
||||
{
|
||||
OutputError("result was bad");
|
||||
SQLiteConnRef conn_ref{new SQLiteConnHnd(std::move(mSession))};
|
||||
OutputError("handle created");
|
||||
LightObj connection{SqTypeIdentity< SQLiteConnection >{}, SqVM(), conn_ref};
|
||||
OutputError("object is ready");
|
||||
mRejected.Execute(connection, mResult, mError, mQueryObj);
|
||||
OutputError("callback finished");
|
||||
}
|
||||
}
|
||||
|
||||
/* --------------------------------------------------------------------------------------------
|
||||
* Called in worker by the thread pool to let the task know that it will be aborted.
|
||||
* Most likely due to a shutdown of the thread pool.
|
||||
*/
|
||||
void OnAborted(bool SQ_UNUSED_ARG(retry)) override
|
||||
{
|
||||
// We don't really have to do anything for now
|
||||
}
|
||||
};
|
||||
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
SqDataAsyncBuilder::SqDataAsyncBuilder(Poco::Data::SessionImpl * session, StackStrF & sql, bool stmt) noexcept
|
||||
: mSession(session, true)
|
||||
, mResolved(), mRejected()
|
||||
, mQueryStr(sql.mPtr), mQueryObj(sql.mObj)
|
||||
, mStmt(stmt)
|
||||
{
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
void SqDataAsyncBuilder::Submit()
|
||||
{
|
||||
auto & connector = mSession->connectorName();
|
||||
// Is this a SQLite session?
|
||||
if (connector == "sqlite")
|
||||
{
|
||||
// Retrieve the internal handle property
|
||||
auto * connection = Poco::AnyCast< sqlite3 * >(mSession->getProperty("handle"));
|
||||
// Is this a statement?
|
||||
if (mStmt)
|
||||
{
|
||||
//...
|
||||
}
|
||||
else
|
||||
{
|
||||
auto * item = new SQLiteAsyncExec();
|
||||
// Take ownership before any exception can be thrown
|
||||
std::unique_ptr< ThreadPoolItem > task{static_cast< ThreadPoolItem * >(item)};
|
||||
// Populate task information
|
||||
item->mConnection = connection;
|
||||
item->mResolved = std::move(mResolved);
|
||||
item->mRejected = std::move(mRejected);
|
||||
item->mQueryStr = mQueryStr;
|
||||
item->mQueryObj = std::move(mQueryObj);
|
||||
item->mSession = std::move(mSession);
|
||||
// Submit the task
|
||||
ThreadPool::Get().Enqueue(std::move(task));
|
||||
}
|
||||
}
|
||||
// Is this a MySQL session?
|
||||
else if (connector == "mysql")
|
||||
{
|
||||
if (mStmt)
|
||||
{
|
||||
//...
|
||||
}
|
||||
else
|
||||
{
|
||||
//...
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
STHROWF("Unknown connector type {}", connector);
|
||||
}
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
LightObj SqDataSessionPool::GetProperty(StackStrF & name)
|
||||
{
|
||||
@ -771,6 +960,16 @@ void Register_POCO_Data(HSQUIRRELVM vm, Table &)
|
||||
.Overload(_SC("Value"), &SqDataRecordSet::GetValueOr)
|
||||
);
|
||||
// --------------------------------------------------------------------------------------------
|
||||
ns.Bind(_SC("AsyncBuilder"),
|
||||
Class< SqDataAsyncBuilder, NoConstructor< SqDataAsyncBuilder > >(vm, SqPcSqDataAsyncBuilder::Str)
|
||||
// Meta-methods
|
||||
.SquirrelFunc(_SC("_typename"), &SqPcSqDataAsyncBuilder::Fn)
|
||||
// Member Methods
|
||||
.Func(_SC("Submit"), &SqDataAsyncBuilder::Submit)
|
||||
.CbFunc(_SC("Resolved"), &SqDataAsyncBuilder::OnResolved)
|
||||
.CbFunc(_SC("Rejected"), &SqDataAsyncBuilder::OnRejected)
|
||||
);
|
||||
// --------------------------------------------------------------------------------------------
|
||||
ns.Bind(_SC("SessionPool"),
|
||||
Class< SqDataSessionPool, NoCopy< SqDataSessionPool > >(vm, SqPcDataSessionPool::Str)
|
||||
// Constructors
|
||||
@ -790,6 +989,8 @@ void Register_POCO_Data(HSQUIRRELVM vm, Table &)
|
||||
// Member Methods
|
||||
.Func(_SC("Get"), &SqDataSessionPool::Get)
|
||||
.Func(_SC("GetSq"), &SqDataSessionPool::GetSq)
|
||||
.Func(_SC("AsyncExec"), &SqDataSessionPool::AsyncExec)
|
||||
.Func(_SC("AsyncQuery"), &SqDataSessionPool::AsyncQuery)
|
||||
.FmtFunc(_SC("GetWithProperty"), &SqDataSessionPool::GetWithProperty)
|
||||
.FmtFunc(_SC("GetWithFeature"), &SqDataSessionPool::GetWithFeature)
|
||||
.FmtFunc(_SC("SetFeature"), &SqDataSessionPool::SetFeature)
|
||||
|
@ -1771,6 +1771,16 @@ struct SqDataSessionPool : public SessionPool
|
||||
*/
|
||||
LightObj GetSq();
|
||||
|
||||
/* --------------------------------------------------------------------------------------------
|
||||
* Create an asynchronus query execution builder.
|
||||
*/
|
||||
LightObj AsyncExec(StackStrF & sql);
|
||||
|
||||
/* --------------------------------------------------------------------------------------------
|
||||
* Create an asynchronus query execution builder.
|
||||
*/
|
||||
LightObj AsyncQuery(StackStrF & sql);
|
||||
|
||||
/* --------------------------------------------------------------------------------------------
|
||||
* Retrieve a Session with requested property set.
|
||||
*/
|
||||
@ -2075,4 +2085,75 @@ struct SqDataTransaction : public Transaction
|
||||
}
|
||||
};
|
||||
|
||||
/* ------------------------------------------------------------------------------------------------
|
||||
* Common session action implementation.
|
||||
*/
|
||||
struct SqDataAsyncBuilder
|
||||
{
|
||||
using SessionRef = Poco::AutoPtr< Poco::Data::SessionImpl >;
|
||||
// --------------------------------------------------------------------------------------------
|
||||
SessionRef mSession{}; // The connection that will be used by the task.
|
||||
// --------------------------------------------------------------------------------------------
|
||||
Function mResolved{}; // Callback to invoke when the task was completed.
|
||||
Function mRejected{}; // Callback to invoke when the task was aborted.
|
||||
// --------------------------------------------------------------------------------------------
|
||||
const SQChar * mQueryStr{nullptr}; // The query string that will be executed.
|
||||
LightObj mQueryObj{}; // Strong reference to the query string object.
|
||||
// --------------------------------------------------------------------------------------------
|
||||
bool mStmt{false}; // Whether this is a query statement or a simple query execution.
|
||||
|
||||
/* --------------------------------------------------------------------------------------------
|
||||
* Default constructor.
|
||||
*/
|
||||
SqDataAsyncBuilder(Poco::Data::SessionImpl * session, StackStrF & sql, bool stmt) noexcept;
|
||||
|
||||
/* --------------------------------------------------------------------------------------------
|
||||
* Copy constructor. (disabled)
|
||||
*/
|
||||
SqDataAsyncBuilder(const SqDataAsyncBuilder & o) = delete;
|
||||
|
||||
/* --------------------------------------------------------------------------------------------
|
||||
* Move constructor.
|
||||
*/
|
||||
SqDataAsyncBuilder(SqDataAsyncBuilder && o) = default;
|
||||
|
||||
/* --------------------------------------------------------------------------------------------
|
||||
* Destructor.
|
||||
*/
|
||||
~SqDataAsyncBuilder() = default;
|
||||
|
||||
/* --------------------------------------------------------------------------------------------
|
||||
* Copy assignment operator. (disabled)
|
||||
*/
|
||||
SqDataAsyncBuilder & operator = (const SqDataAsyncBuilder & o) = delete;
|
||||
|
||||
/* --------------------------------------------------------------------------------------------
|
||||
* Move assignment operator.
|
||||
*/
|
||||
SqDataAsyncBuilder & operator = (SqDataAsyncBuilder && o) = default;
|
||||
|
||||
/* --------------------------------------------------------------------------------------------
|
||||
* Create the task with the suplied information and submit it to the worker pool.
|
||||
*/
|
||||
void Submit();
|
||||
|
||||
/* --------------------------------------------------------------------------------------------
|
||||
* Set the callback to be executed if the query was resolved.
|
||||
*/
|
||||
SqDataAsyncBuilder & OnResolved(Function & cb)
|
||||
{
|
||||
mResolved = std::move(cb);
|
||||
return *this; // Allow chaining
|
||||
}
|
||||
|
||||
/* --------------------------------------------------------------------------------------------
|
||||
* Set the callback to be executed if the query was rejected/failed.
|
||||
*/
|
||||
SqDataAsyncBuilder & OnRejected(Function & cb)
|
||||
{
|
||||
mRejected = std::move(cb);
|
||||
return *this; // Allow chaining
|
||||
}
|
||||
};
|
||||
|
||||
} // Namespace:: SqMod
|
||||
|
Reference in New Issue
Block a user