|
@@ -113,7 +113,7 @@ void CPartitioner::commonCalcPartitions()
|
|
|
//Don't add an empty block on the start of the this chunk to transfer.
|
|
|
if ((split != firstSplit) || (inputOffset != startInputOffset))
|
|
|
{
|
|
|
- results.append(*new PartitionPoint(whichInput, split-1, startInputOffset-thisOffset+thisHeaderSize, inputOffset - startInputOffset, cursor.outputOffset-startOutputOffset));
|
|
|
+ results.append(*new PartitionPoint(whichInput, split-1, startInputOffset-thisOffset+thisHeaderSize, inputOffset - startInputOffset - cursor.trimLength, cursor.outputOffset-startOutputOffset));
|
|
|
startInputOffset = inputOffset;
|
|
|
startOutputOffset = cursor.outputOffset;
|
|
|
}
|
|
@@ -1577,6 +1577,64 @@ offset_t XmlSplitter::getFooterLength(BufferedDirectReader & reader, offset_t si
|
|
|
}
|
|
|
|
|
|
|
|
|
+CJsonInputPartitioner::CJsonInputPartitioner(const FileFormat & _format)
|
|
|
+{
|
|
|
+ format.set(_format);
|
|
|
+ CriticalBlock block(openfilecachesect);
|
|
|
+ if (!openfilecache)
|
|
|
+ openfilecache = createFileIOCache(16);
|
|
|
+ else
|
|
|
+ openfilecache->Link();
|
|
|
+}
|
|
|
+
|
|
|
+IFileIOCache *CJsonInputPartitioner::openfilecache = NULL;
|
|
|
+CriticalSection CJsonInputPartitioner::openfilecachesect;
|
|
|
+
|
|
|
+CJsonInputPartitioner::~CJsonInputPartitioner()
|
|
|
+{
|
|
|
+ json.clear();
|
|
|
+ inStream.clear();
|
|
|
+ if (openfilecache) {
|
|
|
+ CriticalBlock block(openfilecachesect);
|
|
|
+ if (openfilecache->Release())
|
|
|
+ openfilecache = NULL;
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+void CJsonInputPartitioner::setSource(unsigned _whichInput, const RemoteFilename & _fullPath, bool _compressedInput, const char *_decryptKey)
|
|
|
+{
|
|
|
+ CPartitioner::setSource(_whichInput, _fullPath, _compressedInput,_decryptKey);
|
|
|
+ Owned<IFileIO> inIO;
|
|
|
+ Owned<IFile> inFile = createIFile(inputName);
|
|
|
+ if (!inFile->exists()) {
|
|
|
+ StringBuffer tmp;
|
|
|
+ inputName.getRemotePath(tmp);
|
|
|
+ throwError1(DFTERR_CouldNotOpenFilePart, tmp.str());
|
|
|
+ }
|
|
|
+ inIO.setown(openfilecache->addFile(inputName,IFOread));
|
|
|
+
|
|
|
+ if (_compressedInput) {
|
|
|
+ Owned<IExpander> expander;
|
|
|
+ if (_decryptKey&&*_decryptKey) {
|
|
|
+ StringBuffer key;
|
|
|
+ decrypt(key,_decryptKey);
|
|
|
+ expander.setown(createAESExpander256(key.length(),key.str()));
|
|
|
+ }
|
|
|
+ inIO.setown(createCompressedFileReader(inIO,expander));
|
|
|
+ }
|
|
|
+
|
|
|
+ inStream.setown(createIOStream(inIO));
|
|
|
+ json.setown(new JsonSplitter(format, *inStream));
|
|
|
+ json->getHeaderLength();
|
|
|
+}
|
|
|
+
|
|
|
+CJsonPartitioner::CJsonPartitioner(const FileFormat & _format) : CJsonInputPartitioner(_format)
|
|
|
+{
|
|
|
+ unitSize = format.getUnitSize();
|
|
|
+ utfFormat = getUtfFormatType(format.type);
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
CXmlPartitioner::CXmlPartitioner(const FileFormat & _format) : CInputBasePartitioner(_format.maxRecordSize, _format.maxRecordSize), splitter(_format)
|
|
|
{
|
|
|
LOG(MCdebugProgressDetail, unknownJob, "CXmlPartitioner::CXmlPartitioner(_format.type :'%s', unitSize:%d)", _format.getFileFormatTypeString(), format.getUnitSize());
|
|
@@ -2128,13 +2186,15 @@ IFormatProcessor * createFormatProcessor(const FileFormat & srcFormat, const Fil
|
|
|
partitioner = new CCsvQuickPartitioner(srcFormat, sameFormats);
|
|
|
break;
|
|
|
case FFTutf8: case FFTutf8n: case FFTutf16: case FFTutf16be: case FFTutf16le: case FFTutf32: case FFTutf32be: case FFTutf32le:
|
|
|
- if (srcFormat.rowTag)
|
|
|
+ if (srcFormat.hasXmlMarkup())
|
|
|
{
|
|
|
if (calcOutput && !sameFormats)
|
|
|
partitioner = new CXmlPartitioner(srcFormat);
|
|
|
else
|
|
|
partitioner = new CXmlQuickPartitioner(srcFormat, sameFormats);
|
|
|
}
|
|
|
+ else if (srcFormat.hasJsonMarkup())
|
|
|
+ partitioner = new CJsonPartitioner(srcFormat);
|
|
|
else
|
|
|
{
|
|
|
if (calcOutput && !sameFormats)
|
|
@@ -2190,15 +2250,13 @@ IFormatPartitioner * createFormatPartitioner(const SocketEndpoint & ep, const Fi
|
|
|
return new CCsvQuickPartitioner(srcFormat, sameFormats);
|
|
|
break;
|
|
|
case FFTutf: case FFTutf8: case FFTutf8n: case FFTutf16: case FFTutf16be: case FFTutf16le: case FFTutf32: case FFTutf32be: case FFTutf32le:
|
|
|
- if (srcFormat.rowTag)
|
|
|
+ if (srcFormat.hasXmlMarkup())
|
|
|
return new CXmlQuickPartitioner(srcFormat, sameFormats);
|
|
|
- else
|
|
|
- {
|
|
|
- if (srcFormat.hasQuote() && srcFormat.hasQuotedTerminator())
|
|
|
- return new CUtfPartitioner(srcFormat);
|
|
|
- else
|
|
|
- return new CUtfQuickPartitioner(srcFormat, sameFormats);
|
|
|
- }
|
|
|
+ if (srcFormat.hasJsonMarkup())
|
|
|
+ return new CJsonPartitioner(srcFormat);
|
|
|
+ if (srcFormat.hasQuote() && srcFormat.hasQuotedTerminator())
|
|
|
+ return new CUtfPartitioner(srcFormat);
|
|
|
+ return new CUtfQuickPartitioner(srcFormat, sameFormats);
|
|
|
}
|
|
|
}
|
|
|
if (!calcOutput)
|
|
@@ -2220,10 +2278,11 @@ IFormatPartitioner * createFormatPartitioner(const SocketEndpoint & ep, const Fi
|
|
|
case FFTcsv:
|
|
|
return new CCsvQuickPartitioner(srcFormat, sameFormats);
|
|
|
case FFTutf: case FFTutf8: case FFTutf8n: case FFTutf16: case FFTutf16be: case FFTutf16le: case FFTutf32: case FFTutf32be: case FFTutf32le:
|
|
|
- if (srcFormat.rowTag)
|
|
|
+ if (srcFormat.hasXmlMarkup())
|
|
|
return new CXmlQuickPartitioner(srcFormat, sameFormats);
|
|
|
- else
|
|
|
- return new CUtfQuickPartitioner(srcFormat, sameFormats);
|
|
|
+ if (srcFormat.hasJsonMarkup())
|
|
|
+ return new CJsonPartitioner(srcFormat);
|
|
|
+ return new CUtfQuickPartitioner(srcFormat, sameFormats);
|
|
|
default:
|
|
|
throwError(DFTERR_UnknownFileFormatType);
|
|
|
break;
|