Browse Source

Merge pull request #5579 from richardkchapman/roxie-many-lookup

HPCC-10579 Improve many lookup join implementation in roxie

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 11 years ago
parent
commit
30cb902677

+ 1 - 2
ecl/hqlcpp/hqlhtcpp.cpp

@@ -11870,8 +11870,7 @@ ABoundActivity * HqlCppTranslator::doBuildActivityJoinOrDenormalize(BuildCtx & c
         //Lookup join doesn't need the left sort (unless it is reused elsewhere), or the right sort unless it is deduping.
         if (canReuseLeftCompare || !isLookupJoin)
             generateSortCompare(instance->nestedctx, instance->classctx, no_left, lhsDsRef, joinInfo.queryLeftSort(), true, noSortAttr, false, isLightweight, isLocalSort);
-        if (!(isLookupJoin && isManyLookup && !couldBeKeepOne && !targetThor()))            // many lookup doesn't need to dedup the rhs
-            generateSortCompare(instance->nestedctx, instance->classctx, no_right, rhsDsRef, joinInfo.queryRightSort(), isLocalSort, noSortAttr, canReuseLeftCompare, isLightweight, isLocalSort);
+        generateSortCompare(instance->nestedctx, instance->classctx, no_right, rhsDsRef, joinInfo.queryRightSort(), isLocalSort, noSortAttr, canReuseLeftCompare, isLightweight, isLocalSort);
 
         bool isGlobal = !isLocalJoin && !instance->isChildActivity();
         generateSerializeKey(instance->nestedctx, no_left, lhsDsRef, joinInfo.queryLeftSort(), isGlobal, false, false);

+ 1 - 1
roxie/ccd/ccdquery.cpp

@@ -485,7 +485,7 @@ protected:
         case TAKsmartjoin:
         case TAKsmartdenormalize:
         case TAKsmartdenormalizegroup:
-            return createRoxieServerLookupJoinActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerLookupJoinActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKmerge:
             return createRoxieServerMergeActivityFactory(id, subgraphId, *this, helperFactory, kind);
         case TAKnormalize:

+ 252 - 52
roxie/ccd/ccdserver.cpp

@@ -17180,19 +17180,36 @@ private:
     class LookupTable : public CInterface
     {
     public:
-        LookupTable(unsigned _size, ICompare * _leftRightCompare, ICompare * _rightCompare, IHash * _leftHash, IHash * _rightHash, bool _dedupOnAdd)
-        : leftRightCompare(_leftRightCompare), rightCompare(_rightCompare), leftHash(_leftHash), rightHash(_rightHash), dedupOnAdd(_dedupOnAdd)
-        {
-            unsigned minsize = (4*_size)/3;
-            size = 2;
-            while((minsize >>= 1) > 0)
-                size <<= 1;
-            mask = size - 1;
-            table = (const void * *)calloc(size, sizeof(void *));
-            findex = fstart = BadIndex;
+        LookupTable(IHThorHashJoinArg &helper)
+        : leftRightCompare(helper.queryCompareLeftRight()), rightCompare(helper.queryCompareRight()),
+          leftHash(helper.queryHashLeft()), rightHash(helper.queryHashRight())
+        {
+            size = 0;
         }
+        virtual const void *find(const void * left) const = 0;
+        virtual const void *findNext(const void * left) const = 0;
 
-        ~LookupTable()
+    protected:
+        ICompare * leftRightCompare;
+        ICompare * rightCompare;
+        IHash * leftHash;
+        IHash * rightHash;
+        unsigned size;
+    };
+
+    class DedupLookupTable : public LookupTable
+    {
+    public:
+        DedupLookupTable(ConstPointerArray &rightRows, IHThorHashJoinArg &helper)
+        : LookupTable(helper)
+        {
+            size = (4*rightRows.length())/3 + 1;
+            table = (const void * *)calloc(size, sizeof(void *)); // This should probably be allocated from roxiemem (and size rounded up to actual available size)
+            ForEachItemIn(idx, rightRows)
+                add(rightRows.item(idx));
+        }
+
+        ~DedupLookupTable()
         {
             unsigned i;
             for(i=0; i<size; i++)
@@ -17200,56 +17217,114 @@ private:
             free(table);
         }
 
+        virtual const void *find(const void * left) const
+        {
+            unsigned index = leftHash->hash(left) % size;
+            unsigned start = index;
+            while (table[index])
+            {
+                if(leftRightCompare->docompare(left, table[index]) == 0)
+                    return table[index];
+                index++;
+                if (index==size)
+                    index = 0;
+                if (index==start)
+                    throw MakeStringException(ROXIE_JOIN_ERROR, "Internal error in lookup join activity (hash table full on lookup)");
+            }
+            return NULL;
+        }
+
+        virtual const void *findNext(const void * left) const
+        {
+            return NULL;
+        }
+
+    protected:
         void add(const void * right)
         {
-            findex = BadIndex;
-            unsigned start = rightHash->hash(right) & mask;
-            unsigned index = start;
-            while(table[index])
+            unsigned index = rightHash->hash(right) % size;
+            unsigned start = index;
+            while (table[index])
             {
-                if(dedupOnAdd && (rightCompare->docompare(table[index], right) == 0))
+                if (rightCompare->docompare(table[index], right) == 0)
                 {
                     ReleaseRoxieRow(right);
                     return;
                 }
                 index++;
-                if(index==size)
+                if (index==size)
                     index = 0;
-                if(index==start)
-                    throwUnexpected(); //table is full, should never happen
+                if (index==start)
+                    throw MakeStringException(ROXIE_JOIN_ERROR, "Internal error in lookup join activity (hash table full on add)");
             }
             table[index] = right;
         }
 
-        const void *find(const void * left) const
+        const void * * table;
+    };
+
+    class FewLookupTable : public LookupTable
+    {
+    public:
+        FewLookupTable(ConstPointerArray &rightRows, IHThorHashJoinArg &helper)
+        : LookupTable(helper)
         {
-            fstart = leftHash->hash(left) & mask;
+            size = (4*rightRows.length())/3 + 1;
+            table = (const void * *)calloc(size, sizeof(void *)); // This should probably be allocated from roxiemem
+            findex = fstart = BadIndex;
+            ForEachItemIn(idx, rightRows)
+                add(rightRows.item(idx));
+        }
+
+        ~FewLookupTable()
+        {
+            unsigned i;
+            for(i=0; i<size; i++)
+                ReleaseRoxieRow(table[i]);
+            free(table);
+        }
+
+        virtual const void *find(const void * left) const
+        {
+            fstart = leftHash->hash(left) % size;
             findex = fstart;
             return doFind(left);
         }
-
-        const void *findNext(const void * left) const
+        virtual const void *findNext(const void * left) const
         {
-            if(findex == BadIndex)
+            if (findex == BadIndex)
                 return NULL;
             advance();
             return doFind(left);
         }
-
+    protected:
+        void add(const void * right)
+        {
+            unsigned start = rightHash->hash(right) % size;
+            unsigned index = start;
+            while (table[index])
+            {
+                index++;
+                if (index==size)
+                    index = 0;
+                if (index==start)
+                    throwUnexpected(); //table is full, should never happen
+            }
+            table[index] = right;
+        }
         void advance() const
         {
             findex++;
             if(findex==size)
                 findex = 0;
             if(findex==fstart)
-                throw MakeStringException(ROXIE_JOIN_ERROR, "Internal error hthor lookup join activity (hash table full on lookup)");
+                throw MakeStringException(ROXIE_JOIN_ERROR, "Internal error in lookup join activity (hash table full on lookup)");
         }
-
         const void *doFind(const void * left) const
         {
             while(table[findex])
             {
-                if(leftRightCompare->docompare(left, table[findex]) == 0)
+                if (leftRightCompare->docompare(left, table[findex]) == 0)
                     return table[findex];
                 advance();
             }
@@ -17257,26 +17332,119 @@ private:
             return NULL;
         }
 
-    private:
-        ICompare * leftRightCompare;
-        ICompare * rightCompare;
-        IHash * leftHash;
-        IHash * rightHash;
-        unsigned size;
-        unsigned mask;
         const void * * table;
-        bool dedupOnAdd;
         unsigned mutable fstart;
         unsigned mutable findex;
         static unsigned const BadIndex;
     };
 
+    class ManyLookupTable : public LookupTable
+    {
+    public:
+        ManyLookupTable(ConstPointerArray &rightRows, IHThorHashJoinArg &helper)
+        : LookupTable(helper)
+        {
+            rightRows.swapWith(rowtable);
+            UInt64Array groups;
+            unsigned numRows = rowtable.length();
+            if (numRows)
+            {
+                unsigned groupStart = 0;
+                const void *groupStartRow = rowtable.item(0);
+                for (unsigned i=1; i < numRows; i++)
+                {
+                    const void *thisRow = rowtable.item(i);
+                    if (rightCompare->docompare(groupStartRow, thisRow))
+                    {
+                        groups.append(makeint64(groupStart, i-groupStart));
+                        groupStart = i;
+                        groupStartRow = thisRow;
+                    }
+                }
+                groups.append(makeint64(groupStart, numRows-groupStart));
+            }
+            size = (4*groups.length())/3 + 1;
+            table = (__uint64 *) calloc(size, sizeof(__uint64)); // This should probably be allocated from roxiemem
+            ForEachItemIn(idx, groups)
+            {
+                unsigned __int64 group = groups.item(idx);
+                unsigned groupstart = high(group);
+                const void *row = rowtable.item(groupstart);
+                add(row, group);
+            }
+        }
+
+        ~ManyLookupTable()
+        {
+            ForEachItemIn(idx, rowtable)
+            {
+                ReleaseRoxieRow(rowtable.item(idx));
+            }
+            free(table);
+        }
+
+        void add(const void *row, unsigned __int64 group)
+        {
+            unsigned start = rightHash->hash(row) % size;
+            unsigned index = start;
+            while (table[index])
+            {
+                index++;
+                if (index==size)
+                    index = 0;
+                if (index==start)
+                    throwUnexpected(); //table is full, should never happen
+            }
+            table[index] = group;
+        }
+
+        virtual const void *find(const void * left) const
+        {
+            unsigned index = leftHash->hash(left) % size;
+            unsigned start = index;
+            while (table[index])
+            {
+                __uint64 group = table[index];
+                currentMatch = high(group);
+                const void *right = rowtable.item(currentMatch);
+                if (leftRightCompare->docompare(left, right) == 0)
+                {
+                    currentMatch++;
+                    matchCount = low(group) - 1;
+                    return right;
+                }
+                index++;
+                if (index==size)
+                    index = 0;
+                if (index==start)
+                    throw MakeStringException(ROXIE_JOIN_ERROR, "Internal error in lookup join activity (hash table full on lookup)");
+            }
+            matchCount = 0;
+            return NULL;
+        }
+
+        virtual const void *findNext(const void * left) const
+        {
+            if (!matchCount)
+                return NULL;
+            matchCount--;
+            return rowtable.item(currentMatch++);
+        }
+
+    protected:
+        __uint64 *table;
+        ConstPointerArray rowtable;
+        mutable unsigned currentMatch;
+        mutable unsigned matchCount;
+    };
+
     IHThorHashJoinArg &helper;
     bool leftOuterJoin;
     bool exclude;
     bool eog;
     bool many;
     bool dedupRHS;
+    bool useFewTable;
     bool matchedGroup;
     const void *left;
     OwnedConstRoxieRow defaultRight;
@@ -17313,8 +17481,8 @@ private:
     }
 
 public:
-    CRoxieServerLookupJoinActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
-        : CRoxieServerTwoInputActivity(_factory, _probeManager), helper((IHThorHashJoinArg &)basehelper)
+    CRoxieServerLookupJoinActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, bool _useFewTable)
+        : CRoxieServerTwoInputActivity(_factory, _probeManager), helper((IHThorHashJoinArg &)basehelper), useFewTable(_useFewTable)
     {
         unsigned joinFlags = helper.getJoinFlags();
         leftOuterJoin = (joinFlags & JFleftouter) != 0;
@@ -17342,7 +17510,6 @@ public:
     void loadRight()
     {
         ConstPointerArray rightset;
-        unsigned i = 0;
         try
         {
             const void * next;
@@ -17355,17 +17522,47 @@ public:
                     break;
                 rightset.append(next);
             }
-            unsigned rightord = rightset.ordinality();
-            table.setown(new LookupTable(rightord, helper.queryCompareLeftRight(), helper.queryCompareRight(), helper.queryHashLeft(), helper.queryHashRight(), dedupRHS));
-
-            for(i=0; i<rightord; i++)
-                table->add(rightset.item(i));
+            if (!dedupRHS)
+            {
+                if (useFewTable)
+                {
+                    table.setown(new FewLookupTable(rightset, helper));  // NOTE - takes ownership of rightset
+                }
+                else
+                {
+                    if (!helper.isRightAlreadySorted())
+                    {
+                        if (helper.getJoinFlags() & JFunstable)
+                        {
+                            qsortvec(const_cast<void * *>(rightset.getArray()), rightset.ordinality(), *helper.queryCompareRight());
+                        }
+                        else
+                        {
+                            unsigned rightord = rightset.ordinality();
+                            MemoryAttr tempAttr(rightord*sizeof(void **)); // Temp storage for stable sort. This should probably be allocated from roxiemem
+                            void **temp = (void **) tempAttr.bufferBase();
+                            void **_rows = const_cast<void * *>(rightset.getArray());
+                            memcpy(temp, _rows, rightord*sizeof(void **));
+                            qsortvecstable(temp, rightord, *helper.queryCompareRight(), (void ***)_rows);
+                            for (int i = 0; i < rightord; i++)
+                            {
+                                *_rows = **((void ***)_rows);
+                                _rows++;
+                            }
+                        }
+                    }
+                    table.setown(new ManyLookupTable(rightset, helper));  // NOTE - takes ownership of rightset
+                }
+            }
+            else
+            {
+                table.setown(new DedupLookupTable(rightset, helper)); // NOTE - takes ownership of rightset
+            }
         }
         catch (...)
         {
-            unsigned rightord = rightset.ordinality();
-            for ( ; i<rightord; i++)
-                ReleaseRoxieRow(rightset.item(i));
+            ForEachItemIn(idx, rightset)
+                ReleaseRoxieRow(rightset.item(idx));
             throw;
         }
     };
