Преглед изворни кода

HPCC-21671 Embedded Java persist semantics tweaks

Also allow signature to be optional on all java imports, not just when object
supplied.

Add new persist mode 'Channel'.

Also fixed issues that would have caused cores with Java services

Add check for valid objects when using "constructor" model.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman пре 6 година
родитељ
комит
228cc77cc2

+ 6 - 2
plugins/javaembed/HpccClassLoader.java

@@ -22,6 +22,7 @@ import java.util.Hashtable;
 import java.util.List;
 import java.util.ArrayList;
 import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
 import java.lang.Throwable;
 
 public class HpccClassLoader extends java.lang.ClassLoader
@@ -90,7 +91,10 @@ public class HpccClassLoader extends java.lang.ClassLoader
     
     public static String getSignature(Method m)
     {
-        StringBuilder sb = new StringBuilder("(");
+        StringBuilder sb = new StringBuilder();
+        if ((m.getModifiers() & Modifier.STATIC) == 0)
+            sb.append('@');
+        sb.append('(');
         for(Class<?> c : m.getParameterTypes())
         { 
             String sig=java.lang.reflect.Array.newInstance(c, 0).toString();
@@ -114,7 +118,7 @@ public class HpccClassLoader extends java.lang.ClassLoader
         Method[] methods = clazz.getMethods();
         for (Method m : methods)
         {
-            if (m.getName().equals(simpleName))
+            if ((m.getModifiers() & Modifier.PUBLIC) != 0 && m.getName().equals(simpleName))
             {
                 if (ret == null)
                     ret = getSignature(m);

+ 112 - 111
plugins/javaembed/javaembed.cpp

@@ -49,6 +49,7 @@ static const char *version = "Java Embed Helper 1.0.0";
 //#define TRACE_GLOBALREF
 //#define TRACE_CLASSFILE
 //#define CHECK_JNI
+//#define FORCE_GC
 /* Note - if you enable CHECK_JNI and see output like:
  *   WARNING in native method: JNI call made without checking exceptions when required to from CallObjectMethodV
  * where for 'from' may be any of several functions, then the cause is likely to be a missing call to checkException()
@@ -552,13 +553,14 @@ public:
     using JNIEnv::DeleteLocalRef;
     using JNIEnv::ExceptionClear;
     using JNIEnv::ExceptionCheck;
+    using JNIEnv::GetObjectRefType;
 
 #ifdef TRACE_GLOBALREF
     void DeleteGlobalRef(jobject val)
     {
         DBGLOG("DeleteGlobalRef %p", val);
         JNIEnv::DeleteGlobalRef(val);
-#ifdef CHECK_JNI
+#ifdef FORCE_GC
         forceGC(this);
 #endif
     }
@@ -934,7 +936,7 @@ public:
         if (p && p->instance)
         {
             queryJNIEnv()->DeleteGlobalRef(p->instance);
-#ifdef CHECK_JNI
+#ifdef FORCE_GC
             forceGC(queryJNIEnv());
 #endif
         }
@@ -1000,6 +1002,7 @@ enum PersistMode
 {
     persistNone,
     persistThread,
+    persistChannel,
     persistWorkunit,
     persistQuery,
     persistGlobal
@@ -1019,6 +1022,8 @@ static PersistMode getPersistMode(const char *val, StringAttr &globalScope)
         return persistNone;
     else if (strieq(val, "thread"))
         return persistThread;
+    else if (strieq(val, "channel"))
+        return persistChannel;
     else if (strieq(val, "workunit"))
         return persistWorkunit;
     else if (strieq(val, "query"))
@@ -3036,15 +3041,17 @@ public:
     JavaEmbedImportContext(ICodeContext *codeCtx, JavaThreadContext *_sharedCtx, jobject _instance, unsigned flags, const char *options)
     : sharedCtx(_sharedCtx), JNIenv(sharedCtx->JNIenv), instance(_instance)
     {
-        if (instance)
-            instance = JNIenv->NewGlobalRef(instance, "instance");
         argcount = 0;
         argsig = NULL;
         nonStatic = (instance != nullptr);
         javaClass = nullptr;
         StringArray opts;
         opts.appendList(options, ",");
-        engine = codeCtx->queryEngineContext();
+        if (codeCtx)
+        {
+            engine = codeCtx->queryEngineContext();
+            nodeNum = codeCtx->getNodeNum();
+        }
         StringBuffer lclassPath;
         if (engine)
         {
@@ -3064,6 +3071,8 @@ public:
                 val++;
                 if (stricmp(optName, "classpath")==0)
                     lclassPath.append(';').append(val);
+                if (strieq(optName, "globalscope"))
+                    globalScopeKey.set(val);
                 else if (strieq(optName, "persist"))
                 {
                     if (persistMode != persistNone)
@@ -3071,6 +3080,7 @@ public:
                     persistMode = getPersistMode(val, globalScopeKey);
                     switch (persistMode)
                     {
+                    case persistChannel:
                     case persistWorkunit:
                     case persistQuery:
                         if (!engine)
@@ -3091,8 +3101,6 @@ public:
     {
         if (javaClass)
             JNIenv->DeleteGlobalRef(javaClass);
-        if (instance)
-            JNIenv->DeleteGlobalRef(instance);
         if (classLoader)
             JNIenv->DeleteGlobalRef(classLoader);
     }
@@ -3173,29 +3181,8 @@ public:
     }
     virtual unsigned __int64 getUnsignedResult()
     {
-        if (*returnType=='L')
-            return (unsigned __int64) JNIenv->NewGlobalRef(result.l, "return");
-        else if (*returnType=='V' && strieq(methodName, "<init>"))
-        {
-            jobject thisObject = JNIenv->NewGlobalRef(result.l, "thisobject");
-            switch (persistMode)
-            {
-            case persistThread:
-                instance = thisObject;  // Means we free the object automatically when this thread is done
-                break;
-            case persistWorkunit:
-            case persistQuery:
-                // Register this object to be removed automatically at end of specified scope...
-                assertex(engine);
-                VStringBuffer scopeKey("O.%p", thisObject);
-                PersistedObjectCriticalBlock persistBlock;
-                persistBlock.enter(globalState->getGlobalObject(JNIenv, scopeKey));
-                assertex(!persistBlock.getInstance());
-                engine->onTermination(JavaGlobalState::unregister, scopeKey.str(), persistMode==persistWorkunit);
-                persistBlock.leave(thisObject);
-            }
-            return (unsigned __int64) thisObject;
-        }
+        if (*returnType=='V' && strieq(methodName, "<init>"))
+            return (unsigned __int64) result.l;
         throw makeStringExceptionV(MSGAUD_user, 0, "javaembed: In method %s: Unsigned results not supported", queryReportName()); // Java doesn't support unsigned
     }
     virtual void getStringResult(size32_t &__len, char * &__result)
@@ -3529,10 +3516,9 @@ public:
         {
             if (!val)
                 throw MakeStringException(MSGAUD_user, 0, "javaembed: In method %s: Null value passed for \"this\"", queryReportName());
-            if (importName[0]=='~')
-                instance = (jobject) val;  // Should ensure it gets released at end of function
-            else
-                instance = JNIenv->NewGlobalRef((jobject) val, "instanceParam");
+            instance = (jobject) val;
+            if (JNIenv->GetObjectRefType(instance) != JNIGlobalRefType)
+                throw MakeStringException(MSGAUD_user, 0, "javaembed: In method %s: Invalid value passed for \"this\"", queryReportName());
             jclass newJavaClass = JNIenv->GetObjectClass(instance);
             if (!JNIenv->IsSameObject(newJavaClass, javaClass))
             {
@@ -3953,38 +3939,58 @@ public:
             JNIenv->ExceptionClear();
             if (nonStatic)
             {
-                if (!instance)
+                if (streq(methodName, "<init>"))
                 {
-                    if (streq(methodName, "<init>"))
+                    if (!instance)
                     {
-                        result.l = JNIenv->NewObjectA(javaClass, javaMethodID, args);
-                        return;
+                        if (persistMode == persistNone)
+                            throw MakeStringException(0, "Cannot return object without persist");
+                        StringBuffer scopeKey;
+                        getScopeKey(scopeKey);
+                        PersistedObjectCriticalBlock persistBlock;
+                        persistBlock.enter(globalState->getGlobalObject(JNIenv, scopeKey));
+                        instance = persistBlock.getInstance();
+                        if (instance)
+                            persistBlock.leave();
+                        else
+                        {
+                            instance = JNIenv->NewGlobalRef(JNIenv->NewObjectA(javaClass, javaMethodID, args), "constructor");
+#ifdef TRACE_GLOBALREF
+                            StringBuffer myClassName;
+                            getClassNameForObject(JNIenv, myClassName, instance);
+                            DBGLOG("Constructed object %p of class %s", instance, myClassName.str());
+#endif
+                            if (persistMode==persistQuery || persistMode==persistWorkunit || persistMode==persistChannel)
+                            {
+                                assertex(engine);
+                                engine->onTermination(JavaGlobalState::unregister, scopeKey.str(), persistMode!=persistQuery);
+                            }
+                            persistBlock.leave(instance);
+                        }
                     }
-                    assertex(persistMode == persistNone);
-                    instance = createInstance();
+                    result.l = instance;
+                    return;
                 }
-                if (javaMethodID)
+                else if (!instance)
                 {
-                    switch (*returnType)
-                    {
-                    case 'C': result.c = JNIenv->CallCharMethodA(instance, javaMethodID, args); break;
-                    case 'Z': result.z = JNIenv->CallBooleanMethodA(instance, javaMethodID, args); break;
-                    case 'J': result.j = JNIenv->CallLongMethodA(instance, javaMethodID, args); break;
-                    case 'F': result.f = JNIenv->CallFloatMethodA(instance, javaMethodID, args); break;
-                    case 'D': result.d = JNIenv->CallDoubleMethodA(instance, javaMethodID, args); break;
-                    case 'I': result.i = JNIenv->CallIntMethodA(instance, javaMethodID, args); break;
-                    case 'S': result.s = JNIenv->CallShortMethodA(instance, javaMethodID, args); break;
-                    case 'B': result.s = JNIenv->CallByteMethodA(instance, javaMethodID, args); break;
-                    case '[':
-                    case 'L': result.l = JNIenv->CallObjectMethodA(instance, javaMethodID, args); break;
-                    case 'V': JNIenv->CallVoidMethodA(instance, javaMethodID, args); result.l = nullptr; break;
-                    default: throwUnexpected();
-                    }
+                    assertex(persistMode == persistNone); // Any other persist mode should have already created the instance
+                    instance = createInstance();          // Local object, will be released at exit() from function
                 }
-                else
+                assertex(javaMethodID);
+                switch (*returnType)
                 {
-                    assertex(methodName[0]=='~');
-                    result.l = 0;
+                case 'C': result.c = JNIenv->CallCharMethodA(instance, javaMethodID, args); break;
+                case 'Z': result.z = JNIenv->CallBooleanMethodA(instance, javaMethodID, args); break;
+                case 'J': result.j = JNIenv->CallLongMethodA(instance, javaMethodID, args); break;
+                case 'F': result.f = JNIenv->CallFloatMethodA(instance, javaMethodID, args); break;
+                case 'D': result.d = JNIenv->CallDoubleMethodA(instance, javaMethodID, args); break;
+                case 'I': result.i = JNIenv->CallIntMethodA(instance, javaMethodID, args); break;
+                case 'S': result.s = JNIenv->CallShortMethodA(instance, javaMethodID, args); break;
+                case 'B': result.s = JNIenv->CallByteMethodA(instance, javaMethodID, args); break;
+                case '[':
+                case 'L': result.l = JNIenv->CallObjectMethodA(instance, javaMethodID, args); break;
+                case 'V': JNIenv->CallVoidMethodA(instance, javaMethodID, args); result.l = nullptr; break;
+                default: throwUnexpected();
                 }
             }
             else
@@ -4042,13 +4048,10 @@ public:
     }
     virtual void exit() override
     {
-        if (persistMode == persistNone && instance != nullptr)
-        {
-            JNIenv->DeleteGlobalRef(instance);
-            instance = nullptr;
-        }
+        if (persistMode==persistNone)
+            instance = 0;  // otherwise we leave it for next call as it saves a lot of time looking it up
         JNIenv->PopLocalFrame(nullptr);
-#ifdef CHECK_JNI
+#ifdef FORCE_GC
         forceGC(JNIenv);
 #endif
     }
@@ -4151,28 +4154,25 @@ protected:
             Owned<IException> e = E;
             throw MakeStringException(0, "parameterless constructor required");
         }
-        return JNIenv->NewGlobalRef(JNIenv->NewObject(javaClass, constructor), "createInstance");
+        return JNIenv->NewObject(javaClass, constructor);
     }
 
     void loadFunction(const char *classpath, size32_t bytecodeLen, const byte *bytecode)
     {
         try
         {
-            StringBuffer classname;
             StringAttr checkedClassName;
+            nonStatic = true;   // until proven otherwise
             // Name should be in the form class.method:signature
             const char *funcname = strrchr(importName, '.');
             if (funcname)
             {
-                classname.append(funcname-importName, importName);
+                classname.clear().append(funcname-importName, importName);
                 classname.replace('/', '.');
                 funcname++;  // skip the '.'
             }
             else
-            {
-                nonStatic = true;  // we assume we are going to call a method of a cached object - and we will get the class from that
                 funcname = importName;
-            }
             const char *coloncolon = strstr(funcname, "::");
             if (coloncolon)
             {
@@ -4186,10 +4186,9 @@ protected:
                 methodName.set(funcname, sig-funcname);
                 sig++; // skip the ':'
                 if (*sig == '@') // indicates a non-static method
-                {
-                    nonStatic = true;
                     sig++;
-                }
+                else
+                    nonStatic = false;
                 signature.set(sig);
             }
             else
@@ -4198,9 +4197,10 @@ protected:
             {
                 PersistedObjectCriticalBlock persistBlock;
                 StringBuffer scopeKey;
-                if (nonStatic && !instance && persistMode > persistThread && !isConstructor) // MORE - there may be a persist mode between Thread and Wuid, meaning shared between multiple on this thread
+                if (nonStatic && !instance && persistMode >= persistThread && !isConstructor)
                 {
-                    // If the persist scope is global, query, or workunit, we may want to use a pre-existing object. If we do we share its classloader, class, etc.
+                    // If a persist scope is specified, we may want to use a pre-existing object. If we do we share its classloader, class, etc.
+                    assertex(classname.length());  // MORE - what does this imply?
                     getScopeKey(scopeKey);
                     persistBlock.enter(globalState->getGlobalObject(JNIenv, scopeKey));
                     instance = persistBlock.getInstance();
@@ -4232,45 +4232,51 @@ protected:
                     }
                     javaClass = (jclass) JNIenv->NewGlobalRef(javaClass, "javaClass");
                 }
-                if (nonStatic && !instance && !isConstructor && persistMode != persistNone)
+                if (nonStatic && !instance && !isConstructor  && persistMode != persistNone)
                 {
                     instance = createInstance();
-                    if (persistBlock.locked())
+#ifdef TRACE_GLOBALREF
+                    StringBuffer myClassName;
+                    getClassNameForObject(JNIenv, myClassName, instance);
+                    DBGLOG("Created object %p of class %s", instance, myClassName.str());
+#endif
+                    if (persistBlock.locked()) // I think this should always be true?
                     {
-                        if (persistMode==persistQuery || persistMode==persistWorkunit)
+                        instance = JNIenv->NewGlobalRef(instance, "createInstance");
+                        if (persistMode==persistQuery || persistMode==persistWorkunit || persistMode==persistChannel)
                         {
                             assertex(engine);
-                            engine->onTermination(JavaGlobalState::unregister, scopeKey.str(), persistMode==persistWorkunit);
+                            engine->onTermination(JavaGlobalState::unregister, scopeKey.str(), persistMode!=persistQuery);
                         }
-                        persistBlock.leave(JNIenv->NewGlobalRef(instance, "persistBlockLeave"));
+                        persistBlock.leave(instance);
                     }
                 }
             }
-            if (methodName[0]=='~')
-            {
-                if (!instance)
-                    throw MakeStringException(0, "~ invalid without instance");
-                if (!checkedClassName)
-                    checkedClassName.set(methodName+1);
-                signature.set("()L");  // We return 0
-            }
-            else
+            if (!signature)
             {
-                if (!signature)
-                    getSignature(signature, JNIenv, javaClass, funcname);
-                StringBuffer javaSignature;
-                patchSignature(javaSignature, signature);
-
-                if (nonStatic)
-                    javaMethodID = JNIenv->GetMethodID(javaClass, methodName, javaSignature);
+                getSignature(signature, JNIenv, javaClass, funcname);
+                if (signature.str()[0]=='@')
+                {
+                    nonStatic = true;
+                    signature.set(signature.str()+1);
+                }
                 else
-                    javaMethodID = JNIenv->GetStaticMethodID(javaClass, methodName, javaSignature);
+                    nonStatic = false;
             }
+            StringBuffer javaSignature;
+            patchSignature(javaSignature, signature);
+
+            if (nonStatic)
+                javaMethodID = JNIenv->GetMethodID(javaClass, methodName, javaSignature);
+            else
+                javaMethodID = JNIenv->GetStaticMethodID(javaClass, methodName, javaSignature);
             if (checkedClassName)
             {
                 StringBuffer myClassName;
                 getClassNameForObject(JNIenv, myClassName, instance);
-                DBGLOG("Checking class name %s matches %s for function %s", myClassName.str(), checkedClassName.str(), methodName.str());
+#ifdef CHECK_JNI
+                DBGLOG("Checking class name %s for %p matches %s for function %s", myClassName.str(), instance, checkedClassName.str(), methodName.str());
+#endif
                 const char *shortClassName = strrchr(myClassName, '.');
                 if (shortClassName)
                     shortClassName++;
@@ -4319,23 +4325,16 @@ protected:
     StringBuffer &getScopeKey(StringBuffer &ret)
     {
         if (globalScopeKey)
-            ret.append(globalScopeKey);
-        else
-        {
-            // MORE - we could key off the class name, but we don't always supply that.
-            // Plus it's mangled as often as not
-            const char *dotpos = strrchr(importName, '.');
-            if (dotpos)
-                ret.append(dotpos-importName+1, importName);
-            else
-                throw MakeStringException(0, "javaembed: cannot determine key for persist in function %s", importName.get());
-        }
-        ret.append('.');
+            ret.append(globalScopeKey).append('.');
+        ret.append(classname).append('.');
         switch (persistMode)
         {
         case persistGlobal:
             ret.append("global");
             break;
+        case persistChannel:
+            ret.append(nodeNum).append('.');
+            // Fall into
         case persistWorkunit:
             engine->getQueryId(ret, true);
             break;
@@ -4350,10 +4349,12 @@ protected:
     CheckedJNIEnv *JNIenv = nullptr;
     jvalue result = {0};
     StringAttr classpath;
+    StringBuffer classname;
     IArrayOf<ECLDatasetIterator> iterators;   // to make sure they get freed
     bool nonStatic = false;
     jobject instance = nullptr; // class instance of object to call methods on
 
+    unsigned nodeNum = 0;
     StringAttr globalScopeKey;
     PersistMode persistMode = persistNone;  // Defines the lifetime of the java object for which this is called.
 

+ 1 - 3
testing/regress/ecl/javaembed_ex11.ecl

@@ -20,7 +20,7 @@
 
 IMPORT Java;
 
-UNSIGNED JavaAccumulator(INTEGER initial) := EMBED(Java)
+UNSIGNED JavaAccumulator(INTEGER initial) := EMBED(Java : persist('Global'))
 public class JavaAccumulator
 {
   public JavaAccumulator(int initial) { tot = initial; }
@@ -41,7 +41,6 @@ ENDEMBED;
 
 INTEGER accumulate(UNSIGNED p, INTEGER val) := IMPORT(Java, 'JavaAccumulator::accumulate');
 INTEGER clear(UNSIGNED p) := IMPORT(Java, 'JavaAccumulator::clear');
-release(UNSIGNED p) := IMPORT(Java, '~JavaAccumulator'); // After calling this the java object p is no longer usable
 
 a := JavaAccumulator(35) : INDEPENDENT;
 
@@ -52,5 +51,4 @@ ORDERED
   accumulate(a, 3);
   clear(a);
   accumulate(a, 10);
-  release(a);  
 );

+ 0 - 2
testing/regress/ecl/javaembed_ex12.ecl

@@ -41,7 +41,6 @@ ENDEMBED;
 
 integer accumulate(unsigned p, integer val) := IMPORT(Java, 'accumulate');
 integer clear(unsigned p) := IMPORT(Java, 'clear');
-release(unsigned p) := IMPORT(Java, '~persister'); // After calling this the java object p is no longer usable
 
 p := persister(35) : independent;
 
@@ -51,5 +50,4 @@ sequential(
   accumulate(p, 3);
   clear(p);
   accumulate(p, 10);
-  release(p);  
 );

+ 1 - 4
testing/regress/ecl/javaembed_ex13.ecl

@@ -21,7 +21,7 @@
 
 IMPORT Java;
 
-unsigned persister(integer initial) := EMBED(Java)
+unsigned persister(integer initial) := EMBED(Java : PERSIST('thread'))
 public class persister
 {
   public persister(int initial) { tot = initial; }
@@ -42,7 +42,6 @@ ENDEMBED;
 
 integer accumulate(unsigned p, integer val) := IMPORT(Java, 'accumulate');
 integer clear(unsigned p) := IMPORT(Java, 'clear');
-unsigned release(unsigned p) := IMPORT(Java, '~persister'); // After calling this the java object p is no longer usable
 
 r := record
   integer i;
@@ -59,5 +58,3 @@ d1 := DATASET([{1}, {2}, {3}], r);
 accumulated := ITERATE(d1, t(LEFT, RIGHT), LOCAL);
 
 OUTPUT(accumulated, {i});
-objects := TABLE(GROUP(accumulated, TRUE, LOCAL), {px := MAX(GROUP,p)});
-APPLY(objects,EVALUATE(release(px)));