Explorar o código

HPCC-24790 Roxie IBYTI handling reworked for dynamic topology

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman hai 4 anos
pai
achega
5dae4f4cf6

+ 43 - 11
roxie/ccd/ccd.hpp

@@ -51,9 +51,12 @@
 
 #define ROXIE_STATEFILE_VERSION 2
 
+#ifdef _CONTAINERIZED
+#define SUBCHANNELS_IN_HEADER  // Containerized builds carry the subchannel agent list inside RoxiePacketHeader (subChannels[])
+#endif
+
 extern IException *MakeRoxieException(int code, const char *format, ...) __attribute__((format(printf, 2, 3)));
 void openMulticastSocket();
-extern size32_t channelWrite(unsigned channel, void const* buf, size32_t size);
 
 void setMulticastEndpoints(unsigned numChannels);
 
@@ -142,19 +145,23 @@ private:
     RoxiePacketHeader(const RoxiePacketHeader &source) =  delete;
 
 public:
-    unsigned packetlength;
-    unsigned short retries;         // how many retries on this query, the high bits are used as flags, see above
-    unsigned short overflowSequence;// Used if more than one packet-worth of data from server - eg keyed join. We don't mind if we wrap...
-    unsigned short continueSequence;// Used if more than one chunk-worth of data from agent. We don't mind if we wrap
-    unsigned short channel;         // multicast family to send on
-    unsigned activityId;            // identifies the helper factory to be used (activityId in graph)
-    hash64_t queryHash;             // identifies the query
-
-    ruid_t uid;                     // unique id
+    unsigned packetlength = 0;
+    unsigned short retries = 0;         // how many retries on this query, the high bits are used as flags, see above
+    unsigned short overflowSequence = 0;// Used if more than one packet-worth of data from server - eg keyed join. We don't mind if we wrap...
+    unsigned short continueSequence = 0;// Used if more than one chunk-worth of data from agent. We don't mind if we wrap
+    unsigned short channel = 0;         // multicast family to send on
+    unsigned activityId = 0;            // identifies the helper factory to be used (activityId in graph)
+    hash64_t queryHash = 0;             // identifies the query
+
+    ruid_t uid = 0;                     // unique id
     ServerIdentifier serverId;
+#ifdef SUBCHANNELS_IN_HEADER
+    ServerIdentifier subChannels[MAX_SUBCHANNEL];
+#endif
 #ifdef TIME_PACKETS
-    unsigned tick;
+    unsigned tick = 0;
 #endif
+    RoxiePacketHeader() = default;
 
     RoxiePacketHeader(const RemoteActivityId &_remoteId, ruid_t _uid, unsigned _channel, unsigned _overflowSequence);
     RoxiePacketHeader(const RoxiePacketHeader &source, unsigned _activityId, unsigned _subChannel);
@@ -176,6 +183,30 @@ public:
         return bitpos / SUBCHANNEL_BITS;
     }
 
