123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #define mp_decl DECL_EXPORT
- #include "mplog.hpp"
- #include "mplog.ipp"
- #include "mpcomm.hpp"
- LogMsgChildReceiverThread * childReceiver;
- LogMsgParentReceiverThread * parentReceiver;
- ILogMsgManager * listener;
- // PARENT-SIDE CLASSES
- // LogMsgLogReceiverThread
- int LogMsgLogReceiverThread::run()
- {
- while(!done)
- {
- try
- {
- if(queryWorldCommunicator().recv(in, childNode, MPTAG_JLOG_CHILD_TO_PARENT))
- {
- msgBuffer.deserialize(in, true);
- if(isListener)
- listener->report(msgBuffer);
- else
- queryLogMsgManager()->report(msgBuffer);
- }
- }
- catch(IException * e)
- {
- done = true;
- CMessageBuffer out;
- out.append('D').append(cid);
- try
- {
- queryWorldCommunicator().send(out, queryMyNode(), MPTAG_JLOG_CONNECT_TO_PARENT, MP_ASYNC_SEND);
- }
- catch(IException * ee)
- {
- ee->Release();
- }
- e->Release();
- }
- }
- return 0;
- }
- void LogMsgLogReceiverThread::stop()
- {
- if(!done)
- {
- done = true;
- queryWorldCommunicator().cancel(childNode, MPTAG_JLOG_CHILD_TO_PARENT);
- join();
- }
- }
- // CLogMsgLinkToChild
- CLogMsgLinkToChild::CLogMsgLinkToChild(MPLogId _cid, MPLogId _pid, INode * _childNode, bool isListener, bool _connected)
- : childNode(_childNode), cid(_cid), pid(_pid), connected(_connected)
- {
- receiverThread.setown(new LogMsgLogReceiverThread(cid, childNode, isListener));
- receiverThread->start();
- }
- CLogMsgLinkToChild::~CLogMsgLinkToChild()
- {
- try
- {
- if(connected)
- disconnect();
- receiverThread->stop();
- }
- catch (IException * e)
- {
- //Likely that the logging is closing down, so silently discard any exceptions
- e->Release();
- }
- }
- void CLogMsgLinkToChild::sendFilter(ILogMsgFilter * filter) const
- {
- CMessageBuffer out;
- filter->serialize(out, false);
- try
- {
- queryWorldCommunicator().send(out, childNode, MPTAG_JLOG_PARENT_TO_CHILD, MP_ASYNC_SEND);
- }
- catch(IException * e)
- {
- e->Release();
- }
- }
- void CLogMsgLinkToChild::connect()
- {
- CMessageBuffer out;
- out.append('A').append(cid);
- try
- {
- queryWorldCommunicator().sendRecv(out, childNode, MPTAG_JLOG_CONNECT_TO_CHILD);
- }
- catch(IException * e)
- {
- e->Release();
- }
- out.read(pid);
- connected = true;
- }
- void CLogMsgLinkToChild::disconnect()
- {
- CMessageBuffer out;
- out.append('D').append(pid);
- try
- {
- queryWorldCommunicator().send(out, childNode, MPTAG_JLOG_CONNECT_TO_CHILD, MP_ASYNC_SEND);
- }
- catch(IException * e)
- {
- e->Release();
- }
- connected = false;
- }
- // LogMsgChildReceiverThread
- int LogMsgChildReceiverThread::run()
- {
- INode * sender;
- char ctrl;
- while(!done)
- {
- try
- {
- if(queryWorldCommunicator().recv(in, 0, MPTAG_JLOG_CONNECT_TO_PARENT, &sender))
- {
- in.read(ctrl);
- if(ctrl=='A')
- {
- MPLogId pid;
- in.read(pid);
- MPLogId cid = addChildToManager(pid, sender, false, true);
- StringBuffer buff;
- in.clear().append(cid);
- queryWorldCommunicator().reply(in, MP_ASYNC_SEND);
- }
- else if(ctrl=='D')
- {
- MPLogId cid;
- in.read(cid);
- removeChildFromManager(cid, true);
- }
- else
- ERRLOG("LogMsgChildReceiverThread::run() : unknown control character on received message");
- if(sender) sender->Release();
- }
- }
- catch(IException * e)
- {
- EXCLOG(e, "LogMsgChildReceiverThread::run()");
- e->Release();
- }
- catch(...)
- {
- ERRLOG("LogMsgChildReceiverThread::run() : unknown exception");
- }
- }
- return 0;
- }
- void LogMsgChildReceiverThread::stop()
- {
- done = true;
- queryWorldCommunicator().cancel(0, MPTAG_JLOG_CONNECT_TO_PARENT);
- join();
- }
- MPLogId LogMsgChildReceiverThread::addChildToManager(MPLogId pid, INode * childNode, bool isListener, bool connected)
- {
- CriticalBlock critBlock(tableOfChildrenCrit);
- aindex_t pos = findChild(childNode);
- if(pos != NotFound)
- {
- if(isListener)
- table.item(pos).queryLink()->sendFilterOwn(listener->getCompoundFilter());
- else
- table.item(pos).queryLink()->sendFilterOwn(queryLogMsgManager()->getCompoundFilter());
- return false;
- }
- MPLogId cid = ++nextId;
- ILogMsgLinkToChild * link = new CLogMsgLinkToChild(cid, pid, childNode, isListener, connected);
- if(!connected) link->connect();
- if(isListener)
- {
- link->sendFilterOwn(listener->getCompoundFilter());
- listener->addChildOwn(link);
- }
- else
- {
- link->sendFilterOwn(queryLogMsgManager()->getCompoundFilter());
- queryLogMsgManager()->addChildOwn(link);
- }
- table.append(*new IdLinkToChildPair(cid, childNode, link, isListener));
- return cid;
- }
- bool LogMsgChildReceiverThread::removeChildFromManager(MPLogId cid, bool disconnected)
- {
- CriticalBlock critBlock(tableOfChildrenCrit);
- aindex_t pos = findChild(cid);
- if(pos == NotFound) return false;
- doRemoveChildFromManager(pos, disconnected);
- return true;
- }
- bool LogMsgChildReceiverThread::removeChildFromManager(INode const * node, bool disconnected)
- {
- StringBuffer buff;
- CriticalBlock critBlock(tableOfChildrenCrit);
- aindex_t pos = findChild(node);
- if(pos == NotFound) return false;
- doRemoveChildFromManager(pos, disconnected);
- return true;
- }
- void LogMsgChildReceiverThread::doRemoveChildFromManager(aindex_t pos, bool disconnected)
- {
- ILogMsgLinkToChild * link = table.item(pos).queryLink();
- bool isListener = table.item(pos).isListener();
- table.remove(pos);
- if(disconnected) link->markDisconnected();
- if(isListener)
- listener->removeChild(link);
- else
- queryLogMsgManager()->removeChild(link);
- }
- aindex_t LogMsgChildReceiverThread::findChild(MPLogId cid) const
- {
- ForEachItemIn(i, table)
- if(table.item(i).queryId() == cid) return i;
- return NotFound;
- }
- aindex_t LogMsgChildReceiverThread::findChild(INode const * node) const
- {
- ForEachItemIn(i, table)
- if(table.item(i).queryNode()->equals(node)) return i;
- return NotFound;
- }
- // PARENT-SIDE HELPER FUNCTIONS
- bool connectLogMsgManagerToChild(INode * childNode)
- {
- assertex(childReceiver);
- return (childReceiver->addChildToManager(0, childNode, false, false) != 0);
- }
- bool connectLogMsgManagerToChildOwn(INode * childNode)
- {
- bool ret = connectLogMsgManagerToChild(childNode);
- childNode->Release();
- return ret;
- }
- bool disconnectLogMsgManagerFromChild(INode * childNode)
- {
- return childReceiver->removeChildFromManager(childNode, false);
- }
- bool disconnectLogMsgManagerFromChildOwn(INode * childNode)
- {
- bool ret = disconnectLogMsgManagerFromChild(childNode);
- childNode->Release();
- return ret;
- }
- void startLogMsgChildReceiver()
- {
- childReceiver = new LogMsgChildReceiverThread();
- childReceiver->startRelease();
- }
- // CHILD-SIDE CLASSES
- // LogMsgFilterReceiverThread
- int LogMsgFilterReceiverThread::run()
- {
- assertex(handler);
- while(!done)
- {
- try
- {
- if(queryWorldCommunicator().recv(in, parentNode, MPTAG_JLOG_PARENT_TO_CHILD))
- queryLogMsgManager()->changeMonitorFilterOwn(handler, getDeserializedLogMsgFilter(in));
- }
- catch(IException * e)
- {
- if (!done) {
- done = true;
- CMessageBuffer out;
- out.append('D').append(pid);
- try
- {
- queryWorldCommunicator().send(out, queryMyNode(), MPTAG_JLOG_CONNECT_TO_CHILD, MP_ASYNC_SEND);
- }
- catch(IException * ee)
- {
- ee->Release();
- }
- }
- e->Release();
- }
- }
- return 0;
- }
- void LogMsgFilterReceiverThread::stop()
- {
- if(!done)
- {
- done = true;
- queryWorldCommunicator().cancel(parentNode, MPTAG_JLOG_PARENT_TO_CHILD);
- Sleep(10); // swap
- if (!join(1000*60*5)) // should be pretty instant
- WARNLOG("LogMsgFilterReceiverThread::stop timed out");
- }
- }
- // LinkToParentLogMsgHandler
- LinkToParentLogMsgHandler::~LinkToParentLogMsgHandler()
- {
- try
- {
- if(connected)
- disconnect();
- receiverThread->stop();
- }
- catch (IException * e)
- {
- //Likely that the logging is closing down, so silently discard any exceptions
- e->Release();
- }
- }
- void LinkToParentLogMsgHandler::handleMessage(const LogMsg & msg)
- {
- CMessageBuffer out;
- msg.serialize(out);
- try
- {
- queryWorldCommunicator().send(out, parentNode, MPTAG_JLOG_CHILD_TO_PARENT, MP_ASYNC_SEND);
- }
- catch(IException * e)
- {
- //logging this exception would be a bad idea...
- e->Release();
- }
- }
- void LinkToParentLogMsgHandler::addToPTree(IPropertyTree * tree) const
- {
- IPropertyTree * handlerTree = createPTree(ipt_caseInsensitive);
- handlerTree->setProp("@type", "linktoparent");
- StringBuffer buff;
- parentNode->endpoint().getUrlStr(buff);
- handlerTree->setProp("@url", buff.str());
- tree->addPropTree("handler", handlerTree);
- }
- ILogMsgFilter * LinkToParentLogMsgHandler::receiveFilter() const
- {
- CMessageBuffer in;
- if(queryWorldCommunicator().recv(in, parentNode, MPTAG_JLOG_PARENT_TO_CHILD))
- return getDeserializedLogMsgFilter(in);
- else
- return getPassNoneLogMsgFilter(); /* Print some kind of warning here? */
- }
- void LinkToParentLogMsgHandler::connect()
- {
- CMessageBuffer out;
- out.append('A').append(pid);
- try
- {
- queryWorldCommunicator().sendRecv(out, parentNode, MPTAG_JLOG_CONNECT_TO_PARENT);
- }
- catch(IException * e)
- {
- e->Release();
- }
- out.read(cid);
- connected = true;
- }
- void LinkToParentLogMsgHandler::disconnect()
- {
- CMessageBuffer out;
- out.append('D').append(cid);
- try
- {
- queryWorldCommunicator().send(out, parentNode, MPTAG_JLOG_CONNECT_TO_PARENT, MP_ASYNC_SEND);
- }
- catch(IException * e)
- {
- e->Release();
- }
- connected = false;
- }
- // LogMsgParentReceiverThread
- int LogMsgParentReceiverThread::run()
- {
- char ctrl;
- INode * sender;
- while(!done)
- {
- try
- {
- if(queryWorldCommunicator().recv(in, 0, MPTAG_JLOG_CONNECT_TO_CHILD, &sender))
- {
- in.read(ctrl);
- if(ctrl=='A')
- {
- if (in.getReplyTag()!=TAG_NULL) { // protect against old clients crashing Dali
- MPLogId cid;
- in.read(cid);
- MPLogId pid = getNextId();
- StringBuffer buff;
- in.clear().append(pid);
- queryWorldCommunicator().reply(in, MP_ASYNC_SEND);
- addParentToManager(cid, pid, sender, true);
- }
- }
- else if(ctrl=='D')
- {
- MPLogId pid;
- in.read(pid);
- removeParentFromManager(pid, true);
- }
- else
- ERRLOG("LogMsgParentReceiverThread::run() : unknown control character on received message");
- if(sender) sender->Release();
- }
- }
- catch(IException * e)
- {
- EXCLOG(e, "LogMsgParentReceiverThread::run()");
- e->Release();
- }
- catch(...)
- {
- ERRLOG("LogMsgParentReceiverThread::run() : unknown exception");
- }
- }
- return 0;
- }
- void LogMsgParentReceiverThread::stop()
- {
- done = true;
- queryWorldCommunicator().cancel(0, MPTAG_JLOG_CONNECT_TO_CHILD);
- join();
- }
- MPLogId LogMsgParentReceiverThread::getNextId()
- {
- CriticalBlock critBlock(tableOfParentsCrit);
- return ++nextId;
- }
- bool LogMsgParentReceiverThread::addParentToManager(MPLogId cid, MPLogId pid, INode * parentNode, bool connected)
- {
- CriticalBlock critBlock(tableOfParentsCrit);
- if(findParent(parentNode) != NotFound) return false;
- Owned<LinkToParentLogMsgHandler> linkHandler(new LinkToParentLogMsgHandler(cid, pid, parentNode, connected));
- if(!connected) linkHandler->connect();
- Owned<ILogMsgFilter> filter(linkHandler->receiveFilter());
- queryLogMsgManager()->addMonitor(linkHandler, filter);
- table.append(*new IdLinkToParentPair(pid, parentNode, linkHandler));
- linkHandler->startReceiver();
- return true;
- }
- bool LogMsgParentReceiverThread::removeParentFromManager(MPLogId pid, bool disconnected)
- {
- CriticalBlock critBlock(tableOfParentsCrit);
- aindex_t pos = findParent(pid);
- if(pos == NotFound) return false;
- doRemoveParentFromManager(pos, disconnected);
- return true;
- }
- bool LogMsgParentReceiverThread::removeParentFromManager(const INode * parentNode, bool disconnected)
- {
- CriticalBlock critBlock(tableOfParentsCrit);
- aindex_t pos = findParent(parentNode);
- if(pos == NotFound) return false;
- doRemoveParentFromManager(pos, disconnected);
- return true;
- }
- void LogMsgParentReceiverThread::doRemoveParentFromManager(aindex_t pos, bool disconnected)
- {
- LinkToParentLogMsgHandler * linkHandler = table.item(pos).queryLinkHandler();
- table.remove(pos);
- if(disconnected) linkHandler->markDisconnected();
- queryLogMsgManager()->removeMonitor(linkHandler);
- }
- aindex_t LogMsgParentReceiverThread::findParent(MPLogId pid) const
- {
- ForEachItemIn(i, table)
- if(table.item(i).queryId() == pid) return i;
- return NotFound;
- }
- aindex_t LogMsgParentReceiverThread::findParent(const INode * node) const
- {
- ForEachItemIn(i, table)
- if(table.item(i).queryNode()->equals(node)) return i;
- return NotFound;
- }
- // CHILD-SIDE HELPER FUNCTIONS
- bool connectLogMsgManagerToParent(INode * parentNode)
- {
- assertex(parentReceiver);
- MPLogId pid = parentReceiver->getNextId();
- return parentReceiver->addParentToManager(0, pid, parentNode, false);
- }
- bool connectLogMsgManagerToParentOwn(INode * parentNode)
- {
- bool ret = connectLogMsgManagerToParent(parentNode);
- parentNode->Release();
- return ret;
- }
- bool disconnectLogMsgManagerFromParent(INode * parentNode)
- {
- return parentReceiver->removeParentFromManager(parentNode, false);
- }
- bool disconnectLogMsgManagerFromParentOwn(INode * parentNode)
- {
- bool ret = disconnectLogMsgManagerFromParent(parentNode);
- parentNode->Release();
- return ret;
- }
- void startLogMsgParentReceiver()
- {
- parentReceiver = new LogMsgParentReceiverThread();
- parentReceiver->startRelease();
- }
- // MISC. HELPER FUNCTION
- bool isMPLogMsgMonitor(ILogMsgHandler * handler)
- {
- return (dynamic_cast<LinkToParentLogMsgHandler *>(handler) != NULL);
- }
- void stopLogMsgReceivers()
- {
- if(parentReceiver)
- {
- parentReceiver->Link();
- parentReceiver->stop();
- parentReceiver->Release();
- }
- parentReceiver = 0;
- if(childReceiver)
- {
- childReceiver->Link();
- childReceiver->stop();
- childReceiver->Release();
- }
- childReceiver = 0;
- queryLogMsgManager()->removeMonitorsMatching(isMPLogMsgMonitor);
- queryLogMsgManager()->removeAllChildren();
- }
- // LISTENER HELPER FUNCTIONS
- ILogMsgListener * startLogMsgListener()
- {
- if(!listener)
- listener = createLogMsgManager();
- return listener;
- }
-
- void stopLogMsgListener()
- {
- if(listener)
- {
- listener->Release();
- listener = 0;
- }
- }
- bool connectLogMsgListenerToChild(INode * childNode)
- {
- assertex(childReceiver);
- return (childReceiver->addChildToManager(0, childNode, true, false) != 0);
- }
- bool connectLogMsgListenerToChildOwn(INode * childNode)
- {
- bool ret = connectLogMsgListenerToChild(childNode);
- childNode->Release();
- return ret;
- }
- bool disconnectLogMsgListenerFromChild(INode * childNode)
- {
- return childReceiver->removeChildFromManager(childNode, false);
- }
- bool disconnectLogMsgListenerFromChildOwn(INode * childNode)
- {
- bool ret = disconnectLogMsgListenerFromChild(childNode);
- childNode->Release();
- return ret;
- }
|