浏览代码

Initial work on supporting loop termination conditions

- Add a new flag and two new functions to provide ways of accessing the
  global whether to execute again condition without requiring the whole
  dataset in memory.

- Modify hthor and roxie so they can use the new functions

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday 13 年之前
父节点
当前提交
57a71c7e21

+ 1 - 1
ecl/eclagent/agentctx.hpp

@@ -44,7 +44,7 @@ struct IHThorGraphResults : extends IEclGraphResults
 
 struct IHThorBoundLoopGraph : extends IInterface
 {
-    virtual IHThorGraphResult * execute(void * counterRow, ConstPointerArray & rows, const byte * parentExtract) = 0;
+    virtual IHThorGraphResults * execute(void * counterRow, ConstPointerArray & rows, const byte * parentExtract) = 0;
     virtual void execute(void * counterRow, IHThorGraphResults * graphLoopResults, const byte * parentExtract) = 0;
 };
 

+ 1 - 1
ecl/eclagent/eclagent.ipp

@@ -699,7 +699,7 @@ public:
     EclBoundLoopGraph(IAgentContext & _agent, IEclLoopGraph * _graph, IOutputMetaData * _resultMeta, unsigned _activityId);
     IMPLEMENT_IINTERFACE
 
-    virtual IHThorGraphResult * execute(void * counterRow, ConstPointerArray & rows, const byte * parentExtract);
+    virtual IHThorGraphResults * execute(void * counterRow, ConstPointerArray & rows, const byte * parentExtract);
     virtual void execute(void * counterRow, IHThorGraphResults * graphLoopResults, const byte * parentExtract);
 
 protected:

+ 2 - 2
ecl/eclagent/eclgraph.cpp

@@ -1806,7 +1806,7 @@ EclBoundLoopGraph::EclBoundLoopGraph(IAgentContext & _agent, IEclLoopGraph * _gr
     activityId = _activityId;
 }
 
-IHThorGraphResult * EclBoundLoopGraph::execute(void * counterRow, ConstPointerArray & rows, const byte * parentExtract)
+IHThorGraphResults * EclBoundLoopGraph::execute(void * counterRow, ConstPointerArray & rows, const byte * parentExtract)
 {
     Owned<GraphResults> results = new GraphResults(3);
 
@@ -1823,7 +1823,7 @@ IHThorGraphResult * EclBoundLoopGraph::execute(void * counterRow, ConstPointerAr
     }
 
     graph->executeChild(parentExtract, results, NULL);
-    return LINK(results->queryResult(0));
+    return results.getClear();
 }
 
 

+ 12 - 2
ecl/hql/hqlutil.cpp

@@ -2879,7 +2879,7 @@ IHqlExpression * convertRecordToTransform(IHqlExpression * record, bool canOmit)
 }
 
 
