Преглед на файлове

HPCC-12662 Add single threaded version of IBitSet

The pre-existing version protects against any concurrent access
to set/gets for any bit by any thread.
It also uses a flexible/expanding Array for storage.

The version introduced in this commit, is not thread safe,
however it is optimized for speed.
In the form that is given a fixed lump of memory to work with,
it will not expand, but allow concurrent access to the
underlying storage safely, with the prerequisite that threads
do not manipulate the same bits_t space concurrently.
The particular use case it was introduced for, is the CMarker in
lookup join. Using this implementation without locks, allows the
CMarker to sometimes run approximately a 100 times faster.

This change also fixes a bug in the pre-existing IBitSet
implementation, in IBiset->incl(). If the include range started
beyond the existing width of the bit set, the implementation
incorrectly added set range values to the tail of the bitset,
as opposed to pre-padding the bitset up to the start point.

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith преди 10 години
родител
ревизия
5ee42c4518
променени са 4 файла, в които са добавени 629 реда и са изтрити 210 реда
  1. 338 155
      system/jlib/jset.cpp
  2. 18 1
      system/jlib/jset.hpp
  3. 190 33
      testing/unittests/jlibtests.cpp
  4. 83 21
      thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

+ 338 - 155
system/jlib/jset.cpp

@@ -23,121 +23,73 @@
 
 //-----------------------------------------------------------------------
 
-// Simple BitSet // 0 based all, intermediate items exist, operations threadsafe and atomic
-
-class CBitSet : public CInterface, implements IBitSet
+// NB: The CBitSet* helpers are primarily avoid the need for virtuals in the implementations
+class CBitSetArrayHelper
 {
-public:
-    IMPLEMENT_IINTERFACE;
 protected:
-    //unsigned seems to be most efficient, and required for __builtin_ffs below
-    typedef unsigned bits_t;
-    enum { BitsPerItem = sizeof(bits_t) * 8 };
     ArrayOf<bits_t> bits;
     mutable CriticalSection crit;
 
-public:
-    CBitSet() { }
-    CBitSet(MemoryBuffer &buffer)
-    {
-        deserialize(buffer);
-    }
-    void set(unsigned n,bool val) 
+    inline bits_t &getBitSet(unsigned i, bits_t &tmp) { tmp = bits.item(i); return tmp;}
+    inline void setBitSet(unsigned i, bits_t m)
     {
-        bits_t t=((bits_t)1)<<(n%BitsPerItem);
-        unsigned i=n/BitsPerItem;
-        CriticalBlock block(crit);
-        if (i>=bits.ordinality()) {
-            if (!val)
-                return; // don't bother
-            while (i>bits.ordinality())
-                bits.append(0);
-            bits.append(t);
-        }
-        else {
-            bits_t m=bits.item(i);
-            if (val)
-                m |= t;
-            else 
-                m &= ~t;
-            bits.replace(m,i);
-        }
+        bits.replace(m, i);
     }
-        
-    bool invert(unsigned n) 
+    inline void addBitSet(bits_t m)
     {
-        bits_t t=((bits_t)1)<<(n%BitsPerItem);
-        unsigned i=n/BitsPerItem;
-        CriticalBlock block(crit);
-        bool ret;
-        if (i>=bits.ordinality()) {
-            while (i>bits.ordinality())
-                bits.append(0);
-            bits.append(t);
-            ret = true;
-        }
-        else {
-            bits_t m=bits.item(i);
-            ret = ((m&t)==0);
-            if (ret)
-                m |= t;
-            else 
-                m &= ~t;
-            bits.replace(m,i);
-        }
-        return ret;
+        bits.append(m);
     }
-        
-    bool test(unsigned n) 
+    inline unsigned getWidth() const { return bits.ordinality(); }
+};
+
+class CBitSetMemoryHelper
+{
+protected:
+    bits_t *mem;
+    unsigned bitSetUnits;
+    MemoryBuffer mb; // Used if mem not provided, also implies expansion allowed
+    bool fixedMemory;
+
+    CBitSetMemoryHelper()
     {
-        bits_t t=((bits_t)1)<<(n%BitsPerItem);
-        unsigned i=n/BitsPerItem;
-        CriticalBlock block(crit);
-        if (i<bits.ordinality()) {
-            bits_t m=bits.item(i);
-            if (m&t)
-                return true;
-        }
-        return false;
+        fixedMemory = false;
+        bitSetUnits = 0;
+        mem = NULL;
     }
-        
-    bool testSet(unsigned n,bool val) 
+    inline bits_t &getBitSet(unsigned i, bits_t &tmp) { return mem[i]; }
+    inline void setBitSet(unsigned i, bits_t m) { } // NOP, getBitset returns ref. in this impl. and the bits_t is set directly
+    inline void addBitSet(bits_t m)
     {
-        bits_t t=((bits_t)1)<<(n%BitsPerItem);
-        unsigned i=n/BitsPerItem;
-        CriticalBlock block(crit);
-        bool ret;
-        if (i>=bits.ordinality()) {
-            ret = false;
-            if (!val)
-                return false; // don't bother
-            while (i>bits.ordinality())
-                bits.append(0);
-            bits.append(t);
-        }
-        else {
-            bits_t m=bits.item(i);
-            ret = (m&t)!=0;
-            if (val)
-                m |= t;
-            else 
-                m &= ~t;
-            bits.replace(m,i);
-        }
-        return ret;
+        if (fixedMemory)
+            throw MakeStringException(-1, "CBitSetThreadUnsafe with fixed mem cannot expand");
+        mb.append(m);
+        mem = (bits_t *)mb.bufferBase();
+        ++bitSetUnits;
     }
+    inline unsigned getWidth() const { return bitSetUnits; }
+};
+
+template <class BITSETHELPER>
+class CBitSetBase : public BITSETHELPER, public CSimpleInterfaceOf<IBitSet>
+{
+protected:
+    typedef BITSETHELPER PARENT;
+    using PARENT::getWidth;
+    using PARENT::getBitSet;
+    using PARENT::setBitSet;
+    using PARENT::addBitSet;
 
-    unsigned _scan(unsigned from,bool tst,bool scninv)
+    unsigned _scan(unsigned from, bool tst, bool scninv)
     {
         bits_t noMatchMask=tst?0:(bits_t)-1;
         unsigned j=from%BitsPerItem;
-        CriticalBlock block(crit);
         // returns index of first = val >= from
-        unsigned n=bits.ordinality();
+        unsigned n=getWidth();
         unsigned i;
+        bits_t tmpBitSet; // NB: for getBitSet, not all impls. will use it
         for (i=from/BitsPerItem;i<n;i++)
         {
-            bits_t m=bits.item(i);
+            bits_t &m = getBitSet(i, tmpBitSet);
             if (m!=noMatchMask)
             {
 #if defined(__GNUC__)
@@ -170,7 +122,7 @@ public:
                     {
                         bits_t t = ((bits_t)1)<<pos;
                         m &= ~t;
-                        bits.replace(m,i);
+                        setBitSet(i, m);
                     }
                     return i*BitsPerItem+pos;
                 }
@@ -182,7 +134,7 @@ public:
                     {
                         bits_t t = ((bits_t)1)<<pos;
                         m |= t;
-                        bits.replace(m,i);
+                        setBitSet(i, m);
                     }
                     return i*BitsPerItem+pos;
                 }
@@ -197,7 +149,7 @@ public:
                             if (scninv)
                             {
                                 m &= ~t;
-                                bits.replace(m,i);
+                                setBitSet(i, m);
                             }
                             return i*BitsPerItem+j;
                         }
@@ -209,7 +161,7 @@ public:
                             if (scninv)
                             {
                                 m |= t;
-                                bits.replace(m,i);
+                                setbitSet(i, m);
                             }
                             return i*BitsPerItem+j;
                         }
