LogFailSafe.cpp 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #pragma warning(disable:4786)
  14. /* Review notes: this file is modified based on:
  15. * HPCC-Platform/esp/clients/LoggingClient/LogFailSafe.cpp
  16. 3 methods are added to ILogFailSafe: Add(), RolloverAllLogs(), PopPendingLogRecord().
  17. 1 extern function is added: createFailSafeLogger()
  18. 3 member variables are added to CLogFailSafe: m_LogService, m_critSec, m_PendingLogs
  19. 5 methods and one constructor are added to CLogFailSafe: Add(), RolloverAllLogs(), PopPendingLogRecord(),
  20. loadPendingLogReqsFromExistingLogFiles(), generateNewFileNames()
  21. Minor changes are made in the existing methods.
  22. */
  23. #include "LogFailSafe.hpp"
  24. #include "jmisc.hpp"
  25. #include "soapbind.hpp"
  // File-name fragments that distinguish the two log files of a pair. The
  // "acked" name is derived from the "sending" name by substituting one
  // fragment for the other (see getReceiveFileName()).
  #define RECEIVING "_acked_"
  #define SENDING   "_sending_"

  // Extension given to log files once they are rolled over.
  const char* const RolloverExt = ".old";

  // Add() triggers SafeRollover() when the item count of the current
  // "added" log exceeds this value.
  const unsigned long SAFE_ROLLOVER_THRESHOLD = 500000UL;

  // PopPendingLogRecord() traces the pending count when it is below MIN,
  // or when it is an exact multiple of MAX.
  const unsigned int TRACE_PENDING_LOGS_MIN = 10;
  const unsigned int TRACE_PENDING_LOGS_MAX = 50;
  32. extern LOGGINGCOMMON_API ILogFailSafe* createFailSafeLogger(const char* logType, const char* logsdir)
  33. {
  34. return new CLogFailSafe(logType, logsdir);
  35. }
  36. extern LOGGINGCOMMON_API ILogFailSafe* createFailSafeLogger(const char* pszService, const char* logType, const char* logsdir)
  37. {
  38. return new CLogFailSafe(pszService, logType, logsdir && *logsdir ? logsdir : "./FailSafeLogs");
  39. }
  40. CLogFailSafe::CLogFailSafe()
  41. {
  42. }
  43. CLogFailSafe::CLogFailSafe(const char* logType, const char* logsdir) : m_LogType(logType), m_logsdir(logsdir)
  44. {
  45. loadFailed(logType);
  46. createNew(logType);
  47. }
  48. CLogFailSafe::CLogFailSafe(const char* pszService, const char* logType, const char* logsdir)
  49. : m_LogService(pszService), m_LogType(logType), m_logsdir(logsdir)
  50. {
  51. loadPendingLogReqsFromExistingLogFiles();
  52. StringBuffer send, receive;
  53. generateNewFileNames(send, receive);
  54. m_Added.Open(m_logsdir.str(), send.str(), NULL);
  55. m_Cleared.Open(m_logsdir.str(), receive.str(), NULL);
  56. }
  57. CLogFailSafe::~CLogFailSafe()
  58. {
  59. ESPLOG(LogMax, "CLogFailSafe::~CLogFailSafe()");
  60. m_Added.Close();
  61. m_Cleared.Close();
  62. }
  63. bool CLogFailSafe::FindOldLogs()
  64. {
  65. if(m_UnsentLogs.ordinality())
  66. return true;
  67. return false;
  68. }
  69. void CLogFailSafe::LoadOldLogs(StringArray& oldLogData)
  70. {
  71. ForEachItemIn(aidx, m_UnsentLogs)
  72. {
  73. oldLogData.append(m_UnsentLogs.item(aidx));
  74. }
  75. }
  76. void CLogFailSafe::loadPendingLogReqsFromExistingLogFiles()
  77. {
  78. VStringBuffer fileName("%s%s%s*.log", m_LogService.str(), m_LogType.str(), SENDING);
  79. Owned<IDirectoryIterator> di = createDirectoryIterator(m_logsdir.str(), fileName.str());
  80. ForEach (*di)
  81. {
  82. IFile &file = di->query();
  83. StringBuffer ackedName;
  84. GuidSet ackedSet;
  85. getReceiveFileName(file.queryFilename(),ackedName);
  86. CLogSerializer ackedLog(ackedName.str());
  87. ackedLog.loadAckedLogs(ackedSet);
  88. CLogSerializer sendLog(file.queryFilename());
  89. unsigned long total_missed = 0;
  90. {//scope needed for critical block below
  91. CriticalBlock b(m_critSec); //since we plan to use m_PendingLogs
  92. sendLog.loadSendLogs(ackedSet, m_PendingLogs, total_missed);
  93. }
  94. if (total_missed == 0)
  95. {
  96. ackedLog.Rollover(RolloverExt);
  97. sendLog.Rollover(RolloverExt);
  98. }
  99. }
  100. }
  101. void CLogFailSafe::generateNewFileNames(StringBuffer& sendingFile, StringBuffer& receivingFile)
  102. {
  103. StringBuffer GUID;
  104. GenerateGUID(GUID);
  105. StringBuffer tmp;
  106. tmp.append(m_LogService).append(m_LogType);
  107. sendingFile.append(tmp).append(SENDING).append(GUID).append(".log");
  108. receivingFile.append(tmp).append(RECEIVING).append(GUID).append(".log");
  109. }
  110. bool CLogFailSafe::PopPendingLogRecord(StringBuffer& GUID, StringBuffer& cache)
  111. {
  112. CriticalBlock b(m_critSec);
  113. GuidMap::iterator it = m_PendingLogs.begin();
  114. if (it == m_PendingLogs.end())
  115. return false;
  116. GUID.clear().append( (*it).first.c_str() );
  117. cache.clear().append( (*it).second.c_str() );
  118. m_PendingLogs.erase(it);
  119. unsigned int nPendingLogs = m_PendingLogs.size();
  120. if (nPendingLogs && (nPendingLogs < TRACE_PENDING_LOGS_MIN || (nPendingLogs % TRACE_PENDING_LOGS_MAX == 0)))
  121. ESPLOG(LogNormal, "%u logs pending", nPendingLogs);
  122. return true;
  123. }
  124. void CLogFailSafe::createNew(const char* logType)
  125. {
  126. StringBuffer UniqueID;
  127. GenerateGUID(UniqueID);
  128. UniqueID.append(".log");
  129. StringBuffer send(logType),recieve(logType);
  130. send.append("_sending");
  131. recieve.append("_recieving");
  132. m_Added.Open(m_logsdir.str(),UniqueID,send.str());
  133. m_Cleared.Open(m_logsdir.str(),UniqueID,recieve.str());
  134. }
  135. void CLogFailSafe::loadFailed(const char* logType)
  136. {
  137. StringBuffer fileName;
  138. fileName.appendf("%s_sending*.log",logType);
  139. Owned<IDirectoryIterator> di = createDirectoryIterator(m_logsdir.str(), fileName.str());
  140. ForEach (*di)
  141. {
  142. IFile &file = di->query();
  143. StringBuffer recieveName;
  144. GuidMap recieve_map;
  145. getReceiveFileName(file.queryFilename(),recieveName);
  146. CRecieveLogSerializer recieveLog(recieveName.str());
  147. recieveLog.LoadDataMap(recieve_map);
  148. CSendLogSerializer sendLog(file.queryFilename());
  149. sendLog.LoadDataMap(recieve_map,m_UnsentLogs);
  150. }
  151. }
  152. StringBuffer& CLogFailSafe::getReceiveFileName(const char* sendFileName, StringBuffer& receiveName)
  153. {
  154. if (sendFileName)
  155. {
  156. receiveName.append(sendFileName);
  157. receiveName.replaceString(SENDING, RECEIVING);
  158. }
  159. return receiveName;
  160. }
  161. StringBuffer& CLogFailSafe::GenerateGUID(StringBuffer& GUID, const char* seed)
  162. {
  163. GUID.appendf("%u",getRandom());
  164. while (GUID.length() < 10)
  165. GUID.insert(0,'0');
  166. addFileTimestamp(GUID);
  167. if(seed!=NULL && *seed!='\0')
  168. GUID.appendf(".%s",seed);
  169. return GUID;
  170. }
  171. void CLogFailSafe::SplitLogRecord(const char* requestStr,StringBuffer& GUID, StringBuffer& Cache)
  172. {
  173. SplitRecord(requestStr,GUID,Cache);
  174. }
  175. void CLogFailSafe::Add(const char* GUID,IInterface& pIn)
  176. {
  177. CSoapRequestBinding* reqObj = dynamic_cast<CSoapRequestBinding*>(&pIn);
  178. if (reqObj == 0)
  179. throw MakeStringException(-1, "Unable to cast interface to SoapBindind");
  180. StringBuffer dataStr;
  181. reqObj->serializeContent(NULL,dataStr,NULL);
  182. Add(GUID, dataStr);
  183. }
  184. void CLogFailSafe::Add(const char* GUID, const StringBuffer& strContents)
  185. {
  186. VStringBuffer dataStr("<cache>%s</cache>", strContents.str());
  187. unsigned long item_count = m_Added.getItemCount();
  188. if (item_count > SAFE_ROLLOVER_THRESHOLD)
  189. SafeRollover();
  190. m_Added.Append(GUID, dataStr.str());
  191. }
  192. void CLogFailSafe::AddACK(const char* GUID)
  193. {
  194. m_Cleared.Append(GUID, "");
  195. CriticalBlock b(m_critSec);
  196. GuidMap::iterator it = m_PendingLogs.find(GUID);
  197. if (it != m_PendingLogs.end())
  198. m_PendingLogs.erase(it);
  199. }
  200. void CLogFailSafe::RollCurrentLog()
  201. {
  202. m_Added.Rollover(RolloverExt);
  203. m_Cleared.Rollover(RolloverExt);
  204. }
  205. void CLogFailSafe::SafeRollover()
  206. {
  207. StringBuffer send, receive;
  208. generateNewFileNames(send, receive);
  209. // Rolling over m_Added first is desirable here beccause requests being written to the new tank file before
  210. // m_Cleared finishes rolling all haven't been sent yet (because the sending thread is here busy rolling).
  211. m_Added.SafeRollover (m_logsdir.str(), send.str(), NULL, RolloverExt);
  212. m_Cleared.SafeRollover(m_logsdir.str(), receive.str(), NULL, RolloverExt);
  213. }
  214. void CLogFailSafe::RollOldLogs()
  215. {
  216. StringBuffer filesToFind;
  217. filesToFind.appendf("%s*.log",m_LogType.str());
  218. Owned<IDirectoryIterator> di = createDirectoryIterator(m_logsdir.str(), filesToFind.str());
  219. ForEach (*di)
  220. {
  221. IFile &file = di->query();
  222. StringBuffer fileName;
  223. ExtractFileName(file.queryFilename(),fileName);
  224. if (fileName.length() && strcmp(fileName.str(),m_Added.queryFileName()) != 0 && strcmp(fileName.str(),m_Cleared.queryFileName()) != 0 )
  225. {
  226. fileName.replaceString(".log", RolloverExt);
  227. file.rename(fileName.str());
  228. }
  229. }
  230. }
  231. //Rename existing .log files (except for current added/cleared log files) to .old files
  232. void CLogFailSafe::RolloverAllLogs()
  233. {
  234. VStringBuffer filesToFind("%s%s*.log", m_LogService.str(), m_LogType.str());
  235. Owned<IDirectoryIterator> di = createDirectoryIterator(m_logsdir.str(), filesToFind.str());
  236. ForEach (*di)
  237. {
  238. IFile &file = di->query();
  239. StringBuffer fileName;
  240. CLogSerializer::extractFileName(file.queryFilename(), fileName);
  241. if (fileName.length() && !streq(fileName.str(), m_Added.queryFileName()) &&
  242. !streq(fileName.str(), m_Cleared.queryFileName()))
  243. {
  244. fileName.replaceString(".log", RolloverExt);
  245. file.rename(fileName.str());
  246. }
  247. }
  248. }
  249. StringBuffer& CLogFailSafe::ExtractFileName(const char* fileName,StringBuffer& FileName)
  250. {
  251. StringBuffer tmp(fileName);
  252. for(int i = tmp.length() - 1; i >=0; i--)
  253. {
  254. if(tmp.charAt(i) == '\\' || tmp.charAt(i) == '/')
  255. break;
  256. FileName.insert(0,tmp.charAt(i));
  257. }
  258. return FileName;
  259. }