1
0
mirror of https://github.com/VCMP-SqMod/SqMod.git synced 2024-11-08 08:47:17 +01:00

Compare commits

...

3 Commits

Author SHA1 Message Date
Sandu Liviu Catalin
cacc6c7c62 Minor WS changes and a few helpers. 2021-09-05 18:15:02 +03:00
Sandu Liviu Catalin
11c17189b3 Update Net.hpp 2021-09-05 13:40:21 +03:00
Sandu Liviu Catalin
1a11dd777e Make WS non blocking and handle timeout.
Implement a few helper methods as well.
2021-09-05 13:27:19 +03:00
3 changed files with 173 additions and 33 deletions

View File

@ -48,6 +48,7 @@ extern void TerminatePrivileges();
extern void TerminateRoutines(); extern void TerminateRoutines();
extern void TerminateCommands(); extern void TerminateCommands();
extern void TerminateSignals(); extern void TerminateSignals();
extern void TerminatePocoNet();
extern void TerminatePocoData(); extern void TerminatePocoData();
// ------------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------------
@ -548,6 +549,7 @@ void Core::Terminate(bool shutdown)
AnnounceTerminate(); AnnounceTerminate();
cLogDbg(m_Verbosity >= 1, "Announcer terminated"); cLogDbg(m_Verbosity >= 1, "Announcer terminated");
// Release Poco statement results // Release Poco statement results
TerminatePocoNet();
TerminatePocoData(); TerminatePocoData();
cLogDbg(m_Verbosity >= 1, "Poco terminated"); cLogDbg(m_Verbosity >= 1, "Poco terminated");
// Release ZMQ sockets // Release ZMQ sockets

View File

