Explorar o código

Merge branch 'candidate-6.0.0'

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman %!s(int64=9) %!d(string=hai) anos
pai
achega
58d67d9aea
Modificáronse 73 ficheiros con 422 adicións e 508 borrados
  1. 3 4
      common/environment/environment.cpp
  2. 2 2
      ecl/hql/hqlutil.cpp
  3. 14 2
      ecl/hqlcpp/hqlttcpp.cpp
  4. 15 3
      esp/src/eclwatch/ESPGraph.js
  5. 12 1
      esp/src/eclwatch/GraphWidget.js
  6. 11 9
      esp/src/eclwatch/JSGraphWidget.js
  7. 31 21
      rtl/eclrtl/rtlbcdtest.cpp
  8. 48 6
      rtl/nbcd/nbcd.cpp
  9. 4 0
      testing/regress/ecl/key/loopif.xml
  10. 36 0
      testing/regress/ecl/loopif.ecl
  11. 1 1
      thorlcr/activities/aggregate/thaggregateslave.cpp
  12. 2 2
      thorlcr/activities/aggregate/thgroupaggregateslave.cpp
  13. 0 6
      thorlcr/activities/apply/thapplyslave.cpp
  14. 4 8
      thorlcr/activities/catch/thcatchslave.cpp
  15. 4 6
      thorlcr/activities/choosesets/thchoosesetsslave.cpp
  16. 1 4
      thorlcr/activities/countproject/thcountprojectslave.cpp
  17. 1 1
      thorlcr/activities/csvread/thcsvrslave.cpp
  18. 0 3
      thorlcr/activities/degroup/thdegroupslave.cpp
  19. 25 35
      thorlcr/activities/diskread/thdiskreadslave.cpp
  20. 4 2
      thorlcr/activities/diskwrite/thdwslave.cpp
  21. 1 1
      thorlcr/activities/distribution/thdistributionslave.cpp
  22. 0 3
      thorlcr/activities/enth/thenthslave.cpp
  23. 1 1
      thorlcr/activities/fetch/thfetchslave.cpp
  24. 2 14
      thorlcr/activities/filter/thfilterslave.cpp
  25. 0 3
      thorlcr/activities/firstn/thfirstnslave.cpp
  26. 2 22
      thorlcr/activities/funnel/thfunnelslave.cpp
  27. 1 1
      thorlcr/activities/group/thgroupslave.cpp
  28. 6 11
      thorlcr/activities/hashdistrib/thhashdistribslave.cpp
  29. 3 26
      thorlcr/activities/indexread/thindexreadslave.cpp
  30. 3 6
      thorlcr/activities/iterate/thgroupiterateslave.cpp
  31. 7 23
      thorlcr/activities/iterate/thiterateslave.cpp
  32. 1 2
      thorlcr/activities/join/thjoin.cpp
  33. 43 58
      thorlcr/activities/join/thjoinslave.cpp
  34. 1 2
      thorlcr/activities/keydiff/thkeydiff.cpp
  35. 1 2
      thorlcr/activities/keydiff/thkeydiffslave.cpp
  36. 1 1
      thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp
  37. 1 2
      thorlcr/activities/keypatch/thkeypatch.cpp
  38. 2 7
      thorlcr/activities/keypatch/thkeypatchslave.cpp
  39. 1 2
      thorlcr/activities/limit/thlimitslave.cpp
  40. 1 4
      thorlcr/activities/lookupjoin/thlookupjoinslave.cpp
  41. 6 15
      thorlcr/activities/loop/thloopslave.cpp
  42. 4 12
      thorlcr/activities/merge/thmergeslave.cpp
  43. 2 2
      thorlcr/activities/msort/thgroupsortslave.cpp
  44. 1 1
      thorlcr/activities/msort/thmsortslave.cpp
  45. 2 2
      thorlcr/activities/msort/thsortu.cpp
  46. 6 10
      thorlcr/activities/normalize/thnormalizeslave.cpp
  47. 17 41
      thorlcr/activities/nsplitter/thnsplitterslave.cpp
  48. 4 5
      thorlcr/activities/nullaction/thnullactionslave.cpp
  49. 2 3
      thorlcr/activities/parse/thparseslave.cpp
  50. 4 5
      thorlcr/activities/piperead/thprslave.cpp
  51. 9 13
      thorlcr/activities/project/thprojectslave.cpp
  52. 0 5
      thorlcr/activities/pull/thpullslave.cpp
  53. 4 13
      thorlcr/activities/rollup/throllupslave.cpp
  54. 2 4
      thorlcr/activities/sample/thsampleslave.cpp
  55. 2 2
      thorlcr/activities/selectnth/thselectnthslave.cpp
  56. 2 2
      thorlcr/activities/selfjoin/thselfjoinslave.cpp
  57. 8 4
      thorlcr/activities/soapcall/thsoapcallslave.cpp
  58. 2 2
      thorlcr/activities/spill/thspillslave.cpp
  59. 2 6
      thorlcr/activities/temptable/thtmptableslave.cpp
  60. 1 2
      thorlcr/activities/thdiskbaseslave.cpp
  61. 2 2
      thorlcr/activities/topn/thtopnslave.cpp
  62. 1 1
      thorlcr/activities/trace/thtraceslave.cpp
  63. 18 25
      thorlcr/activities/when/thwhenslave.cpp
  64. 2 2
      thorlcr/activities/wuidread/thwuidreadslave.cpp
  65. 2 2
      thorlcr/activities/xmlparse/thxmlparseslave.cpp
  66. 1 1
      thorlcr/activities/xmlread/thxmlreadslave.cpp
  67. 2 3
      thorlcr/graph/thgraph.cpp
  68. 3 2
      thorlcr/graph/thgraph.hpp
  69. 7 7
      thorlcr/graph/thgraphmaster.cpp
  70. 5 5
      thorlcr/graph/thgraphslave.cpp
  71. 1 1
      thorlcr/slave/slave.hpp
  72. 1 0
      thorlcr/thorutil/thbuf.cpp
  73. 1 1
      version.cmake

+ 3 - 4
common/environment/environment.cpp

