Преглед на файлове

HPCC-12662 Miscellaneous changes

1) Rename existing IBitSet factory to createThreadSafeBitSet
2) Change base CBitSet*Helper::getBits to return const copy
3) Fix typo causing compile error in Windows
3) Other minor changes.

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith преди 10 години
родител
ревизия
10d5390aa8

+ 1 - 1
common/remote/sockfile.cpp

@@ -773,7 +773,7 @@ struct CTreeCopyItem: public CInterface
         loc.append(orig);
         dt.set(_dt);
         sz = _sz;
-        busy.setown(createBitSet());
+        busy.setown(createThreadSafeBitSet());
         lastused = msTick();
     }
     bool equals(const RemoteFilename &orig, const char *_net, const char *_mask, offset_t _sz, CDateTime &_dt) 

+ 1 - 1
common/thorhelper/thorstep.cpp

@@ -494,7 +494,7 @@ CFilteredInputBuffer::CFilteredInputBuffer(IEngineRowAllocator * _allocator, IRa
     stepCompare = _stepCompare;
     equalCompare = _equalCompare;
     input = _input;
-    matched.setown(createBitSet());
+    matched.setown(createThreadSafeBitSet());
     numMatched = 0;
     readIndex = 0;
     numEqualFields = _numEqualFields;

+ 1 - 1
common/thorhelper/thorstep.ipp

@@ -824,7 +824,7 @@ public:
     const void * nextUnqueued();
     bool ensureNonEmpty();
     bool flushUnmatched();
-    void trackUnmatched() { matchedLeft.setown(createBitSet()); }
+    void trackUnmatched() { matchedLeft.setown(createThreadSafeBitSet()); }
 
     inline void consumeNextInput() { rows.enqueue(input->consume()); }
 

+ 1 - 1
dali/base/dautils.cpp

@@ -1936,7 +1936,7 @@ public:
     CPECacheElem(const char *owner, ISortedElementsTreeFilter *_postFilter)
         : CTimedCacheItem(owner), postFilter(_postFilter), postFiltered(0)
     {
-        passesFilter.setown(createBitSet());
+        passesFilter.setown(createThreadSafeBitSet());
     }
     ~CPECacheElem()
     {

+ 1 - 1
dali/sasha/saxref.cpp

@@ -1762,7 +1762,7 @@ public:
                 unsigned numsub = file.getPropInt("@numsubfiles");
                 unsigned n = 0;
                 Owned<IPropertyTreeIterator> iter = file.getElements("SubFile");
-                Owned<IBitSet> parts = createBitSet();
+                Owned<IBitSet> parts = createThreadSafeBitSet();
                 StringArray subname;
                 ForEach(*iter) {
                     IPropertyTree &sfile = iter->query();

+ 1 - 1
ecl/hql/hqlusage.cpp

@@ -247,7 +247,7 @@ FieldAccessAnalyser::FieldAccessAnalyser(IHqlExpression * _selector) : NewHqlTra
 {
     unwindFields(fields, selector->queryRecord());
     numAccessed = 0;
-    accessed.setown(createBitSet());
+    accessed.setown(createThreadSafeBitSet());
 }
 
 IHqlExpression * FieldAccessAnalyser::queryLastFieldAccessed() const

+ 63 - 64
system/jlib/jset.cpp

@@ -23,15 +23,14 @@
 
 //-----------------------------------------------------------------------
 
-// NB: The CBitSet* helpers are primarily avoid the need for virtuals in the implementations
+// NB: The CBitSet*Helper's are primarily avoid the need for virtuals in the implementations
 class CBitSetArrayHelper
 {
 protected:
     ArrayOf<bits_t> bits;
-    mutable CriticalSection crit;
 
-    inline bits_t &getBitSet(unsigned i, bits_t &tmp) { tmp = bits.item(i); return tmp;}
-    inline void setBitSet(unsigned i, bits_t m)
+    inline bits_t getBits(unsigned i) const { return bits.item(i); }
+    inline void setBits(unsigned i, bits_t m)
     {
         bits.replace(m, i);
     }
@@ -56,12 +55,12 @@ protected:
         bitSetUnits = 0;
         mem = NULL;
     }
-    inline bits_t &getBitSet(unsigned i, bits_t &tmp) { return mem[i]; }
-    inline void setBitSet(unsigned i, bits_t m) { } // NOP, getBitset returns ref. in this impl. and the bits_t is set directly
+    inline bits_t getBits(unsigned i) const { return mem[i]; }
+    inline void setBits(unsigned i, bits_t m) { mem[i] = m; }
     inline void addBitSet(bits_t m)
     {
         if (fixedMemory)
-            throw MakeStringException(-1, "CBitSetThreadUnsafe with fixed mem cannot expand");
+            throw MakeStringException(-1, "CBitSet with fixed mem cannot expand");
         mb.append(m);
         mem = (bits_t *)mb.bufferBase();
         ++bitSetUnits;
@@ -75,8 +74,8 @@ class CBitSetBase : public BITSETHELPER, public CSimpleInterfaceOf<IBitSet>
 protected:
     typedef BITSETHELPER PARENT;
     using PARENT::getWidth;
-    using PARENT::getBitSet;
-    using PARENT::setBitSet;
+    using PARENT::getBits;
+    using PARENT::setBits;
     using PARENT::addBitSet;
 
     unsigned _scan(unsigned from, bool tst, bool scninv)
@@ -86,10 +85,9 @@ protected:
         // returns index of first = val >= from
         unsigned n=getWidth();
         unsigned i;
-        bits_t tmpBitSet; // NB: for getBitSet, not all impls. will use it
         for (i=from/BitsPerItem;i<n;i++)
         {
-            bits_t &m = getBitSet(i, tmpBitSet);
+            bits_t m = getBits(i);
             if (m!=noMatchMask)
             {
 #if defined(__GNUC__)
@@ -122,7 +120,7 @@ protected:
                     {
                         bits_t t = ((bits_t)1)<<pos;
                         m &= ~t;
-                        setBitSet(i, m);
+                        setBits(i, m);
                     }
                     return i*BitsPerItem+pos;
                 }
@@ -134,7 +132,7 @@ protected:
                     {
                         bits_t t = ((bits_t)1)<<pos;
                         m |= t;
-                        setBitSet(i, m);
+                        setBits(i, m);
                     }
                     return i*BitsPerItem+pos;
                 }
@@ -149,7 +147,7 @@ protected:
                             if (scninv)
                             {
                                 m &= ~t;
-                                setBitSet(i, m);
+                                setBits(i, m);
                             }
                             return i*BitsPerItem+j;
                         }
@@ -161,7 +159,7 @@ protected:
                             if (scninv)
                             {
                                 m |= t;
-                                setbitSet(i, m);
+                                setBitSet(i, m);
                             }
                             return i*BitsPerItem+j;
                         }
@@ -191,21 +189,19 @@ protected:
         unsigned i=lo/BitsPerItem;
         if (n<=i)
         {
-            if (val)
+            if (!val)
+                return;
+            while (n < i)
             {
-                while (n < i)
-                {
-                    addBitSet(0);
-                    ++n;
-                }
+                addBitSet(0);
+                ++n;
             }
         }
         else
         {
-            bits_t tmpBitSet; // NB: for getBitSet, not all impls. will use it
             for (;i<n;i++)
             {
-                bits_t &m = getBitSet(i, tmpBitSet);
+                bits_t m;
                 if ((nb>=BitsPerItem)&&(j==0))
                 {
                     if (val)
@@ -216,6 +212,7 @@ protected:
                 }
                 else
                 {
+                    m = getBits(i);
                     bits_t t = ((bits_t)1)<<j;
                     for (;j<BitsPerItem;j++)
                     {
@@ -228,7 +225,7 @@ protected:
                         t <<= 1;
                     }
                 }
-                setBitSet(i, m);
+                setBits(i, m);
                 if (nb==0)
                     return;
                 j = 0;
@@ -272,13 +269,12 @@ public:
         }
         else
         {
-            bits_t tmpBitSet; // NB: for getBitSet, not all impls. will use it
-            bits_t &m = getBitSet(i, tmpBitSet);
+            bits_t m = getBits(i);
             if (val)
                 m |= t;
             else
                 m &= ~t;
-            setBitSet(i, m);
+            setBits(i, m);
         }
     }
     virtual bool invert(unsigned n)
@@ -295,14 +291,13 @@ public:
         }
         else
         {
-            bits_t tmpBitSet; // NB: for getBitSet, not all impls. will use it
-            bits_t &m = getBitSet(i, tmpBitSet);
+            bits_t m = getBits(i);
             ret = 0 == (m&t);
             if (ret)
                 m |= t;
             else
                 m &= ~t;
-            setBitSet(i, m);
+            setBits(i, m);
         }
         return ret;
     }
