Sfoglia il codice sorgente

HPCC-12267 Initial implementation of roxie QUANTILE activity

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday 10 anni fa
parent
commit
2c594aad29

+ 24 - 4
common/thorhelper/roxiehelper.cpp

@@ -618,7 +618,22 @@ extern IGroupedInput *createSortedGroupedInputReader(IInputBase *_input, const I
 
 //========================================================================================= 
 
-class CInplaceSortAlgorithm : implements CInterfaceOf<ISortAlgorithm>
+class CSortAlgorithm : implements CInterfaceOf<ISortAlgorithm>
+{
+public:
+    virtual void getSortedGroup(ConstPointerArray & result)
+    {
+        loop
+        {
+            const void * row = next();
+            if (!row)
+                return;
+            result.append(row);
+        }
+    }
+};
+
+class CInplaceSortAlgorithm : public CSortAlgorithm
 {
 protected:
     unsigned curIndex;
@@ -645,6 +660,11 @@ public:
         curIndex = 0;
         sorted.kill();
     }
+    virtual void getSortedGroup(ConstPointerArray & result)
+    {
+        sorted.swapWith(result);
+        curIndex = 0;
+    }
 };
 
 class CQuickSortAlgorithm : public CInplaceSortAlgorithm
@@ -802,7 +822,7 @@ public:
     }
 };
 
-class CInsertionSortAlgorithm : implements CInterfaceOf<ISortAlgorithm>
+class CInsertionSortAlgorithm : public CSortAlgorithm
 {
     SortedBlock *curBlock;
     unsigned blockNo;
@@ -937,7 +957,7 @@ public:
     }
 };
 
-class CHeapSortAlgorithm : implements CInterfaceOf<ISortAlgorithm>
+class CHeapSortAlgorithm : public CSortAlgorithm
 {
     unsigned curIndex;
     ConstPointerArray sorted;
@@ -1137,7 +1157,7 @@ public:
     }
 };
 