@@ -1576,12 +1576,11 @@ IConstDropZoneInfo * CLocalEnvironment::getDropZoneByAddressPath(const char * ne
 
     ForEach(*zoneIt)
     {
-        SCMStringBuffer dropZoneName;
-        zoneIt->query().getName(dropZoneName);
-
         SCMStringBuffer dropZoneDir;
         zoneIt->query().getDirectory(dropZoneDir);
-        const char * pdropzoneDir = dropZoneDir.str();
+        StringBuffer fullDropZoneDir(dropZoneDir.str());
+        addPathSepChar(fullDropZoneDir);
+        const char * pdropzoneDir = fullDropZoneDir.str();
 
         // Check target file path starts with this Drop zone path
         // the drop zone paths can be nested (nothing forbids it) like

+ 2 - 2
ecl/hql/hqlutil.cpp

@@ -2443,7 +2443,7 @@ void DependenciesUsed::addResultRead(IHqlExpression * wuid, IHqlExpression * seq
     if (!isGraphResult)
         if (!seq || !seq->queryValue())
             return;         //Can be called in parser when no sequence has been allocated
-    OwnedHqlExpr result = createAttribute(resultAtom, LINK(seq), LINK(name), LINK(wuid));
+    OwnedHqlExpr result = createExprAttribute(resultAtom, LINK(seq), LINK(name), LINK(wuid));
     if (resultsWritten.find(*result) == NotFound)
         appendUniqueExpr(resultsRead, LINK(result));
 }
@@ -2453,7 +2453,7 @@ void DependenciesUsed::addResultWrite(IHqlExpression * seq, IHqlExpression * nam
     if (!isGraphResult)
         if (!seq || !seq->queryValue())
             return;         //Can be called in parser when no sequence has been allocated
-    OwnedHqlExpr result = createAttribute(resultAtom, LINK(seq), LINK(name));
+    OwnedHqlExpr result = createExprAttribute(resultAtom, LINK(seq), LINK(name));
     if (appendUniqueExpr(resultsWritten, LINK(result)))
         if (resultsRead.contains(*result))
             noteInconsistency(result);

+ 14 - 2
ecl/hqlcpp/hqlttcpp.cpp

@@ -5600,6 +5600,18 @@ void intersectDependencies(UnsignedArray & target, UnsignedArray const & d1, Uns
     }
 }
 
+static IHqlExpression * createResultsAttribute(const HqlExprArray & results)
+{
+    HqlExprArray globalResults;
+    ForEachItemIn(i, results)
+    {
+        IHqlExpression & cur = results.item(i);
+        //Only include global results - it may be possible to have a result read in a child query, from a workunit with an unknown wuid.
+        if (cur.isIndependentOfScope())
+            globalResults.append(OLINK(cur));
+    }
+    return createExprAttribute(_results_Atom, globalResults);
+}
 
 //------------------------------------------------------------------------
 
@@ -6082,7 +6094,7 @@ IHqlExpression * WorkflowTransformer::extractWorkflow(IHqlExpression * untransfo
             inheritDependencies(&checkArgs.item(0));
             if (dependencies.resultsRead.ordinality())
             {
-                checkArgs.append(*createExprAttribute(_results_Atom, dependencies.resultsRead));
+                checkArgs.append(*createResultsAttribute(dependencies.resultsRead));
                 inheritDependencies(&checkArgs.item(1));
             }
             checkArgs.append(*createAttribute(_codehash_Atom, LINK(codehash)));
@@ -6328,7 +6340,7 @@ IHqlExpression * WorkflowTransformer::createTransformed(IHqlExpression * expr)
                         updateArgs.append(*attr.getClear());
                 }
                 if (dependencies.resultsRead.ordinality())
-                    updateArgs.append(*createExprAttribute(_results_Atom, dependencies.resultsRead));
+                    updateArgs.append(*createResultsAttribute(dependencies.resultsRead));
 
                 HqlExprArray args;
                 unwindChildren(args, transformed);

+ 15 - 3
esp/src/eclwatch/ESPGraph.js

@@ -326,12 +326,20 @@ define([
 
         addSubgraph: function (subgraph) {
             subgraph.__hpcc_parent = this;
-            this.__hpcc_subgraphs.push(subgraph);
+            if (!arrayUtil.some(this.__hpcc_subgraphs, function (subgraph2) {
+                return subgraph === subgraph2;
+            })) {
+                this.__hpcc_subgraphs.push(subgraph);
+            }
         },
 
         addVertex: function (vertex) {
             vertex.__hpcc_parent = this;
-            this.__hpcc_vertices.push(vertex);
+            if (!arrayUtil.some(this.__hpcc_vertices, function (vertex2) {
+                return vertex === vertex2;
+            })) {
+                this.__hpcc_vertices.push(vertex);
+            }
         },
 
         removeVertex: function (vertex) {
@@ -342,7 +350,11 @@ define([
 
         addEdge: function (edge) {
             edge.__hpcc_parent = this;
-            this.__hpcc_edges.push(edge);
+            if (!arrayUtil.some(this.__hpcc_edges, function (edge2) {
+                return edge === edge2;
+            })) {
+                this.__hpcc_edges.push(edge);
+            }
         },
 
         removeEdge: function (edge) {

+ 12 - 1
esp/src/eclwatch/GraphWidget.js

@@ -432,6 +432,14 @@ define([
                     this.isIE11 = true;
                 }
                 this.graphViewHistory = new GraphViewHistory(this);
+                this._options = {};
+            },
+
+            option: function (key, _) {
+                if (arguments.length < 1) throw Error("Invalid Call:  option");
+                if (arguments.length === 1) return this._options[key];
+                this._options[key] = _ instanceof Array ? _.length > 0 : _;
+                return this;
             },
 
             _onClickRefresh: function () {
@@ -946,7 +954,10 @@ define([
 
             getLocalisedXGMML: function (selectedItems, depth, distance, hideSpills) {
                 if (this.hasPlugin()) {
-                    return this._plugin.getLocalisedXGMML(selectedItems, depth, distance, hideSpills);
+                    if (this._plugin.getLocalisedXGMML2) {
+                        return this._plugin.getLocalisedXGMML2(selectedItems, depth, distance, hideSpills);
+                    }
+                    return this._plugin.getLocalisedXGMML(selectedItems, depth, distance);
                 }
                 return null;
             },

+ 11 - 9
esp/src/eclwatch/JSGraphWidget.js

@@ -313,7 +313,7 @@ define([
                         return this.cleanObjects(this.graphData.edges);
                     },
 
-                    getLocalisedXGMML: function (selectedItems, depth, distance, noSpills) {
+                    getLocalisedXGMML2: function (selectedItems, depth, distance, noSpills) {
                         return this.graphData.getLocalisedXGMML(selectedItems, depth, distance, noSpills);
                     },
 
@@ -467,14 +467,16 @@ define([
                                         source = inputs[0];
                                     }
                                 }
-                                item.__widget = new Edge()
-                                    .sourceVertex(source.__widget)
-                                    .targetVertex(target.__widget)
-                                    .targetMarker("arrowHead")
-                                    .weight(weight)
-                                    .strokeDasharray(strokeDasharray)
-                                ;
-                                item.__widget.__hpcc_globalID = item.__hpcc_id;
+                                if (!merge || !item.__widget) {
+                                    item.__widget = new Edge()
+                                        .sourceVertex(source.__widget)
+                                        .targetVertex(target.__widget)
+                                        .targetMarker("arrowHead")
+                                        .weight(weight)
+                                        .strokeDasharray(strokeDasharray)
+                                    ;
+                                    item.__widget.__hpcc_globalID = item.__hpcc_id;
+                                }
                                 item.__widget.text(label);
                                 item.__widget.tooltip(tooltip);
                                 item.__widget.classed({

+ 31 - 21
rtl/eclrtl/rtlbcdtest.cpp

@@ -147,6 +147,9 @@ protected:
     {
         char temp[80];
         value.getCString(sizeof(temp), temp);
+        const char * unknown = strchr(expected, 'x');
+        if (unknown && temp[unknown-expected])
+            temp[unknown-expected] = 'x';
         cppunit_assert(strcmp(expected, temp) == 0, "ERROR: checkDecimal/char: expected '%s', got '%s'", expected, temp);
     }
 
@@ -440,6 +443,8 @@ protected:
         testMultiply("0.000000000000000101","0.0000000000000000099009901","0");
         testMultiply("0.000000000000000101","0.000000000000000099009901","0.00000000000000000000000000000001");
         testMultiply("109", "9174311926605504587155963302.75229357798165137614678899082568", "999999999999999999999999999999.99999999999999999999999999999912");
+        testMultiply("109", "9174311926605504587155963302.75229357798165137614678899082569", "1000000000000000000000000000000.00000000000000000000000000000021");
+        testMultiply("9999999999.999999999999999999999999999999","9999999999.999999999999999999999999999999","99999999999999999999.99999999999999999998"); // actually 99999999999999999999.999999999999999999980000000000000000000000000000000000000001
 
         Decimal a = "9999999999999999";
         Decimal b = "10000000000000002";
@@ -456,10 +461,11 @@ protected:
         testDivide("125","5","25");
         testDivide("99980001","9999","9999");
         testDivide("0.1234","10000000000000000000000000000000","0.00000000000000000000000000000001");
-        testDivide("0.1234","20000000000000000000000000000000","0");
+        testDivide("0.1234","20000000000000000000000000000000","0.00000000000000000000000000000001");
+        testDivide("0.1234","30000000000000000000000000000000","0");
         testDivide("1","0.00000000000000000000000000000002", "50000000000000000000000000000000");
         testDivide("1","3", "0.33333333333333333333333333333333");
-        testDivide("1000000000000000000000000000000","109", "9174311926605504587155963302.75229357798165137614678899082568");
+        testDivide("1000000000000000000000000000000","109", "9174311926605504587155963302.75229357798165137614678899082569");
         testModulus("1000000000000000000000000000000","109", "82");
         testModulus("10","5","0");
         testModulus("10","6","4");
@@ -500,7 +506,7 @@ protected:
     void testBcdPower()
     {
         //MORE: Test power functions...
-        const char * values[] = { "0.00001", "10000", "-1", "-10", "1.0001", "9.99" };
+        const char * values[] = { "10000", "-1", "-10", "1.0001", "9.99" };
         Decimal one(1);
         for (unsigned idx = 0; idx < _elements_in(values); idx++)
         {
@@ -537,18 +543,21 @@ protected:
                 }
 
                 //internal consistency test, but liable to rounding errors....
-                if (true)
+                Decimal product(powerValue1);
+                product.multiply(powerValue2);
+                if (power && (product.compareNull() != 0) && (product.compare(one) != 0))
                 {
-                    powerValue1.multiply(powerValue2);
-                    if (power && (powerValue1.compareNull() != 0) && (powerValue1.compare(one) != 0))
-                    {
-                        Decimal diff = powerValue1;
-                        diff.subtract(one);
-                        one.getCString(sizeof(temp1), temp1);
-                        powerValue1.getCString(sizeof(temp2), temp2);
-                        diff.getCString(sizeof(temp3), temp3);
-                        success &= check(false, "ERROR: %s^%d^-%d=%s (expected %s) diff %s", values[idx], power, power, temp2, temp1, temp3);
-                    }
+                    char temp4[80];
+                    char temp5[80];
+                    Decimal diff = product;
+                    diff.subtract(one);
+                    one.getCString(sizeof(temp1), temp1);
+                    product.getCString(sizeof(temp2), temp2);
+                    diff.getCString(sizeof(temp3), temp3);
+                    powerValue1.getCString(sizeof(temp4), temp4);
+                    powerValue2.getCString(sizeof(temp5), temp5);
+                    //Report rounding errors, but don't trigger a failure
+                    check(false, "ERROR: %s^%d^-%d=%s (expected %s) diff %s [%s*%s]", values[idx], power, power, temp2, temp1, temp3, temp4, temp5);
                 }
 
                 sofar1.multiply(value);
@@ -565,13 +574,14 @@ protected:
         checkDecimal(-9999999.12, "-9999999.12");
         checkDecimal(9999999.12345678, "9999999.12345678");
         checkDecimal(-9999999.12345678, "-9999999.12345678");
-        checkDecimal(9999999.123456789, "9999999.12345679");
-        checkDecimal(-9999999.123456789, "-9999999.12345679");
-
-        checkDecimal(99999991234567800.00, "99999991234567800");
-        checkDecimal(-99999991234567800.00, "-99999991234567800");
-        checkDecimal(99999991234567890.00, "99999991234567900");
-        checkDecimal(-99999991234567890.00, "-99999991234567900");
+        checkDecimal(9999999.123456789, "9999999.123456789");
+        checkDecimal(-9999999.123456789, "-9999999.123456789");
+
+        //MORE: The exact values are out of our control.
+        //Real->decimal extracts 16 decimal digits, but only 15.9 are significant, so the last digit cannot be guaranteed.
+        checkDecimal(91999991234567800.00, "919999912345678x0");
+        checkDecimal(-91999991234567800.00, "-919999912345678x0");
+        checkDecimal(91999991234567123.00, "919999912345671x0");
 
         // in vc++ these real constants seem to only have 14 significant digits
 //      checkDecimal(0.99999991234567800, "0.999999912345678");

+ 48 - 6
rtl/nbcd/nbcd.cpp

@@ -151,7 +151,7 @@ Decimal & Decimal::divide(const Decimal & other)
 
     int nd1 = hi1+1-lo1;
     int nd2 = hi2+1-lo2;
-    int hi = (hi1-hi2)+zeroDigit;
+    int hi = (hi1-hi2)+zeroDigit; // how many digits will there be in the result
     int iters = hi+1;
     if (hi < 0)
     {
@@ -168,9 +168,9 @@ Decimal & Decimal::divide(const Decimal & other)
     lsb = 0;
     msb = hi >= maxDigits ? maxDigits-1 : hi;
 
-    const byte spare = 2;
+    const byte spare = 3;
     byte temp[maxDigits*2 + 3];
-    unsigned numeratorDigits = hi + 1 + nd2;
+    unsigned numeratorDigits = (hi + 1) + nd2;
     memset(temp, 0, numeratorDigits+spare);             // ensure two zero in msb, and below lsb.  Also 2 zeros for looking 2 bytes ahead..
 
     byte * numerator = temp+spare;
@@ -227,6 +227,38 @@ Decimal & Decimal::divide(const Decimal & other)
             digits[iter] = q;
     }
     //MORE: This should really calculate the next digit, and conditionally round the least significant digit.
+    if (true)
+    {
+        //The following guess for q is never too small, may be 1 too large
+        byte * curNumerator = numerator-1;
+        unsigned numerator012 = curNumerator[nd2] * 100 + curNumerator[nd2-1] * 10 + curNumerator[nd2-2];
+        unsigned q = numerator012 / divisor01;
+        if (q == 5)
+        {
+            unsigned carry = 0;
+            for (int i = 0; i < nd2; i++)
+            {
+                int next = 90 + curNumerator[i] - divisor[i] * q - carry;
+                carry = 9 - next / 10;
+            }
+            carry -= curNumerator[nd2];
+            if (carry)
+                q--;
+        }
+
+        if (q >= 5)
+        {
+            for (unsigned roundDigit=0; roundDigit < iters; roundDigit++)
+            {
+                unsigned next = digits[roundDigit]+1;
+                if (next == 10)
+                    next = 0;
+                digits[roundDigit] = next;
+                if (next != 0)
+                    break;
+            }
+        }
+    }
 
     negative ^= other.negative;
     return *this;
@@ -305,19 +337,29 @@ Decimal & Decimal::multiply(const Decimal & other)
         }
     }
 
-    //Now copy the results, taking cary of the carries 
+    //Now copy the results, taking care of the carries
     unsigned carry = 0;
     int j;
     for (j = low1+low2 - zeroDigit; j < lowt; j++)
-        carry = (temp[j+zeroDigit]+carry)/10;
+    {
+        unsigned next = temp[j+zeroDigit]+carry;
+        //Round the least significant digit
+        if (j+1 == lowt)
+            next += 5;
+        carry = next / 10;
+    }
+
     for (j = lowt; j <= hight; j++)
     {
         div_t next = div(temp[j+zeroDigit]+carry, 10);
         digits[j] = next.rem;
         carry = next.quot;
     }
-    if ((hight < maxDigits-1) && (carry != 0))
+    while ((hight < maxDigits-1) && (carry != 0))
+    {
         digits[++hight] = carry % 10;
+        carry = carry / 10;
+    }
 
     lsb = lowt;
     msb = hight;

+ 4 - 0
testing/regress/ecl/key/loopif.xml

@@ -0,0 +1,4 @@
+<Dataset name='Result 1'>
+ <Row><i>61</i><j>1</j><c>4</c></Row>
+ <Row><i>62</i><j>2</j><c>4</c></Row>
+</Dataset>

+ 36 - 0
testing/regress/ecl/loopif.ecl

@@ -0,0 +1,36 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+rec := RECORD
+ unsigned4 i;
+ unsigned4 j;
+ unsigned4 c := 0;
+END;
+
+ds := DATASET([{1, 1}, {2, 2}], rec);
+
+DATASET(rec) loopBody(DATASET(rec) ds, unsigned c) := FUNCTION
+ step1 := TABLE(NOFOLD(ds), { i, j, unsigned ni := SUM(GROUP, i); }, j, FEW);
+ p1 := PROJECT(ds, TRANSFORM(rec, SELF.i := LEFT.i + 20; SELF.c := c; SELF := LEFT));
+ p2 := PROJECT(NOFOLD(step1), TRANSFORM(rec, SELF.i := LEFT.i + 10; SELF.c := c; SELF := LEFT));
+ dsret := IF(c % 2 = 1, p1, p2);
+ RETURN dsret;
+END;
+
+l1 := LOOP(NOFOLD(ds), 4, loopBody(ROWS(LEFT), COUNTER));
+
+OUTPUT(l1);

+ 1 - 1
thorlcr/activities/aggregate/thaggregateslave.cpp

@@ -118,12 +118,12 @@ public:
     AggregateSlaveBase(CGraphElementBase *_container) : CSlaveActivity(_container)
     {
         hadElement = inputStopped = false;
+        appendOutputLinked(this);
     }
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
         if (!container.queryLocal())
             mpTag = container.queryJobChannel().deserializeMPTag(data);
-        appendOutputLinked(this);
     }
 };
 

+ 2 - 2
thorlcr/activities/aggregate/thgroupaggregateslave.cpp

@@ -28,12 +28,12 @@ public:
     GroupAggregateSlaveActivity(CGraphElementBase *_container) 
         : CSlaveActivity(_container)
     { 
+        helper = static_cast <IHThorAggregateArg *> (queryHelper());
+        appendOutputLinked(this);
     }
 
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
     {
-        appendOutputLinked(this);
-        helper = static_cast <IHThorAggregateArg *> (queryHelper());
     }
 
     virtual void start() override

+ 0 - 6
thorlcr/activities/apply/thapplyslave.cpp

@@ -26,12 +26,6 @@ public:
     CApplySlaveActivity(CGraphElementBase *container) 
         : ProcessSlaveActivity(container)
     { 
-        helper = NULL;
-    }
-
-// IThorSlaveActivity overloaded methods
-    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData)
-    {
         helper = static_cast <IHThorApplyArg *> (queryHelper());
     }
 // IThorSlaveProcess overloaded methods

+ 4 - 8
thorlcr/activities/catch/thcatchslave.cpp

@@ -27,16 +27,12 @@ class CCatchSlaveActivityBase : public CSlaveActivity
     typedef CSlaveActivity PARENT;
 protected:
     IHThorCatchArg *helper;
-    bool eos;
+    bool eos = false;
 
 public:
     CCatchSlaveActivityBase(CGraphElementBase *_container) : CSlaveActivity(_container)
     {
-    }
-    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
-    {
         helper = static_cast <IHThorCatchArg *> (queryHelper());
-        eos = false;
         appendOutputLinked(this);
     }
     virtual void start() override
@@ -151,7 +147,7 @@ class CSkipCatchSlaveActivity : public CCatchSlaveActivityBase
 {
     bool gathered, global, grouped, running;
     Owned<IBarrier> barrier;
-    Owned<IRowStream> inputStream;
+    Owned<IRowStream> gatheredInputStream;
 
     bool gather()
     {
@@ -174,7 +170,7 @@ class CSkipCatchSlaveActivity : public CCatchSlaveActivityBase
                 overflowBuf->putRow(row.getClear());
             }
             overflowBuf->flush();
-            inputStream.setown(overflowBuf->getReader()); 
+            gatheredInputStream.setown(overflowBuf->getReader());
         }
         catch (IException *)
         {
@@ -251,7 +247,7 @@ public:
                 return NULL;
             }
         }
-        OwnedConstThorRow row(inputStream->nextRow());
+        OwnedConstThorRow row(gatheredInputStream->nextRow());
         if (!row)
             return NULL;
         dataLinkIncrement();

+ 4 - 6
thorlcr/activities/choosesets/thchoosesetsslave.cpp

@@ -32,9 +32,10 @@ protected:
 public:
     BaseChooseSetsActivity(CGraphElementBase *_container) : CSlaveActivity(_container)
     {
-        helper = NULL;
+        helper = static_cast <IHThorChooseSetsArg *> (queryHelper());
         done = false;
         tallies = NULL;
+        appendOutputLinked(this);
     }
     ~BaseChooseSetsActivity()
     {
@@ -44,8 +45,6 @@ public:
     virtual void init(MemoryBuffer & data, MemoryBuffer &slaveData) override
     {
         mpTag = container.queryJobChannel().deserializeMPTag(data);
-        appendOutputLinked(this);
-        helper = static_cast <IHThorChooseSetsArg *> (queryHelper());
     }
     virtual void start() override
     {
@@ -255,12 +254,13 @@ public:
 
     ChooseSetsPlusActivity(CGraphElementBase *_container) : CSlaveActivity(_container)
     {
-        helper = NULL;
+        helper = static_cast <IHThorChooseSetsExArg *> (queryHelper());
         counts = NULL;
         priorCounts = NULL;
         totalCounts = NULL;
         limits = NULL;
         inputCounter.setown(new CInputCounter(*this));
+        appendOutputLinked(this);
     }
     ~ChooseSetsPlusActivity()
     {
@@ -273,8 +273,6 @@ public:
     {
         if (!container.queryLocalOrGrouped())
             mpTag = container.queryJobChannel().deserializeMPTag(data);
-        appendOutputLinked(this);
-        helper = static_cast <IHThorChooseSetsExArg *> (queryHelper());
     }
     virtual void setInputStream(unsigned index, CThorInput &_input, bool consumerOrdered) override
     {

+ 1 - 4
thorlcr/activities/countproject/thcountprojectslave.cpp

@@ -35,11 +35,8 @@ protected:
 public:
     BaseCountProjectActivity(CGraphElementBase *_container) : CSlaveActivity(_container)
     {
-    }
-    virtual void init(MemoryBuffer & data, MemoryBuffer &slaveData) override
-    {
-        appendOutputLinked(this);
         helper = static_cast <IHThorCountProjectArg *> (queryHelper());
+        appendOutputLinked(this);
     }
 };
 

+ 1 - 1
thorlcr/activities/csvread/thcsvrslave.cpp

@@ -316,6 +316,7 @@ public:
         headerLines = helper->queryCsvParameters()->queryHeaderLen();
         superFDesc = NULL;
         subFiles = 0;
+        appendOutputLinked(this);
     }
 
 // IThorSlaveActivity
@@ -360,7 +361,6 @@ public:
             sentHeaderLines.setown(createThreadSafeBitSet());
         }
         partHandler.setown(new CCsvPartHandler(*this));
-        appendOutputLinked(this);
     }
     virtual void kill()
     {

+ 0 - 3
thorlcr/activities/degroup/thdegroupslave.cpp

@@ -27,9 +27,6 @@ public:
     CDegroupSlaveActivity(CGraphElementBase *_container) 
         : CSlaveActivity(_container), CThorSteppable(this)
     { 
-    }
-    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
-    {
         appendOutputLinked(this);
     }
     virtual void start() override

+ 25 - 35
thorlcr/activities/diskread/thdiskreadslave.cpp

@@ -391,11 +391,10 @@ public:
     };
 
 public:
-    bool needTransform, unsorted, countSent;
-    rowcount_t limit;
-    rowcount_t stopAfter;
-    IRowStream *out;
-    size32_t maxrecsize;
+    bool needTransform = false, unsorted = false, countSent = false;
+    rowcount_t limit = 0;
+    rowcount_t stopAfter = 0;
+    IRowStream *out = nullptr;
 
     IHThorDiskReadArg *helper;
 
@@ -405,13 +404,7 @@ public:
         unsorted = 0 != (TDRunsorted & helper->getFlags());
         grouped = 0 != (TDXgrouped & helper->getFlags());
         needTransform = segMonitors.length() || helper->needTransform();
-        out = NULL;
-        countSent = false;
-        if (helper->getFlags() & TDRlimitskips)
-            limit = RCMAX;
-        else
-            limit = (rowcount_t)helper->getRowLimit();
-        stopAfter = (rowcount_t)helper->getChooseNLimit();
+        appendOutputLinked(this);
     }
     ~CDiskReadSlaveActivity()
     {
@@ -443,9 +436,6 @@ public:
                 unsorted = false;
             }
         }
-
-        appendOutputLinked(this);
-
     }
     virtual void kill()
     {
@@ -477,6 +467,11 @@ public:
     {
         ActivityTimer s(totalCycles, timeActivities);
         CDiskReadSlaveActivityRecord::start();
+        if (helper->getFlags() & TDRlimitskips)
+            limit = RCMAX;
+        else
+            limit = (rowcount_t)helper->getRowLimit();
+        stopAfter = (rowcount_t)helper->getChooseNLimit();
         out = createSequentialPartHandler(partHandler, partDescs, grouped); // **
     }
     virtual bool isGrouped() const override { return grouped; }
@@ -586,21 +581,16 @@ class CDiskNormalizeSlave : public CDiskReadSlaveActivityRecord
     };
 
     IHThorDiskNormalizeArg *helper;
-    rowcount_t limit;
-    rowcount_t stopAfter;
-    IRowStream *out;
+    rowcount_t limit = 0;
+    rowcount_t stopAfter = 0;
+    IRowStream *out = nullptr;
 
 public:
     CDiskNormalizeSlave(CGraphElementBase *_container) 
         : CDiskReadSlaveActivityRecord(_container)
     {
         helper = (IHThorDiskNormalizeArg *)queryHelper();
-        if (helper->getFlags() & TDRlimitskips)
-            limit = RCMAX;
-        else
-            limit = (rowcount_t)helper->getRowLimit();
-        stopAfter = (rowcount_t)helper->getChooseNLimit();
-        out = NULL;
+        appendOutputLinked(this);
     }
     ~CDiskNormalizeSlave()
     {
@@ -611,7 +601,6 @@ public:
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
         CDiskReadSlaveActivityRecord::init(data, slaveData);
-        appendOutputLinked(this);
         partHandler.setown(new CNormalizePartHandler(*this));
     }
 
@@ -632,6 +621,11 @@ public:
     {
         ActivityTimer s(totalCycles, timeActivities);
         CDiskReadSlaveActivityRecord::start();
+        if (helper->getFlags() & TDRlimitskips)
+            limit = RCMAX;
+        else
+            limit = (rowcount_t)helper->getRowLimit();
+        stopAfter = (rowcount_t)helper->getChooseNLimit();
         out = createSequentialPartHandler(partHandler, partDescs, false);
     }
     virtual bool isGrouped() const override { return false; }
@@ -727,6 +721,7 @@ public:
         helper = (IHThorDiskAggregateArg *)queryHelper();
         eoi = false;
         allocator.set(queryRowAllocator());
+        appendOutputLinked(this);
     }
 
 // IThorSlaveActivity
@@ -735,7 +730,6 @@ public:
         CDiskReadSlaveActivityRecord::init(data, slaveData);
         if (!container.queryLocalOrGrouped())
             mpTag = container.queryJobChannel().deserializeMPTag(data);
-        appendOutputLinked(this);
         partHandler.setown(new CDiskSimplePartHandler(*this));
     }
     virtual void abort()
@@ -834,18 +828,14 @@ class CDiskCountSlave : public CDiskReadSlaveActivityRecord
     typedef CDiskReadSlaveActivityRecord PARENT;
 
     IHThorDiskCountArg *helper;
-    rowcount_t stopAfter, preknownTotalCount;
-    bool eoi, totalCountKnown;
+    rowcount_t stopAfter = 0, preknownTotalCount = 0;
+    bool eoi = false, totalCountKnown = false;
 
 public:
     CDiskCountSlave(CGraphElementBase *_container) : CDiskReadSlaveActivityRecord(_container)
     {
         helper = (IHThorDiskCountArg *)queryHelper();
-        totalCountKnown = eoi = false;
-        preknownTotalCount = 0;
-        mpTag = TAG_NULL;
-        stopAfter = (rowcount_t)helper->getChooseNLimit();
-        totalCountKnown = false;
+        appendOutputLinked(this);
     }
 
 // IThorSlaveActivity
@@ -856,7 +846,6 @@ public:
             mpTag = container.queryJobChannel().deserializeMPTag(data);
         data.read(totalCountKnown);
         data.read(preknownTotalCount);
-        appendOutputLinked(this);
         partHandler.setown(new CDiskSimplePartHandler(*this));
     }
     virtual void abort()
@@ -878,6 +867,7 @@ public:
     {
         ActivityTimer s(totalCycles, timeActivities);
         CDiskReadSlaveActivityRecord::start();
+        stopAfter = (rowcount_t)helper->getChooseNLimit();
         eoi = false;
         if (!helper->canMatchAny())
         {
@@ -969,6 +959,7 @@ public:
     {
         helper = (IHThorDiskGroupAggregateArg *)queryHelper();
         merging = false;
+        appendOutputLinked(this);
     }
 
 // IHThorGroupAggregateCallback
@@ -981,7 +972,6 @@ public:
     {
         CDiskReadSlaveActivityRecord::init(data, slaveData);
         mpTag = container.queryJobChannel().deserializeMPTag(data);
-        appendOutputLinked(this);
         partHandler.setown(new CDiskSimplePartHandler(*this));
         allocator.set(queryRowAllocator());
     }

+ 4 - 2
thorlcr/activities/diskwrite/thdwslave.cpp

@@ -132,11 +132,13 @@ protected:
     }
 
 public:
-    CCsvWriteSlaveActivity(CGraphElementBase *container) : CDiskWriteSlaveActivity(container) { }
+    CCsvWriteSlaveActivity(CGraphElementBase *container) : CDiskWriteSlaveActivity(container)
+    {
+        helper = static_cast <IHThorCsvWriteArg *> (queryHelper());
+    }
     void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
         CDiskWriteSlaveActivity::init(data, slaveData);
-        helper = static_cast <IHThorCsvWriteArg *> (queryHelper());
 
         singleHF = 0 != (ICsvParameters::singleHeaderFooter & helper->queryCsvParameters()->getFlags());
         csvOutput.init(helper->queryCsvParameters(), 0 != container.queryJob().getWorkUnitValueInt("oldCSVoutputFormat", 0));

+ 1 - 1
thorlcr/activities/distribution/thdistributionslave.cpp

@@ -27,11 +27,11 @@ class CDistributionSlaveActivity : public ProcessSlaveActivity
 public:
     CDistributionSlaveActivity(CGraphElementBase *container) : ProcessSlaveActivity(container)
     {
+        helper = static_cast <IHThorDistributionArg *> (queryHelper());
     }
     void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
         mpTag = container.queryJobChannel().deserializeMPTag(data);
-        helper = static_cast <IHThorDistributionArg *> (queryHelper()); 
         aggy = (IDistributionTable * *)ma.allocate(helper->queryInternalRecordSize()->getMinRecordSize());
     }
     void kill()

+ 0 - 3
thorlcr/activities/enth/thenthslave.cpp

@@ -92,9 +92,6 @@ public:
 
     BaseEnthActivity(CGraphElementBase *_container) : CSlaveActivity(_container)
     {
-    }
-    virtual void init(MemoryBuffer & data, MemoryBuffer &slaveData) override
-    {
         appendOutputLinked(this);
     }
     virtual void setInputStream(unsigned index, CThorInput &_input, bool consumerOrdered) override

+ 1 - 1
thorlcr/activities/fetch/thfetchslave.cpp

@@ -299,6 +299,7 @@ public:
         fetchBaseHelper = (IHThorFetchBaseArg *)queryHelper();
         fetchContext = static_cast<IHThorFetchContext *>(fetchBaseHelper->selectInterface(TAIfetchcontext_1));
         reInit = 0 != (fetchContext->getFetchFlags() & (FFvarfilename|FFdynamicfilename));
+        appendOutputLinked(this);
     }
     ~CFetchSlaveBase()
     {
@@ -340,7 +341,6 @@ public:
             free(encryptedKey);
         }
         fetchDiskRowIf.setown(createThorRowInterfaces(queryRowManager(), fetchContext->queryDiskRecordSize(), queryId(), queryCodeContext()));
-        appendOutputLinked(this);
     }
 
     virtual void initializeFileParts()

+ 2 - 14
thorlcr/activities/filter/thfilterslave.cpp

@@ -28,9 +28,6 @@ public:
     explicit CFilterSlaveActivityBase(CGraphElementBase *_container)
         : CSlaveActivity(_container)
     {
-    }
-    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
-    {
         appendOutputLinked(this);
     }
     virtual void start() override
@@ -63,10 +60,6 @@ public:
     CFilterSlaveActivity(CGraphElementBase *container)
         : CFilterSlaveActivityBase(container), CThorSteppable(this)
     {
-    }
-    void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
-    {
-        PARENT::init(data,slaveData);
         helper = static_cast <IHThorFilterArg *> (queryHelper());
     }
     virtual void start() override
@@ -183,11 +176,11 @@ public:
     CFilterProjectSlaveActivity(CGraphElementBase *container) 
         : CFilterSlaveActivityBase(container)
     {
+        helper = static_cast <IHThorFilterProjectArg *> (queryHelper());
     }
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
     {
         PARENT::init(data,slaveData);
-        helper = static_cast <IHThorFilterProjectArg *> (queryHelper());
         allocator.set(queryRowAllocator());
     }
     virtual void start() override
@@ -261,8 +254,8 @@ class CFilterGroupSlaveActivity : public CFilterSlaveActivityBase, public CThorS
 public:
     CFilterGroupSlaveActivity(CGraphElementBase *container) : CFilterSlaveActivityBase(container), CThorSteppable(this)
     {
+        helper = (IHThorFilterGroupArg *)queryHelper();
         groupLoader.setown(createThorRowLoader(*this, NULL, stableSort_none, rc_allMem));
-        helper = NULL;
         spillCompInfo = 0x0;
         if (getOptBool(THOROPT_COMPRESS_SPILLS, true))
         {
@@ -271,11 +264,6 @@ public:
             setCompFlag(compType, spillCompInfo);
         }
     }
-    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
-    {
-        PARENT::init(data,slaveData);
-        helper = (IHThorFilterGroupArg *)queryHelper();
-    }
     virtual void start() override
     {   
         ActivityTimer s(totalCycles, timeActivities);

+ 0 - 3
thorlcr/activities/firstn/thfirstnslave.cpp

@@ -44,9 +44,6 @@ public:
     {
         stopped = true;
         helper = (IHThorFirstNArg *)container.queryHelper();
-    }
-    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
-    {
         appendOutputLinked(this);
     }
     virtual void start() override

+ 2 - 22
thorlcr/activities/funnel/thfunnelslave.cpp

@@ -283,6 +283,7 @@ public:
         readThisInput = 0;
         stopped = true;
         parallel = false;
+        appendOutputLinked(this);
     }
     ~FunnelSlaveActivity()
     {
@@ -293,7 +294,6 @@ public:
         IHThorFunnelArg *helper = (IHThorFunnelArg *)queryHelper();
         parallel = !container.queryGrouped() && !helper->isOrdered() && getOptBool(THOROPT_PARALLEL_FUNNEL, true);
         grouped = container.queryGrouped();
-        appendOutputLinked(this);
         ActPrintLog("FUNNEL mode = %s, grouped=%s", parallel?"PARALLEL":"ORDERED", grouped?"GROUPED":"UNGROUPED");
     }
     virtual void start() override
@@ -476,16 +476,9 @@ public:
         : CSlaveActivity(_container), rows(*this, this)
     {
         grouped = container.queryGrouped();
-    }
-    void init()
-    {
         helper = (IHThorCombineArg *) queryHelper();
         appendOutputLinked(this);
     }
-    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
-    {
-        init();
-    }
     virtual void start() override
     {
         ActivityTimer s(totalCycles, timeActivities);
@@ -582,16 +575,9 @@ public:
     RegroupSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container)
     {
         grouped = container.queryGrouped();
-    }
-    void init()
-    {
         helper = (IHThorRegroupArg *) queryHelper();
         appendOutputLinked(this);
     }
-    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
-    {
-        init();
-    }
     virtual void start() override
     {
         ActivityTimer s(totalCycles, timeActivities);
@@ -695,6 +681,7 @@ public:
         helper = (IHThorNonEmptyArg *) queryHelper();
         sendReceiving = false;
         masterMpTag = TAG_NULL;
+        appendOutputLinked(this);
     }
 
 // IThorSlaveActivity overloaded methods
@@ -702,7 +689,6 @@ public:
     {
         if (!container.queryLocalOrGrouped())
             masterMpTag = container.queryJobChannel().deserializeMPTag(data);
-        appendOutputLinked(this);
     }
     void abort()
     {
@@ -790,9 +776,6 @@ public:
     CNWaySelectActivity(CGraphElementBase *_container) : CSlaveActivity(_container), CThorSteppable(this)
     {
         helper = (IHThorNWaySelectArg *)queryHelper();
-    }
-    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData)
-    {
         appendOutputLinked(this);
     }
     virtual void start() override
@@ -903,9 +886,6 @@ public:
     {
         helper = (IHThorNWayInputArg *)queryHelper();
         grouped = helper->queryOutputMeta()->isGrouped(); // JCSMORE should match graph info, i.e. container.queryGrouped()
-    }
-    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
-    {
         appendOutputLinked(this);
     }
     virtual void start() override

+ 1 - 1
thorlcr/activities/group/thgroupslave.cpp

@@ -61,10 +61,10 @@ public:
         numGroups = 0;
         numGroupMax = 0;
         startLastGroup = 0;
+        appendOutputLinked(this);
     }
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
-        appendOutputLinked(this);
         if (!container.queryLocalOrGrouped())
         {
             mpTag = container.queryJobChannel().deserializeMPTag(data);

+ 6 - 11
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -1962,6 +1962,7 @@ public:
     HashDistributeSlaveBase(CGraphElementBase *_container)
         : CSlaveActivity(_container)
     {
+        appendOutputLinked(this);
     }
     ~HashDistributeSlaveBase()
     {
@@ -1975,7 +1976,6 @@ public:
     }
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
     {
-        appendOutputLinked(this);
         mptag = container.queryJobChannel().deserializeMPTag(data);
         ActPrintLog("HASHDISTRIB: %sinit tag %d",mergecmp?"merge, ":"",(int)mptag);
 
@@ -2814,15 +2814,16 @@ public:
     HashDedupSlaveActivityBase(CGraphElementBase *_container, bool _local)
         : CSlaveActivity(_container), local(_local)
     {
+        helper = (IHThorHashDedupArg *)queryHelper();
         initialNumBuckets = 0;
         inputstopped = eos = lastEog = extractKey = local = isVariable = grouped = false;
-        helper = NULL;
         iHash = iKeyHash = NULL;
         iCompare = rowKeyCompare = NULL;
         keyRowInterfaces = NULL;
         hashTables = NULL;
         numHashTables = initialNumBuckets = 0;
         roxiemem::RoxieHeapFlags allocFlags = roxiemem::RHFnone;
+        appendOutputLinked(this);
     }
     ~HashDedupSlaveActivityBase()
     {
@@ -2831,9 +2832,7 @@ public:
     }
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
     {
-        helper = (IHThorHashDedupArg *)queryHelper();
         iHash = helper->queryHash();
-        appendOutputLinked(this);
         iCompare = helper->queryCompare();
 
         // JCSMORE - really should ask / lookup what flags the allocator created for extractKey has...
@@ -3551,6 +3550,7 @@ public:
         lhsProgressCount = rhsProgressCount = 0;
         mptag = TAG_NULL;
         mptag2 = TAG_NULL;
+        appendOutputLinked(this);
     }
     ~HashJoinSlaveActivity()
     {
@@ -3561,7 +3561,6 @@ public:
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
         joinargs = (IHThorHashJoinArg *)queryHelper();
-        appendOutputLinked(this);
         mptag = container.queryJobChannel().deserializeMPTag(data);
         mptag2 = container.queryJobChannel().deserializeMPTag(data);
         ActPrintLog("HASHJOIN: init tags %d,%d",(int)mptag,(int)mptag2);
@@ -3888,14 +3887,13 @@ public:
     CHashAggregateSlave(CGraphElementBase *_container)
         : CSlaveActivity(_container)
     {
+        helper = static_cast <IHThorHashAggregateArg *> (queryHelper());
         mptag = TAG_NULL;
         eos = true;
+        appendOutputLinked(this);
     }
     virtual void init(MemoryBuffer & data, MemoryBuffer &slaveData)
     {
-        helper = static_cast <IHThorHashAggregateArg *> (queryHelper());
-        appendOutputLinked(this);
-
         if (!container.queryLocalOrGrouped())
         {
             mptag = container.queryJobChannel().deserializeMPTag(data);
@@ -3983,9 +3981,6 @@ public:
         ihash = distribargs->queryHash();
         myNode = queryJobChannel().queryMyRank()-1;
         nodes = container.queryJob().querySlaves();
-    }
-    virtual void init(MemoryBuffer & data, MemoryBuffer &slaveData) override
-    {
         appendOutputLinked(this);
     }
     virtual void start() override

+ 3 - 26
thorlcr/activities/indexread/thindexreadslave.cpp

@@ -579,6 +579,7 @@ public:
             for (unsigned i=1; i < maxFields; i++)
                 seekSizes.append(seekSizes.item(i-1) + fields[i].size);
         }
+        appendOutputLinked(this);
     }
     ~CIndexReadSlaveActivity()
     {
@@ -611,7 +612,6 @@ public:
             else
                 steppingMeta.init(rawMeta, hasPostFilter);
         }
-        appendOutputLinked(this);
     }
 
 // IThorDataLink
@@ -783,18 +783,13 @@ public:
     {
         helper = (IHThorIndexGroupAggregateArg *)container.queryHelper();
         merging = false;
+        appendOutputLinked(this);
     }
 // IHThorGroupAggregateCallback
     virtual void processRow(const void *next)
     {
         localAggTable->addRow(next);
     }
-// IThorSlaveActivity
-    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
-    {
-        CIndexReadSlaveBase::init(data, slaveData);
-        appendOutputLinked(this);
-    }
 // IThorDataLink
     virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
     {
@@ -889,12 +884,6 @@ public:
     CIndexCountSlaveActivity(CGraphElementBase *_container) : CIndexReadSlaveBase(_container)
     {
         helper = static_cast <IHThorIndexCountArg *> (container.queryHelper());
-    }
-
-// IThorSlaveActivity
-    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
-    {
-        CIndexReadSlaveBase::init(data, slaveData);
         appendOutputLinked(this);
     }
 
@@ -1035,6 +1024,7 @@ public:
     CIndexNormalizeSlaveActivity(CGraphElementBase *_container) : CIndexReadSlaveBase(_container), partHelper(*this)
     {
         helper = (IHThorIndexNormalizeArg *)container.queryHelper();
+        appendOutputLinked(this);
     }
 
     virtual bool keyed()
@@ -1051,13 +1041,6 @@ public:
         return false;
     }
 
-// IThorSlaveActivity
-    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData)
-    {
-        CIndexReadSlaveBase::init(data, slaveData);
-        appendOutputLinked(this);
-    }
-
 // IThorDataLink
     virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
     {
@@ -1233,12 +1216,6 @@ public:
         : CIndexReadSlaveBase(_container), partHelper(*this), aggregator(*this)
     {
         helper = (IHThorIndexAggregateArg *)container.queryHelper();
-    }
-
-// IThorSlaveActivity
-    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
-    {
-        CIndexReadSlaveBase::init(data, slaveData);
         appendOutputLinked(this);
     }
 

+ 3 - 6
thorlcr/activities/iterate/thgroupiterateslave.cpp

@@ -34,11 +34,8 @@ class GroupIterateSlaveActivity : public CSlaveActivity
 public:
     GroupIterateSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container)
     {
-    }
-    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
-    {
-        appendOutputLinked(this);   // adding 'me' to outputs array
         helper = static_cast <IHThorGroupIterateArg *> (queryHelper());
+        appendOutputLinked(this);   // adding 'me' to outputs array
     }
     virtual void start() override
     {
@@ -119,11 +116,11 @@ class GroupProcessSlaveActivity : public CSlaveActivity
 public:
     GroupProcessSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container)
     {
+        helper = static_cast <IHThorProcessArg *> (queryHelper());
+        appendOutputLinked(this);   // adding 'me' to outputs array
     }
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
-        appendOutputLinked(this);   // adding 'me' to outputs array
-        helper = static_cast <IHThorProcessArg *> (queryHelper());
         rightrowif.setown(createThorRowInterfaces(queryRowManager(), helper->queryRightRecordSize(),queryId(),queryCodeContext()));
         rightAllocator.set(rightrowif->queryRowAllocator());
     }

