|
@@ -1022,7 +1022,7 @@ class CRoxieCsvReadActivity;
|
|
|
class CRoxieXmlReadActivity;
|
|
|
IInMemoryFileProcessor *createKeyedRecordProcessor(IInMemoryIndexCursor *cursor, CRoxieDiskReadActivity &owner, bool resent);
|
|
|
IInMemoryFileProcessor *createUnkeyedRecordProcessor(IInMemoryIndexCursor *cursor, CRoxieDiskReadActivity &owner, bool variableDisk, IDirectReader *reader);
|
|
|
-IInMemoryFileProcessor *createCsvRecordProcessor(CRoxieCsvReadActivity &owner, IDirectReader *reader, bool _skipHeader, const IResolvedFile *datafile);
|
|
|
+IInMemoryFileProcessor *createCsvRecordProcessor(CRoxieCsvReadActivity &owner, IDirectReader *reader, bool _skipHeader, const IResolvedFile *datafile, size32_t maxRowSize);
|
|
|
IInMemoryFileProcessor *createXmlRecordProcessor(CRoxieXmlReadActivity &owner, IDirectReader *reader);
|
|
|
|
|
|
class CRoxieDiskReadActivity : public CRoxieDiskReadBaseActivity
|
|
@@ -1075,11 +1075,12 @@ public:
|
|
|
protected:
|
|
|
IHThorCsvReadArg *helper;
|
|
|
const IResolvedFile *datafile;
|
|
|
+ size32_t maxRowSize;
|
|
|
|
|
|
public:
|
|
|
- CRoxieCsvReadActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory,
|
|
|
- const CSlaveActivityFactory *_aFactory, IInMemoryIndexManager *_manager, const IResolvedFile *_datafile)
|
|
|
- : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, 0, 1, true), datafile(_datafile)
|
|
|
+ CRoxieCsvReadActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
|
|
|
+ IInMemoryIndexManager *_manager, const IResolvedFile *_datafile, size32_t _maxRowSize)
|
|
|
+ : CRoxieDiskReadBaseActivity(_logctx, _packet, _hFactory, _aFactory, _manager, 0, 1, true), datafile(_datafile), maxRowSize(_maxRowSize)
|
|
|
{
|
|
|
onCreate();
|
|
|
helper = (IHThorCsvReadArg *) basehelper;
|
|
@@ -1098,7 +1099,7 @@ public:
|
|
|
createCsvRecordProcessor(*this,
|
|
|
manager->createReader(readPos, parallelPartNo, numParallel),
|
|
|
packet->queryHeader().channel==1 && !resent,
|
|
|
- varFileInfo ? varFileInfo.get() : datafile));
|
|
|
+ varFileInfo ? varFileInfo.get() : datafile, maxRowSize));
|
|
|
}
|
|
|
unsigned __int64 rowLimit = helper->getRowLimit();
|
|
|
unsigned __int64 stopAfter = helper->getChooseNLimit();
|
|
@@ -1178,17 +1179,23 @@ public:
|
|
|
|
|
|
class CRoxieCsvReadActivityFactory : public CRoxieDiskBaseActivityFactory
|
|
|
{
|
|
|
+ size32_t maxRowSize;
|
|
|
+
|
|
|
public:
|
|
|
IMPLEMENT_IINTERFACE;
|
|
|
|
|
|
CRoxieCsvReadActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
|
|
|
: CRoxieDiskBaseActivityFactory(_graphNode, _subgraphId, _queryFactory, _helperFactory)
|
|
|
{
|
|
|
+ maxRowSize = defaultDaliResultLimit * 1024 * 1024;
|
|
|
+ IConstWorkUnit *workunit = _queryFactory.queryWorkUnit();
|
|
|
+ if (workunit)
|
|
|
+ maxRowSize = workunit->getDebugValueInt(OPT_MAXCSVROWSIZE, defaultMaxCsvRowSize) * 1024 * 1024;
|
|
|
}
|
|
|
|
|
|
virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
|
|
|
{
|
|
|
- return new CRoxieCsvReadActivity(logctx, packet, helperFactory, this, manager, datafile);
|
|
|
+ return new CRoxieCsvReadActivity(logctx, packet, helperFactory, this, manager, datafile, maxRowSize);
|
|
|
}
|
|
|
|
|
|
virtual StringBuffer &toString(StringBuffer &s) const
|
|
@@ -1496,12 +1503,13 @@ protected:
|
|
|
Owned<IDirectReader> reader;
|
|
|
bool skipHeader;
|
|
|
const IResolvedFile *datafile;
|
|
|
+ size32_t maxRowSize;
|
|
|
|
|
|
public:
|
|
|
IMPLEMENT_IINTERFACE;
|
|
|
|
|
|
- CsvRecordProcessor(CRoxieCsvReadActivity &_owner, IDirectReader *_reader, bool _skipHeader, const IResolvedFile *_datafile)
|
|
|
- : RecordProcessor(NULL), owner(_owner), reader(_reader), datafile(_datafile)
|
|
|
+ CsvRecordProcessor(CRoxieCsvReadActivity &_owner, IDirectReader *_reader, bool _skipHeader, const IResolvedFile *_datafile, size32_t _maxRowSize)
|
|
|
+ : RecordProcessor(NULL), owner(_owner), reader(_reader), datafile(_datafile), maxRowSize(_maxRowSize)
|
|
|
{
|
|
|
helper = _owner.helper;
|
|
|
skipHeader = _skipHeader;
|
|
@@ -1538,7 +1546,6 @@ public:
|
|
|
break;
|
|
|
}
|
|
|
size32_t rowSize = 4096; // MORE - make configurable
|
|
|
- size32_t maxRowSize = 10*1024*1024; // MORE - make configurable
|
|
|
size32_t thisLineLength;
|
|
|
loop
|
|
|
{
|
|
@@ -1548,7 +1555,7 @@ public:
|
|
|
if (thisLineLength < rowSize || avail < rowSize)
|
|
|
break;
|
|
|
if (rowSize == maxRowSize)
|
|
|
- throw MakeStringException(0, "Row too big");
|
|
|
+ throw MakeStringException(0, "File contained a line of length greater than %d bytes.", maxRowSize);
|
|
|
if (rowSize >= maxRowSize/2)
|
|
|
rowSize = maxRowSize;
|
|
|
else
|
|
@@ -1677,9 +1684,9 @@ protected:
|
|
|
Owned<IDirectReader> reader;
|
|
|
};
|
|
|
|
|
|
-IInMemoryFileProcessor *createCsvRecordProcessor(CRoxieCsvReadActivity &owner, IDirectReader *_reader, bool _skipHeader, const IResolvedFile *datafile)
|
|
|
+IInMemoryFileProcessor *createCsvRecordProcessor(CRoxieCsvReadActivity &owner, IDirectReader *_reader, bool _skipHeader, const IResolvedFile *datafile, size32_t maxRowSize)
|
|
|
{
|
|
|
- return new CsvRecordProcessor(owner, _reader, _skipHeader, datafile);
|
|
|
+ return new CsvRecordProcessor(owner, _reader, _skipHeader, datafile, maxRowSize);
|
|
|
}
|
|
|
|
|
|
IInMemoryFileProcessor *createXmlRecordProcessor(CRoxieXmlReadActivity &owner, IDirectReader *_reader)
|
|
@@ -4428,11 +4435,12 @@ IRoxieSlaveActivity *CRoxieFetchActivityFactory::createActivity(SlaveContextLogg
|
|
|
|
|
|
class CRoxieCSVFetchActivity : public CRoxieFetchActivityBase
|
|
|
{
|
|
|
- CSVSplitter csvSplitter;
|
|
|
+ CSVSplitter csvSplitter;
|
|
|
+ size32_t maxRowSize;
|
|
|
|
|
|
public:
|
|
|
- CRoxieCSVFetchActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieFetchActivityFactory *_aFactory, unsigned _maxColumns)
|
|
|
- : CRoxieFetchActivityBase(_logctx, _packet, _hFactory, _aFactory)
|
|
|
+ CRoxieCSVFetchActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieFetchActivityFactory *_aFactory, unsigned _maxColumns, size32_t _maxRowSize)
|
|
|
+ : CRoxieFetchActivityBase(_logctx, _packet, _hFactory, _aFactory), maxRowSize(_maxRowSize)
|
|
|
{
|
|
|
const char * quotes = NULL;
|
|
|
const char * separators = NULL;
|
|
@@ -4462,7 +4470,6 @@ public:
|
|
|
IHThorCsvFetchArg *h = (IHThorCsvFetchArg *) helper;
|
|
|
rawStream->reset(pos);
|
|
|
size32_t rowSize = 4096; // MORE - make configurable
|
|
|
- size32_t maxRowSize = 10*1024*1024; // MORE - make configurable
|
|
|
loop
|
|
|
{
|
|
|
size32_t avail;
|
|
@@ -4470,7 +4477,7 @@ public:
|
|
|
if (csvSplitter.splitLine(avail, (const byte *)peek) < rowSize || avail < rowSize)
|
|
|
break;
|
|
|
if (rowSize == maxRowSize)
|
|
|
- throw MakeStringException(0, "Row too big");
|
|
|
+ throw MakeStringException(0, "File contained a line of length greater than %d bytes.", maxRowSize);
|
|
|
if (rowSize >= maxRowSize/2)
|
|
|
rowSize = maxRowSize;
|
|
|
else
|
|
@@ -4541,6 +4548,7 @@ void CRoxieFetchActivityBase::setPartNo(bool filechanged)
|
|
|
class CRoxieCSVFetchActivityFactory : public CRoxieFetchActivityFactory
|
|
|
{
|
|
|
unsigned maxColumns;
|
|
|
+ size32_t maxRowSize;
|
|
|
|
|
|
public:
|
|
|
CRoxieCSVFetchActivityFactory(IPropertyTree &_graphNode, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory)
|
|
@@ -4550,11 +4558,15 @@ public:
|
|
|
maxColumns = helper->getMaxColumns();
|
|
|
ICsvParameters *csvInfo = helper->queryCsvParameters();
|
|
|
assertex(!csvInfo->queryEBCDIC());
|
|
|
+ maxRowSize = defaultDaliResultLimit * 1024 * 1024;
|
|
|
+ IConstWorkUnit *workunit = _queryFactory.queryWorkUnit();
|
|
|
+ if (workunit)
|
|
|
+ maxRowSize = workunit->getDebugValueInt(OPT_MAXCSVROWSIZE, defaultMaxCsvRowSize) * 1024 * 1024;
|
|
|
}
|
|
|
|
|
|
virtual IRoxieSlaveActivity *createActivity(SlaveContextLogger &logctx, IRoxieQueryPacket *packet) const
|
|
|
{
|
|
|
- return new CRoxieCSVFetchActivity(logctx, packet, helperFactory, this, maxColumns);
|
|
|
+ return new CRoxieCSVFetchActivity(logctx, packet, helperFactory, this, maxColumns, maxRowSize);
|
|
|
}
|
|
|
};
|
|
|
|