Browse Source

Merge pull request #6721 from jakesmith/hpcc-12662

HPCC-12662 Add single threaded version of IBitSet

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 10 years ago
parent
commit
7f95004766

+ 1 - 1
common/remote/sockfile.cpp

@@ -773,7 +773,7 @@ struct CTreeCopyItem: public CInterface
         loc.append(orig);
         loc.append(orig);
         dt.set(_dt);
         dt.set(_dt);
         sz = _sz;
         sz = _sz;
-        busy.setown(createBitSet());
+        busy.setown(createThreadSafeBitSet());
         lastused = msTick();
         lastused = msTick();
     }
     }
     bool equals(const RemoteFilename &orig, const char *_net, const char *_mask, offset_t _sz, CDateTime &_dt) 
     bool equals(const RemoteFilename &orig, const char *_net, const char *_mask, offset_t _sz, CDateTime &_dt) 

+ 1 - 1
common/thorhelper/thorstep.cpp

@@ -494,7 +494,7 @@ CFilteredInputBuffer::CFilteredInputBuffer(IEngineRowAllocator * _allocator, IRa
     stepCompare = _stepCompare;
     stepCompare = _stepCompare;
     equalCompare = _equalCompare;
     equalCompare = _equalCompare;
     input = _input;
     input = _input;
-    matched.setown(createBitSet());
+    matched.setown(createThreadSafeBitSet());
     numMatched = 0;
     numMatched = 0;
     readIndex = 0;
     readIndex = 0;
     numEqualFields = _numEqualFields;
     numEqualFields = _numEqualFields;

+ 1 - 1
common/thorhelper/thorstep.ipp

@@ -824,7 +824,7 @@ public:
     const void * nextUnqueued();
     const void * nextUnqueued();
     bool ensureNonEmpty();
     bool ensureNonEmpty();
     bool flushUnmatched();
     bool flushUnmatched();
-    void trackUnmatched() { matchedLeft.setown(createBitSet()); }
+    void trackUnmatched() { matchedLeft.setown(createThreadSafeBitSet()); }
 
 
     inline void consumeNextInput() { rows.enqueue(input->consume()); }
     inline void consumeNextInput() { rows.enqueue(input->consume()); }
 
 

+ 1 - 1
dali/base/dautils.cpp

@@ -1936,7 +1936,7 @@ public:
     CPECacheElem(const char *owner, ISortedElementsTreeFilter *_postFilter)
     CPECacheElem(const char *owner, ISortedElementsTreeFilter *_postFilter)
         : CTimedCacheItem(owner), postFilter(_postFilter), postFiltered(0)
         : CTimedCacheItem(owner), postFilter(_postFilter), postFiltered(0)
     {
     {
-        passesFilter.setown(createBitSet());
+        passesFilter.setown(createThreadSafeBitSet());
     }
     }
     ~CPECacheElem()
     ~CPECacheElem()
     {
     {

+ 1 - 1
dali/sasha/saxref.cpp

@@ -1762,7 +1762,7 @@ public:
                 unsigned numsub = file.getPropInt("@numsubfiles");
                 unsigned numsub = file.getPropInt("@numsubfiles");
                 unsigned n = 0;
                 unsigned n = 0;
                 Owned<IPropertyTreeIterator> iter = file.getElements("SubFile");
                 Owned<IPropertyTreeIterator> iter = file.getElements("SubFile");
-                Owned<IBitSet> parts = createBitSet();
+                Owned<IBitSet> parts = createThreadSafeBitSet();
                 StringArray subname;
                 StringArray subname;
                 ForEach(*iter) {
                 ForEach(*iter) {
                     IPropertyTree &sfile = iter->query();
                     IPropertyTree &sfile = iter->query();

+ 1 - 1
ecl/hql/hqlusage.cpp

@@ -247,7 +247,7 @@ FieldAccessAnalyser::FieldAccessAnalyser(IHqlExpression * _selector) : NewHqlTra
 {
 {
     unwindFields(fields, selector->queryRecord());
     unwindFields(fields, selector->queryRecord());
     numAccessed = 0;
     numAccessed = 0;
-    accessed.setown(createBitSet());
+    accessed.setown(createThreadSafeBitSet());
 }
 }
 
 
 IHqlExpression * FieldAccessAnalyser::queryLastFieldAccessed() const
 IHqlExpression * FieldAccessAnalyser::queryLastFieldAccessed() const

+ 350 - 152
system/jlib/jset.cpp

@@ -23,121 +23,71 @@
 
 
 //-----------------------------------------------------------------------
 //-----------------------------------------------------------------------
 
 
-// Simple BitSet // 0 based all, intermediate items exist, operations threadsafe and atomic
-
-class CBitSet : public CInterface, implements IBitSet
+// NB: The CBitSet*Helper's are primarily avoid the need for virtuals in the implementations
+class CBitSetArrayHelper
 {
 {
-public:
-    IMPLEMENT_IINTERFACE;
 protected:
 protected:
-    //unsigned seems to be most efficient, and required for __builtin_ffs below
-    typedef unsigned bits_t;
-    enum { BitsPerItem = sizeof(bits_t) * 8 };
     ArrayOf<bits_t> bits;
     ArrayOf<bits_t> bits;
-    mutable CriticalSection crit;
 
 
-public:
-    CBitSet() { }
-    CBitSet(MemoryBuffer &buffer)
+    inline bits_t getBits(unsigned i) const { return bits.item(i); }
+    inline void setBits(unsigned i, bits_t m)
     {
     {
-        deserialize(buffer);
+        bits.replace(m, i);
     }
     }
-    void set(unsigned n,bool val) 
+    inline void addBitSet(bits_t m)
     {
     {
-        bits_t t=((bits_t)1)<<(n%BitsPerItem);
-        unsigned i=n/BitsPerItem;
-        CriticalBlock block(crit);
-        if (i>=bits.ordinality()) {
-            if (!val)
-                return; // don't bother
-            while (i>bits.ordinality())
-                bits.append(0);
-            bits.append(t);
-        }
-        else {
-            bits_t m=bits.item(i);
-            if (val)
-                m |= t;
-            else 
-                m &= ~t;
-            bits.replace(m,i);
-        }
+        bits.append(m);
     }
     }
-        
-    bool invert(unsigned n) 
-    {
-        bits_t t=((bits_t)1)<<(n%BitsPerItem);
-        unsigned i=n/BitsPerItem;
-        CriticalBlock block(crit);
-        bool ret;
-        if (i>=bits.ordinality()) {
-            while (i>bits.ordinality())
-                bits.append(0);
-            bits.append(t);
-            ret = true;
-        }
-        else {
-            bits_t m=bits.item(i);
-            ret = ((m&t)==0);
-            if (ret)
-                m |= t;
-            else 
-                m &= ~t;
-            bits.replace(m,i);
-        }
-        return ret;
-    }
-        
-    bool test(unsigned n) 
+    inline unsigned getWidth() const { return bits.ordinality(); }
+};
+
+class CBitSetMemoryHelper
+{
+protected:
+    bits_t *mem;
+    unsigned bitSetUnits;
+    MemoryBuffer mb; // Used if mem not provided, also implies expansion allowed
+    bool fixedMemory;
+
+    CBitSetMemoryHelper()
     {
     {
-        bits_t t=((bits_t)1)<<(n%BitsPerItem);
-        unsigned i=n/BitsPerItem;
-        CriticalBlock block(crit);
-        if (i<bits.ordinality()) {
-            bits_t m=bits.item(i);
-            if (m&t)
-                return true;
-        }
-        return false;
+        fixedMemory = false;
+        bitSetUnits = 0;
+        mem = NULL;
     }
     }
-        
-    bool testSet(unsigned n,bool val) 
+    inline bits_t getBits(unsigned i) const { return mem[i]; }
+    inline void setBits(unsigned i, bits_t m) { mem[i] = m; }
+    inline void addBitSet(bits_t m)
     {
     {
-        bits_t t=((bits_t)1)<<(n%BitsPerItem);
-        unsigned i=n/BitsPerItem;
-        CriticalBlock block(crit);
-        bool ret;
-        if (i>=bits.ordinality()) {
-            ret = false;
-            if (!val)
-                return false; // don't bother
-            while (i>bits.ordinality())
-                bits.append(0);
-            bits.append(t);
-        }
-        else {
-            bits_t m=bits.item(i);
-            ret = (m&t)!=0;
-            if (val)
-                m |= t;
-            else 
-                m &= ~t;
-            bits.replace(m,i);
-        }
-        return ret;
+        if (fixedMemory)
+            throw MakeStringException(-1, "CBitSet with fixed mem cannot expand");
+        mb.append(m);
+        mem = (bits_t *)mb.bufferBase();
+        ++bitSetUnits;
     }
     }
+    inline unsigned getWidth() const { return bitSetUnits; }
+};
+
+template <class BITSETHELPER>
+class CBitSetBase : public BITSETHELPER, public CSimpleInterfaceOf<IBitSet>
+{
+protected:
+    typedef BITSETHELPER PARENT;
+    using PARENT::getWidth;
+    using PARENT::getBits;
+    using PARENT::setBits;
+    using PARENT::addBitSet;
 
 
-    unsigned _scan(unsigned from,bool tst,bool scninv)
+    unsigned _scan(unsigned from, bool tst, bool scninv)
     {
     {
         bits_t noMatchMask=tst?0:(bits_t)-1;
         bits_t noMatchMask=tst?0:(bits_t)-1;
         unsigned j=from%BitsPerItem;
         unsigned j=from%BitsPerItem;
-        CriticalBlock block(crit);
         // returns index of first = val >= from
         // returns index of first = val >= from
-        unsigned n=bits.ordinality();
+        unsigned n=getWidth();
         unsigned i;
         unsigned i;
         for (i=from/BitsPerItem;i<n;i++)
         for (i=from/BitsPerItem;i<n;i++)
         {
         {
-            bits_t m=bits.item(i);
+            bits_t m = getBits(i);
             if (m!=noMatchMask)
             if (m!=noMatchMask)
             {
             {
 #if defined(__GNUC__)
 #if defined(__GNUC__)
@@ -170,7 +120,7 @@ public:
                     {
                     {
                         bits_t t = ((bits_t)1)<<pos;
                         bits_t t = ((bits_t)1)<<pos;
                         m &= ~t;
                         m &= ~t;
-                        bits.replace(m,i);
+                        setBits(i, m);
                     }
                     }
                     return i*BitsPerItem+pos;
                     return i*BitsPerItem+pos;
                 }
                 }
@@ -182,7 +132,7 @@ public:
                     {
                     {
                         bits_t t = ((bits_t)1)<<pos;
                         bits_t t = ((bits_t)1)<<pos;
                         m |= t;
                         m |= t;
-                        bits.replace(m,i);
+                        setBits(i, m);
                     }
                     }
                     return i*BitsPerItem+pos;
                     return i*BitsPerItem+pos;
                 }
                 }
@@ -197,7 +147,7 @@ public:
                             if (scninv)
                             if (scninv)
                             {
                             {
                                 m &= ~t;
                                 m &= ~t;
-                                bits.replace(m,i);
+                                setBits(i, m);
                             }
                             }
                             return i*BitsPerItem+j;
                             return i*BitsPerItem+j;
                         }
                         }
@@ -209,7 +159,7 @@ public:
                             if (scninv)
                             if (scninv)
                             {
                             {
                                 m |= t;
                                 m |= t;
-                                bits.replace(m,i);
+                                setBitSet(i, m);
                             }
                             }
                             return i*BitsPerItem+j;
                             return i*BitsPerItem+j;
                         }
                         }
@@ -220,7 +170,7 @@ public:
             }
             }
             j = 0;
             j = 0;
         }
         }
-        if (tst) 
+        if (tst)
             return (unsigned)-1;
             return (unsigned)-1;
         unsigned ret = n*BitsPerItem;
         unsigned ret = n*BitsPerItem;
         if (n*BitsPerItem<from)
         if (n*BitsPerItem<from)
@@ -229,93 +179,215 @@ public:
             set(ret,true);
             set(ret,true);
         return ret;
         return ret;
     }
     }
