Pārlūkot izejas kodu

Merge pull request #2332 from jakesmith/graphprogress

Graph progress improvements

Reviewed-By: Gavin Halliday <gavin.halliday@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 13 gadi atpakaļ
vecāks
revīzija
64ac421af8

+ 1 - 0
system/mp/mptag.hpp

@@ -55,6 +55,7 @@ TAGENUM
     DEFTAG ( MPTAG_THOR )
     DEFTAG ( MPTAG_THORRESOURCELOCK )
     DEFTAG ( MPTAG_MPTX )
+    DEFTAG ( MPTAG_THORWATCHDOG )
 
     // new static tags go above here
 

+ 1 - 1
thorlcr/graph/thgraph.hpp

@@ -729,7 +729,7 @@ public:
     void setResults(IThorGraphResults *results);
     virtual void executeChild(size32_t parentExtractSz, const byte *parentExtract, IThorGraphResults *results, IThorGraphResults *graphLoopResults);
     virtual void executeChild(size32_t parentExtractSz, const byte *parentExtract);
-    virtual void serializeStats(MemoryBuffer &mb) { }
+    virtual bool serializeStats(MemoryBuffer &mb) { return false; }
     virtual bool prepare(size32_t parentExtractSz, const byte *parentExtract, bool checkDependencies, bool shortCircuit, bool async);
     virtual void create(size32_t parentExtractSz, const byte *parentExtract);
     virtual bool preStart(size32_t parentExtractSz, const byte *parentExtract);

+ 12 - 16
thorlcr/graph/thgraphmaster.cpp

@@ -2404,23 +2404,19 @@ void CMasterGraph::getFinalProgress()
         {
             if (globals->getPropBool("@watchdogProgressEnabled"))
             {
-                HeartBeatPacket &hb = *(HeartBeatPacket *) msg.readDirect(sizeof(hb.packetsize));
-                if (hb.packetsize)
+                try
                 {
-                    size32_t sz = hb.packetsize-sizeof(hb.packetsize);
-                    if (sz)
-                    {
-                        msg.readDirect(sz);
-                        try
-                        {
-                            queryJobManager().queryDeMonServer()->takeHeartBeat(hb);
-                        }
-                        catch (IException *e)
-                        {
-                            GraphPrintLog(e, "Failure whilst deserializing stats/progress");
-                            e->Release();
-                        }
-                    }
+                    size32_t progressLen;
+                    msg.read(progressLen);
+                    MemoryBuffer progressData;
+                    progressData.setBuffer(progressLen, (void *)msg.readDirect(progressLen));
+                    const SocketEndpoint &ep = queryClusterGroup().queryNode(sender).endpoint();
+                    queryJobManager().queryDeMonServer()->takeHeartBeat(ep, progressData);
+                }
+                catch (IException *e)
+                {
+                    GraphPrintLog(e, "Failure whilst deserializing stats/progress");
+                    e->Release();
                 }
             }
         }

+ 59 - 29
thorlcr/graph/thgraphslave.cpp

