Parcourir la source

HPCC-26487 Read query dlls directly in cloud

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith il y a 3 ans
Parent
commit
08cd2ddf2b

+ 1 - 0
dali/base/dacoven.cpp

@@ -936,6 +936,7 @@ ICoven &queryCoven()
 {
     if (coven==NULL)
     {
+        PrintStackReport();
         Owned<IException> e = MakeStringException(-1, "No access to Dali - this normally means a plugin call is being called from a thorslave");
         EXCLOG(e, NULL);
         throw e.getClear();

+ 3 - 1
thorlcr/graph/thgraphmaster.cpp

@@ -1575,6 +1575,7 @@ void CJobMaster::sendQuery()
     const char *soName = queryDllEntry().queryName();
     PROGLOG("Query dll: %s", soName);
     tmp.append(soName);
+#ifndef _CONTAINERIZED
     tmp.append(sendSo);
     if (sendSo)
     {
@@ -1586,6 +1587,8 @@ void CJobMaster::sendQuery()
         read(iFileIO, 0, sz, tmp);
         PROGLOG("Loading query for serialization to slaves took %d ms", atimer.elapsed());
     }
+    queryJobManager().addCachedSo(soName);
+#endif
     Owned<IPropertyTree> deps = createPTree(queryXGMML()->queryName());
     Owned<IPropertyTreeIterator> edgeIter = queryXGMML()->getElements("edge"); // JCSMORE trim to those actually needed
     ForEach (*edgeIter)
@@ -1604,7 +1607,6 @@ void CJobMaster::sendQuery()
     CTimeMon queryToSlavesTimer;
     broadcast(queryNodeComm(), msg, masterSlaveMpTag, LONGTIMEOUT, "sendQuery");
     PROGLOG("Serialization of query init info (%d bytes) to slaves took %d ms", msg.length(), queryToSlavesTimer.elapsed());
-    queryJobManager().addCachedSo(soName);
     querySent = true;
 }
 

+ 2 - 0
thorlcr/graph/thgraphmaster.hpp

@@ -48,7 +48,9 @@ interface IJobManager : extends IInterface
     virtual void setWuid(const char *wuid, const char *cluster=NULL) = 0;
     virtual IDeMonServer *queryDeMonServer() = 0;
     virtual void fatal(IException *e) = 0;
+#ifndef _CONTAINERIZED
     virtual void addCachedSo(const char *name) = 0;
+#endif
     virtual void updateWorkUnitLog(IWorkUnit &workunit) = 0;
 };
 

+ 20 - 5
thorlcr/master/thgraphmanager.cpp

@@ -55,7 +55,9 @@ class CJobManager : public CSimpleInterface, implements IJobManager, implements
     Owned<IConversation> conversation;
     StringAttr queueName;
     CriticalSection replyCrit, jobCrit;
+#ifndef _CONTAINERIZED
     CFifoFileCache querySoCache;
+#endif
     Owned<IJobQueue> jobq;
     ICopyArrayOf<CJobMaster> jobs;
     Owned<IException> exitException;
@@ -248,7 +250,9 @@ public:
     virtual void setWuid(const char *wuid, const char *cluster=NULL);
     virtual IDeMonServer *queryDeMonServer() { return demonServer; }
     virtual void fatal(IException *e);
+#ifndef _CONTAINERIZED
     virtual void addCachedSo(const char *name);
+#endif
     virtual void updateWorkUnitLog(IWorkUnit &workunit);
 };
 
@@ -477,6 +481,7 @@ void CJobManager::run()
     LOG(MCdebugProgress, thorJob, "Listening for graph");
 
     setWuid(NULL);
+#ifndef _CONTAINERIZED
     StringBuffer soPath;
     globals->getProp("@query_so_dir", soPath);
     StringBuffer soPattern("*.");
@@ -487,7 +492,6 @@ void CJobManager::run()
 #endif
     querySoCache.init(soPath.str(), DEFAULT_QUERYSO_LIMIT, soPattern);
 
