瀏覽代碼

HPCC-11445 Roxie not stopping unused sink activities

Reinstate code that was removed in 3.8 when tidying up code that was believed
to be unused.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>

Conflicts:

	roxie/ccd/ccdserver.cpp
Richard Chapman 11 年之前
父節點
當前提交
3ad4fdec9a
共有 4 個文件被更改,包括 115 次插入29 次删除
  1. 66 29
      roxie/ccd/ccdserver.cpp
  2. 1 0
      roxie/ccd/ccdserver.hpp
  3. 12 0
      testing/regress/ecl/key/resetsplitter.xml
  4. 36 0
      testing/regress/ecl/resetsplitter.ecl

+ 66 - 29
roxie/ccd/ccdserver.cpp

@@ -887,6 +887,7 @@ protected:
     IRoxieSlaveContext *ctx;
     const IRoxieServerActivityFactory *factory;
     IRoxieServerActivityCopyArray dependencies;
+    IntArray dependencyIndexes;
     IntArray dependencyControlIds;
     IArrayOf<IActivityGraph> childGraphs;
     CachedOutputMetaData meta;
@@ -1247,6 +1248,12 @@ public:
                     }
                 }
 #endif
+                // NOTE - this is needed to ensure that dependencies which were not used are properly stopped
+                ForEachItemIn(idx, dependencies)
+                {
+                    if (dependencyControlIds.item(idx) == 0)
+                        dependencies.item(idx).stopSink(dependencyIndexes.item(idx));
+                }
                 if (input)
                     input->stop(aborting);
             }
@@ -1303,6 +1310,7 @@ public:
     virtual void addDependency(IRoxieServerActivity &source, unsigned sourceIdx, int controlId) 
     {
         dependencies.append(source);
+        dependencyIndexes.append(sourceIdx);
         dependencyControlIds.append(controlId);
     } 
 
@@ -1322,6 +1330,11 @@ public:
         throw MakeStringException(ROXIE_SINK, "Internal error: executeChild() requires a suitable sink");
     }
 
+    virtual void stopSink(unsigned idx)
+    {
+        throw MakeStringException(ROXIE_SINK, "Internal error: stopSink() requires a suitable sink");
+    }
+
     virtual __int64 evaluate() 
     {
         throw MakeStringException(ROXIE_SINK, "Internal error: evaluate() requires a function");
@@ -2116,19 +2129,31 @@ public:
 class CRoxieServerInternalSinkActivity : public CRoxieServerActivity
 {
 protected:
+    unsigned numOutputs;
     bool executed;
+    bool *stopped;
     CriticalSection ecrit;
     Owned<IException> exception;
 
 public:
-    CRoxieServerInternalSinkActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
-        : CRoxieServerActivity(_factory, _probeManager)
+    CRoxieServerInternalSinkActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _numOutputs)
+        : CRoxieServerActivity(_factory, _probeManager), numOutputs(_numOutputs)
     {
         executed = false;
+        stopped = new bool[numOutputs];
+        for (unsigned s = 0; s < numOutputs; s++)
+            stopped[s] = false;
+    }
+
+    ~CRoxieServerInternalSinkActivity()
+    {
+        delete [] stopped;
     }
 
     virtual void reset()
     {
+        for (unsigned s = 0; s < numOutputs; s++)
+            stopped[s] = false;
         executed = false;
         exception.clear();
         CRoxieServerActivity::reset();
@@ -2139,6 +2164,18 @@ public:
         return NULL;
     }
 
+    virtual void stopSink(unsigned outputIdx)
+    {
+        if (!stopped[outputIdx])
+        {
+            stopped[outputIdx] = true;
+            for (unsigned s = 0; s < numOutputs; s++)
+                if (!stopped[s])
+                    return;
+            stop(false); // all outputs stopped - stop parent.
+        }
+    }
+
     virtual const void *nextInGroup()
     {
         throwUnexpected(); // I am nobody's input
@@ -4443,7 +4480,7 @@ class CRoxieServerApplyActivity : public CRoxieServerInternalSinkActivity
 
 public:
     CRoxieServerApplyActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
-        : CRoxieServerInternalSinkActivity(_factory, _probeManager), helper((IHThorApplyArg &) basehelper)
+        : CRoxieServerInternalSinkActivity(_factory, _probeManager, 0), helper((IHThorApplyArg &) basehelper)
     {
     }
 
@@ -5813,8 +5850,8 @@ class CRoxieServerLocalResultWriteActivity : public CRoxieServerInternalSinkActi
     unsigned graphId;
 
 public:
-    CRoxieServerLocalResultWriteActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _graphId)
-        : CRoxieServerInternalSinkActivity(_factory, _probeManager), helper((IHThorLocalResultWriteArg &)basehelper), graphId(_graphId)
+    CRoxieServerLocalResultWriteActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _graphId, unsigned _numOutputs)
+        : CRoxieServerInternalSinkActivity(_factory, _probeManager, _numOutputs), helper((IHThorLocalResultWriteArg &)basehelper), graphId(_graphId)
     {
         graph = NULL;
     }
