Parcourir la source

HPCC-15895 Roxie NWay expanded input v input stream handling fixes

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith il y a 9 ans
Parent
commit
e3b0ceeb1b

+ 8 - 0
roxie/ccd/ccddebug.cpp

@@ -114,6 +114,14 @@ public:
         else
             return in->queryConcreteInput(idx);
     }
+    virtual IEngineRowStream *queryConcreteOutputStream(unsigned whichInput)
+    {
+        return this;
+    }
+    virtual IStrandJunction *queryConcreteOutputJunction(unsigned idx) const
+    {
+        return nullptr;
+    }
     virtual IRoxieServerActivity *queryActivity()
     {
         return in->queryActivity();

+ 177 - 65
roxie/ccd/ccdserver.cpp

@@ -1155,6 +1155,8 @@ public:
         return ctx;
     }
 
+    virtual IEngineRowStream *queryConcreteOutputStream(unsigned whichInput) { assertex(whichInput==0); return this; }
+    virtual IStrandJunction *queryConcreteOutputJunction(unsigned idx) const { assertex(idx==0); return junction; }
     virtual IRoxieServerActivity *queryActivity() { return this; }
     virtual IIndexReadActivityInfo *queryIndexReadActivity() { return NULL; }
 
@@ -2219,6 +2221,9 @@ public:
         return NULL;
     }
 
+    virtual IEngineRowStream *queryConcreteOutputStream(unsigned whichInput) { return this; }
+    virtual IStrandJunction *queryConcreteOutputJunction(unsigned idx) const { return nullptr; }
+
     virtual IIndexReadActivityInfo *queryIndexReadActivity() 
     {
         return puller.queryInput()->queryIndexReadActivity();
@@ -2518,7 +2523,28 @@ public:
             return NULL;
     }
 
-    virtual void reset()    
+    virtual IFinalRoxieInput * queryConcreteInput(unsigned idx)
+    {
+        return queryInput(idx);
+    }
+
+    virtual IEngineRowStream *queryConcreteOutputStream(unsigned whichInput)
+    {
+        if (whichInput < numInputs)
+            return streamArray[whichInput];
+        else
+            return NULL;
+    }
+
+    virtual IStrandJunction *queryConcreteOutputJunction(unsigned idx) const
+    {
+        if (idx < numInputs)
+            return junctionArray[idx];
+        else
+            return NULL;
+    }
+
+    virtual void reset()
     {
         for (unsigned i = 0; i < numInputs; i++)
             inputArray[i]->reset();
@@ -4017,6 +4043,9 @@ public:
         meta.set(newmeta);
     }
 