-#ifndef _CONTAINERIZED
     SCMStringBuffer _queueNames;
     const char *thorName = globals->queryProp("@name");
     if (!thorName) thorName = "thor";
@@ -930,16 +934,24 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName,
     Owned<IConstWUQuery> query = workunit.getQuery();
     SCMStringBuffer soName;
     query->getQueryDllName(soName);
+#ifndef _CONTAINERIZED
     unsigned version = query->getQueryDllCrc();
+#endif
     query.clear();
 
+    bool sendSo = false;
+    Owned<ILoadedDllEntry> querySo;
     StringBuffer soPath;
+#ifdef _CONTAINERIZED
+    PROGLOG("Loading query name: %s", soName.str());
+    querySo.setown(queryDllServer().loadDll(soName.str(), DllLocationLocal));
+    soPath.append(querySo->queryName());
+#else
     globals->getProp("@query_so_dir", soPath);
     StringBuffer compoundPath;
     compoundPath.append(soPath.str());
     soPath.append(soName.str());
     getCompoundQueryName(compoundPath, soName.str(), version);
-    bool sendSo = false;
     if (querySoCache.isAvailable(compoundPath.str()))
         PROGLOG("Using existing local dll: %s", compoundPath.str()); // It is assumed if present here then _still_ present on slaves from previous send.
     else
@@ -962,15 +974,16 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName,
         }
         sendSo = globals->getPropBool("Debug/@dllsToSlaves", true);
     }
-
-    Owned<ILoadedDllEntry> querySo = createDllEntry(compoundPath.str(), false, NULL, false);
+    querySo.setown(createDllEntry(compoundPath.str(), false, NULL, false));
+    soPath.swapWith(compoundPath);
+#endif
 
     SCMStringBuffer eclstr;
     StringAttr user(workunit.queryUser());
 
     PROGLOG("Started wuid=%s, user=%s, graph=%s\n", wuid.str(), user.str(), graphName);
 
-    PROGLOG("Query %s loaded", compoundPath.str());
+    PROGLOG("Query %s loaded", soPath.str());
     Owned<CJobMaster> job = createThorGraph(graphName, workunit, querySo, sendSo, agentEp);
     unsigned wfid = job->getWfid();
     StringBuffer graphScope;
@@ -1038,10 +1051,12 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName,
     return allDone;
 }
 
+#ifndef _CONTAINERIZED
 void CJobManager::addCachedSo(const char *name)
 {
     querySoCache.add(name);
 }
+#endif
 
 static int exitCode = -1;
 void setExitCode(int code) { exitCode = code; }

+ 31 - 29
thorlcr/master/thmastermain.cpp

@@ -66,7 +66,9 @@
 #include "thexception.hpp"
 #include "thmem.hpp"
 
+#ifndef _CONTAINERIED
 #define DEFAULT_QUERY_SO_DIR "sodir"
+#endif
 #define MAX_SLAVEREG_DELAY 60*1000*15 // 15 mins
 #define SLAVEREG_VERIFY_DELAY 5*1000
 #define SHUTDOWN_IN_PARALLEL 20
@@ -836,6 +838,16 @@ int main( int argc, const char *argv[]  )
         PROGLOG("Global memory size = %d MB", mmemSize);
         roxiemem::setTotalMemoryLimit(gmemAllowHugePages, gmemAllowTransparentHugePages, gmemRetainMemory, ((memsize_t)mmemSize) * 0x100000, 0, thorAllocSizes, NULL);
 
+        char thorPath[1024];
+        if (!GetCurrentDirectory(1024, thorPath))
+        {
+            OERRLOG("ThorMaster::main: Current directory path too big, setting it to null");
+            thorPath[0] = 0;
+        }
+        unsigned l = strlen(thorPath);
+        if (l) { thorPath[l] = PATHSEPCHAR; thorPath[l+1] = '\0'; }
+        globals->setProp("@thorPath", thorPath);
+
 #ifndef _CONTAINERIZED
         const char * overrideBaseDirectory = globals->queryProp("@thorDataDirectory");
         const char * overrideReplicateDirectory = globals->queryProp("@thorReplicateDirectory");