@@ -220,7 +172,7 @@ public:
             }
             j = 0;
         }
-        if (tst) 
+        if (tst)
             return (unsigned)-1;
         unsigned ret = n*BitsPerItem;
         if (n*BitsPerItem<from)
@@ -229,93 +181,198 @@ public:
             set(ret,true);
         return ret;
     }
-
-    unsigned scan(unsigned from,bool tst)
-    {
-        return _scan(from,tst,false);
-    }
-
-    unsigned scanInvert(unsigned from,bool tst) // like scan but inverts bit as well
-    {
-        return _scan(from,tst,true);
-    }
-
-    void _incl(unsigned lo, unsigned hi,bool val)
+    void _incl(unsigned lo, unsigned hi, bool val)
     {
         if (hi<lo)
             return;
         unsigned j=lo%BitsPerItem;
         unsigned nb=(hi-lo)+1;
-        CriticalBlock block(crit);
-        unsigned n=bits.ordinality();
-        unsigned i;
-        for (i=lo/BitsPerItem;i<n;i++) {
-            bits_t m;
-            if ((nb>=BitsPerItem)&&(j==0)) {
-                m = i;
-                nb -= BitsPerItem;
+        unsigned n=getWidth();
+        unsigned i=lo/BitsPerItem;
+        if (n<=i)
+        {
+            if (val)
+            {
+                while (n < i)
+                {
+                    addBitSet(0);
+                    ++n;
+                }
             }
-            else {
-                m=bits.item(i);
-                bits_t t = ((bits_t)1)<<j;
-                for (;j<BitsPerItem;j++) {
+        }
+        else
+        {
+            bits_t tmpBitSet; // NB: for getBitSet, not all impls. will use it
+            for (;i<n;i++)
+            {
+                bits_t &m = getBitSet(i, tmpBitSet);
+                if ((nb>=BitsPerItem)&&(j==0))
+                {
                     if (val)
-                        m |= t;
+                        m = (bits_t)-1;
                     else
-                        m &= ~t;
-                    if (--nb==0)
-                        break;
-                    t <<= 1;
+                        m = 0;
+                    nb -= BitsPerItem;
+                }
+                else
+                {
+                    bits_t t = ((bits_t)1)<<j;
+                    for (;j<BitsPerItem;j++)
+                    {
+                        if (val)
+                            m |= t;
+                        else
+                            m &= ~t;
+                        if (--nb==0)
+                            break;
+                        t <<= 1;
+                    }
                 }
+                setBitSet(i, m);
+                if (nb==0)
+                    return;
+                j = 0;
             }
-            bits.replace(m,i);
-            if (nb==0)
-                return;
-            j = 0;
         }
-        if (val) {
-            while (nb>=BitsPerItem) {
-                bits.append((bits_t)-1);
-                nb-=BitsPerItem;
+        if (val)
+        {
+            while (nb>=BitsPerItem)
+            {
+                addBitSet((bits_t)-1);
+                nb -= BitsPerItem;
             }
-            if (nb>0) {
+            if (nb>0)
+            {
                 bits_t m=0;
                 bits_t t = ((bits_t)1)<<j;
-                for (;j<BitsPerItem;j++) {
+                for (;j<BitsPerItem;j++)
+                {
                     m |= t;
                     if (--nb==0)
                         break;
                     t <<= 1;
                 }
-                bits.append(m);
+                addBitSet(m);
             }
         }
     }
-
-    void incl(unsigned lo, unsigned hi)
+public:
+// IBitSet impl.
+    virtual void set(unsigned n, bool val)
     {
-        _incl(lo,hi,true);
+        bits_t t=((bits_t)1)<<(n%BitsPerItem);
+        unsigned i = n/BitsPerItem;
+        if (i>=getWidth())
+        {
+            if (!val)
+                return; // don't bother
+            while (i>getWidth())
+                addBitSet(0);
+            addBitSet(t);
+        }
+        else
+        {
+            bits_t tmpBitSet; // NB: for getBitSet, not all impls. will use it
+            bits_t &m = getBitSet(i, tmpBitSet);
+            if (val)
+                m |= t;
+            else
+                m &= ~t;
+            setBitSet(i, m);
+        }
     }
-
-    void excl(unsigned lo, unsigned hi)
+    virtual bool invert(unsigned n)
     {
-        _incl(lo,hi,false);
+        bits_t t=((bits_t)1)<<(n%BitsPerItem);
+        unsigned i=n/BitsPerItem;
+        bool ret;
+        if (i>=getWidth())
+        {
+            while (i>getWidth())
+                addBitSet(0);
+            addBitSet(t);
+            ret = true;
+        }
+        else
+        {
+            bits_t tmpBitSet; // NB: for getBitSet, not all impls. will use it
+            bits_t &m = getBitSet(i, tmpBitSet);
+            ret = 0 == (m&t);
+            if (ret)
+                m |= t;
+            else
+                m &= ~t;
+            setBitSet(i, m);
+        }
+        return ret;
     }
-
-    void reset()
+    virtual bool test(unsigned n)
     {
-        CriticalBlock block(crit);
-        bits.kill();
+        bits_t t=((bits_t)1)<<(n%BitsPerItem);
+        unsigned i=n/BitsPerItem;
+        if (i<getWidth())
+        {
+            bits_t tmpBitSet; // NB: for getBitSet, not all impls. will use it
+            bits_t &m = getBitSet(i, tmpBitSet);
+            if (m&t)
+                return true;
+        }
+        return false;
     }
-
-    void serialize(MemoryBuffer &buffer) const
+    virtual bool testSet(unsigned n, bool val)
     {
-        CriticalBlock block(crit);
-        buffer.append(bits.ordinality());
-        ForEachItemIn(b, bits)
-            buffer.append(bits.item(b));
+        bits_t t=((bits_t)1)<<(n%BitsPerItem);
+        unsigned i=n/BitsPerItem;
+        bool ret;
+        if (i>=getWidth())
+        {
+            ret = false;
+            if (!val)
+                return false; // don't bother
+            while (i>getWidth())
+                addBitSet(0);
+            addBitSet(t);
+        }
+        else
+        {
+            bits_t tmpBitSet; // NB: for getBitSet, not all impls. will use it
+            bits_t &m = getBitSet(i, tmpBitSet);
+            ret = 0 != (m&t);
+            if (val)
+                m |= t;
+            else
+                m &= ~t;
+            setBitSet(i, m);
+        }
+        return ret;
+    }
+    virtual unsigned scan(unsigned from,bool tst)
+    {
+        return _scan(from,tst,false);
     }
+    virtual unsigned scanInvert(unsigned from,bool tst) // like scan but inverts bit as well
+    {
+        return _scan(from,tst,true);
+    }
+    virtual void incl(unsigned lo, unsigned hi)
+    {
+        _incl(lo,hi,true);
+    }
+    virtual void excl(unsigned lo, unsigned hi)
+    {
+        _incl(lo,hi,false);
+    }
+};
 
+size32_t getBitSetMemoryRequirement(unsigned numBits)
+{
+    unsigned bitSetUnits = (numBits + (BitsPerItem-1)) / BitsPerItem;
+    return bitSetUnits * sizeof(bits_t);
+}
+
+// Simple BitSet // 0 based all, intermediate items exist, operations threadsafe and atomic
+class CBitSet : public CBitSetBase<CBitSetArrayHelper>
+{
     void deserialize(MemoryBuffer &buffer)
     {
         CriticalBlock block(crit);
@@ -333,6 +390,67 @@ public:
             }
         }
     }
+public:
+    CBitSet()
+    {
+    }
+    CBitSet(MemoryBuffer &buffer)
+    {
+        deserialize(buffer);
+    }
+// IBitSet overloads
+    virtual void set(unsigned n, bool val)
+    {
+        CriticalBlock block(crit);
+        CBitSetBase::set(n, val);
+    }
+    virtual bool invert(unsigned n)
+    {
+        CriticalBlock block(crit);
+        return CBitSetBase::invert(n);
+    }
+    virtual bool test(unsigned n)
+    {
+        CriticalBlock block(crit);
+        return CBitSetBase::test(n);
+    }
+    virtual bool testSet(unsigned n, bool val)
+    {
+        CriticalBlock block(crit);
+        return CBitSetBase::testSet(n, val);
+    }
+    virtual unsigned scan(unsigned from, bool tst)
+    {
+        CriticalBlock block(crit);
+        return _scan(from,tst,false);
+    }
+    virtual unsigned scanInvert(unsigned from, bool tst) // like scan but inverts bit as well
+    {
+        CriticalBlock block(crit);
+        return _scan(from,tst,true);
+    }
+    virtual void incl(unsigned lo, unsigned hi)
+    {
+        CriticalBlock block(crit);
+        _incl(lo,hi,true);
+    }
+    virtual void excl(unsigned lo, unsigned hi)
+    {
+        CriticalBlock block(crit);
+        _incl(lo,hi,false);
+    }
+    virtual void reset()
+    {
+        CriticalBlock block(crit);
+        bits.kill();
+    }
+    virtual void serialize(MemoryBuffer &buffer) const
+    {
+        CriticalBlock block(crit);
+        buffer.append(bits.ordinality());
+        ForEachItemIn(b, bits)
+            buffer.append(bits.item(b));
+    }
 };
 
 extern jlib_decl IBitSet *createBitSet()
@@ -340,10 +458,75 @@ extern jlib_decl IBitSet *createBitSet()
     return new CBitSet();
 }
 
+
+class CBitSetThreadUnsafe : public CBitSetBase<CBitSetMemoryHelper>
+{
+    void deserialize(MemoryBuffer &buffer)
+    {
+        unsigned count;
+        buffer.read(count);
+        if (count)
+        {
+            unsigned bitSets = count/BitsPerItem;
+            bitSetUnits = bitSets;
+            mem = (bits_t *)mb.reserveTruncate(bitSets*sizeof(bits_t));
+        }
+        else
+        {
+            bitSetUnits = 0;
+            mem = NULL;
+        }
+        fixedMemory = false;
+    }
+public:
+    CBitSetThreadUnsafe()
+    {
+       // In this form, bitSetUnits and mem will be updated when addBitSet expands mb
+    }
+    CBitSetThreadUnsafe(size32_t memSz, const void *_mem, bool reset)
+    {
+        bitSetUnits = memSz*sizeof(byte) / sizeof(bits_t);
+        mem = (bits_t *)_mem;
+        if (reset)
+            memset(mem, 0, bitSetUnits*sizeof(bits_t));
+        fixedMemory = true;
+    }
+    CBitSetThreadUnsafe(MemoryBuffer &buffer)
+    {
+        deserialize(buffer);
+    }
+    virtual void reset()
+    {
+        memset(mem, 0, sizeof(bits_t)*bitSetUnits);
+    }
+    virtual void serialize(MemoryBuffer &buffer) const
+    {
+        buffer.append((unsigned)(BitsPerItem*bitSetUnits));
+        buffer.append(bitSetUnits*sizeof(bits_t), mem);
+    }
+};
+
+extern jlib_decl IBitSet *createBitSetThreadUnsafe(unsigned maxBits, const void *mem, bool reset)
+{
+    return new CBitSetThreadUnsafe(maxBits, mem, reset);
+}
+
+extern jlib_decl IBitSet *createBitSetThreadUnsafe()
+{
+    return new CBitSetThreadUnsafe();
+}
+
+
+// NB: Doubt you'd want to interchange, but serialization formats are compatible
 extern jlib_decl IBitSet *deserializeIBitSet(MemoryBuffer &mb)
 {
     return new CBitSet(mb);
 }
 
+extern jlib_decl IBitSet *deserializeIBitSetThreadUnsafe(MemoryBuffer &mb)
+{
+    return new CBitSetThreadUnsafe(mb);
+}
+
 
 

+ 18 - 1
system/jlib/jset.hpp

@@ -40,9 +40,26 @@ interface jlib_decl IBitSet : public IInterface
 
 extern jlib_decl IBitSet *deserializeIBitSet(MemoryBuffer &mb);
 
+// type of underlying bit storage, exposed so thread-unsafe version can know boundaries
+typedef unsigned bits_t;
+enum { BitsPerItem = sizeof(bits_t) * 8 };
+
+
+// returns number of bytes required to represent numBits in memory
+extern jlib_decl size32_t getBitSetMemoryRequirement(unsigned numBits);
+
 // Simple BitSet // 0 based, all intermediate items exist, operations threadsafe and atomic
-extern jlib_decl IBitSet *createBitSet(); 
+extern jlib_decl IBitSet *createBitSet();
+
+/* Thread unsafe, but can be significantly faster.
+ * Client provide a fixed block of memory used for the bit set, threads must ensure they do not set bits
+ * in parallel within the same bits_t space.
+ * IOW, e.g. bits 0-sizeof(bits_t) must be set from only 1 thread at a time.
+ */
+extern jlib_decl IBitSet *createBitSetThreadUnsafe(size32_t memSize, const void *mem, bool reset=true);
 
+// This form allows the size of the bit set to be dynamic, but there are no guarantees about threading
+extern jlib_decl IBitSet *createBitSetThreadUnsafe();
 
 
 

+ 190 - 33
testing/unittests/jlibtests.cpp

@@ -95,56 +95,213 @@ public:
 
 protected:
 
+    void testSet1(bool initial, IBitSet *bs, unsigned start, unsigned numBits, bool setValue, bool clearValue)
+    {
+        unsigned end = start+numBits;
+        if (initial)
+            bs->incl(start, end);
+        for (unsigned i=start; i < end; i++)
+        {
+            ASSERT(bs->test(i) == clearValue);
+            bs->set(i, setValue);
+            ASSERT(bs->test(i) == setValue);
+
+            bs->set(i+5, setValue);
+            ASSERT(bs->scan(0, setValue) == i);
+            ASSERT(bs->scan(i+1, setValue) == i+5);
+            bs->set(i, clearValue);
+            bs->set(i+5, clearValue);
+            unsigned match1 = bs->scan(0, setValue);
+            ASSERT(match1 == initial ? -1 : end);
+
+            bs->invert(i);
+            ASSERT(bs->test(i) == setValue);
+            bs->invert(i);
+            ASSERT(bs->test(i) == clearValue);
+
+            bool wasSet = bs->testSet(i, setValue);
+            ASSERT(wasSet == clearValue);
+            bool wasSet2 = bs->testSet(i, clearValue);
+            ASSERT(wasSet2 == setValue);
+            ASSERT(bs->test(i) == clearValue);
+
+            bs->set(i, setValue);
+            unsigned match = bs->scanInvert(0, setValue);
+            ASSERT(match == i);
+            ASSERT(bs->test(i) == clearValue);
+        }
+    }
+
     void testSet(bool initial)
     {
         unsigned now = msTick();
         bool setValue = !initial;
         bool clearValue = initial;
         const unsigned numBits = 400;
-        for (unsigned pass=0; pass< 10000; pass++)
+        for (unsigned pass=0; pass < 10000; pass++)
         {
             Owned<IBitSet> bs = createBitSet();
-            if (initial)
-                bs->incl(0, numBits);
-            for (unsigned i=0; i < numBits; i++)
-            {
-                ASSERT(bs->test(i) == clearValue);
-                bs->set(i, setValue);
-                ASSERT(bs->test(i) == setValue);
-
-                bs->set(i+5, setValue);
-                ASSERT(bs->scan(0, setValue) == i);
-                ASSERT(bs->scan(i+1, setValue) == i+5);
-                bs->set(i, clearValue);
-                bs->set(i+5, clearValue);
-                unsigned match1 = bs->scan(0, setValue);
-                ASSERT(match1 == initial ? -1 : numBits);
-
-                bs->invert(i);
-                ASSERT(bs->test(i) == setValue);
-                bs->invert(i);
-                ASSERT(bs->test(i) == clearValue);
-
-                bool wasSet = bs->testSet(i, setValue);
-                ASSERT(wasSet == clearValue);
-                bool wasSet2 = bs->testSet(i, clearValue);
-                ASSERT(wasSet2 == setValue);
-                ASSERT(bs->test(i) == clearValue);
-
-                bs->set(i, setValue);
-                unsigned match = bs->scanInvert(0, setValue);
-                ASSERT(match == i);
-                ASSERT(bs->test(i) == clearValue);
-            }
+            testSet1(initial, bs, 0, numBits, setValue, clearValue);
         }
         unsigned elapsed = msTick()-now;
+        now = msTick();
         fprintf(stdout, "Bit test (%u) time taken = %dms\n", initial, elapsed);
+        for (unsigned pass=0; pass < 10000; pass++)
+        {
+            Owned<IBitSet> bs = createBitSetThreadUnsafe();
+            testSet1(initial, bs, 0, numBits, setValue, clearValue);
+        }
+        elapsed = msTick()-now;
+        now = msTick();
+        fprintf(stdout, "Bit test [thread-unsafe version] (%u) time taken = %dms\n", initial, elapsed);
+        size32_t bitSetMemSz = getBitSetMemoryRequirement(400);
+        MemoryBuffer mb;
+        void *mem = mb.reserveTruncate(bitSetMemSz);
+        for (unsigned pass=0; pass < 10000; pass++)
+        {
+            Owned<IBitSet> bs = createBitSetThreadUnsafe(bitSetMemSz, mem);
+            testSet1(initial, bs, 0, numBits, setValue, clearValue);
+        }
+        elapsed = msTick()-now;
+        fprintf(stdout, "Bit test [thread-unsafe version, fixed memory] (%u) time taken = %dms\n", initial, elapsed);
+    }
+
+    class CBitThread : public CSimpleInterfaceOf<IInterface>, implements IThreaded
+    {
+        IBitSet &bitSet;
+        unsigned startBit, numBits;
+        bool initial, setValue, clearValue;
+        CThreaded threaded;
+        Owned<IException> exception;
+        CppUnit::Exception *cppunitException;
+    public:
+        CBitThread(IBitSet &_bitSet, unsigned _startBit, unsigned _numBits, bool _initial)
+            : threaded("CBitThread", this), bitSet(_bitSet), startBit(_startBit), numBits(_numBits), initial(_initial)
+        {
+            cppunitException = NULL;
+            setValue = !initial;
+            clearValue = initial;
+        }
+        void start() { threaded.start(); }
+        void join()
+        {
+            threaded.join();
+            if (exception)
+                throw exception.getClear();
+            else if (cppunitException)
+                throw cppunitException;
+        }
+        virtual void main()
+        {
+            try
+            {
+                unsigned endBit = startBit+numBits-1;
+                if (initial)
+                    bitSet.incl(startBit, endBit);
+                for (unsigned i=startBit; i < endBit; i++)
+                {
+                    ASSERT(bitSet.test(i) == clearValue);
+                    bitSet.set(i, setValue);
+                    ASSERT(bitSet.test(i) == setValue);
+                    if (i < (endBit-1))
+                        ASSERT(bitSet.scan(i, clearValue) == i+1); // find next unset (should be i+1)
+                    bitSet.set(i, clearValue);
+                    bitSet.invert(i);
+                    ASSERT(bitSet.test(i) == setValue);
+                    bitSet.invert(i);
+                    ASSERT(bitSet.test(i) == clearValue);
+
+                    bool wasSet = bitSet.testSet(i, setValue);
+                    ASSERT(wasSet == clearValue);
+                    bool wasSet2 = bitSet.testSet(i, clearValue);
+                    ASSERT(wasSet2 == setValue);
+                    ASSERT(bitSet.test(i) == clearValue);
+
+                    bitSet.set(i, setValue);
+                    unsigned match = bitSet.scanInvert(startBit, setValue);
+                    ASSERT(match == i);
+                    ASSERT(bitSet.test(i) == clearValue);
+                }
+            }
+            catch (IException *e)
+            {
+                exception.setown(e);
+            }
+            catch (CppUnit::Exception &e)
+            {
+                cppunitException = e.clone();
+            }
+        }
+    };
+    unsigned testParallelRun(IBitSet &bitSet, unsigned nThreads, unsigned bitsPerThread, bool initial)
+    {
+        IArrayOf<CBitThread> bitThreads;
+        unsigned bitStart = 0;
+        unsigned bitEnd = 0;
+        for (unsigned t=0; t<nThreads; t++)
+        {
+            bitThreads.append(* new CBitThread(bitSet, bitStart, bitsPerThread, initial));
+            bitStart += bitsPerThread;
+        }
+        unsigned now = msTick();
+        for (unsigned t=0; t<nThreads; t++)
+            bitThreads.item(t).start();
+        Owned<IException> exception;
+        CppUnit::Exception *cppunitException = NULL;
+        for (unsigned t=0; t<nThreads; t++)
+        {
+            try
+            {
+                bitThreads.item(t).join();
+            }
+            catch (IException *e)
+            {
+                EXCLOG(e, NULL);
+                if (!exception)
+                    exception.setown(e);
+                else
+                    e->Release();
+            }
+            catch (CppUnit::Exception *e)
+            {
+                cppunitException = e;
+            }
+        }
+        if (exception)
+            throw exception.getClear();
+        else if (cppunitException)
+            throw *cppunitException;
+        return msTick()-now;
+    }
+
+    void testSetParallel(bool initial)
+    {
+        unsigned numBits = 1000000; // 10M
+        unsigned nThreads = getAffinityCpus();
+        unsigned bitsPerThread = numBits/nThreads;
+        bitsPerThread = (bitsPerThread + (BitsPerItem-1)) / BitsPerItem * BitsPerItem; // round up to multiple of BitsPerItem
+        numBits = bitsPerThread*nThreads; // round
+
+        fprintf(stdout, "testSetParallel, testing bit set of size : %d, nThreads=%d\n", numBits, nThreads);
+
+        Owned<IBitSet> bitSet = createBitSet();
+        unsigned took = testParallelRun(*bitSet, nThreads, bitsPerThread, initial);
+        fprintf(stdout, "Thread safe parallel bit set test (%u) time taken = %dms\n", initial, took);
+
+        size32_t bitSetMemSz = getBitSetMemoryRequirement(numBits);
+        MemoryBuffer mb;
+        void *mem = mb.reserveTruncate(bitSetMemSz);
+        bitSet.setown(createBitSetThreadUnsafe(bitSetMemSz, mem));
+        took = testParallelRun(*bitSet, nThreads, bitsPerThread, initial);
+        fprintf(stdout, "Thread unsafe parallel bit set test (%u) time taken = %dms\n", initial, took);
     }
 
     void testSimple()
     {
         testSet(false);
         testSet(true);
+        testSetParallel(false);
+        testSetParallel(true);
     }
 };
 

+ 83 - 21
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -453,15 +453,13 @@ class CMarker
     CActivityBase &activity;
     NonReentrantSpinLock lock;
     ICompare *cmp;
-    /* Access to bitSet is currently protected by the implementation
-     * Should move over to an implementation that's based on a lump of
-     * roxiemem and ensure that the threads avoid accessing the same bytes/words etc.
-     */
-    Owned<IBitSet> bitSet; // should be roxiemem, so can cause spilling
+    OwnedConstThorRow bitSetMem; // for thread unsafe version
+    Owned<IBitSet> bitSet;
     const void **base;
     rowidx_t nextChunkStartRow; // Updated as threads request next chunk
     rowidx_t rowCount, chunkSize; // There are configured at start of calculate()
     rowidx_t parallelMinChunkSize, parallelChunkSize; // Constant, possibly configurable in future
+    unsigned threadCount;
 
     class CCompareThread : public CInterface, implements IThreaded
     {
@@ -498,10 +496,14 @@ class CMarker
     }
     inline void mark(rowidx_t i)
     {
+        // NB: Thread safe, because markers are dealing with discrete parts of bitSetMem (alighted to bits_t boundaries)
         bitSet->set(i); // mark boundary
     }
     rowidx_t doMarking(rowidx_t myStart, rowidx_t myEnd)
     {
+        // myStart must be on bits_t boundary
+        dbgassertex(0 == (myStart % BitsPerItem));
+
         rowidx_t chunkUnique = 0;
         const void **rows = base+myStart;
         rowidx_t i=myStart;
@@ -550,20 +552,46 @@ public:
         // perhaps should make these configurable..
         parallelMinChunkSize = 1024;
         parallelChunkSize = 10*parallelMinChunkSize;
+        threadCount = activity.getOptInt(THOROPT_JOINHELPER_THREADS, activity.queryMaxCores());
+        if (0 == threadCount)
+            threadCount = getAffinityCpus();
+    }
+    bool init(rowidx_t rowCount)
+    {
+        bool threadSafeBitSet = activity.getOptBool("threadSafeBitSet", false); // for testing only
+        if (threadSafeBitSet)
+        {
+            DBGLOG("Using Thread safe variety of IBitSet");
+            bitSet.setown(createBitSet());
+        }
+        else
+        {
+            size32_t bitSetMemSz = getBitSetMemoryRequirement(rowCount);
+            void *pBitSetMem = activity.queryJob().queryRowManager()->allocate(bitSetMemSz, activity.queryContainer().queryId(), SPILL_PRIORITY_LOW);
+            if (!pBitSetMem)
+                return false;
+
+            bitSetMem.setown(pBitSetMem);
+            bitSet.setown(createBitSetThreadUnsafe(bitSetMemSz, pBitSetMem));
+        }
+        return true;
+    }
+    void reset()
+    {
+        bitSet.clear();
     }
     rowidx_t calculate(CThorExpandingRowArray &rows, ICompare *_cmp, bool doSort)
     {
+        CCycleTimer timer;
+        assertex(bitSet);
         cmp = _cmp;
-        unsigned threadCount = activity.getOptInt(THOROPT_JOINHELPER_THREADS, activity.queryMaxCores());
-        if (0 == threadCount)
-            threadCount = getAffinityCpus();
         if (doSort)
             rows.sort(*cmp, threadCount);
         rowCount = rows.ordinality();
         if (0 == rowCount)
             return 0;
         base = rows.getRowArray();
-        bitSet.setown(createBitSet());
+
         rowidx_t uniqueTotal = 0;
         if ((1 == threadCount) || (rowCount < parallelMinChunkSize))
             uniqueTotal = doMarking(0, rowCount);
@@ -578,6 +606,9 @@ public:
                 chunkSize = parallelMinChunkSize;
                 threadCount = rowCount / chunkSize;
             }
+            // Must be multiple of sizeof BitsPerItem
+            chunkSize = (chunkSize + (BitsPerItem-1)) / BitsPerItem * BitsPerItem; // round up to nearest multiple of BitsPerItem
+
             /* This is yet another case of requiring a set of small worker threads
              * Thor should really use a common pool of lightweight threadlets made available to all
              * where any particular instances (e.g. lookup) can stipulate min/max it requires etc.
@@ -610,6 +641,7 @@ public:
         ++uniqueTotal;
         mark(rowCount-1); // last row is implicitly end of group
         cmp = NULL;
+        DBGLOG("CMarker::calculate - uniqueTotal=%"RIPF"d, took=%d ms", uniqueTotal, timer.elapsedMs());
         return uniqueTotal;
     }
     rowidx_t findNextBoundary(rowidx_t start)
@@ -1509,8 +1541,11 @@ protected:
                 bool success=false;
                 try
                 {
-                    // NB: If this ensure returns false, it will have called the MM callbacks and have setup isLocalLookup() already
-                    success = rhs.ensure(rhsRows, SPILL_PRIORITY_LOW); // NB: Could OOM, handled by exception handler
+                    if (marker.init(rhsRows))
+                    {
+                        // NB: If this ensure returns false, it will have called the MM callbacks and have setup isLocalLookup() already
+                        success = rhs.ensure(rhsRows, SPILL_PRIORITY_LOW); // NB: Could OOM, handled by exception handler
+                    }
                 }
                 catch (IException *e)
                 {
@@ -1610,6 +1645,7 @@ protected:
 
                 // If HT sized already and now spilt, too big clear and size when local size known
                 clearHT();
+                marker.reset();
 
                 if (stopping)
                 {
@@ -1724,7 +1760,7 @@ protected:
                 }
             }
             else
-                rightStream.setown(rowLoader->load(right, abortSoon, false, &rhs, NULL, false));
+                rightStream.setown(rowLoader->load(right, abortSoon, false, &rhs));
 
             if (!rightStream)
             {
@@ -1739,16 +1775,40 @@ protected:
 
                 rowLoader.clear();
 
-                // Either was already sorted, or rowLoader->load() sorted on transfer out to rhs
-                rowidx_t uniqueKeys = marker.calculate(rhs, compareRight, false);
-
-                /* Although HT is allocated with a low spill priority, it can still cause callbacks
-                 * so try to allocate before rhs is transferred to spillable collector
-                 */
-                bool htAllocated = setupHT(uniqueKeys);
-                if (!htAllocated)
+                bool success;
+                try
+                {
+                    success = marker.init(rhs.ordinality());
+                }
+                catch (IException *e)
+                {
+                    if (!isSmart())
+                        throw;
+                    switch (e->errorCode())
+                    {
+                    case ROXIEMM_MEMORY_POOL_EXHAUSTED:
+                    case ROXIEMM_MEMORY_LIMIT_EXCEEDED:
+                        e->Release();
+                        break;
+                    default:
+                        throw;
+                    }
+                    success = false;
+                }
+                if (success)
+                {
+                    // Either was already sorted, or rowLoader->load() sorted on transfer out to rhs
+                    rowidx_t uniqueKeys = marker.calculate(rhs, compareRight, false);
+                    success = setupHT(uniqueKeys);
+                    if (!success)
+                    {
+                        if (!isSmart())
+                            throw MakeActivityException(this, 0, "Failed to allocate [LOCAL] hash table");
+                    }
+                }
+                if (!success)
                 {
-                    ActPrintLog("Out of memory trying to allocate the [LOCAL] hash table for a SMART join (%"RIPF"d rows), will now failover to a std hash join", uniqueKeys);
+                    ActPrintLog("Out of memory trying to allocate [LOCAL] tables for a SMART join (%"RIPF"d rows), will now failover to a std hash join", rhs.ordinality());
                     Owned<IThorRowCollector> collector = createThorRowCollector(*this, queryRowInterfaces(rightITDL), NULL, stableSort_none, rc_mixed, SPILL_PRIORITY_LOOKUPJOIN);
                     collector->setOptions(rcflag_noAllInMemSort); // If fits into memory, don't want it resorted
                     collector->transferRowsIn(rhs); // can spill after this
@@ -1784,6 +1844,7 @@ protected:
             {
                 ActPrintLog("Performing standard join");
 
+                marker.reset();
                 // NB: lhs ordering and grouping lost from here on.. (will have been caught earlier if global)
                 if (grouped)
                     throw MakeActivityException(this, 0, "Degraded to standard join, LHS order cannot be preserved");
@@ -2225,6 +2286,7 @@ public:
         }
         // Rows now in hash table, rhs arrays no longer needed
         _rows.kill();
+        marker.reset();
     }
 };