mplog.cpp 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #define mp_decl DECL_EXPORT
  14. #include "mplog.hpp"
  15. #include "mplog.ipp"
  16. #include "mpcomm.hpp"
  17. LogMsgChildReceiverThread * childReceiver;
  18. LogMsgParentReceiverThread * parentReceiver;
  19. ILogMsgManager * listener;
  20. // PARENT-SIDE CLASSES
  21. // LogMsgLogReceiverThread
  22. int LogMsgLogReceiverThread::run()
  23. {
  24. while(!done)
  25. {
  26. try
  27. {
  28. if(queryWorldCommunicator().recv(in, childNode, MPTAG_JLOG_CHILD_TO_PARENT))
  29. {
  30. msgBuffer.deserialize(in, true);
  31. if(isListener)
  32. listener->report(msgBuffer);
  33. else
  34. queryLogMsgManager()->report(msgBuffer);
  35. }
  36. }
  37. catch(IException * e)
  38. {
  39. done = true;
  40. CMessageBuffer out;
  41. out.append('D').append(cid);
  42. try
  43. {
  44. queryWorldCommunicator().send(out, queryMyNode(), MPTAG_JLOG_CONNECT_TO_PARENT, MP_ASYNC_SEND);
  45. }
  46. catch(IException * ee)
  47. {
  48. ee->Release();
  49. }
  50. e->Release();
  51. }
  52. }
  53. return 0;
  54. }
  55. void LogMsgLogReceiverThread::stop()
  56. {
  57. if(!done)
  58. {
  59. done = true;
  60. queryWorldCommunicator().cancel(childNode, MPTAG_JLOG_CHILD_TO_PARENT);
  61. join();
  62. }
  63. }
  64. // CLogMsgLinkToChild
  65. CLogMsgLinkToChild::CLogMsgLinkToChild(MPLogId _cid, MPLogId _pid, INode * _childNode, bool isListener, bool _connected)
  66. : childNode(_childNode), cid(_cid), pid(_pid), connected(_connected)
  67. {
  68. receiverThread.setown(new LogMsgLogReceiverThread(cid, childNode, isListener));
  69. receiverThread->start();
  70. }
  71. CLogMsgLinkToChild::~CLogMsgLinkToChild()
  72. {
  73. if(connected) disconnect();
  74. receiverThread->stop();
  75. }
  76. void CLogMsgLinkToChild::sendFilter(ILogMsgFilter * filter) const
  77. {
  78. CMessageBuffer out;
  79. filter->serialize(out, false);
  80. try
  81. {
  82. queryWorldCommunicator().send(out, childNode, MPTAG_JLOG_PARENT_TO_CHILD, MP_ASYNC_SEND);
  83. }
  84. catch(IException * e)
  85. {
  86. e->Release();
  87. }
  88. }
  89. void CLogMsgLinkToChild::connect()
  90. {
  91. CMessageBuffer out;
  92. out.append('A').append(cid);
  93. try
  94. {
  95. queryWorldCommunicator().sendRecv(out, childNode, MPTAG_JLOG_CONNECT_TO_CHILD);
  96. }
  97. catch(IException * e)
  98. {
  99. e->Release();
  100. }
  101. out.read(pid);
  102. connected = true;
  103. }
  104. void CLogMsgLinkToChild::disconnect()
  105. {
  106. CMessageBuffer out;
  107. out.append('D').append(pid);
  108. try
  109. {
  110. queryWorldCommunicator().send(out, childNode, MPTAG_JLOG_CONNECT_TO_CHILD, MP_ASYNC_SEND);
  111. }
  112. catch(IException * e)
  113. {
  114. e->Release();
  115. }
  116. connected = false;
  117. }
  118. // LogMsgChildReceiverThread
  119. int LogMsgChildReceiverThread::run()
  120. {
  121. INode * sender;
  122. char ctrl;
  123. while(!done)
  124. {
  125. try
  126. {
  127. if(queryWorldCommunicator().recv(in, 0, MPTAG_JLOG_CONNECT_TO_PARENT, &sender))
  128. {
  129. in.read(ctrl);
  130. if(ctrl=='A')
  131. {
  132. MPLogId pid;
  133. in.read(pid);
  134. MPLogId cid = addChildToManager(pid, sender, false, true);
  135. StringBuffer buff;
  136. in.clear().append(cid);
  137. queryWorldCommunicator().reply(in, MP_ASYNC_SEND);
  138. }
  139. else if(ctrl=='D')
  140. {
  141. MPLogId cid;
  142. in.read(cid);
  143. removeChildFromManager(cid, true);
  144. }
  145. else
  146. ERRLOG("LogMsgChildReceiverThread::run() : unknown control character on received message");
  147. if(sender) sender->Release();
  148. }
  149. }
  150. catch(IException * e)
  151. {
  152. EXCLOG(e, "LogMsgChildReceiverThread::run()");
  153. e->Release();
  154. }
  155. catch(...)
  156. {
  157. ERRLOG("LogMsgChildReceiverThread::run() : unknown exception");
  158. }
  159. }
  160. return 0;
  161. }
  162. void LogMsgChildReceiverThread::stop()
  163. {
  164. done = true;
  165. queryWorldCommunicator().cancel(0, MPTAG_JLOG_CONNECT_TO_PARENT);
  166. join();
  167. }
  168. MPLogId LogMsgChildReceiverThread::addChildToManager(MPLogId pid, INode * childNode, bool isListener, bool connected)
  169. {
  170. CriticalBlock critBlock(tableOfChildrenCrit);
  171. aindex_t pos = findChild(childNode);
  172. if(pos != NotFound)
  173. {
  174. if(isListener)
  175. table.item(pos).queryLink()->sendFilterOwn(listener->getCompoundFilter());
  176. else
  177. table.item(pos).queryLink()->sendFilterOwn(queryLogMsgManager()->getCompoundFilter());
  178. return false;
  179. }
  180. MPLogId cid = ++nextId;
  181. ILogMsgLinkToChild * link = new CLogMsgLinkToChild(cid, pid, childNode, isListener, connected);
  182. if(!connected) link->connect();
  183. if(isListener)
  184. {
  185. link->sendFilterOwn(listener->getCompoundFilter());
  186. listener->addChildOwn(link);
  187. }
  188. else
  189. {
  190. link->sendFilterOwn(queryLogMsgManager()->getCompoundFilter());
  191. queryLogMsgManager()->addChildOwn(link);
  192. }
  193. table.append(*new IdLinkToChildPair(cid, childNode, link, isListener));
  194. return cid;
  195. }
  196. bool LogMsgChildReceiverThread::removeChildFromManager(MPLogId cid, bool disconnected)
  197. {
  198. CriticalBlock critBlock(tableOfChildrenCrit);
  199. aindex_t pos = findChild(cid);
  200. if(pos == NotFound) return false;
  201. doRemoveChildFromManager(pos, disconnected);
  202. return true;
  203. }
  204. bool LogMsgChildReceiverThread::removeChildFromManager(INode const * node, bool disconnected)
  205. {
  206. StringBuffer buff;
  207. CriticalBlock critBlock(tableOfChildrenCrit);
  208. aindex_t pos = findChild(node);
  209. if(pos == NotFound) return false;
  210. doRemoveChildFromManager(pos, disconnected);
  211. return true;
  212. }
  213. void LogMsgChildReceiverThread::doRemoveChildFromManager(aindex_t pos, bool disconnected)
  214. {
  215. ILogMsgLinkToChild * link = table.item(pos).queryLink();
  216. bool isListener = table.item(pos).isListener();
  217. table.remove(pos);
  218. if(disconnected) link->markDisconnected();
  219. if(isListener)
  220. listener->removeChild(link);
  221. else
  222. queryLogMsgManager()->removeChild(link);
  223. }
  224. aindex_t LogMsgChildReceiverThread::findChild(MPLogId cid) const
  225. {
  226. ForEachItemIn(i, table)
  227. if(table.item(i).queryId() == cid) return i;
  228. return NotFound;
  229. }
  230. aindex_t LogMsgChildReceiverThread::findChild(INode const * node) const
  231. {
  232. ForEachItemIn(i, table)
  233. if(table.item(i).queryNode()->equals(node)) return i;
  234. return NotFound;
  235. }
  236. // PARENT-SIDE HELPER FUNCTIONS
  237. bool connectLogMsgManagerToChild(INode * childNode)
  238. {
  239. assertex(childReceiver);
  240. return (childReceiver->addChildToManager(0, childNode, false, false) != 0);
  241. }
  242. bool connectLogMsgManagerToChildOwn(INode * childNode)
  243. {
  244. bool ret = connectLogMsgManagerToChild(childNode);
  245. childNode->Release();
  246. return ret;
  247. }
  248. bool disconnectLogMsgManagerFromChild(INode * childNode)
  249. {
  250. return childReceiver->removeChildFromManager(childNode, false);
  251. }
  252. bool disconnectLogMsgManagerFromChildOwn(INode * childNode)
  253. {
  254. bool ret = disconnectLogMsgManagerFromChild(childNode);
  255. childNode->Release();
  256. return ret;
  257. }
  258. void startLogMsgChildReceiver()
  259. {
  260. childReceiver = new LogMsgChildReceiverThread();
  261. childReceiver->startRelease();
  262. }
  263. // CHILD-SIDE CLASSES
  264. // LogMsgFilterReceiverThread
  265. int LogMsgFilterReceiverThread::run()
  266. {
  267. assertex(handler);
  268. while(!done)
  269. {
  270. try
  271. {
  272. if(queryWorldCommunicator().recv(in, parentNode, MPTAG_JLOG_PARENT_TO_CHILD))
  273. queryLogMsgManager()->changeMonitorFilterOwn(handler, getDeserializedLogMsgFilter(in));
  274. }
  275. catch(IException * e)
  276. {
  277. if (!done) {
  278. done = true;
  279. CMessageBuffer out;
  280. out.append('D').append(pid);
  281. try
  282. {
  283. queryWorldCommunicator().send(out, queryMyNode(), MPTAG_JLOG_CONNECT_TO_CHILD, MP_ASYNC_SEND);
  284. }
  285. catch(IException * ee)
  286. {
  287. ee->Release();
  288. }
  289. }
  290. e->Release();
  291. }
  292. }
  293. return 0;
  294. }
  295. void LogMsgFilterReceiverThread::stop()
  296. {
  297. if(!done)
  298. {
  299. done = true;
  300. queryWorldCommunicator().cancel(parentNode, MPTAG_JLOG_PARENT_TO_CHILD);
  301. Sleep(10); // swap
  302. if (!join(1000*60*5)) // should be pretty instant
  303. WARNLOG("LogMsgFilterReceiverThread::stop timed out");
  304. }
  305. }
  306. // LinkToParentLogMsgHandler
  307. void LinkToParentLogMsgHandler::handleMessage(const LogMsg & msg) const
  308. {
  309. CMessageBuffer out;
  310. msg.serialize(out);
  311. try
  312. {
  313. queryWorldCommunicator().send(out, parentNode, MPTAG_JLOG_CHILD_TO_PARENT, MP_ASYNC_SEND);
  314. }
  315. catch(IException * e)
  316. {
  317. //logging this exception would be a bad idea...
  318. e->Release();
  319. }
  320. }
  321. void LinkToParentLogMsgHandler::addToPTree(IPropertyTree * tree) const
  322. {
  323. IPropertyTree * handlerTree = createPTree(ipt_caseInsensitive);
  324. handlerTree->setProp("@type", "linktoparent");
  325. StringBuffer buff;
  326. parentNode->endpoint().getUrlStr(buff);
  327. handlerTree->setProp("@url", buff.str());
  328. tree->addPropTree("handler", handlerTree);
  329. }
  330. ILogMsgFilter * LinkToParentLogMsgHandler::receiveFilter() const
  331. {
  332. CMessageBuffer in;
  333. if(queryWorldCommunicator().recv(in, parentNode, MPTAG_JLOG_PARENT_TO_CHILD))
  334. return getDeserializedLogMsgFilter(in);
  335. else
  336. return getPassNoneLogMsgFilter(); /* Print some kind of warning here? */
  337. }
  338. void LinkToParentLogMsgHandler::connect()
  339. {
  340. CMessageBuffer out;
  341. out.append('A').append(pid);
  342. try
  343. {
  344. queryWorldCommunicator().sendRecv(out, parentNode, MPTAG_JLOG_CONNECT_TO_PARENT);
  345. }
  346. catch(IException * e)
  347. {
  348. e->Release();
  349. }
  350. out.read(cid);
  351. connected = true;
  352. }
  353. void LinkToParentLogMsgHandler::disconnect()
  354. {
  355. CMessageBuffer out;
  356. out.append('D').append(cid);
  357. try
  358. {
  359. queryWorldCommunicator().send(out, parentNode, MPTAG_JLOG_CONNECT_TO_PARENT, MP_ASYNC_SEND);
  360. }
  361. catch(IException * e)
  362. {
  363. e->Release();
  364. }
  365. connected = false;
  366. }
  367. // LogMsgParentReceiverThread
  368. int LogMsgParentReceiverThread::run()
  369. {
  370. char ctrl;
  371. INode * sender;
  372. while(!done)
  373. {
  374. try
  375. {
  376. if(queryWorldCommunicator().recv(in, 0, MPTAG_JLOG_CONNECT_TO_CHILD, &sender))
  377. {
  378. in.read(ctrl);
  379. if(ctrl=='A')
  380. {
  381. if (in.getReplyTag()!=TAG_NULL) { // protect against old clients crashing Dali
  382. MPLogId cid;
  383. in.read(cid);
  384. MPLogId pid = getNextId();
  385. StringBuffer buff;
  386. in.clear().append(pid);
  387. queryWorldCommunicator().reply(in, MP_ASYNC_SEND);
  388. addParentToManager(cid, pid, sender, true);
  389. }
  390. }
  391. else if(ctrl=='D')
  392. {
  393. MPLogId pid;
  394. in.read(pid);
  395. removeParentFromManager(pid, true);
  396. }
  397. else
  398. ERRLOG("LogMsgParentReceiverThread::run() : unknown control character on received message");
  399. if(sender) sender->Release();
  400. }
  401. }
  402. catch(IException * e)
  403. {
  404. EXCLOG(e, "LogMsgParentReceiverThread::run()");
  405. e->Release();
  406. }
  407. catch(...)
  408. {
  409. ERRLOG("LogMsgParentReceiverThread::run() : unknown exception");
  410. }
  411. }
  412. return 0;
  413. }
  414. void LogMsgParentReceiverThread::stop()
  415. {
  416. done = true;
  417. queryWorldCommunicator().cancel(0, MPTAG_JLOG_CONNECT_TO_CHILD);
  418. join();
  419. }
  420. MPLogId LogMsgParentReceiverThread::getNextId()
  421. {
  422. CriticalBlock critBlock(tableOfParentsCrit);
  423. return ++nextId;
  424. }
  425. bool LogMsgParentReceiverThread::addParentToManager(MPLogId cid, MPLogId pid, INode * parentNode, bool connected)
  426. {
  427. CriticalBlock critBlock(tableOfParentsCrit);
  428. if(findParent(parentNode) != NotFound) return false;
  429. Owned<LinkToParentLogMsgHandler> linkHandler(new LinkToParentLogMsgHandler(cid, pid, parentNode, connected));
  430. if(!connected) linkHandler->connect();
  431. Owned<ILogMsgFilter> filter(linkHandler->receiveFilter());
  432. queryLogMsgManager()->addMonitor(linkHandler, filter);
  433. table.append(*new IdLinkToParentPair(pid, parentNode, linkHandler));
  434. linkHandler->startReceiver();
  435. return true;
  436. }
  437. bool LogMsgParentReceiverThread::removeParentFromManager(MPLogId pid, bool disconnected)
  438. {
  439. CriticalBlock critBlock(tableOfParentsCrit);
  440. aindex_t pos = findParent(pid);
  441. if(pos == NotFound) return false;
  442. doRemoveParentFromManager(pos, disconnected);
  443. return true;
  444. }
  445. bool LogMsgParentReceiverThread::removeParentFromManager(const INode * parentNode, bool disconnected)
  446. {
  447. CriticalBlock critBlock(tableOfParentsCrit);
  448. aindex_t pos = findParent(parentNode);
  449. if(pos == NotFound) return false;
  450. doRemoveParentFromManager(pos, disconnected);
  451. return true;
  452. }
  453. void LogMsgParentReceiverThread::doRemoveParentFromManager(aindex_t pos, bool disconnected)
  454. {
  455. LinkToParentLogMsgHandler * linkHandler = table.item(pos).queryLinkHandler();
  456. table.remove(pos);
  457. if(disconnected) linkHandler->markDisconnected();
  458. queryLogMsgManager()->removeMonitor(linkHandler);
  459. }
  460. aindex_t LogMsgParentReceiverThread::findParent(MPLogId pid) const
  461. {
  462. ForEachItemIn(i, table)
  463. if(table.item(i).queryId() == pid) return i;
  464. return NotFound;
  465. }
  466. aindex_t LogMsgParentReceiverThread::findParent(const INode * node) const
  467. {
  468. ForEachItemIn(i, table)
  469. if(table.item(i).queryNode()->equals(node)) return i;
  470. return NotFound;
  471. }
  472. // CHILD-SIDE HELPER FUNCTIONS
  473. bool connectLogMsgManagerToParent(INode * parentNode)
  474. {
  475. assertex(parentReceiver);
  476. MPLogId pid = parentReceiver->getNextId();
  477. return parentReceiver->addParentToManager(0, pid, parentNode, false);
  478. }
  479. bool connectLogMsgManagerToParentOwn(INode * parentNode)
  480. {
  481. bool ret = connectLogMsgManagerToParent(parentNode);
  482. parentNode->Release();
  483. return ret;
  484. }
  485. bool disconnectLogMsgManagerFromParent(INode * parentNode)
  486. {
  487. return parentReceiver->removeParentFromManager(parentNode, false);
  488. }
  489. bool disconnectLogMsgManagerFromParentOwn(INode * parentNode)
  490. {
  491. bool ret = disconnectLogMsgManagerFromParent(parentNode);
  492. parentNode->Release();
  493. return ret;
  494. }
  495. void startLogMsgParentReceiver()
  496. {
  497. parentReceiver = new LogMsgParentReceiverThread();
  498. parentReceiver->startRelease();
  499. }
  500. // MISC. HELPER FUNCTION
  501. bool isMPLogMsgMonitor(ILogMsgHandler * handler)
  502. {
  503. return (dynamic_cast<LinkToParentLogMsgHandler *>(handler) != NULL);
  504. }
  505. void stopLogMsgReceivers()
  506. {
  507. if(parentReceiver)
  508. {
  509. parentReceiver->Link();
  510. parentReceiver->stop();
  511. parentReceiver->Release();
  512. }
  513. parentReceiver = 0;
  514. if(childReceiver)
  515. {
  516. childReceiver->Link();
  517. childReceiver->stop();
  518. childReceiver->Release();
  519. }
  520. childReceiver = 0;
  521. queryLogMsgManager()->removeMonitorsMatching(isMPLogMsgMonitor);
  522. queryLogMsgManager()->removeAllChildren();
  523. }
  524. // LISTENER HELPER FUNCTIONS
  525. ILogMsgListener * startLogMsgListener()
  526. {
  527. if(!listener)
  528. listener = createLogMsgManager();
  529. return listener;
  530. }
  531. void stopLogMsgListener()
  532. {
  533. if(listener)
  534. {
  535. listener->Release();
  536. listener = 0;
  537. }
  538. }
  539. bool connectLogMsgListenerToChild(INode * childNode)
  540. {
  541. assertex(childReceiver);
  542. return (childReceiver->addChildToManager(0, childNode, true, false) != 0);
  543. }
  544. bool connectLogMsgListenerToChildOwn(INode * childNode)
  545. {
  546. bool ret = connectLogMsgListenerToChild(childNode);
  547. childNode->Release();
  548. return ret;
  549. }
  550. bool disconnectLogMsgListenerFromChild(INode * childNode)
  551. {
  552. return childReceiver->removeChildFromManager(childNode, false);
  553. }
  554. bool disconnectLogMsgListenerFromChildOwn(INode * childNode)
  555. {
  556. bool ret = disconnectLogMsgListenerFromChild(childNode);
  557. childNode->Release();
  558. return ret;
  559. }