@@ -312,8 +307,7 @@ public:
         unsigned i=n/BitsPerItem;
         if (i<getWidth())
         {
-            bits_t tmpBitSet; // NB: for getBitSet, not all impls. will use it
-            bits_t &m = getBitSet(i, tmpBitSet);
+            bits_t m = getBits(i);
             if (m&t)
                 return true;
         }
@@ -323,28 +317,32 @@ public:
     {
         bits_t t=((bits_t)1)<<(n%BitsPerItem);
         unsigned i=n/BitsPerItem;
-        bool ret;
         if (i>=getWidth())
         {
-            ret = false;
-            if (!val)
-                return false; // don't bother
-            while (i>getWidth())
-                addBitSet(0);
-            addBitSet(t);
+            if (val)
+            {
+                while (i>getWidth())
+                    addBitSet(0);
+                addBitSet(t);
+            }
+            return false;
         }
         else
         {
-            bits_t tmpBitSet; // NB: for getBitSet, not all impls. will use it
-            bits_t &m = getBitSet(i, tmpBitSet);
-            ret = 0 != (m&t);
-            if (val)
-                m |= t;
+            bits_t m = getBits(i);
+            if (m&t)
+            {
+                if (!val)
+                    setBits(i, m & ~t);
+                return true;
+            }
             else
-                m &= ~t;
-            setBitSet(i, m);
+            {
+                if (val)
+                    setBits(i, m | t);
+                return false;
+            }
         }
-        return ret;
     }
     virtual unsigned scan(unsigned from,bool tst)
     {
@@ -371,8 +369,9 @@ size32_t getBitSetMemoryRequirement(unsigned numBits)
 }
 
 // Simple BitSet // 0 based all, intermediate items exist, operations threadsafe and atomic