@@ -247,7 +247,11 @@ unsigned __int64 CSlaveActivity::queryLocalCycles() const
         if (TAKchildif == container.getKind())
         {
             if (inputs.ordinality() && (((unsigned)-1) != container.whichBranch))
-                inputCycles += inputs.item(container.whichBranch)->queryTotalCycles();
+            {
+                IThorDataLink *input = inputs.item(container.whichBranch);
+                if (input)
+                    inputCycles += input->queryTotalCycles();
+            }
         }
         else
         {
@@ -295,6 +299,8 @@ void CSlaveGraph::init(MemoryBuffer &mb)
     waitBarrier = job.createBarrier(waitBarrierTag);
     if (doneBarrierTag != TAG_NULL)
         doneBarrier = job.createBarrier(doneBarrierTag);
+    initialized = false;
+    progressActive = progressToCollect = false;
     unsigned subCount;
     mb.read(subCount);
     while (subCount--)
@@ -308,6 +314,8 @@ void CSlaveGraph::init(MemoryBuffer &mb)
 
 void CSlaveGraph::initWithActData(MemoryBuffer &in, MemoryBuffer &out)
 {
+    CriticalBlock b(progressCrit);
+    initialized = true;
     activity_id id;
     loop
     {
@@ -472,6 +480,11 @@ bool CSlaveGraph::preStart(size32_t parentExtractSz, const byte *parentExtract)
 
 void CSlaveGraph::start()
 {
+    {
+        SpinBlock b(progressActiveLock);
+        progressActive = true;
+        progressToCollect = true;
+    }
     bool forceAsync = !queryOwner() || isGlobal();
     Owned<IThorActivityIterator> iter = getSinkIterator();
     unsigned sinks = 0;
@@ -606,6 +619,11 @@ void CSlaveGraph::abort(IException *e)
 void CSlaveGraph::done()
 {
     GraphPrintLog("End of sub-graph");
+    {
+        SpinBlock b(progressActiveLock);
+        progressActive = false;
+        progressToCollect = true; // NB: ensure collected after end of graph
+    }
     if (!aborted && (!queryOwner() || isGlobal()))
         getDoneSem.wait(); // must wait on master
     if (!queryOwner())
@@ -644,46 +662,62 @@ void CSlaveGraph::end()
     }
 }
 
-void CSlaveGraph::serializeStats(MemoryBuffer &mb)
+bool CSlaveGraph::serializeStats(MemoryBuffer &mb)
 {
+    unsigned beginPos = mb.length();
     mb.append(queryGraphId());
     unsigned cPos = mb.length();
     unsigned count = 0;
     mb.append(count);
     CriticalBlock b(progressCrit);
-    if (started || 0 == activityCount())
+    // until started and activities initialized, activities are not ready to serlialize stats.
+    if ((started&&initialized) || 0 == activityCount())
     {
-        unsigned sPos = mb.length();
-        Owned<IThorActivityIterator> iter = getTraverseIterator();
-        ForEach (*iter)
+        bool collect=false;
         {
-            if (mb.length() > (DATA_MAX-30))
+            SpinBlock b(progressActiveLock);
+            if (progressActive || progressToCollect)
             {
-                WARNLOG("Act: Progress packet too big!");
-                break;
+                progressToCollect = false;
+                collect = true;
             }
-            
-            CGraphElementBase &element = iter->query();
-            CSlaveActivity &activity = (CSlaveActivity &)*element.queryActivity();
-            unsigned pos = mb.length();
-            mb.append(activity.queryContainer().queryId());
-            activity.serializeStats(mb);
-            if (pos == mb.length()-sizeof(activity_id))
-                mb.rewrite(pos);
-            else
-                ++count;
         }
-        mb.writeDirect(cPos, sizeof(count), &count);
-        mb.append(queryChildGraphCount());
+        if (collect)
+        {
+            unsigned sPos = mb.length();
+            Owned<IThorActivityIterator> iter = getTraverseIterator();
+            ForEach (*iter)
+            {
+                CGraphElementBase &element = iter->query();
+                CSlaveActivity &activity = (CSlaveActivity &)*element.queryActivity();
+                unsigned pos = mb.length();
+                mb.append(activity.queryContainer().queryId());
+                activity.serializeStats(mb);
+                if (pos == mb.length()-sizeof(activity_id))
+                    mb.rewrite(pos);
+                else
+                    ++count;
+            }
+            mb.writeDirect(cPos, sizeof(count), &count);
+        }
+        unsigned cqCountPos = mb.length();
+        unsigned cq=0;
+        mb.append(cq);
         Owned<IThorGraphIterator> childIter = getChildGraphs();
         ForEach(*childIter)
         {
             CSlaveGraph &graph = (CSlaveGraph &)childIter->query();
-            graph.serializeStats(mb);
+            if (graph.serializeStats(mb))
+                ++cq;
+        }
+        if (count || cq)
+        {
+            mb.writeDirect(cqCountPos, sizeof(cq), &cq);
+            return true;
         }
     }
-    else
-        mb.append((unsigned)0); // sub graph count
+    mb.rewrite(beginPos);
+    return false;
 }
 
 void CSlaveGraph::serializeDone(MemoryBuffer &mb)
@@ -730,11 +764,7 @@ void CSlaveGraph::getDone(MemoryBuffer &doneInfoMb)
             if (!queryOwner())
             {
                 if (globals->getPropBool("@watchdogProgressEnabled"))
-                {
-                    HeartBeatPacket hb;
-                    jobS.queryProgressHandler()->stopGraph(*this, &hb);
-                    doneInfoMb.append(hb.packetsize, &hb);
-                }
+                    jobS.queryProgressHandler()->stopGraph(*this, &doneInfoMb);
             }
             doneInfoMb.append(job.queryMaxDiskUsage());
             queryJob().queryTimeReporter().serialize(doneInfoMb);

+ 3 - 2
thorlcr/graph/thgraphslave.hpp

@@ -90,8 +90,9 @@ class graphslave_decl CSlaveGraph : public CGraphBase
     CJobSlave &jobS;
     Owned<IInterface> progressHandler;
     Semaphore getDoneSem;
-    bool needsFinalInfo;
+    bool initialized, progressActive, progressToCollect;
     CriticalSection progressCrit;
+    SpinLock progressActiveLock;
 
 public:
 
@@ -109,7 +110,7 @@ public:
     IThorResult *getGlobalResult(CActivityBase &activity, IRowInterfaces *rowIf, unsigned id);
 
     virtual void executeSubGraph(size32_t parentExtractSz, const byte *parentExtract);
-    virtual void serializeStats(MemoryBuffer &mb);
+    virtual bool serializeStats(MemoryBuffer &mb);
     virtual bool preStart(size32_t parentExtractSz, const byte *parentExtract);
     virtual void start();
     virtual void create(size32_t parentExtractSz, const byte *parentExtract);

+ 160 - 78
thorlcr/master/mawatchdog.cpp

@@ -42,63 +42,63 @@ public:
     SocketEndpoint ep;
     bool alive;
     bool markdead;
-    HeartBeatPacket lastpacket;
     CMachineStatus(const SocketEndpoint &_ep)
         : ep(_ep)
     {
         alive = true;
         markdead = false;
-        memset(&lastpacket,0,sizeof(lastpacket));
     }
-    void update(HeartBeatPacket &packet)
+    void update(HeartBeatPacketHeader &packet)
     {
         alive = true;
-        if (markdead) {
+        if (markdead)
+        {
             markdead = false;
             StringBuffer epstr;
             ep.getUrlStr(epstr);
             LOG(MCdebugProgress, unknownJob, "Watchdog : Marking Machine as Up! [%s]", epstr.str());
         }
-        if(packet.progressSize > 0)
-            lastpacket = packet;
     }   
 };
 
 
-CMasterWatchdog::CMasterWatchdog() : threaded("CMasterWatchdog")
+CMasterWatchdogBase::CMasterWatchdogBase() : threaded("CMasterWatchdogBase")
 {
-    stopped = false;
-    sock = NULL;
-    if (globals->getPropBool("@watchdogEnabled"))
-    {
-        if (!sock)
-            sock = ISocket::udp_create(getFixedPort(TPORT_watchdog));
-        LOG(MCdebugProgress, unknownJob, "Starting watchdog");
+    stopped = true;
+    watchdogMachineTimeout = globals->getPropInt("@slaveDownTimeout", DEFAULT_SLAVEDOWNTIMEOUT);
+    if (watchdogMachineTimeout <= HEARTBEAT_INTERVAL*10)
+        watchdogMachineTimeout = HEARTBEAT_INTERVAL*10;
+    watchdogMachineTimeout *= 1000;
 #ifdef _WIN32
-        threaded.adjustPriority(+1); // it is critical that watchdog packets get through.
+    threaded.adjustPriority(+1); // it is critical that watchdog packets get through.
 #endif
-        threaded.init(this);
-    }
 }
 
-CMasterWatchdog::~CMasterWatchdog()
+CMasterWatchdogBase::~CMasterWatchdogBase()
 {
     stop();
-    ::Release(sock);
-    ForEachItemInRev(i, state) {
+    ForEachItemInRev(i, state)
+    {
         CMachineStatus *mstate=(CMachineStatus *)state.item(i);
         delete mstate;
     }
 }
 
-void CMasterWatchdog::addSlave(const SocketEndpoint &slave)
+void CMasterWatchdogBase::start()
+{
+    PROGLOG("Starting watchdog");
+    stopped = false;
+    threaded.init(this);
+}
+
+void CMasterWatchdogBase::addSlave(const SocketEndpoint &slave)
 {
     synchronized block(mutex);
     CMachineStatus *mstate=new CMachineStatus(slave);
     state.append(mstate);
 }
 
-void CMasterWatchdog::removeSlave(const SocketEndpoint &slave)
+void CMasterWatchdogBase::removeSlave(const SocketEndpoint &slave)
 {
     synchronized block(mutex);
     CMachineStatus *ms = findSlave(slave);
@@ -108,9 +108,10 @@ void CMasterWatchdog::removeSlave(const SocketEndpoint &slave)
     }
 }
 
-CMachineStatus *CMasterWatchdog::findSlave(const SocketEndpoint &ep)
+CMachineStatus *CMasterWatchdogBase::findSlave(const SocketEndpoint &ep)
 {
-    ForEachItemInRev(i, state) {
+    ForEachItemInRev(i, state)
+    {
         CMachineStatus *mstate=(CMachineStatus *)state.item(i);
         if (mstate->ep.equals(ep))
             return mstate;
@@ -119,7 +120,7 @@ CMachineStatus *CMasterWatchdog::findSlave(const SocketEndpoint &ep)
 }
 
 
-void CMasterWatchdog::stop()
+void CMasterWatchdogBase::stop()
 {
     threaded.adjustPriority(0); // restore to normal before stopping
     { synchronized block(mutex);
@@ -128,27 +129,16 @@ void CMasterWatchdog::stop()
         LOG(MCdebugProgress, unknownJob, "Stopping watchdog");
         stopped = true;
     }
-    if (sock)
-    {
-        SocketEndpoint masterEp(getMasterPortBase());
-        StringBuffer ipStr;
-        masterEp.getIpText(ipStr);
-        Owned<ISocket> sock = ISocket::udp_connect(getFixedPort(masterEp.port, TPORT_watchdog), ipStr.str());
-        HeartBeatPacket hbpacket;
-        memset(&hbpacket, 0, sizeof(hbpacket));
-        MemoryBuffer mb;
-        size32_t sz = ThorCompress(&hbpacket, hbpacket.packetSize(), mb);
-        sock->write(mb.toByteArray(), sz);
-        sock->close();
-    }
+    stopReading();
     threaded.join();
     LOG(MCdebugProgress, unknownJob, "Stopped watchdog");
 }
 
-void CMasterWatchdog::checkMachineStatus()
+void CMasterWatchdogBase::checkMachineStatus()
 {
     synchronized block(mutex);
-    ForEachItemInRev(i, state) {
+    ForEachItemInRev(i, state)
+    {
         CMachineStatus *mstate=(CMachineStatus *)state.item(i);
         if (!mstate->alive)
         {
@@ -167,67 +157,78 @@ void CMasterWatchdog::checkMachineStatus()
             mstate->alive = false;
         }
     }
+}
 
+unsigned CMasterWatchdogBase::readPacket(HeartBeatPacketHeader &hb, MemoryBuffer &mb)
+{
+    mb.clear();
+    unsigned read = readData(mb);
+    if (read)
+    {
+        if (read < sizeof(HeartBeatPacketHeader))
+        {
+            WARNLOG("Receive Monitor Packet: wrong size, got %d, less than HeartBeatPacketHeader size", read);
+            return 0;
+        }
+        memcpy(&hb, mb.readDirect(sizeof(HeartBeatPacketHeader)), sizeof(HeartBeatPacketHeader));
+        if (read != hb.packetSize)  // check for corrupt packets
+        {
+            WARNLOG("Receive Monitor Packet: wrong size, expected %d, got %d", hb.packetSize, read);
+            return 0;
+        }
+        mb.setLength(hb.packetSize);
+        return hb.packetSize;
+    }
+    else
+        mb.clear();
+    return 0;
 }
 
-void CMasterWatchdog::main()
+void CMasterWatchdogBase::main()
 {
     LOG(MCdebugProgress, unknownJob, "Started watchdog");
     unsigned lastbeat=msTick();
     unsigned lastcheck=lastbeat;
 
-    unsigned watchdogMachineTimeout = globals->getPropInt("@slaveDownTimeout", DEFAULT_SLAVEDOWNTIMEOUT);
-    if (watchdogMachineTimeout <= HEARTBEAT_INTERVAL*10)
-        watchdogMachineTimeout = HEARTBEAT_INTERVAL*10;
-    watchdogMachineTimeout *= 1000;
     retrycount = 0;
-    try {
-        while (!stopped) {
-            HeartBeatPacket hbpacket;
-            try {
-                size32_t read;
-                MemoryBuffer packetCompressedMb;
-                sock->readtms(packetCompressedMb.reserveTruncate(hbpacket.maxPacketSize()), hbpacket.minPacketSize(), hbpacket.maxPacketSize(), read, watchdogMachineTimeout);
-                MemoryBuffer packetMb;
-                read = ThorExpand(packetCompressedMb.toByteArray(), read, &hbpacket, hbpacket.maxPacketSize());
-                if (0==hbpacket.packetsize)
-                    break; // signal to stop
-                if(read > hbpacket.minPacketSize() && read == hbpacket.packetsize)  // check for corrupt packets
+    try
+    {
+        while (!stopped)
+        {
+            HeartBeatPacketHeader hb;
+            MemoryBuffer progressData;
+            unsigned sz = readPacket(hb, progressData);
+            if (stopped)
+                break;
+            else if (sz)
+            {
+                synchronized block(mutex);
+                CMachineStatus *ms = findSlave(hb.sender);
+                if (ms)
                 {
-                    synchronized block(mutex);
-                    CMachineStatus *ms = findSlave(hbpacket.sender);
-                    if (ms) 
+                    ms->update(hb);
+                    if (progressData.remaining())
                     {
-                        ms->update(hbpacket);
                         Owned<IJobManager> jobManager = getJobManager();
                         if (jobManager)
-                            jobManager->queryDeMonServer()->takeHeartBeat(hbpacket);
-                    }
-                    else {
-                        StringBuffer epstr;
-                        hbpacket.sender.getUrlStr(epstr);
-                        LOG(MCdebugProgress, unknownJob, "Watchdog : Unknown Machine! [%s]", epstr.str()); //TBD
+                            jobManager->queryDeMonServer()->takeHeartBeat(hb.sender, progressData);
                     }
                 }
                 else
                 {
-                    LOG(MCdebugProgress, unknownJob, "Receive Monitor Packet: wrong size, expected %d, got %d", hbpacket.packetsize, read);
+                    StringBuffer epstr;
+                    hb.sender.getUrlStr(epstr);
+                    LOG(MCdebugProgress, unknownJob, "Watchdog : Unknown Machine! [%s]", epstr.str()); //TBD
                 }
             }
-            catch (IJSOCK_Exception *e)
-            {
-                if ((e->errorCode()!=JSOCKERR_timeout_expired)&&(e->errorCode()!=JSOCKERR_broken_pipe)&&(e->errorCode()!=JSOCKERR_graceful_close)) 
-                    throw;
-                e->Release();
-            }
-            if (stopped)
-                break;
             unsigned now=msTick();
-            if (now-lastcheck>watchdogMachineTimeout) {
+            if (now-lastcheck>watchdogMachineTimeout)
+            {
                 checkMachineStatus();
                 lastcheck = msTick();
             }
-            if (now-lastbeat>THORBEAT_INTERVAL) {
+            if (now-lastbeat>THORBEAT_INTERVAL)
+            {
                 if (retrycount<=0) retrycount=THORBEAT_RETRY_INTERVAL; else retrycount -= THORBEAT_INTERVAL;
                 lastbeat = msTick();
             }
@@ -240,3 +241,84 @@ void CMasterWatchdog::main()
     }
 }
 
+
+class CMasterWatchdogUDP : public CMasterWatchdogBase
+{
+    ISocket *sock;
+public:
+    CMasterWatchdogUDP()
+    {
+        sock = ISocket::udp_create(getFixedPort(TPORT_watchdog));
+        start();
+    }
+    ~CMasterWatchdogUDP()
+    {
+        ::Release(sock);
+    }
+    virtual unsigned readData(MemoryBuffer &mb)
+    {
+        size32_t read;
+        try
+        {
+            sock->readtms(mb.reserveTruncate(UDP_DATA_MAX), sizeof(HeartBeatPacketHeader), UDP_DATA_MAX, read, watchdogMachineTimeout);
+        }
+        catch (IJSOCK_Exception *e)
+        {
+            if ((e->errorCode()!=JSOCKERR_timeout_expired)&&(e->errorCode()!=JSOCKERR_broken_pipe)&&(e->errorCode()!=JSOCKERR_graceful_close))
+                throw;
+            e->Release();
+            return 0; // will retry
+        }
+        return read;
+    }
+    virtual void stopReading()
+    {
+        if (sock)
+        {
+            SocketEndpoint masterEp(getMasterPortBase());
+            StringBuffer ipStr;
+            masterEp.getIpText(ipStr);
+            Owned<ISocket> sock = ISocket::udp_connect(getFixedPort(masterEp.port, TPORT_watchdog), ipStr.str());
+            // send empty packet, stopped set, will cease reading
+            HeartBeatPacketHeader hb;
+            memset(&hb, 0, sizeof(hb));
+            hb.packetSize = sizeof(HeartBeatPacketHeader);
+            sock->write(&hb, sizeof(HeartBeatPacketHeader));
+            sock->close();
+        }
+    }
+};
+
+/////////////////////
+
+class CMasterWatchdogMP : public CMasterWatchdogBase
+{
+public:
+    CMasterWatchdogMP()
+    {
+        start();
+    }
+    virtual unsigned readData(MemoryBuffer &mb)
+    {
+        CMessageBuffer msg;
+        rank_t sender;
+        if (!queryClusterComm().recv(msg, RANK_ALL, MPTAG_THORWATCHDOG, &sender, watchdogMachineTimeout))
+            return 0;
+        mb.swapWith(msg);
+        return mb.length();
+    }
+    virtual void stopReading()
+    {
+        queryClusterComm().cancel(0, MPTAG_THORWATCHDOG);
+    }
+};
+
+/////////////////////
+
+CMasterWatchdogBase *createMasterWatchdog(bool udp)
+{
+    if (udp)
+        return new CMasterWatchdogUDP();
+    else
+        return new CMasterWatchdogMP();
+}

+ 18 - 11
thorlcr/master/mawatchdog.hpp

@@ -25,28 +25,35 @@
 #include "jmutex.hpp"
 
 class CMachineStatus;
-struct HeartBeatPacket;
+struct HeartBeatPacketHeader;
 
-class CMasterWatchdog : public CSimpleInterface, implements IThreaded
+class CMasterWatchdogBase : public CSimpleInterface, implements IThreaded
 {
+    PointerArray state;
+    SocketEndpoint master;
+    Mutex mutex;
+    int retrycount;
     CThreaded threaded;
+protected:
+    bool stopped;
+    unsigned watchdogMachineTimeout;
 public:
-    CMasterWatchdog();
-    ~CMasterWatchdog();
+    CMasterWatchdogBase();
+    ~CMasterWatchdogBase();
     void addSlave(const SocketEndpoint &slave);
     void removeSlave(const SocketEndpoint &slave);
     CMachineStatus *findSlave(const SocketEndpoint &ep);
     void checkMachineStatus();
+    unsigned readPacket(HeartBeatPacketHeader &hb, MemoryBuffer &mb);
+    void start();
     void stop();
     void main();
-private:
-    PointerArray state;
-    SocketEndpoint master;
-    ISocket *sock;
-    Mutex mutex;
-    bool stopped;
-    int retrycount;
+
+    virtual unsigned readData(MemoryBuffer &mb) = 0;
+    virtual void stopReading() = 0;
 };
 
+CMasterWatchdogBase *createMasterWatchdog(bool udp=false);
+
 #endif
 

+ 24 - 23
thorlcr/master/thdemonserver.cpp

@@ -25,6 +25,7 @@
 
 #include "thormisc.hpp"
 #include "thorport.hpp"
+#include "thcompressutil.hpp"
 #include "thgraphmaster.ipp"
 #include "thgraphmanager.hpp"
 #include "thwatchdog.hpp"
@@ -159,28 +160,27 @@ public:
         reportRate = globals->getPropInt("@watchdogProgressInterval", 30);
     }
 
-    virtual void takeHeartBeat(HeartBeatPacket & hbpacket)
+    virtual void takeHeartBeat(const SocketEndpoint &sender, MemoryBuffer &progressMb)
     {
         synchronized block(mutex);
-        
-        if((hbpacket.packetsize>sizeof(hbpacket.packetsize))&&(hbpacket.progressSize > 0))
+        if (0 == activeGraphs.ordinality())
         {
-            if (0 == activeGraphs.ordinality())
-            {
-                StringBuffer urlStr;
-                LOG(MCdebugProgress, unknownJob, "heartbeat packet received with no active graphs, from=%s", hbpacket.sender.getUrlStr(urlStr).str());
-                return;
-            }
-            rank_t node = querySlaveGroup().rank(hbpacket.sender);
-            assertex(node != RANK_NULL);
-
-            MemoryBuffer statsMb;
-            statsMb.setBuffer(hbpacket.progressSize, hbpacket.perfdata);
+            StringBuffer urlStr;
+            LOG(MCdebugProgress, unknownJob, "heartbeat packet received with no active graphs, from=%s", sender.getUrlStr(urlStr).str());
+            return;
+        }
+        rank_t node = querySlaveGroup().rank(sender);
+        assertex(node != RANK_NULL);
 
-            while (statsMb.remaining())
+        size32_t compressedProgressSz = progressMb.remaining();
+        if (compressedProgressSz)
+        {
+            MemoryBuffer uncompressedMb;
+            ThorExpand(progressMb.readDirect(compressedProgressSz), compressedProgressSz, uncompressedMb);
+            do
             {
                 graph_id graphId;
-                statsMb.read(graphId);
+                uncompressedMb.read(graphId);
                 CMasterGraph *graph = NULL;
                 ForEachItemIn(g, activeGraphs) if (activeGraphs.item(g).queryGraphId() == graphId) graph = (CMasterGraph *)&activeGraphs.item(g);
                 if (!graph)
@@ -188,18 +188,19 @@ public:
                     LOG(MCdebugProgress, unknownJob, "heartbeat received from unknown graph %"GIDPF"d", graphId);
                     break;
                 }
-                if (!graph->deserializeStats(node, statsMb))
+                if (!graph->deserializeStats(node, uncompressedMb))
                 {
                     LOG(MCdebugProgress, unknownJob, "heartbeat error in graph %"GIDPF"d", graphId);
                     break;
                 }
             }
-            unsigned now=msTick();
-            if (now-lastReport > 1000*reportRate) 
-            {
-                reportGraph(false);
-                lastReport = msTick();
-            }
+            while (uncompressedMb.remaining());
+        }
+        unsigned now=msTick();
+        if (now-lastReport > 1000*reportRate)
+        {
+            reportGraph(false);
+            lastReport = msTick();
         }
     }
     void startGraph(CGraphBase *graph)

+ 1 - 1
thorlcr/master/thdemonserver.hpp

@@ -27,7 +27,7 @@ interface IWUGraphProgress;
 class CGraphBase;
 interface IDeMonServer : extends IInterface
 {
-    virtual void takeHeartBeat(HeartBeatPacket & hbpacket) = 0;
+    virtual void takeHeartBeat(const SocketEndpoint &sender, MemoryBuffer &progressMbb) = 0;
     virtual void startGraph(CGraphBase *graph) = 0;
     virtual void reportGraph(IWUGraphProgress *progress, CGraphBase *graph, bool finished) = 0;
     virtual void endGraph(CGraphBase *graph, bool success) = 0;

+ 2 - 2
thorlcr/master/thmastermain.cpp

@@ -133,7 +133,7 @@ class CRegistryServer : public CSimpleInterface
         }
     } deregistrationWatch;
 public:
-    Linked<CMasterWatchdog> watchdog;
+    Linked<CMasterWatchdogBase> watchdog;
     IBitSet *status;
 
     CRegistryServer()  : deregistrationWatch(*this), stopped(false)
@@ -142,7 +142,7 @@ public:
         msgDelay = SLAVEREG_VERIFY_DELAY;
         slavesRegistered = 0;
         if (globals->getPropBool("@watchdogEnabled"))
-            watchdog.setown(new CMasterWatchdog);
+            watchdog.setown(createMasterWatchdog(globals->getPropBool("@useUDPWatchdog")));
         else
             globals->setPropBool("@watchdogProgressEnabled", false);
         CriticalBlock b(regCrit);

+ 7 - 12
thorlcr/shared/thwatchdog.hpp

@@ -23,22 +23,17 @@
 #include "thor.hpp"
 
 #define HEARTBEAT_INTERVAL      15          // seconds
-#define DATA_MAX            1024 * 8    // 8k
+#define UDP_DATA_MAX            1024 * 8    // 8k
 #define THORBEAT_INTERVAL       10*1000     // 10 sec!
 #define THORBEAT_RETRY_INTERVAL 4*60*1000   // 4 minutes
 
-struct HeartBeatPacket
-{
-    unsigned short  packetsize;                 // used as validity check must be first
-    SocketEndpoint  sender;
-    unsigned        tick;                       // sequence check
-    unsigned short  progressSize;               // size of progress data (following performamce data)
-
-    byte            perfdata[DATA_MAX]; // performance/progress data from here on
 
-    inline size32_t packetSize() { return progressSize + (sizeof(HeartBeatPacket) - sizeof(perfdata)); }
-    inline size32_t minPacketSize() { return sizeof(progressSize) + sizeof(tick) + sizeof(sender) + sizeof(packetsize); }
-    inline size32_t maxPacketSize() { return DATA_MAX + minPacketSize(); }
+struct HeartBeatPacketHeader
+{
+    size32_t packetSize;   // used as validity check must be first
+    SocketEndpoint sender;
+    unsigned tick;         // sequence check
+    size32_t progressSize; // size of progress data (following performamce data)
 };
 
 #endif

+ 1 - 1
thorlcr/slave/slavmain.cpp

@@ -137,7 +137,7 @@ public:
             querySoCache.init(soPath.str(), DEFAULT_QUERYSO_LIMIT, soPattern);
         Owned<ISlaveWatchdog> watchdog;
         if (globals->getPropBool("@watchdogEnabled"))
-            watchdog.setown(createProgressHandler());
+            watchdog.setown(createProgressHandler(globals->getPropBool("@useUDPWatchdog")));
 
         CMessageBuffer msg;
         stopped = false;

+ 87 - 41
thorlcr/slave/slwatchdog.cpp

@@ -29,46 +29,47 @@
 #include "slwatchdog.hpp"
 #include "thgraphslave.hpp"
 
-class CGraphProgressHandler : public CSimpleInterface, implements ISlaveWatchdog, implements IThreaded
+class CGraphProgressHandlerBase : public CSimpleInterface, implements ISlaveWatchdog, implements IThreaded
 {
     CriticalSection crit;
     CGraphArray activeGraphs;
     bool stopped, progressEnabled;
-    Owned<ISocket> sock;
     CThreaded threaded;
     SocketEndpoint self;
 
-    void sendData()
+    void gatherAndSend()
     {
-        HeartBeatPacket hbpacket;
-        gatherData(hbpacket);
-        if(hbpacket.packetsize > 0)
-        {
-            MemoryBuffer mb;
-            size32_t sz = ThorCompress(&hbpacket,hbpacket.packetsize, mb, 0x200);
-            sock->write(mb.toByteArray(), sz);
-        }
+        MemoryBuffer sendMb, progressMb;
+        HeartBeatPacketHeader hb;
+        hb.sender = self;
+        hb.tick++;
+        size32_t progressSizePos = (byte *)&hb.progressSize - (byte *)&hb;
+        sendMb.append(sizeof(HeartBeatPacketHeader), &hb);
+
+        hb.progressSize = gatherData(progressMb);
+        sendMb.writeDirect(progressSizePos, sizeof(hb.progressSize), &hb.progressSize);
+        sendMb.append(progressMb);
+        size32_t packetSize = sendMb.length();
+        sendMb.writeDirect(0, sizeof(hb.packetSize), &packetSize);
+        sendData(sendMb);
     }
+    virtual void sendData(MemoryBuffer &mb) = 0;
 
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
-    CGraphProgressHandler() : threaded("CGraphProgressHandler")
+    CGraphProgressHandlerBase() : threaded("CGraphProgressHandler")
     {
         self = queryMyNode()->endpoint();
         stopped = true;
 
-        StringBuffer ipStr;
-        queryClusterGroup().queryNode(0).endpoint().getIpText(ipStr);
-        sock.setown(ISocket::udp_connect(getFixedPort(getMasterPortBase(), TPORT_watchdog),ipStr.str()));
         progressEnabled = globals->getPropBool("@watchdogProgressEnabled");
-        sendData();                         // send initial data
         stopped = false;
 #ifdef _WIN32
         threaded.adjustPriority(+1); // it is critical that watchdog packets get through.
 #endif
         threaded.init(this);
     }
-    ~CGraphProgressHandler()
+    ~CGraphProgressHandlerBase()
     {
         stop();
     }
@@ -82,31 +83,27 @@ public:
         LOG(MCdebugProgress, thorJob, "Stopped watchdog");
     }
 
-    void gatherData(HeartBeatPacket &hb)
+    size32_t gatherData(MemoryBuffer &mb)
     {
         CriticalBlock b(crit);
-        hb.sender = self;
-        hb.progressSize = 0;
         if (progressEnabled)
         {
-            CriticalBlock b(crit);
-            MemoryBuffer mb;
-            mb.setBuffer(DATA_MAX, hb.perfdata);
-            mb.rewrite();
-            ForEachItemIn(g, activeGraphs)
-            {
-                CGraphBase &graph = activeGraphs.item(g);
-                graph.serializeStats(mb);
-                if (mb.length() > (DATA_MAX-30))
+            MemoryBuffer progressData;
+            { CriticalBlock b(crit);
+                ForEachItemIn(g, activeGraphs)
                 {
-                    WARNLOG("Progress packet too big!");
-                    break;
+                    CGraphBase &graph = activeGraphs.item(g);
+                    graph.serializeStats(progressData);
                 }
             }
-            hb.progressSize = mb.length();
+            size32_t sz = progressData.length();
+            if (sz)
+            {
+                ThorCompress(progressData, mb, 0x200);
+                return sz;
+            }
         }
-        hb.tick++;
-        hb.packetsize = hb.packetSize();
+        return 0;
     }
 
 // ISlaveWatchdog impl.
@@ -117,15 +114,21 @@ public:
         StringBuffer str("Watchdog: Start Job ");
         LOG(MCdebugProgress, thorJob, "%s", str.append(graph.queryGraphId()).str());
     }
-    void stopGraph(CGraphBase &graph, HeartBeatPacket *hb)
+    void stopGraph(CGraphBase &graph, MemoryBuffer *mb)
     {
         CriticalBlock b(crit);
         if (NotFound != activeGraphs.find(graph))
         {
             StringBuffer str("Watchdog: Stop Job ");
             LOG(MCdebugProgress, thorJob, "%s", str.append(graph.queryGraphId()).str());
-            if (hb)
-                gatherData(*hb);
+            if (mb)
+            {
+                unsigned pos=mb->length();
+                mb->append((size32_t)0); // placeholder
+                gatherData(*mb);
+                size32_t len=(mb->length()-pos)-sizeof(size32_t);
+                mb->writeDirect(pos, sizeof(len), &len);
+            }
             activeGraphs.zap(graph);
         }
     }
@@ -134,6 +137,7 @@ public:
     void main()
     {
         LOG(MCdebugProgress, thorJob, "Watchdog: thread running");
+        gatherAndSend(); // send initial data
         assertex(HEARTBEAT_INTERVAL>=8);
         unsigned count = HEARTBEAT_INTERVAL+getRandom()%8-4;
         while (!stopped)
@@ -141,15 +145,57 @@ public:
             Sleep(1000);
             if (count--==0)
             {
-                sendData();
+                gatherAndSend();
                 count = HEARTBEAT_INTERVAL+getRandom()%8-4;         
             }
         }
     }
 };
 
-ISlaveWatchdog *createProgressHandler()
+
+class CGraphProgressUDPHandler : public CGraphProgressHandlerBase
 {
-    return new CGraphProgressHandler();
-}
+    Owned<ISocket> sock;
+public:
+    CGraphProgressUDPHandler()
+    {
+        StringBuffer ipStr;
+        queryClusterGroup().queryNode(0).endpoint().getIpText(ipStr);
+        sock.setown(ISocket::udp_connect(getFixedPort(getMasterPortBase(), TPORT_watchdog),ipStr.str()));
+    }
+    virtual void sendData(MemoryBuffer &mb)
+    {
+        HeartBeatPacketHeader hb;
+        memcpy(&hb, mb.toByteArray(), sizeof(HeartBeatPacketHeader));
+        if (hb.packetSize > UDP_DATA_MAX)
+        {
+            WARNLOG("Progress packet too big! progress lost");
+            hb.progressSize = 0;
+            hb.packetSize = sizeof(HeartBeatPacketHeader);
+        }
+        sock->write(mb.toByteArray(), mb.length());
+    }
+};
+
 
+class CGraphProgressMPHandler : public CGraphProgressHandlerBase
+{
+public:
+    CGraphProgressMPHandler()
+    {
+    }
+    virtual void sendData(MemoryBuffer &mb)
+    {
+        CMessageBuffer msg;
+        msg.swapWith(mb);
+        queryClusterComm().send(msg, 0, MPTAG_THORWATCHDOG);
+    }
+};
+
+ISlaveWatchdog *createProgressHandler(bool udp)
+{
+    if (udp)
+        return new CGraphProgressUDPHandler();
+    else
+        return new CGraphProgressMPHandler();
+}

+ 2 - 2
thorlcr/slave/slwatchdog.hpp

@@ -26,11 +26,11 @@ class CGraphBase;
 interface ISlaveWatchdog : extends IInterface
 {
     virtual void startGraph(CGraphBase &graph) = 0;
-    virtual void stopGraph(CGraphBase &graph, HeartBeatPacket *hb=NULL) = 0;
+    virtual void stopGraph(CGraphBase &graph, MemoryBuffer *mb=NULL) = 0;
     virtual void stop() = 0;
 };
-ISlaveWatchdog *createProgressHandler();
 
+ISlaveWatchdog *createProgressHandler(bool udp=false);
 
 #endif