瀏覽代碼

HPCC-12226 Push kernel message info to new section of workunit

Add a virtual to IPerfMonHook to allow kernel logs captured by
the monitor to be redirected.
Implement a hook method that fires the kernel messages to an
'Alert' section in the workunit.
Default to capturing kernel messages of level <=3, which means
KERN_EMERG, KERN_ALERT, KERN_CRIT, KERN_ERR.

Also slightly reorganize the job start/end to common up the
monitoring between slave and master.
Also add an Origin for Thor exceptions, so it is easier to see
which slave or master they came from.

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 10 年之前
父節點
當前提交
834db2a75b

+ 2 - 0
common/fileview2/fvresultset.cpp

@@ -3275,6 +3275,8 @@ inline const char *getSeverityTagname(WUExceptionSeverity severity, unsigned fla
             return "Info";
         case ExceptionSeverityWarning:
             return "Warning";
+        case ExceptionSeverityAlert:
+            return "Alert";
         case ExceptionSeverityError:
         default:
             break;

+ 2 - 1
common/workunit/workunit.hpp

@@ -165,7 +165,8 @@ enum WUExceptionSeverity
     ExceptionSeverityInformation = 0,
     ExceptionSeverityWarning = 1,
     ExceptionSeverityError = 2,
-    ExceptionSeveritySize = 3
+    ExceptionSeverityAlert = 3,
+    ExceptionSeveritySize = 4
 };
 
 

+ 1 - 2
dali/sasha/saserver.cpp

@@ -316,7 +316,6 @@ int main(int argc, const char* argv[])
 
     bool enableSNMP = false;
 
-    Owned<IPerfMonHook> perfMonHook;
     try {
 
 
@@ -339,7 +338,7 @@ int main(int argc, const char* argv[])
                 coalesceDatastore(force);
             }
             else {
-                startPerformanceMonitor(serverConfig->getPropInt("@perfReportDelay", DEFAULT_PERF_REPORT_DELAY)*1000, PerfMonStandard, perfMonHook);
+                startPerformanceMonitor(serverConfig->getPropInt("@perfReportDelay", DEFAULT_PERF_REPORT_DELAY)*1000);
                 AddServers();
                 addAbortHandler(actionOnAbort);
 

+ 1 - 2
dali/server/daserver.cpp

@@ -371,8 +371,7 @@ int main(int argc, char* argv[])
             UseSysLogForOperatorMessages();
         AddServers(auditDir.str());
         addAbortHandler(actionOnAbort);
-        Owned<IPerfMonHook> perfMonHook;
-        startPerformanceMonitor(serverConfig->getPropInt("Coven/@perfReportDelay", DEFAULT_PERF_REPORT_DELAY)*1000, PerfMonStandard, perfMonHook);
+        startPerformanceMonitor(serverConfig->getPropInt("Coven/@perfReportDelay", DEFAULT_PERF_REPORT_DELAY)*1000);
         StringBuffer absPath;
         StringBuffer dataPath;
         serverConfig->getProp("@dataPath",dataPath);

+ 10 - 0
esp/eclwatch/ws_XSLT/batchwuid.xslt

@@ -377,6 +377,16 @@
                                 </td>
                             </tr>
                         </xsl:if>
+                        <xsl:if test="count(Exceptions/ECLException[Severity='Alert'])">
+                            <tr>
+                                <th>Alert:</th>
+                                <td>
+                                    <table>
+                                        <xsl:apply-templates select="Exceptions/ECLException[Severity='Alert']"/>
+                                    </table>
+                                </td>
+                            </tr>
+                        </xsl:if>
                     </table>
                 </form>
                 <table>

+ 2 - 1
esp/eclwatch/ws_XSLT/wuid.xslt

@@ -544,6 +544,7 @@
                   {
                     activeSections[activeSections.length] = 'Warnings';
                     activeSections[activeSections.length] = 'Info';
+                    activeSections[activeSections.length] = 'Alert';
                   }
                 }
               }
