Procházet zdrojové kódy

Merge pull request #2505 from jakesmith/thorlog

gh-2476 - resolve some thor log issues

Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman před 13 roky
rodič
revize
e307d03b37

+ 2 - 2
initfiles/componentfiles/thor/start_slave

@@ -69,8 +69,8 @@ ln -s -f `which $prog` $slaveproc
 
 echo "slave starting `date`"
 
-echo $instancedir/$slaveproc master=$master slave=.:$slaveport logDir=$logpth
-$instancedir/$slaveproc master=$master slave=.:$slaveport logDir=$logpth 2>/dev/null 1>/dev/null &
+echo $instancedir/$slaveproc master=$master slave=.:$slaveport slavenum=$slavenum logDir=$logpth
+$instancedir/$slaveproc master=$master slave=.:$slaveport slavenum=$slavenum logDir=$logpth 2>/dev/null 1>/dev/null &
 slavepid=$!
 echo $slavepid > $PID_NAME
 if [ "$slavepid" -eq "0" ]; then

+ 5 - 5
thorlcr/activities/filter/thfilterslave.cpp

@@ -309,11 +309,11 @@ public:
 
         if (groupStream)
         {
-			loop
-			{
-				OwnedConstThorRow row = groupStream->nextRow();
-				if (!row)
-					break;
+            loop
+            {
+                OwnedConstThorRow row = groupStream->nextRow();
+                if (!row)
+                break;
                 if (stepCompare->docompare(row, seek, numFields) >= 0)
                 {
                     dataLinkIncrement();

+ 1 - 1
thorlcr/activities/rollup/throllupslave.cpp

@@ -141,7 +141,7 @@ public:
         }
         return NULL;
     }
-	virtual void stop() { }
+    virtual void stop() { }
 };
 
 class CDedupRollupBaseActivity : public CSlaveActivity, implements IStopInput

+ 13 - 8
thorlcr/master/thgraphmanager.cpp

@@ -46,7 +46,7 @@
 #include "thdemonserver.hpp"
 #include "thgraphmanager.hpp"
 
-class CJobManager: public CSimpleInterface, implements IJobManager, implements IExceptionHandler
+class CJobManager : public CSimpleInterface, implements IJobManager, implements IExceptionHandler
 {
     bool stopped, handlingConversation;
     Owned<IConversation> conversation;
@@ -59,6 +59,7 @@ class CJobManager: public CSimpleInterface, implements IJobManager, implements I
     Owned<IDeMonServer> demonServer;
     atomic_t            activeTasks;
     StringAttr          currentWuid;
+    ILogMsgHandler *logHandler;
 
     bool executeGraph(IConstWorkUnit &workunit, const char *graphName, const SocketEndpoint &agentEp);
     void addJob(CJobMaster &job) { CriticalBlock b(jobCrit); jobs.append(job); }
@@ -67,7 +68,7 @@ class CJobManager: public CSimpleInterface, implements IJobManager, implements I
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-    CJobManager();
+    CJobManager(ILogMsgHandler *logHandler);
     ~CJobManager();
 
     bool doit(IConstWorkUnit *workunit, const char *graphName, const SocketEndpoint &agentep);
@@ -89,7 +90,7 @@ public:
 
 // CJobManager impl.
 
-CJobManager::CJobManager()
+CJobManager::CJobManager(ILogMsgHandler *_logHandler) : logHandler(_logHandler)
 {
     stopped = handlingConversation = false;
     addThreadExceptionHandler(this);
@@ -643,7 +644,10 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName,
         Owned<IWorkUnit> wu = &workunit.lock();
         wu->setTracingValue("ThorBuild", BUILD_TAG);
         // expect there to be 1 or 2 of these, so scan/check if log exists already and add if not
-        const char *nLog = globals->queryProp("@logURL");
+        StringBuffer log, logUrl;
+        logHandler->getLogName(log);
+        createUNCFilename(log, logUrl, false);
+        const char *nLog = logUrl.str();
         Owned<IStringIterator> siter = &wu->getDebugValues("ThorLog*");
         unsigned last=0;
         bool found=false;
@@ -718,7 +722,7 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName,
     SCMStringBuffer user, eclstr;
     workunit.getUser(user);
 
-    LOG(MCdebugProgress, thorJob, "Started wuid=%s, user=%s, graph=%s\n**", wuid.str(), user.str(), graphName);
+    PROGLOG("Started wuid=%s, user=%s, graph=%s\n", wuid.str(), user.str(), graphName);
 
     PROGLOG("Query %s loaded", compoundPath.str());
     Owned<IConstWUGraph> graph = workunit.getGraph(graphName);
@@ -767,7 +771,8 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName,
         throw;
     }
     job.clear();
-    LOG(MCdebugProgress, thorJob, "Finished wuid=%s", wuid.str());
+    PROGLOG("Finished wuid=%s, graph=%s", wuid.str(), graphName);
+
     setWuid(NULL);
     return allDone;
 }
@@ -852,7 +857,7 @@ void closeThorServerStatus()
     }
 }
 
