From 203a02cb2d35f2a28bf4f73c096b901bbb885c70 Mon Sep 17 00:00:00 2001 From: Sandu Liviu Catalin Date: Wed, 3 Feb 2021 11:49:35 +0200 Subject: [PATCH] Refactor ZMQ to be simpler. Fixed a meory leak. --- module/Library/IO/Buffer.hpp | 8 + module/Library/ZMQ.cpp | 60 ++- module/Library/ZMQ.hpp | 706 +++++++++-------------------------- 3 files changed, 213 insertions(+), 561 deletions(-) diff --git a/module/Library/IO/Buffer.hpp b/module/Library/IO/Buffer.hpp index 2cae0a67..f5726269 100644 --- a/module/Library/IO/Buffer.hpp +++ b/module/Library/IO/Buffer.hpp @@ -144,6 +144,14 @@ public: return m_Buffer; } + /* -------------------------------------------------------------------------------------------- + * Retrieve a reference to the managed memory buffer. + */ + SQMOD_NODISCARD Buffer & GetInst() const + { + return *m_Buffer; + } + /* -------------------------------------------------------------------------------------------- * Validate the managed memory buffer reference. */ diff --git a/module/Library/ZMQ.cpp b/module/Library/ZMQ.cpp index 2edbf4de..eb6417fe 100644 --- a/module/Library/ZMQ.cpp +++ b/module/Library/ZMQ.cpp @@ -9,24 +9,33 @@ namespace SqMod { // ------------------------------------------------------------------------------------------------ SQMOD_DECL_TYPENAME(SqZContext, _SC("SqZmqContext")) -SQMOD_DECL_TYPENAME(SqZMessage, _SC("SqZmqMessage")) SQMOD_DECL_TYPENAME(SqZSocket, _SC("SqZmqSocket")) // ------------------------------------------------------------------------------------------------ void ZSkt::Flush(HSQUIRRELVM vm) { // Need someone to receive the message - ZMsg msg; + Item data; // Try to get a message from the queue - while (mOutputQueue.try_dequeue(msg)) + while (mOutputQueue.try_dequeue(data)) { // Is there a callback to receive the message? if (!mOnData.IsNull()) { // Transform the message into a script object - LightObj o(SqTypeIdentity< ZMessage >{}, vm, std::make_shared< ZMsg >(std::move(msg))); - // Forward it to the callback - mOnData(o); + if (mStringMessages) + { + LightObj o(static_cast< const SQChar * >(data->Get()), + static_cast< SQInteger >(data->Size< SQChar >())); + // Forward it to the callback + mOnData(o); + } + else + { + LightObj o(SqTypeIdentity< SqBuffer >{}, vm, std::move(*data)); + // Forward it to the callback + mOnData(o); + } } } } @@ -37,9 +46,11 @@ LightObj ZContext::Socket(int type) const return LightObj(SqTypeIdentity< ZSocket >{}, SqVM(), *this, type); } // ------------------------------------------------------------------------------------------------ -LightObj ZSocket::GetOpt(int opt) const +LightObj ZSocket::GetOpt(int opt) { int r = 0; + // Acquire exclusive access to the socket + std::lock_guard< std::mutex > guard(Valid().mMtx); // Identify option switch (opt) { @@ -177,7 +188,7 @@ LightObj ZSocket::GetOpt(int opt) const // Validate result if (r != 0) { - STHROWF("Unable to retrieve socket option: [%d] %s", r, zmq_strerror(errno)); + STHROWF("Unable to retrieve socket option: [{}] {}", r, zmq_strerror(errno)); } SQ_UNREACHABLE; // Never reaches here @@ -188,6 +199,8 @@ LightObj ZSocket::GetOpt(int opt) const void ZSocket::SetOpt(int opt, LightObj & value) { int r = 0; + // Acquire exclusive access to the socket + std::lock_guard< std::mutex > guard(Valid().mMtx); // Identify option switch (opt) { @@ -297,7 +310,7 @@ void ZSocket::SetOpt(int opt, LightObj & value) // Validate result if (r != 0) { - STHROWF("Unable to modify socket option: [%d] %s", r, zmq_strerror(errno)); + STHROWF("Unable to modify socket option: [{}] {}", r, zmq_strerror(errno)); } // Never reaches here SQ_UNREACHABLE; @@ -359,29 +372,6 @@ void Register_ZMQ(HSQUIRRELVM vm) .Func(_SC("Socket"), &ZContext::Socket) ); - // -------------------------------------------------------------------------------------------- - ns.Bind(_SC("Message"), - Class< ZMessage, NoCopy< ZMessage > >(vm, SqZMessage::Str) - // Constructors - .Ctor() - .Ctor< StackStrF & >() - // Meta-methods - .SquirrelFunc(_SC("_typename"), &SqZMessage::Fn) - // Properties - .Prop(_SC("IsNull"), &ZMessage::IsNull) - .Prop(_SC("More"), &ZMessage::More) - .Prop(_SC("Size"), &ZMessage::GetSize) - // Member Methods - .Func(_SC("Get"), &ZMessage::Get) - .Func(_SC("Set"), &ZMessage::Set) - .Func(_SC("Meta"), &ZMessage::Meta) - .Func(_SC("Copy"), &ZMessage::Copy) - .Func(_SC("ToString"), &ZMessage::ToString) - .Func(_SC("FromString"), &ZMessage::FromString) - .Func(_SC("ToBuffer"), &ZMessage::ToBuffer) - .Func(_SC("FromBuffer"), &ZMessage::FromBuffer) - ); - // -------------------------------------------------------------------------------------------- ns.Bind(_SC("Socket"), Class< ZSocket, NoCopy< ZSocket > >(vm, SqZSocket::Str) @@ -391,16 +381,16 @@ void Register_ZMQ(HSQUIRRELVM vm) .SquirrelFunc(_SC("_typename"), &SqZSocket::Fn) // Properties .Prop(_SC("IsNull"), &ZSocket::IsNull) + .Prop(_SC("StringMessages"), &ZSocket::GetStringMessages, &ZSocket::SetStringMessages) // Member Methods .CbFunc(_SC("OnData"), &ZSocket::OnData) .FmtFunc(_SC("Bind"), &ZSocket::Bind) .FmtFunc(_SC("Connect"), &ZSocket::Connect) .FmtFunc(_SC("Disconnect"), &ZSocket::Disconnect) - .Func(_SC("Run"), &ZSocket::Run) .Func(_SC("Close"), &ZSocket::Close) - .Func(_SC("SendMessage"), &ZSocket::SendMessage) + .Func(_SC("SendBuffer"), &ZSocket::SendBuffer) .FmtFunc(_SC("SendString"), &ZSocket::SendString) - .Func(_SC("SendMessages"), &ZSocket::SendMessages) + .Func(_SC("SendBuffers"), &ZSocket::SendBuffers) .Func(_SC("SendStrings"), &ZSocket::SendStrings) .Func(_SC("GetOpt"), &ZSocket::GetOpt) .Func(_SC("SetOpt"), &ZSocket::SetOpt) diff --git a/module/Library/ZMQ.hpp b/module/Library/ZMQ.hpp index 3b08676d..481ae5bc 100644 --- a/module/Library/ZMQ.hpp +++ b/module/Library/ZMQ.hpp @@ -22,38 +22,10 @@ namespace SqMod { // ------------------------------------------------------------------------------------------------ struct ZSkt; -struct ZMsg; struct ZCtx; struct ZSocket; -struct ZMessage; struct ZContext; -/* ------------------------------------------------------------------------------------------------ - * Given as a function pointer to free memory using std::free(data). -*/ -inline void ZmqFreeSTD(void * data, void *) -{ - std::free(data); -} - -/* ------------------------------------------------------------------------------------------------ - * Given as a function pointer to free memory using delete data. -*/ -template < class T > inline void ZmqFreeDelete(void * data, void *) -{ - // If this throws an exception we may as well just be fked. But very (VERY!) low chances. - delete static_cast< T * >(data); -} - -/* ------------------------------------------------------------------------------------------------ - * Given as a function pointer to free memory using delete[] data. -*/ -template < class T > inline void ZmqFreeDeleteArray(void * data, void *) -{ - // If this throws an exception we may as well just be fked. But very (VERY!) low chances. - delete[] static_cast< T * >(data); -} - /* ------------------------------------------------------------------------------------------------ * Allocate raw memory for a string, fill it with data from a StackStrF instance and return it. */ @@ -85,7 +57,6 @@ struct ZCtx * Smart pointers to this type. Helper typedefs. */ using Ptr = std::shared_ptr< ZCtx >; - using Ref = std::weak_ptr< ZCtx >; /* -------------------------------------------------------------------------------------------- * Context pointer. @@ -100,7 +71,7 @@ struct ZCtx { if (!mPtr) { - STHROWF("Unable to initialize context: %s", zmq_strerror(errno)); + STHROWF("Unable to initialize context: {}", zmq_strerror(errno)); } } @@ -163,166 +134,6 @@ struct ZCtx operator void * () const noexcept { return mPtr; } // NOLINT(google-explicit-constructor) }; -/* ------------------------------------------------------------------------------------------------ - * Core implementation and management for a ZMQ message. -*/ -struct ZMsg -{ - /* -------------------------------------------------------------------------------------------- - * Smart pointers to this type. Helper typedefs. - */ - using Ptr = std::shared_ptr< ZMsg >; - using Ref = std::weak_ptr< ZMsg >; - - /* -------------------------------------------------------------------------------------------- - * The underlying message. - */ - std::unique_ptr< zmq_msg_t > mPtr; - - /* -------------------------------------------------------------------------------------------- - * Default constructor. - */ - ZMsg() - : mPtr(std::make_unique< zmq_msg_t >()) - { - int r = zmq_msg_init(mPtr.get()); - // Validate result - if (r != 0) - { - STHROWF("Unable to initialize message: [%d] %s", r, zmq_strerror(errno)); - } - } - - /* -------------------------------------------------------------------------------------------- - * Explicit message size constructor. - */ - explicit ZMsg(SQInteger size) - : mPtr(std::make_unique< zmq_msg_t >()) - { - int r = zmq_msg_init_size(mPtr.get(), ClampL< SQInteger, size_t >(size)); - // Validate result - if (r != 0) - { - STHROWF("Unable to initialize message: [%d] %s", r, zmq_strerror(errno)); - } - } - - /* -------------------------------------------------------------------------------------------- - * Explicit message data and size constructor. - */ - ZMsg(void * data, SQInteger size, zmq_free_fn * ffn, void * hint = nullptr) - : mPtr(std::make_unique< zmq_msg_t >()) - { - // Make sure there's data if required - if (size > 0 && !data) - { - STHROWF("Invalid message data"); - } - // Now the message can be initialized - int r = zmq_msg_init_data(mPtr.get(), data, ClampL< SQInteger, size_t >(size), ffn, hint); - // Validate result - if (r != 0) - { - STHROWF("Unable to initialize message: [%d] %s", r, zmq_strerror(errno)); - } - } - - /* -------------------------------------------------------------------------------------------- - * Copy constructor. - */ - ZMsg(const ZMsg & o) - : mPtr(std::make_unique< zmq_msg_t >()) - { - int r = zmq_msg_init(mPtr.get()); - // Validate result - if (r != 0) - { - LogFtl("Unable to initialize message: [%d] %s", r, zmq_strerror(errno)); - } - r = zmq_msg_copy(mPtr.get(), o.mPtr.get()); - // Validate result - if (r != 0) - { - LogFtl("Unable to copy message: [%d] %s", r, zmq_strerror(errno)); - } - } - - /* -------------------------------------------------------------------------------------------- - * Move constructor. - */ - ZMsg(ZMsg && o) noexcept = default; - - /* -------------------------------------------------------------------------------------------- - * Destructor. - */ - ~ZMsg() - { - if (mPtr) - { - zmq_msg_close(mPtr.get()); - // We don't really care if the above failed (i.e. returned EFAULT) - // We probably did it already before but we need to be sure - // This is something I can live with in this under the circumstances - } - } - - /* -------------------------------------------------------------------------------------------- - * Assignment operator. - */ - ZMsg & operator = (const ZMsg & o) - { - // Prevent self assignment - if (this != &o) - { - // We need a message, even if empty - if (!mPtr) - { - int r = zmq_msg_init(mPtr.get()); - // Validate result - if (r != 0) - { - LogFtl("Unable to initialize message: [%d] %s", r, zmq_strerror(errno)); - } - } - // Do we have a message? - if (mPtr) - { - int r = zmq_msg_copy(mPtr.get(), o.mPtr.get()); - // Validate result - if (r != 0) - { - LogFtl("Unable to copy message: [%d] %s", r, zmq_strerror(errno)); - } - } - } - return *this; - } - - /* -------------------------------------------------------------------------------------------- - * Move assignment. - */ - ZMsg & operator = (ZMsg && o) noexcept - { - // Prevent self assignment - if (this != &o) - { - // Close current message, if any - if (mPtr) - { - zmq_msg_close(mPtr.get()); - } - // Now the message can be moved - mPtr = std::move(o.mPtr); - } - return *this; - } - - /* -------------------------------------------------------------------------------------------- - * Implicit conversion to const message pointer (const zmq_msg_t *) operator. - */ - operator zmq_msg_t * () const noexcept { return mPtr.get(); } // NOLINT(google-explicit-constructor) -}; - /* ------------------------------------------------------------------------------------------------ * Core implementation and management for a ZMQ socket. */ @@ -332,23 +143,27 @@ struct ZSkt : SqChainedInstances< ZSkt > * Smart pointers to this type. Helper typedefs. */ using Ptr = std::shared_ptr< ZSkt >; - using Ref = std::weak_ptr< ZSkt >; + + /* -------------------------------------------------------------------------------------------- + * Message queue type. + */ + using Item = std::unique_ptr< Buffer >; + + /* -------------------------------------------------------------------------------------------- + * Message queue type. + */ + using Queue = moodycamel::ConcurrentQueue< Item >; /* -------------------------------------------------------------------------------------------- * List of messages. */ - using List = std::vector< ZMsg >; + using List = std::vector< Buffer >; /* -------------------------------------------------------------------------------------------- * Message list item. */ using ListItem = std::unique_ptr< List >; - /* -------------------------------------------------------------------------------------------- - * Message queue type. - */ - using Queue = moodycamel::ConcurrentQueue< ZMsg >; - /* -------------------------------------------------------------------------------------------- * Message list queue type. */ @@ -362,7 +177,17 @@ struct ZSkt : SqChainedInstances< ZSkt > /* -------------------------------------------------------------------------------------------- * Socket status. */ - int mStatus; + bool mRun; + + /* -------------------------------------------------------------------------------------------- + * Messages should be delivered as string instead of binary data. + */ + bool mStringMessages; + + /* -------------------------------------------------------------------------------------------- + * Socket type. + */ + int mType; /* -------------------------------------------------------------------------------------------- * Synchronization mutex. @@ -394,22 +219,52 @@ struct ZSkt : SqChainedInstances< ZSkt > */ std::thread mThread; + /* -------------------------------------------------------------------------------------------- + * Socket context. + */ + ZCtx::Ptr mContext; + /* -------------------------------------------------------------------------------------------- * Base constructor. */ - ZSkt(void * ctx, int type) + ZSkt(const ZCtx::Ptr & ctx, int type) : SqChainedInstances< ZSkt >() - /* normally we'd validate ctx. but i have a feeling we'd be fine here */ - , mPtr(zmq_socket(ctx, type)), mStatus(0), mMtx() + , mPtr(nullptr), mRun(true), mStringMessages(true), mType(type), mMtx() , mOutputQueue(4096), mInputQueue(4096), mInputListQueue(1024) - , mOnData(), mThread() + , mOnData(), mThread(), mContext(ctx) { - if (!mPtr) + using namespace std::chrono_literals; + // Validate the context + if (!ctx) { - STHROWF("Unable to initialize socket: %s", zmq_strerror(errno)); + STHROWF("Invalid context"); } + // Create the processing thread + mThread = std::thread(&ZSkt::Proc, this); // Remember this instance ChainInstance(); + // Wait for the socket to be created + for (int n = 0; n < 100; ++n) + { + // Acquire exclusive access to the socket + mMtx.lock(); + // Was the socket created? + if (mPtr) + { + // Release exclusive access to the socket + mMtx.unlock(); + // Socket created + break; + } + else + { + // Release exclusive access to the socket + mMtx.unlock(); + // Wait for the socket to be created + std::this_thread::sleep_for(10ms); + } + } + // If it wasn't created by this point then something isn't right } /* -------------------------------------------------------------------------------------------- @@ -427,14 +282,10 @@ struct ZSkt : SqChainedInstances< ZSkt > */ ~ZSkt() { + // Anything to close? if (mPtr) { - int r = zmq_close(mPtr); - // Just in case - if (r != 0) - { - LogFtl("Socket failed to close properly: [%d], %s", r, zmq_strerror(errno)); - } + Close(); } // Forget about this instance UnchainInstance(); @@ -468,18 +319,53 @@ struct ZSkt : SqChainedInstances< ZSkt > */ void Proc() { - - while (mStatus > 0) + // Acquire exclusive access to the socket + mMtx.lock(); + // Create the socket in this thread + mPtr = zmq_socket(*mContext, mType); + // Release exclusive access to the socket + mMtx.unlock(); + // Validate the socket + if (!mPtr) + { + LogErr("Unable to initialize socket: %s", zmq_strerror(errno)); + // Stop the thread + return; + } + // Enter processing loop + while (mRun) { using namespace std::chrono_literals; // Acquire exclusive access to the socket - std::lock_guard< std::mutex > guard(mMtx); + mMtx.lock(); // Perform tasks until there's none left if (!Recv() && !Send() && !SendMore()) { + // Release exclusive access to the socket + mMtx.unlock(); // Don't exhaust resources pointlessly std::this_thread::sleep_for(50ms); } + else + { + // Release exclusive access to the socket + mMtx.unlock(); + } + } + // Acquire exclusive access to the socket + mMtx.lock(); + // Forget about the context + mContext.reset(); + // Close the socket + int r = zmq_close(mPtr); + // Forget about it + mPtr = nullptr; + // Release exclusive access to the socket + mMtx.unlock(); + // Validate result + if (r != 0) + { + LogErr("Unable to close socket: [{}] {}", r, zmq_strerror(errno)); } } @@ -499,55 +385,38 @@ struct ZSkt : SqChainedInstances< ZSkt > // Acquire exclusive access mMtx.lock(); // Stop the loop - mStatus = 0; + mRun = false; // Yield exclusive access mMtx.unlock(); // Wait for the thread mThread.join(); } - // Make sure it wasn't closed already - if (mPtr != nullptr) + else { - // Now close the socket - int r = zmq_close(mPtr); - // Forget about this socket - mPtr = nullptr; - // Validate result - if (r != 0) - { - STHROWF("Unable to close socket: [%d] %s", r, zmq_strerror(errno)); - } + mRun = false; // Just in case } } /* -------------------------------------------------------------------------------------------- * Queue a message to be sent through the socket. */ - void Send(ZMsg & msg) + void Send(const Buffer & data) { - // Queue it - mInputQueue.enqueue(std::move(msg)); - // Initialize it back to default - msg = ZMsg(); + mInputQueue.enqueue(std::make_unique< Buffer >(data)); } /* -------------------------------------------------------------------------------------------- * Queue a message to be sent through the socket. */ - void Send(const SQChar * str, size_t length) + void Send(Buffer && data) { - // Create a new message - ZMsg msg(static_cast< SQInteger >(length)); - // Populate it with data - std::memcpy(zmq_msg_data(msg), str, length); - // Queue it - mInputQueue.enqueue(std::move(msg)); + mInputQueue.enqueue(std::make_unique< Buffer >(std::move(data))); } /* -------------------------------------------------------------------------------------------- * Queue a multi-part message to be sent through the socket. */ - void Send(List & list) + void Send(List && list) { mInputListQueue.enqueue(std::make_unique< List >(std::move(list))); } @@ -560,14 +429,28 @@ protected: bool Recv() { // Need someone to receive the message - ZMsg msg; + zmq_msg_t msg; + // Initialize to an empty message + int r = zmq_msg_init(&msg); + // Make sure we have a message + if (r != 0) + { + LogErr("Unable to initialize ZMQ message"); + // We couldn't receive anything + return false; + } // Ask for a message, if any - int r = zmq_msg_recv(msg, mPtr, ZMQ_DONTWAIT); + r = zmq_msg_recv(&msg, mPtr, ZMQ_DONTWAIT); + // Extract the message data + Item data = std::make_unique< Buffer >(static_cast< Buffer::ConstPtr >(zmq_msg_data(&msg)), + static_cast< Buffer::SzType >(zmq_msg_size(&msg))); + // Release this message + zmq_msg_close(&msg); // Did we have a message? if (r >= 0) { // Put it in the queue - mOutputQueue.enqueue(std::move(msg)); + mOutputQueue.enqueue(std::move(data)); // We received a message return true; } @@ -581,16 +464,17 @@ protected: bool Send() { // Need someone to receive the message - ZMsg msg; + Item data; // Try to get a message from the queue - if (mInputQueue.try_dequeue(msg)) + if (mInputQueue.try_dequeue(data)) { // Attempt to send the message - int r = zmq_msg_send(msg, mPtr, ZMQ_DONTWAIT); + int r = zmq_send(mPtr, data->Data(), data->Size(), ZMQ_DONTWAIT); // Could we send what the message had? - if (r != zmq_msg_size(msg)) + if (r >= 0 && static_cast< Buffer::SzType >(r) != data->Size()) { LogErr("Unable to send data to socket: [%d], %s", r, zmq_strerror(errno)); + // Could not send. NOTE: Should we put the buffer back into the queue? } // One item was found in the queue return true; @@ -610,17 +494,16 @@ protected: // Try to get a multi-part message from the queue if (mInputListQueue.try_dequeue(mp_msg)) { - // Need someone to receive the message - ZMsg msg; // Send all message parts for (size_t i = 0, n = mp_msg->size(); i < n; ++i) { // Attempt to send the message - int r = zmq_msg_send((*mp_msg)[i], mPtr, (i + 1) == n ? ZMQ_DONTWAIT : ZMQ_SNDMORE); + int r = zmq_send(mPtr, (*mp_msg)[i].Data(), (*mp_msg)[i].Size(), (i + 1) == n ? ZMQ_DONTWAIT : ZMQ_SNDMORE); // Could we send what the message had? - if (r != zmq_msg_size(msg)) + if (r >= 0 && static_cast< Buffer::SzType >(r) != (*mp_msg)[i].Size()) { LogErr("Unable to send multi-part data to socket: [%d], %s", r, zmq_strerror(errno)); + // Could not send. NOTE: Should we abort the whole thing? But we probably already sent some. } } // One item was found in the queue @@ -641,13 +524,13 @@ struct ZContext /* -------------------------------------------------------------------------------------------- * Pointer to the interfaced context. */ - ZCtx::Ptr mPtr; + ZCtx::Ptr mHnd; /* -------------------------------------------------------------------------------------------- * Default constructor. */ ZContext() - : mPtr(std::make_shared< ZCtx >()) + : mHnd(std::make_shared< ZCtx >()) { } @@ -655,7 +538,7 @@ struct ZContext * Pointer constructor. */ explicit ZContext(ZCtx::Ptr ptr) - : mPtr(std::move(ptr)) + : mHnd(std::move(ptr)) { } @@ -689,7 +572,7 @@ struct ZContext */ void Validate() const { - if (!mPtr) + if (!mHnd) { STHROWF("Invalid context instance"); } @@ -698,29 +581,29 @@ struct ZContext /* -------------------------------------------------------------------------------------------- * Make sure a context instance is referenced and return the context. */ - SQMOD_NODISCARD ZCtx & Valid() { Validate(); return *mPtr; } + SQMOD_NODISCARD ZCtx & Valid() { Validate(); return *mHnd; } /* -------------------------------------------------------------------------------------------- * Make sure a context instance is referenced and return the context. */ - SQMOD_NODISCARD const ZCtx & Valid() const { Validate(); return *mPtr; } + SQMOD_NODISCARD const ZCtx & Valid() const { Validate(); return *mHnd; } /* -------------------------------------------------------------------------------------------- * Make sure a context instance is referenced and return the reference. */ - SQMOD_NODISCARD ZCtx::Ptr & ValidRef() { Validate(); return mPtr; } + SQMOD_NODISCARD ZCtx::Ptr & ValidRef() { Validate(); return mHnd; } /* -------------------------------------------------------------------------------------------- * Make sure a context instance is referenced and return the reference. */ - SQMOD_NODISCARD const ZCtx::Ptr & ValidRef() const { Validate(); return mPtr; } + SQMOD_NODISCARD const ZCtx::Ptr & ValidRef() const { Validate(); return mHnd; } /* -------------------------------------------------------------------------------------------- * Check if a context instance is referenced. */ SQMOD_NODISCARD bool IsNull() const { - return static_cast< bool >(mPtr); + return static_cast< bool >(mHnd); } /* -------------------------------------------------------------------------------------------- @@ -740,7 +623,7 @@ struct ZContext // Validate result if (r != 0) { - STHROWF("Unable to set context option: [%d] %s", r, zmq_strerror(errno)); + STHROWF("Unable to set context option: [{}] {}", r, zmq_strerror(errno)); } } @@ -753,7 +636,7 @@ struct ZContext // Validate result if (r != 0) { - STHROWF("Unable to shutdown context: %s", zmq_strerror(errno)); + STHROWF("Unable to shutdown context: {}", zmq_strerror(errno)); } } @@ -763,232 +646,6 @@ struct ZContext SQMOD_NODISCARD LightObj Socket(int type) const; }; -/* ------------------------------------------------------------------------------------------------ - * Interface for ZMQ messages. -*/ -struct ZMessage -{ - /* -------------------------------------------------------------------------------------------- - * Pointer to the interfaced message. - */ - ZMsg::Ptr mHnd; - - /* -------------------------------------------------------------------------------------------- - * Default constructor. - */ - ZMessage() - : mHnd() - { - } - - /* -------------------------------------------------------------------------------------------- - * Explicit message data and size constructor. - */ - ZMessage(StackStrF & data) - : mHnd(std::make_shared< ZMsg >(data.mLen)) - { - // Copy the string in the memory buffer - std::memcpy(zmq_msg_data(*mHnd), data.mPtr, static_cast< size_t >(data.mLen)); - /* normally you'd have to do static_cast< size_t >(data.mLen) * sizeof(SQChar) */ - /* but this SQChar is required to be 1 byte so we don't bother with it */ - } - - /* -------------------------------------------------------------------------------------------- - * Pointer constructor. - */ - explicit ZMessage(ZMsg::Ptr ptr) - : mHnd(std::move(ptr)) - { - } - - /* -------------------------------------------------------------------------------------------- - * Copy constructor. - */ - ZMessage(const ZMessage &) = default; - - /* -------------------------------------------------------------------------------------------- - * Move constructor. - */ - ZMessage(ZMessage &&) noexcept = default; - - /* -------------------------------------------------------------------------------------------- - * Destructor. - */ - ~ZMessage() = default; - - /* -------------------------------------------------------------------------------------------- - * Assignment operator. - */ - ZMessage & operator = (const ZMessage &) = default; - - /* -------------------------------------------------------------------------------------------- - * Move assignment. - */ - ZMessage & operator = (ZMessage &&) noexcept = default; - - /* -------------------------------------------------------------------------------------------- - * Make sure a message instance is referenced. - */ - void Validate() const - { - if (!mHnd) - { - STHROWF("Invalid message instance"); - } - } - - /* -------------------------------------------------------------------------------------------- - * Make sure a message instance is referenced and return the message. - */ - SQMOD_NODISCARD ZMsg & Valid() { Validate(); return *mHnd; } - - /* -------------------------------------------------------------------------------------------- - * Make sure a message instance is referenced and return the message. - */ - SQMOD_NODISCARD const ZMsg & Valid() const { Validate(); return *mHnd; } - - /* -------------------------------------------------------------------------------------------- - * Make sure a message instance is referenced and return the reference. - */ - SQMOD_NODISCARD ZMsg::Ptr & ValidRef() { Validate(); return mHnd; } - - /* -------------------------------------------------------------------------------------------- - * Make sure a message instance is referenced and return the reference. - */ - SQMOD_NODISCARD const ZMsg::Ptr & ValidRef() const { Validate(); return mHnd; } - - /* -------------------------------------------------------------------------------------------- - * Check if a message instance is referenced. - */ - SQMOD_NODISCARD bool IsNull() const - { - return static_cast< bool >(mHnd); - } - - /* -------------------------------------------------------------------------------------------- - * Retrieve the value of a property. - */ - SQMOD_NODISCARD int Get(int opt) const - { - return zmq_msg_get(Valid(), opt); - } - - /* -------------------------------------------------------------------------------------------- - * Modify the value of an property. - */ - ZMessage & Set(int prop, int value) - { - int r = zmq_msg_set(Valid(), prop, value); - // Validate result - if (r != 0) - { - STHROWF("Unable to set message option: [%d] %s", r, zmq_strerror(errno)); - } - // Allow chaining - return *this; - } - - /* -------------------------------------------------------------------------------------------- - * Retrieve the value of a meta-data property. - */ - SQMOD_NODISCARD const SQChar * Meta(StackStrF & prop) const - { - return zmq_msg_gets (Valid(), prop.mPtr); - } - - /* -------------------------------------------------------------------------------------------- - * Copy another message. - */ - ZMessage & Copy(ZMessage & msg) - { - int r = zmq_msg_copy(Valid(), msg.Valid()); - // Validate result - if (r != 0) - { - STHROWF("Unable to copy message: [%d] %s", r, zmq_strerror(errno)); - } - // Allow chaining - return *this; - } - - /* -------------------------------------------------------------------------------------------- - * Indicate if there are more message parts to receive. - */ - SQMOD_NODISCARD bool More() const - { - return static_cast< bool >(zmq_msg_more(Valid())); - } - - /* -------------------------------------------------------------------------------------------- - * Retrieve message content size in bytes. - */ - SQMOD_NODISCARD SQInteger GetSize() const - { - return static_cast< SQInteger >(zmq_msg_size(Valid())); - } - - /* -------------------------------------------------------------------------------------------- - * Retrieve the message data as a string. - */ - SQMOD_NODISCARD LightObj ToString() const - { - return LightObj(static_cast< const SQChar * >(zmq_msg_data(Valid())), GetSize()); - } - - /* -------------------------------------------------------------------------------------------- - * Generate a message from a string. - */ - ZMessage & FromString(StackStrF & data) - { - // If there's no handle we make one - if (!mHnd) - { - mHnd = (std::make_shared< ZMsg >(data.mLen)); - } - else - { - (*mHnd) = ZMsg(data.mLen); // Update the current one - } - // Copy the string in the memory buffer - std::memcpy(zmq_msg_data(*mHnd), data.mPtr, static_cast< size_t >(data.mLen)); - // Allow chaining - return *this; - } - - /* -------------------------------------------------------------------------------------------- - * Retrieve the message data as a buffer. - */ - SQMOD_NODISCARD LightObj ToBuffer() const - { - Validate(); - // Create the buffer instance and return it - return LightObj(SqTypeIdentity< SqBuffer >{}, SqVM(), - Buffer(static_cast< Buffer::ConstPtr >(zmq_msg_data(*mHnd)), zmq_msg_size(*mHnd), 0)); - } - - /* -------------------------------------------------------------------------------------------- - * Generate a message from a buffer. - */ - ZMessage & FromBuffer(SqBuffer & data) - { - data.ValidateDeeper(); - // If there's no handle we make one - if (!mHnd) - { - mHnd = (std::make_shared< ZMsg >(data.GetPosition())); - } - else - { - (*mHnd) = ZMsg(data.GetPosition()); // Update the current one - } - - // Copy the string in the memory buffer - std::memcpy(zmq_msg_data(*mHnd), data.GetRef()->Data(), static_cast< size_t >(data.GetPosition())); - // Allow chaining - return *this; - } -}; - /* ------------------------------------------------------------------------------------------------ * Interface for ZMQ sockets. */ @@ -1003,7 +660,7 @@ struct ZSocket * Default constructor. */ ZSocket(const ZContext & ctx, int type) - : mHnd(std::make_shared< ZSkt >(ctx.Valid(), type)) + : mHnd(std::make_shared< ZSkt >(ctx.mHnd, type)) { } @@ -1085,13 +742,29 @@ struct ZSocket /* -------------------------------------------------------------------------------------------- * Retrieve the value of a socket option. */ - SQMOD_NODISCARD LightObj GetOpt(int opt) const; + SQMOD_NODISCARD LightObj GetOpt(int opt); /* -------------------------------------------------------------------------------------------- * Modify the value of a socket option. */ void SetOpt(int opt, LightObj & value); + /* -------------------------------------------------------------------------------------------- + * Instruct the socket to always deliver messages as strings instead of binary data. + */ + SQMOD_NODISCARD bool GetStringMessages() const + { + return Valid().mStringMessages; + } + + /* -------------------------------------------------------------------------------------------- + * Instruct the socket to always deliver messages as strings instead of binary data. + */ + void SetStringMessages(bool value) + { + Valid().mStringMessages = value; + } + /* -------------------------------------------------------------------------------------------- * Callback to receive incoming messages. */ @@ -1114,7 +787,7 @@ struct ZSocket // Validate result if (r != 0) { - STHROWF("Unable to bind socket: [%d] %s", r, zmq_strerror(errno)); + STHROWF("Unable to bind socket: [{}] {}", r, zmq_strerror(errno)); } // Allow chaining return *this; @@ -1132,7 +805,7 @@ struct ZSocket // Validate result if (r != 0) { - STHROWF("Unable to connect socket: [%d] %s", r, zmq_strerror(errno)); + STHROWF("Unable to connect socket: [{}] {}", r, zmq_strerror(errno)); } // Allow chaining return *this; @@ -1150,30 +823,12 @@ struct ZSocket // Validate result if (r != 0) { - STHROWF("Unable to disconnect socket: [%d] %s", r, zmq_strerror(errno)); + STHROWF("Unable to disconnect socket: [{}] {}", r, zmq_strerror(errno)); } // Allow chaining return *this; } - /* -------------------------------------------------------------------------------------------- - * Run the managed socket. - */ - ZSocket & Run() - { - // Make sure thread exists already - if (Valid().mThread.joinable()) - { - STHROWF("Socket is already running"); - } - // Allow the thread to run - mHnd->mStatus = 1; - // Now we can create the thread - Valid().mThread = std::thread(&ZSkt::Proc, &Valid()); - // Allow chaining - return *this; - } - /* -------------------------------------------------------------------------------------------- * Close the managed socket. */ @@ -1185,11 +840,14 @@ struct ZSocket } /* -------------------------------------------------------------------------------------------- - * Send a ZMQ message to the socket. + * Send a binary message to the socket. */ - ZSocket & SendMessage(ZMessage & msg) + ZSocket & SendBuffer(SqBuffer & data) { - Valid().Send(msg.Valid()); + // Validate buffer + data.ValidateDeeper(); + // Create a copy and send it + Valid().Send(Buffer(data.GetInst().Data(), data.GetInst().Position())); // Allow chaining return *this; } @@ -1197,9 +855,9 @@ struct ZSocket /* -------------------------------------------------------------------------------------------- * Send a string message to the socket. */ - ZSocket & SendString(StackStrF & msg) + ZSocket & SendString(StackStrF & str) { - Valid().Send(msg.mPtr, msg.GetSize()); + Valid().Send(Buffer(static_cast< Buffer::ConstPtr >(str.mPtr), ClampL< SQInteger, Buffer::SzType >(str.mLen))); // Allow chaining return *this; } @@ -1207,28 +865,26 @@ struct ZSocket /* -------------------------------------------------------------------------------------------- * Send a multi-part ZMQ message to the socket. */ - ZSocket & SendMessages(Array & arr) + ZSocket & SendBuffers(Array & arr) { Validate(); ZSkt::List list; // Extract the messages from the array arr.Foreach([&list](HSQUIRRELVM vm, SQInteger) { - // Extract the message from the stack - ZMessage * msg = ClassType< ZMessage >::GetInstance(vm, -1); + // Extract the buffer from the stack + SqBuffer * data = ClassType< SqBuffer >::GetInstance(vm, -1); // In case we didn't fail at the null part (it should) - if (msg && msg->mHnd->mPtr) + if (data && data->GetRef()) { // Add the message to the list - list.emplace_back(std::move((*msg->mHnd))); - // Initialize it back to default - (*msg->mHnd) = ZMsg(); + list.emplace_back(data->GetInst().Data(), data->GetInst().Position()); } // Continue return SQ_OK; }); // Send the message list - mHnd->Send(list); + mHnd->Send(std::move(list)); // Allow chaining return *this; } @@ -1250,14 +906,12 @@ struct ZSocket return str.mRes; // Abort } // Create a new message - list.emplace_back(str.mLen); - // Populate it with data - std::memcpy(zmq_msg_data(list.back()), str.mPtr, static_cast< size_t >(str.mLen)); + list.emplace_back(static_cast< Buffer::ConstPtr >(str.mPtr), ClampL< SQInteger, Buffer::SzType >(str.mLen)); // Continue return SQRESULT(SQ_OK); }); // Send the message list - mHnd->Send(list); + mHnd->Send(std::move(list)); // Allow chaining return *this; }