@@ -573,7 +574,7 @@
 
             function getSectionName(Section)
             {
-              if ('WarningsInfo'.indexOf(Section)>-1)
+              if ('WarningsInfoAlert'.indexOf(Section)>-1)
               {
                 return 'Exceptions';
               }

+ 24 - 0
esp/eclwatch/ws_XSLT/wuidcommon.xslt

@@ -397,6 +397,30 @@
         </p>
       </xsl:if>
       
+      <xsl:if test="AlertCount &gt; 0">
+        <p>
+          <div>
+            <div id="HdrAlert" class="wugroup">
+              <div class="WuGroupHdrLeft">
+                <A href="javascript:void(0)" onclick="toggleElement('Alert');" id="explinkalert" class="wusectionexpand">
+                  Alert: (<xsl:value-of select="AlertCount"/>)
+                </A>
+              </div>
+            </div>
+            <div id="Alert" class="wusectioncontent">
+              <xsl:if test="count(Exceptions/ECLException[Severity='Alert'])=0">
+                <span class="loading">&nbsp;&nbsp;Loading...</span>
+              </xsl:if>
+              <xsl:if test="count(Exceptions/ECLException[Severity='Alert'])">
+                <table id="AlertContent" class="wusectiontable">
+                  <xsl:apply-templates select="Exceptions/ECLException[Severity='Alert']"/>
+                </table>
+              </xsl:if>
+            </div>
+          </div>
+        </p>
+      </xsl:if>
+      
       <xsl:if test="ResultCount &gt; 0 or ResultsDesc != ''">
         <p>
           <div>

+ 2 - 1
esp/eclwatch/ws_XSLT/wuiddetails.xslt

@@ -88,7 +88,7 @@
                 if (parentSectionDiv)
                 {
                   parentSectionDiv.innerHTML = '<span class="loading">&nbsp;&nbsp;Workunit has no ' + Section + '.</span>';
-                  if (Section == 'Exceptions' || Section == 'Warnings' || Section == 'Info')
+                  if (Section == 'Exceptions' || Section == 'Warnings' || Section == 'Info' || Section == 'Alert')
                   {
                     parentSectionDiv.style.display = 'none';
                     parentSectionDiv.style.visibility = 'hidden';
@@ -115,6 +115,7 @@
                   {
                     updateSection('Warnings');
                     updateSection('Info');
+                    updateSection('Alert');
                   }
                 }
               }

+ 4 - 2
esp/scm/ws_workunits.ecm

@@ -212,6 +212,7 @@ ESPStruct [nil_remove] ECLWorkunit
     [min_ver("1.17")] int ErrorCount;
     [min_ver("1.17")] int WarningCount;
     [min_ver("1.17")] int InfoCount;
+    [min_ver("1.52")] int AlertCount;
     [min_ver("1.17")] int GraphCount;
     [min_ver("1.17")] int SourceFileCount;
     [min_ver("1.17")] int ResultCount;
@@ -516,7 +517,8 @@ ESPenum WUExceptionSeverity : string
 {
     INFO("info"),
     WARNING("warning"),
-    ERROR("error")
+    ERROR("error"),
+    ALERT("alert")
 };
 
 ESPrequest WURunRequest
@@ -1562,7 +1564,7 @@ ESPresponse [exceptions_inline] WUCheckFeaturesResponse
 };
 
 ESPservice [
-    version("1.51"), default_client_version("1.51"),
+    version("1.52"), default_client_version("1.52"),
     noforms,exceptions_inline("./smc_xslt/exceptions.xslt"),use_method_name] WsWorkunits
 {
     ESPmethod [resp_xsl_default("/esp/xslt/workunits.xslt")]     WUQuery(WUQueryRequest, WUQueryResponse);

+ 3 - 0
esp/services/ecldirect/EclDirectService.cpp

@@ -61,6 +61,9 @@ EclDirectWUExceptions::EclDirectWUExceptions(IConstWorkUnit& cw)
             case ExceptionSeverityInformation:
                 e->setSeverity("Info");
                 break;
+            case ExceptionSeverityAlert:
+                e->setSeverity("Alert");
+                break;
         }
 
         errors.append(*e.getLink());

+ 3 - 1
esp/services/ws_workunits/ws_workunitsHelpers.cpp

@@ -141,7 +141,7 @@ void formatDuration(StringBuffer &s, unsigned ms)
 }
 
 
-WsWUExceptions::WsWUExceptions(IConstWorkUnit& wu): numerr(0), numwrn(0), numinf(0)
+WsWUExceptions::WsWUExceptions(IConstWorkUnit& wu): numerr(0), numwrn(0), numinf(0), numalert(0)
 {
     Owned<IConstWUExceptionIterator> it = &wu.getExceptions();
     ForEach(*it)
@@ -163,6 +163,7 @@ WsWUExceptions::WsWUExceptions(IConstWorkUnit& wu): numerr(0), numwrn(0), numinf
             case ExceptionSeverityError: label = "Error"; numerr++; break;
             case ExceptionSeverityWarning: label = "Warning"; numwrn++; break;
             case ExceptionSeverityInformation: label = "Info"; numinf++; break;
+            case ExceptionSeverityAlert: label = "Alert"; numalert++; break;
         }
 
         e->setSeverity(label);
