Przeglądaj źródła

Refactor watchdog a bit and impl. MP version

Introduce an MP version of the watchdog mechanism, switchable via the Thor option @useUDPWatchdog.
MP is the default; the UDP version is kept for now for debugging purposes.
The UDP version remains limited to the UDP packet size (UDP_DATA_MAX).

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 13 lat temu
rodzic
commit
8e478f28d1

+ 1 - 0
system/mp/mptag.hpp

@@ -55,6 +55,7 @@ TAGENUM
     DEFTAG ( MPTAG_THOR )
     DEFTAG ( MPTAG_THORRESOURCELOCK )
     DEFTAG ( MPTAG_MPTX )
+    DEFTAG ( MPTAG_THORWATCHDOG )
 
     // new static tags go above here
 

+ 12 - 16
thorlcr/graph/thgraphmaster.cpp

@@ -2413,23 +2413,19 @@ void CMasterGraph::getFinalProgress()
         {
             if (globals->getPropBool("@watchdogProgressEnabled"))
             {
-                HeartBeatPacket &hb = *(HeartBeatPacket *) msg.readDirect(sizeof(hb.packetsize));
-                if (hb.packetsize)
+                try
                 {
-                    size32_t sz = hb.packetsize-sizeof(hb.packetsize);
-                    if (sz)
-                    {
-                        msg.readDirect(sz);
-                        try
-                        {
-                            queryJobManager().queryDeMonServer()->takeHeartBeat(hb);
-                        }
-                        catch (IException *e)
-                        {
-                            GraphPrintLog(e, "Failure whilst deserializing stats/progress");
-                            e->Release();
-                        }
-                    }
+                    size32_t progressLen;
+                    msg.read(progressLen);
+                    MemoryBuffer progressData;
+                    progressData.setBuffer(progressLen, (void *)msg.readDirect(progressLen));
+                    const SocketEndpoint &ep = queryClusterGroup().queryNode(sender).endpoint();
+                    queryJobManager().queryDeMonServer()->takeHeartBeat(ep, progressData);
+                }
+                catch (IException *e)
+                {
+                    GraphPrintLog(e, "Failure whilst deserializing stats/progress");
+                    e->Release();
                 }
             }
         }

+ 1 - 11
thorlcr/graph/thgraphslave.cpp

