浏览代码

Merge pull request #5517 from richardkchapman/java-stream

HPCC-10460 Streamed dataset support for Java

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 11 年之前
父节点
当前提交
dea8746a6d

+ 4 - 6
common/thorhelper/thorxmlwrite.cpp

@@ -1121,10 +1121,6 @@ void CommonFieldProcessor::processQString(unsigned len, const char *value, const
     rtlQStrToStr(len, temp, len, value);
     processString(len, temp, field);
 }
-void CommonFieldProcessor::processSetAll(const RtlFieldInfo * field)
-{
-    result.append("ALL");
-}
 void CommonFieldProcessor::processUtf8(unsigned len, const char *value, const RtlFieldInfo * field)
 {   
     if (trim)
@@ -1132,12 +1128,14 @@ void CommonFieldProcessor::processUtf8(unsigned len, const char *value, const Rt
     outputXmlUtf8(len, value, NULL, result);
 }
 
-bool CommonFieldProcessor::processBeginSet(const RtlFieldInfo * field)
+bool CommonFieldProcessor::processBeginSet(const RtlFieldInfo * field, unsigned numElements, bool isAll, const byte *data)
 {
     result.append('[');
+    if (isAll)
+        result.append("ALL");
     return true;
 }
-bool CommonFieldProcessor::processBeginDataset(const RtlFieldInfo * field) 
+bool CommonFieldProcessor::processBeginDataset(const RtlFieldInfo * field, unsigned numRows)
 {
     result.append('[');
     return true;

+ 2 - 3
common/thorhelper/thorxmlwrite.hpp

@@ -248,11 +248,10 @@ public:
     virtual void processUDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo * field);
     virtual void processUnicode(unsigned len, const UChar *value, const RtlFieldInfo * field);
     virtual void processQString(unsigned len, const char *value, const RtlFieldInfo * field);
-    virtual void processSetAll(const RtlFieldInfo * field);
     virtual void processUtf8(unsigned len, const char *value, const RtlFieldInfo * field);
 
-    virtual bool processBeginSet(const RtlFieldInfo * field);
-    virtual bool processBeginDataset(const RtlFieldInfo * field); 
+    virtual bool processBeginSet(const RtlFieldInfo * field, unsigned numElements, bool isAll, const byte *data);
+    virtual bool processBeginDataset(const RtlFieldInfo * field, unsigned numRows);
     virtual bool processBeginRow(const RtlFieldInfo * field);
     virtual void processEndSet(const RtlFieldInfo * field);
     virtual void processEndDataset(const RtlFieldInfo * field);

二进制
initfiles/classes/JavaCat$NestedClass.class


二进制
initfiles/classes/JavaCat.class


二进制
initfiles/examples/embed/JavaCat$NestedClass.class


二进制
initfiles/examples/embed/JavaCat.class


+ 87 - 0
initfiles/examples/embed/JavaCat.java

@@ -1,3 +1,4 @@
+import java.util.*;
 public class JavaCat
 {
   public static int add1(int a)
@@ -79,4 +80,90 @@ public class JavaCat
     in[1] = t;
     return in;
   }
+
+  /**
+   * Small nested value type carrying one string member. It is the type of
+   * the n and sub members declared further down in the enclosing class.
+   */
+  public static class NestedClass
+  {
+    String ufield;
+    // Creates an instance whose ufield is the supplied string.
+    public NestedClass(String s)
+    {
+      ufield = s;
+    }
+    // No-argument constructor: ufield is left at its default value (null).
+    public NestedClass()
+    {
+    }
+  }
+
+  boolean bfield;
+  int ifield;
+  long lfield;
+  double dfield;
+  float ffield;
+  String sfield;
+  char cfield1;
+  String cfield2;
+  NestedClass n;
+  boolean bset[];
+  byte [] dset[];
+  String sset[];
+  NestedClass sub[];
+
+  /**
+   * Builds a fully populated instance from three seed values; every other
+   * member is filled with a fixed sample value.
+   *
+   * @param b stored in bfield and in element 3 of the five-element bset
+   * @param i stored in ifield; lfield is set to i * 100000000
+   * @param d stored in dfield and, narrowed to float, in ffield
+   */
+  public JavaCat(boolean b, int i, double d)
+  {
+    bfield = b;
+    ifield = i;
+    // The long literal forces a 64-bit multiplication. With an int literal the
+    // product is computed in 32 bits and wraps once |i| reaches 22.
+    lfield = i * 100000000L;
+    dfield = d;
+    ffield = (float) d;
+    sfield = "Yoohoo";
+    cfield1 = 'X';
+    cfield2 = "Z";
+    n = new NestedClass("nest");
+    bset = new boolean [5];
+    bset[3] = b;
+    dset = new byte[1][];
+    dset[0] = new byte[1];
+    dset[0][0] = 14;
+    sset = new String[1];
+    sset[0] = "Hello";
+    sub = new NestedClass[1];
+    sub[0] = new NestedClass("subnest");
+  }
+
+  /**
+   * No-argument constructor. Only n is assigned (a NestedClass built from
+   * "nest2"); all other members keep their Java default values.
+   */
+  public JavaCat()
+  {
+    n = new NestedClass("nest2");
+  }
+
+  /**
+   * Returns a new JavaCat created by the three-argument constructor from the
+   * values given.
+   */
+  public static JavaCat returnrec(boolean b, int i, double d)
+  {
+    return new JavaCat(b,i,d);
+  }
+
+  /**
+   * Returns the ufield string of the nested n member of the record passed in.
+   */
+  public static String passrec(JavaCat j)
+  {
+    return j.n.ufield;
+  }
+
+  /**
+   * Produces a new record from an existing one: bfield and dfield are taken
+   * from the input record, while lim takes the place of the integer seed.
+   */
+  public static JavaCat transform(JavaCat in, int lim)
+  {
+    return new JavaCat(in.bfield, lim, in.dfield);
+  }
+
+  /**
+   * Consumes every row the iterator supplies, writing each row's lfield on
+   * its own line of standard output, and returns the total of those values
+   * as an int.
+   */
+  public static int passDataset(Iterator<JavaCat> d)
+  {
+    int total = 0;
+    for (; d.hasNext(); )
+    {
+      JavaCat row = d.next();
+      System.out.println(row.lfield);
+      // lfield is a long; the running total is truncated back to int each step.
+      total = (int) (total + row.lfield);
+    }
+    return total;
+  }
+
+  /**
+   * Returns an iterator that walks the supplied array from first element to
+   * last, via a list view of the array.
+   */
+  public static Iterator<JavaCat> passDataset2(JavaCat d[])
+  {
+    return Arrays.asList(d).iterator();
+  }
 }