@@ -307,6 +308,7 @@ void WsWuInfo::getExceptions(IEspECLWorkunit &info, unsigned flags)
             info.setErrorCount(errors.ErrCount());
             info.setWarningCount(errors.WrnCount());
             info.setInfoCount(errors.InfCount());
+            info.setAlertCount(errors.AlertCount());
         }
         if ((flags & WUINFO_IncludeExceptions))
             info.setExceptions(errors);

+ 2 - 0
esp/services/ws_workunits/ws_workunitsHelpers.hpp

@@ -98,12 +98,14 @@ struct WsWUExceptions
     int ErrCount() { return numerr; }
     int WrnCount() { return numwrn; }
     int InfCount() { return numinf; }
+    int AlertCount() { return numalert; }
 
 private:
     IArrayOf<IEspECLException> errors;
     int numerr;
     int numwrn;
     int numinf;
+    int numalert;
 };
 
 #define WUINFO_TruncateEclTo64k         0x0001

+ 8 - 1
esp/services/ws_workunits/ws_workunitsService.cpp

@@ -1064,6 +1064,8 @@ WUExceptionSeverity checkGetExceptionSeverity(CWUExceptionSeverity severity)
             return ExceptionSeverityWarning;
         case CWUExceptionSeverity_ERROR:
             return ExceptionSeverityError;
+        case CWUExceptionSeverity_ALERT:
+            return ExceptionSeverityAlert;
     }
 
     throw MakeStringExceptionDirect(ECLWATCH_INVALID_INPUT,"invalid exception severity");
@@ -3960,7 +3962,7 @@ bool CWsWorkunitsEx::onWUCreateZAPInfo(IEspContext &context, IEspWUCreateZAPInfo
             sb.append("Thor:         ").append(req.getThorIPAddress()).append("\r\n");
         //Exceptions/Warnings/Info
         Owned<IConstWUExceptionIterator> exceptions = &cwu->getExceptions();
-        StringBuffer info, warn, err;
+        StringBuffer info, warn, err, alert;
         ForEach(*exceptions)
         {
             switch (exceptions->query().getSeverity())
@@ -3974,6 +3976,9 @@ bool CWsWorkunitsEx::onWUCreateZAPInfo(IEspContext &context, IEspWUCreateZAPInfo
             case ExceptionSeverityError:
                 err.append("\t").append(exceptions->query().getExceptionMessage(temp)).append("\r\n\r\n");
                 break;
+            case ExceptionSeverityAlert:
+                alert.append("\t").append(exceptions->query().getExceptionMessage(temp)).append("\r\n\r\n");
+                break;
             }
         }
         if (err.length())
@@ -3982,6 +3987,8 @@ bool CWsWorkunitsEx::onWUCreateZAPInfo(IEspContext &context, IEspWUCreateZAPInfo
             sb.append("Warnings:     ").append("\r\n").append(warn);
         if (info.length())
             sb.append("Information:  ").append("\r\n").append(info);
+        if (alert.length())
+            sb.append("Alert:        ").append("\r\n").append(alert);
 
         //User provided Information
 

+ 6 - 3
roxie/roxiemem/roxiemem.cpp

@@ -312,12 +312,12 @@ IPerfMonHook *createRoxieMemStatsPerfMonHook(IPerfMonHook *chain)
         {
         }
         
-        void processPerfStats(unsigned processorUsage, unsigned memoryUsage, unsigned memoryTotal, unsigned __int64 firstDiskUsage, unsigned __int64 firstDiskTotal, unsigned __int64 secondDiskUsage, unsigned __int64 secondDiskTotal, unsigned threadCount)
+        virtual void processPerfStats(unsigned processorUsage, unsigned memoryUsage, unsigned memoryTotal, unsigned __int64 firstDiskUsage, unsigned __int64 firstDiskTotal, unsigned __int64 secondDiskUsage, unsigned __int64 secondDiskTotal, unsigned threadCount)
         {
             if (chain)
                 chain->processPerfStats(processorUsage, memoryUsage, memoryTotal, firstDiskUsage,firstDiskTotal, secondDiskUsage, secondDiskTotal, threadCount);
         }
-        StringBuffer &extraLogging(StringBuffer &extra)
+        virtual StringBuffer &extraLogging(StringBuffer &extra)
         {
             unsigned totalPages;
             unsigned freePages;
@@ -332,7 +332,10 @@ IPerfMonHook *createRoxieMemStatsPerfMonHook(IPerfMonHook *chain)
                 return chain->extraLogging(extra);
             return extra;
         }
-
+        virtual void log(int level, const char *message)
+        {
+            PROGLOG("%s", message);
+        }
     };
     return new memstatsPerfMonHook(chain);
 }

