logthread.cpp 36 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2014 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jmisc.hpp"
  14. #include "jexcept.hpp"
  15. #include "jdebug.hpp"
  16. #include "LoggingErrors.hpp"
  17. #include "LogSerializer.hpp"
  18. #include "logthread.hpp"
  19. #include "compressutil.hpp"
  // ---- Names of the configuration properties read from the agent settings ----
  const char* const PropMaxLogQueueLength = "MaxLogQueueLength";
  const char* const PropQueueSizeSignal = "QueueSizeSignal";
  const char* const PropMaxTriesRS = "MaxTriesRS";
  const char* const PropFailSafe = "FailSafe";
  const char* const PropDisableFailSafe = "DisableFailSafe";
  const char* const PropAckedFiles = "AckedFiles";
  const char* const PropAckedLogRequests = "AckedLogRequests";
  const char* const PropPendingLogBufferSize = "PendingLogBufferSize";
  const char* const PropReadRequestWaitingSeconds = "ReadRequestWaitingSeconds";

  // ---- Fallback file names used when the matching property is empty ----
  const char* const PropDefaultAckedFiles = "AckedFiles";
  const char* const PropDefaultAckedLogRequests = "AckedLogRequests";

  // ---- Tank file name parsing ----
  // Marker searched for inside a tank file name before its timestamp is extracted.
  const char* const sendLogKeyword = "_sending_";
  const unsigned sendLogKeywordLen = strlen(sendLogKeyword);
  // Length of a timestamp in the form yyyy_mm_dd_hh_mm_ss
  const unsigned dateTimeStringLength = 19;

  // ---- Queue monitoring defaults ----
  // Write a warning into log when queue length is greater than 500000
  #define MaxLogQueueLength 500000
  // Write a warning into log when queue length is increased by 10000
  #define QueueSizeSignal 10000

  // Max. # of attempts to send log message to WsReportService. Default: infinite
  const int DefaultMaxTriesRS = -1;
  37. extern LOGGINGCOMMON_API IUpdateLogThread* createUpdateLogThread(IPropertyTree* _cfg, const char* _service, const char* _agentName,
  38. const char* _tankFileDir, IEspLogAgent* _logAgent)
  39. {
  40. if (!_cfg)
  41. return NULL;
  42. IUpdateLogThread* loggingThread = new CLogThread(_cfg, _service, _agentName, _logAgent, _tankFileDir);
  43. loggingThread->start();
  44. return loggingThread;
  45. }
  46. CLogThread::CLogThread(IPropertyTree* _cfg , const char* _service, const char* _agentName, IEspLogAgent* _logAgent, const char* _tankFileDir)
  47. : stopping(false), agentName(_agentName), tankFileDir(_tankFileDir)
  48. {
  49. if(!_agentName || !*_agentName)
  50. throw MakeStringException(-1,"No Logging agent name defined");
  51. if(!_cfg)
  52. throw MakeStringException(-1,"No Logging agent Configuration for %s", _agentName);
  53. if(!_service || !*_service)
  54. throw MakeStringException(-1,"No service name defined for %s", _agentName);
  55. if(!_logAgent)
  56. throw MakeStringException(-1,"No Logging agent interface for %s", _agentName);
  57. logAgent.setown(_logAgent);
  58. maxLogQueueLength = _cfg->getPropInt(PropMaxLogQueueLength, MaxLogQueueLength);
  59. signalGrowingQueueAt = _cfg->getPropInt(PropQueueSizeSignal, QueueSizeSignal);
  60. maxLogRetries = _cfg->getPropInt(PropMaxTriesRS, DefaultMaxTriesRS);
  61. //For decoupled logging, the fail safe is not needed because the logging agent always
  62. //picks up the logging requests from tank file.
  63. ensureFailSafe = _cfg->getPropBool(PropFailSafe) && !_cfg->getPropBool(PropDisableFailSafe, false);
  64. if(ensureFailSafe)
  65. {
  66. logFailSafe.setown(createFailSafeLogger(_cfg, _service, _agentName));
  67. PROGLOG("FailSafe ensured for %s", agentName.get());
  68. }
  69. time_t tNow;
  70. time(&tNow);
  71. localtime_r(&tNow, &m_startTime);
  72. if (tankFileDir.get())
  73. {
  74. Owned<CLogRequestReaderSettings> settings = new CLogRequestReaderSettings();
  75. settings->tankFileDir.set(tankFileDir.get());
  76. const char* ackedFiles = _cfg->queryProp(PropAckedFiles);
  77. settings->ackedFileList.set(isEmptyString(ackedFiles) ? PropDefaultAckedFiles : ackedFiles);
  78. const char* ackedLogRequestFile = _cfg->queryProp(PropAckedLogRequests);
  79. settings->ackedLogRequestFile.set(isEmptyString(ackedLogRequestFile) ? PropDefaultAckedLogRequests : ackedLogRequestFile);
  80. int pendingLogBufferSize = _cfg->getPropInt(PropPendingLogBufferSize, DEFAULTPENDINGLOGBUFFERSIZE);
  81. if (pendingLogBufferSize <= 0)
  82. throw MakeStringException(-1, "The %s (%d) should be greater than 0.", PropPendingLogBufferSize, pendingLogBufferSize);
  83. settings->pendingLogBufferSize = pendingLogBufferSize;
  84. int waitSeconds = _cfg->getPropInt(PropReadRequestWaitingSeconds, DEFAULTREADLOGREQUESTWAITSECOND);
  85. if (waitSeconds <= 0)
  86. throw MakeStringException(-1, "The %s (%d) should be greater than 0.", PropReadRequestWaitingSeconds, waitSeconds);
  87. settings->waitSeconds = waitSeconds;
  88. PROGLOG("%s %s: %s", agentName.get(), PropAckedFiles, settings->ackedFileList.str());
  89. PROGLOG("%s %s: %s", agentName.get(), PropDefaultAckedLogRequests, settings->ackedLogRequestFile.str());
  90. PROGLOG("%s %s: %d. %s: %d", agentName.get(), PropReadRequestWaitingSeconds, settings->waitSeconds, PropPendingLogBufferSize, settings->pendingLogBufferSize);
  91. checkAndCreateFile(settings->ackedFileList);
  92. checkAndCreateFile(settings->ackedLogRequestFile);
  93. logRequestReader.setown(new CLogRequestReader(settings.getClear(), this));
  94. }
  95. PROGLOG("%s CLogThread started.", agentName.get());
  96. }
  97. CLogThread::~CLogThread()
  98. {
  99. ESPLOG(LogMax, "CLogThread::~CLogThread()");
  100. }
  101. void CLogThread::start()
  102. {
  103. Thread::start();
  104. }
  105. int CLogThread::run()
  106. {
  107. Link();
  108. if (!logRequestReader && logFailSafe.get())
  109. checkPendingLogs(false);
  110. while(!stopping)
  111. {
  112. m_sem.wait(UPDATELOGTHREADWAITINGTIME);
  113. sendLog();
  114. if (!logRequestReader && logFailSafe.get())
  115. {
  116. checkPendingLogs(true);
  117. checkRollOver();
  118. }
  119. }
  120. Release();
  121. return 0;
  122. }
  123. void CLogThread::stop()
  124. {
  125. try
  126. {
  127. CriticalBlock b(logQueueCrit);
  128. if (!logQueue.ordinality() && !logRequestReader && logFailSafe.get() && logFailSafe->canRollCurrentLog())
  129. logFailSafe->RollCurrentLog();
  130. //If logQueue is not empty, the log files are rolled over so that queued jobs can be read
  131. //when the CLogThread is restarted.
  132. }
  133. catch(...)
  134. {
  135. DBGLOG("Exception");
  136. }
  137. stopping = true;
  138. m_sem.signal();
  139. join();
  140. }
  141. bool CLogThread::queueLog(IEspUpdateLogRequest* logRequest)
  142. {
  143. if (!logRequest)
  144. return false;
  145. Owned<IEspUpdateLogRequestWrap> logRequestWrap = new CUpdateLogRequestWrap(NULL, logRequest->getOption(), logRequest->getLogContent());
  146. return enqueue(logRequestWrap, nullptr);
  147. }
  148. bool CLogThread::queueLog(IEspUpdateLogRequestWrap* logRequest)
  149. {
  150. unsigned startTime = (getEspLogLevel()>=LogNormal) ? msTick() : 0;
  151. Owned<IEspUpdateLogRequestWrap> logRequestFiltered = logAgent->filterLogContent(logRequest);
  152. ESPLOG(LogNormal, "LThread:filterLog: %dms\n", msTick() - startTime);
  153. return enqueue(logRequestFiltered, nullptr);
  154. }
  155. bool CLogThread::enqueue(IEspUpdateLogRequestWrap* logRequest, const char* guid)
  156. {
  157. if (!logRequestReader && logFailSafe.get())
  158. {
  159. StringBuffer GUID, reqBuf;
  160. unsigned startTime = (getEspLogLevel()>=LogNormal) ? msTick() : 0;
  161. if (isEmptyString(guid))
  162. logFailSafe->GenerateGUID(GUID, nullptr);
  163. else
  164. GUID.set(guid);
  165. logRequest->setGUID(GUID.str());
  166. if (serializeLogRequestContent(logRequest, reqBuf))
  167. logFailSafe->Add(GUID, reqBuf.str(), nullptr);
  168. ESPLOG(LogNormal, "LThread:addToFailSafe: %dms\n", msTick() - startTime);
  169. }
  170. writeJobQueue(logRequest);
  171. m_sem.signal();
  172. return true;
  173. }
  174. void CLogThread::sendLog()
  175. {
  176. try
  177. {
  178. if(stopping)
  179. return;
  180. int recSend = 0;
  181. while(true)
  182. {
  183. IEspUpdateLogRequestWrap* logRequest = readJobQueue();
  184. if (!logRequest)
  185. break;
  186. const char* GUID= logRequest->getGUID();
  187. if ((!GUID || !*GUID) && ensureFailSafe && logFailSafe.get())
  188. continue;
  189. PROGLOG("Sending %s ...\n", GUID);
  190. Owned<IEspUpdateLogRequestWrap> logRequestInFile;
  191. if (!logRequestReader)
  192. logRequestInFile.setown(checkAndReadLogRequestFromSharedTankFile(logRequest));
  193. try
  194. {
  195. unsigned startTime = (getEspLogLevel()>=LogNormal) ? msTick() : 0;
  196. Owned<IEspUpdateLogResponse> logResponse = createUpdateLogResponse();
  197. if (logRequestInFile)
  198. logAgent->updateLog(*logRequestInFile, *logResponse);
  199. else
  200. logAgent->updateLog(*logRequest, *logResponse);
  201. if (!logResponse)
  202. throw MakeStringException(EspLoggingErrors::UpdateLogFailed, "no response");
  203. if (logResponse->getStatusCode())
  204. {
  205. const char* statusMessage = logResponse->getStatusMessage();
  206. if(statusMessage && *statusMessage)
  207. throw MakeStringException(EspLoggingErrors::UpdateLogFailed, "%s", statusMessage);
  208. else
  209. throw MakeStringException(EspLoggingErrors::UpdateLogFailed, "Unknown error");
  210. }
  211. ESPLOG(LogNormal, "LThread:updateLog: %dms\n", msTick() - startTime);
  212. if (logRequestReader)
  213. {
  214. unsigned startTime1 = (getEspLogLevel()>=LogNormal) ? msTick() : 0;
  215. logRequestReader->addACK(GUID);
  216. PROGLOG("%s acked: %dms\n", GUID, msTick() - startTime1);
  217. }
  218. else if(ensureFailSafe && logFailSafe.get())
  219. {
  220. unsigned startTime1 = (getEspLogLevel()>=LogNormal) ? msTick() : 0;
  221. logFailSafe->AddACK(GUID);
  222. ESPLOG(LogNormal, "LThread:AddACK: %dms\n", msTick() - startTime1);
  223. }
  224. logRequest->Release();//Make sure that no data (such as GUID) is needed before releasing the logRequest.
  225. }
  226. catch(IException* e)
  227. {
  228. StringBuffer errorStr, errorMessage;
  229. errorMessage.appendf("Failed to update log for %s: error code %d, error message %s", GUID, e->errorCode(), e->errorMessage(errorStr).str());
  230. e->Release();
  231. if (logRequestInFile)
  232. logRequest->setNoResend(logRequestInFile->getNoResend());
  233. bool willRetry = false;
  234. if (!logRequest->getNoResend() && (maxLogRetries != 0))
  235. {
  236. unsigned retry = logRequest->incrementRetryCount();
  237. if (retry > maxLogRetries)
  238. errorMessage.append(" Max logging retries exceeded.");
  239. else
  240. {
  241. willRetry = true;
  242. writeJobQueue(logRequest);
  243. errorMessage.appendf(" Adding back to logging queue for retrying %d.", retry);
  244. }
  245. }
  246. if (!willRetry)
  247. {
  248. logRequest->Release();
  249. }
  250. IERRLOG("%s", errorMessage.str());
  251. }
  252. }
  253. }
  254. catch(IException* e)
  255. {
  256. StringBuffer errorStr, errorMessage;
  257. errorMessage.append("Exception thrown within update log thread: error code ").append(e->errorCode()).append(", error message ").append(e->errorMessage(errorStr));
  258. IERRLOG("%s", errorMessage.str());
  259. e->Release();
  260. }
  261. catch(...)
  262. {
  263. IERRLOG("Unknown exception thrown within update log thread");
  264. }
  265. return;
  266. }
  267. //At first, we check whether the logRequest contains the information about original log Request
  268. //in shared tank file created by logging manager. If yes, try to read the original log Request
  269. //based on the information. If the original log Request is found and unserialized, return new
  270. //IEspUpdateLogRequestWrap which contains original log Request.
  271. IEspUpdateLogRequestWrap* CLogThread::checkAndReadLogRequestFromSharedTankFile(IEspUpdateLogRequestWrap* logRequest)
  272. {
  273. //Read LogRequestInFile info if exists.
  274. Owned<IPropertyTree> logInFle = createPTreeFromXMLString(logRequest->getUpdateLogRequest());
  275. if (!logInFle)
  276. return nullptr;
  277. const char* GUID = logInFle->queryProp(LOGREQUEST_GUID);
  278. if (isEmptyString(GUID))
  279. return nullptr;
  280. const char* fileName = logInFle->queryProp(LOGCONTENTINFILE_FILENAME);
  281. if (isEmptyString(fileName))
  282. return nullptr;
  283. __int64 pos = logInFle->getPropInt64(LOGCONTENTINFILE_FILEPOS, -1);
  284. if (pos < 0)
  285. return nullptr;
  286. int size = logInFle->getPropInt64(LOGCONTENTINFILE_FILESIZE, -1);
  287. if (size < 0)
  288. return nullptr;
  289. Owned<CLogRequestInFile> reqInFile = new CLogRequestInFile();
  290. reqInFile->setGUID(GUID);
  291. reqInFile->setFileName(fileName);
  292. reqInFile->setPos(pos);
  293. reqInFile->setSize(size);
  294. //Read Log Request from the file
  295. StringBuffer logRequestStr;
  296. CLogSerializer logSerializer;
  297. if (!logSerializer.readLogRequest(reqInFile, logRequestStr))
  298. {
  299. ERRLOG("Failed to read Log Request from %s", fileName);
  300. return nullptr;
  301. }
  302. try
  303. {
  304. Owned<IPropertyTree> logRequestTree = createPTreeFromXMLString(logRequestStr.str());
  305. if (!logRequestTree)
  306. return nullptr;
  307. const char* guid = logRequestTree->queryProp("GUID");
  308. const char* opt = logRequestTree->queryProp("Option");
  309. const char* reqBuf = logRequestTree->queryProp("LogRequest");
  310. if (isEmptyString(reqBuf))
  311. return nullptr;
  312. StringBuffer decoded, req;
  313. JBASE64_Decode(reqBuf, decoded);
  314. LZWExpand(decoded, decoded.length(), req);
  315. return new CUpdateLogRequestWrap(guid, opt, req.str());
  316. }
  317. catch(IException* e)
  318. {
  319. StringBuffer errorStr;
  320. ERRLOG("Exception when unserializing Log Request Content: %d %s", e->errorCode(), e->errorMessage(errorStr).str());
  321. e->Release();
  322. }
  323. return nullptr;
  324. }
  325. //////////////////////////FailSafe////////////////////////////
  326. void CLogThread::checkRollOver()
  327. {
  328. try
  329. {
  330. bool bRollover = false;
  331. time_t tNow;
  332. time(&tNow);
  333. struct tm ltNow;
  334. localtime_r(&tNow, &ltNow);
  335. if ((ltNow.tm_year != m_startTime.tm_year || ltNow.tm_yday != m_startTime.tm_yday))
  336. {
  337. bRollover = true;
  338. localtime_r(&tNow, &m_startTime); // reset the start time for next rollover check
  339. }
  340. if (!bRollover)
  341. return;
  342. //Rename .log files to .old files
  343. logFailSafe->SafeRollover();
  344. CriticalBlock b(logQueueCrit);
  345. //Check and add queued requests to tank(.log) files
  346. unsigned numNewArrivals = logQueue.ordinality();
  347. if(numNewArrivals <= 0)
  348. return;
  349. ESPLOG(LogMax, "writing %d requests in the queue to the rolled over tank file.", numNewArrivals);
  350. unsigned startTime = (getEspLogLevel()>=LogNormal) ? msTick() : 0;
  351. for(unsigned i = 0; i < numNewArrivals; i++)
  352. {
  353. IInterface* pRequest = logQueue.item(i);
  354. if (!pRequest)
  355. continue;
  356. IEspUpdateLogRequestWrap* pEspRequest = dynamic_cast<IEspUpdateLogRequestWrap*>(pRequest);
  357. if(!pEspRequest)
  358. continue;
  359. StringBuffer reqBuf;
  360. const char* GUID = pEspRequest->getGUID();
  361. if(GUID && *GUID && serializeLogRequestContent(pEspRequest, reqBuf))
  362. logFailSafe->Add(GUID, reqBuf.str(), nullptr);
  363. }
  364. ESPLOG(LogNormal, "LThread:AddFailSafe: %dms\n", msTick() - startTime);
  365. }
  366. catch(IException* Ex)
  367. {
  368. StringBuffer str;
  369. Ex->errorMessage(str);
  370. IERRLOG("Exception thrown during tank file rollover: %s",str.str());
  371. Ex->Release();
  372. }
  373. catch(...)
  374. {
  375. IERRLOG("Unknown exception thrown during tank file rollover.");
  376. }
  377. }
  378. unsigned CLogThread::serializeLogRequestContent(IEspUpdateLogRequestWrap* pRequest, StringBuffer& logData)
  379. {
  380. const char* GUID = pRequest->getGUID();
  381. const char* option = pRequest->getOption();
  382. const char* logRequest = pRequest->getUpdateLogRequest();
  383. if (GUID && *GUID)
  384. logData.append("<GUID>").append(GUID).append("</GUID>");
  385. if (option && *option)
  386. logData.append("<Option>").append(option).append("</Option>");
  387. if (logRequest && *logRequest)
  388. {
  389. StringBuffer buffer;
  390. JBASE64_Encode(logRequest, strlen(logRequest), buffer, true);
  391. logData.append("<LogRequest>").append(buffer.str()).append("</LogRequest>");
  392. }
  393. return logData.length();
  394. }
  395. void CLogThread::checkPendingLogs(bool bOneRecOnly)
  396. {
  397. try
  398. {
  399. bool queueLogError = false;
  400. bool bFirst = true;
  401. StringBuffer GUID, logData;
  402. while (logFailSafe->PopPendingLogRecord(GUID, logData))
  403. {
  404. if (bFirst && !bOneRecOnly)
  405. {
  406. DBGLOG("We have old logs!. Will now try and recover the lost log messages");
  407. bFirst = false;
  408. }
  409. Owned<IEspUpdateLogRequestWrap> logRequest = unserializeLogRequestContent(logData.str(), false);
  410. if (!logRequest)
  411. IERRLOG("checkPendingLogs: failed to unserialize: %s", logData.str());
  412. else if (!enqueue(logRequest, GUID))
  413. {
  414. OERRLOG("checkPendingLogs: failed to add a log request to queue");
  415. queueLogError=true;
  416. }
  417. if (bOneRecOnly)
  418. break;
  419. }
  420. //if everything went ok then we should be able to rollover the old logs.
  421. if (!queueLogError && !bOneRecOnly)
  422. logFailSafe->RollOldLogs();
  423. }
  424. catch(IException* ex)
  425. {
  426. StringBuffer errorStr;
  427. ex->errorMessage(errorStr);
  428. IERRLOG("CheckPendingLogs: %s:" ,errorStr.str());
  429. ex->Release();
  430. }
  431. catch(...)
  432. {
  433. IERRLOG("Unknown exception thrown in CheckPendingLogs");
  434. }
  435. }
  436. //When the logData is read from a main tank file, it should be decrypted.
  437. //For non-decoupled logging agents, each logging agent may have its own tank file (with the location and
  438. //position of the main tank file). The logData from those agent tank files is not encrypted. So, it should
  439. //not be decrypted.
  440. //BTW: For non-decoupled logging agents, the logData from main tank file is read and decrypted in
  441. //checkAndReadLogRequestFromSharedTankFile().
  442. IEspUpdateLogRequestWrap* CLogThread::unserializeLogRequestContent(const char* logData, bool decompress)
  443. {
  444. if (!logData && *logData)
  445. return NULL;
  446. Owned<IPropertyTree> pLogTree = createPTreeFromXMLString(logData);
  447. if (!pLogTree)
  448. return NULL;
  449. const char* guid = pLogTree->queryProp("GUID");
  450. const char* opt = pLogTree->queryProp("Option");
  451. const char* logRequest = pLogTree->queryProp("LogRequest");
  452. if (!logRequest || !*logRequest)
  453. return NULL;
  454. StringBuffer decoded;
  455. JBASE64_Decode(logRequest, decoded);
  456. if (!decompress)
  457. return new CUpdateLogRequestWrap(guid, opt, decoded.str());
  458. StringBuffer req;
  459. LZWExpand(decoded, decoded.length(), req);
  460. return new CUpdateLogRequestWrap(guid, opt, req.str());
  461. };
  462. void CLogThread::writeJobQueue(IEspUpdateLogRequestWrap* jobToWrite)
  463. {
  464. if (jobToWrite)
  465. {
  466. unsigned startTime = (getEspLogLevel()>=LogNormal) ? msTick() : 0;
  467. CriticalBlock b(logQueueCrit);
  468. ESPLOG(LogNormal, "LThread:waitWQ: %dms\n", msTick() - startTime);
  469. int QueueSize = logQueue.ordinality();
  470. if(QueueSize > maxLogQueueLength)
  471. OERRLOG("LOGGING QUEUE SIZE %d EXCEEDED MaxLogQueueLength %d, check the logging server.",QueueSize, maxLogQueueLength);
  472. if(QueueSize!=0 && QueueSize % signalGrowingQueueAt == 0)
  473. OERRLOG("Logging Queue at %d records. Check the logging server.",QueueSize);
  474. logQueue.enqueue(LINK(jobToWrite));
  475. }
  476. }
  477. IEspUpdateLogRequestWrap* CLogThread::readJobQueue()
  478. {
  479. #define LOG_LEVEL LogNormal
  480. unsigned startTime = (getEspLogLevel()>=LOG_LEVEL) ? msTick() : 0;
  481. CriticalBlock b(logQueueCrit);
  482. unsigned delta = (getEspLogLevel()>=LOG_LEVEL) ? msTick() - startTime : 0;
  483. if (delta > 1) // <=1ms is not indicative of an unexpected delay
  484. ESPLOG(LOG_LEVEL, "LThread:waitRQ: %dms", delta);
  485. return (IEspUpdateLogRequestWrap*)logQueue.dequeue();
  486. #undef LOG_LEVEL
  487. }
  488. void CLogThread::checkAndCreateFile(const char* fileName)
  489. {
  490. Owned<IFile> file = createIFile(fileName);
  491. if(file->isFile() != fileBool::notFound)
  492. return;
  493. StringBuffer dir;
  494. splitFilename(fileName, &dir, &dir, nullptr, nullptr);
  495. recursiveCreateDirectory(dir);
  496. Owned<IFileIO> io = file->openShared(IFOcreate, IFSHfull);
  497. PROGLOG("CLogThread::checkAndCreateFile: %s is created.", fileName);
  498. }
  499. CLogRequestReader::~CLogRequestReader()
  500. {
  501. stopping = true;
  502. sem.signal();
  503. threaded.join();
  504. }
  505. void CLogRequestReader::threadmain()
  506. {
  507. PROGLOG("LogRequest Reader Thread started.");
  508. readAcked(settings->ackedFileList, ackedLogFileCheckList);
  509. readAcked(settings->ackedLogRequestFile, ackedLogRequests);
  510. unsigned waitMillSeconds = 1000*settings->waitSeconds;
  511. while (!stopping)
  512. {
  513. ESPLOG(LogMax, "#### CLogRequestReader: the loop for reading log requests begins.");
  514. if (!paused)
  515. {
  516. try
  517. {
  518. ESPLOG(LogMax, "#### CLogRequestReader: waiting for readLogRequest().");
  519. CriticalBlock b(crit);
  520. readLogRequest();
  521. if (newAckedLogFiles.length())
  522. {
  523. updateAckedFileList();
  524. updateAckedLogRequestList();
  525. }
  526. PROGLOG("CLogRequestReader: finished the loop for reading log requests.");
  527. }
  528. catch(IException *e)
  529. {
  530. StringBuffer msg;
  531. IERRLOG("Exception %d:%s in CLogRequestReader::threadmain()", e->errorCode(), e->errorMessage(msg).str());
  532. e->Release();
  533. }
  534. catch(...)
  535. {
  536. IERRLOG("Unknown exception in CLogRequestReader::threadmain()");
  537. }
  538. }
  539. sem.wait(waitMillSeconds);
  540. }
  541. PROGLOG("LogRequest Reader Thread terminated.");
  542. }
  543. void CLogRequestReader::reportAckedLogFiles(StringArray& ackedLogFiles)
  544. {
  545. CriticalBlock b(crit);
  546. for (auto r : ackedLogFileCheckList)
  547. ackedLogFiles.append(r.c_str());
  548. ESPLOG(LogMax, "#### The reportAckedLogFiles() done.");
  549. }
  550. //This method is used to setup an acked file list which contains the acked files for all agents.
  551. //The first agent reports all the acked files in that agent using the reportAckedLogFiles().
  552. //Using this method, the list of the acked files is given to the rest agents. If any file inside
  553. //the list has not been acked in the rest agents, the file should be removed from the list.
  554. void CLogRequestReader::removeUnknownAckedLogFiles(StringArray& ackedLogFiles)
  555. {
  556. CriticalBlock b(crit);
  557. ForEachItemInRev(i, ackedLogFiles)
  558. {
  559. const char* node = ackedLogFiles.item(i);
  560. if (ackedLogFileCheckList.find(node) == ackedLogFileCheckList.end())
  561. ackedLogFiles.remove(i);
  562. }
  563. ESPLOG(LogMax, "#### The removeUnknownAckedLogFiles() done.");
  564. }
  565. void CLogRequestReader::addNewAckedFileList(const char* list, StringArray& fileNames)
  566. {
  567. OwnedIFile newList = createIFile(list);
  568. if (!newList)
  569. throw makeStringExceptionV(EspLoggingErrors::UpdateLogFailed, "Failed to access %s", list);
  570. if (newList->exists())
  571. {
  572. if (!newList->remove())
  573. throw makeStringExceptionV(EspLoggingErrors::UpdateLogFailed, "Failed to remove old %s", list);
  574. }
  575. OwnedIFileIO newListIO = newList->open(IFOwrite);
  576. if (!newListIO)
  577. throw makeStringExceptionV(EspLoggingErrors::UpdateLogFailed, "Failed to open %s", list);
  578. offset_t pos = 0;
  579. ForEachItemIn(i, fileNames)
  580. {
  581. const char* fileName = fileNames.item(i);
  582. StringBuffer line(fileName);
  583. line.append("\r\n");
  584. unsigned len = line.length();
  585. newListIO->write(pos, len, line.str());
  586. pos += len;
  587. PROGLOG("Add AckedLogFile %s to %s", fileName, list);
  588. }
  589. }
  590. //The file names in the fileNames should be removed from both ackedLogFileCheckList
  591. //and settings->ackedFileList.
  592. void CLogRequestReader::cleanAckedLogFiles(StringArray& fileNames)
  593. {
  594. CriticalBlock b(crit);
  595. //Find which file should not be removed from ackedLogFileCheckList.
  596. StringArray fileNamesToKeep;
  597. for (auto r : ackedLogFileCheckList)
  598. {
  599. if (!fileNames.contains(r.c_str()))
  600. fileNamesToKeep.append(r.c_str());
  601. }
  602. //Create a temp file with the fileNamesToKeep for replacing the settings->ackedFileList
  603. VStringBuffer tempFileName("%s.tmp", settings->ackedFileList.str());
  604. addNewAckedFileList(tempFileName, fileNamesToKeep);
  605. //Replace the settings->ackedFileList with the temp file
  606. renameFile(settings->ackedFileList, tempFileName, true);
  607. PROGLOG("Rename %s to %s", tempFileName.str(), settings->ackedFileList.str());
  608. //Create new ackedLogFileCheckList based on fileNamesToKeep
  609. ackedLogFileCheckList.clear();
  610. ForEachItemIn(j, fileNamesToKeep)
  611. {
  612. const char* name = fileNamesToKeep.item(j);
  613. ackedLogFileCheckList.insert(name);
  614. PROGLOG("Add %s to new ackedLogFileCheckList", name);
  615. }
  616. }
  617. void CLogRequestReader::readAcked(const char* fileName, std::set<std::string>& acked)
  618. {
  619. Owned<IFile> f = createIFile(fileName);
  620. if (f)
  621. {
  622. OwnedIFileIO io = f->openShared(IFOread, IFSHfull);
  623. if (io)
  624. {
  625. StringBuffer line;
  626. OwnedIFileIOStream ios = createIOStream(io);
  627. Owned<IStreamLineReader> lineReader = createLineReader(ios, true);
  628. while(!lineReader->readLine(line.clear()))
  629. {
  630. if (line.isEmpty())
  631. continue;
  632. unsigned len = line.length();
  633. if ((len > 1) && (line.charAt(len - 2) == '\r') && (line.charAt(len - 1) == '\n'))
  634. line.setLength(len - 2); //remove \r\n
  635. else if ((len > 0) && ((line.charAt(len - 1) == '\r') || (line.charAt(len - 1) == '\n')))
  636. line.setLength(len - 1); //remove \r or \n
  637. if (line.length() > 0) //Check just in case
  638. acked.insert(line.str());
  639. PROGLOG("Found Acked %s from %s", line.str(), fileName);
  640. }
  641. }
  642. }
  643. else
  644. {
  645. f.setown(createIFile(fileName));
  646. Owned<IFileIO> io = f->open(IFOcreate);
  647. }
  648. }
  649. void CLogRequestReader::readLogRequest()
  650. {
  651. ESPLOG(LogMax, "#### Enter readLogRequest()");
  652. StringAttr tankFileNotFinished;//Today's newest tank file.
  653. findTankFileNotFinished(tankFileNotFinished);
  654. offset_t tankFileNotFinishedPos = 0;
  655. Owned<IDirectoryIterator> it = createDirectoryIterator(settings->tankFileDir.get(), "*.log");
  656. ForEach (*it)
  657. {
  658. const char *fileNameWithPath = it->query().queryFilename();
  659. const char *fileName = pathTail(fileNameWithPath);
  660. if (ackedLogFileCheckList.find(fileName) != ackedLogFileCheckList.end())
  661. {
  662. ESPLOG(LogMax, "####Skip tank file: %s. It is in the Acked File list.", fileName);
  663. continue;
  664. }
  665. if (readLogRequestsFromTankFile(fileNameWithPath, tankFileNotFinished, tankFileNotFinishedPos))
  666. addToAckedLogFileList(fileName, fileNameWithPath);
  667. }
  668. addPendingLogsToQueue();
  669. if (tankFileNotFinishedPos)
  670. {//In the next loop, we may skip the log requests which have been read in this loop.
  671. lastTankFile = tankFileNotFinished;
  672. lastTankFilePos = tankFileNotFinishedPos;
  673. }
  674. ESPLOG(LogMax, "#### Leave readLogRequest()");
  675. }
  676. void CLogRequestReader::findTankFileNotFinished(StringAttr& tankFileNotFinished)
  677. {
  678. StringBuffer todayString;
  679. unsigned year, month, day;
  680. CDateTime now;
  681. now.setNow();
  682. now.getDate(year, month, day, true);
  683. todayString.appendf("%04d_%02d_%02d_00_00_00", year, month, day);
  684. StringBuffer lastTimeString;
  685. Owned<IDirectoryIterator> it = createDirectoryIterator(settings->tankFileDir.get(), "*");
  686. ForEach (*it)
  687. {
  688. StringBuffer timeString;
  689. const char* aFileNameWithPath = it->query().queryFilename();
  690. const char* aFileName = pathTail(aFileNameWithPath);
  691. getTankFileTimeString(aFileName, timeString);
  692. if (timeString.isEmpty())
  693. {
  694. IERRLOG("Failed to parse tank file name: %s", aFileName);
  695. continue;
  696. }
  697. if (strcmp(todayString, timeString) > 0) //Not created today
  698. continue;
  699. if (strcmp(timeString, lastTimeString) > 0) //a newer file is found.
  700. {
  701. lastTimeString.set(timeString);
  702. tankFileNotFinished.set(aFileNameWithPath);
  703. }
  704. }
  705. }
  706. StringBuffer& CLogRequestReader::getTankFileTimeString(const char* fileName, StringBuffer& timeString)
  707. {
  708. const char* ptr = strstr(fileName, sendLogKeyword);
  709. if (!ptr)
  710. return timeString;
  711. ptr += sendLogKeywordLen;
  712. if (!ptr)
  713. return timeString;
  714. ptr = strchr(ptr, '.');
  715. if (ptr && (strlen(ptr) > dateTimeStringLength))
  716. timeString.append(dateTimeStringLength, ++ptr); //yyyy_mm_dd_hh_mm_ss
  717. return timeString;
  718. }
  719. bool CLogRequestReader::readLogRequestsFromTankFile(const char* fileName, StringAttr& tankFileNotFinished, offset_t& tankFileNotFinishedPos)
  720. {
  721. ESPLOG(LogMax, "#### Enter readLogRequestsFromTankFile(): %s", fileName);
  722. Owned<IFile> file = createIFile(fileName);
  723. if (!file) //This can only happen at start time. So, throw exception.
  724. throw MakeStringException(-1, "Unable to find logging file %s", fileName);
  725. Owned<IFileIO> fileIO = file->open(IFOread);
  726. if (!fileIO)
  727. throw MakeStringException(-1, "Unable to open logging file %s", fileName);
  728. //Sample: 00009902 0421311217.2019_03_29_14_32_11 <cache><GUID>0421311217.2019_03_29_14_32_11</GUID>
  729. //<option>SingleInsert</option><LogRequest>dUgAAH...AA==</LogRequest></cache>
  730. offset_t finger = getReadFilePos(fileName);
  731. unsigned totalMissed = 0;
  732. while(true)
  733. {
  734. MemoryBuffer data;
  735. CLogSerializer logSerializer;
  736. if (!logSerializer.readALogLine(fileIO, finger, data))
  737. break;
  738. StringBuffer GUID, logRequest;
  739. if (!parseLogRequest(data, GUID, logRequest))
  740. IERRLOG("Invalid logging request in %s", fileName);
  741. else if (ackedLogRequests.find(GUID.str()) == ackedLogRequests.end())
  742. {//This QUID is not acked.
  743. totalMissed++;
  744. if (pendingLogGUIDs.find(GUID.str()) == pendingLogGUIDs.end())
  745. {//This QUID has not been queued yet.
  746. PROGLOG("Found new log request %s from tank file %s. Added to pending logs.", GUID.str(), fileName);
  747. pendingLogGUIDs.insert(GUID.str());
  748. pendingLogs[GUID.str()] = logRequest.str();
  749. if (pendingLogs.size() > settings->pendingLogBufferSize)
  750. addPendingLogsToQueue();
  751. }
  752. }
  753. }
  754. bool isTankFileNotFinished = !isEmptyString(tankFileNotFinished) && strieq(fileName, tankFileNotFinished);
  755. if (isTankFileNotFinished)
  756. tankFileNotFinishedPos = fileIO->size();
  757. ESPLOG(LogMax, "#### Leave readLogRequestsFromTankFile(): %s, totalMissed(%d)", fileName, totalMissed);
  758. return (totalMissed == 0) && !isTankFileNotFinished;
  759. }
  760. offset_t CLogRequestReader::getReadFilePos(const char* fileName)
  761. {
  762. const char* lastTankFileName = lastTankFile.get();
  763. if (lastTankFileName && strieq(lastTankFileName, fileName))
  764. return lastTankFilePos;
  765. return 0;
  766. }
  767. bool CLogRequestReader::parseLogRequest(MemoryBuffer& rawdata, StringBuffer& GUID, StringBuffer& logLine)
  768. {
  769. //The rawdata should be in the form of 2635473460.05_01_12_16_13_57\t<cache>...</cache>
  770. //parse it into GUID and logLine (as <cache>...</cache>)
  771. const char* begin = rawdata.toByteArray(); //no string termination character \0
  772. unsigned len = rawdata.length();
  773. if (!begin || (len == 0))
  774. return false;
  775. const char* ptr = begin;
  776. const char* end = begin + len;
  777. while ((ptr < end) && (*ptr != '\t'))
  778. ptr++;
  779. if ((ptr == end) || (ptr == begin))
  780. return false;
  781. GUID.append(ptr - begin, begin);
  782. if (++ptr == end)
  783. return false;
  784. logLine.append(end - ptr, ptr);
  785. return true;
  786. }
  787. void CLogRequestReader::addPendingLogsToQueue()
  788. {
  789. ESPLOG(LogMax, "#### Enter addPendingLogsToQueue()");
  790. //Add the pendingLogs to log queue
  791. if (pendingLogs.size())
  792. ESPLOG(LogMin, "Adding %zu Pending Log Request(s) to job queue", pendingLogs.size());
  793. StringArray queuedPendingLogs;
  794. for (auto const& x : pendingLogs)
  795. {
  796. Owned<IEspUpdateLogRequestWrap> logRequest = logThread->unserializeLogRequestContent(x.second.c_str(), true);
  797. if (!logRequest)
  798. IERRLOG("addPendingLogsToQueue: failed to unserialize: %s", x.second.c_str());
  799. logThread->queueLog(logRequest);
  800. queuedPendingLogs.append(x.first.c_str());
  801. PROGLOG("Enqueue: %s", x.first.c_str());
  802. }
  803. //Clean the pendingLogs
  804. ForEachItemIn(i, queuedPendingLogs)
  805. pendingLogs.erase(queuedPendingLogs.item(i));
  806. ESPLOG(LogMax, "#### Leave addPendingLogsToQueue()");
  807. }
  808. void CLogRequestReader::addACK(const char* GUID)
  809. {
  810. ESPLOG(LogMax, "#### Enter addACK(): %s", GUID);
  811. CriticalBlock b(crit);
  812. Owned<IFile> f = createIFile(settings->ackedLogRequestFile);
  813. Owned<IFileIO> io = f->open(IFOwrite);
  814. StringBuffer toWrite,size;
  815. toWrite.appendf("%s\r\n", GUID);
  816. io->write(io->size(), toWrite.length(), toWrite.str());
  817. ackedLogRequests.insert(GUID);
  818. pendingLogGUIDs.erase(GUID);
  819. ESPLOG(LogMax, "#### addACK(): %s acked", GUID);
  820. }
  821. void CLogRequestReader::addToAckedLogFileList(const char* fileName, const char* fileNameWithPath)
  822. {
  823. PROGLOG("Found an AckedLogFile %s.", fileNameWithPath);
  824. newAckedLogFiles.append(fileNameWithPath);
  825. ackedLogFileCheckList.insert(fileName);
  826. }
  827. //Update newAckedLogFiles to file settings->ackedFileList,
  828. void CLogRequestReader::updateAckedFileList()
  829. {
  830. ESPLOG(LogMax, "#### Enter updateAckedFileList()");
  831. OwnedIFile ackedFiles = createIFile(settings->ackedFileList);
  832. if (!ackedFiles)
  833. return; //Should never happen
  834. OwnedIFileIO ackedFilesIO = ackedFiles->open(IFOwrite);
  835. if (!ackedFilesIO)
  836. return; //Should never happen
  837. offset_t pos = ackedFilesIO->size();
  838. ForEachItemIn(i, newAckedLogFiles)
  839. {
  840. const char* fileNameWithPath = newAckedLogFiles.item(i);
  841. StringBuffer fileName(pathTail(fileNameWithPath));
  842. PROGLOG("Add AckedLogFile %s to %s", fileName.str(), settings->ackedFileList.str());
  843. //Remove log request from the ackedLogRequests
  844. GuidSet logRequestsToRemove;
  845. CLogSerializer ackedLog(fileNameWithPath);
  846. ackedLog.loadAckedLogs(logRequestsToRemove);
  847. for (auto r : logRequestsToRemove)
  848. ackedLogRequests.erase(r.c_str());
  849. fileName.append("\r\n");
  850. unsigned len = strlen(fileName);
  851. ackedFilesIO->write(pos, len, fileName);
  852. pos += len;
  853. }
  854. newAckedLogFiles.clear();
  855. ESPLOG(LogMax, "#### Leave updateAckedFileList()");
  856. }
  857. void CLogRequestReader::updateAckedLogRequestList()
  858. {
  859. ESPLOG(LogMax, "#### Enter updateAckedLogRequestList()");
  860. OwnedIFile newAckedLogRequestFile = createIFile(settings->ackedLogRequestFile);
  861. if (newAckedLogRequestFile)
  862. {
  863. newAckedLogRequestFile->remove();
  864. PROGLOG("Clean %s", settings->ackedLogRequestFile.str());
  865. }
  866. OwnedIFileIO newAckedLogRequestFileIO = newAckedLogRequestFile->open(IFOwrite);
  867. if (!newAckedLogRequestFileIO)
  868. return; //Should never happen
  869. offset_t pos = 0;
  870. for (auto r : ackedLogRequests)
  871. {
  872. StringBuffer line(r.c_str());
  873. line.append("\r\n");
  874. unsigned len = line.length();
  875. newAckedLogRequestFileIO->write(pos, len, line.str());
  876. pos += len;
  877. PROGLOG("Add AckedLogRequest %s to %s", line.str(), settings->ackedLogRequestFile.str());
  878. }
  879. ESPLOG(LogMax, "#### Leave updateAckedLogRequestList()");
  880. }