Przeglądaj źródła

Merge pull request #6867 from richardkchapman/spill-join2

HPCC-10575 Support spilling sort in roxie 

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 10 lat temu
rodzic
commit
57dc290ae4
4 zmienionych plików z 81 dodań i 29 usunięć
  1. 7 2
      roxie/ccd/ccdquery.cpp
  2. 1 0
      roxie/ccd/ccdquery.hpp
  3. 72 26
      roxie/ccd/ccdserver.cpp
  4. 1 1
      roxie/ccd/ccdserver.hpp

+ 7 - 2
roxie/ccd/ccdquery.cpp

@@ -298,6 +298,7 @@ QueryOptions::QueryOptions()
     skipFileFormatCrcCheck = false;
     stripWhitespaceFromStoredDataset = ((ptr_ignoreWhiteSpace & defaultXmlReadFlags) != 0);
     timeActivities = defaultTimeActivities;
+    allSortsMaySpill = false; // No global default for this
 }
 
 QueryOptions::QueryOptions(const QueryOptions &other)
@@ -320,7 +321,8 @@ QueryOptions::QueryOptions(const QueryOptions &other)
     enableFieldTranslation = other.enableFieldTranslation;
     skipFileFormatCrcCheck = other.skipFileFormatCrcCheck;
     stripWhitespaceFromStoredDataset = other.stripWhitespaceFromStoredDataset;
-    timeActivities =other.timeActivities;
+    timeActivities = other.timeActivities;
+    allSortsMaySpill = other.allSortsMaySpill;
 }
 
 void QueryOptions::setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stateInfo)
@@ -354,6 +356,7 @@ void QueryOptions::setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stat
     updateFromWorkUnit(skipFileFormatCrcCheck, wu, "skipFileFormatCrcCheck");
     updateFromWorkUnit(stripWhitespaceFromStoredDataset, wu, "stripWhitespaceFromStoredDataset");
     updateFromWorkUnit(timeActivities, wu, "timeActivities");
+    updateFromWorkUnit(allSortsMaySpill, wu, "allSortsMaySpill");
 }
 
 void QueryOptions::updateFromWorkUnitM(memsize_t &value, IConstWorkUnit &wu, const char *name)
@@ -397,6 +400,7 @@ void QueryOptions::setFromContext(const IPropertyTree *ctx)
         updateFromContext(skipFileFormatCrcCheck, ctx, "_SkipFileFormatCrcCheck", "@skipFileFormatCrcCheck");
         updateFromContext(stripWhitespaceFromStoredDataset, ctx, "_StripWhitespaceFromStoredDataset", "@stripWhitespaceFromStoredDataset");
         updateFromContext(timeActivities, ctx, "@timeActivities", "_TimeActivities");
+        // Note: allSortsMaySpill is not permitted at context level (too late anyway, unless I refactored)
     }
 }
 
@@ -718,7 +722,7 @@ protected:
         case TAKsoap_datasetaction:
             return createRoxieServerSoapDatasetActionActivityFactory(id, subgraphId, *this, helperFactory, kind, isRootAction(node));
         case TAKsort:
-            return createRoxieServerSortActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerSortActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKspill:
         case TAKmemoryspillsplit:
             return createRoxieServerThroughSpillActivityFactory(id, subgraphId, *this, helperFactory, kind);
@@ -1042,6 +1046,7 @@ public:
         isLoadFailed = false;
         libraryInterfaceHash = 0;
         options.enableFieldTranslation = package.getEnableFieldTranslation();  // NOTE - can be overridden by wu settings
+        options.allSortsMaySpill = dynamic;
     }
 
     ~CQueryFactory()

+ 1 - 0
roxie/ccd/ccdquery.hpp

@@ -114,6 +114,7 @@ public:
     bool skipFileFormatCrcCheck;
     bool stripWhitespaceFromStoredDataset;
     bool timeActivities;
+    bool allSortsMaySpill;
 
 private:
     static const char *findProp(const IPropertyTree *ctx, const char *name1, const char *name2);

+ 72 - 26
roxie/ccd/ccdserver.cpp

@@ -7399,6 +7399,7 @@ interface ISortAlgorithm : extends IInterface
 
 class CQuickSortAlgorithm : implements CInterfaceOf<ISortAlgorithm>
 {
+protected:
     unsigned curIndex;
     ConstPointerArray sorted;
     ICompare *compare;
@@ -7432,6 +7433,24 @@ public:
     }
 };
 
