Bläddra i källkod

HPCC-15875 Thor NWay expanded input v input stream handling fixes

Also add regression tests.

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 9 år sedan
förälder
incheckning
9b3f9c5cc1

+ 5 - 0
testing/regress/ecl/key/nwaytest.xml

@@ -0,0 +1,5 @@
+<Dataset name='Result 1'>
+ <Row><key>1</key><children1><Row><key>1</key><l>a1</l></Row><Row><key>2</key><l>a2</l></Row></children1><children2><Row><key>1</key><l>a1</l></Row><Row><key>2</key><l>a2</l></Row></children2><joinres><Row><key>1</key><l>a1c1</l></Row><Row><key>2</key><l>a2c2</l></Row></joinres></Row>
+ <Row><key>2</key><children1><Row><key>1</key><l>c1</l></Row><Row><key>2</key><l>c2</l></Row></children1><children2><Row><key>1</key><l>t_b1</l></Row><Row><key>2</key><l>t_b2</l></Row></children2><joinres><Row><key>1</key><l>t_b1c1</l></Row><Row><key>2</key><l>t_b2c2</l></Row></joinres></Row>
+ <Row><key>2</key><children1><Row><key>1</key><l>c1</l></Row><Row><key>2</key><l>c2</l></Row></children1><children2><Row><key>1</key><l>t_b1</l></Row><Row><key>2</key><l>t_b2</l></Row></children2><joinres><Row><key>1</key><l>t_b1c1</l></Row><Row><key>2</key><l>t_b2c2</l></Row></joinres></Row>
+</Dataset>

+ 47 - 0
testing/regress/ecl/nwaytest.ecl

@@ -0,0 +1,47 @@
+
+
+rec := RECORD
+ unsigned key;
+ string l;
+END;
+
+input1 := DATASET([{1, 'a1'}, {2, 'a2'}], rec);
+input2 := DATASET([{1, 'b1'}, {2, 'b2'}], rec);
+input3 := DATASET([{1, 'c1'}, {2, 'c2'}], rec);
+
+p2 := PROJECT(NOFOLD(input2), TRANSFORM(rec, SELF.l := 't_' + LEFT.l; SELF := LEFT), PARALLEL(4));
+
+inputs := [input1, p2, input3];
+
+rec dojoin(DATASET(rec) m) := TRANSFORM
+ SELF.key := m[1].key;
+ SELF.l := m[1].l + m[2].l + m[3].l;
+END;
+
+
+
+parentrec := RECORD
+ unsigned key;
+ DATASET(rec) children1;
+ DATASET(rec) children2;
+ DATASET(rec) joinres;
+END;
+
+inrec := RECORD
+ unsigned key;
+ unsigned which;
+END;
+
+parentrec trans(inrec l) := TRANSFORM
+ r := RANGE(inputs, [l.which, 3]);
+ SELF.children1 := r[l.which];
+ SELF.children2 := inputs[l.which];
+ SELF.joinres := JOIN(r, LEFT.key=RIGHT.key, dojoin(ROWS(LEFT)), SORTED(key));
+ SELF := l;
+END;
+
+ds := DATASET([{1, 1}, {2, 2}, {2, 2}], inrec);
+t1 := PROJECT(ds, trans(LEFT), PARALLEL(4));
+
+OUTPUT(t1);
+

+ 43 - 6
thorlcr/activities/funnel/thfunnelslave.cpp

@@ -769,7 +769,7 @@ class CNWaySelectActivity : public CSlaveActivity, public CThorSteppable
     IHThorNWaySelectArg *helper;
     IThorDataLink *selectedInputITDL = nullptr;
     IEngineRowStream *selectedStream = nullptr;
-    Owned<IStrandJunction> selectedJunction;
+    IStrandJunction *selectedJunction = nullptr;
 public:
     IMPLEMENT_IINTERFACE_USING(CSlaveActivity);
 
