فهرست منبع

Merge pull request #14888 from richardkchapman/sinkmode

HPCC-25775 Roxie may create too many threads

Reviewed-by: Mark Kelly
Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Merged-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 4 سال پیش
والد
کامیت
071440d752

+ 2 - 2
ecl/hql/hqlgram.y

@@ -8642,11 +8642,11 @@ simpleDataSet
                             parser->checkDistribution($3, $$.queryExpr(), false);
                             $$.setPosition($1);
                         }
-    | ROLLUP '(' startTopLeftRightSeqFilter ',' startLeftRowsGroup ',' transform ')' endRowsGroup endTopLeftRightFilter endSelectorSequence
+    | ROLLUP '(' startTopLeftRightSeqFilter ',' startLeftRowsGroup ',' transform optCommonAttrs ')' endRowsGroup endTopLeftRightFilter endSelectorSequence
                         {
                             parser->checkGrouped($3);
                             IHqlExpression *attr = NULL;
-                            $$.setExpr(createDataset(no_rollupgroup, { $3.getExpr(), $7.getExpr(), attr, $9.getExpr(), $11.getExpr() }));
+                            $$.setExpr(createDataset(no_rollupgroup, { $3.getExpr(), $7.getExpr(), attr, $10.getExpr(), $12.getExpr(), $8.getExpr() }));
                             $$.setPosition($1);
                         }
     | COMBINE '(' startLeftDelaySeqFilter ',' startRightFilter optCommonAttrs ')' endSelectorSequence

+ 13 - 0
roxie/ccd/ccd.hpp

@@ -260,6 +260,18 @@ interface IRoxieQueryPacket : extends IInterface
 
 interface IQueryDll;
 
+//----------------------------------------------------------------------------------------------
+// SinkMode determines how parallel sinks are executed
+//----------------------------------------------------------------------------------------------
+
+enum class SinkMode : byte
+{
+    Parallel = 0,           // Execute sinks in parallel - this is the default
+    ParallelPersistent = 1, // Execute sinks in parallel using persistent threads. May be faster for a heavily-reused child query, but lead to higher thread usage
+    Sequential = 2          // Execute sinks sequentially - sometimes faster if sinks not doing much work
+};
+
+
 // Global configuration info
 extern bool shuttingDown;
 extern unsigned numChannels;
@@ -352,6 +364,7 @@ extern bool defaultNoSeekBuildIndex;
 extern unsigned parallelLoadQueries;
 extern bool adhocRoxie;
 extern bool alwaysFailOnLeaks;
+extern SinkMode defaultSinkMode;
 
 #ifdef _CONTAINERIZED
 static constexpr bool roxieMulticastEnabled = false;

+ 4 - 1
roxie/ccd/ccdmain.cpp

@@ -136,7 +136,7 @@ bool defaultCollectFactoryStatistics = true;
 bool defaultNoSeekBuildIndex = false;
 unsigned parallelLoadQueries = 8;
 bool alwaysFailOnLeaks = false;
-
+SinkMode defaultSinkMode = SinkMode::ParallelPersistent;
 unsigned continuationCompressThreshold = 1024;
 
 bool useOldTopology = false;
@@ -1072,6 +1072,9 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
         if (!parallelLoadQueries)
             parallelLoadQueries = 1;
         alwaysFailOnLeaks = topology->getPropBool("@alwaysFailOnLeaks", false);
+        const char *sinkModeText = topology->queryProp("@sinkMode");
+        if (sinkModeText)
+            defaultSinkMode = getSinkMode(sinkModeText);
 
         enableKeyDiff = topology->getPropBool("@enableKeyDiff", true);
         cacheReportPeriodSeconds = topology->getPropInt("@cacheReportPeriodSeconds", 5*60);

+ 11 - 0
roxie/ccd/ccdquery.cpp

@@ -334,6 +334,7 @@ QueryOptions::QueryOptions()
     failOnLeaks = alwaysFailOnLeaks;
     collectFactoryStatistics = defaultCollectFactoryStatistics;
     parallelWorkflow = false;