@@ -849,6 +861,25 @@ int main( int argc, const char *argv[]  )
             setBaseDirectory(overrideBaseDirectory, false);
         if (overrideReplicateDirectory&&*overrideBaseDirectory)
             setBaseDirectory(overrideReplicateDirectory, true);
+
+        StringBuffer soDir, soPath;
+        if (getConfigurationDirectory(globals->queryPropTree("Directories"),"query","thor",globals->queryProp("@name"),soDir))
+            globals->setProp("@query_so_dir", soDir.str());
+        else if (!globals->getProp("@query_so_dir", soDir)) {
+            globals->setProp("@query_so_dir", DEFAULT_QUERY_SO_DIR); 
+            soDir.append(DEFAULT_QUERY_SO_DIR);
+        }
+        if (isAbsolutePath(soDir.str()))
+            soPath.append(soDir);
+        else
+        {
+            soPath.append(thorPath);
+            addPathSepChar(soPath);
+            soPath.append(soDir);
+        }
+        addPathSepChar(soPath);
+        globals->setProp("@query_so_dir", soPath.str());
+        recursiveCreateDirectory(soPath.str());
 #endif
 
         StringBuffer tempDirStr;
@@ -870,35 +901,6 @@ int main( int argc, const char *argv[]  )
         SetTempDir(0, tempDirStr.str(), tempPrefix.str(), true);
         DBGLOG("Temp directory: %s", queryTempDir());
 
-        char thorPath[1024];
-        if (!GetCurrentDirectory(1024, thorPath))
-        {
-            OERRLOG("ThorMaster::main: Current directory path too big, setting it to null");
-            thorPath[0] = 0;
-        }
-        unsigned l = strlen(thorPath);
-        if (l) { thorPath[l] = PATHSEPCHAR; thorPath[l+1] = '\0'; }
-        globals->setProp("@thorPath", thorPath);
-
-        StringBuffer soDir, soPath;
-        if (getConfigurationDirectory(globals->queryPropTree("Directories"),"query","thor",globals->queryProp("@name"),soDir))
-            globals->setProp("@query_so_dir", soDir.str());
-        else if (!globals->getProp("@query_so_dir", soDir)) {
-            globals->setProp("@query_so_dir", DEFAULT_QUERY_SO_DIR); 
-            soDir.append(DEFAULT_QUERY_SO_DIR);
-        }
-        if (isAbsolutePath(soDir.str()))
-            soPath.append(soDir);
-        else
-        {
-            soPath.append(thorPath);
-            addPathSepChar(soPath);
-            soPath.append(soDir);
-        }
-        addPathSepChar(soPath);
-        globals->setProp("@query_so_dir", soPath.str());
-        recursiveCreateDirectory(soPath.str());
-
         startLogMsgParentReceiver();    
         connectLogMsgManagerToDali();
         if (globals->getPropBool("@cache_dafilesrv_master",false))

+ 18 - 3
thorlcr/slave/slavmain.cpp

@@ -1601,7 +1601,9 @@ class CJobListener : public CSimpleInterface
     bool &stopped;
     CriticalSection crit;
     OwningStringSuperHashTableOf<CJobSlave> jobs;
+#ifndef _CONTAINERIZED
     CFifoFileCache querySoCache; // used to mirror master cache
+#endif
     IArrayOf<IMPServer> mpServers;
     unsigned channelsPerSlave;
 
@@ -1748,6 +1750,7 @@ public:
                 verifyThreads.append(*new CVerifyThread(*this, c));
         }
 
+#ifndef _CONTAINERIZED
         StringBuffer soPath;
         globals->getProp("@query_so_dir", soPath);
         StringBuffer soPattern("*.");
@@ -1758,6 +1761,7 @@ public:
 #endif
         if (globals->getPropBool("Debug/@dllsToSlaves",true))
             querySoCache.init(soPath.str(), DEFAULT_QUERYSO_LIMIT, soPattern);
+#endif
         Owned<ISlaveWatchdog> watchdog;
         if (globals->getPropBool("@watchdogEnabled"))
             watchdog.setown(createProgressHandler(globals->getPropBool("@useUDPWatchdog")));