-
-    unsigned scan(unsigned from,bool tst)
-    {
-        return _scan(from,tst,false);
-    }
-
-    unsigned scanInvert(unsigned from,bool tst) // like scan but inverts bit as well
-    {
-        return _scan(from,tst,true);
-    }
-
-    void _incl(unsigned lo, unsigned hi,bool val)
+    void _incl(unsigned lo, unsigned hi, bool val)
     {
     {
         if (hi<lo)
         if (hi<lo)
             return;
             return;
         unsigned j=lo%BitsPerItem;
         unsigned j=lo%BitsPerItem;
         unsigned nb=(hi-lo)+1;
         unsigned nb=(hi-lo)+1;
-        CriticalBlock block(crit);
-        unsigned n=bits.ordinality();
-        unsigned i;
-        for (i=lo/BitsPerItem;i<n;i++) {
-            bits_t m;
-            if ((nb>=BitsPerItem)&&(j==0)) {
-                m = i;
-                nb -= BitsPerItem;
+        unsigned n=getWidth();
+        unsigned i=lo/BitsPerItem;
+        if (n<=i)
+        {
+            if (!val)
+                return;
+            while (n < i)
+            {
+                addBitSet(0);
+                ++n;
             }
             }
-            else {
-                m=bits.item(i);
+            if (j>0)
+            {
+                bits_t m = 0;
                 bits_t t = ((bits_t)1)<<j;
                 bits_t t = ((bits_t)1)<<j;
-                for (;j<BitsPerItem;j++) {
-                    if (val)
-                        m |= t;
-                    else
-                        m &= ~t;
+                for (;j<BitsPerItem;j++)
+                {
+                    m |= t;
                     if (--nb==0)
                     if (--nb==0)
                         break;
                         break;
                     t <<= 1;
                     t <<= 1;
                 }
                 }
+                addBitSet(m);
             }
             }
-            bits.replace(m,i);
             if (nb==0)
             if (nb==0)
                 return;
                 return;
             j = 0;
             j = 0;
         }
         }
-        if (val) {
-            while (nb>=BitsPerItem) {
-                bits.append((bits_t)-1);
-                nb-=BitsPerItem;
+        else
+        {
+            for (;i<n;i++)
+            {
+                bits_t m;
+                if ((nb>=BitsPerItem)&&(j==0))
+                {
+                    if (val)
+                        m = (bits_t)-1;
+                    else
+                        m = 0;
+                    nb -= BitsPerItem;
+                }
+                else
+                {
+                    m = getBits(i);
+                    bits_t t = ((bits_t)1)<<j;
+                    for (;j<BitsPerItem;j++)
+                    {
+                        if (val)
+                            m |= t;
+                        else
+                            m &= ~t;
+                        if (--nb==0)
+                            break;
+                        t <<= 1;
+                    }
+                }
+                setBits(i, m);
+                if (nb==0)
+                    return;
+                j = 0;
+            }
+        }
+        if (val)
+        {
+            while (nb>=BitsPerItem)
+            {
+                addBitSet((bits_t)-1);
+                nb -= BitsPerItem;
             }
             }
-            if (nb>0) {
+            if (nb>0)
+            {
                 bits_t m=0;
                 bits_t m=0;
                 bits_t t = ((bits_t)1)<<j;
                 bits_t t = ((bits_t)1)<<j;
-                for (;j<BitsPerItem;j++) {
+                for (;j<BitsPerItem;j++)
+                {
                     m |= t;
                     m |= t;
                     if (--nb==0)
                     if (--nb==0)
                         break;
                         break;
                     t <<= 1;
                     t <<= 1;
                 }
                 }
-                bits.append(m);
+                addBitSet(m);
             }
             }
         }
         }
     }
     }
