Преглед изворни кода

Merge pull request #9410 from rpastrana/HPCC-16755-kevinslogqueue

HPCC-16755 Release log queue lock after a queue item is read

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday пре 8 година
родитељ
комит
1d57a2ca32
2 измењених фајлова са 29 додато и 19 уклоњено
  1. 27 19
      esp/logging/logginglib/logthread.cpp
  2. 2 0
      esp/logging/logginglib/logthread.hpp

+ 27 - 19
esp/logging/logginglib/logthread.cpp

@@ -170,17 +170,7 @@ bool CLogThread::enqueue(IEspUpdateLogRequestWrap* logRequest)
             logFailSafe->Add(GUID, reqBuf.str());
     }
 
-    {
-        CriticalBlock b(logQueueCrit);
-        int QueueSize = logQueue.ordinality();
-        if(QueueSize > maxLogQueueLength)
-            ERRLOG("LOGGING QUEUE SIZE %d EXECEEDED MaxLogQueueLength %d, check the logging server.",QueueSize, maxLogQueueLength);
-
-        if(QueueSize!=0 && QueueSize % signalGrowingQueueAt == 0)
-            ERRLOG("Logging Queue at %d records. Check the logging server.",QueueSize);
-
-        logQueue.enqueue(LINK(logRequest));
-    }
+    writeJobQueue(logRequest);
 
     m_sem.signal();
 
@@ -195,15 +185,11 @@ void CLogThread::sendLog()
             return;
 
         int recSend = 0;
-        IEspUpdateLogRequestWrap* logRequest  = 0;
-
-        CriticalBlock b(logQueueCrit);
-
-        ForEachQueueItemIn(i,logQueue)
+        while(true)
         {
-            logRequest  = (IEspUpdateLogRequestWrap*)logQueue.dequeue();
+            IEspUpdateLogRequestWrap* logRequest  = readJobQueue();
             if (!logRequest)
-                continue;
+                break;
 
             const char* GUID= logRequest->getGUID();
             if ((!GUID || !*GUID) && failSafeLogging && logFailSafe.get())
@@ -243,7 +229,7 @@ void CLogThread::sendLog()
                     else
                     {
                         willRetry = true;
-                        logQueue.enqueue(logRequest);
+                        writeJobQueue(logRequest);
                         errorMessage.appendf(" Adding back to logging queue for retrying %d.", retry);
                     }
                 }
@@ -414,3 +400,25 @@ IEspUpdateLogRequestWrap* CLogThread::unserializeLogRequestContent(const char* l
 
     return new CUpdateLogRequestWrap(guid, opt, buffer.str());
 };
+
+void CLogThread::writeJobQueue(IEspUpdateLogRequestWrap* jobToWrite)
+{
+    if (jobToWrite)
+    {
+        CriticalBlock b(logQueueCrit);
+        int QueueSize = logQueue.ordinality();
+        if(QueueSize > maxLogQueueLength)
+            ERRLOG("LOGGING QUEUE SIZE %d EXECEEDED MaxLogQueueLength %d, check the logging server.",QueueSize, maxLogQueueLength);
+
+        if(QueueSize!=0 && QueueSize % signalGrowingQueueAt == 0)
+            ERRLOG("Logging Queue at %d records. Check the logging server.",QueueSize);
+
+        logQueue.enqueue(LINK(jobToWrite));
+    }
+}
+
+IEspUpdateLogRequestWrap* CLogThread::readJobQueue()
+{
+    CriticalBlock b(logQueueCrit);
+    return (IEspUpdateLogRequestWrap*)logQueue.dequeue();
+}

+ 2 - 0
esp/logging/logginglib/logthread.hpp

@@ -58,6 +58,8 @@ class CLogThread : public Thread , implements IUpdateLogThread
     unsigned serializeLogRequestContent(IEspUpdateLogRequestWrap* request, StringBuffer& logData);
     IEspUpdateLogRequestWrap* unserializeLogRequestContent(const char* logData);
     bool enqueue(IEspUpdateLogRequestWrap* logRequest);
+    void writeJobQueue(IEspUpdateLogRequestWrap* jobToWrite);
+    IEspUpdateLogRequestWrap* readJobQueue();
 
 public:
     IMPLEMENT_IINTERFACE;