浏览代码

gh-518 Implement WHEN action in roxie and codegen

Implement the action form of WHEN in roxie, and fix the code generator
so the code is more suitable for implementing in hthor as well

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday 13 年之前
父节点
当前提交
26bad584ef

+ 4 - 1
ecl/hqlcpp/hqlhtcpp.cpp

@@ -14446,7 +14446,10 @@ ABoundActivity * HqlCppTranslator::doBuildActivityExecuteWhen(BuildCtx & ctx, IH
     buildInstancePrefix(instance);
     buildInstanceSuffix(instance);
 
-    buildConnectInputOutput(ctx, instance, boundDataset, 0, 0);
+    if (expr->isAction())
+        addDependency(ctx, boundDataset, instance->queryBoundActivity(), dependencyAtom, NULL, 1);
+    else
+        buildConnectInputOutput(ctx, instance, boundDataset, 0, 0);
     addDependency(ctx, associatedActivity, instance->queryBoundActivity(), dependencyAtom, label, when);
 
     return instance->getBoundActivity();

+ 6 - 3
ecl/hqlcpp/hqlresource.cpp

@@ -2720,9 +2720,12 @@ void EclResourcer::createInitialGraph(IHqlExpression * expr, IHqlExpression * ow
             createInitialGraph(expr->queryChild(1), expr, thisGraph, UnconditionalLink, false);
             return;
         case no_executewhen:
-            createInitialGraph(expr->queryChild(0), expr, thisGraph, UnconditionalLink, false);
-            createInitialGraph(expr->queryChild(1), expr, thisGraph, UnconditionalLink, true);
-            return;
+            {
+                bool newGraph = expr->isAction() && (options.targetClusterType == HThorCluster);
+                createInitialGraph(expr->queryChild(0), expr, thisGraph, UnconditionalLink, newGraph);
+                createInitialGraph(expr->queryChild(1), expr, thisGraph, UnconditionalLink, true);
+                return;
+            }
         case no_keyindex:
         case no_newkeyindex:
             return;

+ 2 - 2
ecl/regress/when6.ecl

@@ -34,12 +34,12 @@ ds := dataset([
 
 simple := dedup(ds, f1);
 
-osum := output(TABLE(simple, { sum(group, f1) }, f3));
+osum := output(TABLE(simple, { s := sum(group, f1) }, f3));
 
 x1 := when(simple, osum, parallel);
 
 o1 := output(TABLE(x1, { f1 }));
-o2 := output(TABLE(simple, { count(group) }, f3));
+o2 := output(TABLE(simple, { c := count(group) }, f3));
 when(o1, o2, success);
 </Query>
 </Archive>

+ 45 - 0
ecl/regress/when6a.eclxml

@@ -0,0 +1,45 @@
+<Archive>
+<!--
+
+    Copyright (C) 2011 HPCC Systems.
+
+    All rights reserved. This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU Affero General Public License as
+    published by the Free Software Foundation, either version 3 of the
+    License, or (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU Affero General Public License for more details.
+
+    You should have received a copy of the GNU Affero General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+-->
+<Query>
+r := {unsigned f1, unsigned f2, unsigned f3, unsigned f4 };
+
+r t(unsigned a, unsigned b, unsigned c, unsigned d) := TRANSFORM
+    SELF.f1 := a;
+    SELF.f2 := b;
+    SELF.f3 := c;
+    SELF.f4 := d;
+END;
+
+ds := dataset([
+        t(1,2,3,4),
+        t(1,4,2,5),
+        t(9,3,4,5),
+        t(3,4,2,9)]);
+
+simple := dedup(ds, f1);
+
+osum := output(TABLE(simple, { sum(group, f1) }, f3));
+
+x1 := when(simple, osum, success);
+
+o1 := output(TABLE(x1, { f1 }));
+o2 := output(TABLE(simple, { count(group) }, f3));
+when(o1, o2);
+</Query>
+</Archive>

+ 45 - 0
ecl/regress/when6b.eclxml

@@ -0,0 +1,45 @@
+<Archive>
+<!--
+
+    Copyright (C) 2011 HPCC Systems.
+
+    All rights reserved. This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU Affero General Public License as
+    published by the Free Software Foundation, either version 3 of the
+    License, or (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU Affero General Public License for more details.
+
+    You should have received a copy of the GNU Affero General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+-->
+<Query>
+r := {unsigned f1, unsigned f2, unsigned f3, unsigned f4 };
+
+r t(unsigned a, unsigned b, unsigned c, unsigned d) := TRANSFORM
+    SELF.f1 := a;
+    SELF.f2 := b;
+    SELF.f3 := c;
+    SELF.f4 := d;
+END;
+
+ds := dataset([
+        t(1,2,3,4),
+        t(1,4,2,5),
+        t(9,3,4,5),
+        t(3,4,2,9)]);
+
+simple := dedup(ds, f1);
+
+osum := output(TABLE(simple, { sum(group, f1) }, f3));
+
+x1 := when(simple, osum, failure);
+
+o1 := output(TABLE(x1, { f1 }));
+o2 := output(TABLE(simple, { count(group) }, f3));
+when(o1, o2);
+</Query>
+</Archive>

+ 45 - 0
ecl/regress/when6c.eclxml

@@ -0,0 +1,45 @@
+<Archive>
+<!--
+
+    Copyright (C) 2011 HPCC Systems.
+
+    All rights reserved. This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU Affero General Public License as
+    published by the Free Software Foundation, either version 3 of the
+    License, or (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU Affero General Public License for more details.
+
+    You should have received a copy of the GNU Affero General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+-->
+<Query>
+r := {unsigned f1, unsigned f2, unsigned f3, unsigned f4 };
+
+r t(unsigned a, unsigned b, unsigned c, unsigned d) := TRANSFORM
+    SELF.f1 := a;
+    SELF.f2 := b;
+    SELF.f3 := c;
+    SELF.f4 := d;
+END;
+
+ds := dataset([
+        t(1,2,3,4),
+        t(1,4,2,5),
+        t(9,3,4,5),
+        t(3,4,2,9)]);
+
+simple := limit(dedup(ds, f1),1);
+
+osum := output(TABLE(simple, { sum(group, f1) }, f3));
+
+x1 := when(simple, osum, failure);
+
+o1 := output(TABLE(x1, { f1 }));
+o2 := output(TABLE(simple, { count(group) }, f3));
+when(o1, o2);
+</Query>
+</Archive>

+ 44 - 0
ecl/regress/when6h.ecl

@@ -0,0 +1,44 @@
+/*##############################################################################
+
+    Copyright (C) 2011 HPCC Systems.
+
+    All rights reserved. This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU Affero General Public License as
+    published by the Free Software Foundation, either version 3 of the
+    License, or (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU Affero General Public License for more details.
+
+    You should have received a copy of the GNU Affero General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+############################################################################## */
+
+#option ('targetClusterType', 'hthor');
+
+r := {unsigned f1, unsigned f2, unsigned f3, unsigned f4 };
+
+r t(unsigned a, unsigned b, unsigned c, unsigned d) := TRANSFORM
+    SELF.f1 := a;
+    SELF.f2 := b;
+    SELF.f3 := c;
+    SELF.f4 := d;
+END;
+
+ds := dataset([
+        t(1,2,3,4),
+        t(1,4,2,5),
+        t(9,3,4,5),
+        t(3,4,2,9)]);
+
+simple := dedup(nofold(ds), f1);
+
+osum := output(TABLE(simple, { s := sum(group, f1) }, f3));
+
+x1 := when(simple, osum, parallel);
+
+o1 := output(TABLE(x1, { f1 }));
+o2 := output(TABLE(simple, { c := count(group) }, f3));
+when(o1, o2, success);

+ 2 - 0
roxie/ccd/ccdquery.cpp

@@ -535,6 +535,8 @@ protected:
             return createRoxieServerPrefetchProjectActivityFactory(id, subgraphId, *this, helperFactory, kind);
         case TAKwhen_dataset:
             return createRoxieServerWhenActivityFactory(id, subgraphId, *this, helperFactory, kind);
+        case TAKwhen_action:
+            return createRoxieServerWhenActionActivityFactory(id, subgraphId, *this, helperFactory, kind, isRootAction(node));
 
         // These are not required in Roxie for the time being - code generator should trap them
         case TAKdistribution:

+ 70 - 0
roxie/ccd/ccdserver.cpp

@@ -18479,6 +18479,7 @@ public:
         savedExtractSize = parentExtractSize;
         savedExtract = parentExtract;
         CRoxieServerActivity::start(parentExtractSize, parentExtract, paused);
+        executeDependencies(parentExtractSize, parentExtract, WhenParallelId);        // MORE: This should probably be done in parallel!
     }
 
     virtual void stop(bool aborting)
@@ -18493,11 +18494,13 @@ public:
         ActivityTimer t(totalCycles, timeActivities, ctx->queryDebugContext()); // bit of a waste of time....
         return input->nextInGroup();
     }
+
 protected:
     unsigned savedExtractSize;
     const byte *savedExtract;
 };
 
+
 class CRoxieServerWhenActivityFactory : public CRoxieServerActivityFactory
 {
 public:
@@ -18512,11 +18515,78 @@ public:
     }
 };
 
+
 extern IRoxieServerActivityFactory *createRoxieServerWhenActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind)
 {
     return new CRoxieServerWhenActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind);
 }
 
+
+//=================================================================================
+
+class CRoxieServerWhenActionActivity : public CRoxieServerActionBaseActivity
+{
+public:
+    CRoxieServerWhenActionActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
+        : CRoxieServerActionBaseActivity(_factory, _probeManager)
+    {
+        savedExtractSize = 0;
+        savedExtract = NULL;
+    }
+
+    virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
+    {
+        savedExtractSize = parentExtractSize;
+        savedExtract = parentExtract;
+        CRoxieServerActionBaseActivity::start(parentExtractSize, parentExtract, paused);
+        executeDependencies(parentExtractSize, parentExtract, WhenParallelId);        // MORE: This should probably be done in parallel!
+    }
+
+    virtual void stop(bool aborting)
+    {
+        if (state != STATEstopped)
+            executeDependencies(savedExtractSize, savedExtract, aborting ? WhenFailureId : WhenSuccessId);
+        CRoxieServerActionBaseActivity::stop(aborting);
+    }
+
+    virtual void doExecuteAction(unsigned parentExtractSize, const byte * parentExtract)
+    {
+        executeDependencies(parentExtractSize, parentExtract, 1);
+    }
+
+
+protected:
+    unsigned savedExtractSize;
+    const byte *savedExtract;
+};
+
+
+class CRoxieServerWhenActionActivityFactory : public CRoxieServerActivityFactory
+{
+public:
+    CRoxieServerWhenActionActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, bool _isRoot)
+        : CRoxieServerActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind), isRoot(_isRoot)
+    {
+    }
+
+    virtual IRoxieServerActivity *createActivity(IProbeManager *_probeManager) const
+    {
+        return new CRoxieServerWhenActionActivity(this, _probeManager);
+    }
+
+    virtual bool isSink() const
+    {
+        return isRoot;
+    }
+private:
+    bool isRoot;
+};
+
+extern IRoxieServerActivityFactory *createRoxieServerWhenActionActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, bool _isRoot)
+{
+    return new CRoxieServerWhenActionActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind, _isRoot);
+}
+
 //=================================================================================
 class CRoxieServerParseActivity : public CRoxieServerActivity, implements IMatchedAction
 {

+ 1 - 0
roxie/ccd/ccdserver.hpp

@@ -514,5 +514,6 @@ extern IRoxieServerActivityFactory *createRoxieServerParallelActionActivityFacto
 extern IRoxieServerActivityFactory *createRoxieServerPrefetchProjectActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
 extern IRoxieServerActivityFactory *createRoxieServerStreamedIteratorActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
 extern IRoxieServerActivityFactory *createRoxieServerWhenActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
+extern IRoxieServerActivityFactory *createRoxieServerWhenActionActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, bool _isRoot);
 
 #endif

