|
@@ -16,6 +16,7 @@
|
|
|
############################################################################## */
|
|
|
|
|
|
#include "jmisc.hpp"
|
|
|
+#include "jfile.hpp"
|
|
|
#include "udplib.hpp"
|
|
|
#include "udptopo.hpp"
|
|
|
#include "udpipmap.hpp"
|
|
@@ -126,6 +127,7 @@ public:
|
|
|
virtual bool implementsChannel(unsigned channel) const override;
|
|
|
virtual StringBuffer &report(StringBuffer &ret) const override;
|
|
|
virtual time_t queryServerInstance(const SocketEndpoint &ep) const override;
|
|
|
+ virtual void updateStatus() const override;
|
|
|
private:
|
|
|
std::map<unsigned, SocketEndpointArray> agents; // indexed by channel
|
|
|
std::map<unsigned, SocketEndpointArray> servers; // indexed by port
|
|
@@ -141,6 +143,12 @@ private:
|
|
|
};
|
|
|
|
|
|
SocketEndpoint myAgentEP; // Compared against each agent endpoint in the CTopologyServer constructor; presumably this process's own agent endpoint - confirm where it is set
|
|
|
+unsigned numChannels; // Highest channel number examined by CTopologyServer::updateStatus(), which scans channels 1..numChannels
|
|
|
+
|
|
|
// Returns true when 'instance' is a usable instance timestamp, i.e. it is
// neither of the two reserved values 0 and time_t(-1). Callers in this file
// use the result to decide whether an agent/server entry should be treated
// as currently running.
static bool isActive(time_t instance)
{
    if (instance == 0)
        return false;
    if (instance == static_cast<time_t>(-1))
        return false;
    return true;
}
|
|
|
|
|
|
CTopologyServer::CTopologyServer()
|
|
|
{
|
|
@@ -194,7 +202,7 @@ CTopologyServer::CTopologyServer(const char *topologyInfo, const ITopologyServer
|
|
|
}
|
|
|
if (streq(role, "agent"))
|
|
|
{
|
|
|
- if (instance || ep.equals(myAgentEP))
|
|
|
+ if (isActive(instance) || ep.equals(myAgentEP))
|
|
|
{
|
|
|
agents[channel].append(ep);
|
|
|
if (ep.equals(myAgentEP))
|
|
@@ -205,7 +213,7 @@ CTopologyServer::CTopologyServer(const char *topologyInfo, const ITopologyServer
|
|
|
}
|
|
|
agents[0].append(ep);
|
|
|
}
|
|
|
- else
|
|
|
+ else if (!instance)
|
|
|
{
|
|
|
degradedAgents[channel].append(ep);
|
|
|
}
|
|
@@ -213,13 +221,13 @@ CTopologyServer::CTopologyServer(const char *topologyInfo, const ITopologyServer
|
|
|
else if (streq(role, "server"))
|
|
|
{
|
|
|
time_t oldInstance = old ? old->queryServerInstance(ep) : 0;
|
|
|
- if (!instance || (oldInstance && oldInstance != instance))
|
|
|
+ if (!isActive(instance) || (isActive(oldInstance) && oldInstance != instance))
|
|
|
{
|
|
|
StringBuffer s;
|
|
|
DBGLOG("Deleting pending data for server %s which has terminated or restarted", ep.getUrlStr(s).str());
|
|
|
ROQ->abortPendingData(ep);
|
|
|
}
|
|
|
- if (instance)
|
|
|
+ if (isActive(instance))
|
|
|
{
|
|
|
servers[ep.port].append(ep);
|
|
|
serverInstances[ep] = instance;
|
|
@@ -304,6 +312,9 @@ bool CTopologyServer::implementsChannel(unsigned channel) const
|
|
|
|
|
|
StringBuffer &CTopologyServer::report(StringBuffer &ret) const
|
|
|
{
|
|
|
+#ifdef _DEBUG
|
|
|
+// ret.append(rawData).newline();
|
|
|
+#endif
|
|
|
for (auto it = agents.begin(); it != agents.end(); it++)
|
|
|
{
|
|
|
if (it->second.length())
|
|
@@ -323,6 +334,54 @@ StringBuffer &CTopologyServer::report(StringBuffer &ret) const
|
|
|
return ret;
|
|
|
}
|
|
|
|
|
|
+void CTopologyServer::updateStatus() const
|
|
|
+{
|
|
|
+ // Set the k8s ready probe status according to whether we have at least one agent available per channel
|
|
|
+ unsigned unready = 0;
|
|
|
+ StringBuffer report;
|
|
|
+ unsigned rangeStart = 0;
|
|
|
+ for (unsigned channel=1; channel <= numChannels; channel++)
|
|
|
+ {
|
|
|
+ if (!queryAgents(channel).length())
|
|
|
+ {
|
|
|
+ if (!rangeStart)
|
|
|
+ rangeStart = channel;
|
|
|
+ unready++;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ if (rangeStart)
|
|
|
+ {
|
|
|
+ if (report.length())
|
|
|
+ report.append(',');
|
|
|
+ report.appendf("%u", rangeStart);
|
|
|
+ if (rangeStart != channel-1)
|
|
|
+ report.appendf("-%u", channel-1);
|
|
|
+ }
|
|
|
+ rangeStart = 0;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (rangeStart)
|
|
|
+ {
|
|
|
+ if (report.length())
|
|
|
+ report.append(',');
|
|
|
+ report.appendf("%u", rangeStart);
|
|
|
+ if (rangeStart != numChannels)
|
|
|
+ report.appendf("-%u", numChannels);
|
|
|
+ }
|
|
|
+ Owned<IFile> sentinelFile = createSentinelTarget();
|
|
|
+ if (unready==0)
|
|
|
+ {
|
|
|
+ writeSentinelFile(sentinelFile);
|
|
|
+ DBGLOG("TOPO: all channels ready");
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ removeSentinelFile(sentinelFile);
|
|
|
+ DBGLOG("TOPO: %u channel%s not ready: %s", unready, unready==1 ? "" : "s", report.str());
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
const SocketEndpointArray CTopologyServer::nullArray;
|
|
|
|
|
|
// Class TopologyManager (there is a single instance) handles interaction with topology servers
|
|
@@ -338,6 +397,7 @@ public:
|
|
|
const ITopologyServer &getCurrent();
|
|
|
|
|
|
bool update();
|
|
|
+ void setTraceLevel(unsigned _traceLevel) { traceLevel = _traceLevel; }
|
|
|
unsigned numServers() const { return topoServers.length(); }
|
|
|
void freeze(bool frozen);
|
|
|
|
|
@@ -351,6 +411,7 @@ private:
|
|
|
const unsigned maxReasonableResponse = 32*32*1024; // At ~ 32 bytes per entry, 1024 channels and 32-way redundancy that's a BIG cluster!
|
|
|
StringBuffer md5;
|
|
|
StringBuffer topoBuf;
|
|
|
+ unsigned traceLevel = 0;
|
|
|
bool frozen = false; // used for testing
|
|
|
};
|
|
|
|
|
@@ -408,9 +469,23 @@ bool TopologyManager::update()
|
|
|
md5.clear().append(eol-mem, mem); // Note: includes '\n'
|
|
|
Owned<const ITopologyServer> oldServer = &getCurrent();
|
|
|
Owned<const ITopologyServer> newServer = new CTopologyServer(eol, oldServer);
|
|
|
- SpinBlock b(lock);
|
|
|
- currentTopology.swap(newServer);
|
|
|
+ {
|
|
|
+ SpinBlock b(lock);
|
|
|
+ currentTopology.swap(newServer);
|
|
|
+ }
|
|
|
updated = true;
|
|
|
+ if (traceLevel)
|
|
|
+ {
|
|
|
+ DBGLOG("Topology information updated:");
|
|
|
+ StringBuffer s;
|
|
|
+ MLOG("%s", currentTopology->report(s).str());
|
|
|
+ }
|
|
|
+ currentTopology->updateStatus();
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ StringBuffer s;
|
|
|
+ DBGLOG("Unexpected response from topology server %s: %.*s", topoServers.item(idx), responseLen, mem);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -529,19 +604,14 @@ extern UDPLIB_API void publishTopology(unsigned traceLevel, const std::vector<Ro
|
|
|
{
|
|
|
if (topologyManager.numServers())
|
|
|
{
|
|
|
- topoThread = std::thread([traceLevel, &myRoles]()
|
|
|
+ topologyManager.setTraceLevel(traceLevel);
|
|
|
+ topoThread = std::thread([&myRoles]()
|
|
|
{
|
|
|
topologyManager.update();
|
|
|
unsigned waitTime = 1000; // First time around we don't wait as long, so that system comes up faster
|
|
|
while (!abortTopo.wait(waitTime))
|
|
|
{
|
|
|
- if (topologyManager.update() && traceLevel)
|
|
|
- {
|
|
|
- DBGLOG("Topology information updated:");
|
|
|
- Owned<const ITopologyServer> c = getTopology();
|
|
|
- StringBuffer s;
|
|
|
- MLOG("%s", c->report(s).str());
|
|
|
- }
|
|
|
+ topologyManager.update();
|
|
|
waitTime = heartbeatInterval;
|
|
|
}
|
|
|
topologyManager.closedown(myRoles);
|