Browse Source

HPCC-18189 Replace some SpinBlock instances with atomics

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday 7 years ago
parent
commit
4dcf17ff09

+ 2 - 5
esp/services/ws_dfu/ws_dfuService.hpp

@@ -23,6 +23,7 @@
 #include "fileview.hpp"
 #include "fvrelate.hpp"
 #include "dadfs.hpp"
+#include <atomic>
 
 class CThorNodeGroup: public CInterface
 {
@@ -134,8 +135,7 @@ private:
     Mutex m_superfilemutex;
     unsigned nodeGroupCacheTimeout;
     Owned<CThorNodeGroupCache> thorNodeGroupCache;
-    bool m_daliDetached = false;
-    SpinLock m_daliDetachedStateLock;
+    std::atomic<bool> m_daliDetached{false};
 
 public:
     IMPLEMENT_IINTERFACE;
@@ -229,21 +229,18 @@ private:
     bool getQueryFile(const char *logicalName, const char *querySet, const char *queryID, IEspDFUFileDetail &fileDetails);
     bool attachServiceToDali() override
     {
-        SpinBlock b(m_daliDetachedStateLock);
         m_daliDetached = false;
         return true;
     }
 
     bool detachServiceFromDali() override
     {
-        SpinBlock b(m_daliDetachedStateLock);
         m_daliDetached = true;
         return true;
     }
 
     bool isDetachedFromDali()
     {
-        SpinBlock b(m_daliDetachedStateLock);
         return m_daliDetached;
     }
 

+ 7 - 4
roxie/ccd/ccdserver.cpp

@@ -27131,11 +27131,14 @@ protected:
 
         virtual void notifyAbort(IException *E)
         {
-            SpinBlock b(abortLock);
-            if (!aborted && QUERYINTERFACE(E, InterruptedSemaphoreException) == NULL)
+            if (QUERYINTERFACE(E, InterruptedSemaphoreException) == NULL)
             {
-                aborted = true;
-                exception.set(E);
+                SpinBlock b(abortLock);
+                if (!aborted)
+                {
+                    aborted = true;
+                    exception.set(E);
+                }
             }
         }
 

+ 6 - 6
roxie/udplib/udptrs.cpp

@@ -29,6 +29,7 @@
 #include <sys/resource.h>
 #endif
 #include <math.h>
+#include <atomic>
 
 unsigned udpOutQsPriority = 0;
 unsigned udpMaxRetryTimedoutReqs = 0; // 0 means off (keep retrying forever)
@@ -720,8 +721,7 @@ class CSendManager : implements ISendManager, public CInterface
     send_data         *data;
     Linked<TokenBucket> bucket;
     
-    SpinLock msgSeqLock;
-    unsigned msgSeq;
+    std::atomic<unsigned> msgSeq{0};
 
     static bool comparePacket(void *pkData, void *key) 
     {
@@ -732,10 +732,11 @@ class CSendManager : implements ISendManager, public CInterface
 
     inline unsigned getNextMessageSequence()
     {
-        SpinBlock b(msgSeqLock);
-        unsigned res = ++msgSeq;
-        if (!res)
+        unsigned res;
+        do
+        {
             res = ++msgSeq;
+        } while (unlikely(!res));
         return res;
     }
         
@@ -761,7 +762,6 @@ public:
         data = new send_data(*this, sniffer_port, sniffer_multicast_ip, bucket);
         send_flow = new send_send_flow(*this, numNodes);
         receive_flow = new send_receive_flow(*this, client_flow_port);
-        msgSeq = 0;
     }
 
 

+ 12 - 15
thorlcr/slave/slave.cpp

@@ -51,8 +51,6 @@ MODULE_INIT(INIT_PRIORITY_STANDARD)
 
 ProcessSlaveActivity::ProcessSlaveActivity(CGraphElementBase *container) : CSlaveActivity(container), threaded("ProcessSlaveActivity", this)
 {
-    processed = 0;
-    lastCycles = 0;
 }
 
 void ProcessSlaveActivity::beforeDispose()
@@ -83,16 +81,13 @@ void ProcessSlaveActivity::threadmain()
 #ifdef TIME_ACTIVITIES
         if (timeActivities)
         {
-            {
-                SpinBlock b(cycleLock);
-                lastCycles = get_cycles_now(); // serializeStats will reset
-            }
+            lastCycles = get_cycles_now(); // serializeStats will reset
+
             process();
-            {
-                SpinBlock b(cycleLock);
-                totalCycles.totalCycles += get_cycles_now()-lastCycles;
-                lastCycles = 0; // signal not processing
-            }
+
+            // set lastCycles to 0 to signal not processing
+            unsigned __int64 finalCycles = lastCycles.exchange(0);
+            totalCycles.totalCycles += get_cycles_now()-finalCycles;
         }
         else
             process();
@@ -173,12 +168,14 @@ void ProcessSlaveActivity::serializeStats(MemoryBuffer &mb)
 #ifdef TIME_ACTIVITIES
     if (timeActivities)
     {
-        SpinBlock b(cycleLock);
-        if (lastCycles)
+        unsigned __int64 curCycles = lastCycles;
+        if (curCycles)
         {
             unsigned __int64 nowCycles = get_cycles_now();
-            totalCycles.totalCycles += nowCycles-lastCycles;
-            lastCycles = nowCycles; // time accounted for
+            //Update lastCycles to the current number of cycles - unless it has been set to 0 in the meantime
+            //Use std::memory_order_relaxed because there is no requirement for other variables to be synchronized.
+            if (lastCycles.compare_exchange_strong(curCycles, nowCycles, std::memory_order_relaxed))
+                totalCycles.totalCycles += nowCycles-curCycles;
         }
     }
 #endif

+ 3 - 3
thorlcr/slave/slave.ipp

@@ -26,6 +26,7 @@
 #include "thormisc.hpp"
 
 #include "slave.hpp"
+#include <atomic>
 
 interface IInputSteppingMeta;
 interface IRangeCompare;
@@ -34,9 +35,8 @@ class ProcessSlaveActivity : public CSlaveActivity, implements IThreaded
 protected:
     Owned<IThorException> exception;
     CThreadedPersistent threaded;
-    rowcount_t processed;
-    unsigned __int64 lastCycles;
-    SpinLock cycleLock; // MORE: Could probably remove this and use atomic variables instead
+    rowcount_t processed = 0;
+    std::atomic<unsigned __int64> lastCycles{0};
 
     virtual void endProcess() = 0;
     virtual void process() { }

+ 1 - 2
thorlcr/thorutil/thbuf.cpp

@@ -678,7 +678,6 @@ class CRowSet : public CSimpleInterface, implements IInterface
     unsigned chunk;
     CThorExpandingRowArray rows;
     CSharedWriteAheadBase &sharedWriteAhead;
-    mutable SpinLock lock;
     mutable CriticalSection crit;
 public:
     CRowSet(CSharedWriteAheadBase &_sharedWriteAhead, unsigned _chunk, unsigned maxRows);
@@ -1177,7 +1176,7 @@ bool CRowSet::Release() const
 {
     {
         // NB: Occasionally, >1 thread may be releasing a CRowSet concurrently and miss a opportunity to reuse, but that's ok.
-        SpinBlock b(lock);
+        //No need to protect with a lock, because if not shared then it cannot be called at the same time by another thread,
         if (!IsShared())
             sharedWriteAhead.reuse((CRowSet *)this);
     }