ソースを参照

HPCC-16016 CheckAbnormalTermination could incorrectly report job failed

Signed-off-by: Mark Kelly <mark.kelly@lexisnexis.com>
Mark Kelly 8 年 前
コミット
48e965f57c

+ 32 - 31
common/workunit/workunit.cpp

@@ -2598,38 +2598,26 @@ void CWorkUnitFactory::clearAborting(const char *wuid)
     }
 }
 
-bool CWorkUnitFactory::checkAbnormalTermination(const char *wuid, WUState &state, SessionId agent)
+void CWorkUnitFactory::reportAbnormalTermination(const char *wuid, WUState &state, SessionId agent)
 {
-    if (queryDaliServerVersion().compare("2.1")>=0)
+    WARNLOG("reportAbnormalTermination: session stopped unexpectedly: %" I64F "d state: %d", (__int64) agent, (int) state);
+    bool isEcl = false;
+    switch (state)
     {
-        if((agent>0) && querySessionManager().sessionStopped(agent, 0))
-        {
-            bool isEcl = false;
-            switch (state)
-            {
-                case WUStateCompiling:
-                    isEcl = true;
-                    // drop into
-                case WUStateRunning:
-                case WUStateBlocked:
-                    state = WUStateFailed;
-                    break;
-                case WUStateAborting:
-                    state = WUStateAborted;
-                    break;
-                default:
-                    return false;
-            }
-            WARNLOG("checkAbnormalTermination: workunit terminated: %" I64F "d state = %d",(__int64) agent, (int) state);
-            Owned<IWorkUnit> wu = updateWorkUnit(wuid, NULL, NULL);
-            wu->setState(state);
-            Owned<IWUException> e = wu->createException();
-            e->setExceptionCode(isEcl ? 1001 : 1000);
-            e->setExceptionMessage(isEcl ? "EclServer terminated unexpectedly" : "Workunit terminated unexpectedly");
-            return true;
-        }
+        case WUStateAborting:
+            state = WUStateAborted;
+            break;
+        case WUStateCompiling:
+            isEcl = true;
+            // drop into
+        default:
+            state = WUStateFailed;
     }
-    return false;
+    Owned<IWorkUnit> wu = updateWorkUnit(wuid, NULL, NULL);
+    wu->setState(state);
+    Owned<IWUException> e = wu->createException();
+    e->setExceptionCode(isEcl ? 1001 : 1000);
+    e->setExceptionMessage(isEcl ? "EclServer terminated unexpectedly" : "Workunit terminated unexpectedly");
 }
 
 static CriticalSection deleteDllLock;
@@ -2952,6 +2940,8 @@ public:
         Owned<IRemoteConnection> conn = sdsManager->connect(wuRoot.str(), session, 0, SDS_LOCK_TIMEOUT);
         if (conn)
         {
+            SessionId agent = -1;
+            bool agentSessionStopped = false;
             unsigned start = msTick();
             loop
             {
@@ -2979,13 +2969,24 @@ public:
                 case WUStateDebugRunning:
                 case WUStateBlocked:
                 case WUStateAborting:
-                    SessionId agent = conn->queryRoot()->getPropInt64("@agentSession", -1);
-                    if (checkAbnormalTermination(wuid, ret, agent))
+                    if (agentSessionStopped)
                     {
+                        reportAbnormalTermination(wuid, ret, agent);
                         return ret;
                     }
+                    if (queryDaliServerVersion().compare("2.1")>=0)
+                    {
+                        agent = conn->queryRoot()->getPropInt64("@agentSession", -1);
+                        if((agent>0) && querySessionManager().sessionStopped(agent, 0))
+                        {
+                            agentSessionStopped = true;
+                            conn->reload();
+                            continue;
+                        }
+                    }
                     break;
                 }
+                agentSessionStopped = false; // reset for state changes such as WUStateWait then WUStateRunning again
                 unsigned waited = msTick() - start;
                 if (timeout==-1)
                 {

+ 1 - 1
common/workunit/workunit.ipp

@@ -623,7 +623,7 @@ public:
     }
 
 protected:
-    bool checkAbnormalTermination(const char *wuid, WUState &state, SessionId agent);
+    void reportAbnormalTermination(const char *wuid, WUState &state, SessionId agent);
 
     // These need to be implemented by the derived classes
     virtual CLocalWorkUnit* _createWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser) = 0;

+ 16 - 2
plugins/cassandra/cassandrawu.cpp

@@ -3492,6 +3492,8 @@ public:
         CassandraStatement statement(prepareStatement("select state, agentSession from workunits where partition=? and wuid=?;"));
         statement.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
         statement.bindString(1, wuid);
+        SessionId agent = 0;
+        bool agentSessionStopped = false;
         unsigned start = msTick();
         loop
         {
@@ -3528,11 +3530,23 @@ public:
             case WUStateDebugRunning:
             case WUStateBlocked:
             case WUStateAborting:
-                SessionId agent = getUnsignedResult(NULL, cass_row_get_column(row, 1));
-                if (agent && checkAbnormalTermination(wuid, state, agent))
+                if (agentSessionStopped)
+                {
+                    reportAbnormalTermination(wuid, state, agent);
                     return state;
+                }
+                if (queryDaliServerVersion().compare("2.1")>=0)
+                {
+                    agent = getUnsignedResult(NULL, cass_row_get_column(row, 1));
+                    if(agent && querySessionManager().sessionStopped(agent, 0))
+                    {
+                        agentSessionStopped = true;
+                        continue;
+                    }
+                }
                 break;
             }
+            agentSessionStopped = false; // reset for state changes such as WUStateWait then WUStateRunning again
             unsigned waited = msTick() - start;
             if (timeout==-1)
             {