Browse Source

HPCC-20603 Add options to trace postmortem debug info on job abort

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith 6 years ago
parent
commit
77f49a579e

+ 6 - 6
common/workunit/workunit.cpp

@@ -3194,7 +3194,7 @@ CWorkUnitWatcher::CWorkUnitWatcher(IWorkUnitSubscriber *_subscriber, WUSubscribe
     if (flags & SubscribeOptionAbort)
     {
         VStringBuffer xpath("/WorkUnitAborts/%s", wuid);
-        abortId = querySDS().subscribe(xpath.str(), *this);
+        abortId = querySDS().subscribe(xpath.str(), *this, false, true);
     }
 }
 CWorkUnitWatcher::~CWorkUnitWatcher()
@@ -3220,11 +3220,11 @@ void CWorkUnitWatcher::notify(SubscriptionId id, const char *xpath, SDSNotifyFla
 {
     CriticalBlock b(crit);
     if (id==stateId)
-        subscriber->notify(SubscribeOptionState);
+        subscriber->notify(SubscribeOptionState, valueLen, valueData);
     else if (id==actionId)
-        subscriber->notify(SubscribeOptionAction);
+        subscriber->notify(SubscribeOptionAction, valueLen, valueData);
     else if (id==abortId)
-        subscriber->notify(SubscribeOptionAbort);
+        subscriber->notify(SubscribeOptionAbort, valueLen, valueData);
 }
 
 
@@ -5522,7 +5522,7 @@ public:
                 if (timeout==-1 || waited + 20000 < timeout)
                 {
                     waiter->wait(20000);  // recheck state every 20 seconds, in case eclagent has crashed.
-                    if (waiter->aborted)
+                    if (waiter->isAborted())
                     {
                         ret = WUStateUnknown;  // MORE - throw an exception?
                         break;
@@ -5557,7 +5557,7 @@ public:
                     break;
                 unsigned waited = msTick() - start;
                 waiter->wait(20000);  // recheck state every 20 seconds even if no timeout, in case eclagent has crashed.
-                if (waiter->aborted)
+                if (waiter->isAborted())
                 {
                     ret = WUActionUnknown;  // MORE - throw an exception?
                     break;

+ 1 - 1
common/workunit/workunit.hpp

@@ -748,7 +748,7 @@ enum WUSubscribeOptions
 
 interface IWorkUnitSubscriber
 {
-    virtual void notify(WUSubscribeOptions flags) = 0;
+    virtual void notify(WUSubscribeOptions flags, unsigned valueLen, const void *valueData) = 0;
 };
 
 interface IWorkUnitWatcher : extends IInterface

+ 15 - 12
common/workunit/workunit.ipp

@@ -603,7 +603,7 @@ public:
     virtual void unsubscribe();
     virtual bool aborting() const;
 protected:
-    virtual void notify(WUSubscribeOptions) { abortDirty = true; }
+    virtual void notify(WUSubscribeOptions, unsigned, const void *) override { abortDirty = true; }
     Owned<IWorkUnitWatcher> abortWatcher;
     mutable bool abortDirty;
     mutable bool abortState;
@@ -723,6 +723,7 @@ class WorkUnitWaiter : public CInterface, implements IAbortHandler, implements I
 {
     Semaphore changed;
     Owned<IWorkUnitWatcher> watcher;
+    bool aborted;
 public:
     IMPLEMENT_IINTERFACE;
     WorkUnitWaiter(const char *wuid, WUSubscribeOptions watchFor)
@@ -735,20 +736,11 @@ public:
     {
         unsubscribe();
     }
-    void notify(WUSubscribeOptions flags)
-    {
-        changed.signal();
-    }
+    bool isAborted() const { return aborted; }
     bool wait(unsigned timeout)
     {
         return changed.wait(timeout) && !aborted;
     }
-    bool onAbort()
-    {
-        aborted = true;
-        changed.signal();
-        return false;
-    }
     void unsubscribe()
     {
         if (watcher)
@@ -757,7 +749,18 @@ public:
             watcher.clear();
         }
     }
-    bool aborted;
+// IWorkUnitSubscriber
+    virtual void notify(WUSubscribeOptions flags, unsigned valueLen, const void *valueData) override
+    {
+        changed.signal();
+    }
+// IAbortHandler
+    virtual bool onAbort() override
+    {
+        aborted = true;
+        changed.signal();
+        return false;
+    }
 };
 
 #define PROGRESS_FORMAT_V 2

+ 2 - 2
plugins/cassandra/cassandrawu.cpp

@@ -3751,7 +3751,7 @@ public:
             if (timeout==-1 || waited + 20000 < timeout)
             {
                 waiter->wait(20000);  // recheck state every 20 seconds, in case eclagent has crashed.
-                if (waiter->aborted)
+                if (waiter->isAborted())
                     return WUStateUnknown;  // MORE - throw an exception?
             }
             else if (waited > timeout || !waiter->wait(timeout-waited))
@@ -3793,7 +3793,7 @@ public:
                 break;
             }
             waiter->wait(10000);  // recheck state every 20 seconds even if no notifications... just because we used to before
-            if (waiter->aborted)
+            if (waiter->isAborted())
                 break;
         }
         return ret;

+ 27 - 14
system/jlib/jexcept.cpp

@@ -1556,27 +1556,40 @@ void printStackReport(__int64 startIP)
 
 //---------------------------------------------------------------------------------------------------------------------
 
-bool getAllStacks(StringBuffer &output)
+unsigned getCommandOutput(StringBuffer &output, const char *cmd, const char *cmdTitle, const char *allowedPrograms)
 {
-#ifdef __linux__
-    const char *exePath = queryCurrentProcessPath();
-    if (!exePath)
-    {
-        output.append("Unable to capture stacks");
-        return false;
-    }
-    VStringBuffer cmd("gdb --batch -n -ex 'thread apply all bt' %s %u", exePath, GetCurrentProcessId());
-    Owned<IPipeProcess> pipe = createPipeProcess();
-    if (pipe->run("get stacks", cmd, nullptr, false, true, false))
+    Owned<IPipeProcess> pipe = createPipeProcess(allowedPrograms);
+    if (pipe->run(cmdTitle, cmd, nullptr, false, true, false))
     {
         Owned<ISimpleReadStream> pipeReader = pipe->getOutputStream();
         readSimpleStream(output, *pipeReader);
     }
-    int retcode = pipe->wait();
-    return 0 == retcode;
-#else
+    return pipe->wait();
+}
+
+//---------------------------------------------------------------------------------------------------------------------
+
+bool getDebuggerGetStacksCmd(StringBuffer &output)
+{
+#ifndef __linux__
     return false; // unsupported
 #endif
+
+    const char *exePath = queryCurrentProcessPath();
+    if (!exePath)
+    {
+        output.append("Unable to capture stacks");
+        return false;
+    }
+    return output.appendf("gdb --batch -n -ex 'thread apply all bt' %s %u", exePath, GetCurrentProcessId());
+}
+
+bool getAllStacks(StringBuffer &output)
+{
+    StringBuffer cmd;
+    if (!getDebuggerGetStacksCmd(cmd))
+        return false;
+    return 0 == getCommandOutput(output, cmd, "get stacks");
 }
 
 //---------------------------------------------------------------------------------------------------------------------

+ 2 - 0
system/jlib/jexcept.hpp

@@ -152,6 +152,8 @@ void  jlib_decl printStackReport(__int64 startIP = 0);
 #define PrintStackReport printStackReport
 
 bool jlib_decl getAllStacks(StringBuffer &output);
+unsigned jlib_decl getCommandOutput(StringBuffer &output, const char *cmd, const char *cmdTitle=nullptr, const char *allowedPrograms=nullptr);
+bool jlib_decl getDebuggerGetStacksCmd(StringBuffer &output);
 
 #ifdef _DEBUG
 #define RELEASE_CATCH_ALL       int*********

+ 20 - 2
thorlcr/graph/thgraphmaster.cpp

@@ -1676,7 +1676,7 @@ bool CJobMaster::go()
             }
             _watcher->unsubscribe();
         }
