From 266e06e870f74504b4902ad275237fbe0d94ade5 Mon Sep 17 00:00:00 2001 From: Sandu Liviu Catalin Date: Tue, 2 Feb 2021 21:36:07 +0200 Subject: [PATCH] Update error retrieval and expose more functionality, --- module/Library/ZMQ.cpp | 269 +++++++++++++++++++++++++++++++++++++++++ module/Library/ZMQ.hpp | 48 +++++--- 2 files changed, 298 insertions(+), 19 deletions(-) diff --git a/module/Library/ZMQ.cpp b/module/Library/ZMQ.cpp index 2df451a9..2edbf4de 100644 --- a/module/Library/ZMQ.cpp +++ b/module/Library/ZMQ.cpp @@ -36,6 +36,273 @@ LightObj ZContext::Socket(int type) const { return LightObj(SqTypeIdentity< ZSocket >{}, SqVM(), *this, type); } +// ------------------------------------------------------------------------------------------------ +LightObj ZSocket::GetOpt(int opt) const +{ + int r = 0; + // Identify option + switch (opt) + { + // int + case ZMQ_BACKLOG: + case ZMQ_CONFLATE: + case ZMQ_CONNECT_TIMEOUT: + case ZMQ_CURVE_SERVER: + case ZMQ_GSSAPI_PLAINTEXT: + case ZMQ_GSSAPI_SERVER: + case ZMQ_GSSAPI_PRINCIPAL_NAMETYPE: + case ZMQ_HANDSHAKE_IVL: + case ZMQ_HEARTBEAT_IVL: + case ZMQ_HEARTBEAT_TIMEOUT: + case ZMQ_HEARTBEAT_TTL: + case ZMQ_IMMEDIATE: + case ZMQ_INVERT_MATCHING: + case ZMQ_IPV6: + case ZMQ_LINGER: + case ZMQ_MULTICAST_HOPS: + case ZMQ_MULTICAST_MAXTPDU: + case ZMQ_PLAIN_SERVER: + case ZMQ_USE_FD: + case ZMQ_PROBE_ROUTER: + case ZMQ_RATE: + case ZMQ_RCVBUF: + case ZMQ_RCVHWM: + case ZMQ_RCVTIMEO: + case ZMQ_RECONNECT_IVL: + case ZMQ_RECONNECT_IVL_MAX: + case ZMQ_RECOVERY_IVL: + case ZMQ_REQ_CORRELATE: + case ZMQ_REQ_RELAXED: + case ZMQ_ROUTER_HANDOVER: + case ZMQ_ROUTER_MANDATORY: + case ZMQ_ROUTER_RAW: + case ZMQ_SNDBUF: + case ZMQ_SNDHWM: + case ZMQ_SNDTIMEO: + case ZMQ_STREAM_NOTIFY: + case ZMQ_TCP_KEEPALIVE: + case ZMQ_TCP_KEEPALIVE_CNT: + case ZMQ_TCP_KEEPALIVE_IDLE: + case ZMQ_TCP_KEEPALIVE_INTVL: + case ZMQ_TCP_MAXRT: + case ZMQ_TOS: + case ZMQ_XPUB_VERBOSE: + case ZMQ_XPUB_VERBOSER: + case ZMQ_XPUB_MANUAL: + case ZMQ_XPUB_NODROP: + case ZMQ_IPV4ONLY: + case ZMQ_VMCI_CONNECT_TIMEOUT: { + int out; + size_t len = sizeof(out); + r = zmq_getsockopt(Valid(), opt, &out, &len); + // Validate and return it + if (r == 0) + { + sq_pushinteger(SqVM(), out); + LightObj o(-1); + sq_poptop(SqVM()); + return o; + } + } break; + // character string + case ZMQ_BINDTODEVICE: + case ZMQ_GSSAPI_PRINCIPAL: + case ZMQ_GSSAPI_SERVICE_PRINCIPAL: + case ZMQ_PLAIN_PASSWORD: + case ZMQ_PLAIN_USERNAME: + case ZMQ_SOCKS_PROXY: + case ZMQ_ZAP_DOMAIN: { + SQChar out[1024]; // Let's hope this is reasonable. I really don't care much about this feature. + size_t len = sizeof(out); + r = zmq_getsockopt(Valid(), opt, &out, &len); + // Validate and return it + if (r == 0) + { + return LightObj(out, static_cast< SQInteger >(len)); + } + } break; + // uint64_t + case ZMQ_AFFINITY: + case ZMQ_VMCI_BUFFER_SIZE: + case ZMQ_VMCI_BUFFER_MIN_SIZE: + case ZMQ_VMCI_BUFFER_MAX_SIZE: { + uint64_t out; + size_t len = sizeof(out); + r = zmq_getsockopt(Valid(), opt, &out, &len); + // Validate and return it + if (r == 0) + { + sq_pushinteger(SqVM(), static_cast< SQInteger >(out)); + LightObj o(-1); + sq_poptop(SqVM()); + return o; + } + } break; + // int64_t + case ZMQ_MAXMSGSIZE: { + int64_t out; + size_t len = sizeof(out); + r = zmq_getsockopt(Valid(), opt, &out, &len); + // Validate and return it + if (r == 0) + { + sq_pushinteger(SqVM(), static_cast< SQInteger >(out)); + LightObj o(-1); + sq_poptop(SqVM()); + return o; + } + } break; + // binary data + case ZMQ_CONNECT_ROUTING_ID: + case ZMQ_CURVE_PUBLICKEY: + case ZMQ_CURVE_SECRETKEY: + case ZMQ_CURVE_SERVERKEY: + case ZMQ_ROUTING_ID: + case ZMQ_SUBSCRIBE: + case ZMQ_UNSUBSCRIBE: + case ZMQ_XPUB_WELCOME_MSG: + case ZMQ_TCP_ACCEPT_FILTER: { + Buffer out(4096); // Let's hope this is reasonable. I really don't care much about this feature. + size_t len = out.Size(); + r = zmq_getsockopt(Valid(), opt, &out, &len); + out.Move(static_cast< SQInteger >(len)); + // Validate and return it + if (r == 0) + { + return LightObj(SqTypeIdentity< SqBuffer >{}, SqVM(), std::move(out)); + } + } break; + default: STHROWF("Unknown socket option"); + } + // Validate result + if (r != 0) + { + STHROWF("Unable to retrieve socket option: [%d] %s", r, zmq_strerror(errno)); + } + SQ_UNREACHABLE; + // Never reaches here + return LightObj(); +} + +// ------------------------------------------------------------------------------------------------ +void ZSocket::SetOpt(int opt, LightObj & value) +{ + int r = 0; + // Identify option + switch (opt) + { + // int + case ZMQ_BACKLOG: + case ZMQ_CONFLATE: + case ZMQ_CONNECT_TIMEOUT: + case ZMQ_CURVE_SERVER: + case ZMQ_GSSAPI_PLAINTEXT: + case ZMQ_GSSAPI_SERVER: + case ZMQ_GSSAPI_PRINCIPAL_NAMETYPE: + case ZMQ_HANDSHAKE_IVL: + case ZMQ_HEARTBEAT_IVL: + case ZMQ_HEARTBEAT_TIMEOUT: + case ZMQ_HEARTBEAT_TTL: + case ZMQ_IMMEDIATE: + case ZMQ_INVERT_MATCHING: + case ZMQ_IPV6: + case ZMQ_LINGER: + case ZMQ_MULTICAST_HOPS: + case ZMQ_MULTICAST_MAXTPDU: + case ZMQ_PLAIN_SERVER: + case ZMQ_USE_FD: + case ZMQ_PROBE_ROUTER: + case ZMQ_RATE: + case ZMQ_RCVBUF: + case ZMQ_RCVHWM: + case ZMQ_RCVTIMEO: + case ZMQ_RECONNECT_IVL: + case ZMQ_RECONNECT_IVL_MAX: + case ZMQ_RECOVERY_IVL: + case ZMQ_REQ_CORRELATE: + case ZMQ_REQ_RELAXED: + case ZMQ_ROUTER_HANDOVER: + case ZMQ_ROUTER_MANDATORY: + case ZMQ_ROUTER_RAW: + case ZMQ_SNDBUF: + case ZMQ_SNDHWM: + case ZMQ_SNDTIMEO: + case ZMQ_STREAM_NOTIFY: + case ZMQ_TCP_KEEPALIVE: + case ZMQ_TCP_KEEPALIVE_CNT: + case ZMQ_TCP_KEEPALIVE_IDLE: + case ZMQ_TCP_KEEPALIVE_INTVL: + case ZMQ_TCP_MAXRT: + case ZMQ_TOS: + case ZMQ_XPUB_VERBOSE: + case ZMQ_XPUB_VERBOSER: + case ZMQ_XPUB_MANUAL: + case ZMQ_XPUB_NODROP: + case ZMQ_IPV4ONLY: + case ZMQ_VMCI_CONNECT_TIMEOUT: { + auto in = value.Cast< int >(); + r = zmq_setsockopt(Valid(), opt, &in, sizeof(in)); + } break; + // character string + case ZMQ_BINDTODEVICE: + case ZMQ_GSSAPI_PRINCIPAL: + case ZMQ_GSSAPI_SERVICE_PRINCIPAL: + case ZMQ_PLAIN_PASSWORD: + case ZMQ_PLAIN_USERNAME: + case ZMQ_SOCKS_PROXY: + case ZMQ_ZAP_DOMAIN: { + Var::push(SqVM(), value); + StackStrF str(SqVM(), -1); + if (SQ_SUCCEEDED(str.Proc(false))) + { + r = zmq_setsockopt(Valid(), opt, str.mPtr, str.GetSize()); + } else r = -1; + sq_poptop(SqVM()); + } break; + // uint64_t + case ZMQ_AFFINITY: + case ZMQ_VMCI_BUFFER_SIZE: + case ZMQ_VMCI_BUFFER_MIN_SIZE: + case ZMQ_VMCI_BUFFER_MAX_SIZE: { + auto in = value.Cast< uint64_t >(); + r = zmq_setsockopt(Valid(), opt, &in, sizeof(in)); + } break; + // int64_t + case ZMQ_MAXMSGSIZE: { + auto in = value.Cast< int64_t >(); + r = zmq_setsockopt(Valid(), opt, &in, sizeof(in)); + } break; + // binary data + case ZMQ_CONNECT_ROUTING_ID: + case ZMQ_CURVE_PUBLICKEY: + case ZMQ_CURVE_SECRETKEY: + case ZMQ_CURVE_SERVERKEY: + case ZMQ_ROUTING_ID: + case ZMQ_SUBSCRIBE: + case ZMQ_UNSUBSCRIBE: + case ZMQ_XPUB_WELCOME_MSG: + case ZMQ_TCP_ACCEPT_FILTER: { + if (value.GetTypeTag() != StaticClassTypeTag< SqBuffer >::Get()) + { + STHROWF("Invalid buffer value"); + } + auto * inst = value.CastI< SqBuffer >(); + if (inst) + { + r = zmq_setsockopt(Valid(), opt, inst->GetRef()->Data(), static_cast< size_t >(inst->GetPosition())); + } else r = -1; + } break; + default: STHROWF("Unknown socket option"); + } + // Validate result + if (r != 0) + { + STHROWF("Unable to modify socket option: [%d] %s", r, zmq_strerror(errno)); + } + // Never reaches here + SQ_UNREACHABLE; +} + // ------------------------------------------------------------------------------------------------ static String SqZmqVersion() { @@ -135,6 +402,8 @@ void Register_ZMQ(HSQUIRRELVM vm) .FmtFunc(_SC("SendString"), &ZSocket::SendString) .Func(_SC("SendMessages"), &ZSocket::SendMessages) .Func(_SC("SendStrings"), &ZSocket::SendStrings) + .Func(_SC("GetOpt"), &ZSocket::GetOpt) + .Func(_SC("SetOpt"), &ZSocket::SetOpt) ); RootTable(vm).Bind(_SC("SqZMQ"), ns); diff --git a/module/Library/ZMQ.hpp b/module/Library/ZMQ.hpp index 267ddaee..8d3fe26e 100644 --- a/module/Library/ZMQ.hpp +++ b/module/Library/ZMQ.hpp @@ -137,7 +137,7 @@ struct ZCtx // Just in case if (r != 0) { - LogFtl("Context failed to terminate properly: [%d], %s", r, zmq_strerror(r)); + LogFtl("Context failed to terminate properly: [%d], %s", r, zmq_strerror(errno)); } } } @@ -189,7 +189,7 @@ struct ZMsg // Validate result if (r != 0) { - STHROWF("Unable to initialize message: [%d] %s", r, zmq_strerror(r)); + STHROWF("Unable to initialize message: [%d] %s", r, zmq_strerror(errno)); } } @@ -203,7 +203,7 @@ struct ZMsg // Validate result if (r != 0) { - STHROWF("Unable to initialize message: [%d] %s", r, zmq_strerror(r)); + STHROWF("Unable to initialize message: [%d] %s", r, zmq_strerror(errno)); } } @@ -223,7 +223,7 @@ struct ZMsg // Validate result if (r != 0) { - STHROWF("Unable to initialize message: [%d] %s", r, zmq_strerror(r)); + STHROWF("Unable to initialize message: [%d] %s", r, zmq_strerror(errno)); } } @@ -237,13 +237,13 @@ struct ZMsg // Validate result if (r != 0) { - LogFtl("Unable to initialize message: [%d] %s", r, zmq_strerror(r)); + LogFtl("Unable to initialize message: [%d] %s", r, zmq_strerror(errno)); } r = zmq_msg_copy(mPtr.get(), o.mPtr.get()); // Validate result if (r != 0) { - LogFtl("Unable to copy message: [%d] %s", r, zmq_strerror(r)); + LogFtl("Unable to copy message: [%d] %s", r, zmq_strerror(errno)); } } @@ -281,7 +281,7 @@ struct ZMsg // Validate result if (r != 0) { - LogFtl("Unable to initialize message: [%d] %s", r, zmq_strerror(r)); + LogFtl("Unable to initialize message: [%d] %s", r, zmq_strerror(errno)); } } // Do we have a message? @@ -291,7 +291,7 @@ struct ZMsg // Validate result if (r != 0) { - LogFtl("Unable to copy message: [%d] %s", r, zmq_strerror(r)); + LogFtl("Unable to copy message: [%d] %s", r, zmq_strerror(errno)); } } } @@ -433,7 +433,7 @@ struct ZSkt : SqChainedInstances< ZSkt > // Just in case if (r != 0) { - LogFtl("Socket failed to close properly: [%d], %s", r, zmq_strerror(r)); + LogFtl("Socket failed to close properly: [%d], %s", r, zmq_strerror(errno)); } } // Forget about this instance @@ -515,7 +515,7 @@ struct ZSkt : SqChainedInstances< ZSkt > // Validate result if (r != 0) { - STHROWF("Unable to close socket: [%d] %s", r, zmq_strerror(r)); + STHROWF("Unable to close socket: [%d] %s", r, zmq_strerror(errno)); } } } @@ -585,7 +585,7 @@ protected: // Could we send what the message had? if (r != zmq_msg_size(msg)) { - LogErr("Unable to send data to socket: [%d], %s", r, zmq_strerror(r)); + LogErr("Unable to send data to socket: [%d], %s", r, zmq_strerror(errno)); } // One item was found in the queue return true; @@ -615,7 +615,7 @@ protected: // Could we send what the message had? if (r != zmq_msg_size(msg)) { - LogErr("Unable to send multi-part data to socket: [%d], %s", r, zmq_strerror(r)); + LogErr("Unable to send multi-part data to socket: [%d], %s", r, zmq_strerror(errno)); } } // One item was found in the queue @@ -735,7 +735,7 @@ struct ZContext // Validate result if (r != 0) { - STHROWF("Unable to set context option: [%d] %s", r, zmq_strerror(r)); + STHROWF("Unable to set context option: [%d] %s", r, zmq_strerror(errno)); } } @@ -748,7 +748,7 @@ struct ZContext // Validate result if (r != 0) { - STHROWF("Unable to shutdown context: %s", zmq_strerror(r)); + STHROWF("Unable to shutdown context: %s", zmq_strerror(errno)); } } @@ -877,7 +877,7 @@ struct ZMessage // Validate result if (r != 0) { - STHROWF("Unable to set message option: [%d] %s", r, zmq_strerror(r)); + STHROWF("Unable to set message option: [%d] %s", r, zmq_strerror(errno)); } // Allow chaining return *this; @@ -900,7 +900,7 @@ struct ZMessage // Validate result if (r != 0) { - STHROWF("Unable to copy message: [%d] %s", r, zmq_strerror(r)); + STHROWF("Unable to copy message: [%d] %s", r, zmq_strerror(errno)); } // Allow chaining return *this; @@ -1077,6 +1077,16 @@ struct ZSocket return static_cast< bool >(mHnd); } + /* -------------------------------------------------------------------------------------------- + * Retrieve the value of a socket option. + */ + SQMOD_NODISCARD LightObj GetOpt(int opt) const; + + /* -------------------------------------------------------------------------------------------- + * Modify the value of a socket option. + */ + void SetOpt(int opt, LightObj & value); + /* -------------------------------------------------------------------------------------------- * Callback to receive incoming messages. */ @@ -1099,7 +1109,7 @@ struct ZSocket // Validate result if (r != 0) { - STHROWF("Unable to bind socket: [%d] %s", r, zmq_strerror(r)); + STHROWF("Unable to bind socket: [%d] %s", r, zmq_strerror(errno)); } // Allow chaining return *this; @@ -1117,7 +1127,7 @@ struct ZSocket // Validate result if (r != 0) { - STHROWF("Unable to connect socket: [%d] %s", r, zmq_strerror(r)); + STHROWF("Unable to connect socket: [%d] %s", r, zmq_strerror(errno)); } // Allow chaining return *this; @@ -1135,7 +1145,7 @@ struct ZSocket // Validate result if (r != 0) { - STHROWF("Unable to disconnect socket: [%d] %s", r, zmq_strerror(r)); + STHROWF("Unable to disconnect socket: [%d] %s", r, zmq_strerror(errno)); } // Allow chaining return *this;