Browse Source

HPCC-21548 Support multiple active fail-safe log files

1. When a tank file is rollovered, it should not be renamed to
.old. This is because, when restarting logging agent thread,
only .log files are checked for not-acked LogReqs;
2. When restarting logging agent thread, we need to remember the
old .log files and rename those files to .old file. Current code
renames all of .log files (except for 'current' .log files),
including possible newly created, but, rollovered .log files;
3. When restarting logging agent thread, read acked LogReqs from
all of ack files. Filter out those acked LogReqs from all of
sending files. Non-acked LogReqs are added into m_PendingLogs.
When a pending job is adding back to queue, its GUID should not
be changed.

Also fix 3 bugs.

Signed-off-by: wangkx <kevin.wang@lexisnexis.com>
wangkx 6 years ago
parent
commit
5ca7a0e74c

+ 38 - 43
esp/logging/logginglib/LogFailSafe.cpp

@@ -34,7 +34,6 @@
 #define RECEIVING "_acked_"
 #define SENDING   "_sending_"
 
-const char* const RolloverExt=".old";
 const unsigned int TRACE_PENDING_LOGS_MIN = 10;
 const unsigned int TRACE_PENDING_LOGS_MAX = 50;
 
@@ -153,30 +152,32 @@ void CLogFailSafe::LoadOldLogs(StringArray& oldLogData)
 
 void CLogFailSafe::loadPendingLogReqsFromExistingLogFiles()
 {
-    VStringBuffer fileName("%s%s%s*.log", m_LogService.str(), m_LogType.str(), SENDING);
+    //Read acked LogReqs from all of ack files.
+    //Filter out those acked LogReqs from all of sending files.
+    //Non-acked LogReqs are added into m_PendingLogs.
+    //The LogReqs in the m_PendingLogs will be added to queue and
+    //new tank files through the enqueue() in the checkPendingLogs().
+    //After that, the oldLogs will be renamed to .old file.
+    GuidSet ackedSet;
+    VStringBuffer fileName("%s%s%s*%s", m_LogService.str(), m_LogType.str(), RECEIVING, logFileExt);
+    Owned<IDirectoryIterator> it = createDirectoryIterator(m_logsdir.str(), fileName.str());
+    ForEach (*it)
+    {
+        IFile &file = it->query();
+        CLogSerializer ackedLog(file.queryFilename());
+        ackedLog.loadAckedLogs(ackedSet);
+        oldLogs.append(file.queryFilename());
+    }
+
+    fileName.setf("%s%s%s*%s", m_LogService.str(), m_LogType.str(), SENDING, logFileExt);
     Owned<IDirectoryIterator> di = createDirectoryIterator(m_logsdir.str(), fileName.str());
     ForEach (*di)
     {
         IFile &file = di->query();
-
-        StringBuffer ackedName;
-        GuidSet ackedSet;
-        getReceiveFileName(file.queryFilename(),ackedName);
-        CLogSerializer ackedLog(ackedName.str());
-        ackedLog.loadAckedLogs(ackedSet);
-
+        oldLogs.append(file.queryFilename()); //add to the files for rollover
         CLogSerializer sendLog(file.queryFilename());
         unsigned long total_missed = 0;
-        {//scope needed for critical block below
-            CriticalBlock b(m_critSec); //since we plan to use m_PendingLogs
-            sendLog.loadSendLogs(ackedSet, m_PendingLogs, total_missed);
-        }
-
-        if (total_missed == 0)
-        {
-            ackedLog.Rollover(RolloverExt);
-            sendLog.Rollover(RolloverExt);
-        }
+        sendLog.loadSendLogs(ackedSet, m_PendingLogs, total_missed);
     }
 }
 
@@ -188,8 +189,8 @@ void CLogFailSafe::generateNewFileNames(StringBuffer& sendingFile, StringBuffer&
     StringBuffer tmp;
     tmp.append(m_LogService).append(m_LogType);
 
-    sendingFile.append(tmp).append(SENDING).append(GUID).append(".log");
-    receivingFile.append(tmp).append(RECEIVING).append(GUID).append(".log");
+    sendingFile.append(tmp).append(SENDING).append(GUID).append(logFileExt);
+    receivingFile.append(tmp).append(RECEIVING).append(GUID).append(logFileExt);
 }
 
 bool CLogFailSafe::PopPendingLogRecord(StringBuffer& GUID, StringBuffer& cache)
