
HPCC-25808 Remove remaining instances of atomic_t from the system

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday committed 4 years ago (commit 146418a709)
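
The commit replaces the legacy atomic_t macro layer with C++11 std::atomic throughout. As a rough guide to the per-call mapping applied in the hunks below, here is an illustrative sketch with a hypothetical counter (not code from the HPCC sources):

    #include <atomic>

    static std::atomic<unsigned> counter{0};   // was: static atomic_t counter = ATOMIC_INIT(0);

    void touchCounter()
    {
        counter++;                             // was: atomic_inc(&counter);
        counter--;                             // was: atomic_dec(&counter);
        unsigned v = counter.load();           // was: unsigned v = atomic_read(&counter);
        counter = 5;                           // was: atomic_set(&counter, 5);
        bool isZero = (--counter == 0);        // was: bool isZero = atomic_dec_and_test(&counter);
        (void)v; (void)isZero;
    }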

+ 6 - 7
dali/base/dasess.cpp

@@ -1262,7 +1262,7 @@ class CCovenSessionManager: public CSessionManagerBase, implements ISessionManag
     Owned<IDaliLdapConnection> ldapconn;
     Owned<CLdapWorkItem> ldapworker;
     Semaphore ldapsig;
-    atomic_t ldapwaiting;
+    std::atomic<unsigned> ldapwaiting{0};
     Semaphore workthreadsem;
     bool stopping;
 
@@ -1293,7 +1293,6 @@ public:
     {
         mySessionId = queryCoven().getUniqueId(); // tell others in coven TBD
         registerSubscriptionManager(SESSION_PUBLISHER,this);
-        atomic_set(&ldapwaiting,0);
         workthreadsem.signal(10);
         stopping = false;
         ldapsig.signal();
@@ -1463,11 +1462,11 @@ public:
 #endif
         if ((ldapconn->getLDAPflags()&(DLF_SAFE|DLF_ENABLED))!=(DLF_SAFE|DLF_ENABLED))
             return ldapconn->getPermissions(key,obj,udesc,flags);
-        atomic_inc(&ldapwaiting);
+        ldapwaiting++;
         unsigned retries = 0;
         while (!stopping) {
             if (ldapsig.wait(1000)) {
-                atomic_dec(&ldapwaiting);
+                ldapwaiting--;
                 if (!ldapworker)
                     ldapworker.setown(CLdapWorkItem::get(ldapconn,workthreadsem));
                 if (ldapworker) {
@@ -1489,7 +1488,7 @@ public:
                                 return ret;
                             }
                         }
-                        if (atomic_read(&ldapwaiting)>10)   // give up quicker if piling up
+                        if (ldapwaiting>10)   // give up quicker if piling up
                             break;
                         if (i==5) { // one retry
                             ldapworker->stop(); // abandon thread
@@ -1510,7 +1509,7 @@ public:
                 return SecAccess_None;
             }
             else {
-                unsigned waiting = atomic_read(&ldapwaiting);
+                unsigned waiting = ldapwaiting;
                 static unsigned last=0;
                 static unsigned lasttick=0;
                 static unsigned first50=0;
@@ -1533,7 +1532,7 @@ public:
                     first50 = 0;
             }
         }
-        atomic_dec(&ldapwaiting);
+        ldapwaiting--;
         return SecAccess_None;
 #endif
     }

+ 6 - 6
dali/base/dautils.hpp

@@ -156,14 +156,14 @@ protected:
 class da_decl CTransactionLogTracker
 {
     unsigned max;
-    atomic_t *counts;
+    std::atomic<unsigned> * counts;
 public:
     CTransactionLogTracker(int _max) : max(_max)
     {
-        counts = new atomic_t[max+1]; // +1 reserve for unknown commands
+        counts = new std::atomic<unsigned>[max+1]; // +1 reserve for unknown commands
         unsigned t=0;
         for (; t<=max; t++)
-            atomic_set(&counts[t],0);
+            counts[t] = 0;
     }
     ~CTransactionLogTracker()
     {
@@ -172,15 +172,15 @@ public:
     inline const unsigned &getMax() const { return max; }
     inline void startTransaction(unsigned cmd)
     {
-        atomic_inc(&counts[cmd]);
+        counts[cmd]++;
     }
     inline void endTransaction(unsigned cmd)
     {
-        atomic_dec(&counts[cmd]);
+        counts[cmd]--;
     }
     unsigned getTransactionCount(unsigned cmd) const
     {
-        return (unsigned)atomic_read(&counts[cmd]);
+        return counts[cmd].load();
     }
     virtual StringBuffer &getCmdText(unsigned cmd, StringBuffer &ret) const = 0;
 };
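
Note that new std::atomic<unsigned>[n] does not zero the elements (prior to C++20 the default constructor leaves the value uninitialised), so the explicit zeroing loop is still needed. A minimal standalone sketch of the same counting core, with hypothetical names:

    #include <atomic>

    class CommandCounter   // hypothetical, mirrors CTransactionLogTracker's counting core
    {
        unsigned max;
        std::atomic<unsigned> *counts;
    public:
        explicit CommandCounter(unsigned _max) : max(_max)
        {
            counts = new std::atomic<unsigned>[max+1];   // +1 slot reserved for unknown commands
            for (unsigned t=0; t<=max; t++)
                counts[t] = 0;                           // elements are not zeroed by new[]
        }
        ~CommandCounter() { delete [] counts; }
        void start(unsigned cmd) { counts[cmd]++; }
        void end(unsigned cmd)   { counts[cmd]--; }
        unsigned inFlight(unsigned cmd) const { return counts[cmd].load(); }
    };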

+ 6 - 6
dali/dfu/dfurun.cpp

@@ -533,7 +533,7 @@ class CDFUengine: public CInterface, implements IDFUengine
 
     CriticalSection monitorsect;
     CriticalSection subcopysect;
-    atomic_t runningflag;
+    std::atomic<unsigned> runningflag;
 
 public:
     IMPLEMENT_IINTERFACE;
@@ -541,7 +541,7 @@ public:
     CDFUengine()
     {
         defaultTransferBufferSize = 0;
-        atomic_set(&runningflag,1);
+        runningflag = 1;
         eventpusher.setown(getScheduleEventPusher());
     }
 
@@ -1069,12 +1069,12 @@ public:
         // only clear cache when nothing running (bit of a kludge)
         class CenvClear
         {
-            atomic_t &running;
+            std::atomic<unsigned> &running;
         public:
-            CenvClear(atomic_t &_running)
+            CenvClear(std::atomic<unsigned> &_running)
                 : running(_running)
             {
-                if (atomic_dec_and_test(&running)) {
+                if (--running == 0) {
                     Owned<IEnvironmentFactory> envf = getEnvironmentFactory(false);
                     Owned<IConstEnvironment> env = envf->openEnvironment();
                     env->clearCache();
@@ -1082,7 +1082,7 @@ public:
             }
             ~CenvClear()
             {
-                atomic_inc(&running);
+                ++running;
             }
         } cenvclear(runningflag);
         Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
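
The CenvClear helper relies on atomic_dec_and_test(&x) being equivalent to --x == 0 on a std::atomic (the pre-decrement atomically returns the new value). A stripped-down sketch of the same scope-guard shape, with hypothetical names:

    #include <atomic>

    static std::atomic<unsigned> runningflag{1};

    class ScopedQuiescent            // hypothetical analogue of CenvClear
    {
        std::atomic<unsigned> &running;
    public:
        explicit ScopedQuiescent(std::atomic<unsigned> &_running) : running(_running)
        {
            if (--running == 0)      // was: if (atomic_dec_and_test(&running))
            {
                // nothing else is running: safe to clear shared caches here
            }
        }
        ~ScopedQuiescent() { ++running; }   // was: atomic_inc(&running);
    };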

+ 4 - 4
dali/sasha/saserver.cpp

@@ -50,14 +50,14 @@ extern void LDStest();
 
 Owned<IPropertyTree> serverConfig;
 static IArrayOf<ISashaServer> servers;
-static atomic_t StopSuspendCount = ATOMIC_INIT(0);
+static std::atomic<unsigned> StopSuspendCount{0};
 static bool stopped = false;
 static Semaphore stopSem;
 
 const char *sashaProgramName;
 
-CSuspendAutoStop::CSuspendAutoStop() { atomic_inc(&StopSuspendCount); }
-CSuspendAutoStop::~CSuspendAutoStop() { atomic_dec(&StopSuspendCount); }
+CSuspendAutoStop::CSuspendAutoStop() { StopSuspendCount++; }
+CSuspendAutoStop::~CSuspendAutoStop() { StopSuspendCount--; }
 
 #ifdef _CONTAINERIZED
 const char *service = nullptr;
@@ -265,7 +265,7 @@ void SashaMain()
                 stopped = true;
             }
             else if (timeout&&(timeout<msTick()-start)) {
-                if (atomic_read(&StopSuspendCount)==0) {
+                if (StopSuspendCount==0) {
                     PROGLOG("Auto Restart");
                     stopped = true;
                 }

+ 24 - 24
ecl/hthor/hthorkey.cpp

@@ -2871,7 +2871,7 @@ public:
         left = NULL;
         prev = NULL;
         next = NULL;
-        atomic_set(&endMarkersPending,0);
+        endMarkersPending = 0;
         groupStart = NULL;
         matchcount = 0;
     }
@@ -2885,12 +2885,12 @@ public:
         if (_groupStart)
         {
             groupStart = _groupStart;
-            atomic_inc(&_groupStart->endMarkersPending);
+            ++_groupStart->endMarkersPending;
         }
         else
         {
             groupStart = this;
-            atomic_set(&endMarkersPending, 1);
+            endMarkersPending = 1;
         }
         matchcount = 0;
     }
@@ -2912,12 +2912,12 @@ public:
     inline void notePending()
     {
 //      assertex(!complete());
-        atomic_inc(&groupStart->endMarkersPending);
+        ++groupStart->endMarkersPending;
     }
 
     inline bool complete() const
     {
-        return atomic_read(&groupStart->endMarkersPending) == 0;
+        return groupStart->endMarkersPending == 0;
     }
 
     inline bool inGroup(CJoinGroup *leader) const
@@ -2931,7 +2931,7 @@ public:
         //Another completing group could cause this group to be processed once endMarkersPending is set to 0
         //So link this object to ensure it is not disposed of while this function is executing
         Linked<CJoinGroup> saveThis(this);
-        if (atomic_dec_and_test(&groupStart->endMarkersPending))
+        if (--groupStart->endMarkersPending == 0)
         {
             join->onComplete(groupStart);
         }
@@ -2966,7 +2966,7 @@ protected:
     const void *left;
     unsigned matchcount;
     CIArrayOf<MatchSet> matchsets;
-    atomic_t endMarkersPending;
+    std::atomic<unsigned> endMarkersPending;
     IJoinProcessor *join = nullptr;
     mutable CriticalSection crit;
     CJoinGroup *groupStart;
@@ -3439,9 +3439,9 @@ class CHThorKeyedJoinActivity  : public CHThorThreadedActivityBase, implements I
     Owned<JoinGroupPool> pool;
     QueueOf<const void, true> pending;
     CriticalSection statsCrit, imatchCrit, fmatchCrit;
-    atomic_t prefiltered;
-    atomic_t postfiltered;
-    atomic_t skips;
+    RelaxedAtomic<unsigned> prefiltered;
+    RelaxedAtomic<unsigned> postfiltered;
+    RelaxedAtomic<unsigned> skips;
     unsigned seeks;
     unsigned scans;
     unsigned wildseeks;
@@ -3460,9 +3460,9 @@ public:
     CHThorKeyedJoinActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorKeyedJoinArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
         : CHThorThreadedActivityBase(_agent, _activityId, _subgraphId, _arg, _arg, _kind, _graph, _arg.queryDiskRecordSize(), _node), helper(_arg)
     {
-        atomic_set(&prefiltered, 0);
-        atomic_set(&postfiltered, 0);
-        atomic_set(&skips, 0);
+        prefiltered = 0;
+        postfiltered = 0;
+        skips = 0;
         seeks = 0;
         scans = 0;
         eclKeySize.set(helper.queryIndexRecordSize());
@@ -3674,7 +3674,7 @@ public:
         CriticalBlock proc(fmatchCrit);
         bool ret = helper.fetchMatch(ms->queryJoinGroup()->queryLeft(), right);
         if (!ret)
-            atomic_inc(&postfiltered);
+            ++postfiltered;
         return ret;
     }
 
@@ -3682,7 +3682,7 @@ public:
     {
         bool ret = helper.leftCanMatch(_left);
         if (!ret)
-            atomic_inc(&prefiltered);
+            ++prefiltered;
         return ret;
     }
 
@@ -3802,7 +3802,7 @@ public:
                 }
                 else
                 {
-                    atomic_inc(&skips);
+                    ++skips;
                 }
             }
             else
@@ -3833,7 +3833,7 @@ public:
                             }
                             else
                             {
-                                atomic_inc(&skips);
+                                ++skips;
                             }
                         }
                         catch(IException * e)
@@ -3883,7 +3883,7 @@ public:
                                 }
                                 else
                                 {
-                                    atomic_inc(&skips);
+                                    ++skips;
                                 }
                             }
                             catch(IException * e)
@@ -3926,7 +3926,7 @@ public:
                                 }
                                 else
                                 {
-                                    atomic_inc(&skips);
+                                    ++skips;
                                 }
                             }
                             catch(IException * e)
@@ -3973,7 +3973,7 @@ public:
                         }
                         else
                         {
-                            atomic_inc(&skips);
+                            ++skips;
                         }
                     }
                     catch(IException * e)
@@ -4088,7 +4088,7 @@ public:
         }
         else
         {
-            atomic_inc(&postfiltered);
+            ++postfiltered;
         }
         return false;
     }
@@ -4115,9 +4115,9 @@ public:
     {
         CHThorThreadedActivityBase::updateProgress(progress);
         StatsActivityScope scope(progress, activityId);
-        progress.addStatistic(StNumPreFiltered, atomic_read(&prefiltered));
-        progress.addStatistic(StNumPostFiltered, atomic_read(&postfiltered));
-        progress.addStatistic(StNumIndexSkips, atomic_read(&skips));
+        progress.addStatistic(StNumPreFiltered, prefiltered);
+        progress.addStatistic(StNumPostFiltered, postfiltered);
+        progress.addStatistic(StNumIndexSkips, skips);
         progress.addStatistic(StNumIndexSeeks, seeks);
         progress.addStatistic(StNumIndexScans, scans);
         progress.addStatistic(StNumIndexWildSeeks, wildseeks);
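
prefiltered, postfiltered and skips are pure statistics, so they move to RelaxedAtomic<unsigned>, jlib's wrapper (defined alongside these helpers in jatomic.hpp) whose operations use relaxed memory ordering, rather than a plain std::atomic. The idea, sketched independently of the jlib implementation:

    #include <atomic>

    // Minimal illustration of a relaxed statistics counter: increments are atomic,
    // but impose no ordering on surrounding loads/stores, which is all a stats
    // counter needs and is cheaper on weakly ordered CPUs.
    class RelaxedCounter
    {
        std::atomic<unsigned> value{0};
    public:
        RelaxedCounter &operator++()
        {
            value.fetch_add(1, std::memory_order_relaxed);   // atomic, no ordering constraint
            return *this;
        }
        unsigned load() const { return value.load(std::memory_order_relaxed); }
    };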

+ 7 - 7
esp/platform/espprotocol.cpp

@@ -28,21 +28,21 @@
 
 typedef IXslProcessor * (*getXslProcessor_func)();
 
-static atomic_t gActiveRequests;
+static RelaxedAtomic<unsigned> gActiveRequests;
 
-long ActiveRequests::getCount()
+unsigned ActiveRequests::getCount()
 {
-    return atomic_read(&gActiveRequests);
+    return gActiveRequests;
 }
 
-void ActiveRequests::inc()
+ActiveRequests::ActiveRequests()
 {
-    atomic_inc(&gActiveRequests);
+    gActiveRequests++;
 }
 
-void ActiveRequests::dec()
+ActiveRequests::~ActiveRequests()
 {
-    atomic_dec(&gActiveRequests);
+    gActiveRequests--;
 }
 
 CEspApplicationPort::CEspApplicationPort(bool viewcfg, CEspProtocol* prot) : viewConfig(viewcfg), rootAuth(false), navWidth(165), navResize(false), navScroll(false), protocol(prot)

+ 4 - 7
esp/platform/espprotocol.hpp

@@ -34,17 +34,14 @@
 #include <map>
 using namespace std;
 
+//A helper class for tracking the number of active requests
 class ActiveRequests
 {
 public:
+    ActiveRequests();
+    ~ActiveRequests();
 
-    ActiveRequests() { inc(); }
-    ~ActiveRequests()  { dec(); }
-
-    void inc();
-    void dec();
-
-    static long getCount();
+    static unsigned getCount();
 };
 
 class CEspBindingEntry : public CInterface, implements IInterface
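
The inc()/dec() pair collapses into the constructor/destructor, so the counter now follows plain RAII: declaring an ActiveRequests on the stack counts the request for exactly the lifetime of that object. A self-contained sketch of the pattern (hypothetical names, not the esp classes):

    #include <atomic>

    static std::atomic<unsigned> gActive{0};

    class ScopedRequest                  // hypothetical analogue of ActiveRequests
    {
    public:
        ScopedRequest()  { gActive++; }
        ~ScopedRequest() { gActive--; }
        static unsigned getCount() { return gActive; }
    };

    void handleRequest()
    {
        ScopedRequest tracking;          // counted until handleRequest returns
        // ... service the request ...
    }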

+ 6 - 6
fs/dafsserver/dafsserver.cpp

@@ -2789,13 +2789,13 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface
         StructArrayOf<OpenFileInfo> openFiles;
         Owned<IDirectoryIterator> opendir;
         unsigned            lasttick, lastInactiveTick;
-        atomic_t            &globallasttick;
+        std::atomic<unsigned> &globallasttick;
         unsigned            previdx;        // for debug
 
 
         IMPLEMENT_IINTERFACE;
 
-        CRemoteClientHandler(CRemoteFileServer *_parent,ISocket *_socket,atomic_t &_globallasttick, bool _calledByRowService)
+        CRemoteClientHandler(CRemoteFileServer *_parent,ISocket *_socket,std::atomic<unsigned> &_globallasttick, bool _calledByRowService)
             : socket(_socket), globallasttick(_globallasttick), calledByRowService(_calledByRowService)
         {
             previdx = (unsigned)-1;
@@ -3073,7 +3073,7 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface
         void touch()
         {
             lastInactiveTick = lasttick = msTick();
-            atomic_set(&globallasttick,lasttick);
+            globallasttick = lasttick;
         }
 
         const char *queryPeerName()
@@ -3415,7 +3415,7 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface
     CAsyncCommandManager asyncCommandManager;
     CThrottler stdCmdThrottler, slowCmdThrottler;
     CClientStatsTable clientStatsTable;
-    atomic_t globallasttick;
+    std::atomic<unsigned> globallasttick;
     unsigned targetActiveThreads;
     Linked<IPropertyTree> keyPairInfo;
 
@@ -3650,7 +3650,7 @@ public:
         stopping = false;
         clientcounttick = msTick();
         closedclients = 0;
-        atomic_set(&globallasttick,msTick());
+        globallasttick = msTick();
     }
 
     ~CRemoteFileServer()
@@ -5517,7 +5517,7 @@ public:
 
     unsigned idleTime()
     {
-        unsigned t = (unsigned)atomic_read(&globallasttick);
+        unsigned t = globallasttick;
         return msTick()-t;
     }
 

+ 4 - 5
roxie/ccd/ccdfile.cpp

@@ -46,7 +46,7 @@
 #include "eclhelper_dyn.hpp"
 #include "rtldynfield.hpp"
 
-atomic_t numFilesOpen[2];
+std::atomic<unsigned> numFilesOpen[2];
 
 #define MAX_READ_RETRIES 2
 
@@ -206,7 +206,7 @@ public:
         {
             if (current.get()==&failure)
                 return;
-            atomic_dec(&numFilesOpen[remote]);
+            numFilesOpen[remote]--;
             mergeStats(fileStats, current);
             current.set(&failure); 
         }
@@ -314,8 +314,7 @@ public:
                 }
             }
             lastAccess = msTick();
-            atomic_inc(&numFilesOpen[remote]);
-            if ((unsigned) atomic_read(&numFilesOpen[remote]) > maxFilesOpen[remote])
+            if (++numFilesOpen[remote] > maxFilesOpen[remote])
                 queryFileCache().closeExpired(remote); // NOTE - this does not actually do the closing of expired files (which could deadlock, or could close the just opened file if we unlocked crit)
         }
     }
@@ -1654,7 +1653,7 @@ public:
         if (!closePending[remote])
         {
             closePending[remote] = true;
-            DBGLOG("closeExpired %s scheduled - %d files open", remote ? "remote" : "local", (int) atomic_read(&numFilesOpen[remote]));
+            DBGLOG("closeExpired %s scheduled - %d files open", remote ? "remote" : "local", (int) numFilesOpen[remote]);
             toClose.signal();
         }
     }

+ 16 - 16
roxie/ccd/ccdserver.cpp

@@ -2160,27 +2160,27 @@ public:
 
 //=====================================================================================================
 
-atomic_t nextInstanceId;
+static std::atomic<unsigned> nextInstanceId;
 
 extern unsigned getNextInstanceId()
 {
-    return atomic_add_exchange(&nextInstanceId, 1)+1;
+    return ++nextInstanceId;
 }
 
-atomic_t nextRuid;
+static std::atomic<unsigned> nextRuid;
 
 ruid_t getNextRuid()
 {
-    ruid_t ret = atomic_add_exchange(&nextRuid, 1)+1;
+    ruid_t ret = ++nextRuid;
     while (ret < RUID_FIRST)
-        ret = atomic_add_exchange(&nextRuid, 1)+1; // ruids 0 and 1 are reserved for pings/unwanted discarder.
+        ret = ++nextRuid; // ruids 0 and 1 are reserved for pings/unwanted discarder.
     return ret;
 }
 
 void setStartRuid(unsigned restarts)
 {
-    atomic_set(&nextRuid, restarts * 0x10000);
-    atomic_set(&nextInstanceId, restarts * 10000);
+    nextRuid = restarts * 0x10000;
+    nextInstanceId = restarts * 10000;
 }
 
 enum { LimitSkipErrorCode = 0, KeyedLimitSkipErrorCode = 1 };
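
atomic_add_exchange(&v, 1) returned the value before the add, so the old idiom added one to get the new value; std::atomic's pre-increment does the same thing in one step (it is defined as fetch_add(1) + 1). A small check of the equivalence, with a hypothetical counter:

    #include <atomic>
    #include <cassert>

    static std::atomic<unsigned> nextId{0};

    unsigned getNextIdOld()   // legacy shape: fetch-then-add, then +1
    {
        return nextId.fetch_add(1) + 1;    // what atomic_add_exchange(&nextId, 1) + 1 amounted to
    }

    unsigned getNextIdNew()   // shape used in this commit
    {
        return ++nextId;                   // same value, returned atomically
    }

    int main()
    {
        unsigned a = getNextIdOld();
        unsigned b = getNextIdNew();
        assert(b == a + 1);                // consecutive ids: 1 then 2
        return 0;
    }
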
@@ -25175,7 +25175,7 @@ class CJoinGroup : public CInterface
 protected:
     const void *left;                   // LHS row
     PointerArrayOf<KeyedJoinHeader> rows;           // matching RHS rows
-    atomic_t endMarkersPending; // How many agent responses still waiting for
+    std::atomic<unsigned> endMarkersPending; // How many agent responses still waiting for
     CJoinGroup *groupStart;     // Head of group, or NULL if not grouping
     unsigned lastPartNo;
     unsigned pos;
@@ -25209,9 +25209,9 @@ public:
         groupStart = _groupStart;
         if (_groupStart)
         {
-            atomic_inc(&_groupStart->endMarkersPending);
+            ++_groupStart->endMarkersPending;
         }
-        atomic_set(&endMarkersPending, 1);
+        endMarkersPending = 1;
     }
 
     ~CJoinGroup()
@@ -25234,7 +25234,7 @@ public:
 
     inline bool complete() const
     {
-        return atomic_read(&endMarkersPending) == 0;
+        return endMarkersPending == 0;
     }
 
 #ifdef TRACE_JOINGROUPS
@@ -25244,9 +25244,9 @@ public:
 #endif
     {
         assert(!complete());
-        atomic_inc(&endMarkersPending);
+        ++endMarkersPending;
 #ifdef TRACE_JOINGROUPS
-        DBGLOG("CJoinGroup::notePending %p from %d, count became %d group count %d", this, lineNo, atomic_read(&endMarkersPending), groupStart ? atomic_read(&groupStart->endMarkersPending) : 0);
+        DBGLOG("CJoinGroup::notePending %p from %d, count became %d group count %d", this, lineNo, endMarkersPending.load(), groupStart ? groupStart->endMarkersPending.load() : 0);
 #endif
     }
 
@@ -25274,16 +25274,16 @@ public:
             candidates += candidateCount;
         }
 #ifdef TRACE_JOINGROUPS
-        DBGLOG("CJoinGroup::noteEndReceived %p from %d, candidates %d + %d, my count was %d, group count was %d", this, lineNo, candidates, candidateCount, atomic_read(&endMarkersPending), groupStart ? atomic_read(&groupStart->endMarkersPending) : 0);
+        DBGLOG("CJoinGroup::noteEndReceived %p from %d, candidates %d + %d, my count was %d, group count was %d", this, lineNo, candidates, candidateCount, endMarkersPending.load(), groupStart ? groupStart->endMarkersPending.load() : 0);
 #endif
         // NOTE - as soon as endMarkersPending and groupStart->endMarkersPending are decremented to zero this object may get released asynchronously by other threads
         // There must therefore be nothing in this method after them that acceses member variables. Think of it as a delete this...
         // In particular, we can't safely reference groupStart after the dec_and_test of endMarkersPending, hence copy local first 
         CJoinGroup *localGroupStart = groupStart;
-        if (atomic_dec_and_test(&endMarkersPending))
+        if (--endMarkersPending == 0)
         {
             if (localGroupStart)
-                return atomic_dec_and_test(&localGroupStart->endMarkersPending);
+                return --localGroupStart->endMarkersPending == 0;
             else
                 return true;
         }

+ 10 - 10
roxie/ccd/ccdstate.cpp

@@ -1867,8 +1867,8 @@ public:
             daliHelper.setown(connectToDali());
         else
             daliHelper.setown(connectToDali(ROXIE_DALI_CONNECT_TIMEOUT));
-        atomic_set(&autoPending, 0);
-        atomic_set(&autoSignalsPending, 0);
+        autoPending = 0;
+        autoSignalsPending = 0;
         forcePending = false;
         pSetsNotifier.setown(daliHelper->getPackageSetsSubscription(this));
         pMapsNotifier.setown(daliHelper->getPackageMapsSubscription(this));
@@ -1891,8 +1891,8 @@ public:
         if (force)
             forcePending = true;    
         if (signal)
-            atomic_inc(&autoSignalsPending);
-        atomic_inc(&autoPending);
+            ++autoSignalsPending;
+        ++autoPending;
         autoReloadTrigger.signal();
         if (signal)
             autoReloadComplete.wait();
@@ -1973,8 +1973,8 @@ private:
 
     Semaphore autoReloadTrigger;
     Semaphore autoReloadComplete;
-    atomic_t autoSignalsPending;
-    atomic_t autoPending;
+    std::atomic<unsigned> autoSignalsPending;
+    std::atomic<unsigned> autoPending;
     bool forcePending;
 
     class AutoReloadThread : public Thread
@@ -1997,12 +1997,12 @@ private:
                 owner.autoReloadTrigger.wait();
                 if (closing)
                     break;
-                unsigned signalsPending = atomic_read(&owner.autoSignalsPending);
+                unsigned signalsPending = owner.autoSignalsPending;
                 if (!signalsPending)
                     Sleep(500); // Typically notifications come in clumps - this avoids reloading too often
-                if (atomic_read(&owner.autoPending))
+                if (owner.autoPending)
                 {
-                    atomic_set(&owner.autoPending, 0);
+                    owner.autoPending = 0;
                     try
                     {
                         owner.reload(owner.forcePending);
@@ -2021,7 +2021,7 @@ private:
                 }
                 if (signalsPending)
                 {
-                    atomic_dec(&owner.autoSignalsPending);
+                    owner.autoSignalsPending--;
                     owner.autoReloadComplete.signal();
                 }
             }

+ 2 - 2
roxie/roxiemem/roxiemem.cpp

@@ -1533,7 +1533,7 @@ public:
             return;
         //The function is to aid testing - it allows the cas code to be tested without a surrounding lock
         //Allocate all possible rows and add them to the free space map.
-        //This is not worth doing in general because it effectively replaces atomic_sets with atomic_cas
+        //This is not worth doing in general because it effectively replaces atomic sets with atomic cas
         //relaxed memory order since there will be no multi-threaded access
         unsigned nextFree = freeBase.load(std::memory_order_relaxed);
         unsigned nextBlock = r_blocks.load(std::memory_order_relaxed);
@@ -4275,7 +4275,7 @@ protected:
     unsigned backgroundReleaseCost;
     bool releaseWhenModifyCallback;
     bool releaseWhenModifyCallbackCritical;
-    volatile bool abortBufferThread;
+    std::atomic_bool abortBufferThread;
 };
 
 //Constants are here to ensure they can all be constant folded
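
abortBufferThread changes from volatile bool to std::atomic_bool: volatile only stops the compiler caching the value, it gives no atomicity or inter-thread ordering guarantees, while std::atomic_bool does. A minimal stop-flag sketch (illustrative only):

    #include <atomic>
    #include <chrono>
    #include <thread>

    static std::atomic_bool abortFlag{false};

    void backgroundLoop()
    {
        while (!abortFlag)                                   // safe cross-thread read
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }

    int main()
    {
        std::thread worker(backgroundLoop);
        abortFlag = true;                                    // safe cross-thread write
        worker.join();
        return 0;
    }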

+ 1 - 1
roxie/udplib/udptrs.cpp

@@ -66,7 +66,7 @@ using roxiemem::DataBuffer;
 
  *
  * Data races to watch for
- * 1. Two agent threads add data at same time - only one should sent rts (use atomic_inc for the count)
+ * 1. Two agent threads add data at same time - only one should sent rts (use atomic inc for the count)
  * 2. We check for timeout and resend rts or fail just as permission comes in
  *    - resend rts is harmless ?
  *    - fail is acceptable

+ 6 - 2
rtl/eclrtl/eclrtl.cpp

@@ -6272,10 +6272,14 @@ void rtlBase64Decode(size32_t & tlen, void * & tgt, size32_t slen, const char *
 
 //---------------------------------------------------------------------------
 
-void RtlCInterface::Link() const            { atomic_inc(&xxcount); }
+void RtlCInterface::Link() const
+{
+    xxcount++;
+}
+
 bool RtlCInterface::Release(void) const
 {
-    if (atomic_dec_and_test(&xxcount))
+    if (--xxcount == 0)
     {
         delete this;
         return true;

+ 1 - 13
rtl/eclrtl/eclrtl_imp.hpp

@@ -186,28 +186,16 @@ private:
     IUStrRegExprFindInstance * instance;
 };
 
-//Code is cloned from jiface.hpp + split into two to avoid including too much in generated code.
-#ifdef _WIN32
-typedef volatile long atomic_t;
-#define atomic_set(v,i) ((*v) = (i))
-#else
-#ifndef atomic_set
-typedef struct { volatile int counter; } atomic_t;
-#define atomic_set(v,i) (((v)->counter) = (i))
-#endif
-#endif
-
 class ECLRTL_API RtlCInterface
 {
 public:
-             RtlCInterface()        { atomic_set(&xxcount, 1); }
     virtual ~RtlCInterface()        { }
 //interface IInterface:
     void    Link() const;
     bool    Release(void) const;
 
 private:
-    mutable atomic_t    xxcount;
+    mutable std::atomic<unsigned> xxcount{1};
 };
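
With the in-class initialiser xxcount{1}, the explicit constructor and the atomic_t typedefs that had been cloned into this header (to avoid pulling jiface.hpp into generated code) are no longer needed. A minimal intrusive reference count in the same shape (an illustrative sketch, not the RtlCInterface implementation):

    #include <atomic>

    class RefCounted
    {
        mutable std::atomic<unsigned> count{1};   // object starts owned by its creator
    public:
        virtual ~RefCounted() {}
        void Link() const { count++; }
        bool Release() const
        {
            if (--count == 0)                     // was: atomic_dec_and_test(&count)
            {
                delete this;
                return true;
            }
            return false;
        }
    };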
 
 

+ 1 - 181
system/jlib/jatomic.hpp

@@ -155,193 +155,13 @@ bool compare_exchange_efficient(x & value, decltype(value.load()) expected, decl
 
 #ifdef _WIN32
 
-#include <intrin.h>
-
-extern "C"
-{
-   LONG  __cdecl _InterlockedIncrement(LONG volatile *Addend);
-   LONG  __cdecl _InterlockedDecrement(LONG volatile *Addend);
-   LONG  __cdecl _InterlockedCompareExchange(LONG volatile * Dest, LONG Exchange, LONG Comp);
-}
-
-#pragma intrinsic (_InterlockedCompareExchange)
-#define InterlockedCompareExchange _InterlockedCompareExchange
-#pragma intrinsic (_InterlockedIncrement)
-#define InterlockedIncrement _InterlockedIncrement
-#pragma intrinsic (_InterlockedDecrement)
-#define InterlockedDecrement _InterlockedDecrement
-#pragma intrinsic (_InterlockedExchangeAdd)
-#define InterlockedExchangeAdd _InterlockedExchangeAdd
-
-typedef volatile long atomic_t;
-#define ATOMIC_INIT(i)                  (i)
-#define atomic_inc(v)                   InterlockedIncrement(v)
-#define atomic_inc_and_test(v)          (InterlockedIncrement(v) == 0)
-#define atomic_dec(v)                   InterlockedDecrement(v)
-#define atomic_dec_and_test(v)          (InterlockedDecrement(v) == 0)
-#define atomic_dec_and_read(v)           InterlockedDecrement(v)
-#define atomic_read(v)                  (*v)
-#define atomic_set(v,i)                 ((*v) = (i))
-#define atomic_xchg(i, v)               InterlockedExchange(v, i)
-#define atomic_add(v,i)                 InterlockedExchangeAdd(v,i)
-#define atomic_add_and_read(v,i)        InterlockedAdd(v,i)
-#define atomic_add_exchange(v, i)       InterlockedExchangeAdd(v,i)
-#define atomic_xchg_ptr(p, v)           InterlockedExchangePointer(v,p)
-#if defined (_MSC_VER) && (_MSC_VER <= 1200)
-#define atomic_cas(v,newvalue,expectedvalue)    (InterlockedCompareExchange((PVOID *)(v),(PVOID)(long)(newvalue),(PVOID)(long)(expectedvalue))==(PVOID)(long)(expectedvalue))
-#define atomic_cas_ptr(v, newvalue,expectedvalue)       atomic_cas(v,(long)newvalue,(long)expectedvalue)
-#else
-#define atomic_cas(v,newvalue,expectedvalue)    (InterlockedCompareExchange(v,newvalue,expectedvalue)==expectedvalue)
-#define atomic_cas_ptr(v, newvalue,expectedvalue)       (InterlockedCompareExchangePointer(v,newvalue,expectedvalue)==expectedvalue)
-#endif
-
 //Used to prevent a compiler reordering volatile and non-volatile loads/stores
 #define compiler_memory_barrier()           _ReadWriteBarrier()
 
-#define atomic_acquire(v)               atomic_cas(v, 1, 0)
-#define atomic_release(v)               { compiler_memory_barrier(); atomic_set(v, 0); }
-
-#elif defined(__GNUC__)
-
-typedef struct { volatile int counter; } atomic_t;
-#define ATOMIC_INIT(i)          { (i) }
-#define atomic_read(v)          ((v)->counter)
-#define atomic_set(v,i)         (((v)->counter) = (i))
-
-static __inline__ bool atomic_dec_and_test(atomic_t *v)
-{   
-    // returns (--*v==0)
-    return (__sync_add_and_fetch(&v->counter,-1)==0);
-}
-
-static __inline__ bool atomic_inc_and_test(atomic_t *v)
-{
-    // returns (++*v==0)
-    return (__sync_add_and_fetch(&v->counter,1)==0);
-}
-
-static __inline__ void atomic_inc(atomic_t *v)
-{
-    // (*v)++
-    __sync_add_and_fetch(&v->counter,1);
-}
-
-static __inline__ void atomic_dec(atomic_t *v)
-{
-    // (*v)--
-    __sync_add_and_fetch(&v->counter,-1);
-}
-
-static __inline__ int atomic_dec_and_read(atomic_t *v)
-{
-    // (*v)--, return *v;
-    return __sync_add_and_fetch(&v->counter,-1);
-}
-
-static __inline__ int atomic_xchg(int i, atomic_t *v)
-{
-    // int ret = *v; *v = i; return v;
-    return __sync_lock_test_and_set(&v->counter,i);  // actually an xchg
-}
-
-
-
-static __inline__ void atomic_add(atomic_t *v,int i)
-{
-    // (*v) += i;
-    __sync_add_and_fetch(&v->counter,i);
-}
-
-static __inline__ int atomic_add_and_read(atomic_t *v,int i)
-{
-    // (*v) += i; return *v;
-    return __sync_add_and_fetch(&v->counter,i);
-}
-
-static __inline__ int atomic_add_exchange(atomic_t *v,int i)
-{
-    // int ret = *v; (*v) += i; return ret;
-    return __sync_fetch_and_add(&v->counter,i);
-}
-
-static __inline__ bool atomic_cas(atomic_t *v,int newvalue, int expectedvalue)
-{
-    // bool ret = (*v==expectedvalue); if (ret) *v = newvalue; return ret;
-    return __sync_bool_compare_and_swap(&v->counter, expectedvalue, newvalue);
-}
-
-static __inline__ void * atomic_xchg_ptr(void *p, void **v)
-{
-    // void * ret = *v; (*v) = p; return ret;
-    return (void *)__sync_lock_test_and_set((memsize_t *)v,(memsize_t)p);
-}
-
-static __inline__ bool atomic_cas_ptr(void **v,void *newvalue, void *expectedvalue)
-{
-    // bool ret = (*v==expectedvalue); if (ret) *v = newvalue; return ret;
-    return __sync_bool_compare_and_swap((memsize_t *)v, (memsize_t)expectedvalue, (memsize_t)newvalue);
-}
-
-#define compiler_memory_barrier() asm volatile("": : :"memory")
-
-static __inline__ bool atomic_acquire(atomic_t *v)
-{
-#if defined(_ARCH_X86_64_) || defined(_ARCH_X86_)
-    //For some reason gcc targeting x86 generates code for atomic_cas() that requires fewer registers
-    return atomic_cas(v, 1, 0);
 #else
-    return __sync_lock_test_and_set(&v->counter, 1) == 0;
-#endif
-}
 
-static __inline__ void atomic_release(atomic_t *v)
-{
-#if defined(_ARCH_X86_64_) || defined(_ARCH_X86_)
-    //x86 has a strong memory model, so the following code is sufficient, and some older gcc compilers generate
-    //an unnecessary mfence instruction, so for x86 use the following which generates better code.
-    compiler_memory_barrier();
-    atomic_set(v, 0);
-#else
-    __sync_lock_release(&v->counter);
-#endif
-}
-
-#else // other unix
-
-//Truely awful implementations of atomic operations...
-typedef volatile int atomic_t;
-int jlib_decl poor_atomic_dec_and_read(atomic_t * v);
-bool jlib_decl poor_atomic_inc_and_test(atomic_t * v);
-int jlib_decl poor_atomic_xchg(int i, atomic_t * v);
-void jlib_decl poor_atomic_add(atomic_t * v, int i);
-int jlib_decl poor_atomic_add_and_read(atomic_t * v, int i);
-int jlib_decl poor_atomic_add_exchange(atomic_t * v, int i);
-bool jlib_decl poor_atomic_cas(atomic_t * v, int newvalue, int expectedvalue);
-void jlib_decl *poor_atomic_xchg_ptr(void *p, void **v);
-bool   jlib_decl poor_atomic_cas_ptr(void ** v, void *newvalue, void *expectedvalue);
-void jlib_decl poor_compiler_memory_barrier();
-
-#define ATOMIC_INIT(i)                  (i)
-#define atomic_inc(v)                   (void)poor_atomic_inc_and_test(v)
-#define atomic_inc_and_test(v)          poor_atomic_inc_and_test(v)
-#define atomic_dec(v)                   (void)poor_atomic_dec_and_read(v)
-#define atomic_dec_and_read(v)          poor_atomic_dec_and_read(v)
-#define atomic_dec_and_test(v)          (poor_atomic_dec_and_read(v)==0)
-#define atomic_read(v)                  (*v)
-#define atomic_set(v,i)                 ((*v) = (i))
-#define atomic_xchg(i, v)               poor_atomic_xchg(i, v)
-#define atomic_add(v,i)                 poor_atomic_add(v, i)
-#define atomic_add_and_read(v,i)        poor_atomic_add_and_read(v, i)
-#define atomic_add_exchange(v, i)       poor_atomic_add_exchange(v, i)
-#define atomic_cas(v,newvalue,expectedvalue)    poor_atomic_cas(v,newvalue,expectedvalue)
-#define atomic_xchg_ptr(p, v)               poor_atomic_xchg_ptr(p, v)
-#define atomic_cas_ptr(v,newvalue,expectedvalue)    poor_atomic_cas_ptr(v,newvalue,expectedvalue)
-#define compiler_memory_barrier()       poor_compiler_memory_barrier()
-
-#define atomic_acquire(v)               atomic_cas(v, 1, 0)
-#define atomic_release(v)               { compiler_memory_barrier(); atomic_set(v, 0); }
+#define compiler_memory_barrier() asm volatile("": : :"memory")
 
 #endif
 
-
 #endif
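
Only the compiler_memory_barrier() definitions survive here; the whole macro layer (Win32 Interlocked*, GCC __sync builtins and the mutex-based poor_atomic_* fallback) goes. For reference, standard C++ offers an equivalent compiler-only fence, so a portable definition could look like this (a sketch, not what the header actually does):

    #include <atomic>

    // Prevents the compiler reordering loads/stores across this point without
    // emitting any CPU fence instruction - the same intent as _ReadWriteBarrier()
    // or asm volatile("": : :"memory").
    inline void compilerMemoryBarrier()
    {
        std::atomic_signal_fence(std::memory_order_seq_cst);
    }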

+ 0 - 109
system/jlib/jiface.cpp

@@ -32,112 +32,3 @@ bool CInterface::Release() const
 {
     return CInterfaceOf<CEmptyClass>::Release();
 }
-
-//===========================================================================
-
-#if !defined(_WIN32) && !defined(__GNUC__)
-
-static CriticalSection *ICrit;
-
-MODULE_INIT(INIT_PRIORITY_JIFACE)
-{
-    ICrit = new CriticalSection();
-    return true;
-}
-MODULE_EXIT()
-{
-//  delete ICrit;  - need to make sure this is deleted after anything that uses it
-}
-
-int poor_atomic_dec_and_read(atomic_t * v)
-{
-    ICrit->enter();
-    int ret = --(*v);
-    ICrit->leave();
-    return ret;
-}
-
-bool poor_atomic_inc_and_test(atomic_t * v)
-{
-    ICrit->enter();
-    bool ret = (++(*v) == 0);
-    ICrit->leave();
-    return ret;
-}
-
-int poor_atomic_xchg(int i, atomic_t * v)
-{
-    ICrit->enter();
-    int prev = (*v);
-    (*v)=i;
-    ICrit->leave();
-    return prev;
-}
-
-void poor_atomic_add(atomic_t * v, int i)
-{
-    ICrit->enter();
-    (*v) += i;
-    ICrit->leave();
-}
-
-int poor_atomic_add_and_read(atomic_t * v, int i)
-{
-    ICrit->enter();
-    (*v) += i;
-    int ret = (*v);
-    ICrit->leave();
-    return ret;
-}
-
-int poor_atomic_add_exchange(atomic_t * v, int i)       
-{
-    ICrit->enter();
-    int prev = (*v);
-    (*v)=prev+i;
-    ICrit->leave();
-    return prev;
-}
-
-bool poor_atomic_cas(atomic_t * v, int newvalue, int expectedvalue)
-{
-    ICrit->enter();
-    bool ret = false;
-    if ((*v)==expectedvalue)
-    {
-        *v=newvalue;
-        ret = true;
-    }
-    ICrit->leave();
-    return ret;
-}
-
-void *poor_atomic_xchg_ptr(void *p, void** v)
-{
-    ICrit->enter();
-    void * prev = (*v);
-    (*v)=p;
-    ICrit->leave();
-    return prev;
-}
-
-bool poor_atomic_cas_ptr(void ** v, void * newvalue, void * expectedvalue)
-{
-    ICrit->enter();
-    bool ret = false;
-    if ((*v)==expectedvalue)
-    {
-        *v=newvalue;
-        ret = true;
-    }
-    ICrit->leave();
-    return ret;
-}
-
-//Hopefully the function call will be enough to stop the compiler reordering any operations
-void poor_compiler_memory_barrier()
-{
-}
-
-
-#endif

+ 0 - 14
system/jlib/jmutex.cpp

@@ -466,20 +466,6 @@ void ThreadYield()
 #endif
 }
 
-void spinUntilReady(atomic_t &value)
-{
-    unsigned i = 0;
-    const unsigned maxSpins = 10;
-    while (atomic_read(&value))
-    {
-        if (i++ == maxSpins)
-        {
-            i = 0;
-            ThreadYield();
-        }
-    }
-}
-
 void spinUntilReady(std::atomic_uint &value)
 {
     unsigned i = 0;

+ 0 - 1
system/jlib/jmutex.hpp

@@ -26,7 +26,6 @@
 #include "jsem.hpp"
 
 extern jlib_decl void ThreadYield();
-extern jlib_decl void spinUntilReady(atomic_t &value);
 extern jlib_decl void spinUntilReady(std::atomic_uint &value);
 
 

+ 10 - 10
system/mp/mpcomm.cpp

@@ -789,7 +789,7 @@ class CMPChannel: public CInterface
     CriticalSection attachsect;
     unsigned __int64 attachaddrval = 0;
     SocketEndpoint attachep, attachPeerEp;
-    atomic_t attachchk;
+    std::atomic<unsigned> attachchk;
 
 protected: friend class CMPServer;
     SocketEndpoint remoteep;
@@ -888,9 +888,9 @@ protected: friend class CMPPacketReader;
                     {
                         CriticalBlock block(attachsect);
 #ifdef _TRACE
-                        PROGLOG("MP: connect got attachsect, attachchk = %d, loopCnt = %u", atomic_read(&attachchk), loopCnt);
+                        PROGLOG("MP: connect got attachsect, attachchk = %d, loopCnt = %u", attachchk.load(), loopCnt);
 #endif
-                        if (atomic_read(&attachchk) > 0)
+                        if (attachchk > 0)
                         {
                             if (remoteep.equals(attachep))
                             {
@@ -1237,7 +1237,7 @@ public:
                 attachaddrval = 0;
                 attachep.set(nullptr);
                 attachPeerEp.set(nullptr);
-                atomic_set(&attachchk, 0);
+                attachchk = 0;
             }
             if (!keepsocket) {
                 try {
@@ -1684,7 +1684,7 @@ CMPChannel::CMPChannel(CMPServer *_parent,SocketEndpoint &_remoteep) : parent(_p
     localep.set(parent->getPort());
     reader = new CMPPacketReader(this);
     attachep.set(nullptr);
-    atomic_set(&attachchk, 0);
+    attachchk = 0;
     lastxfer = msTick();
 }
 
@@ -1702,7 +1702,7 @@ void CMPChannel::reset()
     attachaddrval = 0;
     attachep.set(nullptr);
     attachPeerEp.set(nullptr);
-    atomic_set(&attachchk, 0);
+    attachchk = 0;
     lastxfer = msTick();
 }
 
@@ -1718,9 +1718,9 @@ bool CMPChannel::attachSocket(ISocket *newsock,const SocketEndpoint &_remoteep,c
 {
     struct attachdTor
     {
-        atomic_t &attchk;
-        attachdTor(atomic_t &_attchk) : attchk(_attchk) { }
-        ~attachdTor() { atomic_dec(&attchk); }
+        std::atomic<unsigned> &attchk;
+        attachdTor(std::atomic<unsigned> &_attchk) : attchk(_attchk) { }
+        ~attachdTor() { --attchk; }
     } attachChk (attachchk);
 
 #ifdef _FULLTRACE       
@@ -1733,7 +1733,7 @@ bool CMPChannel::attachSocket(ISocket *newsock,const SocketEndpoint &_remoteep,c
         attachep = _remoteep;
         if (newsock)
             newsock->getPeerEndpoint(attachPeerEp);
-        atomic_inc(&attachchk);
+        ++attachchk;
     }
 
     CriticalBlock block(connectsect);

+ 13 - 13
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -480,8 +480,8 @@ protected:
         bool senderFull, doDedup, aborted, initialized;
         Semaphore senderFullSem;
         Linked<IException> exception;
-        atomic_t numFinished;
-        atomic_t stoppedTargets;
+        std::atomic<unsigned> numFinished;
+        std::atomic<unsigned> stoppedTargets;
         unsigned dedupSamples, dedupSuccesses, self;
         Owned<IThreadPool> writerPool;
         unsigned totalActiveWriters;
@@ -492,8 +492,8 @@ protected:
         {
             totalSz = 0;
             senderFull = false;
-            atomic_set(&numFinished, 0);
-            atomic_set(&stoppedTargets, 0);
+            numFinished = 0;
+            stoppedTargets = 0;
             dedupSamples = dedupSuccesses = 0;
             doDedup = owner.doDedup;
             writerPool.setown(createThreadPool("HashDist writer pool", this, this, owner.writerPoolSize, 5*60*1000));
@@ -528,8 +528,8 @@ protected:
                 sendersFinished[dest] = false;
             totalSz = 0;
             senderFull = false;
-            atomic_set(&numFinished, 0);
-            atomic_set(&stoppedTargets, 0);
+            numFinished = 0;
+            stoppedTargets = 0;
             aborted = false;
         }
         unsigned queryInactiveWriters() const
@@ -599,7 +599,7 @@ protected:
             if (owner.sendBlock(target, msg))
                 return;
             markStopped(target); // Probably a bit pointless if target is 'self' - process loop will have done already
-            ::ActPrintLog(owner.activity, thorDetailedLogLevel, "CSender::sendBlock stopped slave %d (finished=%d)", target+1, atomic_read(&numFinished));
+            ::ActPrintLog(owner.activity, thorDetailedLogLevel, "CSender::sendBlock stopped slave %d (finished=%d)", target+1, numFinished.load());
         }
         void closeWrite()
         {
@@ -680,9 +680,9 @@ protected:
         void checkSendersFinished()
         {
             // check if any target has stopped and clear out partial now defunct buckets taking space.
-            if (atomic_read(&stoppedTargets) == 0) // cheap compared to atomic_xchg, so saves a few cycles in common case.
+            if (stoppedTargets == 0) // cheap compared to atomic_xchg, so saves a few cycles in common case.
                return;
-            int numStopped = atomic_xchg(0, &stoppedTargets);
+            int numStopped = stoppedTargets.exchange(0);
             if (numStopped)
             {
                 /* this will be infrequent, scan all.
@@ -714,7 +714,7 @@ protected:
             rowcount_t totalSent = 0;
             try
             {
-                while (!aborted && (unsigned)atomic_read(&numFinished) < owner.numnodes)
+                while (!aborted && numFinished < owner.numnodes)
                 {
                     while (queryTotalSz() >= owner.inputBufferSize)
                     {
@@ -817,7 +817,7 @@ protected:
                         for (;;)
                         {
                             if (timer.elapsedCycles() >= queryOneSecCycles()*10)
-                                owner.ActPrintLog("HD sender, waiting for space, inactive writers = %d, totalSz = %d, numFinished = %d", queryInactiveWriters(), queryTotalSz(), atomic_read(&numFinished));
+                                owner.ActPrintLog("HD sender, waiting for space, inactive writers = %d, totalSz = %d, numFinished = %d", queryInactiveWriters(), queryTotalSz(), numFinished.load());
                             timer.reset();
 
                             if (senderFullSem.wait(10000))
@@ -905,8 +905,8 @@ protected:
         {
             if (queryMarkSenderFinished(target))
             {
-                atomic_inc(&numFinished);
-                atomic_inc(&stoppedTargets);
+                ++numFinished;
+                ++stoppedTargets;
             }
         }
         void markSelfStopped() { markStopped(self); }
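
atomic_xchg(0, &stoppedTargets) stored zero and handed back the previous count; std::atomic's exchange() is the direct replacement, letting one thread claim all pending notifications in a single atomic step. A small sketch with a hypothetical counter:

    #include <atomic>

    static std::atomic<unsigned> pendingStops{0};

    void noteStopped()                 // producers: record that another target stopped
    {
        ++pendingStops;
    }

    unsigned claimStops()              // consumer: take everything recorded so far
    {
        if (pendingStops == 0)         // cheap early-out, as in checkSendersFinished()
            return 0;
        return pendingStops.exchange(0);   // was: atomic_xchg(0, &pendingStops)
    }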

+ 7 - 7
thorlcr/activities/keyedjoin/thkeyedjoinslave-legacy.cpp

@@ -97,7 +97,7 @@ interface IJoinProcessor
     virtual void onComplete(CJoinGroup * jg) = 0;
     virtual bool leftCanMatch(const void *_left) = 0;
 #ifdef TRACE_USAGE
-     virtual atomic_t &getdebug(unsigned w) = 0;
+     virtual std::atomic<unsigned> &getdebug(unsigned w) = 0;
 #endif
      virtual CActivityBase *queryOwner() = 0;
 };
@@ -173,7 +173,7 @@ public:
     CJoinGroup(CActivityBase &_activity, const void *_left, IJoinProcessor *_join, CJoinGroup *_groupStart) : activity(_activity), join(_join), rows(_activity, NULL)
     {
 #ifdef TRACE_USAGE
-        atomic_inc(&join->getdebug(0));
+        ++join->getdebug(0);
 #endif
 #ifdef TRACE_JOINGROUPS
         ActPrintLog(join->queryOwner(), "Creating joinGroup %x, groupstart %x", this, _groupStart);
@@ -201,7 +201,7 @@ public:
     {
 #ifdef TRACE_USAGE
         if (groupStart)
-            atomic_dec(&join->getdebug(0));
+            --join->getdebug(0);
 #endif
 #ifdef TRACE_JOINGROUPS
         if (groupStart)
@@ -570,7 +570,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
     MemoryBuffer rawFetchMb;
 
 #ifdef TRACE_USAGE
-    atomic_t debugats[10];
+    std::atomic<unsigned> debugats[10];
     unsigned lastTick;
 #endif
 
@@ -1670,7 +1670,7 @@ public:
 #ifdef TRACE_USAGE
         unsigned it=0;
         for (; it<10; it++)
-            atomic_set(&debugats[it],0);
+            debugats[it] = 0;
         lastTick = 0;
 #endif
         helper = (IHThorKeyedJoinArg *)queryHelper();
@@ -1703,7 +1703,7 @@ public:
     {
         StringBuffer s;
         { CriticalBlock b(onCompleteCrit);
-            s.appendf("CJoinGroups=%d, doneGroups=%d, ",atomic_read(&debugats[0]), doneGroups.ordinality());
+            s.appendf("CJoinGroups=%d, doneGroups=%d, ",debugats[0].load(), doneGroups.ordinality());
             pool->getStats(s);
         }
         ActPrintLog(s.str());
@@ -1811,7 +1811,7 @@ public:
     virtual bool leftCanMatch(const void *_left) { UNIMPLEMENTED; return false; }
 
 #ifdef TRACE_USAGE
-    virtual atomic_t &getdebug(unsigned w)
+    virtual std::atomic<unsigned> &getdebug(unsigned w)
     {
         return debugats[w];
     }

+ 5 - 4
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -2237,7 +2237,7 @@ protected:
             IChannelDistributor **channelDistributors;
             unsigned nextSpillChannel;
             CriticalSection crit;
-            atomic_t spilt;
+            std::atomic<unsigned> spilt;
         public:
             CChannelDistributor(CLookupJoinActivityBase &_owner, ICompare *cmp) : owner(_owner)
             {
@@ -2247,7 +2247,7 @@ protected:
                 channelDistributors = ((CLookupJoinActivityBase *)owner.channels[0])->channelDistributors;
                 channelDistributors[owner.queryJobChannelNumber()] = this;
                 nextSpillChannel = 0;
-                atomic_set(&spilt, 0);
+                spilt = 0;
                 //NB: all channels will have done this, before rows are added
             }
 #define HPCC_17331 // Whilst under investigation. Should be solved by fix for HPCC-21091
@@ -2342,7 +2342,8 @@ protected:
         // IChannelDistributor impl.
             virtual void putRow(const void *row)
             {
-                if (atomic_cas(&spilt, 0, 1))
+                unsigned expected = 1;
+                if (spilt.compare_exchange_strong(expected, 0))
                 {
                     StringBuffer traceInfo;
                     if (channelCollector->shrink(&traceInfo)) // grab back some valuable table array space
@@ -2354,7 +2355,7 @@ protected:
             {
                 if (!channelCollector->spill(critical))
                     return false;
-                atomic_set(&spilt, 1);
+                spilt = 1;
                 return true;
             }
             virtual roxiemem::IBufferedRowCallback *queryCallback() { return this; }
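
Note the argument order flips here: the old atomic_cas(v, newvalue, expectedvalue) took the new value first, whereas compare_exchange_strong(expected, desired) takes the expected value first, by reference, and overwrites it with the current value on failure. A sketch of the one-shot "consume the spilt flag" pattern used above, with a hypothetical flag:

    #include <atomic>

    static std::atomic<unsigned> spiltFlag{0};

    void markSpilt()            // set by the spill callback
    {
        spiltFlag = 1;
    }

    bool consumeSpilt()         // returns true exactly once per spill
    {
        unsigned expected = 1;
        // was: atomic_cas(&spiltFlag, 0, 1) - i.e. "if the flag is 1, reset it to 0"
        return spiltFlag.compare_exchange_strong(expected, 0);
    }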

+ 5 - 5
thorlcr/master/thgraphmanager.cpp

@@ -59,7 +59,7 @@ class CJobManager : public CSimpleInterface, implements IJobManager, implements
     Owned<IException> exitException;
 
     Owned<IDeMonServer> demonServer;
-    atomic_t            activeTasks;
+    std::atomic<unsigned> activeTasks;
     StringAttr          currentWuid;
     ILogMsgHandler *logHandler;
 
@@ -259,7 +259,7 @@ CJobManager::CJobManager(ILogMsgHandler *_logHandler) : logHandler(_logHandler)
         demonServer.setown(createDeMonServer());
     else
         globals->setPropBool("@watchdogProgressEnabled", false);
-    atomic_set(&activeTasks, 0);
+    activeTasks = 0;
     setJobManager(this);
     debugListener.setown(new CThorDebugListener(*this));
 }
@@ -973,9 +973,9 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName,
     {
         struct CounterBlock
         {
-            atomic_t &counter;
-            CounterBlock(atomic_t &_counter) : counter(_counter) { atomic_inc(&counter); }
-            ~CounterBlock() { atomic_dec(&counter); }
+            std::atomic<unsigned> &counter;
+            CounterBlock(std::atomic<unsigned> &_counter) : counter(_counter) { ++counter; }
+            ~CounterBlock() { --counter; }
         } cBlock(activeTasks);
 
         {

+ 5 - 5
thorlcr/slave/traceslave.hpp

@@ -156,14 +156,14 @@ private:
     const unsigned traceQueueSize;
     CTraceQueue buffers[2];
     CTraceQueue rowBufferLogCache;
-    atomic_t rowBufInUse;
+    std::atomic<unsigned> rowBufInUse;
     IThorDataLink *thorDataLink;
     IEngineRowStream *inputStream;
     IHThorArg *helper;
 
     inline void enqueueRowForTrace(const void *row)
     {
-        buffers[atomic_read(&rowBufInUse)].enqueue(row, thorDataLink->queryEndCycles());
+        buffers[rowBufInUse].enqueue(row, thorDataLink->queryEndCycles());
     }
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterfaceOf<IEngineRowStream>);
@@ -173,7 +173,7 @@ public:
         // NOTE - cannot be called by more than one thread
         buffers[1].init(traceQueueSize+1);
         rowBufferLogCache.init(traceQueueSize);
-        int bufToDump = atomic_read(&rowBufInUse);
+        int bufToDump = rowBufInUse;
         int inactiveBuf = 1 - bufToDump;
 
         // Queue any remaining rows from inactive buffer as the oldest row is skipped
@@ -181,7 +181,7 @@ public:
         // may have been formerly an active buffer)
         buffers[inactiveBuf].queueOut(rowBufferLogCache, false);
 
-        atomic_set(&rowBufInUse, inactiveBuf);// Swap Active & inactiveBuf
+        rowBufInUse = inactiveBuf;// Swap Active & inactiveBuf
         buffers[bufToDump].queueOut(rowBufferLogCache, true);
         rowBufferLogCache.dump(mb, helper);
     }
@@ -202,7 +202,7 @@ public:
     CTracingStream(IThorDataLink *_thorDataLink, IEngineRowStream *_inputStream, IHThorArg *_helper, unsigned _traceQueueSize)
         : thorDataLink(_thorDataLink), inputStream(_inputStream), helper(_helper), traceQueueSize(_traceQueueSize)
     {
-        atomic_set(&rowBufInUse, 0);
+        rowBufInUse = 0;
         buffers[0].init(traceQueueSize+1);
     }
 };