
Merge pull request #9023 from shamser/issue15926

HPCC-15926 Add BEST attribute to DEDUP

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 8 years ago
parent
commit
14a69ce4ee

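For context: BEST lets DEDUP keep, within each set of duplicate rows, the row that sorts first by the supplied order; a leading '-' on a sort field reverses its direction. A minimal sketch of the new syntax, modelled on the regression test added by this commit (record and field names are illustrative):

    r := RECORD
        UNSIGNED1 id;
        STRING2 f1;
        STRING1 f2;
    END;
    ds := DATASET([{1,'KC','G'},{2,'KC','Z'},{3,'KA','X'},{4,'KA','B'}], r);

    bestLow  := DEDUP(SORT(ds, f1), f1, BEST(f2));   // keeps {4,'KA','B'} and {1,'KC','G'}
    bestHigh := DEDUP(SORT(ds, f1), f1, BEST(-f2));  // keeps {3,'KA','X'} and {2,'KC','Z'}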
+ 8 - 1
ecl/hql/hqlgram.y

@@ -7993,7 +7993,7 @@ simpleDataSet
                             parser->expandWholeAndExcept(ds, $4);
 
                             IHqlExpression *flags = $4.getExpr();
-                            //checkDedup(ds, flags, $3);
+                            //checkDedup(ds, flags, $4);
                             OwnedHqlExpr dedup = createDataset(no_dedup, ds, createComma(flags, $7.getExpr()));
 
                             parser->checkDistribution($3, dedup, false);
@@ -11714,6 +11714,13 @@ dedupFlag
                             $$.setExpr(row.getClear(), $1);
                         }
     | dataSet
+    | BEST '(' startSortOrder beginList sortListOptCurleys ')' endSortOrder
+                        {
+                            HqlExprArray sortItems;
+                            parser->endList(sortItems);
+
+                            $$.setExpr(createExprAttribute(bestAtom, createSortList(sortItems)));
+                        }
     ;
 
 rollupExtra

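The production above makes BEST a dedupFlag carrying a sort list (note sortListOptCurleys), stored as a bestAtom attribute. Because the payload is a general sort list, a multi-field order with mixed directions should also parse, e.g. (an assumption from the grammar, not covered by the regression test; reuses the ds sketched after the commit header):

    bestMulti := DEDUP(SORT(ds, f1), f1, BEST(f2, -id));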
+ 3 - 0
ecl/hql/hqlutil.cpp

@@ -1378,6 +1378,7 @@ DedupInfoExtractor::DedupInfoExtractor(IHqlExpression * expr)
     compareAllFields = false;
     isLocal = false;
     keepLeft = true;
+    keepBest = false;
     numToKeep.setown(createConstantOne());
 
     unsigned max = expr->numChildren();
@@ -1404,6 +1405,8 @@ DedupInfoExtractor::DedupInfoExtractor(IHqlExpression * expr)
                     keepLeft = true;
                 else if (name == rightAtom)
                     keepLeft = false;
+                else if (name == bestAtom)
+                    keepBest = true;
             }
             break;
         case no_negate:

+ 1 - 0
ecl/hql/hqlutil.hpp

@@ -270,6 +270,7 @@ public:
     bool compareAllRows;
     bool compareAllFields;
     bool isLocal;
+    bool keepBest;
     OwnedHqlExpr numToKeep;
     HqlExprArray conds;
     HqlExprArray equalities;

+ 2 - 0
ecl/hqlcpp/hqlcerrors.hpp

@@ -223,6 +223,7 @@
 #define HQLERR_ServiceDefinitionNotAllowed      4203
 #define HQLERR_BodyNotAllowedWithInline         4204
 #define HQLERR_DatasetPassedToRowArg            4205
+#define HQLERR_DedupBestWithKeepn               4206
 
 //Warnings....
 #define HQLWRN_PersistDataNotLikely             4500
@@ -522,6 +523,7 @@
 #define HQLERR_ServiceDefinitionNotAllowed_Text "Insufficient access rights to use SERVICE"
 #define HQLERR_BodyNotAllowedWithInline_Text    "#body not supported with INLINE attribute"
 #define HQLERR_DatasetPassedToRowArg_Text       "Cannot pass a dataset to row argument %s"
+#define HQLERR_DedupBestWithKeepn_Text          "DEDUP with BEST does not support KEEP"
 
 //Warnings.
 #define HQLWRN_CannotRecreateDistribution_Text  "Cannot recreate the distribution for a persistent dataset"

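Error 4206 is raised by the code generator (see the hqlhtcpp.cpp hunk below) when BEST is combined with a KEEP count other than 1. A sketch of ECL that would be expected to fail with this message (reuses the ds sketched after the commit header):

    bad := DEDUP(SORT(ds, f1), f1, KEEP 2, BEST(f2));  // expected: error 4206, "DEDUP with BEST does not support KEEP"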
+ 46 - 32
ecl/hqlcpp/hqlhtcpp.cpp

@@ -14112,14 +14112,15 @@ ABoundActivity * HqlCppTranslator::doBuildActivityDedup(BuildCtx & ctx, IHqlExpr
 
     buildInstancePrefix(instance);
 
+    bool wholeRecord = false;
     if (!useHash)
     {
-        if (info.compareAllRows)
-            doBuildBoolFunction(instance->classctx, "compareAll", info.compareAllRows);
-        if (!info.keepLeft)
-            doBuildBoolFunction(instance->classctx, "keepLeft", info.keepLeft);
         if (!matchesConstantValue(info.numToKeep, 1))
+        {
             doBuildUnsignedFunction(instance->startctx, "numToKeep", info.numToKeep);
+            if (info.keepBest)
+                throwError(HQLERR_DedupBestWithKeepn);
+        }
 
         //MORE: If input is grouped (pretty likely), then no need to include fields in the filter function that are already included.
         if (instance->isGrouped)
@@ -14169,6 +14170,7 @@ ABoundActivity * HqlCppTranslator::doBuildActivityDedup(BuildCtx & ctx, IHqlExpr
         buildCompareMember(instance->nestedctx, "Compare", order, DatasetReference(dataset));
         buildHashOfExprsClass(instance->nestedctx, "Hash", order, DatasetReference(dataset), true);
 
+        bool reuseCompare = false;
         HqlExprArray fields, selects;
         ForEachItemIn(idx, info.equalities)
         {
@@ -14194,43 +14196,55 @@ ABoundActivity * HqlCppTranslator::doBuildActivityDedup(BuildCtx & ctx, IHqlExpr
         //virtual unsigned recordToKey(void * _key, const void * _record)
         buildDedupSerializeFunction(instance->startctx, "recordToKey", dataset, keyDataset, info.equalities, selects, selSeq);
 
-        //virtual ICompare * queryKeyCompare()
-        bool reuseCompare = false;
-        OwnedHqlExpr keyOrder = createValueSafe(no_sortlist, makeSortListType(NULL), selects);
-        if (recordTypesMatch(dataset, keyDataset))
+        // Helper function relating to selecting a record from a set of "duplicate" records
+        if (info.keepBest)
         {
-            OwnedHqlExpr globalOrder = replaceSelector(order, dataset, queryActiveTableSelector());
-            if (keyOrder == globalOrder)
-                reuseCompare = true;
+            // KeepBest stores the entire record (not just the key fields), so KeyCompare and queryRowKeyCompare are the same as Compare.
+            reuseCompare = true;
         }
-
-        if (!reuseCompare)
-            buildCompareMember(instance->nestedctx, "KeyCompare", keyOrder, DatasetReference(keyDataset, no_activetable, NULL));
         else
-            instance->nestedctx.addQuotedLiteral("virtual ICompare * queryKeyCompare() { return &Compare; }");
-
-        //virtual unsigned getFlags() = 0;
         {
-            StringBuffer flags;
-            if (recordTypesMatch(dataset, keyDataset)) flags.append("|HFDwholerecord");
-            if (flags.length())
-                doBuildUnsignedFunction(instance->classctx, "getFlags", flags.str()+1);
+            //virtual ICompare * queryKeyCompare()
+            OwnedHqlExpr keyOrder = createValueSafe(no_sortlist, makeSortListType(NULL), selects);
+            if (recordTypesMatch(dataset, keyDataset))
+            {
+                OwnedHqlExpr globalOrder = replaceSelector(order, dataset, queryActiveTableSelector());
+                if (keyOrder == globalOrder)
+                    reuseCompare = true;
+            }
+            if (!reuseCompare)
+            {
+                buildCompareMember(instance->nestedctx, "KeyCompare", keyOrder, DatasetReference(keyDataset, no_activetable, NULL));
+                buildHashOfExprsClass(instance->nestedctx, "KeyHash", keyOrder, DatasetReference(keyDataset, no_activetable, NULL), true);
+                //virtual ICompare * queryRowKeyCompare()=0; // lhs is a row, rhs is a key
+                doCompareLeftRight(instance->nestedctx, "RowKeyCompare", DatasetReference(dataset), DatasetReference(keyDataset, no_activetable, NULL), info.equalities, selects);
+            }
+            wholeRecord = recordTypesMatch(dataset, keyDataset);
         }
-
-        //virtual IHash    * queryKeyHash()=0;
         if (reuseCompare)
-            instance->nestedctx.addQuotedLiteral("virtual IHash * queryKeyHash() { return &Hash; }");
-        else
-            buildHashOfExprsClass(instance->nestedctx, "KeyHash", keyOrder, DatasetReference(keyDataset, no_activetable, NULL), true);
-
-        //virtual ICompare * queryRowKeyCompare()=0; // lhs is a row, rhs is a key
-        if (!reuseCompare)
         {
-            doCompareLeftRight(instance->nestedctx, "RowKeyCompare", DatasetReference(dataset), DatasetReference(keyDataset, no_activetable, NULL), info.equalities, selects);
-        }
-        else
+            instance->nestedctx.addQuotedLiteral("virtual ICompare * queryKeyCompare() { return &Compare; }");
+            instance->nestedctx.addQuotedLiteral("virtual IHash * queryKeyHash() { return &Hash; }");
             instance->nestedctx.addQuotedLiteral("virtual ICompare * queryRowKeyCompare() { return &Compare; }");
+        }
+    }
 
+    StringBuffer flags;
+    if (info.compareAllRows) flags.append("|HDFcompareall");
+    if (info.keepLeft)       flags.append("|HDFkeepleft");
+    if (info.keepBest)       flags.append("|HDFkeepbest");
+    if (wholeRecord)         flags.append("|HDFwholerecord");
+    if (!streq(flags.str(), "|HDFkeepleft"))
+    {
+        if (flags.length()==0) flags.append("|0");
+        doBuildUnsignedFunction(instance->startctx, "getFlags", flags.str()+1);
+    }
+
+    if (info.keepBest)
+    {
+        IHqlExpression * sortOrder = expr->queryAttribute(bestAtom)->queryChild(0);
+        buildCompareClass(instance->startctx, "bestCompare", sortOrder, DatasetReference(dataset));
+        instance->startctx.addQuotedLiteral("virtual ICompare * queryCompareBest() { return &bestCompare; }");
     }
 
     buildInstanceSuffix(instance);

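As the comment in the hash-dedup branch notes, with BEST the table stores the whole record rather than an extracted key, so the generated KeyCompare/KeyHash/RowKeyCompare reuse Compare/Hash, and a dedicated bestCompare class is emitted for the BEST sort order. The user-level form exercising this path, as in the regression test (reuses the ds sketched after the commit header):

    bestHash := SORT(DEDUP(ds, f1, HASH, BEST(f2)), f1);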
+ 74 - 14
ecl/hthor/hthor.cpp

@@ -2441,7 +2441,7 @@ void CHThorGroupDedupKeepLeftActivity::resetEOF()
 
 //=====================================================================================================
 
-CHThorGroupDedupKeepRightActivity::CHThorGroupDedupKeepRightActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDedupArg &_arg, ThorActivityKind _kind) : CHThorGroupDedupActivity(_agent, _activityId, _subgraphId, _arg, _kind)
+CHThorGroupDedupKeepRightActivity::CHThorGroupDedupKeepRightActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDedupArg &_arg, ThorActivityKind _kind) : CHThorGroupDedupActivity(_agent, _activityId, _subgraphId, _arg, _kind), compareBest(nullptr)
 {
 }
 
@@ -2450,6 +2450,8 @@ void CHThorGroupDedupKeepRightActivity::ready()
     CHThorGroupDedupActivity::ready();
     assertex(numToKeep==1);
     firstDone = false;
+    if (helper.keepBest())
+        compareBest = helper.queryCompareBest();
 }
 
 void CHThorGroupDedupKeepRightActivity::stop()
@@ -2476,13 +2478,21 @@ const void *CHThorGroupDedupKeepRightActivity::nextRow()
             break;
         }
 
-        if (numKept < numToKeep-1)
+        if (compareBest)
         {
-            numKept++;
-            break;
+            if (compareBest->docompare(kept,next) > 0)
+                kept.setown(next.getClear());
         }
+        else
+        {
+            if (numKept < numToKeep-1)
+            {
+                numKept++;
+                break;
+            }
 
-        kept.setown(next.getClear());
+            kept.setown(next.getClear());
+        }
     }
 
     const void * ret = kept.getClear();
@@ -2628,9 +2638,28 @@ bool HashDedupTable::insert(const void * row)
     addNew(new HashDedupElement(hash, keyRow.getClear()), hash);
     return true;
 }
+bool HashDedupTable::insertBest(const void * nextrow)
+{
+    unsigned hash = helper.queryHash()->hash(nextrow);
+    const void *et = find(hash, nextrow);
+    if (et)
+    {
+        const HashDedupElement *element = reinterpret_cast<const HashDedupElement *>(et);
+        const void * row = element->queryRow();
+        if (queryBestCompare->docompare(row,nextrow) <= 0)
+            return false;
+        removeExact( const_cast<void *>(et));
+        // drop-through to add new row
+    }
+    LinkRoxieRow(nextrow);
+    addNew(new HashDedupElement(hash, nextrow), hash);
+    return true;
+}
 
-CHThorHashDedupActivity::CHThorHashDedupActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorHashDedupArg & _arg, ThorActivityKind _kind) : CHThorSimpleActivityBase(_agent, _activityId, _subgraphId, _arg, _kind), helper(_arg), table(_arg, activityId)
+CHThorHashDedupActivity::CHThorHashDedupActivity(IAgentContext & _agent, unsigned _activityId, unsigned _subgraphId, IHThorHashDedupArg & _arg, ThorActivityKind _kind)
+: CHThorSimpleActivityBase(_agent, _activityId, _subgraphId, _arg, _kind), helper(_arg), table(_arg, activityId), hashTableFilled(false), hashDedupTableIter(table)
 {
+    keepBest = helper.keepBest();
 }
 
 void CHThorHashDedupActivity::ready()
@@ -2647,16 +2676,47 @@ void CHThorHashDedupActivity::stop()
 
 const void * CHThorHashDedupActivity::nextRow()
 {
-    while(true)
+    if (keepBest)
     {
-        OwnedConstRoxieRow next(input->nextRow());
-        if(!next)
+        // Populate hash table with best rows
+        if (!hashTableFilled)
         {
-            table.kill();
-            return NULL;
+            OwnedConstRoxieRow next(input->nextRow());
+            while(next)
+            {
+                table.insertBest(next);
+                next.setown(input->nextRow());
+            }
+            hashTableFilled = true;
+            hashDedupTableIter.first();
+        }
+
+        // Iterate through hash table returning rows
+        if (hashDedupTableIter.isValid())
+        {
+            HashDedupElement &el = hashDedupTableIter.query();
+
+            OwnedConstRoxieRow row(el.getRow());
+            hashDedupTableIter.next();
+            return row.getClear();
+        }
+        table.kill();
+        hashTableFilled = false;
+        return NULL;
+    }
+    else
+    {
+        while(true)
+        {
+            OwnedConstRoxieRow next(input->nextRow());
+            if(!next)
+            {
+                table.kill();
+                return NULL;
+            }
+            if(table.insert(next))
+                return next.getClear();
         }
-        if(table.insert(next))
-            return next.getClear();
     }
 }
 
@@ -10131,7 +10191,7 @@ extern HTHOR_API IHThorActivity * createGroupDedupActivity(IAgentContext & _agen
 {
     if(arg.compareAll())
         return new CHThorGroupDedupAllActivity(_agent, _activityId, _subgraphId, arg, kind);
-    else if (arg.keepLeft())
+    else if (arg.keepLeft() && !arg.keepBest())
         return new CHThorGroupDedupKeepLeftActivity(_agent, _activityId, _subgraphId, arg, kind);
     else
         return new CHThorGroupDedupKeepRightActivity(_agent, _activityId, _subgraphId, arg, kind);

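With BEST set, hThor now routes the non-hash case to the keep-right activity, whose nextRow loop retains whichever of the kept and incoming rows compares lower under compareBest. Grouped usage mirroring the regression test (reuses the ds sketched after the commit header):

    g := SORT(GROUP(SORT(ds, f1), f1), f2);
    bestInGroup := DEDUP(g, f2, BEST(id));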
+ 15 - 1
ecl/hthor/hthor.ipp

@@ -498,6 +498,7 @@ public:
 private:
     OwnedConstRoxieRow kept;
     bool         firstDone;
+    ICompare * compareBest;
 };
 
 class CHThorGroupDedupAllActivity : public CHThorSimpleActivityBase
@@ -534,7 +535,14 @@ public:
     }
     ~HashDedupElement()                 { ReleaseRoxieRow(keyRow); }
     inline unsigned queryHash() const   { return hash; }
-    inline const void *queryRow() const { return keyRow; }
+    inline const void * queryRow() const { return keyRow; }
+    inline const void * getRow()
+    {
+        const void * row = keyRow;
+        keyRow = nullptr;
+        hash = 0;
+        return row;
+    }
 private:
     unsigned hash;
     const void *keyRow;
@@ -547,6 +555,7 @@ public:
         : helper(_helper), 
           activityId(_activityId)
     {
+        queryBestCompare = helper.queryCompareBest();
     }
     virtual ~HashDedupTable()
     { 
@@ -578,11 +587,13 @@ public:
     inline void setRowAllocator(IEngineRowAllocator * _keyRowAllocator) { keyRowAllocator.setown(_keyRowAllocator); }
 
     bool insert(const void * row);
+    bool insertBest(const void * row);
 
 private:
     IHThorHashDedupArg & helper;
     unsigned activityId;
     Owned<IEngineRowAllocator> keyRowAllocator;
+    ICompare * queryBestCompare;
 };
 
 class CHThorHashDedupActivity : public CHThorSimpleActivityBase
@@ -597,6 +608,9 @@ public:
 private:
     IHThorHashDedupArg & helper;
     HashDedupTable table;
+    bool keepBest;
+    bool hashTableFilled;
+    SuperHashIteratorOf<HashDedupElement> hashDedupTableIter;
 };
 
 class CHThorNormalizeActivity : public CHThorSimpleActivityBase

+ 91 - 17
roxie/ccd/ccdserver.cpp

@@ -7212,20 +7212,23 @@ class CRoxieServerDedupKeepRightActivity : public CRoxieServerDedupActivity
 {
     const void *kept;
     bool first;
+    ICompare * compareBest;
 
 public:
 
     CRoxieServerDedupKeepRightActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
-        : CRoxieServerDedupActivity(_ctx, _factory, _probeManager)
+        : CRoxieServerDedupActivity(_ctx, _factory, _probeManager), compareBest(nullptr)
     {
-        kept = NULL;
+        kept = nullptr;
         first = true;
+        if (helper.keepBest())
+            compareBest = helper.queryCompareBest();
     }
 
     virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
     {
         first = true;
-        kept = NULL;
+        kept = nullptr;
         CRoxieServerDedupActivity::start(parentExtractSize, parentExtract, paused);
     }
 
@@ -7253,7 +7256,10 @@ public:
                 break;
             }
 
-            if (numKept < numToKeep-1)
+            if (compareBest && compareBest->docompare(kept,next) <= 0)
+                continue;
+
+            if (numKept < numToKeep - 1)
             {
                 numKept++;
                 break;
@@ -7456,6 +7462,8 @@ class CRoxieServerDedupActivityFactory : public CRoxieServerActivityFactory
 {
     bool compareAll;
     bool keepLeft;
+    bool keepBest;
+    unsigned flags;
 
 public:
     CRoxieServerDedupActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode)
@@ -7464,13 +7472,14 @@ public:
         Owned<IHThorDedupArg> helper = (IHThorDedupArg *) helperFactory();
         compareAll = helper->compareAll();
         keepLeft = helper->keepLeft();
+        keepBest = helper->keepBest();
     }
 
     virtual IRoxieServerActivity *createActivity(IRoxieSlaveContext *_ctx, IProbeManager *_probeManager) const
     {
         if (compareAll)
             return new CRoxieServerDedupAllActivity(_ctx, this, _probeManager, keepLeft);
-        else if (keepLeft)
+        else if (keepLeft && !keepBest)
             return new CRoxieServerDedupKeepLeftActivity(_ctx, this, _probeManager);
         else
             return new CRoxieServerDedupKeepRightActivity(_ctx, this, _probeManager);
@@ -7508,6 +7517,12 @@ class CRoxieServerHashDedupActivity : public CRoxieServerActivity
         {
             return keyRow;
         }
+        inline const void *getRowClear()
+        {
+            const void * row = keyRow;
+            keyRow = nullptr;
+            return row;
+        }
     private:
         unsigned hash;
         const void *keyRow;
@@ -7521,6 +7536,7 @@ class CRoxieServerHashDedupActivity : public CRoxieServerActivity
               activity(_activity),
               keySize(helper.queryKeySize())
         {
+            bestCompare=helper.queryCompareBest();
         }
         virtual ~HashDedupTable() { _releaseAll(); }
 
@@ -7568,18 +7584,36 @@ class CRoxieServerHashDedupActivity : public CRoxieServerActivity
             return true;
         }
 
+        bool insertBest(const void * nextrow)
+        {
+            unsigned hash = helper.queryHash()->hash(nextrow);
+            const void *et = find(hash, nextrow);
+            if (et)
+            {
+                const HashDedupElement *element = reinterpret_cast<const HashDedupElement *>(et);
+                const void * row = element->queryRow();
+                if (bestCompare->docompare(row,nextrow) <= 0)
+                    return false;
+                removeExact( const_cast<void *>(et));
+                // drop-through to add new row
+            }
+            addNew(new HashDedupElement(hash, nextrow), hash);
+            return true;
+        }
     private:
         IHThorHashDedupArg & helper;
         CachedOutputMetaData keySize;
         Owned<IEngineRowAllocator> keyRowAllocator;
         CRoxieServerHashDedupActivity & activity;
+        ICompare * bestCompare;
     } table;
 
 public:
     CRoxieServerHashDedupActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
-        : CRoxieServerActivity(_ctx, _factory, _probeManager), helper((IHThorHashDedupArg &)basehelper), table(helper, *this)
+        : CRoxieServerActivity(_ctx, _factory, _probeManager), helper((IHThorHashDedupArg &)basehelper), table(helper, *this), hashTableFilled(false), hashDedupTableIter(table)
     {
         eof = false;
+        keepBest = helper.keepBest();
     }
 
     virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
@@ -7599,30 +7633,70 @@ public:
         table.reset();
         eof = false;
         CRoxieServerActivity::reset();
+        hashTableFilled = false;
     }
 
     virtual const void *nextRow()
     {
         ActivityTimer t(totalCycles, timeActivities);
-        while(!eof)
+        if (keepBest)
         {
-            const void * next = inputStream->nextRow();
-            if(!next)
+            if (eof)
+                return NULL;
+            // Populate hash table with best rows
+            if (!hashTableFilled)
             {
+                const void * next = inputStream->nextRow();
+                while(next)
+                {
+                    if (!table.insertBest(next))
+                        ReleaseRoxieRow(next);
+                    next = inputStream->nextRow();
+                }
                 if (table.count() == 0)
                     eof = true;
-                table.reset();
-                return NULL;
+                hashTableFilled = true;
+                hashDedupTableIter.first();
             }
 
-            if(table.insert(next))
-                return next;
-            else
-                ReleaseRoxieRow(next);
+            // Iterate through hash table returning rows
+            if (hashDedupTableIter.isValid())
+            {
+                HashDedupElement &el = hashDedupTableIter.query();
+
+                const void * row = el.getRowClear();
+                hashDedupTableIter.next();
+                return row;
+            }
+            table.reset();
+            hashTableFilled = false;
+            return NULL;
         }
-        return NULL;
-    }
+        else
+        {
+            while(!eof)
+            {
+                const void * next = inputStream->nextRow();
+                if(!next)
+                {
+                    if (table.count() == 0)
+                        eof = true;
+                    table.reset();
+                    return NULL;
+                }
 
+                if(table.insert(next))
+                    return next;
+                else
+                    ReleaseRoxieRow(next);
+            }
+            return NULL;
+        }
+    }
+private:
+    bool keepBest;
+    bool hashTableFilled;
+    SuperHashIteratorOf<HashDedupElement> hashDedupTableIter;
 };
 
 class CRoxieServerHashDedupActivityFactory : public CRoxieServerActivityFactory

+ 19 - 9
rtl/include/eclhelper.hpp

@@ -39,8 +39,8 @@ if the supplied pointer was not from the roxiemem heap. Usually an OwnedRoxieStr
 
 //Should be incremented whenever the virtuals in the context or a helper are changed, so
 //that a work unit can't be rerun.  Try as hard as possible to retain compatibility.
-#define ACTIVITY_INTERFACE_VERSION      161
-#define MIN_ACTIVITY_INTERFACE_VERSION  161             //minimum value that is compatible with current interface - without using selectInterface
+#define ACTIVITY_INTERFACE_VERSION      162
+#define MIN_ACTIVITY_INTERFACE_VERSION  162             //minimum value that is compatible with current interface - without using selectInterface
 
 typedef unsigned char byte;
 
@@ -1489,13 +1489,24 @@ struct IHThorRollupArg : public IHThorArg
     virtual size32_t transform(ARowBuilder & rowBuilder, const void * _left, const void * _right) = 0;
 };
 
+enum
+{
+    HDFwholerecord   = 0x0001,
+    HDFcompareall    = 0x0002,
+    HDFkeepleft      = 0x0004,
+    HDFkeepbest      = 0x0008
+};
+
 struct IHThorDedupArg : public IHThorArg
 {
-    virtual bool compareAll() = 0;
-    virtual bool keepLeft() = 0;
+    inline bool compareAll() { return (getFlags() & HDFcompareall) != 0; }
+    inline bool keepLeft() { return (getFlags() & HDFkeepleft) != 0; }
+    inline bool keepBest() { return (getFlags() & HDFkeepbest) != 0; }
     virtual bool matches(const void * _left, const void * _right) = 0;
     virtual unsigned numToKeep() = 0;
     virtual ICompare * queryComparePrimary() = 0;           // used to break global dedup into chunks
+    virtual unsigned getFlags() = 0;
+    virtual ICompare * queryCompareBest() = 0;
 };
 
 enum
@@ -1904,11 +1915,6 @@ struct IHThorHashDistributeArg : public IHThorArg
     virtual ICompare * queryMergeCompare()=0;       // iff TAKhasdistributemerge
 };
 
-enum
-{
-    HFDwholerecord  = 0x0001,
-};
-
 struct IHThorHashDedupArg : public IHThorArg
 {
     virtual ICompare * queryCompare()=0;
@@ -1919,6 +1925,10 @@ struct IHThorHashDedupArg : public IHThorArg
     virtual unsigned getFlags() = 0;
     virtual IHash    * queryKeyHash()=0;
     virtual ICompare * queryRowKeyCompare()=0; // lhs is a row, rhs is a key
+    virtual ICompare * queryCompareBest()=0;
+    inline bool compareAll() { return (getFlags() & HDFcompareall) != 0; }
+    inline bool keepLeft() { return (getFlags() & HDFkeepleft) != 0; }
+    inline bool keepBest() { return (getFlags() & HDFkeepbest) != 0; }
 };
 
 struct IHThorHashMinusArg : public IHThorArg

+ 6 - 4
rtl/include/eclhelper_base.hpp

@@ -1249,12 +1249,11 @@ class CThorDedupArg : implements IHThorDedupArg, public CThorArg
         }
         return NULL;
     }
-
-    virtual bool compareAll() { return false; }
-    virtual bool keepLeft() { return true; }
     virtual bool matches(const void * _left, const void * _right) { return true; }
     virtual unsigned numToKeep() { return 1; }
     virtual ICompare * queryComparePrimary() { return NULL; }
+    virtual unsigned getFlags() { return HDFkeepleft; }
+    virtual ICompare * queryCompareBest() { return NULL; }
 };
 
 class CThorAggregateArg : implements IHThorAggregateArg, public CThorArg
@@ -2115,7 +2114,10 @@ class CThorHashDedupArg : implements IHThorHashDedupArg, public CThorArg
         return NULL;
     }
 
-    virtual unsigned getFlags() { return 0; }
+    virtual unsigned getFlags() { return HDFkeepleft; }
+    virtual ICompare * queryCompareBest() { return NULL; }
+    virtual IOutputMetaData * queryKeySize() { return NULL; }
+    virtual size32_t recordToKey(ARowBuilder & rowBuilder, const void * _record) { return 0; }
 };
 
 class CThorHashMinusArg : implements IHThorHashMinusArg, public CThorArg

+ 82 - 0
testing/regress/ecl/dedupwithbest.ecl

@@ -0,0 +1,82 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2016 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and 
+    limitations under the License.
+############################################################################## */
+
+#option ('testHashDedupSpillTimes',10);
+MyRec := RECORD
+    INTEGER3 Id;
+    STRING10 Field1;
+    STRING50 Field2;
+END;
+
+ds := DATASET([{'001','KC','G'},
+               {'002','KC','Z'},
+               {'003','KC','Z'},
+               {'004','KC','C'},
+               {'005','KA','X'},
+               {'006','KB','A'},
+               {'007','KB','G'},
+               {'008','KA','B'},
+               {'009','KC','Z'},
+               {'010','KB','B'},
+               {'011','KB','B'}],MyRec);
+
+sorted_ds := SORT(ds, Field1);
+
+Dedup1 := DEDUP(sorted_ds, Field1, BEST(Field2));
+Dedup2 := DEDUP(sorted_ds, Field1, BEST(-Field2));
+Dedup3 := SORT(DEDUP(ds, Field1, HASH, BEST(Field2)), Field1);
+
+OUTPUT(dedup1, NAMED('BEST_Field2'));
+OUTPUT(dedup2, NAMED('BEST_Field2_Reverse'));
+OUTPUT(dedup3, NAMED('BEST_Field2_HASH'));
+
+// Grouped dedup
+gr1 := GROUP(sorted_ds, Field1);
+gr1_sorted := SORT(gr1, Field2);
+Dedupgr1 := DEDUP(gr1_sorted, field2, BEST(Id));
+Dedupgr2 := DEDUP(gr1_sorted, field2, BEST(-Id));
+OUTPUT(Dedupgr1, NAMED('GroupDedup'));
+OUTPUT(Dedupgr2, NAMED('GroupDedup_Reverse'));
+
+//Grouped HASH dedup
+Dedupgr3 := SORT(DEDUP(gr1_sorted, field2, BEST(Id), HASH), Field2, Id);
+Dedupgr4 := SORT(DEDUP(gr1_sorted, field2, BEST(-Id), HASH), Field2, Id);
+OUTPUT(Dedupgr3, NAMED('GroupDedupHash'));
+OUTPUT(Dedupgr4, NAMED('GroupDedupHash_Reverse'));
+
+//Larger test
+numRecords := 1000000;
+// Generate DS set so that
+// 1) Id fields values are from 1..1000000
+// 2) Field1 fields are from K0..K49999
+// 3) Field2 fields are from 0..19
+createIds(unsigned n) := NOFOLD(DATASET(n, TRANSFORM(MyRec, SELF.id := COUNTER , SELF.Field1:='K' + (STRING) (COUNTER % 50000), SELF.Field2:=(STRING)((COUNTER-1) DIV 50000) ), DISTRIBUTED));
+
+generatedDS := createIds(numRecords);
+
+// The expected result will be (if dedup is working)
+// 1) Id field has value 1..50000
+// 2) field1 fields should have value 'K0' to 'K49999'
+// 3) field2 fields should be '0' (because this is the 'best' field)
+// 4) field1 should be the same as the Id prefixed with 'K', other than for Id 50000 where field1 should be K0
+// 5) There should be 50K rows
+dedupds := DEDUP(generatedDS, Field1, HASH, BEST(field2));
+OUTPUT( COUNT(dedupds), NAMED('DEDUP_COUNT') );
+
+d := ENTH(SORT(dedupds,Id),20);
+OUTPUT( d, NAMED('DEDUP_SAMPLE') ) ;
+

+ 80 - 0
testing/regress/ecl/key/dedupwithbest.xml

@@ -0,0 +1,80 @@
+<Dataset name='BEST_Field2'>
+ <Row><id>8</id><field1>KA        </field1><field2>B                                                 </field2></Row>
+ <Row><id>6</id><field1>KB        </field1><field2>A                                                 </field2></Row>
+ <Row><id>4</id><field1>KC        </field1><field2>C                                                 </field2></Row>
+</Dataset>
+<Dataset name='BEST_Field2_Reverse'>
+ <Row><id>5</id><field1>KA        </field1><field2>X                                                 </field2></Row>
+ <Row><id>7</id><field1>KB        </field1><field2>G                                                 </field2></Row>
+ <Row><id>2</id><field1>KC        </field1><field2>Z                                                 </field2></Row>
+</Dataset>
+<Dataset name='BEST_Field2_HASH'>
+ <Row><id>8</id><field1>KA        </field1><field2>B                                                 </field2></Row>
+ <Row><id>6</id><field1>KB        </field1><field2>A                                                 </field2></Row>
+ <Row><id>4</id><field1>KC        </field1><field2>C                                                 </field2></Row>
+</Dataset>
+<Dataset name='GroupDedup'>
+ <Row><id>8</id><field1>KA        </field1><field2>B                                                 </field2></Row>
+ <Row><id>5</id><field1>KA        </field1><field2>X                                                 </field2></Row>
+ <Row><id>6</id><field1>KB        </field1><field2>A                                                 </field2></Row>
+ <Row><id>10</id><field1>KB        </field1><field2>B                                                 </field2></Row>
+ <Row><id>7</id><field1>KB        </field1><field2>G                                                 </field2></Row>
+ <Row><id>4</id><field1>KC        </field1><field2>C                                                 </field2></Row>
+ <Row><id>1</id><field1>KC        </field1><field2>G                                                 </field2></Row>
+ <Row><id>2</id><field1>KC        </field1><field2>Z                                                 </field2></Row>
+</Dataset>
+<Dataset name='GroupDedup_Reverse'>
+ <Row><id>8</id><field1>KA        </field1><field2>B                                                 </field2></Row>
+ <Row><id>5</id><field1>KA        </field1><field2>X                                                 </field2></Row>
+ <Row><id>6</id><field1>KB        </field1><field2>A                                                 </field2></Row>
+ <Row><id>11</id><field1>KB        </field1><field2>B                                                 </field2></Row>
+ <Row><id>7</id><field1>KB        </field1><field2>G                                                 </field2></Row>
+ <Row><id>4</id><field1>KC        </field1><field2>C                                                 </field2></Row>
+ <Row><id>1</id><field1>KC        </field1><field2>G                                                 </field2></Row>
+ <Row><id>9</id><field1>KC        </field1><field2>Z                                                 </field2></Row>
+</Dataset>
+<Dataset name='GroupDedupHash'>
+ <Row><id>8</id><field1>KA        </field1><field2>B                                                 </field2></Row>
+ <Row><id>5</id><field1>KA        </field1><field2>X                                                 </field2></Row>
+ <Row><id>6</id><field1>KB        </field1><field2>A                                                 </field2></Row>
+ <Row><id>10</id><field1>KB        </field1><field2>B                                                 </field2></Row>
+ <Row><id>7</id><field1>KB        </field1><field2>G                                                 </field2></Row>
+ <Row><id>4</id><field1>KC        </field1><field2>C                                                 </field2></Row>
+ <Row><id>1</id><field1>KC        </field1><field2>G                                                 </field2></Row>
+ <Row><id>2</id><field1>KC        </field1><field2>Z                                                 </field2></Row>
+</Dataset>
+<Dataset name='GroupDedupHash_Reverse'>
+ <Row><id>8</id><field1>KA        </field1><field2>B                                                 </field2></Row>
+ <Row><id>5</id><field1>KA        </field1><field2>X                                                 </field2></Row>
+ <Row><id>6</id><field1>KB        </field1><field2>A                                                 </field2></Row>
+ <Row><id>11</id><field1>KB        </field1><field2>B                                                 </field2></Row>
+ <Row><id>7</id><field1>KB        </field1><field2>G                                                 </field2></Row>
+ <Row><id>4</id><field1>KC        </field1><field2>C                                                 </field2></Row>
+ <Row><id>1</id><field1>KC        </field1><field2>G                                                 </field2></Row>
+ <Row><id>9</id><field1>KC        </field1><field2>Z                                                 </field2></Row>
+</Dataset>
+<Dataset name='DEDUP_COUNT'>
+ <Row><DEDUP_COUNT>50000</DEDUP_COUNT></Row>
+</Dataset>
+<Dataset name='DEDUP_SAMPLE'>
+ <Row><id>2500</id><field1>K2500     </field1><field2>0                                                 </field2></Row>
+ <Row><id>5000</id><field1>K5000     </field1><field2>0                                                 </field2></Row>
+ <Row><id>7500</id><field1>K7500     </field1><field2>0                                                 </field2></Row>
+ <Row><id>10000</id><field1>K10000    </field1><field2>0                                                 </field2></Row>
+ <Row><id>12500</id><field1>K12500    </field1><field2>0                                                 </field2></Row>
+ <Row><id>15000</id><field1>K15000    </field1><field2>0                                                 </field2></Row>
+ <Row><id>17500</id><field1>K17500    </field1><field2>0                                                 </field2></Row>
+ <Row><id>20000</id><field1>K20000    </field1><field2>0                                                 </field2></Row>
+ <Row><id>22500</id><field1>K22500    </field1><field2>0                                                 </field2></Row>
+ <Row><id>25000</id><field1>K25000    </field1><field2>0                                                 </field2></Row>
+ <Row><id>27500</id><field1>K27500    </field1><field2>0                                                 </field2></Row>
+ <Row><id>30000</id><field1>K30000    </field1><field2>0                                                 </field2></Row>
+ <Row><id>32500</id><field1>K32500    </field1><field2>0                                                 </field2></Row>
+ <Row><id>35000</id><field1>K35000    </field1><field2>0                                                 </field2></Row>
+ <Row><id>37500</id><field1>K37500    </field1><field2>0                                                 </field2></Row>
+ <Row><id>40000</id><field1>K40000    </field1><field2>0                                                 </field2></Row>
+ <Row><id>42500</id><field1>K42500    </field1><field2>0                                                 </field2></Row>
+ <Row><id>45000</id><field1>K45000    </field1><field2>0                                                 </field2></Row>
+ <Row><id>47500</id><field1>K47500    </field1><field2>0                                                 </field2></Row>
+ <Row><id>50000</id><field1>K0        </field1><field2>0                                                 </field2></Row>
+</Dataset>

+ 1 - 1
thorlcr/activities/fetch/thfetchslave.cpp

@@ -177,7 +177,7 @@ public:
         fposHash = new CFPosHandler(*iFetchHandler, offsetCount, offsetTable);
         keyIn.set(_keyIn);
         distributor = createHashDistributor(&owner, owner.queryContainer().queryJobChannel().queryJobComm(), tag, false, this, "FetchStream");
-        keyOutStream.setown(distributor->connect(keyRowIf, keyIn, fposHash, NULL));
+        keyOutStream.setown(distributor->connect(keyRowIf, keyIn, fposHash, NULL, NULL));
     }
     virtual IRowStream *queryOutput() { return this; }
     virtual IFileIO *queryPartIO(unsigned part) { assertex(part<files); return fPosMultiPartTable[part].file->queryFileIO(); }

+ 237 - 42
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -87,7 +87,7 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
     Owned<IRowStream> input;
 
     Semaphore distribDoneSem, localFinishedSem;
-    ICompare *iCompare;
+    ICompare *iCompare, *keepBestCompare;
     size32_t bucketSendSize;
     bool doDedup, allowSpill, connected, selfstopped;
     Owned<IException> sendException, recvException;
@@ -126,7 +126,7 @@ protected:
             }
         }
         unsigned count() const { return rows.ordinality(); }
-        bool dedup(ICompare *iCompare) // returns true if reduces by >= 10%
+        bool dedup(ICompare *iCompare, ICompare *keepBestCompare) // returns true if reduces by >= 10%
         {
             unsigned c = rows.ordinality();
             if (c<2)
@@ -140,21 +140,47 @@ protected:
              */
             dedupList.sort(*iCompare, 1);
 
-            OwnedConstThorRow prev;
-            for (unsigned i = c; i>0;)
+            OwnedConstThorRow prev = dedupList.getClear(--c);
+            if (!keepBestCompare)
             {
-                OwnedConstThorRow row = dedupList.getClear(--i);
-                if ((NULL != prev.get()) && (0 == iCompare->docompare(prev, row)))
+                rows.enqueue(prev.getLink());
+                for (unsigned i = c; i>0;)
                 {
-                    /* NB: do not alter 'total' size. It represents the amount originally added to the bucket
-                     * which will be deducted when sent.
-                     */
+                    OwnedConstThorRow row = dedupList.getClear(--i);
+                    if (0 == iCompare->docompare(prev, row))
+                    {
+                        /* NB: do not alter 'total' size. It represents the amount originally added to the bucket
+                        * which will be deducted when sent.
+                        */
+                    }
+                    else
+                    {
+                        rows.enqueue(row.getLink());
+                        prev.setown(row.getClear());
+                    }
                 }
-                else
+            }
+            else
+            {
+                // Queue all the "best" rows
+                for (unsigned i = c; i>0;)
                 {
-                    prev.set(row);
-                    rows.enqueue(row.getClear());
+                    OwnedConstThorRow row = dedupList.getClear(--i);
+
+                    if (0 == iCompare->docompare(prev, row))
+                    {
+                        // Same 'key' fields, so now examine 'best' fields to decide which to keep
+                        // N.B. queue rows that are equal in terms of the "best" condition, as the slave activity selects which to retain
+                        if ((keepBestCompare->docompare(prev,row) > 0))
+                            prev.setown(row.getClear());
+                    }
+                    else
+                    {
+                        rows.enqueue(prev.getClear());
+                        prev.setown(row.getClear());
+                    }
                 }
+                rows.enqueue(prev.getClear());
             }
             dedupList.clearRows();
             return true; // attempted
@@ -512,7 +538,7 @@ protected:
             }
             unsigned preCount = sendBucket->count();
             CCycleTimer dedupTimer;
-            if (sendBucket->dedup(owner.iCompare))
+            if (sendBucket->dedup(owner.iCompare, owner.keepBestCompare))
             {
                 unsigned tookMs = dedupTimer.elapsedMs();
                 unsigned postCount = sendBucket->count();
@@ -1066,7 +1092,7 @@ public:
         if (_pullBufferSize) pullBufferSize = _pullBufferSize;
     }
 
-    virtual IRowStream *connect(IThorRowInterfaces *_rowIf, IRowStream *_input, IHash *_ihash, ICompare *_iCompare)
+    virtual IRowStream *connect(IThorRowInterfaces *_rowIf, IRowStream *_input, IHash *_ihash, ICompare *_iCompare, ICompare *_keepBestCompare)
     {
         ActPrintLog("HASHDISTRIB: connect");
 
@@ -1081,6 +1107,7 @@ public:
         input.set(_input);
         ihash = _ihash;
         iCompare = _iCompare;
+        keepBestCompare = _keepBestCompare;
         if (allowSpill)
         {
             StringBuffer temp;
@@ -1999,7 +2026,7 @@ public:
     void doDistSetup()
     {
         Owned<IThorRowInterfaces> myRowIf = getRowInterfaces(); // avoiding circular link issues
-        out.setown(distributor->connect(myRowIf, instrm, ihash, mergecmp));
+        out.setown(distributor->connect(myRowIf, instrm, ihash, mergecmp, NULL));
     }
     virtual void start() override
     {
@@ -2423,6 +2450,29 @@ class CHashTableRowTable : private CThorExpandingRowArray
     ICompare *iCompare;
     rowidx_t htElements, htMax;
 
+    class CHashTableRowStream : public CSimpleInterfaceOf<IRowStream>
+    {
+        CHashTableRowTable * parent;
+        bool stopped;
+        unsigned pos;
+    public:
+        CHashTableRowStream(CHashTableRowTable * _parent) : parent(_parent), pos(0), stopped(false) {};
+        const void * nextRow()
+        {
+            if (stopped)
+                return nullptr;
+            while (pos < parent->numRows)
+            {
+                const void * row = parent->getRowClear(pos);
+                ++pos;
+                if (row) return row;
+            }
+            stopped = true;
+            return nullptr;
+        }
+        void stop() { stopped=true; }
+    };
+
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
@@ -2454,7 +2504,7 @@ public:
         return allocateRowTable(newMaxRows, maxSpillCost);
     }
     void rehash(const void **newRows);
-    bool lookupRow(unsigned htPos, const void *row) const // return true == match
+    unsigned lookupRow(unsigned htPos, const void *row) const // return htpos of match or UINT_MAX if no match
     {
         rowidx_t s = htPos;
         loop
@@ -2463,13 +2513,13 @@ public:
             if (!htKey)
                 break;
             if (0 == iCompare->docompare(row, htKey))
-                return true;
+                return htPos;
             if (++htPos==maxRows)
                 htPos = 0;
             if (htPos == s)
                 ThrowStringException(0, "lookupRow() HT full - infinite loop!");
         }
-        return false;
+        return UINT_MAX;
     }
     inline void addRow(unsigned htPos, const void *row)
     {
@@ -2492,10 +2542,16 @@ public:
             --htElements;
         return ret;
     }
+    inline const void *queryRow(unsigned htPos) const
+    {
+        return CThorExpandingRowArray::query(htPos);
+    }
     inline rowidx_t queryHtElements() const { return htElements; }
     inline bool hasRoom() const { return htElements < htMax; }
     inline rowidx_t queryMaxRows() const { return CThorExpandingRowArray::queryMaxRows(); }
     inline const void **getRowArray() { return CThorExpandingRowArray::getRowArray(); }
+    inline unsigned getHighWaterMark() const { return numRows; }
+    inline IRowStream *getRowStream() { return new CHashTableRowStream(this); }
 };
 
 class CSpill : implements IRowWriter, public CSimpleInterface
@@ -2590,8 +2646,12 @@ class CBucket : public CSimpleInterface
     CriticalSection lock;
     unsigned bucketN;
     CSpill rowSpill, keySpill;
-
+    bool keepBest;
+    ICompare *keepBestCompare;
     void doSpillHashTable();
+    bool rowsInBucketDedupedAlready;
+    bool streamed = true;
+
 public:
     CBucket(HashDedupSlaveActivityBase &_owner, IThorRowInterfaces *_rowIf, IThorRowInterfaces *_keyIf, IHash *_iRowHash, IHash *_iKeyHash, ICompare *_iCompare, bool _extractKey, unsigned _bucketN, CHashTableRowTable *_htRows);
     bool addKey(const void *key, unsigned hashValue);
@@ -2613,13 +2673,35 @@ public:
         rowSpill.close();
         keySpill.close();
     }
-    inline IRowStream *getRowStream(rowcount_t *count) { return rowSpill.getReader(count); }
-    inline IRowStream *getKeyStream(rowcount_t *count) { return keySpill.getReader(count); }
+    inline IRowStream *getSpillRowStream(rowcount_t *count) { return rowSpill.getReader(count); }
+    inline IRowStream *getSpillKeyStream(rowcount_t *count) { return keySpill.getReader(count); }
+    inline IRowStream *getRowStream()
+    {
+        // NB: must be called when spilling blocked (see CBucketHandler::spillCrit)
+        streamed = true;
+        return htRows->getRowStream();
+    }
     inline rowidx_t getKeyCount() const { return htRows->queryHtElements(); }
     inline rowcount_t getSpiltRowCount() const { return rowSpill.getCount(); }
     inline rowcount_t getSpiltKeyCount() const { return keySpill.getCount(); }
     inline bool isSpilt() const { return spilt; }
+    inline bool isSpillable() const { return !spilt && !streamed; }
     inline unsigned queryBucketNumber() const { return bucketN; }
+    inline const void *queryRow(unsigned htPos) const
+    {
+        if (htPos < htRows->getHighWaterMark())
+            return htRows->queryRow(htPos);
+        return nullptr;
+    }
+    inline void setRowsInBucketDeduped()
+    {
+        dbgassertex(!isSpilt());
+        rowsInBucketDedupedAlready=true;
+    }
+    inline bool areRowsInBucketDeduped() const
+    {
+        return rowsInBucketDedupedAlready;
+    }
 };
 
 class CBucketHandler : public CSimpleInterface, implements IInterface, implements roxiemem::IBufferedRowCallback
@@ -2636,6 +2718,9 @@ class CBucketHandler : public CSimpleInterface, implements IInterface, implement
     CBucket **buckets;
     mutable rowidx_t peakKeyCount;
     bool callbacksInstalled = false;
+    unsigned nextBestBucket = 0;
+    bool bestReady = false;
+    CriticalSection spillCrit;
 
     rowidx_t getTotalBucketCount() const
     {
@@ -2700,7 +2785,10 @@ public:
             recalcPeakKeyCount();
         return peakKeyCount;
     }
+    void clearCallbacks();
+    void initCallbacks();
     void flushBuckets();
+    CBucket *queryBucket(unsigned bucketn) { assertex (bucketn < numBuckets); return buckets[bucketn]; }
     bool spillBucket(bool critical) // spills a bucket
     {
         if (NotFound == nextToSpill)
@@ -2710,6 +2798,7 @@ public:
             // post spill, turn on, on higher priority, flushes spilt buckets that have accrued keys
             nextSpilledBucketFlush = 0;
         }
+        CriticalBlock b(spillCrit);
         unsigned start=nextToSpill;
         do
         {
@@ -2719,7 +2808,7 @@ public:
             CBucket *bucket = buckets[nextToSpill++];
             if (nextToSpill == numBuckets)
                 nextToSpill = 0;
-            if (!bucket->isSpilt() && (critical || (bucket->getKeyCount() >= HASHDEDUP_MINSPILL_THRESHOLD)))
+            if (bucket->isSpillable() && (critical || (bucket->getKeyCount() >= HASHDEDUP_MINSPILL_THRESHOLD)))
             {
                 // spill whole bucket unless last
                 // The one left, will be last bucket standing and grown to fill mem
@@ -2732,6 +2821,7 @@ public:
         return false;
     }
     CBucketHandler *getNextBucketHandler(Owned<IRowStream> &nextInput);
+
 public:
     bool addRow(const void *row);
     inline unsigned calcBucket(unsigned hashValue) const
@@ -2749,6 +2839,41 @@ public:
     {
         return spillBucket(critical);
     }
+    IRowStream * getNextBestRowStream()
+    {
+        // NB: Called only once input has been read
+        CriticalBlock b(spillCrit); // buckets can still be spilt
+        if (!bestReady)
+        {
+            // All non-spilled buckets in memory at this point in time are deduped
+            // -> set flag in all these buckets just in case they are spilled so that
+            //    when they need to be streamed back it's not necessary to dedup again
+            bestReady = true;
+            for (unsigned cur=0; cur<numBuckets; cur++)
+            {
+                if (!buckets[cur]->isSpilt())
+                    buckets[cur]->setRowsInBucketDeduped();
+            }
+        }
+        while (nextBestBucket < numBuckets)
+        {
+            CBucket *bucket = buckets[nextBestBucket++];
+            if (bucket->isSpilt())
+            {
+                if (bucket->areRowsInBucketDeduped())
+                {
+                    rowcount_t count; // unused
+                    return bucket->getSpillRowStream(&count);
+                }
+            }
+            else
+            {
+                if (bucket->getKeyCount())
+                    return bucket->getRowStream();
+            }
+        }
+        return nullptr;
+    }
 };
 
 class HashDedupSlaveActivityBase : public CSlaveActivity
@@ -2772,6 +2897,10 @@ protected:
     CHashTableRowTable **hashTables;
     unsigned numHashTables, initialNumBuckets;
     roxiemem::RoxieHeapFlags allocFlags;
+    bool keepBest;
+    ICompare *keepBestCompare;
+    Owned<IRowStream> bestRowStream;
+    unsigned testSpillTimes;
 
     inline CHashTableRowTable &queryHashTable(unsigned n) const { return *hashTables[n]; }
     void ensureNumHashTables(unsigned _numHashTables)
@@ -2819,6 +2948,8 @@ public:
         numHashTables = initialNumBuckets = 0;
         roxiemem::RoxieHeapFlags allocFlags = roxiemem::RHFnone;
         appendOutputLinked(this);
+        keepBest = false;
+        testSpillTimes = getOptInt("testHashDedupSpillTimes");
     }
     ~HashDedupSlaveActivityBase()
     {
@@ -2829,6 +2960,9 @@ public:
     {
         iHash = helper->queryHash();
         iCompare = helper->queryCompare();
+        keepBest = helper->keepBest();
+        keepBestCompare = helper->queryCompareBest();
+        assertex (!keepBest || keepBestCompare);
 
         // JCSMORE - really should ask / lookup what flags the allocator created for extractKey has...
         allocFlags = queryJobChannel().queryThorAllocator()->queryFlags();
@@ -2836,7 +2970,7 @@ public:
         // JCSMORE - it may not be worth extracting the key,
         // if there's an upstream activity that holds onto rows for periods of time (e.g. sort)
         IOutputMetaData *km = helper->queryKeySize();
-        extractKey = (0 == (HFDwholerecord & helper->getFlags()));
+        extractKey = (0 == (HDFwholerecord & helper->getFlags())) && !keepBest;
         if (extractKey)
         {
             // if key and row are fixed length, check that estimated memory sizes make it worth extracting key per row
@@ -2915,10 +3049,22 @@ public:
         ActivityTimer t(totalCycles, timeActivities);
         if (eos)
             return NULL;
+
         // bucket handlers, stream out non-duplicates (1st entry in HT)
         loop
         {
             OwnedConstThorRow row;
+            if (bestRowStream)
+            {
+                row.setown(bestRowStream->nextRow());
+                if (row)
+                {
+                    dataLinkIncrement();
+                    return row.getClear();
+                }
+                // Else drop-through
+            }
+            else
             {
                 SpinBlock b(stopSpin);
                 row.setown(grouped?distInput->nextRow():distInput->ungroupedNextRow());
@@ -2928,15 +3074,38 @@ public:
                 lastEog = false;
                 if (bucketHandler->addRow(row)) // true if new, i.e. non-duplicate (does not take ownership)
                 {
-                    dataLinkIncrement();
-                    return row.getClear();
+                    // Keepbest doesn't return rows straight away; it builds the
+                    // best rows in the hash table first
+                    if (!keepBest)
+                    {
+                        dataLinkIncrement();
+                        return row.getClear();
+                    }
                 }
             }
             else
             {
+                // Keepbest has populated the hashtable with the best rows
+                // -> stream back best rows from hash table
+                if (keepBest)
+                {
+                    // This should cause one of the buckets to spill (used for testing)
+                    if (testSpillTimes)
+                    {
+                        bucketHandler->spillBucket(false);
+                        testSpillTimes--;
+                    }
+
+                    /* Get next available best IRowStream, i.e. buckets that did not spill before end of input.
+                     * The bucket whose stream is returned is no longer spillable.
+                     * Other buckets continue to be, but are marked to be ignored by future handler stages.
+                     */
+                    bestRowStream.setown(bucketHandler->getNextBestRowStream());
+                    if (bestRowStream)
+                        continue;
+                }
                 // If spill event occurred, disk buckets + key buckets will have been created by this stage.
                 bucketHandler->flushBuckets();
-
                 Owned<CBucketHandler> nextBucketHandler;
                 loop
                 {
@@ -2949,6 +3118,7 @@ public:
                         {
                             currentInput.clear();
                             bucketHandler.clear();
+
                             if (grouped)
                             {
                                 if (lastEog)
@@ -3057,7 +3227,7 @@ void CHashTableRowTable::rehash(const void **newRows)
 
 CBucket::CBucket(HashDedupSlaveActivityBase &_owner, IThorRowInterfaces *_rowIf, IThorRowInterfaces *_keyIf, IHash *_iRowHash, IHash *_iKeyHash, ICompare *_iCompare, bool _extractKey, unsigned _bucketN, CHashTableRowTable *_htRows)
     : owner(_owner), rowIf(_rowIf), keyIf(_keyIf), iRowHash(_iRowHash), iKeyHash(_iKeyHash), iCompare(_iCompare), extractKey(_extractKey), bucketN(_bucketN), htRows(_htRows),
-      rowSpill(owner, _rowIf, "rows", _bucketN), keySpill(owner, _keyIf, "keys", _bucketN)
+      rowSpill(owner, _rowIf, "rows", _bucketN), keySpill(owner, _keyIf, "keys", _bucketN), rowsInBucketDedupedAlready(false)
 
 {
     spilt = false;
@@ -3072,6 +3242,9 @@ CBucket::CBucket(HashDedupSlaveActivityBase &_owner, IThorRowInterfaces *_rowIf,
     }
     else
         keyAllocator = keyIf->queryRowAllocator();
+
+    keepBest = owner.helper->keepBest();
+    keepBestCompare = owner.helper->queryCompareBest();
 }
 
 void CBucket::clear()
@@ -3200,8 +3373,17 @@ bool CBucket::addRow(const void *row, unsigned hashValue)
         {
             // Even if not adding, check HT for dedupping purposes upfront
             unsigned htPos = hashValue % htRows->queryMaxRows();
-            if (htRows->lookupRow(htPos, row))
-                return false; // dedupped
+            unsigned matchedHtPos = htRows->lookupRow(htPos, row);
+            if (matchedHtPos != UINT_MAX)
+            {
+                const void * oldrow = htRows->queryRow(matchedHtPos);
+                if (!keepBest || keepBestCompare->docompare(oldrow,row) <= 0)
+                    return false; // dedupped
+
+                // Remove the non-best row
+                ReleaseRoxieRow(htRows->getRowClear(matchedHtPos));
+                // fall through to add the best row
+            }
 
             if (doAdd)
             {
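
The hunk above also changes the lookupRow contract from a yes/no duplicate test to a position lookup, with UINT_MAX meaning "not found", so the caller can address the incumbent row, compare it under the BEST order, and swap it out in place. A sketch of a sentinel-returning probe in that style (the linear probing and signature here are assumptions for illustration, not the actual CHashTableRowTable layout):

    #include <climits>

    // Probe an open-addressed slot array for a row matching 'row'. Returns the
    // matching slot index so the caller can read or replace the incumbent, or
    // UINT_MAX when no match exists (an empty slot ends the probe chain).
    unsigned lookupRow(const void * const *slots, unsigned maxRows, unsigned startPos,
                       const void *row, bool (*matches)(const void *, const void *))
    {
        for (unsigned probe = 0; probe < maxRows; probe++)
        {
            unsigned pos = (startPos + probe) % maxRows;  // linear probing from the hash slot
            if (!slots[pos])
                return UINT_MAX;                          // empty slot: row not present
            if (matches(slots[pos], row))
                return pos;                               // duplicate found at this slot
        }
        return UINT_MAX;                                  // table exhausted, no match
    }
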
@@ -3268,7 +3450,7 @@ CBucketHandler::~CBucketHandler()
         ::Release(buckets[i]);
 }
 
-void CBucketHandler::flushBuckets()
+void CBucketHandler::clearCallbacks()
 {
     if (callbacksInstalled)
     {
@@ -3276,6 +3458,22 @@ void CBucketHandler::flushBuckets()
         owner.queryRowManager()->removeRowBuffer(&postSpillFlush);
         callbacksInstalled = false;
     }
+}
+
+void CBucketHandler::initCallbacks()
+{
+    if (!callbacksInstalled)
+    {
+        owner.queryRowManager()->addRowBuffer(this);
+        // postSpillFlush not needed until after 1 spill event, but not safe to add within callback
+        owner.queryRowManager()->addRowBuffer(&postSpillFlush);
+        callbacksInstalled = true;
+    }
+}
+
+void CBucketHandler::flushBuckets()
+{
+    clearCallbacks();
     for (unsigned i=0; i<numBuckets; i++)
     {
         CBucket &bucket = *buckets[i];
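
Splitting flushBuckets into clearCallbacks/initCallbacks lets the keep-best streaming path detach the spill callbacks, and init reattach them, without duplicating the guard logic. Reduced to its essentials, the refactor is a guarded, idempotent install/remove pair; a sketch with the row-manager calls stubbed out:

    // The callbacksInstalled flag makes both operations idempotent, so either
    // can be called defensively from multiple control paths.
    struct BucketCallbacks
    {
        bool callbacksInstalled = false;

        void initCallbacks()
        {
            if (!callbacksInstalled)
            {
                addRowBuffer();        // register spill callbacks with the row manager
                callbacksInstalled = true;
            }
        }
        void clearCallbacks()
        {
            if (callbacksInstalled)
            {
                removeRowBuffer();     // deregister before flushing or streaming
                callbacksInstalled = false;
            }
        }
        void flushBuckets()
        {
            clearCallbacks();          // no spill may fire while buckets are flushed
            // ... flush each bucket to disk ...
        }

        void addRowBuffer() {}         // stand-ins for IRowManager registration
        void removeRowBuffer() {}
    };
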
@@ -3375,10 +3573,7 @@ void CBucketHandler::init(unsigned _numBuckets, IRowStream *keyStream)
         htRows.setOwner(buckets[i]);
     }
     ActPrintLog(&owner, "Max %d buckets, current depth = %d", numBuckets, depth+1);
-    owner.queryRowManager()->addRowBuffer(this);
-    // postSpillFlush not needed until after 1 spill event, but not safe to add within callback
-    owner.queryRowManager()->addRowBuffer(&postSpillFlush);
-    callbacksInstalled = true;
+    initCallbacks();
     if (keyStream)
     {
         loop
@@ -3400,18 +3595,18 @@ CBucketHandler *CBucketHandler::getNextBucketHandler(Owned<IRowStream> &nextInpu
     while (currentBucket<numBuckets)
     {
         CBucket *bucket = buckets[currentBucket];
-        if (bucket->isSpilt())
+        if (bucket->isSpilt() && !bucket->areRowsInBucketDeduped())
         {
             rowcount_t keyCount, count;
             /* If each key and row stream were to use a unique allocator per destination bucket
              * thereby keeping rows/keys together in pages, it would make it easier to free pages on spill requests.
              * However, it would also mean a lot of allocators with at least one page per allocate, which ties up a lot of memory
              */
-            Owned<IRowStream> keyStream = bucket->getKeyStream(&keyCount);
+            Owned<IRowStream> keyStream = bucket->getSpillKeyStream(&keyCount);
             dbgassertex(keyStream);
             Owned<CBucketHandler> newBucketHandler = new CBucketHandler(owner, rowIf, keyIf, iRowHash, iKeyHash, iCompare, extractKey, depth+1, div*numBuckets);
             ActPrintLog(&owner, "Created bucket handler %d, depth %d", currentBucket, depth+1);
-            nextInput.setown(bucket->getRowStream(&count));
+            nextInput.setown(bucket->getSpillRowStream(&count));
             dbgassertex(nextInput);
             // Use peak in mem keys as estimate for next round of buckets.
             unsigned nextNumBuckets = getBucketEstimateWithPrev(count, (rowidx_t)getPeakCount(), (rowidx_t)keyCount);
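
A spilt bucket whose in-memory rows were already fully deduped is now skipped; otherwise its spilled keys and rows are replayed through a fresh handler at depth+1 with the divisor scaled by the bucket fan-out, so each level partitions on a different slice of the hash value. A toy model of that recursive re-bucketing, assuming a simplified (hash / div) % fanOut slicing and in-memory vectors in place of the real spill streams:

    #include <cstddef>
    #include <functional>
    #include <vector>

    // Rows from an oversized bucket are re-partitioned one level deeper with a
    // scaled divisor, until each bucket's rows fit in memory (or a depth cap hits).
    void rebucket(const std::vector<unsigned> &hashes, unsigned div, unsigned fanOut,
                  const std::function<bool(std::size_t)> &fitsInMemory)
    {
        std::vector<std::vector<unsigned>> buckets(fanOut);
        for (unsigned h : hashes)
            buckets[(h / div) % fanOut].push_back(h);   // deeper levels use a larger div
        for (const auto &bucket : buckets)
        {
            if (fitsInMemory(bucket.size()) || div >= (1u << 24))
                continue;                               // small enough: dedup in memory
            rebucket(bucket, div * fanOut, fanOut, fitsInMemory);
        }
    }
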
@@ -3483,7 +3678,7 @@ public:
         HashDedupSlaveActivityBase::start();
         ActivityTimer s(totalCycles, timeActivities);
         Owned<IThorRowInterfaces> myRowIf = getRowInterfaces(); // avoiding circular link issues
-        instrm.setown(distributor->connect(myRowIf, distInput, iHash, iCompare));
+        instrm.setown(distributor->connect(myRowIf, distInput, iHash, iCompare, keepBestCompare));
         distInput = instrm.get();
     }
     virtual void stop() override
@@ -3592,7 +3787,7 @@ public:
         ICompare *icompareR = joinargs->queryCompareRight();
         if (!lhsDistributor)
             lhsDistributor.setown(createHashDistributor(this, queryJobChannel().queryJobComm(), mptag, false, this, "LHS"));
-        Owned<IRowStream> reader = lhsDistributor->connect(queryRowInterfaces(inL), leftInputStream, ihashL, icompareL);
+        Owned<IRowStream> reader = lhsDistributor->connect(queryRowInterfaces(inL), leftInputStream, ihashL, icompareL, nullptr);
         Owned<IThorRowLoader> loaderL = createThorRowLoader(*this, ::queryRowInterfaces(inL), icompareL, stableSort_earlyAlloc, rc_allDisk, SPILL_PRIORITY_HASHJOIN);
         strmL.setown(loaderL->load(reader, abortSoon));
         loaderL.clear();
@@ -3603,7 +3798,7 @@ public:
         leftdone = true;
         if (!rhsDistributor)
             rhsDistributor.setown(createHashDistributor(this, queryJobChannel().queryJobComm(), mptag2, false, this, "RHS"));
-        reader.setown(rhsDistributor->connect(queryRowInterfaces(inR), rightInputStream, ihashR, icompareR));
+        reader.setown(rhsDistributor->connect(queryRowInterfaces(inR), rightInputStream, ihashR, icompareR, nullptr));
         Owned<IThorRowLoader> loaderR = createThorRowLoader(*this, ::queryRowInterfaces(inR), icompareR, stableSort_earlyAlloc, rc_mixed, SPILL_PRIORITY_HASHJOIN);;
         strmR.setown(loaderR->load(reader, abortSoon));
         loaderR.clear();
@@ -3791,7 +3986,7 @@ RowAggregator *mergeLocalAggs(Owned<IHashDistributor> &distributor, CActivityBas
         } nodeCompare(helperExtra.queryHashElement());
         if (!distributor)
             distributor.setown(createPullHashDistributor(&activity, activity.queryContainer().queryJobChannel().queryJobComm(), mptag, false, NULL, "MERGEAGGS"));
-        strm.setown(distributor->connect(nodeRowMetaRowIf, localAggregatedStream, &nodeCompare, &nodeCompare));
+        strm.setown(distributor->connect(nodeRowMetaRowIf, localAggregatedStream, &nodeCompare, &nodeCompare, NULL));
         loop
         {
             OwnedConstThorRow rowMeta = strm->nextRow();
@@ -3826,7 +4021,7 @@ RowAggregator *mergeLocalAggs(Owned<IHashDistributor> &distributor, CActivityBas
         if (!distributor)
             distributor.setown(createHashDistributor(&activity, activity.queryContainer().queryJobChannel().queryJobComm(), mptag, false, NULL, "MERGEAGGS"));
         Owned<IThorRowInterfaces> rowIf = activity.getRowInterfaces(); // create new rowIF / avoid using activities IRowInterface, otherwise suffer from circular link
-        strm.setown(distributor->connect(rowIf, localAggregatedStream, helperExtra.queryHashElement(), NULL));
+        strm.setown(distributor->connect(rowIf, localAggregatedStream, helperExtra.queryHashElement(), NULL, NULL));
         loop
         {
             OwnedConstThorRow row = strm->nextRow();

+ 1 - 1
thorlcr/activities/hashdistrib/thhashdistribslave.ipp

@@ -26,7 +26,7 @@
 
 interface IHashDistributor : extends IInterface
 {
-    virtual IRowStream *connect(IThorRowInterfaces *rowIf, IRowStream *in, IHash *ihash, ICompare *icompare)=0;
+    virtual IRowStream *connect(IThorRowInterfaces *rowIf, IRowStream *in, IHash *ihash, ICompare *icompare, ICompare *compareBest)=0;
     virtual void disconnect(bool stop)=0;
     virtual void join()=0;
     virtual void setBufferSizes(unsigned sendBufferSize, unsigned outputBufferSize, unsigned pullBufferSize) = 0;
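
Every distributor client picks up the widened signature: call sites with no BEST comparator pass nullptr (or NULL at the older sites), while the hash-dedup slave threads its keepBestCompare through so duplicates can be resolved on the distributed stream. The two call shapes, as they appear in the surrounding hunks:

    // Plain distribution (joins, aggregate merging): no best-row comparator.
    strm.setown(distributor->connect(rowIf, localAggregatedStream, helperExtra.queryHashElement(), NULL, NULL));

    // Global hash dedup with BEST: supply the comparator alongside the hash/compare pair.
    instrm.setown(distributor->connect(myRowIf, distInput, iHash, iCompare, keepBestCompare));
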

+ 2 - 2
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -2287,7 +2287,7 @@ protected:
             if (getOptBool(THOROPT_LKJOIN_HASHJOINFAILOVER)) // for testing only (force to disk, as if spilt)
                 channelDistributor.spill(false);
 
-            Owned<IRowStream> distChannelStream = rhsDistributor->connect(queryRowInterfaces(rightITDL), right, rightHash, NULL);
+            Owned<IRowStream> distChannelStream = rhsDistributor->connect(queryRowInterfaces(rightITDL), right, rightHash, nullptr, nullptr);
             channelDistributor.processDistRight(distChannelStream);
         }
         catch (IException *e)
@@ -2429,7 +2429,7 @@ protected:
                     }
 
                     // start LHS distributor, needed by local lookup or full join
-                    left.setown(lhsDistributor->connect(queryRowInterfaces(leftITDL), left, leftHash, NULL));
+                    left.setown(lhsDistributor->connect(queryRowInterfaces(leftITDL), left, leftHash, nullptr, nullptr));
 
                     // NB: Some channels in this or other slave processes may have fallen over to hash join
                 }

+ 6 - 2
thorlcr/activities/rollup/throllupslave.cpp

@@ -284,7 +284,9 @@ class CDedupBaseSlaveActivity : public CDedupRollupBaseActivity
 {
 protected:
     IHThorDedupArg *ddhelper;
+    ICompare *compareBest;
     bool keepLeft = 0;
+    bool keepBest = 0;
     unsigned numToKeep = 0;
 
 public:
@@ -300,8 +302,10 @@ public:
         ActivityTimer s(totalCycles, timeActivities);
         CDedupRollupBaseActivity::start();
         keepLeft = ddhelper->keepLeft();
+        keepBest = ddhelper->keepBest();
         numToKeep = ddhelper->numToKeep();
-        assertex(keepLeft || numToKeep == 1);
+        compareBest = ddhelper->queryCompareBest();
+        assertex( (keepLeft || numToKeep == 1) && (!keepBest || numToKeep==1));
     }
     virtual bool isGrouped() const override { return groupOp; }
     virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
@@ -376,7 +380,7 @@ public:
                 numKept++;
                 break;
             }
-            if (!keepLeft) 
+            if (!keepLeft || (keepBest && (compareBest->docompare(kept,next) > 0)))
                 kept.setown(next.getClear());
         }
         OwnedConstThorRow ret = kept.getClear();
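
In this sorted path BEST needs no hash table at all: duplicates arrive adjacently, so the activity keeps a single running candidate and replaces it whenever the next row compares better, which is exactly the compareBest->docompare(kept, next) > 0 test above. The same loop over a pre-sorted vector, assuming a simplified Row and an externally supplied "better" predicate:

    #include <cstddef>
    #include <functional>
    #include <vector>

    struct Row { int key; int rank; };

    // Streaming keep-best over input sorted by key: within each run of equal
    // keys, keep the row the 'better' predicate prefers, then emit it.
    std::vector<Row> dedupBestSorted(const std::vector<Row> &sorted,
                                     const std::function<bool(const Row &, const Row &)> &better)
    {
        std::vector<Row> out;
        for (std::size_t i = 0; i < sorted.size(); )
        {
            Row kept = sorted[i++];                // first row of the duplicate run
            while (i < sorted.size() && sorted[i].key == kept.key)
            {
                if (better(sorted[i], kept))       // mirrors docompare(kept, next) > 0
                    kept = sorted[i];              // replace with the better row
                ++i;
            }
            out.push_back(kept);                   // one best row per key
        }
        return out;
    }
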