+class CStableQuickSortAlgorithm : public CQuickSortAlgorithm // Variant of CQuickSortAlgorithm whose prepare() sorts via qsortvecstable
+{
+public:
+    CStableQuickSortAlgorithm(ICompare *_compare) : CQuickSortAlgorithm(_compare) // Forwards the row comparator to the base class
+    {
+    }
+    virtual void prepare(IRoxieInput *input) // Reads one group from input into 'sorted' and sorts it in place
+    {
+        curIndex = 0; // Reset the base-class cursor (presumably the index of the next row to return - confirm against CQuickSortAlgorithm)
+        if (input->nextGroup(sorted)) // Sort only when nextGroup() returns true (presumably meaning rows were read - TODO confirm)
+        {
+            unsigned size = sorted.ordinality();
+            MemoryAttr indexbuff(size*sizeof(void **)); // Scratch buffer with one pointer-sized slot per row, handed to qsortvecstable
+            qsortvecstable(const_cast<void * *>(sorted.getArray()), size, *compare, (void ***)indexbuff.bufferBase()); // Presumably keeps equal rows in input order, going by the function name - verify in jsort
+        }
+    }
+};
+
 class CSpillingQuickSortAlgorithm : implements CInterfaceOf<ISortAlgorithm>, implements roxiemem::IBufferedRowCallback
 {
     enum {
@@ -7448,10 +7467,12 @@ class CSpillingQuickSortAlgorithm : implements CInterfaceOf<ISortAlgorithm>, imp
     Owned<IRowStream> diskReader;
     IOutputMetaData *rowMeta;
     unsigned activityId;
+    bool stable;
 
 public:
-    CSpillingQuickSortAlgorithm(ICompare *_compare, IRoxieSlaveContext * _ctx, IOutputMetaData * _rowMeta, unsigned _activityId)
-        : rowsToSort(&_ctx->queryRowManager(), InitialSortElements, CommitStep, _activityId), ctx(_ctx), compare(_compare), rowMeta(_rowMeta), activityId(_activityId)
+    CSpillingQuickSortAlgorithm(ICompare *_compare, IRoxieSlaveContext * _ctx, IOutputMetaData * _rowMeta, unsigned _activityId, bool _stable)
+        : rowsToSort(&_ctx->queryRowManager(), InitialSortElements, CommitStep, _activityId),
+          ctx(_ctx), compare(_compare), rowMeta(_rowMeta), activityId(_activityId), stable(_stable)
     {
         ctx->queryRowManager().addRowBuffer(this);
     }
@@ -7508,7 +7529,13 @@ public:
             {
                 const void * * rows = rowsToSort.getBlock(numRows);
                 //MORE: Should this be parallel?  Should that be dependent on whether it is grouped?  Should be a hint.
-                qsortvec(const_cast<void * *>(rows), numRows, *compare);
+                if (stable)
+                {
+                    MemoryAttr indexbuff(numRows*sizeof(void **));
+                    qsortvecstable(const_cast<void * *>(rows), numRows, *compare, (void ***)indexbuff.bufferBase());
+                }
+                else
+                    qsortvec(const_cast<void * *>(rows), numRows, *compare);
             }
             sorted.transferFrom(rowsToSort);
         }
@@ -8003,7 +8030,7 @@ public:
     }
 };
 
