mplog.ipp 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef MPLOG_IPP
  14. #define MPLOG_IPP
  15. #include "mpbuff.hpp"
  16. #include "mpbase.hpp"
  17. #include "mplog.hpp"
  // Identifier type for the cid / pid values that the link, pair and receiver
  // classes in this header take in their constructors and store.
  18. typedef unsigned __int64 MPLogId;
  19. // Base class for the receiver threads used below
  20. class LogMsgReceiverThread : extends Thread
  21. {
  22. public:
  23. LogMsgReceiverThread(char const * name) : Thread(name), done(false) {}
  24. protected:
  25. CMessageBuffer in;
  26. bool done;
  27. };
  28. // PARENT-SIDE CLASSES
  29. // Thread in CLogMsgLinkToChild which receives logs from child
  30. class LogMsgLogReceiverThread : public LogMsgReceiverThread
  31. {
  32. public:
  33. LogMsgLogReceiverThread(MPLogId _cid, INode * _child, bool _isListener) : LogMsgReceiverThread("LogMsgLogReceiver"), childNode(_child), cid(_cid), isListener(_isListener) {}
  34. int run();
  35. void stop();
  36. private:
  37. LogMsg msgBuffer;
  38. INode * childNode;
  39. MPLogId cid;
  40. bool isListener;
  41. };
  42. // Class on managers's list of children which sends new filters to children, and holds thread which receives log messages
  43. class CLogMsgLinkToChild : implements ILogMsgLinkToChild, public CInterface
  44. {
  45. public:
  46. CLogMsgLinkToChild(MPLogId _cid, MPLogId _pid, INode * _childNode, bool isListener, bool _connected = false);
  47. ~CLogMsgLinkToChild();
  48. IMPLEMENT_IINTERFACE;
  49. void sendFilter(ILogMsgFilter * filter) const;
  50. void sendFilterOwn(ILogMsgFilter * filter) const { sendFilter(filter); filter->Release(); }
  51. bool linksTo(SocketEndpoint const & node) const { return childNode->endpoint().equals(node); }
  52. void connect();
  53. void disconnect();
  54. bool queryConnected() const { return connected; }
  55. void markDisconnected() { connected = false; }
  56. private:
  57. Linked<INode> childNode;
  58. MPLogId cid;
  59. MPLogId pid;
  60. Owned<LogMsgLogReceiverThread> receiverThread;
  61. bool connected;
  62. };
  63. // Pairing of MPLogId and ILogMsgLinkToChild * for LogMsgChildReceiverThread's table
  64. class IdLinkToChildPair : public CInterface
  65. {
  66. public:
  67. IdLinkToChildPair(MPLogId _cid, INode const * _node, ILogMsgLinkToChild * _link, bool _isListener) : cid(_cid), node(_node), link(_link), isList(_isListener) {}
  68. MPLogId queryId() const { return cid; }
  69. INode const * queryNode() const { return node; }
  70. ILogMsgLinkToChild * queryLink() const { return link; }
  71. bool isListener() const { return isList; }
  72. private:
  73. MPLogId cid;
  74. INode const * node;
  75. ILogMsgLinkToChild * link;
  76. bool isList;
  77. };
  78. // Thread in CLogMsgManager which receives adoption requests from children
  79. class LogMsgChildReceiverThread : public LogMsgReceiverThread
  80. {
  81. public:
  82. LogMsgChildReceiverThread() : LogMsgReceiverThread("LogMsgChildReceiver"), nextId(0) {}
  83. int run();
  84. void stop();
  85. MPLogId addChildToManager(MPLogId pid, INode * childNode, bool isListener, bool connected);
  86. bool removeChildFromManager(MPLogId cid, bool disconnected);
  87. bool removeChildFromManager(INode const * node, bool disconnected);
  88. private:
  89. aindex_t findChild(MPLogId cid) const;
  90. aindex_t findChild(INode const * node) const;
  91. void doRemoveChildFromManager(aindex_t pos, bool disconnected);
  92. private:
  93. CIArrayOf<IdLinkToChildPair> table;
  94. CriticalSection tableOfChildrenCrit;
  95. MPLogId nextId;
  96. };
  97. // CHILD-SIDE CLASSES
  98. // Thread in LinkToParentLogMsgHandler which receives filters from parent
  99. class LogMsgFilterReceiverThread : public LogMsgReceiverThread
  100. {
  101. public:
  102. LogMsgFilterReceiverThread(MPLogId _pid, INode * _parentNode) : LogMsgReceiverThread("LogMsgFilterReceiver"), handler(0), pid(_pid), parentNode(_parentNode) {}
  103. void setHandler(ILogMsgHandler * _handler) { handler = _handler; }
  104. int run();
  105. void stop();
  106. private:
  107. ILogMsgHandler * handler;
  108. MPLogId pid;
  109. INode * parentNode;
  110. };
  111. // Class on manager's list of handlers which sends log messages to a parent, also holds thread which receives filter changes
  112. class LinkToParentLogMsgHandler : implements ILogMsgHandler, public CInterface
  113. {
  114. public:
  115. LinkToParentLogMsgHandler(MPLogId _cid, MPLogId _pid, INode * _parentNode, bool _connected) : parentNode(_parentNode), cid(_cid), pid(_pid), receiverThread(new LogMsgFilterReceiverThread(_pid, _parentNode)), connected(_connected) { receiverThread->setHandler(this); }
  116. ~LinkToParentLogMsgHandler();
  117. IMPLEMENT_IINTERFACE;
  118. void handleMessage(LogMsg const & msg);
  119. bool needsPrep() const { return false; }
  120. void prep() {}
  121. void addToPTree(IPropertyTree * tree) const;
  122. unsigned queryMessageFields() const { return MSGFIELD_all; }
  123. void setMessageFields(unsigned _fields = MSGFIELD_all) {}
  124. ILogMsgFilter * receiveFilter() const;
  125. void startReceiver() { receiverThread->start(); }
  126. void connect();
  127. void disconnect();
  128. bool queryConnected() const { return connected; }
  129. void markDisconnected() { connected = false; }
  130. bool getLogName(StringBuffer &name) const { return false; }
  131. offset_t getLogPosition(StringBuffer &name) const { return 0; }
  132. private:
  133. Linked<INode> parentNode;
  134. MPLogId cid;
  135. MPLogId pid;
  136. Owned<LogMsgFilterReceiverThread> receiverThread;
  137. bool connected;
  138. };
  139. // Pairing of INode * and LinkToParentLogMsgHandler * for LogMsgParentReceiverThread's table
  140. class IdLinkToParentPair : public CInterface
  141. {
  142. public:
  143. IdLinkToParentPair(MPLogId _pid, INode const * _node, LinkToParentLogMsgHandler * _linkHandler) : pid(_pid), node(_node), linkHandler(_linkHandler) {}
  144. MPLogId queryId() const { return pid; }
  145. INode const * queryNode() const { return node; }
  146. LinkToParentLogMsgHandler * queryLinkHandler() const { return linkHandler; }
  147. private:
  148. MPLogId pid;
  149. INode const * node;
  150. LinkToParentLogMsgHandler * linkHandler;
  151. };
  152. // Thread in CLogMsgManager which receives adoption requests from parents
  153. class LogMsgParentReceiverThread : public LogMsgReceiverThread
  154. {
  155. public:
  156. LogMsgParentReceiverThread() : LogMsgReceiverThread("LogMsgParentReceiver"), nextId(0) {}
  157. int run();
  158. void stop();
  159. MPLogId getNextId();
  160. bool addParentToManager(MPLogId cid, MPLogId pid, INode * parentNode, bool connected);
  161. bool removeParentFromManager(MPLogId pid, bool disconnected);
  162. bool removeParentFromManager(INode const * parentNode, bool disconnected);
  163. private:
  164. aindex_t findParent(MPLogId pid) const;
  165. aindex_t findParent(INode const * node) const;
  166. void doRemoveParentFromManager(aindex_t pos, bool disconnected);
  167. private:
  168. CIArrayOf<IdLinkToParentPair> table;
  169. CriticalSection tableOfParentsCrit;
  170. MPLogId nextId;
  171. };
  172. #endif