123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2014 HPCC Systems.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #include "jmisc.hpp"
- #include "jexcept.hpp"
- #include "jdebug.hpp"
- #include "LoggingErrors.hpp"
- #include "LogSerializer.hpp"
- #include "logthread.hpp"
- #include "compressutil.hpp"
// Names of the optional agent configuration properties read by the CLogThread constructor.
constexpr const char* PropMaxLogQueueLength = "MaxLogQueueLength";
constexpr const char* PropQueueSizeSignal = "QueueSizeSignal";
constexpr const char* PropMaxTriesRS = "MaxTriesRS";
constexpr const char* PropFailSafe = "FailSafe";

// Defaults used when the corresponding property is not configured.
constexpr int MaxLogQueueLength = 500000; // Write a warning into log when queue length is greater than 500000
constexpr int QueueSizeSignal = 10000;    // Write a warning into log when queue length is increased by 10000
const int DefaultMaxTriesRS = -1;         // Max. # of attempts to send log message to WsReportService. Default: infinite
- extern LOGGINGCOMMON_API IUpdateLogThread* createUpdateLogThread(IPropertyTree* _cfg, const char* _service, const char* _agentName, IEspLogAgent* _logAgent)
- {
- if (!_cfg)
- return NULL;
- IUpdateLogThread* loggingThread = new CLogThread(_cfg, _service, _agentName, _logAgent);
- loggingThread->start();
- return loggingThread;
- }
- CLogThread::CLogThread(IPropertyTree* _cfg , const char* _service, const char* _agentName, IEspLogAgent* _logAgent)
- : stopping(false), agentName(_agentName)
- {
- if(!_agentName || !*_agentName)
- throw MakeStringException(-1,"No Logging agent name defined");
- if(!_cfg)
- throw MakeStringException(-1,"No Logging agent Configuration for %s", _agentName);
- if(!_service || !*_service)
- throw MakeStringException(-1,"No service name defined for %s", _agentName);
- if(!_logAgent)
- throw MakeStringException(-1,"No Logging agent interface for %s", _agentName);
- logAgent.setown(_logAgent);
- maxLogQueueLength = _cfg->getPropInt(PropMaxLogQueueLength, MaxLogQueueLength);
- signalGrowingQueueAt = _cfg->getPropInt(PropQueueSizeSignal, QueueSizeSignal);
- maxLogRetries = _cfg->getPropInt(PropMaxTriesRS, DefaultMaxTriesRS);
- ensureFailSafe = _cfg->getPropBool(PropFailSafe);
- if(ensureFailSafe)
- logFailSafe.setown(createFailSafeLogger(_cfg, _service, _agentName));
- time_t tNow;
- time(&tNow);
- localtime_r(&tNow, &m_startTime);
- }
- CLogThread::~CLogThread()
- {
- ESPLOG(LogMax, "CLogThread::~CLogThread()");
- }
- void CLogThread::start()
- {
- Thread::start();
- }
- int CLogThread::run()
- {
- Link();
- if(logFailSafe.get())
- checkPendingLogs(false);
- while(!stopping)
- {
- m_sem.wait(UPDATELOGTHREADWAITINGTIME);
- sendLog();
- if(logFailSafe.get())
- {
- checkPendingLogs(true);
- checkRollOver();
- }
- }
- Release();
- return 0;
- }
- void CLogThread::stop()
- {
- try
- {
- CriticalBlock b(logQueueCrit);
- if (!logQueue.ordinality() && logFailSafe.get() && logFailSafe->canRollCurrentLog())
- logFailSafe->RollCurrentLog();
- //If logQueue is not empty, the log files are rolled over so that queued jobs can be read
- //when the CLogThread is restarted.
- }
- catch(...)
- {
- DBGLOG("Exception");
- }
- stopping = true;
- m_sem.signal();
- join();
- }
- bool CLogThread::queueLog(IEspUpdateLogRequest* logRequest)
- {
- if (!logRequest)
- return false;
- Owned<IEspUpdateLogRequestWrap> logRequestWrap = new CUpdateLogRequestWrap(NULL, logRequest->getOption(), logRequest->getLogContent());
- return enqueue(logRequestWrap, nullptr);
- }
- bool CLogThread::queueLog(IEspUpdateLogRequestWrap* logRequest)
- {
- unsigned startTime = (getEspLogLevel()>=LogNormal) ? msTick() : 0;
- Owned<IEspUpdateLogRequestWrap> logRequestFiltered = logAgent->filterLogContent(logRequest);
- ESPLOG(LogNormal, "LThread:filterLog: %dms\n", msTick() - startTime);
- return enqueue(logRequestFiltered, nullptr);
- }
- bool CLogThread::enqueue(IEspUpdateLogRequestWrap* logRequest, const char* guid)
- {
- if (logFailSafe.get())
- {
- StringBuffer GUID, reqBuf;
- unsigned startTime = (getEspLogLevel()>=LogNormal) ? msTick() : 0;
- if (isEmptyString(guid))
- logFailSafe->GenerateGUID(GUID, nullptr);
- else
- GUID.set(guid);
- logRequest->setGUID(GUID.str());
- if (serializeLogRequestContent(logRequest, reqBuf))
- logFailSafe->Add(GUID, reqBuf.str(), nullptr);
- ESPLOG(LogNormal, "LThread:addToFailSafe: %dms\n", msTick() - startTime);
- }
- writeJobQueue(logRequest);
- m_sem.signal();
- return true;
- }
- void CLogThread::sendLog()
- {
- try
- {
- if(stopping)
- return;
- int recSend = 0;
- while(true)
- {
- IEspUpdateLogRequestWrap* logRequest = readJobQueue();
- if (!logRequest)
- break;
- const char* GUID= logRequest->getGUID();
- if ((!GUID || !*GUID) && ensureFailSafe && logFailSafe.get())
- continue;
- Owned<IEspUpdateLogRequestWrap> logRequestInFile = checkAndReadLogRequestFromSharedTankFile(logRequest);
- try
- {
- unsigned startTime = (getEspLogLevel()>=LogNormal) ? msTick() : 0;
- Owned<IEspUpdateLogResponse> logResponse = createUpdateLogResponse();
- if (logRequestInFile)
- logAgent->updateLog(*logRequestInFile, *logResponse);
- else
- logAgent->updateLog(*logRequest, *logResponse);
- if (!logResponse)
- throw MakeStringException(EspLoggingErrors::UpdateLogFailed, "no response");
- if (logResponse->getStatusCode())
- {
- const char* statusMessage = logResponse->getStatusMessage();
- if(statusMessage && *statusMessage)
- throw MakeStringException(EspLoggingErrors::UpdateLogFailed, "%s", statusMessage);
- else
- throw MakeStringException(EspLoggingErrors::UpdateLogFailed, "Unknown error");
- }
- ESPLOG(LogNormal, "LThread:updateLog: %dms\n", msTick() - startTime);
- if(ensureFailSafe && logFailSafe.get())
- {
- unsigned startTime1 = (getEspLogLevel()>=LogNormal) ? msTick() : 0;
- logFailSafe->AddACK(GUID);
- ESPLOG(LogNormal, "LThread:AddACK: %dms\n", msTick() - startTime1);
- }
- logRequest->Release();//Make sure that no data (such as GUID) is needed before releasing the logRequest.
- }
- catch(IException* e)
- {
- StringBuffer errorStr, errorMessage;
- errorMessage.appendf("Failed to update log for %s: error code %d, error message %s", GUID, e->errorCode(), e->errorMessage(errorStr).str());
- e->Release();
- if (logRequestInFile)
- logRequest->setNoResend(logRequestInFile->getNoResend());
- bool willRetry = false;
- if (!logRequest->getNoResend() && (maxLogRetries != 0))
- {
- unsigned retry = logRequest->incrementRetryCount();
- if (retry > maxLogRetries)
- errorMessage.append(" Max logging retries exceeded.");
- else
- {
- willRetry = true;
- writeJobQueue(logRequest);
- errorMessage.appendf(" Adding back to logging queue for retrying %d.", retry);
- }
- }
- if (!willRetry)
- {
- logRequest->Release();
- }
- IERRLOG("%s", errorMessage.str());
- }
- }
- }
- catch(IException* e)
- {
- StringBuffer errorStr, errorMessage;
- errorMessage.append("Exception thrown within update log thread: error code ").append(e->errorCode()).append(", error message ").append(e->errorMessage(errorStr));
- IERRLOG("%s", errorMessage.str());
- e->Release();
- }
- catch(...)
- {
- IERRLOG("Unknown exception thrown within update log thread");
- }
- return;
- }
- //At first, we check whether the logRequest contains the information about original log Request
- //in shared tank file created by logging manager. If yes, try to read the original log Request
- //based on the information. If the original log Request is found and unserialized, return new
- //IEspUpdateLogRequestWrap which contains original log Request.
- IEspUpdateLogRequestWrap* CLogThread::checkAndReadLogRequestFromSharedTankFile(IEspUpdateLogRequestWrap* logRequest)
- {
- //Read LogRequestInFile info if exists.
- Owned<IPropertyTree> logInFle = createPTreeFromXMLString(logRequest->getUpdateLogRequest());
- if (!logInFle)
- return nullptr;
- const char* GUID = logInFle->queryProp(LOGREQUEST_GUID);
- if (isEmptyString(GUID))
- return nullptr;
- const char* fileName = logInFle->queryProp(LOGCONTENTINFILE_FILENAME);
- if (isEmptyString(fileName))
- return nullptr;
- __int64 pos = logInFle->getPropInt64(LOGCONTENTINFILE_FILEPOS, -1);
- if (pos < 0)
- return nullptr;
- int size = logInFle->getPropInt64(LOGCONTENTINFILE_FILESIZE, -1);
- if (size < 0)
- return nullptr;
- Owned<CLogRequestInFile> reqInFile = new CLogRequestInFile();
- reqInFile->setGUID(GUID);
- reqInFile->setFileName(fileName);
- reqInFile->setPos(pos);
- reqInFile->setSize(size);
- //Read Log Request from the file
- StringBuffer logRequestStr;
- CLogSerializer logSerializer;
- if (!logSerializer.readLogRequest(reqInFile, logRequestStr))
- {
- ERRLOG("Failed to read Log Request from %s", fileName);
- return nullptr;
- }
- try
- {
- Owned<IPropertyTree> logRequestTree = createPTreeFromXMLString(logRequestStr.str());
- if (!logRequestTree)
- return nullptr;
- const char* guid = logRequestTree->queryProp("GUID");
- const char* opt = logRequestTree->queryProp("Option");
- const char* reqBuf = logRequestTree->queryProp("LogRequest");
- if (isEmptyString(reqBuf))
- return nullptr;
- StringBuffer decoded, req;
- JBASE64_Decode(reqBuf, decoded);
- LZWExpand(decoded, decoded.length(), req);
- return new CUpdateLogRequestWrap(guid, opt, req.str());
- }
- catch(IException* e)
- {
- StringBuffer errorStr;
- ERRLOG("Exception when unserializing Log Request Content: %d %s", e->errorCode(), e->errorMessage(errorStr).str());
- e->Release();
- }
- return nullptr;
- }
- //////////////////////////FailSafe////////////////////////////
- void CLogThread::checkRollOver()
- {
- try
- {
- bool bRollover = false;
- time_t tNow;
- time(&tNow);
- struct tm ltNow;
- localtime_r(&tNow, <Now);
- if ((ltNow.tm_year != m_startTime.tm_year || ltNow.tm_yday != m_startTime.tm_yday))
- {
- bRollover = true;
- localtime_r(&tNow, &m_startTime); // reset the start time for next rollover check
- }
- if (!bRollover)
- return;
- //Rename .log files to .old files
- logFailSafe->SafeRollover();
- CriticalBlock b(logQueueCrit);
- //Check and add queued requests to tank(.log) files
- unsigned numNewArrivals = logQueue.ordinality();
- if(numNewArrivals <= 0)
- return;
- ESPLOG(LogMax, "writing %d requests in the queue to the rolled over tank file.", numNewArrivals);
- unsigned startTime = (getEspLogLevel()>=LogNormal) ? msTick() : 0;
- for(unsigned i = 0; i < numNewArrivals; i++)
- {
- IInterface* pRequest = logQueue.item(i);
- if (!pRequest)
- continue;
- IEspUpdateLogRequestWrap* pEspRequest = dynamic_cast<IEspUpdateLogRequestWrap*>(pRequest);
- if(!pEspRequest)
- continue;
- StringBuffer reqBuf;
- const char* GUID = pEspRequest->getGUID();
- if(GUID && *GUID && serializeLogRequestContent(pEspRequest, reqBuf))
- logFailSafe->Add(GUID, reqBuf.str(), nullptr);
- }
- ESPLOG(LogNormal, "LThread:AddFailSafe: %dms\n", msTick() - startTime);
- }
- catch(IException* Ex)
- {
- StringBuffer str;
- Ex->errorMessage(str);
- IERRLOG("Exception thrown during tank file rollover: %s",str.str());
- Ex->Release();
- }
- catch(...)
- {
- IERRLOG("Unknown exception thrown during tank file rollover.");
- }
- }
- unsigned CLogThread::serializeLogRequestContent(IEspUpdateLogRequestWrap* pRequest, StringBuffer& logData)
- {
- const char* GUID = pRequest->getGUID();
- const char* option = pRequest->getOption();
- const char* logRequest = pRequest->getUpdateLogRequest();
- if (GUID && *GUID)
- logData.append("<GUID>").append(GUID).append("</GUID>");
- if (option && *option)
- logData.append("<Option>").append(option).append("</Option>");
- if (logRequest && *logRequest)
- {
- StringBuffer buffer;
- JBASE64_Encode(logRequest, strlen(logRequest), buffer, true);
- logData.append("<LogRequest>").append(buffer.str()).append("</LogRequest>");
- }
- return logData.length();
- }
- void CLogThread::checkPendingLogs(bool bOneRecOnly)
- {
- try
- {
- bool queueLogError = false;
- bool bFirst = true;
- StringBuffer GUID, logData;
- while (logFailSafe->PopPendingLogRecord(GUID, logData))
- {
- if (bFirst && !bOneRecOnly)
- {
- DBGLOG("We have old logs!. Will now try and recover the lost log messages");
- bFirst = false;
- }
- Owned<IEspUpdateLogRequestWrap> logRequest = unserializeLogRequestContent(logData.str());
- if (!logRequest)
- IERRLOG("checkPendingLogs: failed to unserialize: %s", logData.str());
- else if (!enqueue(logRequest, GUID))
- {
- OERRLOG("checkPendingLogs: failed to add a log request to queue");
- queueLogError=true;
- }
- if (bOneRecOnly)
- break;
- }
- //if everything went ok then we should be able to rollover the old logs.
- if (!queueLogError && !bOneRecOnly)
- logFailSafe->RollOldLogs();
- }
- catch(IException* ex)
- {
- StringBuffer errorStr;
- ex->errorMessage(errorStr);
- IERRLOG("CheckPendingLogs: %s:" ,errorStr.str());
- ex->Release();
- }
- catch(...)
- {
- IERRLOG("Unknown exception thrown in CheckPendingLogs");
- }
- }
- IEspUpdateLogRequestWrap* CLogThread::unserializeLogRequestContent(const char* logData)
- {
- if (!logData && *logData)
- return NULL;
- Owned<IPropertyTree> pLogTree = createPTreeFromXMLString(logData);
- if (!pLogTree)
- return NULL;
- const char* guid = pLogTree->queryProp("GUID");
- const char* opt = pLogTree->queryProp("Option");
- const char* logRequest = pLogTree->queryProp("LogRequest");
- if (!logRequest || !*logRequest)
- return NULL;
- StringBuffer buffer;
- JBASE64_Decode(logRequest, buffer);
- return new CUpdateLogRequestWrap(guid, opt, buffer.str());
- };
- void CLogThread::writeJobQueue(IEspUpdateLogRequestWrap* jobToWrite)
- {
- if (jobToWrite)
- {
- unsigned startTime = (getEspLogLevel()>=LogNormal) ? msTick() : 0;
- CriticalBlock b(logQueueCrit);
- ESPLOG(LogNormal, "LThread:waitWQ: %dms\n", msTick() - startTime);
- int QueueSize = logQueue.ordinality();
- if(QueueSize > maxLogQueueLength)
- OERRLOG("LOGGING QUEUE SIZE %d EXCEEDED MaxLogQueueLength %d, check the logging server.",QueueSize, maxLogQueueLength);
- if(QueueSize!=0 && QueueSize % signalGrowingQueueAt == 0)
- OERRLOG("Logging Queue at %d records. Check the logging server.",QueueSize);
- logQueue.enqueue(LINK(jobToWrite));
- }
- }
- IEspUpdateLogRequestWrap* CLogThread::readJobQueue()
- {
- #define LOG_LEVEL LogNormal
- unsigned startTime = (getEspLogLevel()>=LOG_LEVEL) ? msTick() : 0;
- CriticalBlock b(logQueueCrit);
- unsigned delta = (getEspLogLevel()>=LOG_LEVEL) ? msTick() - startTime : 0;
- if (delta > 1) // <=1ms is not indicative of an unexpected delay
- ESPLOG(LOG_LEVEL, "LThread:waitRQ: %dms", delta);
- return (IEspUpdateLogRequestWrap*)logQueue.dequeue();
- #undef LOG_LEVEL
- }
|