-IHqlExpression * createTranformForField(IHqlExpression * field, IHqlExpression * value)
+IHqlExpression * createTransformForField(IHqlExpression * field, IHqlExpression * value)
 {
     OwnedHqlExpr record = createRecord(field);
     OwnedHqlExpr self = getSelf(record);
@@ -2888,6 +2888,16 @@ IHqlExpression * createTranformForField(IHqlExpression * field, IHqlExpression *
     return createValue(no_transform, makeTransformType(record->getType()), assign.getClear());
 }
 
+IHqlExpression * convertScalarToRow(IHqlExpression * value, ITypeInfo * fieldType)
+{
+    if (!fieldType)
+        fieldType = value->queryType();
+
+    OwnedHqlExpr field = createField(unnamedAtom, LINK(fieldType), NULL, NULL);
+    OwnedHqlExpr record = createRecord(field);
+    OwnedHqlExpr transform = createTransformForField(field, value);
+    return createRow(no_createrow, LINK(transform));
+}
 
 inline bool isScheduleAction(IHqlExpression * expr)
 {
@@ -4346,7 +4356,7 @@ bool SplitDatasetAttributeTransformer::split(SharedHqlExpr & dataset, SharedHqlE
     case 2:
         {
             OwnedHqlExpr field = createField(unnamedAtom, value->getType(), NULL);
-            OwnedHqlExpr transform = createTranformForField(field, value);
+            OwnedHqlExpr transform = createTransformForField(field, value);
             OwnedHqlExpr combine = createDatasetF(no_combine, LINK(&newDatasets.item(0)), LINK(&newDatasets.item(1)), LINK(transform), LINK(selSeq), NULL);
             OwnedHqlExpr first = createRowF(no_selectnth, LINK(combine), getSizetConstant(1), createAttribute(noBoundCheckAtom), NULL);
             dataset.setown(createDatasetFromRow(first.getClear()));

+ 3 - 0
ecl/hql/hqlutil.hpp

@@ -169,6 +169,9 @@ extern HQL_API bool isSimpleTransformToMergeWith(IHqlExpression * expr);
 extern HQL_API IHqlExpression * queryUncastExpr(IHqlExpression * expr);
 extern HQL_API bool areConstant(const HqlExprArray & args);
 
+extern HQL_API IHqlExpression * createTransformForField(IHqlExpression * field, IHqlExpression * value);
+extern HQL_API IHqlExpression * convertScalarToRow(IHqlExpression * value, ITypeInfo * fieldType);
+
 
 inline void extendConditionOwn(SharedHqlExpr & cond, node_operator op, IHqlExpression * r)
 {

+ 2 - 0
ecl/hqlcpp/hqlcatom.cpp

@@ -388,6 +388,7 @@ _ATOM loadResourceAtom;
 _ATOM log10Atom;
 _ATOM lookupBlobAtom;
 _ATOM _loop_Atom;
+_ATOM _loopFirst_Atom;
 _ATOM ls42anAtom;
 _ATOM ls42axAtom;
 _ATOM ls42vnAtom;
@@ -1089,6 +1090,7 @@ MODULE_INIT(INIT_PRIORITY_HQLATOM-1)
     MAKEATOM(loadResource);
     MAKEATOM(lookupBlob);
     MAKESYSATOM(loop);
+    MAKESYSATOM(loopFirst);
     log10Atom = createAtom("_log10");
     MAKEATOM(ls42an);
     MAKEATOM(ls42ax);

+ 1 - 0
ecl/hqlcpp/hqlcatom.hpp

@@ -388,6 +388,7 @@ extern _ATOM loadResourceAtom;
 extern _ATOM log10Atom;
 extern _ATOM lookupBlobAtom;
 extern _ATOM _loop_Atom;
+extern _ATOM _loopFirst_Atom;
 extern _ATOM ls42anAtom;
 extern _ATOM ls42axAtom;
 extern _ATOM ls42vnAtom;

+ 3 - 3
ecl/hqlcpp/hqlcpp.ipp

@@ -1204,7 +1204,6 @@ public:
     void doBuildReturnCompare(BuildCtx & ctx, IHqlExpression * expr, node_operator op, bool isBoolEquality);
     void buildReturnOrder(BuildCtx & ctx, IHqlExpression *sortList, const DatasetReference & dataset);
 
-    unique_id_t buildLoopSubgraph(BuildCtx & ctx, IHqlExpression * dataset, IHqlExpression * selSeq, IHqlExpression * rowsid, IHqlExpression * body, IHqlExpression * counter, unique_id_t containerId, bool multiInstance);
     unique_id_t buildGraphLoopSubgraph(BuildCtx & ctx, IHqlExpression * dataset, IHqlExpression * selSeq, IHqlExpression * rowsid, IHqlExpression * body, IHqlExpression * counter, unique_id_t containerId, bool multiInstance);
     unique_id_t buildRemoteSubgraph(BuildCtx & ctx, IHqlExpression * dataset, unique_id_t containerId);
         
@@ -1903,12 +1902,12 @@ public:
     IHqlExpression * addDataset(IHqlExpression * expr);
     void buildStmt(BuildCtx & ctx, IHqlExpression * expr);
     unique_id_t buildGraphLoopBody(BuildCtx & ctx, IHqlExpression * dataset, IHqlExpression * selSeq, IHqlExpression * rowsid, IHqlExpression * body, IHqlExpression * counter, unique_id_t containerId, bool multiInstance);
-    unique_id_t buildLoopBody(BuildCtx & ctx, IHqlExpression * dataset, IHqlExpression * selSeq, IHqlExpression * rowsid, IHqlExpression * body, IHqlExpression * counter, unique_id_t containerId, bool multiInstance);
+    unique_id_t buildLoopBody(BuildCtx & ctx, IHqlExpression * dataset, IHqlExpression * selSeq, IHqlExpression * rowsid, IHqlExpression * body, IHqlExpression * filter, IHqlExpression * again, IHqlExpression * counter, unique_id_t containerId, bool multiInstance);
     unique_id_t buildRemoteGraph(BuildCtx & ctx, IHqlExpression * ds, unique_id_t containerId);
     void generateGraph(BuildCtx & ctx);
     void generatePrefetchGraph(BuildCtx & _ctx, OwnedHqlExpr * retGraphExpr, OwnedHqlExpr * retResultsExpr);
     bool isDatasetPresent(IHqlExpression * expr);
-    
+    unsigned queryLoopConditionResult() const { return loopAgainResult; }
 protected:
     void createBuilderAlias(BuildCtx & ctx, ParentExtract * extractBuilder);
 
@@ -1920,6 +1919,7 @@ protected:
     OwnedHqlExpr instanceExpr;
     OwnedHqlExpr resultInstanceExpr;
     OwnedHqlExpr represents;
+    unsigned loopAgainResult;
     HqlExprArray results;
     unsigned numResults;
 };

+ 47 - 11
ecl/hqlcpp/hqlcppds.cpp

@@ -1142,9 +1142,21 @@ bool isGraphIndependent(IHqlExpression * expr, IHqlExpression * graph)
 //---------------------------------------------------------------------------
 // Child dataset processing
 
-static IHqlExpression * createCounterAsResult(IHqlExpression * counter, IHqlExpression * represents, unsigned seq)
+static IHqlExpression * convertScalarToResult(IHqlExpression * value, ITypeInfo * fieldType, IHqlExpression * represents, unsigned seq)
+{
+    OwnedHqlExpr row = convertScalarToRow(value, fieldType);
+    OwnedHqlExpr ds = createDatasetFromRow(LINK(row));
+    HqlExprArray args;
+    args.append(*LINK(ds));
+    args.append(*LINK(represents));
+    args.append(*getSizetConstant(seq));
+    args.append(*createAttribute(rowAtom));
+    return createValue(no_setgraphresult, makeVoidType(), args);
+}
+
+static IHqlExpression * createScalarFromResult(ITypeInfo * scalarType, ITypeInfo * fieldType, IHqlExpression * represents, unsigned seq)
 {
-    OwnedHqlExpr counterField = createField(unnamedAtom, LINK(unsignedType), NULL, NULL);
+    OwnedHqlExpr counterField = createField(unnamedAtom, LINK(fieldType), NULL, NULL);
     OwnedHqlExpr counterRecord = createRecord(counterField);
     HqlExprArray args;
     args.append(*LINK(counterRecord));
@@ -1153,10 +1165,15 @@ static IHqlExpression * createCounterAsResult(IHqlExpression * counter, IHqlExpr
     args.append(*createAttribute(rowAtom));
     OwnedHqlExpr counterResult = createDataset(no_getgraphresult, args);
     OwnedHqlExpr select = createNewSelectExpr(createRow(no_selectnth, LINK(counterResult), getSizetConstant(1)), LINK(counterField));
-    OwnedHqlExpr cast = ensureExprType(select, counter->queryType());
+    OwnedHqlExpr cast = ensureExprType(select, scalarType);
     return createAlias(cast, internalAttrExpr);
 }
 
+static IHqlExpression * createCounterAsResult(IHqlExpression * counter, IHqlExpression * represents, unsigned seq)
+{
+    return createScalarFromResult(counter->queryType(), unsignedType, represents, seq);
+}
+
 ChildGraphBuilder::ChildGraphBuilder(HqlCppTranslator & _translator) 
 : translator(_translator)
 {
@@ -1166,6 +1183,7 @@ ChildGraphBuilder::ChildGraphBuilder(HqlCppTranslator & _translator)
     instanceExpr.setown(createQuoted(instanceName, makeBoolType()));
     represents.setown(createAttribute(graphAtom, LINK(idExpr)));
     numResults = 0;
+    loopAgainResult = 0;
 
     StringBuffer s;
     resultInstanceExpr.setown(createQuoted(appendUniqueId(s.append("res"), id), makeBoolType()));
@@ -1308,9 +1326,10 @@ void ChildGraphBuilder::createBuilderAlias(BuildCtx & ctx, ParentExtract * extra
     ctx.addQuoted(s);
 }
 
-unique_id_t ChildGraphBuilder::buildLoopBody(BuildCtx & ctx, IHqlExpression * dataset, IHqlExpression * selSeq, IHqlExpression * rowsid, IHqlExpression * body, IHqlExpression * counter, unique_id_t containerId, bool multiInstance)
+unique_id_t ChildGraphBuilder::buildLoopBody(BuildCtx & ctx, IHqlExpression * dataset, IHqlExpression * selSeq, IHqlExpression * rowsid, IHqlExpression * body, IHqlExpression * filter, IHqlExpression * again, IHqlExpression * counter, unique_id_t containerId, bool multiInstance)
 {
-    OwnedHqlExpr transformedBody = LINK(body);
+    LinkedHqlExpr transformedBody = body;
+    LinkedHqlExpr transformedAgain = again;
     numResults = 2;
 
     //Result 1 is the input dataset.
@@ -1333,6 +1352,13 @@ unique_id_t ChildGraphBuilder::buildLoopBody(BuildCtx & ctx, IHqlExpression * da
     {
         OwnedHqlExpr select = createCounterAsResult(counter, represents, 2);
         transformedBody.setown(replaceExpression(transformedBody, counter, select));
+        if (transformedAgain)
+        {
+            //The COUNTER for the global termination condition is whether to execute iteration COUNTER, 1=1st iter
+            //Since we're evaluating the condition in the previous iteration it needs to be increased by 1.
+            OwnedHqlExpr nextCounter = adjustValue(select, 1);
+            transformedAgain.setown(replaceExpression(transformedAgain, counter, nextCounter));
+        }
         numResults = 3;
     }
 
@@ -1344,6 +1370,22 @@ unique_id_t ChildGraphBuilder::buildLoopBody(BuildCtx & ctx, IHqlExpression * da
 
     OwnedHqlExpr result = createValue(no_setgraphresult, makeVoidType(), LINK(transformedBody), LINK(represents), getSizetConstant(0), createAttribute(_loop_Atom));
 
+    if (transformedAgain)
+    {
+        LinkedHqlExpr nextLoopDataset = transformedBody;
+        if (filter)
+        {
+            //If there is a loop filter then the global condition is applied to dataset filtered by that.
+            OwnedHqlExpr mappedFilter = replaceSelector(filter, left, nextLoopDataset);
+            nextLoopDataset.setown(createDataset(no_filter, nextLoopDataset.getClear(), LINK(mappedFilter)));
+        }
+        transformedAgain.setown(replaceExpression(transformedAgain, rowsExpr, nextLoopDataset));
+        OwnedHqlExpr againResult = convertScalarToResult(transformedAgain, queryBoolType(), represents, 3);
+        result.setown(createCompound(result.getClear(), againResult.getClear()));
+        loopAgainResult = 3;
+        numResults = 4;
+    }
+
     BuildCtx subctx(ctx);
     subctx.addGroup();
 
@@ -1661,12 +1703,6 @@ void HqlCppTranslator::buildChildDataset(BuildCtx & ctx, IHqlExpression * expr,
 }
 
 
-unique_id_t HqlCppTranslator::buildLoopSubgraph(BuildCtx & ctx, IHqlExpression * dataset, IHqlExpression * selSeq, IHqlExpression * rowsid, IHqlExpression * body, IHqlExpression * counter, unique_id_t containerId, bool multiInstance)
-{
-    ChildGraphBuilder builder(*this);
-    return builder.buildLoopBody(ctx, dataset, selSeq, rowsid, body, counter, containerId, multiInstance);
-}
-
 unique_id_t HqlCppTranslator::buildGraphLoopSubgraph(BuildCtx & ctx, IHqlExpression * dataset, IHqlExpression * selSeq, IHqlExpression * rowsid, IHqlExpression * body, IHqlExpression * counter, unique_id_t containerId, bool multiInstance)
 {
     ChildGraphBuilder builder(*this);

+ 11 - 1
ecl/hqlcpp/hqlhtcpp.cpp

@@ -7972,6 +7972,10 @@ ABoundActivity * HqlCppTranslator::doBuildActivityLoop(BuildCtx & ctx, IHqlExpre
         buildReturn(funcctx, loopCond);
     }
 
+    IHqlExpression * loopFirst = queryPropertyChild(expr, _loopFirst_Atom, 0);
+    if (loopFirst)
+        doBuildBoolFunction(instance->startctx, "loopFirstTime", loopFirst);
+
     IHqlExpression * parallel = expr->queryProperty(parallelAtom);
     if (parallel && (targetHThor() || !count || loopCond))
         parallel = NULL;
@@ -8012,6 +8016,7 @@ ABoundActivity * HqlCppTranslator::doBuildActivityLoop(BuildCtx & ctx, IHqlExpre
     if (counter) flags.append("|LFcounter");
     if (parallel) flags.append("|LFparallel");
     if (filter) flags.append("|LFfiltered");
+    if (loopFirst) flags.append("|LFnewloopagain");
 
     if (flags.length())
         doBuildUnsignedFunction(instance->classctx, "getFlags", flags.str()+1);
@@ -8023,9 +8028,14 @@ ABoundActivity * HqlCppTranslator::doBuildActivityLoop(BuildCtx & ctx, IHqlExpre
     //output dataset is result 0
     //input dataset is fed in using result 1
     //counter (if required) is fed in using result 2[0].counter;
-    unique_id_t loopId = buildLoopSubgraph(subctx, dataset, selSeq, rowsid, body->queryChild(0), counter, instance->activityId, (parallel != NULL));
+    ChildGraphBuilder builder(*this);
+    unique_id_t loopId = builder.buildLoopBody(subctx, dataset, selSeq, rowsid, body->queryChild(0), filter, loopCond, counter, instance->activityId, (parallel != NULL));
     instance->addAttributeInt("_loopid", loopId);
 
+    unsigned loopAgainResult = builder.queryLoopConditionResult();
+    if (loopAgainResult)
+        doBuildUnsignedFunction(instance->startctx, "loopAgainResult", loopAgainResult);
+
     buildInstanceSuffix(instance);
 
     buildConnectInputOutput(ctx, instance, boundDataset, 0, 0);

+ 34 - 0
ecl/hqlcpp/hqlttcpp.cpp

@@ -11521,6 +11521,40 @@ IHqlExpression * HqlTreeNormalizer::createTransformedBody(IHqlExpression * expr)
                 return getDebugValueExpr(translator.wu(), expr);
             break;
         }
+    case no_loop:
+        {
+            OwnedHqlExpr transformed = NewHqlTransformer::createTransformed(expr);
+            IHqlExpression * loopCond = queryRealChild(transformed, 3);
+            if (loopCond)
+            {
+                //Create a firstCond attribute so that the condition for whether to execute the loop
+                //the first time will be efficiently optimized.
+                IHqlExpression * dataset = transformed->queryChild(0);
+                IHqlExpression * filter = queryRealChild(transformed, 2);
+                IHqlExpression * rowsid = transformed->queryProperty(_rowsid_Atom);
+                IHqlExpression * selSeq = querySelSeq(transformed);
+                IHqlExpression * counter = queryPropertyChild(expr, _countProject_Atom, 0);
+                OwnedHqlExpr left = createSelector(no_left, dataset, selSeq);
+                OwnedHqlExpr rowsExpr = createDataset(no_rows, LINK(left), LINK(rowsid));
+                OwnedHqlExpr initialLoopDataset = LINK(dataset);
+                if (filter)
+                {
+                    //If there is a loop filter then the global condition is applied to dataset filtered by that.
+                    OwnedHqlExpr mappedFilter = replaceSelector(filter, left, dataset);
+                    initialLoopDataset.setown(createDataset(no_filter, initialLoopDataset.getClear(), LINK(mappedFilter)));
+                }
+                OwnedHqlExpr firstCond = replaceExpression(loopCond, rowsExpr, initialLoopDataset);
+                if (counter)
+                {
+                    //Whether to evaluate the 1st time round the loop requires COUNTER=1
+                    OwnedHqlExpr one = createConstant(createIntValue(1, counter->getType()));
+                    firstCond.setown(replaceExpression(firstCond, counter, one));
+                }
+                return appendOwnedOperand(transformed, createExprAttribute(_loopFirst_Atom, firstCond.getClear()));
+            }
+            return transformed.getClear();
+        }
+        break;
     }
 
     unsigned max = expr->numChildren();

+ 28 - 12
ecl/hthor/hthor.cpp

@@ -8814,6 +8814,8 @@ void CHThorLoopActivity::ready()
     maxIterations = helper.numIterations();
     if ((int)maxIterations < 0) maxIterations = 0;
     finishedLooping = ((kind == TAKloopcount) && (maxIterations == 0));
+    if ((flags & IHThorLoopArg::LFnewloopagain) && !helper.loopFirstTime())
+        finishedLooping = true;
     extractBuilder.clear();
     helper.createParentExtract(extractBuilder);
 }
@@ -8856,20 +8858,25 @@ const void * CHThorLoopActivity::nextInGroup()
         switch (kind)
         {
         case TAKloopdataset:
-            if (!helper.loopAgain(loopCounter, loopPending.ordinality(), (const void * *)loopPending.getArray()))
             {
-                if (loopPending.ordinality() == 0)
+                if (!(flags & IHThorLoopArg::LFnewloopagain))
                 {
-                    eof = true;
-                    return NULL;
-                }
+                    if (!helper.loopAgain(loopCounter, loopPending.ordinality(), (const void * *)loopPending.getArray()))
+                    {
+                        if (loopPending.ordinality() == 0)
+                        {
+                            eof = true;
+                            return NULL;
+                        }
 
-                arrayInput.init(&loopPending);
-                curInput = &arrayInput;
-                finishedLooping = true;
-                continue;       // back to the input loop again
+                        arrayInput.init(&loopPending);
+                        curInput = &arrayInput;
+                        finishedLooping = true;
+                        continue;       // back to the input loop again
+                    }
+                }
+                break;
             }
-            break;
         case TAKlooprow:
             if (loopPending.empty())
             {
@@ -8899,8 +8906,17 @@ const void * CHThorLoopActivity::nextInGroup()
             *((thor_loop_counter_t *)counterRow) = loopCounter;
         }
 
-        Owned<IHThorGraphResult> curResult = loopGraph->execute(counterRow, loopPending, extractBuilder.getbytes());
-        resultInput.init(curResult);
+        Owned<IHThorGraphResults> curResults = loopGraph->execute(counterRow, loopPending, extractBuilder.getbytes());
+        if (flags & IHThorLoopArg::LFnewloopagain)
+        {
+            IHThorGraphResult * result = curResults->queryResult(helper.loopAgainResult());
+            assertex(result);
+            const byte * row = static_cast<const byte *>(result->queryRow(0));
+            assertex(row);
+            if (!*row)
+                finishedLooping = true;
+        }
+        resultInput.init(curResults->queryResult(0));
         curInput = &resultInput;
 
         loopCounter++;

+ 41 - 0
ecl/regress/loopone.ecl

@@ -0,0 +1,41 @@
+/*##############################################################################
+
+    Copyright (C) 2011 HPCC Systems.
+
+    All rights reserved. This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU Affero General Public License as
+    published by the Free Software Foundation, either version 3 of the
+    License, or (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU Affero General Public License for more details.
+
+    You should have received a copy of the GNU Affero General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+############################################################################## */
+
+
+namesRecord :=
+            RECORD
+string20        surname;
+string10        forename;
+integer2        age := 25;
+            END;
+
+namesTable := dataset([
+        {'Hawthorn','Gavin',40},
+        {'Hawthorn','Mia',30},
+        {'Smithe','Pru',20},
+        {'X','Z',10}], namesRecord);
+
+//case 5: a row filter and a global condition
+output(loop(namesTable, left.age < 40, count(rows(left)) > 1, project(rows(left), transform(namesRecord, self.age := left.age+3; self := left))));
+
+output(loop(namesTable, left.age < 15, count(rows(left)) > 1, project(rows(left), transform(namesRecord, self.age := left.age+3; self := left))));
+
+//The following are illegal - If they were allowedCOUNTER would need to be mapped for the filter condition.
+//output(loop(namesTable, left.age < 40 - COUNTER, count(rows(left)) > 1, project(rows(left), transform(namesRecord, self.age := left.age+3; self := left))));
+
+//output(loop(namesTable, left.age < 15 - COUNTER, count(rows(left)) > 1, project(rows(left), transform(namesRecord, self.age := left.age+3; self := left))));

+ 33 - 16
roxie/ccd/ccdserver.cpp

@@ -13620,6 +13620,8 @@ public:
         maxIterations = (int) helper.numIterations();
         if (maxIterations < 0) maxIterations = 0;
         finishedLooping = ((activityKind == TAKloopcount) && (maxIterations == 0));
+        if ((flags & IHThorLoopArg::LFnewloopagain) && !helper.loopFirstTime())
+            finishedLooping = true;
         loopExtractBuilder.clear();
         helper.createParentExtract(loopExtractBuilder);         // could possibly delay this until execution actually happens
     }
@@ -13722,20 +13724,25 @@ public:
             switch (activityKind)
             {
             case TAKloopdataset:
-                if (!helper.loopAgain(loopCounter, loopPending.ordinality(), (const void * *)loopPending.getArray()))
                 {
-                    if (loopPending.ordinality() == 0)
+                    if (!(flags & IHThorLoopArg::LFnewloopagain))
                     {
-                        eof = true;
-                        return NULL;
-                    }
+                        if (!helper.loopAgain(loopCounter, loopPending.ordinality(), (const void * *)loopPending.getArray()))
+                        {
+                            if (loopPending.ordinality() == 0)
+                            {
+                                eof = true;
+                                return NULL;
+                            }
 
-                    arrayInput.init(&loopPending);
-                    curInput = &arrayInput;
-                    finishedLooping = true;
-                    continue;       // back to the input loop again
+                            arrayInput.init(&loopPending);
+                            curInput = &arrayInput;
+                            finishedLooping = true;
+                            continue;       // back to the input loop again
+                        }
+                    }
+                    break;
                 }
-                break;
             case TAKlooprow:
                 if (loopPending.empty())
                 {
@@ -13761,7 +13768,17 @@ public:
             checkAbort();
             try 
             {
-                resultInput.setown(executeIteration(loopExtractBuilder.size(), loopExtractBuilder.getbytes(), loopCounter, loopPending));
+                Owned<IRoxieGraphResults> results = executeIteration(loopExtractBuilder.size(), loopExtractBuilder.getbytes(), loopCounter, loopPending);
+                resultInput.setown(results->createIterator(0));
+
+                if (flags & IHThorLoopArg::LFnewloopagain)
+                {
+                    Owned<IRoxieInput> againResult = results->createIterator(helper.loopAgainResult());
+                    OwnedConstRoxieRow row  = againResult->nextInGroup();
+                    assertex(row);
+                    if (!*(const byte *)row.get())
+                        finishedLooping = true;
+                }
             }
             catch (IException *E)
             {
@@ -13776,7 +13793,7 @@ public:
         }
     }
 
-    IRoxieInput * executeIteration(unsigned parentExtractSize, const byte *parentExtract, unsigned counter, ConstPointerArray & rows)
+    IRoxieGraphResults * executeIteration(unsigned parentExtractSize, const byte *parentExtract, unsigned counter, ConstPointerArray & rows)
     {
         try
         {
@@ -13787,7 +13804,7 @@ public:
 
             createCounterResult(loopGraph, counter);
 
-            Owned<IRoxieInput> ret = loopGraph->execute(0, parentExtractSize, parentExtract);
+            Owned<IRoxieGraphResults> ret = loopGraph->execute(parentExtractSize, parentExtract);
             loopGraph->afterExecute();
             return ret.getClear();
         }
@@ -25004,7 +25021,7 @@ public:
     }
 };
 
-class CGraphResults : public CInterface, implements IEclGraphResults
+class CGraphResults : public CInterface, implements IRoxieGraphResults
 {
     IArrayOf<IGraphResult> results;
     CriticalSection cs;
@@ -26918,10 +26935,10 @@ public:
         reset();
     }
 
-    virtual IRoxieInput * execute(unsigned id, size32_t parentExtractSize, const byte *parentExtract)
+    virtual IRoxieGraphResults * execute(size32_t parentExtractSize, const byte *parentExtract)
     {
         doExecute(parentExtractSize, parentExtract);
-        return results->createIterator(id);
+        return LINK(results);
     }
     virtual void getResult(size32_t & retSize, void * & ret, unsigned id)
     {

+ 7 - 1
roxie/ccd/ccdserver.hpp

@@ -315,6 +315,12 @@ interface IRoxieServerLoopResultProcessor
     virtual IRoxieInput * connectIterationOutput(unsigned whichIteration, IProbeManager *probeManager, IArrayOf<IRoxieInput> &probes, IRoxieServerActivity *targetAct, unsigned targetIdx) = 0;
 };
 
+interface IRoxieGraphResults : extends IEclGraphResults
+{
+public:
+    virtual IRoxieInput * createIterator(unsigned id) = 0;
+};
+
 class CGraphIterationInfo;
 
 interface IRoxieServerChildGraph : public IInterface
@@ -325,7 +331,7 @@ interface IRoxieServerChildGraph : public IInterface
     virtual void setInputResult(unsigned id, IGraphResult * result) = 0;
     virtual bool querySetInputResult(unsigned id, IRoxieInput * result) = 0;
     virtual void stopUnusedOutputs() = 0;
-    virtual IRoxieInput * execute(unsigned id, size32_t parentExtractSize, const byte *parentExtract) = 0;
+    virtual IRoxieGraphResults * execute(size32_t parentExtractSize, const byte *parentExtract) = 0;
     virtual void afterExecute() = 0;
 //sequential graph related helpers
     virtual void clearGraphLoopResults() = 0;

+ 4 - 0
rtl/include/eclhelper.hpp

@@ -2401,6 +2401,7 @@ struct IHThorLoopArg : public IHThorArg
         LFparallel = 1,
         LFcounter = 2,
         LFfiltered = 4,
+        LFnewloopagain = 8,
     };
     virtual unsigned getFlags() = 0;
     virtual bool sendToLoop(unsigned counter, const void * in) = 0;         // does the input row go to output or round the loop?
@@ -2409,6 +2410,9 @@ struct IHThorLoopArg : public IHThorArg
     virtual void createParentExtract(rtlRowBuilder & builder) = 0;
     virtual unsigned defaultParallelIterations() = 0;
     virtual void numParallelIterations(size32_t & retSize, void * & retData) = 0;
+    //If new loop again is set the following should be used instead of loopAgain
+    virtual bool loopFirstTime() = 0;
+    virtual unsigned loopAgainResult() = 0;  // which result contains the indication of whether to loop again?
 };
 
 

+ 2 - 0
rtl/include/eclhelper_base.hpp

@@ -2946,6 +2946,8 @@ class CThorLoopArg : public CThorArg, implements IHThorLoopArg
     virtual bool loopAgain(unsigned counter, unsigned num, const void * * _rows)    { return num != 0; }
     virtual unsigned defaultParallelIterations() { return 0; }
     virtual void numParallelIterations(size32_t & retSize, void * & retData) { retSize = 0; retData = NULL; }
+    virtual bool loopFirstTime() { return false; }
+    virtual unsigned loopAgainResult() { return 0; }
 };