@@ -17739,28 +17936,31 @@ private:
     
 };
 
-unsigned const CRoxieServerLookupJoinActivity::LookupTable::BadIndex(static_cast<unsigned>(-1));
+unsigned const CRoxieServerLookupJoinActivity::FewLookupTable::BadIndex(static_cast<unsigned>(-1));
 
 class CRoxieServerLookupJoinActivityFactory : public CRoxieServerJoinActivityFactory
 {
 public:
-    CRoxieServerLookupJoinActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind)
+    CRoxieServerLookupJoinActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode)
         : CRoxieServerJoinActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind)
     {
         Owned<IHThorHashJoinArg> helper = (IHThorHashJoinArg *) helperFactory();
+        useFewTable = _graphNode.getPropBool("hint[@name='usefewtable']/@value", false);
         if((helper->getJoinFlags() & (JFfirst | JFfirstleft | JFfirstright | JFslidingmatch)) != 0)
             throw MakeStringException(ROXIE_INVALID_FLAGS, "Invalid flags for lookup join activity"); // code generator should never create such an activity
     }
 
     virtual IRoxieServerActivity *createActivity(IProbeManager *_probeManager) const
     {
-        return new CRoxieServerLookupJoinActivity(this, _probeManager);
+        return new CRoxieServerLookupJoinActivity(this, _probeManager, useFewTable);
     }
