Forráskód Böngészése

Merge pull request #13155 from richardkchapman/hpcc23130

HPCC-23130 Re-enable threadLocal option for embedded Java

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 5 éve
szülő
commit
d53436fd0e

+ 4 - 1
ecl/hqlcpp/hqlcpp.cpp

@@ -12121,7 +12121,10 @@ void HqlCppTranslator::buildScriptFunctionDefinition(BuildCtx &ctx, IHqlExpressi
         createParam.append(",NULL");
     createParam.append(");");
     funcctx.addQuoted(createParam);
-    funcctx.addQuoted("EmbedContextBlock __b(__ctx);");
+    if (threadlocal)
+        funcctx.addQuoted("EmbedContextBlock __b(__ctx, ctx);");
+    else
+        funcctx.addQuoted("EmbedContextBlock __b(__ctx);");
     OwnedHqlExpr ctxVar = createVariable("__ctx", makeBoolType());
 
     HqlExprArray scriptArgs;

+ 1 - 0
plugins/Rembed/Rembed.cpp

@@ -1414,6 +1414,7 @@ public:
         throwUnexpected();
     }
     virtual void enter() override {}
+    virtual void reenter(ICodeContext *codeCtx) override {}
     virtual void exit() override {}
 
     virtual void callFunction()

+ 1 - 0
plugins/cassandra/cassandraembed.cpp

@@ -1830,6 +1830,7 @@ public:
         throwUnexpected();
     }
     virtual void enter() override {}
+    virtual void reenter(ICodeContext *codeCtx) override {}
     virtual void exit() override {}
 
 protected:

+ 1 - 0
plugins/couchbase/couchbaseembed.hpp

@@ -457,6 +457,7 @@ namespace couchbaseembed
                UNSUPPORTED("loadCompiledScript");
            }
            virtual void enter() override {}
+           virtual void reenter(ICodeContext *codeCtx) override {}
            virtual void exit() override {}
        protected:
            void execute();

+ 1 - 1
plugins/javaembed/java.ecllib

@@ -27,5 +27,5 @@ EXPORT syntaxCheck := Language.syntaxCheck;
 EXPORT checkImport := Language.checkImport;
 EXPORT boolean supportsImport := true;
 EXPORT boolean supportsScript := true;
-EXPORT boolean threadlocal := false;
+EXPORT boolean threadlocal := true;
 EXPORT boolean singletonEmbedContext := true;

+ 57 - 12
plugins/javaembed/javaembed.cpp

@@ -2319,6 +2319,11 @@ public:
 // the C++ thread and the java threading library, ensuring that we register/unregister as needed,
 // and that any thread_local function contexts are destroyed before we detach from the java thread
 
+interface IJavaEmbedFunctionContext : public IEmbedFunctionContext
+{
+    virtual void endThread() = 0;
+};
+
 class JavaThreadContext
 {
 public:
@@ -2342,7 +2347,15 @@ public:
         // exiting."
         globalState->javaVM->DetachCurrentThread();
     }
-
+    void endThread()
+    {
+        persistedObjects.kill();
+        ForEachItemIn(idx, contexts)
+        {
+            auto &context = contexts.item(idx);
+            context.endThread();
+        }
+    }
     jobject getSystemClassLoader()
     {
         jobject systemClassLoaderObj = JNIenv->CallStaticObjectMethod(javaLangClassLoaderClass, cl_getSystemClassLoader);
@@ -2370,7 +2383,7 @@ public:
         JavaObjectXmlWriter x(JNIenv, result, name, *esdl, esdlservice, *writer);
         x.write();
     }
-    void registerContext(IEmbedFunctionContext *ctx)
+    void registerContext(IJavaEmbedFunctionContext *ctx)
     {
         // Note - this object is thread-local so no need for a critsec
         contexts.append(*ctx);
@@ -2421,7 +2434,7 @@ public:
         return p->instance;
     }
 private:
-    IArrayOf<IEmbedFunctionContext> contexts;
+    IArrayOf<IJavaEmbedFunctionContext> contexts;
     StringMapOf<PersistedObject> persistedObjects = { false };
     StringMapOf<PersistedObject> loaders = { false };
 };