+ 7 - 23
thorlcr/activities/iterate/thiterateslave.cpp

@@ -36,10 +36,10 @@ public:
     IterateSlaveActivityBase(CGraphElementBase *_container, bool _global) : CSlaveActivity(_container)
     {
         global = _global;
+        appendOutputLinked(this);   // adding 'me' to outputs array
     }
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
-        appendOutputLinked(this);   // adding 'me' to outputs array
         if (global)
             mpTag = container.queryJobChannel().deserializeMPTag(data);
     }
@@ -105,11 +105,7 @@ public:
     IterateSlaveActivity(CGraphElementBase *_container, bool _global) 
         : IterateSlaveActivityBase(_container,_global)
     {
-    }
-    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
-    {
         helper = static_cast <IHThorIterateArg *> (queryHelper());
-        IterateSlaveActivityBase::init(data,slaveData);
     }
     virtual void start() override
     {
@@ -189,19 +185,13 @@ public:
     CProcessSlaveActivity(CGraphElementBase *_container, bool _global) 
         : IterateSlaveActivityBase(_container,_global)
     {
+        helper = static_cast <IHThorProcessArg *> (queryHelper());
     }
-
-    ~CProcessSlaveActivity()
-    {
-    }
-
-    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData)
+    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
     {
-        helper = static_cast <IHThorProcessArg *> (queryHelper());
         rightRowAllocator.setown(getRowAllocator(helper->queryRightRecordSize()));
         IterateSlaveActivityBase::init(data,slaveData);
     }
