1
0
mirror of https://github.com/VCMP-SqMod/SqMod.git synced 2025-01-18 19:47:15 +01:00

Implement multi-part messages for ZMQ.

This commit is contained in:
Sandu Liviu Catalin 2021-02-04 07:55:10 +02:00
parent 5d63520c16
commit 89d90971fd
2 changed files with 329 additions and 70 deletions

View File

@ -11,30 +11,109 @@ namespace SqMod {
SQMOD_DECL_TYPENAME(SqZContext, _SC("SqZmqContext")) SQMOD_DECL_TYPENAME(SqZContext, _SC("SqZmqContext"))
SQMOD_DECL_TYPENAME(SqZSocket, _SC("SqZmqSocket")) SQMOD_DECL_TYPENAME(SqZSocket, _SC("SqZmqSocket"))
// ------------------------------------------------------------------------------------------------
static void FlushSingleString(HSQUIRRELVM, Function & callback, Buffer & data)
{
// Transform the message into a script object
LightObj o(static_cast< const SQChar * >(data.Get()),
static_cast< SQInteger >(data.Position()));
// Forward it to the callback
callback(o, false);
}
// ------------------------------------------------------------------------------------------------
static void FlushSingleBuffer(HSQUIRRELVM vm, Function & callback, Buffer & data)
{
// Transform the message into a script object
LightObj o(SqTypeIdentity< SqBuffer >{}, vm, std::move(data));
// Forward it to the callback
callback(o, false);
}
// ------------------------------------------------------------------------------------------------
static void FlushMultiString(HSQUIRRELVM vm, Function & callback, ZMsg::List & list)
{
// Create a script array
Array a(vm);
// Reserve space upfront
a.Reserve(static_cast< SQInteger >(list.size()));
// Populate the array with elements from the list
a.AppendFromCounted([&list](HSQUIRRELVM vm, SQInteger i) -> bool {
// Are we still withing range of our list?
if (static_cast< size_t >(i) < list.size())
{
// Transform the message into a script object
sq_pushstring(vm, static_cast< const SQChar * >(list[i].Get()),
static_cast< SQInteger >(list[i].Position()));
// We have an element on the stack
return true;
}
// We don't have an element on the stack
return false;
});
// Forward it to the callback
callback(a, true);
}
// ------------------------------------------------------------------------------------------------
static void FlushMultiBuffer(HSQUIRRELVM vm, Function & callback, ZMsg::List & list)
{
// Create a script array
Array a(vm);
// Reserve space upfront
a.Reserve(static_cast< SQInteger >(list.size()));
// Populate the array with elements from the list
a.AppendFromCounted([&list](HSQUIRRELVM vm, SQInteger i) -> bool {
// Are we still withing range of our list?
if (static_cast< size_t >(i) < list.size())
{
// Transform the message into a script object
LightObj o(SqTypeIdentity< SqBuffer >{}, vm, std::move(list[i]));
// Push it on the stack
Var< LightObj >::push(vm, o);
// We have an element on the stack
return true;
}
// We don't have an element on the stack
return false;
});
// Forward it to the callback
callback(a, true);
}
// ------------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------------
void ZSkt::Flush(HSQUIRRELVM vm) void ZSkt::Flush(HSQUIRRELVM vm)
{ {
// Need someone to receive the message // Need someone to receive the message
Item data; Item item;
// Try to get a message from the queue // Try to get a message from the queue
while (mOutputQueue.try_dequeue(data)) while (mOutputQueue.try_dequeue(item))
{ {
// Is there a callback to receive the message? // Is there a callback to receive the message?
if (!mOnData.IsNull()) if (!mOnData.IsNull())
{ {
// Transform the message into a script object // Is this a multi-part message?
if (mStringMessages) if (!(item->mMulti))
{ {
LightObj o(static_cast< const SQChar * >(data->Get()), if (mStringMessages)
static_cast< SQInteger >(data->Size< SQChar >())); {
// Forward it to the callback FlushSingleString(vm, mOnData, item->mBuff);
mOnData(o); }
else
{
FlushSingleBuffer(vm, mOnData, item->mBuff);
}
} }
else else
{ {
LightObj o(SqTypeIdentity< SqBuffer >{}, vm, std::move(*data)); if (mStringMessages)
// Forward it to the callback {
mOnData(o); FlushMultiString(vm, mOnData, item->mList);
}
else
{
FlushMultiBuffer(vm, mOnData, item->mList);
}
} }
} }
} }

View File

@ -112,6 +112,135 @@ struct ZCtx
operator void * () const noexcept { return mPtr; } // NOLINT(google-explicit-constructor) operator void * () const noexcept { return mPtr; } // NOLINT(google-explicit-constructor)
}; };
/* ------------------------------------------------------------------------------------------------
* Core implementation and management for a message.
*/
struct ZMsg
{
/* --------------------------------------------------------------------------------------------
* List of messages.
*/
using List = std::vector< Buffer >;
/* --------------------------------------------------------------------------------------------
* Raw union data size.
*/
static constexpr size_t SIZE = sizeof(Buffer) > sizeof(List) ? sizeof(Buffer) : sizeof(List);
/* --------------------------------------------------------------------------------------------
* Tag used to indicate a multi-part message.
*/
struct Multipart { };
/* --------------------------------------------------------------------------------------------
* Message contents.
*/
union
{
Buffer mBuff;
List mList;
uint8_t mData[SIZE];
};
/* --------------------------------------------------------------------------------------------
* Whether this is a multi-part-message.
*/
const bool mMulti;
/* --------------------------------------------------------------------------------------------
* Default constructor.
*/
ZMsg() // NOLINT(cppcoreguidelines-pro-type-member-init)
: mBuff(), mMulti(false)
{
}
/* --------------------------------------------------------------------------------------------
* Default multi-part constructor.
*/
explicit ZMsg(Multipart) // NOLINT(cppcoreguidelines-pro-type-member-init)
: mList(), mMulti(true)
{
}
/* --------------------------------------------------------------------------------------------
* Copy constructor.
*/
explicit ZMsg(const Buffer & b) // NOLINT(cppcoreguidelines-pro-type-member-init,modernize-pass-by-value)
: mBuff(b), mMulti(false)
{
}
/* --------------------------------------------------------------------------------------------
* Copy multi-part constructor.
*/
explicit ZMsg(const List & l) // NOLINT(cppcoreguidelines-pro-type-member-init,modernize-pass-by-value)
: mList(l), mMulti(true)
{
}
/* --------------------------------------------------------------------------------------------
* Move constructor.
*/
explicit ZMsg(Buffer && b) // NOLINT(cppcoreguidelines-pro-type-member-init)
: mBuff(std::move(b)), mMulti(false)
{
}
/* --------------------------------------------------------------------------------------------
* Move multi-part constructor.
*/
explicit ZMsg(List && l) // NOLINT(cppcoreguidelines-pro-type-member-init)
: mList(std::move(l)), mMulti(true)
{
}
/* --------------------------------------------------------------------------------------------
* Copy constructor (disabled).
*/
ZMsg(const ZMsg &) = delete;
/* --------------------------------------------------------------------------------------------
* Move constructor (disabled).
*/
ZMsg(ZMsg &&) noexcept = delete;
/* --------------------------------------------------------------------------------------------
* Destructor.
*/
~ZMsg()
{
// Is this multi-part?
if (mMulti)
{
mList.~List(); // Invoke list destructor
}
else
{
mBuff.~Buffer(); // Invoke buffer destructor
}
}
/* --------------------------------------------------------------------------------------------
* Assignment operator (disabled).
*/
ZMsg & operator = (const ZMsg &) = delete;
/* --------------------------------------------------------------------------------------------
* Move assignment (disabled).
*/
ZMsg & operator = (ZMsg &&) noexcept = delete;
/* --------------------------------------------------------------------------------------------
* Push the specified message to the multi-part list.
*/
void Push(zmq_msg_t & msg)
{
mList.emplace_back(static_cast< Buffer::ConstPtr >(zmq_msg_data(&msg)),
static_cast< Buffer::SzType >(zmq_msg_size(&msg)));
}
};
/* ------------------------------------------------------------------------------------------------ /* ------------------------------------------------------------------------------------------------
* Core implementation and management for a ZMQ socket. * Core implementation and management for a ZMQ socket.
*/ */
@ -125,28 +254,13 @@ struct ZSkt : SqChainedInstances< ZSkt >
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Message queue type. * Message queue type.
*/ */
using Item = std::unique_ptr< Buffer >; using Item = std::unique_ptr< ZMsg >;
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Message queue type. * Message queue type.
*/ */
using Queue = moodycamel::ConcurrentQueue< Item >; using Queue = moodycamel::ConcurrentQueue< Item >;
/* --------------------------------------------------------------------------------------------
* List of messages.
*/
using List = std::vector< Buffer >;
/* --------------------------------------------------------------------------------------------
* Message list item.
*/
using ListItem = std::unique_ptr< List >;
/* --------------------------------------------------------------------------------------------
* Message list queue type.
*/
using ListQueue = moodycamel::ConcurrentQueue< ListItem >;
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Context pointer. * Context pointer.
*/ */
@ -182,11 +296,6 @@ struct ZSkt : SqChainedInstances< ZSkt >
*/ */
Queue mInputQueue; Queue mInputQueue;
/* --------------------------------------------------------------------------------------------
* Multi-part messages to be sent through the socket.
*/
ListQueue mInputListQueue;
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Message received callback. * Message received callback.
*/ */
@ -208,7 +317,7 @@ struct ZSkt : SqChainedInstances< ZSkt >
ZSkt(const ZCtx::Ptr & ctx, int type) ZSkt(const ZCtx::Ptr & ctx, int type)
: SqChainedInstances< ZSkt >() : SqChainedInstances< ZSkt >()
, mPtr(nullptr), mRun(true), mStringMessages(true), mType(type), mMtx() , mPtr(nullptr), mRun(true), mStringMessages(true), mType(type), mMtx()
, mOutputQueue(4096), mInputQueue(4096), mInputListQueue(1024) , mOutputQueue(4096), mInputQueue(4096)
, mOnData(), mThread(), mContext(ctx) , mOnData(), mThread(), mContext(ctx)
{ {
// Validate the context // Validate the context
@ -317,7 +426,7 @@ struct ZSkt : SqChainedInstances< ZSkt >
// Acquire exclusive access to the socket // Acquire exclusive access to the socket
mMtx.lock(); mMtx.lock();
// Perform tasks until there's none left // Perform tasks until there's none left
if (!Recv() && !Send() && !SendMore()) if (!Recv() && !Send())
{ {
// Release exclusive access to the socket // Release exclusive access to the socket
mMtx.unlock(); mMtx.unlock();
@ -380,7 +489,7 @@ struct ZSkt : SqChainedInstances< ZSkt >
*/ */
void Send(const Buffer & data) void Send(const Buffer & data)
{ {
mInputQueue.enqueue(std::make_unique< Buffer >(data)); mInputQueue.enqueue(std::make_unique< ZMsg >(data));
} }
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
@ -388,15 +497,23 @@ struct ZSkt : SqChainedInstances< ZSkt >
*/ */
void Send(Buffer && data) void Send(Buffer && data)
{ {
mInputQueue.enqueue(std::make_unique< Buffer >(std::move(data))); mInputQueue.enqueue(std::make_unique< ZMsg >(std::move(data)));
} }
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Queue a multi-part message to be sent through the socket. * Queue a multi-part message to be sent through the socket.
*/ */
void Send(List && list) void Send(const ZMsg::List & list)
{ {
mInputListQueue.enqueue(std::make_unique< List >(std::move(list))); mInputQueue.enqueue(std::make_unique< ZMsg >(list));
}
/* --------------------------------------------------------------------------------------------
* Queue a multi-part message to be sent through the socket.
*/
void Send(ZMsg::List && list)
{
mInputQueue.enqueue(std::make_unique< ZMsg >(std::move(list)));
} }
protected: protected:
@ -419,16 +536,21 @@ protected:
} }
// Ask for a message, if any // Ask for a message, if any
r = zmq_msg_recv(&msg, mPtr, ZMQ_DONTWAIT); r = zmq_msg_recv(&msg, mPtr, ZMQ_DONTWAIT);
// Is this a multi-part message?
if (zmq_msg_more(&msg) == 1)
{
return RecvMore(msg, r);
}
// Extract the message data // Extract the message data
Item data = std::make_unique< Buffer >(static_cast< Buffer::ConstPtr >(zmq_msg_data(&msg)), Item item = std::make_unique< ZMsg >(Buffer(static_cast< Buffer::ConstPtr >(zmq_msg_data(&msg)),
static_cast< Buffer::SzType >(zmq_msg_size(&msg))); static_cast< Buffer::SzType >(zmq_msg_size(&msg))));
// Release this message // Release this message
zmq_msg_close(&msg); zmq_msg_close(&msg);
// Did we have a message? // Did we have a message?
if (r >= 0) if (r >= 0)
{ {
// Put it in the queue // Put it in the queue
mOutputQueue.enqueue(std::move(data)); mOutputQueue.enqueue(std::move(item));
// We received a message // We received a message
return true; return true;
} }
@ -437,7 +559,61 @@ protected:
} }
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Send one message to the socket. * Receive multiple messages from the socket.
*/
bool RecvMore(zmq_msg_t & msg, int r)
{
size_t more_sz = sizeof(int);
// Used to see if more parts follow
int more = 1;
// Create an empty multi-part message container
Item item = std::make_unique< ZMsg >(ZMsg::Multipart{});
// Add the initial message to the list
if (r >= 0)
{
// Save it to the list
item->Push(msg);
// Close the message
zmq_msg_close(&msg);
}
// Keep receiving messages while there's more
do
{
// Initialize an empty message
r = zmq_msg_init(&msg);
// Make sure we have a message
if (r != 0)
{
LogErr("Unable to initialize ZMQ message part");
// Abort everything
return false;
}
// Ask for another message, if any (blocking operation!)
r = zmq_msg_recv(&msg, mPtr, 0);
// Do we actually have a message?
if (r >= 0)
{
// Save it to the list
item->Push(msg);
// Close the message
zmq_msg_close(&msg);
}
// See if the message part last received from the socket was a data part with more parts to follow.
zmq_getsockopt(mPtr, ZMQ_RCVMORE, &more, &more_sz);
} while (more);
// Did we actually have any valid messages?
if (!(item->mList.empty()))
{
mOutputQueue.enqueue(std::move(item));
// Messages were present in the list
return true;
}
// No messages were added
return false;
}
/* --------------------------------------------------------------------------------------------
* Send one queued message to the socket.
*/ */
bool Send() bool Send()
{ {
@ -446,13 +622,13 @@ protected:
// Try to get a message from the queue // Try to get a message from the queue
if (mInputQueue.try_dequeue(data)) if (mInputQueue.try_dequeue(data))
{ {
// Attempt to send the message if (data->mMulti)
int r = zmq_send(mPtr, data->Data(), data->Size(), ZMQ_DONTWAIT);
// Could we send what the message had?
if (r >= 0 && static_cast< Buffer::SzType >(r) != data->Size())
{ {
LogErr("Unable to send data to socket: [%d], {%s}", r, zmq_strerror(errno)); SendMore(data->mList);
// NOTE: Should we put the buffer back into the queue? }
else
{
SendOne(data->mBuff);
} }
// One item was found in the queue // One item was found in the queue
return true; return true;
@ -463,33 +639,37 @@ protected:
} }
} }
/* --------------------------------------------------------------------------------------------
* Send a single message to the socket.
*/
void SendOne(Buffer & buff) const
{
// Attempt to send the message
int r = zmq_send(mPtr, buff.Data(), buff.Position(), ZMQ_DONTWAIT);
// Could we send what the message had?
if (r >= 0 && static_cast< Buffer::SzType >(r) != buff.Position())
{
LogErr("Unable to send data to socket: [%d], {%s}", r, zmq_strerror(errno));
// NOTE: Should we put the buffer back into the queue?
}
}
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Send a multi-part message to the socket. * Send a multi-part message to the socket.
*/ */
bool SendMore() void SendMore(ZMsg::List & list) const
{ {
ListItem mp_msg; // Send all message parts
// Try to get a multi-part message from the queue for (size_t i = 0, n = list.size(); i < n; ++i)
if (mInputListQueue.try_dequeue(mp_msg))
{ {
// Send all message parts // Attempt to send the message
for (size_t i = 0, n = mp_msg->size(); i < n; ++i) int r = zmq_send(mPtr, list[i].Data(), list[i].Position(), (i + 1) == n ? ZMQ_DONTWAIT : ZMQ_SNDMORE);
// Could we send what the message had?
if (r >= 0 && static_cast< Buffer::SzType >(r) != list[i].Position())
{ {
// Attempt to send the message LogErr("Unable to send multi-part data to socket: [%d], %s", r, zmq_strerror(errno));
int r = zmq_send(mPtr, (*mp_msg)[i].Data(), (*mp_msg)[i].Size(), (i + 1) == n ? ZMQ_DONTWAIT : ZMQ_SNDMORE); // NOTE: Should we abort the whole thing? But we probably already sent some.
// Could we send what the message had?
if (r >= 0 && static_cast< Buffer::SzType >(r) != (*mp_msg)[i].Size())
{
LogErr("Unable to send multi-part data to socket: [%d], %s", r, zmq_strerror(errno));
// NOTE: Should we abort the whole thing? But we probably already sent some.
}
} }
// One item was found in the queue
return true;
}
else
{
return false; // No item in the queue
} }
} }
}; };
@ -851,7 +1031,7 @@ struct ZSocket
{ {
Validate(); Validate();
ZSkt::List list; ZMsg::List list;
// Extract the messages from the array // Extract the messages from the array
arr.Foreach([&list](HSQUIRRELVM vm, SQInteger) { arr.Foreach([&list](HSQUIRRELVM vm, SQInteger) {
// Extract the buffer from the stack // Extract the buffer from the stack
@ -878,7 +1058,7 @@ struct ZSocket
{ {
Validate(); Validate();
ZSkt::List list; ZMsg::List list;
// Extract the messages from the array // Extract the messages from the array
arr.Foreach([&list](HSQUIRRELVM vm, SQInteger) { arr.Foreach([&list](HSQUIRRELVM vm, SQInteger) {
StackStrF str(vm, -1); StackStrF str(vm, -1);