-class CBitSet : public CBitSetBase<CBitSetArrayHelper>
+class CBitSetThreadSafe : public CBitSetBase<CBitSetArrayHelper>
 {
+    mutable CriticalSection crit;
     void deserialize(MemoryBuffer &buffer)
     {
         CriticalBlock block(crit);
@@ -391,10 +390,10 @@ class CBitSet : public CBitSetBase<CBitSetArrayHelper>
         }
     }
 public:
-    CBitSet()
+    CBitSetThreadSafe()
     {
     }
-    CBitSet(MemoryBuffer &buffer)
+    CBitSetThreadSafe(MemoryBuffer &buffer)
     {
         deserialize(buffer);
     }
@@ -453,13 +452,13 @@ public:
     }
 };
 
-extern jlib_decl IBitSet *createBitSet()
+extern jlib_decl IBitSet *createThreadSafeBitSet()
 {
-    return new CBitSet();
+    return new CBitSetThreadSafe();
 }
 
 
-class CBitSetThreadUnsafe : public CBitSetBase<CBitSetMemoryHelper>
+class CBitSet : public CBitSetBase<CBitSetMemoryHelper>
 {
     void deserialize(MemoryBuffer &buffer)
     {
@@ -479,11 +478,11 @@ class CBitSetThreadUnsafe : public CBitSetBase<CBitSetMemoryHelper>
         fixedMemory = false;
     }
 public:
-    CBitSetThreadUnsafe()
+    CBitSet()
     {
        // In this form, bitSetUnits and mem will be updated when addBitSet expands mb
     }
-    CBitSetThreadUnsafe(size32_t memSz, const void *_mem, bool reset)
+    CBitSet(size32_t memSz, const void *_mem, bool reset)
     {
         bitSetUnits = memSz*sizeof(byte) / sizeof(bits_t);
         mem = (bits_t *)_mem;
@@ -491,7 +490,7 @@ public:
             memset(mem, 0, bitSetUnits*sizeof(bits_t));
         fixedMemory = true;
     }
-    CBitSetThreadUnsafe(MemoryBuffer &buffer)
+    CBitSet(MemoryBuffer &buffer)
     {
         deserialize(buffer);
     }
@@ -506,26 +505,26 @@ public:
     }
 };
 