-
     CATCH_NEXTROW()
     {
         ActivityTimer t(totalCycles, timeActivities);
@@ -281,11 +271,8 @@ class CChildIteratorSlaveActivity : public CSlaveActivity
 public:
     CChildIteratorSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container)
     {
-    }
-    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
-    {
-        appendOutputLinked(this);   // adding 'me' to outputs array
         helper = static_cast <IHThorChildIteratorArg *> (queryHelper());
+        appendOutputLinked(this);   // adding 'me' to outputs array
     }
     virtual void start() override
     {
@@ -334,11 +321,11 @@ public:
     CLinkedRawIteratorSlaveActivity(CGraphElementBase *_container) 
         : CSlaveActivity(_container)
     {
+        helper = static_cast <IHThorLinkedRawIteratorArg *> (queryHelper());
+        appendOutputLinked(this);   // adding 'me' to outputs array
     }
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
-        appendOutputLinked(this);   // adding 'me' to outputs array
-        helper = static_cast <IHThorLinkedRawIteratorArg *> (queryHelper());
         grouped = helper->queryOutputMeta()->isGrouped();
     }
     virtual void start() override
@@ -393,11 +380,8 @@ public:
     CStreamedIteratorSlaveActivity(CGraphElementBase *_container) 
         : CSlaveActivity(_container)
     {
-    }
-    void init(MemoryBuffer &data, MemoryBuffer &slaveData)
-    {
-        appendOutputLinked(this);   // adding 'me' to outputs array
         helper = static_cast <IHThorStreamedIteratorArg *> (queryHelper());
+        appendOutputLinked(this);   // adding 'me' to outputs array
     }
     virtual void start() override
     {

+ 1 - 2
thorlcr/activities/join/thjoin.cpp

@@ -83,7 +83,7 @@ public:
         ActPrintLog("JoinActivityMaster");
         lhsProgress.setown(new ProgressInfo(queryJob()));
         rhsProgress.setown(new ProgressInfo(queryJob()));
-        helper = NULL;
+        helper = (IHThorJoinArg *)queryHelper();
         islocal = local;
         imaster = NULL;
         selfJoinWarnLevel = INITIAL_SELFJOIN_MATCH_WARNING_LEVEL;
@@ -134,7 +134,6 @@ public:
         CMasterActivity::process();
         if (!islocal)
         {
-            helper = (IHThorJoinArg *)queryHelper();
             StringBuffer skewV;
             double skewError;
             container.queryJob().getWorkUnitValue("overrideSkewError", skewV);

+ 43 - 58
thorlcr/activities/join/thjoinslave.cpp

@@ -49,37 +49,33 @@ class JoinSlaveActivity : public CSlaveActivity, implements ILookAheadStopNotify
     IEngineRowStream *secondaryInputStream = nullptr;
     Owned<IThorDataLink> leftInput, rightInput;
     Owned<IThorDataLink> secondaryInput, primaryInput;
-    IHThorJoinBaseArg *helper;
-    IHThorJoinArg *helperjn;
-    IHThorDenormalizeArg *helperdn;
     Owned<IThorSorter> sorter;
-    unsigned portbase;
-    mptag_t mpTagRPC;
-    ICompare *leftCompare;
-    ICompare *rightCompare;
-    ISortKeySerializer *leftKeySerializer;
-    ISortKeySerializer *rightKeySerializer;
-    ICompare *primarySecondaryCompare;
-    ICompare *primarySecondaryUpperCompare; // if non-null then between join
+    unsigned portbase = 0;
+    mptag_t mpTagRPC = TAG_NULL;
+    ICompare *primarySecondaryCompare = nullptr;
+    ICompare *primarySecondaryUpperCompare = nullptr; // if non-null then between join
 
     Owned<IRowStream> leftStream, rightStream;
     Owned<IException> secondaryStartException;
 
-    bool islocal;
     Owned<IBarrier> barrier;
     SocketEndpoint server;
 
-#ifdef _TESTING
-    bool started;
-#endif
-
     Owned<IJoinHelper> joinhelper;
-    rowcount_t lhsProgressCount, rhsProgressCount;
+    rowcount_t lhsProgressCount = 0, rhsProgressCount = 0;
     CriticalSection joinHelperCrit;
-    bool leftInputStopped;
-    bool rightInputStopped;
-    bool rightpartition;
+    bool leftInputStopped = true;
+    bool rightInputStopped = true;
     CRuntimeStatisticCollection spillStats;
+    IHThorJoinBaseArg *helper;
+    IHThorJoinArg *helperjn;
+    IHThorDenormalizeArg *helperdn;
+    ICompare *leftCompare;
+    ICompare *rightCompare;
+    ISortKeySerializer *leftKeySerializer;
+    ISortKeySerializer *rightKeySerializer;
+    bool rightpartition;
+    bool islocal;
 
 
     bool noSortPartitionSide()
@@ -144,51 +140,20 @@ public:
         : CSlaveActivity(_container), spillStats(spillStatistics)
     {
         islocal = local;
-        portbase = 0;
-#ifdef _TESTING
-        started = false;
-#endif
-        leftInputStopped = true;
-        rightInputStopped = true;
-        lhsProgressCount = 0;
-        rhsProgressCount = 0;
-        mpTagRPC = TAG_NULL;
-    }
-
-    ~JoinSlaveActivity()
-    {
-        if (portbase) 
-            freePort(portbase,NUMSLAVEPORTS);
-    }
-
-    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
-    {
-        if (!islocal)
-        {
-            mpTagRPC = container.queryJobChannel().deserializeMPTag(data);
-            mptag_t barrierTag = container.queryJobChannel().deserializeMPTag(data);
-            barrier.setown(container.queryJobChannel().createBarrier(barrierTag));
-            portbase = allocPort(NUMSLAVEPORTS);
-            ActPrintLog("SortJoinSlaveActivity::init portbase = %d, mpTagRPC=%d",portbase,(int)mpTagRPC);
-            server.setLocalHost(portbase); 
-            sorter.setown(CreateThorSorter(this, server,&container.queryJob().queryIDiskUsage(),&queryJobChannel().queryJobComm(),mpTagRPC));
-            server.serialize(slaveData);
-        }
-        appendOutputLinked(this);
         switch (container.getKind())
         {
             case TAKdenormalize:
             case TAKdenormalizegroup:
             {
-                helperjn = NULL;        
-                helperdn = (IHThorDenormalizeArg *)container.queryHelper();     
+                helperjn = nullptr;
+                helperdn = (IHThorDenormalizeArg *)container.queryHelper();
                 helper = helperdn;
                 break;
             }
             case TAKjoin:
             {
-                helperjn = (IHThorJoinArg *)container.queryHelper();        
-                helperdn = NULL;        
+                helperjn = (IHThorJoinArg *)container.queryHelper();
+                helperdn = nullptr;
                 helper = helperjn;
                 break;
             }
@@ -200,6 +165,29 @@ public:
         leftKeySerializer = helper->querySerializeLeft();
         rightKeySerializer = helper->querySerializeRight();
         rightpartition = (container.getKind()==TAKjoin)&&((helper->getJoinFlags()&JFpartitionright)!=0);
+        appendOutputLinked(this);
+    }
+
+    ~JoinSlaveActivity()
+    {
+        if (portbase) 
+            freePort(portbase,NUMSLAVEPORTS);
+    }
+
+    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
+    {
+        if (!islocal)
+        {
+            mpTagRPC = container.queryJobChannel().deserializeMPTag(data);
+            mptag_t barrierTag = container.queryJobChannel().deserializeMPTag(data);
+            barrier.setown(container.queryJobChannel().createBarrier(barrierTag));
+            portbase = allocPort(NUMSLAVEPORTS);
+            ActPrintLog("SortJoinSlaveActivity::init portbase = %d, mpTagRPC=%d",portbase,(int)mpTagRPC);
+            server.setLocalHost(portbase); 
+            sorter.setown(CreateThorSorter(this, server,&container.queryJob().queryIDiskUsage(),&queryJobChannel().queryJobComm(),mpTagRPC));
+            server.serialize(slaveData);
+        }
+
     }
     virtual void setInputStream(unsigned index, CThorInput &_input, bool consumerOrdered) override
     {
@@ -653,9 +641,6 @@ public:
         helper = (IHThorNWayMergeJoinArg *)queryHelper();
         inputAllocator.setown(getRowAllocator(helper->queryInputMeta()));
         outputAllocator.setown(getRowAllocator(helper->queryOutputMeta()));
-    }
-    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
-    {
         appendOutputLinked(this);
     }
     virtual void start() override

+ 1 - 2
thorlcr/activities/keydiff/thkeydiff.cpp

@@ -37,7 +37,7 @@ class CKeyDiffMaster : public CMasterActivity
 public:
     CKeyDiffMaster(CMasterGraphElement *info) : CMasterActivity(info)
     {
-        helper = NULL;
+        helper = (IHThorKeyDiffArg *)queryHelper();
         local = false;
         width = 0;
         copyTlk = globals->getPropBool("@diffCopyTlk", true); // because tlk can have meta data and diff/patch does not support
@@ -45,7 +45,6 @@ public:
     virtual void init()
     {
         CMasterActivity::init();
-        helper = (IHThorKeyDiffArg *)queryHelper();
         OwnedRoxieString originalHelperName(helper->getOriginalName());
         OwnedRoxieString updatedHelperName(helper->getUpdatedName());
         OwnedRoxieString outputHelperName(helper->getOutputName());

+ 1 - 2
thorlcr/activities/keydiff/thkeydiffslave.cpp

@@ -42,7 +42,7 @@ class CKeyDiffSlave : public ProcessSlaveActivity
 public:
     CKeyDiffSlave(CGraphElementBase *container) : ProcessSlaveActivity(container)
     {
-        helper = NULL;
+        helper = (IHThorKeyDiffArg *)queryHelper();
         tlk = false;
         copyTlk = globals->getPropBool("@diffCopyTlk", true); // because tlk can have meta data and diff/patch does not support
     }
@@ -52,7 +52,6 @@ public:
 
     void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
-        helper = (IHThorKeyDiffArg *)queryHelper();
         bool active;
         data.read(active);
         if (!active)

+ 1 - 1
thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp

@@ -1598,6 +1598,7 @@ public:
 #endif
         helper = (IHThorKeyedJoinArg *)queryHelper();
         reInit = 0 != (helper->getFetchFlags() & (FFvarfilename|FFdynamicfilename));
+        appendOutputLinked(this);
     }
     ~CKeyedJoinSlave()
     {
@@ -2014,7 +2015,6 @@ public:
             parallelLookups = 0;
             resultDistStream = new CKeyLocalLookup(*this);
         }
-        appendOutputLinked(this);
     }
     virtual void abort()
     {

+ 1 - 2
thorlcr/activities/keypatch/thkeypatch.cpp

@@ -37,14 +37,13 @@ class CKeyPatchMaster : public CMasterActivity
 public:
     CKeyPatchMaster(CMasterGraphElement *info) : CMasterActivity(info)
     {
-        helper = NULL;
+        helper = (IHThorKeyPatchArg *)queryHelper();
         local = false;
         width = 0;
     }
     virtual void init()
     {
         CMasterActivity::init();
-        helper = (IHThorKeyPatchArg *)queryHelper();
         OwnedRoxieString originalHelperName(helper->getOriginalName());
         OwnedRoxieString patchHelperName(helper->getPatchName());
         OwnedRoxieString outputHelperName(helper->getOutputName());

+ 2 - 7
thorlcr/activities/keypatch/thkeypatchslave.cpp

@@ -40,16 +40,11 @@ class CKeyPatchSlave : public ProcessSlaveActivity
 public:
     CKeyPatchSlave(CGraphElementBase *container) : ProcessSlaveActivity(container)
     {
-        helper = NULL;
+        helper = (IHThorKeyPatchArg *)queryHelper();
         tlk = copyTlk = false;
     }
-    ~CKeyPatchSlave()
+    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
     {
-    }
-
-    void init(MemoryBuffer &data, MemoryBuffer &slaveData)
-    {
-        helper = (IHThorKeyPatchArg *)queryHelper();
         bool active;
         data.read(active);
         if (!active)

+ 1 - 2
thorlcr/activities/limit/thlimitslave.cpp

@@ -49,11 +49,10 @@ public:
         resultSent = container.queryLocal(); // i.e. local, so don't send result to master
         eos = stopped = anyThisGroup = eogNext = false;
         rowLimit = RCMAX;
+        appendOutputLinked(this);
     }
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
     {
-        appendOutputLinked(this);
-
         if (!container.queryLocal())
             mpTag = container.queryJobChannel().deserializeMPTag(data);
     }

+ 1 - 4
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -630,8 +630,6 @@ public:
         parallelMinChunkSize = 1024;
         parallelChunkSize = 10*parallelMinChunkSize;
         threadCount = activity.getOptInt(THOROPT_JOINHELPER_THREADS, activity.queryMaxCores());
-        if (0 == threadCount)
-            threadCount = getAffinityCpus();
     }
     bool init(rowidx_t rowCount, roxiemem::IRowManager *rowManager)
     {
@@ -1330,6 +1328,7 @@ public:
             rightThorAllocator = queryJobChannel().queryThorAllocator();
         rightRowManager = rightThorAllocator->queryRowManager();
         broadcastLock = NULL;
+        appendOutputLinked(this);
     }
     ~CInMemJoinBase()
     {
@@ -1362,8 +1361,6 @@ public:
 // IThorSlaveActivity overloaded methods
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
-        appendOutputLinked(this);
-
         StringBuffer str;
         ActPrintLog("Join type is %s", getJoinTypeStr(str).str());
 

+ 6 - 15
thorlcr/activities/loop/thloopslave.cpp

@@ -80,10 +80,10 @@ public:
     {
         mpTag = TAG_NULL;
         maxEmptyLoopIterations = getOptUInt(THOROPT_LOOP_MAX_EMPTY, 1000);
+        appendOutputLinked(this);
     }
     void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
-        appendOutputLinked(this);
         if (!container.queryLocalOrGrouped())
             mpTag = container.queryJobChannel().deserializeMPTag(data);
         global = !queryContainer().queryLoopGraph()->queryGraph()->isLocalOnly();
@@ -528,10 +528,10 @@ public:
         helper = (IHThorLocalResultReadArg *)queryHelper();
         curRow = 0;
         replyTag = queryMPServer().createReplyTag();
+        appendOutputLinked(this);
     }
     void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
-        appendOutputLinked(this);
         assertex(container.queryResultsGraph());
     }
     virtual void start()
@@ -609,10 +609,10 @@ public:
     CLocalResultSpillActivity(CGraphElementBase *_container) : CSlaveActivity(_container)
     {
         helper = (IHThorLocalResultSpillArg *)queryHelper();
+        appendOutputLinked(this);
     }
     void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
-        appendOutputLinked(this);
         mpTag = container.queryJobChannel().deserializeMPTag(data);
     }
     virtual void start()
@@ -800,9 +800,6 @@ protected:
 public:
     CConditionalActivity(CGraphElementBase *_container) : CSlaveActivity(_container)
     {
-    }
-    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
-    {
         appendOutputLinked(this);
     }
     virtual void start() override
@@ -962,11 +959,11 @@ public:
     CChildNormalizeSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container)
     {
         helper = (IHThorChildNormalizeArg *)queryHelper();
+        appendOutputLinked(this);
     }
     void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
         allocator.set(queryRowAllocator());
-        appendOutputLinked(this);
     }
     virtual void start()
     {
@@ -1034,9 +1031,6 @@ public:
     CChildAggregateSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container)
     {
         helper = (IHThorChildAggregateArg *)queryHelper();
-    }
-    void init(MemoryBuffer &data, MemoryBuffer &slaveData)
-    {
         appendOutputLinked(this);
     }
     virtual void start()