+    sinkMode = defaultSinkMode;
     numWorkflowThreads = 1;
 }
 
@@ -369,6 +370,7 @@ QueryOptions::QueryOptions(const QueryOptions &other)
     collectFactoryStatistics = other.collectFactoryStatistics;
 
     parallelWorkflow = other.parallelWorkflow;
+    sinkMode = other.sinkMode;
     numWorkflowThreads = other.numWorkflowThreads;
 }
 
@@ -414,6 +416,7 @@ void QueryOptions::setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stat
     updateFromWorkUnit(collectFactoryStatistics, wu, "collectFactoryStatistics");
 
     updateFromWorkUnit(parallelWorkflow, wu, "parallelWorkflow");
+    updateFromWorkUnit(sinkMode, wu, "sinkMode");
     updateFromWorkUnit(numWorkflowThreads, wu, "numWorkflowthreads");
 }
 
@@ -445,6 +448,14 @@ void QueryOptions::updateFromWorkUnit(RecordTranslationMode &value, IConstWorkUn
         value = getTranslationMode(val.str(), false);
 }
 
+void QueryOptions::updateFromWorkUnit(SinkMode &value, IConstWorkUnit &wu, const char *name)
+{
+    SCMStringBuffer val;
+    wu.getDebugValue(name, val);
+    if (val.length())
+        value = ::getSinkMode(val.str());
+}
+
 void QueryOptions::setFromContext(const IPropertyTree *ctx)
 {
     if (ctx)

+ 2 - 0
roxie/ccd/ccdquery.hpp

@@ -110,6 +110,7 @@ public:
     bool collectFactoryStatistics;
     bool noSeekBuildIndex;
     bool parallelWorkflow;
+    SinkMode sinkMode;
     unsigned numWorkflowThreads;
 
 private:
@@ -119,6 +120,7 @@ private:
     static void updateFromWorkUnit(unsigned &value, IConstWorkUnit &wu, const char *name);
     static void updateFromWorkUnit(bool &value, IConstWorkUnit &wu, const char *name);
     static void updateFromWorkUnit(RecordTranslationMode &value, IConstWorkUnit &wu, const char *name);
+    static void updateFromWorkUnit(SinkMode &value, IConstWorkUnit &wu, const char *name);
     static void updateFromContextM(memsize_t &val, const IPropertyTree *ctx, const char *name, const char *name2 = NULL); // Needs different name to ensure works in 32-bit where memsize_t and unsigned are same type
     static void updateFromContext(int &val, const IPropertyTree *ctx, const char *name, const char *name2 = NULL);
     static void updateFromContext(unsigned &val, const IPropertyTree *ctx, const char *name, const char *name2 = NULL);

+ 76 - 41
roxie/ccd/ccdserver.cpp

@@ -404,6 +404,21 @@ static const StatisticsMapping indexWriteStatistics({ StNumDuplicateKeys }, actS
 
 //=================================================================================
 
+extern SinkMode getSinkMode(const char *val)
+{
+    if (strieq(val, "parallelpersistent"))
+        return SinkMode::ParallelPersistent;
+    else if (strieq(val, "sequential"))
+        return SinkMode::Sequential;
+    else
+    {
+        if (!strieq(val, "parallel"))
+            WARNLOG("Unsupported sinkmode %s - assuming parallel", val);
+        return SinkMode::Parallel;
+    }
+}
+
+
 const static unsigned minus1U = (0U-1U);
 class CRoxieServerActivityFactoryBase : public CActivityFactory, implements IRoxieServerActivityFactory
 {
@@ -418,6 +433,7 @@ protected:
     bool optUnstableInput = false;  // is the input forced to unordered?
     bool optUnordered = false; // is the output specified as unordered?
     bool isCodeSigned = false;
+    SinkMode sinkMode = SinkMode::Parallel;
     unsigned heapFlags;
     mutable RelaxedAtomic<__int64> processed = {0};
     mutable RelaxedAtomic<__int64> started = {0};
@@ -432,6 +448,11 @@ public:
         optParallel = _graphNode.getPropInt("att[@name='parallel']/@value", 0);
         optUnordered = !_graphNode.getPropBool("att[@name='ordered']/@value", true);
         heapFlags = _graphNode.getPropInt("hint[@name='heapflags']/@value", _queryFactory.queryOptions().heapFlags);
+        const char *sinkModeText  = _graphNode.queryProp("hint[@name='sinkmode']/@value");
+        if (sinkModeText)
+            sinkMode = ::getSinkMode(sinkModeText);
+        else
+            sinkMode = _queryFactory.queryOptions().sinkMode;
         isCodeSigned = ::isActivityCodeSigned(_graphNode);
     }
     
@@ -620,6 +641,10 @@ public:
     {
         return CActivityFactory::getEnableFieldTranslation();
     }
+    virtual SinkMode getSinkMode() const override
+    {
+        return sinkMode;
+    }
 };
 
 class CRoxieServerMultiInputInfo
@@ -27603,6 +27628,7 @@ protected:
     IRoxieServerActivity *parentActivity;
     unsigned id;
     unsigned loopCounter;
+    SinkMode sinkMode = SinkMode::Parallel;
 
 public:
     IMPLEMENT_IINTERFACE;
@@ -27616,6 +27642,10 @@ public:
         loopCounter = 0;
         graphAgentContext.setCodeContext(&graphCodeContext);
         graphCodeContext.setContainer(&graphAgentContext, this);
+        if (parentActivity)
+            sinkMode = parentActivity->queryFactory()->getSinkMode();
+        else
+            sinkMode = _ctx->queryOptions().sinkMode;
     }
 
     ~CActivityGraph()