-class CSpillingSortAlgorithm : implements CInterfaceOf<ISortAlgorithm>, implements roxiemem::IBufferedRowCallback
+class CSpillingSortAlgorithm : public CSortAlgorithm, implements roxiemem::IBufferedRowCallback
 {
     enum {
         InitialSortElements = 0,

+ 1 - 0
common/thorhelper/roxiehelper.hpp

@@ -110,6 +110,7 @@ interface ISortAlgorithm : extends IInterface
     virtual void prepare(IInputBase *input) = 0;
     virtual const void *next() = 0;
     virtual void reset() = 0;
+    virtual void getSortedGroup(ConstPointerArray & result) = 0;
 };
 
 extern THORHELPER_API ISortAlgorithm *createQuickSortAlgorithm(ICompare *_compare);

+ 2 - 0
roxie/ccd/ccdquery.cpp

@@ -751,6 +751,8 @@ protected:
             return createRoxieServerWorkUnitReadActivityFactory(id, subgraphId, *this, helperFactory, kind);
         case TAKxmlparse:
             return createRoxieServerXmlParseActivityFactory(id, subgraphId, *this, helperFactory, kind);
+        case TAKquantile:
+            return createRoxieServerQuantileActivityFactory(id, subgraphId, *this, helperFactory, kind);
         case TAKregroup:
             return createRoxieServerRegroupActivityFactory(id, subgraphId, *this, helperFactory, kind);
         case TAKcombine:

+ 182 - 0
roxie/ccd/ccdserver.cpp

@@ -7466,6 +7466,188 @@ IRoxieServerActivityFactory *createRoxieServerSortActivityFactory(unsigned _id,
 
 //=====================================================================================================
 
+class CRoxieServerQuantileActivity : public CRoxieServerActivity
+{
+protected:
+    Owned<ISortAlgorithm> sorter;
+    IHThorQuantileArg &helper;
+    ConstPointerArray sorted;
+    ICompare *compare;
+    unsigned flags;
+    double skew;
+    unsigned __int64 numDivisions;
+    bool calculated;
+    bool processedAny;
+    bool anyThisGroup;
+    bool eof;
+    unsigned curQuantile;
+    unsigned curIndex;
+    unsigned curIndexExtra;
+    unsigned skipSize;
+    unsigned skipExtra;
+
+public:
+    CRoxieServerQuantileActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _flags)
+        : CRoxieServerActivity(_factory, _probeManager), helper((IHThorQuantileArg &)basehelper), flags(_flags)
+    {
+        compare = helper.queryCompare();
+        skew = 0.0;
+        numDivisions = 0;
+        calculated = false;
+        processedAny = false;
+        anyThisGroup = false;
+        eof = false;
+        curQuantile = 0;
+        curIndex = 0;
+        curIndexExtra = 0;
+        skipSize = 0;
+        skipExtra = 0;
+        if (flags & TQFunstable)
+            sorter.setown(new CQuickSortAlgorithm(compare));
+        else
+            sorter.setown(new CStableQuickSortAlgorithm(compare));
+    }
+
+    virtual void reset()
+    {
+        sorter->reset();
+        calculated = false;
+        processedAny = false;
+        anyThisGroup = false;
+        CRoxieServerActivity::reset();
+    }
+
+    virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
+    {
+        CRoxieServerActivity::start(parentExtractSize, parentExtract, paused);
+        skew = helper.getSkew();
+        numDivisions = helper.getNumDivisions();
+        //Check for -ve integer values and treat as out of range
+        if ((__int64)numDivisions < 1)
+            numDivisions = 1;
+        calculated = false;
+        processedAny = false;
+        anyThisGroup = false;
+        eof = false;
+        curQuantile = 0;
+        curIndex = 0;
+        curIndexExtra = 0;
+    }
+
+    virtual bool needsAllocator() const { return true; }
+
+    virtual const void * nextInGroup()
+    {
+        ActivityTimer t(totalCycles, timeActivities);
+        if (eof)
+            return NULL;
+
+        const void * ret = NULL;
+        for(;;)
+        {
+            if (!calculated)
+            {
+                sorter->prepare(input);
+                sorter->getSortedGroup(sorted);
+
+                if (sorted.ordinality() == 0)
+                {
+                    if (processedAny)
+                    {
+                        eof = true;
+                        return NULL;
+                    }
+
+                    //Unusual case 0 rows - add a default row instead
+                    RtlDynamicRowBuilder rowBuilder(rowAllocator);
+                    size32_t thisSize = helper.createDefault(rowBuilder);
+                    sorted.append(rowBuilder.finalizeRowClear(thisSize));
+                }
+
+                calculated = true;
+                processedAny = true;
+                anyThisGroup = false;
+                curQuantile = 0;
+                curIndex = 0;
+                curIndexExtra = (numDivisions-1) / 2;   // to ensure correctly rounded up
+                skipSize = (sorted.ordinality() / numDivisions);
+                skipExtra = (sorted.ordinality() % numDivisions);
+            }
+
+            const void * lhs = sorted.item(curIndex);
+            unsigned outSize;
+            RtlDynamicRowBuilder rowBuilder(rowAllocator);
+            try
+            {
+                outSize = helper.transform(rowBuilder, lhs, curQuantile);
+            }
+            catch (IException *E)
+            {
+                throw makeWrappedException(E);
+            }
+
+            if (outSize)
+                ret = rowBuilder.finalizeRowClear(outSize);
+
+            curIndex += skipSize;
+            curIndexExtra += skipExtra;
+            if (curIndexExtra >= numDivisions)
+            {
+                curIndex++;
+                curIndexExtra -= numDivisions;
+            }
+            //Ensure the current index always stays valid.
+            if (curIndex >= sorted.ordinality())
+                curIndex = sorted.ordinality()-1;
+            curQuantile++;
+            if (curQuantile > numDivisions)
+            {
+                sorted.kill();
+                sorter->reset();
+                calculated = false; // ready for next group
+            }
+
+            if (ret)
+            {
+                anyThisGroup = true;
+                processed++;
+                return ret;
+            }
+
+            if (curQuantile > numDivisions)
+            {
+                if (anyThisGroup)
+                    return NULL;
+            }
+        }
+    }
+};
+
+class CRoxieServerQuantileActivityFactory : public CRoxieServerActivityFactory
+{
+    unsigned flags;
+
+public:
+    CRoxieServerQuantileActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind)
+        : CRoxieServerActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind)
+    {
+        Owned<IHThorQuantileArg> quantileHelper = (IHThorQuantileArg *) helperFactory();
+        flags = quantileHelper->getFlags();
+    }
+
+    virtual IRoxieServerActivity *createActivity(IProbeManager *_probeManager) const
+    {
+        return new CRoxieServerQuantileActivity(this, _probeManager, flags);
+    }
+};
+
+IRoxieServerActivityFactory *createRoxieServerQuantileActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind)
+{
+    return new CRoxieServerQuantileActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind);
+}
+
+//=====================================================================================================
+
 class CRoxieServerSortedActivity : public CRoxieServerActivity
 {
     IHThorSortedArg &helper;

+ 1 - 0
roxie/ccd/ccdserver.hpp

@@ -409,6 +409,7 @@ extern IRoxieServerActivityFactory *createRoxieServerSoapRowActionActivityFactor
 extern IRoxieServerActivityFactory *createRoxieServerSoapDatasetCallActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
 extern IRoxieServerActivityFactory *createRoxieServerSoapDatasetActionActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, bool _isRoot);
 extern IRoxieServerActivityFactory *createRoxieServerLinkedRawIteratorActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
+extern IRoxieServerActivityFactory *createRoxieServerQuantileActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
 
 extern IRoxieServerActivityFactory *createRoxieServerNWayGraphLoopResultReadActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, unsigned graphId);
 extern IRoxieServerActivityFactory *createRoxieServerNWayInputActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);