瀏覽代碼

Merge pull request #14304 from richardkchapman/HPCC-24951

HPCC-24951 Roxie IBYTI Packets may arrive before the packet they are suppressing

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Merged-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 4 年之前
父節點
當前提交
fe2b191969
共有 5 個文件被更改,包括 113 次插入0 次删除
  1. 4 0
      roxie/ccd/ccd.hpp
  2. 6 0
      roxie/ccd/ccdmain.cpp
  3. 100 0
      roxie/ccd/ccdqueue.cpp
  4. 2 0
      roxie/ccd/ccdsnmp.cpp
  5. 1 0
      roxie/ccd/ccdsnmp.hpp

+ 4 - 0
roxie/ccd/ccd.hpp

@@ -168,6 +168,7 @@ public:
 
     static unsigned getSubChannelMask(unsigned subChannel);
     unsigned priorityHash() const;
+    void copy(const RoxiePacketHeader &oh);
     bool matchPacket(const RoxiePacketHeader &oh) const;
     void init(const RemoteActivityId &_remoteId, ruid_t _uid, unsigned _channel, unsigned _overflowSequence);
     StringBuffer &toString(StringBuffer &ret) const;
@@ -303,10 +304,13 @@ extern unsigned preabortKeyedJoinsThreshold;
 extern unsigned preabortIndexReadsThreshold;
 extern bool traceStartStop;
 extern bool traceRoxiePackets;
+extern bool delaySubchannelPackets;
 extern bool traceServerSideCache;
 extern bool traceTranslations;
 extern bool defaultTimeActivities;
 extern bool defaultTraceEnabled;
+extern unsigned IBYTIbufferSize;
+extern unsigned IBYTIbufferLifetime;
 extern unsigned defaultTraceLimit;
 extern unsigned watchActivityId;
 extern unsigned testAgentFailure;

+ 6 - 0
roxie/ccd/ccdmain.cpp

@@ -82,10 +82,13 @@ unsigned maxLockAttempts = 5;
 bool pretendAllOpt = false;
 bool traceStartStop = false;
 bool traceRoxiePackets = false;
+bool delaySubchannelPackets = false;    // For debugging/testing purposes only
 bool traceServerSideCache = false;
 bool defaultTimeActivities = true;
 bool defaultTraceEnabled = false;
 bool traceTranslations = true;
+unsigned IBYTIbufferSize = 0;
+unsigned IBYTIbufferLifetime = 50;  // In milliseconds
 unsigned defaultTraceLimit = 10;
 unsigned watchActivityId = 0;
 unsigned testAgentFailure = 0;
@@ -1025,6 +1028,9 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
         watchActivityId = topology->getPropInt("@watchActivityId", 0);
         traceServerSideCache = topology->getPropBool("@traceServerSideCache", false);
         traceRoxiePackets = topology->getPropBool("@traceRoxiePackets", false);
+        delaySubchannelPackets = topology->getPropBool("@delaySubchannelPackets", false);
+        IBYTIbufferSize = topology->getPropInt("@IBYTIbufferSize", roxieMulticastEnabled ? 0 : 10);
+        IBYTIbufferLifetime = topology->getPropInt("@IBYTIbufferLifetime", initIbytiDelay);
         traceTranslations = topology->getPropBool("@traceTranslations", true);
         defaultTimeActivities = topology->getPropBool("@timeActivities", true);
         defaultTraceEnabled = topology->getPropBool("@traceEnabled", false);

+ 100 - 0
roxie/ccd/ccdqueue.cpp

@@ -93,6 +93,17 @@ unsigned RoxiePacketHeader::priorityHash() const
     return hash;
 }
 
+void RoxiePacketHeader::copy(const RoxiePacketHeader &oh)
+{
+    // used for saving away kill packets for later matching by match
+    uid = oh.uid;
+    overflowSequence = oh.overflowSequence;
+    continueSequence = oh.continueSequence;
+    serverId = oh.serverId;
+    channel = oh.channel;
+    // MORE - would it be safer, maybe even faster to copy the rest too?
+}
+
 bool RoxiePacketHeader::matchPacket(const RoxiePacketHeader &oh) const
 {
     // used when matching up a kill packet against a pending one...
@@ -396,6 +407,8 @@ static bool channelWrite(RoxiePacketHeader &buf, bool includeSelf)
                 size32_t wrote = multicastSocket->udp_write_to(ep, &buf, buf.packetlength);
                 if (!subChannel || wrote < minwrote)
                     minwrote = wrote;
+                if (delaySubchannelPackets)
+                    MilliSleep(100);
             }
             else if (traceRoxiePackets)
             {
@@ -882,6 +895,57 @@ void doPing(IRoxieQueryPacket *packet, const IRoxieContextLogger &logctx)
 }
 
 //=================================================================================