-extern jlib_decl IBitSet *createBitSetThreadUnsafe(unsigned maxBits, const void *mem, bool reset)
+extern jlib_decl IBitSet *createBitSet(unsigned maxBits, const void *mem, bool reset)
 {
-    return new CBitSetThreadUnsafe(maxBits, mem, reset);
+    return new CBitSet(maxBits, mem, reset);
 }
 
-extern jlib_decl IBitSet *createBitSetThreadUnsafe()
+extern jlib_decl IBitSet *createBitSet()
 {
-    return new CBitSetThreadUnsafe();
+    return new CBitSet();
 }
 
 
 // NB: Doubt you'd want to interchange, but serialization formats are compatible
-extern jlib_decl IBitSet *deserializeIBitSet(MemoryBuffer &mb)
+extern jlib_decl IBitSet *deserializeThreadSafeBitSet(MemoryBuffer &mb)
 {
-    return new CBitSet(mb);
+    return new CBitSetThreadSafe(mb);
 }
 
-extern jlib_decl IBitSet *deserializeIBitSetThreadUnsafe(MemoryBuffer &mb)
+extern jlib_decl IBitSet *deserializeBitSet(MemoryBuffer &mb)
 {
-    return new CBitSetThreadUnsafe(mb);
+    return new CBitSet(mb);
 }
 
 

+ 10 - 11
system/jlib/jset.hpp

@@ -38,28 +38,27 @@ interface jlib_decl IBitSet : public IInterface
     virtual void serialize(MemoryBuffer &buffer) const = 0;
 };
 
-extern jlib_decl IBitSet *deserializeIBitSet(MemoryBuffer &mb);
-
 // type of underlying bit storage, exposed so thread-unsafe version can know boundaries
 typedef unsigned bits_t;
 enum { BitsPerItem = sizeof(bits_t) * 8 };
 
 
-// returns number of bytes required to represent numBits in memory
-extern jlib_decl size32_t getBitSetMemoryRequirement(unsigned numBits);
-
 // Simple BitSet // 0 based, all intermediate items exist, operations threadsafe and atomic
-extern jlib_decl IBitSet *createBitSet();
+extern jlib_decl IBitSet *createThreadSafeBitSet();
+extern jlib_decl IBitSet *deserializeThreadSafeBitSet(MemoryBuffer &mb);
 
-/* Thread unsafe, but can be significantly faster.
- * Client provide a fixed block of memory used for the bit set, threads must ensure they do not set bits
+/* Not thread safe, but can be significantly faster than createThreadSafeBitSet
+ * Client provides a fixed block of memory used for the bit set, threads must ensure they do not set bits
  * in parallel within the same bits_t space.
  * IOW, e.g. bits 0-sizeof(bits_t) must be set from only 1 thread at a time.
  */
