浏览代码

HPCC-21126 Refactor CXRefExBuilderThread class

1. Fix clang warning in dfuXRefService.hpp;
2. Add another CriticalSection for accessing job queue;
3. Clean the code in the CXRefExBuilderThread class.

Signed-off-by: wangkx <kevin.wang@lexisnexis.com>
wangkx 6 年之前
父节点
当前提交
6ad4f9bf70
共有 2 个文件被更改,包括 86 次插入67 次删除
  1. 4 4
      esp/services/ws_dfu/ws_dfuXRefService.cpp
  2. 82 63
      esp/services/ws_dfu/ws_dfuXRefService.hpp

+ 4 - 4
esp/services/ws_dfu/ws_dfuXRefService.cpp

@@ -505,15 +505,15 @@ bool CWsDfuXRefEx::onDFUXRefBuild(IEspContext &context, IEspDFUXRefBuildRequest
         }
         StringBuffer returnStr;
         ESPSerializationFormat fmt = context.getResponseFormat();
-        if (m_XRefbuilder->IsQueued(req.getCluster()) )
+        if (m_XRefbuilder->isQueued(req.getCluster()) )
             appendReplyMessage(fmt == ESPSerializationJSON, returnStr,"/WsDFUXRef/DFUXRefList","An XRef build for cluster %s is in process. Click here to return to the main XRef List.",req.getCluster());
-        else if (!m_XRefbuilder->IsRunning())
+        else if (!m_XRefbuilder->isRunning())
             appendReplyMessage(fmt == ESPSerializationJSON, returnStr,"/WsDFUXRef/DFUXRefList","Running XRef Process. Click here to return to the main XRef List.");
         else
             appendReplyMessage(fmt == ESPSerializationJSON, returnStr,"/WsDFUXRef/DFUXRefList","someone is currently running a Xref build. Your request will be added to the queue. Please click here to return to the main page.");
 
 
-        m_XRefbuilder->QueueRequest(xRefNode,req.getCluster());
+        m_XRefbuilder->queueRequest(xRefNode,req.getCluster());
         resp.setDFUXRefActionResult(returnStr.str());
     }
     catch(IException* e)