@@ -1085,11 +1079,11 @@ public:
     CChildGroupAggregateActivitySlave(CGraphElementBase *_container) : CSlaveActivity(_container)
     {
         helper = (IHThorChildGroupAggregateArg *)queryHelper();
+        appendOutputLinked(this);
     }
     void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
         allocator.set(queryRowAllocator());
-        appendOutputLinked(this);
     }
     virtual void start() override
     {
@@ -1154,12 +1148,12 @@ public:
     CChildThroughNormalizeSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container), nextOutput(NULL)
     {
         helper = (IHThorChildThroughNormalizeArg *)queryHelper();
+        appendOutputLinked(this);
     }
     void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
         allocator.set(queryRowAllocator());
         nextOutput.setAllocator(allocator);
-        appendOutputLinked(this);
     }
     virtual void start()
     {
@@ -1233,9 +1227,6 @@ public:
     CGraphLoopResultReadSlaveActivity(CGraphElementBase *container) : CSlaveActivity(container)
     {
         helper = (IHThorGraphLoopResultReadArg *)queryHelper();
-    }
-    void init(MemoryBuffer &data, MemoryBuffer &slaveData)
-    {
         appendOutputLinked(this);
     }
     virtual void kill()

+ 4 - 12
thorlcr/activities/merge/thmergeslave.cpp

@@ -248,6 +248,8 @@ public:
     {
         partitionpos = NULL;
         linkcounter.setown(new CThorRowLinkCounter);
+        helper = (IHThorMergeArg *)queryHelper();
+        appendOutputLinked(this);
     }
 
     ~GlobalMergeSlaveActivity()
@@ -267,8 +269,6 @@ public:
 // IThorSlaveActivity overloaded methods
     void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
     {
-        helper = (IHThorMergeArg *)queryHelper();
-        appendOutputLinked(this);
         masterMpTag = container.queryJobChannel().deserializeMPTag(data);
     }
 
@@ -418,22 +418,17 @@ class LocalMergeSlaveActivity : public CSlaveActivity
     Owned<IRowStream> out;
     IHThorMergeArg *helper;
 public:
-    LocalMergeSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container) { }
-
-// IThorSlaveActivity overloaded methods
-    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
+    LocalMergeSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container)
     {
         helper = (IHThorMergeArg *)queryHelper();
         appendOutputLinked(this);
     }