+    virtual IEngineRowStream *queryConcreteOutputStream(unsigned whichInput) { assertex(whichInput==0); return this; }
+    virtual IStrandJunction *queryConcreteOutputJunction(unsigned idx) const { assertex(idx==0); return nullptr; }
+
     virtual IRoxieServerActivity *queryActivity()
     {
         return &activity;
@@ -6049,6 +6078,8 @@ public:
     {
         return input->queryTotalCycles();
     }
+    virtual IEngineRowStream *queryConcreteOutputStream(unsigned whichInput) { assertex(whichInput==0); return this; }
+    virtual IStrandJunction *queryConcreteOutputJunction(unsigned idx) const { assertex(idx==0); return nullptr; }
     virtual IRoxieServerActivity *queryActivity()
     {
         return input->queryActivity();
@@ -6115,6 +6146,9 @@ public:
         return 0;
     }
 
+    virtual IEngineRowStream *queryConcreteOutputStream(unsigned whichInput) { assertex(whichInput==0); return this; }
+    virtual IStrandJunction *queryConcreteOutputJunction(unsigned idx) const { assertex(idx==0); return nullptr; }
+
     virtual IRoxieServerActivity *queryActivity()
     {
         throwUnexpected();
@@ -6198,6 +6232,16 @@ public:
         return input->queryConcreteInput(idx);
     }
 
+    virtual IEngineRowStream *queryConcreteOutputStream(unsigned whichInput)
+    {
+        return input->queryConcreteOutputStream(whichInput);
+    }
+
+    virtual IStrandJunction *queryConcreteOutputJunction(unsigned idx) const
+    {
+        return input->queryConcreteOutputJunction(idx);
+    }
+
     virtual IOutputMetaData * queryOutputMeta() const 
     { 
         return input->queryOutputMeta(); 
@@ -8584,6 +8628,9 @@ public:
             stopped = false;
         }
 
+        virtual IEngineRowStream *queryConcreteOutputStream(unsigned whichInput) { assertex(idx==0); return this; }
+        virtual IStrandJunction *queryConcreteOutputJunction(unsigned idx) const { assertex(idx==0); return nullptr; }
+
         virtual IRoxieServerActivity *queryActivity()
         {
             return parent;
@@ -8593,7 +8640,7 @@ public:
         {
             return parent->queryIndexReadActivity();
         }
-        
+
         virtual unsigned __int64 queryTotalCycles() const
         {
             return totalCycles;
@@ -16054,6 +16101,7 @@ class CRoxieServerNWayInputBaseActivity : public CRoxieServerMultiInputBaseActiv
 protected:
     PointerArrayOf<IFinalRoxieInput> selectedInputs;
     PointerArrayOf<IEngineRowStream> selectedStreams;
+    PointerArrayOf<IStrandJunction> selectedJunctions;
 
 public:
     CRoxieServerNWayInputBaseActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _numInputs)
@@ -16095,6 +16143,7 @@ public:
             selectedInputs.item(i)->reset();
         selectedInputs.kill();
         selectedStreams.kill();
+        selectedJunctions.kill();
         CRoxieServerMultiInputBaseActivity::reset(); 
     }
 
@@ -16111,8 +16160,20 @@ public:
 
     virtual IFinalRoxieInput * queryConcreteInput(unsigned idx)
     {
-        if (selectedInputs.isItem(idx))
-            return selectedInputs.item(idx);
+        return queryInput(idx);
+    }
+
+    virtual IEngineRowStream *queryConcreteOutputStream(unsigned whichInput)
+    {
+        if (selectedStreams.isItem(whichInput))
+            return selectedStreams.item(whichInput);
+        return NULL;
+    }
+
+    virtual IStrandJunction *queryConcreteOutputJunction(unsigned idx) const
+    {
+        if (selectedJunctions.isItem(idx))
+            return selectedJunctions.item(idx);
         return NULL;
     }
 };
@@ -16136,6 +16197,8 @@ public:
         helper.getInputSelection(selectionIsAll, selectionLen, selection.refdata());
 
         selectedInputs.kill();
+        selectedStreams.kill();
+        selectedJunctions.kill();
         assertex(numInputs==numStreams); // Will need refactoring when that ceases to be true
         if (selectionIsAll)
         {
@@ -16143,6 +16206,7 @@ public:
             {
                 selectedInputs.append(inputArray[i]);
                 selectedStreams.append(streamArray[i]);    // Assumes 1:1 relationship - is that good?
+                selectedJunctions.append(junctionArray[i]);
             }
         }
         else
@@ -16163,13 +16227,17 @@ public:
 
                 selectedInputs.append(inputArray[nextIndex-1]);
                 selectedStreams.append(streamArray[nextIndex-1]);    // Assumes 1:1 relationship - is that good?
+                selectedJunctions.append(junctionArray[nextIndex-1]);
             }
         }
 
-        ForEachItemIn(i2, selectedInputs)
-            selectedInputs.item(i2)->start(parentExtractSize, parentExtract, paused);
+        // NB: Whatever pulls this nwayinput activity, starts and stops the selectedInputs and selectedJunctions
     }
 
+    virtual void stop()
+    {
+        // NB: Whatever pulls this nwayinput activity, starts and stops the selectedInputs
+    }
 };
 
 class CRoxieServerNWayInputActivityFactory : public CRoxieServerMultiInputFactory
@@ -16238,15 +16306,9 @@ public:
                 resultInput->onCreate(colocalParent);
                 resultInput->start(parentExtractSize, parentExtract, paused);
                 selectedStreams.append(connectSingleStream(ctx, resultInput, 0, resultJunctions[i], true));
