Browse Source

HPCC-24492 Fix algorithm in submitWorkunit() to remove unnecessary lock acquisition
Function now uses a read-only workunit instead.
Renamed factory to globalFactory to disambiguate when it is referenced.

Signed-off-by: nhalliday <nathanhallidaywork@gmail.com>

nhalliday 4 years ago
parent
commit
0366cb78be
1 changed files with 26 additions and 27 deletions
  1. 26 27
      common/workunit/workunit.cpp

+ 26 - 27
common/workunit/workunit.cpp

@@ -6278,33 +6278,33 @@ extern WORKUNIT_API IConstWorkUnitIterator *createSecureConstWUIterator(IPropert
 
 
 
 
 static CriticalSection factoryCrit;
 static CriticalSection factoryCrit;
-static Owned<IWorkUnitFactory> factory;
+static Owned<IWorkUnitFactory> globalFactory;
 
 
 void CDaliWorkUnitFactory::clientShutdown()
 void CDaliWorkUnitFactory::clientShutdown()
 {
 {
     CriticalBlock b(factoryCrit);
     CriticalBlock b(factoryCrit);
-    factory.clear();
+    globalFactory.clear();
 }
 }
 
 
 void clientShutdownWorkUnit()
 void clientShutdownWorkUnit()
 {
 {
     CriticalBlock b(factoryCrit);
     CriticalBlock b(factoryCrit);
-    factory.clear();
+    globalFactory.clear();
 }
 }
 
 
 
 
 extern WORKUNIT_API void setWorkUnitFactory(IWorkUnitFactory * _factory)
 extern WORKUNIT_API void setWorkUnitFactory(IWorkUnitFactory * _factory)
 {
 {
     CriticalBlock b(factoryCrit);
     CriticalBlock b(factoryCrit);
-    factory.setown(_factory);
+    globalFactory.setown(_factory);
 }
 }
 
 
 extern WORKUNIT_API IWorkUnitFactory * getWorkUnitFactory()
 extern WORKUNIT_API IWorkUnitFactory * getWorkUnitFactory()
 {
 {
-    if (!factory)
+    if (!globalFactory)
     {
     {
         CriticalBlock b(factoryCrit);
         CriticalBlock b(factoryCrit);
-        if (!factory)   // NOTE - this "double test" paradigm is not guaranteed threadsafe on modern systems/compilers - I think in this instance that is harmless even in the (extremely) unlikely event that it resulted in the setown being called twice.
+        if (!globalFactory)   // NOTE - this "double test" paradigm is not guaranteed threadsafe on modern systems/compilers - I think in this instance that is harmless even in the (extremely) unlikely event that it resulted in the setown being called twice.
         {
         {
             const char *forceEnv = getenv("FORCE_DALI_WORKUNITS");
             const char *forceEnv = getenv("FORCE_DALI_WORKUNITS");
             bool forceDali = forceEnv && !strieq(forceEnv, "off") && !strieq(forceEnv, "0");
             bool forceDali = forceEnv && !strieq(forceEnv, "off") && !strieq(forceEnv, "0");
@@ -6327,23 +6327,23 @@ extern WORKUNIT_API IWorkUnitFactory * getWorkUnitFactory()
                 }
                 }
             }
             }
             if (pluginInfo && !forceDali)
             if (pluginInfo && !forceDali)
-                factory.setown( (IWorkUnitFactory *) loadPlugin(pluginInfo));
+                globalFactory.setown( (IWorkUnitFactory *) loadPlugin(pluginInfo));
             else
             else
-                factory.setown(new CDaliWorkUnitFactory());
+                globalFactory.setown(new CDaliWorkUnitFactory());
         }
         }
     }
     }
