瀏覽代碼

Merge branch 'candidate-7.2.x' into candidate-7.4.x

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 5 年之前
父節點
當前提交
9a9a10e89e

+ 1 - 1
plugins/javaembed/java.ecllib

@@ -27,5 +27,5 @@ EXPORT syntaxCheck := Language.syntaxCheck;
 EXPORT checkImport := Language.checkImport;
 EXPORT boolean supportsImport := true;
 EXPORT boolean supportsScript := true;
-EXPORT boolean threadlocal := true;
+EXPORT boolean threadlocal := false;
 EXPORT boolean singletonEmbedContext := true;

+ 36 - 21
plugins/javaembed/javaembed.cpp

@@ -629,6 +629,9 @@ public:
     inline void DeleteGlobalRef(jobject val)
     {
         JNIEnv::DeleteGlobalRef(val);
+#ifdef FORCE_GC
+        forceGC(this);
+#endif
     }
     inline jobject NewGlobalRef(jobject val, const char *)
     {
@@ -816,6 +819,16 @@ class PersistedObject : public MappingBase
 {
 public:
     PersistedObject(const char *_name) : name(_name) {}
+    ~PersistedObject()
+    {
+        if (instance)
+        {
+#ifdef TRACE_GLOBALREF
+            DBGLOG("DeleteGlobalRef(singleton): %p", instance);
+#endif
+            queryJNIEnv()->DeleteGlobalRef(instance);
+        }
+    }
     CriticalSection crit;
     jobject instance = nullptr;
     StringAttr name;
@@ -1009,14 +1022,6 @@ public:
     void doUnregister(const char *key)
     {
         CriticalBlock b(hashCrit);
-        PersistedObject *p = persistedObjects.find(key);
-        if (p && p->instance)
-        {
-            queryJNIEnv()->DeleteGlobalRef(p->instance);
-#ifdef FORCE_GC
-            forceGC(queryJNIEnv());
-#endif
-        }
         persistedObjects.remove(key);
     }
     static void unregister(const char *key);
@@ -2322,9 +2327,10 @@ public:
     }
     ~JavaThreadContext()
     {
-        // Make sure all thread-local function contexts are destroyed before we detach from
+        // Make sure all thread-local function contexts and saved objects are destroyed before we detach from
         // the Java thread
         contexts.kill();
+        persistedObjects.kill();
         // According to the Java VM 1.7 docs, "A native thread attached to
         // the VM must call DetachCurrentThread() to detach itself before
         // exiting."
@@ -2363,8 +2369,23 @@ public:
         // Note - this object is thread-local so no need for a critsec
         contexts.append(*ctx);
     }
+
+    PersistedObject *getLocalObject(CheckedJNIEnv *JNIenv, const char *name)
+    {
+        // Note - this object is thread-local so no need for a critsec
+        PersistedObject *p;
+        p = persistedObjects.find(name);
+        if (!p)
+        {
+            p = new PersistedObject(name);
+            persistedObjects.replaceOwn(*p);
+        }
+        p->crit.enter();  // needed to keep code common between local/global cases
+        return p;
+    }
 private:
     IArrayOf<IEmbedFunctionContext> contexts;
+    StringMapOf<PersistedObject> persistedObjects = { false };
 };
 
 class JavaXmlBuilder : implements IXmlWriterExt, public CInterface
