Jelajahi Sumber

HPCC-21499 Use shared tank file from ESP Logging Agent

ESP Logging Manager creates a job file for each Logging Agent.
The job file contains the information about the jobs inside
the shared tank file. Logging Agents will read the job files
if available.

Signed-off-by: wangkx <kevin.wang@lexisnexis.com>
wangkx 6 tahun lalu
induk
melakukan
120b41af0d

+ 73 - 1
esp/logging/logginglib/LogSerializer.cpp

@@ -30,6 +30,8 @@
  */
 
 #define TRACE_INTERVAL 100
+const char* const logFileExt = ".log";
+const char* const rolloverFileExt = ".old";
 
 CLogSerializer::CLogSerializer()
 {
@@ -203,6 +205,76 @@ void CLogSerializer::splitLogRecord(MemoryBuffer& rawdata, StringBuffer& GUID, S
     }
 }
 
+bool CLogSerializer::readLogRequest(CLogRequestInFile* logRequestInFile, StringBuffer& logRequest)
+{
+    //Open the file if exists.
+    StringBuffer fileName = logRequestInFile->getFileName();
+    Owned<IFile> file = createIFile(fileName);
+    Owned<IFileIO> fileIO = file->open(IFOread);
+    if (!fileIO)
+    {
+        //The file may be renamed from .log to .old.
+        fileName.replaceString(logFileExt, rolloverFileExt);
+        file.setown(createIFile(fileName));
+        fileIO.setown(file->open(IFOread));
+        if (!fileIO)
+        {
+            ERRLOG("Unable to open logging file %s", fileName.str());
+            return false;
+        }
+    }
+
+    //Read data size
+    char dataSize[9];
+    memset(dataSize, 0, 9);
+    offset_t finger = logRequestInFile->getPos();
+    size32_t bytesRead = fileIO->read(finger, 9, dataSize);
+    if (bytesRead < 9)
+    {
+        ERRLOG("Failed to read logging file %s: not enough data for dataSize", fileName.str());
+        return false;
+    }
+    if (dataSize[8] != ' ')
+    {
+        ERRLOG("Failed to read logging file %s: incorrect data format for dataSize.", fileName.str());
+        return false;
+    }
+    dataSize[8] = 0;
+
+    char* eptr = nullptr;
+    int dataLen = (int)strtol(dataSize, &eptr, 8);
+    if (*eptr != '\0')
+    {
+        ERRLOG("Failed to read logging file %s: incorrect data format for dataSize.", fileName.str());
+        return false;
+    }
+
+    if (dataLen + 9 != logRequestInFile->getSize())
+    {
+        ERRLOG("Failed to read logging file %s: incorrect dataSize", fileName.str());
+        return false;
+    }
+
+    //Read other data
+    MemoryBuffer data;
+    finger += 9;
+    bytesRead = fileIO->read(finger, dataLen, data.reserveTruncate(dataLen));
+    if (bytesRead < dataLen)
+    {
+        ERRLOG("Failed to read logging file %s: dataSize = %d, bytesRead = %d", fileName.str(), dataLen, bytesRead);
+        return false;
+    }
+
+    //Find GUID and log request
+    StringBuffer GUID;
+    splitLogRecord(data, GUID, logRequest);
+    if (strieq(GUID, logRequestInFile->getGUID()))
+        return true;
+
+    ERRLOG("Failed to read logging file %s: GUID read (%s) is not same as GUID (%s)", fileName.str(), GUID.str(), logRequestInFile->getGUID());
+    return false;
+}
+
 void CLogSerializer::loadSendLogs(GuidSet& ackSet, GuidMap& missedLogs, unsigned long& total_missed)//
 {
     try
@@ -311,7 +383,7 @@ void CLogSerializer::loadAckedLogs(GuidSet& ackedLogs)//
 StringBuffer& CLogSerializer::GetRolloverFileName(StringBuffer& oldFile, StringBuffer& newfile, const char* newExtension)
 {
     newfile.append(oldFile);
-    newfile.replaceString(".log",newExtension);
+    newfile.replaceString(logFileExt, newExtension);
     return newfile;
 }
 

+ 1 - 0
esp/logging/logginglib/LogSerializer.hpp

@@ -90,6 +90,7 @@ public:
     unsigned long getItemCount() const { return m_ItemCount; }//
     unsigned long getFileSize() const { return fileSize; }//
     static StringBuffer& extractFileName(const char* fullName, StringBuffer& fileName);//
+    bool readLogRequest(CLogRequestInFile* file, StringBuffer& logRequest);
     virtual void loadSendLogs(GuidSet& ACKSet, GuidMap& MissedLogs, unsigned long& total_missed);//
     virtual void loadAckedLogs(GuidSet& ReceiveMap);//
 };