-    return factory.getLink();
+    return globalFactory.getLink();
 }
 }
 
 
 extern WORKUNIT_API IWorkUnitFactory * getDaliWorkUnitFactory()
 extern WORKUNIT_API IWorkUnitFactory * getDaliWorkUnitFactory()
 {
 {
-    if (!factory)
+    if (!globalFactory)
     {
     {
         CriticalBlock b(factoryCrit);
         CriticalBlock b(factoryCrit);
-        if (!factory)   // NOTE - this "double test" paradigm is not guaranteed threadsafe on modern systems/compilers - I think in this instance that is harmless even in the (extremely) unlikely event that it resulted in the setown being called twice.
-            factory.setown(new CDaliWorkUnitFactory());
+        if (!globalFactory)   // NOTE - this "double test" paradigm is not guaranteed threadsafe on modern systems/compilers - I think in this instance that is harmless even in the (extremely) unlikely event that it resulted in the setown being called twice.
+            globalFactory.setown(new CDaliWorkUnitFactory());
     }
     }
-    return factory.getLink();
+    return globalFactory.getLink();
 }
 }
 
 
 // A SecureWorkUnitFactory allows the security params to be supplied once to the factory rather than being supplied to each call.
 // A SecureWorkUnitFactory allows the security params to be supplied once to the factory rather than being supplied to each call.
@@ -6700,7 +6700,7 @@ void CLocalWorkUnit::cleanupAndDelete(bool deldll, bool deleteOwned, const Strin
                 }
                 }
             }
             }
         }
         }
-        factory->clearAborting(queryWuid());
+        globalFactory->clearAborting(queryWuid());
         deleteTempFiles(NULL, deleteOwned, true); // all, any remaining.
         deleteTempFiles(NULL, deleteOwned, true); // all, any remaining.
     }
     }
     catch(IException *E)
     catch(IException *E)
@@ -7209,8 +7209,8 @@ void CLocalWorkUnit::setState(WUState value)
 {
 {
     if (value==WUStateAborted || value==WUStatePaused || value==WUStateCompleted || value==WUStateFailed || value==WUStateSubmitted || value==WUStateWait)
     if (value==WUStateAborted || value==WUStatePaused || value==WUStateCompleted || value==WUStateFailed || value==WUStateSubmitted || value==WUStateWait)
     {
     {
-        if (factory)
-            factory->clearAborting(queryWuid());
+        if (globalFactory)
+            globalFactory->clearAborting(queryWuid());
     }
     }
     CriticalBlock block(crit);
     CriticalBlock block(crit);
     setEnum(p, "@state", value, states);  // For historical reasons, we use state to store the state
     setEnum(p, "@state", value, states);  // For historical reasons, we use state to store the state
@@ -8943,7 +8943,7 @@ IConstWUResult* CLocalWorkUnit::getGlobalByName(const char *qname) const
     if (strcmp(p->queryName(), GLOBAL_WORKUNIT)==0)
     if (strcmp(p->queryName(), GLOBAL_WORKUNIT)==0)
         return getVariableByName(qname);
         return getVariableByName(qname);
 
 
-    Owned <IWorkUnit> global = factory->getGlobalWorkUnit(secMgr, secUser);
+    Owned <IWorkUnit> global = globalFactory->getGlobalWorkUnit(secMgr, secUser);
     return global->getVariableByName(qname);
     return global->getVariableByName(qname);
 }
 }
 
 
@@ -8953,7 +8953,7 @@ IWUResult* CLocalWorkUnit::updateGlobalByName(const char *qname)
     if (strcmp(p->queryName(), GLOBAL_WORKUNIT)==0)
     if (strcmp(p->queryName(), GLOBAL_WORKUNIT)==0)
         return updateVariableByName(qname);
         return updateVariableByName(qname);
 
 
-    Owned <IWorkUnit> global = factory->getGlobalWorkUnit(secMgr, secUser);
+    Owned <IWorkUnit> global = globalFactory->getGlobalWorkUnit(secMgr, secUser);
     return global->updateVariableByName(qname);
     return global->updateVariableByName(qname);
 }
 }
 
 
