Browse Source

Merge pull request #6629 from ghalliday/issue12563

HPCC-12563 Clean up scheduling workunits that are failed then deleted

Reviewed-By: Jake Smith <jake.smith@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 10 years ago
parent
commit
3df3b29b9f
3 changed files with 24 additions and 1 deletions
  1. 9 0
      common/workunit/workunit.cpp
  2. 1 0
      common/workunit/workunit.hpp
  3. 14 1
      ecl/eclscheduler/eclscheduler.cpp

+ 9 - 0
common/workunit/workunit.cpp

@@ -10020,6 +10020,15 @@ public:
         eventQueue->addPropTree("Item", eventItem.getLink());
     }
 
+    virtual void remove()
+    {
+        if (baseconn)
+        {
+            baseconn->close(true);
+            baseconn.clear();
+        }
+    }
+
 private:
     void resetItemStateAndDependents(IWorkflowItemArray * workflow, unsigned wfid) const
     {

+ 1 - 0
common/workunit/workunit.hpp

@@ -1293,6 +1293,7 @@ interface IWorkflowScheduleConnection : extends IInterface
     virtual bool queryActive() = 0;
     virtual bool pull(IWorkflowItemArray * workflow) = 0;
     virtual void push(const char * name, const char * text) = 0;
+    virtual void remove() = 0;
 };
 
 

+ 14 - 1
ecl/eclscheduler/eclscheduler.cpp

@@ -135,7 +135,20 @@ private:
         wfconn->lock();
         wfconn->push(name, text);
         if(!wfconn->queryActive())
-            runWorkUnit(wuid);
+        {
+            if (!runWorkUnit(wuid))
+            {
+                //The work unit failed to run for some reason.. check if it has disappeared
+                Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
+                Owned<IConstWorkUnit> w = factory->openWorkUnit(wuid, false);
+                if (!w)
+                {
+                    ERRLOG("Scheduled workunit %s no longer exists - descheduling", wuid);
+                    descheduleWorkunit(wuid);
+                    wfconn->remove();
+                }
+            }
+        }
         wfconn->unlock();
     }