瀏覽代碼

Merge pull request #7540 from ghalliday/issue13711

HPCC-13711 Add merge sort algorithms to roxie

Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 10 年之前
父節點
當前提交
8958863a06
共有 3 個文件被更改,包括 133 次插入 和 31 次删除
  1. 124 30
      common/thorhelper/roxiehelper.cpp
  2. 5 1
      common/thorhelper/roxiehelper.hpp
  3. 4 0
      roxie/ccd/ccdserver.cpp

+ 124 - 30
common/thorhelper/roxiehelper.cpp

@@ -618,7 +618,7 @@ extern IGroupedInput *createSortedGroupedInputReader(IInputBase *_input, const I
 
 //========================================================================================= 
 
-class CQuickSortAlgorithm : implements CInterfaceOf<ISortAlgorithm>
+class CInplaceSortAlgorithm : implements CInterfaceOf<ISortAlgorithm>
 {
 protected:
     unsigned curIndex;
@@ -626,18 +626,11 @@ protected:
     ICompare *compare;
 
 public:
-    CQuickSortAlgorithm(ICompare *_compare) : compare(_compare)
+    CInplaceSortAlgorithm(ICompare *_compare) : compare(_compare)
     {
         curIndex = 0;
     }
 
-    virtual void prepare(IInputBase *input)
-    {
-        curIndex = 0;
-        if (input->nextGroup(sorted))
-            qsortvec(const_cast<void * *>(sorted.getArray()), sorted.ordinality(), *compare);
-    }
-
     virtual const void *next()
     {
         if (sorted.isItem(curIndex))
@@ -654,12 +647,26 @@ public:
     }
 };
 
-class CStableQuickSortAlgorithm : public CQuickSortAlgorithm
+class CQuickSortAlgorithm : public CInplaceSortAlgorithm // In-place sort that orders each input group with qsortvec() (no temp buffer, i.e. the non-stable variant).
 {
 public:
-    CStableQuickSortAlgorithm(ICompare *_compare) : CQuickSortAlgorithm(_compare)
+    CQuickSortAlgorithm(ICompare *_compare) : CInplaceSortAlgorithm(_compare) {}
+
+    virtual void prepare(IInputBase *input) // Pulls the next group from input into sorted and sorts it with compare.
     {
+        curIndex = 0; // restart iteration at the first row
+        if (input->nextGroup(sorted)) // sort only when nextGroup() returns true
+            qsortvec(const_cast<void * *>(sorted.getArray()), sorted.ordinality(), *compare);
     }
+};
+
+class CStableInplaceSortAlgorithm : public CInplaceSortAlgorithm
+{
+public:
+    CStableInplaceSortAlgorithm(ICompare *_compare) : CInplaceSortAlgorithm(_compare) {}
+
+    virtual void sortRows(void * * rows, size_t numRows, void * * temp) = 0;
+
     virtual void prepare(IInputBase *input)
     {
         curIndex = 0;
@@ -669,11 +676,44 @@ public:
             void **rows = const_cast<void * *>(sorted.getArray());
             MemoryAttr tempAttr(numRows*sizeof(void **)); // Temp storage for stable sort. This should probably be allocated from roxiemem
             void **temp = (void **) tempAttr.bufferBase();
-            qsortvecstableinplace(rows, numRows, *compare, temp);
+            sortRows(rows, numRows, temp);
         }
     }
 };
 
+class CStableQuickSortAlgorithm : public CStableInplaceSortAlgorithm // Stable in-place sort whose sortRows() is implemented by qsortvecstableinplace().
+{
+public:
+    CStableQuickSortAlgorithm(ICompare *_compare) : CStableInplaceSortAlgorithm(_compare) {}
+
+    virtual void sortRows(void * * rows, size_t numRows, void * * temp) // temp: scratch array with room for numRows pointers, allocated by the base prepare()
+    {
+        qsortvecstableinplace(rows, numRows, *compare, temp);
+    }
+};
+
+class CMergeSortAlgorithm : public CStableInplaceSortAlgorithm // Stable in-place sort whose sortRows() is implemented by msortvecstableinplace().
+{
+public:
+    CMergeSortAlgorithm(ICompare *_compare) : CStableInplaceSortAlgorithm(_compare) {}
+
+    virtual void sortRows(void * * rows, size_t numRows, void * * temp) // temp: scratch array with room for numRows pointers, allocated by the base prepare()
+    {
+        msortvecstableinplace(rows, numRows, *compare, temp);
+    }
+};
+
+class CParallelMergeSortAlgorithm : public CStableInplaceSortAlgorithm // Stable in-place sort whose sortRows() is implemented by parmsortvecstableinplace().
+{
+public:
+    CParallelMergeSortAlgorithm(ICompare *_compare) : CStableInplaceSortAlgorithm(_compare) {}
+
+    virtual void sortRows(void * * rows, size_t numRows, void * * temp) // temp: scratch array with room for numRows pointers, allocated by the base prepare()
+    {
+        parmsortvecstableinplace(rows, numRows, *compare, temp);
+    }
+};
+
 #define INSERTION_SORT_BLOCKSIZE 1024
 
 class SortedBlock : public CInterface, implements IInterface
