瀏覽代碼

HPCC-23117 Embedded java not working properly in Roxie

A proper fix for the issues caused by thread-local object reuse by pooled
threads is beyond the scope of 7.2.x, so turn off the use of threadLocal in
Java for now.

That revealed some issues in the implementation of PERSIST('thread') when
threadLocal mode not available, so fix those too.

Also fix behaviour of PERSIST('channel') in Roxie for consistency.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 5 年之前
父節點
當前提交
ad4d6cf0a4
共有 4 個文件被更改,包括 251 次插入22 次删除
  1. 1 1
      plugins/javaembed/java.ecllib
  2. 37 21
      plugins/javaembed/javaembed.cpp
  3. 108 0
      testing/regress/ecl/javascope.ecl
  4. 105 0
      testing/regress/ecl/key/javascope.xml

+ 1 - 1
plugins/javaembed/java.ecllib

@@ -27,5 +27,5 @@ EXPORT syntaxCheck := Language.syntaxCheck;
 EXPORT checkImport := Language.checkImport;
 EXPORT boolean supportsImport := true;
 EXPORT boolean supportsScript := true;
-EXPORT boolean threadlocal := true;
+EXPORT boolean threadlocal := false;
 EXPORT boolean singletonEmbedContext := true;

+ 37 - 21
plugins/javaembed/javaembed.cpp

@@ -627,6 +627,9 @@ public:
     inline void DeleteGlobalRef(jobject val)
     {
         JNIEnv::DeleteGlobalRef(val);
+#ifdef FORCE_GC
+        forceGC(this);
+#endif
     }
     inline jobject NewGlobalRef(jobject val, const char *)
     {
@@ -806,6 +809,16 @@ class PersistedObject : public MappingBase
 {
 public:
     PersistedObject(const char *_name) : name(_name) {}
+    ~PersistedObject()
+    {
+        if (instance)
+        {
+#ifdef TRACE_GLOBALREF
+            DBGLOG("DeleteGlobalRef(singleton): %p", instance);
+#endif
+            queryJNIEnv()->DeleteGlobalRef(instance);
+        }
+    }
     CriticalSection crit;
     jobject instance = nullptr;
     StringAttr name;
@@ -999,14 +1012,6 @@ public:
     void doUnregister(const char *key)
     {
         CriticalBlock b(hashCrit);
-        PersistedObject *p = persistedObjects.find(key);
-        if (p && p->instance)
-        {
-            queryJNIEnv()->DeleteGlobalRef(p->instance);
-#ifdef FORCE_GC
-            forceGC(queryJNIEnv());
-#endif
-        }
         persistedObjects.remove(key);
     }
     static void unregister(const char *key);
@@ -2314,9 +2319,10 @@ public:
     }
     ~JavaThreadContext()
     {
-        // Make sure all thread-local function contexts are destroyed before we detach from
+        // Make sure all thread-local function contexts and saved objects are destroyed before we detach from
         // the Java thread
         contexts.kill();
+        persistedObjects.kill();
         // According to the Java VM 1.7 docs, "A native thread attached to
         // the VM must call DetachCurrentThread() to detach itself before
         // exiting."
@@ -2355,8 +2361,23 @@ public:
         // Note - this object is thread-local so no need for a critsec
         contexts.append(*ctx);
     }
+
+    PersistedObject *getLocalObject(CheckedJNIEnv *JNIenv, const char *name)
+    {
+        // Note - this object is thread-local so no need for a critsec
+        PersistedObject *p;
+        p = persistedObjects.find(name);
+        if (!p)
+        {
+            p = new PersistedObject(name);
+            persistedObjects.replaceOwn(*p);
+        }
+        p->crit.enter();  // needed to keep code common between local/global cases
+        return p;
+    }
 private:
     IArrayOf<IEmbedFunctionContext> contexts;
+    StringMapOf<PersistedObject> persistedObjects = { false };
 };
 
 class JavaXmlBuilder : implements IXmlWriterExt, public CInterface
@@ -3199,12 +3220,6 @@ public:
     }
     ~JavaEmbedImportContext()
     {
-        if (persistMode == persistThread)
-        {
-            StringBuffer scopeKey;
-            getScopeKey(scopeKey);
-            JavaGlobalState::unregister(scopeKey);
-        }
         if (javaClass)
             JNIenv->DeleteGlobalRef(javaClass);
         if (classLoader)
@@ -4079,7 +4094,7 @@ public:
                         StringBuffer scopeKey;
                         getScopeKey(scopeKey);
                         PersistedObjectCriticalBlock persistBlock;
-                        persistBlock.enter(globalState->getGlobalObject(JNIenv, scopeKey));
+                        persistBlock.enter(persistMode==persistThread ? sharedCtx->getLocalObject(JNIenv, scopeKey) : globalState->getGlobalObject(JNIenv, scopeKey));
                         instance = persistBlock.getInstance();
                         if (instance)
                             persistBlock.leave();
@@ -4094,7 +4109,7 @@ public:
                             if (persistMode==persistQuery || persistMode==persistWorkunit || persistMode==persistChannel)
                             {
                                 assertex(engine);
-                                engine->onTermination(JavaGlobalState::unregister, scopeKey.str(), persistMode!=persistQuery);
+                                engine->onTermination(JavaGlobalState::unregister, scopeKey.str(), persistMode==persistWorkunit);
                             }
                             persistBlock.leave(instance);
                         }