-
-    void incl(unsigned lo, unsigned hi)
+public:
+// IBitSet impl.
+    virtual void set(unsigned n, bool val)
     {
     {
-        _incl(lo,hi,true);
+        bits_t t=((bits_t)1)<<(n%BitsPerItem);
+        unsigned i = n/BitsPerItem;
+        if (i>=getWidth())
+        {
+            if (!val)
+                return; // don't bother
+            while (i>getWidth())
+                addBitSet(0);
+            addBitSet(t);
+        }
+        else
+        {
+            bits_t m = getBits(i);
+            if (val)
+                m |= t;
+            else
+                m &= ~t;
+            setBits(i, m);
+        }
     }
     }
-
-    void excl(unsigned lo, unsigned hi)
+    virtual bool invert(unsigned n)
     {
     {
-        _incl(lo,hi,false);
+        bits_t t=((bits_t)1)<<(n%BitsPerItem);
+        unsigned i=n/BitsPerItem;
+        bool ret;
+        if (i>=getWidth())
+        {
+            while (i>getWidth())
+                addBitSet(0);
+            addBitSet(t);
+            ret = true;
+        }
+        else
+        {
+            bits_t m = getBits(i);
+            ret = 0 == (m&t);
+            if (ret)
+                m |= t;
+            else
+                m &= ~t;
+            setBits(i, m);
+        }
+        return ret;
     }
     }
-
-    void reset()
+    virtual bool test(unsigned n)
     {
     {
-        CriticalBlock block(crit);
-        bits.kill();
+        bits_t t=((bits_t)1)<<(n%BitsPerItem);
+        unsigned i=n/BitsPerItem;
+        if (i<getWidth())
+        {
+            bits_t m = getBits(i);
+            if (m&t)
+                return true;
+        }
+        return false;
     }
     }
-
-    void serialize(MemoryBuffer &buffer) const
+    virtual bool testSet(unsigned n, bool val)
     {
     {
-        CriticalBlock block(crit);
-        buffer.append(bits.ordinality());
-        ForEachItemIn(b, bits)
-            buffer.append(bits.item(b));
+        bits_t t=((bits_t)1)<<(n%BitsPerItem);
+        unsigned i=n/BitsPerItem;
+        if (i>=getWidth())
+        {
+            if (val)
+            {
+                while (i>getWidth())
+                    addBitSet(0);
+                addBitSet(t);
+            }
+            return false;
+        }
+        else
+        {
+            bits_t m = getBits(i);
+            if (m&t)
+            {
+                if (!val)
+                    setBits(i, m & ~t);
+                return true;
+            }
+            else
+            {
+                if (val)
+                    setBits(i, m | t);
+                return false;
+            }
+        }
     }
     }
+    virtual unsigned scan(unsigned from,bool tst)
+    {
+        return _scan(from,tst,false);
+    }
+    virtual unsigned scanInvert(unsigned from,bool tst) // like scan but inverts bit as well
+    {
+        return _scan(from,tst,true);
+    }
+    virtual void incl(unsigned lo, unsigned hi)
+    {
+        _incl(lo,hi,true);
+    }
+    virtual void excl(unsigned lo, unsigned hi)
+    {
+        _incl(lo,hi,false);
+    }
+};
+
+size32_t getBitSetMemoryRequirement(unsigned numBits)
+{
+    unsigned bitSetUnits = (numBits + (BitsPerItem-1)) / BitsPerItem;
+    return bitSetUnits * sizeof(bits_t);
+}
 
 
+// Simple BitSet // 0 based all, intermediate items exist, operations threadsafe and atomic
+class CBitSetThreadSafe : public CBitSetBase<CBitSetArrayHelper>
+{
+    mutable CriticalSection crit;
     void deserialize(MemoryBuffer &buffer)
     void deserialize(MemoryBuffer &buffer)
     {
     {
         CriticalBlock block(crit);
         CriticalBlock block(crit);
@@ -333,14 +405,140 @@ public:
             }
             }
         }
         }
     }
     }
+public:
+    CBitSetThreadSafe()
+    {
+    }
+    CBitSetThreadSafe(MemoryBuffer &buffer)
+    {
+        deserialize(buffer);
+    }
+// IBitSet overloads
+    virtual void set(unsigned n, bool val)
+    {
+        CriticalBlock block(crit);
+        CBitSetBase::set(n, val);
+    }
+    virtual bool invert(unsigned n)
+    {
+        CriticalBlock block(crit);
+        return CBitSetBase::invert(n);
+    }
+    virtual bool test(unsigned n)
+    {
+        CriticalBlock block(crit);
+        return CBitSetBase::test(n);
+    }
+    virtual bool testSet(unsigned n, bool val)
+    {
+        CriticalBlock block(crit);
+        return CBitSetBase::testSet(n, val);
+    }
+    virtual unsigned scan(unsigned from, bool tst)
+    {
+        CriticalBlock block(crit);
+        return _scan(from,tst,false);
+    }
+    virtual unsigned scanInvert(unsigned from, bool tst) // like scan but inverts bit as well
+    {
+        CriticalBlock block(crit);
+        return _scan(from,tst,true);
+    }
+    virtual void incl(unsigned lo, unsigned hi)
+    {
+        CriticalBlock block(crit);
+        _incl(lo,hi,true);
+    }
+    virtual void excl(unsigned lo, unsigned hi)
+    {
+        CriticalBlock block(crit);
+        _incl(lo,hi,false);
+    }
+    virtual void reset()
+    {
+        CriticalBlock block(crit);
+        bits.kill();
+    }
+    virtual void serialize(MemoryBuffer &buffer) const
+    {
+        CriticalBlock block(crit);
+        buffer.append(bits.ordinality());
+        ForEachItemIn(b, bits)
+            buffer.append(bits.item(b));
+    }
+};
+
+extern jlib_decl IBitSet *createThreadSafeBitSet()
+{
+    return new CBitSetThreadSafe();
+}
+
+
+class CBitSet : public CBitSetBase<CBitSetMemoryHelper>
+{
+    void deserialize(MemoryBuffer &buffer)
+    {
+        unsigned count;
+        buffer.read(count);
+        if (count)
+        {
+            unsigned bitSets = count/BitsPerItem;
+            bitSetUnits = bitSets;
+            mem = (bits_t *)mb.reserveTruncate(bitSets*sizeof(bits_t));
+        }
+        else
+        {
+            bitSetUnits = 0;
+            mem = NULL;
+        }
+        fixedMemory = false;
+    }
+public:
+    CBitSet()
+    {
+       // In this form, bitSetUnits and mem will be updated when addBitSet expands mb
+    }
+    CBitSet(size32_t memSz, const void *_mem, bool reset)
+    {
+        bitSetUnits = memSz*sizeof(byte) / sizeof(bits_t);
+        mem = (bits_t *)_mem;
+        if (reset)
+            memset(mem, 0, bitSetUnits*sizeof(bits_t));
+        fixedMemory = true;
+    }
+    CBitSet(MemoryBuffer &buffer)
+    {
+        deserialize(buffer);
+    }
+    virtual void reset()
+    {
+        memset(mem, 0, sizeof(bits_t)*bitSetUnits);
+    }
+    virtual void serialize(MemoryBuffer &buffer) const
+    {
+        buffer.append((unsigned)(BitsPerItem*bitSetUnits));
+        buffer.append(bitSetUnits*sizeof(bits_t), mem);
+    }
 };
 };
 
 