@@ -1782,10 +1786,20 @@ public:
                         mptag_t slaveMsgTag;
                         deserializeMPtag(msg, slaveMsgTag);
                         queryNodeComm().flush(slaveMsgTag);
-                        StringBuffer soPath, soPathTail;
-                        StringAttr wuid, graphName, remoteSoPath;
+                        StringAttr wuid, graphName;
+                        StringBuffer soPath;
                         msg.read(wuid);
                         msg.read(graphName);
+
+                        Owned<ILoadedDllEntry> querySo;
+#ifdef _CONTAINERIZED
+                        StringAttr soName;
+                        msg.read(soName);
+                        querySo.setown(createDllEntry(soName.str(), false, NULL, false));
+                        soPath.append(soName);
+#else
+                        StringBuffer soPathTail;
+                        StringAttr remoteSoPath;
                         msg.read(remoteSoPath);
                         bool sendSo;
                         msg.read(sendSo);
@@ -1857,7 +1871,8 @@ public:
                             soPath.clear().append(tempSo.str());
                         }
 #endif
-                        Owned<ILoadedDllEntry> querySo = createDllEntry(soPath.str(), false, NULL, false);
+                        querySo.setown(createDllEntry(soPath.str(), false, NULL, false));
+#endif
 
                         Owned<IPropertyTree> workUnitInfo = createPTree(msg);
                         StringBuffer user;

+ 21 - 20
thorlcr/slave/thslavemain.cpp

@@ -460,6 +460,27 @@ int main( int argc, const char *argv[]  )
                 setBaseDirectory(overrideBaseDirectory, false);
             if (!isEmptyString(overrideReplicateDirectory))
                 setBaseDirectory(overrideReplicateDirectory, true);
+
+            if (getConfigurationDirectory(globals->queryPropTree("Directories"),"query","thor",globals->queryProp("@name"),str.clear()))
+                globals->setProp("@query_so_dir", str.str());
+            else
+                globals->getProp("@query_so_dir", str.clear());
+            if (str.length())
+            {
+                if (globals->getPropBool("Debug/@dllsToSlaves", true))
+                {
+                    StringBuffer uniqSoPath;
+                    if (PATHSEPCHAR == str.charAt(str.length()-1))
+                        uniqSoPath.append(str.length()-1, str.str());
+                    else
+                        uniqSoPath.append(str);
+                    uniqSoPath.append("_").append(getMachinePortBase());
+                    str.swapWith(uniqSoPath);
+                    globals->setProp("@query_so_dir", str.str());
+                }
+                PROGLOG("Using querySo directory: %s", str.str());
+                recursiveCreateDirectory(str.str());
+            }
 #endif
 
             StringBuffer tempDirStr;
@@ -488,26 +509,6 @@ int main( int argc, const char *argv[]  )
                 LOG(MCdebugProgress, thorJob, "%d%% disk free\n",pc);
             }
 #endif
-            if (getConfigurationDirectory(globals->queryPropTree("Directories"),"query","thor",globals->queryProp("@name"),str.clear()))
-                globals->setProp("@query_so_dir", str.str());
-            else
-                globals->getProp("@query_so_dir", str.clear());
-            if (str.length())
-            {
-                if (globals->getPropBool("Debug/@dllsToSlaves", true))
-                {
-                    StringBuffer uniqSoPath;
-                    if (PATHSEPCHAR == str.charAt(str.length()-1))
-                        uniqSoPath.append(str.length()-1, str.str());
-                    else
-                        uniqSoPath.append(str);
-                    uniqSoPath.append("_").append(getMachinePortBase());
-                    str.swapWith(uniqSoPath);
-                    globals->setProp("@query_so_dir", str.str());
-                }
-                PROGLOG("Using querySo directory: %s", str.str());
-                recursiveCreateDirectory(str.str());
-            }
      
             multiThorMemoryThreshold = globals->getPropInt("@multiThorMemoryThreshold")*0x100000;
             if (multiThorMemoryThreshold) {