-
     void abort()
     {
         ActPrintLog("abort");
         CSlaveActivity::abort();
     }
 
-
 // IThorDataLink
     virtual void start() override
     {
@@ -545,15 +540,12 @@ public:
         helper = (IHThorNWayMergeArg *)queryHelper();
         merger.init(helper->queryCompare(), helper->dedup(), helper->querySteppingMeta()->queryCompare());
         initializedMeta = false;
+        appendOutputLinked(this);
     }
     ~CNWayMergeActivity()
     {
         merger.cleanup();
     }
-    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
-    {
-        appendOutputLinked(this);
-    }
     virtual void start() override
     {
         CThorNarySlaveActivity::start();

+ 2 - 2
thorlcr/activities/msort/thgroupsortslave.cpp

@@ -46,6 +46,7 @@ public:
     CLocalSortSlaveActivity(CGraphElementBase *_container)
         : CSlaveActivity(_container), spillStats(spillStatistics)
     {
+        appendOutputLinked(this);
     }
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
@@ -53,7 +54,6 @@ public:
         iCompare = helper->queryCompare();
         IHThorAlgorithm * algo = helper?(static_cast<IHThorAlgorithm *>(helper->selectInterface(TAIalgorithm_1))):NULL;
         unstable = (algo&&(algo->getAlgorithmFlags()&TAFunstable));
-        appendOutputLinked(this);
     }
     virtual void start()
     {
@@ -139,12 +139,12 @@ public:
     {
         helper = (IHThorSortedArg *)queryHelper();
         icompare = helper->queryCompare();
+        appendOutputLinked(this);
     }
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
     {
         helper = (IHThorSortedArg *)queryHelper();
         icompare = helper->queryCompare();
-        appendOutputLinked(this);
     }
     virtual void start() override
     {

+ 1 - 1
thorlcr/activities/msort/thmsortslave.cpp

@@ -62,6 +62,7 @@ public:
     {
         portbase = 0;
         totalrows = RCUNSET;
+        appendOutputLinked(this);
     }
     ~MSortSlaveActivity()
     {
@@ -78,7 +79,6 @@ public:
         server.setLocalHost(portbase); 
         helper = (IHThorSortArg *)queryHelper();
         sorter.setown(CreateThorSorter(this, server,&container.queryJob().queryIDiskUsage(),&queryJobChannel().queryJobComm(),mpTagRPC));
-        appendOutputLinked(this);
         server.serialize(slaveData);
     }
     virtual void start() override

+ 2 - 2
thorlcr/activities/msort/thsortu.cpp

@@ -2016,7 +2016,7 @@ IJoinHelper *createJoinHelper(CActivityBase &activity, IHThorJoinArg *helper, IT
     IJoinHelper *jhelper = new CJoinHelper(activity, helper, rowIf);
     if (!parallelmatch||helper->getKeepLimit()||((helper->getJoinFlags()&JFslidingmatch)!=0)) // currently don't support betweenjoin or keep and multicore
         return jhelper;
-    unsigned numthreads = activity.getOptInt(THOROPT_JOINHELPER_THREADS, getAffinityCpus());
+    unsigned numthreads = activity.getOptInt(THOROPT_JOINHELPER_THREADS, activity.queryMaxCores());
     ActPrintLog(&activity, "Join helper using %d threads", numthreads);
     if (unsortedoutput)
         return new CMultiCoreUnorderedJoinHelper(activity, numthreads, false, jhelper, helper, rowIf);
@@ -2035,7 +2035,7 @@ IJoinHelper *createSelfJoinHelper(CActivityBase &activity, IHThorJoinArg *helper
     IJoinHelper *jhelper = new SelfJoinHelper(activity, helper, rowIf);
     if (!parallelmatch||helper->getKeepLimit()||((helper->getJoinFlags()&JFslidingmatch)!=0)) // currently don't support betweenjoin or keep and multicore
         return jhelper;
-    unsigned numthreads = activity.getOptInt(THOROPT_JOINHELPER_THREADS, getAffinityCpus());
+    unsigned numthreads = activity.getOptInt(THOROPT_JOINHELPER_THREADS, activity.queryMaxCores());
     ActPrintLog(&activity, "Self join helper using %d threads", numthreads);
     if (unsortedoutput)
         return new CMultiCoreUnorderedJoinHelper(activity, numthreads, true, jhelper, helper, rowIf);

+ 6 - 10
thorlcr/activities/normalize/thnormalizeslave.cpp

@@ -42,11 +42,11 @@ public:
     NormalizeSlaveActivity(CGraphElementBase *_container) 
         : CSlaveActivity(_container)
     {
+        helper = static_cast <IHThorNormalizeArg *> (queryHelper());
+        appendOutputLinked(this);
     }
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
-        appendOutputLinked(this);
-        helper = static_cast <IHThorNormalizeArg *> (queryHelper());
         allocator.set(queryRowAllocator());
     }
     virtual void start() override
@@ -116,13 +116,12 @@ public:
     CNormalizeChildSlaveActivity(CGraphElementBase *_container) 
         : CSlaveActivity(_container)
     { 
+        helper = static_cast <IHThorNormalizeChildArg *> (queryHelper());
+        appendOutputLinked(this);
     }
     virtual bool isGrouped() const override { return queryInput(0)->isGrouped(); }
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
     {
-        appendOutputLinked(this);
-        helper = static_cast <IHThorNormalizeChildArg *> (queryHelper());
-
         cursor = helper->queryIterator();
         allocator.set(queryRowAllocator());
     }
@@ -209,13 +208,10 @@ public:
     CNormalizeLinkedChildSlaveActivity(CGraphElementBase *_container) 
         : CSlaveActivity(_container)
     { 
-    }
-    virtual bool isGrouped() const override { return queryInput(0)->isGrouped(); }
-    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
-    {
-        appendOutputLinked(this);
         helper = static_cast <IHThorNormalizeLinkedChildArg *> (queryHelper());
+        appendOutputLinked(this);
     }
+    virtual bool isGrouped() const override { return queryInput(0)->isGrouped(); }
     virtual void start() override
     {
         ActivityTimer s(totalCycles, timeActivities);

+ 17 - 41
thorlcr/activities/nsplitter/thnsplitterslave.cpp

@@ -31,13 +31,13 @@ class CSplitterOutput : public CSimpleInterfaceOf<IStartableEngineRowStream>, pu
     Semaphore writeBlockSem;
     bool started = false, stopped = false;
 
-    unsigned activeOutput;
+    unsigned outIdx;
     rowcount_t rec = 0, max = 0;
 
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterfaceOf<IStartableEngineRowStream>);
 
-    CSplitterOutput(NSplitterSlaveActivity &_activity, unsigned outIdx, unsigned activeOutput);
+    CSplitterOutput(NSplitterSlaveActivity &_activity, unsigned outIdx);
 
     void reset()
     {
@@ -139,8 +139,12 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf
         }
     }
 public:
-    NSplitterSlaveActivity(CGraphElementBase *container) : CSlaveActivity(container), writer(*this)
+    NSplitterSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container), writer(*this)
     {
+        activeOutputs = container.getOutputs();
+        ActPrintLog("Number of connected outputs: %u", activeOutputs);
+        ForEachItemIn(o, container.outputs)
+            appendOutput(new CSplitterOutput(*this, o));
     }
     virtual void reset() override
     {
@@ -158,36 +162,8 @@ public:
                 output->reset();
         }
     }
-    void init(MemoryBuffer &data, MemoryBuffer &slaveData)
+    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
     {
-        ForEachItemIn(o, container.outputs)
-        {
-            if (nullptr != container.connectedOutputs.queryItem(o))
-                ++activeOutputs;
-        }
-        ActPrintLog("Number of connected outputs: %u", activeOutputs);
-        if (activeOutputs <= 1)
-        {
-            ForEachItemIn(o2, container.outputs)
-            {
-                if (o2 == o)
-                    appendOutputLinked(this);
-                else
-                    appendOutput(nullptr);
-            }
-        }
-        else
-        {
-            unsigned activeOutput = 0;
-            ForEachItemIn(o, container.outputs)
-            {
-                CIOConnection *io = container.connectedOutputs.queryItem(o);
-                if (nullptr != io)
-                    appendOutput(new CSplitterOutput(*this, io->index, activeOutput++));
-                else
-                    appendOutput(nullptr);
-            }
-        }
         IHThorSplitArg *helper = (IHThorSplitArg *)queryHelper();
         int dV = getOptInt(THOROPT_SPLITTER_SPILL, -1);
         if (-1 == dV)
@@ -209,7 +185,7 @@ public:
                 if (output && output->isStopped())
                     --remainingOutputs;
             }
-            assertex(remainingOutputs); // must be >=1, as this output (activeOutput) has invoked prepareInput
+            assertex(remainingOutputs); // must be >=1, as this output (outIdx) has invoked prepareInput
             if (1 == remainingOutputs)
                 return false;
             if (smartBuf)
@@ -241,11 +217,11 @@ public:
         }
         return true;
     }
-    inline const void *nextRow(unsigned activeOutput)
+    inline const void *nextRow(unsigned outIdx)
     {
         if (1 == remainingOutputs) // will be true, if only 1 input connect, or only 1 input was active (others stopped) when it started reading
             return inputStream->nextRow();
-        OwnedConstThorRow row = smartBuf->queryOutput(activeOutput)->nextRow(); // will block until available
+        OwnedConstThorRow row = smartBuf->queryOutput(outIdx)->nextRow(); // will block until available
         if (writeAheadException)
             throw LINK(writeAheadException);
         return row.getClear();
@@ -307,7 +283,7 @@ public:
         }
         return recsReady;
     }
-    void inputStopped(unsigned activeOutput)
+    void inputStopped(unsigned outIdx)
     {
         CriticalBlock block(startLock);
         if (smartBuf)
@@ -315,7 +291,7 @@ public:
             /* If no output has started reading (nextRow()), then it will not have been prepared
              * If only 1 output is left, it will bypass the smart buffer when it starts.
              */
-            smartBuf->queryOutput(activeOutput)->stop();
+            smartBuf->queryOutput(outIdx)->stop();
         }
         ++stoppedOutputs;
         if (stoppedOutputs == activeOutputs)
@@ -420,8 +396,8 @@ void CSplitterOutput::debugRequest(MemoryBuffer &mb)
 }
 
 
-CSplitterOutput::CSplitterOutput(NSplitterSlaveActivity &_activity, unsigned outIdx, unsigned _activeOutput)
-   : CEdgeProgress(&_activity, outIdx), activity(_activity), activeOutput(_activeOutput)
+CSplitterOutput::CSplitterOutput(NSplitterSlaveActivity &_activity, unsigned _outIdx)
+   : CEdgeProgress(&_activity, _outIdx), activity(_activity), outIdx(_outIdx)
 {
 }
 
@@ -437,7 +413,7 @@ void CSplitterOutput::start()
 void CSplitterOutput::stop()
 { 
     stopped = true;
-    activity.inputStopped(activeOutput);
+    activity.inputStopped(outIdx);
     dataLinkStop();
 }
 
@@ -449,7 +425,7 @@ const void *CSplitterOutput::nextRow()
         // NB: if this is sole input that actually started, writeahead will have returned RCMAX and calls to activity.nextRow will go directly to splitter input
     }
     ActivityTimer t(totalCycles, activity.queryTimeActivities());
-    const void *row = activity.nextRow(activeOutput); // pass ptr to max if need more
+    const void *row = activity.nextRow(outIdx); // pass ptr to max if need more
     ++rec;
     if (row)
         dataLinkIncrement();

+ 4 - 5
thorlcr/activities/nullaction/thnullactionslave.cpp

@@ -29,14 +29,13 @@ class CNullActionSlaveActivity : public CSlaveActivity
     typedef CSlaveActivity PARENT;
 
 public:
-    CNullActionSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container) { }
-    ~CNullActionSlaveActivity()
+    CNullActionSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container)
     {
+        appendOutputLinked(this);
     }