@@ -3207,12 +3228,6 @@ public:
     }
     ~JavaEmbedImportContext()
     {
-        if (persistMode == persistThread)
-        {
-            StringBuffer scopeKey;
-            getScopeKey(scopeKey);
-            JavaGlobalState::unregister(scopeKey);
-        }
         if (javaClass)
             JNIenv->DeleteGlobalRef(javaClass);
         if (classLoader)
@@ -4105,7 +4120,7 @@ public:
                         StringBuffer scopeKey;
                         getScopeKey(scopeKey);
                         PersistedObjectCriticalBlock persistBlock;
-                        persistBlock.enter(globalState->getGlobalObject(JNIenv, scopeKey));
+                        persistBlock.enter(persistMode==persistThread ? sharedCtx->getLocalObject(JNIenv, scopeKey) : globalState->getGlobalObject(JNIenv, scopeKey));
                         instance = persistBlock.getInstance();
                         if (instance)
                             persistBlock.leave();
@@ -4120,7 +4135,7 @@ public:
                             if (persistMode==persistQuery || persistMode==persistWorkunit || persistMode==persistChannel)
                             {
                                 assertex(engine);
-                                engine->onTermination(JavaGlobalState::unregister, scopeKey.str(), persistMode!=persistQuery);
+                                engine->onTermination(JavaGlobalState::unregister, scopeKey.str(), persistMode==persistWorkunit);
                             }
                             persistBlock.leave(instance);
                         }
@@ -4372,7 +4387,7 @@ protected:
                     // If a persist scope is specified, we may want to use a pre-existing object. If we do we share its classloader, class, etc.
                     assertex(classname.length());  // MORE - what does this imply?
                     getScopeKey(scopeKey);
-                    persistBlock.enter(globalState->getGlobalObject(JNIenv, scopeKey));
+                    persistBlock.enter(persistMode==persistThread ? sharedCtx->getLocalObject(JNIenv, scopeKey) : globalState->getGlobalObject(JNIenv, scopeKey));
                     instance = persistBlock.getInstance();
                     if (instance)
                         persistBlock.leave();
@@ -4416,7 +4431,7 @@ protected:
                         if (persistMode==persistQuery || persistMode==persistWorkunit || persistMode==persistChannel)
                         {
                             assertex(engine);
-                            engine->onTermination(JavaGlobalState::unregister, scopeKey.str(), persistMode!=persistQuery);
+                            engine->onTermination(JavaGlobalState::unregister, scopeKey.str(), persistMode==persistWorkunit);
                         }
                         persistBlock.leave(instance);
                     }
@@ -4505,12 +4520,12 @@ protected:
         case persistGlobal:
             ret.append("global");
             break;
-        case persistChannel:
-            ret.append(nodeNum).append('.');
             // Fall into
         case persistWorkunit:
             engine->getQueryId(ret, true);
             break;
+        case persistChannel:
+            ret.append(nodeNum).append('.');
         case persistQuery:
             engine->getQueryId(ret, false);
             break;

+ 5 - 1
roxie/ccd/ccdcontext.cpp

