Browse Source

HPCC-12010 Fix deadlock in onActivity cache building

If the activity cache is being built while a queue request (e.g.
onSetJobPriority) is being processed, the two threads involved can
deadlock each other.
Regression introduced by HPCC-11637

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 11 years ago
parent
commit
8fd18be794
1 changed file with 64 additions and 54 deletions
  1. 64 54
      esp/services/ws_smc/ws_smcService.cpp

+ 64 - 54
esp/services/ws_smc/ws_smcService.cpp

@@ -1266,17 +1266,18 @@ bool CWsSMCEx::onMoveJobDown(IEspContext &context, IEspSMCJobRequest &req, IEspS
     {
         checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
 
-        Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
-        QueueLock lock(queue);
-        unsigned index=queue->findRank(req.getWuid());
-        if(index<queue->ordinality())
         {
-            IJobQueueItem * item0 = queue->getItem(index);
-            IJobQueueItem * item = queue->getItem(index+1);
-            if(item && item0 && (item0->getPriority() == item->getPriority()))
-                queue->moveAfter(req.getWuid(),item->queryWUID());
+            Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
+            QueueLock lock(queue);
+            unsigned index=queue->findRank(req.getWuid());
+            if(index<queue->ordinality())
+            {
+                Owned<IJobQueueItem> item0 = queue->getItem(index);
+                Owned<IJobQueueItem> item = queue->getItem(index+1);
+                if(item && item0 && (item0->getPriority() == item->getPriority()))
+                    queue->moveAfter(req.getWuid(),item->queryWUID());
+            }
         }
-
         AccessSuccess(context, "Changed job priority %s",req.getWuid());
         clearActivityInfoCache();
         resp.setRedirectUrl("/WsSMC/");
@@ -1294,17 +1295,18 @@ bool CWsSMCEx::onMoveJobUp(IEspContext &context, IEspSMCJobRequest &req, IEspSMC
     {
         checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
 
-        Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
-        QueueLock lock(queue);
-        unsigned index=queue->findRank(req.getWuid());
-        if(index>0 && index<queue->ordinality())
         {
-            IJobQueueItem * item0 = queue->getItem(index);
-            IJobQueueItem * item = queue->getItem(index-1);
-            if(item && item0 && (item0->getPriority() == item->getPriority()))
-                queue->moveBefore(req.getWuid(),item->queryWUID());
+            Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
+            QueueLock lock(queue);
+            unsigned index=queue->findRank(req.getWuid());
+            if(index>0 && index<queue->ordinality())
+            {
+                Owned<IJobQueueItem> item0 = queue->getItem(index);
+                Owned<IJobQueueItem> item = queue->getItem(index-1);
+                if(item && item0 && (item0->getPriority() == item->getPriority()))
+                    queue->moveBefore(req.getWuid(),item->queryWUID());
+            }
         }
-
         AccessSuccess(context, "Changed job priority %s",req.getWuid());
         clearActivityInfoCache();
         resp.setRedirectUrl("/WsSMC/");
@@ -1322,33 +1324,35 @@ bool CWsSMCEx::onMoveJobBack(IEspContext &context, IEspSMCJobRequest &req, IEspS
     {
         checkAccess(context,THORQUEUE_FEATURE,SecAccess_Full);
 
-        Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
-        QueueLock lock(queue);
-        
-        unsigned index=queue->findRank(req.getWuid());
-        if(index<queue->ordinality())
         {
-            int priority0 = queue->getItem(index)->getPriority();
-            unsigned biggestIndoxInSamePriority = index;
-            unsigned nextIndex = biggestIndoxInSamePriority + 1;
-            while (nextIndex<queue->ordinality())
+            Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
+            QueueLock lock(queue);
+
+            unsigned index=queue->findRank(req.getWuid());
+            if(index<queue->ordinality())
             {
-                IJobQueueItem * item = queue->getItem(nextIndex);
-                if (priority0 != item->getPriority())
+                Owned<IJobQueueItem> item = queue->getItem(index);
+                int priority0 = item->getPriority();
+                unsigned biggestIndoxInSamePriority = index;
+                unsigned nextIndex = biggestIndoxInSamePriority + 1;
+                while (nextIndex<queue->ordinality())
                 {
-                    break;
+                    Owned<IJobQueueItem> item = queue->getItem(nextIndex);
+                    if (priority0 != item->getPriority())
+                    {
+                        break;
+                    }
+                    biggestIndoxInSamePriority = nextIndex;
+                    nextIndex++;
                 }
-                biggestIndoxInSamePriority = nextIndex;
-                nextIndex++;
-            }
 
-            if (biggestIndoxInSamePriority != index)
-            {
-                IJobQueueItem * item = queue->getItem(biggestIndoxInSamePriority);
-                queue->moveAfter(req.getWuid(),item->queryWUID());
+                if (biggestIndoxInSamePriority != index)
+                {
+                    Owned<IJobQueueItem> item = queue->getItem(biggestIndoxInSamePriority);
+                    queue->moveAfter(req.getWuid(),item->queryWUID());
+                }
             }
         }
-
         AccessSuccess(context, "Changed job priority %s",req.getWuid());
         clearActivityInfoCache();
         resp.setRedirectUrl("/WsSMC/");
@@ -1372,12 +1376,13 @@ bool CWsSMCEx::onMoveJobFront(IEspContext &context, IEspSMCJobRequest &req, IEsp
         unsigned index=queue->findRank(req.getWuid());
         if(index>0 && index<queue->ordinality())
         {
-            int priority0 = queue->getItem(index)->getPriority();
+            Owned<IJobQueueItem> item = queue->getItem(index);
+            int priority0 = item->getPriority();
             unsigned smallestIndoxInSamePriority = index;
             int nextIndex = smallestIndoxInSamePriority - 1;
             while (nextIndex >= 0)
             {
-                IJobQueueItem * item = queue->getItem(nextIndex);
+                Owned<IJobQueueItem> item = queue->getItem(nextIndex);
                 if (priority0 != item->getPriority())
                 {
                     break;
@@ -1388,7 +1393,7 @@ bool CWsSMCEx::onMoveJobFront(IEspContext &context, IEspSMCJobRequest &req, IEsp
 
             if (smallestIndoxInSamePriority != index)
             {
-                IJobQueueItem * item = queue->getItem(smallestIndoxInSamePriority);
+                Owned<IJobQueueItem> item = queue->getItem(smallestIndoxInSamePriority);
                 queue->moveBefore(req.getWuid(),item->queryWUID());
             }
         }
@@ -1412,16 +1417,17 @@ bool CWsSMCEx::onRemoveJob(IEspContext &context, IEspSMCJobRequest &req, IEspSMC
 
         secAbortWorkUnit(req.getWuid(), *context.querySecManager(), *context.queryUser());
 
-        Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
-        QueueLock lock(queue);
-        
-        unsigned index=queue->findRank(req.getWuid());
-        if(index<queue->ordinality())
         {
-            if(!queue->cancelInitiateConversation(req.getWuid()))
-                throw MakeStringException(ECLWATCH_CANNOT_DELETE_WORKUNIT,"Failed to remove the workunit %s",req.getWuid());
-        }
+            Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
+            QueueLock lock(queue);
 
+            unsigned index=queue->findRank(req.getWuid());
+            if(index<queue->ordinality())
+            {
+                if(!queue->cancelInitiateConversation(req.getWuid()))
+                    throw MakeStringException(ECLWATCH_CANNOT_DELETE_WORKUNIT,"Failed to remove the workunit %s",req.getWuid());
+            }
+        }
         AccessSuccess(context, "Removed job %s",req.getWuid());
         clearActivityInfoCache();
         resp.setRedirectUrl("/WsSMC/");
@@ -1522,7 +1528,10 @@ bool CWsSMCEx::onClearQueue(IEspContext &context, IEspSMCQueueRequest &req, IEsp
         {
             QueueLock lock(queue);
             for(unsigned i=0;i<queue->ordinality();i++)
-                secAbortWorkUnit(queue->getItem(i)->queryWUID(), *context.querySecManager(), *context.queryUser());
+            {
+                Owned<IJobQueueItem> item = queue->getItem(i);
+                secAbortWorkUnit(item->queryWUID(), *context.querySecManager(), *context.queryUser());
+            }
             queue->clear();
         }
         AccessSuccess(context, "Cleared queue %s",req.getCluster());
@@ -1559,10 +1568,11 @@ bool CWsSMCEx::onSetJobPriority(IEspContext &context, IEspSMCPriorityRequest &re
 
         // set job priority
         int priority = lw->getPriorityValue();
-        Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
-        QueueLock lock(queue);
-        queue->changePriority(req.getWuid(),priority);
-
+        {
+            Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
+            QueueLock lock(queue);
+            queue->changePriority(req.getWuid(),priority);
+        }
         clearActivityInfoCache();
         resp.setRedirectUrl("/WsSMC/");
     }