slwatchdog.cpp 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. // Slave Watchdog
  #include "platform.h"
  #include <stdio.h>
  #include <atomic>
  #include "jsocket.hpp"
  #include "jmisc.hpp"
  #include "portlist.h"
  #include "thorport.hpp"
  #include "thormisc.hpp"
  #include "thcompressutil.hpp"
  #include "thwatchdog.hpp"
  #include "slwatchdog.hpp"
  #include "thgraphslave.hpp"
  25. class CGraphProgressHandlerBase : public CInterfaceOf<ISlaveWatchdog>, implements IThreaded
  26. {
  27. mutable CriticalSection crit;
  28. CGraphArray activeGraphs;
  29. bool stopped = true;
  30. bool progressEnabled = false;
  31. CThreaded threaded;
  32. SocketEndpoint self;
  33. void gatherAndSend()
  34. {
  35. MemoryBuffer sendMb, progressMb;
  36. HeartBeatPacketHeader hb;
  37. hb.sender = self;
  38. hb.tick++;
  39. size32_t progressSizePos = (byte *)&hb.progressSize - (byte *)&hb;
  40. sendMb.append(sizeof(HeartBeatPacketHeader), &hb);
  41. hb.progressSize = gatherData(progressMb);
  42. sendMb.writeDirect(progressSizePos, sizeof(hb.progressSize), &hb.progressSize);
  43. sendMb.append(progressMb);
  44. size32_t packetSize = sendMb.length();
  45. sendMb.writeDirect(0, sizeof(hb.packetSize), &packetSize);
  46. sendData(sendMb);
  47. }
  48. virtual void sendData(MemoryBuffer &mb) = 0;
  49. public:
  50. CGraphProgressHandlerBase() : threaded("CGraphProgressHandler", this)
  51. {
  52. self = queryMyNode()->endpoint();
  53. progressEnabled = globals->getPropBool("@watchdogProgressEnabled");
  54. #ifdef _WIN32
  55. threaded.adjustPriority(+1); // it is critical that watchdog packets get through.
  56. #endif
  57. }
  58. void start()
  59. {
  60. stopped = false;
  61. threaded.start();
  62. }
  63. virtual void beforeDispose() override
  64. {
  65. stop();
  66. }
  67. size32_t gatherData(MemoryBuffer &mb)
  68. {
  69. CriticalBlock b(crit);
  70. if (progressEnabled)
  71. {
  72. MemoryBuffer progressData;
  73. {
  74. CriticalBlock b(crit);
  75. ForEachItemIn(g, activeGraphs) // NB: 1 for each slavesPerProcess
  76. {
  77. CGraphBase &graph = activeGraphs.item(g);
  78. progressData.append((unsigned)graph.queryJobChannel().queryMyRank()-1);
  79. if (!graph.serializeStats(progressData))
  80. progressData.setLength(progressData.length()-sizeof(unsigned));
  81. }
  82. }
  83. size32_t sz = progressData.length();
  84. if (sz)
  85. {
  86. ThorCompress(progressData, mb, 0x200);
  87. return sz;
  88. }
  89. }
  90. return 0;
  91. }
  92. // ISlaveWatchdog impl.
  93. virtual void startGraph(CGraphBase &graph) override
  94. {
  95. CriticalBlock b(crit);
  96. activeGraphs.append(*LINK(&graph));
  97. StringBuffer str("Watchdog: Start Job ");
  98. LOG(MCthorDetailedDebugInfo, thorJob, "%s", str.append(graph.queryGraphId()).str());
  99. }
  100. virtual void stopGraph(CGraphBase &graph, MemoryBuffer *mb) override
  101. {
  102. CriticalBlock b(crit);
  103. if (NotFound != activeGraphs.find(graph))
  104. {
  105. StringBuffer str("Watchdog: Stop Job ");
  106. LOG(MCthorDetailedDebugInfo, thorJob, "%s", str.append(graph.queryGraphId()).str());
  107. if (mb)
  108. {
  109. DelayedSizeMarker sizeMark(*mb);
  110. gatherData(*mb);
  111. sizeMark.write();
  112. }
  113. activeGraphs.zap(graph);
  114. }
  115. }
  116. virtual void stop() override
  117. {
  118. if (!stopped)
  119. {
  120. #ifdef _WIN32
  121. threaded.adjustPriority(0); // restore to normal before stopping
  122. #endif
  123. stopped = true;
  124. threaded.join();
  125. LOG(MCdebugProgress, thorJob, "Stopped watchdog");
  126. }
  127. }
  128. virtual void debugRequest(MemoryBuffer &msg, const char *request) const override
  129. {
  130. Owned<IPTree> req = createPTreeFromXMLString(request);
  131. StringBuffer edgeString;
  132. req->getProp("@edgeId", edgeString);
  133. // Split edge string in activityId and edgeIdx
  134. const char *pEdge=edgeString.str();
  135. const activity_id actId = (activity_id)_atoi64(pEdge);
  136. if (!actId) return;
  137. while (*pEdge && *pEdge!='_') ++pEdge;
  138. if (!*pEdge) return;
  139. const unsigned edgeIdx = (unsigned)_atoi64(++pEdge);
  140. CriticalBlock b(crit);
  141. ForEachItemIn(g, activeGraphs) // NB: 1 for each slavesPerProcess
  142. {
  143. CGraphBase &graph = activeGraphs.item(g);
  144. CGraphElementBase *element = graph.queryElement(actId);
  145. if (element)
  146. {
  147. CSlaveActivity *activity = (CSlaveActivity*) element->queryActivity();
  148. if (activity) activity->debugRequest(edgeIdx, msg);
  149. }
  150. }
  151. }
  152. // IThreaded
  153. virtual void threadmain() override
  154. {
  155. LOG(MCthorDetailedDebugInfo, thorJob, "Watchdog: thread running");
  156. gatherAndSend(); // send initial data
  157. assertex(HEARTBEAT_INTERVAL>=8);
  158. unsigned count = HEARTBEAT_INTERVAL+getRandom()%8-4;
  159. while (!stopped)
  160. {
  161. Sleep(1000);
  162. if (stopped)
  163. break;
  164. if (count--==0)
  165. {
  166. gatherAndSend();
  167. count = HEARTBEAT_INTERVAL+getRandom()%8-4;
  168. }
  169. }
  170. }
  171. };
  172. class CGraphProgressUDPHandler : public CGraphProgressHandlerBase
  173. {
  174. Owned<ISocket> sock;
  175. public:
  176. CGraphProgressUDPHandler()
  177. {
  178. StringBuffer ipStr;
  179. queryMasterNode().endpoint().getIpText(ipStr);
  180. sock.setown(ISocket::udp_connect(getFixedPort(getMasterPortBase(), TPORT_watchdog),ipStr.str()));
  181. start();
  182. }
  183. virtual void sendData(MemoryBuffer &mb) override
  184. {
  185. HeartBeatPacketHeader hb;
  186. //Cast is to avoid warning about writing to an object with non trivial copy assignment
  187. memcpy(reinterpret_cast<void *>(&hb), mb.toByteArray(), sizeof(HeartBeatPacketHeader));
  188. if (hb.packetSize > UDP_DATA_MAX)
  189. {
  190. IWARNLOG("Progress packet too big! progress lost");
  191. hb.progressSize = 0;
  192. hb.packetSize = sizeof(HeartBeatPacketHeader);
  193. }
  194. sock->write(mb.toByteArray(), mb.length());
  195. }
  196. };
  197. class CGraphProgressMPHandler : public CGraphProgressHandlerBase
  198. {
  199. public:
  200. CGraphProgressMPHandler()
  201. {
  202. start();
  203. }
  204. virtual void sendData(MemoryBuffer &mb) override
  205. {
  206. CMessageBuffer msg;
  207. msg.swapWith(mb);
  208. queryNodeComm().send(msg, 0, MPTAG_THORWATCHDOG);
  209. }
  210. };
  211. ISlaveWatchdog *createProgressHandler(bool udp)
  212. {
  213. if (udp)
  214. return new CGraphProgressUDPHandler();
  215. else
  216. return new CGraphProgressMPHandler();
  217. }