logthread.cpp 36 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2014 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jmisc.hpp"
  14. #include "jexcept.hpp"
  15. #include "jdebug.hpp"
  16. #include "LoggingErrors.hpp"
  17. #include "LogSerializer.hpp"
  18. #include "logthread.hpp"
  19. #include "compressutil.hpp"
//Names of the properties read from the logging agent configuration tree.
const char* const PropMaxLogQueueLength = "MaxLogQueueLength";
const char* const PropQueueSizeSignal = "QueueSizeSignal";
const char* const PropMaxTriesRS = "MaxTriesRS";
const char* const PropFailSafe = "FailSafe";
const char* const PropAckedFiles = "AckedFiles";
//Fallback value used when the AckedFiles property is empty.
const char* const PropDefaultAckedFiles = "AckedFiles";
const char* const PropAckedLogRequests = "AckedLogRequests";
//Fallback value used when the AckedLogRequests property is empty.
const char* const PropDefaultAckedLogRequests = "AckedLogRequests";
const char* const PropPendingLogBufferSize = "PendingLogBufferSize";
const char* const PropReadRequestWaitingSeconds = "ReadRequestWaitingSeconds";

//Marker searched for in tank file names when extracting their time stamp.
const char* const sendLogKeyword = "_sending_";
const unsigned sendLogKeywordLen = strlen(sendLogKeyword);
const unsigned dateTimeStringLength = 19; //yyyy_mm_dd_hh_mm_ss

#define MaxLogQueueLength 500000 //Write a warning into log when queue length is greater than 500000
#define QueueSizeSignal 10000 //Write a warning into log when queue length is increased by 10000
const int DefaultMaxTriesRS = -1; // Max. # of attempts to send log message to WsReportService. Default: infinite
  36. extern LOGGINGCOMMON_API IUpdateLogThread* createUpdateLogThread(IPropertyTree* _cfg, const char* _service, const char* _agentName,
  37. const char* _tankFileDir, IEspLogAgent* _logAgent)
  38. {
  39. if (!_cfg)
  40. return NULL;
  41. IUpdateLogThread* loggingThread = new CLogThread(_cfg, _service, _agentName, _logAgent, _tankFileDir);
  42. loggingThread->start();
  43. return loggingThread;
  44. }
  45. CLogThread::CLogThread(IPropertyTree* _cfg , const char* _service, const char* _agentName, IEspLogAgent* _logAgent, const char* _tankFileDir)
  46. : stopping(false), agentName(_agentName), tankFileDir(_tankFileDir)
  47. {
  48. if(!_agentName || !*_agentName)
  49. throw MakeStringException(-1,"No Logging agent name defined");
  50. if(!_cfg)
  51. throw MakeStringException(-1,"No Logging agent Configuration for %s", _agentName);
  52. if(!_service || !*_service)
  53. throw MakeStringException(-1,"No service name defined for %s", _agentName);
  54. if(!_logAgent)
  55. throw MakeStringException(-1,"No Logging agent interface for %s", _agentName);
  56. logAgent.setown(_logAgent);
  57. maxLogQueueLength = _cfg->getPropInt(PropMaxLogQueueLength, MaxLogQueueLength);
  58. signalGrowingQueueAt = _cfg->getPropInt(PropQueueSizeSignal, QueueSizeSignal);
  59. maxLogRetries = _cfg->getPropInt(PropMaxTriesRS, DefaultMaxTriesRS);
  60. ensureFailSafe = _cfg->getPropBool(PropFailSafe);
  61. if(ensureFailSafe)
  62. logFailSafe.setown(createFailSafeLogger(_cfg, _service, _agentName));
  63. time_t tNow;
  64. time(&tNow);
  65. localtime_r(&tNow, &m_startTime);
  66. if (tankFileDir.get())
  67. {
  68. Owned<CLogRequestReaderSettings> settings = new CLogRequestReaderSettings();
  69. settings->tankFileDir.set(tankFileDir.get());
  70. const char* ackedFiles = _cfg->queryProp(PropAckedFiles);
  71. settings->ackedFileList.set(isEmptyString(ackedFiles) ? PropDefaultAckedFiles : ackedFiles);
  72. const char* ackedLogRequestFile = _cfg->queryProp(PropAckedLogRequests);
  73. settings->ackedLogRequestFile.set(isEmptyString(ackedLogRequestFile) ? PropDefaultAckedLogRequests : ackedLogRequestFile);
  74. int pendingLogBufferSize = _cfg->getPropInt(PropPendingLogBufferSize, DEFAULTPENDINGLOGBUFFERSIZE);
  75. if (pendingLogBufferSize <= 0)
  76. throw MakeStringException(-1, "The %s (%d) should be greater than 0.", PropPendingLogBufferSize, pendingLogBufferSize);
  77. settings->pendingLogBufferSize = pendingLogBufferSize;
  78. int waitSeconds = _cfg->getPropInt(PropReadRequestWaitingSeconds, DEFAULTREADLOGREQUESTWAITSECOND);
  79. if (waitSeconds <= 0)
  80. throw MakeStringException(-1, "The %s (%d) should be greater than 0.", PropReadRequestWaitingSeconds, waitSeconds);
  81. settings->waitSeconds = waitSeconds;
  82. PROGLOG("%s %s: %s", agentName.get(), PropAckedFiles, settings->ackedFileList.str());
  83. PROGLOG("%s %s: %s", agentName.get(), PropDefaultAckedLogRequests, settings->ackedLogRequestFile.str());
  84. PROGLOG("%s %s: %d. %s: %d", agentName.get(), PropReadRequestWaitingSeconds, settings->waitSeconds, PropPendingLogBufferSize, settings->pendingLogBufferSize);
  85. checkAndCreateFile(settings->ackedFileList);
  86. checkAndCreateFile(settings->ackedLogRequestFile);
  87. logRequestReader.setown(new CLogRequestReader(settings.getClear(), this));
  88. }
  89. PROGLOG("%s CLogThread started.", agentName.get());
  90. }