+ 4 - 0
plugins/javaembed/CMakeLists.txt

@@ -37,8 +37,10 @@ if (USE_JNI)
              ./../../system/include
              ./../../rtl/eclrtl
              ./../../rtl/include
+             ./../../rtl/nbcd
              ./../../common/deftype
              ./../../system/jlib
+             ./../../roxie/roxiemem
              ${CMAKE_BINARY_DIR}
              ${CMAKE_BINARY_DIR}/oss
         )
@@ -53,6 +55,7 @@ if (USE_JNI)
     endif()
 
     install ( TARGETS javaembed DESTINATION plugins )
+    install ( FILES ${CMAKE_CURRENT_SOURCE_DIR}/HpccUtils.class DESTINATION classes/com/HPCCSystems COMPONENT Runtime)
 
     # We link against jsig so that signals are chained from the jvm
 
@@ -69,6 +72,7 @@ if (USE_JNI)
     #    ${JSIG_LIBRARY}
         ${JAVA_JVM_LIBRARY}
         eclrtl
+        roxiemem
         jlib
         )
   endif()

二进制
plugins/javaembed/HpccUtils.class


+ 43 - 0
plugins/javaembed/HpccUtils.java

@@ -0,0 +1,43 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2014 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+package com.HPCCSystems;
+
+import java.util.*;
+/**
+ * Iterator whose hasNext() and next() are answered by native code. Each call
+ * forwards the handle given at construction to the matching native method.
+ */
+public class HpccUtils  implements Iterator
+{
+    // Opaque value passed unchanged to the native methods on every call.
+    private long handle;
+    private native static boolean _hasNext(long handle);
+    private native static java.lang.Object _next(long handle);
+
+    /**
+     * @param _handle value forwarded to the native methods
+     * @param dllname absolute path of the native library to load; it is
+     *                expected to provide the native methods of this class
+     */
+    public HpccUtils(long _handle, String dllname)
+    {
+        System.load(dllname);
+        handle = _handle;
+    }
+    /**
+     * Removal is not supported. Always throws UnsupportedOperationException,
+     * which is what the Iterator contract specifies for iterators that
+     * cannot remove elements.
+     */
+    public void remove()
+    {
+        throw new UnsupportedOperationException("remove");
+    }
+    /** Returns whatever the native _hasNext reports for this handle. */
+    public boolean hasNext()
+    {
+        return _hasNext(handle);
+    }
+    /** Returns the object produced by the native _next for this handle. */
+    public java.lang.Object next()
+    {
+        return _next(handle);
+    }
+}