-    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
+    ~CNullActionSlaveActivity()
     {
-        appendOutputLinked(this);
-    } 
+    }
     virtual void start() override
     {
         ActivityTimer s(totalCycles, timeActivities);

+ 2 - 3
thorlcr/activities/parse/thparseslave.cpp

@@ -45,10 +45,11 @@ class CParseSlaveActivity : public CSlaveActivity, implements IMatchedAction
 public:
     CParseSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container)
     {
+        helper = (IHThorParseArg *)queryHelper();
         anyThisGroup = false;
         curSearchTextLen = 0;
         curSearchText = NULL;
-
+        appendOutputLinked(this);
     }
     ~CParseSlaveActivity()
     {
@@ -57,8 +58,6 @@ public:
     }
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
     {
-        appendOutputLinked(this);
-        helper = (IHThorParseArg *)queryHelper();
         algorithm.setown(createThorParser(queryCodeContext(), *helper));
         parser.setown(algorithm->createParser(queryCodeContext(), (unsigned)container.queryId(), helper->queryHelper(), helper));
         rowIter = parser->queryResultIter();

+ 4 - 5
thorlcr/activities/piperead/thprslave.cpp

@@ -189,6 +189,8 @@ public:
     CPipeReadSlaveActivity(CGraphElementBase *_container) 
         : CPipeSlaveBase(_container)
     {
+        helper = static_cast <IHThorPipeReadArg *> (queryHelper());
+        appendOutputLinked(this);
     }
     CATCH_NEXTROW()
     {   
@@ -229,13 +231,11 @@ public:
     }
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
-        helper = static_cast <IHThorPipeReadArg *> (queryHelper());
         flags = helper->getPipeFlags();
         needTransform = false;
 
         if (needTransform)
             inrowif.setown(createThorRowInterfaces(queryRowManager(), helper->queryDiskRecordSize(), queryId(), queryCodeContext()));
-        appendOutputLinked(this);
     }
     virtual void start() override
     {
@@ -337,8 +337,10 @@ public:
     CPipeThroughSlaveActivity(CGraphElementBase *_container)
         : CPipeSlaveBase(_container)
     {
+        helper = static_cast <IHThorPipeThroughArg *> (queryHelper());
         pipeWriter = NULL;
         grouped = false;
+        appendOutputLinked(this);
     }
     ~CPipeThroughSlaveActivity()
     {
@@ -346,12 +348,9 @@ public:
     }
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
     {
-        helper = static_cast <IHThorPipeThroughArg *> (queryHelper());
         flags = helper->getPipeFlags();
         recreate = helper->recreateEachRow();
         grouped = 0 != (flags & TPFgroupeachrow);
-
-        appendOutputLinked(this);
     }
     virtual void start() override
     {

+ 9 - 13
thorlcr/activities/project/thprojectslave.cpp

@@ -83,6 +83,7 @@ class CProjectSlaveActivity : public CThorStrandedActivity
 public:
     explicit CProjectSlaveActivity(CGraphElementBase *_container) : CThorStrandedActivity(_container)
     {
+        helper = static_cast <IHThorProjectArg *> (queryHelper());
         appendOutputLinked(this);
     }
 
@@ -92,11 +93,6 @@ public:
     }
     virtual CThorStrandProcessor *createStrandSourceProcessor(bool inputOrdered) override { throwUnexpected(); }
 
-    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
-    {
-        helper = static_cast <IHThorProjectArg *> (queryHelper());
-    }
-
 // IThorDataLink
     virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
     {
@@ -115,12 +111,12 @@ class CPrefetchProjectSlaveActivity : public CSlaveActivity
     typedef CSlaveActivity PARENT;
 
     IHThorPrefetchProjectArg *helper;
-    rowcount_t numProcessedLastGroup;
-    bool eof;
+    rowcount_t numProcessedLastGroup = 0;
+    bool eof = false;
     Owned<IEngineRowAllocator> allocator;
     IThorChildGraph *child = nullptr;
-    bool parallel;
-    unsigned preload;
+    bool parallel = false;
+    unsigned preload = 0;
 
     class PrefetchInfo : public CSimpleInterface
     {
@@ -252,19 +248,19 @@ public:
     {
         helper = (IHThorPrefetchProjectArg *) queryHelper();
         parallel = 0 != (helper->getFlags() & PPFparallel);
-        preload = helper->getLookahead();
-        if (!preload)
-            preload = 10; // default
+        appendOutputLinked(this);
     }
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
     {
-        appendOutputLinked(this);
         allocator.set(queryRowAllocator());
     }
     virtual void start() override
     {
         ActivityTimer s(totalCycles, timeActivities);
         PARENT::start();
+        preload = helper->getLookahead();
+        if (!preload)
+            preload = 10; // default
         child = helper->queryChild();
         numProcessedLastGroup = getDataLinkGlobalCount();
         eof = !helper->canMatchAny();

+ 0 - 5
thorlcr/activities/pull/thpullslave.cpp

@@ -28,11 +28,6 @@ class PullSlaveActivity : public CSlaveActivity
 public:
     PullSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container)
     {
-    }
-
-// IThorSlaveActivity overloaded methods
-    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
-    {
         appendOutputLinked(this);
     }
 

+ 4 - 13
thorlcr/activities/rollup/throllupslave.cpp

@@ -293,12 +293,8 @@ public:
     CDedupBaseSlaveActivity(CGraphElementBase *_container, bool global, bool groupOp)
         : CDedupRollupBaseActivity(_container, false, global, groupOp)
     {
-    }
-    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
-    {
-        CDedupRollupBaseActivity::init(data, slaveData);
-        appendOutputLinked(this);   // adding 'me' to outputs array
         ddhelper = static_cast <IHThorDedupArg *>(queryHelper());
+        appendOutputLinked(this);   // adding 'me' to outputs array
     }
     virtual void start() override
     {
@@ -455,12 +451,8 @@ public:
     CRollupSlaveActivity(CGraphElementBase *_container, bool global, bool groupOp) 
         : CDedupRollupBaseActivity(_container, true, global, groupOp)
     {
-    }
-    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
-    {
-        CDedupRollupBaseActivity::init(data, slaveData);
-        appendOutputLinked(this);   // adding 'me' to outputs array
         ruhelper = static_cast <IHThorRollupArg *>  (queryHelper());
+        appendOutputLinked(this);   // adding 'me' to outputs array
     }
     inline bool eog()
     {
@@ -560,13 +552,12 @@ class CRollupGroupSlaveActivity : public CSlaveActivity
 public:
     CRollupGroupSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container), rows(*this, NULL)
     {
+        helper = (IHThorRollupGroupArg *)queryHelper();
         eoi = false;
-        helper = NULL;
+        appendOutputLinked(this);   // adding 'me' to outputs array
     }
     void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
-        helper = (IHThorRollupGroupArg *)queryHelper();
-        appendOutputLinked(this);   // adding 'me' to outputs array
         groupLoader.setown(createThorRowLoader(*this, NULL, stableSort_none, rc_allMem));
     }
     virtual void start()

+ 2 - 4
thorlcr/activities/sample/thsampleslave.cpp

@@ -28,12 +28,10 @@ class SampleSlaveActivity : public CSlaveActivity
     bool eogNext;
 
 public:
-    SampleSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container) { }
-
-    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
+    SampleSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container)
     {
-        appendOutputLinked(this);
         helper = static_cast <IHThorSampleArg *> (queryHelper());
+        appendOutputLinked(this);
     }
     virtual void start() override
     {

+ 2 - 2
thorlcr/activities/selectnth/thselectnthslave.cpp

@@ -65,8 +65,10 @@ class CSelectNthSlaveActivity : public CSlaveActivity, implements ILookAheadStop
 public:
     CSelectNthSlaveActivity(CGraphElementBase *_container, bool _isLocal) : CSlaveActivity(_container)
     {
+        helper = static_cast <IHThorSelectNArg *> (queryHelper());
         isLocal = _isLocal;
         createDefaultIfFail = isLocal || lastNode();
+        appendOutputLinked(this);
     }
 
 // IThorSlaveActivity overloaded methods
@@ -74,8 +76,6 @@ public:
     {
         if (!container.queryLocalOrGrouped())
             mpTag = container.queryJobChannel().deserializeMPTag(data);
-        appendOutputLinked(this);
-        helper = static_cast <IHThorSelectNArg *> (queryHelper());
     }
     virtual void setInputStream(unsigned index, CThorInput &_input, bool consumerOrdered) override
     {

+ 2 - 2
thorlcr/activities/selfjoin/thselfjoinslave.cpp

@@ -103,6 +103,7 @@ public:
     SelfJoinSlaveActivity(CGraphElementBase *_container, bool _isLocal, bool _isLightweight)
         : CSlaveActivity(_container), spillStats(spillStatistics)
     {
+        helper = static_cast <IHThorJoinArg *> (queryHelper());
         isLocal = _isLocal||_isLightweight;
         isLightweight = _isLightweight;
         portbase = 0;
@@ -110,6 +111,7 @@ public:
         keyserializer = NULL;
         inputStopped = false;
         mpTagRPC = TAG_NULL;
+        appendOutputLinked(this);
     }
 
     ~SelfJoinSlaveActivity()
@@ -121,7 +123,6 @@ public:
 // IThorSlaveActivity
     virtual void init(MemoryBuffer & data, MemoryBuffer &slaveData) override
     {       
-        appendOutputLinked(this);
         if(!isLocal)
         {
             mpTagRPC = container.queryJobChannel().deserializeMPTag(data);
@@ -132,7 +133,6 @@ public:
             sorter.setown(CreateThorSorter(this, server,&container.queryJob().queryIDiskUsage(),&queryJobChannel().queryJobComm(),mpTagRPC));
             server.serialize(slaveData);
         }
-        helper = static_cast <IHThorJoinArg *> (queryHelper());
         compare = helper->queryCompareLeft();                   // NB not CompareLeftRight
         keyserializer = helper->querySerializeLeft();           // hopefully never need right
         if(isLightweight) 

+ 8 - 4
thorlcr/activities/soapcall/thsoapcallslave.cpp

@@ -43,13 +43,15 @@ class CWscRowCallSlaveActivity : public CSlaveActivity, implements IWSCRowProvid
 public:
     IMPLEMENT_IINTERFACE_USING(CSlaveActivity);
 
-    CWscRowCallSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container) { }
+    CWscRowCallSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container)
+    {
+        appendOutputLinked(this);
+    }
 
     // IThorSlaveActivity overloaded methods
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
         buildAuthToken(queryJob().queryUserDescriptor(), authToken);
-        appendOutputLinked(this);
     }
     // IThorDataLink methods
     virtual void start()
@@ -136,14 +138,16 @@ class SoapDatasetCallSlaveActivity : public CSlaveActivity, implements IWSCRowPr
 public:
     IMPLEMENT_IINTERFACE_USING(CSlaveActivity);
 
-    SoapDatasetCallSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container) { }
+    SoapDatasetCallSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container)
+    {
+        appendOutputLinked(this);
+    }
 
     // IThorSlaveActivity overloaded methods
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
         StringBuffer authToken;
         buildAuthToken(queryJob().queryUserDescriptor(), authToken);
-        appendOutputLinked(this);
         wscHelper.setown(createSoapCallHelper(this, queryRowAllocator(), authToken.str(), SCdataset, NULL, queryDummyContextLogger(),NULL));
     }
     // IThorDataLink methods

+ 2 - 2
thorlcr/activities/spill/thspillslave.cpp

@@ -49,6 +49,7 @@ public:
         compress = false;
         grouped = false;
         usageCount = 0;
+        appendOutputLinked(this);
     }
 
     ~SpillSlaveActivity()
@@ -56,14 +57,13 @@ public:
         close();
     }
 
-    void init(MemoryBuffer &data, MemoryBuffer &slaveData)
+    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
     {
         partDesc.setown(deserializePartFileDescriptor(data));
         compress = partDesc->queryOwner().isCompressed();
         data.read(usageCount);
         getPartFilename(*partDesc, 0, fileName);
         grouped = 0 != (TDXgrouped & ((IHThorSpillArg *)queryHelper())->getFlags());
-        appendOutputLinked(this);
     }
 
     void open()

+ 2 - 6
thorlcr/activities/temptable/thtmptableslave.cpp

@@ -42,17 +42,13 @@ public:
     CInlineTableSlaveActivity(CGraphElementBase *_container)
     : CSlaveActivity(_container)
     {
-        helper = NULL;
+        helper = static_cast <IHThorInlineTableArg *> (queryHelper());
         startRow = 0;
         currentRow = 0;
         maxRow = 0;
-    }
-    virtual bool isGrouped() const override { return false; }
-    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
-    {
         appendOutputLinked(this);
-        helper = static_cast <IHThorInlineTableArg *> (queryHelper());
     }
