diff --git a/module/Library/ZMQ.hpp b/module/Library/ZMQ.hpp index 481ae5bc..3e0b1e2f 100644 --- a/module/Library/ZMQ.hpp +++ b/module/Library/ZMQ.hpp @@ -21,32 +21,10 @@ namespace SqMod { // ------------------------------------------------------------------------------------------------ -struct ZSkt; struct ZCtx; -struct ZSocket; +struct ZSkt; struct ZContext; - -/* ------------------------------------------------------------------------------------------------ - * Allocate raw memory for a string, fill it with data from a StackStrF instance and return it. -*/ -inline void * ZmqDataFromStackStrF(StackStrF & data) -{ - if (data.mLen) - { - // Allocate the string memory - auto * mem = new SQChar[static_cast< size_t >(data.mLen)]; - // Why not - assert(mem); - // Copy the string in the memory buffer - std::memcpy(mem, data.mPtr, static_cast< size_t >(data.mLen)); - /* normally you'd have to do static_cast< size_t >(data.mLen) * sizeof(SQChar) */ - /* but this SQChar is required to be 1 byte so we don't bother with it */ - // Yield ownership of the memory - return mem; - } - // Failed! - return nullptr; -} +struct ZSocket; /* ------------------------------------------------------------------------------------------------ * Core implementation and management for a ZMQ context. @@ -108,7 +86,7 @@ struct ZCtx // Just in case if (r != 0) { - LogFtl("Context failed to terminate properly: [%d], %s", r, zmq_strerror(errno)); + LogFtl("Context failed to terminate properly: [{}], {}", r, zmq_strerror(errno)); } } } @@ -233,7 +211,6 @@ struct ZSkt : SqChainedInstances< ZSkt > , mOutputQueue(4096), mInputQueue(4096), mInputListQueue(1024) , mOnData(), mThread(), mContext(ctx) { - using namespace std::chrono_literals; // Validate the context if (!ctx) { @@ -243,9 +220,10 @@ struct ZSkt : SqChainedInstances< ZSkt > mThread = std::thread(&ZSkt::Proc, this); // Remember this instance ChainInstance(); - // Wait for the socket to be created + // Wait for the socket to be created before we attempt to use it for (int n = 0; n < 100; ++n) { + using namespace std::chrono_literals; // Acquire exclusive access to the socket mMtx.lock(); // Was the socket created? @@ -323,8 +301,6 @@ struct ZSkt : SqChainedInstances< ZSkt > mMtx.lock(); // Create the socket in this thread mPtr = zmq_socket(*mContext, mType); - // Release exclusive access to the socket - mMtx.unlock(); // Validate the socket if (!mPtr) { @@ -332,6 +308,8 @@ struct ZSkt : SqChainedInstances< ZSkt > // Stop the thread return; } + // Release exclusive access to the socket + mMtx.unlock(); // Enter processing loop while (mRun) { @@ -354,8 +332,6 @@ struct ZSkt : SqChainedInstances< ZSkt > } // Acquire exclusive access to the socket mMtx.lock(); - // Forget about the context - mContext.reset(); // Close the socket int r = zmq_close(mPtr); // Forget about it @@ -395,6 +371,8 @@ struct ZSkt : SqChainedInstances< ZSkt > { mRun = false; // Just in case } + // Forget about the context + mContext.reset(); } /* -------------------------------------------------------------------------------------------- @@ -473,8 +451,8 @@ protected: // Could we send what the message had? if (r >= 0 && static_cast< Buffer::SzType >(r) != data->Size()) { - LogErr("Unable to send data to socket: [%d], %s", r, zmq_strerror(errno)); - // Could not send. NOTE: Should we put the buffer back into the queue? + LogErr("Unable to send data to socket: [{}], {}", r, zmq_strerror(errno)); + // NOTE: Should we put the buffer back into the queue? } // One item was found in the queue return true; @@ -502,8 +480,8 @@ protected: // Could we send what the message had? if (r >= 0 && static_cast< Buffer::SzType >(r) != (*mp_msg)[i].Size()) { - LogErr("Unable to send multi-part data to socket: [%d], %s", r, zmq_strerror(errno)); - // Could not send. NOTE: Should we abort the whole thing? But we probably already sent some. + LogErr("Unable to send multi-part data to socket: [{}], {}", r, zmq_strerror(errno)); + // NOTE: Should we abort the whole thing? But we probably already sent some. } } // One item was found in the queue @@ -654,7 +632,7 @@ struct ZSocket /* -------------------------------------------------------------------------------------------- * Pointer to the interfaced socket. */ - ZSkt::Ptr mHnd; + ZSkt::Ptr mHnd; /* -------------------------------------------------------------------------------------------- * Default constructor. @@ -662,6 +640,10 @@ struct ZSocket ZSocket(const ZContext & ctx, int type) : mHnd(std::make_shared< ZSkt >(ctx.mHnd, type)) { + if (!(mHnd->mPtr)) + { + STHROWF("Failed to create socket"); + } } /* -------------------------------------------------------------------------------------------- @@ -917,5 +899,4 @@ struct ZSocket } }; - } // Namespace:: SqMod