Переглянути джерело

HPCC-14354 Better handling for slot contention

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday 9 років тому
батько
коміт
c1466e6845
2 змінені файли: 43 додано та 13 видалено
  1. 18 3
      system/jlib/jqueue.hpp
  2. 25 10
      testing/unittests/jlibtests.cpp

+ 18 - 3
system/jlib/jqueue.hpp

@@ -95,6 +95,8 @@ class ReaderWriterQueue
     const static state_t dequeueMask = (sequenceMask << dequeueShift);
     const static unsigned maxSlots = (1U << maxSlotBits) - 1;
     const static unsigned initialSpinsBeforeWait = 2000;
+    const static unsigned slotUnavailableSpins = 50;    // If not available for a short time then the thread must have been rescheduled
+
     const static state_t fixedSlotMask = (1U << fixedSlotBits) - 1;
 
 public:
@@ -207,10 +209,16 @@ public:
                     //MORE: Another producer has been interrupted while writing to the same slot
                     //or the consumer has not yet read from the slot.
                     //spin until that has been consumed.
+                    unsigned spins = 0;
                     while (cur.sequence.load(std::memory_order_acquire) != curEnqueueSeq)
                     {
-                        spinPause();
-                        //more: option to back off and yield.
+                        if (slotUnavailableSpins != 0 && ++spins == slotUnavailableSpins)
+                        {
+                            ThreadYield();
+                            spins = 0;
+                        }
+                        else
+                            spinPause();
                     }
 
                     //enqueue takes ownership of the object -> use std::move
@@ -303,13 +311,20 @@ public:
                     unsigned expectedSeq = (curDequeueSeq + 1) & sequenceMask;
                     unsigned curDequeueSlot = (curDequeueSeq & slotMask);
                     BufferElement & cur = values[curDequeueSlot];
+                    unsigned spins = 0;
                     loop
                     {
                         unsigned sequence = cur.sequence.load(std::memory_order_acquire);
                         if (sequence == expectedSeq)
                             break;
                         //possibly yield every n iterations?
-                        spinPause();
+                        if (slotUnavailableSpins != 0 && ++spins == slotUnavailableSpins)
+                        {
+                            ThreadYield();
+                            spins = 0;
+                        }
+                        else
+                            spinPause();
                     }
 
                     result = std::move(cur.value);

+ 25 - 10
testing/unittests/jlibtests.cpp

@@ -710,6 +710,8 @@ class JlibReaderWriterTest : public CppUnit::TestFixture
         }
     };
 public:
+    JlibReaderWriterTest() { unitWorkTimeMs = 0; }
+
     const static size_t bufferSize = 0x100000;//0x100000*64;
     void testQueue(IRowQueue & queue, unsigned numProducers, unsigned numConsumers, unsigned queueElements, unsigned readerWork, unsigned writerWork)
     {
@@ -776,14 +778,16 @@ public:
         }
 
         unsigned timeMs = cycle_to_nanosec(stopTime - startTime) / 1000000;
+        unsigned expectedReadWorkTime = (unsigned)(((double)unitWorkTimeMs * readerWork) / numConsumers);
+        unsigned expectedWriteWorkTime = (unsigned)(((double)unitWorkTimeMs * writerWork) / numProducers);
+        unsigned expectedWorkTime = std::max(expectedReadWorkTime, expectedWriteWorkTime);
         if (failures)
         {
             printf("Fail: Test %u producers %u consumers %u queueItems %u(%u) mismatches fail(@%u=%u)\n", numProducers, numConsumers, queueElements, failures, numClear, (unsigned)failPos, failValue);
             ASSERT(failures == 0);
         }
         else
-            printf("Pass: Test %u(@%u) producers %u(@%u) consumers %u queueItems in %ums\n", numProducers, writerWork, numConsumers, readerWork, queueElements, timeMs);
-
+            printf("Pass: Test %u(@%u) producers %u(@%u) consumers %u queueItems in %ums [%dms]\n", numProducers, writerWork, numConsumers, readerWork, queueElements, timeMs, timeMs-expectedWorkTime);
 
         for (unsigned i4 = 0; i4 < numConsumers; i4++)
         {
@@ -836,20 +840,23 @@ public:
         testQueue(5, 1, 10);
         testQueue(127, 1, 127);
 
+        cycle_t startTime = get_cycles_now();
+        volatile unsigned value = 0;
+        for (unsigned pass = 0; pass < 10; pass++)
+        {
+            for (unsigned i2 = 0; i2 < bufferSize; i2++)
+                value = spinCalculation(value, 1);
+        }
+        cycle_t stopTime = get_cycles_now();
+        unitWorkTimeMs = cycle_to_nanosec(stopTime - startTime) / (1000000 * 10);
+        printf("Work(1) takes %ums\n", unitWorkTimeMs);
+
         //How does it scale with number of queue elements?
         for (unsigned elem = 16; elem < 256; elem *= 2)
         {
             testQueue(16, 1, elem, 1, 1);
         }
 
-        cycle_t startTime = get_cycles_now();
-        volatile unsigned value = 0;
-        for (unsigned i2 = 0; i2 < bufferSize; i2++)
-            value = spinCalculation(value, 1);
-        cycle_t stopTime = get_cycles_now();
-        unsigned timeMs = cycle_to_nanosec(stopTime - startTime) / 1000000;
-        printf("Work(1) takes %ums\n", timeMs);
-
 #if 1
         //Many to Many
         for (unsigned readWork = 1; readWork <= 8; readWork = readWork * 2)
@@ -869,6 +876,12 @@ public:
                 testQueue(2, 2, 63, readWork, writeWork);
                 testQueue(4, 4, 63, readWork, writeWork);
                 testQueue(8, 8, 63, readWork, writeWork);
+                testQueue(16, 8, 63, readWork, writeWork);
+                testQueue(16, 16, 63, readWork, writeWork);
+                testQueue(32, 1, 63, readWork, writeWork);
+                testQueue(64, 1, 63, readWork, writeWork);
+                testQueue(1, 32, 63, readWork, writeWork);
+                testQueue(1, 64, 63, readWork, writeWork);
             }
 
         }
@@ -896,6 +909,8 @@ public:
         testQueue(2, 2, 100);
     }
 
+protected:
+    unsigned unitWorkTimeMs;
 };
 
 CPPUNIT_TEST_SUITE_REGISTRATION(JlibReaderWriterTest);