@@ -215,7 +216,7 @@ void CLogFailSafe::createNew(const char* logType)
 {
     StringBuffer UniqueID;
     GenerateGUID(UniqueID);
-    UniqueID.append(".log");
+    UniqueID.append(logFileExt);
 
     StringBuffer send(logType),recieve(logType);
 
@@ -229,7 +230,7 @@ void CLogFailSafe::createNew(const char* logType)
 void CLogFailSafe::loadFailed(const char* logType)
 {
     StringBuffer fileName;
-    fileName.appendf("%s_sending*.log",logType);
+    fileName.appendf("%s_sending*%s", logType, logFileExt);
 
     Owned<IDirectoryIterator> di = createDirectoryIterator(m_logsdir.str(), fileName.str());
     ForEach (*di)
@@ -317,8 +318,8 @@ void CLogFailSafe::AddACK(const char* GUID)
 
 void CLogFailSafe::RollCurrentLog()
 {
-    m_Added.Rollover(RolloverExt);
-    m_Cleared.Rollover(RolloverExt);
+    m_Added.Rollover(rolloverFileExt);
+    m_Cleared.Rollover(rolloverFileExt);
 }
 
 void CLogFailSafe::SafeRollover()
@@ -328,34 +329,28 @@ void CLogFailSafe::SafeRollover()
 
     // Rolling over m_Added first is desirable here beccause requests being written to the new tank file before
     // m_Cleared finishes rolling all haven't been sent yet (because the sending thread is here busy rolling).
-    m_Added.SafeRollover  (m_logsdir.str(), send.str(), NULL,   RolloverExt);
-    m_Cleared.SafeRollover(m_logsdir.str(), receive.str(), NULL, RolloverExt);
+    m_Added.SafeRollover  (m_logsdir.str(), send.str(), nullptr, rolloverFileExt);
+    m_Cleared.SafeRollover(m_logsdir.str(), receive.str(), nullptr, rolloverFileExt);
 }
 
+//Only rollover the files inside the oldLogs.
+//This is called only when a log agent is starting.
 void CLogFailSafe::RollOldLogs()
 {
-    StringBuffer filesToFind;
-    filesToFind.appendf("%s*.log",m_LogType.str());
-
-    Owned<IDirectoryIterator> di = createDirectoryIterator(m_logsdir.str(), filesToFind.str());
-    ForEach (*di)
+    ForEachItemIn(i, oldLogs)
     {
-        IFile &file = di->query();
-
-        StringBuffer fileName;
-        ExtractFileName(file.queryFilename(),fileName);
-        if (fileName.length() && strcmp(fileName.str(),m_Added.queryFileName()) != 0 &&  strcmp(fileName.str(),m_Cleared.queryFileName()) != 0 )
-        {
-            fileName.replaceString(".log", RolloverExt);
-            file.rename(fileName.str());
-        }
+        StringBuffer fileName = oldLogs.item(i);
+        Owned<IFile> file = createIFile(fileName);
+        fileName.replaceString(logFileExt, rolloverFileExt);
+        file->rename(fileName.str());
     }
+    oldLogs.kill();
 }
 
 //Rename existing .log files (except for current added/cleared log files) to .old files
 void CLogFailSafe::RolloverAllLogs()
 {
-    VStringBuffer filesToFind("%s%s*.log", m_LogService.str(), m_LogType.str());
+    VStringBuffer filesToFind("%s%s*%s", m_LogService.str(), m_LogType.str(), logFileExt);
     Owned<IDirectoryIterator> di = createDirectoryIterator(m_logsdir.str(), filesToFind.str());
     ForEach (*di)
     {
@@ -366,7 +361,7 @@ void CLogFailSafe::RolloverAllLogs()
         if (fileName.length() && !streq(fileName.str(), m_Added.queryFileName()) &&
             !streq(fileName.str(), m_Cleared.queryFileName()))
         {
-            fileName.replaceString(".log", RolloverExt);
+            fileName.replaceString(logFileExt, rolloverFileExt);
             file.rename(fileName.str());
         }
     }

+ 1 - 0
esp/logging/logginglib/LogFailSafe.hpp

@@ -60,6 +60,7 @@ class CLogFailSafe : implements ILogFailSafe, public CInterface
     StringArray m_UnsentLogs;
     StringBuffer m_logsdir;
     StringBuffer m_LogService;//
+    StringArray oldLogs;
 
     CriticalSection m_critSec;//
     GuidMap m_PendingLogs;//

+ 3 - 5
esp/logging/logginglib/LogSerializer.cpp

@@ -30,8 +30,6 @@
  */
 
 #define TRACE_INTERVAL 100
-const char* const logFileExt = ".log";
-const char* const rolloverFileExt = ".old";
 
 CLogSerializer::CLogSerializer()
 {
@@ -175,7 +173,7 @@ void CLogSerializer::Rollover(const char* ClosedPrefix)
 void CLogSerializer::SafeRollover(const char*Directory,const char* NewFileName,const char* Prefix, const char* ClosedPrefix)
 {
     CriticalBlock b(crit);
-    Rollover(ClosedPrefix);
+    Close();
     Init();
     Open(Directory, NewFileName, Prefix);
 }
@@ -234,7 +232,7 @@ bool CLogSerializer::readLogRequest(CLogRequestInFile* logRequestInFile, StringB
         ERRLOG("Failed to read logging file %s: not enough data for dataSize", fileName.str());
         return false;
     }
-    if (dataSize[8] != ' ')
+    if (dataSize[8] != '\t')
     {
         ERRLOG("Failed to read logging file %s: incorrect data format for dataSize.", fileName.str());
         return false;
@@ -242,7 +240,7 @@ bool CLogSerializer::readLogRequest(CLogRequestInFile* logRequestInFile, StringB
     dataSize[8] = 0;
 
     char* eptr = nullptr;
-    int dataLen = (int)strtol(dataSize, &eptr, 8);
+    int dataLen = (int)strtol(dataSize, &eptr, 10);
     if (*eptr != '\0')
     {
         ERRLOG("Failed to read logging file %s: incorrect data format for dataSize.", fileName.str());

+ 3 - 0
esp/logging/logginglib/LogSerializer.hpp

@@ -29,6 +29,9 @@
 typedef std::set<std::string> GuidSet;//
 typedef std::map<std::string, std::string> GuidMap;
 
+const char* const logFileExt = ".log";
+const char* const rolloverFileExt = ".old";
+
 class CLogRequestInFile : public CSimpleInterface
 {
     StringAttr fileName;

+ 9 - 6
esp/logging/logginglib/logthread.cpp

@@ -127,7 +127,7 @@ bool CLogThread::queueLog(IEspUpdateLogRequest* logRequest)
         return false;
 
     Owned<IEspUpdateLogRequestWrap> logRequestWrap = new CUpdateLogRequestWrap(NULL, logRequest->getOption(), logRequest->getLogContent());
-    return enqueue(logRequestWrap);
+    return enqueue(logRequestWrap, nullptr);
 }
 
 bool CLogThread::queueLog(IEspUpdateLogRequestWrap* logRequest)
@@ -135,16 +135,19 @@ bool CLogThread::queueLog(IEspUpdateLogRequestWrap* logRequest)
     unsigned startTime = (getEspLogLevel()>=LogNormal) ? msTick() : 0;
     Owned<IEspUpdateLogRequestWrap> logRequestFiltered = logAgent->filterLogContent(logRequest);
     ESPLOG(LogNormal, "LThread:filterLog: %dms\n", msTick() -  startTime);
-    return enqueue(logRequestFiltered);
+    return enqueue(logRequestFiltered, nullptr);
 }
 
-bool CLogThread::enqueue(IEspUpdateLogRequestWrap* logRequest)
+bool CLogThread::enqueue(IEspUpdateLogRequestWrap* logRequest, const char* guid)
 {
     if (logFailSafe.get())
     {
         StringBuffer GUID, reqBuf;
         unsigned startTime = (getEspLogLevel()>=LogNormal) ? msTick() : 0;
-        logFailSafe->GenerateGUID(GUID, NULL);
+        if (isEmptyString(guid))
+            logFailSafe->GenerateGUID(GUID, nullptr);
+        else
+            GUID.set(guid);
         logRequest->setGUID(GUID.str());
         if (serializeLogRequestContent(logRequest, reqBuf))
             logFailSafe->Add(GUID, reqBuf.str(), nullptr);
@@ -418,7 +421,7 @@ void CLogThread::checkPendingLogs(bool bOneRecOnly)
             Owned<IEspUpdateLogRequestWrap> logRequest = unserializeLogRequestContent(logData.str());
             if (!logRequest)
                 ERRLOG("checkPendingLogs: failed to unserialize: %s", logData.str());
-            else if (!enqueue(logRequest))
+            else if (!enqueue(logRequest, GUID))
             {
                 ERRLOG("checkPendingLogs: failed to add a log request to queue");
                 queueLogError=true;
@@ -429,7 +432,7 @@ void CLogThread::checkPendingLogs(bool bOneRecOnly)
         }
         //if everything went ok then we should be able to rollover the old logs.
         if (!queueLogError && !bOneRecOnly)
-            logFailSafe->RolloverAllLogs();
+            logFailSafe->RollOldLogs();
     }
     catch(IException* ex)
     {

+ 1 - 1
esp/logging/logginglib/logthread.hpp

@@ -56,7 +56,7 @@ class CLogThread : public Thread , implements IUpdateLogThread
 
     unsigned serializeLogRequestContent(IEspUpdateLogRequestWrap* request, StringBuffer& logData);
     IEspUpdateLogRequestWrap* unserializeLogRequestContent(const char* logData);
-    bool enqueue(IEspUpdateLogRequestWrap* logRequest);
+    bool enqueue(IEspUpdateLogRequestWrap* logRequest, const char* guid);
     void writeJobQueue(IEspUpdateLogRequestWrap* jobToWrite);
     IEspUpdateLogRequestWrap* readJobQueue();
     IEspUpdateLogRequestWrap* checkAndReadLogRequestFromSharedTankFile(IEspUpdateLogRequestWrap* logRequest);