-typedef enum {heapSort, insertionSort, quickSort, spillingQuickSort, unknownSort } RoxieSortAlgorithm;
+typedef enum {heapSort, insertionSort, quickSort, stableQuickSort, spillingQuickSort, stableSpillingQuickSort, unknownSort } RoxieSortAlgorithm;
 
 class CRoxieServerSortActivity : public CRoxieServerActivity
 {
@@ -8037,8 +8064,12 @@ public:
         case quickSort:
             sorter.setown(new CQuickSortAlgorithm(compare));
             break;
+        case stableQuickSort:
+            sorter.setown(new CStableQuickSortAlgorithm(compare));
+            break;
         case spillingQuickSort:
-            sorter.setown(new CSpillingQuickSortAlgorithm(compare, ctx, meta, activityId));
+        case stableSpillingQuickSort:
+            sorter.setown(new CSpillingQuickSortAlgorithm(compare, ctx, meta, activityId, sortAlgorithm==stableSpillingQuickSort));
             break;
         case unknownSort:
             sorter.clear(); // create it later....
@@ -8077,9 +8108,13 @@ public:
                 {
                     if (stricmp(useAlgorithm, "quicksort")==0)
                     {
-                        if (sortFlags & TAFstable)
-                            throw MakeStringException(ROXIE_UNKNOWN_ALGORITHM, "Invalid stable sort algorithm %s requested", useAlgorithm.get());
-                        sorter.setown(new CQuickSortAlgorithm(compare));
+                        switch (sortFlags & (TAFstable|TAFspill))
+                        {
+                        case 0: sortAlgorithm = quickSort; break;
+                        case TAFstable: sortAlgorithm = stableQuickSort; break;
+                        case TAFspill: sortAlgorithm = spillingQuickSort; break;
+                        case TAFstable|TAFspill: sortAlgorithm = stableSpillingQuickSort; break;
+                        }
                     }
                     else if (stricmp(useAlgorithm, "heapsort")==0)
                         sorter.setown(new CHeapSortAlgorithm(compare));
@@ -8088,7 +8123,9 @@ public:
                     else
                     {
                         WARNLOG(ROXIE_UNKNOWN_ALGORITHM, "Ignoring unsupported sort order algorithm '%s', using default", useAlgorithm.get());
-                        if (sortFlags & TAFunstable)
+                        if (sortFlags & TAFspill)
+                            sorter.setown(new CSpillingQuickSortAlgorithm(compare, ctx, meta, activityId, (sortFlags & TAFstable) != 0));
+                        else if (sortFlags & TAFunstable)
                             sorter.setown(new CQuickSortAlgorithm(compare));
                         else
                             sorter.setown(new CHeapSortAlgorithm(compare));
@@ -8116,20 +8153,31 @@ class CRoxieServerSortActivityFactory : public CRoxieServerActivityFactory
 {
     RoxieSortAlgorithm sortAlgorithm;
     unsigned sortFlags;
-
+    bool forceSpill;
 public:
-    CRoxieServerSortActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind)
+    CRoxieServerSortActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode)
         : CRoxieServerActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind)
     {
-        sortAlgorithm = heapSort;
-        sortFlags = TAFstable;
+        forceSpill = _queryFactory.queryOptions().allSortsMaySpill || _graphNode.getPropBool("hint[@name='spill']/@value", false);;
+        if (forceSpill)
+        {
+            sortAlgorithm = stableSpillingQuickSort;
+            sortFlags = TAFstable|TAFspill;
+        }
+        else
+        {
+            sortAlgorithm = heapSort;
+            sortFlags = TAFstable;
+        }
         Owned<IHThorSortArg> sortHelper = (IHThorSortArg *) helperFactory();
         IHThorAlgorithm *sortMethod = static_cast<IHThorAlgorithm *>(sortHelper->selectInterface(TAIalgorithm_1));
         if (sortMethod)
         {
             sortFlags = sortMethod->getAlgorithmFlags();
+            if (forceSpill)
+                sortFlags |= TAFspill;
             if (sortFlags & TAFspill)
-                sortAlgorithm = spillingQuickSort;
+                sortAlgorithm = stableSpillingQuickSort;
             else if (sortFlags & TAFunstable)
                 sortAlgorithm = quickSort;
             if (!(sortFlags & TAFconstant))
@@ -8141,18 +8189,16 @@ public:
                 {
                     if (stricmp(useAlgorithm, "quicksort")==0)
                     {
-                        if (sortFlags & TAFstable)
-                            throw MakeStringException(ROXIE_UNKNOWN_ALGORITHM, "Invalid stable sort algorithm %s requested", useAlgorithm.get());
-                        sortAlgorithm = quickSort;
-                    }
-                    else if (stricmp(useAlgorithm, "spillingquicksort")==0)
-                    {
-                        if (sortFlags & TAFstable)
-                            throw MakeStringException(ROXIE_UNKNOWN_ALGORITHM, "Invalid stable sort algorithm %s requested", useAlgorithm.get());
-                        sortAlgorithm = spillingQuickSort;
+                        switch (sortFlags & (TAFstable|TAFspill))
+                        {
+                        case 0: sortAlgorithm = quickSort; break;
+                        case TAFstable: sortAlgorithm = stableQuickSort; break;
+                        case TAFspill: sortAlgorithm = spillingQuickSort; break;
+                        case TAFstable|TAFspill: sortAlgorithm = stableSpillingQuickSort; break;
+                        }
                     }
                     else if (stricmp(useAlgorithm, "heapsort")==0)
-                        sortAlgorithm = heapSort; // NOTE - we do allow UNSTABLE('heapsort') in order to facilitate runtime selection
+                        sortAlgorithm = heapSort; // NOTE - we do allow UNSTABLE('heapsort') in order to facilitate runtime selection. Also explicit selection of heapsort overrides request to spill
                     else if (stricmp(useAlgorithm, "insertionsort")==0)
                         sortAlgorithm = insertionSort;
                     else
@@ -8174,9 +8220,9 @@ public:
     }
 };
 
-IRoxieServerActivityFactory *createRoxieServerSortActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind)
+IRoxieServerActivityFactory *createRoxieServerSortActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode)
 {
-    return new CRoxieServerSortActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind);
+    return new CRoxieServerSortActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind, _graphNode);
 }
 
 //=====================================================================================================

+ 1 - 1
roxie/ccd/ccdserver.hpp

@@ -343,7 +343,7 @@ extern IRoxieServerActivityFactory *createRoxieServerRollupActivityFactory(unsig
 extern IRoxieServerActivityFactory *createRoxieServerNormalizeActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
 extern IRoxieServerActivityFactory *createRoxieServerNormalizeChildActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
 extern IRoxieServerActivityFactory *createRoxieServerNormalizeLinkedChildActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerSortActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
+extern IRoxieServerActivityFactory *createRoxieServerSortActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
 extern IRoxieServerActivityFactory *createRoxieServerThroughSpillActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
 extern IRoxieServerActivityFactory *createRoxieServerSplitActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
 extern IRoxieServerActivityFactory *createRoxieServerPipeReadActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);