Переглянути джерело

HPCC-14873 Add support to indicate readers are completed

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday 9 роки тому
батько
коміт
20d5688d6d
3 змінених файлів з 51 додано та 16 видалено
  1. 15 8
      common/thorhelper/thorstrand.cpp
  2. 12 7
      system/jlib/jqueue.cpp
  3. 24 1
      system/jlib/jqueue.hpp

+ 15 - 8
common/thorhelper/thorstrand.cpp

@@ -393,9 +393,7 @@ public:
 
     virtual void stop()
     {
-        //A stop on any unordered branch implies that there is no need to read any more on other threads?
-        //??Is this strictly correct?
-        queue.abort();
+        queue.noteReaderStopped();
         junction.processConsumerStop();
     }
 
@@ -584,20 +582,29 @@ public:
         }
         abortSoon = false;
         finishedWriting = false;
+        finishedReading = false;
     }
 
     bool enqueue(RoxieRowBlock * next)
     {
-        if (abortSoon)
+        if (abortSoon || finishedReading)
             return false;
         space.wait();
-        if (abortSoon)
+        if (abortSoon || finishedReading)
             return false;
         value = next;
         avail.signal();
         return true;
     }
 
+    void noteReaderStopped()
+    {
+        if (abortSoon)
+            return;
+        finishedReading = true;
+        space.signal();
+    }
+
     void noteWriterStopped()
     {
         if (abortSoon)
@@ -631,6 +638,7 @@ protected:
     RoxieRowBlock * value = nullptr;
     bool abortSoon = false;
     bool finishedWriting = false;
+    bool finishedReading = false;
     Semaphore space __attribute__((aligned(CACHE_LINE_SIZE)));
     Semaphore avail __attribute__((aligned(CACHE_LINE_SIZE)));
 };
@@ -933,7 +941,7 @@ public:
     virtual void stop()
     {
         //reading no more records => abort the queue and prevent the producer adding any more rows
-        inputQueue.abort();
+        inputQueue.noteReaderStopped();
         splitJunction.processConsumerStop();
     }
 