-extern jlib_decl IBitSet *createBitSetThreadUnsafe(size32_t memSize, const void *mem, bool reset=true);
+extern jlib_decl IBitSet *createBitSet(size32_t memSize, const void *mem, bool reset=true);
+// This form allows the size of the bit set to be dynamic. No guarantees about threading.
+extern jlib_decl IBitSet *createBitSet();
+extern jlib_decl IBitSet *deserializeBitSet(MemoryBuffer &mb);
+// returns number of bytes required to represent numBits in memory
+extern jlib_decl size32_t getBitSetMemoryRequirement(unsigned numBits);
 
-// This form allows the size of the bit set to be dynamic, but there are no guarantees about threading
-extern jlib_decl IBitSet *createBitSetThreadUnsafe();
 
 
 

+ 9 - 9
testing/unittests/jlibtests.cpp

@@ -140,26 +140,26 @@ protected:
         const unsigned numBits = 400;
         for (unsigned pass=0; pass < 10000; pass++)
         {
-            Owned<IBitSet> bs = createBitSet();
+            Owned<IBitSet> bs = createThreadSafeBitSet();
             testSet1(initial, bs, 0, numBits, setValue, clearValue);
         }
         unsigned elapsed = msTick()-now;
-        now = msTick();
         fprintf(stdout, "Bit test (%u) time taken = %dms\n", initial, elapsed);
+        now = msTick();
         for (unsigned pass=0; pass < 10000; pass++)
         {
-            Owned<IBitSet> bs = createBitSetThreadUnsafe();
+            Owned<IBitSet> bs = createBitSet();
             testSet1(initial, bs, 0, numBits, setValue, clearValue);
         }
         elapsed = msTick()-now;
-        now = msTick();
         fprintf(stdout, "Bit test [thread-unsafe version] (%u) time taken = %dms\n", initial, elapsed);
-        size32_t bitSetMemSz = getBitSetMemoryRequirement(400);
+        now = msTick();
+        size32_t bitSetMemSz = getBitSetMemoryRequirement(numBits+5);
         MemoryBuffer mb;
         void *mem = mb.reserveTruncate(bitSetMemSz);
         for (unsigned pass=0; pass < 10000; pass++)
         {
-            Owned<IBitSet> bs = createBitSetThreadUnsafe(bitSetMemSz, mem);
+            Owned<IBitSet> bs = createBitSet(bitSetMemSz, mem);
             testSet1(initial, bs, 0, numBits, setValue, clearValue);
         }
         elapsed = msTick()-now;
@@ -279,19 +279,19 @@ protected:
         unsigned numBits = 1000000; // 10M
         unsigned nThreads = getAffinityCpus();
         unsigned bitsPerThread = numBits/nThreads;
-        bitsPerThread = (bitsPerThread + (BitsPerItem-1)) / BitsPerItem * BitsPerItem; // round up to multiple of BitsPerItem
+        bitsPerThread = ((bitsPerThread + (BitsPerItem-1)) / BitsPerItem) * BitsPerItem; // round up to multiple of BitsPerItem
         numBits = bitsPerThread*nThreads; // round
 
         fprintf(stdout, "testSetParallel, testing bit set of size : %d, nThreads=%d\n", numBits, nThreads);
 
-        Owned<IBitSet> bitSet = createBitSet();
+        Owned<IBitSet> bitSet = createThreadSafeBitSet();
         unsigned took = testParallelRun(*bitSet, nThreads, bitsPerThread, initial);
         fprintf(stdout, "Thread safe parallel bit set test (%u) time taken = %dms\n", initial, took);
 
         size32_t bitSetMemSz = getBitSetMemoryRequirement(numBits);
         MemoryBuffer mb;
         void *mem = mb.reserveTruncate(bitSetMemSz);
-        bitSet.setown(createBitSetThreadUnsafe(bitSetMemSz, mem));
+        bitSet.setown(createBitSet(bitSetMemSz, mem));
         took = testParallelRun(*bitSet, nThreads, bitsPerThread, initial);
         fprintf(stdout, "Thread unsafe parallel bit set test (%u) time taken = %dms\n", initial, took);
     }