-        void notify(WUSubscribeOptions flags)
+        virtual void notify(WUSubscribeOptions flags, unsigned valueLen, const void *valueData) override
         {
             CriticalBlock b(crit);
             if (!watcher)
@@ -1687,7 +1687,18 @@ bool CJobMaster::go()
                 if (factory->isAborting(wu.queryWuid()))
                 {
                     LOG(MCwarning, thorJob, "ABORT detected from user");
-                    Owned <IException> e = MakeThorException(TE_WorkUnitAborting, "User signalled abort");
+
+                    unsigned code = TE_WorkUnitAborting; // default
+                    if (job.getOptBool("dumpInfoOnUserAbort", false))
+                        code = TE_WorkUnitAbortingDumpInfo;
+                    else if ((1 == valueLen) && ('2' == *((const char *)valueData)))
+                    {
+                        /* NB: Standard abort mechanism will trigger abort subscriber with "1"
+                         * If "2" is signalled, it means - Abort with dump info.
+                         */
+                        code = TE_WorkUnitAbortingDumpInfo;
+                    }
+                    Owned <IException> e = MakeThorException(code, "User signalled abort");
                     job.fireException(e);
                 }
             }
@@ -2193,6 +2204,12 @@ void CMasterGraph::abort(IException *e)
 {
     if (aborted) return;
     bool _graphDone = graphDone; // aborting master activities can trigger master graphDone, but want to fire GraphAbort to slaves if graphDone=false at start.
+    bool dumpInfo = TE_WorkUnitAbortingDumpInfo == e->errorCode() || job.getOptBool("dumpInfoOnAbort");
+    if (dumpInfo)
+    {
+        StringBuffer dumpInfoCmd;
+        checkAndDumpAbortInfo(queryJob().getOpt("dumpInfoCmd", dumpInfoCmd));
+    }
     try { CGraphBase::abort(e); }
     catch (IException *e)
     {
@@ -2209,6 +2226,7 @@ void CMasterGraph::abort(IException *e)
             CMessageBuffer msg;
             msg.append(GraphAbort);
             msg.append(job.queryKey());
+            msg.append(dumpInfo);
             msg.append(queryGraphId());
             jobM->broadcast(queryNodeComm(), msg, masterSlaveMpTag, LONGTIMEOUT, "abort");
         }

+ 3 - 2
thorlcr/shared/thexception.hpp

@@ -119,7 +119,7 @@
 #define TE_PipeReturnedFailure                  TE_Base + 96
 #define TE_IdleRestart                          TE_Base + 97
 #define TE_NotEnoughFreeSpace                   TE_Base + 98
-#define TE_WorkUnitWriteLimitExceeded               TE_Base + 99
+#define TE_WorkUnitWriteLimitExceeded           TE_Base + 99
 #define TE_CsvLineLenghtExceeded                TE_Base + 100
 #define TE_ActivityBufferingLimitExceeded       TE_Base + 101
 #define TE_CouldNotCreateLookAhead              TE_Base + 102
@@ -156,7 +156,8 @@
 #define TE_SkewWarning                          TE_Base + 133
 #define TE_SkewError                            TE_Base + 134
 #define TE_KERN                                 TE_Base + 135
-#define TE_Final                                TE_Base + 136       // keep this last
+#define TE_WorkUnitAbortingDumpInfo             TE_Base + 136
+#define TE_Final                                TE_Base + 137       // keep this last
 #define ISTHOREXCEPTION(n) (n > TE_Base && n < TE_Final)
 
 #endif

+ 7 - 0
thorlcr/slave/slavmain.cpp

@@ -1939,6 +1939,13 @@ public:
                         CJobSlave *job = jobs.find(jobKey.get());
                         if (job)
                         {
+                            bool dumpInfo;
+                            msg.read(dumpInfo);
+                            if (dumpInfo)
+                            {
+                                StringBuffer dumpInfoCmd;
+                                checkAndDumpAbortInfo(job->getOpt("dumpInfoCmd", dumpInfoCmd));
+                            }
                             graph_id gid;
                             msg.read(gid);
                             for (unsigned c=0; c<job->queryJobChannels(); c++)

+ 34 - 0
thorlcr/thorutil/thormisc.cpp

@@ -1480,3 +1480,37 @@ bool isRemoteReadCandidate(const CActivityBase &activity, const RemoteFilename &
     }
     return false;
 }
+
+void checkAndDumpAbortInfo(const char *cmd)
+{
+    try
+    {
+        StringBuffer dumpInfoCmd(cmd);
+        if (dumpInfoCmd.length())
+        {
+            /* add some params that might be useful to script
+             * 1) Thor instance name
+             * 2) base port
+             * 3) exe path
+             * 4) PID
+             */
+            const char *myInstanceName = globals->queryProp("@name");
+            unsigned myBasePort = getMachinePortBase();
+            StringBuffer exePath(queryCurrentProcessPath());
+            if (0 == exePath.length())
+                exePath.append("process-name-unknown");
+            unsigned pid = GetCurrentProcessId();
+            dumpInfoCmd.appendf(" %s %u %s %u", myInstanceName, myBasePort, exePath.str(), pid);
+        }
+        else
+            getDebuggerGetStacksCmd(dumpInfoCmd);
+        StringBuffer cmdOutput;
+        unsigned retCode = getCommandOutput(cmdOutput, dumpInfoCmd, "slave dump info", globals->queryProp("@allowedPipePrograms"));
+        PROGLOG("\n%s, return code = %u\n%s\n", dumpInfoCmd.str(), retCode, cmdOutput.str());
+    }
+    catch (IException *e)
+    {
+        EXCLOG(e, nullptr);
+        e->Release();
+    }
+}

+ 2 - 0
thorlcr/thorutil/thormisc.hpp

@@ -525,5 +525,7 @@ extern graph_decl void getLayoutTranslations(IConstPointerArrayOf<ITranslator> &
 extern graph_decl const ITranslator *getLayoutTranslation(const char *fname, IPartDescriptor &partDesc, RecordTranslationMode translationMode, unsigned expectedFormatCrc, IOutputMetaData *expectedFormat, unsigned projectedFormatCrc, IOutputMetaData *projectedFormat);
 extern graph_decl bool isRemoteReadCandidate(const CActivityBase &activity, const RemoteFilename &rfn, StringBuffer &localPath);
 
+extern graph_decl void checkAndDumpAbortInfo(const char *cmd);
+
 #endif