Ver código fonte

HPCC-14878 Pass stop() up the stream chain - even if not started

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday 9 anos atrás
pai
commit
91e2369cf9

+ 102 - 40
common/thorhelper/thorstrand.cpp

@@ -49,15 +49,27 @@ public:
         started = true;
     }
 
+    void startProducerThread(IThreaded & main)
+    {
+        CThreaded * thread = new CThreaded("ReadAheadThread", &main);
+        threads.append(*thread);
+        thread->start();
+    }
+
     void processConsumerStop()
     {
-        //Ensure only one producer triggers stopping on the inputs
-        if (started && !stopping.exchange(true, std::memory_order_acq_rel))
+        if (started)
         {
-            stopConsumers();
-            for (unsigned i=0; i < numProducers; i++)
-                producerStoppedSem.wait();
+            //Ensure only one producer triggers stopping on the inputs
+            if (!stopping.exchange(true, std::memory_order_acq_rel))
+            {
+                stopActiveProducers();
+                for (unsigned i=0; i < numProducers; i++)
+                    producerStoppedSem.wait();
+            }
         }
+        else
+            stopInactiveProducers();
     }
 
     void waitForStop()
@@ -70,16 +82,26 @@ public:
         producerStoppedSem.signal();
     }
 
+    inline unsigned getNumProducers() const { return numProducers; }
+
 protected:
-    virtual void stopConsumers()
+    //Stop producers that have already been started()
+    virtual void stopActiveProducers()
     {
         producerStopSem.signal(numProducers);
+        ForEachItemIn(i, threads)
+            threads.item(i).join();
+        threads.kill();
     }
 
+    //Stop producers that have never been started()
+    virtual void stopInactiveProducers() = 0;
+
 private:
     unsigned numProducers;
     Semaphore producerStopSem;
     Semaphore producerStoppedSem;
+    CIArrayOf<CThreaded> threads;
     std::atomic<bool> stopping;
     bool started;
 };
@@ -101,7 +123,7 @@ public:
         assertex(n == 0);
         stream = _stream;
     }
-    virtual void ready()
+    virtual void start()
     {
     }
     virtual void reset()
@@ -243,6 +265,11 @@ public:
         queue = _queue;
     }
 
+    void stopInput()
+    {
+        stream->stop();
+    }
+
 protected:
     CStrandJunction & junction;
     RowBlockAllocator & allocator;
@@ -352,10 +379,10 @@ public:
         resetBlockQueue(queue);
         CStrandJunction::reset();
     }
-    virtual void ready()
+    virtual void start()
     {
         for (unsigned i=0; i < numStrands; i++)
-            asyncStart("ReadAheadThread", *producers[i]);
+            startProducerThread(*producers[i]);
         noteStarted();
     }
     static BlockedManyToOneJunction * create(roxiemem::IRowManager & rowManager, unsigned numStrands, unsigned blockSize)
@@ -366,12 +393,18 @@ public:
     }
 
 protected:
-    virtual void stopConsumers()
+    virtual void stopActiveProducers()
     {
         queue->abort();
-        CStrandJunction::stopConsumers();
+        CStrandJunction::stopActiveProducers();
+    }
+    virtual void stopInactiveProducers()
+    {
+        for (unsigned i=0; i < numStrands; i++)
+            producers[i]->stopInput();
     }
 
+
 protected:
     unsigned numStrands;
     Owned<IRowQueue> queue;
@@ -424,17 +457,23 @@ public:
             consumers[i]->reset();
         CStrandJunction::reset();
     }
-    virtual void ready()
+    virtual void start()
     {
-        asyncStart(producer);
+        startProducerThread(producer);
         noteStarted();
     }
 
 protected:
-    virtual void stopConsumers()
+    virtual void stopActiveProducers()
     {
         queue->abort();
-        CStrandJunction::stopConsumers();
+        CStrandJunction::stopActiveProducers();
+    }
+
+    virtual void stopInactiveProducers()
+    {
+        for (unsigned i=0; i < numStrands; i++)
+            producer.stopInput();
     }
 
 protected:
@@ -583,7 +622,7 @@ public:
         alive = true;
     }
 
-    void stop()
+    void stopInput()
     {
         stream->stop();
     }
@@ -661,13 +700,12 @@ public:
         numActiveStrands = numStrands;
         CStrandJunction::reset();
     }
-    virtual void ready()
+    virtual void start()
     {
         for (unsigned i=0; i < numStrands; i++)
-        {
-            CThreaded * thread = new CThreaded("ReadAheadThread", producers[i]);
-            thread->startRelease();
-        }
+            startProducerThread(*producers[i]);
+        noteStarted();
+        throwUnexpected();  // reimplement...
     }
     virtual const void *nextRow()
     {
@@ -732,6 +770,11 @@ protected:
                 curStrand = 0;
         } while (!producers[curStrand]->isAlive());
     }
+    virtual void stopInactiveProducers()
+    {
+        for (unsigned i=0; i < numStrands; i++)
+            producers[i]->stopInput();
+    }
 
 
 protected:
@@ -936,6 +979,11 @@ public:
         strand = _input;
     }
 
