Jelajahi Sumber

Merge pull request #10856 from richardkchapman/ibyti-multichannel

HPCC-19067 IBYTI delay mechanism fails when more than 2 slaves per channel

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 7 tahun lalu
induk
melakukan
0da7357c37
2 mengubah file dengan 62 tambahan dan 65 penghapusan
  1. 1 8
      initfiles/componentfiles/configxml/roxie.xsd.in
  2. 61 57
      roxie/ccd/ccdqueue.cpp

+ 1 - 8
initfiles/componentfiles/configxml/roxie.xsd.in

@@ -1374,7 +1374,7 @@
     <xs:attribute name="initIbytiDelay" type="xs:nonNegativeInteger" use="optional" default="100">
       <xs:annotation>
         <xs:appinfo>
-          <tooltip>Initial time (in milliseconds) a agent will wait for an IBYTI packet from a peer.</tooltip>
+          <tooltip>Initial time (in milliseconds) a secondary agent will wait for an IBYTI packet from a primary peer.</tooltip>
         </xs:appinfo>
       </xs:annotation>
     </xs:attribute>
@@ -1420,13 +1420,6 @@
         </xs:appinfo>
       </xs:annotation>
     </xs:attribute>
-    <xs:attribute name="minIbytiDelay" type="xs:nonNegativeInteger" use="optional" default="0">
-      <xs:annotation>
-        <xs:appinfo>
-          <tooltip>Minimum time (millsec) a agent will wait for an IBYTI packet from a peer.</tooltip>
-        </xs:appinfo>
-      </xs:annotation>
-    </xs:attribute>
     <xs:attribute name="parallelAggregate" type="xs:nonNegativeInteger" use="optional" default="0">
       <xs:annotation>
         <xs:appinfo>

+ 61 - 57
roxie/ccd/ccdqueue.cpp

@@ -34,17 +34,15 @@
 #include <cppunit/extensions/HelperMacros.h>
 #endif
 
-CriticalSection ibytiCrit; // CAUTION - not safe to use spinlocks as real-time thread accesses
 CriticalSection queueCrit;
 unsigned channels[MAX_CLUSTER_SIZE];
 unsigned channelCount;
 unsigned subChannels[MAX_CLUSTER_SIZE];
 unsigned numSlaves[MAX_CLUSTER_SIZE];
 unsigned replicationLevel[MAX_CLUSTER_SIZE];
-unsigned IBYTIDelays[MAX_CLUSTER_SIZE]; // MORE: this will cover only 2 slaves per channel, change to cover all. 
+unsigned liveSubChannels[MAX_CLUSTER_SIZE];  // technically probably should be atomic, but any races are benign enough that not worth it.
 
-static SpinLock suspendCrit; // MORE: Could remove this, and replace the following with an atomic boolean array.
-static bool suspendedChannels[MAX_CLUSTER_SIZE];
+static std::atomic<bool> suspendedChannels[MAX_CLUSTER_SIZE];
 
 using roxiemem::OwnedRoxieRow;
 using roxiemem::OwnedConstRoxieRow;
@@ -610,38 +608,57 @@ void SlaveContextLogger::flush()
 
 //=================================================================================
 
-unsigned getIbytiDelay(unsigned channel, const RoxiePacketHeader &header)
+bool isSubChannelAlive(unsigned channel, unsigned subChannel)
+{
+    unsigned mask = SUBCHANNEL_MASK << (SUBCHANNEL_BITS * subChannel);
+    return (liveSubChannels[channel] & mask) == mask;
+}
+
+unsigned getIbytiDelay(unsigned channel, unsigned primarySubChannel)
 {
     // MORE - adjust delay according to whether it's a retry, whether it was a broadcast etc
-    CriticalBlock b(ibytiCrit);
-    return IBYTIDelays[channel];
+    unsigned mySubChannel = subChannels[channel] - 1;
+    unsigned subChannel = primarySubChannel;
+    unsigned delay = 0;
+    while (subChannel != mySubChannel)
+    {
+        if (isSubChannelAlive(channel, subChannel))
+            delay++;
+        subChannel++;
+        if (subChannel >= numSlaves[channel])
+            subChannel = 0;
+    }
+    return delay * initIbytiDelay;
 }
 
