123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #pragma warning(disable : 4786)
- #include "LogSerializer.hpp"
- #include "jexcept.hpp"
- #include "jfile.hpp"
- #include "jlog.hpp"
- /* Review notes: this file is modified based on:
- * HPCC-Platform/esp/clients/LoggingClient/LogSerializer.cpp
- 5 methods are added to CLogSerializer: splitLogRecord(), getItemCount(), extractFileName(),
- loadSendLogs(), loadAckedLogs.
- Minor changes are made in the existing methods.
- */
- #define TRACE_INTERVAL 100
- CLogSerializer::CLogSerializer()
- {
- Init();
- }
- CLogSerializer::CLogSerializer(const char* fileName)
- {
- m_FilePath.append(fileName);
- Init();
- extractFileName(m_FilePath, m_FileName);//
- }
- StringBuffer& CLogSerializer::extractFileName(const char* fullName,StringBuffer& fileName)
- {
- StringBuffer tmp(fullName);
- for(unsigned i = tmp.length(); i-- > 0; )
- {
- if(tmp.charAt(i) == '\\' || tmp.charAt(i) == '/')
- break;
- fileName.insert(0, tmp.charAt(i));
- }
- return fileName;
- }
- void CLogSerializer::Init()
- {
- m_bytesWritten = 0;
- m_ItemCount = 0;
- m_fileio = 0;
- m_file = 0;
- }
- CLogSerializer::~CLogSerializer()
- {
- DBGLOG("CLogSerializer::~CLogSerializer()");
- Close();
- }
- void CLogSerializer::Append(const char* GUID, const char* Data)
- {
- StringBuffer toWrite,size;
- toWrite.appendf("%s\t%s\r\n",GUID,Data);
- size.appendf("%d",toWrite.length());
- while (size.length() < 8)
- size.insert(0,'0');
- size.append("\t");
- toWrite.insert(0,size.str());
- //optimize
- CriticalBlock b(crit);
- m_ItemCount++;
- m_bytesWritten += m_fileio->write(m_bytesWritten, toWrite.length(), toWrite.str());
- }
- void CLogSerializer::Remove(const char* GUID)
- {
- }
- void CLogSerializer::Open(const char*Directory,const char* NewFileName,const char* Prefix)
- {
- m_FilePath.clear();
- m_FilePath.append(Directory);
- if (!EnsureDirectory(m_FilePath))
- throw MakeStringException(-1,"Unable to create directory at %s.",m_FilePath.str());
- m_FilePath.append("/");
- m_FileName.clear();
- if (Prefix && *Prefix)
- m_FileName.append(Prefix).append("_");
- m_FileName.append(NewFileName);
- m_FilePath.append(m_FileName);
- m_file = createIFile(m_FilePath.str());
- m_fileio = m_file->open(IFOcreate);
- if (m_fileio == 0)
- throw MakeStringException(-1, "Unable to open logging file %s",m_FilePath.str());
- else
- DBGLOG("Tank file %s successfully created", m_FilePath.str());
- }
- bool CLogSerializer::EnsureDirectory(StringBuffer& Dir)
- {
- try
- {
- Owned<IFile> pDirectory = createIFile(Dir.str());
- if(pDirectory->exists() == true)
- return true;
- return pDirectory->createDirectory();
- }
- catch(IException *ex)
- {
- ex->Release();
- }
- return false;
- }
- void CLogSerializer::Close()
- {
- if(m_fileio)
- {
- m_fileio->Release();
- m_fileio = 0;
- }
- if(m_file)
- {
- m_file->Release();
- m_file = 0;
- }
- m_bytesWritten = 0;//
- m_ItemCount = 0;//
- }
- void CLogSerializer::Rollover(const char* ClosedPrefix)
- {
- Close();
- Owned<IFile> file = createIFile(m_FilePath.str());
- if(file.get() && file->exists() == true)
- {
- StringBuffer newFileName;
- GetRolloverFileName(m_FileName,newFileName,ClosedPrefix);
- file->rename(newFileName.str());
- }
- }
- void CLogSerializer::SafeRollover(const char*Directory,const char* NewFileName,const char* Prefix, const char* ClosedPrefix)
- {
- CriticalBlock b(crit);
- Rollover(ClosedPrefix);
- Init();
- Open(Directory, NewFileName, Prefix);
- }
- void CLogSerializer::splitLogRecord(MemoryBuffer& rawdata, StringBuffer& GUID, StringBuffer& line)//
- {
- //send log buffer should be in the form of 2635473460.05_01_12_16_13_57\t<cache>...</cache>
- //parse it into GUID and line (as <cache>...</cache>)
- //receive log buffer should be in the form of 2515777767.12_11_03_08_25_29\t
- //we want to extract the GUID only
- const char* begin = rawdata.toByteArray(); //no string termination character \0
- int len = rawdata.length();
- if (begin && len>0)
- {
- const char* p = begin;
- const char* end = begin + len;
- while (*p && *p != '\t' && p < end)
- p++;
- GUID.append(p-begin, begin);
- if (++p < end)
- line.append(end-p, p);
- }
- }
- void CLogSerializer::loadSendLogs(GuidSet& ackSet, GuidMap& missedLogs, unsigned long& total_missed)//
- {
- try
- {
- Close(); //release old file io, if any
- m_file = createIFile(m_FilePath.str());
- m_fileio = m_file->open(IFOread);
- if (m_fileio == 0)
- throw MakeStringException(-1, "Unable to open logging file %s",m_FilePath.str());
- offset_t finger = 0;
- total_missed = 0;
- while(true)
- {
- char dataSize[9];
- memset(dataSize, 0, 9);
- size32_t bytesRead = m_fileio->read(finger,8,dataSize);
- if(bytesRead==0)
- break;
- MemoryBuffer data;
- int dataLen = atoi(dataSize);
- finger+=9;
- bytesRead = m_fileio->read(finger,dataLen,data.reserveTruncate(dataLen));
- if(bytesRead==0)
- break;
- StringBuffer GUID,lostlogStr;
- splitLogRecord(data,GUID,lostlogStr);
- if (ackSet.find(GUID.str())==ackSet.end() && missedLogs.find(GUID.str()) == missedLogs.end())
- {
- if(total_missed % TRACE_INTERVAL == 0)
- DBGLOG("Miss #%lu GUID: <%s>", total_missed, GUID.str());
- missedLogs[GUID.str()] = lostlogStr.str();
- total_missed++;
- }
- finger+=dataLen;
- }
- }
- catch(IException* ex)
- {
- StringBuffer errorStr;
- ex->errorMessage(errorStr);
- ERRLOG("Exception caught within CSendLogSerializer::LoadDataMap: %s",errorStr.str());
- ex->Release();
- }
- catch(...)
- {
- DBGLOG("Unknown Exception thrown in CSendLogSerializer::LoadDataMap");
- }
- Close();
- }
- void CLogSerializer::loadAckedLogs(GuidSet& ackedLogs)//
- {
- try
- {
- Close(); //release old file io, if any
- m_file = createIFile(m_FilePath.str());
- m_fileio = m_file->open(IFOread);
- if (m_fileio == 0)
- throw MakeStringException(-1, "Unable to open logging file %s",m_FilePath.str());
- offset_t finger = 0;
- m_ItemCount = 0;
- while(true)
- {
- char dataSize[9];
- memset(dataSize, 0, 9);
- size32_t bytesRead = m_fileio->read(finger,8,dataSize);
- if(bytesRead==0)
- break;
- MemoryBuffer data;
- int dataLen = atoi(dataSize);
- finger+=9;
- bytesRead = m_fileio->read(finger,dataLen,data.reserveTruncate(dataLen));
- if(bytesRead==0)
- break;
- StringBuffer GUID, line;
- splitLogRecord(data, GUID, line);
- ackedLogs.insert(GUID.str());
- m_ItemCount++;
- finger+=dataLen;
- }
- DBGLOG("Total acks loaded %lu", m_ItemCount);
- }
- catch(IException* ex)
- {
- StringBuffer errorStr;
- ex->errorMessage(errorStr);
- ERRLOG("Exception caught within CLogSerializer::loadAckedLogs: %s",errorStr.str());
- ex->Release();
- }
- catch(...)
- {
- DBGLOG("Unknown Exception thrown in CLogSerializer::loadAckedLogs");
- }
- Close();
- }
- StringBuffer& CLogSerializer::GetRolloverFileName(StringBuffer& oldFile, StringBuffer& newfile, const char* newExtension)
- {
- newfile.append(oldFile);
- newfile.replaceString(".log",newExtension);
- return newfile;
- }
- void CLogSerializer::Remove()
- {
- Close();
- Owned<IFile> file = createIFile(m_FilePath.str());
- if(file.get() && file->exists() == true)
- file->remove();
- }
- __int64 CLogSerializer::WroteBytes()
- {
- CriticalBlock b(crit);
- return m_bytesWritten;
- }
- void CSendLogSerializer::LoadDataMap(GuidMap& ACKMap,StringArray& MissedLogs)
- {
- try
- {
- m_file = createIFile(m_FilePath.str());
- m_fileio = m_file->open(IFOread);
- if (m_fileio == 0)
- throw MakeStringException(-1, "Unable to open logging file %s",m_FilePath.str());
- else
- DBGLOG("File %s successfully opened", m_FilePath.str());
- long finger,bytesRead;
- finger = bytesRead = 0;
- bool bOk = true;
- MemoryBuffer dataSize,data;
- StringBuffer GUID,lostlogStr;
- int dataLen;
- unsigned int total = 0;
- unsigned int total_missed = 0;
- while(bOk)
- {
- bytesRead = m_fileio->read(finger,8,dataSize.reserveTruncate(8));
- if(bytesRead==0)
- break;
- finger+=9;
- dataLen = atoi(dataSize.toByteArray());
- bytesRead = m_fileio->read(finger,dataLen,data.reserveTruncate(dataLen));
- if(bytesRead==0)
- break;
- LoadMap(data,GUID,lostlogStr);
- if(total % TRACE_INTERVAL == 0)
- {
- DBGLOG("Checking log #%u", total);
- DBGLOG("{%s}", GUID.str());
- }
- total++;
- int i = MissedLogs.find(lostlogStr.str());
- if (!(*(ACKMap[GUID.str()].c_str())) && MissedLogs.find(lostlogStr.str()) == -1)
- {
- if(total_missed % TRACE_INTERVAL == 0)
- {
- DBGLOG("Miss #%u", total_missed);
- DBGLOG("<%s>", GUID.str());
- }
- MissedLogs.append(lostlogStr.str());
- total_missed++;
- }
- finger+=dataLen;
- data.clear();
- dataSize.clear();
- GUID.clear();
- lostlogStr.clear();
- }
- DBGLOG("Total logs checked %u, total missed %u", total, total_missed);
- }
- catch(IException* ex)
- {
- StringBuffer errorStr;
- ex->errorMessage(errorStr);
- ERRLOG("Exception caught within CSendLogSerializer::LoadDataMap: %s",errorStr.str());
- ex->Release();
- }
- catch(...)
- {
- DBGLOG("Unknown Exception thrown in CSendLogSerializer::LoadDataMap");
- }
- Close();
- }
- void CSendLogSerializer::LoadMap(MemoryBuffer& rawdata,StringBuffer& GUID, StringBuffer& line)
- {
- line.append(rawdata.length() -1, rawdata.toByteArray());
- const char* strLine = line.str();
- while(*strLine && *strLine != '\t' && *strLine != '\0')
- {
- GUID.append(*strLine);
- strLine++;
- }
- }
- void SplitRecord(const char* strLine, StringBuffer& GUID, StringBuffer& Cache)
- {
- if(strLine==NULL || *strLine=='\0')
- return;
- while(*strLine && *strLine != '\t' && *strLine != '\0')
- {
- GUID.append(*strLine);
- strLine++;
- }
- strLine++;
- Cache.appendf("%s",strLine);
- }
- void CRecieveLogSerializer::LoadMap(MemoryBuffer& rawdata,GuidMap& GUIDmap, bool printTrace)
- {
- //buffer chould be in the form of 000000030\t2515777767.12_11_03_08_25_29\r
- //we want to extract the GUID only....
- StringBuffer line,GUID;
- line.append(rawdata.length() -1, rawdata.toByteArray());
- const char* strLine = line.str();
- while(*strLine && *strLine != '\t' && *strLine != '\0')
- {
- GUID.append(*strLine);
- strLine++;
- }
- if(printTrace)
- DBGLOG("[%s]", GUID.str());
- GUIDmap[GUID.str()] = "1";
- }
- void CRecieveLogSerializer::LoadDataMap(GuidMap& GUIDmap)
- {
- try
- {
- m_file = createIFile(m_FilePath.str());
- m_fileio = m_file->open(IFOread);
- if (m_fileio == 0)
- throw MakeStringException(-1, "Unable to open logging file %s",m_FilePath.str());
- else
- DBGLOG("File %s successfully opened", m_FilePath.str());
- long finger,bytesRead,dataLen;
- finger = bytesRead = dataLen = 0;
- MemoryBuffer filecontents,dataSize,data;
- bool bOk = true;
- unsigned int total = 0;
- while(bOk)
- {
- bytesRead = m_fileio->read(finger,8,dataSize.reserveTruncate(8));
- if(bytesRead==0)
- break;
- finger+=9;
- dataLen = atoi(dataSize.toByteArray());
- bytesRead = m_fileio->read(finger,dataLen,data.reserveTruncate(dataLen));
- if(bytesRead==0)
- break;
- bool printTrace = false;
- if(total % TRACE_INTERVAL == 0)
- {
- DBGLOG("Loading ack #%u", total);
- printTrace = true;
- }
- LoadMap(data,GUIDmap,printTrace);
- total++;
- finger+=dataLen;
- data.clear();
- dataSize.clear();
- }
- DBGLOG("Total acks loaded %u", total);
- }
- catch(IException* ex)
- {
- StringBuffer errorStr;
- ex->errorMessage(errorStr);
- ERRLOG("Exception caught within CRecieveLogSerializer::LoadDataMap: %s",errorStr.str());
- ex->Release();
- }
- catch(...)
- {
- DBGLOG("Unknown Exception thrown in CRecieveLogSerializer::LoadDataMap");
- }
- Close();
- }
|