Browse Source

Merge pull request #8312 from jakesmith/hpcc-15110

HPCC-15110 Load Graph XGMML from query dll instead of Dali

Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 9 năm trước cách đây
mục cha
commit
1f00052e62

+ 9 - 1
thorlcr/graph/thgraph.cpp

@@ -2467,7 +2467,7 @@ public:
 
 ////
 
-CJobBase::CJobBase(const char *_graphName) : graphName(_graphName)
+CJobBase::CJobBase(ILoadedDllEntry *_querySo, const char *_graphName) : querySo(_querySo), graphName(_graphName)
 {
     maxDiskUsage = diskUsage = 0;
     dirty = true;
@@ -2488,6 +2488,14 @@ CJobBase::CJobBase(const char *_graphName) : graphName(_graphName)
     jobSlaveChannelNum.allocateN(querySlaves()); // filled when channels are added.
     for (unsigned s=0; s<querySlaves(); s++)
         jobSlaveChannelNum[s] = NotFound;
+    StringBuffer wuXML;
+    if (!getEmbeddedWorkUnitXML(querySo, wuXML))
+        throw MakeStringException(0, "Failed to locate workunit info in query : %s", querySo->queryName());
+    Owned<ILocalWorkUnit> localWU = createLocalWorkUnit(wuXML);
+    Owned<IConstWUGraph> graph = localWU->getGraph(graphName);
+    graphXGMML.setown(graph->getXGMMLTree(false));
+    if (!graphXGMML)
+    	throwUnexpected();
 }
 
 void CJobBase::init()

+ 4 - 3
thorlcr/graph/thgraph.hpp

@@ -743,7 +743,7 @@ class graph_decl CJobBase : public CInterface, implements IDiskUsage, implements
 {
 protected:
     CriticalSection crit;
-    Owned<ILoadedDllEntry> querySo;
+    Linked<ILoadedDllEntry> querySo;
     IUserDescriptor *userDesc;
     offset_t maxDiskUsage, diskUsage;
     StringAttr key, graphName;
@@ -770,7 +770,7 @@ protected:
     bool usePackedAllocator;
     unsigned memorySpillAt;
     rank_t myNodeRank;
-
+    Owned<IPropertyTree> graphXGMML;
 
     class CThorPluginCtx : public SimplePluginCtx
     {
@@ -800,7 +800,7 @@ protected:
 public:
     IMPLEMENT_IINTERFACE;
 
-    CJobBase(const char *graphName);
+    CJobBase(ILoadedDllEntry *querySo, const char *graphName);
     virtual void beforeDispose();
     ~CJobBase();
 
@@ -815,6 +815,7 @@ public:
     void init();
     void setXGMML(IPropertyTree *_xgmml) { xgmml.set(_xgmml); }
     IPropertyTree *queryXGMML() { return xgmml; }
+    IPropertyTree *queryGraphXGMML() const { return graphXGMML; }
     bool queryAborted() const { return aborted; }
     const char *queryKey() const { return key; }
     const char *queryGraphName() const { return graphName; }

+ 8 - 8
thorlcr/graph/thgraphmaster.cpp

@@ -1274,8 +1274,8 @@ void loadPlugin(SafePluginMap *pluginMap, const char *_path, const char *name)
     pluginMap->addPlugin(path.str(), name);
 }
 
-CJobMaster::CJobMaster(IConstWorkUnit &_workunit, const char *graphName, const char *_querySo, bool _sendSo, const SocketEndpoint &_agentEp)
-    : CJobBase(graphName), workunit(&_workunit), sendSo(_sendSo), agentEp(_agentEp)
+CJobMaster::CJobMaster(IConstWorkUnit &_workunit, const char *graphName, ILoadedDllEntry *querySo, bool _sendSo, const SocketEndpoint &_agentEp)
+    : CJobBase(querySo, graphName), workunit(&_workunit), sendSo(_sendSo), agentEp(_agentEp)
 {
     SCMStringBuffer _token, _scope;
     workunit->getScope(_scope);
@@ -1304,13 +1304,13 @@ CJobMaster::CJobMaster(IConstWorkUnit &_workunit, const char *graphName, const c
         plugin.getPluginName(name);
         loadPlugin(pluginMap, pluginsDir.str(), name.str());
     }
-    querySo.setown(createDllEntry(_querySo, false, NULL));
     Owned<IMPServer> mpServer = getMPServer();
     addChannel(mpServer);
     mpJobTag = allocateMPTag();
     slavemptag = allocateMPTag();
     slaveMsgHandler = new CSlaveMessageHandler(*this, slavemptag);
     tmpHandler.setown(createTempHandler(true));
+    xgmml.set(graphXGMML);
 }
 
 CJobMaster::~CJobMaster()
@@ -2465,7 +2465,7 @@ void CMasterGraph::sendGraph()
     CMessageBuffer msg;
     msg.append(GraphInit);
     msg.append(job.queryKey());
-    node->serialize(msg); // everything
+    msg.append(queryGraphId());
     if (TAG_NULL == executeReplyTag)
         executeReplyTag = jobM->allocateMPTag();
     serializeMPtag(msg, executeReplyTag);
@@ -2928,14 +2928,14 @@ void ProgressInfo::getStats(IStatisticGatherer & stats)
 
 ///////////////////////////////////////////////////
 
-CJobMaster *createThorGraph(const char *graphName, IPropertyTree *xgmml, IConstWorkUnit &workunit, const char *querySo, bool sendSo, const SocketEndpoint &agentEp)
+CJobMaster *createThorGraph(const char *graphName, IConstWorkUnit &workunit, ILoadedDllEntry *querySo, bool sendSo, const SocketEndpoint &agentEp)
 {
     Owned<CJobMaster> jobMaster = new CJobMaster(workunit, graphName, querySo, sendSo, agentEp);
-    jobMaster->setXGMML(xgmml);
-    Owned<IPropertyTreeIterator> iter = xgmml->getElements("node");
+    IPropertyTree *graphXGMML = jobMaster->queryGraphXGMML();
+    Owned<IPropertyTreeIterator> iter = graphXGMML->getElements("node");
     ForEach(*iter)
         jobMaster->addSubGraph(iter->query());
-    jobMaster->addDependencies(xgmml);
+    jobMaster->addDependencies(graphXGMML);
     return LINK(jobMaster);
 }
 

+ 1 - 1
thorlcr/graph/thgraphmaster.hpp

@@ -57,7 +57,7 @@ interface IJobManager : extends IInterface
 };
 
 interface ILoadedDllEntry;