+                selectedJunctions.append(resultJunctions[i]);
             }
         }
-        else
-        {
-            ForEachItemIn(i, selectedInputs)
-                selectedInputs.item(i)->start(parentExtractSize, parentExtract, paused);
-        }
-        ForEachItemIn(i, selectedStreams)
-            startJunction(resultJunctions[i]);
     }
 
     virtual void reset()    
@@ -16280,6 +16342,7 @@ public:
             IFinalRoxieInput *in = processor.connectIterationOutput(selections[i], probeManager, probes, this, i);
             selectedInputs.append(in);
             selectedStreams.append(connectSingleStream(ctx, in, 0, resultJunctions[i], true));
+            selectedJunctions.append(resultJunctions[i]);
         }
     }
 
@@ -16413,52 +16476,84 @@ protected:
 
 //=================================================================================
 
-class CRoxieServerNaryActivity : public CRoxieServerMultiInputActivity
+class CRoxieServerNWayBaseActivity : public CRoxieServerMultiInputBaseActivity
 {
 public:
-    CRoxieServerNaryActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _numInputs)
-        : CRoxieServerMultiInputActivity(_ctx, _factory, _probeManager, _numInputs), expandedJunctions(nullptr)
+    CRoxieServerNWayBaseActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _numInputs)
+        : CRoxieServerMultiInputBaseActivity(_ctx, _factory, _probeManager, _numInputs)
     {
     }
 
-    virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
+    void startExpandedInputs(unsigned parentExtractSize, const byte *parentExtract, bool paused)
     {
-        CRoxieServerMultiInputActivity::start(parentExtractSize, parentExtract, paused);
-        for (unsigned i=0; i < numInputs; i++)
-        {
-            IFinalRoxieInput * cur = inputArray[i];
-            unsigned numRealInputs = cur->numConcreteOutputs();
-            for (unsigned j = 0; j < numRealInputs; j++)
-            {
-                IFinalRoxieInput * curReal = cur->queryConcreteInput(j);
-                expandedInputs.append(curReal);
-            }
-        }
-        expandedJunctions = new Owned<IStrandJunction> [expandedInputs.length()];
+        ForEachItemIn(ei, expandedInputs)
+            expandedInputs.item(ei)->start(parentExtractSize, parentExtract, paused);
         ForEachItemIn(idx, expandedInputs)
-        {
-            expandedStreams.append(connectSingleStream(ctx, expandedInputs.item(idx), 0, expandedJunctions[idx], true));  // MORE - is the index 0 right?
-            startJunction(expandedJunctions[idx]);
-        }
+            startJunction(expandedJunctions.item(idx));
     }
 
-    virtual void reset()    
+    virtual void stop()
+    {
+        ForEachItemIn(i, expandedStreams)
+            expandedStreams.item(i)->stop();
+        CRoxieServerMultiInputBaseActivity::stop();
+    }
+
+    virtual void reset()
     {
         ForEachItemIn(idx, expandedInputs)
-            resetJunction(expandedJunctions[idx]);
+            resetJunction(expandedJunctions.item(idx));
         expandedInputs.kill();
         expandedStreams.kill();
-        delete [] expandedJunctions;
-        expandedJunctions = nullptr;
-        CRoxieServerMultiInputActivity::reset(); 
+        expandedJunctions.kill();
+        CRoxieServerMultiInputBaseActivity::reset();
     }
-
 protected:
     PointerArrayOf<IFinalRoxieInput> expandedInputs;
     PointerArrayOf<IEngineRowStream> expandedStreams;
-    Owned<IStrandJunction> *expandedJunctions;
+    PointerArrayOf<IStrandJunction> expandedJunctions;
 };
 
