瀏覽代碼

Merge pull request #10797 from jakesmith/hpcc-18986

HPCC-18986 Fix potential stall with csvread with header lines

Reviewed-By: Mark Kelly <mark.kelly@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 7 年之前
父節點
當前提交
cf26299183
共有 1 個文件被更改,包括 29 次插入26 次删除
  1. 29 26
      thorlcr/activities/csvread/thcsvrslave.cpp

+ 29 - 26
thorlcr/activities/csvread/thcsvrslave.cpp

@@ -59,11 +59,38 @@ class CCsvReadSlaveActivity : public CDiskReadSlaveActivityBase
         bool readFinished;
         offset_t localOffset;
         size32_t maxRowSize;
+        bool processHeaderLines = false;
 
         unsigned splitLine()
         {
             if (inputStream->eos())
                 return 0;
+            if (processHeaderLines)
+            {
+                processHeaderLines = false;
+                unsigned subFile = 0;
+                unsigned pnum = partDesc->queryPartIndex();
+                if (activity.superFDesc)
+                {
+                    unsigned lnum;
+                    if (!activity.superFDesc->mapSubPart(pnum, subFile, lnum))
+                        throwUnexpected(); // was validated earlier
+                    pnum = lnum;
+                }
+                unsigned &headerLinesRemaining = activity.getHeaderLines(subFile);
+                if (headerLinesRemaining)
+                {
+                    do
+                    {
+                        unsigned lineLength = splitLine();
+                        if (0 == lineLength)
+                            break;
+                        inputStream->skip(lineLength);
+                    }
+                    while (--headerLinesRemaining);
+                }
+                activity.sendHeaderLines(subFile, pnum);
+            }
             size32_t minRequired = 4096; // MORE - make configurable
             size32_t thisLineLength;
             for (;;)
@@ -119,33 +146,9 @@ class CCsvReadSlaveActivity : public CDiskReadSlaveActivityBase
                 CriticalBlock block(statsCs);
                 iFileIO.setown(partFileIO.getClear());
             }
-
             inputStream.setown(createFileSerialStream(iFileIO));
             if (activity.headerLines)
-            {
-                unsigned subFile = 0;
-                unsigned pnum = partDesc->queryPartIndex();
-                if (activity.superFDesc)
-                {
-                    unsigned lnum;
-                    if (!activity.superFDesc->mapSubPart(pnum, subFile, lnum))
-                        throwUnexpected(); // was validated earlier
-                    pnum = lnum;
-                }
-                unsigned &headerLinesRemaining = activity.getHeaderLines(subFile);
-                if (headerLinesRemaining)
-                {
-                    do
-                    {
-                        unsigned lineLength = splitLine();
-                        if (0 == lineLength)
-                            break;
-                        inputStream->skip(lineLength);
-                    }
-                    while (--headerLinesRemaining);
-                }
-                activity.sendHeaderLines(subFile, pnum);
-            }
+                processHeaderLines = true;
         }
         virtual void close(CRC32 &fileCRC)
         {
@@ -397,7 +400,7 @@ public:
             cachedMetaInfo.isSource = true;
             getPartsMetaInfo(cachedMetaInfo, partDescs.ordinality(), partDescs.getArray(), partHandler);
             cachedMetaInfo.unknownRowsOutput = true; // at least I don't think we know
-            cachedMetaInfo.fastThrough = true;
+            cachedMetaInfo.fastThrough = (0 == headerLines);
         }
         info = cachedMetaInfo;
     }