-extern graphmaster_decl CJobMaster *createThorGraph(const char *graphName, IPropertyTree *xgmml, IConstWorkUnit &workunit, const char *querySo, bool sendSo, const SocketEndpoint &agentEp);
+extern graphmaster_decl CJobMaster *createThorGraph(const char *graphName, IConstWorkUnit &workunit, ILoadedDllEntry *querySo, bool sendSo, const SocketEndpoint &agentEp);
 extern graphmaster_decl void setJobManager(IJobManager *jobManager);
 extern graphmaster_decl IJobManager *getJobManager();
 extern graphmaster_decl IJobManager &queryJobManager();

+ 1 - 1
thorlcr/graph/thgraphmaster.ipp

@@ -118,7 +118,7 @@ class graphmaster_decl CJobMaster : public CJobBase
 public:
     IMPLEMENT_IINTERFACE;
 
-    CJobMaster(IConstWorkUnit &workunit, const char *_graphName, const char *querySo, bool _sendSo, const SocketEndpoint &_agentEp);
+    CJobMaster(IConstWorkUnit &workunit, const char *_graphName, ILoadedDllEntry *querySo, bool _sendSo, const SocketEndpoint &_agentEp);
     ~CJobMaster();
 
     virtual void addChannel(IMPServer *mpServer);

+ 1 - 15
thorlcr/graph/thgraphslave.cpp

@@ -1090,7 +1090,7 @@ public:
 };
 
 #define SLAVEGRAPHPOOLLIMIT 10
-CJobSlave::CJobSlave(ISlaveWatchdog *_watchdog, IPropertyTree *_workUnitInfo, const char *graphName, const char *_querySo, mptag_t _mpJobTag, mptag_t _slavemptag) : CJobBase(graphName), watchdog(_watchdog)
+CJobSlave::CJobSlave(ISlaveWatchdog *_watchdog, IPropertyTree *_workUnitInfo, const char *graphName, ILoadedDllEntry *_querySo, mptag_t _mpJobTag, mptag_t _slavemptag) : CJobBase(_querySo, graphName), watchdog(_watchdog)
 {
     workUnitInfo.set(_workUnitInfo);
     workUnitInfo->getProp("token", token);
@@ -1133,20 +1133,6 @@ CJobSlave::CJobSlave(ISlaveWatchdog *_watchdog, IPropertyTree *_workUnitInfo, co
         }
         pluginMap->loadFromList(pluginsList.str());
     }
