|
@@ -41,6 +41,7 @@
|
|
|
#include "dasds.hpp"
|
|
|
#include "jlog.hpp"
|
|
|
#include "dalienv.hpp"
|
|
|
+#include "ftbase.ipp"
|
|
|
|
|
|
#define DEFAULT_MAX_CONNECTIONS 25
|
|
|
#define PARTITION_RECOVERY_LIMIT 1000
|
|
@@ -1054,7 +1055,7 @@ void FileSprayer::calculateMany2OnePartition()
|
|
|
setCanAccessDirectly(curFilename);
|
|
|
if (partSeparator)
|
|
|
{
|
|
|
- offset_t contentLength = cur.size - cur.xmlHeaderLength - cur.xmlFooterLength;
|
|
|
+ offset_t contentLength = (cur.size > cur.xmlHeaderLength + cur.xmlFooterLength ? cur.size - cur.xmlHeaderLength - cur.xmlFooterLength : 0);
|
|
|
if (contentLength)
|
|
|
{
|
|
|
if (lastContentLength)
|
|
@@ -1449,7 +1450,7 @@ void FileSprayer::analyseFileHeaders(bool setcurheadersize)
|
|
|
}
|
|
|
io.setown(createCompressedFileReader(io,expander));
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
if (defaultFormat != FFTunknown)
|
|
|
{
|
|
|
FileFormatType thisType;
|
|
@@ -1501,15 +1502,43 @@ void FileSprayer::analyseFileHeaders(bool setcurheadersize)
|
|
|
{
|
|
|
try
|
|
|
{
|
|
|
- if (srcFormat.headerLength == (unsigned)-1 || srcFormat.footerLength == (unsigned)-1)
|
|
|
- locateContentHeader(io, cur.headerSize, cur.xmlHeaderLength, cur.xmlFooterLength);
|
|
|
+ if (distributedSource)
|
|
|
+ {
|
|
|
+ // Despray from distributed file
|
|
|
+
|
|
|
+ // Check XMLheader/footer in file level
|
|
|
+ DistributedFilePropertyLock lock(distributedSource);
|
|
|
+ IPropertyTree &curProps = lock.queryAttributes();
|
|
|
+ if (curProps.hasProp(FPheaderLength) && curProps.hasProp(FPfooterLength))
|
|
|
+ {
|
|
|
+ cur.xmlHeaderLength = curProps.getPropInt(FPheaderLength, 0);
|
|
|
+ cur.xmlFooterLength = curProps.getPropInt(FPfooterLength, 0);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ // Try it in file part level
|
|
|
+ Owned<IDistributedFilePart> curPart = distributedSource->getPart(idx);
|
|
|
+ IPropertyTree& curPartProps = curPart->queryAttributes();
|
|
|
+ cur.xmlHeaderLength = curPartProps.getPropInt(FPheaderLength, 0);
|
|
|
+ cur.xmlFooterLength = curPartProps.getPropInt(FPfooterLength, 0);
|
|
|
+ }
|
|
|
+ }
|
|
|
else
|
|
|
{
|
|
|
- cur.xmlHeaderLength = srcFormat.headerLength;
|
|
|
- cur.xmlFooterLength = srcFormat.footerLength;
|
|
|
+ // Spray from file
|
|
|
+ if (srcFormat.headerLength == (unsigned)-1 || srcFormat.footerLength == (unsigned)-1)
|
|
|
+ locateContentHeader(io, cur.headerSize, cur.xmlHeaderLength, cur.xmlFooterLength);
|
|
|
+ else
|
|
|
+ {
|
|
|
+ cur.xmlHeaderLength = srcFormat.headerLength;
|
|
|
+ cur.xmlFooterLength = srcFormat.footerLength;
|
|
|
+ }
|
|
|
}
|
|
|
cur.headerSize += (unsigned)cur.xmlHeaderLength;
|
|
|
- cur.size -= (cur.xmlHeaderLength + cur.xmlFooterLength);
|
|
|
+ if (cur.size >= cur.xmlHeaderLength + cur.xmlFooterLength)
|
|
|
+ cur.size -= (cur.xmlHeaderLength + cur.xmlFooterLength);
|
|
|
+ else
|
|
|
+ throwError3(DFTERR_InvalidXmlPartSize, cur.size, cur.xmlHeaderLength, cur.xmlFooterLength);
|
|
|
}
|
|
|
catch (IException * e)
|
|
|
{
|
|
@@ -1557,12 +1586,38 @@ void FileSprayer::locateXmlHeader(IFileIO * io, unsigned headerSize, offset_t &
|
|
|
|
|
|
reader.set(in);
|
|
|
reader.seek(headerSize);
|
|
|
- xmlHeaderLength = splitter.getHeaderLength(reader);
|
|
|
+ if (xmlHeaderLength == 0)
|
|
|
+ {
|
|
|
+ try
|
|
|
+ {
|
|
|
+ xmlHeaderLength = splitter.getHeaderLength(reader);
|
|
|
+ }
|
|
|
+ catch (IException * e)
|
|
|
+ {
|
|
|
+ if (e->errorCode() != DFTERR_CannotFindFirstXmlRecord)
|
|
|
+ throw;
|
|
|
+ e->Release();
|
|
|
+ xmlHeaderLength = 0;
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
offset_t size = io->size();
|
|
|
offset_t endOffset = (size > srcFormat.maxRecordSize*2 + headerSize) ? size - srcFormat.maxRecordSize*2 : headerSize;
|
|
|
reader.seek(endOffset);
|
|
|
- xmlFooterLength = splitter.getFooterLength(reader, size);
|
|
|
+ if (xmlFooterLength == 0)
|
|
|
+ {
|
|
|
+ try
|
|
|
+ {
|
|
|
+ xmlFooterLength = splitter.getFooterLength(reader, size);
|
|
|
+ }
|
|
|
+ catch (IException * e)
|
|
|
+ {
|
|
|
+ if (e->errorCode() != DFTERR_CannotFindLastXmlRecord)
|
|
|
+ throw;
|
|
|
+ e->Release();
|
|
|
+ xmlFooterLength= 0;
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
void FileSprayer::locateJsonHeader(IFileIO * io, unsigned headerSize, offset_t & headerLength, offset_t & footerLength)
|
|
@@ -1943,8 +1998,10 @@ void FileSprayer::cloneHeaderFooter(unsigned idx, bool isHeader)
|
|
|
PartitionPoint & next = * new PartitionPoint;
|
|
|
//NB: headerSize include the size of the xmlHeader; size includes neither header or footers.
|
|
|
if (isHeader)
|
|
|
+ // Set offset to the XML header
|
|
|
next.inputOffset = curSrc.headerSize - curSrc.xmlHeaderLength;
|
|
|
else
|
|
|
+ //Set offset to the XML footer
|
|
|
next.inputOffset = curSrc.headerSize + curSrc.size;
|
|
|
next.inputLength = isHeader ? curSrc.xmlHeaderLength : curSrc.xmlFooterLength;
|
|
|
next.outputLength = needToCalcOutput() ? next.inputLength : 0;
|
|
@@ -2837,6 +2894,46 @@ void FileSprayer::spray()
|
|
|
cleanupRecovery();
|
|
|
}
|
|
|
|
|
|
+bool FileSprayer::isSameSizeHeaderFooter()
|
|
|
+{
|
|
|
+ bool retVal = true;
|
|
|
+ unsigned whichHeaderInput = 0;
|
|
|
+ bool isEmpty = true;
|
|
|
+ headerSize = 0;
|
|
|
+ footerSize = 0;
|
|
|
+
|
|
|
+ ForEachItemIn(idx, partition)
|
|
|
+ {
|
|
|
+ PartitionPoint & cur = partition.item(idx);
|
|
|
+ if (idx+1 == partition.ordinality() || partition.item(idx+1).whichOutput != cur.whichOutput)
|
|
|
+ {
|
|
|
+ if (isEmpty)
|
|
|
+ {
|
|
|
+ headerSize = sources.item(whichHeaderInput).xmlHeaderLength;
|
|
|
+ footerSize = sources.item(cur.whichInput).xmlFooterLength;
|
|
|
+ isEmpty = false;
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (headerSize != sources.item(whichHeaderInput).xmlHeaderLength)
|
|
|
+ {
|
|
|
+ retVal = false;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (footerSize != sources.item(cur.whichInput).xmlFooterLength)
|
|
|
+ {
|
|
|
+ retVal = false;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ if ( idx+1 != partition.ordinality() )
|
|
|
+ whichHeaderInput = partition.item(idx+1).whichInput;
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ return retVal;
|
|
|
+}
|
|
|
|
|
|
void FileSprayer::updateTargetProperties()
|
|
|
{
|
|
@@ -2849,6 +2946,8 @@ void FileSprayer::updateTargetProperties()
|
|
|
CRC32Merger totalCRC;
|
|
|
offset_t totalLength = 0;
|
|
|
offset_t totalCompressedSize = 0;
|
|
|
+ unsigned whichHeaderInput = 0;
|
|
|
+ bool sameSizeHeaderFooter = isSameSizeHeaderFooter();
|
|
|
ForEachItemIn(idx, partition)
|
|
|
{
|
|
|
PartitionPoint & cur = partition.item(idx);
|
|
@@ -2873,6 +2972,20 @@ void FileSprayer::updateTargetProperties()
|
|
|
// TODO: Create DistributedFilePropertyLock for parts
|
|
|
curPart->lockProperties();
|
|
|
IPropertyTree& curProps = curPart->queryAttributes();
|
|
|
+
|
|
|
+ if (!sameSizeHeaderFooter)
|
|
|
+ {
|
|
|
+ FilePartInfo & curHeaderSource = sources.item(whichHeaderInput);
|
|
|
+ curProps.setPropInt(FPheaderLength, curHeaderSource.xmlHeaderLength);
|
|
|
+
|
|
|
+ FilePartInfo & curFooterSource = sources.item(cur.whichInput);
|
|
|
+ curProps.setPropInt(FPfooterLength, curFooterSource.xmlFooterLength);
|
|
|
+
|
|
|
+ if ( idx+1 != partition.ordinality() )
|
|
|
+ whichHeaderInput = partition.item(idx+1).whichInput;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
if (calcCRC())
|
|
|
{
|
|
|
curProps.setPropInt(FAcrc, partCRC.get());
|
|
@@ -2967,6 +3080,13 @@ void FileSprayer::updateTargetProperties()
|
|
|
curProps.setPropInt64(FArecordCount,totalLength/(offset_t)rs);
|
|
|
gotrc = true;
|
|
|
}
|
|
|
+
|
|
|
+ if (sameSizeHeaderFooter)
|
|
|
+ {
|
|
|
+ curProps.setPropInt(FPheaderLength, headerSize);
|
|
|
+ curProps.setPropInt(FPfooterLength, footerSize);
|
|
|
+ }
|
|
|
+
|
|
|
if (srcAttr.get() && !mirroring) {
|
|
|
StringBuffer s;
|
|
|
// copy some attributes (do as iterator in case we want to change to *exclude* some
|