+//=================================================================================
+
+class CRoxieServerNaryActivity : public CRoxieServerNWayBaseActivity
+{
+public:
+    CRoxieServerNaryActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _numInputs)
+        : CRoxieServerNWayBaseActivity(_ctx, _factory, _probeManager, _numInputs)
+    {
+    }
+
+    virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
+    {
+        CRoxieServerNWayBaseActivity::start(parentExtractSize, parentExtract, paused);
+
+        for (unsigned i=0; i < numInputs; i++)
+        {
+            IFinalRoxieInput * cur = inputArray[i];
+            CRoxieServerNWayInputBaseActivity *nWayInput = dynamic_cast<CRoxieServerNWayInputBaseActivity *>(cur);
+            if (nWayInput)
+            {
+                nWayInput->start(parentExtractSize, parentExtract, paused);
+                unsigned numRealInputs = cur->numConcreteOutputs();
+                for (unsigned j = 0; j < numRealInputs; j++)
+                {
+                    expandedInputs.append(cur->queryConcreteInput(j));
+                    expandedStreams.append(cur->queryConcreteOutputStream(j));
+                    expandedJunctions.append(cur->queryConcreteOutputJunction(j));
+                }
+            }
+            else
+            {
+                expandedInputs.append(cur);
+                // NB: this activities input streams + junction have been setup and held in CRoxieServerMultiInputActivity base
+                expandedStreams.append(queryConcreteOutputStream(i));
+                expandedJunctions.append(queryConcreteOutputJunction(i));
+            }
+        }
+        startExpandedInputs(parentExtractSize, parentExtract, paused);
+    }
+};
 
 //=================================================================================
 
@@ -16760,50 +16855,62 @@ IRoxieServerActivityFactory *createRoxieServerNWayMergeJoinActivityFactory(unsig
 
 //=================================================================================
 
-class CRoxieServerNWaySelectActivity : public CRoxieServerMultiInputActivity
+class CRoxieServerNWaySelectActivity : public CRoxieServerNWayBaseActivity
 {
     IHThorNWaySelectArg &helper;
 public:
     CRoxieServerNWaySelectActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _numInputs)
-        : CRoxieServerMultiInputActivity(_ctx, _factory, _probeManager, _numInputs),
+        : CRoxieServerNWayBaseActivity(_ctx, _factory, _probeManager, _numInputs),
           helper((IHThorNWaySelectArg &)basehelper)
     {
-        selectedInput = NULL;
-        selectedStream = NULL;
     }
 
     virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
     {
-        CRoxieServerMultiInputActivity::start(parentExtractSize, parentExtract, paused);
+        CRoxieServerNWayBaseActivity::start(parentExtractSize, parentExtract, paused);
 
         unsigned whichInput = helper.getInputIndex();
-        selectedInput = NULL;
-        selectedStream = NULL;
+        selectedInput = nullptr;
+        selectedStream = nullptr;
         if (whichInput--)
         {
             for (unsigned i=0; i < numInputs; i++)
             {
                 IFinalRoxieInput * cur = inputArray[i];
-                unsigned numRealInputs = cur->numConcreteOutputs();
-                if (whichInput < numRealInputs)
+                CRoxieServerNWayInputBaseActivity *nWayInput = dynamic_cast<CRoxieServerNWayInputBaseActivity *>(cur);
+                if (nWayInput)
                 {
-                    selectedInput = cur->queryConcreteInput(whichInput);
-                    selectedStream = connectSingleStream(ctx, selectedInput, 0, selectedJunction, true);  // Should this be passing whichInput??
-                    break;
+                    nWayInput->start(parentExtractSize, parentExtract, paused);
+                    unsigned numRealInputs = cur->numConcreteOutputs();
+                    if (whichInput < numRealInputs)
+                    {
+                        expandedInputs.append(cur->queryConcreteInput(whichInput));
+                        expandedStreams.append(cur->queryConcreteOutputStream(whichInput));
+                        expandedJunctions.append(cur->queryConcreteOutputJunction(whichInput));
+                        break;
+                    }
+                    whichInput -= numRealInputs;
+                }
+                else
+                {
+                    if (whichInput == 0)
+                    {
+                        expandedInputs.append(cur);
+                        // NB: this activities input streams + junction have been setup and held in CRoxieServerMultiInputActivity base
+                        expandedStreams.append(queryConcreteOutputStream(i));
+                        expandedJunctions.append(queryConcreteOutputJunction(i));
+                        break;
+                    }
+                    whichInput -= 1;
                 }
-                whichInput -= numRealInputs;
             }
         }
-        startJunction(selectedJunction);
-    }
-
-    virtual void reset()    
-    {
-        selectedInput = NULL;
-        selectedStream = NULL;
-        resetJunction(selectedJunction);
-        selectedJunction.clear();
-        CRoxieServerMultiInputActivity::reset(); 
+        if (expandedInputs.ordinality())
+        {
+            startExpandedInputs(parentExtractSize, parentExtract, paused);
+            selectedInput = expandedInputs.item(0);
+            selectedStream = expandedStreams.item(0);
+        }
     }
 
     const void * nextRow()
@@ -16843,8 +16950,8 @@ public:
     }
 
 protected:
