|
@@ -68,7 +68,7 @@ private:
|
|
|
Owned<IThorRowLoader> iLoader = createThorRowLoader(*this, ::queryRowInterfaces(input), compare, isUnstable() ? stableSort_none : stableSort_earlyAlloc, rc_mixed, SPILL_PRIORITY_SELFJOIN);
|
|
|
Owned<IRowStream> rs = iLoader->load(inputStream, abortSoon);
|
|
|
mergeStats(spillStats, iLoader); // Not sure of the best policy if rs spills later on.
|
|
|
- PARENT::stop();
|
|
|
+ PARENT::stopInput(0);
|
|
|
return rs.getClear();
|
|
|
}
|
|
|
|
|
@@ -78,7 +78,7 @@ private:
|
|
|
ActPrintLog("SELFJOIN: Performing global self-join");
|
|
|
#endif
|
|
|
sorter->Gather(::queryRowInterfaces(input), inputStream, compare, NULL, NULL, keyserializer, NULL, NULL, false, isUnstable(), abortSoon, NULL);
|
|
|
- PARENT::stop();
|
|
|
+ PARENT::stopInput(0);
|
|
|
if (abortSoon)
|
|
|
{
|
|
|
barrier->cancel();
|
|
@@ -174,7 +174,7 @@ public:
|
|
|
{
|
|
|
strm.setown(isLocal ? doLocalSelfJoin() : doGlobalSelfJoin());
|
|
|
assertex(strm);
|
|
|
- // NB: PARENT::stop() will now have been called
|
|
|
+ // NB: PARENT::stopInput(0) will now have been called
|
|
|
}
|
|
|
|
|
|
joinhelper->init(strm, NULL, ::queryRowAllocator(queryInput(0)), ::queryRowAllocator(queryInput(0)), ::queryRowMetaData(queryInput(0)));
|
|
@@ -199,14 +199,13 @@ public:
|
|
|
CriticalBlock b(joinHelperCrit);
|
|
|
joinhelper.clear();
|
|
|
}
|
|
|
- if (isLightweight)
|
|
|
- PARENT::stop();
|
|
|
- else if (strm) // if !isLightWeight, PARENT::stop() will have been called in start()
|
|
|
+ if (strm)
|
|
|
+ {
|
|
|
strm->stop();
|
|
|
- strm.clear();
|
|
|
+ strm.clear();
|
|
|
+ }
|
|
|
}
|
|
|
- else
|
|
|
- PARENT::stop();
|
|
|
+ PARENT::stop();
|
|
|
}
|
|
|
|
|
|
CATCH_NEXTROW()
|