@@ -1602,8 +1610,7 @@ public:
     }
     virtual void stop()
     {
-        //MORE: What should this do?
-        queue->abort();
+        queue->noteReaderStopped();
     }
     virtual void reset()
     {

+ 12 - 7
system/jlib/jqueue.cpp

@@ -25,7 +25,7 @@ template <typename state_t, unsigned readerBits, unsigned writerBits, unsigned m
 class CRowQueue : implements CInterfaceOf<IRowQueue>
 {
 public:
-    CRowQueue(unsigned _maxItems, unsigned _numProducers) : queue(_numProducers, _maxItems), numProducers(_numProducers)
+    CRowQueue(unsigned _maxItems, unsigned _numProducers, unsigned _numConsumers) : queue(_numProducers, _numConsumers, _maxItems), numConsumers(_numConsumers), numProducers(_numProducers)
     {
     }
 
@@ -47,6 +47,10 @@ public:
         queue.reset();
         //How clean up the queue and ensure the elements are disposed of?
     }
+    virtual void noteReaderStopped()
+    {
+        queue.noteReaderStopped();
+    }
     virtual void noteWriterStopped()
     {
         queue.noteWriterStopped();
@@ -58,6 +62,7 @@ public:
 
 private:
     ReaderWriterQueue<const void *, state_t, readerBits, writerBits, maxSlotBits, slotBits> queue;
+    const unsigned numConsumers;
     const unsigned numProducers;
 };
 
@@ -69,22 +74,22 @@ IRowQueue * createRowQueue(unsigned numReaders, unsigned numWriters, unsigned ma
     assertex(maxSlots == 0 || maxItems < maxSlots);
 
     if ((numReaders == 1) && (numWriters == 1) && (maxItems < 256))
-        return new CRowQueue<unsigned, 1, 1, 8, 8>(maxItems, numWriters);
+        return new CRowQueue<unsigned, 1, 1, 8, 8>(maxItems, numWriters, numReaders);
 
     if ((numReaders == 1) && (numWriters == 1) && (maxItems < 0x4000))
-        return new CRowQueue<unsigned, 1, 1, 14, 0>(maxItems, numWriters);
+        return new CRowQueue<unsigned, 1, 1, 14, 0>(maxItems, numWriters, numReaders);
 
     if ((numReaders == 1) && (numWriters <= 127) && (maxItems < 256))
-        return new CRowQueue<unsigned, 1, 7, 8, 0>(maxItems, numWriters);
+        return new CRowQueue<unsigned, 1, 7, 8, 0>(maxItems, numWriters, numReaders);
 
     if ((numWriters == 1) && (numReaders <= 255) && (maxItems < 2048))
-        return new CRowQueue<unsigned, 8, 1, 11, 0>(maxItems, numWriters);
+        return new CRowQueue<unsigned, 8, 1, 11, 0>(maxItems, numWriters, numReaders);
 
     if ((numReaders <= 31) && (numWriters <= 31) && (maxItems < 128))
-        return new CRowQueue<unsigned, 6, 6, 7, 0>(maxItems, numWriters);
+        return new CRowQueue<unsigned, 6, 6, 7, 0>(maxItems, numWriters, numReaders);
 
     assertex((numReaders < 0x1000) && (numWriters < 0x400));
-    return new CRowQueue<unsigned __int64, 12, 10, 16, 0>(maxItems, numWriters);
+    return new CRowQueue<unsigned __int64, 12, 10, 16, 0>(maxItems, numWriters, numReaders);
 }
 
 //MORE:

+ 24 - 1
system/jlib/jqueue.hpp

@@ -34,6 +34,7 @@ public:
     virtual bool enqueue(const ELEMENT item) = 0;
     virtual bool dequeue(ELEMENT & result) = 0;
     virtual bool tryDequeue(ELEMENT & result) = 0;
+    virtual void noteReaderStopped() = 0;
     virtual void noteWriterStopped() = 0;
     virtual void abort() = 0;
     virtual void reset() = 0;
@@ -100,7 +101,7 @@ class ReaderWriterQueue
     const static state_t fixedSlotMask = (1U << fixedSlotBits) - 1;
 
 public:
-    ReaderWriterQueue(unsigned _maxWriters, unsigned _maxItems) : maxItems(_maxItems), maxWriters(_maxWriters)
+    ReaderWriterQueue(unsigned _maxWriters, unsigned _maxReaders, unsigned _maxItems) : maxItems(_maxItems), maxReaders(_maxReaders), maxWriters(_maxWriters)
     {
         //printf("element(%u) pad(%u) write(%u), read(%u) slot(%u) count(%u) max(%u)\n", stateBits, padBits, writerBits, readerBits, maxSlotBits, countBits, maxItems);
         //Check all the bits are used, and none of the bits overlap.
@@ -130,6 +131,7 @@ public:
             dynamicSlotMask = fixedSlotMask;
         }
 
+        activeReaders.store(maxReaders, std::memory_order_relaxed);
         activeWriters.store(maxWriters, std::memory_order_relaxed);
         aborted.store(false, std::memory_order_relaxed);
         state.store(0, std::memory_order_relaxed);
@@ -160,6 +162,9 @@ public:
             unsigned curCount = (curState & countMask);
             if (curCount == maxItems)
             {
+                if (allReadersStopped())
+                    return false;
+
                 if (--numSpins != 0) // likely
                 {
                     curState = state.load(std::memory_order_acquire);
@@ -174,6 +179,8 @@ public:
                 {
                     if (aborted.load(std::memory_order_acquire))
                         return false;
+                    if (allReadersStopped())
+                        return false;
                     writers.wait();
                     if (aborted.load(std::memory_order_acquire))
                         return false;
@@ -237,6 +244,8 @@ public:
         if (aborted.load(std::memory_order_relaxed))
             return false;
 
+        dbgassertex(!allReadersStopped());
+
         unsigned numSpins = initialSpinsBeforeWait;
         //Note, compare_exchange_weak updates curState when it fails, so don't read inside the main loop
         state_t curState = state.load(std::memory_order_acquire);
@@ -341,11 +350,22 @@ public:
 
     virtual void reset()
     {
+        activeReaders.store(maxReaders, std::memory_order_relaxed);
         activeWriters.store(maxWriters, std::memory_order_relaxed);
         aborted.store(false, std::memory_order_relaxed);
         readers.reinit(0);
         writers.reinit(0);
     }
+    virtual void noteReaderStopped()
+    {
+        //MORE: If this reduces activeProducers to 0 then it may need to wake up any waiting threads.
+        if (--activeReaders <= 0)
+        {
+            state_t curState = state.load(std::memory_order_acquire);
+            unsigned writersWaiting = (unsigned)((curState & writerMask) >> writerShift);
+            writers.signal(writersWaiting);
+        }
+    }
     virtual void noteWriterStopped()
     {
         //MORE: If this reduces activeProducers to 0 then it may need to wake up any waiting threads.
@@ -367,12 +387,15 @@ public:
         writers.signal(writersWaiting);
     }
     inline bool allWritersStopped() const { return activeWriters.load(std::memory_order_acquire) <= 0; }
+    inline bool allReadersStopped() const { return activeReaders.load(std::memory_order_acquire) <= 0; }
 
 protected:
     BufferElement * values;
     unsigned dynamicSlotMask;
     unsigned maxItems;
+    unsigned maxReaders;
     unsigned maxWriters;
+    std::atomic<int> activeReaders;
     std::atomic<int> activeWriters;
     std::atomic<bool> aborted;
     Semaphore readers __attribute__((aligned(CACHE_LINE_SIZE)));