+    void stopStrand()
+    {
+        strand->stop();
+    }
+
     inline OrderedReadAheadQueue & queryInputQueue() { return inputQueue; }
     inline OrderedReadAheadQueue & queryOutputQueue() { return outputQueue; }
 
@@ -962,12 +1010,20 @@ class COrderedStrandBranch : public CInterface, implements IStrandBranch, implem
         OrderedInputJunction(COrderedStrandBranch & _owner) : CStrandJunction(1), owner(_owner) {}
         virtual IEngineRowStream * queryOutput(unsigned n) { return owner.queryStrandInput(n); }
         virtual void setInput(unsigned n, IEngineRowStream * _stream) { assertex(n==0); owner.setInput(_stream); }
-        virtual void ready() { owner.startInputReader(); }
+        virtual void start()
+        {
+            startProducerThread(owner);
+            noteStarted();
+        }
         virtual void abort() { owner.abortInputQueues(); }
-        virtual void stopConsumers()
+        virtual void stopActiveProducers()
         {
             owner.abortInputQueues();
-            CStrandJunction::stopConsumers();
+            CStrandJunction::stopActiveProducers();
+        }
+        virtual void stopInactiveProducers()
+        {
+            owner.stopInactiveInput();
         }
     protected:
         COrderedStrandBranch & owner;
@@ -978,18 +1034,27 @@ class COrderedStrandBranch : public CInterface, implements IStrandBranch, implem
         OrderedOutputJunction(COrderedStrandBranch & _owner) : CStrandJunction(_owner.numStrands), owner(_owner) {}
         virtual IEngineRowStream * queryOutput(unsigned n) { assertex(n==0); return owner.queryOutput(); }
         virtual void setInput(unsigned n, IEngineRowStream * _stream) { owner.setStrand(n, _stream); }
-        virtual void ready() { owner.startStrandReaders(); }
+        virtual void start()
+        {
+            for (unsigned i=0; i < getNumProducers(); i++)
+                startProducerThread(*owner.strands[i]);
+            noteStarted();
+        }
         virtual void abort() { owner.abortOutputQueues(); }
-        virtual void stopConsumers()
+        virtual void stopActiveProducers()
         {
             owner.abortOutputQueues();
-            CStrandJunction::stopConsumers();
+            CStrandJunction::stopActiveProducers();
         }
         virtual void reset()
         {
             owner.resetBranch();
             CStrandJunction::reset();
         }
+        virtual void stopInactiveProducers()
+        {
+            owner.stopInactiveStrands();
+        }
     protected:
         COrderedStrandBranch & owner;
     };
@@ -1032,19 +1097,6 @@ public:
             strands[i]->abortOutputQueue();
     }
 
-    void startStrandReaders()
-    {
-        for (unsigned i=0; i < numStrands; i++)
-            asyncStart(*strands[i]);
-        outputJunction->noteStarted();
-    }
-
-    void startInputReader()
-    {
-        asyncStart(*this);
-        inputJunction->noteStarted();
-    }
-
     virtual void main()
     {
         unsigned curStrand = 0;
@@ -1205,6 +1257,16 @@ public:
         return this;
     }
 
+    void stopInactiveInput()
+    {
+        input->stop();
+    }
+
+    void stopInactiveStrands()
+    {
+        for (unsigned i=0; i < numStrands; i++)
+            strands[i]->stopStrand();
+    }
 
  //interface IEngineRowStream
      virtual const void *nextRow()

+ 2 - 2
common/thorhelper/thorstrand.hpp

@@ -28,12 +28,12 @@ class IStrandJunction : extends IInterface
 public:
     virtual IEngineRowStream * queryOutput(unsigned n) = 0;
     virtual void setInput(unsigned n, IEngineRowStream * _stream) = 0;
-    virtual void ready() = 0;
+    virtual void start() = 0;
     virtual void reset() = 0;
     virtual void abort() = 0;
 };
 
-inline void startJunction(IStrandJunction * junction) { if (junction) junction->ready(); }
+inline void startJunction(IStrandJunction * junction) { if (junction) junction->start(); }
 inline void resetJunction(IStrandJunction * junction) { if (junction) junction->reset(); }
 
 interface IManyToOneRowStream : extends IRowStream

+ 5 - 4
roxie/ccd/ccdserver.cpp

@@ -1623,7 +1623,7 @@ protected:
     IArrayOf<StrandProcessor> strands;
     Owned<IStrandBranch> branch;
     Owned<IStrandJunction> splitter;
-    unsigned active;  // SHould really be atomic
+    std::atomic<unsigned> active;
 public:
     CRoxieServerStrandedActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, const StrandOptions &_strandOptions)
         : CRoxieServerActivity(_ctx, _factory, _probeManager),
@@ -1645,7 +1645,7 @@ public:
 
     virtual void reset()
     {
-        // assertex(active==0);  Disable for now as we know that stop() is nt being called on the strands.
+        assertex(active==0);
         ForEachItemIn(idx, strands)
             strands.item(idx).reset();
         resetJunction(splitter);
@@ -1655,8 +1655,9 @@ public:
     virtual void stop()
     {
         // Called from the strands... which should ensure that stop is not called more than once per strand
-        if (active);
-            active--;
+        //The first strand to call
+        if (active)
+            --active;
         if (!active)
             CRoxieServerActivity::stop();
     }