|
@@ -97,6 +97,7 @@ public:
|
|
|
virtual const ChannelInfo &queryChannelInfo(unsigned channel) const override;
|
|
|
virtual const std::vector<unsigned> &queryChannels() const override;
|
|
|
virtual bool implementsChannel(unsigned channel) const override;
|
|
|
+ virtual void report(StringBuffer &ret) const override;
|
|
|
|
|
|
private:
|
|
|
std::map<unsigned, SocketEndpointArray> agents; // indexed by channel
|
|
@@ -106,6 +107,9 @@ private:
|
|
|
std::map<unsigned, unsigned> mySubChannels;
|
|
|
std::vector<unsigned> channels;
|
|
|
std::vector<unsigned> replicationLevels;
|
|
|
+#ifdef _DEBUG
|
|
|
+ StringAttr rawData;
|
|
|
+#endif
|
|
|
};
|
|
|
|
|
|
SocketEndpoint myAgentEP;
|
|
@@ -115,6 +119,9 @@ CTopologyServer::CTopologyServer()
|
|
|
}
|
|
|
|
|
|
CTopologyServer::CTopologyServer(const char *topologyInfo)
|
|
|
+#ifdef _DEBUG
|
|
|
+ : rawData(topologyInfo)
|
|
|
+#endif
|
|
|
{
|
|
|
std::istringstream ss(topologyInfo);
|
|
|
std::string line;
|
|
@@ -210,6 +217,16 @@ bool CTopologyServer::implementsChannel(unsigned channel) const
|
|
|
else
|
|
|
return true; // Kinda-sorta - perhaps not true if separated servers from agents, but even then child queries may access channel 0
|
|
|
}
|
|
|
+
|
|
|
+void CTopologyServer::report(StringBuffer &ret) const
|
|
|
+{
|
|
|
+#ifdef _DEBUG
|
|
|
+ ret.append(rawData);
|
|
|
+#else
|
|
|
+ UNIMPLEMENTED;
|
|
|
+#endif
|
|
|
+}
|
|
|
+
|
|
|
const SocketEndpointArray CTopologyServer::nullArray;
|
|
|
|
|
|
// Class TopologyManager (there is a single instance) handles interaction with topology servers
|
|
@@ -225,6 +242,8 @@ public:
|
|
|
|
|
|
bool update();
|
|
|
unsigned numServers() const { return topoServers.length(); }
|
|
|
+ void freeze(bool frozen);
|
|
|
+
|
|
|
private:
|
|
|
Owned<const ITopologyServer> currentTopology;
|
|
|
SpinLock lock;
|
|
@@ -233,12 +252,15 @@ private:
|
|
|
const unsigned maxReasonableResponse = 32*32*1024; // At ~ 32 bytes per entry, 1024 channels and 32-way redundancy that's a BIG cluster!
|
|
|
StringBuffer md5;
|
|
|
StringBuffer topoBuf;
|
|
|
+ bool frozen = false; // used for testing
|
|
|
};
|
|
|
|
|
|
static TopologyManager topologyManager;
|
|
|
|
|
|
bool TopologyManager::update()
|
|
|
{
|
|
|
+ if (frozen)
|
|
|
+ return false;
|
|
|
bool updated = false;
|
|
|
ForEachItemIn(idx, topoServers)
|
|
|
{
|
|
@@ -273,8 +295,9 @@ bool TopologyManager::update()
|
|
|
else
|
|
|
{
|
|
|
MemoryBuffer mb;
|
|
|
- char *mem = (char *)mb.reserveTruncate(responseLen);
|
|
|
+ char *mem = (char *)mb.reserveTruncate(responseLen+1);
|
|
|
topo->read(mem, responseLen);
|
|
|
+ mem[responseLen] = '\0';
|
|
|
if (responseLen>=md5.length() && mem[0]=='=')
|
|
|
{
|
|
|
if (md5.length()==0 || memcmp(mem, md5.str(), md5.length())!=0)
|
|
@@ -311,6 +334,11 @@ bool TopologyManager::update()
|
|
|
return updated;
|
|
|
}
|
|
|
|
|
|
+void TopologyManager::freeze(bool _frozen)
|
|
|
+{
|
|
|
+ frozen = _frozen;
|
|
|
+}
|
|
|
+
|
|
|
const ITopologyServer &TopologyManager::getCurrent()
|
|
|
{
|
|
|
SpinBlock b(lock);
|
|
@@ -350,6 +378,11 @@ extern UDPLIB_API const ITopologyServer *getTopology()
|
|
|
return &topologyManager.getCurrent();
|
|
|
}
|
|
|
|
|
|
+extern UDPLIB_API void freezeTopology(bool frozen)
|
|
|
+{
|
|
|
+ topologyManager.freeze(frozen);
|
|
|
+}
|
|
|
+
|
|
|
extern UDPLIB_API unsigned getNumAgents(unsigned channel)
|
|
|
{
|
|
|
Owned<const ITopologyServer> topology = getTopology();
|