1
0
mirror of https://github.com/VCMP-SqMod/SqMod.git synced 2025-01-19 03:57:14 +01:00
This commit is contained in:
Sandu Liviu Catalin 2021-02-03 12:01:46 +02:00
parent 203a02cb2d
commit 5fd4a6471d

View File

@ -21,32 +21,10 @@
namespace SqMod { namespace SqMod {
// ------------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------------
struct ZSkt;
struct ZCtx; struct ZCtx;
struct ZSocket; struct ZSkt;
struct ZContext; struct ZContext;
struct ZSocket;
/* ------------------------------------------------------------------------------------------------
* Allocate raw memory for a string, fill it with data from a StackStrF instance and return it.
*/
inline void * ZmqDataFromStackStrF(StackStrF & data)
{
if (data.mLen)
{
// Allocate the string memory
auto * mem = new SQChar[static_cast< size_t >(data.mLen)];
// Why not
assert(mem);
// Copy the string in the memory buffer
std::memcpy(mem, data.mPtr, static_cast< size_t >(data.mLen));
/* normally you'd have to do static_cast< size_t >(data.mLen) * sizeof(SQChar) */
/* but this SQChar is required to be 1 byte so we don't bother with it */
// Yield ownership of the memory
return mem;
}
// Failed!
return nullptr;
}
/* ------------------------------------------------------------------------------------------------ /* ------------------------------------------------------------------------------------------------
* Core implementation and management for a ZMQ context. * Core implementation and management for a ZMQ context.
@ -108,7 +86,7 @@ struct ZCtx
// Just in case // Just in case
if (r != 0) if (r != 0)
{ {
LogFtl("Context failed to terminate properly: [%d], %s", r, zmq_strerror(errno)); LogFtl("Context failed to terminate properly: [{}], {}", r, zmq_strerror(errno));
} }
} }
} }
@ -233,7 +211,6 @@ struct ZSkt : SqChainedInstances< ZSkt >
, mOutputQueue(4096), mInputQueue(4096), mInputListQueue(1024) , mOutputQueue(4096), mInputQueue(4096), mInputListQueue(1024)
, mOnData(), mThread(), mContext(ctx) , mOnData(), mThread(), mContext(ctx)
{ {
using namespace std::chrono_literals;
// Validate the context // Validate the context
if (!ctx) if (!ctx)
{ {
@ -243,9 +220,10 @@ struct ZSkt : SqChainedInstances< ZSkt >
mThread = std::thread(&ZSkt::Proc, this); mThread = std::thread(&ZSkt::Proc, this);
// Remember this instance // Remember this instance
ChainInstance(); ChainInstance();
// Wait for the socket to be created // Wait for the socket to be created before we attempt to use it
for (int n = 0; n < 100; ++n) for (int n = 0; n < 100; ++n)
{ {
using namespace std::chrono_literals;
// Acquire exclusive access to the socket // Acquire exclusive access to the socket
mMtx.lock(); mMtx.lock();
// Was the socket created? // Was the socket created?
@ -323,8 +301,6 @@ struct ZSkt : SqChainedInstances< ZSkt >
mMtx.lock(); mMtx.lock();
// Create the socket in this thread // Create the socket in this thread
mPtr = zmq_socket(*mContext, mType); mPtr = zmq_socket(*mContext, mType);
// Release exclusive access to the socket
mMtx.unlock();
// Validate the socket // Validate the socket
if (!mPtr) if (!mPtr)
{ {
@ -332,6 +308,8 @@ struct ZSkt : SqChainedInstances< ZSkt >
// Stop the thread // Stop the thread
return; return;
} }
// Release exclusive access to the socket
mMtx.unlock();
// Enter processing loop // Enter processing loop
while (mRun) while (mRun)
{ {
@ -354,8 +332,6 @@ struct ZSkt : SqChainedInstances< ZSkt >
} }
// Acquire exclusive access to the socket // Acquire exclusive access to the socket
mMtx.lock(); mMtx.lock();
// Forget about the context
mContext.reset();
// Close the socket // Close the socket
int r = zmq_close(mPtr); int r = zmq_close(mPtr);
// Forget about it // Forget about it
@ -395,6 +371,8 @@ struct ZSkt : SqChainedInstances< ZSkt >
{ {
mRun = false; // Just in case mRun = false; // Just in case
} }
// Forget about the context
mContext.reset();
} }
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
@ -473,8 +451,8 @@ protected:
// Could we send what the message had? // Could we send what the message had?
if (r >= 0 && static_cast< Buffer::SzType >(r) != data->Size()) if (r >= 0 && static_cast< Buffer::SzType >(r) != data->Size())
{ {
LogErr("Unable to send data to socket: [%d], %s", r, zmq_strerror(errno)); LogErr("Unable to send data to socket: [{}], {}", r, zmq_strerror(errno));
// Could not send. NOTE: Should we put the buffer back into the queue? // NOTE: Should we put the buffer back into the queue?
} }
// One item was found in the queue // One item was found in the queue
return true; return true;
@ -502,8 +480,8 @@ protected:
// Could we send what the message had? // Could we send what the message had?
if (r >= 0 && static_cast< Buffer::SzType >(r) != (*mp_msg)[i].Size()) if (r >= 0 && static_cast< Buffer::SzType >(r) != (*mp_msg)[i].Size())
{ {
LogErr("Unable to send multi-part data to socket: [%d], %s", r, zmq_strerror(errno)); LogErr("Unable to send multi-part data to socket: [{}], {}", r, zmq_strerror(errno));
// Could not send. NOTE: Should we abort the whole thing? But we probably already sent some. // NOTE: Should we abort the whole thing? But we probably already sent some.
} }
} }
// One item was found in the queue // One item was found in the queue
@ -654,7 +632,7 @@ struct ZSocket
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Pointer to the interfaced socket. * Pointer to the interfaced socket.
*/ */
ZSkt::Ptr mHnd; ZSkt::Ptr mHnd;
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Default constructor. * Default constructor.
@ -662,6 +640,10 @@ struct ZSocket
ZSocket(const ZContext & ctx, int type) ZSocket(const ZContext & ctx, int type)
: mHnd(std::make_shared< ZSkt >(ctx.mHnd, type)) : mHnd(std::make_shared< ZSkt >(ctx.mHnd, type))
{ {
if (!(mHnd->mPtr))
{
STHROWF("Failed to create socket");
}
} }
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
@ -917,5 +899,4 @@ struct ZSocket
} }
}; };
} // Namespace:: SqMod } // Namespace:: SqMod