+extern jlib_decl IBitSet *createBitSet(unsigned maxBits, const void *mem, bool reset)
+{
+    return new CBitSet(maxBits, mem, reset);
+}
+
 extern jlib_decl IBitSet *createBitSet()
 extern jlib_decl IBitSet *createBitSet()
 {
 {
     return new CBitSet();
     return new CBitSet();
 }
 }
 
 
-extern jlib_decl IBitSet *deserializeIBitSet(MemoryBuffer &mb)
+
+// NB: Doubt you'd want to interchange, but serialization formats are compatible
+extern jlib_decl IBitSet *deserializeThreadSafeBitSet(MemoryBuffer &mb)
+{
+    return new CBitSetThreadSafe(mb);
+}
+
+extern jlib_decl IBitSet *deserializeBitSet(MemoryBuffer &mb)
 {
 {
     return new CBitSet(mb);
     return new CBitSet(mb);
 }
 }

+ 18 - 2
system/jlib/jset.hpp

@@ -38,10 +38,26 @@ interface jlib_decl IBitSet : public IInterface
     virtual void serialize(MemoryBuffer &buffer) const = 0;
     virtual void serialize(MemoryBuffer &buffer) const = 0;
 };
 };
 
 
-extern jlib_decl IBitSet *deserializeIBitSet(MemoryBuffer &mb);
+// type of underlying bit storage, exposed so thread-unsafe version can know boundaries
+typedef unsigned bits_t;
+enum { BitsPerItem = sizeof(bits_t) * 8 };
+
 
 
 // Simple BitSet // 0 based, all intermediate items exist, operations threadsafe and atomic
 // Simple BitSet // 0 based, all intermediate items exist, operations threadsafe and atomic
-extern jlib_decl IBitSet *createBitSet(); 
+extern jlib_decl IBitSet *createThreadSafeBitSet();
+extern jlib_decl IBitSet *deserializeThreadSafeBitSet(MemoryBuffer &mb);
+
+/* Not thread safe, but can be significantly faster than createThreadSafeBitSet
+ * Client provides a fixed block of memory used for the bit set, threads must ensure they do not set bits
+ * in parallel within the same bits_t space.
+ * IOW, e.g. bits 0-sizeof(bits_t) must be set from only 1 thread at a time.
+ */
+extern jlib_decl IBitSet *createBitSet(size32_t memSize, const void *mem, bool reset=true);
+// This form allows the size of the bit set to be dynamic. No guarantees about threading.
+extern jlib_decl IBitSet *createBitSet();
+extern jlib_decl IBitSet *deserializeBitSet(MemoryBuffer &mb);
+// returns number of bytes required to represent numBits in memory
+extern jlib_decl size32_t getBitSetMemoryRequirement(unsigned numBits);
 
 
 
 
 
 

+ 202 - 33
testing/unittests/jlibtests.cpp

@@ -95,56 +95,225 @@ public:
 
 
 protected:
 protected:
 
 
