Browse Source

Merge pull request #13553 from richardkchapman/udp-resend

HPCC-23348 Roxie may lock up under load

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 5 years ago
parent
commit
f172fe61ff
5 changed files with 86 additions and 76 deletions
  1. 40 28
      roxie/udplib/udpsha.cpp
  2. 2 3
      roxie/udplib/udpsha.hpp
  3. 1 1
      roxie/udplib/udptrr.cpp
  4. 40 44
      roxie/udplib/udptrs.cpp
  5. 3 0
      system/jlib/jsem.cpp

+ 40 - 28
roxie/udplib/udpsha.cpp

@@ -97,14 +97,6 @@ queue_t::~queue_t()
     delete [] elements; 
 }
 
-bool queue_t::empty() 
-{
-    c_region.enter();
-    bool res = (active_buffers == 0);
-    c_region.leave();
-    return res;
-}
-
 int queue_t::free_slots() 
 {
     int res=0;
@@ -148,9 +140,10 @@ void queue_t::pushOwn(DataBuffer *buf)
     data_avail.signal();
 }
 
-DataBuffer *queue_t::pop()
+DataBuffer *queue_t::pop(bool block)
 {
-    data_avail.wait();
+    if (!data_avail.wait(block ? INFINITE : 0))
+        return nullptr;
     DataBuffer *ret = NULL; 
     bool must_signal;
     {
@@ -171,30 +164,49 @@ DataBuffer *queue_t::pop()
 }
 
 
-bool queue_t::removeData(const void *key, PKT_CMP_FUN pkCmpFn)
+unsigned queue_t::removeData(const void *key, PKT_CMP_FUN pkCmpFn)
 {
-    bool ret = false;
-    CriticalBlock b(c_region);
-    if (active_buffers) 
+    unsigned removed = 0;
+    unsigned signalFree = 0;
+    unsigned signalFreeSlots = 0;
     {
-        unsigned ix = first;
-        for (;;)
+        CriticalBlock b(c_region);
+        if (active_buffers)
         {
-            if (elements[ix].data && 
-                ((key == NULL) || (pkCmpFn == NULL) || pkCmpFn((const void*) elements[ix].data, key)))
+            unsigned destix = first;
+            unsigned ix = first;
+            for (;;)
             {
-                ::Release(elements[ix].data);
-                elements[ix].data = NULL;  // safer than trying to remove it and close up queue - race conditions with code elsewhere
-                ret = true;
+                if (elements[ix].data && (!key || !pkCmpFn || pkCmpFn((const void*) elements[ix].data, key)))
+                {
+                    ::Release(elements[ix].data);
+                    signalFree++;
+                    active_buffers--;
+                    removed++;
+                }
+                else
+                    elements[destix++] = elements[ix];
+                ix++;
+                if (ix==element_count)
+                    ix = 0;
+                if (destix==element_count)
+                    destix = 0;
+                if (ix == last)
+                    break;
             }
-            ix++;
-            if (ix==element_count)
-                ix = 0;
-            if (ix == last)
-                break;
-        }           
+            if (signalFree && signal_free_sl)
+            {
+                signal_free_sl--;
+                signalFreeSlots++;
+            }
+            last = destix;
+        }
     }
-    return ret;
+    if (signalFree)
+        free_space.signal(signalFree);
+    if (signalFreeSlots)
+        free_sl.signal(signalFreeSlots);
+    return removed;
 }
 
 

+ 2 - 3
roxie/udplib/udpsha.hpp

@@ -75,10 +75,9 @@ class queue_t
 public: 
     void interrupt();
     void pushOwn(roxiemem::DataBuffer *buffer);
-    roxiemem::DataBuffer *pop();
-    bool empty() ;
+    roxiemem::DataBuffer *pop(bool block);
     bool dataQueued(const void *key, PKT_CMP_FUN pkCmpFn);
-    bool removeData(const void *key, PKT_CMP_FUN pkCmpFn);
+    unsigned removeData(const void *key, PKT_CMP_FUN pkCmpFn);
     int  free_slots(); //block if no free slots
     void set_queue_size(unsigned int queue_size); //must be called immediately after constructor if default constructor is used
     queue_t(unsigned int queue_size);

+ 1 - 1
roxie/udplib/udptrr.cpp

@@ -769,7 +769,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
     {
         while(running) 
         {
-            DataBuffer *dataBuff = input_queue->pop();
+            DataBuffer *dataBuff = input_queue->pop(true);
             collatePacket(dataBuff);
         }
     }

+ 40 - 44
roxie/udplib/udptrs.cpp