-    IFinalRoxieInput * selectedInput;
-    IEngineRowStream * selectedStream;
+    IFinalRoxieInput * selectedInput = nullptr;
+    IEngineRowStream * selectedStream = nullptr;
     Owned<IStrandJunction> selectedJunction;
 };
 
@@ -20039,6 +20146,9 @@ public:
         }
     }
 
+    virtual IEngineRowStream *queryConcreteOutputStream(unsigned whichInput) { return this; }
+    virtual IStrandJunction *queryConcreteOutputJunction(unsigned idx) const { return nullptr; }
+
     virtual IIndexReadActivityInfo *queryIndexReadActivity()
     {
         IFinalRoxieInput *in = cond ? inputTrue  : inputFalse;
@@ -27427,6 +27537,8 @@ public:
         streams.append(this);
         return NULL;
     }
+    virtual IEngineRowStream *queryConcreteOutputStream(unsigned whichInput) { assertex(whichInput==0); return this; }
+    virtual IStrandJunction *queryConcreteOutputJunction(unsigned idx) const { assertex(idx==0); return nullptr; }
     virtual IRoxieServerActivity *queryActivity()
     {
         throwUnexpected();

+ 2 - 0
roxie/ccd/ccdserver.hpp

@@ -102,6 +102,8 @@ interface IFinalRoxieInput : extends IInputBase
     virtual bool gatherConjunctions(ISteppedConjunctionCollector & collector) { return false; }
     virtual unsigned numConcreteOutputs() const { return 1; }
     virtual IFinalRoxieInput * queryConcreteInput(unsigned idx) { assertex(idx==0); return this; }
+    virtual IEngineRowStream *queryConcreteOutputStream(unsigned whichInput) = 0;
+    virtual IStrandJunction *queryConcreteOutputJunction(unsigned idx) const = 0;
     virtual IRoxieServerActivity *queryActivity() = 0;
     virtual IIndexReadActivityInfo *queryIndexReadActivity() = 0;
 

+ 31 - 26
thorlcr/activities/funnel/thfunnelslave.cpp

@@ -767,7 +767,7 @@ class CNWaySelectActivity : public CSlaveActivity, public CThorSteppable
     typedef CSlaveActivity PARENT;
 
     IHThorNWaySelectArg *helper;
-    IThorDataLink *selectedInputITDL = nullptr;
+    IThorDataLink *selectedInput = nullptr;
     IEngineRowStream *selectedStream = nullptr;
     IStrandJunction *selectedJunction = nullptr;
 public:
@@ -783,45 +783,45 @@ public:
         ActivityTimer s(totalCycles, timeActivities);
 
         unsigned whichInput = helper->getInputIndex();
-        selectedInputITDL = nullptr;
+        selectedInput = nullptr;
         selectedStream = nullptr;
         selectedJunction = nullptr;
         if (whichInput--)
         {
             ForEachItemIn(i, inputs)
             {
-                IThorDataLink *cur = queryInput(i);
-                IThorNWayInput *nWayInput = dynamic_cast<IThorNWayInput *>(cur);
+                IThorDataLink *curInput = queryInput(i);
+                IThorNWayInput *nWayInput = dynamic_cast<IThorNWayInput *>(curInput);
                 if (nWayInput)
                 {
-                    cur->start();
-                    unsigned numRealInputs = nWayInput->numConcreteOutputs();
-                    if (whichInput < numRealInputs)
+                    curInput->start();
+                    unsigned numOutputs = nWayInput->numConcreteOutputs();
+                    if (whichInput < numOutputs)
                     {
-                        selectedInputITDL = nWayInput->queryConcreteInput(whichInput);
-                        selectedStream = nWayInput->queryConcreteInputStream(whichInput);
-                        selectedJunction = nWayInput->queryConcreteInputJunction(whichInput);
+                        selectedInput = nWayInput->queryConcreteOutput(whichInput);
+                        selectedStream = nWayInput->queryConcreteOutputStream(whichInput);
+                        selectedJunction = nWayInput->queryConcreteOutputJunction(whichInput);
                         break;
                     }
-                    whichInput -= numRealInputs;
+                    whichInput -= numOutputs;
                 }
                 else
                 {
                     if (whichInput == 0)
                     {
-                        selectedInputITDL = cur;
+                        selectedInput = curInput;
                         selectedStream = queryInputStream(i);
                         selectedJunction = queryInputJunction(i);
                         break;
                     }
                     whichInput -= 1;
                 }
-                if (selectedInputITDL)
+                if (selectedInput)
                     break;
             }
         }
-        if (selectedInputITDL)
-            selectedInputITDL->start();
+        if (selectedInput)
+            selectedInput->start();
         startJunction(selectedJunction);
         dataLinkStart();
     }
