Pārlūkot izejas kodu

Merge pull request #4783 from richardkchapman/HPCC-9867-fix

Hpcc 9867 fix

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 11 gadi atpakaļ
vecāks
revīzija
b2211958ff
2 mainītis faili ar 123 papildinājumiem un 46 dzēšanām
  1. 100 23
      dali/ft/daftformat.cpp
  2. 23 23
      dali/ft/daftformat.ipp

+ 100 - 23
dali/ft/daftformat.cpp

@@ -227,6 +227,7 @@ void CInputBasePartitioner::findSplitPoint(offset_t splitOffset, PartitionCursor
     offset_t nextInputOffset = cursor.nextInputOffset;
     const byte *buffer = bufferBase();
 
+    bool processFullBuffer = true;
     while (nextInputOffset < splitOffset)
     {
 
@@ -234,7 +235,11 @@ void CInputBasePartitioner::findSplitPoint(offset_t splitOffset, PartitionCursor
 
         ensureBuffered(headerSize);
         assertex((headerSize ==0) || (numInBuffer != bufferOffset));
-        unsigned size = getSplitRecordSize(buffer+bufferOffset, numInBuffer-bufferOffset);
+
+        processFullBuffer =  (nextInputOffset + blockSize) < splitOffset;
+
+        unsigned size = getSplitRecordSize(buffer+bufferOffset, numInBuffer-bufferOffset, processFullBuffer);
+
         if (size==0)
             throwError1(DFTERR_PartitioningZeroSizedRowLink,((offset_t)(buffer+bufferOffset)));
         ensureBuffered(size); 
@@ -272,8 +277,8 @@ void CInputBasePartitioner::findSplitPoint(offset_t splitOffset, PartitionCursor
 CInputBasePartitioner::CInputBasePartitioner(unsigned _headerSize, unsigned expectedRecordSize)
 {
     headerSize = _headerSize;
-    blockSize = 0x10000;
-    bufferSize = 2 * blockSize + expectedRecordSize;
+    blockSize = 0x40000;
+    bufferSize = 4 * blockSize + expectedRecordSize;
     doInputCRC = false;
     CriticalBlock block(openfilecachesect);
     if (!openfilecache) 
@@ -381,7 +386,7 @@ CFixedPartitioner::CFixedPartitioner(size32_t _recordSize) : CInputBasePartition
     recordSize = _recordSize;
 }
 
-size32_t CFixedPartitioner::getSplitRecordSize(const byte * record, unsigned maxToRead)
+size32_t CFixedPartitioner::getSplitRecordSize(const byte * record, unsigned maxToRead, bool processFullBuffer)
 {
     return recordSize;
 }
@@ -424,7 +429,7 @@ size32_t CVariablePartitioner::getRecordSize(const byte * record, unsigned maxTo
 }
 
 
-size32_t CVariablePartitioner::getSplitRecordSize(const byte * record, unsigned maxToRead)
+size32_t CVariablePartitioner::getSplitRecordSize(const byte * record, unsigned maxToRead, bool processFullBuffer)
 {
     return getRecordSize(record, maxToRead);
 }
@@ -460,7 +465,7 @@ size32_t CRECFMvbPartitioner::getRecordSize(const byte * record, unsigned maxToR
 }
 
 
-size32_t CRECFMvbPartitioner::getSplitRecordSize(const byte * record, unsigned maxToRead)
+size32_t CRECFMvbPartitioner::getSplitRecordSize(const byte * record, unsigned maxToRead, bool processFullBuffer)
 {
     return getRecordSize(record, maxToRead);
 }
@@ -544,17 +549,25 @@ CCsvPartitioner::CCsvPartitioner(const FileFormat & _format) : CInputBasePartiti
     addActionList(matcher, format.separate.get() ? format.separate.get() : "\\,", SEPARATOR, &maxElementLength);
     addActionList(matcher, format.quote.get() ? format.quote.get() : "'", QUOTE, &maxElementLength);
     addActionList(matcher, format.terminate.get() ? format.terminate.get() : "\\n,\\r\\n", TERMINATOR, &maxElementLength);
+    const char * escape = format.escape.get();
+    if (escape && *escape)
+        addActionList(matcher,  escape, ESCAPE, &maxElementLength);
+
     matcher.queryAddEntry(1, " ", WHITESPACE);
     matcher.queryAddEntry(1, "\t", WHITESPACE);
 }
 
-size32_t CCsvPartitioner::getSplitRecordSize(const byte * start, unsigned maxToRead, bool ateof)
+size32_t CCsvPartitioner::getSplitRecordSize(const byte * start, unsigned maxToRead, bool processFullBuffer, bool ateof)
 {
     //more complicated processing of quotes etc....
     unsigned quote = 0;
+    unsigned quoteToStrip = 0;
     const byte * cur = start;
     const byte * end = start + maxToRead;
-    const byte * startOfColumn = cur;
+    const byte * firstGood = start;
+    const byte * lastGood = start;
+    const byte * last = start;
+    bool lastEscape = false;
 
     while (cur != end)
     {
@@ -564,44 +577,107 @@ size32_t CCsvPartitioner::getSplitRecordSize(const byte * start, unsigned maxToR
         {
         case NONE:
             cur++;          // matchLen == 0;
+            lastGood = cur;
             break;
         case WHITESPACE:
             //Skip leading whitepace
-            if (!quote&&(cur == startOfColumn))
+            if (quote)
+                lastGood = cur+matchLen;
+            else if (cur == firstGood)
             {
-                startOfColumn = cur+matchLen;
+                firstGood = cur+matchLen;
+                lastGood = cur+matchLen;
             }
             break;
         case SEPARATOR:
+            // Quoted separator
             if (quote == 0)
             {
-                startOfColumn = cur + matchLen;     // NB: Can write one past end.
+                lastEscape = false;
+                quoteToStrip = 0;
+                firstGood = cur + matchLen;
             }
+            lastGood = cur+matchLen;
             break;
         case TERMINATOR:
-            if (quote == 0)
+            if (quote == 0) // Is this a good idea? Means a mismatched quote is not fixed by EOL
             {
-                return cur + matchLen - start;
+               if (processFullBuffer)
+               {
+                   last = cur + matchLen;
+                   // Reset to process a new record
+                   lastEscape = false;
+                   quoteToStrip = 0;
+                   firstGood = cur + matchLen;
+               }
+               else
+               {
+                    return (size32_t)(cur + matchLen - start);
+               }
             }
+            lastGood = cur+matchLen;
             break;
         case QUOTE:
+            // Quoted quote
             if (quote == 0)
             {
-                if (cur == startOfColumn)
+                if (cur == firstGood)
                 {
                     quote = match;
-                    startOfColumn = cur+matchLen;
+                    firstGood = cur+matchLen;
                 }
+                lastGood = cur+matchLen;
             }
             else
             {
                 if (quote == match)
-                    quote = 0;
+                {
+                    const byte * next = cur + matchLen;
+                    //Check for double quotes
+                    if ((next != end))
+                    {
+                        unsigned nextMatchLen;
+                        unsigned nextMatch = matcher.getMatch((size32_t)(end-next), (const char *)next, nextMatchLen);
+                        if (nextMatch == quote)
+                        {
+                            quoteToStrip = quote;
+                            matchLen += nextMatchLen;
+                            lastGood = cur+matchLen;
+                        }
+                        else
+                            quote = 0;
+                    }
+                    else
+                        quote = 0;
+                }
+                else
+                    lastGood = cur+matchLen;
             }
             break;
+        case ESCAPE:
+            lastEscape = true;
+            lastGood = cur+matchLen;
+            // If this escape is at the end, proceed to field range
+            if (lastGood == end)
+                break;
+
+            // Skip escape and ignore the next match
+            cur += matchLen;
+            match = matcher.getMatch((size32_t)(end-cur), (const char *)cur, matchLen);
+            if ((match & 255) == NONE)
+                matchLen = 1;
+            lastGood += matchLen;
+            break;
+
         }
         cur += matchLen;
     }
+
+    if (processFullBuffer && (last != start))
+    {
+        return last - start;
+    }
+
     if (!ateof)
         throwError(DFTERR_EndOfRecordNotFound);
     LOG(MCdebugProgress, unknownJob, "CSV splitRecordSize(%d) at end of file", (unsigned) (end - start));
@@ -642,6 +718,7 @@ void CCsvQuickPartitioner::findSplitPoint(offset_t splitOffset, PartitionCursor
                 eof = !ensureBuffered(blockSize);
             else
                 eof = !ensureBuffered(format.maxRecordSize + maxElementLength);
+            bool fullBuffer = false;
             //Could be end of file - if no elements read.
             if (numInBuffer != bufferOffset)
             {
@@ -660,7 +737,7 @@ void CCsvQuickPartitioner::findSplitPoint(offset_t splitOffset, PartitionCursor
                 if (numInBuffer != bufferOffset)
                 {
                     if (format.maxRecordSize <= blockSize)
-                        bufferOffset += getSplitRecordSize(buffer+bufferOffset, numInBuffer-bufferOffset, eof);
+                        bufferOffset += getSplitRecordSize(buffer+bufferOffset, numInBuffer-bufferOffset, fullBuffer, eof);
                     else
                     {
                         //For large 
@@ -671,7 +748,7 @@ void CCsvQuickPartitioner::findSplitPoint(offset_t splitOffset, PartitionCursor
                             {
                                 //There is still going to be enough buffered for a whole record.
                                 eof = !ensureBuffered(ensureSize);
-                                bufferOffset += getSplitRecordSize(buffer+bufferOffset, numInBuffer-bufferOffset, eof);
+                                bufferOffset += getSplitRecordSize(buffer+bufferOffset, numInBuffer-bufferOffset, fullBuffer, eof);
                                 break;
                             }
                             catch (IException * e)
@@ -715,7 +792,7 @@ CUtfPartitioner::CUtfPartitioner(const FileFormat & _format) : CInputBasePartiti
     unitSize = format.getUnitSize();
 }
 
-size32_t CUtfPartitioner::getSplitRecordSize(const byte * start, unsigned maxToRead, bool ateof)
+size32_t CUtfPartitioner::getSplitRecordSize(const byte * start, unsigned maxToRead, bool processFullBuffer, bool ateof)
 {
     //If we need more complicated processing...
     const byte * cur = start;
@@ -769,6 +846,7 @@ void CUtfQuickPartitioner::findSplitPoint(offset_t splitOffset, PartitionCursor
             eof = !ensureBuffered(blockSize);
         else
             eof = !ensureBuffered(format.maxRecordSize + maxElementLength);
+        bool fullBuffer = false;
         //Could be end of file - if no elements read.
         if (numInBuffer != bufferOffset)
         {
@@ -787,7 +865,7 @@ void CUtfQuickPartitioner::findSplitPoint(offset_t splitOffset, PartitionCursor
             if (numInBuffer != bufferOffset)
             {
                 if (format.maxRecordSize <= blockSize)
-                    bufferOffset += getSplitRecordSize(buffer+bufferOffset, numInBuffer-bufferOffset, eof);
+                    bufferOffset += getSplitRecordSize(buffer+bufferOffset, numInBuffer-bufferOffset, fullBuffer, eof);
                 else
                 {
                     //For large 
@@ -798,7 +876,7 @@ void CUtfQuickPartitioner::findSplitPoint(offset_t splitOffset, PartitionCursor
                         {
                             //There is still going to be enough buffered for a whole record.
                             eof = !ensureBuffered(ensureSize);
-                            bufferOffset += getSplitRecordSize(buffer+bufferOffset, numInBuffer-bufferOffset, eof);
+                            bufferOffset += getSplitRecordSize(buffer+bufferOffset, numInBuffer-bufferOffset, fullBuffer, eof);
                             break;
                         }
                         catch (IException * e)
@@ -1095,7 +1173,7 @@ CXmlPartitioner::CXmlPartitioner(const FileFormat & _format) : CInputBasePartiti
     utfFormat = getUtfFormatType(format.type);
 }
 
-size32_t CXmlPartitioner::getSplitRecordSize(const byte * start, unsigned maxToRead)
+size32_t CXmlPartitioner::getSplitRecordSize(const byte * start, unsigned maxToRead, bool processFullBuffer)
 {
     return splitter.getRecordSize(start, maxToRead, true);
 }
@@ -1712,4 +1790,3 @@ IFormatPartitioner * createFormatPartitioner(const SocketEndpoint & ep, const Fi
 
     return new CRemotePartitioner(ep, srcFormat, tgtFormat, slave, wuid);
 }
-

+ 23 - 23
dali/ft/daftformat.ipp

@@ -140,9 +140,9 @@ public:
     virtual void setInputCRC(crc32_t value) { doInputCRC = true; inputCRC = value; }
 
 protected:
-            bool ensureBuffered(unsigned required);
+    bool ensureBuffered(unsigned required);
     virtual void findSplitPoint(offset_t curOffset, PartitionCursor & cursor);
-    virtual size32_t getSplitRecordSize(const byte * record, unsigned maxToRead) = 0;
+    virtual size32_t getSplitRecordSize(const byte * record, unsigned maxToRead, bool processFullBuffer) = 0;
     virtual size32_t getTransformRecordSize(const byte * record, unsigned maxToRead) = 0;
     void seekInput(offset_t offset);
     offset_t tellInput();
@@ -153,16 +153,16 @@ protected:
     }
     virtual void killBuffer()  { bufattr.clear(); }
 protected: 
-    Owned<IFileIOStream>        inStream;
-    MemoryAttr                      bufattr;
-    size32_t                        headerSize;
-    size32_t                        blockSize;
-    size32_t                        bufferSize;
-    size32_t                        numInBuffer;
-    size32_t                        bufferOffset;
-    unsigned                    inputCRC;
-    bool                        doInputCRC;
-    static IFileIOCache *openfilecache;
+    Owned<IFileIOStream>   inStream;
+    MemoryAttr             bufattr;
+    size32_t               headerSize;
+    size32_t               blockSize;
+    size32_t               bufferSize;
+    size32_t               numInBuffer;
+    size32_t               bufferOffset;
+    unsigned               inputCRC;
+    bool                   doInputCRC;
+    static IFileIOCache    *openfilecache;
     static CriticalSection openfilecachesect;
 };
 
@@ -173,7 +173,7 @@ public:
     CFixedPartitioner(unsigned _recordSize);
 
 protected:
-    virtual size32_t getSplitRecordSize(const byte * record, unsigned maxToRead);
+    virtual size32_t getSplitRecordSize(const byte * record, unsigned maxToRead, bool processFullBuffer);
     virtual size32_t getTransformRecordSize(const byte * record, unsigned maxToRead);
 
 protected:
@@ -195,7 +195,7 @@ class DALIFT_API CRECFMvbPartitioner : public CInputBasePartitioner
     bool isBlocked;
 protected:
     virtual size32_t getRecordSize(const byte * record, unsigned maxToRead);
-    virtual size32_t getSplitRecordSize(const byte * record, unsigned maxToRead);
+    virtual size32_t getSplitRecordSize(const byte * record, unsigned maxToRead, bool processFullBuffer);
     virtual size32_t getTransformRecordSize(const byte * record, unsigned maxToRead);
 public:
     CRECFMvbPartitioner(bool blocked);
@@ -213,7 +213,7 @@ public:
     virtual void setTarget(IOutputProcessor * _target);
 
 protected:
-    virtual size32_t getSplitRecordSize(const byte * record, unsigned maxToRead);
+    virtual size32_t getSplitRecordSize(const byte * record, unsigned maxToRead, bool processFullBuffer);
     virtual size32_t getTransformRecordSize(const byte * record, unsigned maxToRead);
     size32_t getRecordSize(const byte * record, unsigned maxToRead);
 
@@ -231,15 +231,15 @@ public:
     virtual void setTarget(IOutputProcessor * _target);
 
 protected:
-    virtual size32_t getSplitRecordSize(const byte * record, unsigned maxToRead, bool ateof);
+    virtual size32_t getSplitRecordSize(const byte * record, unsigned maxToRead, bool processFullBuffer, bool ateof);
     virtual size32_t getTransformRecordSize(const byte * record, unsigned maxToRead);
-    virtual size32_t getSplitRecordSize(const byte * record, unsigned maxToRead)
+    virtual size32_t getSplitRecordSize(const byte * record, unsigned maxToRead, bool processFullBuffer)
     {
-        return getSplitRecordSize(record,maxToRead,true);
+        return getSplitRecordSize(record,maxToRead,processFullBuffer,true);
     }
 
 protected:
-    enum { NONE=0, SEPARATOR=1, TERMINATOR=2, WHITESPACE=3, QUOTE=4 };
+    enum { NONE=0, SEPARATOR=1, TERMINATOR=2, WHITESPACE=3, QUOTE=4, ESCAPE=5 };
     unsigned        maxElementLength;
     FileFormat      format;
     StringMatcher   matcher;
@@ -286,11 +286,11 @@ public:
     virtual void setTarget(IOutputProcessor * _target);
 
 protected:
-    virtual size32_t getSplitRecordSize(const byte * record, unsigned maxToRead, bool ateof);
+    virtual size32_t getSplitRecordSize(const byte * record, unsigned maxToRead, bool processFullBuffer, bool ateof);
     virtual size32_t getTransformRecordSize(const byte * record, unsigned maxToRead);
-    virtual size32_t getSplitRecordSize(const byte * record, unsigned maxToRead)
+    virtual size32_t getSplitRecordSize(const byte * record, unsigned maxToRead, bool processFullBuffer)
     {
-        return getSplitRecordSize(record,maxToRead,false);
+        return getSplitRecordSize(record,maxToRead,processFullBuffer,false);
     }
 
 protected:
@@ -371,7 +371,7 @@ public:
     virtual void setTarget(IOutputProcessor * _target);
 
 protected:
-    virtual size32_t getSplitRecordSize(const byte * record, unsigned maxToRead);
+    virtual size32_t getSplitRecordSize(const byte * record, unsigned maxToRead, bool processFullBuffer);
     virtual size32_t getTransformRecordSize(const byte * record, unsigned maxToRead);
 
 protected: