LogSerializer.cpp 14 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #pragma warning(disable : 4786)
  14. #include "LogSerializer.hpp"
  15. #include "jexcept.hpp"
  16. #include "jfile.hpp"
  17. #include "jlog.hpp"
  18. /* Review notes: this file is modified based on:
  19. * HPCC-Platform/esp/clients/LoggingClient/LogSerializer.cpp
  20. 5 methods are added to CLogSerializer: splitLogRecord(), getItemCount(), extractFileName(),
  21. loadSendLogs(), loadAckedLogs.
  22. Minor changes are made in the existing methods.
  23. */
  24. #define TRACE_INTERVAL 100
  25. CLogSerializer::CLogSerializer()
  26. {
  27. Init();
  28. }
  29. CLogSerializer::CLogSerializer(const char* fileName)
  30. {
  31. m_FilePath.append(fileName);
  32. Init();
  33. extractFileName(m_FilePath, m_FileName);//
  34. }
  35. StringBuffer& CLogSerializer::extractFileName(const char* fullName,StringBuffer& fileName)
  36. {
  37. StringBuffer tmp(fullName);
  38. for(unsigned i = tmp.length(); i-- > 0; )
  39. {
  40. if(tmp.charAt(i) == '\\' || tmp.charAt(i) == '/')
  41. break;
  42. fileName.insert(0, tmp.charAt(i));
  43. }
  44. return fileName;
  45. }
  46. void CLogSerializer::Init()
  47. {
  48. m_bytesWritten = 0;
  49. m_ItemCount = 0;
  50. m_fileio = 0;
  51. m_file = 0;
  52. }
  53. CLogSerializer::~CLogSerializer()
  54. {
  55. DBGLOG("CLogSerializer::~CLogSerializer()");
  56. Close();
  57. }
  58. void CLogSerializer::Append(const char* GUID, const char* Data)
  59. {
  60. StringBuffer toWrite,size;
  61. toWrite.appendf("%s\t%s\r\n",GUID,Data);
  62. size.appendf("%d",toWrite.length());
  63. while (size.length() < 8)
  64. size.insert(0,'0');
  65. size.append("\t");
  66. toWrite.insert(0,size.str());
  67. //optimize
  68. CriticalBlock b(crit);
  69. m_ItemCount++;
  70. m_bytesWritten += m_fileio->write(m_bytesWritten, toWrite.length(), toWrite.str());
  71. }
  72. void CLogSerializer::Remove(const char* GUID)
  73. {
  74. }
  75. void CLogSerializer::Open(const char*Directory,const char* NewFileName,const char* Prefix)
  76. {
  77. m_FilePath.clear();
  78. m_FilePath.append(Directory);
  79. if (!EnsureDirectory(m_FilePath))
  80. throw MakeStringException(-1,"Unable to create directory at %s.",m_FilePath.str());
  81. m_FilePath.append("/");
  82. m_FileName.clear();
  83. if (Prefix && *Prefix)
  84. m_FileName.append(Prefix).append("_");
  85. m_FileName.append(NewFileName);
  86. m_FilePath.append(m_FileName);
  87. m_file = createIFile(m_FilePath.str());
  88. m_fileio = m_file->open(IFOcreate);
  89. if (m_fileio == 0)
  90. throw MakeStringException(-1, "Unable to open logging file %s",m_FilePath.str());
  91. else
  92. DBGLOG("Tank file %s successfully created", m_FilePath.str());
  93. }
  94. bool CLogSerializer::EnsureDirectory(StringBuffer& Dir)
  95. {
  96. try
  97. {
  98. Owned<IFile> pDirectory = createIFile(Dir.str());
  99. if(pDirectory->exists() == true)
  100. return true;
  101. return pDirectory->createDirectory();
  102. }
  103. catch(IException *ex)
  104. {
  105. ex->Release();
  106. }
  107. return false;
  108. }
  109. void CLogSerializer::Close()
  110. {
  111. if(m_fileio)
  112. {
  113. m_fileio->Release();
  114. m_fileio = 0;
  115. }
  116. if(m_file)
  117. {
  118. m_file->Release();
  119. m_file = 0;
  120. }
  121. m_bytesWritten = 0;//
  122. m_ItemCount = 0;//
  123. }
  124. void CLogSerializer::Rollover(const char* ClosedPrefix)
  125. {
  126. Close();
  127. Owned<IFile> file = createIFile(m_FilePath.str());
  128. if(file.get() && file->exists() == true)
  129. {
  130. StringBuffer newFileName;
  131. GetRolloverFileName(m_FileName,newFileName,ClosedPrefix);
  132. file->rename(newFileName.str());
  133. }
  134. }
  135. void CLogSerializer::SafeRollover(const char*Directory,const char* NewFileName,const char* Prefix, const char* ClosedPrefix)
  136. {
  137. CriticalBlock b(crit);
  138. Rollover(ClosedPrefix);
  139. Init();
  140. Open(Directory, NewFileName, Prefix);
  141. }
  142. void CLogSerializer::splitLogRecord(MemoryBuffer& rawdata, StringBuffer& GUID, StringBuffer& line)//
  143. {
  144. //send log buffer should be in the form of 2635473460.05_01_12_16_13_57\t<cache>...</cache>
  145. //parse it into GUID and line (as <cache>...</cache>)
  146. //receive log buffer should be in the form of 2515777767.12_11_03_08_25_29\t
  147. //we want to extract the GUID only
  148. const char* begin = rawdata.toByteArray(); //no string termination character \0
  149. int len = rawdata.length();
  150. if (begin && len>0)
  151. {
  152. const char* p = begin;
  153. const char* end = begin + len;
  154. while (*p && *p != '\t' && p < end)
  155. p++;
  156. GUID.append(p-begin, begin);
  157. if (++p < end)
  158. line.append(end-p, p);
  159. }
  160. }
  161. void CLogSerializer::loadSendLogs(GuidSet& ackSet, GuidMap& missedLogs, unsigned long& total_missed)//
  162. {
  163. try
  164. {
  165. Close(); //release old file io, if any
  166. m_file = createIFile(m_FilePath.str());
  167. m_fileio = m_file->open(IFOread);
  168. if (m_fileio == 0)
  169. throw MakeStringException(-1, "Unable to open logging file %s",m_FilePath.str());
  170. offset_t finger = 0;
  171. total_missed = 0;
  172. while(true)
  173. {
  174. char dataSize[9];
  175. memset(dataSize, 0, 9);
  176. size32_t bytesRead = m_fileio->read(finger,8,dataSize);
  177. if(bytesRead==0)
  178. break;
  179. MemoryBuffer data;
  180. int dataLen = atoi(dataSize);
  181. finger+=9;
  182. bytesRead = m_fileio->read(finger,dataLen,data.reserveTruncate(dataLen));
  183. if(bytesRead==0)
  184. break;
  185. StringBuffer GUID,lostlogStr;
  186. splitLogRecord(data,GUID,lostlogStr);
  187. if (ackSet.find(GUID.str())==ackSet.end() && missedLogs.find(GUID.str()) == missedLogs.end())
  188. {
  189. if(total_missed % TRACE_INTERVAL == 0)
  190. DBGLOG("Miss #%lu GUID: <%s>", total_missed, GUID.str());
  191. missedLogs[GUID.str()] = lostlogStr.str();
  192. total_missed++;
  193. }
  194. finger+=dataLen;
  195. }
  196. }
  197. catch(IException* ex)
  198. {
  199. StringBuffer errorStr;
  200. ex->errorMessage(errorStr);
  201. ERRLOG("Exception caught within CSendLogSerializer::LoadDataMap: %s",errorStr.str());
  202. ex->Release();
  203. }
  204. catch(...)
  205. {
  206. DBGLOG("Unknown Exception thrown in CSendLogSerializer::LoadDataMap");
  207. }
  208. Close();
  209. }
  210. void CLogSerializer::loadAckedLogs(GuidSet& ackedLogs)//
  211. {
  212. try
  213. {
  214. Close(); //release old file io, if any
  215. m_file = createIFile(m_FilePath.str());
  216. m_fileio = m_file->open(IFOread);
  217. if (m_fileio == 0)
  218. throw MakeStringException(-1, "Unable to open logging file %s",m_FilePath.str());
  219. offset_t finger = 0;
  220. m_ItemCount = 0;
  221. while(true)
  222. {
  223. char dataSize[9];
  224. memset(dataSize, 0, 9);
  225. size32_t bytesRead = m_fileio->read(finger,8,dataSize);
  226. if(bytesRead==0)
  227. break;
  228. MemoryBuffer data;
  229. int dataLen = atoi(dataSize);
  230. finger+=9;
  231. bytesRead = m_fileio->read(finger,dataLen,data.reserveTruncate(dataLen));
  232. if(bytesRead==0)
  233. break;
  234. StringBuffer GUID, line;
  235. splitLogRecord(data, GUID, line);
  236. ackedLogs.insert(GUID.str());
  237. m_ItemCount++;
  238. finger+=dataLen;
  239. }
  240. DBGLOG("Total acks loaded %lu", m_ItemCount);
  241. }
  242. catch(IException* ex)
  243. {
  244. StringBuffer errorStr;
  245. ex->errorMessage(errorStr);
  246. ERRLOG("Exception caught within CLogSerializer::loadAckedLogs: %s",errorStr.str());
  247. ex->Release();
  248. }
  249. catch(...)
  250. {
  251. DBGLOG("Unknown Exception thrown in CLogSerializer::loadAckedLogs");
  252. }
  253. Close();
  254. }
  255. StringBuffer& CLogSerializer::GetRolloverFileName(StringBuffer& oldFile, StringBuffer& newfile, const char* newExtension)
  256. {
  257. newfile.append(oldFile);
  258. newfile.replaceString(".log",newExtension);
  259. return newfile;
  260. }
  261. void CLogSerializer::Remove()
  262. {
  263. Close();
  264. Owned<IFile> file = createIFile(m_FilePath.str());
  265. if(file.get() && file->exists() == true)
  266. file->remove();
  267. }
  268. __int64 CLogSerializer::WroteBytes()
  269. {
  270. CriticalBlock b(crit);
  271. return m_bytesWritten;
  272. }
  273. void CSendLogSerializer::LoadDataMap(GuidMap& ACKMap,StringArray& MissedLogs)
  274. {
  275. try
  276. {
  277. m_file = createIFile(m_FilePath.str());
  278. m_fileio = m_file->open(IFOread);
  279. if (m_fileio == 0)
  280. throw MakeStringException(-1, "Unable to open logging file %s",m_FilePath.str());
  281. else
  282. DBGLOG("File %s successfully opened", m_FilePath.str());
  283. long finger,bytesRead;
  284. finger = bytesRead = 0;
  285. bool bOk = true;
  286. MemoryBuffer dataSize,data;
  287. StringBuffer GUID,lostlogStr;
  288. int dataLen;
  289. unsigned int total = 0;
  290. unsigned int total_missed = 0;
  291. while(bOk)
  292. {
  293. bytesRead = m_fileio->read(finger,8,dataSize.reserveTruncate(8));
  294. if(bytesRead==0)
  295. break;
  296. finger+=9;
  297. dataLen = atoi(dataSize.toByteArray());
  298. bytesRead = m_fileio->read(finger,dataLen,data.reserveTruncate(dataLen));
  299. if(bytesRead==0)
  300. break;
  301. LoadMap(data,GUID,lostlogStr);
  302. if(total % TRACE_INTERVAL == 0)
  303. {
  304. DBGLOG("Checking log #%u", total);
  305. DBGLOG("{%s}", GUID.str());
  306. }
  307. total++;
  308. int i = MissedLogs.find(lostlogStr.str());
  309. if (!(*(ACKMap[GUID.str()].c_str())) && MissedLogs.find(lostlogStr.str()) == -1)
  310. {
  311. if(total_missed % TRACE_INTERVAL == 0)
  312. {
  313. DBGLOG("Miss #%u", total_missed);
  314. DBGLOG("<%s>", GUID.str());
  315. }
  316. MissedLogs.append(lostlogStr.str());
  317. total_missed++;
  318. }
  319. finger+=dataLen;
  320. data.clear();
  321. dataSize.clear();
  322. GUID.clear();
  323. lostlogStr.clear();
  324. }
  325. DBGLOG("Total logs checked %u, total missed %u", total, total_missed);
  326. }
  327. catch(IException* ex)
  328. {
  329. StringBuffer errorStr;
  330. ex->errorMessage(errorStr);
  331. ERRLOG("Exception caught within CSendLogSerializer::LoadDataMap: %s",errorStr.str());
  332. ex->Release();
  333. }
  334. catch(...)
  335. {
  336. DBGLOG("Unknown Exception thrown in CSendLogSerializer::LoadDataMap");
  337. }
  338. Close();
  339. }
  340. void CSendLogSerializer::LoadMap(MemoryBuffer& rawdata,StringBuffer& GUID, StringBuffer& line)
  341. {
  342. line.append(rawdata.length() -1, rawdata.toByteArray());
  343. const char* strLine = line.str();
  344. while(*strLine && *strLine != '\t' && *strLine != '\0')
  345. {
  346. GUID.append(*strLine);
  347. strLine++;
  348. }
  349. }
  350. void SplitRecord(const char* strLine, StringBuffer& GUID, StringBuffer& Cache)
  351. {
  352. if(strLine==NULL || *strLine=='\0')
  353. return;
  354. while(*strLine && *strLine != '\t' && *strLine != '\0')
  355. {
  356. GUID.append(*strLine);
  357. strLine++;
  358. }
  359. strLine++;
  360. Cache.appendf("%s",strLine);
  361. }
  362. void CRecieveLogSerializer::LoadMap(MemoryBuffer& rawdata,GuidMap& GUIDmap, bool printTrace)
  363. {
  364. //buffer chould be in the form of 000000030\t2515777767.12_11_03_08_25_29\r
  365. //we want to extract the GUID only....
  366. StringBuffer line,GUID;
  367. line.append(rawdata.length() -1, rawdata.toByteArray());
  368. const char* strLine = line.str();
  369. while(*strLine && *strLine != '\t' && *strLine != '\0')
  370. {
  371. GUID.append(*strLine);
  372. strLine++;
  373. }
  374. if(printTrace)
  375. DBGLOG("[%s]", GUID.str());
  376. GUIDmap[GUID.str()] = "1";
  377. }
  378. void CRecieveLogSerializer::LoadDataMap(GuidMap& GUIDmap)
  379. {
  380. try
  381. {
  382. m_file = createIFile(m_FilePath.str());
  383. m_fileio = m_file->open(IFOread);
  384. if (m_fileio == 0)
  385. throw MakeStringException(-1, "Unable to open logging file %s",m_FilePath.str());
  386. else
  387. DBGLOG("File %s successfully opened", m_FilePath.str());
  388. long finger,bytesRead,dataLen;
  389. finger = bytesRead = dataLen = 0;
  390. MemoryBuffer filecontents,dataSize,data;
  391. bool bOk = true;
  392. unsigned int total = 0;
  393. while(bOk)
  394. {
  395. bytesRead = m_fileio->read(finger,8,dataSize.reserveTruncate(8));
  396. if(bytesRead==0)
  397. break;
  398. finger+=9;
  399. dataLen = atoi(dataSize.toByteArray());
  400. bytesRead = m_fileio->read(finger,dataLen,data.reserveTruncate(dataLen));
  401. if(bytesRead==0)
  402. break;
  403. bool printTrace = false;
  404. if(total % TRACE_INTERVAL == 0)
  405. {
  406. DBGLOG("Loading ack #%u", total);
  407. printTrace = true;
  408. }
  409. LoadMap(data,GUIDmap,printTrace);
  410. total++;
  411. finger+=dataLen;
  412. data.clear();
  413. dataSize.clear();
  414. }
  415. DBGLOG("Total acks loaded %u", total);
  416. }
  417. catch(IException* ex)
  418. {
  419. StringBuffer errorStr;
  420. ex->errorMessage(errorStr);
  421. ERRLOG("Exception caught within CRecieveLogSerializer::LoadDataMap: %s",errorStr.str());
  422. ex->Release();
  423. }
  424. catch(...)
  425. {
  426. DBGLOG("Unknown Exception thrown in CRecieveLogSerializer::LoadDataMap");
  427. }
  428. Close();
  429. }