@@ -1097,7 +1137,7 @@ public:
     }
 };
 
-class CSpillingQuickSortAlgorithm : implements CInterfaceOf<ISortAlgorithm>, implements roxiemem::IBufferedRowCallback
+class CSpillingSortAlgorithm : implements CInterfaceOf<ISortAlgorithm>, implements roxiemem::IBufferedRowCallback
 {
     enum {
         InitialSortElements = 0,
@@ -1118,18 +1158,20 @@ class CSpillingQuickSortAlgorithm : implements CInterfaceOf<ISortAlgorithm>, imp
     bool stable;
 
 public:
-    CSpillingQuickSortAlgorithm(ICompare *_compare, roxiemem::IRowManager &_rowManager, IOutputMetaData * _rowMeta, ICodeContext *_ctx, const char *_tempDirectory, unsigned _activityId, bool _stable)
+    CSpillingSortAlgorithm(ICompare *_compare, roxiemem::IRowManager &_rowManager, IOutputMetaData * _rowMeta, ICodeContext *_ctx, const char *_tempDirectory, unsigned _activityId, bool _stable)
         : rowsToSort(&_rowManager, InitialSortElements, CommitStep, _activityId),
           rowManager(_rowManager), compare(_compare), rowMeta(_rowMeta), ctx(_ctx), tempDirectory(_tempDirectory), activityId(_activityId), stable(_stable)
     {
         rowManager.addRowBuffer(this);
     }
-    ~CSpillingQuickSortAlgorithm()
+    ~CSpillingSortAlgorithm()
     {
         rowManager.removeRowBuffer(this);
         diskReader.clear();
     }
 
+    virtual void sortRows(void * * rows, size_t numRows, ICompare & compare, void * * stableTemp) = 0;
+
     virtual void prepare(IInputBase *input)
     {
         loop
@@ -1172,20 +1214,7 @@ public:
         }
         else
         {
-            unsigned numRows = rowsToSort.numCommitted();
-            if (numRows)
-            {
-                void ** rows = const_cast<void * *>(rowsToSort.getBlock(numRows));
-                //MORE: Should this be parallel?  Should that be dependent on whether it is grouped?  Should be a hint.
-                if (stable)
-                {
-                    MemoryAttr tempAttr(numRows*sizeof(void **)); // Temp storage for stable sort. This should probably be allocated from roxiemem
-                    void **temp = (void **) tempAttr.bufferBase();
-                    qsortvecstableinplace(rows, numRows, *compare, temp);
-                }
-                else
-                    qsortvec(rows, numRows, *compare);
-            }
+            sortCommitted();
             sorted.transferFrom(rowsToSort);
         }
     }
@@ -1223,14 +1252,31 @@ public:
     }
 
 protected:
+    void sortCommitted() // Sorts the rows currently committed in rowsToSort, in place, by delegating to the subclass's sortRows().
+    {
+        unsigned numRows = rowsToSort.numCommitted();
+        if (numRows) // nothing to do when no rows have been committed
+        {
+            void ** rows = const_cast<void * *>(rowsToSort.getBlock(numRows));
+            //MORE: Should this be parallel?  Should that be dependent on whether it is grouped?  Should be a hint.
+            if (stable)
+            {
+                MemoryAttr tempAttr(numRows*sizeof(void **)); // Temp storage for stable sort. This should probably be allocated from roxiemem
+                void **temp = (void **) tempAttr.bufferBase();
+                sortRows(rows, numRows, *compare, temp);
+            }
+            else
+                sortRows(rows, numRows, *compare, NULL); // NULL temp: no scratch buffer is supplied when stability was not requested
+        }
+    }
     bool spillRows()
     {
         unsigned numRows = rowsToSort.numCommitted();
         if (numRows == 0)
             return false;
 
+        sortCommitted();
         const void * * rows = rowsToSort.getBlock(numRows);
-        qsortvec(const_cast<void * *>(rows), numRows, *compare);
 
         Owned<IRowWriter> out = queryMerger()->createWriteBlock();
         for (unsigned i= 0; i < numRows; i++)
@@ -1256,6 +1302,46 @@ protected:
     }
 };
 