+#ifdef SUBCHANNELS_IN_HEADER
+    unsigned mySubChannel() const // NOTE - 0 based: index of the first subChannels[] entry for which isMe() is true
+    {
+        for (unsigned idx = 0; idx < MAX_SUBCHANNEL; idx++)
+        {
+            if (subChannels[idx].isMe())
+                return idx;
+        }
+        throwUnexpected();  // reached only when no subChannels[] entry matched this node
+    }
+
+    bool hasBuddies() const  // Returns false when subChannels[1] is null, true otherwise
+    {
+        if (subChannels[1].isNull())
+        {
+            assert(subChannels[0].isMe());  // with no second entry, the first entry is expected to be this node
+            return false;
+        }
+        return true;
+    }
+
+    void clearSubChannels();
+#endif
+
     inline unsigned getSequenceId() const
     {
         return (((unsigned) overflowSequence) << 16) | (unsigned) continueSequence;
@@ -271,6 +302,7 @@ extern bool prestartAgentThreads;
 extern unsigned preabortKeyedJoinsThreshold;
 extern unsigned preabortIndexReadsThreshold;
 extern bool traceStartStop;
+extern bool traceRoxiePackets;
 extern bool traceServerSideCache;
 extern bool traceTranslations;
 extern bool defaultTimeActivities;

+ 2 - 0
roxie/ccd/ccdmain.cpp

@@ -81,6 +81,7 @@ unsigned maxBlockSize = 10000000;
 unsigned maxLockAttempts = 5;
 bool pretendAllOpt = false;
 bool traceStartStop = false;
+bool traceRoxiePackets = false;
 bool traceServerSideCache = false;
 bool defaultTimeActivities = true;
 bool defaultTraceEnabled = false;
@@ -1023,6 +1024,7 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
         traceStartStop = topology->getPropBool("@traceStartStop", false);
         watchActivityId = topology->getPropInt("@watchActivityId", 0);
         traceServerSideCache = topology->getPropBool("@traceServerSideCache", false);
+        traceRoxiePackets = topology->getPropBool("@traceRoxiePackets", false);
         traceTranslations = topology->getPropBool("@traceTranslations", true);
         defaultTimeActivities = topology->getPropBool("@timeActivities", true);
         defaultTraceEnabled = topology->getPropBool("@traceEnabled", false);

+ 2 - 0
roxie/ccd/ccdquery.cpp

@@ -1209,6 +1209,8 @@ public:
 
     void addToMap()
     {
+        if (traceRoxiePackets)
+            DBGLOG("addToMap %s: hashvalue = %" I64F "x channel %d", id.str(), hashValue, channelNo);
         hash64_t hv = rtlHash64Data(sizeof(channelNo), &channelNo, hashValue);
         CriticalBlock b(activeQueriesCrit);
         activeQueries.setValue(hv, this);

+ 213 - 48
roxie/ccd/ccdqueue.cpp

@@ -66,6 +66,9 @@ RoxiePacketHeader::RoxiePacketHeader(const RoxiePacketHeader &source, unsigned _
 #ifdef TIME_PACKETS
     tick = source.tick;
 #endif
+#ifdef SUBCHANNELS_IN_HEADER
+    memcpy(subChannels, source.subChannels, sizeof(subChannels));
+#endif
     packetlength = sizeof(RoxiePacketHeader);
 }
 
@@ -112,14 +115,26 @@ void RoxiePacketHeader::init(const RemoteActivityId &_remoteId, ruid_t _uid, uns
     channel = _channel;
     overflowSequence = _overflowSequence;
     continueSequence = 0;
+#ifdef SUBCHANNELS_IN_HEADER
+    clearSubChannels();
+#endif
+}
+
+#ifdef SUBCHANNELS_IN_HEADER
+void RoxiePacketHeader::clearSubChannels()  // Calls clear() on every one of the MAX_SUBCHANNEL subChannels[] entries
+{
+    for (unsigned idx = 0; idx < MAX_SUBCHANNEL; idx++)
+        subChannels[idx].clear();
 }
+#endif
 
 StringBuffer &RoxiePacketHeader::toString(StringBuffer &ret) const
 {
     const IpAddress serverIP = serverId.getIpAddress();
-    ret.appendf("uid=" RUIDF " activityId=", uid);
+    ret.append("activityId=");
     switch(activityId & ~ROXIE_PRIORITY_MASK)
     {
+    case 0: ret.append("IBYTI"); break;
     case ROXIE_UNLOAD: ret.append("ROXIE_UNLOAD"); break;
     case ROXIE_PING: ret.append("ROXIE_PING"); break;
     case ROXIE_TRACEINFO: ret.append("ROXIE_TRACEINFO"); break;
@@ -136,7 +151,7 @@ StringBuffer &RoxiePacketHeader::toString(StringBuffer &ret) const
             ret.appendf(" (fetch part)");
         break;
     }
-    ret.append(" pri=");
+    ret.appendf(" uid=" RUIDF " pri=", uid);
     switch(activityId & ROXIE_PRIORITY_MASK)
     {
         case ROXIE_SLA_PRIORITY: ret.append("SLA"); break;
@@ -160,6 +175,21 @@ StringBuffer &RoxiePacketHeader::toString(StringBuffer &ret) const
                 ret.appendf(" BROADCAST");
         }
     }
+#ifdef SUBCHANNELS_IN_HEADER
+    ret.append(" subchannels=");
+    for (unsigned idx = 0; idx < MAX_SUBCHANNEL; idx++)
+    {
+        if (subChannels[idx].isNull())
+            break;
+        if (idx)
+            ret.append(',');
+        subChannels[idx].getTraceText(ret);
+        if (subChannels[idx].isMe())
+        {
+            ret.append("(me)");
+        }
+    }
+#endif
     return ret;
 }
 
@@ -294,27 +324,99 @@ void closeMulticastSockets()
     multicastSocket.clear();
 }
 
