1
0
mirror of https://github.com/VCMP-SqMod/SqMod.git synced 2025-02-12 07:47:12 +01:00

Further work on ZMQ bindings.

This commit is contained in:
Sandu Liviu Catalin 2021-02-02 20:31:21 +02:00
parent ba237ec49c
commit fcef50651a
2 changed files with 283 additions and 94 deletions

View File

@ -36,9 +36,16 @@ LightObj ZContext::Socket(int type) const
{ {
return LightObj(SqTypeIdentity< ZSocket >{}, SqVM(), *this, type); return LightObj(SqTypeIdentity< ZSocket >{}, SqVM(), *this, type);
} }
// ------------------------------------------------------------------------------------------------
static String SqZmqVersion()
{
int major=0, minor=0, patch=0;
zmq_version(&major, &minor, &patch);
return fmt::format("{}.{}.{}", major, minor, patch);
}
// ------------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------------
static void ZmqProcess() static void SqZmqProcess()
{ {
// Go over all sockets and try to update them // Go over all sockets and try to update them
for (ZSkt * inst = ZSkt::sHead; inst && inst->mNext != ZSkt::sHead; inst = inst->mNext) for (ZSkt * inst = ZSkt::sHead; inst && inst->mNext != ZSkt::sHead; inst = inst->mNext)
@ -66,7 +73,8 @@ void Register_ZMQ(HSQUIRRELVM vm)
{ {
Table ns(vm); Table ns(vm);
ns.Func(_SC("Process"), &ZmqProcess); ns.Func(_SC("Process"), &SqZmqProcess);
ns.Func(_SC("Version"), &SqZmqVersion);
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
ns.Bind(_SC("Context"), ns.Bind(_SC("Context"),
@ -89,8 +97,7 @@ void Register_ZMQ(HSQUIRRELVM vm)
Class< ZMessage, NoCopy< ZMessage > >(vm, SqZMessage::Str) Class< ZMessage, NoCopy< ZMessage > >(vm, SqZMessage::Str)
// Constructors // Constructors
.Ctor() .Ctor()
.Ctor< SQInteger >() .Ctor< StackStrF & >()
.Ctor< SQInteger, StackStrF & >()
// Meta-methods // Meta-methods
.SquirrelFunc(_SC("_typename"), &SqZMessage::Fn) .SquirrelFunc(_SC("_typename"), &SqZMessage::Fn)
// Properties // Properties
@ -101,7 +108,11 @@ void Register_ZMQ(HSQUIRRELVM vm)
.Func(_SC("Get"), &ZMessage::Get) .Func(_SC("Get"), &ZMessage::Get)
.Func(_SC("Set"), &ZMessage::Set) .Func(_SC("Set"), &ZMessage::Set)
.Func(_SC("Meta"), &ZMessage::Meta) .Func(_SC("Meta"), &ZMessage::Meta)
.Func(_SC("Copy"), &ZMessage::Copy)
.Func(_SC("ToString"), &ZMessage::ToString) .Func(_SC("ToString"), &ZMessage::ToString)
.Func(_SC("FromString"), &ZMessage::FromString)
.Func(_SC("ToBuffer"), &ZMessage::ToBuffer)
.Func(_SC("FromBuffer"), &ZMessage::FromBuffer)
); );
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
@ -114,12 +125,16 @@ void Register_ZMQ(HSQUIRRELVM vm)
// Properties // Properties
.Prop(_SC("IsNull"), &ZSocket::IsNull) .Prop(_SC("IsNull"), &ZSocket::IsNull)
// Member Methods // Member Methods
.Func(_SC("Bind"), &ZSocket::Bind) .CbFunc(_SC("OnData"), &ZSocket::OnData)
.Func(_SC("Connect"), &ZSocket::Connect) .FmtFunc(_SC("Bind"), &ZSocket::Bind)
.Func(_SC("Disconnect"), &ZSocket::Disconnect) .FmtFunc(_SC("Connect"), &ZSocket::Connect)
.FmtFunc(_SC("Disconnect"), &ZSocket::Disconnect)
.Func(_SC("Run"), &ZSocket::Run) .Func(_SC("Run"), &ZSocket::Run)
.Func(_SC("Close"), &ZSocket::Close) .Func(_SC("Close"), &ZSocket::Close)
.CbFunc(_SC("OnData"), &ZSocket::OnData) .Func(_SC("SendMessage"), &ZSocket::SendMessage)
.FmtFunc(_SC("SendString"), &ZSocket::SendString)
.Func(_SC("SendMessages"), &ZSocket::SendMessages)
.Func(_SC("SendStrings"), &ZSocket::SendStrings)
); );
RootTable(vm).Bind(_SC("SqZMQ"), ns); RootTable(vm).Bind(_SC("SqZMQ"), ns);

View File

@ -2,6 +2,7 @@
// ------------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------------
#include "Core/Utility.hpp" #include "Core/Utility.hpp"
#include "Library/IO/Buffer.hpp"
// ------------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------------
#include <mutex> #include <mutex>
@ -176,15 +177,15 @@ struct ZMsg
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* The underlying message. * The underlying message.
*/ */
std::unique_ptr< zmq_msg_t > mMsg; std::unique_ptr< zmq_msg_t > mPtr;
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Default constructor. * Default constructor.
*/ */
ZMsg() ZMsg()
: mMsg(std::make_unique< zmq_msg_t >()) : mPtr(std::make_unique< zmq_msg_t >())
{ {
int r = zmq_msg_init(mMsg.get()); int r = zmq_msg_init(mPtr.get());
// Validate result // Validate result
if (r != 0) if (r != 0)
{ {
@ -196,9 +197,9 @@ struct ZMsg
* Explicit message size constructor. * Explicit message size constructor.
*/ */
explicit ZMsg(SQInteger size) explicit ZMsg(SQInteger size)
: mMsg(std::make_unique< zmq_msg_t >()) : mPtr(std::make_unique< zmq_msg_t >())
{ {
int r = zmq_msg_init_size(mMsg.get(), ClampL< SQInteger, size_t >(size)); int r = zmq_msg_init_size(mPtr.get(), ClampL< SQInteger, size_t >(size));
// Validate result // Validate result
if (r != 0) if (r != 0)
{ {
@ -210,7 +211,7 @@ struct ZMsg
* Explicit message data and size constructor. * Explicit message data and size constructor.
*/ */
ZMsg(void * data, SQInteger size, zmq_free_fn * ffn, void * hint = nullptr) ZMsg(void * data, SQInteger size, zmq_free_fn * ffn, void * hint = nullptr)
: mMsg(std::make_unique< zmq_msg_t >()) : mPtr(std::make_unique< zmq_msg_t >())
{ {
// Make sure there's data if required // Make sure there's data if required
if (size > 0 && !data) if (size > 0 && !data)
@ -218,7 +219,7 @@ struct ZMsg
STHROWF("Invalid message data"); STHROWF("Invalid message data");
} }
// Now the message can be initialized // Now the message can be initialized
int r = zmq_msg_init_data(mMsg.get(), data, ClampL< SQInteger, size_t >(size), ffn, hint); int r = zmq_msg_init_data(mPtr.get(), data, ClampL< SQInteger, size_t >(size), ffn, hint);
// Validate result // Validate result
if (r != 0) if (r != 0)
{ {
@ -230,15 +231,15 @@ struct ZMsg
* Copy constructor. * Copy constructor.
*/ */
ZMsg(const ZMsg & o) ZMsg(const ZMsg & o)
: mMsg(std::make_unique< zmq_msg_t >()) : mPtr(std::make_unique< zmq_msg_t >())
{ {
int r = zmq_msg_init(mMsg.get()); int r = zmq_msg_init(mPtr.get());
// Validate result // Validate result
if (r != 0) if (r != 0)
{ {
LogFtl("Unable to initialize message: [%d] %s", r, zmq_strerror(r)); LogFtl("Unable to initialize message: [%d] %s", r, zmq_strerror(r));
} }
r = zmq_msg_copy(mMsg.get(), o.mMsg.get()); r = zmq_msg_copy(mPtr.get(), o.mPtr.get());
// Validate result // Validate result
if (r != 0) if (r != 0)
{ {
@ -256,9 +257,9 @@ struct ZMsg
*/ */
~ZMsg() ~ZMsg()
{ {
if (mMsg) if (mPtr)
{ {
zmq_msg_close(mMsg.get()); zmq_msg_close(mPtr.get());
// We don't really care if the above failed (i.e. returned EFAULT) // We don't really care if the above failed (i.e. returned EFAULT)
// We probably did it already before but we need to be sure // We probably did it already before but we need to be sure
// This is something I can live with in this under the circumstances // This is something I can live with in this under the circumstances
@ -274,9 +275,9 @@ struct ZMsg
if (this != &o) if (this != &o)
{ {
// We need a message, even if empty // We need a message, even if empty
if (!mMsg) if (!mPtr)
{ {
int r = zmq_msg_init(mMsg.get()); int r = zmq_msg_init(mPtr.get());
// Validate result // Validate result
if (r != 0) if (r != 0)
{ {
@ -284,9 +285,9 @@ struct ZMsg
} }
} }
// Do we have a message? // Do we have a message?
if (mMsg) if (mPtr)
{ {
int r = zmq_msg_copy(mMsg.get(), o.mMsg.get()); int r = zmq_msg_copy(mPtr.get(), o.mPtr.get());
// Validate result // Validate result
if (r != 0) if (r != 0)
{ {
@ -306,12 +307,12 @@ struct ZMsg
if (this != &o) if (this != &o)
{ {
// Close current message, if any // Close current message, if any
if (mMsg) if (mPtr)
{ {
zmq_msg_close(mMsg.get()); zmq_msg_close(mPtr.get());
} }
// Now the message can be moved // Now the message can be moved
mMsg = std::move(o.mMsg); mPtr = std::move(o.mPtr);
} }
return *this; return *this;
} }
@ -319,7 +320,7 @@ struct ZMsg
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Implicit conversion to const message pointer (const zmq_msg_t *) operator. * Implicit conversion to const message pointer (const zmq_msg_t *) operator.
*/ */
operator zmq_msg_t * () const noexcept { return mMsg.get(); } // NOLINT(google-explicit-constructor) operator zmq_msg_t * () const noexcept { return mPtr.get(); } // NOLINT(google-explicit-constructor)
}; };
/* ------------------------------------------------------------------------------------------------ /* ------------------------------------------------------------------------------------------------
@ -519,6 +520,38 @@ struct ZSkt : SqChainedInstances< ZSkt >
} }
} }
/* --------------------------------------------------------------------------------------------
* Queue a message to be sent through the socket.
*/
void Send(ZMsg & msg)
{
// Queue it
mInputQueue.enqueue(std::move(msg));
// Initialize it back to default
msg = ZMsg();
}
/* --------------------------------------------------------------------------------------------
* Queue a message to be sent through the socket.
*/
void Send(const SQChar * str, size_t length)
{
// Create a new message
ZMsg msg(static_cast< SQInteger >(length));
// Populate it with data
std::memcpy(zmq_msg_data(msg), str, length);
// Queue it
mInputQueue.enqueue(std::move(msg));
}
/* --------------------------------------------------------------------------------------------
* Queue a multi-part message to be sent through the socket.
*/
void Send(List & list)
{
mInputListQueue.enqueue(std::make_unique< List >(std::move(list)));
}
protected: protected:
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
@ -600,11 +633,16 @@ protected:
*/ */
struct ZContext struct ZContext
{ {
/* --------------------------------------------------------------------------------------------
* Pointer to the interfaced context.
*/
ZCtx::Ptr mPtr;
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Default constructor. * Default constructor.
*/ */
ZContext() ZContext()
: m_Ptr(std::make_shared< ZCtx >()) : mPtr(std::make_shared< ZCtx >())
{ {
} }
@ -612,7 +650,7 @@ struct ZContext
* Pointer constructor. * Pointer constructor.
*/ */
explicit ZContext(ZCtx::Ptr ptr) explicit ZContext(ZCtx::Ptr ptr)
: m_Ptr(std::move(ptr)) : mPtr(std::move(ptr))
{ {
} }
@ -646,7 +684,7 @@ struct ZContext
*/ */
void Validate() const void Validate() const
{ {
if (!m_Ptr) if (!mPtr)
{ {
STHROWF("Invalid context instance"); STHROWF("Invalid context instance");
} }
@ -655,29 +693,29 @@ struct ZContext
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Make sure a context instance is referenced and return the context. * Make sure a context instance is referenced and return the context.
*/ */
SQMOD_NODISCARD ZCtx & Valid() { Validate(); return *m_Ptr; } SQMOD_NODISCARD ZCtx & Valid() { Validate(); return *mPtr; }
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Make sure a context instance is referenced and return the context. * Make sure a context instance is referenced and return the context.
*/ */
SQMOD_NODISCARD const ZCtx & Valid() const { Validate(); return *m_Ptr; } SQMOD_NODISCARD const ZCtx & Valid() const { Validate(); return *mPtr; }
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Make sure a context instance is referenced and return the reference. * Make sure a context instance is referenced and return the reference.
*/ */
SQMOD_NODISCARD ZCtx::Ptr & ValidRef() { Validate(); return m_Ptr; } SQMOD_NODISCARD ZCtx::Ptr & ValidRef() { Validate(); return mPtr; }
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Make sure a context instance is referenced and return the reference. * Make sure a context instance is referenced and return the reference.
*/ */
SQMOD_NODISCARD const ZCtx::Ptr & ValidRef() const { Validate(); return m_Ptr; } SQMOD_NODISCARD const ZCtx::Ptr & ValidRef() const { Validate(); return mPtr; }
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Check if a context instance is referenced. * Check if a context instance is referenced.
*/ */
SQMOD_NODISCARD bool IsNull() const SQMOD_NODISCARD bool IsNull() const
{ {
return static_cast< bool >(m_Ptr); return static_cast< bool >(mPtr);
} }
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
@ -718,13 +756,6 @@ struct ZContext
* Helper function to create sockets. * Helper function to create sockets.
*/ */
SQMOD_NODISCARD LightObj Socket(int type) const; SQMOD_NODISCARD LightObj Socket(int type) const;
private:
/* --------------------------------------------------------------------------------------------
* Pointer to the interfaced context.
*/
ZCtx::Ptr m_Ptr;
}; };
/* ------------------------------------------------------------------------------------------------ /* ------------------------------------------------------------------------------------------------
@ -732,35 +763,27 @@ private:
*/ */
struct ZMessage struct ZMessage
{ {
/* --------------------------------------------------------------------------------------------
* Pointer to the interfaced message.
*/
ZMsg::Ptr mHnd;
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Default constructor. * Default constructor.
*/ */
ZMessage() ZMessage()
: m_Ptr(std::make_shared< ZMsg >()) : mHnd()
{
}
/* --------------------------------------------------------------------------------------------
* Explicit message size constructor.
*/
explicit ZMessage(SQInteger size)
: m_Ptr(std::make_shared< ZMsg >(size))
{ {
} }
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Explicit message data and size constructor. * Explicit message data and size constructor.
*/ */
ZMessage(SQInteger size, StackStrF & data) ZMessage(StackStrF & data)
: m_Ptr(std::make_shared< ZMsg >(size < 0 ? data.mLen : size)) : mHnd(std::make_shared< ZMsg >(data.mLen))
{ {
// Make sure the requested size is within range
if (size < 0 || size > data.mLen)
{
size = data.mLen;
}
// Copy the string in the memory buffer // Copy the string in the memory buffer
std::memcpy(zmq_msg_data(*m_Ptr), data.mPtr, static_cast< size_t >(size)); std::memcpy(zmq_msg_data(*mHnd), data.mPtr, static_cast< size_t >(data.mLen));
/* normally you'd have to do static_cast< size_t >(data.mLen) * sizeof(SQChar) */ /* normally you'd have to do static_cast< size_t >(data.mLen) * sizeof(SQChar) */
/* but this SQChar is required to be 1 byte so we don't bother with it */ /* but this SQChar is required to be 1 byte so we don't bother with it */
} }
@ -769,7 +792,7 @@ struct ZMessage
* Pointer constructor. * Pointer constructor.
*/ */
explicit ZMessage(ZMsg::Ptr ptr) explicit ZMessage(ZMsg::Ptr ptr)
: m_Ptr(std::move(ptr)) : mHnd(std::move(ptr))
{ {
} }
@ -803,7 +826,7 @@ struct ZMessage
*/ */
void Validate() const void Validate() const
{ {
if (!m_Ptr) if (!mHnd)
{ {
STHROWF("Invalid message instance"); STHROWF("Invalid message instance");
} }
@ -812,29 +835,29 @@ struct ZMessage
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Make sure a message instance is referenced and return the message. * Make sure a message instance is referenced and return the message.
*/ */
SQMOD_NODISCARD ZMsg & Valid() { Validate(); return *m_Ptr; } SQMOD_NODISCARD ZMsg & Valid() { Validate(); return *mHnd; }
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Make sure a message instance is referenced and return the message. * Make sure a message instance is referenced and return the message.
*/ */
SQMOD_NODISCARD const ZMsg & Valid() const { Validate(); return *m_Ptr; } SQMOD_NODISCARD const ZMsg & Valid() const { Validate(); return *mHnd; }
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Make sure a message instance is referenced and return the reference. * Make sure a message instance is referenced and return the reference.
*/ */
SQMOD_NODISCARD ZMsg::Ptr & ValidRef() { Validate(); return m_Ptr; } SQMOD_NODISCARD ZMsg::Ptr & ValidRef() { Validate(); return mHnd; }
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Make sure a message instance is referenced and return the reference. * Make sure a message instance is referenced and return the reference.
*/ */
SQMOD_NODISCARD const ZMsg::Ptr & ValidRef() const { Validate(); return m_Ptr; } SQMOD_NODISCARD const ZMsg::Ptr & ValidRef() const { Validate(); return mHnd; }
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Check if a context instance is referenced. * Check if a message instance is referenced.
*/ */
SQMOD_NODISCARD bool IsNull() const SQMOD_NODISCARD bool IsNull() const
{ {
return static_cast< bool >(m_Ptr); return static_cast< bool >(mHnd);
} }
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
@ -848,14 +871,16 @@ struct ZMessage
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Modify the value of an property. * Modify the value of an property.
*/ */
void Set(int prop, int value) ZMessage & Set(int prop, int value)
{ {
int r = zmq_msg_set(Valid(), prop, value); int r = zmq_msg_set(Valid(), prop, value);
// Validate result // Validate result
if (r != 0) if (r != 0)
{ {
STHROWF("Unable to set context option: [%d] %s", r, zmq_strerror(r)); STHROWF("Unable to set message option: [%d] %s", r, zmq_strerror(r));
} }
// Allow chaining
return *this;
} }
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
@ -866,6 +891,21 @@ struct ZMessage
return zmq_msg_gets (Valid(), prop.mPtr); return zmq_msg_gets (Valid(), prop.mPtr);
} }
/* --------------------------------------------------------------------------------------------
* Copy another message.
*/
ZMessage & Copy(ZMessage & msg)
{
int r = zmq_msg_copy(Valid(), msg.Valid());
// Validate result
if (r != 0)
{
STHROWF("Unable to copy message: [%d] %s", r, zmq_strerror(r));
}
// Allow chaining
return *this;
}
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Indicate if there are more message parts to receive. * Indicate if there are more message parts to receive.
*/ */
@ -890,12 +930,58 @@ struct ZMessage
return LightObj(static_cast< const SQChar * >(zmq_msg_data(Valid())), GetSize()); return LightObj(static_cast< const SQChar * >(zmq_msg_data(Valid())), GetSize());
} }
private: /* --------------------------------------------------------------------------------------------
* Generate a message from a string.
*/
ZMessage & FromString(StackStrF & data)
{
// If there's no handle we make one
if (!mHnd)
{
mHnd = (std::make_shared< ZMsg >(data.mLen));
}
else
{
(*mHnd) = ZMsg(data.mLen); // Update the current one
}
// Copy the string in the memory buffer
std::memcpy(zmq_msg_data(*mHnd), data.mPtr, static_cast< size_t >(data.mLen));
// Allow chaining
return *this;
}
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Pointer to the interfaced message. * Retrieve the message data as a buffer.
*/ */
ZMsg::Ptr m_Ptr; SQMOD_NODISCARD LightObj ToBuffer() const
{
Validate();
// Create the buffer instance and return it
return LightObj(SqTypeIdentity< SqBuffer >{}, SqVM(),
Buffer(static_cast< Buffer::ConstPtr >(zmq_msg_data(*mHnd)), zmq_msg_size(*mHnd), 0));
}
/* --------------------------------------------------------------------------------------------
* Generate a message from a buffer.
*/
ZMessage & FromBuffer(SqBuffer & data)
{
data.ValidateDeeper();
// If there's no handle we make one
if (!mHnd)
{
mHnd = (std::make_shared< ZMsg >(data.GetPosition()));
}
else
{
(*mHnd) = ZMsg(data.GetPosition()); // Update the current one
}
// Copy the string in the memory buffer
std::memcpy(zmq_msg_data(*mHnd), data.GetRef()->Data(), static_cast< size_t >(data.GetPosition()));
// Allow chaining
return *this;
}
}; };
/* ------------------------------------------------------------------------------------------------ /* ------------------------------------------------------------------------------------------------
@ -903,11 +989,16 @@ private:
*/ */
struct ZSocket struct ZSocket
{ {
/* --------------------------------------------------------------------------------------------
* Pointer to the interfaced socket.
*/
ZSkt::Ptr mHnd;
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Default constructor. * Default constructor.
*/ */
ZSocket(const ZContext & ctx, int type) ZSocket(const ZContext & ctx, int type)
: m_Ptr(std::make_shared< ZSkt >(ctx.Valid(), type)) : mHnd(std::make_shared< ZSkt >(ctx.Valid(), type))
{ {
} }
@ -915,7 +1006,7 @@ struct ZSocket
* Pointer constructor. * Pointer constructor.
*/ */
explicit ZSocket(ZSkt::Ptr ptr) explicit ZSocket(ZSkt::Ptr ptr)
: m_Ptr(std::move(ptr)) : mHnd(std::move(ptr))
{ {
} }
@ -952,7 +1043,7 @@ struct ZSocket
*/ */
void Validate() const void Validate() const
{ {
if (!m_Ptr) if (!mHnd)
{ {
STHROWF("Invalid socket instance"); STHROWF("Invalid socket instance");
} }
@ -961,35 +1052,45 @@ struct ZSocket
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Make sure a socket instance is referenced and return the socket. * Make sure a socket instance is referenced and return the socket.
*/ */
SQMOD_NODISCARD ZSkt & Valid() { Validate(); return *m_Ptr; } SQMOD_NODISCARD ZSkt & Valid() { Validate(); return *mHnd; }
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Make sure a socket instance is referenced and return the socket. * Make sure a socket instance is referenced and return the socket.
*/ */
SQMOD_NODISCARD const ZSkt & Valid() const { Validate(); return *m_Ptr; } SQMOD_NODISCARD const ZSkt & Valid() const { Validate(); return *mHnd; }
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Make sure a socket instance is referenced and return the reference. * Make sure a socket instance is referenced and return the reference.
*/ */
SQMOD_NODISCARD ZSkt::Ptr & ValidRef() { Validate(); return m_Ptr; } SQMOD_NODISCARD ZSkt::Ptr & ValidRef() { Validate(); return mHnd; }
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Make sure a socket instance is referenced and return the reference. * Make sure a socket instance is referenced and return the reference.
*/ */
SQMOD_NODISCARD const ZSkt::Ptr & ValidRef() const { Validate(); return m_Ptr; } SQMOD_NODISCARD const ZSkt::Ptr & ValidRef() const { Validate(); return mHnd; }
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Check if a context instance is referenced. * Check if a socket instance is referenced.
*/ */
SQMOD_NODISCARD bool IsNull() const SQMOD_NODISCARD bool IsNull() const
{ {
return static_cast< bool >(m_Ptr); return static_cast< bool >(mHnd);
}
/* --------------------------------------------------------------------------------------------
* Callback to receive incoming messages.
*/
ZSocket & OnData(Function & cb)
{
Valid().mOnData = std::move(cb);
// Allow chaining
return *this;
} }
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Accept incoming connections on the socket. * Accept incoming connections on the socket.
*/ */
void Bind(StackStrF & ep) ZSocket & Bind(StackStrF & ep)
{ {
// Acquire exclusive access to the socket // Acquire exclusive access to the socket
std::lock_guard< std::mutex > guard(Valid().mMtx); std::lock_guard< std::mutex > guard(Valid().mMtx);
@ -1000,12 +1101,14 @@ struct ZSocket
{ {
STHROWF("Unable to bind socket: [%d] %s", r, zmq_strerror(r)); STHROWF("Unable to bind socket: [%d] %s", r, zmq_strerror(r));
} }
// Allow chaining
return *this;
} }
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Create outgoing connection from the socket * Create outgoing connection from the socket
*/ */
void Connect(StackStrF & ep) ZSocket & Connect(StackStrF & ep)
{ {
// Acquire exclusive access to the socket // Acquire exclusive access to the socket
std::lock_guard< std::mutex > guard(Valid().mMtx); std::lock_guard< std::mutex > guard(Valid().mMtx);
@ -1016,12 +1119,14 @@ struct ZSocket
{ {
STHROWF("Unable to connect socket: [%d] %s", r, zmq_strerror(r)); STHROWF("Unable to connect socket: [%d] %s", r, zmq_strerror(r));
} }
// Allow chaining
return *this;
} }
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Create outgoing connection from the socket * Create outgoing connection from the socket
*/ */
void Disconnect(StackStrF & ep) ZSocket & Disconnect(StackStrF & ep)
{ {
// Acquire exclusive access to the socket // Acquire exclusive access to the socket
std::lock_guard< std::mutex > guard(Valid().mMtx); std::lock_guard< std::mutex > guard(Valid().mMtx);
@ -1032,12 +1137,14 @@ struct ZSocket
{ {
STHROWF("Unable to disconnect socket: [%d] %s", r, zmq_strerror(r)); STHROWF("Unable to disconnect socket: [%d] %s", r, zmq_strerror(r));
} }
// Allow chaining
return *this;
} }
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Run the managed socket. * Run the managed socket.
*/ */
void Run() ZSocket & Run()
{ {
// Make sure thread exists already // Make sure thread exists already
if (Valid().mThread.joinable()) if (Valid().mThread.joinable())
@ -1045,33 +1152,100 @@ struct ZSocket
STHROWF("Socket is already running"); STHROWF("Socket is already running");
} }
// Allow the thread to run // Allow the thread to run
m_Ptr->mStatus = 1; mHnd->mStatus = 1;
// Now we can create the thread // Now we can create the thread
Valid().mThread = std::thread(&ZSkt::Proc, &Valid()); Valid().mThread = std::thread(&ZSkt::Proc, &Valid());
// Allow chaining
return *this;
} }
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Close the managed socket. * Close the managed socket.
*/ */
void Close() ZSocket & Close()
{ {
Valid().Close(); Valid().Close();
// Allow chaining
return *this;
} }
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Callback to receive incoming messages. * Send a ZMQ message to the socket.
*/ */
void OnData(Function & cb) ZSocket & SendMessage(ZMessage & msg)
{ {
Valid().mOnData = std::move(cb); Valid().Send(msg.Valid());
// Allow chaining
return *this;
} }
private: /* --------------------------------------------------------------------------------------------
* Send a string message to the socket.
*/
ZSocket & SendString(StackStrF & msg)
{
Valid().Send(msg.mPtr, msg.GetSize());
// Allow chaining
return *this;
}
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Pointer to the interfaced socket. * Send a multi-part ZMQ message to the socket.
*/ */
ZSkt::Ptr m_Ptr; ZSocket & SendMessages(Array & arr)
{
Validate();
ZSkt::List list;
// Extract the messages from the array
arr.Foreach([&list](HSQUIRRELVM vm, SQInteger) {
// Extract the message from the stack
ZMessage * msg = ClassType< ZMessage >::GetInstance(vm, -1);
// In case we didn't fail at the null part (it should)
if (msg && msg->mHnd->mPtr)
{
// Add the message to the list
list.emplace_back(std::move((*msg->mHnd)));
// Initialize it back to default
(*msg->mHnd) = ZMsg();
}
// Continue
return SQ_OK;
});
// Send the message list
mHnd->Send(list);
// Allow chaining
return *this;
}
/* --------------------------------------------------------------------------------------------
* Send a multi-part string message to the socket.
*/
ZSocket & SendStrings(Array & arr)
{
Validate();
ZSkt::List list;
// Extract the messages from the array
arr.Foreach([&list](HSQUIRRELVM vm, SQInteger) {
StackStrF str(vm, -1);
// Extract the string from the stack
if (SQ_FAILED(str.Proc(false)))
{
return str.mRes; // Abort
}
// Create a new message
list.emplace_back(str.mLen);
// Populate it with data
std::memcpy(zmq_msg_data(list.back()), str.mPtr, static_cast< size_t >(str.mLen));
// Continue
return SQRESULT(SQ_OK);
});
// Send the message list
mHnd->Send(list);
// Allow chaining
return *this;
}
}; };