+    void testSet1(bool initial, IBitSet *bs, unsigned start, unsigned numBits, bool setValue, bool clearValue)
+    {
+        unsigned end = start+numBits;
+        if (initial)
+            bs->incl(start, end);
+        for (unsigned i=start; i < end; i++)
+        {
+            ASSERT(bs->test(i) == clearValue);
+            bs->set(i, setValue);
+            ASSERT(bs->test(i) == setValue);
+
+            bs->set(i+5, setValue);
+            ASSERT(bs->scan(0, setValue) == i);
+            ASSERT(bs->scan(i+1, setValue) == i+5);
+            bs->set(i, clearValue);
+            bs->set(i+5, clearValue);
+            unsigned match1 = bs->scan(0, setValue);
+            ASSERT(match1 == initial ? -1 : end);
+
+            bs->invert(i);
+            ASSERT(bs->test(i) == setValue);
+            bs->invert(i);
+            ASSERT(bs->test(i) == clearValue);
+
+            bool wasSet = bs->testSet(i, setValue);
+            ASSERT(wasSet == clearValue);
+            bool wasSet2 = bs->testSet(i, clearValue);
+            ASSERT(wasSet2 == setValue);
+            ASSERT(bs->test(i) == clearValue);
+
+            bs->set(i, setValue);
+            unsigned match = bs->scanInvert(0, setValue);
+            ASSERT(match == i);
+            ASSERT(bs->test(i) == clearValue);
+        }
+        bs->reset();
+        if (initial)
+        {
+            bs->incl(start, end);
+            bs->excl(start+5, end-5);
+        }
+        else
+            bs->incl(start+5, end-5);
+        unsigned inclStart = bs->scan(start, setValue);
+        ASSERT((start+5) == inclStart);
+        unsigned inclEnd = bs->scan(start+5, clearValue);
+        ASSERT((end-5) == (inclEnd-1));
+    }
+
     void testSet(bool initial)
     void testSet(bool initial)
     {
     {
         unsigned now = msTick();
         unsigned now = msTick();
         bool setValue = !initial;
         bool setValue = !initial;
         bool clearValue = initial;
         bool clearValue = initial;
         const unsigned numBits = 400;
         const unsigned numBits = 400;
-        for (unsigned pass=0; pass< 10000; pass++)
+        for (unsigned pass=0; pass < 10000; pass++)
+        {
+            Owned<IBitSet> bs = createThreadSafeBitSet();
+            testSet1(initial, bs, 0, numBits, setValue, clearValue);
+        }
+        unsigned elapsed = msTick()-now;
+        fprintf(stdout, "Bit test (%u) time taken = %dms\n", initial, elapsed);
+        now = msTick();
+        for (unsigned pass=0; pass < 10000; pass++)
         {
         {
             Owned<IBitSet> bs = createBitSet();
             Owned<IBitSet> bs = createBitSet();
-            if (initial)
-                bs->incl(0, numBits);
-            for (unsigned i=0; i < numBits; i++)
+            testSet1(initial, bs, 0, numBits, setValue, clearValue);
+        }
+        elapsed = msTick()-now;
+        fprintf(stdout, "Bit test [thread-unsafe version] (%u) time taken = %dms\n", initial, elapsed);
+        now = msTick();
+        size32_t bitSetMemSz = getBitSetMemoryRequirement(numBits+5);
+        MemoryBuffer mb;
+        void *mem = mb.reserveTruncate(bitSetMemSz);
+        for (unsigned pass=0; pass < 10000; pass++)
+        {
+            Owned<IBitSet> bs = createBitSet(bitSetMemSz, mem);
+            testSet1(initial, bs, 0, numBits, setValue, clearValue);
+        }
+        elapsed = msTick()-now;
+        fprintf(stdout, "Bit test [thread-unsafe version, fixed memory] (%u) time taken = %dms\n", initial, elapsed);
+    }
+
+    class CBitThread : public CSimpleInterfaceOf<IInterface>, implements IThreaded
+    {
+        IBitSet &bitSet;
+        unsigned startBit, numBits;
+        bool initial, setValue, clearValue;
+        CThreaded threaded;
+        Owned<IException> exception;
+        CppUnit::Exception *cppunitException;
+    public:
+        CBitThread(IBitSet &_bitSet, unsigned _startBit, unsigned _numBits, bool _initial)
+            : threaded("CBitThread", this), bitSet(_bitSet), startBit(_startBit), numBits(_numBits), initial(_initial)
+        {
+            cppunitException = NULL;
+            setValue = !initial;
+            clearValue = initial;
+        }
+        void start() { threaded.start(); }
+        void join()
+        {
+            threaded.join();
+            if (exception)
+                throw exception.getClear();
+            else if (cppunitException)
+                throw cppunitException;
+        }
+        virtual void main()
+        {
+            try
             {
             {
-                ASSERT(bs->test(i) == clearValue);
-                bs->set(i, setValue);
-                ASSERT(bs->test(i) == setValue);
-
-                bs->set(i+5, setValue);
-                ASSERT(bs->scan(0, setValue) == i);
-                ASSERT(bs->scan(i+1, setValue) == i+5);
-                bs->set(i, clearValue);
-                bs->set(i+5, clearValue);
-                unsigned match1 = bs->scan(0, setValue);
-                ASSERT(match1 == initial ? -1 : numBits);
-
-                bs->invert(i);
-                ASSERT(bs->test(i) == setValue);
-                bs->invert(i);
-                ASSERT(bs->test(i) == clearValue);
-
-                bool wasSet = bs->testSet(i, setValue);
-                ASSERT(wasSet == clearValue);
-                bool wasSet2 = bs->testSet(i, clearValue);
-                ASSERT(wasSet2 == setValue);
-                ASSERT(bs->test(i) == clearValue);
-
-                bs->set(i, setValue);
-                unsigned match = bs->scanInvert(0, setValue);
-                ASSERT(match == i);
-                ASSERT(bs->test(i) == clearValue);
+                unsigned endBit = startBit+numBits-1;
+                if (initial)
+                    bitSet.incl(startBit, endBit);
+                for (unsigned i=startBit; i < endBit; i++)
+                {
+                    ASSERT(bitSet.test(i) == clearValue);
+                    bitSet.set(i, setValue);
+                    ASSERT(bitSet.test(i) == setValue);
+                    if (i < (endBit-1))
+                        ASSERT(bitSet.scan(i, clearValue) == i+1); // find next unset (should be i+1)
+                    bitSet.set(i, clearValue);
+                    bitSet.invert(i);
+                    ASSERT(bitSet.test(i) == setValue);
+                    bitSet.invert(i);
+                    ASSERT(bitSet.test(i) == clearValue);
+
+                    bool wasSet = bitSet.testSet(i, setValue);
+                    ASSERT(wasSet == clearValue);
+                    bool wasSet2 = bitSet.testSet(i, clearValue);
+                    ASSERT(wasSet2 == setValue);
+                    ASSERT(bitSet.test(i) == clearValue);
+
+                    bitSet.set(i, setValue);
+                    unsigned match = bitSet.scanInvert(startBit, setValue);
+                    ASSERT(match == i);
+                    ASSERT(bitSet.test(i) == clearValue);
+                }
+            }
+            catch (IException *e)
+            {
+                exception.setown(e);
+            }
+            catch (CppUnit::Exception &e)
+            {
+                cppunitException = e.clone();
             }
             }
         }
         }
-        unsigned elapsed = msTick()-now;
-        fprintf(stdout, "Bit test (%u) time taken = %dms\n", initial, elapsed);
+    };
+    unsigned testParallelRun(IBitSet &bitSet, unsigned nThreads, unsigned bitsPerThread, bool initial)
+    {
+        IArrayOf<CBitThread> bitThreads;
+        unsigned bitStart = 0;
+        unsigned bitEnd = 0;
+        for (unsigned t=0; t<nThreads; t++)
+        {
+            bitThreads.append(* new CBitThread(bitSet, bitStart, bitsPerThread, initial));
+            bitStart += bitsPerThread;
+        }
+        unsigned now = msTick();
+        for (unsigned t=0; t<nThreads; t++)
+            bitThreads.item(t).start();
+        Owned<IException> exception;
+        CppUnit::Exception *cppunitException = NULL;
+        for (unsigned t=0; t<nThreads; t++)
+        {
+            try
+            {
+                bitThreads.item(t).join();
+            }
+            catch (IException *e)
+            {
+                EXCLOG(e, NULL);
+                if (!exception)
+                    exception.setown(e);
+                else
+                    e->Release();
+            }
+            catch (CppUnit::Exception *e)
+            {
+                cppunitException = e;
+            }
+        }
+        if (exception)
+            throw exception.getClear();
+        else if (cppunitException)
+            throw *cppunitException;
+        return msTick()-now;
+    }
+
+    void testSetParallel(bool initial)
+    {
+        unsigned numBits = 1000000; // 10M
+        unsigned nThreads = getAffinityCpus();
+        unsigned bitsPerThread = numBits/nThreads;
+        bitsPerThread = ((bitsPerThread + (BitsPerItem-1)) / BitsPerItem) * BitsPerItem; // round up to multiple of BitsPerItem
+        numBits = bitsPerThread*nThreads; // round
+
+        fprintf(stdout, "testSetParallel, testing bit set of size : %d, nThreads=%d\n", numBits, nThreads);
+
+        Owned<IBitSet> bitSet = createThreadSafeBitSet();
+        unsigned took = testParallelRun(*bitSet, nThreads, bitsPerThread, initial);
+        fprintf(stdout, "Thread safe parallel bit set test (%u) time taken = %dms\n", initial, took);
+
+        size32_t bitSetMemSz = getBitSetMemoryRequirement(numBits);
+        MemoryBuffer mb;
+        void *mem = mb.reserveTruncate(bitSetMemSz);
+        bitSet.setown(createBitSet(bitSetMemSz, mem));
+        took = testParallelRun(*bitSet, nThreads, bitsPerThread, initial);
+        fprintf(stdout, "Thread unsafe parallel bit set test (%u) time taken = %dms\n", initial, took);
     }
     }
 
 
     void testSimple()
     void testSimple()
     {
     {
         testSet(false);
         testSet(false);
         testSet(true);
         testSet(true);
+        testSetParallel(false);
+        testSetParallel(true);
     }
     }
 };
 };
 
 