@@ -3196,11 +3209,11 @@ private:
 // Objects of class JavaEmbedImportContext are created locally for each call of a function, or thread-local to persist from one call to the next.
 // Methods in here do not need to be thread-safe
 
-class JavaEmbedImportContext : public CInterfaceOf<IEmbedFunctionContext>
+class JavaEmbedImportContext : public CInterfaceOf<IJavaEmbedFunctionContext>
 {
 public:
-    JavaEmbedImportContext(ICodeContext *codeCtx, JavaThreadContext *_sharedCtx, jobject _instance, unsigned flags, const char *options, const IThorActivityContext *_activityContext)
-    : sharedCtx(_sharedCtx), JNIenv(sharedCtx->JNIenv), instance(_instance), activityContext(_activityContext)
+    JavaEmbedImportContext(ICodeContext *codeCtx, JavaThreadContext *_sharedCtx, jobject _instance, unsigned _flags, const char *options, const IThorActivityContext *_activityContext)
+    : sharedCtx(_sharedCtx), JNIenv(sharedCtx->JNIenv), instance(_instance), flags(_flags), activityContext(_activityContext)
     {
         argcount = 0;
         argsig = NULL;
@@ -3271,6 +3284,21 @@ public:
         if (classLoader)
             JNIenv->DeleteGlobalRef(classLoader);
     }
+    virtual void endThread() override
+    {
+        instance = nullptr;
+        if (javaClass)
+        {
+            JNIenv->DeleteGlobalRef(javaClass);
+            javaClass = nullptr;
+        }
+        if (classLoader)
+        {
+            JNIenv->DeleteGlobalRef(classLoader);
+            classLoader = nullptr;
+        }
+        javaMethodID = nullptr;
+    }
 
     virtual bool getBooleanResult()
     {
@@ -4249,22 +4277,35 @@ public:
     }
     virtual void enter() override
     {
+        reenter(nullptr);
+    }
+    virtual void reenter(ICodeContext *codeCtx) override
+    {
         // If we rejig codegen to only call loadCompiledScript etc at construction time, then this will need to do the reinit()
         // until we do, it's too early
 
+        if (codeCtx)
+            engine = codeCtx->queryEngineContext();
+        else if (flags & EFthreadlocal && persistMode > persistThread)
+        {
+            StringBuffer s;
+            throw MakeStringException(0, "javaembed: In method %s: Workunit must be recompiled to support this persist mode", getReportName(s).str());
+        }
+
         // Create a new frame for local references and increase the capacity
         // of those references to 64 (default is 16)
+
         JNIenv->PushLocalFrame(64);
     }
     virtual void exit() override
     {
         if (persistMode==persistNone)
             instance = 0;  // otherwise we leave it for next call as it saves a lot of time looking it up
-        JNIenv->PopLocalFrame(nullptr);
         iterators.kill();
 #ifdef FORCE_GC
         forceGC(JNIenv);
 #endif
+        JNIenv->PopLocalFrame(nullptr);
     }
 
 protected:
@@ -4565,6 +4606,7 @@ protected:
     jobject instance = nullptr; // class instance of object to call methods on
     const IThorActivityContext *activityContext = nullptr;
 
+    unsigned flags = 0;
     unsigned nodeNum = 0;
     StringAttr globalScopeKey;
     PersistMode persistMode = persistNone;  // Defines the lifetime of the java object for which this is called.
@@ -4598,17 +4640,20 @@ protected:
 static __thread JavaThreadContext* threadContext;  // We reuse per thread, for speed
 static __thread ThreadTermFunc threadHookChain;
 
-static void releaseContext()
+static void releaseContext(bool isPooled)
 {
     if (threadContext)
     {
-        delete threadContext;
-        threadContext = NULL;
+        threadContext->endThread();
+        if (!isPooled)
+        {
+            delete threadContext;
+            threadContext = NULL;
+        }
     }
     if (threadHookChain)
     {
-        (*threadHookChain)();
-        threadHookChain = NULL;
+        (*threadHookChain)(isPooled);
     }
 }
 

+ 2 - 4
plugins/memcached/memcachedplugin.cpp

@@ -144,7 +144,7 @@ public :
     }
 } mainThread;
 
