Browse Source

Merge pull request #7602 from afishbeck/jsonMultiSpray

HPCC-13975 Support spraying multiple small JSON files to one logical file

Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman cách đây 10 năm
mục cha
commit
39b62f5826
2 tập tin đã thay đổi với 21 bổ sung và 3 xóa
  1. 17 0
      dali/ft/daftformat.cpp
  2. 4 3
      dali/ft/daftformat.ipp

+ 17 - 0
dali/ft/daftformat.cpp

@@ -78,18 +78,34 @@ void CPartitioner::commonCalcPartitions()
     const offset_t endOffset = thisOffset + thisSize;
     unsigned firstSplit;
     unsigned lastSplit;
+    bool appendingContent=false;
     if (partSize)
     {
         firstSplit = (unsigned)((thisOffset + partSize-1)/partSize);
         lastSplit = (unsigned)((endOffset-1)/partSize);
+        appendingContent=(thisOffset % partSize)!=0;
     }
     else
     {
         firstSplit = (unsigned)((thisOffset*numParts)/totalSize);
         lastSplit = (unsigned)(((endOffset-1)*numParts)/totalSize);
+        appendingContent=((thisOffset*numParts) % totalSize)!=0;
     }
     if (endOffset == totalSize) lastSplit = numParts-1;
     if (lastSplit >= numParts) lastSplit = numParts-1;                                      // very rare with variable length records, last file is very small or copying a couple of records 50 ways.
+
+    if (!partSeparator.isEmpty() && appendingContent) //appending to existing content, add a separator if necessary
+    {
+        Owned<PartitionPoint> separator = new PartitionPoint;
+        separator->inputOffset = 0;
+        separator->inputLength = partSeparator.length();
+        separator->outputLength = partSeparator.length();
+        separator->fixedText.set(partSeparator.length(), partSeparator.get());
+        separator->whichInput = whichInput;
+        separator->whichOutput = firstSplit-1;
+        results.append(*separator.getClear());
+    }
+
     offset_t startInputOffset = thisOffset;
     offset_t startOutputOffset = 0;
 
@@ -1586,6 +1602,7 @@ CJsonInputPartitioner::CJsonInputPartitioner(const FileFormat & _format)
         openfilecache = createFileIOCache(16);
     else
         openfilecache->Link();
+    partSeparator.set(",\n");
 }
 
 IFileIOCache *CJsonInputPartitioner::openfilecache = NULL;

+ 4 - 3
dali/ft/daftformat.ipp

@@ -55,6 +55,7 @@ protected:
     unsigned                    whichInput;
     RemoteFilename              inputName;
     StringAttr                  fullPath;
+    StringAttr                  partSeparator;
     Linked<IOutputProcessor>    target;
 
     offset_t                    totalSize;
@@ -538,7 +539,7 @@ protected:
             return;
 
         offset_t prevRowEnd;
-        json->findRowEnd(splitOffset, prevRowEnd);
+        json->findRowEnd(splitOffset-thisOffset + thisHeaderSize, prevRowEnd);
         if (!json->rowStart)
             return;
         if (!json->newRowSet) //get rid of extra delimiter if we haven't closed and reopened in the meantime
@@ -547,9 +548,9 @@ protected:
             if (cursor.trimLength && json->isRootless()) //compensate for difference in rootless offset
                 cursor.trimLength--;
         }
-        cursor.inputOffset = json->getRowOffset();
+        cursor.inputOffset = json->getRowOffset() + thisOffset;
         if (json->findNextRow())
-            cursor.nextInputOffset = json->getRowOffset();
+            cursor.nextInputOffset = json->getRowOffset() + thisOffset;
         else
             cursor.nextInputOffset = cursor.inputOffset;  //eof
     }