瀏覽代碼

HPCC-22148 Add ActivityContext support for embedded Java code

Note - we can't use activity flag with threadlocal attribute. We COULD think
about changing how/where activity is passed to address that, but unless/until
we do, we suppress threadlocal usage if activity specified.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 6 年之前
父節點
當前提交
5845bb6083

+ 3 - 2
ecl/hqlcpp/hqlcpp.cpp

@@ -12068,8 +12068,9 @@ void HqlCppTranslator::buildScriptFunctionDefinition(BuildCtx &ctx, IHqlExpressi
     else
         embedOptions.setown(getEmbedOptionString(bodyCode));
     IValue *optionVal = embedOptions->queryValue();
+    bool isActivity = functionBodyIsActivity(bodyCode);
 
-    bool threadlocal = bodyCode->hasAttribute(_threadlocal_Atom) && queryVal != nullptr && optionVal != nullptr;
+    bool threadlocal = bodyCode->hasAttribute(_threadlocal_Atom) && queryVal != nullptr && optionVal != nullptr && !isActivity;
     bool singletonEmbedContext = bodyCode->hasAttribute(_singletonEmbedContext_Atom);
     HqlExprArray noargs;
     OwnedHqlExpr getPlugin = bindFunctionCall(language, noargs);
@@ -12093,7 +12094,7 @@ void HqlCppTranslator::buildScriptFunctionDefinition(BuildCtx &ctx, IHqlExpressi
     else
         createParam.append("Owned<IEmbedFunctionContext> __ctx = __plugin->createFunctionContextEx(ctx,");
 
-    if (functionBodyIsActivity(bodyCode))
+    if (isActivity)
         createParam.append("activity,");
     else
         createParam.append("nullptr,");

+ 32 - 0
plugins/javaembed/ActivityContext.java

@@ -0,0 +1,32 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems(R).
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+package com.HPCCSystems;
+
+/**
+ * This interface may be passed as the first parameter to an embedded function specified with the activity attribute.
+ * It allows the activity to determine if it is being executed in a child query, is stranded and other useful information.
+*/
+
+public interface ActivityContext
+{
+    public boolean isLocal();
+    public int numSlaves();
+    public int numStrands();
+    public int querySlave(); // 0 based 0..numSlaves-1
+    public int queryStrand(); // 0 based 0..numStrands-1
+}

+ 6 - 2
plugins/javaembed/CMakeLists.txt

@@ -50,8 +50,12 @@ if(JAVAEMBED)
             ${CMAKE_BINARY_DIR}
             ${CMAKE_BINARY_DIR}/oss)
 
-        set(java_sources HpccClassLoader.java HpccUtils.java)
-        set(java_classes ${CMAKE_CURRENT_BINARY_DIR}/com/HPCCSystems/HpccClassLoader.class ${CMAKE_CURRENT_BINARY_DIR}/com/HPCCSystems/HpccUtils.class)
+        set(java_sources HpccClassLoader.java
+                         HpccUtils.java
+                         ActivityContext.java)
+        set(java_classes ${CMAKE_CURRENT_BINARY_DIR}/com/HPCCSystems/HpccClassLoader.class
+                         ${CMAKE_CURRENT_BINARY_DIR}/com/HPCCSystems/HpccUtils.class
+                         ${CMAKE_CURRENT_BINARY_DIR}/com/HPCCSystems/ActivityContext.class)
         add_custom_command ( OUTPUT ${java_classes}
             COMMAND ${Java_JAVAC_EXECUTABLE} ${java_sources} -d ${CMAKE_CURRENT_BINARY_DIR} -source 1.8 -target 1.8
             DEPENDS ${java_sources}

+ 14 - 1
plugins/javaembed/HpccUtils.java

@@ -18,11 +18,17 @@
 package com.HPCCSystems;
 
 import java.util.*;
-public class HpccUtils  implements Iterator
+public class HpccUtils  implements Iterator, ActivityContext
 {
     private long handle;
     private native static boolean _hasNext(long handle);
     private native static java.lang.Object _next(long handle);
+    private native static boolean _isLocal(long handle);
+    private native static int _numSlaves(long handle);
+    private native static int _numStrands(long handle);
+    private native static int _querySlave(long handle);
+    private native static int _queryStrand(long handle);
+    
 
     public HpccUtils(long _handle, String dllname)
     {
@@ -43,4 +49,11 @@ public class HpccUtils  implements Iterator
            throw new NoSuchElementException();
         return ret;
     }
+    
+    public boolean isLocal() { return _isLocal(handle); }
+    public int numSlaves() { return _numSlaves(handle); }
+    public int numStrands() { return _numStrands(handle); }
+    public int querySlave() { return _querySlave(handle); }
+    public int queryStrand() { return _queryStrand(handle); }
+    
 }

+ 75 - 16
plugins/javaembed/javaembed.cpp

@@ -652,8 +652,8 @@ static jclass customLoaderClass;
 static jmethodID clc_newInstance;
 static jmethodID clc_getSignature;
 static jclass hpccIteratorClass;
-static jclass utilIteratorClass;
 static jmethodID hi_constructor;
+static jclass utilIteratorClass;
 
 static jclass systemClass;
 static jmethodID system_gc;
@@ -808,6 +808,21 @@ public:
 // EnableSEHtoExceptionMapping
 //
 
+static StringBuffer &appendClassPath(StringBuffer &classPath)
+{
+    const IProperties &conf = queryEnvironmentConf();
+    if (conf.hasProp("classpath"))
+    {
+        conf.getProp("classpath", classPath);
+        classPath.append(ENVSEPCHAR);
+    }
+    else
+    {
+        classPath.append(INSTALL_DIR).append(PATHSEPCHAR).append("classes").append(ENVSEPCHAR);
+    }
+    return classPath;
+}
+
 static class JavaGlobalState
 {
 public:
@@ -823,19 +838,11 @@ public:
         {
             newPath.append(origPath).append(ENVSEPCHAR);
         }
-        const IProperties &conf = queryEnvironmentConf();
-        if (conf.hasProp("classpath"))
-        {
-            conf.getProp("classpath", newPath);
-            newPath.append(ENVSEPCHAR);
-        }
-        else
-        {
-            newPath.append(INSTALL_DIR).append(PATHSEPCHAR).append("classes").append(ENVSEPCHAR);
-        }
+        appendClassPath(newPath);
         newPath.append(".");
         optionStrings.append(newPath);
 
+        const IProperties &conf = queryEnvironmentConf();
         if (conf.hasProp("jvmlibpath"))
         {
             StringBuffer libPath;
@@ -3069,8 +3076,8 @@ private:
 class JavaEmbedImportContext : public CInterfaceOf<IEmbedFunctionContext>
 {
 public:
-    JavaEmbedImportContext(ICodeContext *codeCtx, JavaThreadContext *_sharedCtx, jobject _instance, unsigned flags, const char *options)
-    : sharedCtx(_sharedCtx), JNIenv(sharedCtx->JNIenv), instance(_instance)
+    JavaEmbedImportContext(ICodeContext *codeCtx, JavaThreadContext *_sharedCtx, jobject _instance, unsigned flags, const char *options, const IThorActivityContext *_activityContext)
+    : sharedCtx(_sharedCtx), JNIenv(sharedCtx->JNIenv), instance(_instance), activityContext(_activityContext)
     {
         argcount = 0;
         argsig = NULL;
@@ -3977,6 +3984,18 @@ public:
         if (javaClass)
             reinit();
     }
+    void bindActivityParam()
+    {
+        // Note: We don't require that the function takes an activityCtx parameter - if they don't care, they can omit the param
+        if (strncmp(argsig, "Lcom/HPCCSystems/ActivityContext;", 33) == 0)
+        {
+            argsig += 33;
+            jvalue v;
+            v.l = JNIenv->NewObject(hpccIteratorClass, hi_constructor, activityContext, JNIenv->NewStringUTF(helperLibraryName));
+            addArg(v);
+        }
+
+    }
 
     IException *translateException(IException *E)
     {
@@ -4434,6 +4453,7 @@ protected:
     IArrayOf<ECLDatasetIterator> iterators;   // to make sure they get freed
     bool nonStatic = false;
     jobject instance = nullptr; // class instance of object to call methods on
+    const IThorActivityContext *activityContext = nullptr;
 
     unsigned nodeNum = 0;
     StringAttr globalScopeKey;
@@ -4460,6 +4480,8 @@ protected:
         argsig = signature;
         assertex(*argsig == '(');
         argsig++;
+        if (activityContext)
+            bindActivityParam();
     }
 };
 
@@ -4539,7 +4561,7 @@ public:
     {
         if (!object)
             return NULL;
-        Owned<JavaEmbedImportContext> fctx = new JavaEmbedImportContext(nullptr, queryContext(), object, 0, options);
+        Owned<JavaEmbedImportContext> fctx = new JavaEmbedImportContext(nullptr, queryContext(), object, 0, options, nullptr);
         fctx->importFunction(rtlUtf8Length(strlen(function), function), function);
         return fctx.getClear();
     }
@@ -4562,7 +4584,7 @@ public:
     }
     virtual IEmbedFunctionContext *createFunctionContextEx(ICodeContext * ctx, const IThorActivityContext *activityCtx, unsigned flags, const char *options) override
     {
-        return new JavaEmbedImportContext(ctx, queryContext(), nullptr, flags, options);
+        return new JavaEmbedImportContext(ctx, queryContext(), nullptr, flags, options, activityCtx);
     }
     virtual IEmbedServiceContext *createServiceContext(const char *service, unsigned flags, const char *options) override
     {
@@ -4759,7 +4781,11 @@ void doPrecompile(size32_t & __lenResult, void * & __result, const char *funcNam
 
     MemoryBuffer result;
     Owned<IPipeProcess> pipe = createPipeProcess();
-    VStringBuffer javac("javac %s %s", isEmptyString(compilerOptions) ? "-g:none" : compilerOptions, javafile.str());
+    StringBuffer options(compilerOptions);
+    if (isEmptyString(compilerOptions))
+        options.append("-g:none");
+    appendClassPath(options.append(" -cp "));
+    VStringBuffer javac("javac %s %s", options.str(), javafile.str());
     if (!pipe->run("javac", javac, tmpDirName, false, false, true, 0, false))
     {
         throw makeStringException(0, "Failed to run javac");
@@ -4862,6 +4888,12 @@ extern "C" {
 JNIEXPORT jboolean JNICALL Java_com_HPCCSystems_HpccUtils__1hasNext (JNIEnv *, jclass, jlong);
 JNIEXPORT jobject JNICALL Java_com_HPCCSystems_HpccUtils__1next (JNIEnv *, jclass, jlong);
 JNIEXPORT jclass JNICALL Java_com_HPCCSystems_HpccClassLoader_defineClassForEmbed(JNIEnv *env, jobject loader, jint bytecodeLen, jlong bytecode, jstring name);
+
+JNIEXPORT jboolean JNICALL Java_com_HPCCSystems_HpccUtils__1isLocal (JNIEnv *, jclass, jlong);
+JNIEXPORT jint JNICALL Java_com_HPCCSystems_HpccUtils__1numSlaves (JNIEnv *, jclass, jlong);
+JNIEXPORT jint JNICALL Java_com_HPCCSystems_HpccUtils__1numStrands (JNIEnv *, jclass, jlong);
+JNIEXPORT jint JNICALL Java_com_HPCCSystems_HpccUtils__1querySlave (JNIEnv *, jclass, jlong);
+JNIEXPORT jint JNICALL Java_com_HPCCSystems_HpccUtils__1queryStrand (JNIEnv *, jclass, jlong);
 }
 
 JNIEXPORT jboolean JNICALL Java_com_HPCCSystems_HpccUtils__1hasNext (JNIEnv *JNIenv, jclass, jlong proxy)
@@ -4929,6 +4961,33 @@ JNIEXPORT jclass JNICALL Java_com_HPCCSystems_HpccClassLoader_defineClassForEmbe
 
 }
 
+JNIEXPORT jboolean JNICALL Java_com_HPCCSystems_HpccUtils__1isLocal(JNIEnv *JNIenv, jclass, jlong proxy)
+{
+    const IThorActivityContext *a = (IThorActivityContext *) proxy;
+    return a->isLocal();
+}
+
+JNIEXPORT jint JNICALL Java_com_HPCCSystems_HpccUtils__1numSlaves(JNIEnv *JNIenv, jclass, jlong proxy)
+{
+    const IThorActivityContext *a = (IThorActivityContext *) proxy;
+    return a->numSlaves();
+}
+JNIEXPORT jint JNICALL Java_com_HPCCSystems_HpccUtils__1numStrands(JNIEnv *JNIenv, jclass, jlong proxy)
+{
+    const IThorActivityContext *a = (IThorActivityContext *) proxy;
+    return a->numStrands();
+}
+JNIEXPORT jint JNICALL Java_com_HPCCSystems_HpccUtils__1querySlave(JNIEnv *JNIenv, jclass, jlong proxy)
+{
+    const IThorActivityContext *a = (IThorActivityContext *) proxy;
+    return a->querySlave();
+}
+JNIEXPORT jint JNICALL Java_com_HPCCSystems_HpccUtils__1queryStrand(JNIEnv *JNIenv, jclass, jlong proxy)
+{
+    const IThorActivityContext *a = (IThorActivityContext *) proxy;
+    return a->queryStrand();
+}
+
 // Used for dynamically loading in ESDL
 
 extern "C" DECL_EXPORT IEmbedContext *getEmbedContextDynamic()

+ 108 - 0
testing/regress/ecl/java-activity.ecl

@@ -0,0 +1,108 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+//class=embedded
+//class=3rdparty
+
+IMPORT Java, Std;
+ 
+kString := RECORD
+    INTEGER4 id;
+    STRING text;
+END;
+ 
+// First test that the activity context parameter can be used
+
+DATASET(kString) withParam(STREAMED DATASET(kString) inp, integer4 slaveNo, integer4 numSlaves) := EMBED(Java: activity)
+import java.util.*;
+import com.HPCCSystems.ActivityContext;
+public class myClass {
+
+  public static class kString {
+    String text;
+    int id;
+    public kString() {}
+    public kString(String _text, int _id)
+    {
+      text = _text;
+      id = _id;
+    }
+  };
+
+  public static Iterator<kString> withParam(ActivityContext ctx, Iterator<kString> in, int slaveNo, int numSlaves)
+  {
+    List<kString> list = new ArrayList<>();
+    while (in.hasNext())
+    {
+        list.add(in.next());
+    }
+    if (slaveNo==0)
+    {
+        list.add(new kString("withParam", slaveNo));
+        list.add(new kString("ctx.numSlaves-numslaves", ctx.numSlaves()- numSlaves));  // SHould be 0
+        list.add(new kString("ctx.numStrands", ctx.numStrands()));
+        list.add(new kString("ctx.querySlave", ctx.querySlave()));
+        list.add(new kString("ctx.queryStrand", ctx.queryStrand()));
+        list.add(new kString("ctx.isLocal", ctx.isLocal() ? 1 : 0));
+    }
+    else if (ctx.querySlave() != slaveNo)
+        list.add(new kString("Unexpected ctx.querySlave value", ctx.querySlave()));
+    
+    return list.iterator();
+  }
+}
+ENDEMBED;
+
+// Also test that you don't need to include it if you don't want to
+ 
+DATASET(kString) withoutParam(STREAMED DATASET(kString) inp, integer4 slaveNo, integer4 numSlaves) := EMBED(Java: activity)
+import java.util.*;
+public class myClass {
+
+  public static class kString {
+    String text;
+    int id;
+    public kString() {}
+    public kString(String _text, int _id)
+    {
+      text = _text;
+      id = _id;
+    }
+  };
+
+  public static Iterator<kString> withoutParam(kString in[], int slaveNo, int numSlaves)
+  {
+    List<kString> list = new ArrayList<>();
+    for (kString inrec : in)
+    {
+        list.add(inrec);
+    }
+    if (slaveNo==0)
+        list.add(new kString("withoutParam", slaveNo));
+    return list.iterator();
+  }
+}
+ENDEMBED;
+
+testInput := DISTRIBUTE(DATASET([{88, 'Input'}], kString), 0);    // Data on node 1 only
+
+o0 := output(testInput);
+o1 := output(withParam(testInput, Std.System.thorlib.node(), Std.System.thorlib.nodes()));
+o2 := output(withoutParam(testInput, Std.System.thorlib.node(), Std.System.thorlib.nodes()));
+o3 := output(withParam(testInput, Std.System.thorlib.node(), Std.System.thorlib.nodes()));
+ 
+SEQUENTIAL(o0, o1, o2, o3);

+ 25 - 0
testing/regress/ecl/key/java-activity.xml

@@ -0,0 +1,25 @@
+<Dataset name='Result 1'>
+ <Row><id>88</id><text>Input</text></Row>
+</Dataset>
+<Dataset name='Result 2'>
+ <Row><id>88</id><text>Input</text></Row>
+ <Row><id>0</id><text>withParam</text></Row>
+ <Row><id>0</id><text>ctx.numSlaves-numslaves</text></Row>
+ <Row><id>1</id><text>ctx.numStrands</text></Row>
+ <Row><id>0</id><text>ctx.querySlave</text></Row>
+ <Row><id>0</id><text>ctx.queryStrand</text></Row>
+ <Row><id>0</id><text>ctx.isLocal</text></Row>
+</Dataset>
+<Dataset name='Result 3'>
+ <Row><id>88</id><text>Input</text></Row>
+ <Row><id>0</id><text>withoutParam</text></Row>
+</Dataset>
+<Dataset name='Result 4'>
+ <Row><id>88</id><text>Input</text></Row>
+ <Row><id>0</id><text>withParam</text></Row>
+ <Row><id>0</id><text>ctx.numSlaves-numslaves</text></Row>
+ <Row><id>1</id><text>ctx.numStrands</text></Row>
+ <Row><id>0</id><text>ctx.querySlave</text></Row>
+ <Row><id>0</id><text>ctx.queryStrand</text></Row>
+ <Row><id>0</id><text>ctx.isLocal</text></Row>
+</Dataset>