+ 2 - 2
thorlcr/activities/csvread/thcsvrslave.cpp

@@ -341,8 +341,8 @@ public:
                     localLastPart[subFile] = pnum;
                     localLastPart[subFile] = pnum;
             }
             }
             headerLinesRemaining.allocateN(subFiles);
             headerLinesRemaining.allocateN(subFiles);
-            gotHeaderLines.setown(createBitSet());
-            sentHeaderLines.setown(createBitSet());
+            gotHeaderLines.setown(createThreadSafeBitSet());
+            sentHeaderLines.setown(createThreadSafeBitSet());
         }
         }
         partHandler.setown(new CCsvPartHandler(*this));
         partHandler.setown(new CCsvPartHandler(*this));
         appendOutputLinked(this);
         appendOutputLinked(this);

+ 2 - 2
thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp

@@ -628,7 +628,7 @@ class CKeyedJoinSlave : public CSlaveActivity, public CThorDataLink, implements
                 try
                 try
                 {
                 {
                     unsigned endRequestsCount = owner.container.queryJob().querySlaves();
                     unsigned endRequestsCount = owner.container.queryJob().querySlaves();
-                    Owned<IBitSet> endRequests = createBitSet(); // NB: verification only
+                    Owned<IBitSet> endRequests = createThreadSafeBitSet(); // NB: verification only
                     while (!aborted)
                     while (!aborted)
                     {
                     {
                         rank_t sender;
                         rank_t sender;
@@ -735,7 +735,7 @@ class CKeyedJoinSlave : public CSlaveActivity, public CThorDataLink, implements
                     rank_t sender;
                     rank_t sender;
                     CMessageBuffer msg;
                     CMessageBuffer msg;
                     unsigned endRequestsCount = owner.container.queryJob().querySlaves();
                     unsigned endRequestsCount = owner.container.queryJob().querySlaves();
-                    Owned<IBitSet> endRequests = createBitSet(); // NB: verification only
+                    Owned<IBitSet> endRequests = createThreadSafeBitSet(); // NB: verification only
 
 
                     Owned<IRowInterfaces> fetchDiskRowIf = createRowInterfaces(owner.helper->queryDiskRecordSize(),owner.queryActivityId(),owner.queryCodeContext());
                     Owned<IRowInterfaces> fetchDiskRowIf = createRowInterfaces(owner.helper->queryDiskRecordSize(),owner.queryActivityId(),owner.queryCodeContext());
                     while (!aborted)
                     while (!aborted)

+ 85 - 23
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -363,8 +363,8 @@ public:
         allDone = allDoneWaiting = allRequestStop = stopping = stopRecv = false;
         allDone = allDoneWaiting = allRequestStop = stopping = stopRecv = false;
         myNode = activity.queryJob().queryMyRank();
         myNode = activity.queryJob().queryMyRank();
         slaves = activity.queryJob().querySlaves();
         slaves = activity.queryJob().querySlaves();
-        slavesDone.setown(createBitSet());
-        slavesStopping.setown(createBitSet());
+        slavesDone.setown(createThreadSafeBitSet());
+        slavesStopping.setown(createThreadSafeBitSet());
         mpTag = TAG_NULL;
         mpTag = TAG_NULL;
         recvInterface = NULL;
         recvInterface = NULL;
     }
     }
@@ -453,15 +453,13 @@ class CMarker
     CActivityBase &activity;
     CActivityBase &activity;
     NonReentrantSpinLock lock;
     NonReentrantSpinLock lock;
     ICompare *cmp;
     ICompare *cmp;
-    /* Access to bitSet is currently protected by the implementation
-     * Should move over to an implementation that's based on a lump of
-     * roxiemem and ensure that the threads avoid accessing the same bytes/words etc.
-     */
-    Owned<IBitSet> bitSet; // should be roxiemem, so can cause spilling
+    OwnedConstThorRow bitSetMem; // for thread unsafe version
+    Owned<IBitSet> bitSet;
     const void **base;
     const void **base;
     rowidx_t nextChunkStartRow; // Updated as threads request next chunk
     rowidx_t nextChunkStartRow; // Updated as threads request next chunk
     rowidx_t rowCount, chunkSize; // There are configured at start of calculate()
     rowidx_t rowCount, chunkSize; // There are configured at start of calculate()
     rowidx_t parallelMinChunkSize, parallelChunkSize; // Constant, possibly configurable in future
     rowidx_t parallelMinChunkSize, parallelChunkSize; // Constant, possibly configurable in future
+    unsigned threadCount;
 
 
     class CCompareThread : public CInterface, implements IThreaded
     class CCompareThread : public CInterface, implements IThreaded
     {
     {
@@ -498,10 +496,14 @@ class CMarker
     }
     }
     inline void mark(rowidx_t i)
     inline void mark(rowidx_t i)
     {
     {
+        // NB: Thread safe, because markers are dealing with discrete parts of bitSetMem (alighted to bits_t boundaries)
         bitSet->set(i); // mark boundary
         bitSet->set(i); // mark boundary
     }
     }
     rowidx_t doMarking(rowidx_t myStart, rowidx_t myEnd)
     rowidx_t doMarking(rowidx_t myStart, rowidx_t myEnd)
     {
     {
+        // myStart must be on bits_t boundary
+        dbgassertex(0 == (myStart % BitsPerItem));
+
         rowidx_t chunkUnique = 0;
         rowidx_t chunkUnique = 0;
         const void **rows = base+myStart;
         const void **rows = base+myStart;
         rowidx_t i=myStart;
         rowidx_t i=myStart;
@@ -550,20 +552,46 @@ public:
         // perhaps should make these configurable..
         // perhaps should make these configurable..
         parallelMinChunkSize = 1024;
         parallelMinChunkSize = 1024;
         parallelChunkSize = 10*parallelMinChunkSize;
         parallelChunkSize = 10*parallelMinChunkSize;
+        threadCount = activity.getOptInt(THOROPT_JOINHELPER_THREADS, activity.queryMaxCores());
+        if (0 == threadCount)
+            threadCount = getAffinityCpus();
+    }
+    bool init(rowidx_t rowCount)
+    {
+        bool threadSafeBitSet = activity.getOptBool("threadSafeBitSet", false); // for testing only
+        if (threadSafeBitSet)
+        {
+            DBGLOG("Using Thread safe variety of IBitSet");
+            bitSet.setown(createThreadSafeBitSet());
+        }
+        else
+        {
+            size32_t bitSetMemSz = getBitSetMemoryRequirement(rowCount);
+            void *pBitSetMem = activity.queryJob().queryRowManager()->allocate(bitSetMemSz, activity.queryContainer().queryId(), SPILL_PRIORITY_LOW);
+            if (!pBitSetMem)
+                return false;
+
+            bitSetMem.setown(pBitSetMem);
+            bitSet.setown(createBitSet(bitSetMemSz, pBitSetMem));
+        }
+        return true;
+    }
+    void reset()
+    {
+        bitSet.clear();
     }
     }
     rowidx_t calculate(CThorExpandingRowArray &rows, ICompare *_cmp, bool doSort)
     rowidx_t calculate(CThorExpandingRowArray &rows, ICompare *_cmp, bool doSort)
     {
     {
+        CCycleTimer timer;
+        assertex(bitSet);
         cmp = _cmp;
         cmp = _cmp;
-        unsigned threadCount = activity.getOptInt(THOROPT_JOINHELPER_THREADS, activity.queryMaxCores());
-        if (0 == threadCount)
-            threadCount = getAffinityCpus();
         if (doSort)
         if (doSort)
             rows.sort(*cmp, threadCount);
             rows.sort(*cmp, threadCount);
         rowCount = rows.ordinality();
         rowCount = rows.ordinality();
         if (0 == rowCount)
         if (0 == rowCount)
             return 0;
             return 0;
         base = rows.getRowArray();
         base = rows.getRowArray();
-        bitSet.setown(createBitSet());
+
         rowidx_t uniqueTotal = 0;
         rowidx_t uniqueTotal = 0;
         if ((1 == threadCount) || (rowCount < parallelMinChunkSize))
         if ((1 == threadCount) || (rowCount < parallelMinChunkSize))
             uniqueTotal = doMarking(0, rowCount);
             uniqueTotal = doMarking(0, rowCount);
@@ -578,6 +606,9 @@ public:
                 chunkSize = parallelMinChunkSize;
                 chunkSize = parallelMinChunkSize;
                 threadCount = rowCount / chunkSize;
                 threadCount = rowCount / chunkSize;
             }
             }
+            // Must be multiple of sizeof BitsPerItem
+            chunkSize = ((chunkSize + (BitsPerItem-1)) / BitsPerItem) * BitsPerItem; // round up to nearest multiple of BitsPerItem
+
             /* This is yet another case of requiring a set of small worker threads
             /* This is yet another case of requiring a set of small worker threads
              * Thor should really use a common pool of lightweight threadlets made available to all
              * Thor should really use a common pool of lightweight threadlets made available to all
              * where any particular instances (e.g. lookup) can stipulate min/max it requires etc.
              * where any particular instances (e.g. lookup) can stipulate min/max it requires etc.
@@ -610,6 +641,7 @@ public:
         ++uniqueTotal;
         ++uniqueTotal;
         mark(rowCount-1); // last row is implicitly end of group
         mark(rowCount-1); // last row is implicitly end of group
         cmp = NULL;
         cmp = NULL;
+        DBGLOG("CMarker::calculate - uniqueTotal=%"RIPF"d, took=%d ms", uniqueTotal, timer.elapsedMs());
         return uniqueTotal;
         return uniqueTotal;
     }
     }
     rowidx_t findNextBoundary(rowidx_t start)
     rowidx_t findNextBoundary(rowidx_t start)
@@ -1509,8 +1541,11 @@ protected:
                 bool success=false;
                 bool success=false;
                 try
                 try
                 {
                 {
-                    // NB: If this ensure returns false, it will have called the MM callbacks and have setup isLocalLookup() already
-                    success = rhs.ensure(rhsRows, SPILL_PRIORITY_LOW); // NB: Could OOM, handled by exception handler
+                    if (marker.init(rhsRows))
+                    {
+                        // NB: If this ensure returns false, it will have called the MM callbacks and have setup isLocalLookup() already
+                        success = rhs.ensure(rhsRows, SPILL_PRIORITY_LOW); // NB: Could OOM, handled by exception handler
+                    }
                 }
                 }
                 catch (IException *e)
                 catch (IException *e)
                 {
                 {
@@ -1610,6 +1645,7 @@ protected:
 
 
                 // If HT sized already and now spilt, too big clear and size when local size known
                 // If HT sized already and now spilt, too big clear and size when local size known
                 clearHT();
                 clearHT();
+                marker.reset();
 
 
                 if (stopping)
                 if (stopping)
                 {
                 {
@@ -1724,7 +1760,7 @@ protected:
                 }
                 }
             }
             }
             else
             else
-                rightStream.setown(rowLoader->load(right, abortSoon, false, &rhs, NULL, false));
+                rightStream.setown(rowLoader->load(right, abortSoon, false, &rhs));
 
 
             if (!rightStream)
             if (!rightStream)
             {
             {
@@ -1739,16 +1775,40 @@ protected:
 
 
                 rowLoader.clear();
                 rowLoader.clear();
 
 
-                // Either was already sorted, or rowLoader->load() sorted on transfer out to rhs
-                rowidx_t uniqueKeys = marker.calculate(rhs, compareRight, false);
-
-                /* Although HT is allocated with a low spill priority, it can still cause callbacks
-                 * so try to allocate before rhs is transferred to spillable collector
-                 */
-                bool htAllocated = setupHT(uniqueKeys);
-                if (!htAllocated)
+                bool success;
+                try
+                {
+                    success = marker.init(rhs.ordinality());
+                }
+                catch (IException *e)
+                {
+                    if (!isSmart())
+                        throw;
+                    switch (e->errorCode())
+                    {
+                    case ROXIEMM_MEMORY_POOL_EXHAUSTED:
+                    case ROXIEMM_MEMORY_LIMIT_EXCEEDED:
+                        e->Release();
+                        break;
+                    default:
+                        throw;
+                    }
+                    success = false;
+                }
+                if (success)
+                {
+                    // Either was already sorted, or rowLoader->load() sorted on transfer out to rhs
+                    rowidx_t uniqueKeys = marker.calculate(rhs, compareRight, false);
+                    success = setupHT(uniqueKeys);
+                    if (!success)
+                    {
+                        if (!isSmart())
+                            throw MakeActivityException(this, 0, "Failed to allocate [LOCAL] hash table");
+                    }
+                }
+                if (!success)
                 {
                 {
-                    ActPrintLog("Out of memory trying to allocate the [LOCAL] hash table for a SMART join (%"RIPF"d rows), will now failover to a std hash join", uniqueKeys);
+                    ActPrintLog("Out of memory trying to allocate [LOCAL] tables for a SMART join (%"RIPF"d rows), will now failover to a std hash join", rhs.ordinality());
                     Owned<IThorRowCollector> collector = createThorRowCollector(*this, queryRowInterfaces(rightITDL), NULL, stableSort_none, rc_mixed, SPILL_PRIORITY_LOOKUPJOIN);
                     Owned<IThorRowCollector> collector = createThorRowCollector(*this, queryRowInterfaces(rightITDL), NULL, stableSort_none, rc_mixed, SPILL_PRIORITY_LOOKUPJOIN);
                     collector->setOptions(rcflag_noAllInMemSort); // If fits into memory, don't want it resorted
                     collector->setOptions(rcflag_noAllInMemSort); // If fits into memory, don't want it resorted
                     collector->transferRowsIn(rhs); // can spill after this
                     collector->transferRowsIn(rhs); // can spill after this
@@ -1784,6 +1844,7 @@ protected:
             {
             {
                 ActPrintLog("Performing standard join");
                 ActPrintLog("Performing standard join");
 
 
+                marker.reset();
                 // NB: lhs ordering and grouping lost from here on.. (will have been caught earlier if global)
                 // NB: lhs ordering and grouping lost from here on.. (will have been caught earlier if global)
                 if (grouped)
                 if (grouped)
                     throw MakeActivityException(this, 0, "Degraded to standard join, LHS order cannot be preserved");
                     throw MakeActivityException(this, 0, "Degraded to standard join, LHS order cannot be preserved");
@@ -2225,6 +2286,7 @@ public:
         }
         }
         // Rows now in hash table, rhs arrays no longer needed
         // Rows now in hash table, rhs arrays no longer needed
         _rows.kill();
         _rows.kill();
+        marker.reset();
     }
     }
 };
 };
 
 

