1
0
mirror of https://github.com/VCMP-SqMod/SqMod.git synced 2025-01-18 19:47:15 +01:00

Builtin announcer.

This commit is contained in:
Sandu Liviu Catalin 2021-02-03 15:32:12 +02:00
parent a28d1799c5
commit 89bc0a5a3b
5 changed files with 521 additions and 0 deletions

View File

@ -73,6 +73,7 @@ add_library(SqModule MODULE SqBase.hpp Main.cpp
Library/System/Env.cpp Library/System/Env.hpp
Library/System/Path.cpp Library/System/Path.hpp
Library/Utils.cpp Library/Utils.hpp
Library/Utils/Announce.cpp Library/Utils/Announce.hpp
Library/Utils/Map.cpp Library/Utils/Map.hpp
Library/Utils/Vector.cpp Library/Utils/Vector.hpp
Library/ZMQ.cpp Library/ZMQ.hpp

View File

@ -39,6 +39,7 @@ extern bool RegisterAPI(HSQUIRRELVM vm);
// ------------------------------------------------------------------------------------------------
extern void ZmqProcess();
extern void ZmqTerminate();
extern void AnnounceTerminate();
extern void InitializeTasks();
extern void InitializeRoutines();
extern void TerminateAreas();
@ -501,6 +502,8 @@ void Core::Terminate(bool shutdown)
TerminateAreas();
// Release privilege managers
//TerminatePrivileges();
// Release announcers
AnnounceTerminate();
// Release ZMQ sockets
ZmqTerminate();
// In case there's a payload for reload

View File

@ -85,6 +85,7 @@ static SQInteger SqExtractIPv4(HSQUIRRELVM vm)
// ------------------------------------------------------------------------------------------------
extern void Register_Map(HSQUIRRELVM vm, Table & ns);
extern void Register_Vector(HSQUIRRELVM vm, Table & ns);
extern void Register_ServerAnnouncer(HSQUIRRELVM vm, Table & ns);
// ================================================================================================
void Register_Utils(HSQUIRRELVM vm)
@ -93,6 +94,7 @@ void Register_Utils(HSQUIRRELVM vm)
Register_Map(vm, ns);
Register_Vector(vm, ns);
Register_ServerAnnouncer(vm, ns);
ns.SquirrelFunc(_SC("ExtractIPv4"), &SqExtractIPv4);

View File

@ -0,0 +1,46 @@
// ------------------------------------------------------------------------------------------------
#include "Library/Utils/Announce.hpp"
// ------------------------------------------------------------------------------------------------
namespace SqMod {
// ------------------------------------------------------------------------------------------------
SQMOD_DECL_TYPENAME(SqServerAnnouncer, _SC("SqServerAnnouncer"))
// ------------------------------------------------------------------------------------------------
void AnnounceTerminate()
{
// Go over all sockets and try to close them
for (SqAnnouncer * inst = SqAnnouncer::sHead; inst && inst->mNext != SqAnnouncer::sHead; inst = inst->mNext)
{
inst->Stop(); // Stop the announcer
}
}
// ================================================================================================
void Register_ServerAnnouncer(HSQUIRRELVM vm, Table & ns)
{
// --------------------------------------------------------------------------------------------
ns.Bind(_SC("Announcer"),
Class< SqAnnouncer, NoCopy< SqAnnouncer > >(vm, SqServerAnnouncer::Str)
// Constructors
.Ctor< StackStrF & >()
.Ctor< SQInteger, StackStrF & >()
// Meta-methods
.SquirrelFunc(_SC("_typename"), &SqServerAnnouncer::Fn)
// Properties
.Prop(_SC("Running"), &SqAnnouncer::Running)
.Prop(_SC("Log"), &SqAnnouncer::GetLog, &SqAnnouncer::SetLog)
.Prop(_SC("Interval"), &SqAnnouncer::GetInterval, &SqAnnouncer::SetInterval)
.Prop(_SC("Retries"), &SqAnnouncer::GetRetries, &SqAnnouncer::SetRetries)
.Prop(_SC("Count"), &SqAnnouncer::GetCount)
.Prop(_SC("Success"), &SqAnnouncer::GetSuccess)
.Prop(_SC("Failures"), &SqAnnouncer::GetFailures)
.Prop(_SC("URI"), &SqAnnouncer::GetURI, &SqAnnouncer::SetURI)
// Member Methods
.Func(_SC("Run"), &SqAnnouncer::Run)
.Func(_SC("Stop"), &SqAnnouncer::Stop)
);
}
} // Namespace:: SqMod

View File

@ -0,0 +1,469 @@
#pragma once
// ------------------------------------------------------------------------------------------------
#include "Core/Utility.hpp"
#include "Misc/Functions.hpp"
// ------------------------------------------------------------------------------------------------
#include <mutex>
#include <thread>
#include <vector>
#include <memory>
#include <chrono>
#include <atomic>
#include <utility>
#include <sstream>
#include <iostream>
// ------------------------------------------------------------------------------------------------
#include "Poco/Net/HTTPClientSession.h"
#include "Poco/Net/HTTPRequest.h"
#include "Poco/Net/HTTPResponse.h"
#include "Poco/Net/HTTPCredentials.h"
#include "Poco/StreamCopier.h"
#include "Poco/NullStream.h"
#include "Poco/Exception.h"
#include "Poco/Path.h"
#include "Poco/URI.h"
// ------------------------------------------------------------------------------------------------
namespace SqMod {
// ------------------------------------------------------------------------------------------------
using Poco::Net::HTTPClientSession;
using Poco::Net::HTTPRequest;
using Poco::Net::HTTPResponse;
using Poco::Net::HTTPMessage;
using Poco::StreamCopier;
/* ------------------------------------------------------------------------------------------------
* Helper utility that can be used to announce the server to a master-list.
*/
struct SqAnnouncer : SqChainedInstances< SqAnnouncer >
{
/* --------------------------------------------------------------------------------------------
* Announcer status.
*/
bool mRun;
/* --------------------------------------------------------------------------------------------
* Log failure messages.
*/
bool mLog;
/* --------------------------------------------------------------------------------------------
* Announce count.
*/
uint32_t mCount;
/* --------------------------------------------------------------------------------------------
* Success count (since last failure).
*/
uint32_t mSuccess;
/* --------------------------------------------------------------------------------------------
* Failure count.
*/
uint32_t mFailures;
/* --------------------------------------------------------------------------------------------
* Update interval (seconds).
*/
uint32_t mInterval;
/* --------------------------------------------------------------------------------------------
* The number failures to ignore.
*/
uint32_t mRetries;
/* --------------------------------------------------------------------------------------------
* Server port.
*/
const uint32_t mPort;
/* --------------------------------------------------------------------------------------------
* Server version.
*/
const uint32_t mVersion;
/* --------------------------------------------------------------------------------------------
* Master-list address.
*/
Poco::URI mURI;
/* --------------------------------------------------------------------------------------------
* Synchronization mutex.
*/
std::mutex mMtx;
/* --------------------------------------------------------------------------------------------
* Processing thread.
*/
std::thread mThread;
/* --------------------------------------------------------------------------------------------
* Default interval constructor.
*/
explicit SqAnnouncer(StackStrF & uri)
: SqAnnouncer(60,uri)
{
}
/* --------------------------------------------------------------------------------------------
* Explicit constructor.
*/
SqAnnouncer(SQInteger interval, StackStrF & uri)
: mRun(false), mLog(true)
, mCount(0)
, mSuccess(0)
, mFailures(0)
, mInterval(ClampL< SQInteger, uint32_t >(interval))
, mRetries(10)
, mPort(GetServerPort())
, mVersion(GetServerVersion())
, mURI(uri.mPtr)
, mMtx(), mThread()
{
// Remember this instance
ChainInstance();
}
/* --------------------------------------------------------------------------------------------
* Copy constructor (disabled).
*/
SqAnnouncer(const SqAnnouncer &) = delete;
/* --------------------------------------------------------------------------------------------
* Move constructor (disabled).
*/
SqAnnouncer(SqAnnouncer &&) noexcept = delete;
/* --------------------------------------------------------------------------------------------
* Destructor.
*/
~SqAnnouncer()
{
// Forget about this instance
UnchainInstance();
// Terminate the announcer if necessary
Stop();
}
/* --------------------------------------------------------------------------------------------
* Assignment operator (disabled).
*/
SqAnnouncer & operator = (const SqAnnouncer &) = delete;
/* --------------------------------------------------------------------------------------------
* Move assignment (disabled).
*/
SqAnnouncer & operator = (SqAnnouncer &&) noexcept = delete;
/* --------------------------------------------------------------------------------------------
* Start the announcer.
*/
SqAnnouncer & Run()
{
// Is the processing thread running?
if (mThread.joinable())
{
STHROWF("Announcer is already running");
}
// Allow the loop to run
mRun = true;
// Create the processing thread
mThread = std::thread(&SqAnnouncer::Proc, this);
// Allow chaining
return *this;
}
/* --------------------------------------------------------------------------------------------
* Start the announcer.
*/
SqAnnouncer & Stop()
{
// Is the processing thread running?
if (mThread.joinable())
{
// Acquire exclusive access
mMtx.lock();
// Stop the loop
mRun = false;
// Yield exclusive access
mMtx.unlock();
// Wait for the thread
mThread.join();
} else mRun = false; // Just in case
// Allow chaining
return *this;
}
/* --------------------------------------------------------------------------------------------
* See if the announcer is running.
*/
bool Running()
{
// Acquire exclusive access to the announcer
std::lock_guard< std::mutex > guard(mMtx);
// Return the requested value
return mRun;
}
/* --------------------------------------------------------------------------------------------
* Check if logging is enabled when failed.
*/
bool GetLog()
{
// Acquire exclusive access to the announcer
std::lock_guard< std::mutex > guard(mMtx);
// Return the requested value
return mLog;
}
/* --------------------------------------------------------------------------------------------
* Set if logging is enabled when failed.
*/
void SetLog(bool toggle)
{
// Acquire exclusive access to the announcer
std::lock_guard< std::mutex > guard(mMtx);
// Modify the requested value
mLog = toggle;
}
/* --------------------------------------------------------------------------------------------
* Retrieve the announce interval (seconds).
*/
uint32_t GetInterval()
{
// Acquire exclusive access to the announcer
std::lock_guard< std::mutex > guard(mMtx);
// Return the requested value
return mInterval;
}
/* --------------------------------------------------------------------------------------------
* Modify the announce interval (seconds).
*/
void SetInterval(uint32_t seconds)
{
// Acquire exclusive access to the announcer
std::lock_guard< std::mutex > guard(mMtx);
// Modify the requested value
mInterval = seconds;
}
/* --------------------------------------------------------------------------------------------
* Retrieve the announce retries.
*/
uint32_t GetRetries()
{
// Acquire exclusive access to the announcer
std::lock_guard< std::mutex > guard(mMtx);
// Return the requested value
return mRetries;
}
/* --------------------------------------------------------------------------------------------
* Modify the announce retries.
*/
void SetRetries(uint32_t seconds)
{
// Acquire exclusive access to the announcer
std::lock_guard< std::mutex > guard(mMtx);
// Modify the requested value
mRetries = seconds;
}
/* --------------------------------------------------------------------------------------------
* Retrieve the announce count (total).
*/
uint32_t GetCount()
{
// Acquire exclusive access to the announcer
std::lock_guard< std::mutex > guard(mMtx);
// Return the requested value
return mCount;
}
/* --------------------------------------------------------------------------------------------
* Retrieve the announce success count (since last failure).
*/
uint32_t GetSuccess()
{
// Acquire exclusive access to the announcer
std::lock_guard< std::mutex > guard(mMtx);
// Return the requested value
return mSuccess;
}
/* --------------------------------------------------------------------------------------------
* Retrieve the announce failure count.
*/
uint32_t GetFailures()
{
// Acquire exclusive access to the announcer
std::lock_guard< std::mutex > guard(mMtx);
// Return the requested value
return mFailures;
}
/* --------------------------------------------------------------------------------------------
* Modify the announce URI.
*/
void SetURI(StackStrF & uri)
{
// Acquire exclusive access to the announcer
std::lock_guard< std::mutex > guard(mMtx);
// Modify the requested value
mURI = uri.mPtr;
}
/* --------------------------------------------------------------------------------------------
* Retrieve the announce URI.
*/
String GetURI()
{
// Acquire exclusive access to the announcer
std::lock_guard< std::mutex > guard(mMtx);
// Return the requested value
return mURI.toString();
}
protected:
/* --------------------------------------------------------------------------------------------
* Processing thread.
*/
void Proc()
{
while (mRun)
{
try
{
using namespace std::chrono_literals;
// Grab the current time point
std::chrono::time_point< std::chrono::steady_clock > next = std::chrono::steady_clock::now();
// Set the time-point for next update
next += std::chrono::seconds(mInterval);
// Try to announce
DoAnnounce();
// Sleep until the next appointed update time point
for (std::chrono::time_point< std::chrono::steady_clock > curr;
mRun && (next > curr);
curr = std::chrono::steady_clock::now())
{
std::this_thread::sleep_for(50ms); // Sleep for 1/20'th of a second
}
}
catch (const std::exception & e)
{
if (mLog) LogErr("Announcer failed: %s", e.what());
}
catch (...)
{
LogFtl("Unknown exception while announcing. Announcer stopped");
// This is bad. Avoid a crash!
return;
}
}
}
/* --------------------------------------------------------------------------------------------
* Announce succeeded.
*/
void DoSucceeded()
{
// Acquire exclusive access to the announcer
std::lock_guard< std::mutex > guard(mMtx);
// Increment announce counter
mCount += 1;
// Increment success counter
mSuccess += 1;
// Reset failure counter
mFailures = 0;
}
/* --------------------------------------------------------------------------------------------
* Announce failed.
*/
void DoFailed()
{
// Acquire exclusive access to the announcer
std::lock_guard< std::mutex > guard(mMtx);
// Increment announce counter
mCount += 1;
// Reset success counter
mSuccess = 0;
// Increment failure counter
mFailures += 1;
// Check failure limit
if (mFailures >= mRetries)
{
// Stop the announcer
mRun = false;
}
}
/* --------------------------------------------------------------------------------------------
* Processing thread.
*/
void DoAnnounce()
{
// Acquire exclusive access to the announcer
mMtx.lock();
// Create a new HTTP client session
HTTPClientSession session(mURI.getHost(), mURI.getPort());
// Create a new HTTP request
HTTPRequest request(HTTPRequest::HTTP_POST, mURI.getPathAndQuery(), HTTPMessage::HTTP_1_1);
// Set request content type header
request.setContentType("application/x-www-form-urlencoded");
// Set the user agent for the announcer
request.add("User-Agent", "VCMP/0.4");
// Set the server version for the announcer
request.add("VCMP-Version", fmt::format("{}", mVersion));
// Generate the request body
String body = fmt::format("port={}", mPort);
// Release exclusive access to the announcer
mMtx.unlock(); // No need to block while waiting for response
// Create a new HTTP response
HTTPResponse response;
// Set the request body length
request.setContentLength(body.size());
// Send the request
try
{
std::ostream & os = session.sendRequest(request);
// Send the content
os << body;
}
catch (const std::exception & e)
{
if (mLog) LogErr("Request failed: %s", e.what());
// Failed!
DoFailed();
// Probably the server is offline
return;
}
// Receive the response
std::istream & is = session.receiveResponse(response);
// Validate the response from the server
if (response.getStatus() != Poco::Net::HTTPResponse::HTTP_OK)
{
std::stringstream ss;
// Extract response contents
StreamCopier::copyStream(is, ss);
// Log the result
if (mLog) LogErr("Announce failed: %s", ss.str().c_str());
// Failed!
DoFailed();
}
else
{
DoSucceeded(); // Succeeded!
}
}
};
} // Namespace:: SqMod