浏览代码

HPCC-14871 Changes following code review

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday 9 年之前
父节点
当前提交
d7dec16982
共有 3 个文件被更改,包括 30 次插入27 次删除
  1. 2 4
      common/thorhelper/thorstrand.cpp
  2. 2 2
      roxie/ccd/ccdmain.cpp
  3. 26 21
      roxie/ccd/ccdserver.cpp

+ 2 - 4
common/thorhelper/thorstrand.cpp

@@ -840,7 +840,7 @@ void clearRowQueue(IRowQueue * queue)
 //---------------------------------------------------------------------------------------------------------------------
 
 //Class for managing processing on a single ordered strand
-class OrderedStrandRowBlockInput : public CInterface, implements IEngineRowStream
+class OrderedStrandRowBlockInput : public CInterfaceOf<IEngineRowStream>
 {
     friend class OrderedManyToOneJunction;
 
@@ -850,7 +850,6 @@ public:
     {
         curInputBlock = nullptr;
     }
-    IMPLEMENT_IINTERFACE
 
  //interface IEngineRowStream for the rows being supplied to the strand.
     virtual const void *nextRow()
@@ -1079,7 +1078,7 @@ protected:
 //---------------------------------------------------------------------------------------------------------------------
 
 //Class for reading input from a streaming source activity.
-class OrderedStrandStreamInput : public CInterface, implements IEngineRowStream
+class OrderedStrandStreamInput : public CInterfaceOf<IEngineRowStream>
 {
     friend class OrderedManyToOneJunction;
 
@@ -1088,7 +1087,6 @@ public:
     : splitJunction(_splitJunction), callback(_callback)
     {
     }
-    IMPLEMENT_IINTERFACE
 
     void setInput(IEngineRowStream * _input)
     {

+ 2 - 2
roxie/ccd/ccdmain.cpp

@@ -133,7 +133,7 @@ unsigned defaultKeyedJoinPreload = 0;
 unsigned dafilesrvLookupTimeout = 10000;
 bool defaultCheckingHeap = false;
 unsigned defaultStrandBlockSize = 512;
-unsigned defaultForceNumStrands = 1;
+unsigned defaultForceNumStrands = 0;
 
 unsigned slaveQueryReleaseDelaySeconds = 60;
 unsigned coresPerQuery = 0;
@@ -764,7 +764,7 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
         defaultKeyedJoinPreload = topology->getPropInt("@defaultKeyedJoinPreload", 0);
         defaultPrefetchProjectPreload = topology->getPropInt("@defaultPrefetchProjectPreload", 10);
         defaultStrandBlockSize = topology->getPropInt("@defaultStrandBlockSize", 512);
-        defaultForceNumStrands = topology->getPropInt("@defaultForceNumStrands", 1);
+        defaultForceNumStrands = topology->getPropInt("@defaultForceNumStrands", 0);
         defaultCheckingHeap = topology->getPropBool("@checkingHeap", false);  // NOTE - not in configmgr - too dangerous!
 
         slaveQueryReleaseDelaySeconds = topology->getPropInt("@slaveQueryReleaseDelaySeconds", 60);

+ 26 - 21
roxie/ccd/ccdserver.cpp

@@ -368,7 +368,7 @@ static const StatisticsMapping sortStatistics(&actStatistics, StTimeSortElapsed,
 
 //=================================================================================
 
-const static unsigned DefaultParallel = (0U-1U); // parallel requested, but no number supplied
+const static unsigned minus1U = (0U-1U);
 class CRoxieServerActivityFactoryBase : public CActivityFactory, implements IRoxieServerActivityFactory
 {
 protected:
@@ -377,7 +377,7 @@ protected:
     IntArray dependencyControlIds; // things I am dependent on
     StringArray dependencyEdgeIds; // How to describe them to the debugger
     unsigned dependentCount; // things dependent on me
-    unsigned optParallel = 0; // -1 if enabled
+    unsigned optParallel = 0; // -1=enable,0=not specified,1=single-threaded,n=<num threads>
     bool optStableInput = true; // is the input forced to ordered?
     bool optUnstableInput = false;  // is the input forced to unordered?
     bool optUnordered = false; // is the output specified as unordered?
@@ -399,10 +399,7 @@ public:
         totalCycles = 0;
         localCycles = 0;
         dependentCount = 0;
-        unsigned hintNumStrands = _graphNode.getPropInt("hint[@name='numstrands']/@value", 0);
-        optParallel = _graphNode.getPropInt("att[@name='parallel']/@value", hintNumStrands);
-        if (optParallel == DefaultParallel)
-            optParallel = getAffinityCpus();
+        optParallel = _graphNode.getPropInt("att[@name='parallel']/@value", 0);
         optUnordered = !_graphNode.getPropBool("att[@name='ordered']/@value", true);
     }
     
@@ -1420,7 +1417,6 @@ public:
     {
         assertex(!idx);
         // By default, activities are assumed NOT to support streams
-        // Assume that for
         bool inputOrdered = isInputOrdered(consumerOrdered, 0);
         connectInputStreams(inputOrdered);
         // Return a single stream
@@ -1573,13 +1569,10 @@ class StrandOptions
 public:
     explicit StrandOptions(IPropertyTree &_graphNode)
     {
-        //PARALLEL(0) can be used to explicitly disable parallel processing.
-        unsigned hintNumStrands = _graphNode.getPropInt("hint[@name='numstrands']/@value", 1);
-        numStrands = _graphNode.getPropInt("att[@name='parallel']/@value", hintNumStrands);
-        if (numStrands == DefaultParallel)
+        //PARALLEL(1) can be used to explicitly disable parallel processing.
+        numStrands = _graphNode.getPropInt("att[@name='parallel']/@value", 0);
+        if ((numStrands == minus1U) || (numStrands > MAX_SENSIBLE_STRANDS))
             numStrands = getAffinityCpus();
-        if (numStrands > MAX_SENSIBLE_STRANDS)
-            numStrands = 1;
         blockSize = _graphNode.getPropInt("hint[@name='strandblocksize']/@value", 0);
     }
     StrandOptions(const StrandOptions &from, IRoxieSlaveContext *ctx)
@@ -1589,12 +1582,12 @@ public:
 
         if (!blockSize)
             blockSize = ctx->queryOptions().strandBlockSize;
-        if (numStrands == 1)
+        if (numStrands == 0)
             numStrands = ctx->queryOptions().forceNumStrands;
     }
 public:
-    unsigned numStrands; // if 0 it forces single-stranded operations.  Useful for testing.
-    unsigned blockSize;
+    unsigned numStrands = 0; // if 1 it forces single-stranded operations.  (Useful for testing.)
+    unsigned blockSize = 0;
 };
 
 class StrandProcessor : public CInterfaceOf<IEngineRowStream>
@@ -1704,15 +1697,27 @@ public:
         CRoxieServerActivity::connectDependencies();
 
         bool inputOrdered = isInputOrdered(consumerOrdered, idx);
-        if (consumerOptions && (consumerOptions->numStrands > strandOptions.numStrands) && (strandOptions.numStrands != 0))
-            strandOptions.numStrands = consumerOptions->numStrands;
+        //Note, numStrands == 1 is an explicit request to disable threading
+        if (consumerOptions && (consumerOptions->numStrands != 1) && (strandOptions.numStrands != 1))
+        {
+            //Check to see if the consumer's settings should override
+            if (strandOptions.numStrands == 0)
+            {
+                strandOptions.numStrands = consumerOptions->numStrands;
+                strandOptions.blockSize = consumerOptions->blockSize;
+            }
+            else if (consumerOptions->numStrands > strandOptions.numStrands)
+            {
+                strandOptions.numStrands = consumerOptions->numStrands;
+            }
+        }
 
         Owned <IStrandJunction> recombiner;
         if (input)
         {
-            if (strandOptions.numStrands <= 1)
+            if (strandOptions.numStrands == 1)
             {
-                // 0 means explicitly requested single-strand.
+                // 1 means explicitly requested single-strand.
                 IEngineRowStream *instream = connectSingleStream(ctx, input, sourceIdx, junction, inputOrdered);
                 strands.append(*createStrandProcessor(instream));
             }
@@ -1720,7 +1725,7 @@ public:
             {
                 PointerArrayOf<IEngineRowStream> instreams;
                 recombiner.setown(input->getOutputStreams(ctx, sourceIdx, instreams, &strandOptions, inputOrdered));
-                if (instreams.length() == 1)
+                if ((instreams.length() == 1) && (strandOptions.numStrands != 0))  // 0 means did not specify - we should use the strands that our upstream provides
                 {
                     assertex(recombiner == NULL);
                     // Create a splitter to split the input into n... and a recombiner if need to preserve sorting