diff --git a/module/Library/ZMQ.cpp b/module/Library/ZMQ.cpp index eb6417fe..9fba22c0 100644 --- a/module/Library/ZMQ.cpp +++ b/module/Library/ZMQ.cpp @@ -11,30 +11,109 @@ namespace SqMod { SQMOD_DECL_TYPENAME(SqZContext, _SC("SqZmqContext")) SQMOD_DECL_TYPENAME(SqZSocket, _SC("SqZmqSocket")) +// ------------------------------------------------------------------------------------------------ +static void FlushSingleString(HSQUIRRELVM, Function & callback, Buffer & data) +{ + // Transform the message into a script object + LightObj o(static_cast< const SQChar * >(data.Get()), + static_cast< SQInteger >(data.Position())); + // Forward it to the callback + callback(o, false); +} + +// ------------------------------------------------------------------------------------------------ +static void FlushSingleBuffer(HSQUIRRELVM vm, Function & callback, Buffer & data) +{ + // Transform the message into a script object + LightObj o(SqTypeIdentity< SqBuffer >{}, vm, std::move(data)); + // Forward it to the callback + callback(o, false); +} + +// ------------------------------------------------------------------------------------------------ +static void FlushMultiString(HSQUIRRELVM vm, Function & callback, ZMsg::List & list) +{ + // Create a script array + Array a(vm); + // Reserve space upfront + a.Reserve(static_cast< SQInteger >(list.size())); + // Populate the array with elements from the list + a.AppendFromCounted([&list](HSQUIRRELVM vm, SQInteger i) -> bool { + // Are we still withing range of our list? + if (static_cast< size_t >(i) < list.size()) + { + // Transform the message into a script object + sq_pushstring(vm, static_cast< const SQChar * >(list[i].Get()), + static_cast< SQInteger >(list[i].Position())); + // We have an element on the stack + return true; + } + // We don't have an element on the stack + return false; + }); + // Forward it to the callback + callback(a, true); +} + +// ------------------------------------------------------------------------------------------------ +static void FlushMultiBuffer(HSQUIRRELVM vm, Function & callback, ZMsg::List & list) +{ + // Create a script array + Array a(vm); + // Reserve space upfront + a.Reserve(static_cast< SQInteger >(list.size())); + // Populate the array with elements from the list + a.AppendFromCounted([&list](HSQUIRRELVM vm, SQInteger i) -> bool { + // Are we still withing range of our list? + if (static_cast< size_t >(i) < list.size()) + { + // Transform the message into a script object + LightObj o(SqTypeIdentity< SqBuffer >{}, vm, std::move(list[i])); + // Push it on the stack + Var< LightObj >::push(vm, o); + // We have an element on the stack + return true; + } + // We don't have an element on the stack + return false; + }); + // Forward it to the callback + callback(a, true); +} + // ------------------------------------------------------------------------------------------------ void ZSkt::Flush(HSQUIRRELVM vm) { // Need someone to receive the message - Item data; + Item item; // Try to get a message from the queue - while (mOutputQueue.try_dequeue(data)) + while (mOutputQueue.try_dequeue(item)) { // Is there a callback to receive the message? if (!mOnData.IsNull()) { - // Transform the message into a script object - if (mStringMessages) + // Is this a multi-part message? + if (!(item->mMulti)) { - LightObj o(static_cast< const SQChar * >(data->Get()), - static_cast< SQInteger >(data->Size< SQChar >())); - // Forward it to the callback - mOnData(o); + if (mStringMessages) + { + FlushSingleString(vm, mOnData, item->mBuff); + } + else + { + FlushSingleBuffer(vm, mOnData, item->mBuff); + } } else { - LightObj o(SqTypeIdentity< SqBuffer >{}, vm, std::move(*data)); - // Forward it to the callback - mOnData(o); + if (mStringMessages) + { + FlushMultiString(vm, mOnData, item->mList); + } + else + { + FlushMultiBuffer(vm, mOnData, item->mList); + } } } } diff --git a/module/Library/ZMQ.hpp b/module/Library/ZMQ.hpp index e2aa1077..f1f4931a 100644 --- a/module/Library/ZMQ.hpp +++ b/module/Library/ZMQ.hpp @@ -112,6 +112,135 @@ struct ZCtx operator void * () const noexcept { return mPtr; } // NOLINT(google-explicit-constructor) }; +/* ------------------------------------------------------------------------------------------------ + * Core implementation and management for a message. +*/ +struct ZMsg +{ + /* -------------------------------------------------------------------------------------------- + * List of messages. + */ + using List = std::vector< Buffer >; + + /* -------------------------------------------------------------------------------------------- + * Raw union data size. + */ + static constexpr size_t SIZE = sizeof(Buffer) > sizeof(List) ? sizeof(Buffer) : sizeof(List); + + /* -------------------------------------------------------------------------------------------- + * Tag used to indicate a multi-part message. + */ + struct Multipart { }; + + /* -------------------------------------------------------------------------------------------- + * Message contents. + */ + union + { + Buffer mBuff; + List mList; + uint8_t mData[SIZE]; + }; + + /* -------------------------------------------------------------------------------------------- + * Whether this is a multi-part-message. + */ + const bool mMulti; + + /* -------------------------------------------------------------------------------------------- + * Default constructor. + */ + ZMsg() // NOLINT(cppcoreguidelines-pro-type-member-init) + : mBuff(), mMulti(false) + { + } + + /* -------------------------------------------------------------------------------------------- + * Default multi-part constructor. + */ + explicit ZMsg(Multipart) // NOLINT(cppcoreguidelines-pro-type-member-init) + : mList(), mMulti(true) + { + } + + /* -------------------------------------------------------------------------------------------- + * Copy constructor. + */ + explicit ZMsg(const Buffer & b) // NOLINT(cppcoreguidelines-pro-type-member-init,modernize-pass-by-value) + : mBuff(b), mMulti(false) + { + } + + /* -------------------------------------------------------------------------------------------- + * Copy multi-part constructor. + */ + explicit ZMsg(const List & l) // NOLINT(cppcoreguidelines-pro-type-member-init,modernize-pass-by-value) + : mList(l), mMulti(true) + { + } + + /* -------------------------------------------------------------------------------------------- + * Move constructor. + */ + explicit ZMsg(Buffer && b) // NOLINT(cppcoreguidelines-pro-type-member-init) + : mBuff(std::move(b)), mMulti(false) + { + } + + /* -------------------------------------------------------------------------------------------- + * Move multi-part constructor. + */ + explicit ZMsg(List && l) // NOLINT(cppcoreguidelines-pro-type-member-init) + : mList(std::move(l)), mMulti(true) + { + } + + /* -------------------------------------------------------------------------------------------- + * Copy constructor (disabled). + */ + ZMsg(const ZMsg &) = delete; + + /* -------------------------------------------------------------------------------------------- + * Move constructor (disabled). + */ + ZMsg(ZMsg &&) noexcept = delete; + + /* -------------------------------------------------------------------------------------------- + * Destructor. + */ + ~ZMsg() + { + // Is this multi-part? + if (mMulti) + { + mList.~List(); // Invoke list destructor + } + else + { + mBuff.~Buffer(); // Invoke buffer destructor + } + } + + /* -------------------------------------------------------------------------------------------- + * Assignment operator (disabled). + */ + ZMsg & operator = (const ZMsg &) = delete; + + /* -------------------------------------------------------------------------------------------- + * Move assignment (disabled). + */ + ZMsg & operator = (ZMsg &&) noexcept = delete; + + /* -------------------------------------------------------------------------------------------- + * Push the specified message to the multi-part list. + */ + void Push(zmq_msg_t & msg) + { + mList.emplace_back(static_cast< Buffer::ConstPtr >(zmq_msg_data(&msg)), + static_cast< Buffer::SzType >(zmq_msg_size(&msg))); + } +}; + /* ------------------------------------------------------------------------------------------------ * Core implementation and management for a ZMQ socket. */ @@ -125,28 +254,13 @@ struct ZSkt : SqChainedInstances< ZSkt > /* -------------------------------------------------------------------------------------------- * Message queue type. */ - using Item = std::unique_ptr< Buffer >; + using Item = std::unique_ptr< ZMsg >; /* -------------------------------------------------------------------------------------------- * Message queue type. */ using Queue = moodycamel::ConcurrentQueue< Item >; - /* -------------------------------------------------------------------------------------------- - * List of messages. - */ - using List = std::vector< Buffer >; - - /* -------------------------------------------------------------------------------------------- - * Message list item. - */ - using ListItem = std::unique_ptr< List >; - - /* -------------------------------------------------------------------------------------------- - * Message list queue type. - */ - using ListQueue = moodycamel::ConcurrentQueue< ListItem >; - /* -------------------------------------------------------------------------------------------- * Context pointer. */ @@ -182,11 +296,6 @@ struct ZSkt : SqChainedInstances< ZSkt > */ Queue mInputQueue; - /* -------------------------------------------------------------------------------------------- - * Multi-part messages to be sent through the socket. - */ - ListQueue mInputListQueue; - /* -------------------------------------------------------------------------------------------- * Message received callback. */ @@ -208,7 +317,7 @@ struct ZSkt : SqChainedInstances< ZSkt > ZSkt(const ZCtx::Ptr & ctx, int type) : SqChainedInstances< ZSkt >() , mPtr(nullptr), mRun(true), mStringMessages(true), mType(type), mMtx() - , mOutputQueue(4096), mInputQueue(4096), mInputListQueue(1024) + , mOutputQueue(4096), mInputQueue(4096) , mOnData(), mThread(), mContext(ctx) { // Validate the context @@ -317,7 +426,7 @@ struct ZSkt : SqChainedInstances< ZSkt > // Acquire exclusive access to the socket mMtx.lock(); // Perform tasks until there's none left - if (!Recv() && !Send() && !SendMore()) + if (!Recv() && !Send()) { // Release exclusive access to the socket mMtx.unlock(); @@ -380,7 +489,7 @@ struct ZSkt : SqChainedInstances< ZSkt > */ void Send(const Buffer & data) { - mInputQueue.enqueue(std::make_unique< Buffer >(data)); + mInputQueue.enqueue(std::make_unique< ZMsg >(data)); } /* -------------------------------------------------------------------------------------------- @@ -388,15 +497,23 @@ struct ZSkt : SqChainedInstances< ZSkt > */ void Send(Buffer && data) { - mInputQueue.enqueue(std::make_unique< Buffer >(std::move(data))); + mInputQueue.enqueue(std::make_unique< ZMsg >(std::move(data))); } /* -------------------------------------------------------------------------------------------- * Queue a multi-part message to be sent through the socket. */ - void Send(List && list) + void Send(const ZMsg::List & list) { - mInputListQueue.enqueue(std::make_unique< List >(std::move(list))); + mInputQueue.enqueue(std::make_unique< ZMsg >(list)); + } + + /* -------------------------------------------------------------------------------------------- + * Queue a multi-part message to be sent through the socket. + */ + void Send(ZMsg::List && list) + { + mInputQueue.enqueue(std::make_unique< ZMsg >(std::move(list))); } protected: @@ -419,16 +536,21 @@ protected: } // Ask for a message, if any r = zmq_msg_recv(&msg, mPtr, ZMQ_DONTWAIT); + // Is this a multi-part message? + if (zmq_msg_more(&msg) == 1) + { + return RecvMore(msg, r); + } // Extract the message data - Item data = std::make_unique< Buffer >(static_cast< Buffer::ConstPtr >(zmq_msg_data(&msg)), - static_cast< Buffer::SzType >(zmq_msg_size(&msg))); + Item item = std::make_unique< ZMsg >(Buffer(static_cast< Buffer::ConstPtr >(zmq_msg_data(&msg)), + static_cast< Buffer::SzType >(zmq_msg_size(&msg)))); // Release this message zmq_msg_close(&msg); // Did we have a message? if (r >= 0) { // Put it in the queue - mOutputQueue.enqueue(std::move(data)); + mOutputQueue.enqueue(std::move(item)); // We received a message return true; } @@ -437,7 +559,61 @@ protected: } /* -------------------------------------------------------------------------------------------- - * Send one message to the socket. + * Receive multiple messages from the socket. + */ + bool RecvMore(zmq_msg_t & msg, int r) + { + size_t more_sz = sizeof(int); + // Used to see if more parts follow + int more = 1; + // Create an empty multi-part message container + Item item = std::make_unique< ZMsg >(ZMsg::Multipart{}); + // Add the initial message to the list + if (r >= 0) + { + // Save it to the list + item->Push(msg); + // Close the message + zmq_msg_close(&msg); + } + // Keep receiving messages while there's more + do + { + // Initialize an empty message + r = zmq_msg_init(&msg); + // Make sure we have a message + if (r != 0) + { + LogErr("Unable to initialize ZMQ message part"); + // Abort everything + return false; + } + // Ask for another message, if any (blocking operation!) + r = zmq_msg_recv(&msg, mPtr, 0); + // Do we actually have a message? + if (r >= 0) + { + // Save it to the list + item->Push(msg); + // Close the message + zmq_msg_close(&msg); + } + // See if the message part last received from the socket was a data part with more parts to follow. + zmq_getsockopt(mPtr, ZMQ_RCVMORE, &more, &more_sz); + } while (more); + // Did we actually have any valid messages? + if (!(item->mList.empty())) + { + mOutputQueue.enqueue(std::move(item)); + // Messages were present in the list + return true; + } + // No messages were added + return false; + } + + /* -------------------------------------------------------------------------------------------- + * Send one queued message to the socket. */ bool Send() { @@ -446,13 +622,13 @@ protected: // Try to get a message from the queue if (mInputQueue.try_dequeue(data)) { - // Attempt to send the message - int r = zmq_send(mPtr, data->Data(), data->Size(), ZMQ_DONTWAIT); - // Could we send what the message had? - if (r >= 0 && static_cast< Buffer::SzType >(r) != data->Size()) + if (data->mMulti) { - LogErr("Unable to send data to socket: [%d], {%s}", r, zmq_strerror(errno)); - // NOTE: Should we put the buffer back into the queue? + SendMore(data->mList); + } + else + { + SendOne(data->mBuff); } // One item was found in the queue return true; @@ -463,33 +639,37 @@ protected: } } + /* -------------------------------------------------------------------------------------------- + * Send a single message to the socket. + */ + void SendOne(Buffer & buff) const + { + // Attempt to send the message + int r = zmq_send(mPtr, buff.Data(), buff.Position(), ZMQ_DONTWAIT); + // Could we send what the message had? + if (r >= 0 && static_cast< Buffer::SzType >(r) != buff.Position()) + { + LogErr("Unable to send data to socket: [%d], {%s}", r, zmq_strerror(errno)); + // NOTE: Should we put the buffer back into the queue? + } + } + /* -------------------------------------------------------------------------------------------- * Send a multi-part message to the socket. */ - bool SendMore() + void SendMore(ZMsg::List & list) const { - ListItem mp_msg; - // Try to get a multi-part message from the queue - if (mInputListQueue.try_dequeue(mp_msg)) + // Send all message parts + for (size_t i = 0, n = list.size(); i < n; ++i) { - // Send all message parts - for (size_t i = 0, n = mp_msg->size(); i < n; ++i) + // Attempt to send the message + int r = zmq_send(mPtr, list[i].Data(), list[i].Position(), (i + 1) == n ? ZMQ_DONTWAIT : ZMQ_SNDMORE); + // Could we send what the message had? + if (r >= 0 && static_cast< Buffer::SzType >(r) != list[i].Position()) { - // Attempt to send the message - int r = zmq_send(mPtr, (*mp_msg)[i].Data(), (*mp_msg)[i].Size(), (i + 1) == n ? ZMQ_DONTWAIT : ZMQ_SNDMORE); - // Could we send what the message had? - if (r >= 0 && static_cast< Buffer::SzType >(r) != (*mp_msg)[i].Size()) - { - LogErr("Unable to send multi-part data to socket: [%d], %s", r, zmq_strerror(errno)); - // NOTE: Should we abort the whole thing? But we probably already sent some. - } + LogErr("Unable to send multi-part data to socket: [%d], %s", r, zmq_strerror(errno)); + // NOTE: Should we abort the whole thing? But we probably already sent some. } - // One item was found in the queue - return true; - } - else - { - return false; // No item in the queue } } }; @@ -851,7 +1031,7 @@ struct ZSocket { Validate(); - ZSkt::List list; + ZMsg::List list; // Extract the messages from the array arr.Foreach([&list](HSQUIRRELVM vm, SQInteger) { // Extract the buffer from the stack @@ -878,7 +1058,7 @@ struct ZSocket { Validate(); - ZSkt::List list; + ZMsg::List list; // Extract the messages from the array arr.Foreach([&list](HSQUIRRELVM vm, SQInteger) { StackStrF str(vm, -1);