-void resetIbytiDelay(unsigned channel)
+void notePrimarySubChannelsDead(unsigned channel, unsigned primarySubChannel)
 {
-    unsigned prevVal;
+    unsigned mySubChannel = subChannels[channel] - 1;
+    unsigned subChannel = primarySubChannel;
+    while (subChannel != mySubChannel)
     {
-        CriticalBlock b(ibytiCrit);
-        prevVal = IBYTIDelays[channel];
-        IBYTIDelays[channel] = initIbytiDelay;
+        liveSubChannels[channel] &= ~(SUBCHANNEL_MASK << (SUBCHANNEL_BITS * subChannel));
+        subChannel++;
+        if (subChannel >= numSlaves[channel])
+            subChannel = 0;
     }
-    if (traceLevel > 8 && prevVal != initIbytiDelay)
-        DBGLOG("Reset IBYTI delay value for channel %u from %u to %u", channel, prevVal, initIbytiDelay);
 }
 
-void decIbytiDelay(unsigned channel, unsigned factor = 2)
+bool otherSlaveHasPriority(const RoxiePacketHeader &h)
 {
-    unsigned prevVal, newVal;
+    unsigned hash = h.priorityHash();
+    unsigned primarySubChannel = (hash % numSlaves[h.channel]) + 1;
+    unsigned mySubChannel = subChannels[h.channel];
+    while (primarySubChannel != mySubChannel)
     {
-        CriticalBlock b(ibytiCrit);
-        prevVal = IBYTIDelays[channel];
-        IBYTIDelays[channel] /= factor;
-        if (IBYTIDelays[channel] < minIbytiDelay)
-            IBYTIDelays[channel] = minIbytiDelay;
-        newVal = IBYTIDelays[channel];
+        unsigned channelMask = SUBCHANNEL_MASK << (SUBCHANNEL_BITS * primarySubChannel);
+        if ((h.retries & ROXIE_RETRIES_MASK) == channelMask)
+            return true;
+        primarySubChannel++;
+        if (primarySubChannel >= numSlaves[h.channel])
+            primarySubChannel = 0;
     }
-    if (traceLevel > 8 && prevVal != newVal)
-        DBGLOG("Dec IBYTI delay value for channel %u from %u to %u (factor=%u)", channel, prevVal, newVal, factor);
+    return false;
 }
 
 //=================================================================================
