瀏覽代碼

HPCC-26795 Reduce number of critical sections in udp transport layer

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday 3 年之前
父節點
當前提交
68140d231a
共有 3 個文件被更改,包括 60 次插入和 51 次删除
  1. 45 42
      roxie/udplib/udpsha.cpp
  2. 14 7
      roxie/udplib/udpsha.hpp
  3. 1 2
      roxie/udplib/udptrs.cpp

+ 45 - 42
roxie/udplib/udpsha.cpp

@@ -91,39 +91,58 @@ queue_t::~queue_t()
     }
 }
 
-unsigned queue_t::available()
+void queue_t::interrupt()
 {
-    CriticalBlock b(c_region);
-    if (count < limit)
-        return limit - count;
-    return 0;
+    data_avail.interrupt();
 }
 
-int queue_t::free_slots() 
+void queue_t::doEnqueue(DataBuffer *buf)
 {
-    int res=0;
-    while (res <= 0)
+    // Must currently be called within a critical section.  Does not signal - that should be done outside the CS.
+    // Could probably be done lock-free, which given one thread using this is high priority might avoid some
+    // potential priority-inversion issues. Or we might consider using PI-aware futexes here?
+    if (tail)
+    {
+        assert(head);
+        assert(!tail->msgNext);
+        tail->msgNext = buf;
+    }
+    else
+    {
+        assert(!head);
+        head = buf;
+    }
+    tail = buf;
+    count.fastAdd(1); // inside a critical section, so no need for atomic inc.
+#ifdef _DEBUG
+    if (count > limit)
+        DBGLOG("queue_t::pushOwn set count to %u", count.load());
+#endif
+}
+
+void queue_t::pushOwnWait(DataBuffer * buf)
+{
+    assert(!buf->msgNext);
+    for (;;)
     {
-        c_region.enter();
-        res = limit - count;
-        if (res <= 0)
-            signal_free_sl++;
-        c_region.leave();
-        if (res <= 0)
         {
-            while (!free_sl.wait(3000))
+            CriticalBlock b(c_region);
+
+            if (count < limit)
             {
-                if (udpTraceLevel >= 1)
-                    DBGLOG("queue_t::free_slots blocked for 3 seconds waiting for free_sl semaphore");
+                doEnqueue(buf);
+                break;  // signal outside the critical section, rather than here, so the waiting thread can progress
             }
+            signal_free_sl++;
         }
-    }
-    return res;
-}
 
-void queue_t::interrupt()
-{
-    data_avail.interrupt();
+        while (!free_sl.wait(3000))
+        {
+            if (udpTraceLevel >= 1)
+                DBGLOG("queue_t::pushOwnWait blocked for 3 seconds waiting for free_sl semaphore");
+        }
+    }
+    data_avail.signal();
 }
 
 void queue_t::pushOwn(DataBuffer *buf)
@@ -133,23 +152,7 @@ void queue_t::pushOwn(DataBuffer *buf)
     assert(!buf->msgNext);
     {
         CriticalBlock b(c_region);
-        if (tail)
-        {
-            assert(head);
-            assert(!tail->msgNext);
-            tail->msgNext = buf;
-        }
-        else
-        {
-            assert(!head);
-            head = buf;
-        }
-        tail = buf;
-        count++;
-#ifdef _DEBUG
-        if (count > limit)
-            DBGLOG("queue_t::pushOwn set count to %u", count);
-#endif
+        doEnqueue(buf);
     }
     data_avail.signal();
 }
@@ -164,7 +167,7 @@ DataBuffer *queue_t::pop(bool block)
         CriticalBlock b(c_region);
         if (!count)
             return nullptr;
-        count--;
+        count.fastAdd(-1); // inside a critical section => not atomic
         ret = head;
         head = head->msgNext;
         if (!head)
@@ -211,7 +214,7 @@ unsigned queue_t::removeData(const void *key, PKT_CMP_FUN pkCmpFn)
                     if (temp==tail)
                         tail = prev;
                     ::Release(temp);
-                    count--;
+                    count.fastAdd(-1);
                     if (count < limit && signal_free_sl)
                     {
                         signal_free_sl--;

+ 14 - 7
roxie/udplib/udpsha.hpp

@@ -88,27 +88,34 @@ class queue_t
     DataBuffer *head = nullptr;      // We add at tail and remove from head
     DataBuffer *tail = nullptr;
 
-    unsigned count = 0;
+    RelaxedAtomic<unsigned> count{0};       // always updated inside a critical section, only atomic to guarantee it can be read outside the crit sec.
     unsigned limit = 0;
     
     CriticalSection c_region;
     InterruptableSemaphore data_avail;
-    Semaphore       free_sl;              // Signalled when (a) someone is waiting for it and (b) count changes from >= limit to < limit
-    unsigned        signal_free_sl = 0;   // Number of people waiting in free_sl. Only updated within critical section
+    Semaphore       free_sl;                // Signalled when (a) someone is waiting for it and (b) count changes from >= limit to < limit
+    unsigned        signal_free_sl = 0;     // Number of people waiting in free_sl. Only updated within critical section
     
 public: 
     void interrupt();
-    void pushOwn(DataBuffer *buffer);
+    void pushOwn(DataBuffer *buffer);       // Non blocking enqueue (used by receiver)
+    void pushOwnWait(DataBuffer *buffer);   // Blocking enqueue (used by the sender)
     DataBuffer *pop(bool block);
     bool dataQueued(const void *key, PKT_CMP_FUN pkCmpFn);
     unsigned removeData(const void *key, PKT_CMP_FUN pkCmpFn);
-    unsigned available();                // non-blocking
-    int  free_slots();                   // block if no free slots
-    void set_queue_size(unsigned limit); //must be called immediately after constructor if default constructor is used
+    unsigned available() const              // non-blocking, no critical section
+    {
+        unsigned num = count.load();
+        return likely(num < limit) ? limit - num : 0;
+    }
+    void set_queue_size(unsigned limit);    //must be called immediately after constructor if default constructor is used
     queue_t(unsigned int queue_size);
     queue_t() {};
     ~queue_t();
     inline int capacity() const { return limit; }
+
+protected:
+    void doEnqueue(DataBuffer *buf); // internal function to add the item to the queue, but not signal
 };
 
 

+ 1 - 2
roxie/udplib/udptrs.cpp

@@ -508,8 +508,7 @@ public:
 
     inline void pushData(unsigned queue, DataBuffer *buffer)
     {
-        output_queue[queue].free_slots();     // block until at least one free space
-        output_queue[queue].pushOwn(buffer);
+        output_queue[queue].pushOwnWait(buffer);  // block until there is some space on the queue
         if (!packetsQueued++)
             requestToSendNew();
     }