文件差异内容过多而无法显示
+ 1126 - 18
plugins/javaembed/javaembed.cpp


+ 2 - 6
plugins/mysql/mysqlembed.cpp

@@ -667,10 +667,6 @@ public:
         rtlQStrToStrX(charCount, text.refstr(), len, value);
         processString(charCount, text.getstr(), field);
     }
-    virtual void processSetAll(const RtlFieldInfo * field)
-    {
-        UNSUPPORTED("ALL sets");
-    }
     virtual void processUtf8(unsigned len, const char *value, const RtlFieldInfo * field)
     {
         size32_t utf8chars;
@@ -682,11 +678,11 @@ public:
         bindInfo.length = &bindInfo.buffer_length;
     }
 
-    virtual bool processBeginSet(const RtlFieldInfo * field)
+    virtual bool processBeginSet(const RtlFieldInfo * field, unsigned numElements, bool isAll, const byte *data)
     {
         UNSUPPORTED("SET fields");
     }
-    virtual bool processBeginDataset(const RtlFieldInfo * field)
+    virtual bool processBeginDataset(const RtlFieldInfo * field, unsigned numRows)
     {
         UNSUPPORTED("Nested datasets");
     }

+ 4 - 6
plugins/pyembed/pyembed.cpp

@@ -883,10 +883,6 @@ public:
         rtlQStrToStrX(charCount, text.refstr(), len, value);
         processString(charCount, text.getstr(), field);
     }
-    virtual void processSetAll(const RtlFieldInfo * field)
-    {
-        rtlFail(0, "pyembed: ALL sets are not supported");
-    }
     virtual void processUtf8(unsigned len, const char *value, const RtlFieldInfo * field)
     {
         size32_t sizeBytes = rtlUtf8Size(len, value);
@@ -895,12 +891,14 @@ public:
         addArg(vval);
     }
 
-    virtual bool processBeginSet(const RtlFieldInfo * field)
+    virtual bool processBeginSet(const RtlFieldInfo * field, unsigned numElements, bool isAll, const byte *data)
     {
         push();
+        if (isAll)
+            rtlFail(0, "pyembed: ALL sets are not supported");
         return true;
     }
-    virtual bool processBeginDataset(const RtlFieldInfo * field)
+    virtual bool processBeginDataset(const RtlFieldInfo * field, unsigned numRows)
     {
         push();
         return true;

+ 5 - 6
plugins/v8embed/v8embed.cpp

@@ -289,23 +289,22 @@ public:
         rtlQStrToStrX(charCount, text.refstr(), len, value);
         processString(charCount, text.getstr(), field);
     }
-    virtual void processSetAll(const RtlFieldInfo * field)
-    {
-        rtlFail(0, "v8embed: ALL sets are not supported");
-    }
     virtual void processUtf8(unsigned len, const char *value, const RtlFieldInfo * field)
     {
         addProp(field, v8::String::New(value, rtlUtf8Size(len, value)));
     }
 
-    virtual bool processBeginSet(const RtlFieldInfo * field)
+    virtual bool processBeginSet(const RtlFieldInfo * field, unsigned numElements, bool isAll, const byte *data)
     {
         push();
         inDataset = true;
+        if (isAll)
+            rtlFail(0, "v8embed: ALL sets are not supported");
+
         obj = v8::Array::New();
         return true;
     }
-    virtual bool processBeginDataset(const RtlFieldInfo * field)
+    virtual bool processBeginDataset(const RtlFieldInfo * field, unsigned numRows)
     {
         push();
         inDataset = true;

+ 58 - 11
rtl/eclrtl/rtlfield.cpp

@@ -56,6 +56,30 @@ static void queryNestedOuterXPath(StringAttr & ret, const RtlFieldInfo * field)
     ret.set(xpath, (size32_t)(sep-xpath));
 }
 