+ 1 - 1
thorlcr/activities/loop/thloop.cpp

@@ -164,7 +164,7 @@ class CLoopActivityMaster : public CLoopActivityMasterBase
         // similar to sync, but continiously listens for messages from slaves
         // similar to sync, but continiously listens for messages from slaves
         // slave only sends if above threashold, or if was at threshold and non empty
         // slave only sends if above threashold, or if was at threshold and non empty
         // this routine is here to spot when all are whirling around processing nothing for > threshold
         // this routine is here to spot when all are whirling around processing nothing for > threshold
-        Owned<IBitSet> emptyIterations = createBitSet();
+        Owned<IBitSet> emptyIterations = createThreadSafeBitSet();
         unsigned loopEnds = 0;
         unsigned loopEnds = 0;
         unsigned nodes = container.queryJob().querySlaves();
         unsigned nodes = container.queryJob().querySlaves();
         unsigned n = nodes;
         unsigned n = nodes;

+ 2 - 2
thorlcr/graph/thgraph.cpp

@@ -371,11 +371,11 @@ CGraphElementBase::CGraphElementBase(CGraphBase &_owner, IPropertyTree &_xgmml)
         throw makeOsExceptionV(GetLastError(), "Failed to load helper factory method: %s (dll handle = %p)", helperName.str(), queryJob().queryDllEntry().getInstance());
         throw makeOsExceptionV(GetLastError(), "Failed to load helper factory method: %s (dll handle = %p)", helperName.str(), queryJob().queryDllEntry().getInstance());
     alreadyUpdated = false;
     alreadyUpdated = false;
     whichBranch = (unsigned)-1;
     whichBranch = (unsigned)-1;