+ 2 - 2
thorlcr/activities/csvread/thcsvrslave.cpp

@@ -341,8 +341,8 @@ public:
                     localLastPart[subFile] = pnum;
             }
             headerLinesRemaining.allocateN(subFiles);
-            gotHeaderLines.setown(createBitSet());
-            sentHeaderLines.setown(createBitSet());
+            gotHeaderLines.setown(createThreadSafeBitSet());
+            sentHeaderLines.setown(createThreadSafeBitSet());
         }
         partHandler.setown(new CCsvPartHandler(*this));
         appendOutputLinked(this);

+ 2 - 2
thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp

@@ -628,7 +628,7 @@ class CKeyedJoinSlave : public CSlaveActivity, public CThorDataLink, implements
                 try
                 {
                     unsigned endRequestsCount = owner.container.queryJob().querySlaves();
-                    Owned<IBitSet> endRequests = createBitSet(); // NB: verification only
+                    Owned<IBitSet> endRequests = createThreadSafeBitSet(); // NB: verification only
                     while (!aborted)
                     {
                         rank_t sender;
@@ -735,7 +735,7 @@ class CKeyedJoinSlave : public CSlaveActivity, public CThorDataLink, implements
                     rank_t sender;
                     CMessageBuffer msg;
                     unsigned endRequestsCount = owner.container.queryJob().querySlaves();
-                    Owned<IBitSet> endRequests = createBitSet(); // NB: verification only
+                    Owned<IBitSet> endRequests = createThreadSafeBitSet(); // NB: verification only
 
                     Owned<IRowInterfaces> fetchDiskRowIf = createRowInterfaces(owner.helper->queryDiskRecordSize(),owner.queryActivityId(),owner.queryCodeContext());
                     while (!aborted)

+ 5 - 5
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -363,8 +363,8 @@ public:
         allDone = allDoneWaiting = allRequestStop = stopping = stopRecv = false;
         myNode = activity.queryJob().queryMyRank();
         slaves = activity.queryJob().querySlaves();
-        slavesDone.setown(createBitSet());
-        slavesStopping.setown(createBitSet());
+        slavesDone.setown(createThreadSafeBitSet());
+        slavesStopping.setown(createThreadSafeBitSet());
         mpTag = TAG_NULL;
         recvInterface = NULL;
     }
@@ -562,7 +562,7 @@ public:
         if (threadSafeBitSet)
         {
             DBGLOG("Using Thread safe variety of IBitSet");
-            bitSet.setown(createBitSet());
+            bitSet.setown(createThreadSafeBitSet());
         }
         else
         {
@@ -572,7 +572,7 @@ public:
                 return false;
 
             bitSetMem.setown(pBitSetMem);
-            bitSet.setown(createBitSetThreadUnsafe(bitSetMemSz, pBitSetMem));
+            bitSet.setown(createBitSet(bitSetMemSz, pBitSetMem));
         }
         return true;
     }
@@ -607,7 +607,7 @@ public:
                 threadCount = rowCount / chunkSize;
             }
             // Must be multiple of sizeof BitsPerItem