+//-------------------------------------------------------------------------------------------------------------------
+
+// IFieldProcessor implementation whose value callbacks all do nothing. The
+// process() functions later in this file pass it to child->process() so that
+// set elements or dataset rows can be walked and counted before the real
+// target processor is called.
+class DummyFieldProcessor : public CInterfaceOf<IFieldProcessor>
+{
+public:
+    // Scalar callbacks: values are ignored.
+    virtual void processString(unsigned len, const char *value, const RtlFieldInfo * field) {}
+    virtual void processBool(bool value, const RtlFieldInfo * field) {}
+    virtual void processData(unsigned len, const void *value, const RtlFieldInfo * field) {}
+    virtual void processInt(__int64 value, const RtlFieldInfo * field) {}
+    virtual void processUInt(unsigned __int64 value, const RtlFieldInfo * field) {}
+    virtual void processReal(double value, const RtlFieldInfo * field) {}
+    virtual void processDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo * field) {}
+    virtual void processUDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo * field) {}
+    virtual void processUnicode(unsigned len, const UChar *value, const RtlFieldInfo * field) {}
+    virtual void processQString(unsigned len, const char *value, const RtlFieldInfo * field) {}
+    virtual void processUtf8(unsigned len, const char *value, const RtlFieldInfo * field) {}
+
+    // Structural callbacks. Callers in this file visit the contained
+    // elements/rows only when the begin call returns true, so a set is not
+    // descended into (false) whereas datasets and rows are (true).
+    virtual bool processBeginSet(const RtlFieldInfo * field, unsigned numElements, bool isAll, const byte *data) { return false; }
+    virtual bool processBeginDataset(const RtlFieldInfo * field, unsigned numRows) { return true; }
+    virtual bool processBeginRow(const RtlFieldInfo * field) { return true; }
+    virtual void processEndSet(const RtlFieldInfo * field) {}
+    virtual void processEndDataset(const RtlFieldInfo * field) {}
+    virtual void processEndRow(const RtlFieldInfo * field) {}
+};
 
 //-------------------------------------------------------------------------------------------------------------------
 