//Destructor: only emits a LogMax-level trace line.
CLogThread::~CLogThread()
{
    ESPLOG(LogMax, "CLogThread::~CLogThread()");
}
//Starts the thread by delegating to Thread::start().
void CLogThread::start()
{
    Thread::start();
}
  99. int CLogThread::run()
  100. {
  101. Link();
  102. if (!logRequestReader && logFailSafe.get())
  103. checkPendingLogs(false);
  104. while(!stopping)
  105. {
  106. m_sem.wait(UPDATELOGTHREADWAITINGTIME);
  107. sendLog();
  108. if (!logRequestReader && logFailSafe.get())
  109. {
  110. checkPendingLogs(true);
  111. checkRollOver();
  112. }
  113. }
  114. Release();
  115. return 0;
  116. }
  117. void CLogThread::stop()
  118. {
  119. try
  120. {
  121. CriticalBlock b(logQueueCrit);
  122. if (!logQueue.ordinality() && !logRequestReader && logFailSafe.get() && logFailSafe->canRollCurrentLog())
  123. logFailSafe->RollCurrentLog();
  124. //If logQueue is not empty, the log files are rolled over so that queued jobs can be read
  125. //when the CLogThread is restarted.
  126. }
  127. catch(...)
  128. {
  129. DBGLOG("Exception");
  130. }
  131. stopping = true;
  132. m_sem.signal();
  133. join();
  134. }
  135. bool CLogThread::queueLog(IEspUpdateLogRequest* logRequest)
  136. {
  137. if (!logRequest)
  138. return false;
  139. Owned<IEspUpdateLogRequestWrap> logRequestWrap = new CUpdateLogRequestWrap(NULL, logRequest->getOption(), logRequest->getLogContent());
  140. return enqueue(logRequestWrap, nullptr);
  141. }
  142. bool CLogThread::queueLog(IEspUpdateLogRequestWrap* logRequest)
  143. {
  144. unsigned startTime = (getEspLogLevel()>=LogNormal) ? msTick() : 0;
  145. Owned<IEspUpdateLogRequestWrap> logRequestFiltered = logAgent->filterLogContent(logRequest);
  146. ESPLOG(LogNormal, "LThread:filterLog: %dms\n", msTick() - startTime);
  147. return enqueue(logRequestFiltered, nullptr);
  148. }
  149. bool CLogThread::enqueue(IEspUpdateLogRequestWrap* logRequest, const char* guid)
  150. {
  151. if (!logRequestReader && logFailSafe.get())
  152. {
  153. StringBuffer GUID, reqBuf;
  154. unsigned startTime = (getEspLogLevel()>=LogNormal) ? msTick() : 0;
  155. if (isEmptyString(guid))
  156. logFailSafe->GenerateGUID(GUID, nullptr);
  157. else
  158. GUID.set(guid);
  159. logRequest->setGUID(GUID.str());
  160. if (serializeLogRequestContent(logRequest, reqBuf))
  161. logFailSafe->Add(GUID, reqBuf.str(), nullptr);
  162. ESPLOG(LogNormal, "LThread:addToFailSafe: %dms\n", msTick() - startTime);
  163. }
  164. writeJobQueue(logRequest);
  165. m_sem.signal();
  166. return true;
  167. }
  168. void CLogThread::sendLog()
  169. {
  170. try
  171. {
  172. if(stopping)
  173. return;
  174. int recSend = 0;
  175. while(true)
  176. {
  177. IEspUpdateLogRequestWrap* logRequest = readJobQueue();
  178. if (!logRequest)
  179. break;
  180. const char* GUID= logRequest->getGUID();
  181. if ((!GUID || !*GUID) && ensureFailSafe && logFailSafe.get())
  182. continue;
  183. PROGLOG("Sending %s ...\n", GUID);
  184. Owned<IEspUpdateLogRequestWrap> logRequestInFile;
  185. if (!logRequestReader)
  186. logRequestInFile.setown(checkAndReadLogRequestFromSharedTankFile(logRequest));
  187. try
  188. {
  189. unsigned startTime = (getEspLogLevel()>=LogNormal) ? msTick() : 0;
  190. Owned<IEspUpdateLogResponse> logResponse = createUpdateLogResponse();
  191. if (logRequestInFile)
  192. logAgent->updateLog(*logRequestInFile, *logResponse);
  193. else
  194. logAgent->updateLog(*logRequest, *logResponse);
  195. if (!logResponse)
  196. throw MakeStringException(EspLoggingErrors::UpdateLogFailed, "no response");
  197. if (logResponse->getStatusCode())
  198. {
  199. const char* statusMessage = logResponse->getStatusMessage();
  200. if(statusMessage && *statusMessage)
  201. throw MakeStringException(EspLoggingErrors::UpdateLogFailed, "%s", statusMessage);
  202. else
  203. throw MakeStringException(EspLoggingErrors::UpdateLogFailed, "Unknown error");
  204. }
  205. ESPLOG(LogNormal, "LThread:updateLog: %dms\n", msTick() - startTime);
  206. if (logRequestReader)
  207. {
  208. unsigned startTime1 = (getEspLogLevel()>=LogNormal) ? msTick() : 0;
  209. logRequestReader->addACK(GUID);
  210. PROGLOG("%s acked: %dms\n", GUID, msTick() - startTime1);
  211. }
  212. else if(ensureFailSafe && logFailSafe.get())
  213. {
  214. unsigned startTime1 = (getEspLogLevel()>=LogNormal) ? msTick() : 0;
  215. logFailSafe->AddACK(GUID);
  216. ESPLOG(LogNormal, "LThread:AddACK: %dms\n", msTick() - startTime1);
  217. }
  218. logRequest->Release();//Make sure that no data (such as GUID) is needed before releasing the logRequest.
  219. }
  220. catch(IException* e)
  221. {
  222. StringBuffer errorStr, errorMessage;
  223. errorMessage.appendf("Failed to update log for %s: error code %d, error message %s", GUID, e->errorCode(), e->errorMessage(errorStr).str());
  224. e->Release();
  225. if (logRequestInFile)
  226. logRequest->setNoResend(logRequestInFile->getNoResend());
  227. bool willRetry = false;
  228. if (!logRequest->getNoResend() && (maxLogRetries != 0))
  229. {
  230. unsigned retry = logRequest->incrementRetryCount();
  231. if (retry > maxLogRetries)
  232. errorMessage.append(" Max logging retries exceeded.");
  233. else
  234. {
  235. willRetry = true;
  236. writeJobQueue(logRequest);
  237. errorMessage.appendf(" Adding back to logging queue for retrying %d.", retry);
  238. }
  239. }
  240. if (!willRetry)
  241. {
  242. logRequest->Release();
  243. }
  244. IERRLOG("%s", errorMessage.str());
  245. }
  246. }
  247. }
  248. catch(IException* e)
  249. {
  250. StringBuffer errorStr, errorMessage;
  251. errorMessage.append("Exception thrown within update log thread: error code ").append(e->errorCode()).append(", error message ").append(e->errorMessage(errorStr));
  252. IERRLOG("%s", errorMessage.str());
  253. e->Release();
  254. }
  255. catch(...)
  256. {
  257. IERRLOG("Unknown exception thrown within update log thread");
  258. }
  259. return;
  260. }
  261. //At first, we check whether the logRequest contains the information about original log Request
  262. //in shared tank file created by logging manager. If yes, try to read the original log Request
  263. //based on the information. If the original log Request is found and unserialized, return new
  264. //IEspUpdateLogRequestWrap which contains original log Request.
  265. IEspUpdateLogRequestWrap* CLogThread::checkAndReadLogRequestFromSharedTankFile(IEspUpdateLogRequestWrap* logRequest)
  266. {
  267. //Read LogRequestInFile info if exists.
  268. Owned<IPropertyTree> logInFle = createPTreeFromXMLString(logRequest->getUpdateLogRequest());
  269. if (!logInFle)
  270. return nullptr;
  271. const char* GUID = logInFle->queryProp(LOGREQUEST_GUID);
  272. if (isEmptyString(GUID))
  273. return nullptr;
  274. const char* fileName = logInFle->queryProp(LOGCONTENTINFILE_FILENAME);
  275. if (isEmptyString(fileName))
  276. return nullptr;
  277. __int64 pos = logInFle->getPropInt64(LOGCONTENTINFILE_FILEPOS, -1);
  278. if (pos < 0)
  279. return nullptr;
  280. int size = logInFle->getPropInt64(LOGCONTENTINFILE_FILESIZE, -1);
  281. if (size < 0)
  282. return nullptr;
  283. Owned<CLogRequestInFile> reqInFile = new CLogRequestInFile();
  284. reqInFile->setGUID(GUID);
  285. reqInFile->setFileName(fileName);
  286. reqInFile->setPos(pos);
  287. reqInFile->setSize(size);
  288. //Read Log Request from the file
  289. StringBuffer logRequestStr;
  290. CLogSerializer logSerializer;
  291. if (!logSerializer.readLogRequest(reqInFile, logRequestStr))
  292. {
  293. ERRLOG("Failed to read Log Request from %s", fileName);
  294. return nullptr;
  295. }
  296. try
  297. {
  298. Owned<IPropertyTree> logRequestTree = createPTreeFromXMLString(logRequestStr.str());
  299. if (!logRequestTree)
  300. return nullptr;
  301. const char* guid = logRequestTree->queryProp("GUID");
  302. const char* opt = logRequestTree->queryProp("Option");
  303. const char* reqBuf = logRequestTree->queryProp("LogRequest");
  304. if (isEmptyString(reqBuf))
  305. return nullptr;
  306. StringBuffer decoded, req;
  307. JBASE64_Decode(reqBuf, decoded);
  308. LZWExpand(decoded, decoded.length(), req);
  309. return new CUpdateLogRequestWrap(guid, opt, req.str());
  310. }
  311. catch(IException* e)
  312. {
  313. StringBuffer errorStr;
  314. ERRLOG("Exception when unserializing Log Request Content: %d %s", e->errorCode(), e->errorMessage(errorStr).str());
  315. e->Release();
  316. }
  317. return nullptr;
  318. }
  319. //////////////////////////FailSafe////////////////////////////
  320. void CLogThread::checkRollOver()
  321. {
  322. try
  323. {
  324. bool bRollover = false;
  325. time_t tNow;
  326. time(&tNow);
  327. struct tm ltNow;
  328. localtime_r(&tNow, &ltNow);
  329. if ((ltNow.tm_year != m_startTime.tm_year || ltNow.tm_yday != m_startTime.tm_yday))
  330. {
  331. bRollover = true;
  332. localtime_r(&tNow, &m_startTime); // reset the start time for next rollover check
  333. }
  334. if (!bRollover)
  335. return;
  336. //Rename .log files to .old files
  337. logFailSafe->SafeRollover();
  338. CriticalBlock b(logQueueCrit);
  339. //Check and add queued requests to tank(.log) files
  340. unsigned numNewArrivals = logQueue.ordinality();
  341. if(numNewArrivals <= 0)
  342. return;
  343. ESPLOG(LogMax, "writing %d requests in the queue to the rolled over tank file.", numNewArrivals);
  344. unsigned startTime = (getEspLogLevel()>=LogNormal) ? msTick() : 0;
  345. for(unsigned i = 0; i < numNewArrivals; i++)
  346. {
  347. IInterface* pRequest = logQueue.item(i);
  348. if (!pRequest)
  349. continue;
  350. IEspUpdateLogRequestWrap* pEspRequest = dynamic_cast<IEspUpdateLogRequestWrap*>(pRequest);
  351. if(!pEspRequest)
  352. continue;
  353. StringBuffer reqBuf;
  354. const char* GUID = pEspRequest->getGUID();
  355. if(GUID && *GUID && serializeLogRequestContent(pEspRequest, reqBuf))
  356. logFailSafe->Add(GUID, reqBuf.str(), nullptr);
  357. }
  358. ESPLOG(LogNormal, "LThread:AddFailSafe: %dms\n", msTick() - startTime);
  359. }
  360. catch(IException* Ex)
  361. {
  362. StringBuffer str;
  363. Ex->errorMessage(str);
  364. IERRLOG("Exception thrown during tank file rollover: %s",str.str());
  365. Ex->Release();
  366. }
  367. catch(...)
  368. {
  369. IERRLOG("Unknown exception thrown during tank file rollover.");
  370. }
  371. }
  372. unsigned CLogThread::serializeLogRequestContent(IEspUpdateLogRequestWrap* pRequest, StringBuffer& logData)
  373. {
  374. const char* GUID = pRequest->getGUID();
  375. const char* option = pRequest->getOption();
  376. const char* logRequest = pRequest->getUpdateLogRequest();
  377. if (GUID && *GUID)
  378. logData.append("<GUID>").append(GUID).append("</GUID>");
  379. if (option && *option)
  380. logData.append("<Option>").append(option).append("</Option>");
  381. if (logRequest && *logRequest)
  382. {
  383. StringBuffer buffer;
  384. JBASE64_Encode(logRequest, strlen(logRequest), buffer, true);
  385. logData.append("<LogRequest>").append(buffer.str()).append("</LogRequest>");
  386. }
  387. return logData.length();
  388. }
  389. void CLogThread::checkPendingLogs(bool bOneRecOnly)
  390. {
  391. try
  392. {
  393. bool queueLogError = false;
  394. bool bFirst = true;
  395. StringBuffer GUID, logData;
  396. while (logFailSafe->PopPendingLogRecord(GUID, logData))
  397. {
  398. if (bFirst && !bOneRecOnly)
  399. {
  400. DBGLOG("We have old logs!. Will now try and recover the lost log messages");
  401. bFirst = false;
  402. }
  403. Owned<IEspUpdateLogRequestWrap> logRequest = unserializeLogRequestContent(logData.str(), false);
  404. if (!logRequest)
  405. IERRLOG("checkPendingLogs: failed to unserialize: %s", logData.str());
  406. else if (!enqueue(logRequest, GUID))
  407. {
  408. OERRLOG("checkPendingLogs: failed to add a log request to queue");
  409. queueLogError=true;
  410. }
  411. if (bOneRecOnly)
  412. break;
  413. }
  414. //if everything went ok then we should be able to rollover the old logs.
  415. if (!queueLogError && !bOneRecOnly)
  416. logFailSafe->RollOldLogs();
  417. }
  418. catch(IException* ex)
  419. {
  420. StringBuffer errorStr;
  421. ex->errorMessage(errorStr);
  422. IERRLOG("CheckPendingLogs: %s:" ,errorStr.str());
  423. ex->Release();
  424. }
  425. catch(...)
  426. {
  427. IERRLOG("Unknown exception thrown in CheckPendingLogs");
  428. }
  429. }
  430. //When the logData is read from a main tank file, it should be decrypted.
  431. //For non-decoupled logging agents, each logging agent may have its own tank file (with the location and
  432. //position of the main tank file). The logData from those agent tank files is not encrypted. So, it should
  433. //not be decrypted.
  434. //BTW: For non-decoupled logging agents, the logData from main tank file is read and decrypted in
  435. //checkAndReadLogRequestFromSharedTankFile().
  436. IEspUpdateLogRequestWrap* CLogThread::unserializeLogRequestContent(const char* logData, bool decompress)
  437. {
  438. if (!logData && *logData)
  439. return NULL;
  440. Owned<IPropertyTree> pLogTree = createPTreeFromXMLString(logData);
  441. if (!pLogTree)
  442. return NULL;
  443. const char* guid = pLogTree->queryProp("GUID");
  444. const char* opt = pLogTree->queryProp("Option");
  445. const char* logRequest = pLogTree->queryProp("LogRequest");
  446. if (!logRequest || !*logRequest)
  447. return NULL;
  448. StringBuffer decoded;
  449. JBASE64_Decode(logRequest, decoded);
  450. if (!decompress)
  451. return new CUpdateLogRequestWrap(guid, opt, decoded.str());
  452. StringBuffer req;
  453. LZWExpand(decoded, decoded.length(), req);
  454. return new CUpdateLogRequestWrap(guid, opt, req.str());
  455. };
  456. void CLogThread::writeJobQueue(IEspUpdateLogRequestWrap* jobToWrite)
  457. {
  458. if (jobToWrite)
  459. {
  460. unsigned startTime = (getEspLogLevel()>=LogNormal) ? msTick() : 0;
  461. CriticalBlock b(logQueueCrit);
  462. ESPLOG(LogNormal, "LThread:waitWQ: %dms\n", msTick() - startTime);
  463. int QueueSize = logQueue.ordinality();
  464. if(QueueSize > maxLogQueueLength)
  465. OERRLOG("LOGGING QUEUE SIZE %d EXCEEDED MaxLogQueueLength %d, check the logging server.",QueueSize, maxLogQueueLength);
  466. if(QueueSize!=0 && QueueSize % signalGrowingQueueAt == 0)
  467. OERRLOG("Logging Queue at %d records. Check the logging server.",QueueSize);
  468. logQueue.enqueue(LINK(jobToWrite));
  469. }
  470. }
  471. IEspUpdateLogRequestWrap* CLogThread::readJobQueue()
  472. {
  473. #define LOG_LEVEL LogNormal
  474. unsigned startTime = (getEspLogLevel()>=LOG_LEVEL) ? msTick() : 0;
  475. CriticalBlock b(logQueueCrit);
  476. unsigned delta = (getEspLogLevel()>=LOG_LEVEL) ? msTick() - startTime : 0;
  477. if (delta > 1) // <=1ms is not indicative of an unexpected delay
  478. ESPLOG(LOG_LEVEL, "LThread:waitRQ: %dms", delta);
  479. return (IEspUpdateLogRequestWrap*)logQueue.dequeue();
  480. #undef LOG_LEVEL
  481. }
  482. void CLogThread::checkAndCreateFile(const char* fileName)
  483. {
  484. Owned<IFile> file = createIFile(fileName);
  485. if(file->isFile() != fileBool::notFound)
  486. return;
  487. StringBuffer dir;
  488. splitFilename(fileName, &dir, &dir, nullptr, nullptr);
  489. recursiveCreateDirectory(dir);
  490. Owned<IFileIO> io = file->openShared(IFOcreate, IFSHfull);
  491. PROGLOG("CLogThread::checkAndCreateFile: %s is created.", fileName);
  492. }