-#ifdef __linux__
-// only relevant if dllsToSlaves=false and query_so_dir was fully qualified remote path (e.g. //<ip>/path/file
-    RemoteFilename rfn;
-    rfn.setRemotePath(_querySo);
-    StringBuffer tempSo;
-    if (!rfn.isLocal())
-    {
-        WARNLOG("Cannot load shared object directly from remote path, creating temporary local copy: %s", _querySo);
-        GetTempName(tempSo,"so",true);
-        copyFile(tempSo.str(), _querySo);
-        _querySo = tempSo.str();
-    }
-#endif
-    querySo.setown(createDllEntry(_querySo, false, NULL));
     tmpHandler.setown(createTempHandler(true));
     channelMemorySize = globalMemorySize / globals->getPropInt("@channelsPerSlave", 1);
 }

+ 1 - 1
thorlcr/graph/thgraphslave.hpp

@@ -147,7 +147,7 @@ class graphslave_decl CJobSlave : public CJobBase
 public:
     IMPLEMENT_IINTERFACE;
 
-    CJobSlave(ISlaveWatchdog *_watchdog, IPropertyTree *workUnitInfo, const char *graphName, const char *querySo, mptag_t _mptag, mptag_t _slavemptag);
+    CJobSlave(ISlaveWatchdog *_watchdog, IPropertyTree *workUnitInfo, const char *graphName, ILoadedDllEntry *querySo, mptag_t _mptag, mptag_t _slavemptag);
 
     virtual void addChannel(IMPServer *mpServer);
     virtual void startJob();

+ 3 - 5
thorlcr/master/thgraphmanager.cpp

@@ -871,18 +871,16 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName,
         sendSo = globals->getPropBool("Debug/@dllsToSlaves", true);
     }
 
+    Owned<ILoadedDllEntry> querySo = createDllEntry(compoundPath.str(), false, NULL);
+
     SCMStringBuffer eclstr;
     StringAttr user(workunit.queryUser());
 
     PROGLOG("Started wuid=%s, user=%s, graph=%s\n", wuid.str(), user.str(), graphName);
 
     PROGLOG("Query %s loaded", compoundPath.str());
-    Owned<IConstWUGraph> graph = workunit.getGraph(graphName);
-    Owned<IPropertyTree> graphXGMML = graph->getXGMMLTree(false);
-    Owned<CJobMaster> job = createThorGraph(graphName, graphXGMML, workunit, compoundPath.str(), sendSo, agentEp);
+    Owned<CJobMaster> job = createThorGraph(graphName, workunit, querySo, sendSo, agentEp);
     PROGLOG("Graph %s created", graphName);
-    graphXGMML.clear();
-    graph.clear();
     PROGLOG("Running graph=%s", job->queryGraphName());
     addJob(*job);
     bool allDone = false;

+ 19 - 2
thorlcr/slave/slavmain.cpp

@@ -320,6 +320,19 @@ public:
                             else
                                 soPath.append(remoteSoPath);
                         }
+#ifdef __linux__
+                    // only relevant if dllsToSlaves=false and query_so_dir was fully qualified remote path (e.g. //<ip>/path/file
+                        rfn.setRemotePath(soPath.str());
+                        StringBuffer tempSo;
+                        if (!rfn.isLocal())
+                        {
+                            WARNLOG("Cannot load shared object directly from remote path, creating temporary local copy: %s", soPath.str());
+                            GetTempName(tempSo,"so",true);
+                            copyFile(tempSo.str(), soPath.str());
+                            soPath.clear().append(tempSo.str());
+                        }
+#endif
+                        Owned<ILoadedDllEntry> querySo = createDllEntry(soPath.str(), false, NULL);
 
                         Owned<IPropertyTree> workUnitInfo = createPTree(msg);
                         StringBuffer user;
@@ -336,7 +349,7 @@ public:
 
                         Owned<IPropertyTree> deps = createPTree(msg);
 
-                        Owned<CJobSlave> job = new CJobSlave(watchdog, workUnitInfo, graphName, soPath.str(), mptag, slaveMsgTag);
+                        Owned<CJobSlave> job = new CJobSlave(watchdog, workUnitInfo, graphName, querySo, mptag, slaveMsgTag);
                         job->setXGMML(deps);
                         for (unsigned sc=0; sc<channelsPerSlave; sc++)
                             job->addChannel(&mpServers.item(sc));
@@ -376,7 +389,11 @@ public:
                         if (!job)
                             throw MakeStringException(0, "Job not found: %s", jobKey.get());
 
-                        Owned<IPropertyTree> graphNode = createPTree(msg);
+                        graph_id subGraphId = 0;
+                        msg.read(subGraphId);
+
+                        VStringBuffer xpath("node[@id='%" GIDPF "u']", subGraphId);
+                        Owned<IPropertyTree> graphNode = job->queryGraphXGMML()->getPropTree(xpath.str());
                         mptag_t executeReplyTag = job->deserializeMPTag(msg);
                         size32_t len;
                         msg.read(len);