+ 6 - 6
testing/ecl/key/when6.xml

@@ -1,15 +1,15 @@
 <Dataset name='Result 1'>
- <Row><c>1</c></Row>
- <Row><c>1</c></Row>
- <Row><c>1</c></Row>
-</Dataset>
-<Dataset name='Result 2'>
  <Row><s>3</s></Row>
  <Row><s>1</s></Row>
  <Row><s>9</s></Row>
 </Dataset>
-<Dataset name='Result 3'>
+<Dataset name='Result 2'>
  <Row><f1>1</f1></Row>
  <Row><f1>9</f1></Row>
  <Row><f1>3</f1></Row>
 </Dataset>
+<Dataset name='Result 3'>
+ <Row><c>1</c></Row>
+ <Row><c>1</c></Row>
+ <Row><c>1</c></Row>
+</Dataset>

+ 15 - 0
testing/ecl/key/when6a.xml

@@ -0,0 +1,15 @@
+<Dataset name='Result 1'>
+ <Row><s>3</s></Row>
+ <Row><s>1</s></Row>
+ <Row><s>9</s></Row>
+</Dataset>
+<Dataset name='Result 2'>
+ <Row><f1>1</f1></Row>
+ <Row><f1>9</f1></Row>
+ <Row><f1>3</f1></Row>
+</Dataset>
+<Dataset name='Result 3'>
+ <Row><c>1</c></Row>
+ <Row><c>1</c></Row>
+ <Row><c>1</c></Row>
+</Dataset>