-static void releaseContext()
+static void releaseContext(bool isPooled)
 {
     if (cachedConnection)
     {
@@ -153,10 +153,8 @@ static void releaseContext()
     }
     if (threadHookChain)
     {
-        (*threadHookChain)();
-        threadHookChain = NULL;
+        (*threadHookChain)(isPooled);
     }
-    threadHooked = false;
 }
 MCached * createConnection(ICodeContext * ctx, const char * options)
 {

+ 3 - 3
plugins/mysql/mysqlembed.cpp

@@ -1378,15 +1378,14 @@ static bool mysqlInitialized = false;
 static __thread bool mysqlThreadInitialized = false;
 static CriticalSection initCrit;
 
-static void terminateMySqlThread()
+static void terminateMySqlThread(bool isPooled)
 {
     MySQLConnection::clearThreadCache();
     mysql_thread_end();
     mysqlThreadInitialized = false;  // In case it was a threadpool thread...
     if (threadHookChain)
     {
-        (*threadHookChain)();
-        threadHookChain = NULL;
+        (*threadHookChain)(isPooled);
     }
 }
 
@@ -1674,6 +1673,7 @@ public:
         throwUnexpected();
     }
     virtual void enter() override {}
+    virtual void reenter(ICodeContext *codeCtx) override {}
     virtual void exit() override {}
 protected:
     void lazyExecute()

+ 4 - 3
plugins/py3embed/py3embed.cpp

@@ -270,7 +270,7 @@ private:
 static __thread PythonThreadContext* threadContext;  // We reuse per thread, for speed
 static __thread ThreadTermFunc threadHookChain;
 
-static void releaseContext()
+static void releaseContext(bool isPooled)
 {
     if (threadContext)
     {
@@ -279,8 +279,7 @@ static void releaseContext()
     }
     if (threadHookChain)
     {
-        (*threadHookChain)();
-        threadHookChain = NULL;
+        (*threadHookChain)(isPooled);
     }
 }
 
@@ -1844,6 +1843,7 @@ public:
         throwUnexpected();
     }
     virtual void enter() override {}
+    virtual void reenter(ICodeContext *codeCtx) override {}
     virtual void exit() override {}
     virtual void setActivityOptions(const IThorActivityContext *ctx) override
     {
@@ -1918,6 +1918,7 @@ public:
         throwUnexpected();
     }
     virtual void enter() override {}
+    virtual void reenter(ICodeContext *codeCtx) override {}
     virtual void exit() override {}
     virtual void callFunction()
     {

+ 4 - 3
plugins/pyembed/pyembed.cpp

@@ -264,7 +264,7 @@ private:
 static __thread PythonThreadContext* threadContext;  // We reuse per thread, for speed
 static __thread ThreadTermFunc threadHookChain;
 
-static void releaseContext()
+static void releaseContext(bool isPooled)
 {
     if (threadContext)
     {
@@ -273,8 +273,7 @@ static void releaseContext()
     }
     if (threadHookChain)
     {
-        (*threadHookChain)();
-        threadHookChain = NULL;
+        (*threadHookChain)(isPooled);
     }
 }
 
@@ -1837,6 +1836,7 @@ public:
         throwUnexpected();
     }
     virtual void enter() override {}
+    virtual void reenter(ICodeContext *codeCtx) override {}
     virtual void exit() override {}
     virtual void setActivityOptions(const IThorActivityContext *ctx) override
     {
@@ -1911,6 +1911,7 @@ public:
         throwUnexpected();
     }
     virtual void enter() override {}
+    virtual void reenter(ICodeContext *codeCtx) override {}
     virtual void exit() override {}
     virtual void callFunction()
     {

+ 3 - 5
plugins/redis/redis.cpp

@@ -258,7 +258,7 @@ public :
 
     Owned<Connection> connection;
 };
-static void releaseAllCachedContexts()
+static void releaseAllCachedContexts(bool isPooled)
 {
     if (cachedConnection)
     {
@@ -277,10 +277,8 @@ static void releaseAllCachedContexts()
     }
     if (threadHookChain)
     {
-        (*threadHookChain)();
-        threadHookChain = nullptr;
+        (*threadHookChain)(isPooled);
     }
-    threadHooked = false;
 }
 //The following class is here to ensure destruction of the cachedConnection within the main thread
 //as this is not handled by the thread hook mechanism.
