|
@@ -1178,6 +1178,8 @@ public:
|
|
|
return ctx;
|
|
|
}
|
|
|
|
|
|
+ virtual IEngineRowStream *queryConcreteOutputStream(unsigned whichInput) { assertex(whichInput==0); return this; }
|
|
|
+ virtual IStrandJunction *queryConcreteOutputJunction(unsigned idx) const { assertex(idx==0); return junction; }
|
|
|
virtual IRoxieServerActivity *queryActivity() { return this; }
|
|
|
virtual IIndexReadActivityInfo *queryIndexReadActivity() { return NULL; }
|
|
|
|
|
@@ -2242,6 +2244,9 @@ public:
|
|
|
return NULL;
|
|
|
}
|
|
|
|
|
|
+ virtual IEngineRowStream *queryConcreteOutputStream(unsigned whichInput) { return this; }
|
|
|
+ virtual IStrandJunction *queryConcreteOutputJunction(unsigned idx) const { return nullptr; }
|
|
|
+
|
|
|
virtual IIndexReadActivityInfo *queryIndexReadActivity()
|
|
|
{
|
|
|
return puller.queryInput()->queryIndexReadActivity();
|
|
@@ -2541,7 +2546,28 @@ public:
|
|
|
return NULL;
|
|
|
}
|
|
|
|
|
|
- virtual void reset()
|
|
|
+ virtual IFinalRoxieInput * queryConcreteInput(unsigned idx)
|
|
|
+ {
|
|
|
+ return queryInput(idx);
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual IEngineRowStream *queryConcreteOutputStream(unsigned whichInput)
|
|
|
+ {
|
|
|
+ if (whichInput < numInputs)
|
|
|
+ return streamArray[whichInput];
|
|
|
+ else
|
|
|
+ return NULL;
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual IStrandJunction *queryConcreteOutputJunction(unsigned idx) const
|
|
|
+ {
|
|
|
+ if (idx < numInputs)
|
|
|
+ return junctionArray[idx];
|
|
|
+ else
|
|
|
+ return NULL;
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual void reset()
|
|
|
{
|
|
|
for (unsigned i = 0; i < numInputs; i++)
|
|
|
inputArray[i]->reset();
|
|
@@ -4040,6 +4066,9 @@ public:
|
|
|
meta.set(newmeta);
|
|
|
}
|
|
|
|
|
|
+ virtual IEngineRowStream *queryConcreteOutputStream(unsigned whichInput) { assertex(whichInput==0); return this; }
|
|
|
+ virtual IStrandJunction *queryConcreteOutputJunction(unsigned idx) const { assertex(idx==0); return nullptr; }
|
|
|
+
|
|
|
virtual IRoxieServerActivity *queryActivity()
|
|
|
{
|
|
|
return &activity;
|
|
@@ -6072,6 +6101,8 @@ public:
|
|
|
{
|
|
|
return input->queryTotalCycles();
|
|
|
}
|
|
|
+ virtual IEngineRowStream *queryConcreteOutputStream(unsigned whichInput) { assertex(whichInput==0); return this; }
|
|
|
+ virtual IStrandJunction *queryConcreteOutputJunction(unsigned idx) const { assertex(idx==0); return nullptr; }
|
|
|
virtual IRoxieServerActivity *queryActivity()
|
|
|
{
|
|
|
return input->queryActivity();
|
|
@@ -6138,6 +6169,9 @@ public:
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
+ virtual IEngineRowStream *queryConcreteOutputStream(unsigned whichInput) { assertex(whichInput==0); return this; }
|
|
|
+ virtual IStrandJunction *queryConcreteOutputJunction(unsigned idx) const { assertex(idx==0); return nullptr; }
|
|
|
+
|
|
|
virtual IRoxieServerActivity *queryActivity()
|
|
|
{
|
|
|
throwUnexpected();
|
|
@@ -6221,6 +6255,16 @@ public:
|
|
|
return input->queryConcreteInput(idx);
|
|
|
}
|
|
|
|
|
|
+ virtual IEngineRowStream *queryConcreteOutputStream(unsigned whichInput)
|
|
|
+ {
|
|
|
+ return input->queryConcreteOutputStream(whichInput);
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual IStrandJunction *queryConcreteOutputJunction(unsigned idx) const
|
|
|
+ {
|
|
|
+ return input->queryConcreteOutputJunction(idx);
|
|
|
+ }
|
|
|
+
|
|
|
virtual IOutputMetaData * queryOutputMeta() const
|
|
|
{
|
|
|
return input->queryOutputMeta();
|
|
@@ -8607,6 +8651,9 @@ public:
|
|
|
stopped = false;
|
|
|
}
|
|
|
|
|
|
+ virtual IEngineRowStream *queryConcreteOutputStream(unsigned whichInput) { assertex(idx==0); return this; }
|
|
|
+ virtual IStrandJunction *queryConcreteOutputJunction(unsigned idx) const { assertex(idx==0); return nullptr; }
|
|
|
+
|
|
|
virtual IRoxieServerActivity *queryActivity()
|
|
|
{
|
|
|
return parent;
|
|
@@ -8616,7 +8663,7 @@ public:
|
|
|
{
|
|
|
return parent->queryIndexReadActivity();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
virtual unsigned __int64 queryTotalCycles() const
|
|
|
{
|
|
|
return totalCycles;
|
|
@@ -16075,6 +16122,7 @@ class CRoxieServerNWayInputBaseActivity : public CRoxieServerMultiInputBaseActiv
|
|
|
protected:
|
|
|
PointerArrayOf<IFinalRoxieInput> selectedInputs;
|
|
|
PointerArrayOf<IEngineRowStream> selectedStreams;
|
|
|
+ PointerArrayOf<IStrandJunction> selectedJunctions;
|
|
|
|
|
|
public:
|
|
|
CRoxieServerNWayInputBaseActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _numInputs)
|
|
@@ -16116,6 +16164,7 @@ public:
|
|
|
selectedInputs.item(i)->reset();
|
|
|
selectedInputs.kill();
|
|
|
selectedStreams.kill();
|
|
|
+ selectedJunctions.kill();
|
|
|
CRoxieServerMultiInputBaseActivity::reset();
|
|
|
}
|
|
|
|
|
@@ -16132,8 +16181,20 @@ public:
|
|
|
|
|
|
virtual IFinalRoxieInput * queryConcreteInput(unsigned idx)
|
|
|
{
|
|
|
- if (selectedInputs.isItem(idx))
|
|
|
- return selectedInputs.item(idx);
|
|
|
+ return queryInput(idx);
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual IEngineRowStream *queryConcreteOutputStream(unsigned whichInput)
|
|
|
+ {
|
|
|
+ if (selectedStreams.isItem(whichInput))
|
|
|
+ return selectedStreams.item(whichInput);
|
|
|
+ return NULL;
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual IStrandJunction *queryConcreteOutputJunction(unsigned idx) const
|
|
|
+ {
|
|
|
+ if (selectedJunctions.isItem(idx))
|
|
|
+ return selectedJunctions.item(idx);
|
|
|
return NULL;
|
|
|
}
|
|
|
};
|
|
@@ -16157,6 +16218,8 @@ public:
|
|
|
helper.getInputSelection(selectionIsAll, selectionLen, selection.refdata());
|
|
|
|
|
|
selectedInputs.kill();
|
|
|
+ selectedStreams.kill();
|
|
|
+ selectedJunctions.kill();
|
|
|
assertex(numInputs==numStreams); // Will need refactoring when that ceases to be true
|
|
|
if (selectionIsAll)
|
|
|
{
|
|
@@ -16164,6 +16227,7 @@ public:
|
|
|
{
|
|
|
selectedInputs.append(inputArray[i]);
|
|
|
selectedStreams.append(streamArray[i]); // Assumes 1:1 relationship - is that good?
|
|
|
+ selectedJunctions.append(junctionArray[i]);
|
|
|
}
|
|
|
}
|
|
|
else
|
|
@@ -16184,13 +16248,17 @@ public:
|
|
|
|
|
|
selectedInputs.append(inputArray[nextIndex-1]);
|
|
|
selectedStreams.append(streamArray[nextIndex-1]); // Assumes 1:1 relationship - is that good?
|
|
|
+ selectedJunctions.append(junctionArray[nextIndex-1]);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- ForEachItemIn(i2, selectedInputs)
|
|
|
- selectedInputs.item(i2)->start(parentExtractSize, parentExtract, paused);
|
|
|
+ // NB: Whatever pulls this nwayinput activity, starts and stops the selectedInputs and selectedJunctions
|
|
|
}
|
|
|
|
|
|
+ virtual void stop()
|
|
|
+ {
|
|
|
+ // NB: Whatever pulls this nwayinput activity, starts and stops the selectedInputs
|
|
|
+ }
|
|
|
};
|
|
|
|
|
|
class CRoxieServerNWayInputActivityFactory : public CRoxieServerMultiInputFactory
|
|
@@ -16259,15 +16327,9 @@ public:
|
|
|
resultInput->onCreate(colocalParent);
|
|
|
resultInput->start(parentExtractSize, parentExtract, paused);
|
|
|
selectedStreams.append(connectSingleStream(ctx, resultInput, 0, resultJunctions[i], true));
|
|
|
+ selectedJunctions.append(resultJunctions[i]);
|
|
|
}
|
|
|
}
|
|
|
- else
|
|
|
- {
|
|
|
- ForEachItemIn(i, selectedInputs)
|
|
|
- selectedInputs.item(i)->start(parentExtractSize, parentExtract, paused);
|
|
|
- }
|
|
|
- ForEachItemIn(i, selectedStreams)
|
|
|
- startJunction(resultJunctions[i]);
|
|
|
}
|
|
|
|
|
|
virtual void reset()
|
|
@@ -16301,6 +16363,7 @@ public:
|
|
|
IFinalRoxieInput *in = processor.connectIterationOutput(selections[i], probeManager, probes, this, i);
|
|
|
selectedInputs.append(in);
|
|
|
selectedStreams.append(connectSingleStream(ctx, in, 0, resultJunctions[i], true));
|
|
|
+ selectedJunctions.append(resultJunctions[i]);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -16434,52 +16497,84 @@ protected:
|
|
|
|
|
|
//=================================================================================
|
|
|
|
|
|
-class CRoxieServerNaryActivity : public CRoxieServerMultiInputActivity
|
|
|
+class CRoxieServerNWayBaseActivity : public CRoxieServerMultiInputBaseActivity
|
|
|
{
|
|
|
public:
|
|
|
- CRoxieServerNaryActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _numInputs)
|
|
|
- : CRoxieServerMultiInputActivity(_ctx, _factory, _probeManager, _numInputs), expandedJunctions(nullptr)
|
|
|
+ CRoxieServerNWayBaseActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _numInputs)
|
|
|
+ : CRoxieServerMultiInputBaseActivity(_ctx, _factory, _probeManager, _numInputs)
|
|
|
{
|
|
|
}
|
|
|
|
|
|
- virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
|
|
|
+ void startExpandedInputs(unsigned parentExtractSize, const byte *parentExtract, bool paused)
|
|
|
{
|
|
|
- CRoxieServerMultiInputActivity::start(parentExtractSize, parentExtract, paused);
|
|
|
- for (unsigned i=0; i < numInputs; i++)
|
|
|
- {
|
|
|
- IFinalRoxieInput * cur = inputArray[i];
|
|
|
- unsigned numRealInputs = cur->numConcreteOutputs();
|
|
|
- for (unsigned j = 0; j < numRealInputs; j++)
|
|
|
- {
|
|
|
- IFinalRoxieInput * curReal = cur->queryConcreteInput(j);
|
|
|
- expandedInputs.append(curReal);
|
|
|
- }
|
|
|
- }
|
|
|
- expandedJunctions = new Owned<IStrandJunction> [expandedInputs.length()];
|
|
|
+ ForEachItemIn(ei, expandedInputs)
|
|
|
+ expandedInputs.item(ei)->start(parentExtractSize, parentExtract, paused);
|
|
|
ForEachItemIn(idx, expandedInputs)
|
|
|
- {
|
|
|
- expandedStreams.append(connectSingleStream(ctx, expandedInputs.item(idx), 0, expandedJunctions[idx], true)); // MORE - is the index 0 right?
|
|
|
- startJunction(expandedJunctions[idx]);
|
|
|
- }
|
|
|
+ startJunction(expandedJunctions.item(idx));
|
|
|
}
|
|
|
|
|
|
- virtual void reset()
|
|
|
+ virtual void stop()
|
|
|
+ {
|
|
|
+ ForEachItemIn(i, expandedStreams)
|
|
|
+ expandedStreams.item(i)->stop();
|
|
|
+ CRoxieServerMultiInputBaseActivity::stop();
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual void reset()
|
|
|
{
|
|
|
ForEachItemIn(idx, expandedInputs)
|
|
|
- resetJunction(expandedJunctions[idx]);
|
|
|
+ resetJunction(expandedJunctions.item(idx));
|
|
|
expandedInputs.kill();
|
|
|
expandedStreams.kill();
|
|
|
- delete [] expandedJunctions;
|
|
|
- expandedJunctions = nullptr;
|
|
|
- CRoxieServerMultiInputActivity::reset();
|
|
|
+ expandedJunctions.kill();
|
|
|
+ CRoxieServerMultiInputBaseActivity::reset();
|
|
|
}
|
|
|
-
|
|
|
protected:
|
|
|
PointerArrayOf<IFinalRoxieInput> expandedInputs;
|
|
|
PointerArrayOf<IEngineRowStream> expandedStreams;
|
|
|
- Owned<IStrandJunction> *expandedJunctions;
|
|
|
+ PointerArrayOf<IStrandJunction> expandedJunctions;
|
|
|
};
|
|
|
|
|
|
+//=================================================================================
|
|
|
+
|
|
|
+class CRoxieServerNaryActivity : public CRoxieServerNWayBaseActivity
|
|
|
+{
|
|
|
+public:
|
|
|
+ CRoxieServerNaryActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _numInputs)
|
|
|
+ : CRoxieServerNWayBaseActivity(_ctx, _factory, _probeManager, _numInputs)
|
|
|
+ {
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
|
|
|
+ {
|
|
|
+ CRoxieServerNWayBaseActivity::start(parentExtractSize, parentExtract, paused);
|
|
|
+
|
|
|
+ for (unsigned i=0; i < numInputs; i++)
|
|
|
+ {
|
|
|
+ IFinalRoxieInput * cur = inputArray[i];
|
|
|
+ CRoxieServerNWayInputBaseActivity *nWayInput = dynamic_cast<CRoxieServerNWayInputBaseActivity *>(cur);
|
|
|
+ if (nWayInput)
|
|
|
+ {
|
|
|
+ nWayInput->start(parentExtractSize, parentExtract, paused);
|
|
|
+ unsigned numRealInputs = cur->numConcreteOutputs();
|
|
|
+ for (unsigned j = 0; j < numRealInputs; j++)
|
|
|
+ {
|
|
|
+ expandedInputs.append(cur->queryConcreteInput(j));
|
|
|
+ expandedStreams.append(cur->queryConcreteOutputStream(j));
|
|
|
+ expandedJunctions.append(cur->queryConcreteOutputJunction(j));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ expandedInputs.append(cur);
|
|
|
+ // NB: this activities input streams + junction have been setup and held in CRoxieServerMultiInputActivity base
|
|
|
+ expandedStreams.append(queryConcreteOutputStream(i));
|
|
|
+ expandedJunctions.append(queryConcreteOutputJunction(i));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ startExpandedInputs(parentExtractSize, parentExtract, paused);
|
|
|
+ }
|
|
|
+};
|
|
|
|
|
|
//=================================================================================
|
|
|
|
|
@@ -16781,50 +16876,62 @@ IRoxieServerActivityFactory *createRoxieServerNWayMergeJoinActivityFactory(unsig
|
|
|
|
|
|
//=================================================================================
|
|
|
|
|
|
-class CRoxieServerNWaySelectActivity : public CRoxieServerMultiInputActivity
|
|
|
+class CRoxieServerNWaySelectActivity : public CRoxieServerNWayBaseActivity
|
|
|
{
|
|
|
IHThorNWaySelectArg &helper;
|
|
|
public:
|
|
|
CRoxieServerNWaySelectActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _numInputs)
|
|
|
- : CRoxieServerMultiInputActivity(_ctx, _factory, _probeManager, _numInputs),
|
|
|
+ : CRoxieServerNWayBaseActivity(_ctx, _factory, _probeManager, _numInputs),
|
|
|
helper((IHThorNWaySelectArg &)basehelper)
|
|
|
{
|
|
|
- selectedInput = NULL;
|
|
|
- selectedStream = NULL;
|
|
|
}
|
|
|
|
|
|
virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
|
|
|
{
|
|
|
- CRoxieServerMultiInputActivity::start(parentExtractSize, parentExtract, paused);
|
|
|
+ CRoxieServerNWayBaseActivity::start(parentExtractSize, parentExtract, paused);
|
|
|
|
|
|
unsigned whichInput = helper.getInputIndex();
|
|
|
- selectedInput = NULL;
|
|
|
- selectedStream = NULL;
|
|
|
+ selectedInput = nullptr;
|
|
|
+ selectedStream = nullptr;
|
|
|
if (whichInput--)
|
|
|
{
|
|
|
for (unsigned i=0; i < numInputs; i++)
|
|
|
{
|
|
|
IFinalRoxieInput * cur = inputArray[i];
|
|
|
- unsigned numRealInputs = cur->numConcreteOutputs();
|
|
|
- if (whichInput < numRealInputs)
|
|
|
+ CRoxieServerNWayInputBaseActivity *nWayInput = dynamic_cast<CRoxieServerNWayInputBaseActivity *>(cur);
|
|
|
+ if (nWayInput)
|
|
|
{
|
|
|
- selectedInput = cur->queryConcreteInput(whichInput);
|
|
|
- selectedStream = connectSingleStream(ctx, selectedInput, 0, selectedJunction, true); // Should this be passing whichInput??
|
|
|
- break;
|
|
|
+ nWayInput->start(parentExtractSize, parentExtract, paused);
|
|
|
+ unsigned numRealInputs = cur->numConcreteOutputs();
|
|
|
+ if (whichInput < numRealInputs)
|
|
|
+ {
|
|
|
+ expandedInputs.append(cur->queryConcreteInput(whichInput));
|
|
|
+ expandedStreams.append(cur->queryConcreteOutputStream(whichInput));
|
|
|
+ expandedJunctions.append(cur->queryConcreteOutputJunction(whichInput));
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ whichInput -= numRealInputs;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ if (whichInput == 0)
|
|
|
+ {
|
|
|
+ expandedInputs.append(cur);
|
|
|
+ // NB: this activities input streams + junction have been setup and held in CRoxieServerMultiInputActivity base
|
|
|
+ expandedStreams.append(queryConcreteOutputStream(i));
|
|
|
+ expandedJunctions.append(queryConcreteOutputJunction(i));
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ whichInput -= 1;
|
|
|
}
|
|
|
- whichInput -= numRealInputs;
|
|
|
}
|
|
|
}
|
|
|
- startJunction(selectedJunction);
|
|
|
- }
|
|
|
-
|
|
|
- virtual void reset()
|
|
|
- {
|
|
|
- selectedInput = NULL;
|
|
|
- selectedStream = NULL;
|
|
|
- resetJunction(selectedJunction);
|
|
|
- selectedJunction.clear();
|
|
|
- CRoxieServerMultiInputActivity::reset();
|
|
|
+ if (expandedInputs.ordinality())
|
|
|
+ {
|
|
|
+ startExpandedInputs(parentExtractSize, parentExtract, paused);
|
|
|
+ selectedInput = expandedInputs.item(0);
|
|
|
+ selectedStream = expandedStreams.item(0);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
const void * nextRow()
|
|
@@ -16864,8 +16971,8 @@ public:
|
|
|
}
|
|
|
|
|
|
protected:
|
|
|
- IFinalRoxieInput * selectedInput;
|
|
|
- IEngineRowStream * selectedStream;
|
|
|
+ IFinalRoxieInput * selectedInput = nullptr;
|
|
|
+ IEngineRowStream * selectedStream = nullptr;
|
|
|
Owned<IStrandJunction> selectedJunction;
|
|
|
};
|
|
|
|
|
@@ -20060,6 +20167,9 @@ public:
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ virtual IEngineRowStream *queryConcreteOutputStream(unsigned whichInput) { return this; }
|
|
|
+ virtual IStrandJunction *queryConcreteOutputJunction(unsigned idx) const { return nullptr; }
|
|
|
+
|
|
|
virtual IIndexReadActivityInfo *queryIndexReadActivity()
|
|
|
{
|
|
|
IFinalRoxieInput *in = cond ? inputTrue : inputFalse;
|
|
@@ -27459,6 +27569,8 @@ public:
|
|
|
streams.append(this);
|
|
|
return NULL;
|
|
|
}
|
|
|
+ virtual IEngineRowStream *queryConcreteOutputStream(unsigned whichInput) { assertex(whichInput==0); return this; }
|
|
|
+ virtual IStrandJunction *queryConcreteOutputJunction(unsigned idx) const { assertex(idx==0); return nullptr; }
|
|
|
virtual IRoxieServerActivity *queryActivity()
|
|
|
{
|
|
|
throwUnexpected();
|