@@ -27795,68 +27825,73 @@ public:
         if (sinks.ordinality()==1)
             sinks.item(0).execute(parentExtractSize, parentExtract);
 #ifdef PARALLEL_EXECUTE
-        else if (!probeManager && !graphDefinition.isSequential())
+        else if (!probeManager && !graphDefinition.isSequential() && sinkMode != SinkMode::Sequential)
         {
-#ifdef PARALLEL_PERSISTANT_THREADS
-            if (!threads.ordinality())
+            if (sinkMode == SinkMode::ParallelPersistent)
             {
-                for (unsigned i = 0; i < sinks.ordinality()-1; i++)
+                if (!threads.ordinality())
                 {
-                    threads.append(*new SinkThread(*this, sinks.item(i)));
+                    for (unsigned i = 0; i < sinks.ordinality()-1; i++)
+                    {
+                        threads.append(*new SinkThread(*this, sinks.item(i)));
+                    }
                 }
-            }
-            for (unsigned i = 0; i < sinks.ordinality()-1; i++)
-                threads.item(i).start(parentExtractSize, parentExtract);
-            try
-            {
-                sinks.item(sinks.ordinality()-1).execute(parentExtractSize, parentExtract);
-            }
-            catch (IException *E)
-            {
-                noteException(E);
-                E->Release();
-            }
-            for (unsigned i = 0; i < sinks.ordinality()-1; i++)
-            {
+                for (unsigned i = 0; i < sinks.ordinality()-1; i++)
+                    threads.item(i).start(parentExtractSize, parentExtract);
                 try
                 {
-                    threads.item(i).join();
+                    sinks.item(sinks.ordinality()-1).execute(parentExtractSize, parentExtract);
                 }
                 catch (IException *E)
                 {
                     noteException(E);
                     E->Release();
                 }
-            }
-            checkAbort();
- #else
-            class casyncfor: public CAsyncFor
-            {
-            public:
-                IActivityGraph &parent;
-                unsigned parentExtractSize;
-                const byte * parentExtract;
-
-                casyncfor(IRoxieServerActivityCopyArray &_sinks, IActivityGraph &_parent, unsigned _parentExtractSize, const byte * _parentExtract) : 
-                    sinks(_sinks), parent(_parent), parentExtractSize(_parentExtractSize), parentExtract(_parentExtract) { }
-                void Do(unsigned i)
+                for (unsigned i = 0; i < sinks.ordinality()-1; i++)
                 {
                     try
                     {
-                        sinks.item(i).execute(parentExtractSize, parentExtract);
+                        threads.item(i).join();
                     }
                     catch (IException *E)
                     {
-                        parent.noteException(E);
-                        throw;
+                        noteException(E);
+                        E->Release();
                     }
                 }
-            private:
-                IRoxieServerActivityCopyArray &sinks;
-            } afor(sinks, *this, parentExtractSize, parentExtract);
-            afor.For(sinks.ordinality(), sinks.ordinality());
-#endif
+                checkAbort();
             }
