소스 검색

HPCC-24910 Records may still be pulled by PullerThread after stopped

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 4 년 전
부모
커밋
1759b15b85
1개의 변경된 파일10개의 추가작업 그리고 6개의 파일을 삭제
  1. 10 6
      roxie/ccd/ccdserver.cpp

+ 10 - 6
roxie/ccd/ccdserver.cpp

@@ -2184,7 +2184,8 @@ protected:
     IEngineRowStream *inputStream;
     IRecordPullerCallback *helper;
     Semaphore started;                      // MORE: GH->RKC I'm pretty sure this can be deleted, since handled by RestartableThread
-    bool groupAtOnce, eof, eog;
+    bool groupAtOnce, eog;
+    std::atomic<bool> eof;
     CriticalSection crit;
 
 public:
@@ -2277,6 +2278,7 @@ public:
             CriticalBlock c(crit); // stop is called on our consumer's thread. We need to take care calling stop for our input to make sure it is not in mid-nextRow etc etc.
             if (inputStream)
                 inputStream->stop();
+            eof = true;
         }
         RestartableThread::join();
     }
@@ -2318,12 +2320,13 @@ public:
     {
         if (eof)
             return false;
-        while (preload)
+        while (preload && !eof)
         {
-            const void * row;
+            const void * row = nullptr;
             {
                 CriticalBlock c(crit); // See comments in stop for why this is needed
-                row = inputStream->nextRow();
+                if (!eof)
+                    row = inputStream->nextRow();
             }
             if (row)
             {
@@ -2352,10 +2355,11 @@ public:
         unsigned rowsDone = 0;
         while (preload && !eof)
         {
-            const void *row;
+            const void *row = nullptr;
             {
                 CriticalBlock c(crit);
-                row = inputStream->nextRow();
+                if (!eof)
+                    row = inputStream->nextRow();
             }
             if (row)
             {