@@ -666,12 +666,6 @@ void CSlaveGraph::serializeStats(MemoryBuffer &mb)
         Owned<IThorActivityIterator> iter = getTraverseIterator();
         ForEach (*iter)
         {
-            if (mb.length() > (DATA_MAX-30))
-            {
-                WARNLOG("Act: Progress packet too big!");
-                break;
-            }
-            
             CGraphElementBase &element = iter->query();
             CSlaveActivity &activity = (CSlaveActivity &)*element.queryActivity();
             unsigned pos = mb.length();
@@ -739,11 +733,7 @@ void CSlaveGraph::getDone(MemoryBuffer &doneInfoMb)
             if (!queryOwner())
             {
                 if (globals->getPropBool("@watchdogProgressEnabled"))
-                {
-                    HeartBeatPacket hb;
-                    jobS.queryProgressHandler()->stopGraph(*this, &hb);
-                    doneInfoMb.append(hb.packetsize, &hb);
-                }
+                    jobS.queryProgressHandler()->stopGraph(*this, &doneInfoMb);
             }
             doneInfoMb.append(job.queryMaxDiskUsage());
             queryJob().queryTimeReporter().serialize(doneInfoMb);

+ 160 - 78
thorlcr/master/mawatchdog.cpp

@@ -42,63 +42,63 @@ public:
     SocketEndpoint ep;
     bool alive;
     bool markdead;
-    HeartBeatPacket lastpacket;
     CMachineStatus(const SocketEndpoint &_ep)
         : ep(_ep)
     {
         alive = true;
         markdead = false;
-        memset(&lastpacket,0,sizeof(lastpacket));
     }
-    void update(HeartBeatPacket &packet)
+    void update(HeartBeatPacketHeader &packet)
     {
         alive = true;
-        if (markdead) {
+        if (markdead)
+        {
             markdead = false;
             StringBuffer epstr;
             ep.getUrlStr(epstr);
             LOG(MCdebugProgress, unknownJob, "Watchdog : Marking Machine as Up! [%s]", epstr.str());
         }
-        if(packet.progressSize > 0)
-            lastpacket = packet;
     }   
 };
 
 
-CMasterWatchdog::CMasterWatchdog() : threaded("CMasterWatchdog")
+CMasterWatchdogBase::CMasterWatchdogBase() : threaded("CMasterWatchdogBase")
 {
-    stopped = false;
-    sock = NULL;
-    if (globals->getPropBool("@watchdogEnabled"))
-    {
-        if (!sock)
-            sock = ISocket::udp_create(getFixedPort(TPORT_watchdog));
-        LOG(MCdebugProgress, unknownJob, "Starting watchdog");
+    stopped = true;
+    watchdogMachineTimeout = globals->getPropInt("@slaveDownTimeout", DEFAULT_SLAVEDOWNTIMEOUT);
+    if (watchdogMachineTimeout <= HEARTBEAT_INTERVAL*10)
+        watchdogMachineTimeout = HEARTBEAT_INTERVAL*10;
+    watchdogMachineTimeout *= 1000;
 #ifdef _WIN32
-        threaded.adjustPriority(+1); // it is critical that watchdog packets get through.
+    threaded.adjustPriority(+1); // it is critical that watchdog packets get through.
 #endif
-        threaded.init(this);
-    }
 }
 
-CMasterWatchdog::~CMasterWatchdog()
+CMasterWatchdogBase::~CMasterWatchdogBase()
 {
     stop();
-    ::Release(sock);
-    ForEachItemInRev(i, state) {
+    ForEachItemInRev(i, state)
+    {
         CMachineStatus *mstate=(CMachineStatus *)state.item(i);
         delete mstate;
     }
 }
 
-void CMasterWatchdog::addSlave(const SocketEndpoint &slave)
+void CMasterWatchdogBase::start()
+{
+    PROGLOG("Starting watchdog");
+    stopped = false;
+    threaded.init(this);
+}
+
+void CMasterWatchdogBase::addSlave(const SocketEndpoint &slave)
 {
     synchronized block(mutex);
     CMachineStatus *mstate=new CMachineStatus(slave);
     state.append(mstate);
 }
 
-void CMasterWatchdog::removeSlave(const SocketEndpoint &slave)
+void CMasterWatchdogBase::removeSlave(const SocketEndpoint &slave)
 {
     synchronized block(mutex);
     CMachineStatus *ms = findSlave(slave);
@@ -108,9 +108,10 @@ void CMasterWatchdog::removeSlave(const SocketEndpoint &slave)
     }
 }
 
-CMachineStatus *CMasterWatchdog::findSlave(const SocketEndpoint &ep)
+CMachineStatus *CMasterWatchdogBase::findSlave(const SocketEndpoint &ep)
 {
-    ForEachItemInRev(i, state) {
+    ForEachItemInRev(i, state)
+    {
         CMachineStatus *mstate=(CMachineStatus *)state.item(i);
         if (mstate->ep.equals(ep))
             return mstate;
@@ -119,7 +120,7 @@ CMachineStatus *CMasterWatchdog::findSlave(const SocketEndpoint &ep)
 }
 
 
-void CMasterWatchdog::stop()
+void CMasterWatchdogBase::stop()
 {
     threaded.adjustPriority(0); // restore to normal before stopping
     { synchronized block(mutex);
@@ -128,27 +129,16 @@ void CMasterWatchdog::stop()
         LOG(MCdebugProgress, unknownJob, "Stopping watchdog");
         stopped = true;
     }
-    if (sock)
-    {
-        SocketEndpoint masterEp(getMasterPortBase());
-        StringBuffer ipStr;
-        masterEp.getIpText(ipStr);
-        Owned<ISocket> sock = ISocket::udp_connect(getFixedPort(masterEp.port, TPORT_watchdog), ipStr.str());
-        HeartBeatPacket hbpacket;
-        memset(&hbpacket, 0, sizeof(hbpacket));
-        MemoryBuffer mb;
-        size32_t sz = ThorCompress(&hbpacket, hbpacket.packetSize(), mb);
-        sock->write(mb.toByteArray(), sz);
-        sock->close();
-    }
+    stopReading();
     threaded.join();
     LOG(MCdebugProgress, unknownJob, "Stopped watchdog");
 }
 
-void CMasterWatchdog::checkMachineStatus()
+void CMasterWatchdogBase::checkMachineStatus()
 {
     synchronized block(mutex);
-    ForEachItemInRev(i, state) {
+    ForEachItemInRev(i, state)
+    {
         CMachineStatus *mstate=(CMachineStatus *)state.item(i);
         if (!mstate->alive)
         {
@@ -167,67 +157,78 @@ void CMasterWatchdog::checkMachineStatus()
             mstate->alive = false;
         }
     }
+}
 
+// Fetches one packet through readData() and validates it.
+// On success the header is copied into 'hb', 'mb' is trimmed to the packet
+// length (read position just past the header) and the packet size is returned.
+// Returns 0 when nothing was read or the packet fails the size checks.
+unsigned CMasterWatchdogBase::readPacket(HeartBeatPacketHeader &hb, MemoryBuffer &mb)
+{
+    mb.clear();
+    unsigned got = readData(mb);
+    if (0 == got)
+    {
+        // nothing received (e.g. timeout or cancelled read), hand back an empty buffer
+        mb.clear();
+        return 0;
+    }
+    if (got < sizeof(HeartBeatPacketHeader))
+    {
+        WARNLOG("Receive Monitor Packet: wrong size, got %d, less than HeartBeatPacketHeader size", got);
+        return 0;
+    }
+    memcpy(&hb, mb.readDirect(sizeof(HeartBeatPacketHeader)), sizeof(HeartBeatPacketHeader));
+    if (hb.packetSize != got)  // check for corrupt packets
+    {
+        WARNLOG("Receive Monitor Packet: wrong size, expected %d, got %d", hb.packetSize, got);
+        return 0;
+    }
+    mb.setLength(hb.packetSize);
+    return hb.packetSize;
 }
 
-void CMasterWatchdog::main()
+void CMasterWatchdogBase::main()
 {
     LOG(MCdebugProgress, unknownJob, "Started watchdog");
     unsigned lastbeat=msTick();
     unsigned lastcheck=lastbeat;
 
-    unsigned watchdogMachineTimeout = globals->getPropInt("@slaveDownTimeout", DEFAULT_SLAVEDOWNTIMEOUT);
-    if (watchdogMachineTimeout <= HEARTBEAT_INTERVAL*10)
-        watchdogMachineTimeout = HEARTBEAT_INTERVAL*10;
-    watchdogMachineTimeout *= 1000;
     retrycount = 0;
-    try {
-        while (!stopped) {
-            HeartBeatPacket hbpacket;
-            try {
-                size32_t read;
-                MemoryBuffer packetCompressedMb;
-                sock->readtms(packetCompressedMb.reserveTruncate(hbpacket.maxPacketSize()), hbpacket.minPacketSize(), hbpacket.maxPacketSize(), read, watchdogMachineTimeout);
-                MemoryBuffer packetMb;
-                read = ThorExpand(packetCompressedMb.toByteArray(), read, &hbpacket, hbpacket.maxPacketSize());
-                if (0==hbpacket.packetsize)
-                    break; // signal to stop
-                if(read > hbpacket.minPacketSize() && read == hbpacket.packetsize)  // check for corrupt packets
+    try
+    {
+        while (!stopped)
+        {
+            HeartBeatPacketHeader hb;
+            MemoryBuffer progressData;
+            unsigned sz = readPacket(hb, progressData);
+            if (stopped)
+                break;
+            else if (sz)
+            {
+                synchronized block(mutex);
+                CMachineStatus *ms = findSlave(hb.sender);
+                if (ms)
                 {
-                    synchronized block(mutex);
-                    CMachineStatus *ms = findSlave(hbpacket.sender);
-                    if (ms) 
+                    ms->update(hb);
+                    if (progressData.remaining())
                     {
-                        ms->update(hbpacket);
                         Owned<IJobManager> jobManager = getJobManager();
                         if (jobManager)
-                            jobManager->queryDeMonServer()->takeHeartBeat(hbpacket);
-                    }
-                    else {
-                        StringBuffer epstr;
-                        hbpacket.sender.getUrlStr(epstr);
-                        LOG(MCdebugProgress, unknownJob, "Watchdog : Unknown Machine! [%s]", epstr.str()); //TBD
+                            jobManager->queryDeMonServer()->takeHeartBeat(hb.sender, progressData);
                     }
                 }
                 else
                 {
-                    LOG(MCdebugProgress, unknownJob, "Receive Monitor Packet: wrong size, expected %d, got %d", hbpacket.packetsize, read);
+                    StringBuffer epstr;
+                    hb.sender.getUrlStr(epstr);
+                    LOG(MCdebugProgress, unknownJob, "Watchdog : Unknown Machine! [%s]", epstr.str()); //TBD
                 }
             }
-            catch (IJSOCK_Exception *e)
-            {
-                if ((e->errorCode()!=JSOCKERR_timeout_expired)&&(e->errorCode()!=JSOCKERR_broken_pipe)&&(e->errorCode()!=JSOCKERR_graceful_close)) 
-                    throw;
-                e->Release();
-            }
-            if (stopped)
-                break;
             unsigned now=msTick();
-            if (now-lastcheck>watchdogMachineTimeout) {
+            if (now-lastcheck>watchdogMachineTimeout)
+            {
                 checkMachineStatus();
                 lastcheck = msTick();
             }
-            if (now-lastbeat>THORBEAT_INTERVAL) {
+            if (now-lastbeat>THORBEAT_INTERVAL)
+            {
                 if (retrycount<=0) retrycount=THORBEAT_RETRY_INTERVAL; else retrycount -= THORBEAT_INTERVAL;
                 lastbeat = msTick();
             }
@@ -240,3 +241,84 @@ void CMasterWatchdog::main()
     }
 }
 
+
+// UDP transport for the master watchdog: heartbeat packets are read from a
+// UDP socket bound to the fixed watchdog port.
+class CMasterWatchdogUDP : public CMasterWatchdogBase
+{
+    ISocket *sock;
+public:
+    CMasterWatchdogUDP()
+    {
+        sock = ISocket::udp_create(getFixedPort(TPORT_watchdog));
+        start();
+    }
+    ~CMasterWatchdogUDP()
+    {
+        // Stop the reader thread before the socket goes away and while
+        // stopReading() can still be dispatched to this class. Leaving it to
+        // the base destructor would release the socket under the reader thread
+        // and reach stopReading() as a pure virtual.
+        // NOTE(review): relies on stop() being a no-op once already stopped,
+        // since the base destructor calls it again - confirm.
+        stop();
+        ::Release(sock);
+    }
+    // Blocks for up to watchdogMachineTimeout waiting for one datagram.
+    // Returns the number of bytes read, or 0 on timeout/broken pipe/graceful
+    // close; any other socket error is rethrown.
+    virtual unsigned readData(MemoryBuffer &mb)
+    {
+        size32_t read = 0;
+        try
+        {
+            sock->readtms(mb.reserveTruncate(UDP_DATA_MAX), sizeof(HeartBeatPacketHeader), UDP_DATA_MAX, read, watchdogMachineTimeout);
+        }
+        catch (IJSOCK_Exception *e)
+        {
+            if ((e->errorCode()!=JSOCKERR_timeout_expired)&&(e->errorCode()!=JSOCKERR_broken_pipe)&&(e->errorCode()!=JSOCKERR_graceful_close))
+                throw;
+            e->Release();
+            return 0; // will retry
+        }
+        return read;
+    }
+    // Wakes the reader thread by sending a header-only packet to our own
+    // watchdog port; 'stopped' is already set by stop(), so the reader exits.
+    virtual void stopReading()
+    {
+        if (sock)
+        {
+            SocketEndpoint masterEp(getMasterPortBase());
+            StringBuffer ipStr;
+            masterEp.getIpText(ipStr);
+            // named differently from the 'sock' member so the listening socket is not shadowed
+            Owned<ISocket> wakeSock = ISocket::udp_connect(getFixedPort(masterEp.port, TPORT_watchdog), ipStr.str());
+            // send empty packet, stopped set, will cease reading
+            HeartBeatPacketHeader hb;
+            memset(&hb, 0, sizeof(hb));
+            hb.packetSize = sizeof(HeartBeatPacketHeader);
+            wakeSock->write(&hb, sizeof(HeartBeatPacketHeader));
+            wakeSock->close();
+        }
+    }
+};
+
+/////////////////////
+
+// MP transport for the master watchdog: heartbeat packets arrive as MP
+// messages tagged MPTAG_THORWATCHDOG.
+class CMasterWatchdogMP : public CMasterWatchdogBase
+{
+public:
+    CMasterWatchdogMP()
+    {
+        start();
+    }
+    ~CMasterWatchdogMP()
+    {
+        // Stop here, while stopReading() still dispatches to this class;
+        // from the base destructor it would be a pure virtual call.
+        // NOTE(review): relies on stop() being a no-op once already stopped,
+        // since the base destructor calls it again - confirm.
+        stop();
+    }
+    // Waits up to watchdogMachineTimeout for a watchdog message from any rank.
+    // Returns the message length, or 0 if recv() reported nothing received.
+    virtual unsigned readData(MemoryBuffer &mb)
+    {
+        CMessageBuffer msg;
+        rank_t sender;
+        if (!queryClusterComm().recv(msg, RANK_ALL, MPTAG_THORWATCHDOG, &sender, watchdogMachineTimeout))
+            return 0;
+        mb.swapWith(msg);
+        return mb.length();
+    }
+    // Cancels the pending receive so the reader thread can notice 'stopped'.
+    virtual void stopReading()
+    {
+        queryClusterComm().cancel(0, MPTAG_THORWATCHDOG);
+    }
+};
+
+/////////////////////
+
+// Factory for the master-side watchdog: MP transport unless 'udp' is set.
+CMasterWatchdogBase *createMasterWatchdog(bool udp)
+{
+    if (!udp)
+        return new CMasterWatchdogMP();
+    return new CMasterWatchdogUDP();
+}

+ 18 - 11
thorlcr/master/mawatchdog.hpp

@@ -25,28 +25,35 @@
 #include "jmutex.hpp"
 
 class CMachineStatus;
-struct HeartBeatPacket;
+struct HeartBeatPacketHeader;
 
-class CMasterWatchdog : public CSimpleInterface, implements IThreaded
+class CMasterWatchdogBase : public CSimpleInterface, implements IThreaded
 {
+    PointerArray state;
+    SocketEndpoint master;
+    Mutex mutex;
+    int retrycount;
     CThreaded threaded;
+protected:
+    bool stopped;
+    unsigned watchdogMachineTimeout;
 public:
-    CMasterWatchdog();
-    ~CMasterWatchdog();
+    CMasterWatchdogBase();
+    ~CMasterWatchdogBase();
     void addSlave(const SocketEndpoint &slave);
     void removeSlave(const SocketEndpoint &slave);
     CMachineStatus *findSlave(const SocketEndpoint &ep);
     void checkMachineStatus();
+    unsigned readPacket(HeartBeatPacketHeader &hb, MemoryBuffer &mb);
+    void start();
     void stop();
     void main();
-private:
-    PointerArray state;
-    SocketEndpoint master;
-    ISocket *sock;
-    Mutex mutex;
-    bool stopped;
-    int retrycount;
+
+    virtual unsigned readData(MemoryBuffer &mb) = 0;
+    virtual void stopReading() = 0;
 };
 
+CMasterWatchdogBase *createMasterWatchdog(bool udp=false);
+
 #endif
 

+ 24 - 23
thorlcr/master/thdemonserver.cpp

@@ -25,6 +25,7 @@
 
 #include "thormisc.hpp"
 #include "thorport.hpp"
+#include "thcompressutil.hpp"
 #include "thgraphmaster.ipp"
 #include "thgraphmanager.hpp"
 #include "thwatchdog.hpp"
@@ -159,28 +160,27 @@ public:
         reportRate = globals->getPropInt("@watchdogProgressInterval", 30);
     }
 
-    virtual void takeHeartBeat(HeartBeatPacket & hbpacket)
+    virtual void takeHeartBeat(const SocketEndpoint &sender, MemoryBuffer &progressMb)
     {
         synchronized block(mutex);
-        
-        if((hbpacket.packetsize>sizeof(hbpacket.packetsize))&&(hbpacket.progressSize > 0))
+        if (0 == activeGraphs.ordinality())
         {
-            if (0 == activeGraphs.ordinality())
-            {
-                StringBuffer urlStr;
-                LOG(MCdebugProgress, unknownJob, "heartbeat packet received with no active graphs, from=%s", hbpacket.sender.getUrlStr(urlStr).str());
-                return;
-            }
-            rank_t node = querySlaveGroup().rank(hbpacket.sender);
-            assertex(node != RANK_NULL);
-
-            MemoryBuffer statsMb;
-            statsMb.setBuffer(hbpacket.progressSize, hbpacket.perfdata);
+            StringBuffer urlStr;
+            LOG(MCdebugProgress, unknownJob, "heartbeat packet received with no active graphs, from=%s", sender.getUrlStr(urlStr).str());
+            return;
+        }
+        rank_t node = querySlaveGroup().rank(sender);
+        assertex(node != RANK_NULL);
 
-            while (statsMb.remaining())
+        size32_t compressedProgressSz = progressMb.remaining();
+        if (compressedProgressSz)
+        {
+            MemoryBuffer uncompressedMb;
+            ThorExpand(progressMb.readDirect(compressedProgressSz), compressedProgressSz, uncompressedMb);
+            do
             {
                 graph_id graphId;
-                statsMb.read(graphId);
+                uncompressedMb.read(graphId);
                 CMasterGraph *graph = NULL;
                 ForEachItemIn(g, activeGraphs) if (activeGraphs.item(g).queryGraphId() == graphId) graph = (CMasterGraph *)&activeGraphs.item(g);
                 if (!graph)
@@ -188,18 +188,19 @@ public:
                     LOG(MCdebugProgress, unknownJob, "heartbeat received from unknown graph %"GIDPF"d", graphId);
                     break;
                 }
-                if (!graph->deserializeStats(node, statsMb))
+                if (!graph->deserializeStats(node, uncompressedMb))
                 {
                     LOG(MCdebugProgress, unknownJob, "heartbeat error in graph %"GIDPF"d", graphId);
                     break;
                 }
             }
-            unsigned now=msTick();
-            if (now-lastReport > 1000*reportRate) 
-            {
-                reportGraph(false);
-                lastReport = msTick();
-            }
+            while (uncompressedMb.remaining());
+        }
+        unsigned now=msTick();
+        if (now-lastReport > 1000*reportRate)
+        {
+            reportGraph(false);
+            lastReport = msTick();
         }
     }
     void startGraph(CGraphBase *graph)

+ 1 - 1
thorlcr/master/thdemonserver.hpp

@@ -27,7 +27,7 @@ interface IWUGraphProgress;
 class CGraphBase;
 interface IDeMonServer : extends IInterface
 {
-    virtual void takeHeartBeat(HeartBeatPacket & hbpacket) = 0;
+    // Takes the progress payload received from the slave at 'sender'; progressMb holds the data still in compressed form.
+    virtual void takeHeartBeat(const SocketEndpoint &sender, MemoryBuffer &progressMb) = 0;
     virtual void startGraph(CGraphBase *graph) = 0;
     virtual void reportGraph(IWUGraphProgress *progress, CGraphBase *graph, bool finished) = 0;
     virtual void endGraph(CGraphBase *graph, bool success) = 0;

+ 2 - 2
thorlcr/master/thmastermain.cpp

@@ -133,7 +133,7 @@ class CRegistryServer : public CSimpleInterface
         }
     } deregistrationWatch;
 public:
-    Linked<CMasterWatchdog> watchdog;
+    Linked<CMasterWatchdogBase> watchdog;
     IBitSet *status;
 
     CRegistryServer()  : deregistrationWatch(*this), stopped(false)
@@ -142,7 +142,7 @@ public:
         msgDelay = SLAVEREG_VERIFY_DELAY;
         slavesRegistered = 0;
         if (globals->getPropBool("@watchdogEnabled"))
-            watchdog.setown(new CMasterWatchdog);
+            watchdog.setown(createMasterWatchdog(globals->getPropBool("@useUDPWatchdog")));
         else
             globals->setPropBool("@watchdogProgressEnabled", false);
         CriticalBlock b(regCrit);

+ 7 - 12
thorlcr/shared/thwatchdog.hpp

@@ -23,22 +23,17 @@
 #include "thor.hpp"
 
 #define HEARTBEAT_INTERVAL      15          // seconds
-#define DATA_MAX            1024 * 8    // 8k
+#define UDP_DATA_MAX            1024 * 8    // 8k
 #define THORBEAT_INTERVAL       10*1000     // 10 sec!
 #define THORBEAT_RETRY_INTERVAL 4*60*1000   // 4 minutes
 
-struct HeartBeatPacket
-{
-    unsigned short  packetsize;                 // used as validity check must be first
-    SocketEndpoint  sender;
-    unsigned        tick;                       // sequence check
-    unsigned short  progressSize;               // size of progress data (following performamce data)
-
-    byte            perfdata[DATA_MAX]; // performance/progress data from here on
 
-    inline size32_t packetSize() { return progressSize + (sizeof(HeartBeatPacket) - sizeof(perfdata)); }
-    inline size32_t minPacketSize() { return sizeof(progressSize) + sizeof(tick) + sizeof(sender) + sizeof(packetsize); }
-    inline size32_t maxPacketSize() { return DATA_MAX + minPacketSize(); }
+struct HeartBeatPacketHeader // header placed at the start of every watchdog packet a slave sends to the master
+{
+    size32_t packetSize;   // total packet length including this header; used as validity check, must be first
+    SocketEndpoint sender; // endpoint of the sending slave; the master uses it to look up the machine status
+    unsigned tick;         // sequence check
+    size32_t progressSize; // size of progress data (following performance data); NOTE(review): the slave stores the pre-compression length here - confirm intended
 };
 
 #endif

+ 1 - 1
thorlcr/slave/slavmain.cpp

@@ -137,7 +137,7 @@ public:
             querySoCache.init(soPath.str(), DEFAULT_QUERYSO_LIMIT, soPattern);
         Owned<ISlaveWatchdog> watchdog;
         if (globals->getPropBool("@watchdogEnabled"))
-            watchdog.setown(createProgressHandler());
+            watchdog.setown(createProgressHandler(globals->getPropBool("@useUDPWatchdog")));
 
         CMessageBuffer msg;
         stopped = false;

+ 87 - 41
thorlcr/slave/slwatchdog.cpp

@@ -29,46 +29,47 @@
 #include "slwatchdog.hpp"
 #include "thgraphslave.hpp"
 
-class CGraphProgressHandler : public CSimpleInterface, implements ISlaveWatchdog, implements IThreaded
+class CGraphProgressHandlerBase : public CSimpleInterface, implements ISlaveWatchdog, implements IThreaded
 {
     CriticalSection crit;
     CGraphArray activeGraphs;
     bool stopped, progressEnabled;
-    Owned<ISocket> sock;
     CThreaded threaded;
     SocketEndpoint self;
 
-    void sendData()
+    // Builds one heartbeat packet (header followed by the compressed progress
+    // data from gatherData()) and passes it to the transport via sendData().
+    void gatherAndSend()
     {
-        HeartBeatPacket hbpacket;
-        gatherData(hbpacket);
-        if(hbpacket.packetsize > 0)
-        {
-            MemoryBuffer mb;
-            size32_t sz = ThorCompress(&hbpacket,hbpacket.packetsize, mb, 0x200);
-            sock->write(mb.toByteArray(), sz);
-        }
+        MemoryBuffer sendMb, progressMb;
+        HeartBeatPacketHeader hb;
+        // Give every field a defined value before the header is copied into
+        // the send buffer; incrementing an uninitialised tick read garbage.
+        hb.packetSize = 0;   // patched below once the full length is known
+        hb.progressSize = 0; // patched below
+        // NOTE(review): hb is rebuilt on every call, so tick cannot act as a
+        // real sequence number without a counter kept in the handler.
+        hb.tick = 1;
+        hb.sender = self;
+        size32_t progressSizePos = (byte *)&hb.progressSize - (byte *)&hb;
+        sendMb.append(sizeof(HeartBeatPacketHeader), &hb);
+
+        hb.progressSize = gatherData(progressMb);
+        sendMb.writeDirect(progressSizePos, sizeof(hb.progressSize), &hb.progressSize);
+        sendMb.append(progressMb);
+        size32_t packetSize = sendMb.length();
+        sendMb.writeDirect(0, sizeof(hb.packetSize), &packetSize);
+        sendData(sendMb);
     }
+    virtual void sendData(MemoryBuffer &mb) = 0;
 
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
-    CGraphProgressHandler() : threaded("CGraphProgressHandler")
+    CGraphProgressHandlerBase() : threaded("CGraphProgressHandler")
     {
         self = queryMyNode()->endpoint();
         stopped = true;
 
-        StringBuffer ipStr;
-        queryClusterGroup().queryNode(0).endpoint().getIpText(ipStr);
-        sock.setown(ISocket::udp_connect(getFixedPort(getMasterPortBase(), TPORT_watchdog),ipStr.str()));
         progressEnabled = globals->getPropBool("@watchdogProgressEnabled");
-        sendData();                         // send initial data
         stopped = false;
 #ifdef _WIN32
         threaded.adjustPriority(+1); // it is critical that watchdog packets get through.
 #endif
         threaded.init(this);
     }
-    ~CGraphProgressHandler()
+    ~CGraphProgressHandlerBase()
     {
         stop();
     }
@@ -82,31 +83,27 @@ public:
         LOG(MCdebugProgress, thorJob, "Stopped watchdog");
     }
 
-    void gatherData(HeartBeatPacket &hb)
+    size32_t gatherData(MemoryBuffer &mb)
     {
         CriticalBlock b(crit);
-        hb.sender = self;
-        hb.progressSize = 0;
         if (progressEnabled)
         {
-            CriticalBlock b(crit);
-            MemoryBuffer mb;
-            mb.setBuffer(DATA_MAX, hb.perfdata);
-            mb.rewrite();
-            ForEachItemIn(g, activeGraphs)
-            {
-                CGraphBase &graph = activeGraphs.item(g);
-                graph.serializeStats(mb);
-                if (mb.length() > (DATA_MAX-30))
+            MemoryBuffer progressData;
+            { CriticalBlock b(crit);
+                ForEachItemIn(g, activeGraphs)
                 {
-                    WARNLOG("Progress packet too big!");
-                    break;
+                    CGraphBase &graph = activeGraphs.item(g);
+                    graph.serializeStats(progressData);
                 }
             }
-            hb.progressSize = mb.length();
+            size32_t sz = progressData.length();
+            if (sz)
+            {
+                ThorCompress(progressData, mb, 0x200);
+                return sz;
+            }
         }
-        hb.tick++;
-        hb.packetsize = hb.packetSize();
+        return 0;
     }
 
 // ISlaveWatchdog impl.
@@ -117,15 +114,21 @@ public:
         StringBuffer str("Watchdog: Start Job ");
         LOG(MCdebugProgress, thorJob, "%s", str.append(graph.queryGraphId()).str());
     }
-    void stopGraph(CGraphBase &graph, HeartBeatPacket *hb)
+    void stopGraph(CGraphBase &graph, MemoryBuffer *mb)
     {
         CriticalBlock b(crit);
         if (NotFound != activeGraphs.find(graph))
         {
             StringBuffer str("Watchdog: Stop Job ");
             LOG(MCdebugProgress, thorJob, "%s", str.append(graph.queryGraphId()).str());
-            if (hb)
-                gatherData(*hb);
+            if (mb)
+            {
+                unsigned pos=mb->length();
+                mb->append((size32_t)0); // placeholder
+                gatherData(*mb);
+                size32_t len=(mb->length()-pos)-sizeof(size32_t);
+                mb->writeDirect(pos, sizeof(len), &len);
+            }
             activeGraphs.zap(graph);
         }
     }
@@ -134,6 +137,7 @@ public:
     void main()
     {
         LOG(MCdebugProgress, thorJob, "Watchdog: thread running");
+        gatherAndSend(); // send initial data
         assertex(HEARTBEAT_INTERVAL>=8);
         unsigned count = HEARTBEAT_INTERVAL+getRandom()%8-4;
         while (!stopped)
@@ -141,15 +145,57 @@ public:
             Sleep(1000);
             if (count--==0)
             {
-                sendData();
+                gatherAndSend();
                 count = HEARTBEAT_INTERVAL+getRandom()%8-4;         
             }
         }
     }
 };
 
-ISlaveWatchdog *createProgressHandler()
+
+class CGraphProgressUDPHandler : public CGraphProgressHandlerBase
 {
-    return new CGraphProgressHandler();
-}
+    Owned<ISocket> sock;
+public:
+    CGraphProgressUDPHandler()
+    {
+        StringBuffer ipStr;
+        queryClusterGroup().queryNode(0).endpoint().getIpText(ipStr);
+        sock.setown(ISocket::udp_connect(getFixedPort(getMasterPortBase(), TPORT_watchdog),ipStr.str()));
+    }
+    // Sends one assembled heartbeat packet to the master over UDP.
+    // A packet larger than UDP_DATA_MAX cannot be sent; in that case only a
+    // header with no progress data is sent, so the master still sees the
+    // heartbeat.
+    // NOTE(review): the base-class constructor starts the watchdog thread, so
+    // this may be called before 'sock' has been set up - confirm ordering.
+    virtual void sendData(MemoryBuffer &mb)
+    {
+        HeartBeatPacketHeader hb;
+        memcpy(&hb, mb.toByteArray(), sizeof(HeartBeatPacketHeader));
+        if (hb.packetSize > UDP_DATA_MAX)
+        {
+            WARNLOG("Progress packet too big! progress lost");
+            hb.progressSize = 0;
+            hb.packetSize = sizeof(HeartBeatPacketHeader);
+            // send the trimmed header itself; writing 'mb' here would still
+            // send the oversized packet with its original sizes
+            sock->write(&hb, sizeof(HeartBeatPacketHeader));
+            return;
+        }
+        sock->write(mb.toByteArray(), mb.length());
+    }
+};
+
 
+// MP transport for the slave watchdog: each heartbeat packet is sent to
+// rank 0 as an MP message tagged MPTAG_THORWATCHDOG.
+class CGraphProgressMPHandler : public CGraphProgressHandlerBase
+{
+public:
+    // Moves the packet bytes out of 'mb' into a message buffer and sends it.
+    virtual void sendData(MemoryBuffer &mb)
+    {
+        CMessageBuffer outgoing;
+        outgoing.swapWith(mb);
+        queryClusterComm().send(outgoing, 0, MPTAG_THORWATCHDOG);
+    }
+};
+
+// Factory for the slave-side watchdog: MP transport unless 'udp' is set.
+ISlaveWatchdog *createProgressHandler(bool udp)
+{
+    if (!udp)
+        return new CGraphProgressMPHandler();
+    return new CGraphProgressUDPHandler();
+}

+ 2 - 2
thorlcr/slave/slwatchdog.hpp

@@ -26,11 +26,11 @@ class CGraphBase;
 interface ISlaveWatchdog : extends IInterface
 {
     virtual void startGraph(CGraphBase &graph) = 0;
-    virtual void stopGraph(CGraphBase &graph, HeartBeatPacket *hb=NULL) = 0;
+    virtual void stopGraph(CGraphBase &graph, MemoryBuffer *mb=NULL) = 0;
     virtual void stop() = 0;
 };
-ISlaveWatchdog *createProgressHandler();
 
+ISlaveWatchdog *createProgressHandler(bool udp=false);
 
 #endif