+            else if (sinkMode == SinkMode::Parallel)
+            {
+                class casyncfor: public CAsyncFor
+                {
+                public:
+                    IActivityGraph &parent;
+                    unsigned parentExtractSize;
+                    const byte * parentExtract;
+
+                    casyncfor(IRoxieServerActivityCopyArray &_sinks, IActivityGraph &_parent, unsigned _parentExtractSize, const byte * _parentExtract) :
+                        parent(_parent), parentExtractSize(_parentExtractSize), parentExtract(_parentExtract), sinks(_sinks) { }
+                    void Do(unsigned i)
+                    {
+                        try
+                        {
+                            sinks.item(i).execute(parentExtractSize, parentExtract);
+                        }
+                        catch (IException *E)
+                        {
+                            parent.noteException(E);
+                            throw;
+                        }
+                    }
+                private:
+                    IRoxieServerActivityCopyArray &sinks;
+                } afor(sinks, *this, parentExtractSize, parentExtract);
+                afor.For(sinks.ordinality(), sinks.ordinality());
+            }
+            else
+                throwUnexpected();
+        }
 #endif
         else
         {

+ 3 - 0
roxie/ccd/ccdserver.hpp

@@ -116,6 +116,8 @@ interface IFinalRoxieInput : extends IInputBase
     }
 };
 
+extern SinkMode getSinkMode(const char *sinkModetext);
+
 extern IEngineRowStream *connectSingleStream(IRoxieAgentContext *ctx, IFinalRoxieInput *input, unsigned idx, Owned<IStrandJunction> &junction, bool consumerOrdered);
 
 interface ISteppedConjunctionCollector;
@@ -228,6 +230,7 @@ interface IRoxieServerActivityFactory : extends IActivityFactory
     virtual roxiemem::RoxieHeapFlags getHeapFlags() const = 0;
     virtual bool isActivityCodeSigned() const = 0;
     virtual RecordTranslationMode getEnableFieldTranslation() const = 0;