//Destructor: raises the stopping flag, signals the semaphore that threadmain() waits on so
//that its loop can observe the flag, then joins the reader thread.
CLogRequestReader::~CLogRequestReader()
{
    stopping = true;
    sem.signal();
    threaded.join();
}
  499. void CLogRequestReader::threadmain()
  500. {
  501. PROGLOG("LogRequest Reader Thread started.");
  502. readAcked(settings->ackedFileList, ackedLogFileCheckList);
  503. readAcked(settings->ackedLogRequestFile, ackedLogRequests);
  504. unsigned waitMillSeconds = 1000*settings->waitSeconds;
  505. while (!stopping)
  506. {
  507. ESPLOG(LogMax, "#### CLogRequestReader: the loop for reading log requests begins.");
  508. if (!paused)
  509. {
  510. try
  511. {
  512. ESPLOG(LogMax, "#### CLogRequestReader: waiting for readLogRequest().");
  513. CriticalBlock b(crit);
  514. readLogRequest();
  515. if (newAckedLogFiles.length())
  516. {
  517. updateAckedFileList();
  518. updateAckedLogRequestList();
  519. }
  520. PROGLOG("CLogRequestReader: finished the loop for reading log requests.");
  521. }
  522. catch(IException *e)
  523. {
  524. StringBuffer msg;
  525. IERRLOG("Exception %d:%s in CLogRequestReader::threadmain()", e->errorCode(), e->errorMessage(msg).str());
  526. e->Release();
  527. }
  528. catch(...)
  529. {
  530. IERRLOG("Unknown exception in CLogRequestReader::threadmain()");
  531. }
  532. }
  533. sem.wait(waitMillSeconds);
  534. }
  535. PROGLOG("LogRequest Reader Thread terminated.");
  536. }
  537. void CLogRequestReader::reportAckedLogFiles(StringArray& ackedLogFiles)
  538. {
  539. CriticalBlock b(crit);
  540. for (auto r : ackedLogFileCheckList)
  541. ackedLogFiles.append(r.c_str());
  542. ESPLOG(LogMax, "#### The reportAckedLogFiles() done.");
  543. }
  544. //This method is used to setup an acked file list which contains the acked files for all agents.
  545. //The first agent reports all the acked files in that agent using the reportAckedLogFiles().
  546. //Using this method, the list of the acked files is given to the rest agents. If any file inside
  547. //the list has not been acked in the rest agents, the file should be removed from the list.
  548. void CLogRequestReader::removeUnknownAckedLogFiles(StringArray& ackedLogFiles)
  549. {
  550. CriticalBlock b(crit);
  551. ForEachItemInRev(i, ackedLogFiles)
  552. {
  553. const char* node = ackedLogFiles.item(i);
  554. if (ackedLogFileCheckList.find(node) == ackedLogFileCheckList.end())
  555. ackedLogFiles.remove(i);
  556. }
  557. ESPLOG(LogMax, "#### The removeUnknownAckedLogFiles() done.");
  558. }
  559. void CLogRequestReader::addNewAckedFileList(const char* list, StringArray& fileNames)
  560. {
  561. OwnedIFile newList = createIFile(list);
  562. if (!newList)
  563. throw makeStringExceptionV(EspLoggingErrors::UpdateLogFailed, "Failed to access %s", list);
  564. if (newList->exists())
  565. {
  566. if (!newList->remove())
  567. throw makeStringExceptionV(EspLoggingErrors::UpdateLogFailed, "Failed to remove old %s", list);
  568. }
  569. OwnedIFileIO newListIO = newList->open(IFOwrite);
  570. if (!newListIO)
  571. throw makeStringExceptionV(EspLoggingErrors::UpdateLogFailed, "Failed to open %s", list);
  572. offset_t pos = 0;
  573. ForEachItemIn(i, fileNames)
  574. {
  575. const char* fileName = fileNames.item(i);
  576. StringBuffer line(fileName);
  577. line.append("\r\n");
  578. unsigned len = line.length();
  579. newListIO->write(pos, len, line.str());
  580. pos += len;
  581. PROGLOG("Add AckedLogFile %s to %s", fileName, list);
  582. }
  583. }
  584. //The file names in the fileNames should be removed from both ackedLogFileCheckList
  585. //and settings->ackedFileList.
  586. void CLogRequestReader::cleanAckedLogFiles(StringArray& fileNames)
  587. {
  588. CriticalBlock b(crit);
  589. //Find which file should not be removed from ackedLogFileCheckList.
  590. StringArray fileNamesToKeep;
  591. for (auto r : ackedLogFileCheckList)
  592. {
  593. if (!fileNames.contains(r.c_str()))
  594. fileNamesToKeep.append(r.c_str());
  595. }
  596. //Create a temp file with the fileNamesToKeep for replacing the settings->ackedFileList
  597. VStringBuffer tempFileName("%s.tmp", settings->ackedFileList.str());
  598. addNewAckedFileList(tempFileName, fileNamesToKeep);
  599. //Replace the settings->ackedFileList with the temp file
  600. renameFile(settings->ackedFileList, tempFileName, true);
  601. PROGLOG("Rename %s to %s", tempFileName.str(), settings->ackedFileList.str());
  602. //Create new ackedLogFileCheckList based on fileNamesToKeep
  603. ackedLogFileCheckList.clear();
  604. ForEachItemIn(j, fileNamesToKeep)
  605. {
  606. const char* name = fileNamesToKeep.item(j);
  607. ackedLogFileCheckList.insert(name);
  608. PROGLOG("Add %s to new ackedLogFileCheckList", name);
  609. }
  610. }
  611. void CLogRequestReader::readAcked(const char* fileName, std::set<std::string>& acked)
  612. {
  613. Owned<IFile> f = createIFile(fileName);
  614. if (f)
  615. {
  616. OwnedIFileIO io = f->openShared(IFOread, IFSHfull);
  617. if (io)
  618. {
  619. StringBuffer line;
  620. OwnedIFileIOStream ios = createIOStream(io);
  621. Owned<IStreamLineReader> lineReader = createLineReader(ios, true);
  622. while(!lineReader->readLine(line.clear()))
  623. {
  624. if (line.isEmpty())
  625. continue;
  626. unsigned len = line.length();
  627. if ((len > 1) && (line.charAt(len - 2) == '\r') && (line.charAt(len - 1) == '\n'))
  628. line.setLength(len - 2); //remove \r\n
  629. else if ((len > 0) && ((line.charAt(len - 1) == '\r') || (line.charAt(len - 1) == '\n')))
  630. line.setLength(len - 1); //remove \r or \n
  631. if (line.length() > 0) //Check just in case
  632. acked.insert(line.str());
  633. PROGLOG("Found Acked %s from %s", line.str(), fileName);
  634. }
  635. }
  636. }
  637. else
  638. {
  639. f.setown(createIFile(fileName));
  640. Owned<IFileIO> io = f->open(IFOcreate);
  641. }
  642. }
  643. void CLogRequestReader::readLogRequest()
  644. {
  645. ESPLOG(LogMax, "#### Enter readLogRequest()");
  646. StringAttr tankFileNotFinished;//Today's newest tank file.
  647. findTankFileNotFinished(tankFileNotFinished);
  648. offset_t tankFileNotFinishedPos = 0;
  649. Owned<IDirectoryIterator> it = createDirectoryIterator(settings->tankFileDir.get(), "*.log");
  650. ForEach (*it)
  651. {
  652. const char *fileNameWithPath = it->query().queryFilename();
  653. const char *fileName = pathTail(fileNameWithPath);
  654. if (ackedLogFileCheckList.find(fileName) != ackedLogFileCheckList.end())
  655. {
  656. ESPLOG(LogMax, "####Skip tank file: %s. It is in the Acked File list.", fileName);
  657. continue;
  658. }
  659. if (readLogRequestsFromTankFile(fileNameWithPath, tankFileNotFinished, tankFileNotFinishedPos))
  660. addToAckedLogFileList(fileName, fileNameWithPath);
  661. }
  662. addPendingLogsToQueue();
  663. if (tankFileNotFinishedPos)
  664. {//In the next loop, we may skip the log requests which have been read in this loop.
  665. lastTankFile = tankFileNotFinished;
  666. lastTankFilePos = tankFileNotFinishedPos;
  667. }
  668. ESPLOG(LogMax, "#### Leave readLogRequest()");
  669. }
  670. void CLogRequestReader::findTankFileNotFinished(StringAttr& tankFileNotFinished)
  671. {
  672. StringBuffer todayString;
  673. unsigned year, month, day;
  674. CDateTime now;
  675. now.setNow();
  676. now.getDate(year, month, day, true);
  677. todayString.appendf("%04d_%02d_%02d_00_00_00", year, month, day);
  678. StringBuffer lastTimeString;
  679. Owned<IDirectoryIterator> it = createDirectoryIterator(settings->tankFileDir.get(), "*");
  680. ForEach (*it)
  681. {
  682. StringBuffer timeString;
  683. const char* aFileNameWithPath = it->query().queryFilename();
  684. const char* aFileName = pathTail(aFileNameWithPath);
  685. getTankFileTimeString(aFileName, timeString);
  686. if (timeString.isEmpty())
  687. {
  688. IERRLOG("Failed to parse tank file name: %s", aFileName);
  689. continue;
  690. }
  691. if (strcmp(todayString, timeString) > 0) //Not created today
  692. continue;
  693. if (strcmp(timeString, lastTimeString) > 0) //a newer file is found.
  694. {
  695. lastTimeString.set(timeString);
  696. tankFileNotFinished.set(aFileNameWithPath);
  697. }
  698. }
  699. }
  700. StringBuffer& CLogRequestReader::getTankFileTimeString(const char* fileName, StringBuffer& timeString)
  701. {
  702. const char* ptr = strstr(fileName, sendLogKeyword);
  703. if (!ptr)
  704. return timeString;
  705. ptr += sendLogKeywordLen;
  706. if (!ptr)
  707. return timeString;
  708. ptr = strchr(ptr, '.');
  709. if (ptr && (strlen(ptr) > dateTimeStringLength))
  710. timeString.append(dateTimeStringLength, ++ptr); //yyyy_mm_dd_hh_mm_ss
  711. return timeString;
  712. }
  713. bool CLogRequestReader::readLogRequestsFromTankFile(const char* fileName, StringAttr& tankFileNotFinished, offset_t& tankFileNotFinishedPos)
  714. {
  715. ESPLOG(LogMax, "#### Enter readLogRequestsFromTankFile(): %s", fileName);
  716. Owned<IFile> file = createIFile(fileName);
  717. if (!file) //This can only happen at start time. So, throw exception.
  718. throw MakeStringException(-1, "Unable to find logging file %s", fileName);
  719. Owned<IFileIO> fileIO = file->open(IFOread);
  720. if (!fileIO)
  721. throw MakeStringException(-1, "Unable to open logging file %s", fileName);
  722. //Sample: 00009902 0421311217.2019_03_29_14_32_11 <cache><GUID>0421311217.2019_03_29_14_32_11</GUID>
  723. //<option>SingleInsert</option><LogRequest>dUgAAH...AA==</LogRequest></cache>
  724. offset_t finger = getReadFilePos(fileName);
  725. unsigned totalMissed = 0;
  726. while(true)
  727. {
  728. MemoryBuffer data;
  729. CLogSerializer logSerializer;
  730. if (!logSerializer.readALogLine(fileIO, finger, data))
  731. break;
  732. StringBuffer GUID, logRequest;
  733. if (!parseLogRequest(data, GUID, logRequest))
  734. IERRLOG("Invalid logging request in %s", fileName);
  735. else if (ackedLogRequests.find(GUID.str()) == ackedLogRequests.end())
  736. {//This QUID is not acked.
  737. totalMissed++;
  738. if (pendingLogGUIDs.find(GUID.str()) == pendingLogGUIDs.end())
  739. {//This QUID has not been queued yet.
  740. PROGLOG("Found new log request %s from tank file %s. Added to pending logs.", GUID.str(), fileName);
  741. pendingLogGUIDs.insert(GUID.str());
  742. pendingLogs[GUID.str()] = logRequest.str();
  743. if (pendingLogs.size() > settings->pendingLogBufferSize)
  744. addPendingLogsToQueue();
  745. }
  746. }
  747. }
  748. bool isTankFileNotFinished = !isEmptyString(tankFileNotFinished) && strieq(fileName, tankFileNotFinished);
  749. if (isTankFileNotFinished)
  750. tankFileNotFinishedPos = fileIO->size();
  751. ESPLOG(LogMax, "#### Leave readLogRequestsFromTankFile(): %s, totalMissed(%d)", fileName, totalMissed);
  752. return (totalMissed == 0) && !isTankFileNotFinished;
  753. }
  754. offset_t CLogRequestReader::getReadFilePos(const char* fileName)
  755. {
  756. const char* lastTankFileName = lastTankFile.get();
  757. if (lastTankFileName && strieq(lastTankFileName, fileName))
  758. return lastTankFilePos;
  759. return 0;
  760. }
  761. bool CLogRequestReader::parseLogRequest(MemoryBuffer& rawdata, StringBuffer& GUID, StringBuffer& logLine)
  762. {
  763. //The rawdata should be in the form of 2635473460.05_01_12_16_13_57\t<cache>...</cache>
  764. //parse it into GUID and logLine (as <cache>...</cache>)
  765. const char* begin = rawdata.toByteArray(); //no string termination character \0
  766. unsigned len = rawdata.length();
  767. if (!begin || (len == 0))
  768. return false;
  769. const char* ptr = begin;
  770. const char* end = begin + len;
  771. while ((ptr < end) && (*ptr != '\t'))
  772. ptr++;
  773. if ((ptr == end) || (ptr == begin))
  774. return false;
  775. GUID.append(ptr - begin, begin);
  776. if (++ptr == end)
  777. return false;
  778. logLine.append(end - ptr, ptr);
  779. return true;
  780. }
  781. void CLogRequestReader::addPendingLogsToQueue()
  782. {
  783. ESPLOG(LogMax, "#### Enter addPendingLogsToQueue()");
  784. //Add the pendingLogs to log queue
  785. if (pendingLogs.size())
  786. ESPLOG(LogMin, "Adding %zu Pending Log Request(s) to job queue", pendingLogs.size());
  787. StringArray queuedPendingLogs;
  788. for (auto const& x : pendingLogs)
  789. {
  790. Owned<IEspUpdateLogRequestWrap> logRequest = logThread->unserializeLogRequestContent(x.second.c_str(), true);
  791. if (!logRequest)
  792. IERRLOG("addPendingLogsToQueue: failed to unserialize: %s", x.second.c_str());
  793. logThread->queueLog(logRequest);
  794. queuedPendingLogs.append(x.first.c_str());
  795. PROGLOG("Enqueue: %s", x.first.c_str());
  796. }
  797. //Clean the pendingLogs
  798. ForEachItemIn(i, queuedPendingLogs)
  799. pendingLogs.erase(queuedPendingLogs.item(i));
  800. ESPLOG(LogMax, "#### Leave addPendingLogsToQueue()");
  801. }
  802. void CLogRequestReader::addACK(const char* GUID)
  803. {
  804. ESPLOG(LogMax, "#### Enter addACK(): %s", GUID);
  805. CriticalBlock b(crit);
  806. Owned<IFile> f = createIFile(settings->ackedLogRequestFile);
  807. Owned<IFileIO> io = f->open(IFOwrite);
  808. StringBuffer toWrite,size;
  809. toWrite.appendf("%s\r\n", GUID);
  810. io->write(io->size(), toWrite.length(), toWrite.str());
  811. ackedLogRequests.insert(GUID);
  812. pendingLogGUIDs.erase(GUID);
  813. ESPLOG(LogMax, "#### addACK(): %s acked", GUID);
  814. }
  815. void CLogRequestReader::addToAckedLogFileList(const char* fileName, const char* fileNameWithPath)
  816. {
  817. PROGLOG("Found an AckedLogFile %s.", fileNameWithPath);
  818. newAckedLogFiles.append(fileNameWithPath);
  819. ackedLogFileCheckList.insert(fileName);
  820. }
  821. //Update newAckedLogFiles to file settings->ackedFileList,
  822. void CLogRequestReader::updateAckedFileList()
  823. {
  824. ESPLOG(LogMax, "#### Enter updateAckedFileList()");
  825. OwnedIFile ackedFiles = createIFile(settings->ackedFileList);
  826. if (!ackedFiles)
  827. return; //Should never happen
  828. OwnedIFileIO ackedFilesIO = ackedFiles->open(IFOwrite);
  829. if (!ackedFilesIO)
  830. return; //Should never happen
  831. offset_t pos = ackedFilesIO->size();
  832. ForEachItemIn(i, newAckedLogFiles)
  833. {
  834. const char* fileNameWithPath = newAckedLogFiles.item(i);
  835. StringBuffer fileName(pathTail(fileNameWithPath));
  836. PROGLOG("Add AckedLogFile %s to %s", fileName.str(), settings->ackedFileList.str());
  837. //Remove log request from the ackedLogRequests
  838. GuidSet logRequestsToRemove;
  839. CLogSerializer ackedLog(fileNameWithPath);
  840. ackedLog.loadAckedLogs(logRequestsToRemove);
  841. for (auto r : logRequestsToRemove)
  842. ackedLogRequests.erase(r.c_str());
  843. fileName.append("\r\n");
  844. unsigned len = strlen(fileName);
  845. ackedFilesIO->write(pos, len, fileName);
  846. pos += len;
  847. }
  848. newAckedLogFiles.clear();
  849. ESPLOG(LogMax, "#### Leave updateAckedFileList()");
  850. }
  851. void CLogRequestReader::updateAckedLogRequestList()
  852. {
  853. ESPLOG(LogMax, "#### Enter updateAckedLogRequestList()");
  854. OwnedIFile newAckedLogRequestFile = createIFile(settings->ackedLogRequestFile);
  855. if (newAckedLogRequestFile)
  856. {
  857. newAckedLogRequestFile->remove();
  858. PROGLOG("Clean %s", settings->ackedLogRequestFile.str());
  859. }
  860. OwnedIFileIO newAckedLogRequestFileIO = newAckedLogRequestFile->open(IFOwrite);
  861. if (!newAckedLogRequestFileIO)
  862. return; //Should never happen
  863. offset_t pos = 0;
  864. for (auto r : ackedLogRequests)
  865. {
  866. StringBuffer line(r.c_str());
  867. line.append("\r\n");
  868. unsigned len = line.length();
  869. newAckedLogRequestFileIO->write(pos, len, line.str());
  870. pos += len;
  871. PROGLOG("Add AckedLogRequest %s to %s", line.str(), settings->ackedLogRequestFile.str());
  872. }
  873. ESPLOG(LogMax, "#### Leave updateAckedLogRequestList()");
  874. }