-size32_t channelWrite(unsigned channel, void const* buf, size32_t size)
+static bool channelWrite(RoxiePacketHeader &buf, bool includeSelf)  // Writes buf.packetlength bytes starting at buf to the agents for buf.channel; true when the full length was written
 {
     size32_t minwrote = 0;
     if (roxieMulticastEnabled)
     {
-        return multicastSocket->udp_write_to(multicastEndpoints.item(channel), buf, size);
+        return multicastSocket->udp_write_to(multicastEndpoints.item(buf.channel), &buf, buf.packetlength) == buf.packetlength;
     }
     else
     {
+#ifdef SUBCHANNELS_IN_HEADER
+        // In the containerized system, the list of subchannel IPs is captured in the packet header to ensure everyone is using the
+        // same snapshot of the topology state.
+        // If the subchannel IPs are not set, fill them in now. If they are set, use them.
+        if (buf.subChannels[0].isNull())
+        {
+            Owned<const ITopologyServer> topo = getTopology();
+            const SocketEndpointArray &eps = topo->queryAgents(buf.channel);
+            if (!eps.ordinality())
+                throw makeStringExceptionV(0, "No agents available for channel %d", buf.channel);
+            if (buf.channel==0)
+            {
+                // Note that we expand any writes on channel 0 here, since we need to capture the server's view of what agents are on each channel
+                bool allOk = true;
+                if (traceRoxiePackets)
+                {
+                    StringBuffer header;
+                    DBGLOG("Translating packet sent to channel 0: %s", buf.toString(header).str());
+                }
+                for (unsigned channel = 0; channel < numChannels; channel++)
+                {
+                    buf.channel = channel+1;
+                    if (!channelWrite(buf, true))  // recursive call always passes includeSelf=true, whatever the caller asked for
+                        allOk = false;
+                    buf.clearSubChannels();  // reset so the next iteration's call sees a null subChannels[0] and fills the list again
+                }
+                buf.channel = 0;  // restore the caller's channel value
+                return allOk;
+            }
+
+            unsigned hdrHashVal = buf.priorityHash();
+            unsigned numAgents = eps.ordinality();
+            unsigned subChannel = (hdrHashVal % numAgents);  // index into eps of the agent that becomes subChannels[0]
+
+            for (unsigned idx = 0; idx < MAX_SUBCHANNEL; idx++)
+            {
+                if (idx == numAgents)  // fewer agents than MAX_SUBCHANNEL: remaining entries are left untouched
+                    break;
+                buf.subChannels[idx].setIp(eps.item(subChannel));
+                subChannel++;
+                if (subChannel == numAgents)  // wrap round to the start of eps
+                    subChannel = 0;
+            }
+        }
+        else
+        {
+            assert(buf.channel != 0);
+        }
+        for (unsigned subChannel = 0; subChannel < MAX_SUBCHANNEL; subChannel++)
+        {
+            if (buf.subChannels[subChannel].isNull())  // first null entry ends the list
+                break;
+            if (includeSelf || !buf.subChannels[subChannel].isMe())
+            {
+                if (traceRoxiePackets)
+                {
+                    StringBuffer s, header;
+                    DBGLOG("Writing %d bytes to subchannel %d (%s) %s", buf.packetlength, subChannel, buf.subChannels[subChannel].getTraceText(s).str(), buf.toString(header).str());
+                }
+                SocketEndpoint ep(ccdMulticastPort, buf.subChannels[subChannel].getIpAddress());
+                size32_t wrote = multicastSocket->udp_write_to(ep, &buf, buf.packetlength);
+                if (!subChannel || wrote < minwrote)  // NOTE(review): minwrote is only seeded by entry 0; if entry 0 is skipped (includeSelf false and it is me) it stays 0 and the function returns false - confirm intended
+                    minwrote = wrote;
+            }
+            else if (traceRoxiePackets)
+            {
+                StringBuffer s, header;
+                DBGLOG("NOT writing %d bytes to subchannel %d (%s) %s", buf.packetlength, subChannel, buf.subChannels[subChannel].getTraceText(s).str(), buf.toString(header).str());
+            }
+        }
+#else
         Owned<const ITopologyServer> topo = getTopology();
-        const SocketEndpointArray &eps = topo->queryAgents(channel);
+        const SocketEndpointArray &eps = topo->queryAgents(buf.channel);
         if (!eps.ordinality())
-            throw makeStringExceptionV(0, "No agents available for channel %d", channel);
+            throw makeStringExceptionV(0, "No agents available for channel %d", buf.channel);
         ForEachItemIn(idx, eps)
         {
-            size32_t wrote = multicastSocket->udp_write_to(eps.item(idx), buf, size);
+            size32_t wrote = multicastSocket->udp_write_to(eps.item(idx), &buf, buf.packetlength);
             if (!idx || wrote < minwrote)
                 minwrote = wrote;
         }
+#endif
     }
-    return minwrote;
+    return minwrote==buf.packetlength;  // callers now get a success flag rather than a byte count
 }
 
 //============================================================================================
