Przeglądaj źródła

Merge pull request #1711 from rengolin/roxie-writer

Dependencies stopped twice

Reviewed-By: Gavin Halliday <gavin.halliday@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 13 lat temu
rodzic
commit
85f7765c84
2 zmienionych plików z 23 dodań i 61 usunięć
  1. 23 60
      roxie/ccd/ccdserver.cpp
  2. 0 1
      roxie/ccd/ccdserver.hpp

+ 23 - 60
roxie/ccd/ccdserver.cpp

@@ -857,7 +857,6 @@ protected:
     IRoxieSlaveContext *ctx;
     const IRoxieServerActivityFactory *factory;
     IRoxieServerActivityCopyArray dependencies;
-    IntArray dependencyIndexes;
     IntArray dependencyControlIds;
     IArrayOf<IActivityGraph> childGraphs;
     CachedOutputMetaData meta;
@@ -1231,12 +1230,6 @@ public:
                     }
                 }
 #endif
-                // MORE - not absolutely sure that this is needed since dependency gets started and stopped by the execute
-                ForEachItemIn(idx, dependencies)
-                {
-                    if (dependencyControlIds.item(idx) == 0)
-                        dependencies.item(idx).stopSink(dependencyIndexes.item(idx), aborting);
-                }
                 if (input)
                     input->stop(aborting);
             }
@@ -1292,7 +1285,6 @@ public:
     virtual void addDependency(IRoxieServerActivity &source, unsigned sourceIdx, int controlId) 
     {
         dependencies.append(source);
-        dependencyIndexes.append(sourceIdx);
         dependencyControlIds.append(controlId);
     } 
 
@@ -1312,11 +1304,6 @@ public:
         throw MakeStringException(ROXIE_SINK, "Internal error: executeChild() requires a suitable sink");
     }
 
-    virtual void stopSink(unsigned idx, bool abort)
-    {
-        throw MakeStringException(ROXIE_SINK, "Internal error: stopSink() requires a suitable sink");
-    }
-
     virtual __int64 evaluate() 
     {
         throw MakeStringException(ROXIE_SINK, "Internal error: evaluate() requires a function");
@@ -2135,31 +2122,19 @@ public:
 class CRoxieServerInternalSinkActivity : public CRoxieServerActivity
 {
 protected:
-    unsigned numOutputs;
     bool executed;
-    bool *stopped;
     CriticalSection ecrit;
     Owned<IException> exception;
 
 public:
-    CRoxieServerInternalSinkActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _numOutputs)
-        : CRoxieServerActivity(_factory, _probeManager), numOutputs(_numOutputs)
+    CRoxieServerInternalSinkActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
+        : CRoxieServerActivity(_factory, _probeManager)
     {
         executed = false;
-        stopped = new bool[numOutputs];
-        for (unsigned s = 0; s < numOutputs; s++)
-            stopped[s] = false;
-    }
-
-    ~CRoxieServerInternalSinkActivity()
-    {
-        delete [] stopped;
     }
 
     virtual void reset()
     {
-        for (unsigned s = 0; s < numOutputs; s++)
-            stopped[s] = false;
         executed = false;
         exception.clear();
         CRoxieServerActivity::reset();
@@ -2170,18 +2145,6 @@ public:
         return NULL;
     }
 
-    virtual void stopSink(unsigned outputIdx, bool abort)
-    {
-        if (!stopped[outputIdx])
-        {
-            stopped[outputIdx] = true;
-            for (unsigned s = 0; s < numOutputs; s++)
-                if (!stopped[s])
-                    return;
-            stop(abort); // all outputs stopped - stop parent.
-        }
-    }
-
     virtual const void *nextInGroup()
     {
         throwUnexpected(); // I am nobody's input
@@ -4488,7 +4451,7 @@ class CRoxieServerApplyActivity : public CRoxieServerInternalSinkActivity
 
 public:
     CRoxieServerApplyActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
-        : CRoxieServerInternalSinkActivity(_factory, _probeManager, 0), helper((IHThorApplyArg &) basehelper)
+        : CRoxieServerInternalSinkActivity(_factory, _probeManager), helper((IHThorApplyArg &) basehelper)
     {
     }
 
@@ -6034,8 +5997,8 @@ class CRoxieServerLocalResultWriteActivity : public CRoxieServerInternalSinkActi
     unsigned graphId;
 
 public:
-    CRoxieServerLocalResultWriteActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _graphId, unsigned _numOutputs)
-        : CRoxieServerInternalSinkActivity(_factory, _probeManager, _numOutputs), helper((IHThorLocalResultWriteArg &)basehelper), graphId(_graphId)
+    CRoxieServerLocalResultWriteActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _graphId)
+        : CRoxieServerInternalSinkActivity(_factory, _probeManager), helper((IHThorLocalResultWriteArg &)basehelper), graphId(_graphId)
     {
         graph = NULL;
     }