+    virtual SinkMode getSinkMode() const = 0;
 };
 interface IGraphResult : public IInterface
 {

+ 129 - 0
testing/regress/ecl/childsink.ecl

@@ -0,0 +1,129 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2021 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+// Testing various forms of child query sinks
+
+//version childSinkOption='sequential'
+//version childSinkOption='parallel'
+//version childSinkOption='parallelPersistent'
+
+import ^ as root;
+childSinkMode := #IFDEFINED(root.childSinkOption, 'parallel');
+
+Simple := FALSE;
+Full := TRUE;
+
+Layout := RECORD
+  UNSIGNED8 UID;
+	UNSIGNED4 F1;
+	UNSIGNED4 F2;
+	UNSIGNED4 F3;
+	UNSIGNED4 F4;
+	UNSIGNED4 F5;
+	UNSIGNED4 F6;
+	UNSIGNED4 F7;
+	UNSIGNED4 F8;
+	UNSIGNED4 F9;
+	UNSIGNED4 F10;
+	UNSIGNED4 F11;
+	UNSIGNED4 F12;
+	UNSIGNED4 F13;
+	UNSIGNED4 F14;
+	UNSIGNED4 F15;
+	UNSIGNED4 F16;
+	UNSIGNED4 F17;
+	UNSIGNED4 F18;
+	UNSIGNED4 F19;
+	UNSIGNED4 F20;
+	UNSIGNED4 F21;
+	UNSIGNED4 F22;
+	UNSIGNED4 F23;
+	UNSIGNED4 F24;
+	UNSIGNED4 F25;
+END;
+
+DUP_UIDS := 4;
+DUP_Values := 1;
+Layout BuildData(Layout r, UNSIGNED c) := TRANSFORM
+  SELF.UID := c DIV DUP_UIDS;
+  SELF.F1 := HASH32(c DIV DUP_Values + 1);
+  SELF.F2 := HASH32(c DIV DUP_Values + 2);
+  SELF.F3 := HASH32(c DIV DUP_Values + 3);
+  SELF.F4 := HASH32(c DIV DUP_Values + 4);
+  SELF.F5 := HASH32(c DIV DUP_Values + 5);
+  SELF.F6 := HASH32(c DIV DUP_Values + 6);
+  SELF.F7 := HASH32(c DIV DUP_Values + 7);
+  SELF.F8 := HASH32(c DIV DUP_Values + 8);
+  SELF.F9 := HASH32(c DIV DUP_Values + 9);
+  SELF.F10 := HASH32(c DIV DUP_Values + 10);
+  SELF.F11 := HASH32(c DIV DUP_Values + 11);
+  SELF.F12 := HASH32(c DIV DUP_Values + 12);
+  SELF.F13 := HASH32(c DIV DUP_Values + 13);
+  SELF.F14 := HASH32(c DIV DUP_Values + 14);
+  SELF.F15 := HASH32(c DIV DUP_Values + 15);
+  SELF.F16 := HASH32(c DIV DUP_Values + 16);
+  SELF.F17 := HASH32(c DIV DUP_Values + 17);
+  SELF.F18 := HASH32(c DIV DUP_Values + 18);
+  SELF.F19 := HASH32(c DIV DUP_Values + 19);
+  SELF.F20 := HASH32(c DIV DUP_Values + 20);
+  SELF.F21 := HASH32(c DIV DUP_Values + 21);
+  SELF.F22 := HASH32(c DIV DUP_Values + 22);
+  SELF.F23 := HASH32(c DIV DUP_Values + 23);
+  SELF.F24 := HASH32(c DIV DUP_Values + 24);
+  SELF.F25 := HASH32(c DIV DUP_Values + 25);
+END;
+
+Seed := DATASET([{0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0}], Layout);
+Source := NORMALIZE(Seed, 20000, BuildData(LEFT, COUNTER));
+Layout SimpleRollup(Layout r, DATASET(Layout) rs) := TRANSFORM
+  SELF := r;
+END;
+SimpleResult := ROLLUP(GROUP(Source, UID), GROUP, SimpleRollup(LEFT, ROWS(LEFT)));
+IF(Simple, OUTPUT(SimpleResult, , '~tmp::kel::rollupslowdown::simple', OVERWRITE, EXPIRE(1)));
+
+Layout FullRollup(Layout r, DATASET(Layout) rs) := TRANSFORM
+  SELF.F1 := TOPN(TABLE(rs, {F1,UNSIGNED C:=COUNT(GROUP)}, F1, FEW), 1, -C)[1].F1;  
+  SELF.F2 := TOPN(TABLE(rs, {F2,UNSIGNED C:=COUNT(GROUP)}, F2, FEW), 1, -C)[1].F2;  
+  SELF.F3 := TOPN(TABLE(rs, {F3,UNSIGNED C:=COUNT(GROUP)}, F3, FEW), 1, -C)[1].F3;  
+/*
+  SELF.F4 := TOPN(TABLE(rs, {F4,UNSIGNED C:=COUNT(GROUP)}, F4, FEW), 1, -C)[1].F4;  
+  SELF.F5 := TOPN(TABLE(rs, {F5,UNSIGNED C:=COUNT(GROUP)}, F5, FEW), 1, -C)[1].F5;  
+  SELF.F6 := TOPN(TABLE(rs, {F6,UNSIGNED C:=COUNT(GROUP)}, F6, FEW), 1, -C)[1].F6;  
+  SELF.F7 := TOPN(TABLE(rs, {F7,UNSIGNED C:=COUNT(GROUP)}, F7, FEW), 1, -C)[1].F7;  
+  SELF.F8 := TOPN(TABLE(rs, {F8,UNSIGNED C:=COUNT(GROUP)}, F8, FEW), 1, -C)[1].F8;  
+  SELF.F9 := TOPN(TABLE(rs, {F9,UNSIGNED C:=COUNT(GROUP)}, F9, FEW), 1, -C)[1].F9;  
+  SELF.F10 := TOPN(TABLE(rs, {F10,UNSIGNED C:=COUNT(GROUP)}, F10, FEW), 1, -C)[1].F10;  
+  SELF.F11 := TOPN(TABLE(rs, {F11,UNSIGNED C:=COUNT(GROUP)}, F11, FEW), 1, -C)[1].F11;  
+  SELF.F12 := TOPN(TABLE(rs, {F12,UNSIGNED C:=COUNT(GROUP)}, F12, FEW), 1, -C)[1].F12;  
+  SELF.F13 := TOPN(TABLE(rs, {F13,UNSIGNED C:=COUNT(GROUP)}, F13, FEW), 1, -C)[1].F13;  
+  SELF.F14 := TOPN(TABLE(rs, {F14,UNSIGNED C:=COUNT(GROUP)}, F14, FEW), 1, -C)[1].F14;  
+  SELF.F15 := TOPN(TABLE(rs, {F15,UNSIGNED C:=COUNT(GROUP)}, F15, FEW), 1, -C)[1].F15;  
+  SELF.F16 := TOPN(TABLE(rs, {F16,UNSIGNED C:=COUNT(GROUP)}, F16, FEW), 1, -C)[1].F16;  
+  SELF.F17 := TOPN(TABLE(rs, {F17,UNSIGNED C:=COUNT(GROUP)}, F17, FEW), 1, -C)[1].F17;  
+  SELF.F18 := TOPN(TABLE(rs, {F18,UNSIGNED C:=COUNT(GROUP)}, F18, FEW), 1, -C)[1].F18;  
+  SELF.F19 := TOPN(TABLE(rs, {F19,UNSIGNED C:=COUNT(GROUP)}, F19, FEW), 1, -C)[1].F19;  
+  SELF.F20 := TOPN(TABLE(rs, {F20,UNSIGNED C:=COUNT(GROUP)}, F20, FEW), 1, -C)[1].F20;  
+  SELF.F21 := TOPN(TABLE(rs, {F21,UNSIGNED C:=COUNT(GROUP)}, F21, FEW), 1, -C)[1].F21;  
+  SELF.F22 := TOPN(TABLE(rs, {F22,UNSIGNED C:=COUNT(GROUP)}, F22, FEW), 1, -C)[1].F22;  
+  SELF.F23 := TOPN(TABLE(rs, {F23,UNSIGNED C:=COUNT(GROUP)}, F23, FEW), 1, -C)[1].F23;  
+  SELF.F24 := TOPN(TABLE(rs, {F24,UNSIGNED C:=COUNT(GROUP)}, F24, FEW), 1, -C)[1].F24;  
+  SELF.F25 := TOPN(TABLE(rs, {F25,UNSIGNED C:=COUNT(GROUP)}, F25, FEW), 1, -C)[1].F25;  
+*/
+  SELF := r;
+END;
+FullResult := PULL(ROLLUP(GROUP(Source, UID), GROUP, FullRollup(LEFT, ROWS(LEFT)), HINT(sinkMode(childSinkMode))));
+OUTPUT(CHOOSEN(FullResult, 1));

تفاوت فایلی نمایش داده نمی شود زیرا این فایل بسیار بزرگ است
+ 3 - 0
testing/regress/ecl/key/childsink.xml