Browse Source

HPCC-18303 Fix nway input stop issues.

The NWay input activities were not correctly stopping all inputs,
which could cause problems with upstream splitters or if inside
a child query that was restarted.
The symptom reported was an activity that failed to re-start
because it's stop() had not been called.

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith 7 years ago
parent
commit
f3b19ff65a

+ 42 - 0
testing/regress/ecl/childcondnway.ecl

@@ -0,0 +1,42 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2017 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+//noroxie
+
+rec := RECORD
+ STRING10 name;
+ STRING1 sex;
+END;
+
+parentRec := RECORD
+ string surname;
+ DATASET(rec) cds;
+END;
+
+ds := DATASET([{'Smith',[{'John','M'},{'Heather','F'},{'Jim','M'}]},{'Jones',[{'Adam','M'},{'Gina','F'}]},{'Peters',[{'David','M'},{'Daniel','M'},{'Damien','M'}]},{'Jones',[{'Gloria','F'},{'Arthur','M'},{'Grace','F'}]},{'Smith',[{'James','M'},{'Hazel','F'},{'Helen','F'}]}], parentRec);
+
+parentRec trans(parentRec l, parentRec r) := TRANSFORM
+ sl := SORT(l.cds, sex);
+ sr := SORT(r.cds, sex);
+ mr := MERGEJOIN([sl, sr], LEFT.sex = RIGHT.sex, SORTED(sex));
+ SELF.cds := IF(l.surname=r.surname, mr, sr);
+ SELF := r;
+END;
+
+s := SORT(ds, surname);
+i := ITERATE(s, trans(LEFT, RIGHT));
+OUTPUT(i);

+ 7 - 0
testing/regress/ecl/key/childcondnway.xml

@@ -0,0 +1,7 @@
+<Dataset name='Result 1'>
+ <Row><surname>Jones</surname><cds><Row><name>Gina      </name><sex>F</sex></Row><Row><name>Adam      </name><sex>M</sex></Row></cds></Row>
+ <Row><surname>Jones</surname><cds><Row><name>Gina      </name><sex>F</sex></Row><Row><name>Gloria    </name><sex>F</sex></Row><Row><name>Grace     </name><sex>F</sex></Row><Row><name>Adam      </name><sex>M</sex></Row><Row><name>Arthur    </name><sex>M</sex></Row></cds></Row>
+ <Row><surname>Peters</surname><cds><Row><name>David     </name><sex>M</sex></Row><Row><name>Daniel    </name><sex>M</sex></Row><Row><name>Damien    </name><sex>M</sex></Row></cds></Row>
+ <Row><surname>Smith</surname><cds><Row><name>Heather   </name><sex>F</sex></Row><Row><name>John      </name><sex>M</sex></Row><Row><name>Jim       </name><sex>M</sex></Row></cds></Row>
+ <Row><surname>Smith</surname><cds><Row><name>Heather   </name><sex>F</sex></Row><Row><name>Hazel     </name><sex>F</sex></Row><Row><name>Helen     </name><sex>F</sex></Row><Row><name>John      </name><sex>M</sex></Row><Row><name>Jim       </name><sex>M</sex></Row><Row><name>James     </name><sex>M</sex></Row></cds></Row>
+</Dataset>

+ 4 - 2
thorlcr/activities/funnel/thfunnelslave.cpp

@@ -948,11 +948,13 @@ public:
                 selectedInputJunctions.append(queryInputJunction(nextIndex-1));
             }
         }
-        // NB: Whatever pulls this IThorNWayInput, starts and stops the selectedInputs and selectedInputJunctions
+        // NB: Whatever pulls this IThorNWayInput, starts the selectedInputs and selectedInputJunctions
+        dataLinkStart();
     }
     virtual void stop() override
     {
-        // NB: Whatever pulls this IThorNWayInput, starts and stops the selectedInputs
+        stopAllInputs();
+        dataLinkStop();
     }
     CATCH_NEXTROW()
     {

+ 0 - 1
thorlcr/activities/join/thjoinslave.cpp

@@ -658,7 +658,6 @@ public:
     {
         processor.afterProcessing();
         CThorNarySlaveActivity::stop();
-        dataLinkStop();
     }
     CATCH_NEXTROW()
     {

+ 2 - 0
thorlcr/graph/thgraphslave.hpp

@@ -66,6 +66,8 @@ public:
         owner.ActPrintLog("ITDL starting for output %d", outputId);
 #endif
 #ifdef _TESTING
+        bool started = hasStarted();
+        bool stopped = hasStopped();
         assertex(!hasStarted() || hasStopped());      // ITDL started twice
 #endif
         icount = 0;

+ 2 - 4
thorlcr/slave/slave.ipp

@@ -138,13 +138,11 @@ public:
     }
     void stop()
     {
-        ForEachItemIn(ei, expandedStreams)
-            expandedStreams.item(ei)->stop();
-        ForEachItemIn(idx, expandedInputs)
-            resetJunction(expandedJunctions.item(idx));
+        stopAllInputs();
         expandedInputs.kill();
         expandedStreams.kill();
         expandedJunctions.kill();
+        dataLinkStop();
     }
 };