Browse Source

Merge pull request #15011 from jakesmith/hpcc-25932-super-csv-headerlines

HPCC-25932 Fix potential loss of lines reading super csv

Reviewed-By: Gavin Halliday <gavin.halliday@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 4 years ago
parent
commit
be139effae
1 changed files with 29 additions and 19 deletions
  1. 29 19
      thorlcr/activities/csvread/thcsvrslave.cpp

+ 29 - 19
thorlcr/activities/csvread/thcsvrslave.cpp

@@ -202,7 +202,7 @@ class CCsvReadSlaveActivity : public CDiskReadSlaveActivityBase
                         for (;;)
                         {
                             msgMb.read(which);
-                            assertex(!gotHeaderLines->testSet(which));
+                            verifyex(!gotHeaderLines->testSet(which));
                             msgMb.read(headerLinesRemaining[which]);
                             if (subFile == which)
                             {
@@ -240,25 +240,33 @@ class CCsvReadSlaveActivity : public CDiskReadSlaveActivityBase
         {
             if (sentHeaderLines->testSet(subFile))
                 return;
-            unsigned which = gotHeaderLines->scan(0, false);
-            if (which == subFiles) // all received
+
+            /* Before we can send state of headerLines of subfiles,
+             * need to have received any updates from previous worker.
+             * The previous worker will have sent updates as it progressed,
+             * and info. re. all files it is not dealing with (and all remaining if stopped) */
+            while (true)
             {
-                bool someLeft=false;
-                unsigned hL=0;
-                for (; hL<subFiles; hL++)
-                {
-                    if (headerLinesRemaining[hL])
-                    {
-                        someLeft = true;
-                        break;
-                    }
-                }
-                if (!someLeft)
+                unsigned which = gotHeaderLines->scan(0, false);
+                if (which == subFiles) // all received
+                    break;
+                getHeaderLines(which);
+            }
+            bool someLeft=false;
+            unsigned hL=0;
+            for (; hL<subFiles; hL++)
+            {
+                if (headerLinesRemaining[hL])
                 {
-                    sendAllDone();
-                    return;
+                    someLeft = true;
+                    break;
                 }
             }
+            if (!someLeft)
+            {
+                sendAllDone();
+                return;
+            }
         }
         else
         {
@@ -275,9 +283,11 @@ class CCsvReadSlaveActivity : public CDiskReadSlaveActivityBase
         {
             if (NotFound == localLastPart[s])
             {
-                sentHeaderLines->testSet(s);
-                msgMb.append(s);
-                msgMb.append(headerLinesRemaining[s]);
+                if (!sentHeaderLines->testSet(s)) // may have already sent on previous sendHeaderLines() call
+                {
+                    msgMb.append(s);
+                    msgMb.append(headerLinesRemaining[s]);
+                }
             }
         }
         queryJobChannel().queryJobComm().send(msgMb, queryJobChannel().queryMyRank()+1, mpTag);