@@ -845,7 +845,7 @@ public:
     { 
         if (!selectedStream)
             return false;
-        return selectedInputITDL->gatherConjunctions(collector);
+        return selectedInput->gatherConjunctions(collector);
     }
     virtual void resetEOF()
     { 
@@ -868,11 +868,11 @@ public:
     {
         initMetaInfo(info);
         if (selectedStream)
-            calcMetaInfoSize(info, selectedInputITDL);
+            calcMetaInfoSize(info, selectedInput);
         else if (!hasStarted())
             info.canStall = true; // unkwown if !started
     }
-    virtual bool isGrouped() const override { return selectedInputITDL ? selectedInputITDL->isGrouped() : false; }
+    virtual bool isGrouped() const override { return selectedInput ? selectedInput->isGrouped() : false; }
 // steppable
     virtual void setInputStream(unsigned index, CThorInput &input, bool consumerOrdered) override
     {
@@ -881,8 +881,8 @@ public:
     }
     virtual IInputSteppingMeta *querySteppingMeta()
     {
-        if (selectedInputITDL)
-            return selectedInputITDL->querySteppingMeta();
+        if (selectedInput)
+            return selectedInput->querySteppingMeta();
         return NULL;
     }
 };
@@ -915,6 +915,11 @@ public:
         selectedInputs.kill();
         selectedInputStreams.kill();
         selectedInputJunctions.kill();
+
+        /* NB: all input streams have been connected and because NWayInput does not support handling multiple streams.
+         * i.e. not a CThorStrandedActivity, will use base getOutputStreams implementation and ensure single streams are created.
+         * To allow NWay activities to handle stranding would need handle at getOutputStreams level and conditional produce junctions if mismatched # of output streams
+         */
         if (selectionIsAll)
         {
             ForEachItemIn(i, inputs)
@@ -945,7 +950,7 @@ public:
                 selectedInputJunctions.append(queryInputJunction(nextIndex-1));
             }
         }