@@ -5859,7 +5896,7 @@ public:
 
     virtual IRoxieServerActivity *createActivity(IProbeManager *_probeManager) const
     {
-        return new CRoxieServerLocalResultWriteActivity(this, _probeManager, graphId);
+        return new CRoxieServerLocalResultWriteActivity(this, _probeManager, graphId, usageCount);
     }
 
 };
@@ -5878,8 +5915,8 @@ class CRoxieServerDictionaryResultWriteActivity : public CRoxieServerInternalSin
     unsigned graphId;
 
 public:
-    CRoxieServerDictionaryResultWriteActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _graphId)
-        : CRoxieServerInternalSinkActivity(_factory, _probeManager), helper((IHThorDictionaryResultWriteArg &)basehelper), graphId(_graphId)
+    CRoxieServerDictionaryResultWriteActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _usageCount, unsigned _graphId)
+        : CRoxieServerInternalSinkActivity(_factory, _probeManager, _usageCount), helper((IHThorDictionaryResultWriteArg &)basehelper), graphId(_graphId)
     {
         graph = NULL;
     }
@@ -5936,7 +5973,7 @@ public:
 
     virtual IRoxieServerActivity *createActivity(IProbeManager *_probeManager) const
     {
-        return new CRoxieServerDictionaryResultWriteActivity(this, _probeManager, graphId);
+        return new CRoxieServerDictionaryResultWriteActivity(this, _probeManager, usageCount, graphId);
     }
 };
 
@@ -6113,8 +6150,8 @@ class CRoxieServerGraphLoopResultWriteActivity : public CRoxieServerInternalSink
     unsigned graphId;
 
 public:
-    CRoxieServerGraphLoopResultWriteActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _graphId)
-        : CRoxieServerInternalSinkActivity(_factory, _probeManager), helper((IHThorGraphLoopResultWriteArg &)basehelper), graphId(_graphId)
+    CRoxieServerGraphLoopResultWriteActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _graphId, unsigned _numOutputs)
+        : CRoxieServerInternalSinkActivity(_factory, _probeManager, _numOutputs), helper((IHThorGraphLoopResultWriteArg &)basehelper), graphId(_graphId)
     {
         graph = NULL;
     }
@@ -6199,7 +6236,7 @@ public:
 
     virtual IRoxieServerActivity *createActivity(IProbeManager *_probeManager) const
     {
-        return new CRoxieServerGraphLoopResultWriteActivity(this, _probeManager, graphId);
+        return new CRoxieServerGraphLoopResultWriteActivity(this, _probeManager, graphId, usageCount);
     }
 
 };
@@ -9019,8 +9056,8 @@ class CRoxieServerPipeWriteActivity : public CRoxieServerInternalSinkActivity
     bool recreate;
     bool inputExhausted;
 public:
-    CRoxieServerPipeWriteActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
-        : CRoxieServerInternalSinkActivity(_factory, _probeManager), helper((IHThorPipeWriteArg &)basehelper)
+    CRoxieServerPipeWriteActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _numOutputs)
+        : CRoxieServerInternalSinkActivity(_factory, _probeManager, _numOutputs), helper((IHThorPipeWriteArg &)basehelper)
     {
         recreate = helper.recreateEachRow();
         firstRead = false;
@@ -9140,7 +9177,7 @@ public:
 
     virtual IRoxieServerActivity *createActivity(IProbeManager *_probeManager) const
     {
-        return new CRoxieServerPipeWriteActivity(this, _probeManager);
+        return new CRoxieServerPipeWriteActivity(this, _probeManager, usageCount);
     }
 };
 
@@ -9663,8 +9700,8 @@ class CRoxieServerActionActivity : public CRoxieServerInternalSinkActivity
     IHThorActionArg &helper;
 public:
 
-    CRoxieServerActionActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
-        : CRoxieServerInternalSinkActivity(_factory, _probeManager), helper((IHThorActionArg &)basehelper)
+    CRoxieServerActionActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _numOutputs)
+        : CRoxieServerInternalSinkActivity(_factory, _probeManager, _numOutputs), helper((IHThorActionArg &)basehelper)
     {
     }
 
@@ -9684,7 +9721,7 @@ public:
 
     virtual IRoxieServerActivity *createActivity(IProbeManager *_probeManager) const
     {
-        return new CRoxieServerActionActivity(this, _probeManager);
+        return new CRoxieServerActionActivity(this, _probeManager, usageCount);
     }
 };
 
@@ -10711,7 +10748,7 @@ protected:
 
 public:
     CRoxieServerDiskWriteActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
-        : CRoxieServerInternalSinkActivity(_factory, _probeManager), helper((IHThorDiskWriteArg &)basehelper)
+        : CRoxieServerInternalSinkActivity(_factory, _probeManager, 0), helper((IHThorDiskWriteArg &)basehelper)
     {
         extend = ((helper.getFlags() & TDWextend) != 0);
         overwrite = ((helper.getFlags() & TDWoverwrite) != 0);
@@ -11195,7 +11232,7 @@ class CRoxieServerIndexWriteActivity : public CRoxieServerInternalSinkActivity,
 
 public:
     CRoxieServerIndexWriteActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
-        : CRoxieServerInternalSinkActivity(_factory, _probeManager), helper(static_cast<IHThorIndexWriteArg &>(basehelper))
+        : CRoxieServerInternalSinkActivity(_factory, _probeManager, 0), helper(static_cast<IHThorIndexWriteArg &>(basehelper))
     {
         overwrite = ((helper.getFlags() & TIWoverwrite) != 0);
         reccount = 0;
@@ -19315,8 +19352,8 @@ class CRoxieServerWorkUnitWriteActivity : public CRoxieServerInternalSinkActivit
     IRoxieServerContext *serverContext;
 
 public:
-    CRoxieServerWorkUnitWriteActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, bool _isReread)
-        : CRoxieServerInternalSinkActivity(_factory, _probeManager), helper((IHThorWorkUnitWriteArg &)basehelper), isReread(_isReread)
+    CRoxieServerWorkUnitWriteActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, bool _isReread, unsigned _numOutputs)
+        : CRoxieServerInternalSinkActivity(_factory, _probeManager, _numOutputs), helper((IHThorWorkUnitWriteArg &)basehelper), isReread(_isReread)
     {
         grouped = (helper.getFlags() & POFgrouped) != 0;
         serverContext = NULL;
@@ -19460,7 +19497,7 @@ public:
 
     virtual IRoxieServerActivity *createActivity(IProbeManager *_probeManager) const
     {
-        return new CRoxieServerWorkUnitWriteActivity(this, _probeManager, isReread);
+        return new CRoxieServerWorkUnitWriteActivity(this, _probeManager, isReread, usageCount);
     }
 
 };
@@ -19479,8 +19516,8 @@ class CRoxieServerWorkUnitWriteDictActivity : public CRoxieServerInternalSinkAct
     IRoxieServerContext *serverContext;
 
 public:
-    CRoxieServerWorkUnitWriteDictActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
-        : CRoxieServerInternalSinkActivity(_factory, _probeManager), helper((IHThorDictionaryWorkUnitWriteArg &)basehelper)
+    CRoxieServerWorkUnitWriteDictActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _usageCount)
+        : CRoxieServerInternalSinkActivity(_factory, _probeManager, _usageCount), helper((IHThorDictionaryWorkUnitWriteArg &)basehelper)
     {
         serverContext = NULL;
     }
@@ -19534,7 +19571,7 @@ public:
 
     virtual IRoxieServerActivity *createActivity(IProbeManager *_probeManager) const
     {
-        return new CRoxieServerWorkUnitWriteDictActivity(this, _probeManager);
+        return new CRoxieServerWorkUnitWriteDictActivity(this, _probeManager, usageCount);
     }
 
 };
@@ -19552,8 +19589,8 @@ class CRoxieServerRemoteResultActivity : public CRoxieServerInternalSinkActivity
     IHThorRemoteResultArg &helper;
 
 public:
-    CRoxieServerRemoteResultActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
-        : CRoxieServerInternalSinkActivity(_factory, _probeManager), helper((IHThorRemoteResultArg &)basehelper)
+    CRoxieServerRemoteResultActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _numOutputs)
+        : CRoxieServerInternalSinkActivity(_factory, _probeManager, _numOutputs), helper((IHThorRemoteResultArg &)basehelper)
     {
     }
 
@@ -19577,7 +19614,7 @@ public:
 
     virtual IRoxieServerActivity *createActivity(IProbeManager *_probeManager) const
     {
-        return new CRoxieServerRemoteResultActivity(this, _probeManager);
+        return new CRoxieServerRemoteResultActivity(this, _probeManager, usageCount);
     }
 
 };

+ 1 - 0
roxie/ccd/ccdserver.hpp

@@ -159,6 +159,7 @@ interface IRoxieServerActivity : extends IActivityBase
     virtual void executeChild(size32_t & retSize, void * & ret, unsigned parentExtractSize, const byte * parentExtract) = 0;
     virtual void serializeCreateStartContext(MemoryBuffer &out) = 0;
     virtual void serializeExtra(MemoryBuffer &out) = 0;
+    virtual void stopSink(unsigned idx) = 0;
 //Functions to support result streaming between parallel loop/graphloop/library implementations
     virtual IRoxieInput * querySelectOutput(unsigned id) = 0;
     virtual bool querySetStreamInput(unsigned id, IRoxieInput * _input) = 0;

+ 12 - 0
testing/regress/ecl/key/resetsplitter.xml

@@ -0,0 +1,12 @@
+<Dataset name='Result 1'>
+ <Row><id>64</id></Row>
+ <Row><id>65</id></Row>
+ <Row><id>66</id></Row>
+ <Row><id>67</id></Row>
+ <Row><id>68</id></Row>
+ <Row><id>69</id></Row>
+ <Row><id>70</id></Row>
+ <Row><id>71</id></Row>
+ <Row><id>72</id></Row>
+ <Row><id>73</id></Row>
+</Dataset>

+ 36 - 0
testing/regress/ecl/resetsplitter.ecl

@@ -0,0 +1,36 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+idRecord := { unsigned id; };
+
+myDataset := DATASET(100, TRANSFORM(idRecord, SELF.id := COUNTER), DISTRIBUTED);
+
+filtered := NOFOLD(myDataset)(id % 20 != 0);
+
+filter1 := NOFOLD(filtered)(id % 3 != 0);
+
+filter2 := NOFOLD(filtered)(id % 3 != 1);
+
+p1 := PROJECT(NOFOLD(filtered), TRANSFORM(idRecord, SELF.id := LEFT.id + COUNT(filter1)));
+
+p2 := PROJECT(NOFOLD(filtered), TRANSFORM(idRecord, SELF.id := LEFT.id + COUNT(filter2)));
+
+boolean test := false : stored('test');
+
+r := IF(test, NOFOLD(p1), NOFOLD(p2));
+
+output(CHOOSEN(NOFOLD(r), 10));