@@ -6099,7 +6062,7 @@ public:
 
     virtual IRoxieServerActivity *createActivity(IProbeManager *_probeManager) const
     {
-        return new CRoxieServerLocalResultWriteActivity(this, _probeManager, graphId, usageCount);
+        return new CRoxieServerLocalResultWriteActivity(this, _probeManager, graphId);
     }
 
 };
@@ -6265,8 +6228,8 @@ class CRoxieServerGraphLoopResultWriteActivity : public CRoxieServerInternalSink
     unsigned graphId;
 
 public:
-    CRoxieServerGraphLoopResultWriteActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _graphId, unsigned _numOutputs)
-        : CRoxieServerInternalSinkActivity(_factory, _probeManager, _numOutputs), helper((IHThorGraphLoopResultWriteArg &)basehelper), graphId(_graphId)
+    CRoxieServerGraphLoopResultWriteActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _graphId)
+        : CRoxieServerInternalSinkActivity(_factory, _probeManager), helper((IHThorGraphLoopResultWriteArg &)basehelper), graphId(_graphId)
     {
         graph = NULL;
     }
@@ -6370,7 +6333,7 @@ public:
 
     virtual IRoxieServerActivity *createActivity(IProbeManager *_probeManager) const
     {
-        return new CRoxieServerGraphLoopResultWriteActivity(this, _probeManager, graphId, usageCount);
+        return new CRoxieServerGraphLoopResultWriteActivity(this, _probeManager, graphId);
     }
 
 };
@@ -8917,8 +8880,8 @@ class CRoxieServerPipeWriteActivity : public CRoxieServerInternalSinkActivity
     bool recreate;
     bool inputExhausted;
 public:
-    CRoxieServerPipeWriteActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _numOutputs)
-        : CRoxieServerInternalSinkActivity(_factory, _probeManager, _numOutputs), helper((IHThorPipeWriteArg &)basehelper)
+    CRoxieServerPipeWriteActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
+        : CRoxieServerInternalSinkActivity(_factory, _probeManager), helper((IHThorPipeWriteArg &)basehelper)
     {
         recreate = helper.recreateEachRow();
         firstRead = false;
@@ -9035,7 +8998,7 @@ public:
 
     virtual IRoxieServerActivity *createActivity(IProbeManager *_probeManager) const
     {
-        return new CRoxieServerPipeWriteActivity(this, _probeManager, usageCount);
+        return new CRoxieServerPipeWriteActivity(this, _probeManager);
     }
 };
 
@@ -9556,8 +9519,8 @@ class CRoxieServerActionActivity : public CRoxieServerInternalSinkActivity
     IHThorActionArg &helper;
 public:
 
-    CRoxieServerActionActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _numOutputs)
-        : CRoxieServerInternalSinkActivity(_factory, _probeManager, _numOutputs), helper((IHThorActionArg &)basehelper)
+    CRoxieServerActionActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
+        : CRoxieServerInternalSinkActivity(_factory, _probeManager), helper((IHThorActionArg &)basehelper)
     {
     }
 
@@ -9577,7 +9540,7 @@ public:
 
     virtual IRoxieServerActivity *createActivity(IProbeManager *_probeManager) const
     {
-        return new CRoxieServerActionActivity(this, _probeManager, usageCount);
+        return new CRoxieServerActionActivity(this, _probeManager);
     }
 };
 