@@ -4343,7 +4358,7 @@ protected:
                     // If a persist scope is specified, we may want to use a pre-existing object. If we do we share its classloader, class, etc.
                     assertex(classname.length());  // MORE - what does this imply?
                     getScopeKey(scopeKey);
-                    persistBlock.enter(globalState->getGlobalObject(JNIenv, scopeKey));
+                    persistBlock.enter(persistMode==persistThread ? sharedCtx->getLocalObject(JNIenv, scopeKey) : globalState->getGlobalObject(JNIenv, scopeKey));
                     instance = persistBlock.getInstance();
                     if (instance)
                         persistBlock.leave();
@@ -4365,6 +4380,7 @@ protected:
                     try
                     {
                         javaClass = (jclass) JNIenv->CallObjectMethod(classLoader, loadClassMethod, JNIenv->NewStringUTF(classname));
+                        DBGLOG("Using javaClass %p from classLoader %p", javaClass, classLoader);
                     }
                     catch (IException *E)
                     {
@@ -4387,7 +4403,7 @@ protected:
                         if (persistMode==persistQuery || persistMode==persistWorkunit || persistMode==persistChannel)
                         {
                             assertex(engine);
-                            engine->onTermination(JavaGlobalState::unregister, scopeKey.str(), persistMode!=persistQuery);
+                            engine->onTermination(JavaGlobalState::unregister, scopeKey.str(), persistMode==persistWorkunit);
                         }
                         persistBlock.leave(instance);
                     }
@@ -4476,12 +4492,12 @@ protected:
         case persistGlobal:
             ret.append("global");
             break;
-        case persistChannel:
-            ret.append(nodeNum).append('.');
             // Fall into
         case persistWorkunit:
             engine->getQueryId(ret, true);
             break;
+        case persistChannel:
+            ret.append(nodeNum).append('.');
         case persistQuery:
             engine->getQueryId(ret, false);
             break;

+ 108 - 0
testing/regress/ecl/javascope.ecl

@@ -0,0 +1,108 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+IMPORT Java;
+
+//version forceNonThread=false
+//version forceNonThread=true
+
+import ^ as root;
+forceNonThread := #IFDEFINED(root.forceNonThread, false);
+
+// Check that implicitly-created objects have appropriate lifetime
+
+implicit(STRING p, STRING s) := MODULE
+  STRING st := ''
+#if (forceNonThread)
+   : STORED('st')
+#end
+  ;
+
+  EXPORT INTEGER accumulate(INTEGER b) := EMBED(Java : PERSIST(p), GLOBALSCOPE(s+st))
+    class x
+    {
+      public x()
+      {
+        synchronized (x.class)
+        { 
+          idx = nextIdx;
+          nextIdx = nextIdx+1;
+        }
+//        System.out.println("created  " + idx + x.class.getName());
+      }
+      public void finalize()
+      {
+//        System.out.println("finalize " + n + " " + idx);
+      }
+      public synchronized int accumulate(int b)
+      {
+        n = n + b;
+        return n;
+      }
+      int n = 0;
+      int idx = 0;
+      static int nextIdx = 0;
+    }
+  ENDEMBED;
+
+  SHARED r := RECORD
+    integer a; 
+  END;
+
+
+  // The parallel test runs all on separate threads (except the last two calls) to ensure that separate threads
+  // are independent when using PERSIST('thread') or PERSIST('none')
+
+  EXPORT ptest := PARALLEL (
+    output(p + ': parallel');
+    output(project(nofold(dataset([{1}], r)), transform(r, self.a := accumulate(LEFT.a))));
+    output(project(nofold(dataset([{2}], r)), transform(r, self.a := accumulate(LEFT.a))));
+    output(project(nofold(dataset([{3}], r)), transform(r, self.a := accumulate(LEFT.a))));
+    output(project(nofold(dataset([{10}], r)), transform(r, self.a := accumulate(LEFT.a)+accumulate(LEFT.a*2))));
+  );
+
+  // The sequential test runs sequentially for ones that are supposed to interact across threads (otherwise results are indeterminate)
+
+  EXPORT stest := SEQUENTIAL (
+    output(p + ': sequential');
+    output(project(nofold(dataset([{1}], r)), transform(r, self.a := accumulate(LEFT.a))));
+    output(project(nofold(dataset([{2}], r)), transform(r, self.a := accumulate(LEFT.a))));
+    output(project(nofold(dataset([{3}], r)), transform(r, self.a := accumulate(LEFT.a))));
+    output(project(nofold(dataset([{10}], r)), transform(r, self.a := accumulate(LEFT.a)+accumulate(LEFT.a*2))));
+  );
+
+END;
+
+gc() := EMBED(Java)
+  public static void gc()
+  {
+    System.gc();
+  }
+ENDEMBED;
+sequential(
+  implicit('','').ptest;
+  implicit('','').stest;
+  implicit('thread','').ptest;
+  implicit('channel','').stest;
+  implicit('query','').stest;
+  implicit('workunit','').stest;
+  implicit('global','').stest;
+//  gc();
+);
+
+// Check that explicitly-created objects have appropriate lifetime (how?) but are not shared
+

