|
@@ -28,6 +28,7 @@ class CSelectNthSlaveActivity : public CSlaveActivity, implements ILookAheadStop
|
|
|
bool createDefaultIfFail;
|
|
|
IHThorSelectNArg *helper;
|
|
|
SpinLock spin; // MORE: Remove this and use an atomic variable for lookaheadN
|
|
|
+ Owned<IEngineRowStream> originalInputStream;
|
|
|
|
|
|
void initN()
|
|
|
{
|
|
@@ -72,19 +73,12 @@ public:
|
|
|
}
|
|
|
|
|
|
// IThorSlaveActivity overloaded methods
|
|
|
- virtual void init(MemoryBuffer & data, MemoryBuffer &slaveData)
|
|
|
+ virtual void init(MemoryBuffer & data, MemoryBuffer &slaveData) override
|
|
|
{
|
|
|
if (!container.queryLocalOrGrouped())
|
|
|
mpTag = container.queryJobChannel().deserializeMPTag(data);
|
|
|
}
|
|
|
- virtual void setInputStream(unsigned index, CThorInput &_input, bool consumerOrdered) override
|
|
|
- {
|
|
|
- PARENT::setInputStream(index, _input, consumerOrdered);
|
|
|
- rowcount_t rowN = (rowcount_t)helper->getRowToSelect();
|
|
|
- if (!isLocal && rowN)
|
|
|
- setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), SELECTN_SMART_BUFFER_SIZE, isSmartBufferSpillNeeded(this), false, rowN, this, &container.queryJob().queryIDiskUsage()));
|
|
|
- }
|
|
|
- virtual void start()
|
|
|
+ virtual void start() override
|
|
|
{
|
|
|
ActivityTimer s(totalCycles, timeActivities);
|
|
|
|
|
@@ -102,8 +96,17 @@ public:
|
|
|
throw;
|
|
|
}
|
|
|
|
|
|
+ rowcount_t rowN = (rowcount_t)helper->getRowToSelect();
|
|
|
+ IStartableEngineRowStream *lookAhead = nullptr;
|
|
|
+ if (!isLocal && rowN)
|
|
|
+ {
|
|
|
+ lookAhead = createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), SELECTN_SMART_BUFFER_SIZE, isSmartBufferSpillNeeded(this), false, rowN, this, &container.queryJob().queryIDiskUsage());
|
|
|
+ originalInputStream.setown(replaceInputStream(0, lookAhead));
|
|
|
+ lookAhead->start();
|
|
|
+ }
|
|
|
+
|
|
|
seenNth = false;
|
|
|
- if (0==helper->getRowToSelect())
|
|
|
+ if (0 == rowN)
|
|
|
{
|
|
|
ThorDataLinkMetaInfo info;
|
|
|
queryInput(0)->getMetaInfo(info);
|
|
@@ -119,7 +122,15 @@ public:
|
|
|
}
|
|
|
first = true;
|
|
|
}
|
|
|
- virtual void abort()
|
|
|
+ virtual void stop() override
|
|
|
+ {
|
|
|
+ PARENT::stop();
|
|
|
+ if (originalInputStream)
|
|
|
+ {
|
|
|
+ Owned<IEngineRowStream> lookAhead = replaceInputStream(0, originalInputStream.getClear());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ virtual void abort() override
|
|
|
{
|
|
|
CSlaveActivity::abort();
|
|
|
if (!firstNode())
|
|
@@ -188,7 +199,7 @@ public:
|
|
|
return ret.getClear();
|
|
|
}
|
|
|
virtual bool isGrouped() const override { return false; }
|
|
|
- void getMetaInfo(ThorDataLinkMetaInfo &info)
|
|
|
+ virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
|
|
|
{
|
|
|
initMetaInfo(info);
|
|
|
info.isSequential = true;
|
|
@@ -196,7 +207,7 @@ public:
|
|
|
calcMetaInfoSize(info, queryInput(0));
|
|
|
}
|
|
|
// IStartableEngineRowStream methods used for global selectn only
|
|
|
- virtual void onInputFinished(rowcount_t count)
|
|
|
+ virtual void onInputFinished(rowcount_t count) override
|
|
|
{
|
|
|
SpinBlock b(spin);
|
|
|
lookaheadN = count;
|