@@ -11779,13 +11779,12 @@ extern WORKUNIT_API void submitWorkUnit(const char *wuid, const char *username,
     MemoryBuffer buffer;
     MemoryBuffer buffer;
     Owned<INamedQueueConnection> conn = createNamedQueueConnection(0); // MORE - security token?
     Owned<INamedQueueConnection> conn = createNamedQueueConnection(0); // MORE - security token?
     Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
     Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
-    Owned<IWorkUnit> workunit = factory->updateWorkUnit(wuid);
-    assertex(workunit);
-    StringAttr clusterName(workunit->queryClusterName());
+    Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid);
+    assertex(cw);
+    StringAttr clusterName(cw->queryClusterName());
+    cw.clear();
     if (!clusterName.length())
     if (!clusterName.length())
         throw MakeStringException(WUERR_InvalidCluster, "No target cluster specified");
         throw MakeStringException(WUERR_InvalidCluster, "No target cluster specified");
-    workunit->commit();
-    workunit.clear();
 
 
     StringBuffer serverQueue;
     StringBuffer serverQueue;
     getClusterEclCCServerQueueName(serverQueue, clusterName);
     getClusterEclCCServerQueueName(serverQueue, clusterName);
@@ -11816,7 +11815,7 @@ extern WORKUNIT_API void secAbortWorkUnit(const char *wuid, ISecManager &secmgr,
         return;
         return;
 
 
     abortWorkUnit(wuid);
     abortWorkUnit(wuid);
-    Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid);
+    Owned<IConstWorkUnit> cw = globalFactory->openWorkUnit(wuid);
     if(!cw)
     if(!cw)
         return;
         return;
 
 
@@ -12321,7 +12320,7 @@ void testWorkflow()
 
 
 extern WUState waitForWorkUnitToComplete(const char * wuid, int timeout, std::list<WUState> expectedStates)
 extern WUState waitForWorkUnitToComplete(const char * wuid, int timeout, std::list<WUState> expectedStates)
 {
 {
-    return factory->waitForWorkUnit(wuid, (unsigned) timeout, false, expectedStates);
+    return globalFactory->waitForWorkUnit(wuid, (unsigned) timeout, false, expectedStates);
 }
 }
 
 
 extern WORKUNIT_API WUState secWaitForWorkUnitToComplete(const char * wuid, ISecManager &secmgr, ISecUser &secuser, int timeout, std::list<WUState> expectedStates)
 extern WORKUNIT_API WUState secWaitForWorkUnitToComplete(const char * wuid, ISecManager &secmgr, ISecUser &secuser, int timeout, std::list<WUState> expectedStates)
@@ -12333,7 +12332,7 @@ extern WORKUNIT_API WUState secWaitForWorkUnitToComplete(const char * wuid, ISec
 
 
 extern bool waitForWorkUnitToCompile(const char * wuid, int timeout)
 extern bool waitForWorkUnitToCompile(const char * wuid, int timeout)
 {
 {
-    switch(factory->waitForWorkUnit(wuid, (unsigned) timeout, true, { WUStateWait }))
+    switch(globalFactory->waitForWorkUnit(wuid, (unsigned) timeout, true, { WUStateWait }))
     {
     {
     case WUStateCompiled:
     case WUStateCompiled:
     case WUStateCompleted:
     case WUStateCompleted:
@@ -12356,7 +12355,7 @@ extern WORKUNIT_API bool secDebugWorkunit(const char * wuid, ISecManager &secmgr
 {
 {
     if (strnicmp(command, "<debug:", 7) == 0 && checkWuSecAccess(wuid, &secmgr, &secuser, SecAccess_Read, "Debug", false, true))
     if (strnicmp(command, "<debug:", 7) == 0 && checkWuSecAccess(wuid, &secmgr, &secuser, SecAccess_Read, "Debug", false, true))
     {
     {
-        Owned<IConstWorkUnit> wu = factory->openWorkUnit(wuid, &secmgr, &secuser);
+        Owned<IConstWorkUnit> wu = globalFactory->openWorkUnit(wuid, &secmgr, &secuser);
         SCMStringBuffer ip;
         SCMStringBuffer ip;
         unsigned port;
         unsigned port;
         try
         try