@@ -782,11 +782,10 @@ public:
     {
         ActivityTimer s(totalCycles, timeActivities);
 
-        PARENT::start();
-
         unsigned whichInput = helper->getInputIndex();
-        selectedInputITDL = NULL;
-        selectedStream = NULL;
+        selectedInputITDL = nullptr;
+        selectedStream = nullptr;
+        selectedJunction = nullptr;
         if (whichInput--)
         {
             ForEachItemIn(i, inputs)
@@ -795,20 +794,36 @@ public:
                 IThorNWayInput *nWayInput = dynamic_cast<IThorNWayInput *>(cur);
                 if (nWayInput)
                 {
+                    cur->start();
                     unsigned numRealInputs = nWayInput->numConcreteOutputs();
                     if (whichInput < numRealInputs)
                     {
                         selectedInputITDL = nWayInput->queryConcreteInput(whichInput);
-                        selectedStream = connectSingleStream(*this, selectedInputITDL, 0, selectedJunction, true);  // Should this be passing whichInput??
+                        selectedStream = nWayInput->queryConcreteInputStream(whichInput);
+                        selectedJunction = nWayInput->queryConcreteInputJunction(whichInput);
                         break;
                     }
                     whichInput -= numRealInputs;
                 }
+                else
+                {
+                    if (whichInput == 0)
+                    {
+                        selectedInputITDL = cur;
+                        selectedStream = queryInputStream(i);
+                        selectedJunction = queryInputJunction(i);
+                        break;
+                    }
+                    whichInput -= 1;
+                }
+                if (selectedInputITDL)
+                    break;
             }
         }
         if (selectedInputITDL)
             selectedInputITDL->start();
         startJunction(selectedJunction);
+        dataLinkStart();
     }
     virtual void stop() override
     {
@@ -877,6 +892,8 @@ class CThorNWayInputSlaveActivity : public CSlaveActivity, implements IThorNWayI
 {
     IHThorNWayInputArg *helper;
     PointerArrayOf<IThorDataLink> selectedInputs;
+    PointerArrayOf<IEngineRowStream> selectedInputStreams;
+    PointerArrayOf<IStrandJunction> selectedInputJunctions;
     bool grouped;
 
 public:
@@ -896,10 +913,16 @@ public:
         rtlDataAttr selection;
         helper->getInputSelection(selectionIsAll, selectionLen, selection.refdata());
         selectedInputs.kill();
+        selectedInputStreams.kill();
+        selectedInputJunctions.kill();
         if (selectionIsAll)
         {
             ForEachItemIn(i, inputs)
+            {
                 selectedInputs.append(queryInput(i));
+                selectedInputStreams.append(queryInputStream(i));
+                selectedInputJunctions.append(queryInputJunction(i));
+            }
         }
         else
         {
@@ -918,6 +941,8 @@ public:
                     throw MakeStringException(100, "Index %d in RANGE selection list is out of range", nextIndex);
 
                 selectedInputs.append(queryInput(nextIndex-1));
+                selectedInputStreams.append(queryInputStream(nextIndex-1));
+                selectedInputJunctions.append(queryInputJunction(nextIndex-1));
             }
         }
         // NB: Whatever pulls this IThorNWayInput, starts and stops the selectedInputs
@@ -947,6 +972,18 @@ public:
             return selectedInputs.item(idx);
         return NULL;
     }
+    virtual IEngineRowStream *queryConcreteInputStream(unsigned idx) const
+    {
+        if (selectedInputStreams.isItem(idx))
+            return selectedInputStreams.item(idx);
+        return NULL;
+    }
+    virtual IStrandJunction *queryConcreteInputJunction(unsigned idx) const
+    {
+        if (selectedInputJunctions.isItem(idx))
+            return selectedInputJunctions.item(idx);
+        return NULL;
+    }
 };
 
 

+ 6 - 0
thorlcr/graph/thgraphslave.cpp

@@ -251,6 +251,12 @@ IEngineRowStream *CSlaveActivity::queryInputStream(unsigned index) const
     return inputs.item(index).stream;
 }
 