-            chunkSize = (chunkSize + (BitsPerItem-1)) / BitsPerItem * BitsPerItem; // round up to nearest multiple of BitsPerItem
+            chunkSize = ((chunkSize + (BitsPerItem-1)) / BitsPerItem) * BitsPerItem; // round up to nearest multiple of BitsPerItem
 
             /* This is yet another case of requiring a set of small worker threads
              * Thor should really use a common pool of lightweight threadlets made available to all

+ 1 - 1
thorlcr/activities/loop/thloop.cpp

@@ -164,7 +164,7 @@ class CLoopActivityMaster : public CLoopActivityMasterBase
         // similar to sync, but continiously listens for messages from slaves
         // slave only sends if above threashold, or if was at threshold and non empty
         // this routine is here to spot when all are whirling around processing nothing for > threshold
-        Owned<IBitSet> emptyIterations = createBitSet();
+        Owned<IBitSet> emptyIterations = createThreadSafeBitSet();
         unsigned loopEnds = 0;
         unsigned nodes = container.queryJob().querySlaves();
         unsigned n = nodes;

+ 2 - 2
thorlcr/graph/thgraph.cpp

@@ -371,11 +371,11 @@ CGraphElementBase::CGraphElementBase(CGraphBase &_owner, IPropertyTree &_xgmml)
         throw makeOsExceptionV(GetLastError(), "Failed to load helper factory method: %s (dll handle = %p)", helperName.str(), queryJob().queryDllEntry().getInstance());
     alreadyUpdated = false;
     whichBranch = (unsigned)-1;
-    whichBranchBitSet.setown(createBitSet());
+    whichBranchBitSet.setown(createThreadSafeBitSet());
     newWhichBranch = false;
     isEof = false;
     log = true;
-    sentActInitData.setown(createBitSet());
+    sentActInitData.setown(createThreadSafeBitSet());
 }
 
 CGraphElementBase::~CGraphElementBase()

+ 3 - 3
thorlcr/graph/thgraphmaster.cpp

@@ -334,7 +334,7 @@ void CSlaveMessageHandler::main()
 
 CMasterActivity::CMasterActivity(CGraphElementBase *_container) : CActivityBase(_container), threaded("CMasterActivity", this)
 {
-    notedWarnings = createBitSet();
+    notedWarnings = createThreadSafeBitSet();
     mpTag = TAG_NULL;
     data = new MemoryBuffer[container.queryJob().querySlaves()];
     asyncStart = false;
@@ -652,7 +652,7 @@ public:
         unsigned s=comm->queryGroup().ordinality()-1;
         bool aborted = false;
         CMessageBuffer msg;
-        Owned<IBitSet> raisedSet = createBitSet();
+        Owned<IBitSet> raisedSet = createThreadSafeBitSet();
         unsigned remaining = timeout;
         while (s--)
         {
@@ -1345,7 +1345,7 @@ void CJobMaster::broadcastToSlaves(CMessageBuffer &msg, mptag_t mptag, unsigned
     }
     if (sendOnly) return;
     unsigned respondents = 0;
-    Owned<IBitSet> bitSet = createBitSet();
+    Owned<IBitSet> bitSet = createThreadSafeBitSet();
     loop
     {
         rank_t sender;

+ 2 - 2
thorlcr/master/thmastermain.cpp

@@ -137,7 +137,7 @@ public:
 
     CRegistryServer()  : deregistrationWatch(*this), stopped(false)
     {
-        status = createBitSet();
+        status = createThreadSafeBitSet();
         msgDelay = SLAVEREG_VERIFY_DELAY;
         slavesRegistered = 0;
         if (globals->getPropBool("@watchdogEnabled"))
@@ -201,7 +201,7 @@ public:
         unsigned timeWaited = 0;
         unsigned connected = 0;
         unsigned slaves = queryClusterWidth();
-        Owned<IBitSet> connectedSet = createBitSet();
+        Owned<IBitSet> connectedSet = createThreadSafeBitSet();
         loop
         {
             CTimeMon tm(msgDelay);

+ 1 - 1
thorlcr/thorutil/thbuf.cpp

@@ -263,7 +263,7 @@ public:
         numblocks = 0;
         insz = 0;
         eoi = false;
-        diskfree.setown(createBitSet()); 
+        diskfree.setown(createThreadSafeBitSet()); 
 
 #ifdef _FULL_TRACE
         ActPrintLog(activity, "SmartBuffer create %x",(unsigned)(memsize_t)this);

+ 1 - 1
thorlcr/thorutil/thorport.cpp

@@ -43,7 +43,7 @@ static IBitSet *portmap;
 MODULE_INIT(INIT_PRIORITY_STANDARD)
 {
     portallocsection = new CriticalSection;
-    portmap = createBitSet();
+    portmap = createThreadSafeBitSet();
     portmap->set(MPPORT, true);
     portmap->set(WATCHDOGPORT, true);
     return true;