+ 16 - 12
system/jlib/jdebug.cpp

@@ -1888,9 +1888,9 @@ public:
 #define KERN_NOTICE "<5>"   // normal but significant condition
 #define KERN_INFO   "<6>"   // informational
 #define KERN_DEBUG  "<7>"   // debug-level messages
-#define KMSGTEST(S) if (memcmp(p,S,3)==0) ln.append(#S)
+#define KMSGTEST(S) if (memcmp(p,S,3)==0) { ln.append(#S); level = p[1]-'0'; }
 
-    void printKLog()
+    void printKLog(IPerfMonHook *hook)
     {
         const char *p;
         size32_t sz = getKLog(p);
@@ -1899,14 +1899,15 @@ public:
         while (p!=e) {
             if (*p=='<') {
                 ln.clear();
-                KMSGTEST(KERN_EMERG);
-                else KMSGTEST(KERN_ALERT);
-                else KMSGTEST(KERN_CRIT);
-                else KMSGTEST(KERN_ERR);
-                else KMSGTEST(KERN_WARNING);
-                else KMSGTEST(KERN_NOTICE);
-                else KMSGTEST(KERN_INFO);
-                else KMSGTEST(KERN_DEBUG);
+                int level = -1;
+                KMSGTEST(KERN_EMERG)
+                else KMSGTEST(KERN_ALERT)
+                else KMSGTEST(KERN_CRIT)
+                else KMSGTEST(KERN_ERR)
+                else KMSGTEST(KERN_WARNING)
+                else KMSGTEST(KERN_NOTICE)
+                else KMSGTEST(KERN_INFO)
+                else KMSGTEST(KERN_DEBUG)
                 else {
                     ln.append("KERN_UNKNOWN");
                     p -= 3;
@@ -1915,7 +1916,10 @@ public:
                 ln.append(": ");
                 while ((p!=e)&&(*p!='\n'))
                     ln.append(*(p++));
-                PROGLOG("%s",ln.str());
+                if (hook)
+                    hook->log(level, ln.str());
+                else
+                    PROGLOG("%s",ln.str());
             }
             while ((p!=e)&&(*p!='\n'))
                 p++;
@@ -2279,7 +2283,7 @@ public:
                 if (traceMode&PerfMonExtended) {
                     if (extstats.getLine(str.clear()))
                         LOG(MCdebugInfo, unknownJob, "%s", str.str());
-                    extstats.printKLog();
+                    extstats.printKLog(hook);
                 }
 #endif
             }

+ 4 - 4
system/jlib/jdebug.hpp

@@ -238,12 +238,12 @@ void jlib_decl enableMemLeakChecking(bool enable);
 
 // Hook to be called by the performance monitor, takes stats for processor, virtual memory, disk, and thread usage
 
-class IPerfMonHook : public IInterface
+interface IPerfMonHook : extends IInterface
 {
 public:
     virtual void processPerfStats(unsigned processorUsage, unsigned memoryUsage, unsigned memoryTotal, unsigned __int64 fistDiskUsage, unsigned __int64 firstDiskTotal, unsigned __int64 secondDiskUsage, unsigned __int64 secondDiskTotal, unsigned threadCount) = 0;
     virtual StringBuffer &extraLogging(StringBuffer &extra) = 0; // for extra periodic logging
-
+    virtual void log(int level, const char *msg) = 0;
 };
 
 enum
@@ -264,7 +264,7 @@ enum
 
 };
 
-interface IUserMetric : public IInterface
+interface IUserMetric : extends IInterface
 {
     virtual unsigned __int64 queryCount() const = 0;
     virtual const char *queryName() const = 0;
@@ -278,7 +278,7 @@ extern jlib_decl IUserMetric * createUserMetric(const char *name, const char *ma
 typedef unsigned PerfMonMode;
 
 void jlib_decl getSystemTraceInfo(StringBuffer &str, PerfMonMode mode = PerfMonProcMem);
-void jlib_decl startPerformanceMonitor(unsigned interval, PerfMonMode traceMode = PerfMonStandard, IPerfMonHook * hook = 0);
+void jlib_decl startPerformanceMonitor(unsigned interval, PerfMonMode traceMode = PerfMonStandard, IPerfMonHook * hook = NULL);
 void jlib_decl stopPerformanceMonitor();
 void jlib_decl setPerformanceMonitorPrimaryFileSystem(char const * fs); // for monitoring disk1, defaults to C: (win) or / (linux)
 void jlib_decl setPerformanceMonitorSecondaryFileSystem(char const * fs); // for monitoring disk2, no default

+ 42 - 0
thorlcr/graph/thgraph.cpp

@@ -2486,6 +2486,7 @@ CJobBase::CJobBase(const char *_graphName) : graphName(_graphName)
     slaveGroup.setown(jobGroup->remove(0));
     myrank = jobGroup->rank(queryMyNode());
     globalMemorySize = globals->getPropInt("@globalMemorySize"); // in MB
+    oldNodeCacheMem = 0;
 }
 
 void CJobBase::init()
@@ -2531,6 +2532,11 @@ void CJobBase::init()
     graphExecutor.setown(new CGraphExecutor(*this));
 }
 
+void CJobBase::beforeDispose() // runs the job tear-down; NOTE(review): presumably invoked by the ref-counting base just before destruction - confirm
+{
+    endJob(); // stops the performance monitor and undoes the cache changes made by startJob()
+}
+
 CJobBase::~CJobBase()
 {
     clean();
@@ -2551,6 +2557,41 @@ CJobBase::~CJobBase()
         PROGLOG("Heap usage : %"I64F"d bytes", (unsigned __int64)heapUsage);
 }
 
+void CJobBase::startJob() // start-up shared by master and slave jobs: temp dir clear, performance monitor, status logging, key cache sizing
+{
+    LOG(MCdebugProgress, thorJob, "New Graph started : %s", graphName.get());
+    ClearTempDirs();
+    unsigned pinterval = globals->getPropInt("@system_monitor_interval",1000*60); // a configured value of 0 skips starting the monitor
+    if (pinterval)
+    {
+        perfmonhook.setown(createThorMemStatsPerfMonHook(*this, getOptInt(THOROPT_MAX_KERNLOG, 3))); // hook is owned by this job; max kernel level defaults to 3
+        startPerformanceMonitor(pinterval,PerfMonStandard,perfmonhook);
+    }
+    PrintMemoryStatusLog();
+    logDiskSpace();
+    unsigned keyNodeCacheMB = (unsigned)getWorkUnitValueInt("keyNodeCacheMB", 0); // 0 leaves the node cache size untouched
+    if (keyNodeCacheMB)
+    {
+        oldNodeCacheMem = setNodeCacheMem(keyNodeCacheMB * 0x100000); // keep the returned value so endJob() can hand it back to setNodeCacheMem
+        PROGLOG("Key node cache size set to: %d MB", keyNodeCacheMB);
+    }
+    unsigned keyFileCacheLimit = (unsigned)getWorkUnitValueInt("keyFileCacheLimit", 0);
+    if (!keyFileCacheLimit)
+        keyFileCacheLimit = (querySlaves()+1)*2; // fallback used when the workunit value is 0
+    setKeyIndexCacheSize(keyFileCacheLimit);
+    PROGLOG("Key file cache size set to: %d", keyFileCacheLimit);
+}
+
+void CJobBase::endJob() // tear-down counterpart of startJob(); called from beforeDispose()
+{
+    stopPerformanceMonitor(); // called unconditionally, even if startJob() did not start the monitor
+    LOG(MCdebugProgress, thorJob, "Job ended : %s", graphName.get());
+    clearKeyStoreCache(true);
+    if (oldNodeCacheMem) // stays 0 (set in the constructor) unless startJob() stored a value here
+        setNodeCacheMem(oldNodeCacheMem);
+    PrintMemoryStatusLog();
+}
+
 bool CJobBase::queryForceLogging(graph_id graphId, bool def) const
 {
     // JCSMORE, could add comma separated range, e.g. 1-5,10-12
@@ -2575,6 +2616,7 @@ IThorGraphIterator *CJobBase::getSubGraphs()
     return new CGraphTableIterator(subGraphs);
 }
 
+
 static void getGlobalDeps(CGraphBase &graph, CopyCIArrayOf<CGraphDependency> &deps)
 {
     Owned<IThorActivityIterator> iter = graph.getIterator();

+ 5 - 0
thorlcr/graph/thgraph.hpp

@@ -768,6 +768,8 @@ protected:
     unsigned maxActivityCores, globalMemorySize;
     unsigned forceLogGraphIdMin, forceLogGraphIdMax;
     Owned<IContextLogger> logctx;
+    Owned<IPerfMonHook> perfmonhook;
+    size32_t oldNodeCacheMem;
 
     class CThorPluginCtx : public SimplePluginCtx
     {
@@ -793,10 +795,12 @@ protected:
             removeAssociates(child);
         }
     }
+    void endJob();
 public:
     IMPLEMENT_IINTERFACE;
 
     CJobBase(const char *graphName);
+    virtual void beforeDispose();
     ~CJobBase();
     void clean();
     void init();
@@ -808,6 +812,7 @@ public:
     bool queryForceLogging(graph_id graphId, bool def) const;
     ITimeReporter &queryTimeReporter() { return *timeReporter; }
     const IContextLogger &queryContextLogger() const { return *logctx; }
+    virtual void startJob();
     virtual IGraphTempHandler *createTempHandler(bool errorOnMissing) = 0;
     virtual CGraphBase *createGraph() = 0;
     void joinGraph(CGraphBase &graph);

+ 1 - 1
thorlcr/graph/thgraphmaster.cpp

@@ -1688,7 +1688,7 @@ bool CJobMaster::go()
     unsigned concurrentSubGraphs = (unsigned)getWorkUnitValueInt("concurrentSubGraphs", globals->getPropInt("@concurrentSubGraphs", 1));
     try
     {
-        ClearTempDirs();
+        startJob();
         Owned<IWUGraphProgress> progress = graphProgress->update();
         progress->setGraphState(WUGraphRunning);
         progress.clear();

+ 1 - 35
thorlcr/graph/thgraphslave.cpp

@@ -1100,29 +1100,16 @@ CJobSlave::CJobSlave(ISlaveWatchdog *_watchdog, IPropertyTree *_workUnitInfo, co
     querySo.setown(createDllEntry(_querySo, false, NULL));
     codeCtx = new CThorCodeContextSlave(*this, *querySo, *userDesc, slavemptag);
     tmpHandler.setown(createTempHandler(true));
-    startJob();
 }
 
 CJobSlave::~CJobSlave()
 {
     graphExecutor->wait();
-    endJob();
 }
 
 void CJobSlave::startJob()
 {
-    LOG(MCdebugProgress, thorJob, "New Graph started : %s", graphName.get());
-    ClearTempDirs();
-    unsigned pinterval = globals->getPropInt("@system_monitor_interval",1000*60);
-    if (pinterval)
-        startPerformanceMonitor(pinterval);
-    if (pinterval)
-    {
-        perfmonhook.setown(createThorMemStatsPerfMonHook());
-        startPerformanceMonitor(pinterval,PerfMonStandard,perfmonhook);
-    }
-    PrintMemoryStatusLog();
-    logDiskSpace();
+    CJobBase::startJob();
     unsigned minFreeSpace = (unsigned)getWorkUnitValueInt("MINIMUM_DISK_SPACE", 0);
     if (minFreeSpace)
     {
@@ -1135,27 +1122,6 @@ void CJobSlave::startJob()
             throw MakeThorException(TE_NotEnoughFreeSpace, "Node %s has %u MB(s) of available disk space, specified minimum for this job: %u MB(s)", ep.getUrlStr(s).str(), (unsigned) freeSpace / 0x100000, minFreeSpace);
         }
     }
-    unsigned keyNodeCacheMB = (unsigned)getWorkUnitValueInt("keyNodeCacheMB", 0);
-    if (keyNodeCacheMB)
-    {
-        oldNodeCacheMem = setNodeCacheMem(keyNodeCacheMB * 0x100000);
-        PROGLOG("Key node cache size set to: %d MB", keyNodeCacheMB);
-    }
-    unsigned keyFileCacheLimit = (unsigned)getWorkUnitValueInt("keyFileCacheLimit", 0);
-    if (!keyFileCacheLimit)
-        keyFileCacheLimit = (querySlaves()+1)*2;
-    setKeyIndexCacheSize(keyFileCacheLimit);
-    PROGLOG("Key file cache size set to: %d", keyFileCacheLimit);
-}
-
-void CJobSlave::endJob()
-{
-    stopPerformanceMonitor();
-    LOG(MCdebugProgress, thorJob, "Job ended : %s", graphName.get());
-    clearKeyStoreCache(true);
-    if (oldNodeCacheMem)
-        setNodeCacheMem(oldNodeCacheMem);
-    PrintMemoryStatusLog();
 }
 
 __int64 CJobSlave::getWorkUnitValueInt(const char *prop, __int64 defVal) const

+ 13 - 4
thorlcr/graph/thgraphslave.hpp

@@ -143,15 +143,13 @@ class graphslave_decl CJobSlave : public CJobBase
     CriticalSection graphRunCrit;
     size32_t oldNodeCacheMem;
 
-    void startJob();
-    void endJob();
-
 public:
     IMPLEMENT_IINTERFACE;
 
     CJobSlave(ISlaveWatchdog *_watchdog, IPropertyTree *workUnitInfo, const char *graphName, const char *querySo, mptag_t _mptag, mptag_t _slavemptag);
     ~CJobSlave();
 
+    virtual void startJob();
     const char *queryFindString() const { return key.get(); } // for string HT
 
     ISlaveWatchdog *queryProgressHandler() { return watchdog; }
@@ -172,10 +170,21 @@ public:
         CMessageBuffer msg;
         msg.append((int)smt_errorMsg);
         IThorException *te = QUERYINTERFACE(e, IThorException);
+        bool userOrigin = false;
         if (te)
+        {
             te->setJobId(key);
+            te->setSlave(queryMyRank());
+            if (!te->queryOrigin())
+            {
+                VStringBuffer msg("SLAVE #%d", queryMyRank());
+                te->setOrigin(msg);
+            }
+            else if (0 == stricmp("user", te->queryOrigin()))
+                userOrigin = true;
+        }
         serializeThorException(e, msg);
-        if (te && te->queryOrigin() && 0 == stricmp("user", te->queryOrigin()))
+        if (userOrigin)
         {
             // wait for reply
             if (!queryJobComm().sendRecv(msg, 0, querySlaveMpTag(), LONGTIMEOUT))

+ 0 - 9
thorlcr/master/thmastermain.cpp

@@ -727,12 +727,6 @@ int main( int argc, char *argv[]  )
         connectLogMsgManagerToDali();
         if (globals->getPropBool("@cache_dafilesrv_master",false))
             setDaliServixSocketCaching(true); // speeds up deletes under linux
-
-        unsigned pinterval = globals->getPropInt("@system_monitor_interval",1000*60);
-        if (pinterval) {
-            perfmonhook.setown(createThorMemStatsPerfMonHook());
-            startPerformanceMonitor(pinterval,PerfMonStandard,perfmonhook);
-        }
     }
     catch (IException *e)
     {
@@ -793,9 +787,6 @@ int main( int argc, char *argv[]  )
     disconnectLogMsgManagerFromDali();
     closeThorServerStatus();
     if (globals) globals->Release();
-    PROGLOG("Thor closing down 6");
-
-    stopPerformanceMonitor();
     PROGLOG("Thor closing down 5");
     PROGLOG("Thor closing down 4");
     closeDllServer();

+ 2 - 1
thorlcr/shared/thexception.hpp

@@ -155,7 +155,8 @@
 #define TE_LargeAggregateTable                  TE_Base + 132
 #define TE_SkewWarning                          TE_Base + 133
 #define TE_SkewError                            TE_Base + 134
-#define TE_Final                                TE_Base + 135       // keep this last
+#define TE_KERN                                 TE_Base + 135
+#define TE_Final                                TE_Base + 136       // keep this last
 #define ISTHOREXCEPTION(n) (n > TE_Base && n < TE_Final)
 
 #endif

+ 2 - 0
thorlcr/slave/slavmain.cpp

@@ -294,6 +294,8 @@ public:
                             PROGLOG("Workunit option 'slaveDaliClient' enabled");
                             enableThorSlaveAsDaliClient();
                         }
+                        job->startJob();
+
                         msg.clear();
                         msg.append(false);
                         break;

+ 0 - 7
thorlcr/thorutil/thmem.cpp

@@ -2384,10 +2384,3 @@ IOutputMetaData *createOutputMetaDataWithExtra(IOutputMetaData *meta, size32_t s
 {
     return new COutputMetaWithExtra(meta, sz);
 }
-
-
-
-IPerfMonHook *createThorMemStatsPerfMonHook(IPerfMonHook *chain)
-{
-    return LINK(chain);
-}

+ 0 - 3
thorlcr/thorutil/thmem.hpp

@@ -547,7 +547,4 @@ extern graph_decl ILargeMemLimitNotify *createMultiThorResourceMutex(const char
 
 extern graph_decl void setThorVMSwapDirectory(const char *swapdir);
 
-class IPerfMonHook; 
-extern graph_decl IPerfMonHook *createThorMemStatsPerfMonHook(IPerfMonHook *chain=NULL); // for passing to jdebug startPerformanceMonitor
-
 #endif

+ 42 - 2
thorlcr/thorutil/thormisc.cpp

@@ -238,6 +238,8 @@ public:
         mb.read(column);
         mb.read((int &)severity);
         mb.read(origin);
+        if (0 == origin.length()) // simpler to clear serialized 0 length terminated string here than check on query
+            origin.clear();
         size32_t sz;
         mb.read(sz);
         if (sz)
@@ -742,8 +744,9 @@ void reportExceptionToWorkunit(IConstWorkUnit &workunit,IException *e, WUExcepti
         if (te)
         {
             we->setSeverity(te->querySeverity());
-            if (te->queryOrigin())
-                we->setExceptionSource(te->queryOrigin());
+            if (!te->queryOrigin()) // will have an origin if from slaves already
+                te->setOrigin("master");
+            we->setExceptionSource(te->queryOrigin());
             StringAttr file;
             unsigned line, column;
             te->getAssert(file, line, column);
@@ -1229,3 +1232,40 @@ void logDiskSpace()
     diskSpaceMsg.append(tempDir).append(" = ").append(getFreeSpace(tempDir)/0x100000).append(" MB");
     PROGLOG("%s", diskSpaceMsg.str());
 }
+
+IPerfMonHook *createThorMemStatsPerfMonHook(CJobBase &job, int maxLevel, IPerfMonHook *chain) // returns a new hook bound to 'job'; maxLevel of -1 disables firing messages at the job
+{
+    class CPerfMonHook : public CSimpleInterfaceOf<IPerfMonHook> // forwards stats/extra logging to 'chain' and turns logged messages into job exceptions
+    {
+        CJobBase &job; // job that receives the exceptions; held by reference, not linked
+        int maxLevel; // highest message level fired at the job, -1 means disabled
+        Linked<IPerfMonHook> chain; // optional next hook, may be NULL
+    public:
+        CPerfMonHook(CJobBase &_job, int _maxLevel, IPerfMonHook *_chain) : job(_job), maxLevel(_maxLevel), chain(_chain) // signed parameter keeps the -1 sentinel intact; initializers follow declaration order
+        {
+        }
+        virtual void processPerfStats(unsigned processorUsage, unsigned memoryUsage, unsigned memoryTotal, unsigned __int64 firstDiskUsage, unsigned __int64 firstDiskTotal, unsigned __int64 secondDiskUsage, unsigned __int64 secondDiskTotal, unsigned threadCount) // pure pass-through to the chained hook
+        {
+            if (chain)
+                chain->processPerfStats(processorUsage, memoryUsage, memoryTotal, firstDiskUsage,firstDiskTotal, secondDiskUsage, secondDiskTotal, threadCount);
+        }
+        virtual StringBuffer &extraLogging(StringBuffer &extra) // returns the chained hook's result, or 'extra' unchanged when there is no chain
+        {
+            if (chain)
+                return chain->extraLogging(extra);
+            return extra;
+        }
+        virtual void log(int level, const char *msg) // NOTE(review): unlike the two methods above this does not forward to 'chain' - confirm intended
+        {
+            PROGLOG("%s", msg); // every message is written to the process log regardless of level
+            if ((maxLevel != -1) && (level <= maxLevel)) // maxLevel of -1 means disabled; note a negative level also passes this test
+            {
+                Owned<IThorException> e = MakeThorException(TE_KERN, "%s", msg);
+                e->setSeverity(ExceptionSeverityAlert); // reported with Alert severity
+                e->setAction(tea_warning);
+                job.fireException(e);
+            }
+        }
+    };
+    return new CPerfMonHook(job, maxLevel, chain);
+}

+ 5 - 0
thorlcr/thorutil/thormisc.hpp

@@ -19,6 +19,7 @@
 #define _THORMISC_
 
 #include "jiface.hpp"
+#include "jdebug.hpp"
 #include "jthread.hpp"
 #include "jexcept.hpp"
 #include "jarray.hpp"
@@ -61,6 +62,7 @@
 #define THOROPT_JOINHELPER_THREADS    "joinHelperThreads"       // Number of threads to use in threaded variety of join helper
 #define THOROPT_LKJOIN_LOCALFAILOVER  "lkjoin_localfailover"    // Force SMART to failover to distributed local lookup join (for testing only)   (default = false)
 #define THOROPT_LKJOIN_HASHJOINFAILOVER "lkjoin_hashjoinfailover" // Force SMART to failover to hash join (for testing only)                     (default = false)
+#define THOROPT_MAX_KERNLOG           "max_kern_level"          // Max kernel logging level, to push to workunit, -1 to disable                  (default = 3)
 
 #define INITIAL_SELFJOIN_MATCH_WARNING_LEVEL 20000  // max of row matches before selfjoin emits warning
 
@@ -457,5 +459,8 @@ extern graph_decl void sendInChunks(ICommunicator &comm, rank_t dst, mptag_t mpT
 
 extern graph_decl void logDiskSpace();
 
+class CJobBase;
+extern graph_decl IPerfMonHook *createThorMemStatsPerfMonHook(CJobBase &job, int maxLevel, IPerfMonHook *chain=NULL); // for passing to jdebug startPerformanceMonitor; maxLevel is the highest message level pushed to the job, -1 to disable
+
 #endif