@@ -171,16 +171,15 @@ public:
         while (toSend.size() < maxPackets && packetsQueued.load(std::memory_order_relaxed))
         {
             DataBuffer *buffer = popQueuedData();
-            if (buffer) // Aborted slave queries leave NULL records on queue
-            {
-                UdpPacketHeader *header = (UdpPacketHeader*) buffer->data;
-                toSend.push_back(buffer);
-                totalSent += header->length;
+            if (!buffer)
+                break;  // Suggests data was aborted before we got to pop it
+            UdpPacketHeader *header = (UdpPacketHeader*) buffer->data;
+            toSend.push_back(buffer);
+            totalSent += header->length;
 #if defined(__linux__) || defined(__APPLE__)
-                if (isLocal && (totalSent> 100000))  // Avoids sending too fast to local node, for reasons lost in the mists of time
-                    break;
+            if (isLocal && (totalSent> 100000))  // Avoids sending too fast to local node, for reasons lost in the mists of time
+                break;
 #endif
-            }
         }
         for (DataBuffer *buffer: toSend)
         {
@@ -229,17 +228,16 @@ public:
     bool removeData(void *key, PKT_CMP_FUN pkCmpFn) 
     {
         // Used after receiving an abort, to avoid sending data that is no longer required
-        bool anyRemoved = false;
+        unsigned removed = 0;
         if (packetsQueued.load(std::memory_order_relaxed))
         {
-            // NOTE - removeData replaces entries by null (so value of packetsQueued is not affected)
             for (unsigned i = 0; i < numQueues; i++)
             {
-                if (output_queue[i].removeData(key, pkCmpFn))
-                    anyRemoved = true;
+                removed += output_queue[i].removeData(key, pkCmpFn);
             }
+            packetsQueued -= removed;
         }
-        return anyRemoved;
+        return removed > 0;
     }
 
     void abort()
@@ -266,49 +264,47 @@ public:
     DataBuffer *popQueuedData() 
     {
         DataBuffer *buffer;
-        while (1) 
+        for (unsigned i = 0; i < numQueues; i++)
         {
-            for (unsigned i = 0; i < numQueues; i++) 
+            if (udpOutQsPriority)
             {
-                if (udpOutQsPriority) 
+                buffer = output_queue[current_q].pop(false);
+                if (!buffer)
                 {
-                    if (output_queue[current_q].empty()) 
+                    if (udpTraceLevel >= 5)
+                        DBGLOG("UdpSender: ---------- Empty Q %d", current_q);
+                    currentQNumPkts = 0;
+                    current_q = (current_q + 1) % numQueues;
+                }
+                else
+                {
+                    currentQNumPkts++;
+                    if (udpTraceLevel >= 5)
+                        DBGLOG("UdpSender: ---------- Packet from Q %d", current_q);
+                    if (currentQNumPkts >= maxPktsPerQ[current_q])
                     {
-                        if (udpTraceLevel >= 5)
-                            DBGLOG("UdpSender: ---------- Empty Q %d", current_q);
                         currentQNumPkts = 0;
                         current_q = (current_q + 1) % numQueues;
                     }
-                    else 
-                    {
-                        buffer = output_queue[current_q].pop();
-                        currentQNumPkts++;
-                        if (udpTraceLevel >= 5) 
-                            DBGLOG("UdpSender: ---------- Packet from Q %d", current_q);
-                        if (currentQNumPkts >= maxPktsPerQ[current_q]) 
-                        {
-                            currentQNumPkts = 0;
-                            current_q = (current_q + 1) % numQueues;
-                        }
-                        packetsQueued--;
-                        return buffer;
-                    }
+                    packetsQueued--;
+                    return buffer;
                 }
-                else 
+            }
+            else
+            {
+                current_q = (current_q + 1) % numQueues;
+                buffer = output_queue[current_q].pop(false);
+                if (buffer)
                 {
-                    current_q = (current_q + 1) % numQueues;
-                    if (!output_queue[current_q].empty()) 
-                    {
-                        packetsQueued--;
-                        return output_queue[current_q].pop();
-                    }
+                    packetsQueued--;
+                    return buffer;
                 }
             }
-            // If we get here, it suggests we were told to get a buffer but no queue has one
-            // Should never happen
-            MilliSleep(10);
-            DBGLOG("UdpSender: ------------- this code should never execute --------------- ");
         }
+        // If we get here, it suggests we were told to get a buffer but no queue has one.
+        // This should be rare but possible if data gets removed following an abort, as
+        // there is a window in abort() between the remove and the decrement of packetsQueued.
+        return nullptr;
     }
 
     UdpReceiverEntry(const IpAddress &_ip, const IpAddress &_sourceIP, unsigned _numQueues, unsigned _queueSize, unsigned _sendFlowPort, unsigned _dataPort)

+ 3 - 0
system/jlib/jsem.cpp

@@ -80,6 +80,9 @@ bool Semaphore::wait(unsigned timeout)
     if (sem_trywait(&sem) == 0)
         return true;
 
+    if (timeout==0)
+        return false;
+
     timespec abs;
     getEndTime(abs, timeout);
     int ret = sem_timedwait(&sem, &abs);