Parcourir la source

Merge pull request #13160 from ghalliday/issue23133

HPCC-23133 Improve pickBestEngine for inline datasets and dataset(row)

Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman il y a 5 ans
Parent
commit
3e583da888
2 fichiers modifiés avec 109 ajouts et 1 suppressions
  1. 4 1
      ecl/hqlcpp/hqlhtcpp.cpp
  2. 105 0
      ecl/regress/issue23133.ecl

+ 4 - 1
ecl/hqlcpp/hqlhtcpp.cpp

@@ -19530,7 +19530,6 @@ bool HqlCppTranslator::needsRealThor(IHqlExpression *expr, unsigned flags)
     case no_attr:
     case no_attr_expr:
     case no_attr_link:
-    case no_datasetfromrow:
     case no_rows:
     case no_libraryinput:
     case no_fail:
@@ -19629,6 +19628,7 @@ bool HqlCppTranslator::needsRealThor(IHqlExpression *expr, unsigned flags)
     case no_globalscope:
     case no_extractresult:
     case no_pure:
+    case no_datasetfromrow:
         return needsRealThor(expr->queryChild(0), flags);
 
     case no_call:
@@ -19645,6 +19645,9 @@ bool HqlCppTranslator::needsRealThor(IHqlExpression *expr, unsigned flags)
     case no_fetch:
         return needsRealThor(expr->queryChild(1), flags);
 
+    case no_inlinetable:
+        return expr->hasAttribute(distributedAtom) || expr->hasAttribute(localAtom) || !expr->queryChild(0)->isConstant();
+
     case no_output:
         {
             //Assume any output to files needs to stay where it is.

+ 105 - 0
ecl/regress/issue23133.ecl

@@ -0,0 +1,105 @@
+#option ('pickBestEngine', true);
+IMPORT Java;
+
+J(STRING x) := MODULE
+    EXPORT unsigned Accumulator(integer dummy, DATA s = d'') := EMBED(Java: PERSIST('workunit'), GLOBALSCOPE(x))
+    import java.io.*;
+    class Accumulator implements Serializable
+    {
+        public Accumulator(int dummy, byte[] s)
+        {
+            System.out.print("construct ");
+            System.out.print(dummy);
+            System.out.print(" ");
+            if (s.length!=0)
+            {
+                ByteArrayInputStream bis = new ByteArrayInputStream(s);
+                try
+                {
+                    ObjectInput in = new ObjectInputStream(bis);
+                    System.out.print("deserialize ");
+                    Accumulator other = (Accumulator) in.readObject();
+                    n = other.n;
+                }
+                catch (Exception ex)
+                {
+                }
+            }
+            System.out.println(n);
+        }
+        public synchronized int accumulate(int i)
+        {
+            System.out.print("accumulate ");
+            System.out.print(n);
+            System.out.print(" + ");
+            System.out.println(i);
+            n = n+i;
+            return n;
+        }
+        public int gather(byte [] other)
+        {
+            Accumulator o = new Accumulator(2, other);
+            System.out.print("gather ");
+            System.out.print(n);
+            System.out.print(" + ");
+            System.out.println(o.n);
+            n = n + o.n;
+            return n;
+        }
+        public byte[] serialize()
+        {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            try
+            {
+                ObjectOutput out = new ObjectOutputStream(bos);
+                out.writeObject(this);
+                out.flush();
+            }
+            catch (Exception e)
+            {
+            }
+            return bos.toByteArray();
+        }
+        private int n = 0;
+    }
+    ENDEMBED;
+
+    EXPORT integer accumulate(unsigned obj, integer i) := IMPORT(Java, 'accumulate');
+    EXPORT data serialize(unsigned obj) := IMPORT(Java, 'serialize');
+    EXPORT integer gather(unsigned obj1, data sobj2) := IMPORT(Java, 'gather');
+END;
+
+j1 := J('Accum');
+j2 := J('merge');
+
+r := record
+  integer i;
+end;
+
+o := RECORD
+  unsigned obj;
+  integer v;
+END;
+
+o accum(r l, o r) := TRANSFORM
+  SELF.obj := IF (r.obj=0,j1.Accumulator(nofold(r.obj), d''),r.obj);
+  SELF.v := j1.accumulate(SELF.obj, l.i);
+END;
+
+d1 := DISTRIBUTE(NOFOLD(DATASET([{1}, {2}, {3}], r)), i);
+
+accumulated := AGGREGATE(d1, o, accum(LEFT, RIGHT), LOCAL);
+
+sr := record
+  DATA obj;    // serialized object
+end;
+
+tomerge := DISTRIBUTE(PROJECT(NOFOLD(accumulated), TRANSFORM(sr, SELF.obj := j1.serialize(LEFT.obj))), 0);
+
+o domerge(sr l, o r) := TRANSFORM
+  SELF.obj := IF (r.obj=0,j2.Accumulator(nofold(r.obj), d''), r.obj);
+  SELF.v := j2.gather(SELF.obj, l.obj);
+END;
+
+merged := AGGREGATE(tomerge, o, domerge(LEFT, RIGHT), LOCAL);
+merged[1].v;