+IStrandJunction *CSlaveActivity::queryInputJunction(unsigned index) const
+{
+    if (index>=inputs.ordinality()) return nullptr;
+    return inputs.item(index).junction;
+}
+
 IEngineRowStream *CSlaveActivity::queryOutputStream(unsigned index) const
 {
     if (index>=outputStreams.ordinality()) return nullptr;

+ 1 - 0
thorlcr/graph/thgraphslave.hpp

@@ -171,6 +171,7 @@ public:
     IThorDataLink *queryOutput(unsigned index) const;
     IThorDataLink *queryInput(unsigned index) const;
     IEngineRowStream *queryInputStream(unsigned index) const;
+    IStrandJunction *queryInputJunction(unsigned index) const;
     IEngineRowStream *queryOutputStream(unsigned index) const;
     inline bool queryInputStarted(unsigned input) const { return inputs.item(input).isStarted(); }
     inline bool queryInputStopped(unsigned input) const { return inputs.item(input).isStopped(); }

+ 15 - 13
thorlcr/slave/slave.ipp

@@ -83,6 +83,8 @@ interface IThorNWayInput
 {
     virtual unsigned numConcreteOutputs() const = 0;
     virtual IThorDataLink *queryConcreteInput(unsigned idx) const = 0;
+    virtual IEngineRowStream *queryConcreteInputStream(unsigned idx) const = 0;
+    virtual IStrandJunction *queryConcreteInputJunction(unsigned idx) const = 0;
 };
 
 
@@ -92,17 +94,13 @@ class CThorNarySlaveActivity : public CSlaveActivity
     
 protected:
     PointerArrayOf<IThorDataLink> expandedInputs;
-    Owned<IStrandJunction> *expandedJunctions = nullptr;
     PointerArrayOf<IEngineRowStream> expandedStreams;
+    PointerArrayOf<IStrandJunction> expandedJunctions;
 
 public:
     CThorNarySlaveActivity(CGraphElementBase *container) : CSlaveActivity(container)
     {
     }
-    ~CThorNarySlaveActivity()
-    {
-        delete [] expandedJunctions;
-    }
     virtual void start() override
     {
         ForEachItemIn(i, inputs)
@@ -112,24 +110,29 @@ public:
             IThorNWayInput *nWayInput = dynamic_cast<IThorNWayInput *>(cur);
             if (nWayInput)
             {
+                cur->start();
                 unsigned numRealInputs = nWayInput->numConcreteOutputs();
                 for (unsigned i=0; i < numRealInputs; i++)
                 {
                     IThorDataLink *curReal = nWayInput->queryConcreteInput(i);
+                    IEngineRowStream *curRealStream = nWayInput->queryConcreteInputStream(i);
+                    IStrandJunction *curRealJunction = nWayInput->queryConcreteInputJunction(i);
                     expandedInputs.append(curReal);
+                    expandedStreams.append(curRealStream);
+                    expandedJunctions.append(curRealJunction);
                 }
             }
             else
+            {
                 expandedInputs.append(cur);
+                expandedStreams.append(queryInputStream(i));
+                expandedJunctions.append(queryInputJunction(i));
+            }
         }
         ForEachItemIn(ei, expandedInputs)
             expandedInputs.item(ei)->start();
-        expandedJunctions = new Owned<IStrandJunction> [expandedInputs.ordinality()];
         ForEachItemIn(idx, expandedInputs)
-        {
-            expandedStreams.append(connectSingleStream(*this, expandedInputs.item(idx), 0, expandedJunctions[idx], true));  // MORE - is the index 0 right?
-            startJunction(expandedJunctions[idx]);
-        }
+            startJunction(expandedJunctions.item(idx));
         dataLinkStart();
     }
     void stop()
@@ -137,11 +140,10 @@ public:
         ForEachItemIn(ei, expandedStreams)
             expandedStreams.item(ei)->stop();
         ForEachItemIn(idx, expandedInputs)
-            resetJunction(expandedJunctions[idx]);
+            resetJunction(expandedJunctions.item(idx));
         expandedInputs.kill();
         expandedStreams.kill();
-        delete [] expandedJunctions;
-        expandedJunctions = nullptr;
+        expandedJunctions.kill();
     }
 };