+ 5 - 0
testing/ecl/key/when6b.xml

@@ -0,0 +1,5 @@
+<Dataset name='Result 2'>
+ <Row><f1>1</f1></Row>
+ <Row><f1>9</f1></Row>
+ <Row><f1>3</f1></Row>
+</Dataset>

+ 15 - 0
testing/ecl/key/when6c.xml

@@ -0,0 +1,15 @@
+<Dataset name='Result 1'>
+ <Row><s>3</s></Row>
+ <Row><s>1</s></Row>
+ <Row><s>9</s></Row>
+</Dataset>
+<Dataset name='Result 2'>
+ <Row><f1>1</f1></Row>
+ <Row><f1>9</f1></Row>
+ <Row><f1>3</f1></Row>
+</Dataset>
+<Dataset name='Result 3'>
+ <Row><c>1</c></Row>
+ <Row><c>1</c></Row>
+ <Row><c>1</c></Row>
+</Dataset>

+ 42 - 0
testing/ecl/when6a.ecl

@@ -0,0 +1,42 @@
+/*##############################################################################
+
+    Copyright (C) 2011 HPCC Systems.
+
+    All rights reserved. This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU Affero General Public License as
+    published by the Free Software Foundation, either version 3 of the
+    License, or (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU Affero General Public License for more details.
+
+    You should have received a copy of the GNU Affero General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+############################################################################## */
+
+r := {unsigned f1, unsigned f2, unsigned f3, unsigned f4 };
+
+r t(unsigned a, unsigned b, unsigned c, unsigned d) := TRANSFORM
+    SELF.f1 := a;
+    SELF.f2 := b;
+    SELF.f3 := c;
+    SELF.f4 := d;
+END;
+
+ds := dataset([
+        t(1,2,3,4),
+        t(1,4,2,5),
+        t(9,3,4,5),
+        t(3,4,2,9)]);
+
+simple := dedup(nofold(ds), f1);
+
+osum := output(TABLE(simple, { s := sum(group, f1) }, f3));
+
+x1 := when(simple, osum, parallel);
+
+o1 := output(TABLE(x1, { f1 }));
+o2 := output(TABLE(simple, { c := count(group) }, f3));
+when(o1, o2, success);

