mplog.cpp 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #define mp_decl DECL_EXPORT
  14. #include "mplog.hpp"
  15. #include "mplog.ipp"
  16. #include "mpcomm.hpp"
  17. LogMsgChildReceiverThread * childReceiver;
  18. LogMsgParentReceiverThread * parentReceiver;
  19. ILogMsgManager * listener;
  20. // PARENT-SIDE CLASSES
  21. // LogMsgLogReceiverThread
  22. int LogMsgLogReceiverThread::run()
  23. {
  24. while(!done)
  25. {
  26. try
  27. {
  28. if(queryWorldCommunicator().recv(in, childNode, MPTAG_JLOG_CHILD_TO_PARENT))
  29. {
  30. msgBuffer.deserialize(in, true);
  31. if(isListener)
  32. listener->report(msgBuffer);
  33. else
  34. queryLogMsgManager()->report(msgBuffer);
  35. }
  36. }
  37. catch(IException * e)
  38. {
  39. done = true;
  40. CMessageBuffer out;
  41. out.append('D').append(cid);
  42. try
  43. {
  44. queryWorldCommunicator().send(out, queryMyNode(), MPTAG_JLOG_CONNECT_TO_PARENT, MP_ASYNC_SEND);
  45. }
  46. catch(IException * ee)
  47. {
  48. ee->Release();
  49. }
  50. e->Release();
  51. }
  52. }
  53. return 0;
  54. }
  55. void LogMsgLogReceiverThread::stop()
  56. {
  57. if(!done)
  58. {
  59. done = true;
  60. queryWorldCommunicator().cancel(childNode, MPTAG_JLOG_CHILD_TO_PARENT);
  61. join();
  62. }
  63. }
  64. // CLogMsgLinkToChild
  65. CLogMsgLinkToChild::CLogMsgLinkToChild(MPLogId _cid, MPLogId _pid, INode * _childNode, bool isListener, bool _connected)
  66. : childNode(_childNode), cid(_cid), pid(_pid), connected(_connected)
  67. {
  68. receiverThread.setown(new LogMsgLogReceiverThread(cid, childNode, isListener));
  69. receiverThread->start();
  70. }
  71. CLogMsgLinkToChild::~CLogMsgLinkToChild()
  72. {
  73. try
  74. {
  75. if(connected)
  76. disconnect();
  77. receiverThread->stop();
  78. }
  79. catch (IException * e)
  80. {
  81. //Likely that the logging is closing down, so silently discard any exceptions
  82. e->Release();
  83. }
  84. }
  85. void CLogMsgLinkToChild::sendFilter(ILogMsgFilter * filter) const
  86. {
  87. CMessageBuffer out;
  88. filter->serialize(out, false);
  89. try
  90. {
  91. queryWorldCommunicator().send(out, childNode, MPTAG_JLOG_PARENT_TO_CHILD, MP_ASYNC_SEND);
  92. }
  93. catch(IException * e)
  94. {
  95. e->Release();
  96. }
  97. }
  98. void CLogMsgLinkToChild::connect()
  99. {
  100. CMessageBuffer out;
  101. out.append('A').append(cid);
  102. try
  103. {
  104. queryWorldCommunicator().sendRecv(out, childNode, MPTAG_JLOG_CONNECT_TO_CHILD);
  105. }
  106. catch(IException * e)
  107. {
  108. e->Release();
  109. }
  110. out.read(pid);
  111. connected = true;
  112. }
  113. void CLogMsgLinkToChild::disconnect()
  114. {
  115. CMessageBuffer out;
  116. out.append('D').append(pid);
  117. try
  118. {
  119. queryWorldCommunicator().send(out, childNode, MPTAG_JLOG_CONNECT_TO_CHILD, MP_ASYNC_SEND);
  120. }
  121. catch(IException * e)
  122. {
  123. e->Release();
  124. }
  125. connected = false;
  126. }
  127. // LogMsgChildReceiverThread
  128. int LogMsgChildReceiverThread::run()
  129. {
  130. INode * sender;
  131. char ctrl;
  132. while(!done)
  133. {
  134. try
  135. {
  136. if(queryWorldCommunicator().recv(in, 0, MPTAG_JLOG_CONNECT_TO_PARENT, &sender))
  137. {
  138. in.read(ctrl);
  139. if(ctrl=='A')
  140. {
  141. MPLogId pid;
  142. in.read(pid);
  143. MPLogId cid = addChildToManager(pid, sender, false, true);
  144. StringBuffer buff;
  145. in.clear().append(cid);
  146. queryWorldCommunicator().reply(in, MP_ASYNC_SEND);
  147. }
  148. else if(ctrl=='D')
  149. {
  150. MPLogId cid;
  151. in.read(cid);
  152. removeChildFromManager(cid, true);
  153. }
  154. else
  155. ERRLOG("LogMsgChildReceiverThread::run() : unknown control character on received message");
  156. if(sender) sender->Release();
  157. }
  158. }
  159. catch(IException * e)
  160. {
  161. EXCLOG(e, "LogMsgChildReceiverThread::run()");
  162. e->Release();
  163. }
  164. catch(...)
  165. {
  166. ERRLOG("LogMsgChildReceiverThread::run() : unknown exception");
  167. }
  168. }
  169. return 0;
  170. }
  171. void LogMsgChildReceiverThread::stop()
  172. {
  173. done = true;
  174. queryWorldCommunicator().cancel(0, MPTAG_JLOG_CONNECT_TO_PARENT);
  175. join();
  176. }
  177. MPLogId LogMsgChildReceiverThread::addChildToManager(MPLogId pid, INode * childNode, bool isListener, bool connected)
  178. {
  179. CriticalBlock critBlock(tableOfChildrenCrit);
  180. aindex_t pos = findChild(childNode);
  181. if(pos != NotFound)
  182. {
  183. if(isListener)
  184. table.item(pos).queryLink()->sendFilterOwn(listener->getCompoundFilter());
  185. else
  186. table.item(pos).queryLink()->sendFilterOwn(queryLogMsgManager()->getCompoundFilter());
  187. return false;
  188. }
  189. MPLogId cid = ++nextId;
  190. ILogMsgLinkToChild * link = new CLogMsgLinkToChild(cid, pid, childNode, isListener, connected);
  191. if(!connected) link->connect();
  192. if(isListener)
  193. {
  194. link->sendFilterOwn(listener->getCompoundFilter());
  195. listener->addChildOwn(link);
  196. }
  197. else
  198. {
  199. link->sendFilterOwn(queryLogMsgManager()->getCompoundFilter());
  200. queryLogMsgManager()->addChildOwn(link);
  201. }
  202. table.append(*new IdLinkToChildPair(cid, childNode, link, isListener));
  203. return cid;
  204. }
  205. bool LogMsgChildReceiverThread::removeChildFromManager(MPLogId cid, bool disconnected)
  206. {
  207. CriticalBlock critBlock(tableOfChildrenCrit);
  208. aindex_t pos = findChild(cid);
  209. if(pos == NotFound) return false;
  210. doRemoveChildFromManager(pos, disconnected);
  211. return true;
  212. }
  213. bool LogMsgChildReceiverThread::removeChildFromManager(INode const * node, bool disconnected)
  214. {
  215. StringBuffer buff;
  216. CriticalBlock critBlock(tableOfChildrenCrit);
  217. aindex_t pos = findChild(node);
  218. if(pos == NotFound) return false;
  219. doRemoveChildFromManager(pos, disconnected);
  220. return true;
  221. }
  222. void LogMsgChildReceiverThread::doRemoveChildFromManager(aindex_t pos, bool disconnected)
  223. {
  224. ILogMsgLinkToChild * link = table.item(pos).queryLink();
  225. bool isListener = table.item(pos).isListener();
  226. table.remove(pos);
  227. if(disconnected) link->markDisconnected();
  228. if(isListener)
  229. listener->removeChild(link);
  230. else
  231. queryLogMsgManager()->removeChild(link);
  232. }
  233. aindex_t LogMsgChildReceiverThread::findChild(MPLogId cid) const
  234. {
  235. ForEachItemIn(i, table)
  236. if(table.item(i).queryId() == cid) return i;
  237. return NotFound;
  238. }
  239. aindex_t LogMsgChildReceiverThread::findChild(INode const * node) const
  240. {
  241. ForEachItemIn(i, table)
  242. if(table.item(i).queryNode()->equals(node)) return i;
  243. return NotFound;
  244. }
  245. // PARENT-SIDE HELPER FUNCTIONS
  246. bool connectLogMsgManagerToChild(INode * childNode)
  247. {
  248. assertex(childReceiver);
  249. return (childReceiver->addChildToManager(0, childNode, false, false) != 0);
  250. }
  251. bool connectLogMsgManagerToChildOwn(INode * childNode)
  252. {
  253. bool ret = connectLogMsgManagerToChild(childNode);
  254. childNode->Release();
  255. return ret;
  256. }
  257. bool disconnectLogMsgManagerFromChild(INode * childNode)
  258. {
  259. return childReceiver->removeChildFromManager(childNode, false);
  260. }
  261. bool disconnectLogMsgManagerFromChildOwn(INode * childNode)
  262. {
  263. bool ret = disconnectLogMsgManagerFromChild(childNode);
  264. childNode->Release();
  265. return ret;
  266. }
  267. void startLogMsgChildReceiver()
  268. {
  269. childReceiver = new LogMsgChildReceiverThread();
  270. childReceiver->startRelease();
  271. }
  272. // CHILD-SIDE CLASSES
  273. // LogMsgFilterReceiverThread
  274. int LogMsgFilterReceiverThread::run()
  275. {
  276. assertex(handler);
  277. while(!done)
  278. {
  279. try
  280. {
  281. if(queryWorldCommunicator().recv(in, parentNode, MPTAG_JLOG_PARENT_TO_CHILD))
  282. queryLogMsgManager()->changeMonitorFilterOwn(handler, getDeserializedLogMsgFilter(in));
  283. }
  284. catch(IException * e)
  285. {
  286. if (!done) {
  287. done = true;
  288. CMessageBuffer out;
  289. out.append('D').append(pid);
  290. try
  291. {
  292. queryWorldCommunicator().send(out, queryMyNode(), MPTAG_JLOG_CONNECT_TO_CHILD, MP_ASYNC_SEND);
  293. }
  294. catch(IException * ee)
  295. {
  296. ee->Release();
  297. }
  298. }
  299. e->Release();
  300. }
  301. }
  302. return 0;
  303. }
  304. void LogMsgFilterReceiverThread::stop()
  305. {
  306. if(!done)
  307. {
  308. done = true;
  309. queryWorldCommunicator().cancel(parentNode, MPTAG_JLOG_PARENT_TO_CHILD);
  310. Sleep(10); // swap
  311. if (!join(1000*60*5)) // should be pretty instant
  312. WARNLOG("LogMsgFilterReceiverThread::stop timed out");
  313. }
  314. }
  315. // LinkToParentLogMsgHandler
  316. LinkToParentLogMsgHandler::~LinkToParentLogMsgHandler()
  317. {
  318. try
  319. {
  320. if(connected)
  321. disconnect();
  322. receiverThread->stop();
  323. }
  324. catch (IException * e)
  325. {
  326. //Likely that the logging is closing down, so silently discard any exceptions
  327. e->Release();
  328. }
  329. }
  330. void LinkToParentLogMsgHandler::handleMessage(const LogMsg & msg)
  331. {
  332. CMessageBuffer out;
  333. msg.serialize(out);
  334. try
  335. {
  336. queryWorldCommunicator().send(out, parentNode, MPTAG_JLOG_CHILD_TO_PARENT, MP_ASYNC_SEND);
  337. }
  338. catch(IException * e)
  339. {
  340. //logging this exception would be a bad idea...
  341. e->Release();
  342. }
  343. }
  344. void LinkToParentLogMsgHandler::addToPTree(IPropertyTree * tree) const
  345. {
  346. IPropertyTree * handlerTree = createPTree(ipt_caseInsensitive);
  347. handlerTree->setProp("@type", "linktoparent");
  348. StringBuffer buff;
  349. parentNode->endpoint().getUrlStr(buff);
  350. handlerTree->setProp("@url", buff.str());
  351. tree->addPropTree("handler", handlerTree);
  352. }
  353. ILogMsgFilter * LinkToParentLogMsgHandler::receiveFilter() const
  354. {
  355. CMessageBuffer in;
  356. if(queryWorldCommunicator().recv(in, parentNode, MPTAG_JLOG_PARENT_TO_CHILD))
  357. return getDeserializedLogMsgFilter(in);
  358. else
  359. return getPassNoneLogMsgFilter(); /* Print some kind of warning here? */
  360. }
  361. void LinkToParentLogMsgHandler::connect()
  362. {
  363. CMessageBuffer out;
  364. out.append('A').append(pid);
  365. try
  366. {
  367. queryWorldCommunicator().sendRecv(out, parentNode, MPTAG_JLOG_CONNECT_TO_PARENT);
  368. }
  369. catch(IException * e)
  370. {
  371. e->Release();
  372. }
  373. out.read(cid);
  374. connected = true;
  375. }
  376. void LinkToParentLogMsgHandler::disconnect()
  377. {
  378. CMessageBuffer out;
  379. out.append('D').append(cid);
  380. try
  381. {
  382. queryWorldCommunicator().send(out, parentNode, MPTAG_JLOG_CONNECT_TO_PARENT, MP_ASYNC_SEND);
  383. }
  384. catch(IException * e)
  385. {
  386. e->Release();
  387. }
  388. connected = false;
  389. }
  390. // LogMsgParentReceiverThread
  391. int LogMsgParentReceiverThread::run()
  392. {
  393. char ctrl;
  394. INode * sender;
  395. while(!done)
  396. {
  397. try
  398. {
  399. if(queryWorldCommunicator().recv(in, 0, MPTAG_JLOG_CONNECT_TO_CHILD, &sender))
  400. {
  401. in.read(ctrl);
  402. if(ctrl=='A')
  403. {
  404. if (in.getReplyTag()!=TAG_NULL) { // protect against old clients crashing Dali
  405. MPLogId cid;
  406. in.read(cid);
  407. MPLogId pid = getNextId();
  408. StringBuffer buff;
  409. in.clear().append(pid);
  410. queryWorldCommunicator().reply(in, MP_ASYNC_SEND);
  411. addParentToManager(cid, pid, sender, true);
  412. }
  413. }
  414. else if(ctrl=='D')
  415. {
  416. MPLogId pid;
  417. in.read(pid);
  418. removeParentFromManager(pid, true);
  419. }
  420. else
  421. ERRLOG("LogMsgParentReceiverThread::run() : unknown control character on received message");
  422. if(sender) sender->Release();
  423. }
  424. }
  425. catch(IException * e)
  426. {
  427. EXCLOG(e, "LogMsgParentReceiverThread::run()");
  428. e->Release();
  429. }
  430. catch(...)
  431. {
  432. ERRLOG("LogMsgParentReceiverThread::run() : unknown exception");
  433. }
  434. }
  435. return 0;
  436. }
  437. void LogMsgParentReceiverThread::stop()
  438. {
  439. done = true;
  440. queryWorldCommunicator().cancel(0, MPTAG_JLOG_CONNECT_TO_CHILD);
  441. join();
  442. }
  443. MPLogId LogMsgParentReceiverThread::getNextId()
  444. {
  445. CriticalBlock critBlock(tableOfParentsCrit);
  446. return ++nextId;
  447. }
  448. bool LogMsgParentReceiverThread::addParentToManager(MPLogId cid, MPLogId pid, INode * parentNode, bool connected)
  449. {
  450. CriticalBlock critBlock(tableOfParentsCrit);
  451. if(findParent(parentNode) != NotFound) return false;
  452. Owned<LinkToParentLogMsgHandler> linkHandler(new LinkToParentLogMsgHandler(cid, pid, parentNode, connected));
  453. if(!connected) linkHandler->connect();
  454. Owned<ILogMsgFilter> filter(linkHandler->receiveFilter());
  455. queryLogMsgManager()->addMonitor(linkHandler, filter);
  456. table.append(*new IdLinkToParentPair(pid, parentNode, linkHandler));
  457. linkHandler->startReceiver();
  458. return true;
  459. }
  460. bool LogMsgParentReceiverThread::removeParentFromManager(MPLogId pid, bool disconnected)
  461. {
  462. CriticalBlock critBlock(tableOfParentsCrit);
  463. aindex_t pos = findParent(pid);
  464. if(pos == NotFound) return false;
  465. doRemoveParentFromManager(pos, disconnected);
  466. return true;
  467. }
  468. bool LogMsgParentReceiverThread::removeParentFromManager(const INode * parentNode, bool disconnected)
  469. {
  470. CriticalBlock critBlock(tableOfParentsCrit);
  471. aindex_t pos = findParent(parentNode);
  472. if(pos == NotFound) return false;
  473. doRemoveParentFromManager(pos, disconnected);
  474. return true;
  475. }
  476. void LogMsgParentReceiverThread::doRemoveParentFromManager(aindex_t pos, bool disconnected)
  477. {
  478. LinkToParentLogMsgHandler * linkHandler = table.item(pos).queryLinkHandler();
  479. table.remove(pos);
  480. if(disconnected) linkHandler->markDisconnected();
  481. queryLogMsgManager()->removeMonitor(linkHandler);
  482. }
  483. aindex_t LogMsgParentReceiverThread::findParent(MPLogId pid) const
  484. {
  485. ForEachItemIn(i, table)
  486. if(table.item(i).queryId() == pid) return i;
  487. return NotFound;
  488. }
  489. aindex_t LogMsgParentReceiverThread::findParent(const INode * node) const
  490. {
  491. ForEachItemIn(i, table)
  492. if(table.item(i).queryNode()->equals(node)) return i;
  493. return NotFound;
  494. }
  495. // CHILD-SIDE HELPER FUNCTIONS
  496. bool connectLogMsgManagerToParent(INode * parentNode)
  497. {
  498. assertex(parentReceiver);
  499. MPLogId pid = parentReceiver->getNextId();
  500. return parentReceiver->addParentToManager(0, pid, parentNode, false);
  501. }
  502. bool connectLogMsgManagerToParentOwn(INode * parentNode)
  503. {
  504. bool ret = connectLogMsgManagerToParent(parentNode);
  505. parentNode->Release();
  506. return ret;
  507. }
  508. bool disconnectLogMsgManagerFromParent(INode * parentNode)
  509. {
  510. return parentReceiver->removeParentFromManager(parentNode, false);
  511. }
  512. bool disconnectLogMsgManagerFromParentOwn(INode * parentNode)
  513. {
  514. bool ret = disconnectLogMsgManagerFromParent(parentNode);
  515. parentNode->Release();
  516. return ret;
  517. }
  518. void startLogMsgParentReceiver()
  519. {
  520. parentReceiver = new LogMsgParentReceiverThread();
  521. parentReceiver->startRelease();
  522. }
  523. // MISC. HELPER FUNCTION
  524. bool isMPLogMsgMonitor(ILogMsgHandler * handler)
  525. {
  526. return (dynamic_cast<LinkToParentLogMsgHandler *>(handler) != NULL);
  527. }
  528. void stopLogMsgReceivers()
  529. {
  530. if(parentReceiver)
  531. {
  532. parentReceiver->Link();
  533. parentReceiver->stop();
  534. parentReceiver->Release();
  535. }
  536. parentReceiver = 0;
  537. if(childReceiver)
  538. {
  539. childReceiver->Link();
  540. childReceiver->stop();
  541. childReceiver->Release();
  542. }
  543. childReceiver = 0;
  544. queryLogMsgManager()->removeMonitorsMatching(isMPLogMsgMonitor);
  545. queryLogMsgManager()->removeAllChildren();
  546. }
  547. // LISTENER HELPER FUNCTIONS
  548. ILogMsgListener * startLogMsgListener()
  549. {
  550. if(!listener)
  551. listener = createLogMsgManager();
  552. return listener;
  553. }
  554. void stopLogMsgListener()
  555. {
  556. if(listener)
  557. {
  558. listener->Release();
  559. listener = 0;
  560. }
  561. }
  562. bool connectLogMsgListenerToChild(INode * childNode)
  563. {
  564. assertex(childReceiver);
  565. return (childReceiver->addChildToManager(0, childNode, true, false) != 0);
  566. }
  567. bool connectLogMsgListenerToChildOwn(INode * childNode)
  568. {
  569. bool ret = connectLogMsgListenerToChild(childNode);
  570. childNode->Release();
  571. return ret;
  572. }
  573. bool disconnectLogMsgListenerFromChild(INode * childNode)
  574. {
  575. return childReceiver->removeChildFromManager(childNode, false);
  576. }
  577. bool disconnectLogMsgListenerFromChildOwn(INode * childNode)
  578. {
  579. bool ret = disconnectLogMsgListenerFromChild(childNode);
  580. childNode->Release();
  581. return ret;
  582. }