@@ -288,7 +286,7 @@ static class MainThreadCachedConnection
 {
 public :
     MainThreadCachedConnection() { }
-    ~MainThreadCachedConnection() { releaseAllCachedContexts(); }
+    ~MainThreadCachedConnection() { releaseAllCachedContexts(false); }
 } mainThread;
 
 Connection::Connection(ICodeContext * ctx, const char * _options, const char * _ip, int _port, bool parseOptions, int _database, const char * password, unsigned _timeout, bool selectDB)

+ 1 - 0
plugins/sqlite3/sqlite3.cpp

@@ -571,6 +571,7 @@ public:
         throwUnexpected();
     }
     virtual void enter() override {}
+    virtual void reenter(ICodeContext *codeCtx) override {}
     virtual void exit() override {}
 protected:
     sqlite3_value *getScalarResult()

+ 3 - 3
plugins/v8embed/v8embed.cpp

@@ -897,6 +897,7 @@ public:
         throwUnexpected();
     }
     virtual void enter() override {}
+    virtual void reenter(ICodeContext *codeCtx) override {}
     virtual void exit() override {}
     virtual void importFunction(size32_t lenChars, const char *utf)
     {
@@ -937,7 +938,7 @@ protected:
 static __thread V8JavascriptEmbedFunctionContext * theFunctionContext;  // We reuse per thread, for speed
 static __thread ThreadTermFunc threadHookChain;
 
-static void releaseContext()
+static void releaseContext(bool isPooled)
 {
     if (theFunctionContext)
     {
@@ -946,8 +947,7 @@ static void releaseContext()
     }
     if (threadHookChain)
     {
-        (*threadHookChain)();
-        threadHookChain = NULL;
+        (*threadHookChain)(isPooled);
     }
 }
 

+ 8 - 4
rtl/eclrtl/eclrtl.cpp

@@ -334,15 +334,15 @@ private:
 typedef MapStringTo<RTLUnicodeConverter, char const *> MapStrToUnicodeConverter;
 static __thread MapStrToUnicodeConverter *unicodeConverterMap = NULL;
 static __thread ThreadTermFunc prevThreadTerminator = NULL;
+static __thread bool threadHooked = false;
 
-static void clearUnicodeConverterMap()
+static void clearUnicodeConverterMap(bool isPooled)
 {
     delete unicodeConverterMap;
     unicodeConverterMap = NULL;  // Important to clear, as this is called when threadpool threads end...
     if (prevThreadTerminator)
     {
-        (*prevThreadTerminator)();
-        prevThreadTerminator = NULL;
+        (*prevThreadTerminator)(isPooled);
     }
 }
 
@@ -353,7 +353,11 @@ RTLUnicodeConverter * queryRTLUnicodeConverter(char const * codepage)
         unicodeConverterMap = new MapStrToUnicodeConverter;
         // Use thread terminator hook to clear them up on thread exit.
         // NB: May need to revisit if not on a jlib Thread.
-        prevThreadTerminator = addThreadTermFunc(clearUnicodeConverterMap);
+        if (!threadHooked)
+        {
+            prevThreadTerminator = addThreadTermFunc(clearUnicodeConverterMap);
+            threadHooked = true;
+        }
     }
     RTLUnicodeConverter * conv = unicodeConverterMap->getValue(codepage);
     if(!conv)

+ 6 - 1
rtl/eclrtl/eclrtl.hpp

@@ -803,6 +803,7 @@ interface IRowStream;
 ECLRTL_API IRowStream * createRowStream(size32_t count, const byte * * rowset);
 
 //-----------------------------------------------------------------------------
+interface ICodeContext;
 struct RtlTypeInfo;
 class ARowBuilder;
 interface IEmbedFunctionContext : extends IInterface
@@ -849,6 +850,7 @@ interface IEmbedFunctionContext : extends IInterface
     // If reusing a context, need to call these before using/after using
     virtual void enter() = 0;
     virtual void exit() = 0;
