浏览代码

Merge remote-tracking branch 'origin/candidate-5.2.0'

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>

Conflicts:
	system/jlib/jcomp.cpp
Gavin Halliday 10 年之前
父节点
当前提交
95b9450419

+ 1 - 1
common/remote/sockfile.cpp

@@ -773,7 +773,7 @@ struct CTreeCopyItem: public CInterface
         loc.append(orig);
         dt.set(_dt);
         sz = _sz;
-        busy.setown(createBitSet());
+        busy.setown(createThreadSafeBitSet());
         lastused = msTick();
     }
     bool equals(const RemoteFilename &orig, const char *_net, const char *_mask, offset_t _sz, CDateTime &_dt) 

+ 1 - 1
common/thorhelper/thorstep.cpp

@@ -494,7 +494,7 @@ CFilteredInputBuffer::CFilteredInputBuffer(IEngineRowAllocator * _allocator, IRa
     stepCompare = _stepCompare;
     equalCompare = _equalCompare;
     input = _input;
-    matched.setown(createBitSet());
+    matched.setown(createThreadSafeBitSet());
     numMatched = 0;
     readIndex = 0;
     numEqualFields = _numEqualFields;

+ 1 - 1
common/thorhelper/thorstep.ipp

@@ -824,7 +824,7 @@ public:
     const void * nextUnqueued();
     bool ensureNonEmpty();
     bool flushUnmatched();
-    void trackUnmatched() { matchedLeft.setown(createBitSet()); }
+    void trackUnmatched() { matchedLeft.setown(createThreadSafeBitSet()); }
 
     inline void consumeNextInput() { rows.enqueue(input->consume()); }
 

+ 1 - 1
dali/base/dautils.cpp

@@ -1936,7 +1936,7 @@ public:
     CPECacheElem(const char *owner, ISortedElementsTreeFilter *_postFilter)
         : CTimedCacheItem(owner), postFilter(_postFilter), postFiltered(0)
     {
-        passesFilter.setown(createBitSet());
+        passesFilter.setown(createThreadSafeBitSet());
     }
     ~CPECacheElem()
     {

+ 1 - 1
dali/sasha/saxref.cpp

@@ -1762,7 +1762,7 @@ public:
                 unsigned numsub = file.getPropInt("@numsubfiles");
                 unsigned n = 0;
                 Owned<IPropertyTreeIterator> iter = file.getElements("SubFile");
-                Owned<IBitSet> parts = createBitSet();
+                Owned<IBitSet> parts = createThreadSafeBitSet();
                 StringArray subname;
                 ForEach(*iter) {
                     IPropertyTree &sfile = iter->query();

+ 1 - 1
ecl/hql/hqlgram.hpp

@@ -463,7 +463,7 @@ public:
     void checkIndexFieldType(IHqlExpression * cur, bool isPayload, bool insideNestedRecord, const attribute & errpos);
     void checkIndexRecordType(IHqlExpression * record, unsigned numPayloadFields, bool insideNestedRecord, const attribute & errpos);
     void checkIndexRecordTypes(IHqlExpression * index, const attribute & errpos);
-    void reportIndexFieldType(IHqlExpression * expr, bool isPayload, const attribute & errpos);
+    void reportInvalidIndexFieldType(IHqlExpression * expr, bool isPayload, const attribute & errpos);
     void reportUnsupportedFieldType(ITypeInfo * type, const attribute & errpos);
     void checkCaseForDuplicates(HqlExprArray & exprs, attribute &err);
     void checkOnFailRecord(IHqlExpression * expr, attribute & errpos);

+ 15 - 20
ecl/hql/hqlgram2.cpp

@@ -6965,14 +6965,15 @@ void HqlGram::reportUnsupportedFieldType(ITypeInfo * type, const attribute & err
     reportError(ERR_INDEX_BADTYPE, errpos, "Fields of type %s are not currently supported", s.str());
 }
 
-void HqlGram::reportIndexFieldType(IHqlExpression * expr, bool isKeyed, const attribute & errpos)
+void HqlGram::reportInvalidIndexFieldType(IHqlExpression * expr, bool isKeyed, const attribute & errpos)
 {
+    const char * id = expr->queryId()->str();
     StringBuffer s;
     getFriendlyTypeStr(expr, s);
     if (isKeyed)
-        reportError(ERR_INDEX_BADTYPE, errpos, "INDEX does not currently support keyed fields of type '%s'", s.str());
+        reportError(ERR_INDEX_BADTYPE, errpos, "INDEX does not currently support keyed field (%s) of type '%s'", id, s.str());
     else
-        reportError(ERR_INDEX_BADTYPE, errpos, "INDEX does not currently support fields of type '%s'", s.str());
+        reportError(ERR_INDEX_BADTYPE, errpos, "INDEX does not currently support field (%s) of type '%s'", id, s.str());
 }
 
 void HqlGram::checkIndexFieldType(IHqlExpression * expr, bool isPayload, bool insideNestedRecord, const attribute & errpos)
@@ -6984,19 +6985,20 @@ void HqlGram::checkIndexFieldType(IHqlExpression * expr, bool isPayload, bool in
         {
             ITypeInfo * type = expr->queryType();
             IIdAtom * id = expr->queryId();
-            switch (type->getTypeCode())
+            type_t tc = type->getTypeCode();
+            switch (tc)
             {
             case type_real:
                 if (!isPayload)
-                    reportIndexFieldType(expr, true, errpos);
+                    reportInvalidIndexFieldType(expr, true, errpos);
                 break;
             case type_decimal:
                 if (!isPayload && type->isSigned())
-                    reportIndexFieldType(expr, true, errpos);
+                    reportInvalidIndexFieldType(expr, true, errpos);
                 break;
             case type_bitfield:
             case type_any:
-                reportIndexFieldType(expr, false, errpos);
+                reportInvalidIndexFieldType(expr, false, errpos);
                 break;
             case type_record:
                 throwUnexpected();
@@ -7008,34 +7010,27 @@ void HqlGram::checkIndexFieldType(IHqlExpression * expr, bool isPayload, bool in
                     break;
                 }
             case type_dictionary:
-                if (!variableOk || !isPayload)
-                    reportError(ERR_INDEX_BADTYPE, errpos, "Dictionaries (%s) are not supported inside indexes", id->str());
-                break;
             case type_table:
             case type_groupedtable:
-                if (!variableOk)
-                    reportError(ERR_INDEX_BADTYPE, errpos, "Datasets (%s) are not supported inside indexes", id->str());
-                break;
             case type_packedint:
-                if (!isPayload)
-                    reportError(ERR_INDEX_BADTYPE, errpos, "PACKED integers (%s) are not supported inside indexes", id->str());
-                break;
             case type_set:
-                if (!variableOk)
-                    reportError(ERR_INDEX_BADTYPE, errpos, "SETS (%s) are not supported inside indexes", id->str());
+            case type_varstring:
+            case type_varunicode:
+                if (!isPayload)
+                    reportInvalidIndexFieldType(expr, true, errpos);
                 break;
             case type_int:
             case type_swapint:
                 if (!isPayload && insideNestedRecord)
                 {
                     if (type->isSigned() ||
-                        ((type->getTypeCode() == type_littleendianint) && (type->getSize() != 1)))
+                        ((tc == type_littleendianint) && (type->getSize() != 1)))
                         reportError(ERR_INDEX_BADTYPE, errpos.pos, "Signed or little-endian field %s is not supported inside a keyed record field ", id->str());
                 }
                 break;
             default:
                 if (!type->isScalar())
-                    reportIndexFieldType(expr, false, errpos);
+                    reportInvalidIndexFieldType(expr, false, errpos);
                 else if ((type->getSize() == UNKNOWN_LENGTH) && !variableOk)
                 {
                     reportError(ERR_INDEX_BADTYPE, errpos, "Variable size fields (%s) are not supported inside indexes", id->str());

+ 1 - 1
ecl/hql/hqlusage.cpp

@@ -247,7 +247,7 @@ FieldAccessAnalyser::FieldAccessAnalyser(IHqlExpression * _selector) : NewHqlTra
 {
     unwindFields(fields, selector->queryRecord());
     numAccessed = 0;
-    accessed.setown(createBitSet());
+    accessed.setown(createThreadSafeBitSet());
 }
 
 IHqlExpression * FieldAccessAnalyser::queryLastFieldAccessed() const

+ 37 - 0
ecl/regress/issue12531.ecl

@@ -0,0 +1,37 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2014 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#option ('globalFold', false);
+d := dataset('~local::rkc::person', { string15 name, unsigned8 filepos{virtual(fileposition)} }, flat);
+
+i := index(d, { f_name := (varstring11) name, filepos } ,'\\home\\person.name_first.key');
+i2 := index(d, { f_name := (varunicode11) name, filepos } ,'\\home\\person.name_first.key');
+
+a1 := i(f_name='RICHARD');
+
+a2 := sort(a1, -f_name);
+build(i);
+output(a2(filepos > 10));
+output(a2(filepos > 20));
+
+output(sort(a1, -filepos));
+output(sort(a1, filepos));
+
+output(a1(filepos > 10));
+output(a1(filepos > 20));
+
+

+ 1 - 1
ecl/regress/keytrim.ecl

@@ -19,7 +19,7 @@
 //d := dataset('~local::rkc::person', { string15 f1, qstring15 f2, data15 f3, unicode15 f4, unsigned8 filepos{virtual(fileposition)} }, flat);
 d := dataset('~local::rkc::person', { string15 f1, qstring15 f2, data15 f3, varstring15 f4, unsigned8 filepos{virtual(fileposition)} }, flat);
 
-i := index(d, { d } ,'\\home\\person.name_first.key');
+i := index(d, { f1, f2 }, { d } ,'\\home\\person.name_first.key');
 
 
 string15 searchStrInStr := 'Gavin' : stored('searchStrInStr');

+ 34 - 8
esp/src/eclwatch/ESPWorkunit.js

@@ -127,21 +127,47 @@ define([
             this.set("sourceFiles", sourceFiles);
         },
         ExtractTime: function (Timer) {
+            //  GH:  <n>ns or <m>ms or <s>s or [<d> days ][<h>:][<m>:]<s>[.<ms>]
             var nsIndex = Timer.indexOf("ns");
             if (nsIndex !== -1) {
-                return Timer.substr(0, nsIndex) / 1000000000;
+                return parseFloat(Timer.substr(0, nsIndex)) / 1000000000;
             }
             var msIndex = Timer.indexOf("ms");
             if (msIndex !== -1) {
-                return Timer.substr(0, msIndex) / 1000;
+                return parseFloat(Timer.substr(0, msIndex)) / 1000;
             }
-            //MORE: This code doesn't cope with separate days
-            var secs = 0;
-            var timeParts = Timer.split(":");
-            for (var j = 0; j < timeParts.length; ++j) {
-                secs = secs * 60 + timeParts[j];
+            var sIndex = Timer.indexOf("s");
+            if (sIndex !== -1 && Timer.indexOf("days") === -1) {
+                return parseFloat(Timer.substr(0, sIndex));
             }
-            return secs;
+
+            var dayTimeParts = Timer.split(" days ");
+            var days = parseFloat(dayTimeParts.length > 1 ? dayTimeParts[0] : 0.0);
+            var time = dayTimeParts.length > 1 ? dayTimeParts[1] : dayTimeParts[0];
+            var secs = 0.0;
+            var timeParts = time.split(":").reverse();
+            for (var j = 0; j < timeParts.length; ++j) {
+                secs += parseFloat(timeParts[j]) * Math.pow(60, j);
+            }
+            return (days * 24 * 60 * 60) + secs;
+        },
+        ExtractTimeTests: function () {
+            var tests = [
+                { str: "1.1s", expected: 1.1 },
+                { str: "2.2ms", expected: 0.0022 },
+                { str: "3.3ns", expected: 0.0000000033 },
+                { str: "4.4", expected: 4.4 },
+                { str: "5:55.5", expected: 355.5 },
+                { str: "6:06:06.6", expected: 21966.6 },
+                { str: "6:06:6.6", expected: 21966.6 },
+                { str: "6:6:6.6", expected: 21966.6 },
+                { str: "7 days 7:07:7.7", expected: 630427.7 }
+            ];
+            tests.forEach(function (test, idx) {
+                if (this.ExtractTime(test.str) !== test.expected) {
+                    console.log("ExtractTimeTests failed with " + this.ExtractTime(test.str) + " !== " +  test.expected);
+                }
+            }, this);
         },
         _TimersSetter: function (Timers) {
             var timers = [];

+ 27 - 7
system/jlib/jcomp.cpp

@@ -259,6 +259,7 @@ CppCompiler::CppCompiler(const char * _coreName, const char * _sourceDir, const
     saveTemps = false;
     abortChecker = NULL;
     precompileHeader = false;
+    linkFailed = false;
 }
 
 void CppCompiler::addCompileOption(const char * option)
@@ -527,7 +528,8 @@ void CppCompiler::extractErrors(IArrayOf<IError> & errors)
 
         //cpperr.ecl:7:10: error: ‘syntaxError’ was not declared in this scope
         RegExpr gccErrorPattern("^{.+}:{[0-9]+}:{[0-9]+}: {[a-z]+}: {.*$}");
-        RegExpr gccLinkErrorPattern("^{.+}:[^:]+: {.*$}"); // undefined reference
+        RegExpr gccErrorPattern2("^{.+}:{[0-9]+}: {[a-z]+}: {.*$}");
+        RegExpr gccLinkErrorPattern("^{.+}:{[0-9]+}: {.*$}"); // undefined reference
         RegExpr gccLinkErrorPattern2("^.+ld: {.*$}"); // fail to find library etc.
         RegExpr gccExitStatusPattern("^.*exit status$"); // collect2: error: ld returned 1 exit status
         const char * cur = file.str();
@@ -561,23 +563,40 @@ void CppCompiler::extractErrors(IArrayOf<IError> & errors)
                 gccErrorPattern.findstr(kind, 4);
                 gccErrorPattern.findstr(msg, 5);
 
-                if (streq(kind, "warning"))
+                if (strieq(kind, "error"))
+                    errors.append(*createError(CategoryError, SeverityError, 2999, msg.str(), filename.str(), atoi(line), atoi(column), 0));
+                else
                     errors.append(*createError(CategoryCpp, SeverityWarning, 2999, msg.str(), filename.str(), atoi(line), atoi(column), 0));
+            }
+            else if (gccErrorPattern2.find(next))
+            {
+                StringBuffer filename, line, kind, msg;
+                gccErrorPattern2.findstr(filename, 1);
+                gccErrorPattern2.findstr(line, 2);
+                gccErrorPattern2.findstr(kind, 3);
+                gccErrorPattern2.findstr(msg, 4);
+
+                if (strieq(kind, "error"))
+                    errors.append(*createError(CategoryError, SeverityError, 2999, msg.str(), filename.str(), atoi(line), 0, 0));
                 else
-                    errors.append(*createError(CategoryError, SeverityError, 2999, msg.str(), filename.str(), atoi(line), atoi(column), 0));
+                    errors.append(*createError(CategoryCpp, SeverityWarning, 2999, msg.str(), filename.str(), atoi(line), 0, 0));
             }
             else if (gccLinkErrorPattern.find(next))
             {
-                StringBuffer filename, msg;
+                StringBuffer filename, line, msg;
                 gccLinkErrorPattern.findstr(filename, 1);
-                gccLinkErrorPattern.findstr(msg, 2);
-                errors.append(*createError(CategoryError, SeverityError, 2999, msg.str(), filename.str(), 0, 0, 0));
+                gccLinkErrorPattern.findstr(line, 2);
+                gccLinkErrorPattern.findstr(msg, 3);
+
+                ErrorSeverity severity = linkFailed ? SeverityError : SeverityWarning;
+                errors.append(*createError(CategoryError, severity, 2999, msg.str(), filename.str(), atoi(line), 0, 0));
             }
             else if (gccLinkErrorPattern2.find(next))
             {
                 StringBuffer msg("C++ link error: ");
                 gccLinkErrorPattern2.findstr(msg, 1);
-                errors.append(*createError(CategoryError, SeverityError, 2999, msg.str(), NULL, 0, 0, 0));
+                ErrorSeverity severity = linkFailed ? SeverityError : SeverityWarning;
+                errors.append(*createError(CategoryError, severity, 2999, msg.str(), NULL, 0, 0, 0));
             }
             else if (vsErrorPattern.find(next))
             {
@@ -650,6 +669,7 @@ bool CppCompiler::doLink()
     StringBuffer logFile = StringBuffer(CORE_NAME).append("_link.log.tmp");
     logFiles.append(logFile);
     bool ret = invoke_program(expanded.str(), runcode, true, logFile) && (runcode == 0);
+    linkFailed = !ret;
     return ret;
 }
 

+ 1 - 0
system/jlib/jcomp.ipp

@@ -81,6 +81,7 @@ protected:
     void _addInclude(StringBuffer &s, const char *paths);
     bool            saveTemps;
     bool            precompileHeader;
+    bool            linkFailed;
     IAbortRequestCallback * abortChecker;
 };
 

+ 350 - 152
system/jlib/jset.cpp

@@ -23,121 +23,71 @@
 
 //-----------------------------------------------------------------------
 
-// Simple BitSet // 0 based all, intermediate items exist, operations threadsafe and atomic
-
-class CBitSet : public CInterface, implements IBitSet
+// NB: The CBitSet*Helper's are primarily avoid the need for virtuals in the implementations
+class CBitSetArrayHelper
 {
-public:
-    IMPLEMENT_IINTERFACE;
 protected:
-    //unsigned seems to be most efficient, and required for __builtin_ffs below
-    typedef unsigned bits_t;
-    enum { BitsPerItem = sizeof(bits_t) * 8 };
     ArrayOf<bits_t> bits;
-    mutable CriticalSection crit;
 
-public:
-    CBitSet() { }
-    CBitSet(MemoryBuffer &buffer)
+    inline bits_t getBits(unsigned i) const { return bits.item(i); }
+    inline void setBits(unsigned i, bits_t m)
     {
-        deserialize(buffer);
+        bits.replace(m, i);
     }
-    void set(unsigned n,bool val) 
+    inline void addBitSet(bits_t m)
     {
-        bits_t t=((bits_t)1)<<(n%BitsPerItem);
-        unsigned i=n/BitsPerItem;
-        CriticalBlock block(crit);
-        if (i>=bits.ordinality()) {
-            if (!val)
-                return; // don't bother
-            while (i>bits.ordinality())
-                bits.append(0);
-            bits.append(t);
-        }
-        else {
-            bits_t m=bits.item(i);
-            if (val)
-                m |= t;
-            else 
-                m &= ~t;
-            bits.replace(m,i);
-        }
+        bits.append(m);
     }
-        
-    bool invert(unsigned n) 
-    {
-        bits_t t=((bits_t)1)<<(n%BitsPerItem);
-        unsigned i=n/BitsPerItem;
-        CriticalBlock block(crit);
-        bool ret;
-        if (i>=bits.ordinality()) {
-            while (i>bits.ordinality())
-                bits.append(0);
-            bits.append(t);
-            ret = true;
-        }
-        else {
-            bits_t m=bits.item(i);
-            ret = ((m&t)==0);
-            if (ret)
-                m |= t;
-            else 
-                m &= ~t;
-            bits.replace(m,i);
-        }
-        return ret;
-    }
-        
-    bool test(unsigned n) 
+    inline unsigned getWidth() const { return bits.ordinality(); }
+};
+
+class CBitSetMemoryHelper
+{
+protected:
+    bits_t *mem;
+    unsigned bitSetUnits;
+    MemoryBuffer mb; // Used if mem not provided, also implies expansion allowed
+    bool fixedMemory;
+
+    CBitSetMemoryHelper()
     {
-        bits_t t=((bits_t)1)<<(n%BitsPerItem);
-        unsigned i=n/BitsPerItem;
-        CriticalBlock block(crit);
-        if (i<bits.ordinality()) {
-            bits_t m=bits.item(i);
-            if (m&t)
-                return true;
-        }
-        return false;
+        fixedMemory = false;
+        bitSetUnits = 0;
+        mem = NULL;
     }
-        
-    bool testSet(unsigned n,bool val) 
+    inline bits_t getBits(unsigned i) const { return mem[i]; }
+    inline void setBits(unsigned i, bits_t m) { mem[i] = m; }
+    inline void addBitSet(bits_t m)
     {
-        bits_t t=((bits_t)1)<<(n%BitsPerItem);
-        unsigned i=n/BitsPerItem;
-        CriticalBlock block(crit);
-        bool ret;
-        if (i>=bits.ordinality()) {
-            ret = false;
-            if (!val)
-                return false; // don't bother
-            while (i>bits.ordinality())
-                bits.append(0);
-            bits.append(t);
-        }
-        else {
-            bits_t m=bits.item(i);
-            ret = (m&t)!=0;
-            if (val)
-                m |= t;
-            else 
-                m &= ~t;
-            bits.replace(m,i);
-        }
-        return ret;
+        if (fixedMemory)
+            throw MakeStringException(-1, "CBitSet with fixed mem cannot expand");
+        mb.append(m);
+        mem = (bits_t *)mb.bufferBase();
+        ++bitSetUnits;
     }
+    inline unsigned getWidth() const { return bitSetUnits; }
+};
+
+template <class BITSETHELPER>
+class CBitSetBase : public BITSETHELPER, public CSimpleInterfaceOf<IBitSet>
+{
+protected:
+    typedef BITSETHELPER PARENT;
+    using PARENT::getWidth;
+    using PARENT::getBits;
+    using PARENT::setBits;
+    using PARENT::addBitSet;
 
-    unsigned _scan(unsigned from,bool tst,bool scninv)
+    unsigned _scan(unsigned from, bool tst, bool scninv)
     {
         bits_t noMatchMask=tst?0:(bits_t)-1;
         unsigned j=from%BitsPerItem;
-        CriticalBlock block(crit);
         // returns index of first = val >= from
-        unsigned n=bits.ordinality();
+        unsigned n=getWidth();
         unsigned i;
         for (i=from/BitsPerItem;i<n;i++)
         {
-            bits_t m=bits.item(i);
+            bits_t m = getBits(i);
             if (m!=noMatchMask)
             {
 #if defined(__GNUC__)
@@ -170,7 +120,7 @@ public:
                     {
                         bits_t t = ((bits_t)1)<<pos;
                         m &= ~t;
-                        bits.replace(m,i);
+                        setBits(i, m);
                     }
                     return i*BitsPerItem+pos;
                 }
@@ -182,7 +132,7 @@ public:
                     {
                         bits_t t = ((bits_t)1)<<pos;
                         m |= t;
-                        bits.replace(m,i);
+                        setBits(i, m);
                     }
                     return i*BitsPerItem+pos;
                 }
@@ -197,7 +147,7 @@ public:
                             if (scninv)
                             {
                                 m &= ~t;
-                                bits.replace(m,i);
+                                setBits(i, m);
                             }
                             return i*BitsPerItem+j;
                         }
@@ -209,7 +159,7 @@ public:
                             if (scninv)
                             {
                                 m |= t;
-                                bits.replace(m,i);
+                                setBitSet(i, m);
                             }
                             return i*BitsPerItem+j;
                         }
@@ -220,7 +170,7 @@ public:
             }
             j = 0;
         }
-        if (tst) 
+        if (tst)
             return (unsigned)-1;
         unsigned ret = n*BitsPerItem;
         if (n*BitsPerItem<from)
@@ -229,93 +179,215 @@ public:
             set(ret,true);
         return ret;
     }
-
-    unsigned scan(unsigned from,bool tst)
-    {
-        return _scan(from,tst,false);
-    }
-
-    unsigned scanInvert(unsigned from,bool tst) // like scan but inverts bit as well
-    {
-        return _scan(from,tst,true);
-    }
-
-    void _incl(unsigned lo, unsigned hi,bool val)
+    void _incl(unsigned lo, unsigned hi, bool val)
     {
         if (hi<lo)
             return;
         unsigned j=lo%BitsPerItem;
         unsigned nb=(hi-lo)+1;
-        CriticalBlock block(crit);
-        unsigned n=bits.ordinality();
-        unsigned i;
-        for (i=lo/BitsPerItem;i<n;i++) {
-            bits_t m;
-            if ((nb>=BitsPerItem)&&(j==0)) {
-                m = i;
-                nb -= BitsPerItem;
+        unsigned n=getWidth();
+        unsigned i=lo/BitsPerItem;
+        if (n<=i)
+        {
+            if (!val)
+                return;
+            while (n < i)
+            {
+                addBitSet(0);
+                ++n;
             }
-            else {
-                m=bits.item(i);
+            if (j>0)
+            {
+                bits_t m = 0;
                 bits_t t = ((bits_t)1)<<j;
-                for (;j<BitsPerItem;j++) {
-                    if (val)
-                        m |= t;
-                    else
-                        m &= ~t;
+                for (;j<BitsPerItem;j++)
+                {
+                    m |= t;
                     if (--nb==0)
                         break;
                     t <<= 1;
                 }
+                addBitSet(m);
             }
-            bits.replace(m,i);
             if (nb==0)
                 return;
             j = 0;
         }
-        if (val) {
-            while (nb>=BitsPerItem) {
-                bits.append((bits_t)-1);
-                nb-=BitsPerItem;
+        else
+        {
+            for (;i<n;i++)
+            {
+                bits_t m;
+                if ((nb>=BitsPerItem)&&(j==0))
+                {
+                    if (val)
+                        m = (bits_t)-1;
+                    else
+                        m = 0;
+                    nb -= BitsPerItem;
+                }
+                else
+                {
+                    m = getBits(i);
+                    bits_t t = ((bits_t)1)<<j;
+                    for (;j<BitsPerItem;j++)
+                    {
+                        if (val)
+                            m |= t;
+                        else
+                            m &= ~t;
+                        if (--nb==0)
+                            break;
+                        t <<= 1;
+                    }
+                }
+                setBits(i, m);
+                if (nb==0)
+                    return;
+                j = 0;
+            }
+        }
+        if (val)
+        {
+            while (nb>=BitsPerItem)
+            {
+                addBitSet((bits_t)-1);
+                nb -= BitsPerItem;
             }
-            if (nb>0) {
+            if (nb>0)
+            {
                 bits_t m=0;
                 bits_t t = ((bits_t)1)<<j;
-                for (;j<BitsPerItem;j++) {
+                for (;j<BitsPerItem;j++)
+                {
                     m |= t;
                     if (--nb==0)
                         break;
                     t <<= 1;
                 }
-                bits.append(m);
+                addBitSet(m);
             }
         }
     }
-
-    void incl(unsigned lo, unsigned hi)
+public:
+// IBitSet impl.
+    virtual void set(unsigned n, bool val)
     {
-        _incl(lo,hi,true);
+        bits_t t=((bits_t)1)<<(n%BitsPerItem);
+        unsigned i = n/BitsPerItem;
+        if (i>=getWidth())
+        {
+            if (!val)
+                return; // don't bother
+            while (i>getWidth())
+                addBitSet(0);
+            addBitSet(t);
+        }
+        else
+        {
+            bits_t m = getBits(i);
+            if (val)
+                m |= t;
+            else
+                m &= ~t;
+            setBits(i, m);
+        }
     }
-
-    void excl(unsigned lo, unsigned hi)
+    virtual bool invert(unsigned n)
     {
-        _incl(lo,hi,false);
+        bits_t t=((bits_t)1)<<(n%BitsPerItem);
+        unsigned i=n/BitsPerItem;
+        bool ret;
+        if (i>=getWidth())
+        {
+            while (i>getWidth())
+                addBitSet(0);
+            addBitSet(t);
+            ret = true;
+        }
+        else
+        {
+            bits_t m = getBits(i);
+            ret = 0 == (m&t);
+            if (ret)
+                m |= t;
+            else
+                m &= ~t;
+            setBits(i, m);
+        }
+        return ret;
     }
-
-    void reset()
+    virtual bool test(unsigned n)
     {
-        CriticalBlock block(crit);
-        bits.kill();
+        bits_t t=((bits_t)1)<<(n%BitsPerItem);
+        unsigned i=n/BitsPerItem;
+        if (i<getWidth())
+        {
+            bits_t m = getBits(i);
+            if (m&t)
+                return true;
+        }
+        return false;
     }
-
-    void serialize(MemoryBuffer &buffer) const
+    virtual bool testSet(unsigned n, bool val)
     {
-        CriticalBlock block(crit);
-        buffer.append(bits.ordinality());
-        ForEachItemIn(b, bits)
-            buffer.append(bits.item(b));
+        bits_t t=((bits_t)1)<<(n%BitsPerItem);
+        unsigned i=n/BitsPerItem;
+        if (i>=getWidth())
+        {
+            if (val)
+            {
+                while (i>getWidth())
+                    addBitSet(0);
+                addBitSet(t);
+            }
+            return false;
+        }
+        else
+        {
+            bits_t m = getBits(i);
+            if (m&t)
+            {
+                if (!val)
+                    setBits(i, m & ~t);
+                return true;
+            }
+            else
+            {
+                if (val)
+                    setBits(i, m | t);
+                return false;
+            }
+        }
     }
+    virtual unsigned scan(unsigned from,bool tst)
+    {
+        return _scan(from,tst,false);
+    }
+    virtual unsigned scanInvert(unsigned from,bool tst) // like scan but inverts bit as well
+    {
+        return _scan(from,tst,true);
+    }
+    virtual void incl(unsigned lo, unsigned hi)
+    {
+        _incl(lo,hi,true);
+    }
+    virtual void excl(unsigned lo, unsigned hi)
+    {
+        _incl(lo,hi,false);
+    }
+};
+
+size32_t getBitSetMemoryRequirement(unsigned numBits)
+{
+    unsigned bitSetUnits = (numBits + (BitsPerItem-1)) / BitsPerItem;
+    return bitSetUnits * sizeof(bits_t);
+}
 
+// Simple BitSet // 0 based all, intermediate items exist, operations threadsafe and atomic
+class CBitSetThreadSafe : public CBitSetBase<CBitSetArrayHelper>
+{
+    mutable CriticalSection crit;
     void deserialize(MemoryBuffer &buffer)
     {
         CriticalBlock block(crit);
@@ -333,14 +405,140 @@ public:
             }
         }
     }
+public:
+    CBitSetThreadSafe()
+    {
+    }
+    CBitSetThreadSafe(MemoryBuffer &buffer)
+    {
+        deserialize(buffer);
+    }
+// IBitSet overloads
+    virtual void set(unsigned n, bool val)
+    {
+        CriticalBlock block(crit);
+        CBitSetBase::set(n, val);
+    }
+    virtual bool invert(unsigned n)
+    {
+        CriticalBlock block(crit);
+        return CBitSetBase::invert(n);
+    }
+    virtual bool test(unsigned n)
+    {
+        CriticalBlock block(crit);
+        return CBitSetBase::test(n);
+    }
+    virtual bool testSet(unsigned n, bool val)
+    {
+        CriticalBlock block(crit);
+        return CBitSetBase::testSet(n, val);
+    }
+    virtual unsigned scan(unsigned from, bool tst)
+    {
+        CriticalBlock block(crit);
+        return _scan(from,tst,false);
+    }
+    virtual unsigned scanInvert(unsigned from, bool tst) // like scan but inverts bit as well
+    {
+        CriticalBlock block(crit);
+        return _scan(from,tst,true);
+    }
+    virtual void incl(unsigned lo, unsigned hi)
+    {
+        CriticalBlock block(crit);
+        _incl(lo,hi,true);
+    }
+    virtual void excl(unsigned lo, unsigned hi)
+    {
+        CriticalBlock block(crit);
+        _incl(lo,hi,false);
+    }
+    virtual void reset()
+    {
+        CriticalBlock block(crit);
+        bits.kill();
+    }
+    virtual void serialize(MemoryBuffer &buffer) const
+    {
+        CriticalBlock block(crit);
+        buffer.append(bits.ordinality());
+        ForEachItemIn(b, bits)
+            buffer.append(bits.item(b));
+    }
+};
+
+extern jlib_decl IBitSet *createThreadSafeBitSet()
+{
+    return new CBitSetThreadSafe();
+}
+
+
+class CBitSet : public CBitSetBase<CBitSetMemoryHelper>
+{
+    void deserialize(MemoryBuffer &buffer)
+    {
+        unsigned count;
+        buffer.read(count);
+        if (count)
+        {
+            unsigned bitSets = count/BitsPerItem;
+            bitSetUnits = bitSets;
+            mem = (bits_t *)mb.reserveTruncate(bitSets*sizeof(bits_t));
+        }
+        else
+        {
+            bitSetUnits = 0;
+            mem = NULL;
+        }
+        fixedMemory = false;
+    }
+public:
+    CBitSet()
+    {
+       // In this form, bitSetUnits and mem will be updated when addBitSet expands mb
+    }
+    CBitSet(size32_t memSz, const void *_mem, bool reset)
+    {
+        bitSetUnits = memSz*sizeof(byte) / sizeof(bits_t);
+        mem = (bits_t *)_mem;
+        if (reset)
+            memset(mem, 0, bitSetUnits*sizeof(bits_t));
+        fixedMemory = true;
+    }
+    CBitSet(MemoryBuffer &buffer)
+    {
+        deserialize(buffer);
+    }
+    virtual void reset()
+    {
+        memset(mem, 0, sizeof(bits_t)*bitSetUnits);
+    }
+    virtual void serialize(MemoryBuffer &buffer) const
+    {
+        buffer.append((unsigned)(BitsPerItem*bitSetUnits));
+        buffer.append(bitSetUnits*sizeof(bits_t), mem);
+    }
 };
 
+extern jlib_decl IBitSet *createBitSet(unsigned maxBits, const void *mem, bool reset)
+{
+    return new CBitSet(maxBits, mem, reset);
+}
+
 extern jlib_decl IBitSet *createBitSet()
 {
     return new CBitSet();
 }
 
-extern jlib_decl IBitSet *deserializeIBitSet(MemoryBuffer &mb)
+
+// NB: Doubt you'd want to interchange, but serialization formats are compatible
+extern jlib_decl IBitSet *deserializeThreadSafeBitSet(MemoryBuffer &mb)
+{
+    return new CBitSetThreadSafe(mb);
+}
+
+extern jlib_decl IBitSet *deserializeBitSet(MemoryBuffer &mb)
 {
     return new CBitSet(mb);
 }

+ 18 - 2
system/jlib/jset.hpp

@@ -38,10 +38,26 @@ interface jlib_decl IBitSet : public IInterface
     virtual void serialize(MemoryBuffer &buffer) const = 0;
 };
 
-extern jlib_decl IBitSet *deserializeIBitSet(MemoryBuffer &mb);
+// type of underlying bit storage, exposed so thread-unsafe version can know boundaries
+typedef unsigned bits_t;
+enum { BitsPerItem = sizeof(bits_t) * 8 };
+
 
 // Simple BitSet // 0 based, all intermediate items exist, operations threadsafe and atomic
-extern jlib_decl IBitSet *createBitSet(); 
+extern jlib_decl IBitSet *createThreadSafeBitSet();
+extern jlib_decl IBitSet *deserializeThreadSafeBitSet(MemoryBuffer &mb);
+
+/* Not thread safe, but can be significantly faster than createThreadSafeBitSet
+ * Client provides a fixed block of memory used for the bit set, threads must ensure they do not set bits
+ * in parallel within the same bits_t space.
+ * IOW, e.g. bits 0-sizeof(bits_t) must be set from only 1 thread at a time.
+ */
+extern jlib_decl IBitSet *createBitSet(size32_t memSize, const void *mem, bool reset=true);
+// This form allows the size of the bit set to be dynamic. No guarantees about threading.
+extern jlib_decl IBitSet *createBitSet();
+extern jlib_decl IBitSet *deserializeBitSet(MemoryBuffer &mb);
+// returns number of bytes required to represent numBits in memory
+extern jlib_decl size32_t getBitSetMemoryRequirement(unsigned numBits);
 
 
 

+ 4 - 3
system/jlib/jstats.cpp

@@ -1619,8 +1619,8 @@ void StatisticsFilter::set(const char * creatorTypeText, const char * scopeTypeT
 void StatisticsFilter::set(const char * _creatorTypeText, const char * _creator, const char * _scopeTypeText, const char * _scope, const char * _measureText, const char * _kindText)
 {
     StatisticMeasure newMeasure = queryMeasure(_measureText);
-    if (measure != SMeasureNone)
-        setMeasure(measure);
+    if (newMeasure != SMeasureNone)
+        setMeasure(newMeasure);
     set(_creatorTypeText, _scopeTypeText, _kindText);
     setCreator(_creator);
     setScope(_scope);
@@ -1687,7 +1687,8 @@ void StatisticsFilter::setKind(const char * _kind)
 {
     if (!_kind || !*_kind || streq(_kind, "*"))
     {
-        measure = SMeasureAll;
+        if (measure == SMeasureNone)
+            measure = SMeasureAll;
         kind = StKindAll;
         return;
     }

+ 202 - 33
testing/unittests/jlibtests.cpp

@@ -95,56 +95,225 @@ public:
 
 protected:
 
+    void testSet1(bool initial, IBitSet *bs, unsigned start, unsigned numBits, bool setValue, bool clearValue)
+    {
+        unsigned end = start+numBits;
+        if (initial)
+            bs->incl(start, end);
+        for (unsigned i=start; i < end; i++)
+        {
+            ASSERT(bs->test(i) == clearValue);
+            bs->set(i, setValue);
+            ASSERT(bs->test(i) == setValue);
+
+            bs->set(i+5, setValue);
+            ASSERT(bs->scan(0, setValue) == i);
+            ASSERT(bs->scan(i+1, setValue) == i+5);
+            bs->set(i, clearValue);
+            bs->set(i+5, clearValue);
+            unsigned match1 = bs->scan(0, setValue);
+            ASSERT(match1 == initial ? -1 : end);
+
+            bs->invert(i);
+            ASSERT(bs->test(i) == setValue);
+            bs->invert(i);
+            ASSERT(bs->test(i) == clearValue);
+
+            bool wasSet = bs->testSet(i, setValue);
+            ASSERT(wasSet == clearValue);
+            bool wasSet2 = bs->testSet(i, clearValue);
+            ASSERT(wasSet2 == setValue);
+            ASSERT(bs->test(i) == clearValue);
+
+            bs->set(i, setValue);
+            unsigned match = bs->scanInvert(0, setValue);
+            ASSERT(match == i);
+            ASSERT(bs->test(i) == clearValue);
+        }
+        bs->reset();
+        if (initial)
+        {
+            bs->incl(start, end);
+            bs->excl(start+5, end-5);
+        }
+        else
+            bs->incl(start+5, end-5);
+        unsigned inclStart = bs->scan(start, setValue);
+        ASSERT((start+5) == inclStart);
+        unsigned inclEnd = bs->scan(start+5, clearValue);
+        ASSERT((end-5) == (inclEnd-1));
+    }
+
     void testSet(bool initial)
     {
         unsigned now = msTick();
         bool setValue = !initial;
         bool clearValue = initial;
         const unsigned numBits = 400;
-        for (unsigned pass=0; pass< 10000; pass++)
+        for (unsigned pass=0; pass < 10000; pass++)
+        {
+            Owned<IBitSet> bs = createThreadSafeBitSet();
+            testSet1(initial, bs, 0, numBits, setValue, clearValue);
+        }
+        unsigned elapsed = msTick()-now;
+        fprintf(stdout, "Bit test (%u) time taken = %dms\n", initial, elapsed);
+        now = msTick();
+        for (unsigned pass=0; pass < 10000; pass++)
         {
             Owned<IBitSet> bs = createBitSet();
-            if (initial)
-                bs->incl(0, numBits);
-            for (unsigned i=0; i < numBits; i++)
+            testSet1(initial, bs, 0, numBits, setValue, clearValue);
+        }
+        elapsed = msTick()-now;
+        fprintf(stdout, "Bit test [thread-unsafe version] (%u) time taken = %dms\n", initial, elapsed);
+        now = msTick();
+        size32_t bitSetMemSz = getBitSetMemoryRequirement(numBits+5);
+        MemoryBuffer mb;
+        void *mem = mb.reserveTruncate(bitSetMemSz);
+        for (unsigned pass=0; pass < 10000; pass++)
+        {
+            Owned<IBitSet> bs = createBitSet(bitSetMemSz, mem);
+            testSet1(initial, bs, 0, numBits, setValue, clearValue);
+        }
+        elapsed = msTick()-now;
+        fprintf(stdout, "Bit test [thread-unsafe version, fixed memory] (%u) time taken = %dms\n", initial, elapsed);
+    }
+
+    class CBitThread : public CSimpleInterfaceOf<IInterface>, implements IThreaded
+    {
+        IBitSet &bitSet;
+        unsigned startBit, numBits;
+        bool initial, setValue, clearValue;
+        CThreaded threaded;
+        Owned<IException> exception;
+        CppUnit::Exception *cppunitException;
+    public:
+        CBitThread(IBitSet &_bitSet, unsigned _startBit, unsigned _numBits, bool _initial)
+            : threaded("CBitThread", this), bitSet(_bitSet), startBit(_startBit), numBits(_numBits), initial(_initial)
+        {
+            cppunitException = NULL;
+            setValue = !initial;
+            clearValue = initial;
+        }
+        void start() { threaded.start(); }
+        void join()
+        {
+            threaded.join();
+            if (exception)
+                throw exception.getClear();
+            else if (cppunitException)
+                throw cppunitException;
+        }
+        virtual void main()
+        {
+            try
             {
-                ASSERT(bs->test(i) == clearValue);
-                bs->set(i, setValue);
-                ASSERT(bs->test(i) == setValue);
-
-                bs->set(i+5, setValue);
-                ASSERT(bs->scan(0, setValue) == i);
-                ASSERT(bs->scan(i+1, setValue) == i+5);
-                bs->set(i, clearValue);
-                bs->set(i+5, clearValue);
-                unsigned match1 = bs->scan(0, setValue);
-                ASSERT(match1 == initial ? -1 : numBits);
-
-                bs->invert(i);
-                ASSERT(bs->test(i) == setValue);
-                bs->invert(i);
-                ASSERT(bs->test(i) == clearValue);
-
-                bool wasSet = bs->testSet(i, setValue);
-                ASSERT(wasSet == clearValue);
-                bool wasSet2 = bs->testSet(i, clearValue);
-                ASSERT(wasSet2 == setValue);
-                ASSERT(bs->test(i) == clearValue);
-
-                bs->set(i, setValue);
-                unsigned match = bs->scanInvert(0, setValue);
-                ASSERT(match == i);
-                ASSERT(bs->test(i) == clearValue);
+                unsigned endBit = startBit+numBits-1;
+                if (initial)
+                    bitSet.incl(startBit, endBit);
+                for (unsigned i=startBit; i < endBit; i++)
+                {
+                    ASSERT(bitSet.test(i) == clearValue);
+                    bitSet.set(i, setValue);
+                    ASSERT(bitSet.test(i) == setValue);
+                    if (i < (endBit-1))
+                        ASSERT(bitSet.scan(i, clearValue) == i+1); // find next unset (should be i+1)
+                    bitSet.set(i, clearValue);
+                    bitSet.invert(i);
+                    ASSERT(bitSet.test(i) == setValue);
+                    bitSet.invert(i);
+                    ASSERT(bitSet.test(i) == clearValue);
+
+                    bool wasSet = bitSet.testSet(i, setValue);
+                    ASSERT(wasSet == clearValue);
+                    bool wasSet2 = bitSet.testSet(i, clearValue);
+                    ASSERT(wasSet2 == setValue);
+                    ASSERT(bitSet.test(i) == clearValue);
+
+                    bitSet.set(i, setValue);
+                    unsigned match = bitSet.scanInvert(startBit, setValue);
+                    ASSERT(match == i);
+                    ASSERT(bitSet.test(i) == clearValue);
+                }
+            }
+            catch (IException *e)
+            {
+                exception.setown(e);
+            }
+            catch (CppUnit::Exception &e)
+            {
+                cppunitException = e.clone();
             }
         }
-        unsigned elapsed = msTick()-now;
-        fprintf(stdout, "Bit test (%u) time taken = %dms\n", initial, elapsed);
+    };
+    unsigned testParallelRun(IBitSet &bitSet, unsigned nThreads, unsigned bitsPerThread, bool initial)
+    {
+        IArrayOf<CBitThread> bitThreads;
+        unsigned bitStart = 0;
+        unsigned bitEnd = 0;
+        for (unsigned t=0; t<nThreads; t++)
+        {
+            bitThreads.append(* new CBitThread(bitSet, bitStart, bitsPerThread, initial));
+            bitStart += bitsPerThread;
+        }
+        unsigned now = msTick();
+        for (unsigned t=0; t<nThreads; t++)
+            bitThreads.item(t).start();
+        Owned<IException> exception;
+        CppUnit::Exception *cppunitException = NULL;
+        for (unsigned t=0; t<nThreads; t++)
+        {
+            try
+            {
+                bitThreads.item(t).join();
+            }
+            catch (IException *e)
+            {
+                EXCLOG(e, NULL);
+                if (!exception)
+                    exception.setown(e);
+                else
+                    e->Release();
+            }
+            catch (CppUnit::Exception *e)
+            {
+                cppunitException = e;
+            }
+        }
+        if (exception)
+            throw exception.getClear();
+        else if (cppunitException)
+            throw *cppunitException;
+        return msTick()-now;
+    }
+
+    void testSetParallel(bool initial)
+    {
+        unsigned numBits = 1000000; // 10M
+        unsigned nThreads = getAffinityCpus();
+        unsigned bitsPerThread = numBits/nThreads;
+        bitsPerThread = ((bitsPerThread + (BitsPerItem-1)) / BitsPerItem) * BitsPerItem; // round up to multiple of BitsPerItem
+        numBits = bitsPerThread*nThreads; // round
+
+        fprintf(stdout, "testSetParallel, testing bit set of size : %d, nThreads=%d\n", numBits, nThreads);
+
+        Owned<IBitSet> bitSet = createThreadSafeBitSet();
+        unsigned took = testParallelRun(*bitSet, nThreads, bitsPerThread, initial);
+        fprintf(stdout, "Thread safe parallel bit set test (%u) time taken = %dms\n", initial, took);
+
+        size32_t bitSetMemSz = getBitSetMemoryRequirement(numBits);
+        MemoryBuffer mb;
+        void *mem = mb.reserveTruncate(bitSetMemSz);
+        bitSet.setown(createBitSet(bitSetMemSz, mem));
+        took = testParallelRun(*bitSet, nThreads, bitsPerThread, initial);
+        fprintf(stdout, "Thread unsafe parallel bit set test (%u) time taken = %dms\n", initial, took);
     }
 
     void testSimple()
     {
         testSet(false);
         testSet(true);
+        testSetParallel(false);
+        testSetParallel(true);
     }
 };
 

+ 2 - 2
thorlcr/activities/csvread/thcsvrslave.cpp

@@ -341,8 +341,8 @@ public:
                     localLastPart[subFile] = pnum;
             }
             headerLinesRemaining.allocateN(subFiles);
-            gotHeaderLines.setown(createBitSet());
-            sentHeaderLines.setown(createBitSet());
+            gotHeaderLines.setown(createThreadSafeBitSet());
+            sentHeaderLines.setown(createThreadSafeBitSet());
         }
         partHandler.setown(new CCsvPartHandler(*this));
         appendOutputLinked(this);

+ 2 - 2
thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp

@@ -628,7 +628,7 @@ class CKeyedJoinSlave : public CSlaveActivity, public CThorDataLink, implements
                 try
                 {
                     unsigned endRequestsCount = owner.container.queryJob().querySlaves();
-                    Owned<IBitSet> endRequests = createBitSet(); // NB: verification only
+                    Owned<IBitSet> endRequests = createThreadSafeBitSet(); // NB: verification only
                     while (!aborted)
                     {
                         rank_t sender;
@@ -735,7 +735,7 @@ class CKeyedJoinSlave : public CSlaveActivity, public CThorDataLink, implements
                     rank_t sender;
                     CMessageBuffer msg;
                     unsigned endRequestsCount = owner.container.queryJob().querySlaves();
-                    Owned<IBitSet> endRequests = createBitSet(); // NB: verification only
+                    Owned<IBitSet> endRequests = createThreadSafeBitSet(); // NB: verification only
 
                     Owned<IRowInterfaces> fetchDiskRowIf = createRowInterfaces(owner.helper->queryDiskRecordSize(),owner.queryActivityId(),owner.queryCodeContext());
                     while (!aborted)

+ 85 - 23
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -363,8 +363,8 @@ public:
         allDone = allDoneWaiting = allRequestStop = stopping = stopRecv = false;
         myNode = activity.queryJob().queryMyRank();
         slaves = activity.queryJob().querySlaves();
-        slavesDone.setown(createBitSet());
-        slavesStopping.setown(createBitSet());
+        slavesDone.setown(createThreadSafeBitSet());
+        slavesStopping.setown(createThreadSafeBitSet());
         mpTag = TAG_NULL;
         recvInterface = NULL;
     }
@@ -453,15 +453,13 @@ class CMarker
     CActivityBase &activity;
     NonReentrantSpinLock lock;
     ICompare *cmp;
-    /* Access to bitSet is currently protected by the implementation
-     * Should move over to an implementation that's based on a lump of
-     * roxiemem and ensure that the threads avoid accessing the same bytes/words etc.
-     */
-    Owned<IBitSet> bitSet; // should be roxiemem, so can cause spilling
+    OwnedConstThorRow bitSetMem; // for thread unsafe version
+    Owned<IBitSet> bitSet;
     const void **base;
     rowidx_t nextChunkStartRow; // Updated as threads request next chunk
     rowidx_t rowCount, chunkSize; // There are configured at start of calculate()
     rowidx_t parallelMinChunkSize, parallelChunkSize; // Constant, possibly configurable in future
+    unsigned threadCount;
 
     class CCompareThread : public CInterface, implements IThreaded
     {
@@ -498,10 +496,14 @@ class CMarker
     }
     inline void mark(rowidx_t i)
     {
+        // NB: Thread safe, because markers are dealing with discrete parts of bitSetMem (alighted to bits_t boundaries)
         bitSet->set(i); // mark boundary
     }
     rowidx_t doMarking(rowidx_t myStart, rowidx_t myEnd)
     {
+        // myStart must be on bits_t boundary
+        dbgassertex(0 == (myStart % BitsPerItem));
+
         rowidx_t chunkUnique = 0;
         const void **rows = base+myStart;
         rowidx_t i=myStart;
@@ -550,20 +552,46 @@ public:
         // perhaps should make these configurable..
         parallelMinChunkSize = 1024;
         parallelChunkSize = 10*parallelMinChunkSize;
+        threadCount = activity.getOptInt(THOROPT_JOINHELPER_THREADS, activity.queryMaxCores());
+        if (0 == threadCount)
+            threadCount = getAffinityCpus();
+    }
+    bool init(rowidx_t rowCount)
+    {
+        bool threadSafeBitSet = activity.getOptBool("threadSafeBitSet", false); // for testing only
+        if (threadSafeBitSet)
+        {
+            DBGLOG("Using Thread safe variety of IBitSet");
+            bitSet.setown(createThreadSafeBitSet());
+        }
+        else
+        {
+            size32_t bitSetMemSz = getBitSetMemoryRequirement(rowCount);
+            void *pBitSetMem = activity.queryJob().queryRowManager()->allocate(bitSetMemSz, activity.queryContainer().queryId(), SPILL_PRIORITY_LOW);
+            if (!pBitSetMem)
+                return false;
+
+            bitSetMem.setown(pBitSetMem);
+            bitSet.setown(createBitSet(bitSetMemSz, pBitSetMem));
+        }
+        return true;
+    }
+    void reset()
+    {
+        bitSet.clear();
     }
     rowidx_t calculate(CThorExpandingRowArray &rows, ICompare *_cmp, bool doSort)
     {
+        CCycleTimer timer;
+        assertex(bitSet);
         cmp = _cmp;
-        unsigned threadCount = activity.getOptInt(THOROPT_JOINHELPER_THREADS, activity.queryMaxCores());
-        if (0 == threadCount)
-            threadCount = getAffinityCpus();
         if (doSort)
             rows.sort(*cmp, threadCount);
         rowCount = rows.ordinality();
         if (0 == rowCount)
             return 0;
         base = rows.getRowArray();
-        bitSet.setown(createBitSet());
+
         rowidx_t uniqueTotal = 0;
         if ((1 == threadCount) || (rowCount < parallelMinChunkSize))
             uniqueTotal = doMarking(0, rowCount);
@@ -578,6 +606,9 @@ public:
                 chunkSize = parallelMinChunkSize;
                 threadCount = rowCount / chunkSize;
             }
+            // Must be multiple of sizeof BitsPerItem
+            chunkSize = ((chunkSize + (BitsPerItem-1)) / BitsPerItem) * BitsPerItem; // round up to nearest multiple of BitsPerItem
+
             /* This is yet another case of requiring a set of small worker threads
              * Thor should really use a common pool of lightweight threadlets made available to all
              * where any particular instances (e.g. lookup) can stipulate min/max it requires etc.
@@ -610,6 +641,7 @@ public:
         ++uniqueTotal;
         mark(rowCount-1); // last row is implicitly end of group
         cmp = NULL;
+        DBGLOG("CMarker::calculate - uniqueTotal=%"RIPF"d, took=%d ms", uniqueTotal, timer.elapsedMs());
         return uniqueTotal;
     }
     rowidx_t findNextBoundary(rowidx_t start)
@@ -1509,8 +1541,11 @@ protected:
                 bool success=false;
                 try
                 {
-                    // NB: If this ensure returns false, it will have called the MM callbacks and have setup isLocalLookup() already
-                    success = rhs.ensure(rhsRows, SPILL_PRIORITY_LOW); // NB: Could OOM, handled by exception handler
+                    if (marker.init(rhsRows))
+                    {
+                        // NB: If this ensure returns false, it will have called the MM callbacks and have setup isLocalLookup() already
+                        success = rhs.ensure(rhsRows, SPILL_PRIORITY_LOW); // NB: Could OOM, handled by exception handler
+                    }
                 }
                 catch (IException *e)
                 {
@@ -1610,6 +1645,7 @@ protected:
 
                 // If HT sized already and now spilt, too big clear and size when local size known
                 clearHT();
+                marker.reset();
 
                 if (stopping)
                 {
@@ -1724,7 +1760,7 @@ protected:
                 }
             }
             else
-                rightStream.setown(rowLoader->load(right, abortSoon, false, &rhs, NULL, false));
+                rightStream.setown(rowLoader->load(right, abortSoon, false, &rhs));
 
             if (!rightStream)
             {
@@ -1739,16 +1775,40 @@ protected:
 
                 rowLoader.clear();
 
-                // Either was already sorted, or rowLoader->load() sorted on transfer out to rhs
-                rowidx_t uniqueKeys = marker.calculate(rhs, compareRight, false);
-
-                /* Although HT is allocated with a low spill priority, it can still cause callbacks
-                 * so try to allocate before rhs is transferred to spillable collector
-                 */
-                bool htAllocated = setupHT(uniqueKeys);
-                if (!htAllocated)
+                bool success;
+                try
+                {
+                    success = marker.init(rhs.ordinality());
+                }
+                catch (IException *e)
+                {
+                    if (!isSmart())
+                        throw;
+                    switch (e->errorCode())
+                    {
+                    case ROXIEMM_MEMORY_POOL_EXHAUSTED:
+                    case ROXIEMM_MEMORY_LIMIT_EXCEEDED:
+                        e->Release();
+                        break;
+                    default:
+                        throw;
+                    }
+                    success = false;
+                }
+                if (success)
+                {
+                    // Either was already sorted, or rowLoader->load() sorted on transfer out to rhs
+                    rowidx_t uniqueKeys = marker.calculate(rhs, compareRight, false);
+                    success = setupHT(uniqueKeys);
+                    if (!success)
+                    {
+                        if (!isSmart())
+                            throw MakeActivityException(this, 0, "Failed to allocate [LOCAL] hash table");
+                    }
+                }
+                if (!success)
                 {
-                    ActPrintLog("Out of memory trying to allocate the [LOCAL] hash table for a SMART join (%"RIPF"d rows), will now failover to a std hash join", uniqueKeys);
+                    ActPrintLog("Out of memory trying to allocate [LOCAL] tables for a SMART join (%"RIPF"d rows), will now failover to a std hash join", rhs.ordinality());
                     Owned<IThorRowCollector> collector = createThorRowCollector(*this, queryRowInterfaces(rightITDL), NULL, stableSort_none, rc_mixed, SPILL_PRIORITY_LOOKUPJOIN);
                     collector->setOptions(rcflag_noAllInMemSort); // If fits into memory, don't want it resorted
                     collector->transferRowsIn(rhs); // can spill after this
@@ -1784,6 +1844,7 @@ protected:
             {
                 ActPrintLog("Performing standard join");
 
+                marker.reset();
                 // NB: lhs ordering and grouping lost from here on.. (will have been caught earlier if global)
                 if (grouped)
                     throw MakeActivityException(this, 0, "Degraded to standard join, LHS order cannot be preserved");
@@ -2225,6 +2286,7 @@ public:
         }
         // Rows now in hash table, rhs arrays no longer needed
         _rows.kill();
+        marker.reset();
     }
 };
 

+ 1 - 1
thorlcr/activities/loop/thloop.cpp

@@ -164,7 +164,7 @@ class CLoopActivityMaster : public CLoopActivityMasterBase
         // similar to sync, but continiously listens for messages from slaves
         // slave only sends if above threashold, or if was at threshold and non empty
         // this routine is here to spot when all are whirling around processing nothing for > threshold
-        Owned<IBitSet> emptyIterations = createBitSet();
+        Owned<IBitSet> emptyIterations = createThreadSafeBitSet();
         unsigned loopEnds = 0;
         unsigned nodes = container.queryJob().querySlaves();
         unsigned n = nodes;

+ 2 - 2
thorlcr/graph/thgraph.cpp

@@ -371,11 +371,11 @@ CGraphElementBase::CGraphElementBase(CGraphBase &_owner, IPropertyTree &_xgmml)
         throw makeOsExceptionV(GetLastError(), "Failed to load helper factory method: %s (dll handle = %p)", helperName.str(), queryJob().queryDllEntry().getInstance());
     alreadyUpdated = false;
     whichBranch = (unsigned)-1;
-    whichBranchBitSet.setown(createBitSet());
+    whichBranchBitSet.setown(createThreadSafeBitSet());
     newWhichBranch = false;
     isEof = false;
     log = true;
-    sentActInitData.setown(createBitSet());
+    sentActInitData.setown(createThreadSafeBitSet());
 }
 
 CGraphElementBase::~CGraphElementBase()

+ 3 - 3
thorlcr/graph/thgraphmaster.cpp

@@ -334,7 +334,7 @@ void CSlaveMessageHandler::main()
 
 CMasterActivity::CMasterActivity(CGraphElementBase *_container) : CActivityBase(_container), threaded("CMasterActivity", this)
 {
-    notedWarnings = createBitSet();
+    notedWarnings = createThreadSafeBitSet();
     mpTag = TAG_NULL;
     data = new MemoryBuffer[container.queryJob().querySlaves()];
     asyncStart = false;
@@ -652,7 +652,7 @@ public:
         unsigned s=comm->queryGroup().ordinality()-1;
         bool aborted = false;
         CMessageBuffer msg;
-        Owned<IBitSet> raisedSet = createBitSet();
+        Owned<IBitSet> raisedSet = createThreadSafeBitSet();
         unsigned remaining = timeout;
         while (s--)
         {
@@ -1337,7 +1337,7 @@ void CJobMaster::broadcastToSlaves(CMessageBuffer &msg, mptag_t mptag, unsigned
     }
     if (sendOnly) return;
     unsigned respondents = 0;
-    Owned<IBitSet> bitSet = createBitSet();
+    Owned<IBitSet> bitSet = createThreadSafeBitSet();
     loop
     {
         rank_t sender;

+ 2 - 2
thorlcr/master/thmastermain.cpp

@@ -137,7 +137,7 @@ public:
 
     CRegistryServer()  : deregistrationWatch(*this), stopped(false)
     {
-        status = createBitSet();
+        status = createThreadSafeBitSet();
         msgDelay = SLAVEREG_VERIFY_DELAY;
         slavesRegistered = 0;
         if (globals->getPropBool("@watchdogEnabled"))
@@ -201,7 +201,7 @@ public:
         unsigned timeWaited = 0;
         unsigned connected = 0;
         unsigned slaves = queryClusterWidth();
-        Owned<IBitSet> connectedSet = createBitSet();
+        Owned<IBitSet> connectedSet = createThreadSafeBitSet();
         loop
         {
             CTimeMon tm(msgDelay);

+ 1 - 1
thorlcr/thorutil/thbuf.cpp

@@ -263,7 +263,7 @@ public:
         numblocks = 0;
         insz = 0;
         eoi = false;
-        diskfree.setown(createBitSet()); 
+        diskfree.setown(createThreadSafeBitSet()); 
 
 #ifdef _FULL_TRACE
         ActPrintLog(activity, "SmartBuffer create %x",(unsigned)(memsize_t)this);

+ 1 - 1
thorlcr/thorutil/thorport.cpp

@@ -43,7 +43,7 @@ static IBitSet *portmap;
 MODULE_INIT(INIT_PRIORITY_STANDARD)
 {
     portallocsection = new CriticalSection;
-    portmap = createBitSet();
+    portmap = createThreadSafeBitSet();
     portmap->set(MPPORT, true);
     portmap->set(WATCHDOGPORT, true);
     return true;