@@ -1000,13 +1017,13 @@ public:
         {
             queryFound = true;
             abortJob = true;
-            if (doIbytiDelay) 
+            if (doIbytiDelay)
                 ibytiSem.signal();
             if (activity) 
             {
                 // Try to stop/abort a job after it starts only if IBYTI comes from a higher priority slave 
                 // (more primary in the rank). The slaves with higher rank will hold the lower bits of the retries field in IBYTI packet).
-                if (!checkRank || ((h.retries & ROXIE_RETRIES_MASK) < h.getSubChannelMask(h.channel))) 
+                if (!checkRank || otherSlaveHasPriority(h))
                 {
                     activity->abort();
                     return true;
@@ -1128,45 +1145,40 @@ public:
                 bool primChannel = true;
                 if (subChannels[channel] != 1) 
                     primChannel = false;
-                bool myTurnToDelayIBYTI =  true;  // all slaves will delay, except one
                 unsigned hdrHashVal = header.priorityHash();
-                if ((((hdrHashVal % numSlaves[channel]) + 1) == subChannels[channel]))
-                    myTurnToDelayIBYTI =  false;
-
-                if (myTurnToDelayIBYTI) 
+                unsigned primarySubChannel = (hdrHashVal % numSlaves[channel]) + 1;
+                if (primarySubChannel != subChannels[channel])
                 {
-                    unsigned delay = getIbytiDelay(channel, header);
+                    unsigned delay = getIbytiDelay(channel, primarySubChannel);
                     if (logctx.queryTraceLevel() > 6)
                     {
                         StringBuffer x;
                         logctx.CTXLOG("YES myTurnToDelayIBYTI channel=%s delay=%u hash=%u %s", primChannel?"primary":"secondary", delay, hdrHashVal, header.toString(x).str());
                     }
                     
-                    // MORE: this code puts the penalty on all slaves on this channel,
-                    //       change it to have one for each slave on every channel.
-                    //       NOT critical for the time being with 2 slaves per channel
                     // MORE: if we are dealing with a query that was on channel 0, we may want a longer delay 
                     // (since the theory about duplicated work not mattering when cluster is idle does not hold up)
 
                     if (delay)
                     {
                         ibytiSem.wait(delay);
-                        if (abortJob)
-                            resetIbytiDelay(channel); // we know there is an active buddy on the channel...
-                        else
-                            decIbytiDelay(channel);
+                        if (!abortJob)
+                            notePrimarySubChannelsDead(channel, primarySubChannel);
                         if (logctx.queryTraceLevel() > 8)
                         {
                             StringBuffer x;
-                            logctx.CTXLOG("Buddy did%s send IBYTI, updated delay=%u : %s", 
-                                abortJob ? "" : " NOT", IBYTIDelays[channel], header.toString(x).str());
+                            logctx.CTXLOG("Buddy did%s send IBYTI, updated delay : %s",
+                                abortJob ? "" : " NOT", header.toString(x).str());
                         }
                     }
                 }
-                else {
+                else
+                {
 #ifndef NO_IBYTI_DELAYS_COUNT
-                    if (primChannel) atomic_inc(&ibytiNoDelaysPrm);
-                    else atomic_inc(&ibytiNoDelaysSec);
+                    if (primChannel)
+                        atomic_inc(&ibytiNoDelaysPrm);
+                    else
+                        atomic_inc(&ibytiNoDelaysSec);
 #endif
                     if (logctx.queryTraceLevel() > 6)
                     {
@@ -1477,18 +1489,14 @@ public:
             assertex(subChannels[channel] == 0);
             assertex(subChannel != 0);
             subChannels[channel] = subChannel;
-            IBYTIDelays[channel] = initIbytiDelay;
+            liveSubChannels[channel] = 0;  // Until proven otherwise
             channels[channelCount++] = channel;
         }
     }
 
     virtual bool checkSuspended(const RoxiePacketHeader &header, const IRoxieContextLogger &logctx)
     {
-        bool suspended;
-        {
-            SpinBlock b(suspendCrit);
-            suspended = suspendedChannels[header.channel];
-        }
+        bool suspended = suspendedChannels[header.channel];
         if (suspended)
         {
             try 
@@ -1519,12 +1527,8 @@ public:
     virtual bool suspendChannel(unsigned channel, bool suspend, const IRoxieContextLogger &logctx)
     {
         assertex(channel < MAX_CLUSTER_SIZE);
-        bool prev;
-        {
-            SpinBlock b(suspendCrit);
-            prev = suspendedChannels[channel];
-            suspendedChannels[channel] = suspend;
-        }
+        bool prev = suspendedChannels[channel];
+        suspendedChannels[channel] = suspend;
         if (suspend && subChannels[channel] && !prev)
         {
             logctx.CTXLOG("ERROR: suspending channel %d - aborting active queries", channel);
@@ -1998,7 +2002,7 @@ public:
             }
             else
             {
-                resetIbytiDelay(header.channel);
+                liveSubChannels[header.channel] |= header.retries & ROXIE_RETRIES_MASK;
                 bool foundInQ;
                 {
                     CriticalBlock b(queueCrit);