|
@@ -34,15 +34,14 @@ class CParallelFunnel : public CSimpleInterface, implements IRowStream
|
|
|
{
|
|
|
CThreadedPersistent threaded;
|
|
|
CParallelFunnel &funnel;
|
|
|
- Linked<IRowStream> input;
|
|
|
CriticalSection stopCrit;
|
|
|
StringAttr idStr;
|
|
|
unsigned inputIndex;
|
|
|
rowcount_t readThisInput; // purely for tracing
|
|
|
bool stopping;
|
|
|
public:
|
|
|
- CInputHandler(CParallelFunnel &_funnel, IRowStream *_input, unsigned _inputIndex)
|
|
|
- : threaded("CInputHandler", this), funnel(_funnel), input(_input), inputIndex(_inputIndex)
|
|
|
+ CInputHandler(CParallelFunnel &_funnel, unsigned _inputIndex)
|
|
|
+ : threaded("CInputHandler", this), funnel(_funnel), inputIndex(_inputIndex)
|
|
|
{
|
|
|
readThisInput = 0;
|
|
|
StringBuffer s(funnel.idStr);
|
|
@@ -74,17 +73,15 @@ class CParallelFunnel : public CSimpleInterface, implements IRowStream
|
|
|
virtual void main()
|
|
|
{
|
|
|
bool started = false;
|
|
|
+ IEngineRowStream *inputStream = nullptr;
|
|
|
try
|
|
|
{
|
|
|
- if (funnel.startInputs)
|
|
|
- {
|
|
|
- IThorDataLink *_input = QUERYINTERFACE(input.get(), IThorDataLink);
|
|
|
- _input->start();
|
|
|
- }
|
|
|
+ funnel.activity.startInput(inputIndex);
|
|
|
started = true;
|
|
|
+ inputStream = funnel.activity.queryInputStream(inputIndex);
|
|
|
while (!stopping)
|
|
|
{
|
|
|
- OwnedConstThorRow row = input->ungroupedNextRow();
|
|
|
+ OwnedConstThorRow row = inputStream->ungroupedNextRow();
|
|
|
if (!row) break;
|
|
|
|
|
|
{
|
|
@@ -107,7 +104,7 @@ class CParallelFunnel : public CSimpleInterface, implements IRowStream
|
|
|
return;
|
|
|
try
|
|
|
{
|
|
|
- input->stop();
|
|
|
+ inputStream->stop();
|
|
|
}
|
|
|
catch (IException *e)
|
|
|
{
|
|
@@ -118,12 +115,11 @@ class CParallelFunnel : public CSimpleInterface, implements IRowStream
|
|
|
}
|
|
|
};
|
|
|
|
|
|
- CActivityBase &activity;
|
|
|
+ CSlaveActivity &activity;
|
|
|
CIArrayOf<CInputHandler> inputHandlers;
|
|
|
bool startInputs;
|
|
|
Linked<IException> exception;
|
|
|
unsigned eoss;
|
|
|
- IArrayOf<IRowStream> oinstreams;
|
|
|
StringAttr idStr;
|
|
|
|
|
|
CriticalSection fullCrit, crit;
|
|
@@ -150,18 +146,8 @@ class CParallelFunnel : public CSimpleInterface, implements IRowStream
|
|
|
public:
|
|
|
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
|
|
|
|
|
|
- CParallelFunnel(CActivityBase &_activity, IRowStream **instreams, unsigned numstreams) : activity(_activity)
|
|
|
- {
|
|
|
- startInputs = false;
|
|
|
- unsigned n = 0;
|
|
|
- while (n<numstreams) oinstreams.append(*LINK(instreams[n++]));
|
|
|
- init();
|
|
|
- }
|
|
|
- CParallelFunnel(CActivityBase &_activity, IThorDataLink **instreams, unsigned numstreams, bool _startInputs) : activity(_activity)
|
|
|
+ CParallelFunnel(CSlaveActivity &_activity) : activity(_activity)
|
|
|
{
|
|
|
- startInputs = _startInputs;
|
|
|
- unsigned n = 0;
|
|
|
- while (n<numstreams) oinstreams.append(*LINK(instreams[n++]));
|
|
|
init();
|
|
|
}
|
|
|
~CParallelFunnel()
|
|
@@ -179,10 +165,9 @@ public:
|
|
|
stopped = full = false;
|
|
|
totSize = 0;
|
|
|
eoss = 0;
|
|
|
- unsigned numinputs = oinstreams.ordinality();
|
|
|
serializer.set(activity.queryRowSerializer());
|
|
|
- ForEachItemIn(i, oinstreams)
|
|
|
- inputHandlers.append(* new CInputHandler(*this, &oinstreams.item(i), i));
|
|
|
+ for (unsigned i=0; i<activity.queryNumInputs(); i++)
|
|
|
+ inputHandlers.append(* new CInputHandler(*this, i));
|
|
|
// because of the way eos reported make sure started afterwards
|
|
|
ForEachItemIn(j, inputHandlers)
|
|
|
inputHandlers.item(j).start();
|
|
@@ -269,17 +254,6 @@ friend class CInputHandler;
|
|
|
};
|
|
|
|
|
|
|
|
|
-IRowStream *createParallelFunnel(CActivityBase &activity, IRowStream **instreams, unsigned numstreams)
|
|
|
-{
|
|
|
- return new CParallelFunnel(activity, instreams, numstreams);
|
|
|
-}
|
|
|
-
|
|
|
-IRowStream *createParallelFunnel(CActivityBase &activity, IThorDataLink **instreams, unsigned numstreams, bool startInputs)
|
|
|
-{
|
|
|
- return new CParallelFunnel(activity, instreams, numstreams, startInputs);
|
|
|
-}
|
|
|
-
|
|
|
-
|
|
|
///////////////////
|
|
|
//
|
|
|
// FunnelSlaveActivity
|
|
@@ -287,9 +261,11 @@ IRowStream *createParallelFunnel(CActivityBase &activity, IThorDataLink **instre
|
|
|
|
|
|
//class CParallelFunnel;
|
|
|
//interface IBitSet;
|
|
|
-class FunnelSlaveActivity : public CSlaveActivity, public CThorDataLink
|
|
|
+class FunnelSlaveActivity : public CSlaveActivity
|
|
|
{
|
|
|
- IThorDataLink *current;
|
|
|
+ typedef CSlaveActivity PARENT;
|
|
|
+
|
|
|
+ IRowStream *current;
|
|
|
unsigned currentMarker;
|
|
|
bool grouped, *eog, eogNext, parallel;
|
|
|
rowcount_t readThisInput;
|
|
@@ -297,9 +273,7 @@ class FunnelSlaveActivity : public CSlaveActivity, public CThorDataLink
|
|
|
Owned<IRowStream> parallelOutput;
|
|
|
|
|
|
public:
|
|
|
- IMPLEMENT_IINTERFACE_USING(CSlaveActivity);
|
|
|
-
|
|
|
- FunnelSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container), CThorDataLink(this)
|
|
|
+ FunnelSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container)
|
|
|
{
|
|
|
grouped = false;
|
|
|
eog = NULL;
|
|
@@ -326,7 +300,10 @@ public:
|
|
|
{
|
|
|
ActivityTimer s(totalCycles, timeActivities);
|
|
|
if (!grouped && parallel)
|
|
|
- parallelOutput.setown(createParallelFunnel(*this, inputs.getArray(), inputs.ordinality(), true));
|
|
|
+ {
|
|
|
+ //NB starts inputs on each thread
|
|
|
+ parallelOutput.setown(new CParallelFunnel(*this));
|
|
|
+ }
|
|
|
else
|
|
|
{
|
|
|
eogNext = false;
|
|
@@ -344,14 +321,14 @@ public:
|
|
|
readThisInput = 0;
|
|
|
ForEachItemIn(i, inputs)
|
|
|
{
|
|
|
- IThorDataLink * input = inputs.item(i);
|
|
|
- try { startInput(input); }
|
|
|
+ try { startInput(i); }
|
|
|
catch (CATCHALL)
|
|
|
{
|
|
|
ActPrintLog("FUNNEL(%" ACTPF "d): Error staring input %d", container.queryId(), i);
|
|
|
throw;
|
|
|
}
|
|
|
- if (!current) current = input;
|
|
|
+ if (!current)
|
|
|
+ current = queryInputStream(i);
|
|
|
}
|
|
|
}
|
|
|
dataLinkStart();
|
|
@@ -368,7 +345,7 @@ public:
|
|
|
current = NULL;
|
|
|
unsigned i = stopped;
|
|
|
for (;i<inputs.ordinality(); i++)
|
|
|
- stopInput(inputs.item(i));
|
|
|
+ stopInput(i);
|
|
|
stopped = 0;
|
|
|
}
|
|
|
dataLinkStop();
|
|
@@ -395,11 +372,11 @@ public:
|
|
|
if (currentMarker + 1 < inputs.ordinality())
|
|
|
{
|
|
|
readThisInput = 0;
|
|
|
+ stopInput(currentMarker);
|
|
|
+ ++stopped;
|
|
|
currentMarker++;
|
|
|
ActPrintLog("FUNNEL: changing to input %d", currentMarker);
|
|
|
- ++stopped;
|
|
|
- stopInput(current);
|
|
|
- current = inputs.item(currentMarker);
|
|
|
+ current = queryInputStream(currentMarker);
|
|
|
// if empty stream, move on (ensuring eog,eog not returned by empty streams)
|
|
|
row.setown(current->nextRow());
|
|
|
if (row)
|
|
@@ -449,11 +426,11 @@ public:
|
|
|
if (currentMarker + 1 < inputs.ordinality())
|
|
|
{
|
|
|
readThisInput = 0;
|
|
|
+ stopInput(currentMarker);
|
|
|
+ ++stopped;
|
|
|
currentMarker++;
|
|
|
ActPrintLog("FUNNEL: changing to input %d", currentMarker);
|
|
|
- ++stopped;
|
|
|
- stopInput(current);
|
|
|
- current = inputs.item(currentMarker);
|
|
|
+ current = queryInputStream(currentMarker);
|
|
|
row.setown(current->ungroupedNextRow());
|
|
|
if (row)
|
|
|
{
|
|
@@ -475,9 +452,9 @@ public:
|
|
|
virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
|
|
|
{
|
|
|
initMetaInfo(info);
|
|
|
- calcMetaInfoSize(info, inputs.getArray(), inputs.ordinality());
|
|
|
+ calcMetaInfoSize(info, inputs);
|
|
|
}
|
|
|
- virtual bool isGrouped() { return grouped; }
|
|
|
+ virtual bool isGrouped() const override { return grouped; }
|
|
|
};
|
|
|
|
|
|
/////
|
|
@@ -486,7 +463,7 @@ public:
|
|
|
// CombineSlaveActivity
|
|
|
//
|
|
|
|
|
|
-class CombineSlaveActivity : public CSlaveActivity, public CThorDataLink
|
|
|
+class CombineSlaveActivity : public CSlaveActivity
|
|
|
{
|
|
|
IHThorCombineArg *helper;
|
|
|
bool grouped;
|
|
@@ -495,11 +472,8 @@ class CombineSlaveActivity : public CSlaveActivity, public CThorDataLink
|
|
|
CThorExpandingRowArray rows;
|
|
|
|
|
|
public:
|
|
|
- IMPLEMENT_IINTERFACE_USING(CSlaveActivity);
|
|
|
-
|
|
|
-
|
|
|
CombineSlaveActivity(CGraphElementBase *_container)
|
|
|
- : CSlaveActivity(_container), CThorDataLink(this), rows(*this, this)
|
|
|
+ : CSlaveActivity(_container), rows(*this, this)
|
|
|
{
|
|
|
grouped = container.queryGrouped();
|
|
|
}
|
|
@@ -508,18 +482,17 @@ public:
|
|
|
helper = (IHThorCombineArg *) queryHelper();
|
|
|
appendOutputLinked(this);
|
|
|
}
|
|
|
- void init(MemoryBuffer &data, MemoryBuffer &slaveData)
|
|
|
+ virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
|
|
|
{
|
|
|
init();
|
|
|
}
|
|
|
- void start()
|
|
|
+ virtual void start() override
|
|
|
{
|
|
|
ActivityTimer s(totalCycles, timeActivities);
|
|
|
eogNext = false;
|
|
|
ForEachItemIn(i, inputs)
|
|
|
{
|
|
|
- IThorDataLink * input = inputs.item(i);
|
|
|
- try { startInput(input); }
|
|
|
+ try { startInput(i); }
|
|
|
catch (CATCHALL)
|
|
|
{
|
|
|
ActPrintLog("COMBINE(%" ACTPF "d): Error staring input %d", container.queryId(), i);
|
|
@@ -528,38 +501,44 @@ public:
|
|
|
}
|
|
|
dataLinkStart();
|
|
|
}
|
|
|
- void stop()
|
|
|
+ virtual void stop() override
|
|
|
{
|
|
|
- for (unsigned i=0;i<inputs.ordinality(); i++)
|
|
|
- stopInput(inputs.item(i));
|
|
|
+ stopAllInputs();
|
|
|
dataLinkStop();
|
|
|
}
|
|
|
CATCH_NEXTROW()
|
|
|
{
|
|
|
ActivityTimer t(totalCycles, timeActivities);
|
|
|
- loop {
|
|
|
+ loop
|
|
|
+ {
|
|
|
bool eog = false;
|
|
|
bool err = false;
|
|
|
unsigned i;
|
|
|
unsigned n = inputs.ordinality();
|
|
|
- for (i=0;i<n;i++) {
|
|
|
- OwnedConstThorRow row = inputs.item(i)->nextRow();
|
|
|
- if (row) {
|
|
|
- if (eog) {
|
|
|
+ for (i=0;i<n;i++)
|
|
|
+ {
|
|
|
+ OwnedConstThorRow row = queryInputStream(i)->nextRow();
|
|
|
+ if (row)
|
|
|
+ {
|
|
|
+ if (eog)
|
|
|
+ {
|
|
|
err = true;
|
|
|
break;
|
|
|
}
|
|
|
rows.append(row.getClear());
|
|
|
}
|
|
|
- else {
|
|
|
- if (i&&!eog) {
|
|
|
+ else
|
|
|
+ {
|
|
|
+ if (i&&!eog)
|
|
|
+ {
|
|
|
err = true;
|
|
|
break;
|
|
|
}
|
|
|
eog = true;
|
|
|
}
|
|
|
}
|
|
|
- if (err) {
|
|
|
+ if (err)
|
|
|
+ {
|
|
|
eog = true;
|
|
|
rows.kill();
|
|
|
throw MakeActivityException(this, -1, "mismatched input row count for Combine");
|
|
@@ -569,7 +548,8 @@ public:
|
|
|
RtlDynamicRowBuilder row(queryRowAllocator());
|
|
|
size32_t sizeGot = helper->transform(row, rows.ordinality(), rows.getRowArray());
|
|
|
rows.kill();
|
|
|
- if (sizeGot) {
|
|
|
+ if (sizeGot)
|
|
|
+ {
|
|
|
dataLinkIncrement();
|
|
|
return row.finalizeRowClear(sizeGot);
|
|
|
}
|
|
@@ -577,11 +557,11 @@ public:
|
|
|
rows.kill();
|
|
|
return NULL;
|
|
|
}
|
|
|
- bool isGrouped()
|
|
|
+ virtual bool isGrouped() const override
|
|
|
{
|
|
|
- return inputs.item(0)->isGrouped();
|
|
|
+ return queryInput(0)->isGrouped();
|
|
|
}
|
|
|
- void getMetaInfo(ThorDataLinkMetaInfo &info)
|
|
|
+ virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
|
|
|
{
|
|
|
initMetaInfo(info);
|
|
|
// TBD I think this should say max out = lhs set.
|
|
@@ -592,16 +572,14 @@ public:
|
|
|
/////
|
|
|
|
|
|
|
|
|
-class RegroupSlaveActivity : public CSlaveActivity, public CThorDataLink
|
|
|
+class RegroupSlaveActivity : public CSlaveActivity
|
|
|
{
|
|
|
IHThorRegroupArg *helper;
|
|
|
bool grouped;
|
|
|
bool eogNext;
|
|
|
unsigned curinput;
|
|
|
public:
|
|
|
- IMPLEMENT_IINTERFACE_USING(CSlaveActivity);
|
|
|
-
|
|
|
- RegroupSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container), CThorDataLink(this)
|
|
|
+ RegroupSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container)
|
|
|
{
|
|
|
grouped = container.queryGrouped();
|
|
|
}
|
|
@@ -610,19 +588,18 @@ public:
|
|
|
helper = (IHThorRegroupArg *) queryHelper();
|
|
|
appendOutputLinked(this);
|
|
|
}
|
|
|
- void init(MemoryBuffer &data, MemoryBuffer &slaveData)
|
|
|
+ virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
|
|
|
{
|
|
|
init();
|
|
|
}
|
|
|
- void start()
|
|
|
+ virtual void start() override
|
|
|
{
|
|
|
ActivityTimer s(totalCycles, timeActivities);
|
|
|
curinput = 0;
|
|
|
eogNext = false;
|
|
|
ForEachItemIn(i, inputs)
|
|
|
{
|
|
|
- IThorDataLink * input = inputs.item(i);
|
|
|
- try { startInput(input); }
|
|
|
+ try { startInput(i); }
|
|
|
catch (CATCHALL)
|
|
|
{
|
|
|
ActPrintLog("REGROUP(%" ACTPF "d): Error staring input %d", container.queryId(), i);
|
|
@@ -631,42 +608,46 @@ public:
|
|
|
}
|
|
|
dataLinkStart();
|
|
|
}
|
|
|
- void stop()
|
|
|
+ virtual void stop() override
|
|
|
{
|
|
|
- for (unsigned i=0;i<inputs.ordinality(); i++)
|
|
|
- stopInput(inputs.item(i));
|
|
|
+ stopAllInputs();
|
|
|
dataLinkStop();
|
|
|
}
|
|
|
CATCH_NEXTROW()
|
|
|
{
|
|
|
ActivityTimer t(totalCycles, timeActivities);
|
|
|
unsigned n = inputs.ordinality();
|
|
|
- loop {
|
|
|
- if (curinput==n) {
|
|
|
- curinput = 0;
|
|
|
- break;
|
|
|
- }
|
|
|
- OwnedConstThorRow row = inputs.item(curinput)->nextRow();
|
|
|
- if (row) {
|
|
|
+ IRowStream *current = queryInputStream(curinput);
|
|
|
+ loop
|
|
|
+ {
|
|
|
+ OwnedConstThorRow row = current->nextRow();
|
|
|
+ if (row)
|
|
|
+ {
|
|
|
dataLinkIncrement();
|
|
|
return row.getClear();
|
|
|
}
|
|
|
curinput++;
|
|
|
+ if (curinput==n)
|
|
|
+ {
|
|
|
+ curinput = 0;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ current = queryInputStream(curinput);
|
|
|
}
|
|
|
return NULL;
|
|
|
}
|
|
|
|
|
|
- void getMetaInfo(ThorDataLinkMetaInfo &info)
|
|
|
+ virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
|
|
|
{
|
|
|
initMetaInfo(info);
|
|
|
- calcMetaInfoSize(info, inputs.getArray(), inputs.ordinality());
|
|
|
+ calcMetaInfoSize(info, inputs);
|
|
|
}
|
|
|
- virtual bool isGrouped() { return true; }
|
|
|
+ virtual bool isGrouped() const override { return true; }
|
|
|
};
|
|
|
|
|
|
/////
|
|
|
|
|
|
-class NonEmptySlaveActivity : public CSlaveActivity, public CThorDataLink
|
|
|
+class NonEmptySlaveActivity : public CSlaveActivity
|
|
|
{
|
|
|
IHThorNonEmptyArg *helper;
|
|
|
bool eogNext, eoi, anyThisGroup, anyThisInput;
|
|
@@ -709,9 +690,7 @@ class NonEmptySlaveActivity : public CSlaveActivity, public CThorDataLink
|
|
|
}
|
|
|
|
|
|
public:
|
|
|
- IMPLEMENT_IINTERFACE_USING(CSlaveActivity);
|
|
|
-
|
|
|
- NonEmptySlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container), CThorDataLink(this)
|
|
|
+ NonEmptySlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container)
|
|
|
{
|
|
|
helper = (IHThorNonEmptyArg *) queryHelper();
|
|
|
sendReceiving = false;
|
|
@@ -734,15 +713,14 @@ public:
|
|
|
}
|
|
|
|
|
|
// IThorDataLink
|
|
|
- virtual void start()
|
|
|
+ virtual void start() override
|
|
|
{
|
|
|
ActivityTimer s(totalCycles, timeActivities);
|
|
|
curinput = 0;
|
|
|
anyThisGroup = anyThisInput = eogNext = false;
|
|
|
ForEachItemIn(i, inputs)
|
|
|
{
|
|
|
- IThorDataLink * input = inputs.item(i);
|
|
|
- try { startInput(input); }
|
|
|
+ try { startInput(i); }
|
|
|
catch (CATCHALL)
|
|
|
{
|
|
|
ActPrintLog("NONEMPTY(%" ACTPF "d): Error staring input %d", container.queryId(), i);
|
|
@@ -752,10 +730,9 @@ public:
|
|
|
eoi = 0 == inputs.ordinality();
|
|
|
dataLinkStart();
|
|
|
}
|
|
|
- virtual void stop()
|
|
|
+ virtual void stop() override
|
|
|
{
|
|
|
- for (unsigned i=0;i<inputs.ordinality(); i++)
|
|
|
- stopInput(inputs.item(i));
|
|
|
+ stopAllInputs();
|
|
|
dataLinkStop();
|
|
|
}
|
|
|
CATCH_NEXTROW()
|
|
@@ -769,7 +746,7 @@ public:
|
|
|
}
|
|
|
loop
|
|
|
{
|
|
|
- OwnedConstThorRow row = inputs.item(curinput)->nextRow();
|
|
|
+ OwnedConstThorRow row = queryInputStream(curinput)->nextRow();
|
|
|
if (row ) {
|
|
|
anyThisGroup = true;
|
|
|
anyThisInput = true;
|
|
@@ -789,136 +766,131 @@ public:
|
|
|
}
|
|
|
return NULL;
|
|
|
}
|
|
|
- virtual bool isGrouped() { return container.queryGrouped(); }
|
|
|
+ virtual bool isGrouped() const override { return container.queryGrouped(); }
|
|
|
virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
|
|
|
{
|
|
|
initMetaInfo(info);
|
|
|
info.canReduceNumRows = true;
|
|
|
- calcMetaInfoSize(info, inputs.getArray(), inputs.ordinality());
|
|
|
+ calcMetaInfoSize(info, inputs);
|
|
|
}
|
|
|
};
|
|
|
|
|
|
|
|
|
-class CNWaySelectActivity : public CSlaveActivity, public CThorDataLink, public CThorSteppable
|
|
|
+class CNWaySelectActivity : public CSlaveActivity, public CThorSteppable
|
|
|
{
|
|
|
+ typedef CSlaveActivity PARENT;
|
|
|
+
|
|
|
IHThorNWaySelectArg *helper;
|
|
|
- IThorDataLink *selectedInput;
|
|
|
+ IThorDataLink *selectedInputITDL = nullptr;
|
|
|
+ IEngineRowStream *selectedStream = nullptr;
|
|
|
+ Owned<IStrandJunction> selectedJunction;
|
|
|
public:
|
|
|
IMPLEMENT_IINTERFACE_USING(CSlaveActivity);
|
|
|
|
|
|
- CNWaySelectActivity(CGraphElementBase *_container) : CSlaveActivity(_container), CThorDataLink(this), CThorSteppable(this)
|
|
|
+ CNWaySelectActivity(CGraphElementBase *_container) : CSlaveActivity(_container), CThorSteppable(this)
|
|
|
{
|
|
|
helper = (IHThorNWaySelectArg *)queryHelper();
|
|
|
- selectedInput = NULL;
|
|
|
}
|
|
|
- void init(MemoryBuffer &data, MemoryBuffer &slaveData)
|
|
|
+ virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData)
|
|
|
{
|
|
|
appendOutputLinked(this);
|
|
|
}
|
|
|
- void start()
|
|
|
+ virtual void start() override
|
|
|
{
|
|
|
ActivityTimer s(totalCycles, timeActivities);
|
|
|
|
|
|
- startInput(inputs.item(0));
|
|
|
+ PARENT::start();
|
|
|
|
|
|
unsigned whichInput = helper->getInputIndex();
|
|
|
- selectedInput = NULL;
|
|
|
+ selectedInputITDL = NULL;
|
|
|
+ selectedStream = NULL;
|
|
|
if (whichInput--)
|
|
|
{
|
|
|
ForEachItemIn(i, inputs)
|
|
|
{
|
|
|
- IThorDataLink *cur = inputs.item(i);
|
|
|
- CActivityBase *activity = cur->queryFromActivity();
|
|
|
- IThorNWayInput *nWayInput = dynamic_cast<IThorNWayInput *>(activity);
|
|
|
+ IThorDataLink *cur = queryInput(i);
|
|
|
+ IThorNWayInput *nWayInput = dynamic_cast<IThorNWayInput *>(cur);
|
|
|
if (nWayInput)
|
|
|
{
|
|
|
unsigned numRealInputs = nWayInput->numConcreteOutputs();
|
|
|
if (whichInput < numRealInputs)
|
|
|
{
|
|
|
- selectedInput = nWayInput->queryConcreteInput(whichInput);
|
|
|
+ selectedInputITDL = nWayInput->queryConcreteInput(whichInput);
|
|
|
+ selectedStream = connectSingleStream(*this, selectedInputITDL, 0, selectedJunction, true); // Should this be passing whichInput??
|
|
|
break;
|
|
|
}
|
|
|
whichInput -= numRealInputs;
|
|
|
}
|
|
|
- else
|
|
|
- {
|
|
|
- if (whichInput == 0)
|
|
|
- selectedInput = cur;
|
|
|
- whichInput -= 1;
|
|
|
- }
|
|
|
- if (selectedInput)
|
|
|
- break;
|
|
|
}
|
|
|
}
|
|
|
- if (selectedInput)
|
|
|
- selectedInput->start();
|
|
|
- dataLinkStart();
|
|
|
+ if (selectedInputITDL)
|
|
|
+ selectedInputITDL->start();
|
|
|
+ startJunction(selectedJunction);
|
|
|
}
|
|
|
- void stop()
|
|
|
+ virtual void stop() override
|
|
|
{
|
|
|
- stopInput(inputs.item(0));
|
|
|
- if (selectedInput)
|
|
|
- selectedInput->stop();
|
|
|
- dataLinkStop();
|
|
|
+ PARENT::stop();
|
|
|
+ if (selectedStream)
|
|
|
+ selectedStream->stop();
|
|
|
}
|
|
|
CATCH_NEXTROW()
|
|
|
{
|
|
|
ActivityTimer t(totalCycles, timeActivities);
|
|
|
- if (!selectedInput)
|
|
|
+ if (!selectedStream)
|
|
|
return NULL;
|
|
|
- OwnedConstThorRow ret = selectedInput->nextRow();
|
|
|
+ OwnedConstThorRow ret = selectedStream->nextRow();
|
|
|
if (ret)
|
|
|
dataLinkIncrement();
|
|
|
return ret.getClear();
|
|
|
}
|
|
|
- bool gatherConjunctions(ISteppedConjunctionCollector &collector)
|
|
|
+ virtual bool gatherConjunctions(ISteppedConjunctionCollector &collector)
|
|
|
{
|
|
|
- if (!selectedInput)
|
|
|
+ if (!selectedStream)
|
|
|
return false;
|
|
|
- return selectedInput->gatherConjunctions(collector);
|
|
|
+ return selectedInputITDL->gatherConjunctions(collector);
|
|
|
}
|
|
|
- void resetEOF()
|
|
|
+ virtual void resetEOF()
|
|
|
{
|
|
|
- if (selectedInput)
|
|
|
- selectedInput->resetEOF();
|
|
|
+ if (selectedStream)
|
|
|
+ selectedStream->resetEOF();
|
|
|
}
|
|
|
- const void *nextRowGE(const void *seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra)
|
|
|
+ virtual const void *nextRowGE(const void *seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra)
|
|
|
{
|
|
|
try { return nextRowGENoCatch(seek, numFields, wasCompleteMatch, stepExtra); }
|
|
|
CATCH_NEXTROWX_CATCH;
|
|
|
}
|
|
|
- const void *nextRowGENoCatch(const void *seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra)
|
|
|
+ virtual const void *nextRowGENoCatch(const void *seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra)
|
|
|
{
|
|
|
ActivityTimer t(totalCycles, timeActivities);
|
|
|
- if (!selectedInput)
|
|
|
+ if (!selectedStream)
|
|
|
return NULL;
|
|
|
- return selectedInput->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
|
|
|
+ return selectedStream->nextRowGE(seek, numFields, wasCompleteMatch, stepExtra);
|
|
|
}
|
|
|
- void getMetaInfo(ThorDataLinkMetaInfo &info)
|
|
|
+ virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
|
|
|
{
|
|
|
initMetaInfo(info);
|
|
|
- if (selectedInput)
|
|
|
- calcMetaInfoSize(info, selectedInput);
|
|
|
- else if (!started())
|
|
|
+ if (selectedStream)
|
|
|
+ calcMetaInfoSize(info, selectedInputITDL);
|
|
|
+ else if (!hasStarted())
|
|
|
info.canStall = true; // unkwown if !started
|
|
|
}
|
|
|
- virtual bool isGrouped() { return selectedInput ? selectedInput->isGrouped() : false; }
|
|
|
+ virtual bool isGrouped() const override { return selectedInputITDL ? selectedInputITDL->isGrouped() : false; }
|
|
|
// steppable
|
|
|
- virtual void setInput(unsigned index, CActivityBase *inputActivity, unsigned inputOutIdx)
|
|
|
+ virtual void setInputStream(unsigned index, CThorInput &input, bool consumerOrdered) override
|
|
|
{
|
|
|
- CSlaveActivity::setInput(index, inputActivity, inputOutIdx);
|
|
|
- CThorSteppable::setInput(index, inputActivity, inputOutIdx);
|
|
|
+ CSlaveActivity::setInputStream(index, input, consumerOrdered);
|
|
|
+ CThorSteppable::setInputStream(index, input, consumerOrdered);
|
|
|
}
|
|
|
virtual IInputSteppingMeta *querySteppingMeta()
|
|
|
{
|
|
|
- if (selectedInput)
|
|
|
- return selectedInput->querySteppingMeta();
|
|
|
+ if (selectedInputITDL)
|
|
|
+ return selectedInputITDL->querySteppingMeta();
|
|
|
return NULL;
|
|
|
}
|
|
|
};
|
|
|
|
|
|
|
|
|
-class CThorNWayInputSlaveActivity : public CSlaveActivity, public CThorDataLink, implements IThorNWayInput
|
|
|
+class CThorNWayInputSlaveActivity : public CSlaveActivity, implements IThorNWayInput
|
|
|
{
|
|
|
IHThorNWayInputArg *helper;
|
|
|
PointerArrayOf<IThorDataLink> selectedInputs;
|
|
@@ -927,16 +899,16 @@ class CThorNWayInputSlaveActivity : public CSlaveActivity, public CThorDataLink,
|
|
|
public:
|
|
|
IMPLEMENT_IINTERFACE_USING(CSlaveActivity);
|
|
|
|
|
|
- CThorNWayInputSlaveActivity(CGraphElementBase *container) : CSlaveActivity(container), CThorDataLink(this)
|
|
|
+ CThorNWayInputSlaveActivity(CGraphElementBase *container) : CSlaveActivity(container)
|
|
|
{
|
|
|
helper = (IHThorNWayInputArg *)queryHelper();
|
|
|
grouped = helper->queryOutputMeta()->isGrouped(); // JCSMORE should match graph info, i.e. container.queryGrouped()
|
|
|
}
|
|
|
- void init(MemoryBuffer &data, MemoryBuffer &slaveData)
|
|
|
+ virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
|
|
|
{
|
|
|
appendOutputLinked(this);
|
|
|
}
|
|
|
- void start()
|
|
|
+ virtual void start() override
|
|
|
{
|
|
|
ActivityTimer s(totalCycles, timeActivities);
|
|
|
bool selectionIsAll;
|
|
@@ -947,7 +919,7 @@ public:
|
|
|
if (selectionIsAll)
|
|
|
{
|
|
|
ForEachItemIn(i, inputs)
|
|
|
- selectedInputs.append(inputs.item(i));
|
|
|
+ selectedInputs.append(queryInput(i));
|
|
|
}
|
|
|
else
|
|
|
{
|
|
@@ -965,27 +937,25 @@ public:
|
|
|
if (!inputs.isItem(nextIndex-1))
|
|
|
throw MakeStringException(100, "Index %d in RANGE selection list is out of range", nextIndex);
|
|
|
|
|
|
- selectedInputs.append(inputs.item(nextIndex-1));
|
|
|
+ selectedInputs.append(queryInput(nextIndex-1));
|
|
|
}
|
|
|
}
|
|
|
// NB: Whatever pulls this IThorNWayInput, starts and stops the selectedInputs
|
|
|
- dataLinkStart();
|
|
|
}
|
|
|
- void stop()
|
|
|
+ virtual void stop() override
|
|
|
{
|
|
|
// NB: Whatever pulls this IThorNWayInput, starts and stops the selectedInputs
|
|
|
- dataLinkStop();
|
|
|
}
|
|
|
CATCH_NEXTROW()
|
|
|
{
|
|
|
throwUnexpected();
|
|
|
}
|
|
|
- void getMetaInfo(ThorDataLinkMetaInfo &info)
|
|
|
+ virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
|
|
|
{
|
|
|
initMetaInfo(info);
|
|
|
- calcMetaInfoSize(info,inputs.item(0));
|
|
|
+ calcMetaInfoSize(info, queryInput(0));
|
|
|
}
|
|
|
- virtual bool isGrouped() { return grouped; }
|
|
|
+ virtual bool isGrouped() const override { return grouped; }
|
|
|
// IThorNWayInput impl.
|
|
|
virtual unsigned numConcreteOutputs() const
|
|
|
{
|