udptopo.cpp 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jmisc.hpp"
  14. #include "jfile.hpp"
  15. #include "udplib.hpp"
  16. #include "udptopo.hpp"
  17. #include "udpipmap.hpp"
  18. #include "roxie.hpp"
  19. #include "portlist.h"
  20. #include <thread>
  21. #include <string>
  22. #include <sstream>
  23. #include <map>
  24. unsigned initIbytiDelay; // In milliseconds
  25. unsigned minIbytiDelay; // In milliseconds
  26. unsigned ChannelInfo::getIbytiDelay(unsigned primarySubChannel) const // NOTE - zero-based
  27. {
  28. unsigned delay = 0;
  29. unsigned subChannel = primarySubChannel;
  30. while (subChannel != mySubChannel)
  31. {
  32. delay += currentDelay[subChannel];
  33. subChannel++;
  34. if (subChannel == numSubChannels)
  35. subChannel = 0;
  36. }
  37. return delay;
  38. }
  39. void ChannelInfo::noteChannelsSick(unsigned primarySubChannel) const
  40. {
  41. unsigned subChannel = primarySubChannel;
  42. while (subChannel != mySubChannel)
  43. {
  44. unsigned newDelay = currentDelay[subChannel] / 2;
  45. if (newDelay < minIbytiDelay)
  46. newDelay = minIbytiDelay;
  47. currentDelay[subChannel] = newDelay;
  48. subChannel++;
  49. if (subChannel == numSubChannels)
  50. subChannel = 0;
  51. }
  52. }
  53. void ChannelInfo::noteChannelHealthy(unsigned subChannel) const
  54. {
  55. currentDelay[subChannel] = initIbytiDelay;
  56. }
  57. ChannelInfo::ChannelInfo(unsigned _mySubChannel, unsigned _numSubChannels, unsigned _replicationLevel)
  58. : mySubChannel(_mySubChannel), numSubChannels(_numSubChannels), myReplicationLevel(_replicationLevel)
  59. {
  60. for (unsigned i = 0; i < numSubChannels; i++)
  61. currentDelay.emplace_back(initIbytiDelay);
  62. }
  63. bool ChannelInfo::otherAgentHasPriority(unsigned priorityHash, unsigned otherAgentSubChannel) const
  64. {
  65. unsigned primarySubChannel = (priorityHash % numSubChannels);
  66. // could be coded smarter! Basically mysub - prim < theirsub - prim using modulo arithmetic, I think
  67. while (primarySubChannel != mySubChannel)
  68. {
  69. if (primarySubChannel == otherAgentSubChannel)
  70. return true;
  71. primarySubChannel++;
  72. if (primarySubChannel >= numSubChannels)
  73. primarySubChannel = 0;
  74. }
  75. return false;
  76. }
  77. static unsigned *createNewNodeHealthScore(const ServerIdentifier)
  78. {
  79. return new unsigned(initIbytiDelay);
  80. }
  81. static IpMapOf<unsigned> buddyHealth(createNewNodeHealthScore); // For each buddy IP ever seen, maintains a score of how long I should wait for it to respond when it is the 'first responder'
  82. void noteNodeSick(const ServerIdentifier &node)
  83. {
  84. // NOTE - IpMapOf is thread safe (we never remove entries). Two threads hitting at the same time may result in the change from one being lost, but that's not a disaster
  85. unsigned current = buddyHealth[node];
  86. unsigned newDelay = current / 2;
  87. if (newDelay < minIbytiDelay)
  88. newDelay = minIbytiDelay;
  89. buddyHealth[node] = newDelay;
  90. }
  91. void noteNodeHealthy(const ServerIdentifier &node)
  92. {
  93. // NOTE - IpMapOf is thread safe (we never remove entries). Two threads hitting at the same time may result in the change from one being lost, but that's not a disaster
  94. buddyHealth[node] = initIbytiDelay;
  95. }
  96. unsigned getIbytiDelay(const ServerIdentifier &node)
  97. {
  98. return buddyHealth[node];
  99. }
  100. class CTopologyServer : public CInterfaceOf<ITopologyServer>
  101. {
  102. public:
  103. CTopologyServer();
  104. CTopologyServer(const char *topologyInfo, const ITopologyServer *current);
  105. virtual const SocketEndpointArray &queryAgents(unsigned channel) const override;
  106. virtual const SocketEndpointArray &queryServers(unsigned port) const override;
  107. virtual const ChannelInfo &queryChannelInfo(unsigned channel) const override;
  108. virtual const std::vector<unsigned> &queryChannels() const override;
  109. virtual bool implementsChannel(unsigned channel) const override;
  110. virtual StringBuffer &report(StringBuffer &ret) const override;
  111. virtual time_t queryServerInstance(const SocketEndpoint &ep) const override;
  112. virtual void updateStatus() const override;
  113. private:
  114. std::map<unsigned, SocketEndpointArray> agents; // indexed by channel
  115. std::map<unsigned, SocketEndpointArray> servers; // indexed by port
  116. std::map<SocketEndpoint, time_t> serverInstances;
  117. static const SocketEndpointArray nullArray;
  118. std::map<unsigned, ChannelInfo> channelInfo;
  119. std::map<unsigned, unsigned> mySubChannels;
  120. std::vector<unsigned> channels;
  121. std::vector<unsigned> replicationLevels;
  122. #ifdef _DEBUG
  123. StringAttr rawData;
  124. #endif
  125. };
  126. SocketEndpoint myAgentEP;
  127. unsigned numChannels;
  128. static bool isActive(time_t instance)
  129. {
  130. return instance != 0 && instance != time_t(-1);
  131. }
  132. CTopologyServer::CTopologyServer()
  133. {
  134. }
  135. CTopologyServer::CTopologyServer(const char *topologyInfo, const ITopologyServer *old)
  136. #ifdef _DEBUG
  137. : rawData(topologyInfo)
  138. #endif
  139. {
  140. std::istringstream ss(topologyInfo);
  141. std::string line;
  142. std::map<unsigned, SocketEndpointArray> degradedAgents; // indexed by channel - agents that have not sent heartbeats recently. Use only if nothing else available on channel
  143. while (std::getline(ss, line, '\n'))
  144. {
  145. StringArray fields;
  146. fields.appendList(line.c_str(), "|\t", true);
  147. if (fields.length()==5)
  148. {
  149. const char *role = fields.item(0);
  150. const char *channelStr = fields.item(1);
  151. const char *epStr = fields.item(2);
  152. const char *replStr = fields.item(3);
  153. const char *instanceStr = fields.item(4);
  154. char *tail = nullptr;
  155. unsigned channel = strtoul(channelStr, &tail, 10);
  156. if (*tail)
  157. {
  158. DBGLOG("Unexpected characters parsing channel in topology entry %s", line.c_str());
  159. continue;
  160. }
  161. tail = nullptr;
  162. unsigned repl = strtoul(replStr, &tail, 10);
  163. if (*tail)
  164. {
  165. DBGLOG("Unexpected characters parsing replication level in topology entry %s", line.c_str());
  166. continue;
  167. }
  168. tail = nullptr;
  169. time_t instance = strtoul(instanceStr, &tail, 10);
  170. if (*tail)
  171. {
  172. DBGLOG("Unexpected characters parsing instance value in topology entry %s", line.c_str());
  173. continue;
  174. }
  175. SocketEndpoint ep;
  176. if (!ep.set(epStr))
  177. {
  178. DBGLOG("Unable to process endpoint information in topology entry %s", line.c_str());
  179. continue;
  180. }
  181. if (streq(role, "agent"))
  182. {
  183. if (isActive(instance) || ep.equals(myAgentEP))
  184. {
  185. agents[channel].append(ep);
  186. if (ep.equals(myAgentEP))
  187. {
  188. mySubChannels[channel] = agents[channel].ordinality()-1;
  189. channels.push_back(channel);
  190. replicationLevels.push_back(repl);
  191. }
  192. agents[0].append(ep);
  193. }
  194. else if (!instance)
  195. {
  196. degradedAgents[channel].append(ep);
  197. }
  198. }
  199. else if (streq(role, "server"))
  200. {
  201. time_t oldInstance = old ? old->queryServerInstance(ep) : 0;
  202. if (!isActive(instance) || (isActive(oldInstance) && oldInstance != instance))
  203. {
  204. StringBuffer s;
  205. DBGLOG("Deleting pending data for server %s which has terminated or restarted", ep.getUrlStr(s).str());
  206. ROQ->abortPendingData(ep);
  207. }
  208. if (isActive(instance))
  209. {
  210. servers[ep.port].append(ep);
  211. serverInstances[ep] = instance;
  212. }
  213. }
  214. }
  215. else
  216. DBGLOG("Unable to process information in topology entry %s (expected 5 fields)", line.c_str());
  217. }
  218. // Degraded agents are used only if nothing else is available on the channel
  219. for (auto it = degradedAgents.begin(); it != degradedAgents.end(); it++)
  220. {
  221. unsigned channel = it->first;
  222. if (!agents[channel].length())
  223. {
  224. DBGLOG("Adding degraded agent(s) to channel %d", channel);
  225. ForEachItemIn(idx, it->second)
  226. {
  227. agents[channel].append(it->second.item(idx));
  228. agents[0].append(it->second.item(idx));
  229. }
  230. }
  231. else
  232. DBGLOG("Ignoring degraded agent(s) on channel %d", channel);
  233. }
  234. for (unsigned i = 0; i < channels.size(); i++)
  235. {
  236. unsigned channel = channels[i];
  237. unsigned repl = replicationLevels[i];
  238. unsigned subChannel = mySubChannels[channel];
  239. channelInfo.emplace(std::make_pair(channel, ChannelInfo(subChannel, agents[channel].ordinality(), repl)));
  240. }
  241. }
  242. time_t CTopologyServer::queryServerInstance(const SocketEndpoint &ep) const
  243. {
  244. auto match = serverInstances.find(ep);
  245. if (match == serverInstances.end())
  246. return 0;
  247. return match->second;
  248. }
  249. const SocketEndpointArray &CTopologyServer::queryAgents(unsigned channel) const
  250. {
  251. auto match = agents.find(channel);
  252. if (match == agents.end())
  253. return nullArray;
  254. return match->second;
  255. }
  256. const SocketEndpointArray &CTopologyServer::queryServers(unsigned port) const
  257. {
  258. auto match = servers.find(port);
  259. if (match == servers.end())
  260. return nullArray;
  261. return match->second;
  262. }
  263. const ChannelInfo &CTopologyServer::queryChannelInfo(unsigned channel) const
  264. {
  265. auto match = channelInfo.find(channel);
  266. if (match == channelInfo.end())
  267. throw makeStringExceptionV(ROXIE_INTERNAL_ERROR, "queryChannelInfo requesting info for unexpected channel %u", channel);
  268. return match->second;
  269. }
  270. const std::vector<unsigned> &CTopologyServer::queryChannels() const
  271. {
  272. return channels;
  273. }
  274. bool CTopologyServer::implementsChannel(unsigned channel) const
  275. {
  276. if (channel)
  277. {
  278. return std::find(channels.begin(), channels.end(), channel) != channels.end();
  279. }
  280. else
  281. return true; // Kinda-sorta - perhaps not true if separated servers from agents, but even then child queries may access channel 0
  282. }
  283. StringBuffer &CTopologyServer::report(StringBuffer &ret) const
  284. {
  285. #ifdef _DEBUG
  286. // ret.append(rawData).newline();
  287. #endif
  288. for (auto it = agents.begin(); it != agents.end(); it++)
  289. {
  290. if (it->second.length())
  291. {
  292. ret.appendf("Channel %d agents: ", it->first);
  293. it->second.getText(ret).newline();
  294. }
  295. }
  296. for (auto it = servers.begin(); it != servers.end(); it++)
  297. {
  298. if (it->second.length())
  299. {
  300. ret.appendf("Port %d servers: ", it->first);
  301. it->second.getText(ret).newline();
  302. }
  303. }
  304. return ret;
  305. }
  306. void CTopologyServer::updateStatus() const
  307. {
  308. // Set the k8s ready probe status according to whether we have at least one agent available per channel
  309. unsigned unready = 0;
  310. StringBuffer report;
  311. unsigned rangeStart = 0;
  312. for (unsigned channel=1; channel <= numChannels; channel++)
  313. {
  314. if (!queryAgents(channel).length())
  315. {
  316. if (!rangeStart)
  317. rangeStart = channel;
  318. unready++;
  319. }
  320. else
  321. {
  322. if (rangeStart)
  323. {
  324. if (report.length())
  325. report.append(',');
  326. report.appendf("%u", rangeStart);
  327. if (rangeStart != channel-1)
  328. report.appendf("-%u", channel-1);
  329. }
  330. rangeStart = 0;
  331. }
  332. }
  333. if (rangeStart)
  334. {
  335. if (report.length())
  336. report.append(',');
  337. report.appendf("%u", rangeStart);
  338. if (rangeStart != numChannels)
  339. report.appendf("-%u", numChannels);
  340. }
  341. Owned<IFile> sentinelFile = createSentinelTarget(".ready");
  342. if (unready==0)
  343. {
  344. writeSentinelFile(sentinelFile);
  345. DBGLOG("TOPO: all channels ready");
  346. }
  347. else
  348. {
  349. removeSentinelFile(sentinelFile);
  350. DBGLOG("TOPO: %u channel%s not ready: %s", unready, unready==1 ? "" : "s", report.str());
  351. }
  352. }
  353. const SocketEndpointArray CTopologyServer::nullArray;
  354. // Class TopologyManager (there is a single instance) handles interaction with topology servers
  355. // to provide a TopologyServer reflecting current known cluster topology
  356. class TopologyManager
  357. {
  358. public:
  359. TopologyManager() { currentTopology.setown(new CTopologyServer); };
  360. void setServers(const StringArray &_topoServers);
  361. void setRoles(const std::vector<RoxieEndpointInfo> &myRoles);
  362. void closedown(const std::vector<RoxieEndpointInfo> &myRoles);
  363. const ITopologyServer &getCurrent();
  364. bool update();
  365. void setTraceLevel(unsigned _traceLevel) { traceLevel = _traceLevel; }
  366. unsigned numServers() const { return topoServers.length(); }
  367. void freeze(bool frozen);
  368. private:
  369. void _setRoles(const std::vector<RoxieEndpointInfo> &myRoles, bool remove);
  370. Owned<const ITopologyServer> currentTopology;
  371. SpinLock lock;
  372. StringArray topoServers;
  373. time_t myInstance = 0;
  374. const unsigned topoConnectTimeout = 1000;
  375. const unsigned maxReasonableResponse = 32*32*1024; // At ~ 32 bytes per entry, 1024 channels and 32-way redundancy that's a BIG cluster!
  376. StringBuffer md5;
  377. StringBuffer topoBuf;
  378. unsigned traceLevel = 0;
  379. bool frozen = false; // used for testing
  380. };
  381. static TopologyManager topologyManager;
  382. bool TopologyManager::update()
  383. {
  384. if (frozen)
  385. return false;
  386. bool updated = false;
  387. ForEachItemIn(idx, topoServers)
  388. {
  389. try
  390. {
  391. SocketEndpointArray eps;
  392. eps.fromName(topoServers.item(idx), TOPO_SERVER_PORT);
  393. ForEachItemIn(idx, eps)
  394. {
  395. const SocketEndpoint &ep = eps.item(idx);
  396. Owned<ISocket> topo = ISocket::connect_timeout(ep, topoConnectTimeout);
  397. if (topo)
  398. {
  399. unsigned topoBufLen = md5.length()+topoBuf.length();
  400. _WINREV(topoBufLen);
  401. topo->write(&topoBufLen, 4);
  402. topo->write(md5.str(), md5.length());
  403. topo->write(topoBuf.str(), topoBuf.length());
  404. unsigned responseLen;
  405. topo->read(&responseLen, 4);
  406. _WINREV(responseLen);
  407. if (!responseLen)
  408. {
  409. DBGLOG("Unexpected empty response from topology server %s", topoServers.item(idx));
  410. }
  411. else
  412. {
  413. if (responseLen > maxReasonableResponse)
  414. {
  415. DBGLOG("Unexpectedly large response (%u) from topology server %s", responseLen, topoServers.item(idx));
  416. }
  417. else
  418. {
  419. MemoryBuffer mb;
  420. char *mem = (char *)mb.reserveTruncate(responseLen+1);
  421. topo->read(mem, responseLen);
  422. mem[responseLen] = '\0';
  423. if (responseLen>=md5.length() && mem[0]=='=')
  424. {
  425. if (md5.length()==0 || memcmp(mem, md5.str(), md5.length())!=0)
  426. {
  427. const char *eol = strchr(mem, '\n');
  428. if (eol)
  429. {
  430. eol++;
  431. md5.clear().append(eol-mem, mem); // Note: includes '\n'
  432. Owned<const ITopologyServer> oldServer = &getCurrent();
  433. Owned<const ITopologyServer> newServer = new CTopologyServer(eol, oldServer);
  434. {
  435. SpinBlock b(lock);
  436. currentTopology.swap(newServer);
  437. }
  438. updated = true;
  439. if (traceLevel)
  440. {
  441. DBGLOG("Topology information updated:");
  442. StringBuffer s;
  443. MLOG("%s", currentTopology->report(s).str());
  444. }
  445. currentTopology->updateStatus();
  446. }
  447. else
  448. {
  449. StringBuffer s;
  450. DBGLOG("Unexpected response from topology server %s: %.*s", topoServers.item(idx), responseLen, mem);
  451. }
  452. }
  453. }
  454. else
  455. {
  456. StringBuffer s;
  457. DBGLOG("Unexpected response from topology server %s: %.*s", topoServers.item(idx), responseLen, mem);
  458. }
  459. }
  460. }
  461. }
  462. }
  463. }
  464. catch (IException *E)
  465. {
  466. DBGLOG("While connecting to %s", topoServers.item(idx));
  467. EXCLOG(E);
  468. E->Release();
  469. }
  470. }
  471. return updated;
  472. }
  473. void TopologyManager::freeze(bool _frozen)
  474. {
  475. frozen = _frozen;
  476. }
  477. const ITopologyServer &TopologyManager::getCurrent()
  478. {
  479. SpinBlock b(lock);
  480. return *currentTopology.getLink();
  481. }
  482. void TopologyManager::setServers(const StringArray &_topoServers)
  483. {
  484. ForEachItemIn(idx, _topoServers)
  485. topoServers.append(_topoServers.item(idx));
  486. }
  487. void TopologyManager::_setRoles(const std::vector<RoxieEndpointInfo> &myRoles, bool remove)
  488. {
  489. topoBuf.clear();
  490. for (const auto &role : myRoles)
  491. {
  492. if (remove)
  493. topoBuf.append('-');
  494. switch (role.role)
  495. {
  496. case RoxieEndpointInfo::RoxieServer: topoBuf.append("server|"); break;
  497. case RoxieEndpointInfo::RoxieAgent: topoBuf.append("agent|"); break;
  498. default: throwUnexpected();
  499. }
  500. topoBuf.append(role.channel).append('|');
  501. role.ep.getUrlStr(topoBuf);
  502. topoBuf.append('|').append(role.replicationLevel);
  503. topoBuf.append('\t').append((__uint64) myInstance);
  504. topoBuf.append('\n');
  505. }
  506. }
  507. void TopologyManager::setRoles(const std::vector<RoxieEndpointInfo> &myRoles)
  508. {
  509. topoBuf.clear();
  510. myInstance = time(nullptr);
  511. if (!myInstance) myInstance++;
  512. _setRoles(myRoles, false);
  513. Owned<const ITopologyServer> newServer = new CTopologyServer(topoBuf, nullptr); // We set the initial topology to just the local information we know about
  514. SpinBlock b(lock);
  515. currentTopology.swap(newServer);
  516. }
  517. void TopologyManager::closedown(const std::vector<RoxieEndpointInfo> &myRoles)
  518. {
  519. topoBuf.clear();
  520. _setRoles(myRoles, true); // Tell toposerver to remove the specified roles
  521. freeze(false);
  522. update();
  523. }
  524. extern UDPLIB_API const ITopologyServer *getTopology()
  525. {
  526. return &topologyManager.getCurrent();
  527. }
  528. extern UDPLIB_API void freezeTopology(bool frozen)
  529. {
  530. topologyManager.freeze(frozen);
  531. }
  532. extern UDPLIB_API unsigned getNumAgents(unsigned channel)
  533. {
  534. Owned<const ITopologyServer> topology = getTopology();
  535. return topology->queryAgents(channel).ordinality();
  536. }
  537. #ifndef _CONTAINERIZED
  538. extern UDPLIB_API void createStaticTopology(const std::vector<RoxieEndpointInfo> &allRoles, unsigned traceLevel)
  539. {
  540. topologyManager.setRoles(allRoles);
  541. }
  542. #endif
  543. static std::thread topoThread;
  544. static Semaphore abortTopo;
  545. unsigned heartbeatInterval = 5000; // How often roxie servers update topo server
  546. extern UDPLIB_API void initializeTopology(const StringArray &topoValues, const std::vector<RoxieEndpointInfo> &myRoles)
  547. {
  548. topologyManager.setServers(topoValues);
  549. topologyManager.setRoles(myRoles);
  550. heartbeatInterval = getComponentConfigSP()->getPropInt("@heartbeatInterval", heartbeatInterval);
  551. }
  552. extern UDPLIB_API void publishTopology(unsigned traceLevel, const std::vector<RoxieEndpointInfo> &myRoles)
  553. {
  554. if (topologyManager.numServers())
  555. {
  556. topologyManager.setTraceLevel(traceLevel);
  557. topoThread = std::thread([&myRoles]()
  558. {
  559. topologyManager.update();
  560. unsigned waitTime = 1000; // First time around we don't wait as long, so that system comes up faster
  561. while (!abortTopo.wait(waitTime))
  562. {
  563. topologyManager.update();
  564. waitTime = heartbeatInterval;
  565. }
  566. topologyManager.closedown(myRoles);
  567. });
  568. }
  569. }
  570. extern UDPLIB_API void stopTopoThread()
  571. {
  572. if (topoThread.joinable())
  573. {
  574. abortTopo.signal();
  575. topoThread.join();
  576. }
  577. }
  578. #ifdef _USE_CPPUNIT
  579. #include "unittests.hpp"
  580. class BuddyHealthTest : public CppUnit::TestFixture
  581. {
  582. CPPUNIT_TEST_SUITE(BuddyHealthTest);
  583. CPPUNIT_TEST(testBuddyHealth);
  584. CPPUNIT_TEST(testMap);
  585. CPPUNIT_TEST_SUITE_END();
  586. void testBuddyHealth()
  587. {
  588. initIbytiDelay = 64;
  589. minIbytiDelay = 16;
  590. IpAddress a1("123.4.5.1");
  591. IpAddress a2("123.4.6.2");
  592. IpAddress a3("123.4.5.3");
  593. CPPUNIT_ASSERT(getIbytiDelay(a1)==initIbytiDelay);
  594. noteNodeSick(a1);
  595. noteNodeSick(a2);
  596. CPPUNIT_ASSERT(getIbytiDelay(a1)==initIbytiDelay/2);
  597. CPPUNIT_ASSERT(getIbytiDelay(a2)==initIbytiDelay/2);
  598. CPPUNIT_ASSERT(getIbytiDelay(a3)==initIbytiDelay);
  599. noteNodeHealthy(a1);
  600. CPPUNIT_ASSERT(getIbytiDelay(a1)==initIbytiDelay);
  601. CPPUNIT_ASSERT(getIbytiDelay(a2)==initIbytiDelay/2);
  602. CPPUNIT_ASSERT(getIbytiDelay(a3)==initIbytiDelay);
  603. noteNodeSick(a2);
  604. noteNodeSick(a2);
  605. noteNodeSick(a2);
  606. noteNodeSick(a2);
  607. noteNodeSick(a2);
  608. CPPUNIT_ASSERT(getIbytiDelay(a2)==minIbytiDelay);
  609. }
  610. void testMap()
  611. {
  612. std::map<SocketEndpoint, time_t> serverInstances;
  613. SocketEndpoint ep1, ep2, ep3, ep4, ep5, ep6;
  614. ep1.set("1.2.3.4", 5);
  615. ep2.set("1.2.3.4", 6);
  616. ep3.set("1.2.3.4", 5);
  617. ep4.set("1.2.3.4", 6);
  618. ep5.set("1.2.3.5", 6);
  619. ep6.set("1.2.3.4", 7);
  620. serverInstances[ep1] = 7;
  621. serverInstances[ep2] = 8;
  622. CPPUNIT_ASSERT(ep1<ep2);
  623. CPPUNIT_ASSERT(!(ep2<ep1));
  624. CPPUNIT_ASSERT(serverInstances.find(ep1) != serverInstances.end());
  625. CPPUNIT_ASSERT(serverInstances.find(ep2) != serverInstances.end());
  626. CPPUNIT_ASSERT(serverInstances.find(ep3) != serverInstances.end());
  627. CPPUNIT_ASSERT(serverInstances.find(ep4) != serverInstances.end());
  628. CPPUNIT_ASSERT(serverInstances.find(ep5) == serverInstances.end());
  629. CPPUNIT_ASSERT(serverInstances.find(ep6) == serverInstances.end());
  630. }
  631. };
  632. CPPUNIT_TEST_SUITE_REGISTRATION( BuddyHealthTest );
  633. CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( BuddyHealthTest, "BuddyHealthTest" );
  634. #endif