LogSerializer.cpp 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #pragma warning(disable : 4786)
  14. #include "LogSerializer.hpp"
  15. #include "jexcept.hpp"
  16. #include "jfile.hpp"
  17. #include "jlog.hpp"
  18. /* Review notes: this file is modified based on:
  19. * HPCC-Platform/esp/clients/LoggingClient/LogSerializer.cpp
  20. 5 methods are added to CLogSerializer: splitLogRecord(), getItemCount(), extractFileName(),
  21. loadSendLogs(), loadAckedLogs.
  22. Minor changes are made in the existing methods.
  23. */
  24. #define TRACE_INTERVAL 100
  25. CLogSerializer::CLogSerializer()
  26. {
  27. Init();
  28. }
  29. CLogSerializer::CLogSerializer(const char* fileName)
  30. {
  31. m_FilePath.append(fileName);
  32. Init();
  33. extractFileName(m_FilePath, m_FileName);//
  34. }
  35. StringBuffer& CLogSerializer::extractFileName(const char* fullName,StringBuffer& fileName)
  36. {
  37. StringBuffer tmp(fullName);
  38. for(unsigned i = tmp.length(); i-- > 0; )
  39. {
  40. if(tmp.charAt(i) == '\\' || tmp.charAt(i) == '/')
  41. break;
  42. fileName.insert(0, tmp.charAt(i));
  43. }
  44. return fileName;
  45. }
  46. void CLogSerializer::Init()
  47. {
  48. m_bytesWritten = 0;
  49. m_ItemCount = 0;
  50. m_fileio = 0;
  51. m_file = 0;
  52. fileSize = 0;
  53. }
  54. CLogSerializer::~CLogSerializer()
  55. {
  56. DBGLOG("CLogSerializer::~CLogSerializer()");
  57. Close();
  58. }
  59. void CLogSerializer::Append(const char* GUID, const char* Data, CLogRequestInFile* reqInFile)
  60. {
  61. StringBuffer toWrite,size;
  62. toWrite.appendf("%s\t%s\r\n",GUID,Data);
  63. size.appendf("%d",toWrite.length());
  64. while (size.length() < 8)
  65. size.insert(0,'0');
  66. size.append("\t");
  67. toWrite.insert(0,size.str());
  68. //optimize
  69. CriticalBlock b(crit);
  70. unsigned len = toWrite.length();
  71. if (reqInFile)
  72. {
  73. reqInFile->setFileName(m_file->queryFilename());
  74. reqInFile->setPos(m_file->size());
  75. reqInFile->setSize(len);
  76. }
  77. m_ItemCount++;
  78. fileSize += len;
  79. m_bytesWritten += m_fileio->write(m_bytesWritten, len, toWrite.str());
  80. }
  81. void CLogSerializer::Remove(const char* GUID)
  82. {
  83. }
  84. void CLogSerializer::Open(const char*Directory,const char* NewFileName,const char* Prefix)
  85. {
  86. m_FilePath.clear();
  87. m_FilePath.append(Directory);
  88. if (!EnsureDirectory(m_FilePath))
  89. throw MakeStringException(-1,"Unable to create directory at %s.",m_FilePath.str());
  90. m_FilePath.append("/");
  91. m_FileName.clear();
  92. if (Prefix && *Prefix)
  93. m_FileName.append(Prefix).append("_");
  94. m_FileName.append(NewFileName);
  95. m_FilePath.append(m_FileName);
  96. m_file = createIFile(m_FilePath.str());
  97. m_fileio = m_file->open(IFOcreate);
  98. if (m_fileio == 0)
  99. throw MakeStringException(-1, "Unable to open logging file %s",m_FilePath.str());
  100. else
  101. DBGLOG("Tank file %s successfully created", m_FilePath.str());
  102. }
  103. bool CLogSerializer::EnsureDirectory(StringBuffer& Dir)
  104. {
  105. try
  106. {
  107. Owned<IFile> pDirectory = createIFile(Dir.str());
  108. if(pDirectory->exists() == true)
  109. return true;
  110. return pDirectory->createDirectory();
  111. }
  112. catch(IException *ex)
  113. {
  114. ex->Release();
  115. }
  116. return false;
  117. }
  118. void CLogSerializer::Close()
  119. {
  120. if(m_fileio)
  121. {
  122. m_fileio->Release();
  123. m_fileio = 0;
  124. }
  125. if(m_file)
  126. {
  127. m_file->Release();
  128. m_file = 0;
  129. }
  130. m_bytesWritten = 0;//
  131. m_ItemCount = 0;//
  132. fileSize = 0;
  133. }
  134. void CLogSerializer::Rollover(const char* ClosedPrefix)
  135. {
  136. Close();
  137. Owned<IFile> file = createIFile(m_FilePath.str());
  138. if(file.get() && file->exists() == true)
  139. {
  140. StringBuffer newFileName;
  141. GetRolloverFileName(m_FileName,newFileName,ClosedPrefix);
  142. file->rename(newFileName.str());
  143. }
  144. }
  145. void CLogSerializer::SafeRollover(const char*Directory,const char* NewFileName,const char* Prefix, const char* ClosedPrefix)
  146. {
  147. CriticalBlock b(crit);
  148. Close();
  149. Init();
  150. Open(Directory, NewFileName, Prefix);
  151. }
  152. void CLogSerializer::splitLogRecord(MemoryBuffer& rawdata, StringBuffer& GUID, StringBuffer& line)//
  153. {
  154. //send log buffer should be in the form of 2635473460.05_01_12_16_13_57\t<cache>...</cache>
  155. //parse it into GUID and line (as <cache>...</cache>)
  156. //receive log buffer should be in the form of 2515777767.12_11_03_08_25_29\t
  157. //we want to extract the GUID only
  158. const char* begin = rawdata.toByteArray(); //no string termination character \0
  159. int len = rawdata.length();
  160. if (begin && len>0)
  161. {
  162. const char* p = begin;
  163. const char* end = begin + len;
  164. while (*p && *p != '\t' && p < end)
  165. p++;
  166. GUID.append(p-begin, begin);
  167. if (++p < end)
  168. line.append(end-p, p);
  169. }
  170. }
  171. bool CLogSerializer::readLogRequest(CLogRequestInFile* logRequestInFile, StringBuffer& logRequest)
  172. {
  173. //Open the file if exists.
  174. StringBuffer fileName(logRequestInFile->getFileName());
  175. Owned<IFile> file = createIFile(fileName);
  176. Owned<IFileIO> fileIO = file->open(IFOread);
  177. if (!fileIO)
  178. {
  179. //The file may be renamed from .log to .old.
  180. fileName.replaceString(logFileExt, rolloverFileExt);
  181. file.setown(createIFile(fileName));
  182. fileIO.setown(file->open(IFOread));
  183. if (!fileIO)
  184. {
  185. ERRLOG("Unable to open logging file %s", fileName.str());
  186. return false;
  187. }
  188. }
  189. //Read data size
  190. char dataSize[9];
  191. memset(dataSize, 0, 9);
  192. offset_t finger = logRequestInFile->getPos();
  193. size32_t bytesRead = fileIO->read(finger, 9, dataSize);
  194. if (bytesRead < 9)
  195. {
  196. ERRLOG("Failed to read logging file %s: not enough data for dataSize", fileName.str());
  197. return false;
  198. }
  199. if (dataSize[8] != '\t')
  200. {
  201. ERRLOG("Failed to read logging file %s: incorrect data format for dataSize.", fileName.str());
  202. return false;
  203. }
  204. dataSize[8] = 0;
  205. char* eptr = nullptr;
  206. int dataLen = (int)strtol(dataSize, &eptr, 10);
  207. if (*eptr != '\0')
  208. {
  209. ERRLOG("Failed to read logging file %s: incorrect data format for dataSize.", fileName.str());
  210. return false;
  211. }
  212. if (dataLen + 9 != logRequestInFile->getSize())
  213. {
  214. ERRLOG("Failed to read logging file %s: incorrect dataSize", fileName.str());
  215. return false;
  216. }
  217. //Read other data
  218. MemoryBuffer data;
  219. finger += 9;
  220. bytesRead = fileIO->read(finger, dataLen, data.reserveTruncate(dataLen));
  221. if (bytesRead < dataLen)
  222. {
  223. ERRLOG("Failed to read logging file %s: dataSize = %d, bytesRead = %d", fileName.str(), dataLen, bytesRead);
  224. return false;
  225. }
  226. //Find GUID and log request
  227. StringBuffer GUID;
  228. splitLogRecord(data, GUID, logRequest);
  229. if (strieq(GUID, logRequestInFile->getGUID()))
  230. return true;
  231. ERRLOG("Failed to read logging file %s: GUID read (%s) is not same as GUID (%s)", fileName.str(), GUID.str(), logRequestInFile->getGUID());
  232. return false;
  233. }
  234. void CLogSerializer::loadSendLogs(GuidSet& ackSet, GuidMap& missedLogs, unsigned long& total_missed)//
  235. {
  236. try
  237. {
  238. Close(); //release old file io, if any
  239. m_file = createIFile(m_FilePath.str());
  240. m_fileio = m_file->open(IFOread);
  241. if (m_fileio == 0)
  242. throw MakeStringException(-1, "Unable to open logging file %s",m_FilePath.str());
  243. offset_t finger = 0;
  244. total_missed = 0;
  245. while(true)
  246. {
  247. char dataSize[9];
  248. memset(dataSize, 0, 9);
  249. size32_t bytesRead = m_fileio->read(finger,8,dataSize);
  250. if(bytesRead==0)
  251. break;
  252. MemoryBuffer data;
  253. int dataLen = atoi(dataSize);
  254. finger+=9;
  255. bytesRead = m_fileio->read(finger,dataLen,data.reserveTruncate(dataLen));
  256. if(bytesRead==0)
  257. break;
  258. StringBuffer GUID,lostlogStr;
  259. splitLogRecord(data,GUID,lostlogStr);
  260. if (ackSet.find(GUID.str())==ackSet.end() && missedLogs.find(GUID.str()) == missedLogs.end())
  261. {
  262. if(total_missed % TRACE_INTERVAL == 0)
  263. DBGLOG("Miss #%lu GUID: <%s>", total_missed, GUID.str());
  264. missedLogs[GUID.str()] = lostlogStr.str();
  265. total_missed++;
  266. }
  267. finger+=dataLen;
  268. }
  269. }
  270. catch(IException* ex)
  271. {
  272. StringBuffer errorStr;
  273. ex->errorMessage(errorStr);
  274. ERRLOG("Exception caught within CSendLogSerializer::LoadDataMap: %s",errorStr.str());
  275. ex->Release();
  276. }
  277. catch(...)
  278. {
  279. DBGLOG("Unknown Exception thrown in CSendLogSerializer::LoadDataMap");
  280. }
  281. Close();
  282. }
  283. void CLogSerializer::loadAckedLogs(GuidSet& ackedLogs)//
  284. {
  285. try
  286. {
  287. Close(); //release old file io, if any
  288. m_file = createIFile(m_FilePath.str());
  289. m_fileio = m_file->open(IFOread);
  290. if (m_fileio == 0)
  291. throw MakeStringException(-1, "Unable to open logging file %s",m_FilePath.str());
  292. offset_t finger = 0;
  293. m_ItemCount = 0;
  294. while(true)
  295. {
  296. char dataSize[9];
  297. memset(dataSize, 0, 9);
  298. size32_t bytesRead = m_fileio->read(finger,8,dataSize);
  299. if(bytesRead==0)
  300. break;
  301. MemoryBuffer data;
  302. int dataLen = atoi(dataSize);
  303. finger+=9;
  304. bytesRead = m_fileio->read(finger,dataLen,data.reserveTruncate(dataLen));
  305. if(bytesRead==0)
  306. break;
  307. StringBuffer GUID, line;
  308. splitLogRecord(data, GUID, line);
  309. ackedLogs.insert(GUID.str());
  310. m_ItemCount++;
  311. finger+=dataLen;
  312. }
  313. fileSize = finger;
  314. DBGLOG("Total acks loaded %lu", m_ItemCount);
  315. }
  316. catch(IException* ex)
  317. {
  318. StringBuffer errorStr;
  319. ex->errorMessage(errorStr);
  320. ERRLOG("Exception caught within CLogSerializer::loadAckedLogs: %s",errorStr.str());
  321. ex->Release();
  322. }
  323. catch(...)
  324. {
  325. DBGLOG("Unknown Exception thrown in CLogSerializer::loadAckedLogs");
  326. }
  327. Close();
  328. }
  329. StringBuffer& CLogSerializer::GetRolloverFileName(StringBuffer& oldFile, StringBuffer& newfile, const char* newExtension)
  330. {
  331. newfile.append(oldFile);
  332. newfile.replaceString(logFileExt, newExtension);
  333. return newfile;
  334. }
  335. void CLogSerializer::Remove()
  336. {
  337. Close();
  338. Owned<IFile> file = createIFile(m_FilePath.str());
  339. if(file.get() && file->exists() == true)
  340. file->remove();
  341. }
  342. __int64 CLogSerializer::WroteBytes()
  343. {
  344. CriticalBlock b(crit);
  345. return m_bytesWritten;
  346. }
  347. void CSendLogSerializer::LoadDataMap(GuidMap& ACKMap,StringArray& MissedLogs)
  348. {
  349. try
  350. {
  351. m_file = createIFile(m_FilePath.str());
  352. m_fileio = m_file->open(IFOread);
  353. if (m_fileio == 0)
  354. throw MakeStringException(-1, "Unable to open logging file %s",m_FilePath.str());
  355. else
  356. DBGLOG("File %s successfully opened", m_FilePath.str());
  357. long finger,bytesRead;
  358. finger = bytesRead = 0;
  359. bool bOk = true;
  360. MemoryBuffer dataSize,data;
  361. StringBuffer GUID,lostlogStr;
  362. int dataLen;
  363. unsigned int total = 0;
  364. unsigned int total_missed = 0;
  365. while(bOk)
  366. {
  367. bytesRead = m_fileio->read(finger,8,dataSize.reserveTruncate(8));
  368. if(bytesRead==0)
  369. break;
  370. finger+=9;
  371. dataLen = atoi(dataSize.toByteArray());
  372. bytesRead = m_fileio->read(finger,dataLen,data.reserveTruncate(dataLen));
  373. if(bytesRead==0)
  374. break;
  375. LoadMap(data,GUID,lostlogStr);
  376. if(total % TRACE_INTERVAL == 0)
  377. {
  378. DBGLOG("Checking log #%u", total);
  379. DBGLOG("{%s}", GUID.str());
  380. }
  381. total++;
  382. int i = MissedLogs.find(lostlogStr.str());
  383. if (!(*(ACKMap[GUID.str()].c_str())) && MissedLogs.find(lostlogStr.str()) == -1)
  384. {
  385. if(total_missed % TRACE_INTERVAL == 0)
  386. {
  387. DBGLOG("Miss #%u", total_missed);
  388. DBGLOG("<%s>", GUID.str());
  389. }
  390. MissedLogs.append(lostlogStr.str());
  391. total_missed++;
  392. }
  393. finger+=dataLen;
  394. data.clear();
  395. dataSize.clear();
  396. GUID.clear();
  397. lostlogStr.clear();
  398. }
  399. DBGLOG("Total logs checked %u, total missed %u", total, total_missed);
  400. }
  401. catch(IException* ex)
  402. {
  403. StringBuffer errorStr;
  404. ex->errorMessage(errorStr);
  405. ERRLOG("Exception caught within CSendLogSerializer::LoadDataMap: %s",errorStr.str());
  406. ex->Release();
  407. }
  408. catch(...)
  409. {
  410. DBGLOG("Unknown Exception thrown in CSendLogSerializer::LoadDataMap");
  411. }
  412. Close();
  413. }
  414. void CSendLogSerializer::LoadMap(MemoryBuffer& rawdata,StringBuffer& GUID, StringBuffer& line)
  415. {
  416. line.append(rawdata.length() -1, rawdata.toByteArray());
  417. const char* strLine = line.str();
  418. while(*strLine && *strLine != '\t' && *strLine != '\0')
  419. {
  420. GUID.append(*strLine);
  421. strLine++;
  422. }
  423. }
  424. void SplitRecord(const char* strLine, StringBuffer& GUID, StringBuffer& Cache)
  425. {
  426. if(strLine==NULL || *strLine=='\0')
  427. return;
  428. while(*strLine && *strLine != '\t' && *strLine != '\0')
  429. {
  430. GUID.append(*strLine);
  431. strLine++;
  432. }
  433. strLine++;
  434. Cache.appendf("%s",strLine);
  435. }
  436. void CRecieveLogSerializer::LoadMap(MemoryBuffer& rawdata,GuidMap& GUIDmap, bool printTrace)
  437. {
  438. //buffer chould be in the form of 000000030\t2515777767.12_11_03_08_25_29\r
  439. //we want to extract the GUID only....
  440. StringBuffer line,GUID;
  441. line.append(rawdata.length() -1, rawdata.toByteArray());
  442. const char* strLine = line.str();
  443. while(*strLine && *strLine != '\t' && *strLine != '\0')
  444. {
  445. GUID.append(*strLine);
  446. strLine++;
  447. }
  448. if(printTrace)
  449. DBGLOG("[%s]", GUID.str());
  450. GUIDmap[GUID.str()] = "1";
  451. }
  452. void CRecieveLogSerializer::LoadDataMap(GuidMap& GUIDmap)
  453. {
  454. try
  455. {
  456. m_file = createIFile(m_FilePath.str());
  457. m_fileio = m_file->open(IFOread);
  458. if (m_fileio == 0)
  459. throw MakeStringException(-1, "Unable to open logging file %s",m_FilePath.str());
  460. else
  461. DBGLOG("File %s successfully opened", m_FilePath.str());
  462. long finger,bytesRead,dataLen;
  463. finger = bytesRead = dataLen = 0;
  464. MemoryBuffer filecontents,dataSize,data;
  465. bool bOk = true;
  466. unsigned int total = 0;
  467. while(bOk)
  468. {
  469. bytesRead = m_fileio->read(finger,8,dataSize.reserveTruncate(8));
  470. if(bytesRead==0)
  471. break;
  472. finger+=9;
  473. dataLen = atoi(dataSize.toByteArray());
  474. bytesRead = m_fileio->read(finger,dataLen,data.reserveTruncate(dataLen));
  475. if(bytesRead==0)
  476. break;
  477. bool printTrace = false;
  478. if(total % TRACE_INTERVAL == 0)
  479. {
  480. DBGLOG("Loading ack #%u", total);
  481. printTrace = true;
  482. }
  483. LoadMap(data,GUIDmap,printTrace);
  484. total++;
  485. finger+=dataLen;
  486. data.clear();
  487. dataSize.clear();
  488. }
  489. DBGLOG("Total acks loaded %u", total);
  490. }
  491. catch(IException* ex)
  492. {
  493. StringBuffer errorStr;
  494. ex->errorMessage(errorStr);
  495. ERRLOG("Exception caught within CRecieveLogSerializer::LoadDataMap: %s",errorStr.str());
  496. ex->Release();
  497. }
  498. catch(...)
  499. {
  500. DBGLOG("Unknown Exception thrown in CRecieveLogSerializer::LoadDataMap");
  501. }
  502. Close();
  503. }