Bläddra i källkod

HPCC-25532 Implement COMBINE,GROUP in Thor

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith 4 år sedan
förälder
incheckning
3727ece2bc

+ 0 - 2
testing/regress/ecl/combinegroup.ecl

@@ -15,8 +15,6 @@
     limitations under the License.
 ############################################################################## */
 
-//nothor
-
 inrec := record
 unsigned6 did;
     end;

+ 105 - 0
thorlcr/activities/funnel/thfunnelslave.cpp

@@ -584,6 +584,106 @@ public:
     }
 };
 
+/////
+
+class CCombineGroupSlaveActivity : public CSlaveActivity
+{
+    IHThorCombineGroupArg *helper;
+    IEngineRowStream *rightInputStream = nullptr;
+    bool grouped = false;
+    bool eos = false;
+    bool anyThisGroup = false;
+    CThorExpandingRowArray group;
+
+public:
+    CCombineGroupSlaveActivity(CGraphElementBase *_container) 
+        : CSlaveActivity(_container), group(*this, this)
+    {
+        grouped = container.queryGrouped();
+        helper = (IHThorCombineGroupArg *) queryHelper();
+        setRequireInitData(false);
+        appendOutputLinked(this);
+    }
+    virtual void start() override
+    {
+        ActivityTimer s(slaveTimerStats, timeActivities);
+        ForEachItemIn(i, inputs)
+        {
+            try { startInput(i); }
+            catch (CATCHALL)
+            {
+                ActPrintLog("COMBINE(%" ACTPF "d): Error staring input %d", container.queryId(), i);
+                throw;
+            }
+        }
+        rightInputStream = queryInputStream(1);
+        dataLinkStart();
+    }
+    virtual void stop() override
+    {
+        stopAllInputs();
+        dataLinkStop();
+    }
+    CATCH_NEXTROW()
+    {
+        ActivityTimer t(slaveTimerStats, timeActivities);
+        if (eos)
+            return nullptr;
+        while (true)
+        {
+            OwnedConstThorRow left(inputStream->nextRow());
+            if (!left)
+            {
+                if (grouped)
+                {
+                    if (anyThisGroup)
+                        anyThisGroup = false;
+                    else
+                        eos = true;
+                    return nullptr;
+                }
+                else
+                {
+                    OwnedConstThorRow nextRight(rightInputStream->nextRow());
+                    if (nextRight)
+                        throw MakeActivityException(this, -1, "Missing LEFT record for Combine group");
+                    eos = true;
+                }
+                return nullptr;
+            }
+
+            while (true)
+            {
+                const void * in = rightInputStream->nextRow();
+                if (!in)
+                    break;
+                group.append(in);
+            }
+
+            if (0 == group.ordinality())
+                throw MakeActivityException(this, -1, "Missing RIGHT group for Combine Group");
+
+            RtlDynamicRowBuilder rowBuilder(queryRowAllocator());
+            size32_t outSize = helper->transform(rowBuilder, left, group.ordinality(), group.getRowArray());
+            group.kill();
+            if (outSize)
+            {
+                anyThisGroup = true;
+                dataLinkIncrement();
+                return rowBuilder.finalizeRowClear(outSize);
+            }
+        }
+    }
+    virtual bool isGrouped() const override
+    {
+        return queryInput(0)->isGrouped();
+    }
+    virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
+    {
+        initMetaInfo(info);
+        // TBD I think this should say max out = lhs set.
+    }
+};
 
 /////
 
@@ -1033,6 +1133,11 @@ CActivityBase *createCombineSlave(CGraphElementBase *container)
     return new CombineSlaveActivity(container);
 }
 
+CActivityBase *createCombineGroupSlave(CGraphElementBase *container)
+{
+    return new CCombineGroupSlaveActivity(container);
+}
+
 CActivityBase *createRegroupSlave(CGraphElementBase *container)
 {
     return new RegroupSlaveActivity(container);

+ 1 - 0
thorlcr/activities/funnel/thfunnelslave.ipp

@@ -31,6 +31,7 @@
 
 activityslaves_decl CActivityBase *createFunnelSlave(CGraphElementBase *container);
 activityslaves_decl CActivityBase *createCombineSlave(CGraphElementBase *container);
+activityslaves_decl CActivityBase *createCombineGroupSlave(CGraphElementBase *container);
 activityslaves_decl CActivityBase *createRegroupSlave(CGraphElementBase *container);
 activityslaves_decl CActivityBase *createNonEmptySlave(CGraphElementBase *container);
 activityslaves_decl CActivityBase *createNWaySelectSlave(CGraphElementBase *container);

+ 1 - 0
thorlcr/master/thactivitymaster.cpp

@@ -141,6 +141,7 @@ public:
             case TAKapply:
             case TAKfunnel:
             case TAKcombine:
+            case TAKcombinegroup:
             case TAKregroup:
             case TAKsorted:
             case TAKnwayinput:

+ 3 - 0
thorlcr/slave/slave.cpp

@@ -503,6 +503,9 @@ public:
             case TAKcombine:
                 ret = createCombineSlave(this);
                 break;
+            case TAKcombinegroup:
+                ret = createCombineGroupSlave(this);
+                break;
             case TAKregroup:
                 ret = createRegroupSlave(this);
                 break;