浏览代码

Merge branch 'candidate-7.2.x'

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 6 年之前
父节点
当前提交
ca66a2f997

+ 3 - 1
ecl/hthor/hthor.cpp

@@ -101,7 +101,7 @@ void * checked_calloc(size_t size, size_t num, char const * label)
 
 inline bool checkIsCompressed(unsigned int flags, size32_t fixedSize, bool grouped)
 {
-    return ((flags & TDWnewcompress) || ((flags & TDXcompress) && (fixedSize+(grouped?1:0) >= MIN_ROWCOMPRESS_RECSIZE)));
+    return ((flags & TDWnewcompress) || ((flags & TDXcompress) && ((0 == fixedSize) || (fixedSize+(grouped?1:0) >= MIN_ROWCOMPRESS_RECSIZE))));
 }
 
 //=====================================================================================================
@@ -8120,6 +8120,8 @@ void CHThorDiskReadBaseActivity::resolve()
             Owned<IFileDescriptor> fdesc;
             fdesc.setown(ldFile->getFileDescriptor());
             gatherInfo(fdesc);
+            if (ldFile->isExternal())
+                compressed = checkIsCompressed(helper.getFlags(), fixedDiskRecordSize, false);//grouped=FALSE because fixedDiskRecordSize already includes grouped
             IDistributedFile *dFile = ldFile->queryDistributedFile();
             if (dFile)  //only makes sense for distributed (non local) files
             {

+ 38 - 43
esp/logging/logginglib/LogFailSafe.cpp

@@ -34,7 +34,6 @@
 #define RECEIVING "_acked_"
 #define SENDING   "_sending_"
 
-const char* const RolloverExt=".old";
 const unsigned int TRACE_PENDING_LOGS_MIN = 10;
 const unsigned int TRACE_PENDING_LOGS_MAX = 50;
 
@@ -153,30 +152,32 @@ void CLogFailSafe::LoadOldLogs(StringArray& oldLogData)
 
 void CLogFailSafe::loadPendingLogReqsFromExistingLogFiles()
 {
-    VStringBuffer fileName("%s%s%s*.log", m_LogService.str(), m_LogType.str(), SENDING);
+    //Read acked LogReqs from all of ack files.
+    //Filter out those acked LogReqs from all of sending files.
+    //Non-acked LogReqs are added into m_PendingLogs.
+    //The LogReqs in the m_PendingLogs will be added to queue and
+    //new tank files through the enqueue() in the checkPendingLogs().
+    //After that, the oldLogs will be renamed to .old file.
+    GuidSet ackedSet;
+    VStringBuffer fileName("%s%s%s*%s", m_LogService.str(), m_LogType.str(), RECEIVING, logFileExt);
+    Owned<IDirectoryIterator> it = createDirectoryIterator(m_logsdir.str(), fileName.str());
+    ForEach (*it)
+    {
+        IFile &file = it->query();
+        CLogSerializer ackedLog(file.queryFilename());
+        ackedLog.loadAckedLogs(ackedSet);
+        oldLogs.append(file.queryFilename());
+    }
+
+    fileName.setf("%s%s%s*%s", m_LogService.str(), m_LogType.str(), SENDING, logFileExt);
     Owned<IDirectoryIterator> di = createDirectoryIterator(m_logsdir.str(), fileName.str());
     ForEach (*di)
     {
         IFile &file = di->query();
-
-        StringBuffer ackedName;
-        GuidSet ackedSet;
-        getReceiveFileName(file.queryFilename(),ackedName);
-        CLogSerializer ackedLog(ackedName.str());
-        ackedLog.loadAckedLogs(ackedSet);
-
+        oldLogs.append(file.queryFilename()); //add to the files for rollover
         CLogSerializer sendLog(file.queryFilename());
         unsigned long total_missed = 0;
-        {//scope needed for critical block below
-            CriticalBlock b(m_critSec); //since we plan to use m_PendingLogs
-            sendLog.loadSendLogs(ackedSet, m_PendingLogs, total_missed);
-        }
-
-        if (total_missed == 0)
-        {
-            ackedLog.Rollover(RolloverExt);
-            sendLog.Rollover(RolloverExt);
-        }
+        sendLog.loadSendLogs(ackedSet, m_PendingLogs, total_missed);
     }
 }
 
@@ -188,8 +189,8 @@ void CLogFailSafe::generateNewFileNames(StringBuffer& sendingFile, StringBuffer&
     StringBuffer tmp;
     tmp.append(m_LogService).append(m_LogType);
 
-    sendingFile.append(tmp).append(SENDING).append(GUID).append(".log");
-    receivingFile.append(tmp).append(RECEIVING).append(GUID).append(".log");
+    sendingFile.append(tmp).append(SENDING).append(GUID).append(logFileExt);
+    receivingFile.append(tmp).append(RECEIVING).append(GUID).append(logFileExt);
 }
 
 bool CLogFailSafe::PopPendingLogRecord(StringBuffer& GUID, StringBuffer& cache)
@@ -215,7 +216,7 @@ void CLogFailSafe::createNew(const char* logType)
 {
     StringBuffer UniqueID;
     GenerateGUID(UniqueID);
-    UniqueID.append(".log");
+    UniqueID.append(logFileExt);
 
     StringBuffer send(logType),recieve(logType);
 
@@ -229,7 +230,7 @@ void CLogFailSafe::createNew(const char* logType)
 void CLogFailSafe::loadFailed(const char* logType)
 {
     StringBuffer fileName;
-    fileName.appendf("%s_sending*.log",logType);
+    fileName.appendf("%s_sending*%s", logType, logFileExt);
 
     Owned<IDirectoryIterator> di = createDirectoryIterator(m_logsdir.str(), fileName.str());
     ForEach (*di)
@@ -317,8 +318,8 @@ void CLogFailSafe::AddACK(const char* GUID)
 
 void CLogFailSafe::RollCurrentLog()
 {
-    m_Added.Rollover(RolloverExt);
-    m_Cleared.Rollover(RolloverExt);
+    m_Added.Rollover(rolloverFileExt);
+    m_Cleared.Rollover(rolloverFileExt);
 }
 
 void CLogFailSafe::SafeRollover()
@@ -328,34 +329,28 @@ void CLogFailSafe::SafeRollover()
 
     // Rolling over m_Added first is desirable here beccause requests being written to the new tank file before
     // m_Cleared finishes rolling all haven't been sent yet (because the sending thread is here busy rolling).
-    m_Added.SafeRollover  (m_logsdir.str(), send.str(), NULL,   RolloverExt);
-    m_Cleared.SafeRollover(m_logsdir.str(), receive.str(), NULL, RolloverExt);
+    m_Added.SafeRollover  (m_logsdir.str(), send.str(), nullptr, rolloverFileExt);
+    m_Cleared.SafeRollover(m_logsdir.str(), receive.str(), nullptr, rolloverFileExt);
 }
 
+//Only rollover the files inside the oldLogs.
+//This is called only when a log agent is starting.
 void CLogFailSafe::RollOldLogs()
 {
-    StringBuffer filesToFind;
-    filesToFind.appendf("%s*.log",m_LogType.str());
-
-    Owned<IDirectoryIterator> di = createDirectoryIterator(m_logsdir.str(), filesToFind.str());
-    ForEach (*di)
+    ForEachItemIn(i, oldLogs)
     {
-        IFile &file = di->query();
-
-        StringBuffer fileName;
-        ExtractFileName(file.queryFilename(),fileName);
-        if (fileName.length() && strcmp(fileName.str(),m_Added.queryFileName()) != 0 &&  strcmp(fileName.str(),m_Cleared.queryFileName()) != 0 )
-        {
-            fileName.replaceString(".log", RolloverExt);
-            file.rename(fileName.str());
-        }
+        StringBuffer fileName = oldLogs.item(i);
+        Owned<IFile> file = createIFile(fileName);
+        fileName.replaceString(logFileExt, rolloverFileExt);
+        file->rename(fileName.str());
     }
+    oldLogs.kill();
 }
 
 //Rename existing .log files (except for current added/cleared log files) to .old files
 void CLogFailSafe::RolloverAllLogs()
 {
-    VStringBuffer filesToFind("%s%s*.log", m_LogService.str(), m_LogType.str());
+    VStringBuffer filesToFind("%s%s*%s", m_LogService.str(), m_LogType.str(), logFileExt);
     Owned<IDirectoryIterator> di = createDirectoryIterator(m_logsdir.str(), filesToFind.str());
     ForEach (*di)
     {
@@ -366,7 +361,7 @@ void CLogFailSafe::RolloverAllLogs()
         if (fileName.length() && !streq(fileName.str(), m_Added.queryFileName()) &&
             !streq(fileName.str(), m_Cleared.queryFileName()))
         {
-            fileName.replaceString(".log", RolloverExt);
+            fileName.replaceString(logFileExt, rolloverFileExt);
             file.rename(fileName.str());
         }
     }

+ 1 - 0
esp/logging/logginglib/LogFailSafe.hpp

@@ -60,6 +60,7 @@ class CLogFailSafe : implements ILogFailSafe, public CInterface
     StringArray m_UnsentLogs;
     StringBuffer m_logsdir;
     StringBuffer m_LogService;//
+    StringArray oldLogs;
 
     CriticalSection m_critSec;//
     GuidMap m_PendingLogs;//

+ 3 - 5
esp/logging/logginglib/LogSerializer.cpp

@@ -30,8 +30,6 @@
  */
 
 #define TRACE_INTERVAL 100
-const char* const logFileExt = ".log";
-const char* const rolloverFileExt = ".old";
 
 CLogSerializer::CLogSerializer()
 {
@@ -175,7 +173,7 @@ void CLogSerializer::Rollover(const char* ClosedPrefix)
 void CLogSerializer::SafeRollover(const char*Directory,const char* NewFileName,const char* Prefix, const char* ClosedPrefix)
 {
     CriticalBlock b(crit);
-    Rollover(ClosedPrefix);
+    Close();
     Init();
     Open(Directory, NewFileName, Prefix);
 }
@@ -234,7 +232,7 @@ bool CLogSerializer::readLogRequest(CLogRequestInFile* logRequestInFile, StringB
         ERRLOG("Failed to read logging file %s: not enough data for dataSize", fileName.str());
         return false;
     }
-    if (dataSize[8] != ' ')
+    if (dataSize[8] != '\t')
     {
         ERRLOG("Failed to read logging file %s: incorrect data format for dataSize.", fileName.str());
         return false;
@@ -242,7 +240,7 @@ bool CLogSerializer::readLogRequest(CLogRequestInFile* logRequestInFile, StringB
     dataSize[8] = 0;
 
     char* eptr = nullptr;
-    int dataLen = (int)strtol(dataSize, &eptr, 8);
+    int dataLen = (int)strtol(dataSize, &eptr, 10);
     if (*eptr != '\0')
     {
         ERRLOG("Failed to read logging file %s: incorrect data format for dataSize.", fileName.str());

+ 3 - 0
esp/logging/logginglib/LogSerializer.hpp

@@ -29,6 +29,9 @@
 typedef std::set<std::string> GuidSet;//
 typedef std::map<std::string, std::string> GuidMap;
 
+const char* const logFileExt = ".log";
+const char* const rolloverFileExt = ".old";
+
 class CLogRequestInFile : public CSimpleInterface
 {
     StringAttr fileName;

+ 9 - 6
esp/logging/logginglib/logthread.cpp

@@ -127,7 +127,7 @@ bool CLogThread::queueLog(IEspUpdateLogRequest* logRequest)
         return false;
 
     Owned<IEspUpdateLogRequestWrap> logRequestWrap = new CUpdateLogRequestWrap(NULL, logRequest->getOption(), logRequest->getLogContent());
-    return enqueue(logRequestWrap);
+    return enqueue(logRequestWrap, nullptr);
 }
 
 bool CLogThread::queueLog(IEspUpdateLogRequestWrap* logRequest)
@@ -135,16 +135,19 @@ bool CLogThread::queueLog(IEspUpdateLogRequestWrap* logRequest)
     unsigned startTime = (getEspLogLevel()>=LogNormal) ? msTick() : 0;
     Owned<IEspUpdateLogRequestWrap> logRequestFiltered = logAgent->filterLogContent(logRequest);
     ESPLOG(LogNormal, "LThread:filterLog: %dms\n", msTick() -  startTime);
-    return enqueue(logRequestFiltered);
+    return enqueue(logRequestFiltered, nullptr);
 }
 
-bool CLogThread::enqueue(IEspUpdateLogRequestWrap* logRequest)
+bool CLogThread::enqueue(IEspUpdateLogRequestWrap* logRequest, const char* guid)
 {
     if (logFailSafe.get())
     {
         StringBuffer GUID, reqBuf;
         unsigned startTime = (getEspLogLevel()>=LogNormal) ? msTick() : 0;
-        logFailSafe->GenerateGUID(GUID, NULL);
+        if (isEmptyString(guid))
+            logFailSafe->GenerateGUID(GUID, nullptr);
+        else
+            GUID.set(guid);
         logRequest->setGUID(GUID.str());
         if (serializeLogRequestContent(logRequest, reqBuf))
             logFailSafe->Add(GUID, reqBuf.str(), nullptr);
@@ -418,7 +421,7 @@ void CLogThread::checkPendingLogs(bool bOneRecOnly)
             Owned<IEspUpdateLogRequestWrap> logRequest = unserializeLogRequestContent(logData.str());
             if (!logRequest)
                 ERRLOG("checkPendingLogs: failed to unserialize: %s", logData.str());
-            else if (!enqueue(logRequest))
+            else if (!enqueue(logRequest, GUID))
             {
                 ERRLOG("checkPendingLogs: failed to add a log request to queue");
                 queueLogError=true;
@@ -429,7 +432,7 @@ void CLogThread::checkPendingLogs(bool bOneRecOnly)
         }
         //if everything went ok then we should be able to rollover the old logs.
         if (!queueLogError && !bOneRecOnly)
-            logFailSafe->RolloverAllLogs();
+            logFailSafe->RollOldLogs();
     }
     catch(IException* ex)
     {

+ 1 - 1
esp/logging/logginglib/logthread.hpp

@@ -56,7 +56,7 @@ class CLogThread : public Thread , implements IUpdateLogThread
 
     unsigned serializeLogRequestContent(IEspUpdateLogRequestWrap* request, StringBuffer& logData);
     IEspUpdateLogRequestWrap* unserializeLogRequestContent(const char* logData);
-    bool enqueue(IEspUpdateLogRequestWrap* logRequest);
+    bool enqueue(IEspUpdateLogRequestWrap* logRequest, const char* guid);
     void writeJobQueue(IEspUpdateLogRequestWrap* jobToWrite);
     IEspUpdateLogRequestWrap* readJobQueue();
     IEspUpdateLogRequestWrap* checkAndReadLogRequestFromSharedTankFile(IEspUpdateLogRequestWrap* logRequest);

+ 2 - 1
roxie/ccd/ccdserver.cpp

@@ -11685,7 +11685,8 @@ public:
         diskmeta.set(helper.queryDiskRecordSize()->querySerializedDiskMeta());
         if (grouped)
             diskmeta.setown(createDeltaRecordSize(diskmeta, +1));
-        blockcompressed = (((helper.getFlags() & TDWnewcompress) != 0) || (((helper.getFlags() & TDXcompress) != 0) && (diskmeta->getFixedSize() >= MIN_ROWCOMPRESS_RECSIZE))); //always use new compression
+        size32_t fixedSize = diskmeta->getFixedSize();
+        blockcompressed = (((helper.getFlags() & TDWnewcompress) != 0) || (((helper.getFlags() & TDXcompress) != 0) && ((0 == fixedSize) || (fixedSize >= MIN_ROWCOMPRESS_RECSIZE)))); //always use new compression
         encrypted = false; // set later
         tallycrc = true;
         uncompressedBytesWritten = 0;