Explorar el Código

HPCC-11367 review and clean up use of spinlocks

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday hace 9 años
padre
commit
875bbd39f6

+ 5 - 5
common/remote/hooks/libarchive/archive.cpp

@@ -46,13 +46,13 @@
 #endif
 
 static RegExpr *signature;
-static SpinLock *lock;
+static CriticalSection *lock;
 
 static const char *splitName(const char *fileName)
 {
     if (!fileName)
         return NULL;
-    SpinBlock b(*lock);
+    CriticalBlock b(*lock);
     const char *sig = signature->find(fileName);
     if (sig)
         return sig+signature->findlen();
@@ -608,7 +608,7 @@ protected:
 
 extern ARCHIVEFILE_API void installFileHook()
 {
-    SpinBlock b(*lock); // Probably overkill!
+    CriticalBlock b(*lock); // Probably overkill!
     if (!archiveFileHook)
     {
         archiveFileHook = new CArchiveFileHook;
@@ -620,7 +620,7 @@ extern ARCHIVEFILE_API void removeFileHook()
 {
     if (lock)
     {
-        SpinBlock b(*lock); // Probably overkill!
+        CriticalBlock b(*lock); // Probably overkill!
         if (archiveFileHook)
         {
             removeContainedFileHook(archiveFileHook);
@@ -632,7 +632,7 @@ extern ARCHIVEFILE_API void removeFileHook()
 
 MODULE_INIT(INIT_PRIORITY_STANDARD)
 {
-    lock = new SpinLock;
+    lock = new CriticalSection;
     signature = new RegExpr(ARCHIVE_SIGNATURE);
     archiveFileHook = NULL;
     return true;

+ 1 - 2
common/thorhelper/roxiedebug.cpp

@@ -711,8 +711,7 @@ void CBreakpointInfo::serialize(MemoryBuffer &to) const
     }
 }
 
-SpinLock CBreakpointInfo::UIDlock;
-unsigned CBreakpointInfo::nextUIDvalue;
+std::atomic<unsigned> CBreakpointInfo::nextUIDvalue;
 
 unsigned CBreakpointInfo::queryUID() const
 {

+ 1 - 3
common/thorhelper/roxiedebug.hpp

@@ -137,11 +137,9 @@ private:
 
     Owned<IRowMatcher> rowMatcher;
 
-    static SpinLock UIDlock;
-    static unsigned nextUIDvalue;
+    static std::atomic<unsigned> nextUIDvalue;
     static inline unsigned nextUID()
     {
-        SpinBlock b(UIDlock);
         return ++nextUIDvalue;
     }
 

+ 1 - 0
ecl/hql/hqlattr.cpp

@@ -3779,6 +3779,7 @@ ITypeInfo * setStreamedAttr(ITypeInfo * _type, bool setValue)
 
 IInterface * CHqlExpression::queryExistingProperty(ExprPropKind propKind) const
 {
+    //If this was used significantly in a multi threaded environment then reduce the work in the spinblock
     SpinBlock block(*propertyLock);
     CHqlDynamicProperty * cur = attributes;
     while (cur)

+ 4 - 6
ecl/hql/hqlexpr.hpp

@@ -76,17 +76,15 @@ public:
 class HQL_API UniqueSequenceCounter
 {
 public:
-    inline UniqueSequenceCounter() { value = 0; }
+    inline UniqueSequenceCounter() : value(1) {}
 
     inline unique_id_t next() 
-    { 
-        SpinBlock block(lock);
-        return ++value; 
+    {
+        return value.fetch_add(1, std::memory_order_relaxed);
     }
 
 protected:
-    unique_id_t value;
-    SpinLock lock;
+    std::atomic<unique_id_t> value;
 };
 
 

+ 0 - 1
roxie/ccd/ccd.hpp

@@ -61,7 +61,6 @@ void joinMulticastChannel(unsigned channel);
 extern unsigned channels[MAX_CLUSTER_SIZE];     // list of all channel numbers for this node
 extern unsigned channelCount;                   // number of channels this node is doing
 extern unsigned subChannels[MAX_CLUSTER_SIZE];  // maps channel numbers to subChannels for this node
-extern bool suspendedChannels[MAX_CLUSTER_SIZE];// indicates suspended channels for this node
 extern unsigned numSlaves[MAX_CLUSTER_SIZE];    // number of slaves listening on this channel
 extern unsigned replicationLevel[MAX_CLUSTER_SIZE];  // Which copy of the data this channel uses on this slave
 

+ 1 - 0
roxie/ccd/ccdlistener.cpp

@@ -554,6 +554,7 @@ public:
     bool match(IpAddress &peer, const char *query, bool isBlind, bool &access, StringBuffer &errMsg, int &errCode)
     {
         {
+            //MORE: This could use a regex class that is thread safe and remove this spin lock
             SpinBlock b(crappyUnsafeRegexLock);
             if (!queries.find(query))
                 return false;

+ 2 - 2
roxie/ccd/ccdqueue.cpp

@@ -43,8 +43,8 @@ unsigned numSlaves[MAX_CLUSTER_SIZE];
 unsigned replicationLevel[MAX_CLUSTER_SIZE];
 unsigned IBYTIDelays[MAX_CLUSTER_SIZE]; // MORE: this will cover only 2 slaves per channel, change to cover all. 
 
-SpinLock suspendCrit;
-bool suspendedChannels[MAX_CLUSTER_SIZE];
+static SpinLock suspendCrit; // MORE: Could remove this, and replace the following with an atomic boolean array.
+static bool suspendedChannels[MAX_CLUSTER_SIZE];
 
 using roxiemem::OwnedRoxieRow;
 using roxiemem::OwnedConstRoxieRow;

+ 2 - 3
roxie/ccd/ccdsnmp.cpp

@@ -852,7 +852,7 @@ class CQueryStatsAggregator : public CInterface, implements IQueryStatsAggregato
     CIArrayOf<QueryStatsAggregateRecord> aggregated; // stored with most recent first
     unsigned expirySeconds;  // time to keep exact info (rather than just aggregated)
     StringAttr queryName;
-    SpinLock lock;
+    SpinLock lock; // MORE: This could be held for a while.  Is this significant?  Should it be a CriticalSection?
 
     QueryStatsAggregateRecord &findAggregate(time_t startTime)
     {
@@ -908,7 +908,6 @@ public:
     CQueryStatsAggregator(const char *_queryName, unsigned _expirySeconds)
         : queryName(_queryName)
     {
-        SpinBlock b(queryStatsCrit);
         expirySeconds = _expirySeconds;
         queryStatsAggregators.append(*LINK(this));
     }
@@ -1010,7 +1009,7 @@ public:
 
 CIArrayOf<CQueryStatsAggregator> CQueryStatsAggregator::queryStatsAggregators;
 CQueryStatsAggregator CQueryStatsAggregator::globalStatsAggregator(NULL, SLOT_LENGTH);
-SpinLock CQueryStatsAggregator::queryStatsCrit;
+SpinLock CQueryStatsAggregator::queryStatsCrit; //MORE: Should probably be a critical section
 
 IQueryStatsAggregator *queryGlobalQueryStatsAggregator()
 {

+ 1 - 1
roxie/roxiemem/roxiemem.cpp

@@ -3704,7 +3704,7 @@ public:
         Owned<IActivityMemoryUsageMap> map = getActivityUsage();
 
         NonReentrantSpinBlock block(peakSpinLock);
-        peakUsageMap.setown(map.getClear());
+        peakUsageMap.swap(map);
     }
 
     //MORE: inline??

+ 1 - 1
system/jhtree/ctfile.cpp

@@ -530,7 +530,7 @@ unsigned __int64 CJHTreeNode::totalAllocatedCurrent;
 unsigned __int64 CJHTreeNode::totalAllocatedEver;
 unsigned CJHTreeNode::countAllocationsCurrent;
 unsigned CJHTreeNode::countAllocationsEver;
-SpinLock CJHTreeNode::spin;
+SpinLock CJHTreeNode::spin; // MORE: Could replace with atomic operations, but since there are 4 of them it may be less efficient when uncontested
 
 char *CJHTreeNode::expandKeys(void *src,unsigned keylength,size32_t &retsize, bool rowcompression)
 {

+ 1 - 3
system/jlib/jisem.hpp

@@ -89,7 +89,7 @@ public:
 
 class jlib_decl TokenBucket : public CInterface
 {
-    SpinLock crit;
+    SpinLock crit; // MORE: I suspect this should be a critical section
     Semaphore tokens;
     unsigned tokensAvailable;
     unsigned maxBucketSize;
@@ -122,8 +122,6 @@ class jlib_decl TokenBucket : public CInterface
 
 
 public:
-    IMPLEMENT_IINTERFACE;
-
     TokenBucket(unsigned _tokensPerPeriod, unsigned _period, unsigned _maxBucketSize)
         : tokens(_maxBucketSize), maxBucketSize(_maxBucketSize), tokensPerPeriod(_tokensPerPeriod), period(_period)
     {

+ 4 - 4
system/jlib/jutil.cpp

@@ -51,7 +51,7 @@
 
 #include "portlist.h"
 
-static SpinLock * cvtLock;
+static NonReentrantSpinLock * cvtLock;
 
 #ifdef _WIN32
 static IRandomNumberGenerator * protectedGenerator;
@@ -66,7 +66,7 @@ mach_timebase_info_data_t timebase_info  = { 1,1 };
 
 MODULE_INIT(INIT_PRIORITY_SYSTEM)
 {
-    cvtLock = new SpinLock;
+    cvtLock = new NonReentrantSpinLock;
 #ifdef _WIN32
     protectedGenerator = createRandomNumberGenerator();
     protectedGeneratorCs = new CriticalSection;
@@ -95,7 +95,7 @@ bool safe_ecvt(size_t len, char * buffer, double value, int numDigits, int * dec
 #ifdef _WIN32
     return _ecvt_s(buffer, len, value, numDigits, decimal, sign) == 0;
 #else
-    SpinBlock block(*cvtLock);
+    NonReentrantSpinBlock block(*cvtLock);
     const char * result = ecvt(value, numDigits, decimal, sign);
     if (!result)
         return false;
@@ -109,7 +109,7 @@ bool safe_fcvt(size_t len, char * buffer, double value, int numPlaces, int * dec
 #ifdef _WIN32
     return _fcvt_s(buffer, len, value, numPlaces, decimal, sign) == 0;
 #else
-    SpinBlock block(*cvtLock);
+    NonReentrantSpinBlock block(*cvtLock);
     const char * result = fcvt(value, numPlaces, decimal, sign);
     if (!result)
         return false;

+ 2 - 2
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -452,7 +452,7 @@ protected:
 
         CDistributorBase &owner;
         mutable CriticalSection activeWritersLock;
-        mutable SpinLock totalSzLock;
+        mutable SpinLock totalSzLock; // MORE: Could possibly use an atomic to reduce the scope of this spin lock
         SpinLock doDedupLock;
         IPointerArrayOf<CSendBucket> buckets;
         UnsignedArray candidates;
@@ -533,8 +533,8 @@ protected:
         }
         void decTotal(size32_t sz)
         {
-            SpinBlock b(totalSzLock);
             HDSendPrintLog2("decTotal - %d", sz);
+            SpinBlock b(totalSzLock);
             totalSz -= sz;
             if (sz && senderFull)
             {

+ 1 - 6
thorlcr/activities/indexread/thindexreadslave.cpp

@@ -97,7 +97,7 @@ protected:
     bool localKey;
     __int64 lastSeeks, lastScans;
     UInt64Array _statsArr;
-    SpinLock statLock;
+    SpinLock statLock;  // MORE: Can this be avoided by passing in the delta?
     unsigned __int64 *statsArr;
     size32_t fixedDiskRecordSize;
     rowcount_t progress;
@@ -174,11 +174,6 @@ public:
     {
         lastSeeks = lastScans = 0;
     }
-    inline void incScan()
-    {
-        SpinBlock b(statLock);
-        lastScans++;
-    }
     inline void noteStats(unsigned seeks, unsigned scans)
     {
         SpinBlock b(statLock);

+ 2 - 0
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -561,6 +561,8 @@ class CMarker
 
     rowidx_t getMore(rowidx_t &startRow) // NB: returns end row #
     {
+        //NOTE: If we could guarantee that nextChunkStartRow could not overflow then the spin lock could be replaced
+        //with an atomic fetch_add().
         NonReentrantSpinBlock block(lock);
         if (nextChunkStartRow == rowCount)
             return 0;

+ 1 - 1
thorlcr/activities/selectnth/thselectnthslave.cpp

@@ -27,7 +27,7 @@ class CSelectNthSlaveActivity : public CSlaveActivity, implements ILookAheadStop
     rowcount_t lookaheadN, N, startN;
     bool createDefaultIfFail;
     IHThorSelectNArg *helper;
-    SpinLock spin;
+    SpinLock spin; // MORE: Remove this and use an atomic variable for lookaheadN
 
     void initN()
     {

+ 1 - 1
thorlcr/graph/thgraphslave.hpp

@@ -354,7 +354,7 @@ class graphslave_decl CSlaveGraph : public CGraphBase
     Semaphore getDoneSem;
     bool initialized, progressActive, progressToCollect;
     CriticalSection progressCrit;
-    SpinLock progressActiveLock;
+    SpinLock progressActiveLock; // MORE: Use atomic variables (and progressToCollect.exchange()) to remove this spin lock
     bool doneInit = false;
 
 public:

+ 1 - 1
thorlcr/slave/slave.ipp

@@ -36,7 +36,7 @@ protected:
     CThreadedPersistent threaded;
     rowcount_t processed;
     unsigned __int64 lastCycles;
-    SpinLock cycleLock;
+    SpinLock cycleLock; // MORE: Could probably remove this and use atomic variables instead
 
     virtual void endProcess() = 0;
     virtual void process() { }

+ 1 - 1
thorlcr/thorutil/thbuf.cpp

@@ -434,7 +434,7 @@ class CSmartRowInMemoryBuffer: public CSimpleInterface, implements ISmartRowBuff
     IThorRowInterfaces *rowIf;
     ThorRowQueue *in;
     size32_t insz;
-    SpinLock lock;
+    SpinLock lock; // MORE: This lock is held for quite long periods.  I suspect it could be significantly optimized.
     bool waitingin;
     Semaphore waitinsem;
     bool waitingout;