-void thorMain()
+void thorMain(ILogMsgHandler *logHandler)
 {
     aborting = 0;
     unsigned multiThorMemoryThreshold = globals->getPropInt("@multiThorMemoryThreshold")*0x100000;
@@ -876,7 +881,7 @@ void thorMain()
         CThorResourceMaster masterResource;
         setIThorResource(masterResource);
 
-        Owned<CJobManager> jobManager = new CJobManager();
+        Owned<CJobManager> jobManager = new CJobManager(logHandler);
         try {
             LOG(MCdebugProgress, thorJob, "Listening for graph");
             jobManager->run();

+ 1 - 1
thorlcr/master/thgraphmanager.hpp

@@ -24,7 +24,7 @@ interface IException;
 CSDSServerStatus &queryServerStatus();
 CSDSServerStatus &openThorServerStatus();
 void closeThorServerStatus();
-void thorMain();
+void thorMain(ILogMsgHandler *logHandler);
 void abortThor(IException *e=NULL, bool abortCurrentJob=true);
 void setExitCode(int code);
 int queryExitCode();

+ 5 - 3
thorlcr/master/thmastermain.cpp

@@ -486,13 +486,15 @@ int main( int argc, char *argv[]  )
     StringBuffer nodeGroup, logUrl;
     Owned<IPerfMonHook> perfmonhook;
 
-    try {
+    ILogMsgHandler *logHandler;
+    try
+    {
         {
             Owned<IComponentLogFileCreator> lf = createComponentLogFileCreator(globals, "thor");
             lf->setName("thormaster");//override default filename
             lf->setCreateAliasFile(false);
             lf->setMsgFields(MSGFIELD_timeDate | MSGFIELD_msgID | MSGFIELD_process | MSGFIELD_thread | MSGFIELD_code);
-            lf->beginLogging();
+            logHandler = lf->beginLogging();
             createUNCFilename(lf->queryLogFileSpec(), logUrl, false);
         }
         LOG(MCdebugProgress, thorJob, "Opened log file %s", logUrl.toCharArray());
@@ -738,7 +740,7 @@ int main( int argc, char *argv[]  )
             LOG(daliAuditLogCat, ",Progress,Thor,Startup,%s,%s,%s,%s",nodeGroup.str(),thorname,queueName.str(),logUrl.str());
             auditStartLogged = true;
 
-            thorMain();
+            thorMain(logHandler);
             LOG(daliAuditLogCat, ",Progress,Thor,Terminate,%s,%s,%s",thorname,nodeGroup.str(),queueName.str());
         }
         else

+ 12 - 2
thorlcr/slave/slavmain.cpp

@@ -229,8 +229,14 @@ public:
                         }
                         else
                             soPath.append(remoteSoPath);
-                        PROGLOG("Using query: %s", soPath.str());
+
                         Owned<IPropertyTree> workUnitInfo = createPTree(msg);
+                        StringBuffer user;
+                        workUnitInfo->getProp("user", user);
+
+                        PROGLOG("Started wuid=%s, user=%s, graph=%s\n", wuid.get(), user.str(), graphName.get());
+
+                        PROGLOG("Using query: %s", soPath.str());
                         Owned<CJobSlave> job = new CJobSlave(watchdog, workUnitInfo, graphName, soPath.str(), mptag, slaveMsgTag);
                         jobs.replace(*LINK(job));
 
@@ -245,11 +251,15 @@ public:
                     {
                         StringAttr key;
                         msg.read(key);
-                        PROGLOG("QueryDone, removing %s from jobs", key.get());
                         CJobSlave *job = jobs.find(key.get());
+                        StringAttr wuid = job->queryWuid();
+                        StringAttr graphName = job->queryGraphName();
+
+                        PROGLOG("QueryDone, removing %s from jobs", key.get());
                         jobs.removeExact(job);
                         PROGLOG("QueryDone, removed %s from jobs", key.get());
 
+                        PROGLOG("Finished wuid=%s, graph=%s", wuid.get(), graphName.get());
                         msg.clear();
                         msg.append(false);
                         break;

+ 18 - 8
thorlcr/slave/thslavemain.cpp

@@ -67,6 +67,7 @@ USE_JLIB_ALLOC_HOOK;
 #endif
 
 static SocketEndpoint slfEp;
+static unsigned mySlaveNum;
 
 static char **cmdArgs;
 void mergeCmdParams(IPropertyTree *props)
@@ -113,11 +114,18 @@ static bool RegisterSelf(SocketEndpoint &masterEp)
         setClusterGroup(group);
 
         SocketEndpoint myEp = queryMyNode()->endpoint();
-        if (RANK_NULL == group->rank(queryMyNode()))
+        rank_t groupPos = group->rank(queryMyNode());
+        if (RANK_NULL == groupPos)
         {
             replyError("Node not part of thorgroup");
             return false;
         }
+        if (globals->hasProp("@SLAVENUM") && (mySlaveNum != (unsigned)groupPos))
+        {
+            VStringBuffer errStr("Slave group rank[%d] does not match provided cmd line slaveNum[%d]", mySlaveNum, (unsigned)groupPos);
+            replyError(errStr.str());
+            return false;
+        }
         globals->Release();
         globals = createPTree(msg);
         mergeCmdParams(globals); // cmd line 
@@ -207,13 +215,10 @@ public:
 
 void startSlaveLog()
 {
-    StringBuffer fileName("thorslave.");
-    SocketEndpoint ep;
-    ep.setLocalHost(0);
-    ep.getUrlStr(fileName);
-    fileName.append("_").append(getMachinePortBase());
-
+    StringBuffer fileName("thorslave");
     Owned<IComponentLogFileCreator> lf = createComponentLogFileCreator(globals->queryProp("@logDir"), "thor");
+    StringBuffer slaveNumStr;
+    lf->setPostfix(slaveNumStr.append(mySlaveNum).str());
     lf->setCreateAliasFile(false);
     lf->setName(fileName.str());//override default filename
     lf->setMsgFields(MSGFIELD_timeDate | MSGFIELD_msgID | MSGFIELD_process | MSGFIELD_thread | MSGFIELD_code);
@@ -280,7 +285,12 @@ int main( int argc, char *argv[]  )
             localHostToNIC(slfEp);
         }
         else 
-            slfEp.setLocalHost(0); 
+            slfEp.setLocalHost(0);
+
+        if (globals->hasProp("@SLAVENUM"))
+            mySlaveNum = atoi(globals->queryProp("@SLAVENUM"));
+        else
+            mySlaveNum = slfEp.port; // shouldn't happen, provided by script
 
         setMachinePortBase(slfEp.port);
         slfEp.port = getMachinePortBase();

+ 1 - 1
thorlcr/thorutil/thmem.cpp

@@ -573,7 +573,7 @@ void CThorExpandingRowArray::transferFrom(CThorExpandingRowArray &donor)
 
 void CThorExpandingRowArray::transferFrom(CThorSpillableRowArray &donor)
 {
-	transferFrom((CThorExpandingRowArray &)donor);
+    transferFrom((CThorExpandingRowArray &)donor);
 }
 
 void CThorExpandingRowArray::removeRows(rowidx_t start, rowidx_t n)