+
+class CSpillingQuickSortAlgorithm : public CSpillingSortAlgorithm // Spilling sort that uses the quick sort routines for sortRows().
+{
+public:
+    CSpillingQuickSortAlgorithm(ICompare *_compare, roxiemem::IRowManager &_rowManager, IOutputMetaData * _rowMeta, ICodeContext *_ctx, const char *_tempDirectory, unsigned _activityId, bool _stable)
+        : CSpillingSortAlgorithm(_compare, _rowManager, _rowMeta, _ctx, _tempDirectory, _activityId, _stable) // all arguments, including _stable, are forwarded unchanged
+    {
+    }
+
+    virtual void sortRows(void * * rows, size_t numRows, ICompare & compare, void * * stableTemp) // stableTemp is non-NULL only when the base class was constructed with stable=true
+    {
+        if (stableTemp) // a scratch buffer was supplied, so use the stable variant
+            qsortvecstableinplace(rows, numRows, compare, stableTemp);
+        else
+            qsortvec(rows, numRows, compare);
+    }
+};
+
+
+class CSpillingMergeSortAlgorithm : public CSpillingSortAlgorithm // Spilling sort that uses the merge sort routines (serial or parallel) for sortRows().
+{
+public:
+    CSpillingMergeSortAlgorithm(ICompare *_compare, roxiemem::IRowManager &_rowManager, IOutputMetaData * _rowMeta, ICodeContext *_ctx, const char *_tempDirectory, unsigned _activityId, bool _parallel)
+        : CSpillingSortAlgorithm(_compare, _rowManager, _rowMeta, _ctx, _tempDirectory, _activityId, true) // stable is hard-wired to true so sortCommitted() always allocates the temp buffer
+    {
+        parallel = _parallel;
+    }
+
+    virtual void sortRows(void * * rows, size_t numRows, ICompare & compare, void * * stableTemp) // stableTemp is passed straight to the merge sort routine without a NULL check
+    {
+        if (parallel) // chosen at construction: parallel vs serial merge sort
+            parmsortvecstableinplace(rows, numRows, compare, stableTemp);
+        else
+            msortvecstableinplace(rows, numRows, compare, stableTemp);
+    }
+protected:
+    bool parallel; // true selects parmsortvecstableinplace(), false selects msortvecstableinplace()
+};
+
+
 extern ISortAlgorithm *createQuickSortAlgorithm(ICompare *_compare)
 {
     return new CQuickSortAlgorithm(_compare);
@@ -1296,6 +1382,14 @@ extern ISortAlgorithm *createSortAlgorithm(RoxieSortAlgorithm _algorithm, ICompa
     case spillingQuickSortAlgorithm:
     case stableSpillingQuickSortAlgorithm:
         return createSpillingQuickSortAlgorithm(_compare, _rowManager, _rowMeta, _ctx, _tempDirectory, _activityId, _algorithm==stableSpillingQuickSortAlgorithm);
+    case mergeSortAlgorithm:
+        return new CMergeSortAlgorithm(_compare);
+    case parallelMergeSortAlgorithm:
+        return new CParallelMergeSortAlgorithm(_compare);
+    case spillingMergeSortAlgorithm:
+        return new CSpillingMergeSortAlgorithm(_compare, _rowManager, _rowMeta, _ctx, _tempDirectory, _activityId, false);
+    case spillingParallelMergeSortAlgorithm:
+        return new CSpillingMergeSortAlgorithm(_compare, _rowManager, _rowMeta, _ctx, _tempDirectory, _activityId, true);
     default:
         break;
     }

+ 5 - 1
common/thorhelper/roxiehelper.hpp

@@ -99,7 +99,11 @@ public:
 
 //==============================================================================================================
 
-typedef enum {heapSortAlgorithm, insertionSortAlgorithm, quickSortAlgorithm, stableQuickSortAlgorithm, spillingQuickSortAlgorithm, stableSpillingQuickSortAlgorithm, unknownSortAlgorithm } RoxieSortAlgorithm;
+typedef enum { heapSortAlgorithm, insertionSortAlgorithm,
+              quickSortAlgorithm, stableQuickSortAlgorithm, spillingQuickSortAlgorithm, stableSpillingQuickSortAlgorithm,
+              mergeSortAlgorithm, spillingMergeSortAlgorithm, // merge sort: in-memory and spilling variants
+              parallelMergeSortAlgorithm, spillingParallelMergeSortAlgorithm, // parallel merge sort: in-memory and spilling variants
+              unknownSortAlgorithm } RoxieSortAlgorithm; // Selector passed to createSortAlgorithm() to choose the ISortAlgorithm implementation.
 
 interface ISortAlgorithm : extends IInterface
 {

+ 4 - 0
roxie/ccd/ccdserver.cpp

@@ -7376,6 +7376,10 @@ public:
             sortAlgorithm = heapSortAlgorithm; // NOTE - we do allow UNSTABLE('heapsort') in order to facilitate runtime selection. Also explicit selection of heapsort overrides request to spill
         else if (stricmp(algorithmName, "insertionsort")==0)
             sortAlgorithm = insertionSortAlgorithm;
+        else if (stricmp(algorithmName, "mergesort")==0)
+            sortAlgorithm = (sortFlags & TAFspill) ? spillingMergeSortAlgorithm : mergeSortAlgorithm;
+        else if (stricmp(algorithmName, "parmergesort")==0)
+            sortAlgorithm = (sortFlags & TAFspill) ? spillingParallelMergeSortAlgorithm : parallelMergeSortAlgorithm;
         else
         {
             if (*algorithmName)