+    virtual bool isGrouped() const override { return false; }
     virtual void start() override
     {
         ActivityTimer s(totalCycles, timeActivities);

+ 1 - 2
thorlcr/activities/thdiskbaseslave.cpp

@@ -456,6 +456,7 @@ void CDiskWriteSlaveActivityBase::close()
 CDiskWriteSlaveActivityBase::CDiskWriteSlaveActivityBase(CGraphElementBase *container)
 : ProcessSlaveActivity(container), fileStats(diskWriteRemoteStatistics)
 {
+    diskHelperBase = static_cast <IHThorDiskWriteArg *> (queryHelper());
     grouped = false;
     compress = calcFileCrc = false;
     uncompressedBytesWritten = 0;
@@ -466,8 +467,6 @@ CDiskWriteSlaveActivityBase::CDiskWriteSlaveActivityBase(CGraphElementBase *cont
 
 void CDiskWriteSlaveActivityBase::init(MemoryBuffer &data, MemoryBuffer &slaveData)
 {
-    diskHelperBase = static_cast <IHThorDiskWriteArg *> (queryHelper());
-
     StringAttr logicalFilename;
     data.read(logicalFilename);
     dlfn.set(logicalFilename);

+ 2 - 2
thorlcr/activities/topn/thtopnslave.cpp

@@ -85,7 +85,9 @@ public:
         : CSlaveActivity(_container), global(_global), grouped(_grouped), sortedRows(*this, this)
     {
         assertex(!(global && grouped));
+        helper = (IHThorTopNArg *) queryHelper();
         eog = eos = false;
+        appendOutputLinked(this);
     }
     ~TopNSlaveActivity()
     {
@@ -94,8 +96,6 @@ public:
     }
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
-        appendOutputLinked(this);
-        helper = (IHThorTopNArg *) queryHelper();
         topNLimit = RIUNSET;
         compare = helper->queryCompare();
 

+ 1 - 1
thorlcr/activities/trace/thtraceslave.cpp

@@ -39,10 +39,10 @@ public:
           keepLimit(0), skip(0), sample(0), traceEnabled(false)
     {
         helper = (IHThorTraceArg *) queryHelper();
+        appendOutputLinked(this);
     }
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
     {
-        appendOutputLinked(this);
         traceEnabled = getOptBool(THOROPT_TRACE_ENABLED, false);
     }
     virtual void start() override

+ 18 - 25
thorlcr/activities/when/thwhenslave.cpp

@@ -22,33 +22,38 @@
 #include "commonext.hpp"
 #include "slave.ipp"
 
-class CDependencyExecutorSlaveActivity : public CSimpleInterface
+class CDependencyExecutorSlaveActivity : public CSlaveActivity
 {
 protected:
     size32_t savedParentExtractSz;
     const byte *savedParentExtract;
     bool global;
     Owned<IBarrier> barrier;
-    CSlaveActivity *activity;
 
 public:
-    CDependencyExecutorSlaveActivity(CSlaveActivity *_activity) : activity(_activity)
+    CDependencyExecutorSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container)
     {
-        global = !activity->queryContainer().queryOwner().queryOwner() || activity->queryContainer().queryOwner().isGlobal();
+        global = !queryContainer().queryOwner().queryOwner() || queryContainer().queryOwner().isGlobal();
     }
-    void init(MemoryBuffer &data, MemoryBuffer &slaveData)
+    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
     {
         if (global)
         {
-            mptag_t barrierTag = activity->queryContainer().queryJobChannel().deserializeMPTag(data);
-            barrier.setown(activity->queryContainer().queryJobChannel().createBarrier(barrierTag));
+            mptag_t barrierTag = queryJobChannel().deserializeMPTag(data);
+            barrier.setown(queryJobChannel().createBarrier(barrierTag));
         }
     }
-    void preStart(size32_t parentExtractSz, const byte *parentExtract)
+    virtual void preStart(size32_t parentExtractSz, const byte *parentExtract) override
     {
         savedParentExtractSz = parentExtractSz;
         savedParentExtract = parentExtract;
     }
+    virtual void abort()
+    {
+        CSlaveActivity::abort();
+        if (global)
+            barrier->cancel();
+    }
     bool executeDependencies(int controlId)
     {
         if (global)
@@ -58,27 +63,21 @@ public:
         }
         else
         {
-            ActPrintLog(activity, "Executing dependencies");
-            activity->queryContainer().executeDependencies(savedParentExtractSz, savedParentExtract, controlId, true);
+            ActPrintLog("Executing dependencies");
+            queryContainer().executeDependencies(savedParentExtractSz, savedParentExtract, controlId, true);
         }
         return true;
     }
 };
 
 
-class CWhenSlaveActivity : public CSlaveActivity, public CDependencyExecutorSlaveActivity
+class CWhenSlaveActivity : public CDependencyExecutorSlaveActivity
 {
-    typedef CSlaveActivity PARENT;
+    typedef CDependencyExecutorSlaveActivity PARENT;
 
 public:
-    IMPLEMENT_IINTERFACE_USING(CDependencyExecutorSlaveActivity);
-
-    CWhenSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container), CDependencyExecutorSlaveActivity(this)
+    CWhenSlaveActivity(CGraphElementBase *_container) : CDependencyExecutorSlaveActivity(_container)
     {
-    }
-    virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
-    {
-        CDependencyExecutorSlaveActivity::init(data, slaveData);
         appendOutputLinked(this);
     }
     virtual void stop() override
@@ -100,12 +99,6 @@ public:
         dataLinkIncrement();
         return row.getClear();
     }
-    virtual void abort()
-    {
-        CSlaveActivity::abort();
-        if (global)
-            barrier->cancel();
-    }
     virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
     {
         initMetaInfo(info);

+ 2 - 2
thorlcr/activities/wuidread/thwuidreadslave.cpp

@@ -41,14 +41,14 @@ public:
     CWuidReadSlaveActivity(CGraphElementBase *_container) 
         : CSlaveActivity(_container)
     {
+        helper = (IHThorWorkunitReadArg *)queryHelper();
         replyTag = queryMPServer().createReplyTag();
         replyStream.setown(createMemoryBufferSerialStream(masterReplyMsg));
         rowSource.setStream(replyStream);
+        appendOutputLinked(this);
     }
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
     {
-        appendOutputLinked(this);
-        helper = (IHThorWorkunitReadArg *)queryHelper();
         grouped = helper->queryOutputMeta()->isGrouped();
     } 
     virtual void start() override

+ 2 - 2
thorlcr/activities/xmlparse/thxmlparseslave.cpp

@@ -40,7 +40,9 @@ public:
 
     CXmlParseSlaveActivity(CGraphElementBase *_container) : CSlaveActivity(_container)
     {
+        helper = static_cast <IHThorXmlParseArg *> (queryHelper());
         searchStr = NULL;
+        appendOutputLinked(this);
     }
 
 // IXMLSelect
@@ -52,8 +54,6 @@ public:
 // IThorSlaveActivity overloaded methods
     virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
-        appendOutputLinked(this);
-        helper = static_cast <IHThorXmlParseArg *> (queryHelper());
         allocator.set(queryRowAllocator());
     }
     virtual void kill()

+ 1 - 1
thorlcr/activities/xmlread/thxmlreadslave.cpp

@@ -204,6 +204,7 @@ public:
             limit = RCMAX;
         else
             limit = (rowcount_t)helper->getRowLimit();  
+        appendOutputLinked(this);
     }
     ~CXmlReadSlaveActivity()
     {
@@ -213,7 +214,6 @@ public:
     {
         CDiskReadSlaveActivityBase::init(data, slaveData);
         partHandler.setown(new CXmlPartHandler(*this,queryRowAllocator()));
-        appendOutputLinked(this);
     }
     virtual void kill()
     {

+ 2 - 3
thorlcr/graph/thgraph.cpp

@@ -373,6 +373,7 @@ CGraphElementBase::CGraphElementBase(CGraphBase &_owner, IPropertyTree &_xgmml)
     whichBranch = (unsigned)-1;
     log = true;
     sentActInitData.setown(createThreadSafeBitSet());
+    maxCores = queryXGMML().getPropInt("hint[@name=\"max_cores\"]/@value", queryJob().queryMaxDefaultActivityCores());
     baseHelper.setown(helperFactory());
 }
 
@@ -2386,7 +2387,7 @@ void CJobBase::init()
 
     // global setting default on, can be overridden by #option
     timeActivities = 0 != getWorkUnitValueInt("timeActivities", globals->getPropBool("@timeActivities", true));
-    maxActivityCores = (unsigned)getWorkUnitValueInt("maxActivityCores", globals->getPropInt("@maxActivityCores", 0)); // NB: 0 means system decides
+    maxActivityCores = (unsigned)getWorkUnitValueInt("maxActivityCores", globals->getPropInt("@maxActivityCores", getAffinityCpus())); // NB: 0 means system decides
     pausing = false;
     resumed = false;
 
@@ -2853,8 +2854,6 @@ CActivityBase::CActivityBase(CGraphElementBase *_container) : container(*_contai
     baseHelper.set(container.queryHelper());
     parentExtractSz = 0;
     parentExtract = NULL;
-    // NB: maxCores, currently only used to control # cores used by sorts
-    maxCores = container.queryXGMML().getPropInt("hint[@name=\"max_cores\"]/@value", container.queryJob().queryMaxDefaultActivityCores());
 }
 
 CActivityBase::~CActivityBase()

+ 3 - 2
thorlcr/graph/thgraph.hpp

@@ -256,6 +256,7 @@ protected:
     Owned<IThorBoundLoopGraph> loopGraph; // really only here as master and slave derivatives set/use
     MemoryBuffer createCtxMb, startCtxMb;
     bool haveCreateCtx, haveStartCtx;
+    unsigned maxCores;
 
 public:
     IMPLEMENT_IINTERFACE;
@@ -336,6 +337,7 @@ public:
         return NULL;
     }
     IHThorArg *queryHelper() const { return baseHelper; }
+    unsigned queryMaxCores() const { return maxCores; }
 
     IPropertyTree &queryXGMML() const { return *xgmml; }
     const activity_id &queryOwnerId() const { return ownerId; }
@@ -1002,7 +1004,6 @@ protected:
     size32_t parentExtractSz;
     const byte *parentExtract;
     bool receiving, cancelledReceive, reInit;
-    unsigned maxCores; // NB: only used by acts that sort at the moment
     Owned<IThorGraphResults> ownedResults; // NB: probably only to be used by loop results
 
 public:
@@ -1029,7 +1030,7 @@ public:
     void cancelReceiveMsg(const rank_t rank, const mptag_t mpTag);
     bool firstNode() { return 1 == container.queryJobChannel().queryMyRank(); }
     bool lastNode() { return container.queryJob().querySlaves() == container.queryJobChannel().queryMyRank(); }
-    unsigned queryMaxCores() const { return maxCores; }
+    unsigned queryMaxCores() const { return container.queryMaxCores(); }
     IThorRowInterfaces *getRowInterfaces();
     IEngineRowAllocator *getRowAllocator(IOutputMetaData * meta, roxiemem::RoxieHeapFlags flags=roxiemem::RHFnone) const;
 

+ 7 - 7
thorlcr/graph/thgraphmaster.cpp

@@ -2292,7 +2292,7 @@ void CMasterGraph::sendActivityInitData()
     for (; w<queryJob().querySlaves(); w++)
     {
         unsigned needActInit = 0;
-        Owned<IThorActivityIterator> iter = getConnectedIterator(false);
+        Owned<IThorActivityIterator> iter = getConnectedIterator();
         ForEach(*iter)
         {
             CGraphElementBase &element = iter->query();
@@ -2307,7 +2307,7 @@ void CMasterGraph::sendActivityInitData()
             try
             {
                 msg.rewrite(pos);
-                Owned<IThorActivityIterator> iter = getConnectedIterator(false);
+                Owned<IThorActivityIterator> iter = getConnectedIterator();
                 serializeActivityInitData(w, msg, *iter);
             }
             catch (IException *e)
@@ -2419,11 +2419,11 @@ void CMasterGraph::executeSubGraph(size32_t parentExtractSz, const byte *parentE
                 }
             }
         }
-        if (syncInitData())
-        {
-            sendActivityInitData(); // has to be done at least once
-            // NB: At this point, on the slaves, the graphs will start
-        }
+    }
+    if (syncInitData())
+    {
+        sendActivityInitData(); // has to be done at least once
+        // NB: At this point, on the slaves, the graphs will start
     }
     fatalHandler.clear();
     fatalHandler.setown(new CFatalHandler(globals->getPropInt("@fatal_timeout", FATAL_TIMEOUT)));

+ 5 - 5
thorlcr/graph/thgraphslave.cpp

@@ -798,7 +798,7 @@ bool CSlaveGraph::recvActivityInitData(size32_t parentExtractSz, const byte *par
 {
     bool ret = true;
     unsigned needActInit = 0;
-    Owned<IThorActivityIterator> iter = getConnectedIterator(false);
+    Owned<IThorActivityIterator> iter = getConnectedIterator();
     ForEach(*iter)
     {
         CGraphElementBase &element = (CGraphElementBase &)iter->query();
@@ -832,7 +832,7 @@ bool CSlaveGraph::recvActivityInitData(size32_t parentExtractSz, const byte *par
             assertex(!parentExtractSz || NULL!=parentExtract);
             msg.append(parentExtractSz);
             msg.append(parentExtractSz, parentExtract);
-            Owned<IThorActivityIterator> iter = getConnectedIterator(false);
+            Owned<IThorActivityIterator> iter = getConnectedIterator();
             ForEach(*iter)
             {
                 CSlaveGraphElement &element = (CSlaveGraphElement &)iter->query();
@@ -869,7 +869,7 @@ bool CSlaveGraph::recvActivityInitData(size32_t parentExtractSz, const byte *par
             if (queryOwner() && !isGlobal())
             {
                 // initialize any for which no data was sent
-                Owned<IThorActivityIterator> iter = getConnectedIterator(false);
+                Owned<IThorActivityIterator> iter = getConnectedIterator();
                 ForEach(*iter)
                 {
                     CSlaveGraphElement &element = (CSlaveGraphElement &)iter->query();
@@ -1006,10 +1006,10 @@ void CSlaveGraph::executeSubGraph(size32_t parentExtractSz, const byte *parentEx
         // could still request 1 off, onCreate serialization from master 1st.
                 }
             }
-            if (!recvActivityInitData(parentExtractSz, parentExtract))
-                throw MakeThorException(0, "preStart failure");
             connect(); // only now do slave acts. have all their outputs prepared.
         }
+        if (!recvActivityInitData(parentExtractSz, parentExtract))
+            throw MakeThorException(0, "preStart failure");
         CGraphBase::executeSubGraph(parentExtractSz, parentExtract);
     }
     catch (IException *e)

+ 1 - 1
thorlcr/slave/slave.hpp

@@ -73,7 +73,7 @@ public:
         //PARALLEL(1) can be used to explicitly disable parallel processing.
         numStrands = container.queryXGMML().getPropInt("att[@name='parallel']/@value", 0);
         if ((numStrands == NotFound) || (numStrands > MAX_SENSIBLE_STRANDS))
-            numStrands = getAffinityCpus();
+            numStrands = container.queryMaxCores();
         if (0 == numStrands)
             numStrands = container.queryJob().getOptInt("forceNumStrands");
         blockSize = container.queryJob().getOptInt("strandBlockSize");

+ 1 - 0
thorlcr/thorutil/thbuf.cpp

@@ -1767,6 +1767,7 @@ public:
 // ISharedWriteBuffer impl.
     virtual IRowWriter *getWriter()
     {
+        CThorArrayLockBlock block(rows);
         ++numWriters;
         return new CAWriter(*this);
     }

+ 1 - 1
version.cmake

@@ -6,5 +6,5 @@ set ( HPCC_MAJOR 6 )
 set ( HPCC_MINOR 1 )
 set ( HPCC_POINT 0 )
 set ( HPCC_MATURITY "rc" )
-set ( HPCC_SEQUENCE 2 )
+set ( HPCC_SEQUENCE 0 )
 ###