+ 105 - 0
testing/regress/ecl/key/javascope.xml

@@ -0,0 +1,105 @@
+<Dataset name='Result 1'>
+ <Row><Result_1>: parallel</Result_1></Row>
+</Dataset>
+<Dataset name='Result 2'>
+ <Row><a>1</a></Row>
+</Dataset>
+<Dataset name='Result 3'>
+ <Row><a>2</a></Row>
+</Dataset>
+<Dataset name='Result 4'>
+ <Row><a>3</a></Row>
+</Dataset>
+<Dataset name='Result 5'>
+ <Row><a>30</a></Row>
+</Dataset>
+<Dataset name='Result 6'>
+ <Row><Result_6>: sequential</Result_6></Row>
+</Dataset>
+<Dataset name='Result 7'>
+ <Row><a>1</a></Row>
+</Dataset>
+<Dataset name='Result 8'>
+ <Row><a>2</a></Row>
+</Dataset>
+<Dataset name='Result 9'>
+ <Row><a>3</a></Row>
+</Dataset>
+<Dataset name='Result 10'>
+ <Row><a>30</a></Row>
+</Dataset>
+<Dataset name='Result 11'>
+ <Row><Result_11>thread: parallel</Result_11></Row>
+</Dataset>
+<Dataset name='Result 12'>
+ <Row><a>1</a></Row>
+</Dataset>
+<Dataset name='Result 13'>
+ <Row><a>2</a></Row>
+</Dataset>
+<Dataset name='Result 14'>
+ <Row><a>3</a></Row>
+</Dataset>
+<Dataset name='Result 15'>
+ <Row><a>40</a></Row>
+</Dataset>
+<Dataset name='Result 16'>
+ <Row><Result_16>channel: sequential</Result_16></Row>
+</Dataset>
+<Dataset name='Result 17'>
+ <Row><a>1</a></Row>
+</Dataset>
+<Dataset name='Result 18'>
+ <Row><a>3</a></Row>
+</Dataset>
+<Dataset name='Result 19'>
+ <Row><a>6</a></Row>
+</Dataset>
+<Dataset name='Result 20'>
+ <Row><a>52</a></Row>
+</Dataset>
+<Dataset name='Result 21'>
+ <Row><Result_21>query: sequential</Result_21></Row>
+</Dataset>
+<Dataset name='Result 22'>
+ <Row><a>1</a></Row>
+</Dataset>
+<Dataset name='Result 23'>
+ <Row><a>3</a></Row>
+</Dataset>
+<Dataset name='Result 24'>
+ <Row><a>6</a></Row>
+</Dataset>
+<Dataset name='Result 25'>
+ <Row><a>52</a></Row>
+</Dataset>
+<Dataset name='Result 26'>
+ <Row><Result_26>workunit: sequential</Result_26></Row>
+</Dataset>
+<Dataset name='Result 27'>
+ <Row><a>1</a></Row>
+</Dataset>
+<Dataset name='Result 28'>
+ <Row><a>3</a></Row>
+</Dataset>
+<Dataset name='Result 29'>
+ <Row><a>6</a></Row>
+</Dataset>
+<Dataset name='Result 30'>
+ <Row><a>52</a></Row>
+</Dataset>
+<Dataset name='Result 31'>
+ <Row><Result_31>global: sequential</Result_31></Row>
+</Dataset>
+<Dataset name='Result 32'>
+ <Row><a>1</a></Row>
+</Dataset>
+<Dataset name='Result 33'>
+ <Row><a>3</a></Row>
+</Dataset>
+<Dataset name='Result 34'>
+ <Row><a>6</a></Row>
+</Dataset>
+<Dataset name='Result 35'>
+ <Row><a>52</a></Row>
+</Dataset>