Browse Source

Merge pull request #7121 from jakesmith/hpcc-13239

HPCC-13239 Release stopped distribute buckets as soon as possible

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 10 years ago
parent
commit
b763ec2dce
1 changed files with 80 additions and 56 deletions
  1. 80 56
      thorlcr/activities/hashdistrib/thhashdistribslave.cpp

+ 80 - 56
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -147,8 +147,9 @@ protected:
                 OwnedConstThorRow row = dedupList.getClear(--i);
                 if ((NULL != prev.get()) && (0 == iCompare->docompare(prev, row)))
                 {
-                    size32_t rsz = owner.rowMemSize(row);
-                    total -= rsz;
+                    /* NB: do not alter 'total' size. It represents the amount originally added to the bucket
+                     * which will be deducted when sent.
+                     */
                 }
                 else
                 {
@@ -159,11 +160,12 @@ protected:
             dedupList.clearRows();
             return true; // attempted
         }
-        void add(const void *row, size32_t &rs)
+        size32_t add(const void *row)
         {
-            rs = owner.rowMemSize(row);
+            size32_t rs = owner.rowMemSize(row);
             total += rs;
             rows.enqueue(row);
+            return rs;
         }
         unsigned queryDestination() const { return destination; }
         size32_t querySize() const { return total; }
@@ -298,13 +300,12 @@ protected:
             }
             void send(CMessageBuffer &mb)
             {
-                CriticalBlock b(crit);
+                CriticalBlock b(crit); // protects against multiple senders to the same target
                 if (!atomic_read(&senderFinished))
                 {
                     if (owner.selfPush(target))
                         assertex(target != owner.self);
-                    if (!owner.sendBlock(target, mb))
-                        atomic_set(&senderFinished, 1);
+                    owner.sendBlock(target, mb);
                 }
             }
             inline unsigned getNumPendingBuckets() const
@@ -337,15 +338,6 @@ protected:
             {
                 return atomic_read(&senderFinished) != 0;
             }
-            inline void checkSenderFinished()
-            {
-                CriticalBlock b(crit);
-                if (!atomic_read(&senderFinished))
-                {
-                    atomic_set(&senderFinished, 1);
-                    atomic_inc(&owner.numFinished);
-                }
-            }
             inline CSendBucket *queryBucket()
             {
                 return bucket;
@@ -360,6 +352,10 @@ protected:
             {
                 return bucket.getClear();
             }
+            bool queryMarkSenderFinished()
+            {
+                return atomic_cas(&senderFinished, 1, 0);
+            }
         };
         /*
          * CWriterHandler, a per thread class and member of the writerPool
@@ -400,17 +396,16 @@ protected:
                 size32_t writerTotalSz = 0;
                 size32_t sendSz = 0;
                 CMessageBuffer msg;
-                size32_t rawSz = 0;
                 while (!owner.aborted)
                 {
-                    writerTotalSz += sendBucket->querySize();
-                    rawSz += sendBucket->querySize();
+                    writerTotalSz += sendBucket->querySize(); // NB: This size is pre-dedup, and is the correct amount to pass to decTotal
                     owner.dedup(sendBucket); // conditional
 
                     if (owner.selfPush(dest))
                     {
-                        HDSendPrintLog2("CWriteHandler, sending raw=%d to LOCAL", rawSz);
-                        distributor.addLocal(sendBucket);
+                        HDSendPrintLog2("CWriteHandler, sending raw=%d to LOCAL", writerTotalSz);
+                        if (!target->getSenderFinished())
+                            distributor.addLocal(sendBucket);
                     }
                     else // remote
                     {
@@ -422,10 +417,7 @@ protected:
                         if (sendSz < distributor.bucketSendSize)
                         {
                             // more added to dest I'm processing?
-                            {
-                                CriticalBlock b(owner.activeWritersLock);
-                                sendBucket.setown(target->dequeuePendingBucket());
-                            }
+                            sendBucket.setown(target->dequeuePendingBucket());
                             if (sendBucket)
                             {
                                 HDSendPrintLog3("CWriteHandler, pending(b=%d) rolled, size=%d", sendBucket->queryDestination(), sendBucket->querySize());
@@ -433,9 +425,9 @@ protected:
                                 continue; // NB: it will flow into else "remote" arm
                             }
                         }
-                        target->send(msg);
+                        if (!target->getSenderFinished())
+                            target->send(msg);
                         sendSz = 0;
-                        rawSz = 0;
                         msg.clear();
                     }
                     // see if others to process
@@ -471,6 +463,7 @@ protected:
         Semaphore senderFullSem;
         Linked<IException> exception;
         atomic_t numFinished;
+        atomic_t stoppedTargets;
         unsigned dedupSamples, dedupSuccesses, self;
         Owned<IThreadPool> writerPool;
         unsigned totalActiveWriters;
@@ -481,6 +474,7 @@ protected:
             totalSz = 0;
             senderFull = false;
             atomic_set(&numFinished, 0);
+            atomic_set(&stoppedTargets, 0);
             dedupSamples = dedupSuccesses = 0;
             doDedup = owner.doDedup;
             writerPool.setown(createThreadPool("HashDist writer pool", this, this, owner.writerPoolSize, 5*60*1000));
@@ -503,15 +497,9 @@ protected:
             totalSz = 0;
             senderFull = false;
             atomic_set(&numFinished, 0);
+            atomic_set(&stoppedTargets, 0);
             aborted = false;
         }
-        void reinit()
-        {
-            if (initialized)
-                reset();
-            else
-                init();
-        }
         unsigned queryInactiveWriters() const
         {
             CriticalBlock b(activeWritersLock);
@@ -561,13 +549,12 @@ protected:
             SpinBlock b(totalSzLock);
             return totalSz;
         }
-        inline bool sendBlock(unsigned i, CMessageBuffer &msg)
+        inline void sendBlock(unsigned target, CMessageBuffer &msg)
         {
-            if (owner.sendBlock(i, msg))
-                return true;
-            atomic_inc(&numFinished);
-            owner.ActPrintLog("CSender::sendBlock stopped slave %d (finished=%d)", i+1, atomic_read(&numFinished));
-            return false;
+            if (owner.sendBlock(target, msg))
+                return;
+            markStopped(target); // Probably a bit pointless if target is 'self' - process loop will have done already
+            owner.ActPrintLog("CSender::sendBlock stopped slave %d (finished=%d)", target+1, atomic_read(&numFinished));
         }
         inline bool selfPush(unsigned i) const
         {
@@ -610,6 +597,13 @@ protected:
                     delete targets.item(n);
             }
         }
+        void reinit()
+        {
+            if (initialized)
+                reset();
+            else
+                init();
+        }
         CSendBucket *getAnotherBucket(unsigned &next)
         {
             // NB: called inside activeWritersLock
@@ -632,31 +626,50 @@ protected:
         }
         void add(CSendBucket *bucket)
         {
-            if (owner.selfstopped)
-                targets.item(self)->checkSenderFinished();
             unsigned dest = bucket->queryDestination();
             CTarget *target = targets.item(dest);
-            if (target->getSenderFinished())
+            CriticalBlock b(activeWritersLock);
+            if ((totalActiveWriters < owner.writerPoolSize) && (!owner.targetWriterLimit || (target->getActiveWriters() < owner.targetWriterLimit)))
             {
-                HDSendPrintLog2("CSender::add disposing of bucket [finished(%d)]", dest);
-                bucket->Release();
+                HDSendPrintLog3("CSender::add (new thread), dest=%d, active=%d", dest, totalActiveWriters);
+                writerPool->start(bucket);
             }
-            else
+            else // an existing writer will pick up
+                target->enqueuePendingBucket(bucket);
+        }
+        void checkSendersFinished()
+        {
+            // check if any target has stopped and clear out partial now defunct buckets taking space.
+            if (atomic_read(&stoppedTargets) == 0) // cheap compared to atomic_xchg, so saves a few cycles in common case.
+               return;
+            int numStopped = atomic_xchg(0, &stoppedTargets);
+            if (numStopped)
             {
-                CriticalBlock b(activeWritersLock);
-                if ((totalActiveWriters < owner.writerPoolSize) && (!owner.targetWriterLimit || (target->getActiveWriters() < owner.targetWriterLimit)))
+                /* this will be infrequent, scan all.
+                 * NB: This may pick up / clear more than 'numStopped', but that's okay, all it will mean is that another call to checkSendersFinished() will enter here.
+                 */
+                ForEachItemIn(t, targets)
                 {
-                    HDSendPrintLog3("CSender::add (new thread), dest=%d, active=%d", dest, totalActiveWriters);
-                    writerPool->start(bucket);
+                    CTarget *target = targets.item(t);
+                    if (target->getSenderFinished())
+                    {
+                        Owned<CSendBucket> bucket = target->getBucketClear();
+                        if (bucket)
+                            decTotal(bucket->querySize());
+                        loop
+                        {
+                            bucket.setown(target->dequeuePendingBucket());
+                            if (!bucket)
+                                break;
+                            decTotal(bucket->querySize());
+                        }
+                    }
                 }