@@ -1078,7 +1180,11 @@ public:
             {
                 // Try to stop/abort a job after it starts only if IBYTI comes from a higher priority agent 
                 // (more primary in the rank). The agents with higher rank will hold the lower bits of the retries field in IBYTI packet).
+#ifdef SUBCHANNELS_IN_HEADER
+                if (!checkRank || h.getRespondingSubChannel() < h.mySubChannel())
+#else
                 if (!checkRank || topology->queryChannelInfo(h.channel).otherAgentHasPriority(h.priorityHash(), h.getRespondingSubChannel()))
+#endif
                 {
                     activity->abort();
                     return true;
@@ -1111,7 +1217,11 @@ public:
             }
             
             RoxiePacketHeader &header = packet->queryHeader();
+#ifdef SUBCHANNELS_IN_HEADER
+            unsigned mySubChannel = header.mySubChannel();
+#else
             unsigned mySubChannel = topology->queryChannelInfo(header.channel).subChannel();
+#endif
             // I failed to do the query, but already sent out IBYTI - resend it so someone else can try
             if (!isUser)
             {
@@ -1164,8 +1274,12 @@ public:
         hash64_t queryHash = packet->queryHeader().queryHash;
         unsigned activityId = packet->queryHeader().activityId & ~ROXIE_PRIORITY_MASK;
         Owned<IQueryFactory> queryFactory = getQueryFactory(queryHash, channel);
+#ifdef SUBCHANNELS_IN_HEADER
+        unsigned mySubChannel = header.mySubChannel();
+#else
         unsigned numAgents = topology->queryAgents(channel).ordinality();
         unsigned mySubChannel = topology->queryChannelInfo(channel).subChannel();
+#endif
         if (!queryFactory && logctx.queryWuid())
         {
             Owned <IRoxieDaliHelper> daliHelper = connectToDali();
@@ -1184,15 +1298,39 @@ public:
         }
         try
         {   
-            if (logctx.queryTraceLevel() > 8) 
+            bool debugging = logctx.queryDebuggerActive();
+#ifdef SUBCHANNELS_IN_HEADER
+            if (debugging)
             {
-                StringBuffer x;
-                logctx.CTXLOG("IBYTI delay controls : doIbytiDelay=%s numagents=%u subchnl=%u : %s",
-                    doIbytiDelay?"YES":"NO", 
-                    numAgents, topology->queryChannelInfo(channel).subChannel(),
-                    header.toString(x).str());
+                if (!mySubChannel)
+                    abortJob = true;  // when debugging, we always run on primary only... should really have sent to primary only too...
             }
-            bool debugging = logctx.queryDebuggerActive();
+            else if (doIbytiDelay && mySubChannel)
+            {
+                unsigned delay = 0;
+                for (unsigned subChannel = 0; subChannel < mySubChannel; subChannel++)
+                    delay += getIbytiDelay(header.subChannels[subChannel].getIpAddress());
+                unsigned start;
+                if (traceRoxiePackets)
+                {
+                    StringBuffer x;
+                    DBGLOG("YES myTurnToDelay subchannel=%u delay=%u %s", mySubChannel, delay, header.toString(x).str());
+                    start = msTick();
+                }
+                if (delay)
+                    ibytiSem.wait(delay);
+                if (traceRoxiePackets)
+                {
+                    StringBuffer x;
+                    DBGLOG("Delay done, abortJob=%d, elapsed=%d", (int) abortJob, msTick()-start);
+                }
+                if (!abortJob)
+                {
+                    for (unsigned subChannel = 0; subChannel < mySubChannel; subChannel++)
+                        noteNodeSick(header.subChannels[subChannel].getIpAddress());
+                }
+            }
+#else
             if (debugging)
             {
                 if (mySubChannel)
@@ -1242,7 +1380,8 @@ public:
                     }
                 }
             }
-            if (abortJob) 
+#endif
+            if (abortJob)
             {
                 CriticalBlock b(actCrit);
                 busy = false;  // Keep order - before setActivity below
@@ -1640,8 +1779,8 @@ public:
                     MTIME_SECTION(queryActiveTimer(), "bucket_wait");
                     bucket.wait((length / 1024) + 1);
                 }
-                if (channelWrite(header.channel, &header, length) != length)
-                    DBGLOG("multicast write wrote too little");
+                if (!channelWrite(header, true))
+                    DBGLOG("Roxie packet write wrote too little");
                 packetsSent++;
             }
             catch (StoppedException *E)