@@ -10585,7 +10548,7 @@ protected:
 
 public:
     CRoxieServerDiskWriteActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
-        : CRoxieServerInternalSinkActivity(_factory, _probeManager, 0), helper((IHThorDiskWriteArg &)basehelper)
+        : CRoxieServerInternalSinkActivity(_factory, _probeManager), helper((IHThorDiskWriteArg &)basehelper)
     {
         extend = ((helper.getFlags() & TDWextend) != 0);
         overwrite = ((helper.getFlags() & TDWoverwrite) != 0);
@@ -11038,7 +11001,7 @@ class CRoxieServerIndexWriteActivity : public CRoxieServerInternalSinkActivity,
 
 public:
     CRoxieServerIndexWriteActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
-        : CRoxieServerInternalSinkActivity(_factory, _probeManager, 0), helper(static_cast<IHThorIndexWriteArg &>(basehelper))
+        : CRoxieServerInternalSinkActivity(_factory, _probeManager), helper(static_cast<IHThorIndexWriteArg &>(basehelper))
     {
         overwrite = ((helper.getFlags() & TIWoverwrite) != 0);
         reccount = 0;
@@ -19121,8 +19084,8 @@ class CRoxieServerWorkUnitWriteActivity : public CRoxieServerInternalSinkActivit
     IRoxieServerContext *serverContext;
 
 public:
-    CRoxieServerWorkUnitWriteActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, bool _isReread, unsigned _numOutputs)
-        : CRoxieServerInternalSinkActivity(_factory, _probeManager, _numOutputs), helper((IHThorWorkUnitWriteArg &)basehelper), isReread(_isReread)
+    CRoxieServerWorkUnitWriteActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, bool _isReread)
+        : CRoxieServerInternalSinkActivity(_factory, _probeManager), helper((IHThorWorkUnitWriteArg &)basehelper), isReread(_isReread)
     {
         grouped = (helper.getFlags() & POFgrouped) != 0;
         serverContext = NULL;
@@ -19259,7 +19222,7 @@ public:
 
     virtual IRoxieServerActivity *createActivity(IProbeManager *_probeManager) const
     {
-        return new CRoxieServerWorkUnitWriteActivity(this, _probeManager, isReread, usageCount);
+        return new CRoxieServerWorkUnitWriteActivity(this, _probeManager, isReread);
     }
 
 };
@@ -19277,8 +19240,8 @@ class CRoxieServerRemoteResultActivity : public CRoxieServerInternalSinkActivity
     IHThorRemoteResultArg &helper;
 
 public:
-    CRoxieServerRemoteResultActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _numOutputs)
-        : CRoxieServerInternalSinkActivity(_factory, _probeManager, _numOutputs), helper((IHThorRemoteResultArg &)basehelper)
+    CRoxieServerRemoteResultActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
+        : CRoxieServerInternalSinkActivity(_factory, _probeManager), helper((IHThorRemoteResultArg &)basehelper)
     {
     }
 
@@ -19302,7 +19265,7 @@ public:
 
     virtual IRoxieServerActivity *createActivity(IProbeManager *_probeManager) const
     {
-        return new CRoxieServerRemoteResultActivity(this, _probeManager, usageCount);
+        return new CRoxieServerRemoteResultActivity(this, _probeManager);
     }
 
 };

+ 0 - 1
roxie/ccd/ccdserver.hpp

@@ -250,7 +250,6 @@ interface IRoxieServerActivity : extends IActivityBase
     virtual void executeChild(size32_t & retSize, void * & ret, unsigned parentExtractSize, const byte * parentExtract) = 0;
     virtual void serializeCreateStartContext(MemoryBuffer &out) = 0;
     virtual void serializeExtra(MemoryBuffer &out) = 0;
-    virtual void stopSink(unsigned idx, bool abort) = 0;
 //Functions to support result streaming between parallel loop/graphloop/library implementations
     virtual IRoxieInput * querySelectOutput(unsigned id) = 0;
     virtual bool querySetStreamInput(unsigned id, IRoxieInput * _input) = 0;