+ 42 - 0
testing/ecl/when6b.ecl

@@ -0,0 +1,42 @@
+/*##############################################################################
+
+    Copyright (C) 2011 HPCC Systems.
+
+    All rights reserved. This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU Affero General Public License as
+    published by the Free Software Foundation, either version 3 of the
+    License, or (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU Affero General Public License for more details.
+
+    You should have received a copy of the GNU Affero General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+############################################################################## */
+
+r := {unsigned f1, unsigned f2, unsigned f3, unsigned f4 };
+
+r t(unsigned a, unsigned b, unsigned c, unsigned d) := TRANSFORM
+    SELF.f1 := a;
+    SELF.f2 := b;
+    SELF.f3 := c;
+    SELF.f4 := d;
+END;
+
+ds := dataset([
+        t(1,2,3,4),
+        t(1,4,2,5),
+        t(9,3,4,5),
+        t(3,4,2,9)]);
+
+simple := dedup(nofold(ds), f1);
+
+osum := output(TABLE(simple, { s := sum(group, f1) }, f3));
+
+x1 := when(simple, osum, failure);
+
+o1 := output(TABLE(x1, { f1 }));
+o2 := output(TABLE(simple, { c := count(group) }, f3));
+when(o1, o2, failure);

+ 42 - 0
testing/ecl/when6c.ecl

@@ -0,0 +1,42 @@
+/*##############################################################################
+
+    Copyright (C) 2011 HPCC Systems.
+
+    All rights reserved. This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU Affero General Public License as
+    published by the Free Software Foundation, either version 3 of the
+    License, or (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU Affero General Public License for more details.
+
+    You should have received a copy of the GNU Affero General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+############################################################################## */
+
+r := {unsigned f1, unsigned f2, unsigned f3, unsigned f4 };
+
+r t(unsigned a, unsigned b, unsigned c, unsigned d) := TRANSFORM
+    SELF.f1 := a;
+    SELF.f2 := b;
+    SELF.f3 := c;
+    SELF.f4 := d;
+END;
+
+ds := dataset([
+        t(1,2,3,4),
+        t(1,4,2,5),
+        t(9,3,4,5),
+        t(3,4,2,9)]);
+
+simple := limit(dedup(nofold(ds), f1),1);
+
+osum := output(TABLE(simple, { s := sum(group, f1) }, f3));
+
+x1 := when(simple, osum, failure);
+
+o1 := output(TABLE(x1, { f1 }));
+o2 := output(TABLE(simple, { c := count(group) }, f3));
+when(o1, o2, failure);