Pārlūkot izejas kodu

HPCC-24952 Rework IBYTI processing

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 4 gadi atpakaļ
vecāks
revīzija
f8104c556b
2 mainītis faili ar 435 papildinājumiem un 105 dzēšanām
  1. 8 0
      roxie/ccd/ccd.hpp
  2. 427 105
      roxie/ccd/ccdqueue.cpp

+ 8 - 0
roxie/ccd/ccd.hpp

@@ -51,7 +51,15 @@
 
 #define ROXIE_STATEFILE_VERSION 2
 
+// Have not yet tested impact of new IBYTI handling in non-containerized systems
+
 #ifdef _CONTAINERIZED
+#define NEW_IBYTI
+#endif
+
+#if defined(_CONTAINERIZED) || defined (NEW_IBYTI)
+// Both containerized mode and new IBYTI mode assume subchannels are passed in header.
+// It SHOULD also work, and may be beneficial, in non-containerized systems but has not as yet been confirmed.
 #define SUBCHANNELS_IN_HEADER
 #endif
 

+ 427 - 105
roxie/ccd/ccdqueue.cpp

@@ -1194,7 +1194,9 @@ class CRoxieWorker : public CInterface, implements IPooledThread
 {
     RoxieQueue *queue;
     CriticalSection actCrit;
+#ifndef NEW_IBYTI
     Semaphore ibytiSem;
+#endif
     bool stopped;
     bool abortJob;
     bool busy;
@@ -1247,8 +1249,10 @@ public:
         if (packet && packet->queryHeader().channel==channel)
         {
             abortJob = true;
+#ifndef NEW_IBYTI
             if (doIbytiDelay) 
                 ibytiSem.signal();
+#endif
             if (activity) 
                 activity->abort();
         }
@@ -1261,8 +1265,10 @@ public:
         {
             queryFound = true;
             abortJob = true;
+#ifndef NEW_IBYTI
             if (doIbytiDelay)
                 ibytiSem.signal();
+#endif
             if (activity) 
             {
                 // Try to stop/abort a job after it starts only if IBYTI comes from a higher priority agent 
@@ -1386,18 +1392,19 @@ public:
         try
         {   
             bool debugging = logctx.queryDebuggerActive();
-#ifdef SUBCHANNELS_IN_HEADER
             if (debugging)
             {
-                if (!mySubChannel)
-                    abortJob = true;  // when debugging, we always run on primary only... should really have sent to primary only too...
+                if (mySubChannel)
+                    abortJob = true;  // when debugging, we always run on primary only...
             }
+#ifndef NEW_IBYTI
+#ifdef SUBCHANNELS_IN_HEADER
             else if (doIbytiDelay && mySubChannel)
             {
                 unsigned delay = 0;
                 for (unsigned subChannel = 0; subChannel < mySubChannel; subChannel++)
                     delay += getIbytiDelay(header.subChannels[subChannel].getIpAddress());
-                unsigned start;
+                unsigned start = 0;
                 if (traceRoxiePackets)
                 {
                     StringBuffer x;
@@ -1418,11 +1425,6 @@ public:
                 }
             }
 #else
-            if (debugging)
-            {
-                if (mySubChannel)
-                    abortJob = true;  // when debugging, we always run on primary only...
-            }
             else if (doIbytiDelay && (numAgents > 1))
             {
                 unsigned hdrHashVal = header.priorityHash();
@@ -1468,6 +1470,7 @@ public:
                 }
             }
 #endif
+#endif
             if (abortJob)
             {
                 CriticalBlock b(actCrit);
@@ -1534,8 +1537,10 @@ public:
                     maxAgentsActive.store_max(agentsActive);
                     abortJob = false;
                     busy = true;
+#ifndef NEW_IBYTI
                     if (doIbytiDelay) 
                         ibytiSem.reinit(0U); // Make sure sem is is in no-signaled state
+#endif
                     packet.setown(queue->dequeue());
                     if (packet)
                     {
@@ -1930,6 +1935,286 @@ public:
     }
 };
 
+//------------------------------------------------------------------------------------------------------------
+#ifdef NEW_IBYTI
+
+class DelayedPacketQueue
+{
+    // Used to keep a list of all recently-received packets where we are not primary subchannel. There is one queue per subchannel level
+    // It is accessed ONLY from the main reader thread and does not need to be threadsafe (but does need to be fast)
+    // We use a doubly-linked list (not std::list as not quite flexible enough).
+
+    class DelayedPacketEntry
+    {
+        DelayedPacketEntry() = delete;
+        DelayedPacketEntry(const DelayedPacketEntry&) = delete;
+    public:
+        DelayedPacketEntry(IRoxieQueryPacket *_packet, unsigned _waitExpires)
+        : packet(_packet), waitExpires(_waitExpires)
+        {
+        }
+        ~DelayedPacketEntry()
+        {
+            if (prev)
+                prev->next = next;
+            if (next)
+                next->prev = prev;
+        }
+        bool matches(const RoxiePacketHeader &ibyti) const
+        {
+            return packet->queryHeader().matchPacket(ibyti);
+        }
+        IRoxieQueryPacket *getClear()
+        {
+            return packet.getClear();
+        }
+        StringBuffer & describe(StringBuffer &ret) const
+        {
+            return packet->queryHeader().toString(ret);
+        }
+
+        Owned<IRoxieQueryPacket> packet;
+        DelayedPacketEntry *next = nullptr;
+        DelayedPacketEntry *prev = nullptr;
+
+        unsigned waitExpires = 0;
+    };
+
+public:
+    DelayedPacketQueue() = default;
+    DelayedPacketQueue(const DelayedPacketQueue&) = delete;
+    ~DelayedPacketQueue()
+    {
+        while (head)
+            removeEntry(head);
+    }
+    bool doIBYTI(const RoxiePacketHeader &ibyti)
+    {
+        assert(GetCurrentThreadId()==roxiePacketReaderThread);
+        DelayedPacketEntry *finger = head;
+        while (finger)
+        {
+            if (finger->matches(ibyti))
+            {
+                if (traceRoxiePackets)
+                {
+                    StringBuffer s;
+                    DBGLOG("IBYTI removing delayed packet %s", finger->describe(s).str());
+                }
+                removeEntry(finger);
+                return true;
+            }
+            finger = finger->next;
+        }
+        return false;
+    }
+
+    void append(IRoxieQueryPacket *packet, unsigned expires)
+    {
+        // Goes on the end. But percolate the expiry time backwards
+        assert(GetCurrentThreadId()==roxiePacketReaderThread);
+        DelayedPacketEntry *newEntry = new DelayedPacketEntry(packet, expires);
+        if (traceRoxiePackets)
+        {
+            StringBuffer s;
+            DBGLOG("Adding delayed packet %s", packet->queryHeader().toString(s).str());
+        }
+        newEntry->prev = tail;
+        if (tail)
+        {
+            tail->next = newEntry;
+            for (DelayedPacketEntry *finger = tail; finger != nullptr; finger = finger->prev)
+            {
+                if ((int) (finger->waitExpires - expires) <= 0)
+                    break;
+                finger->waitExpires = expires;
+                finger = finger->prev;
+            }
+        }
+        else
+            head = newEntry;
+        tail = newEntry;
+    }
+
+    // Move any that we are done waiting for our buddy onto the active queue
+    void checkExpired(
+            unsigned now,
+#ifdef ROXIE_SLA_LOGIC
+            RoxieQueue &slaQueue,
+#endif
+            RoxieQueue &hiQueue, RoxieQueue &loQueue)
+    {
+        assert(GetCurrentThreadId()==roxiePacketReaderThread);
+        DelayedPacketEntry *finger = head;
+        while (finger)
+        {
+            if (((int) (finger->waitExpires - now)) <= 0)   // Oddly coded to handle wrapping
+            {
+                IRoxieQueryPacket *packet = finger->getClear();
+                const RoxiePacketHeader &header = packet->queryHeader();
+                if (traceRoxiePackets)
+                {
+                    StringBuffer s;
+                    DBGLOG("No IBYTI received yet for delayed packet %s", header.toString(s).str());
+                }
+#ifdef ROXIE_SLA_LOGIC
+                if (header.activityId & ROXIE_SLA_PRIORITY)
+                    slaQueue.enqueue(packet);
+                else
+#endif
+                if (header.activityId & ROXIE_HIGH_PRIORITY)
+                    hiQueue.enqueue(packet);
+                else
+                    loQueue.enqueue(packet);
+                DelayedPacketEntry *goer = finger;
+                finger = finger->next;
+                removeEntry(goer);
+            }
+            else
+                break;
+        }
+    }
+
+    // How long until the next time we want to call checkExpires() ?
+    unsigned timeout(unsigned now) const
+    {
+        assert(GetCurrentThreadId()==roxiePacketReaderThread);
+        if (head)
+        {
+            int delay = (int) (head->waitExpires - now);
+            if (delay <= 0)
+                return 0;
+            else
+                return (unsigned) delay;
+        }
+        else
+            return (unsigned) -1;
+    }
+
+private:
+    void removeEntry(DelayedPacketEntry *goer)
+    {
+        if (goer==head)
+            head = goer->next;
+        if (goer==tail)
+            tail = goer->prev;
+        delete goer;
+    }
+
+    DelayedPacketEntry *head = nullptr;
+    DelayedPacketEntry *tail = nullptr;
+
+};
+
+//------------------------------------------------------------------------------------------------------------
+
+class DelayedPacketQueueChannel : public CInterface
+{
+    // Manages a set of DelayedPacketQueues, one for each supported subchannel level.
+    DelayedPacketQueueChannel() = delete;
+    DelayedPacketQueueChannel(const DelayedPacketQueueChannel&) = delete;
+public:
+    DelayedPacketQueueChannel(unsigned _channel) : channel(_channel)
+    {
+    }
+    inline unsigned queryChannel() const { return channel; }
+    inline DelayedPacketQueue &queryQueue(unsigned subchannel)
+    {
+        assertex(subchannel);  // Subchannel 0 means primary and is never delayed
+        subchannel -= 1;
+        if (subchannel > maxSeen)
+            maxSeen = subchannel;
+        return queues[subchannel];
+    }
+    unsigned timeout(unsigned now) const
+    {
+        unsigned min = (unsigned) -1;
+        for (unsigned queue = 0; queue <= maxSeen; queue++)
+        {
+            unsigned t = queues[queue].timeout(now);
+            if (t < min)
+                min = t;
+        }
+        return min;
+    }
+    void checkExpired(
+            unsigned now,
+#ifdef ROXIE_SLA_LOGIC
+            RoxieQueue &slaQueue,
+#endif
+            RoxieQueue &hiQueue, RoxieQueue &loQueue)
+    {
+        for (unsigned queue = 0; queue <= maxSeen; queue++)
+        {
+            queues[queue].checkExpired(
+                    now,
+#ifdef ROXIE_SLA_LOGIC
+                    slaQueue,
+#endif
+                    hiQueue, loQueue);
+        }
+    }
+private:
+    DelayedPacketQueue queues[MAX_SUBCHANNEL-1];   // Note - primary subchannel is not included
+    unsigned channel = 0;
+    unsigned maxSeen = 0;
+};
+
+class DelayedPacketQueueManager
+{
+
+public:
+    DelayedPacketQueueManager() = default;
+    DelayedPacketQueueManager(const DelayedPacketQueueManager&) = delete;
+    inline DelayedPacketQueue &queryQueue(unsigned channel, unsigned subchannel)
+    {
+        // Note - there are normally no more than a couple of channels on a single agent.
+        // If that were to change we could make this a fixed size array
+        assert(GetCurrentThreadId()==roxiePacketReaderThread);
+        ForEachItemIn(idx, channels)
+        {
+            DelayedPacketQueueChannel &i = channels.item(idx);
+            if (i.queryChannel() == channel)
+                return i.queryQueue(subchannel);
+        }
+        channels.append(*new DelayedPacketQueueChannel(channel));
+        return channels.tos().queryQueue(subchannel);
+    }
+    unsigned timeout(unsigned now) const
+    {
+        unsigned ret = (unsigned) -1;
+        ForEachItemIn(idx, channels)
+        {
+            unsigned t = channels.item(idx).timeout(now);
+            if (t < ret)
+                ret = t;
+        }
+        return ret;
+    }
+    void checkExpired(
+            unsigned now,
+#ifdef ROXIE_SLA_LOGIC
+            RoxieQueue &slaQueue,
+#endif
+            RoxieQueue &hiQueue, RoxieQueue &loQueue)
+    {
+        ForEachItemIn(idx, channels)
+        {
+            channels.item(idx).checkExpired(
+                    now,
+#ifdef ROXIE_SLA_LOGIC
+                    slaQueue,
+#endif
+                    hiQueue, loQueue);
+        }
+    }
+private:
+    CIArrayOf<DelayedPacketQueueChannel> channels;
+};
+#endif
+
+//------------------------------------------------------------------------------------------------------------
+
 class RoxieSocketQueueManager : public RoxieReceiverBase
 {
 protected:
@@ -1939,6 +2224,9 @@ protected:
     Owned<TokenBucket> bucket;
     unsigned maxPacketSize = 0;
     std::atomic<bool> running = { false };
+#ifdef NEW_IBYTI
+    DelayedPacketQueueManager delayed;
+#endif
 
     class ReceiverThread : public Thread
     {
@@ -2098,25 +2386,28 @@ public:
         return ret;
     }
 
-    void doIbyti(RoxiePacketHeader &header, RoxieQueue &queue
-#ifndef SUBCHANNELS_IN_HEADER
-                 , const ITopologyServer* topology
-#endif
-                 )
+    void doIbyti(RoxiePacketHeader &header, RoxieQueue &queue)
     {
         assert(!localAgent);
         bool preActivity = false;
 #ifdef SUBCHANNELS_IN_HEADER
         unsigned mySubChannel = header.mySubChannel();
 #else
+        Owned<const ITopologyServer> topology = getTopology();
         const ChannelInfo &channelInfo = topology->queryChannelInfo(header.channel);
         unsigned mySubChannel = channelInfo.subChannel();
 #endif
 
         if (header.retries == QUERY_ABORTED)
         {
-            abortRunning(header, queue, false, preActivity);
-            queue.remove(header);
+            bool foundInQ = false;
+#ifdef NEW_IBYTI
+            foundInQ = mySubChannel != 0 && delayed.queryQueue(header.channel, mySubChannel).doIBYTI(header);
+#endif
+            if (!foundInQ)
+                foundInQ = queue.remove(header);
+            if (!foundInQ)
+                abortRunning(header, queue, false, preActivity);
 
             if (traceRoxiePackets || traceLevel > 10)
             {
@@ -2141,7 +2432,12 @@ public:
 #else
                 noteNodeHealthy(header.subChannels[subChannel].getIpAddress());
 #endif
-                bool foundInQ = queue.remove(header);
+                bool foundInQ = false;
+#ifdef NEW_IBYTI
+                foundInQ = mySubChannel != 0 && delayed.queryQueue(header.channel, mySubChannel).doIBYTI(header);
+#endif
+                if (!foundInQ)
+                    foundInQ = queue.remove(header);
                 if (foundInQ)
                 {
                     if (traceRoxiePackets || traceLevel > 10)
@@ -2181,87 +2477,100 @@ public:
     {
         // NOTE - this thread needs to do as little as possible - just read packets and queue them up - otherwise we can get packet loss due to buffer overflow
         // DO NOT put tracing on this thread except at very high tracelevels!
-        if (!header.channel)
+        if ((header.activityId & ~ROXIE_PRIORITY_MASK) == 0)
+            doIbyti(header, queue);
+        else
         {
-            // Turn broadcast packet (channel 0), as early as possible, into non-0 channel packets.
-            // So retries and other communication with Roxie server (which uses non-0 channel numbers) will not cause double work or confusion.
-            // Unfortunately this is bad news for dropping packets
+            if (!header.channel)
+            {
+                // Turn broadcast packet (channel 0), as early as possible, into non-0 channel packets.
+                // So retries and other communication with Roxie server (which uses non-0 channel numbers) will not cause double work or confusion.
+                // Unfortunately this is bad news for dropping packets
             // In SUBCHANNELS_IN_HEADER mode this translation has been done on server before sending, except for some control messages like PING or UNLOAD
 
-            Owned<const ITopologyServer> topology = getTopology();
-            const std::vector<unsigned> channels = topology->queryChannels();
-            Owned<IRoxieQueryPacket> packet = createRoxiePacket(mb);
-            for (unsigned i = 1; i < channels.size(); i++)
-                queue.enqueue(packet->clonePacket(channels[i]));
-            header.channel = channels[0];
-            queue.enqueue(packet.getClear());
-            return;
-        }
+                Owned<const ITopologyServer> topology = getTopology();
+                const std::vector<unsigned> channels = topology->queryChannels();
+                Owned<IRoxieQueryPacket> packet = createRoxiePacket(mb);
+                for (unsigned i = 1; i < channels.size(); i++)
+                    queue.enqueue(packet->clonePacket(channels[i]));
+                header.channel = channels[0];
+                queue.enqueue(packet.getClear());
+                return;
+            }
 #ifdef SUBCHANNELS_IN_HEADER
-        unsigned mySubchannel = header.mySubChannel();
+            unsigned mySubchannel = header.mySubChannel();
 #else
-        Owned<const ITopologyServer> topology = getTopology();
-        unsigned mySubchannel = topology->queryChannelInfo(header.channel).subChannel();
+            Owned<const ITopologyServer> topology = getTopology();
+            unsigned mySubchannel = topology->queryChannelInfo(header.channel).subChannel();
 #endif
-        if (header.activityId == ROXIE_FILECALLBACK || header.activityId == ROXIE_DEBUGCALLBACK )
-        {
-            Owned<IRoxieQueryPacket> packet = createRoxiePacket(mb);
-            if (traceLevel > 10)
+            if (header.activityId == ROXIE_FILECALLBACK || header.activityId == ROXIE_DEBUGCALLBACK )
             {
-                StringBuffer s; 
-                DBGLOG("ROXIE_CALLBACK %s", header.toString(s).str());
+                Owned<IRoxieQueryPacket> packet = createRoxiePacket(mb);
+                if (traceLevel > 10)
+                {
+                    StringBuffer s;
+                    DBGLOG("ROXIE_CALLBACK %s", header.toString(s).str());
+                }
+                doFileCallback(packet);
             }
-            doFileCallback(packet);
-        }
-        else if ((header.activityId & ~ROXIE_PRIORITY_MASK) == 0)
-            doIbyti(header, queue
-#ifndef SUBCHANNELS_IN_HEADER
-                    , topology
-#endif
-                   ); // MORE - check how fast this is!
-        else if (IBYTIbufferSize && queue.lookupOrphanIBYTI(header))
-        {
-            if (traceRoxiePackets || traceLevel > 10)
+            else if (IBYTIbufferSize && queue.lookupOrphanIBYTI(header))
             {
-                StringBuffer s;
-                DBGLOG("doIBYTI packet was too early : %s", header.toString(s).str());
+                if (traceRoxiePackets || traceLevel > 10)
+                {
+                    StringBuffer s;
+                    DBGLOG("doIBYTI packet was too early : %s", header.toString(s).str());
+                }
+                ibytiPacketsTooLate--;
+                ibytiPacketsTooEarly++;
             }
-            ibytiPacketsTooLate--;
-            ibytiPacketsTooEarly++;
-        }
-        else
-        {
-            Owned<IRoxieQueryPacket> packet = createRoxiePacket(mb);
-            AgentContextLogger logctx(packet);
-            unsigned retries = header.thisChannelRetries(mySubchannel);
-            if (retries)
+            else
             {
-                // MORE - is this fast enough? By the time I am seeing retries I may already be under load. Could move onto a separate thread
-                assertex(header.channel); // should never see a retry on channel 0
-                if (retries >= SUBCHANNEL_MASK)
-                    return; // someone sent a failure or something - ignore it
-
-                // Send back an out-of-band immediately, to let Roxie server know that channel is still active
-                if (!(testAgentFailure & 0x800))
+                Owned<IRoxieQueryPacket> packet = createRoxiePacket(mb);
+                AgentContextLogger logctx(packet);
+                unsigned retries = header.thisChannelRetries(mySubchannel);
+                if (retries)
                 {
-                    RoxiePacketHeader newHeader(header, ROXIE_ALIVE, mySubchannel);
-                    Owned<IMessagePacker> output = ROQ->createOutputStream(newHeader, true, logctx);
-                    output->flush();
-                }
+                    // MORE - is this fast enough? By the time I am seeing retries I may already be under load. Could move onto a separate thread
+                    assertex(header.channel); // should never see a retry on channel 0
+                    if (retries >= SUBCHANNEL_MASK)
+                        return; // someone sent a failure or something - ignore it
 
-                // If it's a retry, look it up against already running, or output stream, or input queue
-                // if found, send an IBYTI and discard retry request
-                
-                if (!mySubchannel)
-                    retriesReceivedPrm++;
-                else  
-                    retriesReceivedSec++;
-                bool alreadyRunning = false;
-                Owned<IPooledThreadIterator> wi = queue.running();
-                ForEach(*wi)
-                {
-                    CRoxieWorker &w = (CRoxieWorker &) wi->query();
-                    if (w.match(header))
+                    // Send back an out-of-band immediately, to let Roxie server know that channel is still active
+                    if (!(testAgentFailure & 0x800))
+                    {
+                        RoxiePacketHeader newHeader(header, ROXIE_ALIVE, mySubchannel);
+                        Owned<IMessagePacker> output = ROQ->createOutputStream(newHeader, true, logctx);
+                        output->flush();
+                    }
+
+                    // If it's a retry, look it up against already running, or output stream, or input queue
+                    // if found, send an IBYTI and discard retry request
+
+                    if (!mySubchannel)
+                        retriesReceivedPrm++;
+                    else
+                        retriesReceivedSec++;
+                    bool alreadyRunning = false;
+                    Owned<IPooledThreadIterator> wi = queue.running();
+                    ForEach(*wi)
+                    {
+                        CRoxieWorker &w = (CRoxieWorker &) wi->query();
+                        if (w.match(header))
+                        {
+                            alreadyRunning = true;
+                            if (!mySubchannel)
+                                retriesIgnoredPrm++;
+                            else
+                                retriesIgnoredSec++;
+                            ROQ->sendIbyti(header, logctx, mySubchannel);
+                            if (logctx.queryTraceLevel() > 10)
+                            {
+                                StringBuffer xx; logctx.CTXLOG("Ignored retry on subchannel %u for running activity %s", mySubchannel, header.toString(xx).str());
+                            }
+                            break;
+                        }
+                    }
+                    if (!alreadyRunning && checkCompleted && ROQ->replyPending(header))
                     {
                         alreadyRunning = true;
                         if (!mySubchannel)
@@ -2271,35 +2580,33 @@ public:
                         ROQ->sendIbyti(header, logctx, mySubchannel);
                         if (logctx.queryTraceLevel() > 10)
                         {
-                            StringBuffer xx; logctx.CTXLOG("Ignored retry on subchannel %u for running activity %s", mySubchannel, header.toString(xx).str());
+                            StringBuffer xx; logctx.CTXLOG("Ignored retry on subchannel %u for completed activity %s", mySubchannel, header.toString(xx).str());
                         }
-                        break;
                     }
-                } 
-                if (!alreadyRunning && checkCompleted && ROQ->replyPending(header))
-                {
-                    alreadyRunning = true;
-                    if (!mySubchannel)
-                        retriesIgnoredPrm++;
-                    else 
-                        retriesIgnoredSec++;
-                    ROQ->sendIbyti(header, logctx, mySubchannel);
-                    if (logctx.queryTraceLevel() > 10)
+                    if (!alreadyRunning)
                     {
-                        StringBuffer xx; logctx.CTXLOG("Ignored retry on subchannel %u for completed activity %s", mySubchannel, header.toString(xx).str());
+                        if (logctx.queryTraceLevel() > 10)
+                        {
+                            StringBuffer xx; logctx.CTXLOG("Retry %d received on subchannel %u for %s", retries+1, mySubchannel, header.toString(xx).str());
+                        }
+                        queue.enqueueUnique(packet.getClear(), mySubchannel);
                     }
                 }
-                if (!alreadyRunning)
+                else // first time (not a retry).
                 {
-                    if (logctx.queryTraceLevel() > 10)
+#ifdef NEW_IBYTI
+                    if (mySubchannel != 0)  // i.e. I am not the primary here
                     {
-                        StringBuffer xx; logctx.CTXLOG("Retry %d received on subchannel %u for %s", retries+1, mySubchannel, header.toString(xx).str());
+                        unsigned delay = 0;
+                        for (unsigned subChannel = 0; subChannel < mySubchannel; subChannel++)
+                            delay += getIbytiDelay(header.subChannels[subChannel].getIpAddress());
+                        delayed.queryQueue(header.channel, mySubchannel).append(packet.getClear(), msTick()+delay);
                     }
-                    queue.enqueueUnique(packet.getClear(), mySubchannel);
+                    else
+#endif
+                        queue.enqueue(packet.getClear());
                 }
             }
-            else // first time (not a retry). 
-                queue.enqueue(packet.getClear());
         }
     }
 
@@ -2316,8 +2623,15 @@ public:
             {
                 // NOTE - this thread needs to do as little as possible - just read packets and queue them up - otherwise we can get packet loss due to buffer overflow
                 // DO NOT put tracing on this thread except at very high tracelevels!
+#ifdef NEW_IBYTI
+                unsigned timeout = delayed.timeout(msTick());
+                if (timeout>5000)
+                    timeout = 5000;
+#else
+                unsigned timeout = 5000;
+#endif
                 unsigned l;
-                multicastSocket->read(mb.reserve(maxPacketSize), sizeof(RoxiePacketHeader), maxPacketSize, l, 5);
+                multicastSocket->readtms(mb.reserve(maxPacketSize), sizeof(RoxiePacketHeader), maxPacketSize, l, timeout);
                 mb.setLength(l);
                 packetsReceived++;
                 RoxiePacketHeader &header = *(RoxiePacketHeader *) mb.toByteArray();
@@ -2374,6 +2688,14 @@ public:
                     break;
                 }
             }
+#ifdef NEW_IBYTI
+            delayed.checkExpired(
+                            msTick(),
+#ifdef ROXIE_SLA_LOGIC
+                            slaQueue,
+#endif
+                            hiQueue, loQueue);
+#endif
         }
         return 0;
     }