-        // NB: Whatever pulls this IThorNWayInput, starts and stops the selectedInputs
+        // NB: Whatever pulls this IThorNWayInput, starts and stops the selectedInputs and selectedInputJunctions
     }
     virtual void stop() override
     {
@@ -966,19 +971,19 @@ public:
     {
         return selectedInputs.ordinality();
     }
-    virtual IThorDataLink *queryConcreteInput(unsigned idx) const
+    virtual IThorDataLink *queryConcreteOutput(unsigned idx) const
     {
         if (selectedInputs.isItem(idx))
             return selectedInputs.item(idx);
         return NULL;
     }
-    virtual IEngineRowStream *queryConcreteInputStream(unsigned idx) const
+    virtual IEngineRowStream *queryConcreteOutputStream(unsigned whichInput) const
     {
-        if (selectedInputStreams.isItem(idx))
-            return selectedInputStreams.item(idx);
+        if (selectedInputStreams.isItem(whichInput))
+            return selectedInputStreams.item(whichInput);
         return NULL;
     }
-    virtual IStrandJunction *queryConcreteInputJunction(unsigned idx) const
+    virtual IStrandJunction *queryConcreteOutputJunction(unsigned idx) const
     {
         if (selectedInputJunctions.isItem(idx))
             return selectedInputJunctions.item(idx);

+ 1 - 1
thorlcr/activities/loop/thloopslave.cpp

@@ -1341,7 +1341,7 @@ public:
         return inputs.ordinality();
     }
 
-    virtual IHThorInput * queryConcreteInput(unsigned idx) const
+    virtual IHThorInput * queryConcreteOutput(unsigned idx) const
     {
         if (inputs.isItem(idx))
             return &inputs.item(idx);

+ 1 - 1
thorlcr/graph/thgraphslave.cpp

@@ -186,7 +186,7 @@ void CSlaveActivity::setInputStream(unsigned index, CThorInput &_input, bool con
         _input.junction.setown(junction.getClear());
         if (0 == index)
             inputStream = _inputStream;
-        _input.itdl->setOutputStream(_input.sourceIdx, LINK(_inputStream)); // used by debug request only at moment.
+        _input.itdl->setOutputStream(_input.sourceIdx, LINK(_inputStream)); // used by debug request only at moment. // JCSMORE - this should probably be the junction outputstream if there is one
     }
 }
 

+ 13 - 13
thorlcr/slave/slave.ipp

@@ -82,9 +82,9 @@ public:
 interface IThorNWayInput
 {
     virtual unsigned numConcreteOutputs() const = 0;
-    virtual IThorDataLink *queryConcreteInput(unsigned idx) const = 0;
-    virtual IEngineRowStream *queryConcreteInputStream(unsigned idx) const = 0;
-    virtual IStrandJunction *queryConcreteInputJunction(unsigned idx) const = 0;
+    virtual IThorDataLink *queryConcreteOutput(unsigned idx) const = 0;
+    virtual IEngineRowStream *queryConcreteOutputStream(unsigned whichInput) const = 0;
+    virtual IStrandJunction *queryConcreteOutputJunction(unsigned whichInput) const = 0;
 };
 
 
@@ -105,18 +105,18 @@ public:
     {
         ForEachItemIn(i, inputs)
         {
-            IThorDataLink *cur = queryInput(i);
-            CActivityBase *activity = cur->queryFromActivity();
-            IThorNWayInput *nWayInput = dynamic_cast<IThorNWayInput *>(cur);
+            IThorDataLink *curInput = queryInput(i);
+            CActivityBase *activity = curInput->queryFromActivity();
+            IThorNWayInput *nWayInput = dynamic_cast<IThorNWayInput *>(curInput);
             if (nWayInput)
             {
-                cur->start();
-                unsigned numRealInputs = nWayInput->numConcreteOutputs();
-                for (unsigned i=0; i < numRealInputs; i++)
+                curInput->start();
+                unsigned numOutputs = nWayInput->numConcreteOutputs();
+                for (unsigned i=0; i < numOutputs; i++)
                 {
-                    IThorDataLink *curReal = nWayInput->queryConcreteInput(i);
-                    IEngineRowStream *curRealStream = nWayInput->queryConcreteInputStream(i);
-                    IStrandJunction *curRealJunction = nWayInput->queryConcreteInputJunction(i);
+                    IThorDataLink *curReal = nWayInput->queryConcreteOutput(i);
+                    IEngineRowStream *curRealStream = nWayInput->queryConcreteOutputStream(i);
+                    IStrandJunction *curRealJunction = nWayInput->queryConcreteOutputJunction(i);
                     expandedInputs.append(curReal);
                     expandedStreams.append(curRealStream);
                     expandedJunctions.append(curRealJunction);
@@ -124,7 +124,7 @@ public:
             }
             else
             {
-                expandedInputs.append(cur);
+                expandedInputs.append(curInput);
                 expandedStreams.append(queryInputStream(i));
                 expandedJunctions.append(queryInputJunction(i));
             }