@ -10,6 +10,12 @@ namespace SqMod {
// ------------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------------
SQMOD_DECL_TYPENAME(SqWsClient, _SC("SqWsClient")) SQMOD_DECL_TYPENAME(SqWsClient, _SC("SqWsClient"))
// ------------------------------------------------------------------------------------------------
void TerminatePocoNet()
{
}
// ================================================================================================ // ================================================================================================
void Register_POCO_Net(HSQUIRRELVM vm, Table &) void Register_POCO_Net(HSQUIRRELVM vm, Table &)
{ {
@ -25,14 +31,20 @@ void Register_POCO_Net(HSQUIRRELVM vm, Table &)
.Var(_SC("Flags"), &WsClient::mFlags) .Var(_SC("Flags"), &WsClient::mFlags)
.Var(_SC("State"), &WsClient::mState) .Var(_SC("State"), &WsClient::mState)
// Properties // Properties
.Prop(_SC("Tag"), &WsClient::GetTag, &WsClient::SetTag)
.Prop(_SC("Data"), &WsClient::GetData, &WsClient::SetData)
.Prop(_SC("MaxPayloadSize"), &WsClient::GetMaxPayloadSize, &WsClient::SetMaxPayloadSize) .Prop(_SC("MaxPayloadSize"), &WsClient::GetMaxPayloadSize, &WsClient::SetMaxPayloadSize)
// Member Methods // Member Methods
.FmtFunc(_SC("SetTag"), &WsClient::ApplyTag)
.FmtFunc(_SC("SetData"), &WsClient::ApplyData)
.Func(_SC("Shutdown"), &WsClient::Shutdown) .Func(_SC("Shutdown"), &WsClient::Shutdown)
.FmtFunc(_SC("ShutdownWith"), &WsClient::ShutdownWith) .FmtFunc(_SC("ShutdownWith"), &WsClient::ShutdownWith)
.Func(_SC("SendFrame"), &WsClient::SendFrame) .Func(_SC("SendFrame"), &WsClient::SendFrame)
.FmtFunc(_SC("SendStringFrame"), &WsClient::SendStringFrame) .FmtFunc(_SC("SendStringFrame"), &WsClient::SendStringFrame)
.Func(_SC("RecvFrame"), &WsClient::RecvFrame) .Func(_SC("RecvFrame"), &WsClient::RecvFrame)
.Func(_SC("RecvStringFrame"), &WsClient::RecvStringFrame) .Func(_SC("RecvStringFrame"), &WsClient::RecvStringFrame)
.CbFunc(_SC("RecvFrameIn"), &WsClient::RecvFrameIn)
.CbFunc(_SC("RecvStringFrameIn"), &WsClient::RecvStringFrameIn)
); );
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
RootTable(vm).Bind(_SC("SqNet"), ns); RootTable(vm).Bind(_SC("SqNet"), ns);

View File

@ -6,12 +6,21 @@
// ------------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------------
#include <vector> #include <vector>
#include <utility>
#include <algorithm>
// ------------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------------
#include <Poco/Thread.h>
#include <Poco/AutoPtr.h>
#include <Poco/Runnable.h>
#include <Poco/Observer.h>
#include <Poco/NObserver.h>
#include <Poco/Net/HTTPRequest.h> #include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h> #include <Poco/Net/HTTPResponse.h>
#include <Poco/Net/HTTPMessage.h> #include <Poco/Net/HTTPMessage.h>
#include <Poco/Net/HTTPClientSession.h> #include <Poco/Net/HTTPClientSession.h>
#include <Poco/Net/SocketAcceptor.h>
#include <Poco/Net/SocketReactor.h>
#include <Poco/Net/WebSocket.h> #include <Poco/Net/WebSocket.h>
// ------------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------------
@ -22,6 +31,34 @@ namespace SqMod {
*/ */
struct WsClient struct WsClient
{ {
/* --------------------------------------------------------------------------------------------
* Flags received in the last call to Recv[String]Frame() (will be overwritten on next call).
*/
int mFlags{0};
/* --------------------------------------------------------------------------------------------
* Return value from the last call to Recv[String]Frame() (will be overwritten on next call).
* A return value of 0, with flags also 0, means that the peer has shut down or closed the connection.
* A return value of 0, with non-zero flags, indicates an reception of an empty frame (e.g., in case of a PING).
*/
int mState{0};
/* --------------------------------------------------------------------------------------------
* Receiving buffer instance.
*/
Poco::Buffer< char > mBuffer;
/* --------------------------------------------------------------------------------------------
* User tag associated with this instance.
*/
String mTag;
/* --------------------------------------------------------------------------------------------
* User data associated with this instance.
*/
LightObj mData;
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* HTTP client session instance. * HTTP client session instance.
*/ */
@ -40,63 +77,110 @@ struct WsClient
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* WebSocket instance. * WebSocket instance.
*/ */
Poco::Net::WebSocket mWebSocket; Poco::Net::WebSocket mSocket;
/* --------------------------------------------------------------------------------------------
* Receiving buffer instance.
*/
Poco::Buffer< char > mBuffer;
/* --------------------------------------------------------------------------------------------
* Flags received in the last call to Recv[String]Frame() (will be overwritten on next call).
*/
int mFlags{0};
/* --------------------------------------------------------------------------------------------
* Return value from the last call to Recv[String]Frame() (will be overwritten on next call).
* A return value of 0, with flags also 0, means that the peer has shut down or closed the connection.
* A return value of 0, with non-zero flags, indicates an reception of an empty frame (e.g., in case of a PING).
*/
int mState{0};
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Base constructor. * Base constructor.
*/ */
WsClient(StackStrF & host, uint16_t port, StackStrF & uri) WsClient(StackStrF & host, uint16_t port, StackStrF & uri)
: mClient(host.ToStr(), port), : mFlags(0), mState(0), mBuffer(0), mTag(), mData()
mRequest(Poco::Net::HTTPRequest::HTTP_GET, uri.ToStr(), Poco::Net::HTTPRequest::HTTP_1_1), mResponse(), , mClient(host.ToStr(), port)
mWebSocket(mClient, mRequest, mResponse), mBuffer(0), mFlags(0), mState(0) , mRequest(Poco::Net::HTTPRequest::HTTP_GET, uri.ToStr(), Poco::Net::HTTPRequest::HTTP_1_1)
, mResponse()
, mSocket(mClient, mRequest, mResponse)
{ {
mSocket.setBlocking(false); // Disable blocking
}
/* --------------------------------------------------------------------------------------------
* Retrieve the associated user tag.
*/
SQMOD_NODISCARD const String & GetTag() const
{
return mTag;
}
/* --------------------------------------------------------------------------------------------
* Modify the associated user tag.
*/
void SetTag(StackStrF & tag)
{
if (tag.mLen > 0)
{
mTag.assign(tag.mPtr, static_cast< size_t >(tag.mLen));
}
else
{
mTag.clear();
}
}
/* --------------------------------------------------------------------------------------------
* Modify the associated user tag.
*/
WsClient & ApplyTag(StackStrF & tag)
{
SetTag(tag);
return *this;
}
/* --------------------------------------------------------------------------------------------
* Retrieve the associated user data.
*/
SQMOD_NODISCARD LightObj & GetData()
{
return mData;
}
/* --------------------------------------------------------------------------------------------
* Modify the associated user data.
*/
void SetData(LightObj & data)
{
mData = data;
}
/* --------------------------------------------------------------------------------------------
* Modify the associated user data.
*/
WsClient & ApplyData(LightObj & data)
{
mData = data;
return *this;
} }
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Sends a Close control frame to the server end of the connection to initiate an orderly shutdown of the connection. * Sends a Close control frame to the server end of the connection to initiate an orderly shutdown of the connection.
*/ */
void Shutdown() { void Shutdown()
mWebSocket.shutdown(); {
mSocket.shutdown();
} }
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Sends a Close control frame to the server end of the connection to initiate an orderly shutdown of the connection. * Sends a Close control frame to the server end of the connection to initiate an orderly shutdown of the connection.
*/ */
void ShutdownWith(SQInteger code, StackStrF & msg) { void ShutdownWith(SQInteger code, StackStrF & msg)
mWebSocket.shutdown(static_cast< uint16_t >(code), msg.ToStr()); {
mSocket.shutdown(static_cast< uint16_t >(code), msg.ToStr());
} }
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Sends the contents of the given buffer through the socket as a single frame. * Sends the contents of the given buffer through the socket as a single frame.
* Returns the number of bytes sent, which may be less than the number of bytes specified. * Returns the number of bytes sent, which may be less than the number of bytes specified.
*/ */
SQInteger SendFrame(SqBuffer & buf, SQInteger flags) { SQInteger SendFrame(SqBuffer & buf, SQInteger flags)
return mWebSocket.sendFrame(buf.Valid().Data(), static_cast< int >(buf.Valid().Position()), static_cast< int >(flags)); {
return mSocket.sendFrame(buf.Valid().Data(), static_cast< int >(buf.Valid().Position()), static_cast< int >(flags));
} }
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Sends the contents of the given string through the socket as a single frame. * Sends the contents of the given string through the socket as a single frame.
* Returns the number of bytes sent, which may be less than the number of bytes specified. * Returns the number of bytes sent, which may be less than the number of bytes specified.
*/ */
SQInteger SendStringFrame(SQInteger flags, StackStrF & str) { SQInteger SendStringFrame(SQInteger flags, StackStrF & str)
return mWebSocket.sendFrame(str.mPtr, static_cast< int >(str.mLen), static_cast< int >(flags)); {
return mSocket.sendFrame(str.mPtr, static_cast< int >(str.mLen), static_cast< int >(flags));
} }
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
@ -106,7 +190,12 @@ struct WsClient
LightObj RecvFrame() LightObj RecvFrame()
{ {
// Attempt to receive data // Attempt to receive data
mState = mWebSocket.receiveFrame(mBuffer, mFlags); try {
mState = mSocket.receiveFrame(mBuffer, mFlags);
} catch (const Poco::TimeoutException &) {
mState = mFlags = 0; // Make sure these don't indicate otherwise
return LightObj{}; // We handle timeout so we can be non blocking
}
// If something was returned // If something was returned
if (mState != 0) if (mState != 0)
{ {
@ -131,7 +220,12 @@ struct WsClient
LightObj RecvStringFrame() LightObj RecvStringFrame()
{ {
// Attempt to receive data // Attempt to receive data
mState = mWebSocket.receiveFrame(mBuffer, mFlags); try {
mState = mSocket.receiveFrame(mBuffer, mFlags);
} catch (const Poco::TimeoutException &) {
mState = mFlags = 0; // Make sure these don't indicate otherwise
return LightObj{}; // We handle timeout so we can be non blocking
}
// If something was returned // If something was returned
if (mState != 0) if (mState != 0)
{ {
@ -146,12 +240,44 @@ struct WsClient
return LightObj{}; return LightObj{};
} }
/* --------------------------------------------------------------------------------------------
* Receives a frame from the socket and return it as a buffer. Only invokes callback if response is valid.
* The frame's payload size must not exceed the maximum payload size set with SetMaxPayloadSize().
*/
SQInteger RecvFrameIn(Function & cb)
{
auto obj = RecvFrame();
// Only invoke the callback if we have a valid response
if (mState != 0 || mFlags != 0)
{
cb(obj, mState, mFlags);
}
// Return result
return mState;
}
/* --------------------------------------------------------------------------------------------
* Receives a frame from the socket and return it as a string. Only invokes callback if response is valid.
* The frame's payload size must not exceed the maximum payload size set with SetMaxPayloadSize().
*/
SQInteger RecvStringFrameIn(Function & cb)
{
auto obj = RecvStringFrame();
// Only invoke the callback if we have data response
if (mState != 0 || mFlags != 0)
{
cb(obj, mState, mFlags);
}
// Return result
return mState;
}
/* -------------------------------------------------------------------------------------------- /* --------------------------------------------------------------------------------------------
* Sets the maximum payload size for RecvFrame(). The default is std::numeric_limits<int>::max(). * Sets the maximum payload size for RecvFrame(). The default is std::numeric_limits<int>::max().
*/ */
WsClient & SetMaxPayloadSize(SQInteger size) WsClient & SetMaxPayloadSize(SQInteger size)
{ {
mWebSocket.setMaxPayloadSize(static_cast< int >(size)); mSocket.setMaxPayloadSize(static_cast< int >(size));
return *this; return *this;
} }
@ -160,7 +286,7 @@ struct WsClient
*/ */
SQMOD_NODISCARD SQInteger GetMaxPayloadSize() const SQMOD_NODISCARD SQInteger GetMaxPayloadSize() const
{ {
return mWebSocket.getMaxPayloadSize(); return mSocket.getMaxPayloadSize();
} }
}; };