+protected:
+    bool useFewTable;
 };
 
-IRoxieServerActivityFactory *createRoxieServerLookupJoinActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind)
+IRoxieServerActivityFactory *createRoxieServerLookupJoinActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode)
 {
-    return new CRoxieServerLookupJoinActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind);
+    return new CRoxieServerLookupJoinActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind, _graphNode);
 }
 
 //=====================================================================================================

+ 1 - 1
roxie/ccd/ccdserver.hpp

@@ -379,7 +379,7 @@ extern IRoxieServerActivityFactory *createRoxieServerGroupActivityFactory(unsign
 extern IRoxieServerActivityFactory *createRoxieServerFirstNActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
 extern IRoxieServerActivityFactory *createRoxieServerSelectNActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
 extern IRoxieServerActivityFactory *createRoxieServerSelfJoinActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerLookupJoinActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
+extern IRoxieServerActivityFactory *createRoxieServerLookupJoinActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
 extern IRoxieServerActivityFactory *createRoxieServerAllJoinActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
 extern IRoxieServerActivityFactory *createRoxieServerTopNActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
 extern IRoxieServerActivityFactory *createRoxieServerLimitActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);

+ 38 - 0
testing/ecl/key/manylookup.xml

@@ -0,0 +1,38 @@
+<Dataset name='Result 1'>
+ <Row><key>1</key></Row>
+ <Row><key>1</key></Row>
+ <Row><key>2</key></Row>
+ <Row><key>2</key></Row>
+ <Row><key>2</key></Row>
+ <Row><key>2</key></Row>
+ <Row><key>2</key></Row>
+ <Row><key>2</key></Row>
+ <Row><key>2</key></Row>
+ <Row><key>2</key></Row>
+ <Row><key>3</key></Row>
+ <Row><key>3</key></Row>
+ <Row><key>4</key></Row>
+ <Row><key>4</key></Row>
+</Dataset>
+<Dataset name='Result 2'>
+ <Row><key>1</key></Row>
+ <Row><key>1</key></Row>
+ <Row><key>2</key></Row>
+ <Row><key>2</key></Row>
+ <Row><key>2</key></Row>
+ <Row><key>2</key></Row>
+ <Row><key>2</key></Row>
+ <Row><key>2</key></Row>
+ <Row><key>2</key></Row>
+ <Row><key>2</key></Row>
+ <Row><key>3</key></Row>
+ <Row><key>3</key></Row>
+ <Row><key>4</key></Row>
+ <Row><key>4</key></Row>
+</Dataset>
+<Dataset name='Result 3'>
+ <Row><key>1</key></Row>
+ <Row><key>2</key></Row>
+ <Row><key>3</key></Row>
+ <Row><key>4</key></Row>
+</Dataset>

+ 17 - 0
testing/ecl/manylookup.ecl

@@ -0,0 +1,17 @@
+rec := RECORD
+ unsigned key;
+END;
+
+lhs := dataset([{1},{2},{3},{4},{5}], rec);
+rhs := dataset([
+  {1},{2},{2},{2},{2},{3},{4},
+  {1},{2},{2},{2},{2},{3},{4}
+  ], rec);
+
+j1 := JOIN(lhs, rhs, LEFT.key=RIGHT.key, TRANSFORM(rec, SELF:=LEFT), MANY LOOKUP);
+j2 := JOIN(lhs, rhs, LEFT.key=RIGHT.key, TRANSFORM(rec, SELF:=LEFT), MANY LOOKUP, HINT(usefewtable));
+j3 := JOIN(lhs, rhs, LEFT.key=RIGHT.key, TRANSFORM(rec, SELF:=LEFT), LOOKUP);
+
+output(NOFOLD(j1));
+output(NOFOLD(j2));
+output(NOFOLD(j3));