+ 79 - 1
esp/logging/logginglib/logthread.cpp

@@ -21,6 +21,7 @@
 #include "LoggingErrors.hpp"
 #include "LogSerializer.hpp"
 #include "logthread.hpp"
+#include "compressutil.hpp"
 
 const char* const PropMaxLogQueueLength = "MaxLogQueueLength";
 const char* const PropQueueSizeSignal = "QueueSizeSignal";
@@ -175,11 +176,16 @@ void CLogThread::sendLog()
             if ((!GUID || !*GUID) && ensureFailSafe && logFailSafe.get())
                 continue;
 
+            Owned<IEspUpdateLogRequestWrap> logRequestInFile = checkAndReadLogRequestFromSharedTankFile(logRequest);
+
             try
             {
                 unsigned startTime = (getEspLogLevel()>=LogNormal) ? msTick() : 0;
                 Owned<IEspUpdateLogResponse> logResponse = createUpdateLogResponse();
-                logAgent->updateLog(*logRequest, *logResponse);
+                if (logRequestInFile)
+                    logAgent->updateLog(*logRequestInFile, *logResponse);
+                else
+                    logAgent->updateLog(*logRequest, *logResponse);
                 if (!logResponse)
                     throw MakeStringException(EspLoggingErrors::UpdateLogFailed, "no response");
                 if (logResponse->getStatusCode())
@@ -206,6 +212,9 @@ void CLogThread::sendLog()
                 errorMessage.appendf("Failed to update log for %s: error code %d, error message %s", GUID, e->errorCode(), e->errorMessage(errorStr).str());
                 e->Release();
 
+                if (logRequestInFile)
+                    logRequest->setNoResend(logRequestInFile->getNoResend());
+
                 bool willRetry = false;
                 if (!logRequest->getNoResend() && (maxLogRetries != 0))
                 {
@@ -242,6 +251,75 @@ void CLogThread::sendLog()
     return;
 }
 
+//At first, we check whether the logRequest contains the information about original log Request
+//in shared tank file created by logging manager. If yes, try to read the original log Request
+//based on the information. If the original log Request is found and unserialized, return new
+//IEspUpdateLogRequestWrap which contains original log Request.
+IEspUpdateLogRequestWrap* CLogThread::checkAndReadLogRequestFromSharedTankFile(IEspUpdateLogRequestWrap* logRequest)
+{
+    //Read LogRequestInFile info if exists.
+    Owned<IPropertyTree> logInFle = createPTreeFromXMLString(logRequest->getUpdateLogRequest());
+    if (!logInFle)
+        return nullptr;
+
+    const char* GUID = logInFle->queryProp(LOGREQUEST_GUID);
+    if (isEmptyString(GUID))
+        return nullptr;
+
+    const char* fileName = logInFle->queryProp(LOGCONTENTINFILE_FILENAME);
+    if (isEmptyString(fileName))
+        return nullptr;
+
+    __int64 pos = logInFle->getPropInt64(LOGCONTENTINFILE_FILEPOS, -1);
+    if (pos < 0)
+        return nullptr;
+
+    int size = logInFle->getPropInt64(LOGCONTENTINFILE_FILESIZE, -1);
+    if (size < 0)
+        return nullptr;
+
+    Owned<CLogRequestInFile> reqInFile = new CLogRequestInFile();
+    reqInFile->setGUID(GUID);
+    reqInFile->setFileName(fileName);
+    reqInFile->setPos(pos);
+    reqInFile->setSize(size);
+
+    //Read Log Request from the file
+    StringBuffer logRequestStr;
+    CLogSerializer logSerializer;
+    if (!logSerializer.readLogRequest(reqInFile, logRequestStr))
+    {
+        ERRLOG("Failed to read Log Request from %s", fileName);
+        return nullptr;
+    }
+
+    try
+    {
+        Owned<IPropertyTree> logRequestTree = createPTreeFromXMLString(logRequestStr.str());
+        if (!logRequestTree)
+            return nullptr;
+
+        const char* guid = logRequestTree->queryProp("GUID");
+        const char* opt = logRequestTree->queryProp("Option");
+        const char* reqBuf = logRequestTree->queryProp("LogRequest");
+        if (isEmptyString(reqBuf))
+            return nullptr;
+
+        StringBuffer decoded, req;
+        JBASE64_Decode(reqBuf, decoded);
+        LZWExpand(decoded, decoded.length(), req);
+
+        return new CUpdateLogRequestWrap(guid, opt, req.str());
+    }
+    catch(IException* e)
+    {
+        StringBuffer errorStr;
+        ERRLOG("Exception when unserializing Log Request Content: %d %s", e->errorCode(), e->errorMessage(errorStr).str());
+        e->Release();
+    }
+    return nullptr;
+}
+
 //////////////////////////FailSafe////////////////////////////
 void CLogThread::checkRollOver()
 {

+ 1 - 0
esp/logging/logginglib/logthread.hpp

@@ -59,6 +59,7 @@ class CLogThread : public Thread , implements IUpdateLogThread
     bool enqueue(IEspUpdateLogRequestWrap* logRequest);
     void writeJobQueue(IEspUpdateLogRequestWrap* jobToWrite);
     IEspUpdateLogRequestWrap* readJobQueue();
+    IEspUpdateLogRequestWrap* checkAndReadLogRequestFromSharedTankFile(IEspUpdateLogRequestWrap* logRequest);
 
 public:
     IMPLEMENT_IINTERFACE;

+ 30 - 10
esp/logging/loggingmanager/loggingmanager.cpp

@@ -221,7 +221,6 @@ bool CLoggingManager::updateLog(IEspContext* espContext, IEspUpdateLogRequestWra
     if (!initialized)
         throw MakeStringException(-1,"LoggingManager not initialized");
 
-    bool bRet = false;
     try
     {
         if (espContext)
@@ -231,17 +230,38 @@ bool CLoggingManager::updateLog(IEspContext* espContext, IEspUpdateLogRequestWra
         {
             Owned<CLogRequestInFile> reqInFile = new CLogRequestInFile();
             if (!saveToTankFile(req, reqInFile))
-                ERRLOG("LoggingManager: failed in saveToTankFile().");
-            //The reqInFile may be used for passing information to log agents in another PR.
+                throw MakeStringException(-1, "LoggingManager: failed in saveToTankFile().");
+
+            //Build new log request for logging agents
+            StringBuffer logContent, v;
+            appendXMLOpenTag(logContent, LOGCONTENTINFILE);
+            appendXMLTag(logContent, LOGCONTENTINFILE_FILENAME, reqInFile->getFileName());
+            appendXMLTag(logContent, LOGCONTENTINFILE_FILEPOS, v.append(reqInFile->getPos()));
+            appendXMLTag(logContent, LOGCONTENTINFILE_FILESIZE, v.clear().append(reqInFile->getSize()));
+            appendXMLTag(logContent, LOGREQUEST_GUID, reqInFile->getGUID());
+            appendXMLCloseTag(logContent, LOGCONTENTINFILE);
+
+            Owned<IEspUpdateLogRequest> logRequest = new CUpdateLogRequest("", "");
+            logRequest->setOption(reqInFile->getOption());
+            logRequest->setLogContent(logContent);
+            for (unsigned int x = 0; x < loggingAgentThreads.size(); x++)
+            {
+                IUpdateLogThread* loggingThread = loggingAgentThreads[x];
+                if (loggingThread->hasService(LGSTUpdateLOG))
+                {
+                    loggingThread->queueLog(logRequest);
+                }
+            }
         }
-
-        for (unsigned int x = 0; x < loggingAgentThreads.size(); x++)
+        else
         {
-            IUpdateLogThread* loggingThread = loggingAgentThreads[x];
-            if (loggingThread->hasService(LGSTUpdateLOG))
+            for (unsigned int x = 0; x < loggingAgentThreads.size(); x++)
             {
-                loggingThread->queueLog(&req);
-                bRet = true;
+                IUpdateLogThread* loggingThread = loggingAgentThreads[x];
+                if (loggingThread->hasService(LGSTUpdateLOG))
+                {
+                    loggingThread->queueLog(&req);
+                }
             }
         }
         if (espContext)
@@ -256,7 +276,7 @@ bool CLoggingManager::updateLog(IEspContext* espContext, IEspUpdateLogRequestWra
         resp.setStatusMessage(errorStr.str());
         e->Release();
     }
-    return bRet;
+    return true;
 }
 
 bool CLoggingManager::saveToTankFile(IEspUpdateLogRequestWrap& logRequest, CLogRequestInFile* reqInFile)