Quellcode durchsuchen

HPCC-11477 Activities are being reset excessively

Better fix for the issue that was partially fixed in 4.2.6

Also fix the case where a sink activity was still in reset state even though
there were upstream activities started.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman vor 11 Jahren
Ursprung
Commit
6c926441e0
1 geänderte Dateien mit 35 neuen und 20 gelöschten Zeilen
  1. 35 20
      roxie/ccd/ccdserver.cpp

+ 35 - 20
roxie/ccd/ccdserver.cpp

@@ -944,7 +944,13 @@ public:
     {
         CriticalBlock cb(statecrit);
         if (traceStartStop)
-            DBGLOG("%p destroy state=%s", this, queryStateText(state)); // Note- CTXLOG may not be safe
+        {
+            DBGLOG("%p destroy %d state=%s", this, activityId, queryStateText(state)); // Note- CTXLOG may not be safe
+            if (watchActivityId && watchActivityId==activityId)
+            {
+                DBGLOG("WATCH: %p destroy %d state=%s", this, activityId, queryStateText(state)); // Note- CTXLOG may not be safe
+            }
+        }
         if (state!=STATEreset)
         {
             DBGLOG("STATE: Activity %d destroyed but not reset", activityId);
@@ -1151,10 +1157,10 @@ public:
 #ifdef TRACE_STARTSTOP
         if (traceStartStop)
         {
-            CTXLOG("start %d", activityId);
+            CTXLOG("start %p %d", this, activityId);
             if (watchActivityId && watchActivityId==activityId)
             {
-                CTXLOG("WATCH: start %d", activityId);
+                CTXLOG("WATCH: start %p %d", this, activityId);
             }
         }
 #endif
@@ -1231,23 +1237,22 @@ public:
 
     inline void stop(bool aborting)
     {
-        if (state != STATEstopped)
+        if (state != STATEstopped && state != STATEreset)
         {
             CriticalBlock cb(statecrit);
-            if (state != STATEstopped)
-            {
-                if (state != STATEreset)
-                    state=STATEstopped;
 #ifdef TRACE_STARTSTOP
-                if (traceStartStop)
+            if (traceStartStop)
+            {
+                CTXLOG("stop %p %d (state currently %s)", this, activityId, queryStateText(state));
+                if (watchActivityId && watchActivityId==activityId)
                 {
-                    CTXLOG("stop %d", activityId);
-                    if (watchActivityId && watchActivityId==activityId)
-                    {
-                        CTXLOG("WATCH: stop %d", activityId);
-                    }
+                    CTXLOG("WATCH: stop %p %d", this, activityId);
                 }
+            }
 #endif
+            if (state != STATEstopped && state != STATEreset)
+            {
+                state=STATEstopped;
                 // NOTE - this is needed to ensure that dependencies which were not used are properly stopped
                 ForEachItemIn(idx, dependencies)
                 {
@@ -1289,10 +1294,10 @@ public:
 #ifdef TRACE_STARTSTOP
                 if (traceStartStop)
                 {
-                    CTXLOG("reset %d", activityId);
+                    CTXLOG("reset %p %d", this, activityId);
                     if (watchActivityId && watchActivityId==activityId)
                     {
-                        CTXLOG("WATCH: reset %d", activityId);
+                        CTXLOG("WATCH: reset %p %d", this, activityId);
                     }
                 }
 #endif
@@ -5872,12 +5877,17 @@ public:
         graph->setResult(helper.querySequence(), result);
     }
 
+    virtual const void *nextInGroup()
+    {
+        return input->nextInGroup(); // I can act as a passthrough input
+    }
+
     IRoxieInput * querySelectOutput(unsigned id)
     {
         if (id == helper.querySequence())
         {
-            executed = true;
-            return LINK(input);
+            executed = true;  // Ensure that we don't try to pull as a sink as well as via the passthrough
+            return LINK(this);
         }
         return NULL;
     }
@@ -5950,12 +5960,17 @@ public:
         graph->setResult(helper.querySequence(), result);
     }
 
+    virtual const void *nextInGroup()
+    {
+        return input->nextInGroup(); // I can act as a passthrough input
+    }
+
     IRoxieInput * querySelectOutput(unsigned id)
     {
         if (id == helper.querySequence())
         {
-            executed = true;
-            return LINK(input);
+            executed = true;  // Ensure that we don't try to pull as a sink as well as via the passthrough
+            return LINK(this);
         }
         return NULL;
     }