Explorar el Código

Merge pull request #6298 from jakesmith/hpcc-12010

HPCC-12010 Fix deadlock in onActivity cache building

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday hace 11 años
padre
commit
e61ad9e8e5
Se han modificado 1 ficheros con 101 adiciones y 103 borrados
  1. 101 103
      esp/services/ws_smc/ws_smcService.cpp

+ 101 - 103
esp/services/ws_smc/ws_smcService.cpp

@@ -76,26 +76,6 @@ void AccessFailure(IEspContext& context, char const * msg,...)
     AUDIT(AUDIT_TYPE_ACCESS_FAILURE,buf.str());
 }
 
-struct QueueWrapper
-{
-    QueueWrapper(const char* targetName, const char* queueExt)
-    {
-        StringBuffer name;
-        name.append(targetName).append('.').append(queueExt);
-        queue.setown(createJobQueue(name.str()));
-    }
-
-    QueueWrapper(const char* queueName)
-    {
-        queue.setown(createJobQueue(queueName));
-    }
-
-    operator IJobQueue*() { return queue.get(); }
-    IJobQueue* operator->() { return queue.get(); }
-
-    Owned<IJobQueue> queue;
-};
-
 struct QueueLock
 {
     QueueLock(IJobQueue* q): queue(q) { queue->lock(); }
@@ -1266,17 +1246,18 @@ bool CWsSMCEx::onMoveJobDown(IEspContext &context, IEspSMCJobRequest &req, IEspS
     {
         checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
 
-        Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
-        QueueLock lock(queue);
-        unsigned index=queue->findRank(req.getWuid());
-        if(index<queue->ordinality())
         {
-            IJobQueueItem * item0 = queue->getItem(index);
-            IJobQueueItem * item = queue->getItem(index+1);
-            if(item && item0 && (item0->getPriority() == item->getPriority()))
-                queue->moveAfter(req.getWuid(),item->queryWUID());
+            Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
+            QueueLock lock(queue);
+            unsigned index=queue->findRank(req.getWuid());
+            if(index<queue->ordinality())
+            {
+                Owned<IJobQueueItem> item0 = queue->getItem(index);
+                Owned<IJobQueueItem> item = queue->getItem(index+1);
+                if(item && item0 && (item0->getPriority() == item->getPriority()))
+                    queue->moveAfter(req.getWuid(),item->queryWUID());
+            }
         }
-
         AccessSuccess(context, "Changed job priority %s",req.getWuid());
         clearActivityInfoCache();
         resp.setRedirectUrl("/WsSMC/");
@@ -1294,17 +1275,18 @@ bool CWsSMCEx::onMoveJobUp(IEspContext &context, IEspSMCJobRequest &req, IEspSMC
     {
         checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
 
-        Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
-        QueueLock lock(queue);
-        unsigned index=queue->findRank(req.getWuid());
-        if(index>0 && index<queue->ordinality())
         {
-            IJobQueueItem * item0 = queue->getItem(index);
-            IJobQueueItem * item = queue->getItem(index-1);
-            if(item && item0 && (item0->getPriority() == item->getPriority()))
-                queue->moveBefore(req.getWuid(),item->queryWUID());
+            Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
+            QueueLock lock(queue);
+            unsigned index=queue->findRank(req.getWuid());
+            if(index>0 && index<queue->ordinality())
+            {
+                Owned<IJobQueueItem> item0 = queue->getItem(index);
+                Owned<IJobQueueItem> item = queue->getItem(index-1);
+                if(item && item0 && (item0->getPriority() == item->getPriority()))
+                    queue->moveBefore(req.getWuid(),item->queryWUID());
+            }
         }
-
         AccessSuccess(context, "Changed job priority %s",req.getWuid());
         clearActivityInfoCache();
         resp.setRedirectUrl("/WsSMC/");
@@ -1322,33 +1304,35 @@ bool CWsSMCEx::onMoveJobBack(IEspContext &context, IEspSMCJobRequest &req, IEspS
     {
         checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
 
-        Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
-        QueueLock lock(queue);
-        
-        unsigned index=queue->findRank(req.getWuid());
-        if(index<queue->ordinality())
         {
-            int priority0 = queue->getItem(index)->getPriority();
-            unsigned biggestIndoxInSamePriority = index;
-            unsigned nextIndex = biggestIndoxInSamePriority + 1;
-            while (nextIndex<queue->ordinality())
+            Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
+            QueueLock lock(queue);
+
+            unsigned index=queue->findRank(req.getWuid());
+            if(index<queue->ordinality())
             {
-                IJobQueueItem * item = queue->getItem(nextIndex);
-                if (priority0 != item->getPriority())
+                Owned<IJobQueueItem> item = queue->getItem(index);
+                int priority0 = item->getPriority();
+                unsigned biggestIndoxInSamePriority = index;
+                unsigned nextIndex = biggestIndoxInSamePriority + 1;
+                while (nextIndex<queue->ordinality())
                 {
-                    break;
+                    item.setown(queue->getItem(nextIndex));
+                    if (priority0 != item->getPriority())
+                    {
+                        break;
+                    }
+                    biggestIndoxInSamePriority = nextIndex;
+                    nextIndex++;
                 }
-                biggestIndoxInSamePriority = nextIndex;
-                nextIndex++;
-            }
 
-            if (biggestIndoxInSamePriority != index)
-            {
-                IJobQueueItem * item = queue->getItem(biggestIndoxInSamePriority);
-                queue->moveAfter(req.getWuid(),item->queryWUID());
+                if (biggestIndoxInSamePriority != index)
+                {
+                    item.setown(queue->getItem(biggestIndoxInSamePriority));
+                    queue->moveAfter(req.getWuid(), item->queryWUID());
+                }
             }
         }
-
         AccessSuccess(context, "Changed job priority %s",req.getWuid());
         clearActivityInfoCache();
         resp.setRedirectUrl("/WsSMC/");
@@ -1366,30 +1350,33 @@ bool CWsSMCEx::onMoveJobFront(IEspContext &context, IEspSMCJobRequest &req, IEsp
     {
         checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
 
-        Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
-        QueueLock lock(queue);
-        
-        unsigned index=queue->findRank(req.getWuid());
-        if(index>0 && index<queue->ordinality())
         {
-            int priority0 = queue->getItem(index)->getPriority();
-            unsigned smallestIndoxInSamePriority = index;
-            int nextIndex = smallestIndoxInSamePriority - 1;
-            while (nextIndex >= 0)
+            Owned<IJobQueue> queue=createJobQueue(req.getQueueName());
+            QueueLock lock(queue);
+
+            unsigned index=queue->findRank(req.getWuid());
+            if (index>0 && index<queue->ordinality())
             {
-                IJobQueueItem * item = queue->getItem(nextIndex);
-                if (priority0 != item->getPriority())
+                Owned<IJobQueueItem> item = queue->getItem(index);
+                int priority0 = item->getPriority();
+                unsigned smallestIndoxInSamePriority = index;
+                int nextIndex = smallestIndoxInSamePriority - 1;
+                while (nextIndex >= 0)
                 {
-                    break;
+                    item.setown(queue->getItem(nextIndex));
+                    if (priority0 != item->getPriority())
+                    {
+                        break;
+                    }
+                    smallestIndoxInSamePriority = nextIndex;
+                    nextIndex--;
                 }
-                smallestIndoxInSamePriority = nextIndex;
-                nextIndex--;
-            }
 
-            if (smallestIndoxInSamePriority != index)
-            {
-                IJobQueueItem * item = queue->getItem(smallestIndoxInSamePriority);
-                queue->moveBefore(req.getWuid(),item->queryWUID());
+                if (smallestIndoxInSamePriority != index)
+                {
+                    item.setown(queue->getItem(smallestIndoxInSamePriority));
+                    queue->moveBefore(req.getWuid(), item->queryWUID());
+                }
             }
         }
 
@@ -1412,16 +1399,17 @@ bool CWsSMCEx::onRemoveJob(IEspContext &context, IEspSMCJobRequest &req, IEspSMC
 
         secAbortWorkUnit(req.getWuid(), *context.querySecManager(), *context.queryUser());
 
-        Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
-        QueueLock lock(queue);
-        
-        unsigned index=queue->findRank(req.getWuid());
-        if(index<queue->ordinality())
         {
-            if(!queue->cancelInitiateConversation(req.getWuid()))
-                throw MakeStringException(ECLWATCH_CANNOT_DELETE_WORKUNIT,"Failed to remove the workunit %s",req.getWuid());
-        }
+            Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
+            QueueLock lock(queue);
 
+            unsigned index=queue->findRank(req.getWuid());
+            if(index<queue->ordinality())
+            {
+                if(!queue->cancelInitiateConversation(req.getWuid()))
+                    throw MakeStringException(ECLWATCH_CANNOT_DELETE_WORKUNIT,"Failed to remove the workunit %s",req.getWuid());
+            }
+        }
         AccessSuccess(context, "Removed job %s",req.getWuid());
         clearActivityInfoCache();
         resp.setRedirectUrl("/WsSMC/");
@@ -1439,10 +1427,12 @@ bool CWsSMCEx::onStopQueue(IEspContext &context, IEspSMCQueueRequest &req, IEspS
     {
         checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
 
-        StringBuffer info;
-        Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
-        queue->stop(createQueueActionInfo(context, "stopped", req, info));
-        AccessSuccess(context, "Stopped queue %s",req.getCluster());
+        {
+            Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
+            StringBuffer info;
+            queue->stop(createQueueActionInfo(context, "stopped", req, info));
+        }
+        AccessSuccess(context, "Stopped queue %s", req.getCluster());
         clearActivityInfoCache();
 
         resp.setRedirectUrl("/WsSMC/");
@@ -1460,10 +1450,12 @@ bool CWsSMCEx::onResumeQueue(IEspContext &context, IEspSMCQueueRequest &req, IEs
     {
         checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
 
-        StringBuffer info;
-        Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
-        queue->resume(createQueueActionInfo(context, "resumed", req, info));
-        AccessSuccess(context, "Resumed queue %s",req.getCluster());
+        {
+            Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
+            StringBuffer info;
+            queue->resume(createQueueActionInfo(context, "resumed", req, info));
+        }
+        AccessSuccess(context, "Resumed queue %s", req.getCluster());
         clearActivityInfoCache();
 
         resp.setRedirectUrl("/WsSMC/");
@@ -1498,10 +1490,12 @@ bool CWsSMCEx::onPauseQueue(IEspContext &context, IEspSMCQueueRequest &req, IEsp
     {
         checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
 
-        StringBuffer info;
-        Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
-        queue->pause(createQueueActionInfo(context, "paused", req, info));
-        AccessSuccess(context, "Paused queue %s",req.getCluster());
+        {
+            Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
+            StringBuffer info;
+            queue->pause(createQueueActionInfo(context, "paused", req, info));
+        }
+        AccessSuccess(context, "Paused queue %s", req.getCluster());
         clearActivityInfoCache();
 
         resp.setRedirectUrl("/WsSMC/");
@@ -1518,11 +1512,14 @@ bool CWsSMCEx::onClearQueue(IEspContext &context, IEspSMCQueueRequest &req, IEsp
     try
     {
         checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
-        Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
         {
+            Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
             QueueLock lock(queue);
             for(unsigned i=0;i<queue->ordinality();i++)
-                secAbortWorkUnit(queue->getItem(i)->queryWUID(), *context.querySecManager(), *context.queryUser());
+            {
+                Owned<IJobQueueItem> item = queue->getItem(i);
+                secAbortWorkUnit(item->queryWUID(), *context.querySecManager(), *context.queryUser());
+            }
             queue->clear();
         }
         AccessSuccess(context, "Cleared queue %s",req.getCluster());
@@ -1559,10 +1556,11 @@ bool CWsSMCEx::onSetJobPriority(IEspContext &context, IEspSMCPriorityRequest &re
 
         // set job priority
         int priority = lw->getPriorityValue();
-        Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
-        QueueLock lock(queue);
-        queue->changePriority(req.getWuid(),priority);
-
+        {
+            Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
+            QueueLock lock(queue);
+            queue->changePriority(req.getWuid(),priority);
+        }
         clearActivityInfoCache();
         resp.setRedirectUrl("/WsSMC/");
     }