|
@@ -1098,6 +1098,7 @@ public:
|
|
|
|
|
|
protected:
|
|
|
IHThorCsvReadArg *helper;
|
|
|
+ Owned<IDirectReader> ownedReader; // Ensure that the byte stream reader is released
|
|
|
const IResolvedFile *datafile;
|
|
|
size32_t maxRowSize;
|
|
|
|
|
@@ -1119,9 +1120,12 @@ public:
|
|
|
{
|
|
|
{
|
|
|
CriticalBlock p(pcrit);
|
|
|
+ //Processor should be clear when this is called, but include inside the critical block to be safe
|
|
|
+ processor.clear();
|
|
|
+ ownedReader.setown(manager->createReader(postFilter, false, readPos, parallelPartNo, numParallel, translators));
|
|
|
processor.setown(
|
|
|
createCsvRecordProcessor(*this,
|
|
|
- manager->createReader(postFilter, false, readPos, parallelPartNo, numParallel, translators),
|
|
|
+ ownedReader,
|
|
|
packet->queryHeader().channel==1 && !resent,
|
|
|
varFileInfo ? varFileInfo.get() : datafile, maxRowSize));
|
|
|
}
|
|
@@ -1145,6 +1149,7 @@ public:
|
|
|
|
|
|
protected:
|
|
|
IHThorXmlReadArg *helper;
|
|
|
+ Owned<IDirectReader> ownedReader; // Ensure that the byte stream reader is released
|
|
|
|
|
|
public:
|
|
|
CRoxieXmlReadActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CSlaveActivityFactory *_aFactory,
|
|
@@ -1164,7 +1169,9 @@ public:
|
|
|
{
|
|
|
{
|
|
|
CriticalBlock p(pcrit);
|
|
|
- processor.setown(createXmlRecordProcessor(*this, manager->createReader(postFilter, false, readPos, parallelPartNo, numParallel, translators)));
|
|
|
+ processor.clear();
|
|
|
+ ownedReader.setown(manager->createReader(postFilter, false, readPos, parallelPartNo, numParallel, translators));
|
|
|
+ processor.setown(createXmlRecordProcessor(*this, ownedReader));
|
|
|
}
|
|
|
unsigned __int64 rowLimit = helper->getRowLimit();
|
|
|
unsigned __int64 stopAfter = helper->getChooseNLimit();
|