-    whichBranchBitSet.setown(createBitSet());
+    whichBranchBitSet.setown(createThreadSafeBitSet());
     newWhichBranch = false;
     newWhichBranch = false;
     isEof = false;
     isEof = false;
     log = true;
     log = true;
-    sentActInitData.setown(createBitSet());
+    sentActInitData.setown(createThreadSafeBitSet());
 }
 }
 
 
 CGraphElementBase::~CGraphElementBase()
 CGraphElementBase::~CGraphElementBase()

+ 3 - 3
thorlcr/graph/thgraphmaster.cpp

@@ -334,7 +334,7 @@ void CSlaveMessageHandler::main()
 
 
 CMasterActivity::CMasterActivity(CGraphElementBase *_container) : CActivityBase(_container), threaded("CMasterActivity", this)
 CMasterActivity::CMasterActivity(CGraphElementBase *_container) : CActivityBase(_container), threaded("CMasterActivity", this)
 {
 {
-    notedWarnings = createBitSet();
+    notedWarnings = createThreadSafeBitSet();
     mpTag = TAG_NULL;
     mpTag = TAG_NULL;
     data = new MemoryBuffer[container.queryJob().querySlaves()];
     data = new MemoryBuffer[container.queryJob().querySlaves()];
     asyncStart = false;
     asyncStart = false;
@@ -652,7 +652,7 @@ public:
         unsigned s=comm->queryGroup().ordinality()-1;
         unsigned s=comm->queryGroup().ordinality()-1;
         bool aborted = false;
         bool aborted = false;
         CMessageBuffer msg;
         CMessageBuffer msg;
-        Owned<IBitSet> raisedSet = createBitSet();
+        Owned<IBitSet> raisedSet = createThreadSafeBitSet();
         unsigned remaining = timeout;
         unsigned remaining = timeout;
         while (s--)
         while (s--)
         {
         {
@@ -1345,7 +1345,7 @@ void CJobMaster::broadcastToSlaves(CMessageBuffer &msg, mptag_t mptag, unsigned
     }
     }
     if (sendOnly) return;
     if (sendOnly) return;
     unsigned respondents = 0;
     unsigned respondents = 0;
-    Owned<IBitSet> bitSet = createBitSet();
+    Owned<IBitSet> bitSet = createThreadSafeBitSet();
     loop
     loop
     {
     {
         rank_t sender;
         rank_t sender;

+ 2 - 2
thorlcr/master/thmastermain.cpp

@@ -137,7 +137,7 @@ public:
 
 
     CRegistryServer()  : deregistrationWatch(*this), stopped(false)
     CRegistryServer()  : deregistrationWatch(*this), stopped(false)
     {
     {
-        status = createBitSet();
+        status = createThreadSafeBitSet();
         msgDelay = SLAVEREG_VERIFY_DELAY;
         msgDelay = SLAVEREG_VERIFY_DELAY;
         slavesRegistered = 0;
         slavesRegistered = 0;
         if (globals->getPropBool("@watchdogEnabled"))
         if (globals->getPropBool("@watchdogEnabled"))
@@ -201,7 +201,7 @@ public:
         unsigned timeWaited = 0;
         unsigned timeWaited = 0;
         unsigned connected = 0;
         unsigned connected = 0;
         unsigned slaves = queryClusterWidth();
         unsigned slaves = queryClusterWidth();
-        Owned<IBitSet> connectedSet = createBitSet();
+        Owned<IBitSet> connectedSet = createThreadSafeBitSet();
         loop
         loop
         {
         {
             CTimeMon tm(msgDelay);
             CTimeMon tm(msgDelay);

+ 1 - 1
thorlcr/thorutil/thbuf.cpp

@@ -263,7 +263,7 @@ public:
         numblocks = 0;
         numblocks = 0;
         insz = 0;
         insz = 0;
         eoi = false;
         eoi = false;
-        diskfree.setown(createBitSet()); 
+        diskfree.setown(createThreadSafeBitSet()); 
 
 
 #ifdef _FULL_TRACE
 #ifdef _FULL_TRACE
         ActPrintLog(activity, "SmartBuffer create %x",(unsigned)(memsize_t)this);
         ActPrintLog(activity, "SmartBuffer create %x",(unsigned)(memsize_t)this);

+ 1 - 1
thorlcr/thorutil/thorport.cpp

@@ -43,7 +43,7 @@ static IBitSet *portmap;
 MODULE_INIT(INIT_PRIORITY_STANDARD)
 MODULE_INIT(INIT_PRIORITY_STANDARD)
 {
 {
     portallocsection = new CriticalSection;
     portallocsection = new CriticalSection;
-    portmap = createBitSet();
+    portmap = createThreadSafeBitSet();
     portmap->set(MPPORT, true);
     portmap->set(MPPORT, true);
     portmap->set(WATCHDOGPORT, true);
     portmap->set(WATCHDOGPORT, true);
     return true;
     return true;