LogFailSafe.cpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #pragma warning(disable:4786)
  14. /* Review notes: this file is modified based on:
  15. * HPCC-Platform/esp/clients/LoggingClient/LogFailSafe.cpp
  16. 3 methods are added to ILogFailSafe: Add(), RolloverAllLogs(), PopPendingLogRecord().
  17. 1 extern function is added: createFailSafeLogger()
  18. 3 member variables are added to CLogFailSafe: m_LogService, m_critSec, m_PendingLogs
  19. 5 methods and one constructor are added to CLogFailSafe: Add(), RolloverAllLogs(), PopPendingLogRecord(),
  20. loadPendingLogReqsFromExistingLogFiles(), generateNewFileNames()
  21. Minor changes are made in the existing methods.
  22. */
  23. #include "LogFailSafe.hpp"
  24. #include "jmisc.hpp"
  25. #include "soapbind.hpp"
  26. #define RECEIVING "_acked_"
  27. #define SENDING "_sending_"
  28. const unsigned int TRACE_PENDING_LOGS_MIN = 10;
  29. const unsigned int TRACE_PENDING_LOGS_MAX = 50;
  30. extern LOGGINGCOMMON_API ILogFailSafe* createFailSafeLogger(IPropertyTree* cfg, const char* logType)
  31. {
  32. return new CLogFailSafe(cfg, logType);
  33. }
  34. extern LOGGINGCOMMON_API ILogFailSafe* createFailSafeLogger(IPropertyTree* cfg, const char* pszService, const char* logType)
  35. {
  36. return new CLogFailSafe(cfg, pszService, logType);
  37. }
  38. CLogFailSafe::CLogFailSafe()
  39. {
  40. }
  41. CLogFailSafe::CLogFailSafe(IPropertyTree* cfg, const char* logType) : m_LogType(logType)
  42. {
  43. readCfg(cfg);
  44. loadFailed(logType);
  45. createNew(logType);
  46. }
  47. CLogFailSafe::CLogFailSafe(IPropertyTree* cfg, const char* pszService, const char* logType)
  48. : m_LogService(pszService), m_LogType(logType)
  49. {
  50. readCfg(cfg);
  51. loadPendingLogReqsFromExistingLogFiles();
  52. StringBuffer send, receive;
  53. generateNewFileNames(send, receive);
  54. m_Added.Open(m_logsdir.str(), send.str(), NULL);
  55. m_Cleared.Open(m_logsdir.str(), receive.str(), NULL);
  56. }
  57. CLogFailSafe::~CLogFailSafe()
  58. {
  59. ESPLOG(LogMax, "CLogFailSafe::~CLogFailSafe()");
  60. m_Added.Close();
  61. m_Cleared.Close();
  62. }
  63. void CLogFailSafe::readCfg(IPropertyTree* cfg)
  64. {
  65. StringBuffer safeRolloverThreshold(cfg->queryProp(PropSafeRolloverThreshold));
  66. if (!safeRolloverThreshold.isEmpty())
  67. readSafeRolloverThresholdCfg(safeRolloverThreshold);
  68. const char* logsDir = cfg->queryProp(PropFailSafeLogsDir);
  69. if (!isEmptyString(logsDir))
  70. m_logsdir.set(logsDir);
  71. else
  72. m_logsdir.set(DefaultFailSafeLogsDir);
  73. }
  74. void CLogFailSafe::readSafeRolloverThresholdCfg(StringBuffer& safeRolloverThreshold)
  75. {
  76. safeRolloverThreshold.trim();
  77. const char* ptrStart = safeRolloverThreshold.str();
  78. const char* ptrAfterDigit = ptrStart;
  79. while (*ptrAfterDigit && isdigit(*ptrAfterDigit))
  80. ptrAfterDigit++;
  81. if (!*ptrAfterDigit)
  82. {
  83. safeRolloverReqThreshold = atol(safeRolloverThreshold.str());
  84. return;
  85. }
  86. const char* ptr = ptrAfterDigit;
  87. while (*ptr && (ptr[0] == ' '))
  88. ptr++;
  89. char c = ptr[0];
  90. safeRolloverThreshold.setLength(ptrAfterDigit - ptrStart);
  91. safeRolloverSizeThreshold = atol(safeRolloverThreshold.str());
  92. switch (c)
  93. {
  94. case 'k':
  95. case 'K':
  96. safeRolloverSizeThreshold *= 1000;
  97. break;
  98. case 'm':
  99. case 'M':
  100. safeRolloverSizeThreshold *= 1000000;
  101. break;
  102. case 'g':
  103. case 'G':
  104. safeRolloverSizeThreshold *= 1000000000;
  105. break;
  106. case 't':
  107. case 'T':
  108. safeRolloverSizeThreshold *= 1000000000000;
  109. break;
  110. default:
  111. break;
  112. }
  113. }
  114. bool CLogFailSafe::FindOldLogs()
  115. {
  116. if(m_UnsentLogs.ordinality())
  117. return true;
  118. return false;
  119. }
  120. void CLogFailSafe::LoadOldLogs(StringArray& oldLogData)
  121. {
  122. ForEachItemIn(aidx, m_UnsentLogs)
  123. {
  124. oldLogData.append(m_UnsentLogs.item(aidx));
  125. }
  126. }
  127. void CLogFailSafe::loadPendingLogReqsFromExistingLogFiles()
  128. {
  129. //Read acked LogReqs from all of ack files.
  130. //Filter out those acked LogReqs from all of sending files.
  131. //Non-acked LogReqs are added into m_PendingLogs.
  132. //The LogReqs in the m_PendingLogs will be added to queue and
  133. //new tank files through the enqueue() in the checkPendingLogs().
  134. //After that, the oldLogs will be renamed to .old file.
  135. GuidSet ackedSet;
  136. VStringBuffer fileName("%s%s%s*%s", m_LogService.str(), m_LogType.str(), RECEIVING, logFileExt);
  137. Owned<IDirectoryIterator> it = createDirectoryIterator(m_logsdir.str(), fileName.str());
  138. ForEach (*it)
  139. {
  140. IFile &file = it->query();
  141. CLogSerializer ackedLog(file.queryFilename());
  142. ackedLog.loadAckedLogs(ackedSet);
  143. oldLogs.append(file.queryFilename());
  144. }
  145. fileName.setf("%s%s%s*%s", m_LogService.str(), m_LogType.str(), SENDING, logFileExt);
  146. Owned<IDirectoryIterator> di = createDirectoryIterator(m_logsdir.str(), fileName.str());
  147. ForEach (*di)
  148. {
  149. IFile &file = di->query();
  150. oldLogs.append(file.queryFilename()); //add to the files for rollover
  151. CLogSerializer sendLog(file.queryFilename());
  152. unsigned long total_missed = 0;
  153. sendLog.loadSendLogs(ackedSet, m_PendingLogs, total_missed);
  154. }
  155. }
  156. void CLogFailSafe::generateNewFileNames(StringBuffer& sendingFile, StringBuffer& receivingFile)
  157. {
  158. StringBuffer GUID;
  159. GenerateGUID(GUID);
  160. StringBuffer tmp;
  161. tmp.append(m_LogService).append(m_LogType);
  162. sendingFile.append(tmp).append(SENDING).append(GUID).append(logFileExt);
  163. receivingFile.append(tmp).append(RECEIVING).append(GUID).append(logFileExt);
  164. }
  165. bool CLogFailSafe::PopPendingLogRecord(StringBuffer& GUID, StringBuffer& cache)
  166. {
  167. CriticalBlock b(m_critSec);
  168. GuidMap::iterator it = m_PendingLogs.begin();
  169. if (it == m_PendingLogs.end())
  170. return false;
  171. GUID.clear().append( (*it).first.c_str() );
  172. cache.clear().append( (*it).second.c_str() );
  173. m_PendingLogs.erase(it);
  174. unsigned int nPendingLogs = m_PendingLogs.size();
  175. if (nPendingLogs && (nPendingLogs < TRACE_PENDING_LOGS_MIN || (nPendingLogs % TRACE_PENDING_LOGS_MAX == 0)))
  176. ESPLOG(LogNormal, "%u logs pending", nPendingLogs);
  177. return true;
  178. }
  179. void CLogFailSafe::createNew(const char* logType)
  180. {
  181. StringBuffer UniqueID;
  182. GenerateGUID(UniqueID);
  183. UniqueID.append(logFileExt);
  184. StringBuffer send(logType),recieve(logType);
  185. send.append("_sending");
  186. recieve.append("_recieving");
  187. m_Added.Open(m_logsdir.str(),UniqueID,send.str());
  188. m_Cleared.Open(m_logsdir.str(),UniqueID,recieve.str());
  189. }
  190. void CLogFailSafe::loadFailed(const char* logType)
  191. {
  192. StringBuffer fileName;
  193. fileName.appendf("%s_sending*%s", logType, logFileExt);
  194. Owned<IDirectoryIterator> di = createDirectoryIterator(m_logsdir.str(), fileName.str());
  195. ForEach (*di)
  196. {
  197. IFile &file = di->query();
  198. StringBuffer recieveName;
  199. GuidMap recieve_map;
  200. getReceiveFileName(file.queryFilename(),recieveName);
  201. CRecieveLogSerializer recieveLog(recieveName.str());
  202. recieveLog.LoadDataMap(recieve_map);
  203. CSendLogSerializer sendLog(file.queryFilename());
  204. sendLog.LoadDataMap(recieve_map,m_UnsentLogs);
  205. }
  206. }
  207. StringBuffer& CLogFailSafe::getReceiveFileName(const char* sendFileName, StringBuffer& receiveName)
  208. {
  209. if (sendFileName)
  210. {
  211. receiveName.append(sendFileName);
  212. receiveName.replaceString(SENDING, RECEIVING);
  213. }
  214. return receiveName;
  215. }
  216. StringBuffer& CLogFailSafe::GenerateGUID(StringBuffer& GUID, const char* seed)
  217. {
  218. GUID.appendf("%u",getRandom());
  219. while (GUID.length() < 10)
  220. GUID.insert(0,'0');
  221. addFileTimestamp(GUID);
  222. if(seed!=NULL && *seed!='\0')
  223. GUID.appendf(".%s",seed);
  224. return GUID;
  225. }
  226. void CLogFailSafe::SplitLogRecord(const char* requestStr,StringBuffer& GUID, StringBuffer& Cache)
  227. {
  228. SplitRecord(requestStr,GUID,Cache);
  229. }
  230. void CLogFailSafe::Add(const char* GUID,IInterface& pIn, CLogRequestInFile* reqInFile)
  231. {
  232. CSoapRequestBinding* reqObj = dynamic_cast<CSoapRequestBinding*>(&pIn);
  233. if (reqObj == 0)
  234. throw MakeStringException(-1, "Unable to cast interface to SoapBindind");
  235. StringBuffer dataStr;
  236. reqObj->serializeContent(NULL,dataStr,NULL);
  237. Add(GUID, dataStr, reqInFile);
  238. }
  239. void CLogFailSafe::Add(const char* GUID, const char *strContents, CLogRequestInFile* reqInFile)
  240. {
  241. VStringBuffer dataStr("<cache>%s</cache>", strContents);
  242. if (safeRolloverSizeThreshold <= 0)
  243. {
  244. unsigned long item_count = m_Added.getItemCount();
  245. if (item_count > safeRolloverReqThreshold)
  246. SafeRollover();
  247. }
  248. else
  249. {
  250. unsigned long fileSize = m_Added.getFileSize();
  251. if (fileSize > safeRolloverSizeThreshold)
  252. SafeRollover();
  253. }
  254. m_Added.Append(GUID, dataStr.str(), reqInFile);
  255. }
  256. void CLogFailSafe::AddACK(const char* GUID)
  257. {
  258. m_Cleared.Append(GUID, "", nullptr);
  259. CriticalBlock b(m_critSec);
  260. GuidMap::iterator it = m_PendingLogs.find(GUID);
  261. if (it != m_PendingLogs.end())
  262. m_PendingLogs.erase(it);
  263. }
  264. void CLogFailSafe::RollCurrentLog()
  265. {
  266. m_Added.Rollover(rolloverFileExt);
  267. m_Cleared.Rollover(rolloverFileExt);
  268. }
  269. void CLogFailSafe::SafeRollover()
  270. {
  271. StringBuffer send, receive;
  272. generateNewFileNames(send, receive);
  273. // Rolling over m_Added first is desirable here beccause requests being written to the new tank file before
  274. // m_Cleared finishes rolling all haven't been sent yet (because the sending thread is here busy rolling).
  275. m_Added.SafeRollover (m_logsdir.str(), send.str(), nullptr, rolloverFileExt);
  276. m_Cleared.SafeRollover(m_logsdir.str(), receive.str(), nullptr, rolloverFileExt);
  277. }
  278. //Only rollover the files inside the oldLogs.
  279. //This is called only when a log agent is starting.
  280. void CLogFailSafe::RollOldLogs()
  281. {
  282. ForEachItemIn(i, oldLogs)
  283. {
  284. StringBuffer fileName(oldLogs.item(i));
  285. Owned<IFile> file = createIFile(fileName);
  286. fileName.replaceString(logFileExt, rolloverFileExt);
  287. file->rename(fileName.str());
  288. }
  289. oldLogs.kill();
  290. }
  291. //Rename existing .log files (except for current added/cleared log files) to .old files
  292. void CLogFailSafe::RolloverAllLogs()
  293. {
  294. VStringBuffer filesToFind("%s%s*%s", m_LogService.str(), m_LogType.str(), logFileExt);
  295. Owned<IDirectoryIterator> di = createDirectoryIterator(m_logsdir.str(), filesToFind.str());
  296. ForEach (*di)
  297. {
  298. IFile &file = di->query();
  299. StringBuffer fileName;
  300. CLogSerializer::extractFileName(file.queryFilename(), fileName);
  301. if (fileName.length() && !streq(fileName.str(), m_Added.queryFileName()) &&
  302. !streq(fileName.str(), m_Cleared.queryFileName()))
  303. {
  304. fileName.replaceString(logFileExt, rolloverFileExt);
  305. file.rename(fileName.str());
  306. }
  307. }
  308. }
  309. StringBuffer& CLogFailSafe::ExtractFileName(const char* fileName,StringBuffer& FileName)
  310. {
  311. StringBuffer tmp(fileName);
  312. for(int i = tmp.length() - 1; i >=0; i--)
  313. {
  314. if(tmp.charAt(i) == '\\' || tmp.charAt(i) == '/')
  315. break;
  316. FileName.insert(0,tmp.charAt(i));
  317. }
  318. return FileName;
  319. }