-                else // an existing writer will pick up
-                    target->enqueuePendingBucket(bucket);
             }
         }
         void process(IRowStream *input)
         {
             owner.ActPrintLog("Distribute send start");
-            reinit();
             CCycleTimer timer;
             rowcount_t totalSent = 0;
             try
@@ -781,13 +794,12 @@ protected:
                         break;
                     unsigned dest = owner.ihash->hash(row)%owner.numnodes;
                     CTarget *target = targets.item(dest);
-                    if (target->getSenderFinished()) // does this need to be thread safe?
+                    if (target->getSenderFinished())
                         ReleaseThorRow(row);
                     else
                     {
                         CSendBucket *bucket = target->queryBucketCreate();
-                        size32_t rs;
-                        bucket->add(row, rs);
+                        size32_t rs = bucket->add(row);
                         totalSent++;
                         {
                             SpinBlock b(totalSzLock);
@@ -799,6 +811,7 @@ protected:
                             add(target->getBucketClear());
                         }
                     }
+                    checkSendersFinished(); // clears out defunct target buckets if any have stopped
                 }
             }
             catch (IException *e)
@@ -839,6 +852,15 @@ protected:
             aborted = true;
             senderFullSem.signal();
         }
+        void markStopped(unsigned target)
+        {
+            if (targets.item(target)->queryMarkSenderFinished())
+            {
+                atomic_inc(&numFinished);
+                atomic_inc(&stoppedTargets);
+            }
+        }
+        void markSelfStopped() { markStopped(self); }
     // IThreadFactory impl.
         virtual IPooledThread *createNew()
         {
@@ -1104,6 +1126,7 @@ public:
             {
                 recvthread.stop();
                 selfstopped = true;
+                sender.markSelfStopped();
             }
             distribDoneSem.wait();
             if (sendException.get())
@@ -1259,6 +1282,7 @@ public:
     virtual void startTX()=0;
     void start()
     {
+        sender.reinit();
         startTX();
         recvthread.start();
         sendthread.start();