@@ -1769,14 +1908,18 @@ public:
                 StringBuffer s;
                 throw MakeStringException(ROXIE_PACKET_ERROR, "Maximum packet length %d exceeded sending packet %s", maxPacketSize, header.toString(s).str());
             }
-            if (channelWrite(header.channel, &header, length) != length)
-                logctx.CTXLOG("multicast write wrote too little");
+            if (!channelWrite(header, true))
+                logctx.CTXLOG("Roxie packet write wrote too little");
             packetsSent++;
         }
     }
 
     virtual void sendIbyti(RoxiePacketHeader &header, const IRoxieContextLogger &logctx, unsigned subChannel) override
     {
+#ifdef SUBCHANNELS_IN_HEADER
+        if (!header.hasBuddies())
+            return;
+#endif
         MTIME_SECTION(queryActiveTimer(), "RoxieSocketQueueManager::sendIbyti");
         RoxiePacketHeader ibytiHeader(header, header.activityId & ROXIE_PRIORITY_MASK, subChannel);
     
@@ -1784,8 +1927,7 @@ public:
         {
             StringBuffer s; logctx.CTXLOG("Sending IBYTI packet %s", ibytiHeader.toString(s).str());
         }
-        if (channelWrite(header.channel, &ibytiHeader, sizeof(RoxiePacketHeader)) != sizeof(RoxiePacketHeader))
-            logctx.CTXLOG("sendIbyti wrote too little");
+        channelWrite(ibytiHeader, false);  // don't send to self
         ibytiPacketsSent++;
     }
 
@@ -1798,7 +1940,7 @@ public:
         {
             StringBuffer s; logctx.CTXLOG("Sending ABORT packet %s", abortHeader.toString(s).str());
         }
-        if (channelWrite(header.channel, &abortHeader, sizeof(RoxiePacketHeader)) != sizeof(RoxiePacketHeader))
+        if (!channelWrite(abortHeader, true))
             logctx.CTXLOG("sendAbort wrote too little");
         abortsSent++;
     }
@@ -1808,14 +1950,15 @@ public:
         MTIME_SECTION(queryActiveTimer(), "RoxieSocketQueueManager::sendAbortCallback");
         RoxiePacketHeader abortHeader(header, ROXIE_FILECALLBACK, 0); // subChannel irrelevant - we are about to overwrite retries anyway
         abortHeader.retries = QUERY_ABORTED;
+        abortHeader.packetlength += strlen(lfn)+1;
         MemoryBuffer data;
         data.append(sizeof(abortHeader), &abortHeader).append(lfn);
         if (logctx.queryTraceLevel() > 5)
         {
             StringBuffer s; logctx.CTXLOG("Sending ABORT FILECALLBACK packet %s for file %s", abortHeader.toString(s).str(), lfn);
         }
-        if (channelWrite(header.channel, data.toByteArray(), data.length()) != data.length())
-            logctx.CTXLOG("tr->write wrote too little");
+        if (!channelWrite(*(RoxiePacketHeader *) data.toByteArray(), true))
+            logctx.CTXLOG("sendAbortCallback wrote too little");
         abortsSent++;
     }
 
@@ -1867,29 +2010,27 @@ public:
         return ret;
     }
 
