logthread.cpp 40 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2014 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jmisc.hpp"
  14. #include "jexcept.hpp"
  15. #include "jdebug.hpp"
  16. #include "LoggingErrors.hpp"
  17. #include "LogSerializer.hpp"
  18. #include "logthread.hpp"
  19. #include "compressutil.hpp"
  20. #include "logconfigptree.hpp"
  21. using namespace LogConfigPTree;
  22. const char* const PropMaxLogQueueLength = "MaxLogQueueLength";
  23. const char* const PropQueueSizeSignal = "QueueSizeSignal";
  24. const char* const PropMaxTriesRS = "MaxTriesRS";
  25. const char* const PropFailSafe = "FailSafe";
  26. const char* const PropDisableFailSafe = "DisableFailSafe";
  27. const char* const PropAckedFiles = "AckedFiles";
  28. const char* const PropDefaultAckedFiles = "AckedFiles";
  29. const char* const PropAckedLogRequests = "AckedLogRequests";
  30. const char* const PropDefaultAckedLogRequests = "AckedLogRequests";
  31. const char* const PropPendingLogBufferSize = "PendingLogBufferSize";
  32. const char* const PropReadRequestWaitingSeconds = "ReadRequestWaitingSeconds";
  33. const char* const sendLogKeyword = "_sending_";
  34. const unsigned sendLogKeywordLen = strlen(sendLogKeyword);
  35. const unsigned dateTimeStringLength = 19; //yyyy_mm_dd_hh_mm_ss
  36. #define MaxLogQueueLength 500000 //Write a warning into log when queue length is greater than 500000
  37. #define QueueSizeSignal 10000 //Write a warning into log when queue length is increased by 10000
  38. const int DefaultMaxTriesRS = -1; // Max. # of attempts to send log message to WsReportService. Default: infinite
  39. extern LOGGINGCOMMON_API IUpdateLogThread* createUpdateLogThread(IPropertyTree* _cfg, const char* _service, const char* _agentName,
  40. const char* _tankFileDir, IEspLogAgent* _logAgent)
  41. {
  42. if (!_cfg)
  43. return NULL;
  44. IUpdateLogThread* loggingThread = new CLogThread(_cfg, _service, _agentName, _logAgent, _tankFileDir);
  45. loggingThread->start();
  46. return loggingThread;
  47. }
  48. CLogThread::CLogThread(IPropertyTree* _cfg , const char* _service, const char* _agentName, IEspLogAgent* _logAgent, const char* _tankFileDir)
  49. : stopping(false), agentName(_agentName), tankFileDir(_tankFileDir)
  50. {
  51. if(!_agentName || !*_agentName)
  52. throw MakeStringException(-1,"No Logging agent name defined");
  53. if(!_cfg)
  54. throw MakeStringException(-1,"No Logging agent Configuration for %s", _agentName);
  55. if(!_service || !*_service)
  56. throw MakeStringException(-1,"No service name defined for %s", _agentName);
  57. if(!_logAgent)
  58. throw MakeStringException(-1,"No Logging agent interface for %s", _agentName);
  59. logAgent.setown(_logAgent);
  60. maxLogQueueLength = getConfigValue<int>(_cfg, PropMaxLogQueueLength, MaxLogQueueLength);
  61. signalGrowingQueueAt = getConfigValue<int>(_cfg, PropQueueSizeSignal, QueueSizeSignal);
  62. maxLogRetries = getConfigValue<int>(_cfg, PropMaxTriesRS, DefaultMaxTriesRS);
  63. //For decoupled logging, the fail safe is not needed because the logging agent always
  64. //picks up the logging requests from tank file.
  65. ensureFailSafe = getConfigValue<bool>(_cfg, PropFailSafe) && !getConfigValue<bool>(_cfg, PropDisableFailSafe, false);
  66. if(ensureFailSafe)
  67. {
  68. logFailSafe.setown(createFailSafeLogger(_cfg, _service, _agentName));
  69. PROGLOG("FailSafe ensured for %s", agentName.get());
  70. }
  71. time_t tNow;
  72. time(&tNow);
  73. localtime_r(&tNow, &m_startTime);
  74. if (tankFileDir.get())
  75. {
  76. Owned<CLogRequestReaderSettings> settings = new CLogRequestReaderSettings();
  77. settings->tankFileDir.set(tankFileDir.get());
  78. const char* ackedFiles = queryConfigValue(_cfg, PropAckedFiles);
  79. settings->ackedFileList.set(isEmptyString(ackedFiles) ? PropDefaultAckedFiles : ackedFiles);
  80. const char* ackedLogRequestFile = queryConfigValue(_cfg, PropAckedLogRequests);
  81. settings->ackedLogRequestFile.set(isEmptyString(ackedLogRequestFile) ? PropDefaultAckedLogRequests : ackedLogRequestFile);
  82. int pendingLogBufferSize = getConfigValue<int>(_cfg, PropPendingLogBufferSize, DEFAULTPENDINGLOGBUFFERSIZE);
  83. if (pendingLogBufferSize <= 0)
  84. throw MakeStringException(-1, "The %s (%d) should be greater than 0.", PropPendingLogBufferSize, pendingLogBufferSize);
  85. settings->pendingLogBufferSize = pendingLogBufferSize;
  86. int waitSeconds = getConfigValue<int>(_cfg, PropReadRequestWaitingSeconds, DEFAULTREADLOGREQUESTWAITSECOND);
  87. if (waitSeconds <= 0)
  88. throw MakeStringException(-1, "The %s (%d) should be greater than 0.", PropReadRequestWaitingSeconds, waitSeconds);
  89. settings->waitSeconds = waitSeconds;
  90. PROGLOG("%s %s: %s", agentName.get(), PropAckedFiles, settings->ackedFileList.str());
  91. PROGLOG("%s %s: %s", agentName.get(), PropDefaultAckedLogRequests, settings->ackedLogRequestFile.str());
  92. PROGLOG("%s %s: %d. %s: %d", agentName.get(), PropReadRequestWaitingSeconds, settings->waitSeconds, PropPendingLogBufferSize, settings->pendingLogBufferSize);
  93. checkAndCreateFile(settings->ackedFileList);
  94. checkAndCreateFile(settings->ackedLogRequestFile);
  95. logRequestReader.setown(new CLogRequestReader(settings.getClear(), this));
  96. logRequestReader->setTankFilePattern(_service);
  97. }
  98. PROGLOG("%s CLogThread started.", agentName.get());
  99. }
  100. CLogThread::~CLogThread()
  101. {
  102. ESPLOG(LogMax, "CLogThread::~CLogThread()");
  103. }
  104. void CLogThread::start()
  105. {
  106. Thread::start();
  107. }
  108. int CLogThread::run()
  109. {
  110. Link();
  111. if (!logRequestReader && logFailSafe.get())
  112. checkPendingLogs(false);
  113. while(!stopping)
  114. {
  115. m_sem.wait(UPDATELOGTHREADWAITINGTIME);
  116. sendLog();
  117. if (!logRequestReader && logFailSafe.get())
  118. {
  119. checkPendingLogs(true);
  120. checkRollOver();
  121. }
  122. }
  123. Release();
  124. return 0;
  125. }
  126. void CLogThread::stop()
  127. {
  128. try
  129. {
  130. CriticalBlock b(logQueueCrit);
  131. if (!logQueue.ordinality() && !logRequestReader && logFailSafe.get() && logFailSafe->canRollCurrentLog())
  132. logFailSafe->RollCurrentLog();
  133. //If logQueue is not empty, the log files are rolled over so that queued jobs can be read
  134. //when the CLogThread is restarted.
  135. }
  136. catch(...)
  137. {
  138. DBGLOG("Exception");
  139. }
  140. stopping = true;
  141. m_sem.signal();
  142. join();
  143. }
  144. bool CLogThread::queueLog(IEspUpdateLogRequest* logRequest)
  145. {
  146. if (!logRequest)
  147. return false;
  148. Owned<IEspUpdateLogRequestWrap> logRequestWrap = new CUpdateLogRequestWrap(NULL, logRequest->getOption(), logRequest->getLogContent());
  149. return enqueue(logRequestWrap, nullptr);
  150. }
  151. bool CLogThread::queueLog(IEspUpdateLogRequestWrap* logRequest)
  152. {
  153. unsigned startTime = (getEspLogLevel()>=LogNormal) ? msTick() : 0;
  154. Owned<IEspUpdateLogRequestWrap> logRequestFiltered = logAgent->filterLogContent(logRequest);
  155. ESPLOG(LogNormal, "LThread:filterLog: %dms\n", msTick() - startTime);
  156. return enqueue(logRequestFiltered, nullptr);
  157. }
  158. bool CLogThread::enqueue(IEspUpdateLogRequestWrap* logRequest, const char* guid)
  159. {
  160. if (!logRequestReader && logFailSafe.get())
  161. {
  162. StringBuffer GUID, reqBuf;
  163. unsigned startTime = (getEspLogLevel()>=LogNormal) ? msTick() : 0;
  164. if (isEmptyString(guid))
  165. logFailSafe->GenerateGUID(GUID, nullptr);
  166. else
  167. GUID.set(guid);
  168. logRequest->setGUID(GUID.str());
  169. //This part of code can only be reached for non decoupled logging.
  170. //For non decoupled logging, the manager has filtered out skipped agents.
  171. //This part of code is called only if the agent is not skipped.
  172. //So, the scriptValues section is not needed to be in the logFailSafe->Add().
  173. if (serializeLogRequestContent(logRequest, reqBuf))
  174. logFailSafe->Add(GUID, nullptr, reqBuf.str(), nullptr);
  175. ESPLOG(LogNormal, "LThread:addToFailSafe: %dms\n", msTick() - startTime);
  176. }
  177. writeJobQueue(logRequest);
  178. m_sem.signal();
  179. return true;
  180. }
  181. void CLogThread::sendLog()
  182. {
  183. try
  184. {
  185. if(stopping)
  186. return;
  187. int recSend = 0;
  188. while(true)
  189. {
  190. IEspUpdateLogRequestWrap* logRequest = readJobQueue();
  191. if (!logRequest)
  192. break;
  193. const char* GUID= logRequest->getGUID();
  194. if ((!GUID || !*GUID) && ensureFailSafe && logFailSafe.get())
  195. continue;
  196. PROGLOG("Sending %s ...\n", GUID);
  197. Owned<IEspUpdateLogRequestWrap> logRequestInFile;
  198. if (!logRequestReader)
  199. logRequestInFile.setown(checkAndReadLogRequestFromSharedTankFile(logRequest));
  200. try
  201. {
  202. unsigned startTime = (getEspLogLevel()>=LogNormal) ? msTick() : 0;
  203. Owned<IEspUpdateLogResponse> logResponse = createUpdateLogResponse();
  204. if (logRequestInFile)
  205. logAgent->updateLog(*logRequestInFile, *logResponse);
  206. else
  207. logAgent->updateLog(*logRequest, *logResponse);
  208. if (!logResponse)
  209. throw MakeStringException(EspLoggingErrors::UpdateLogFailed, "no response");
  210. if (logResponse->getStatusCode())
  211. {
  212. const char* statusMessage = logResponse->getStatusMessage();
  213. if(statusMessage && *statusMessage)
  214. throw MakeStringException(EspLoggingErrors::UpdateLogFailed, "%s", statusMessage);
  215. else
  216. throw MakeStringException(EspLoggingErrors::UpdateLogFailed, "Unknown error");
  217. }
  218. ESPLOG(LogNormal, "LThread:updateLog: %dms\n", msTick() - startTime);
  219. if (logRequestReader)
  220. {
  221. unsigned startTime1 = (getEspLogLevel()>=LogNormal) ? msTick() : 0;
  222. logRequestReader->addACK(GUID);
  223. PROGLOG("%s acked: %dms\n", GUID, msTick() - startTime1);
  224. }
  225. else if(ensureFailSafe && logFailSafe.get())
  226. {
  227. unsigned startTime1 = (getEspLogLevel()>=LogNormal) ? msTick() : 0;
  228. logFailSafe->AddACK(GUID);
  229. ESPLOG(LogNormal, "LThread:AddACK: %dms\n", msTick() - startTime1);
  230. }
  231. logRequest->Release();//Make sure that no data (such as GUID) is needed before releasing the logRequest.
  232. }
  233. catch(IException* e)
  234. {
  235. StringBuffer errorStr, errorMessage;
  236. errorMessage.appendf("Failed to update log for %s: error code %d, error message %s", GUID, e->errorCode(), e->errorMessage(errorStr).str());
  237. e->Release();
  238. if (logRequestInFile)
  239. logRequest->setNoResend(logRequestInFile->getNoResend());
  240. bool willRetry = false;
  241. if (!logRequest->getNoResend() && (maxLogRetries != 0))
  242. {
  243. unsigned retry = logRequest->incrementRetryCount();
  244. if (retry > maxLogRetries)
  245. errorMessage.append(" Max logging retries exceeded.");
  246. else
  247. {
  248. willRetry = true;
  249. writeJobQueue(logRequest);
  250. errorMessage.appendf(" Adding back to logging queue for retrying %d.", retry);
  251. }
  252. }
  253. if (!willRetry)
  254. {
  255. logRequest->Release();
  256. }
  257. IERRLOG("%s", errorMessage.str());
  258. }
  259. }
  260. }
  261. catch(IException* e)
  262. {
  263. StringBuffer errorStr, errorMessage;
  264. errorMessage.append("Exception thrown within update log thread: error code ").append(e->errorCode()).append(", error message ").append(e->errorMessage(errorStr));
  265. IERRLOG("%s", errorMessage.str());
  266. e->Release();
  267. }
  268. catch(...)
  269. {
  270. IERRLOG("Unknown exception thrown within update log thread");
  271. }
  272. return;
  273. }
  274. //At first, we check whether the logRequest contains the information about original log Request
  275. //in shared tank file created by logging manager. If yes, try to read the original log Request
  276. //based on the information. If the original log Request is found and unserialized, return new
  277. //IEspUpdateLogRequestWrap which contains original log Request.
  278. IEspUpdateLogRequestWrap* CLogThread::checkAndReadLogRequestFromSharedTankFile(IEspUpdateLogRequestWrap* logRequest)
  279. {
  280. //Read LogRequestInFile info if exists.
  281. Owned<IPropertyTree> logInFle = createPTreeFromXMLString(logRequest->getUpdateLogRequest());
  282. if (!logInFle)
  283. return nullptr;
  284. const char* GUID = logInFle->queryProp(LOGREQUEST_GUID);
  285. if (isEmptyString(GUID))
  286. return nullptr;
  287. const char* fileName = logInFle->queryProp(LOGCONTENTINFILE_FILENAME);
  288. if (isEmptyString(fileName))
  289. return nullptr;
  290. __int64 pos = logInFle->getPropInt64(LOGCONTENTINFILE_FILEPOS, -1);
  291. if (pos < 0)
  292. return nullptr;
  293. int size = logInFle->getPropInt64(LOGCONTENTINFILE_FILESIZE, -1);
  294. if (size < 0)
  295. return nullptr;
  296. Owned<CLogRequestInFile> reqInFile = new CLogRequestInFile();
  297. reqInFile->setGUID(GUID);
  298. reqInFile->setFileName(fileName);
  299. reqInFile->setPos(pos);
  300. reqInFile->setSize(size);
  301. //Read Log Request from the file
  302. StringBuffer logRequestStr;
  303. CLogSerializer logSerializer;
  304. if (!logSerializer.readLogRequest(reqInFile, logRequestStr))
  305. {
  306. ERRLOG("Failed to read Log Request from %s", fileName);
  307. return nullptr;
  308. }
  309. try
  310. {
  311. Owned<IPropertyTree> logRequestTree = createPTreeFromXMLString(logRequestStr.str());
  312. if (!logRequestTree)
  313. return nullptr;
  314. const char* guid = logRequestTree->queryProp("GUID");
  315. const char* opt = logRequestTree->queryProp("Option");
  316. const char* reqBuf = logRequestTree->queryProp("LogRequest");
  317. if (isEmptyString(reqBuf))
  318. return nullptr;
  319. StringBuffer decoded, req;
  320. JBASE64_Decode(reqBuf, decoded);
  321. LZWExpand(decoded, decoded.length(), req);
  322. return new CUpdateLogRequestWrap(guid, opt, req.str());
  323. }
  324. catch(IException* e)
  325. {
  326. StringBuffer errorStr;
  327. ERRLOG("Exception when unserializing Log Request Content: %d %s", e->errorCode(), e->errorMessage(errorStr).str());
  328. e->Release();
  329. }
  330. return nullptr;
  331. }
  332. //////////////////////////FailSafe////////////////////////////
  333. void CLogThread::checkRollOver()
  334. {
  335. try
  336. {
  337. bool bRollover = false;
  338. time_t tNow;
  339. time(&tNow);
  340. struct tm ltNow;
  341. localtime_r(&tNow, &ltNow);
  342. if ((ltNow.tm_year != m_startTime.tm_year || ltNow.tm_yday != m_startTime.tm_yday))
  343. {
  344. bRollover = true;
  345. localtime_r(&tNow, &m_startTime); // reset the start time for next rollover check
  346. }
  347. if (!bRollover)
  348. return;
  349. //Rename .log files to .old files
  350. logFailSafe->SafeRollover();
  351. CriticalBlock b(logQueueCrit);
  352. //Check and add queued requests to tank(.log) files
  353. unsigned numNewArrivals = logQueue.ordinality();
  354. if(numNewArrivals <= 0)
  355. return;
  356. ESPLOG(LogMax, "writing %d requests in the queue to the rolled over tank file.", numNewArrivals);
  357. unsigned startTime = (getEspLogLevel()>=LogNormal) ? msTick() : 0;
  358. for(unsigned i = 0; i < numNewArrivals; i++)
  359. {
  360. IInterface* pRequest = logQueue.item(i);
  361. if (!pRequest)
  362. continue;
  363. IEspUpdateLogRequestWrap* pEspRequest = dynamic_cast<IEspUpdateLogRequestWrap*>(pRequest);
  364. if(!pEspRequest)
  365. continue;
  366. StringBuffer reqBuf;
  367. const char* GUID = pEspRequest->getGUID();
  368. //This part of code can only be reached for non decoupled logging.
  369. //For non decoupled logging, the manager has filtered out skipped agents.
  370. //This part of code is called only if the agent is not skipped.
  371. //So, the scriptValues section is not needed to be in the logFailSafe->Add().
  372. if(GUID && *GUID && serializeLogRequestContent(pEspRequest, reqBuf))
  373. logFailSafe->Add(GUID, nullptr, reqBuf.str(), nullptr);
  374. }
  375. ESPLOG(LogNormal, "LThread:AddFailSafe: %dms\n", msTick() - startTime);
  376. }
  377. catch(IException* Ex)
  378. {
  379. StringBuffer str;
  380. Ex->errorMessage(str);
  381. IERRLOG("Exception thrown during tank file rollover: %s",str.str());
  382. Ex->Release();
  383. }
  384. catch(...)
  385. {
  386. IERRLOG("Unknown exception thrown during tank file rollover.");
  387. }
  388. }
  389. unsigned CLogThread::serializeLogRequestContent(IEspUpdateLogRequestWrap* pRequest, StringBuffer& logData)
  390. {
  391. const char* GUID = pRequest->getGUID();
  392. const char* option = pRequest->getOption();
  393. const char* logRequest = pRequest->getUpdateLogRequest();
  394. if (GUID && *GUID)
  395. logData.append("<GUID>").append(GUID).append("</GUID>");
  396. if (option && *option)
  397. logData.append("<Option>").append(option).append("</Option>");
  398. if (logRequest && *logRequest)
  399. {
  400. StringBuffer buffer;
  401. JBASE64_Encode(logRequest, strlen(logRequest), buffer, true);
  402. logData.append("<LogRequest>").append(buffer.str()).append("</LogRequest>");
  403. }
  404. return logData.length();
  405. }
  406. void CLogThread::checkPendingLogs(bool bOneRecOnly)
  407. {
  408. try
  409. {
  410. bool queueLogError = false;
  411. bool bFirst = true;
  412. StringBuffer GUID, logData;
  413. while (logFailSafe->PopPendingLogRecord(GUID, logData))
  414. {
  415. if (bFirst && !bOneRecOnly)
  416. {
  417. DBGLOG("We have old logs!. Will now try and recover the lost log messages");
  418. bFirst = false;
  419. }
  420. Owned<IEspUpdateLogRequestWrap> logRequest = unserializeLogRequestContent(logData.str(), false);
  421. if (!logRequest)
  422. IERRLOG("checkPendingLogs: failed to unserialize: %s", logData.str());
  423. else if (!enqueue(logRequest, GUID))
  424. {
  425. OERRLOG("checkPendingLogs: failed to add a log request to queue");
  426. queueLogError=true;
  427. }
  428. if (bOneRecOnly)
  429. break;
  430. }
  431. //if everything went ok then we should be able to rollover the old logs.
  432. if (!queueLogError && !bOneRecOnly)
  433. logFailSafe->RollOldLogs();
  434. }
  435. catch(IException* ex)
  436. {
  437. StringBuffer errorStr;
  438. ex->errorMessage(errorStr);
  439. IERRLOG("CheckPendingLogs: %s:" ,errorStr.str());
  440. ex->Release();
  441. }
  442. catch(...)
  443. {
  444. IERRLOG("Unknown exception thrown in CheckPendingLogs");
  445. }
  446. }
  447. //When the logData is read from a main tank file, it should be decrypted.
  448. //For non-decoupled logging agents, each logging agent may have its own tank file (with the location and
  449. //position of the main tank file). The logData from those agent tank files is not encrypted. So, it should
  450. //not be decrypted.
  451. //BTW: For non-decoupled logging agents, the logData from main tank file is read and decrypted in
  452. //checkAndReadLogRequestFromSharedTankFile().
  453. IEspUpdateLogRequestWrap* CLogThread::unserializeLogRequestContent(const char* logData, bool decompress)
  454. {
  455. if (!logData && *logData)
  456. return NULL;
  457. Owned<IPropertyTree> pLogTree = createPTreeFromXMLString(logData);
  458. if (!pLogTree)
  459. return NULL;
  460. const char* guid = pLogTree->queryProp("GUID");
  461. const char* opt = pLogTree->queryProp("Option");
  462. const char* logRequest = pLogTree->queryProp("LogRequest");
  463. if (!logRequest || !*logRequest)
  464. return NULL;
  465. StringBuffer decoded;
  466. JBASE64_Decode(logRequest, decoded);
  467. if (!decompress)
  468. return new CUpdateLogRequestWrap(guid, opt, decoded.str());
  469. StringBuffer req;
  470. LZWExpand(decoded, decoded.length(), req);
  471. return new CUpdateLogRequestWrap(guid, opt, req.str());
  472. };
  473. void CLogThread::writeJobQueue(IEspUpdateLogRequestWrap* jobToWrite)
  474. {
  475. if (jobToWrite)
  476. {
  477. unsigned startTime = (getEspLogLevel()>=LogNormal) ? msTick() : 0;
  478. CriticalBlock b(logQueueCrit);
  479. ESPLOG(LogNormal, "LThread:waitWQ: %dms\n", msTick() - startTime);
  480. int QueueSize = logQueue.ordinality();
  481. if(QueueSize > maxLogQueueLength)
  482. OERRLOG("LOGGING QUEUE SIZE %d EXCEEDED MaxLogQueueLength %d, check the logging server.",QueueSize, maxLogQueueLength);
  483. if(QueueSize!=0 && QueueSize % signalGrowingQueueAt == 0)
  484. OERRLOG("Logging Queue at %d records. Check the logging server.",QueueSize);
  485. logQueue.enqueue(LINK(jobToWrite));
  486. }
  487. }
  488. IEspUpdateLogRequestWrap* CLogThread::readJobQueue()
  489. {
  490. #define LOG_LEVEL LogNormal
  491. unsigned startTime = (getEspLogLevel()>=LOG_LEVEL) ? msTick() : 0;
  492. CriticalBlock b(logQueueCrit);
  493. unsigned delta = (getEspLogLevel()>=LOG_LEVEL) ? msTick() - startTime : 0;
  494. if (delta > 1) // <=1ms is not indicative of an unexpected delay
  495. ESPLOG(LOG_LEVEL, "LThread:waitRQ: %dms", delta);
  496. return (IEspUpdateLogRequestWrap*)logQueue.dequeue();
  497. #undef LOG_LEVEL
  498. }
  499. void CLogThread::checkAndCreateFile(const char* fileName)
  500. {
  501. Owned<IFile> file = createIFile(fileName);
  502. if(file->isFile() != fileBool::notFound)
  503. return;
  504. StringBuffer dir;
  505. splitFilename(fileName, &dir, &dir, nullptr, nullptr);
  506. recursiveCreateDirectory(dir);
  507. Owned<IFileIO> io = file->openShared(IFOcreate, IFSHfull);
  508. PROGLOG("CLogThread::checkAndCreateFile: %s is created.", fileName);
  509. }
  510. CLogRequestReader::~CLogRequestReader()
  511. {
  512. stopping = true;
  513. sem.signal();
  514. threaded.join();
  515. }
  516. void CLogRequestReader::threadmain()
  517. {
  518. PROGLOG("LogRequest Reader Thread started.");
  519. readAcked(settings->ackedFileList, ackedLogFileCheckList);
  520. readAcked(settings->ackedLogRequestFile, ackedLogRequests);
  521. unsigned waitMillSeconds = 1000*settings->waitSeconds;
  522. while (!stopping)
  523. {
  524. ESPLOG(LogMax, "#### CLogRequestReader: the loop for reading log requests begins.");
  525. if (!paused)
  526. {
  527. try
  528. {
  529. ESPLOG(LogMax, "#### CLogRequestReader: waiting for readLogRequest().");
  530. CriticalBlock b(crit);
  531. readLogRequest();
  532. if (newAckedLogFiles.length())
  533. {
  534. updateAckedFileList();
  535. updateAckedLogRequestList();
  536. }
  537. PROGLOG("CLogRequestReader: finished the loop for reading log requests.");
  538. }
  539. catch(IException *e)
  540. {
  541. StringBuffer msg;
  542. IERRLOG("Exception %d:%s in CLogRequestReader::threadmain()", e->errorCode(), e->errorMessage(msg).str());
  543. e->Release();
  544. }
  545. catch(...)
  546. {
  547. IERRLOG("Unknown exception in CLogRequestReader::threadmain()");
  548. }
  549. }
  550. sem.wait(waitMillSeconds);
  551. }
  552. PROGLOG("LogRequest Reader Thread terminated.");
  553. }
  554. void CLogRequestReader::reportAckedLogFiles(StringArray& ackedLogFiles)
  555. {
  556. CriticalBlock b(crit);
  557. for (auto r : ackedLogFileCheckList)
  558. ackedLogFiles.append(r.c_str());
  559. ESPLOG(LogMax, "#### The reportAckedLogFiles() done.");
  560. }
  561. //This method is used to setup an acked file list which contains the acked files for all agents.
  562. //The first agent reports all the acked files in that agent using the reportAckedLogFiles().
  563. //Using this method, the list of the acked files is given to the rest agents. If any file inside
  564. //the list has not been acked in the rest agents, the file should be removed from the list.
  565. void CLogRequestReader::removeUnknownAckedLogFiles(StringArray& ackedLogFiles)
  566. {
  567. CriticalBlock b(crit);
  568. ForEachItemInRev(i, ackedLogFiles)
  569. {
  570. const char* node = ackedLogFiles.item(i);
  571. if (ackedLogFileCheckList.find(node) == ackedLogFileCheckList.end())
  572. ackedLogFiles.remove(i);
  573. }
  574. ESPLOG(LogMax, "#### The removeUnknownAckedLogFiles() done.");
  575. }
  576. void CLogRequestReader::addNewAckedFileList(const char* list, StringArray& fileNames)
  577. {
  578. OwnedIFile newList = createIFile(list);
  579. if (!newList)
  580. throw makeStringExceptionV(EspLoggingErrors::UpdateLogFailed, "Failed to access %s", list);
  581. if (newList->exists())
  582. {
  583. if (!newList->remove())
  584. throw makeStringExceptionV(EspLoggingErrors::UpdateLogFailed, "Failed to remove old %s", list);
  585. }
  586. OwnedIFileIO newListIO = newList->open(IFOwrite);
  587. if (!newListIO)
  588. throw makeStringExceptionV(EspLoggingErrors::UpdateLogFailed, "Failed to open %s", list);
  589. offset_t pos = 0;
  590. ForEachItemIn(i, fileNames)
  591. {
  592. const char* fileName = fileNames.item(i);
  593. StringBuffer line(fileName);
  594. line.append("\r\n");
  595. unsigned len = line.length();
  596. newListIO->write(pos, len, line.str());
  597. pos += len;
  598. PROGLOG("Add AckedLogFile %s to %s", fileName, list);
  599. }
  600. }
  601. //The file names in the fileNames should be removed from both ackedLogFileCheckList
  602. //and settings->ackedFileList.
  603. void CLogRequestReader::cleanAckedLogFiles(StringArray& fileNames)
  604. {
  605. CriticalBlock b(crit);
  606. //Find which file should not be removed from ackedLogFileCheckList.
  607. StringArray fileNamesToKeep;
  608. for (auto r : ackedLogFileCheckList)
  609. {
  610. if (!fileNames.contains(r.c_str()))
  611. fileNamesToKeep.append(r.c_str());
  612. }
  613. //Create a temp file with the fileNamesToKeep for replacing the settings->ackedFileList
  614. VStringBuffer tempFileName("%s.tmp", settings->ackedFileList.str());
  615. addNewAckedFileList(tempFileName, fileNamesToKeep);
  616. //Replace the settings->ackedFileList with the temp file
  617. renameFile(settings->ackedFileList, tempFileName, true);
  618. PROGLOG("Rename %s to %s", tempFileName.str(), settings->ackedFileList.str());
  619. //Create new ackedLogFileCheckList based on fileNamesToKeep
  620. ackedLogFileCheckList.clear();
  621. ForEachItemIn(j, fileNamesToKeep)
  622. {
  623. const char* name = fileNamesToKeep.item(j);
  624. ackedLogFileCheckList.insert(name);
  625. PROGLOG("Add %s to new ackedLogFileCheckList", name);
  626. }
  627. }
  628. void CLogRequestReader::readAcked(const char* fileName, std::set<std::string>& acked)
  629. {
  630. Owned<IFile> f = createIFile(fileName);
  631. if (f)
  632. {
  633. OwnedIFileIO io = f->openShared(IFOread, IFSHfull);
  634. if (io)
  635. {
  636. StringBuffer line;
  637. OwnedIFileIOStream ios = createIOStream(io);
  638. Owned<IStreamLineReader> lineReader = createLineReader(ios, true);
  639. while(!lineReader->readLine(line.clear()))
  640. {
  641. if (line.isEmpty())
  642. continue;
  643. unsigned len = line.length();
  644. if ((len > 1) && (line.charAt(len - 2) == '\r') && (line.charAt(len - 1) == '\n'))
  645. line.setLength(len - 2); //remove \r\n
  646. else if ((len > 0) && ((line.charAt(len - 1) == '\r') || (line.charAt(len - 1) == '\n')))
  647. line.setLength(len - 1); //remove \r or \n
  648. if (line.length() > 0) //Check just in case
  649. acked.insert(line.str());
  650. PROGLOG("Found Acked %s from %s", line.str(), fileName);
  651. }
  652. }
  653. }
  654. else
  655. {
  656. f.setown(createIFile(fileName));
  657. Owned<IFileIO> io = f->open(IFOcreate);
  658. }
  659. }
  660. void CLogRequestReader::readLogRequest()
  661. {
  662. ESPLOG(LogMax, "#### Enter readLogRequest()");
  663. StringAttr tankFileNotFinished;//Today's newest tank file.
  664. findTankFileNotFinished(tankFileNotFinished);
  665. offset_t tankFileNotFinishedPos = 0;
  666. Owned<IDirectoryIterator> it = createDirectoryIterator(settings->tankFileDir.get(), tankFilePattern);
  667. ForEach (*it)
  668. {
  669. const char *fileNameWithPath = it->query().queryFilename();
  670. const char *fileName = pathTail(fileNameWithPath);
  671. if (ackedLogFileCheckList.find(fileName) != ackedLogFileCheckList.end())
  672. {
  673. ESPLOG(LogMax, "####Skip tank file: %s. It is in the Acked File list.", fileName);
  674. continue;
  675. }
  676. if (readLogRequestsFromTankFile(fileNameWithPath, tankFileNotFinished, tankFileNotFinishedPos))
  677. addToAckedLogFileList(fileName, fileNameWithPath);
  678. }
  679. addPendingLogsToQueue();
  680. if (tankFileNotFinishedPos)
  681. {//In the next loop, we may skip the log requests which have been read in this loop.
  682. lastTankFile = tankFileNotFinished;
  683. lastTankFilePos = tankFileNotFinishedPos;
  684. }
  685. ESPLOG(LogMax, "#### Leave readLogRequest()");
  686. }
  687. void CLogRequestReader::findTankFileNotFinished(StringAttr& tankFileNotFinished)
  688. {
  689. StringBuffer todayString;
  690. unsigned year, month, day;
  691. CDateTime now;
  692. now.setNow();
  693. now.getDate(year, month, day, true);
  694. todayString.appendf("%04d_%02d_%02d_00_00_00", year, month, day);
  695. StringBuffer lastTimeString;
  696. Owned<IDirectoryIterator> it = createDirectoryIterator(settings->tankFileDir.get(), tankFilePattern);
  697. ForEach (*it)
  698. {
  699. StringBuffer timeString;
  700. const char* aFileNameWithPath = it->query().queryFilename();
  701. const char* aFileName = pathTail(aFileNameWithPath);
  702. getTankFileTimeString(aFileName, timeString);
  703. if (timeString.isEmpty())
  704. {
  705. IERRLOG("Failed to parse tank file name: %s", aFileName);
  706. continue;
  707. }
  708. if (strcmp(todayString, timeString) > 0) //Not created today
  709. continue;
  710. if (strcmp(timeString, lastTimeString) > 0) //a newer file is found.
  711. {
  712. lastTimeString.set(timeString);
  713. tankFileNotFinished.set(aFileNameWithPath);
  714. }
  715. }
  716. }
  717. StringBuffer& CLogRequestReader::getTankFileTimeString(const char* fileName, StringBuffer& timeString)
  718. {
  719. const char* ptr = strstr(fileName, sendLogKeyword);
  720. if (!ptr)
  721. return timeString;
  722. ptr += sendLogKeywordLen;
  723. if (!ptr)
  724. return timeString;
  725. ptr = strchr(ptr, '.');
  726. if (ptr && (strlen(ptr) > dateTimeStringLength))
  727. timeString.append(dateTimeStringLength, ++ptr); //yyyy_mm_dd_hh_mm_ss
  728. return timeString;
  729. }
  730. bool CLogRequestReader::readLogRequestsFromTankFile(const char* fileName, StringAttr& tankFileNotFinished, offset_t& tankFileNotFinishedPos)
  731. {
  732. ESPLOG(LogMax, "#### Enter readLogRequestsFromTankFile(): %s", fileName);
  733. Owned<IFile> file = createIFile(fileName);
  734. if (!file) //This can only happen at start time. So, throw exception.
  735. throw MakeStringException(-1, "Unable to find logging file %s", fileName);
  736. Owned<IFileIO> fileIO = file->open(IFOread);
  737. if (!fileIO)
  738. throw MakeStringException(-1, "Unable to open logging file %s", fileName);
  739. //Sample: 00009902 0421311217.2019_03_29_14_32_11 <cache><GUID>0421311217.2019_03_29_14_32_11</GUID>
  740. //<option>SingleInsert</option><LogRequest>dUgAAH...AA==</LogRequest></cache>
  741. offset_t finger = getReadFilePos(fileName);
  742. unsigned totalMissed = 0;
  743. while(true)
  744. {
  745. MemoryBuffer data;
  746. CLogSerializer logSerializer;
  747. if (!logSerializer.readALogLine(fileIO, finger, data))
  748. break;
  749. bool skipLogRequest = false;
  750. StringBuffer GUID, logRequest;
  751. if (!parseLogRequest(data, GUID, logRequest, skipLogRequest))
  752. {
  753. if (skipLogRequest)
  754. ESPLOG(LogMax, "#### Agent %s skips %s.", logThread->getLogAgent()->getName(), GUID.str());
  755. else
  756. IERRLOG("Invalid logging request in %s", fileName);
  757. }
  758. else if (ackedLogRequests.find(GUID.str()) == ackedLogRequests.end())
  759. {//This QUID is not acked.
  760. totalMissed++;
  761. if (pendingLogGUIDs.find(GUID.str()) == pendingLogGUIDs.end())
  762. {//This QUID has not been queued yet.
  763. PROGLOG("Found new log request %s from tank file %s. Added to pending logs.", GUID.str(), fileName);
  764. pendingLogGUIDs.insert(GUID.str());
  765. pendingLogs[GUID.str()] = logRequest.str();
  766. if (pendingLogs.size() > settings->pendingLogBufferSize)
  767. addPendingLogsToQueue();
  768. }
  769. }
  770. }
  771. bool isTankFileNotFinished = !isEmptyString(tankFileNotFinished) && strieq(fileName, tankFileNotFinished);
  772. if (isTankFileNotFinished)
  773. tankFileNotFinishedPos = fileIO->size();
  774. ESPLOG(LogMax, "#### Leave readLogRequestsFromTankFile(): %s, totalMissed(%d)", fileName, totalMissed);
  775. return (totalMissed == 0) && !isTankFileNotFinished;
  776. }
  777. offset_t CLogRequestReader::getReadFilePos(const char* fileName)
  778. {
  779. const char* lastTankFileName = lastTankFile.get();
  780. if (lastTankFileName && strieq(lastTankFileName, fileName))
  781. return lastTankFilePos;
  782. return 0;
  783. }
  784. bool CLogRequestReader::parseLogRequest(MemoryBuffer& rawdata, StringBuffer& GUID, StringBuffer& logLine, bool& skipLogRequest)
  785. {
  786. //The rawdata should be in the form of 2635473460.05_01_12_16_13_57\t<ScriptValues>...</ScriptValues>\t<cache>...</cache>
  787. //parse it into GUID and logLine (as <cache>...</cache>)
  788. //The <ScriptValues> section is optional. If available, it should be used to check whether this agent should skip this
  789. //log request or not.
  790. const char* begin = rawdata.toByteArray(); //no string termination character \0
  791. unsigned len = rawdata.length();
  792. if (!begin || (len == 0))
  793. return false;
  794. const char* ptr = begin;
  795. const char* end = begin + len;
  796. while ((ptr < end) && (*ptr != '\t'))
  797. ptr++;
  798. if ((ptr == end) || (ptr == begin))
  799. return false;
  800. GUID.append(ptr - begin, begin);
  801. if (++ptr == end)
  802. return false;
  803. if (!checkScriptValues(ptr, end, skipLogRequest))
  804. return false;
  805. logLine.append(end - ptr, ptr);
  806. return true;
  807. }
  808. bool CLogRequestReader::checkScriptValues(const char* ptr, const char* end, bool& skipLogRequest)
  809. {
  810. VStringBuffer scriptValuesTag("<%s>", logRequestScriptValues);
  811. if (strnicmp(ptr, scriptValuesTag, scriptValuesTag.length()))
  812. return true;
  813. ptr += scriptValuesTag.length();
  814. const char* scriptValuesBegin = ptr;
  815. while ((ptr < end) && (*ptr != '\t'))
  816. ptr++;
  817. if (ptr == end)
  818. return false; //Invalid logging request
  819. const char* scriptValuesEnd = ptr - scriptValuesTag.length() - 1; //No include XML CloseTag
  820. if (scriptValuesEnd < scriptValuesBegin)
  821. return false;
  822. StringBuffer scriptValuesStr;
  823. scriptValuesStr.append(scriptValuesEnd - scriptValuesBegin, scriptValuesBegin);
  824. Owned<IPropertyTree> scriptValues = createPTreeFromXMLString(scriptValuesStr);
  825. if (checkSkipThreadQueue(scriptValues, *logThread))
  826. {
  827. skipLogRequest = true;
  828. return false;
  829. }
  830. return ++ptr != end;
  831. }
  832. void CLogRequestReader::addPendingLogsToQueue()
  833. {
  834. ESPLOG(LogMax, "#### Enter addPendingLogsToQueue()");
  835. //Add the pendingLogs to log queue
  836. if (pendingLogs.size())
  837. ESPLOG(LogMin, "Adding %zu Pending Log Request(s) to job queue", pendingLogs.size());
  838. StringArray queuedPendingLogs;
  839. for (auto const& x : pendingLogs)
  840. {
  841. Owned<IEspUpdateLogRequestWrap> logRequest = logThread->unserializeLogRequestContent(x.second.c_str(), true);
  842. if (!logRequest)
  843. IERRLOG("addPendingLogsToQueue: failed to unserialize: %s", x.second.c_str());
  844. logThread->queueLog(logRequest);
  845. queuedPendingLogs.append(x.first.c_str());
  846. PROGLOG("Enqueue: %s", x.first.c_str());
  847. }
  848. //Clean the pendingLogs
  849. ForEachItemIn(i, queuedPendingLogs)
  850. pendingLogs.erase(queuedPendingLogs.item(i));
  851. ESPLOG(LogMax, "#### Leave addPendingLogsToQueue()");
  852. }
  853. void CLogRequestReader::addACK(const char* GUID)
  854. {
  855. ESPLOG(LogMax, "#### Enter addACK(): %s", GUID);
  856. CriticalBlock b(crit);
  857. Owned<IFile> f = createIFile(settings->ackedLogRequestFile);
  858. Owned<IFileIO> io = f->open(IFOwrite);
  859. StringBuffer toWrite,size;
  860. toWrite.appendf("%s\r\n", GUID);
  861. io->write(io->size(), toWrite.length(), toWrite.str());
  862. ackedLogRequests.insert(GUID);
  863. pendingLogGUIDs.erase(GUID);
  864. ESPLOG(LogMax, "#### addACK(): %s acked", GUID);
  865. }
  866. void CLogRequestReader::addToAckedLogFileList(const char* fileName, const char* fileNameWithPath)
  867. {
  868. PROGLOG("Found an AckedLogFile %s.", fileNameWithPath);
  869. newAckedLogFiles.append(fileNameWithPath);
  870. ackedLogFileCheckList.insert(fileName);
  871. }
  872. //Update newAckedLogFiles to file settings->ackedFileList,
  873. void CLogRequestReader::updateAckedFileList()
  874. {
  875. ESPLOG(LogMax, "#### Enter updateAckedFileList()");
  876. OwnedIFile ackedFiles = createIFile(settings->ackedFileList);
  877. if (!ackedFiles)
  878. return; //Should never happen
  879. OwnedIFileIO ackedFilesIO = ackedFiles->open(IFOwrite);
  880. if (!ackedFilesIO)
  881. return; //Should never happen
  882. offset_t pos = ackedFilesIO->size();
  883. ForEachItemIn(i, newAckedLogFiles)
  884. {
  885. const char* fileNameWithPath = newAckedLogFiles.item(i);
  886. StringBuffer fileName(pathTail(fileNameWithPath));
  887. PROGLOG("Add AckedLogFile %s to %s", fileName.str(), settings->ackedFileList.str());
  888. //Remove log request from the ackedLogRequests
  889. GuidSet logRequestsToRemove;
  890. CLogSerializer ackedLog(fileNameWithPath);
  891. ackedLog.loadAckedLogs(logRequestsToRemove);
  892. for (auto r : logRequestsToRemove)
  893. ackedLogRequests.erase(r.c_str());
  894. fileName.append("\r\n");
  895. unsigned len = strlen(fileName);
  896. ackedFilesIO->write(pos, len, fileName);
  897. pos += len;
  898. }
  899. newAckedLogFiles.clear();
  900. ESPLOG(LogMax, "#### Leave updateAckedFileList()");
  901. }
  902. void CLogRequestReader::updateAckedLogRequestList()
  903. {
  904. ESPLOG(LogMax, "#### Enter updateAckedLogRequestList()");
  905. OwnedIFile newAckedLogRequestFile = createIFile(settings->ackedLogRequestFile);
  906. if (newAckedLogRequestFile)
  907. {
  908. newAckedLogRequestFile->remove();
  909. PROGLOG("Clean %s", settings->ackedLogRequestFile.str());
  910. }
  911. OwnedIFileIO newAckedLogRequestFileIO = newAckedLogRequestFile->open(IFOwrite);
  912. if (!newAckedLogRequestFileIO)
  913. return; //Should never happen
  914. offset_t pos = 0;
  915. for (auto r : ackedLogRequests)
  916. {
  917. StringBuffer line(r.c_str());
  918. line.append("\r\n");
  919. unsigned len = line.length();
  920. newAckedLogRequestFileIO->write(pos, len, line.str());
  921. pos += len;
  922. PROGLOG("Add AckedLogRequest %s to %s", line.str(), settings->ackedLogRequestFile.str());
  923. }
  924. ESPLOG(LogMax, "#### Leave updateAckedLogRequestList()");
  925. }
  926. void CLogRequestReader::setTankFilePattern(const char* service)
  927. {
  928. tankFilePattern.set(service).append("*").append(logFileExt);
  929. }
  930. static bool checkEnabledLogVariant(IPropertyTree *scriptValues, const char *profile, const char *tracename, const char *group, const char *logtype)
  931. {
  932. bool checkProfile = !isEmptyString(profile);
  933. bool checkType = !isEmptyString(logtype);
  934. if (checkProfile && (isEmptyString(group) || !strieq(profile, group)))
  935. {
  936. ESPLOG(LogNormal, "'%s' log entry disabled - log profile '%s' disabled", tracename, profile);
  937. return false;
  938. }
  939. else if (checkType)
  940. {
  941. VStringBuffer xpath("@disable-log-type-%s", logtype);
  942. if (scriptValues->getPropBool(xpath, false))
  943. {
  944. ESPLOG(LogNormal, "'%s' log entry disabled - log type '%s' disabled", tracename, logtype);
  945. return false;
  946. }
  947. }
  948. return true;
  949. }
  950. bool checkSkipThreadQueue(IPropertyTree *scriptValues, IUpdateLogThread &logthread)
  951. {
  952. if (!scriptValues)
  953. return false;
  954. Linked<IEspLogAgent> agent = logthread.getLogAgent(); //badly named function get functions should link
  955. if (!agent)
  956. return false;
  957. const char *profile = scriptValues->queryProp("@profile");
  958. Owned<IEspLogAgentVariantIterator> variants = agent->getVariants();
  959. if (isEmptyString(profile) && !variants->first())
  960. return false;
  961. ForEach(*variants)
  962. {
  963. const IEspLogAgentVariant& variant = variants->query();
  964. if (checkEnabledLogVariant(scriptValues, profile, variant.getName(), variant.getGroup(), variant.getType()))
  965. return false;
  966. }
  967. return true;
  968. }