+    virtual void reenter(ICodeContext *ctx) = 0;
 };
 
 class EmbedContextBlock
@@ -858,6 +860,10 @@ public:
     {
         ctx->enter();
     }
+    EmbedContextBlock(IEmbedFunctionContext *_ctx, ICodeContext *codeCtx) : ctx(_ctx)
+    {
+        ctx->reenter(codeCtx);
+    }
     ~EmbedContextBlock()
     {
         ctx->exit();
@@ -873,7 +879,6 @@ interface IEmbedServiceContext : extends IInterface
 
 enum EmbedFlags { EFembed = 1, EFimport = 2, EFnoreturn = 4, EFnoparams = 8, EFthreadlocal = 16 }; // For createFunctionContext flags
 
-interface ICodeContext;
 interface IThorActivityContext;
 interface IEmbedContext : extends IInterface
 {

+ 6 - 7
system/jlib/jthread.cpp

@@ -48,25 +48,24 @@ ThreadTermFunc addThreadTermFunc(ThreadTermFunc onTerm)
     return old;
 }
 
-void callThreadTerminationHooks()
+void callThreadTerminationHooks(bool isPooled)
 {
     if (threadTerminationHook)
     {
-        (*threadTerminationHook)();
-        threadTerminationHook = NULL;
+        (*threadTerminationHook)(isPooled);
     }
 }
 
 PointerArray *exceptionHandlers = NULL;
 MODULE_INIT(INIT_PRIORITY_JTHREAD)
 {
-    if (threadTerminationHook)
-        (*threadTerminationHook)();  // May be too late :(
     exceptionHandlers = new PointerArray();
     return true;
 }
 MODULE_EXIT()
 {
+    if (threadTerminationHook)
+        (*threadTerminationHook)(false);  // May be too late :(
     delete exceptionHandlers;
 }
 
@@ -276,7 +275,7 @@ int Thread::begin()
         handleException(MakeStringException(0, "Unknown exception in Thread %s", getName()));
     }
 #endif
-    callThreadTerminationHooks();
+    callThreadTerminationHooks(false);
 #ifdef _WIN32
 #ifndef _DEBUG
     CloseHandle(hThread);   // leak handle when debugging, 
@@ -928,7 +927,7 @@ public:
                 handleException(MakeStringException(0, "Unknown exception in Thread from pool %s", parent.poolname.get()));
             }
 #endif
-            callThreadTerminationHooks();    // Reset any pre-thread state.
+            callThreadTerminationHooks(true);    // Reset any per-thread state.
         } while (parent.notifyStopped(this));
         return 0;
     }

+ 3 - 3
system/jlib/jthread.hpp

@@ -60,15 +60,15 @@ extern jlib_decl unsigned threadLogID();  // for use in logging
 // so the hook function should clear any variables if necessary rather than assuming that they will be cleared
 // at thread startup time.
 
-typedef void (*ThreadTermFunc)();
+typedef void (*ThreadTermFunc)(bool isPooled);
 extern jlib_decl ThreadTermFunc addThreadTermFunc(ThreadTermFunc onTerm);
-extern jlib_decl void callThreadTerminationHooks();
+extern jlib_decl void callThreadTerminationHooks(bool isPooled);
 
 //An exception safe way of ensuring that the thread termination hooks are called.
 class jlib_decl QueryTerminationCleanup
 {
 public:
-    inline ~QueryTerminationCleanup() { callThreadTerminationHooks(); }
+    inline ~QueryTerminationCleanup() { callThreadTerminationHooks(true); }
 };
 
 class jlib_decl Thread : public CInterface, public IThread

+ 1 - 1
thorlcr/graph/thgraph.cpp

@@ -2792,7 +2792,7 @@ void CJobBase::endJob()
     {
         jobChannels.kill(); // avoiding circular references. Kill before other CJobBase components are destroyed that channels reference.
         ::Release(userDesc);
-        callThreadTerminationHooks(); // must call any installed thread termination functions, before unloading plugins
+        callThreadTerminationHooks(true); // must call any installed thread termination functions, before unloading plugins
         ::Release(pluginMap);
 
         traceMemUsage();