Procházet zdrojové kódy

Merge pull request #5203 from richardkchapman/concat-lockup

HPCC-10528 New threaded concat code can lock up

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday před 11 roky
rodič
revize
8acc0f8c14
1 změnil soubory, kde provedl 29 přidání a 16 odebrání
  1. 29 16
      roxie/ccd/ccdserver.cpp

+ 29 - 16
roxie/ccd/ccdserver.cpp

@@ -12175,9 +12175,8 @@ class CRoxieThreadedConcatReader : public CInterface, implements IRecordPullerCa
 public:
     IMPLEMENT_IINTERFACE;
     CRoxieThreadedConcatReader(InterruptableSemaphore &_ready, bool _grouped)
-    : puller(false), grouped(_grouped), ready(_ready), eof(false)
+    : puller(false), grouped(_grouped), atEog(true), ready(_ready), eof(false)
     {
-
     }
 
     void start(unsigned parentExtractSize, const byte *parentExtract, bool paused, IRoxieSlaveContext *ctx)
@@ -12204,6 +12203,7 @@ public:
             ReleaseRoxieRow(buffer.item(idx));
         buffer.clear();
         eof = false;
+        atEog = true;
     }
 
     void setInput(IRoxieInput *_in)
@@ -12232,8 +12232,7 @@ public:
 
     virtual void processDone()
     {
-        eof = true;
-        ready.signal();
+        processRow(NULL);
     }
 
     virtual bool fireException(IException *e)
@@ -12246,27 +12245,37 @@ public:
 
     bool peek(const void * &row, bool &anyActive)
     {
-        if (buffer.ordinality())
-        {
-            space.signal();
-            row = buffer.dequeue();
-            return true;
-        }
         if (!eof)
+        {
+            if (buffer.ordinality())
+            {
+                space.signal();
+                row = buffer.dequeue();
+                if (row==NULL)
+                {
+                    if (atEog)
+                    {
+                        eof = true;
+                        return false;
+                    }
+                    else
+                        atEog = true;
+                }
+                else if (grouped)
+                    atEog = false;
+                return true;
+            }
             anyActive = true;
+        }
         return false;
     }
 
-    inline bool atEof() const
-    {
-        return eof;
-    }
-
 protected:
     RecordPullerThread puller;
     InterruptableSemaphore space;
     InterruptableSemaphore &ready;
     SafeQueueOf<const void, true> buffer;
+    bool atEog;
     bool eof;
     bool grouped;
 };
@@ -12368,7 +12377,11 @@ public:
         loop
         {
             if (readyPending && !inGroup)
-                readyPending--;
+            {
+                if (readyPending > 1)
+                    ready.signal(readyPending-1);
+                readyPending = 0;
+            }
             else
                 ready.wait();
             bool anyActive = false;