
HPCC-18994 Spraying small JSON file - "Read beyond buffer" error

Fix the problem when the file has fewer objects than the number of file parts.
Fix the problem when a splitOffset points into an already processed area.
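
A minimal sketch of the second fix (simplified stand-ins, not the verbatim
HPCC code; the real logic is in CJsonInputPartitioner::findSplitPoint in the
daftformat.ipp hunk below): a split offset that falls inside an already
processed area is clamped forward to the next unread byte.

    #include <cstdint>

    typedef uint64_t offset_t;                    // assumption: stands in for jlib's offset_t
    struct Cursor { offset_t nextInputOffset; };  // simplified PartitionCursor

    static offset_t clampSplit(offset_t splitOffset, const Cursor &cursor)
    {
        if (splitOffset < cursor.nextInputOffset)
            splitOffset = cursor.nextInputOffset; // skip the consumed region
        return splitOffset;
    }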

Add the spray_test_json.ecl test to the Regression Suite to exercise
Despray then Spray with:

- a small (fewer objects than the number of file parts in a multi-slave
  environment)

- a small, unbalanced (one record/object is much larger than the others)

- a bigger (number of nodes * 100 records/objects) JSON file.

Signed-off-by: Attila Vamos <attila.vamos@gmail.com>
Attila Vamos, 7 years ago
commit 2a10c61e95

+ 5 - 1
dali/ft/daftformat.cpp

@@ -94,6 +94,8 @@ void CPartitioner::commonCalcPartitions()
     if (endOffset == totalSize) lastSplit = numParts-1;
     if (lastSplit >= numParts) lastSplit = numParts-1;                                      // very rare with variable length records, last file is very small or copying a couple of records 50 ways.
 
+    JSON_DBGLOG("commonCalcPartitions: partSize: %lld, endOffset: %lld, firstSplit: %d, lastSplit: %d", partSize, endOffset, firstSplit, lastSplit);
+
     if (!partSeparator.isEmpty() && appendingContent) //appending to existing content, add a separator if necessary
     {
         Owned<PartitionPoint> separator = new PartitionPoint;
@@ -122,6 +125,7 @@ void CPartitioner::commonCalcPartitions()
             splitPoint =  (split * totalSize) / numParts;
         else
             splitPoint =  split * partSize;
+        JSON_DBGLOG("commonCalcPartitions: split:%d, splitPoint: %lld",split ,splitPoint);
         findSplitPoint(splitPoint, cursor);
         const offset_t inputOffset = cursor.inputOffset;
         assertex(inputOffset >= thisOffset && inputOffset <= thisOffset + thisSize);
@@ -130,6 +134,7 @@ void CPartitioner::commonCalcPartitions()
         if ((split != firstSplit) || (inputOffset != startInputOffset))
         {
             results.append(*new PartitionPoint(whichInput, split-1, startInputOffset-thisOffset+thisHeaderSize, inputOffset - startInputOffset - cursor.trimLength, cursor.outputOffset-startOutputOffset));
+            JSON_DBGLOG("commonCalcPartitions: startInputOffset: %lld, inputOffset: %lld, startOutputOffset: %lld, cursor.outputOffset: %lld", startInputOffset, inputOffset, startOutputOffset, cursor.outputOffset);
             startInputOffset = inputOffset;
             startOutputOffset = cursor.outputOffset;
         }
@@ -137,7 +142,7 @@ void CPartitioner::commonCalcPartitions()
 
     assertex(startInputOffset != endOffset || splitAfterPoint());
     findSplitPoint(endOffset, cursor);
-
+    JSON_DBGLOG("commonCalcPartitions: lastSplit: %d, startInputOffset: %lld, thisOffset: %lld, thisHeaderSize: %d, startOutputOffset: %lld, cursor.outputOffset: %lld", lastSplit, startInputOffset, thisOffset, thisHeaderSize ,startOutputOffset, cursor.outputOffset);
     killBuffer(); // don't keep buffer longer than needed
 
     results.append(*new PartitionPoint(whichInput, lastSplit, startInputOffset-thisOffset+thisHeaderSize, endOffset - startInputOffset, cursor.outputOffset-startOutputOffset));
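
For context, a simplified model (not HPCC code) of the split-point arithmetic
traced above: each candidate offset is split * totalSize / numParts, so when a
JSON file holds fewer objects than file parts, several candidates land inside
rows that an earlier split already consumed - exactly the situation the new
clamp and eof guard in findSplitPoint protect against.

    #include <cstdio>

    int main()
    {
        const unsigned long long totalSize = 90; // assumption: tiny 3-object JSON file
        const unsigned numParts = 8;             // more file parts than objects
        for (unsigned split = 1; split < numParts; split++)
            printf("split %u -> candidate offset %llu\n",
                   split, (split * totalSize) / numParts);
        return 0;
    }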

+ 57 - 12
dali/ft/daftformat.ipp

@@ -25,6 +25,19 @@
 #include "rmtpass.hpp"
 #include "jptree.hpp"
 
+
+//#define JSON_DEBUG 1
+
+#ifdef JSON_DEBUG
+    #ifdef DEBUG
+        #define JSON_DBGLOG(...) DBGLOG(__VA_ARGS__)
+    #else
+        #define JSON_DBGLOG(...)
+    #endif
+#else
+    #define JSON_DBGLOG(...)
+#endif
+
 //---------------------------------------------------------------------------
 
 class DALIFT_API CPartitioner : implements IFormatProcessor, public CInterface
@@ -45,7 +58,6 @@ protected:
     virtual void findSplitPoint(offset_t curOffset, PartitionCursor & cursor) = 0;
     virtual bool splitAfterPoint() { return false; }
     virtual void killBuffer() = 0;
-    
 
     void commonCalcPartitions();
 
@@ -125,7 +137,7 @@ protected:
 
 
 //---------------------------------------------------------------------------
-// More complex processors that need to read the source file - e.g. because 
+// More complex processors that need to read the source file - e.g. because
 // output offset being calculated.
 
 
@@ -151,13 +163,13 @@ protected:
     void seekInput(offset_t offset);
     offset_t tellInput();
 
-    inline byte *bufferBase()  
-    { 
-        return (byte *)((bufattr.length()!=bufferSize)?bufattr.allocate(bufferSize):bufattr.bufferBase()); 
+    inline byte *bufferBase()
+    {
+        return (byte *)((bufattr.length()!=bufferSize)?bufattr.allocate(bufferSize):bufattr.bufferBase());
     }
     virtual void killBuffer()  { bufattr.clear(); }
     virtual void clearBufferOverrun() { numOfBufferOverrun = 0; numOfProcessedBytes = 0; }
-protected: 
+protected:
     Owned<IFileIOStream>   inStream;
     MemoryAttr             bufattr;
     size32_t               headerSize;
@@ -252,7 +264,7 @@ protected:
 
 private:
     void storeFieldName(const char * start, unsigned len);
-    
+
 protected:
     enum { NONE=0, SEPARATOR=1, TERMINATOR=2, WHITESPACE=3, QUOTE=4, ESCAPE=5 };
     unsigned        maxElementLength;
@@ -541,24 +553,56 @@ public:
 protected:
     virtual void findSplitPoint(offset_t splitOffset, PartitionCursor & cursor)
     {
+        JSON_DBGLOG("CJsonInputPartitioner::findSplitPoint: splitOffset %lld", splitOffset);
+        JSON_DBGLOG("CJsonInputPartitioner::findSplitPoint: cursor(inputOffset: %lld, nextInputOffset: %lld, outputOffset: %lld, trimLength: %lld",
+                        cursor.inputOffset, cursor.nextInputOffset, cursor.outputOffset, cursor.trimLength);
+        cursor.inputOffset = 0;
         if (!splitOffset) //header + 0 is first offset
             return;
 
+        if (eof)
+            return;
+
+        // Prevent splitOffset from pointing into an already processed file area
+        if (splitOffset < cursor.nextInputOffset)
+            splitOffset = cursor.nextInputOffset;
+
         offset_t prevRowEnd = 0;
-        json->findRowEnd(splitOffset-thisOffset + thisHeaderSize, prevRowEnd); //false return just means we're processing the end
-        if (!json->checkFoundRowStart())
+        JSON_DBGLOG("CJsonInputPartitioner::findSplitPoint: thisOffset: %lld, thisHeaderSize: %u, prevRowEnd: %lld", thisOffset, thisHeaderSize, prevRowEnd);
+        bool foundRowEnd = json->findRowEnd(splitOffset-thisOffset + thisHeaderSize, prevRowEnd);
+        JSON_DBGLOG("CJsonInputPartitioner::findSplitPoint: thisOffset: %lld, thisHeaderSize: %u, prevRowEnd: %lld, foundRowEnd:%s ", thisOffset, thisHeaderSize, prevRowEnd, (foundRowEnd ? "True" : "False"));
+        if (! foundRowEnd) //false return just means we're processing the end
+        {
+            //cursor.inputOffset = prevRowEnd;
             return;
-        if (!json->newRowSet) //get rid of extra delimiter if we haven't closed and reopened in the meantime
+        }
+
+        bool checkFoundRowStartRes = json->checkFoundRowStart();
+        JSON_DBGLOG("CJsonInputPartitioner::findSplitPoint: checkFoundRowStartRes:%s)", (checkFoundRowStartRes ? "True" : "False"));
+        if (!checkFoundRowStartRes)
+            return;
+
+        bool isNewRowSet = json->newRowSet;
+        JSON_DBGLOG("CJsonInputPartitioner::findSplitPoint: isNewRowSet:%s", (isNewRowSet ? "True" : "False"));
+        if (!isNewRowSet) //get rid of extra delimiter if we haven't closed and reopened in the meantime
         {
             cursor.trimLength = json->rowStart - prevRowEnd;
             if (cursor.trimLength && json->isRootless()) //compensate for difference in rootless offset
                 cursor.trimLength--;
         }
         cursor.inputOffset = json->getRowOffset() + thisOffset;
+        JSON_DBGLOG("CJsonInputPartitioner::findSplitPoint: eof:%s, cursor.inputOffset:%lld", (eof ? "True" : "False"), cursor.inputOffset);
         if (json->findNextRow())
+        {
+            cursor.inputOffset = json->getRowOffset() + thisOffset;
             cursor.nextInputOffset = json->getRowOffset() + thisOffset;
+        }
         else
+        {
+            cursor.inputOffset = prevRowEnd;
             cursor.nextInputOffset = cursor.inputOffset;  //eof
+            eof = true;
+            cursor.trimLength = 0;
+        }
     }
 
 protected:
@@ -567,6 +612,7 @@ protected:
     Owned<JsonSplitter> json;
     static IFileIOCache    *openfilecache;
     static CriticalSection openfilecachesect;
+    bool eof = false;
 };
 
 
@@ -605,7 +651,7 @@ public:
     size32_t getEndOfRecord(const byte * record, unsigned maxToRead);
     offset_t getHeaderLength(BufferedDirectReader & reader);
     offset_t getFooterLength(BufferedDirectReader & reader, offset_t size);
-    
+
     unsigned getMaxElementLength() { return maxElementLength; }
 
 protected:
@@ -710,7 +756,7 @@ public:
 
 protected:
     offset_t                outputOffset;
-    OwnedIFileIOStream      out; 
+    OwnedIFileIOStream      out;
 };
 
 class DALIFT_API CFixedOutputProcessor : public COutputProcessor
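
As the macro block at the top of daftformat.ipp is written, the new tracing is
emitted only when JSON_DEBUG is uncommented and the build also defines DEBUG;
otherwise JSON_DBGLOG expands to nothing and costs nothing at runtime. A
standalone sketch of the same pattern (the two nested #ifdefs collapsed into
one test, and printf standing in for DBGLOG):

    #include <cstdio>

    #define JSON_DEBUG 1 // normally commented out in daftformat.ipp
    #define DEBUG 1      // assumption: supplied by a debug build

    #if defined(JSON_DEBUG) && defined(DEBUG)
        #define JSON_DBGLOG(...) printf(__VA_ARGS__)
    #else
        #define JSON_DBGLOG(...)
    #endif

    int main()
    {
        JSON_DBGLOG("findSplitPoint: splitOffset %lld\n", 42LL);
        return 0;
    }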

+ 13 - 10
dali/ft/ftbase.cpp

@@ -87,6 +87,9 @@ PartitionPoint::PartitionPoint(unsigned _whichInput, unsigned _whichOutput, offs
     inputOffset = _inputOffset;
     inputLength = _inputLength;
     outputLength = _outputLength;
+#ifdef DEBUG
+    display();
+#endif
 }
 
 
@@ -121,9 +124,9 @@ void PartitionPoint::deserialize(MemoryBuffer & in)
 void PartitionPoint::display()
 {
     StringBuffer fulli, fullo;
-    LOG(MCdebugInfoDetail, unknownJob, 
+    LOG(MCdebugInfoDetail, unknownJob,
              "Partition %s{%d}[%" I64F "d size %" I64F "d]->%s{%d}[%" I64F "d size %" I64F "d]",
-             inputName.getPath(fulli).str(), whichInput, inputOffset, inputLength, 
+             inputName.getPath(fulli).str(), whichInput, inputOffset, inputLength,
              outputName.getPath(fullo).str(), whichOutput, outputOffset, outputLength);
 }
 
@@ -443,10 +446,10 @@ void FileFormat::serializeExtra(MemoryBuffer & out, unsigned version) const
     }
 }
 
-void FileFormat::set(const FileFormat & src) 
-{ 
-    type = src.type; 
-    recordSize = src.recordSize; 
+void FileFormat::set(const FileFormat & src)
+{
+    type = src.type;
+    recordSize = src.recordSize;
     maxRecordSize = src.maxRecordSize;
     separate.set(src.separate);
     quote.set(src.quote);
@@ -505,7 +508,7 @@ const char * getHeaderText(FileFormatType type)
     case FFTutf:
     case FFTutf8:
         return "\xEF\xBB\xBF";
-    case FFTutf16: 
+    case FFTutf16:
         return "\xFE\xFF";
     case FFTutf32:
         return "\x00\x00\xFE\xFF";
@@ -540,7 +543,7 @@ void OutputProgress::reset()
 }
 
 MemoryBuffer & OutputProgress::deserializeCore(MemoryBuffer & in)
-{ 
+{
     unsigned _inputCRC, _outputCRC;
     bool hasTime;
     in.read(status).read(whichPartition).read(hasInputCRC).read(_inputCRC).read(inputLength).read(_outputCRC).read(outputLength).read(hasTime);
@@ -576,8 +579,8 @@ void OutputProgress::trace()
     LOG(MCdebugInfoDetail, unknownJob, "Chunk %d status: %s  input length: %" I64F "d[CRC:%x] -> output length:%" I64F "d[CRC:%x]", whichPartition, statusText[status], inputLength, inputCRC, outputLength, outputCRC);
 }
 
-MemoryBuffer & OutputProgress::serializeCore(MemoryBuffer & out)        
-{ 
+MemoryBuffer & OutputProgress::serializeCore(MemoryBuffer & out)
+{
     bool hasTime = !resultTime.isNull();
     unsigned _inputCRC = inputCRC;
     unsigned _outputCRC = outputCRC;
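
Beyond the whitespace cleanup, the one behavioural change in this file is the
#ifdef DEBUG call to display() in the PartitionPoint constructor, so debug
builds trace every partition point as soon as it is created. A minimal sketch
of that pattern (simplified, hypothetical types):

    #include <cstdio>

    struct Point
    {
        Point(long long in, long long out) : inputOffset(in), outputOffset(out)
        {
    #ifdef DEBUG
            display(); // debug builds log every point at construction
    #endif
        }
        void display() const
        { printf("Partition [%lld -> %lld]\n", inputOffset, outputOffset); }
        long long inputOffset, outputOffset;
    };

    int main() { Point p(0, 128); return 0; }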

+ 11 - 0
testing/regress/ecl/key/spray_test_json.xml

@@ -0,0 +1,11 @@
+<Dataset name='Result 1'>
+</Dataset>
+<Dataset name='Result 2'>
+ <Row><result>Despray Pass</result></Row>
+</Dataset>
+<Dataset name='Result 3'>
+ <Row><result>Spray Pass</result></Row>
+</Dataset>
+<Dataset name='Result 4'>
+ <Row><Result_4>Compare Pass</Result_4></Row>
+</Dataset>

+ 204 - 0
testing/regress/ecl/spray_test_json.ecl

@@ -0,0 +1,204 @@
+/*##############################################################################
+
+    Copyright (C) 2018 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+
+//nohthor
+//class=spray
+
+//version isSmallFile=true,isUnBallanced=false
+//version isSmallFile=true,isUnBallanced=true
+//version isSmallFile=false
+
+import std.system.thorlib;
+import Std.File AS FileServices;
+import ^ as root;
+
+jlib:= SERVICE
+    unsigned8 rtlTick() : library='jlib',eclrtl,entrypoint='rtlNano';
+END;
+
+isSmallFile := #IFDEFINED(root.isSmallFile, true);
+
+isUnBallanced := #IFDEFINED(root.isUnBallanced, false);
+
+dropzonePath := '/var/lib/HPCCSystems/mydropzone/' : STORED('dropzonePath');
+engine := thorlib.platform() : stored('thor');
+prefix := engine + '-';
+suffix := '-' + jlib.rtlTick() : stored('startTime');
+nodes := thorlib.nodes();
+
+unsigned VERBOSE := 0;
+
+Layout_Person := RECORD
+  UNSIGNED4 PersonID;
+  STRING    FirstName;
+  STRING25  LastName;
+END;
+
+allPeople := DATASET([ {1,'Fred','Smith'},
+                       {2,'Joe','Blow'},
+                       {3,'Jane','Smith'}],Layout_Person);
+
+// One record/object is much larger than the others. This tests the
+// split point calculation in a multi-target environment.
+unBallanced := DATASET([ {1,'Fred','Smith'},
+                       {2,'Joe_012345678901234567890912345678901234567890123456789001234567890123456789','Blow'},
+                       {3,'Jane','Smith'}],Layout_Person);
+
+manyPeople := DATASET(nodes * 100,
+              TRANSFORM({Layout_Person},
+                         SELF.PersonID := COUNTER;
+                         SELF.FirstName := allPeople[(COUNTER-1) % 3 + 1].FirstName;
+                         SELF.LastName := allPeople[(COUNTER-1) % 3 + 1].LastName;
+                        )
+              ,DISTRIBUTED
+              );
+
+#if (isSmallFile)
+    somePeople := if (nodes = 1,
+                        allPeople(LastName = 'Smith'),
+                    #if (isUnBallanced)
+                         unBallanced
+                    #else
+                        allPeople(LastName = 'Blow')
+                    #end
+                    );
+#else
+    somePeople := manyPeople;
+#end
+
+SrcAddrIp := '.';
+File := 'persons';
+OriginalDataFile := prefix + File + suffix;
+
+//  Outputs  ---
+setupPeople := OUTPUT(somePeople,,OriginalDataFile, JSON, OVERWRITE);
+
+
+ClusterName := 'mythor';
+
+desprayRec := RECORD
+   string sourceFile;
+   string destFile;
+   string ip;
+   boolean allowOverwrite;
+   string result;
+   string msg;
+ end;
+
+desprayRec doDespray(desprayRec l) := TRANSFORM
+   SELF.sourceFile := l.sourceFile;
+   SELF.msg := FileServices.fDespray(l.sourceFile
+                                          ,l.ip
+                                          ,destinationPath := l.destFile
+                                          ,ALLOWOVERWRITE := l.allowOverwrite
+                                          );
+   SELF.result := 'Despray Pass';
+   SELF.ip := l.ip;
+   SELF.allowOverwrite := l.allowOverwrite;
+   SELF.destFile := l.destFile;
+ end;
+
+// This should be fine based on valid target file path and SrcAddIp
+DesprayTargetFile1 := dropzonePath + File + suffix;
+dst2 := NOFOLD(DATASET([{OriginalDataFile, DesprayTargetFile1, SrcAddrIp, True, '', ''}], desprayRec));
+
+p2 := NOTHOR(PROJECT(NOFOLD(dst2), doDespray(LEFT)));
+
+c2 := CATCH(NOFOLD(p2), ONFAIL(TRANSFORM(desprayRec,
+                                  SELF.sourceFile := OriginalDataFile,
+                                  SELF.destFile := DesprayTargetFile1,
+                                  SELF.ip := SrcAddrIp,
+                                  SELF.allowOverwrite := True,
+                                  SELF.result := 'Fail',
+                                  SELF.msg := FAILMESSAGE
+                                 )));
+#if (VERBOSE = 1)
+     despray := output(c2);
+#else
+     despray := output(c2, {result});
+#end
+
+
+sprayRec := RECORD
+  string result;
+  string msg;
+end;
+
+SprayTargetFileName := prefix + 'spray_test-' + suffix;
+
+//To spray a JSON file we use XML Spray
+sprayRec doSpray(sprayRec l) := TRANSFORM
+    SELF.msg := FileServices.fSprayXml(
+                                SOURCEIP := '.',
+                                SOURCEPATH := DesprayTargetFile1,
+                                SOURCEROWTAG := 'Row',
+                                DESTINATIONGROUP := 'my'+engine,
+                                DESTINATIONLOGICALNAME := SprayTargetFileName,
+                                TIMEOUT := -1,
+                                ESPSERVERIPPORT := 'http://127.0.0.1:8010/FileSpray',
+                                ALLOWOVERWRITE := true
+                                );
+    self.result := 'Spray Pass';
+end;
+
+
+dst3 := NOFOLD(DATASET([{'', ''}], sprayRec));
+
+p3 := NOTHOR(PROJECT(NOFOLD(dst3), doSpray(LEFT)));
+c3 := CATCH(NOFOLD(p3), ONFAIL(TRANSFORM(sprayRec,
+                                  SELF.result := 'Spray Fail',
+                                  SELF.msg := FAILMESSAGE
+                                 )));
+#if (VERBOSE = 1)
+    spray := output(c3);
+#else
+    spray := output(c3, {result});
+#end
+
+
+ds := DATASET(SprayTargetFileName, Layout_Person, JSON('Row'));
+
+string compareDatasets(dataset(Layout_Person) ds1, dataset(Layout_Person) ds2) := FUNCTION
+   boolean result := (0 = COUNT(JOIN(ds1, ds2, left.PersonID=right.PersonID, FULL ONLY)));
+   RETURN if(result, 'Compare Pass', 'Fail');
+END;
+
+SEQUENTIAL(
+
+#if (VERBOSE = 1)
+    output(isSmallFile, NAMED('isSmallFile'));
+    output(isUnBallanced, NAMED('isUnBallanced'));
+    output(somePeople, NAMED('somePeople')),
+#end
+
+    setupPeople,
+    despray,
+    spray,
+
+#if (VERBOSE = 1)
+    output(ds, NAMED('ds')),
+#end
+
+    output(compareDatasets(somePeople,ds)),
+
+    // Clean-up
+    FileServices.DeleteLogicalFile(OriginalDataFile),
+    FileServices.DeleteLogicalFile(SprayTargetFileName),
+    FileServices.DeleteExternalFile('.', DesprayTargetFile1),
+
+);