@@ -532,7 +532,7 @@ bool CWsDfuXRefEx::onDFUXRefBuildCancel(IEspContext &context, IEspDFUXRefBuildCa
         StringBuffer username;
         context.getUserID(username);
 
-        m_XRefbuilder->Cancel();
+        m_XRefbuilder->cancel();
         StringBuffer returnStr;
         ESPSerializationFormat fmt = context.getResponseFormat();
         if (fmt == ESPSerializationJSON)

+ 82 - 63
esp/services/ws_dfu/ws_dfuXRefService.hpp

@@ -26,116 +26,135 @@
 
 class CXRefExBuilderThread : public Thread
 {
-    Owned<IXRefNode> m_xRefNode;
-    bool bRunning;
-    Mutex _boolMutex;
+    bool stopThread = false;
+    bool xRefRunning = false;
+    CriticalSection critRunningStatus;
+    CriticalSection critQueue;
     Semaphore m_sem;
-    SafeQueueOf<IXRefNode, false> m_pNodeQueue;
-    bool m_bRun;
-    StringBuffer _CurrentClusterName;
-    virtual void SetRunningStatus(bool bStatus)
+    SafeQueueOf<IXRefNode, false> nodeQueue;
+    StringBuffer currentClusterName;
+
+    void setRunningStatus(bool status)
+    {
+        CriticalBlock b(critRunningStatus);
+        xRefRunning = status;
+    }
+    IXRefNode* readNodeQueue()
     {
-        CriticalSection(_boolMutex);
-        bRunning = bStatus;
+        CriticalBlock b(critQueue);
+        return (IXRefNode*)nodeQueue.dequeue();
+    }
+    void writeNodeQueue(IXRefNode* xRefNode)
+    {
+        if (!xRefNode)
+            return;
+
+        CriticalBlock b(critQueue);
+        nodeQueue.enqueue(LINK(xRefNode));
     }
 public:
-   IMPLEMENT_IINTERFACE;
-   CXRefExBuilderThread() 
-   {
-       m_bRun = true;
-       bRunning = false; 
-   }
-
-   virtual void QueueRequest(IXRefNode* xRefNode,const char* cluster)
-   {
-       if(xRefNode == 0 || cluster == 0)
-           return;
-
-       CriticalSection(_RunningMutex);
-       
-       xRefNode->setCluster(cluster);
-       xRefNode->Link();
-       m_pNodeQueue.enqueue(xRefNode);
-       m_sem.signal();
-   }
-
-   ~CXRefExBuilderThread(){DBGLOG("Destroyed XRef thread");}
+    IMPLEMENT_IINTERFACE;
+
+    CXRefExBuilderThread() { };
+    ~CXRefExBuilderThread(){DBGLOG("Destroyed XRef thread");};
+
+    virtual void queueRequest(IXRefNode* xRefNode, const char* cluster)
+    {
+        if (!xRefNode || isEmptyString(cluster))
+            return;
+
+        xRefNode->setCluster(cluster);
+        writeNodeQueue(xRefNode);
+        m_sem.signal();
+    }
+
     virtual int run()
     {
         Link();
-        while(m_bRun)
+        while (!stopThread)
         {
             m_sem.wait();
-            if (m_pNodeQueue.ordinality() != 0)
-                RunXRef();
+            runXRef();
         }
         Release();
         return 0;
     }
 
-    void RunXRef()
+    void runXRef()
     {
-
         //catch all exceptions so we can signal for the new build to start
-        try{
-            SetRunningStatus(true);
-            Owned<IXRefNode> xRefNode  = (IXRefNode*)m_pNodeQueue.dequeue();
-            _CurrentClusterName.clear();
-            xRefNode->getCluster(_CurrentClusterName);
-            if (xRefNode->useSasha()) // if sasha processing just set submitted
-                xRefNode->setStatus("Submitted");
-            else {
-                Owned<IPropertyTree> tree = runXRefCluster(_CurrentClusterName.str(), xRefNode);
-                DBGLOG("finished run");
+        try
+        {
+            while (true)
+            {
+                Owned<IXRefNode> xRefNode  = readNodeQueue();
+                if (!xRefNode)
+                    break;
+
+                if (xRefNode->useSasha()) // if sasha processing just set submitted
+                    xRefNode->setStatus("Submitted");
+                else
+                {
+                    setRunningStatus(true);
+                    xRefNode->getCluster(currentClusterName);
+                    Owned<IPropertyTree> tree = runXRefCluster(currentClusterName.str(), xRefNode);
+                    DBGLOG("finished run XRef for %s", currentClusterName.str());
+                    currentClusterName.clear();
+                    setRunningStatus(false);
+                }
             }
-            SetRunningStatus(false);
         }
         catch(IException* e)
         {
             StringBuffer errorStr;
             e->errorMessage(errorStr);
-            ERRLOG("Exception thrown while running XREF: %s",errorStr.str());
+            ERRLOG("Exception thrown while running XREF: %s", errorStr.str());
             e->Release();
         }
         catch(...)
         {
             ERRLOG("Unknown Exception thrown from XREF");
         }
-        //Signal that we are ready to process another job if there is one....
-        m_sem.signal();
-
     }
 
-    virtual bool IsRunning()
+    virtual bool isRunning()
     {
-        CriticalSection(_boolMutex);
-        return bRunning;
+        CriticalBlock b(critRunningStatus);
+        return xRefRunning;
     }
-    virtual bool IsQueued(const char* Queue)
+    virtual bool isQueued(const char* clusterName)
     {
-        if(Queue == 0)
+        if (isEmptyString(clusterName))
             return false;
-        ForEachItemIn(x,m_pNodeQueue)
+
+        if (!currentClusterName.isEmpty() && streq(currentClusterName, clusterName))
+            return true;
+
+        CriticalBlock b(critQueue);
+        ForEachItemIn(x, nodeQueue)
         {
-            IXRefNode* Item = m_pNodeQueue.item(x);
+            IXRefNode* Item = nodeQueue.item(x);
             StringBuffer cachedCluster;
             Item->getCluster(cachedCluster);
-            if(strcmp(cachedCluster.str(),Queue) == 0 || strcmp(_CurrentClusterName.str(),Queue) == 0)
+            if (streq(cachedCluster, clusterName))
                 return true;
         }
         return false;
     }
-    virtual void Cancel()
+    virtual void cancel()
     {
-        while(m_pNodeQueue.ordinality() >0)
+        CriticalBlock b(critQueue);
+        while (nodeQueue.ordinality() > 0)
         {
-            Owned<IXRefNode> xRefNode  = (IXRefNode*)m_pNodeQueue.dequeue();
+            Owned<IXRefNode> xRefNode  = (IXRefNode*)nodeQueue.dequeue();
         }
         m_sem.signal();
     }
-    virtual void Shutdown()
+    virtual void stop()
     {
-        m_bRun = false;
+        stopThread = true;
+        m_sem.signal();
+        join();
     }
 };