-    void doIbyti(RoxiePacketHeader &header, RoxieQueue &queue, const ITopologyServer* topology)
+    void doIbyti(RoxiePacketHeader &header, RoxieQueue &queue
+#ifndef SUBCHANNELS_IN_HEADER
+                 , const ITopologyServer* topology
+#endif
+                 )
     {
-        assertex(!localAgent);
+        assert(!localAgent);
         bool preActivity = false;
+#ifdef SUBCHANNELS_IN_HEADER
+        unsigned mySubChannel = header.mySubChannel();
+#else
         const ChannelInfo &channelInfo = topology->queryChannelInfo(header.channel);
         unsigned mySubChannel = channelInfo.subChannel();
+#endif
 
-        if (traceLevel > 10)
-        {
-            IpAddress peer;
-            StringBuffer s, s1;
-            multicastSocket->getPeerAddress(peer).getIpText(s);
-            header.toString(s1);
-            DBGLOG("doIBYTI %s from %s", s1.str(), s.str());
-            DBGLOG("header.retries=%x header.getSubChannelMask(header.channel)=%x", header.retries, header.getSubChannelMask(mySubChannel));
-        }
-        
         if (header.retries == QUERY_ABORTED)
         {
             abortRunning(header, queue, false, preActivity);
             queue.remove(header);
 
-            if (traceLevel > 10)
+            if (traceRoxiePackets || traceLevel > 10)
             {
                 StringBuffer s; 
                 DBGLOG("Abort activity %s", header.toString(s).str());
@@ -1901,17 +2042,21 @@ public:
             unsigned subChannel = header.getRespondingSubChannel();
             if (subChannel == mySubChannel)
             {
-                if (traceLevel > 10)
+                if (traceRoxiePackets || traceLevel > 10)
                     DBGLOG("doIBYTI packet was from self");
                 ibytiPacketsFromSelf++;
             }
             else
             {
+#ifndef SUBCHANNELS_IN_HEADER
                 channelInfo.noteChannelHealthy(subChannel);
+#else
+                noteNodeHealthy(header.subChannels[subChannel].getIpAddress());
+#endif
                 bool foundInQ = queue.remove(header);
                 if (foundInQ)
                 {
-                    if (traceLevel > 10)
+                    if (traceRoxiePackets || traceLevel > 10)
                     {
                         StringBuffer s; 
                         DBGLOG("Removed activity from Q : %s", header.toString(s).str());
@@ -1921,14 +2066,22 @@ public:
                 }
                 if (abortRunning(header, queue, true, preActivity))
                 {
+                    if (traceRoxiePackets || traceLevel > 10)
+                    {
+                        StringBuffer s;
+                        DBGLOG("Aborted running activity : %s", header.toString(s).str());
+                    }
                     if (preActivity)
                         ibytiPacketsWorked++;
                     else 
                         ibytiPacketsHalfWorked++;
                     return;
                 }               
-                if (traceLevel > 10)
-                    DBGLOG("doIBYTI packet was too late");
+                if (traceRoxiePackets || traceLevel > 10)
+                {
+                    StringBuffer s;
+                    DBGLOG("doIBYTI packet was too late (or too early) : %s", header.toString(s).str());
+                }
                 ibytiPacketsTooLate++; // meaning either I started and reserve the right to finish, or I finished already
             }
         }
@@ -1938,6 +2091,9 @@ public:
     {
         // NOTE - this thread needs to do as little as possible - just read packets and queue them up - otherwise we can get packet loss due to buffer overflow
         // DO NOT put tracing on this thread except at very high tracelevels!
+#ifdef SUBCHANNELS_IN_HEADER
+        unsigned mySubchannel = header.mySubChannel();
+#else
         Owned<const ITopologyServer> topology = getTopology();
         if (!header.channel)
         {
@@ -1953,7 +2109,7 @@ public:
             return;
         }
         unsigned mySubchannel = topology->queryChannelInfo(header.channel).subChannel();
-
+#endif
         if (header.activityId == ROXIE_FILECALLBACK || header.activityId == ROXIE_DEBUGCALLBACK )
         {
             Owned<IRoxieQueryPacket> packet = createRoxiePacket(mb);
@@ -1965,7 +2121,11 @@ public:
             doFileCallback(packet);
         }
         else if ((header.activityId & ~ROXIE_PRIORITY_MASK) == 0)
-            doIbyti(header, queue, topology); // MORE - check how fast this is!
+            doIbyti(header, queue
+#ifndef SUBCHANNELS_IN_HEADER
+                    , topology
+#endif
+                   ); // MORE - check how fast this is!
         else
         {
             Owned<IRoxieQueryPacket> packet = createRoxiePacket(mb);
@@ -2060,10 +2220,10 @@ public:
                 RoxiePacketHeader &header = *(RoxiePacketHeader *) mb.toByteArray();
                 if (l != header.packetlength)
                     DBGLOG("sock->read returned %d but packetlength was %d", l, header.packetlength);
-                if (traceLevel > 10)
+                if (traceRoxiePackets || traceLevel > 10)
                 {
                     StringBuffer s;
-                    DBGLOG("Read from multicast: %s", header.toString(s).str());
+                    DBGLOG("Read roxie packet: %s", header.toString(s).str());
                 }
 #ifdef ROXIE_SLA_LOGIC
                 if (header.activityId & ROXIE_SLA_PRIORITY)
@@ -2086,13 +2246,13 @@ public:
                     {
                         //MORE: I think this should probably be based on the error code instead.
 
-                        EXCLOG(E, "Exception reading or processing multicast msg");
+                        EXCLOG(E, "Exception reading or processing roxie packet");
                         E->Release();
                         MilliSleep(1000); // Give a chance for mem free
                     }
                     else 
                     {
-                        EXCLOG(E, "Exception reading or processing multicast msg");
+                        EXCLOG(E, "Exception reading or processing roxie packet");
                         E->Release();
                         // MORE: Protect with try logic, in case udp_create throws exception ?
                         //       What to do if create fails (ie exception is caught) ?
@@ -2503,12 +2663,17 @@ public:
             }
             else
             {
+#ifdef SUBCHANNELS_IN_HEADER
+                // In SUBCHANNELS_IN_HEADER mode this translation has been done on server before sending
+                throwUnexpected();
+#else
                 // Turn broadcast packet (channel 0), as early as possible, into non-0 channel packets.
                 // So retries and other communication with Roxie server (which uses non-0 channel numbers) will not cause double work or confusion.
                 for (unsigned i = 0; i < numChannels; i++)
                 {
                     targetQueue->enqueue(packet->clonePacket(i+1));
                 }
+#endif
             }
         }
     }

+ 3 - 0
roxie/udplib/udplib.hpp

@@ -48,6 +48,8 @@ public:
     ServerIdentifier(const IpAddress &from) { setIp(from); }
     const IpAddress getIpAddress() const;
     unsigned getIp4() const { return netAddress; };
+    inline bool isNull() const { return netAddress==0; }
+    inline void clear() { netAddress=0; }
     const ServerIdentifier & operator=(const ServerIdentifier &from)
     {
         netAddress = from.netAddress;
@@ -71,6 +73,7 @@ public:
         serverIp.setIP4(netAddress);
         return serverIp.getIpText(s);
     }
+    bool isMe() const;
 };
 
 extern UDPLIB_API ServerIdentifier myNode;

+ 5 - 0
roxie/udplib/udpsha.cpp

@@ -61,6 +61,11 @@ const IpAddress ServerIdentifier::getIpAddress() const
     return ret;
 }
 
+bool ServerIdentifier::isMe() const  // True when this identifier compares equal to the global myNode
+{
+    return *this==myNode;
+}
+
 ServerIdentifier myNode;
 
 //---------------------------------------------------------------------------------------------

+ 67 - 1
roxie/udplib/udptopo.cpp

@@ -18,6 +18,7 @@
 #include "jmisc.hpp"
 #include "udplib.hpp"
 #include "udptopo.hpp"
+#include "udpipmap.hpp"
 #include "roxie.hpp"
 #include "portlist.h"
 #include <thread>
@@ -28,7 +29,6 @@
 unsigned initIbytiDelay; // In milliseconds
 unsigned minIbytiDelay;  // In milliseconds
 
-
 unsigned ChannelInfo::getIbytiDelay(unsigned primarySubChannel) const  // NOTE - zero-based
 {
     unsigned delay = 0;
@@ -85,6 +85,33 @@ bool ChannelInfo::otherAgentHasPriority(unsigned priorityHash, unsigned otherAge
     return false;
 }
 
+static unsigned *createNewNodeHealthScore(const IpAddress &)  // Callback passed to buddyHealth's constructor; the address argument is unused
+{
+    return new unsigned(initIbytiDelay);  // a fresh score starts at the current initIbytiDelay
+}
+
+static IpMapOf<unsigned> buddyHealth(createNewNodeHealthScore);   // For each buddy IP ever seen, maintains a score of how long I should wait for it to respond when it is the 'first responder'
+
+void noteNodeSick(const IpAddress &node)  // Halves the delay stored for node in buddyHealth, but never below minIbytiDelay
+{
+    // NOTE - IpMapOf is thread safe (we never remove entries). Two threads hitting at the same time may result in the change from one being lost, but that's not a disaster
+    unsigned current = buddyHealth[node];
+    unsigned newDelay = current / 2;
+    if (newDelay < minIbytiDelay)  // clamp to the configured minimum
+        newDelay = minIbytiDelay;
+    buddyHealth[node] = newDelay;
+}
+
+void noteNodeHealthy(const IpAddress &node)  // Resets the delay stored for node in buddyHealth to initIbytiDelay
+{
+    // NOTE - IpMapOf is thread safe (we never remove entries). Two threads hitting at the same time may result in the change from one being lost, but that's not a disaster
+    buddyHealth[node] = initIbytiDelay;
+}
+
+unsigned getIbytiDelay(const IpAddress &node)  // Returns the delay currently stored for node in buddyHealth
+{
+    return buddyHealth[node];  // presumably an unseen IP gets an entry via createNewNodeHealthScore - confirm against IpMapOf
+}
 
 class CTopologyServer : public CInterfaceOf<ITopologyServer>
 {
@@ -442,3 +469,42 @@ extern UDPLIB_API void stopTopoThread()
     }
 }
 
+#ifdef _USE_CPPUNIT
+#include "unittests.hpp"
+
+class BuddyHealthTest : public CppUnit::TestFixture  // Exercises noteNodeSick / noteNodeHealthy / getIbytiDelay on three addresses
+{
+    CPPUNIT_TEST_SUITE(BuddyHealthTest);
+        CPPUNIT_TEST(testBuddyHealth);
+    CPPUNIT_TEST_SUITE_END();
+
+    void testBuddyHealth()
+    {
+        initIbytiDelay = 64;  // overwrites the global settings for the duration of the test (and afterwards)
+        minIbytiDelay = 16;
+        IpAddress a1("123.4.5.1");
+        IpAddress a2("123.4.6.2");
+        IpAddress a3("123.4.5.3");
+        CPPUNIT_ASSERT(getIbytiDelay(a1)==initIbytiDelay);  // an address not touched before reports the initial delay
+        noteNodeSick(a1);
+        noteNodeSick(a2);
+        CPPUNIT_ASSERT(getIbytiDelay(a1)==initIbytiDelay/2);
+        CPPUNIT_ASSERT(getIbytiDelay(a2)==initIbytiDelay/2);
+        CPPUNIT_ASSERT(getIbytiDelay(a3)==initIbytiDelay);  // a3 is unaffected by changes to a1 and a2
+        noteNodeHealthy(a1);
+        CPPUNIT_ASSERT(getIbytiDelay(a1)==initIbytiDelay);  // healthy resets a1 only
+        CPPUNIT_ASSERT(getIbytiDelay(a2)==initIbytiDelay/2);
+        CPPUNIT_ASSERT(getIbytiDelay(a3)==initIbytiDelay);
+        noteNodeSick(a2);  // 32 -> 16
+        noteNodeSick(a2);  // 16/2 = 8 is below minIbytiDelay, so the value stays at 16 from here on
+        noteNodeSick(a2);
+        noteNodeSick(a2);
+        noteNodeSick(a2);
+        CPPUNIT_ASSERT(getIbytiDelay(a2)==minIbytiDelay);
+    }
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION( BuddyHealthTest );
+CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( BuddyHealthTest, "BuddyHealthTest" );
+
+#endif

+ 6 - 0
roxie/udplib/udptopo.hpp

@@ -96,6 +96,12 @@ private:
     mutable std::vector<unsigned> currentDelay;  // NOTE - technically should be atomic, but in the event of a race we don't really care who wins
 };
 
+// In containerized mode with dynamic topology, we prefer a different mechanism for tracking node health
+
+extern UDPLIB_API void noteNodeSick(const IpAddress &node);       // halves the IBYTI delay recorded for node, down to minIbytiDelay
+extern UDPLIB_API void noteNodeHealthy(const IpAddress &node);    // resets the IBYTI delay recorded for node to initIbytiDelay
+extern UDPLIB_API unsigned getIbytiDelay(const IpAddress &node);  // returns the IBYTI delay currently recorded for node
+
 interface ITopologyServer : public IInterface
 {
     virtual const SocketEndpointArray &queryAgents(unsigned channel) const = 0;