+
+static ThreadId roxiePacketReaderThread = 0;
+
+class IBYTIbuffer
+{
+    // This class is used to track a finite set of recently-received IBYTI messages, that may have arrived before the messages they refer to
+    // It is accessed ONLY from the main reader thread and as such does not need to be threadsafe (but does need to be fast).
+    // We use a circular buffer, and don't bother removing anything (just treat old items as expired). If the buffer overflows we will end up
+    // discarding the oldest tracked orphaned IBYTI - but that's ok, no worse than if we hadn't tracked them at all.
+public:
+    IBYTIbuffer(unsigned _numOrphans) : numOrphans(_numOrphans)
+    {
+        assertex(numOrphans);
+        orphans = new RoxiePacketHeader[numOrphans];
+        tail = 0;
+    }
+    void noteOrphan(const RoxiePacketHeader &hdr)
+    {
+        assert(GetCurrentThreadId()==roxiePacketReaderThread);
+        unsigned now = msTick();
+        // We could trace that the buffer may be too small, if (orphans[tail].activityId >= now)
+        orphans[tail].copy(hdr);
+        orphans[tail].activityId = now + IBYTIbufferLifetime;
+        tail++;
+        if (tail == numOrphans)
+            tail = 0;
+    }
+    bool lookup(const RoxiePacketHeader &hdr) const
+    {
+        assert(GetCurrentThreadId()==roxiePacketReaderThread);
+        unsigned now = msTick();
+        unsigned lookat = tail;
+        do
+        {
+            if (!lookat)
+                lookat = numOrphans;
+            lookat--;
+            if ((int) (orphans[lookat].activityId - now) < 0)   // Watch out for wrapping
+                break;    // expired;
+            if (orphans[lookat].matchPacket(hdr))
+                return true;
+        } while (lookat != tail);
+        return false;
+    }
+private:
+    RoxiePacketHeader *orphans = nullptr;
+    unsigned tail = 0;
+    unsigned numOrphans = 0;
+};
+
+//=================================================================================
 //
 // RoxieQueue - holds pending transactions on a roxie agent
 
@@ -895,6 +959,7 @@ class RoxieQueue : public CInterface, implements IThreadFactory
     unsigned numWorkers;
     RelaxedAtomic<unsigned> started;
     std::atomic<unsigned> idle;
+    IBYTIbuffer *myIBYTIbuffer = nullptr;
 
     void noteQueued()
     {
@@ -920,8 +985,16 @@ public:
         workers.setown(createThreadPool("RoxieWorkers", this, NULL, numWorkers));
         started = 0;
         idle = 0;
+        if (IBYTIbufferSize)
+            myIBYTIbuffer = new IBYTIbuffer(IBYTIbufferSize);
+    }
+
+    ~RoxieQueue()
+    {
+        delete myIBYTIbuffer;
     }
 
+
     virtual IPooledThread *createNew();
     void abortChannel(unsigned channel);
 
@@ -1101,6 +1174,20 @@ public:
         headRegionSize = newsize;
         return ret;
     }
+
+    void noteOrphanIBYTI(const RoxiePacketHeader &hdr)
+    {
+        if (myIBYTIbuffer)
+            myIBYTIbuffer->noteOrphan(hdr);
+    }
+
+    bool lookupOrphanIBYTI(const RoxiePacketHeader &hdr) const
+    {
+        if (myIBYTIbuffer)
+            return myIBYTIbuffer->lookup(hdr);
+        else
+            return false;
+    }
 };
 
 class CRoxieWorker : public CInterface, implements IPooledThread
@@ -1866,6 +1953,7 @@ protected:
 #else
             adjustPriority(1);
 #endif
+            roxiePacketReaderThread = GetCurrentThreadId();
             return parent.run();
         }
     } readThread;
@@ -2083,6 +2171,8 @@ public:
                     DBGLOG("doIBYTI packet was too late (or too early) : %s", header.toString(s).str());
                 }
                 ibytiPacketsTooLate++; // meaning either I started and reserve the right to finish, or I finished already
+                if (IBYTIbufferSize)
+                    queue.noteOrphanIBYTI(header);
             }
         }
     }
@@ -2126,6 +2216,16 @@ public:
                     , topology
 #endif
                    ); // MORE - check how fast this is!
+        else if (IBYTIbufferSize && queue.lookupOrphanIBYTI(header))
+        {
+            if (traceRoxiePackets || traceLevel > 10)
+            {
+                StringBuffer s;
+                DBGLOG("doIBYTI packet was too early : %s", header.toString(s).str());
+            }
+            ibytiPacketsTooLate--;
+            ibytiPacketsTooEarly++;
+        }
         else
         {
             Owned<IRoxieQueryPacket> packet = createRoxiePacket(mb);

+ 2 - 0
roxie/ccd/ccdsnmp.cpp

@@ -50,6 +50,7 @@ RelaxedAtomic<unsigned> ibytiPacketsWorked;
 RelaxedAtomic<unsigned> ibytiPacketsHalfWorked;
 RelaxedAtomic<unsigned> ibytiPacketsReceived;
 RelaxedAtomic<unsigned> ibytiPacketsTooLate;
+RelaxedAtomic<unsigned> ibytiPacketsTooEarly;
 RelaxedAtomic<unsigned> ibytiNoDelaysPrm;
 RelaxedAtomic<unsigned> ibytiNoDelaysSec;
 RelaxedAtomic<unsigned> packetsSent;
@@ -359,6 +360,7 @@ CRoxieMetricsManager::CRoxieMetricsManager()
     addMetric(ibytiPacketsHalfWorked, 1000);
     addMetric(ibytiPacketsReceived, 1000);
     addMetric(ibytiPacketsTooLate, 1000);
+    addMetric(ibytiPacketsTooEarly, 1000);
 #ifndef NO_IBYTI_DELAYS_COUNT
     addMetric(ibytiNoDelaysPrm, 1000);
     addMetric(ibytiNoDelaysSec, 1000);

+ 1 - 0
roxie/ccd/ccdsnmp.hpp

@@ -97,6 +97,7 @@ extern RelaxedAtomic<unsigned> ibytiPacketsWorked;
 extern RelaxedAtomic<unsigned> ibytiPacketsHalfWorked;
 extern RelaxedAtomic<unsigned> ibytiPacketsReceived;
 extern RelaxedAtomic<unsigned> ibytiPacketsTooLate;
+extern RelaxedAtomic<unsigned> ibytiPacketsTooEarly;
 extern RelaxedAtomic<unsigned> ibytiNoDelaysPrm;
 extern RelaxedAtomic<unsigned> ibytiNoDelaysSec;
 extern RelaxedAtomic<unsigned> packetsReceived;