@@ -998,19 +1022,34 @@ size32_t RtlSetTypeInfo::process(const byte * self, const byte * selfrow, const
 {
     unsigned offset = sizeof(bool) + sizeof(size32_t);
     unsigned max = offset + rtlReadUInt4(self + sizeof(bool));
-    if (target.processBeginSet(field))
+    unsigned elements = 0;
+    if (!*(bool *)self)
     {
-        if (*(bool *)self)
-            target.processSetAll(field);
+        unsigned tempOffset = sizeof(bool) + sizeof(size32_t);
+        if (child->isFixedSize())
+        {
+            unsigned elemSize = child->size(NULL, NULL);
+            elements = (max-offset) / elemSize;
+            assert(elements*elemSize == max-offset);
+        }
         else
         {
-            while (offset < max)
+            DummyFieldProcessor dummy;
+            while (tempOffset < max)
             {
-                offset += child->process(self+offset, selfrow, field, target);
+                tempOffset += child->process(self+tempOffset, selfrow, field, dummy);  // NOTE - good thing we can't have a set of sets, or this would recurse
+                elements++;
             }
         }
-        target.processEndSet(field);
     }
+    if (target.processBeginSet(field, elements, *(bool *)self, self+offset))
+    {
+        while (offset < max)
+        {
+            offset += child->process(self+offset, selfrow, field, target);
+        }
+    }
+    target.processEndSet(field);
     return max;
 }
 
@@ -1132,9 +1171,9 @@ size32_t RtlDatasetTypeInfo::process(const byte * self, const byte * selfrow, co
 {
     if (isLinkCounted())
     {
-        if (target.processBeginDataset(field))
+        size32_t thisCount = rtlReadUInt4(self);
+        if (target.processBeginDataset(field, thisCount))
         {
-            size32_t thisCount = rtlReadUInt4(self);
             const byte * * rows = *reinterpret_cast<const byte * * const *>(self + sizeof(size32_t));
             for (unsigned i= 0; i < thisCount; i++)
             {
@@ -1149,7 +1188,15 @@ size32_t RtlDatasetTypeInfo::process(const byte * self, const byte * selfrow, co
     {
         unsigned offset = sizeof(size32_t);
         unsigned max = offset + rtlReadUInt4(self);
-        if (target.processBeginDataset(field))
+        unsigned thisCount = 0;
+        DummyFieldProcessor dummy;
+        while (offset < max)
+        {
+            offset += child->process(self+offset, self+offset, field, dummy);
+            thisCount++;
+        }
+        offset = sizeof(size32_t);
+        if (target.processBeginDataset(field, thisCount))
         {
             while (offset < max)
             {
@@ -1246,9 +1293,9 @@ size32_t RtlDictionaryTypeInfo::process(const byte * self, const byte * selfrow,
 {
     if (isLinkCounted())
     {
-        if (target.processBeginDataset(field))
+        size32_t thisCount = rtlReadUInt4(self);
+        if (target.processBeginDataset(field, thisCount))
         {
-            size32_t thisCount = rtlReadUInt4(self);
             const byte * * rows = *reinterpret_cast<const byte * * const *>(self + sizeof(size32_t));
             for (unsigned i= 0; i < thisCount; i++)
             {

+ 2 - 3
rtl/include/eclhelper.hpp

@@ -194,13 +194,12 @@ public:
     virtual void processUDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo * field) = 0;
     virtual void processUnicode(unsigned len, const UChar *value, const RtlFieldInfo * field) = 0;
     virtual void processQString(unsigned len, const char *value, const RtlFieldInfo * field) = 0;
-    virtual void processSetAll(const RtlFieldInfo * field) = 0;
     virtual void processUtf8(unsigned len, const char *value, const RtlFieldInfo * field) = 0;
     inline  void processCString(const char *value, const RtlFieldInfo * field) { processString((size32_t)strlen(value), value, field); }
 
 //The following are used process the structured fields
-    virtual bool processBeginSet(const RtlFieldInfo * field) = 0;
-    virtual bool processBeginDataset(const RtlFieldInfo * field) = 0;
+    virtual bool processBeginSet(const RtlFieldInfo * field, unsigned elements, bool isAll, const byte *data) = 0;
+    virtual bool processBeginDataset(const RtlFieldInfo * field, unsigned rows) = 0;
     virtual bool processBeginRow(const RtlFieldInfo * field) = 0;           // either in a dataset, or nested
     virtual void processEndSet(const RtlFieldInfo * field) = 0;
     virtual void processEndDataset(const RtlFieldInfo * field) = 0;

+ 43 - 0
testing/regress/ecl/javaimport.ecl

@@ -9,6 +9,30 @@ real jfadd(real4 a, real4 b) := IMPORT(java, 'JavaCat.fadd:(FF)F');
 real jdadd(real a, real b) := IMPORT(java, 'JavaCat.dadd:(DD)D');
 real jdaddD(real a, real b) := IMPORT(java, 'JavaCat.daddD:(DD)Ljava/lang/Double;');
 
+nrec := record
+  utf8 ufield;
+end;
+
+jret := RECORD
+  boolean bfield;
+  integer4 ifield;
+  integer8 lfield;
+  real8 dfield;
+  real4 ffield;
+  string1 cfield1;
+  string1 cfield2;
+  string sfield;
+  nrec n;
+  set of boolean bset;
+  set of data dset;
+  set of string sset;
+  LINKCOUNTED DATASET(nrec) sub;
+end;
+
+/*
+jret jreturnrec(boolean b, integer i, real8 d) := IMPORT(java, 'JavaCat.returnrec:(ZID)LJavaCat;');
+STRING jpassrec(jret r) := IMPORT(java, 'JavaCat.passrec:(LJavaCat;)Ljava/lang/String;');
+
 jcat('Hello ', 'world!');
 jadd(1,2);
 jaddl(3,4);
@@ -17,3 +41,22 @@ jaddi(5,6);
 jfadd(1,2);
 jdadd(3,4);
 jdaddD(5,6);
+ret := jreturnrec(false, 10, 2.345);
+ret;
+jpassrec(ret);
+*/
+
+DATASET(jret) passDataset2(LINKCOUNTED DATASET(jret) d) := IMPORT(java, 'JavaCat.passDataset2:([LJavaCat;)Ljava/util/Iterator;');
+
+ds := DATASET(
+  [
+     {true, 1,2,3,4,'a', 'b', 'cd', u'ef', [true,false], [], ['Hello from ECL'], [{'1'},{'2'},{'3'},{'4'},{'5'}]}
+    ,{true, 2,4,3,4,'a', 'b', 'cd', u'ef', [true,false], [], [], []}
+    ,{true, 3,6,3,4,'a', 'b', 'cd', u'ef', [true,false], [], [], []}
+    ,{true, 8,8,3,4,'a', 'b', 'cd', u'ef', [true,false], [d'AA55'], [], []}
+  ], jret);
+
+transform(jret) testTransform(jret in, integer lim) := IMPORT(java, 'JavaCat.transform:(LJavaCat;I)LJavaCat;');
+
+output(passDataset2(ds));
+output(project(ds, testTransform(LEFT, COUNTER)));