@@ -3135,7 +3135,11 @@ public:
     virtual StringBuffer &getQueryId(StringBuffer &result, bool isShared) const
     {
         if (workUnit)
-            result.append(workUnit->queryWuid()); // In workunit mode, this works for both shared and non-shared variants
+        {
+            if (isShared)
+                result.append('Q');
+            result.append(workUnit->queryWuid());
+        }
         else if (isShared)
             result.append('Q').append(factory->queryHash());
         else

+ 90 - 0
testing/regress/ecl/hthor/javascope.xml

@@ -0,0 +1,90 @@
+<Dataset name='Result 1'>
+ <Row><Result_1>: parallel</Result_1></Row>
+</Dataset>
+<Dataset name='Result 2'>
+ <Row><a>1</a></Row>
+</Dataset>
+<Dataset name='Result 3'>
+ <Row><a>2</a></Row>
+</Dataset>
+<Dataset name='Result 4'>
+ <Row><a>3</a></Row>
+</Dataset>
+<Dataset name='Result 5'>
+ <Row><a>30</a></Row>
+</Dataset>
+<Dataset name='Result 6'>
+ <Row><Result_6>: sequential</Result_6></Row>
+</Dataset>
+<Dataset name='Result 7'>
+ <Row><a>1</a></Row>
+</Dataset>
+<Dataset name='Result 8'>
+ <Row><a>2</a></Row>
+</Dataset>
+<Dataset name='Result 9'>
+ <Row><a>3</a></Row>
+</Dataset>
+<Dataset name='Result 10'>
+ <Row><a>30</a></Row>
+</Dataset>
+<Dataset name='Result 11'>
+ <Row><Result_11>thread: parallel</Result_11></Row>
+</Dataset>
+<Dataset name='Result 12'>
+ <Row><a>1</a></Row>
+</Dataset>
+<Dataset name='Result 13'>
+ <Row><a>3</a></Row>
+</Dataset>
+<Dataset name='Result 14'>
+ <Row><a>6</a></Row>
+</Dataset>
+<Dataset name='Result 15'>
+ <Row><a>52</a></Row>
+</Dataset>
+<Dataset name='Result 16'>
+ <Row><Result_16>channel: sequential</Result_16></Row>
+</Dataset>
+<Dataset name='Result 17'>
+ <Row><a>1</a></Row>
+</Dataset>
+<Dataset name='Result 18'>
+ <Row><a>3</a></Row>
+</Dataset>
+<Dataset name='Result 19'>
+ <Row><a>6</a></Row>
+</Dataset>
+<Dataset name='Result 20'>
+ <Row><a>52</a></Row>
+</Dataset>
+<Dataset name='Result 21'>
+ <Row><Result_21>query: sequential</Result_21></Row>
+</Dataset>
+<Dataset name='Result 22'>
+ <Row><a>1</a></Row>
+</Dataset>
+<Dataset name='Result 23'>
+ <Row><a>3</a></Row>
+</Dataset>
+<Dataset name='Result 24'>
+ <Row><a>6</a></Row>
+</Dataset>
+<Dataset name='Result 25'>
+ <Row><a>52</a></Row>
+</Dataset>
+<Dataset name='Result 26'>
+ <Row><Result_26>workunit: sequential</Result_26></Row>
+</Dataset>
+<Dataset name='Result 27'>
+ <Row><a>37</a></Row>
+</Dataset>
+<Dataset name='Result 28'>
+ <Row><a>39</a></Row>
+</Dataset>
+<Dataset name='Result 29'>
+ <Row><a>42</a></Row>
+</Dataset>
+<Dataset name='Result 30'>
+ <Row><a>124</a></Row>
+</Dataset>

+ 110 - 0
testing/regress/ecl/javascope.ecl

@@ -0,0 +1,110 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+IMPORT Java;
+
+//version forceNonThread=false
+//version forceNonThread=true
+
+
+import ^ as root;
+forceNonThread := #IFDEFINED(root.forceNonThread, false);
+
+// Check that implicitly-created objects have appropriate lifetime
+
+implicit(STRING p, STRING s) := MODULE
+  STRING st := ''
+#if (forceNonThread)
+   : STORED('st')
+#end
+  ;
+
+  EXPORT INTEGER accumulate(INTEGER b) := EMBED(Java : PERSIST(p), GLOBALSCOPE(s+st))
+    class x
+    {
+      public x()
+      {
+        synchronized (x.class)
+        { 
+          idx = nextIdx;
+          nextIdx = nextIdx+1;
+        }
+//        System.out.println("created  " + idx + x.class.getName());
+      }
+      public void finalize()
+      {
+//        System.out.println("finalize " + n + " " + idx);
+      }
+      public synchronized int accumulate(int b)
+      {
+        n = n + b;
+        return n;
+      }
+      int n = 0;
+      int idx = 0;
+      static int nextIdx = 0;
+    }
+  ENDEMBED;
+
+  SHARED r := RECORD
+    integer a; 
+  END;
+
+
+  // The parallel test runs all on separate threads (except the last two calls) to ensure that separate threads
+  // are independent when using PERSIST('thread') or PERSIST('none')
+
+  EXPORT ptest := PARALLEL (
+    output(p + ': parallel');
+    output(project(nofold(dataset([{1}], r)), transform(r, self.a := accumulate(LEFT.a))));
+    output(project(nofold(dataset([{2}], r)), transform(r, self.a := accumulate(LEFT.a))));
+    output(project(nofold(dataset([{3}], r)), transform(r, self.a := accumulate(LEFT.a))));
+    output(project(nofold(dataset([{10}], r)), transform(r, self.a := accumulate(LEFT.a)+accumulate(LEFT.a*2))));
+  );
+
+  // The sequential test runs sequentially for ones that are supposed to interact across threads (otherwise results are indeterminate)
+
+  EXPORT stest := ORDERED (
+    output(p + ': sequential');
+    output(project(nofold(dataset([{1}], r)), transform(r, self.a := accumulate(LEFT.a))));
+    output(project(nofold(dataset([{2}], r)), transform(r, self.a := accumulate(LEFT.a))));
+    output(project(nofold(dataset([{3}], r)), transform(r, self.a := accumulate(LEFT.a))));
+    output(project(nofold(dataset([{10}], r)), transform(r, self.a := accumulate(LEFT.a)+accumulate(LEFT.a*2))));
+  );
+
+END;
+
+gc() := EMBED(Java)
+  public static void gc()
+  {
+    System.gc();
+  }
+ENDEMBED;
+
+ORDERED (
+  implicit('','').ptest;
+  implicit('','').stest;
+  implicit('thread','').ptest;
+  implicit('channel','').stest;
+  implicit('query','').stest;
+  implicit('workunit','').stest;
+//  implicit('global','').stest;
+//  gc();
+);
+
+// Check that explicitly-created objects have appropriate lifetime (how?) but are not shared
+

+ 90 - 0
testing/regress/ecl/key/javascope.xml

@@ -0,0 +1,90 @@
+<Dataset name='Result 1'>
+ <Row><Result_1>: parallel</Result_1></Row>
+</Dataset>
+<Dataset name='Result 2'>
+ <Row><a>1</a></Row>
+</Dataset>
+<Dataset name='Result 3'>
+ <Row><a>2</a></Row>
+</Dataset>
+<Dataset name='Result 4'>
+ <Row><a>3</a></Row>
+</Dataset>
+<Dataset name='Result 5'>
+ <Row><a>30</a></Row>
+</Dataset>
+<Dataset name='Result 6'>
+ <Row><Result_6>: sequential</Result_6></Row>
+</Dataset>
+<Dataset name='Result 7'>
+ <Row><a>1</a></Row>
+</Dataset>
+<Dataset name='Result 8'>
+ <Row><a>2</a></Row>
+</Dataset>
+<Dataset name='Result 9'>
+ <Row><a>3</a></Row>
+</Dataset>
+<Dataset name='Result 10'>
+ <Row><a>30</a></Row>
+</Dataset>
+<Dataset name='Result 11'>
+ <Row><Result_11>thread: parallel</Result_11></Row>
+</Dataset>
+<Dataset name='Result 12'>
+ <Row><a>1</a></Row>
+</Dataset>
+<Dataset name='Result 13'>
+ <Row><a>2</a></Row>
+</Dataset>
+<Dataset name='Result 14'>
+ <Row><a>3</a></Row>
+</Dataset>
+<Dataset name='Result 15'>
+ <Row><a>40</a></Row>
+</Dataset>
+<Dataset name='Result 16'>
+ <Row><Result_16>channel: sequential</Result_16></Row>
+</Dataset>
+<Dataset name='Result 17'>
+ <Row><a>1</a></Row>
+</Dataset>
+<Dataset name='Result 18'>
+ <Row><a>3</a></Row>
+</Dataset>
+<Dataset name='Result 19'>
+ <Row><a>6</a></Row>
+</Dataset>
+<Dataset name='Result 20'>
+ <Row><a>52</a></Row>
+</Dataset>
+<Dataset name='Result 21'>
+ <Row><Result_21>query: sequential</Result_21></Row>
+</Dataset>
+<Dataset name='Result 22'>
+ <Row><a>1</a></Row>
+</Dataset>
+<Dataset name='Result 23'>
+ <Row><a>3</a></Row>
+</Dataset>
+<Dataset name='Result 24'>
+ <Row><a>6</a></Row>
+</Dataset>
+<Dataset name='Result 25'>
+ <Row><a>52</a></Row>
+</Dataset>
+<Dataset name='Result 26'>
+ <Row><Result_26>workunit: sequential</Result_26></Row>
+</Dataset>
+<Dataset name='Result 27'>
+ <Row><a>1</a></Row>
+</Dataset>
+<Dataset name='Result 28'>
+ <Row><a>3</a></Row>
+</Dataset>
+<Dataset name='Result 29'>
+ <Row><a>6</a></Row>
+</Dataset>
+<Dataset name='Result 30'>
+ <Row><a>52</a></Row>
+</Dataset>

+ 90 - 0
testing/regress/ecl/thor/javascope.xml

@@ -0,0 +1,90 @@
+<Dataset name='Result 1'>
+ <Row><Result_1>: parallel</Result_1></Row>
+</Dataset>
+<Dataset name='Result 2'>
+ <Row><a>1</a></Row>
+</Dataset>
+<Dataset name='Result 3'>
+ <Row><a>2</a></Row>
+</Dataset>
+<Dataset name='Result 4'>
+ <Row><a>3</a></Row>
+</Dataset>
+<Dataset name='Result 5'>
+ <Row><a>30</a></Row>
+</Dataset>
+<Dataset name='Result 6'>
+ <Row><Result_6>: sequential</Result_6></Row>
+</Dataset>
+<Dataset name='Result 7'>
+ <Row><a>1</a></Row>
+</Dataset>
+<Dataset name='Result 8'>
+ <Row><a>2</a></Row>
+</Dataset>
+<Dataset name='Result 9'>
+ <Row><a>3</a></Row>
+</Dataset>
+<Dataset name='Result 10'>
+ <Row><a>30</a></Row>
+</Dataset>
+<Dataset name='Result 11'>
+ <Row><Result_11>thread: parallel</Result_11></Row>
+</Dataset>
+<Dataset name='Result 12'>
+ <Row><a>1</a></Row>
+</Dataset>
+<Dataset name='Result 13'>
+ <Row><a>2</a></Row>
+</Dataset>
+<Dataset name='Result 14'>
+ <Row><a>3</a></Row>
+</Dataset>
+<Dataset name='Result 15'>
+ <Row><a>40</a></Row>
+</Dataset>
+<Dataset name='Result 16'>
+ <Row><Result_16>channel: sequential</Result_16></Row>
+</Dataset>
+<Dataset name='Result 17'>
+ <Row><a>1</a></Row>
+</Dataset>
+<Dataset name='Result 18'>
+ <Row><a>3</a></Row>
+</Dataset>
+<Dataset name='Result 19'>
+ <Row><a>6</a></Row>
+</Dataset>
+<Dataset name='Result 20'>
+ <Row><a>52</a></Row>
+</Dataset>
+<Dataset name='Result 21'>
+ <Row><Result_21>query: sequential</Result_21></Row>
+</Dataset>
+<Dataset name='Result 22'>
+ <Row><a>1</a></Row>
+</Dataset>
+<Dataset name='Result 23'>
+ <Row><a>3</a></Row>
+</Dataset>
+<Dataset name='Result 24'>
+ <Row><a>6</a></Row>
+</Dataset>
+<Dataset name='Result 25'>
+ <Row><a>52</a></Row>
+</Dataset>
+<Dataset name='Result 26'>
+ <Row><Result_26>workunit: sequential</Result_26></Row>
+</Dataset>
+<Dataset name='Result 27'>
+ <Row><a>37</a></Row>
+</Dataset>
+<Dataset name='Result 28'>
+ <Row><a>39</a></Row>
+</Dataset>
+<Dataset name='Result 29'>
+ <Row><a>42</a></Row>
+</Dataset>
+<Dataset name='Result 30'>
+ <Row><a>124</a></Row>
+</Dataset>