瀏覽代碼

Merge pull request #6531 from jakesmith/hpcc-12409

HPCC-12409 Pathological distribute slowdown fix

Reviewed-By: Gavin Halliday <gavin.halliday@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 10 年之前
父節點
當前提交
a1d1614f29
共有 2 個文件被更改,包括 270 次插入209 次删除
  1. 266 209
      thorlcr/activities/hashdistrib/thhashdistribslave.cpp
  2. 4 0
      thorlcr/thorutil/thormisc.hpp

+ 266 - 209
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -188,6 +188,103 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
      */
     class CSender : public CSimpleInterface, implements IThreadFactory, implements IExceptionHandler
     {
+        class CTarget // Per-destination state owned by CSender: partial bucket, queue of pending buckets, writer count and finished flag
+        {
+            CSender &owner; // the CSender this target belongs to
+            unsigned target; // destination index this object represents
+            atomic_t activeWriters; // number of writers currently working on this target
+            atomic_t senderFinished; // non-zero once no more data should be sent to this target
+            CSendBucketQueue pendingBuckets; // buckets queued for this target, waiting for a writer
+            mutable CriticalSection crit; // guards send() and checkSenderFinished()
+            Owned<CSendBucket> bucket; // bucket currently being filled for this target (may be NULL)
+        public:
+            CTarget(CSender &_owner, unsigned _target) : owner(_owner), target(_target)
+            {
+                atomic_set(&activeWriters, 0);
+                atomic_set(&senderFinished, 0);
+            }
+            ~CTarget()
+            {
+                reset(); // releases any buckets still queued
+            }
+            void reset() // Release all pending buckets, drop the current bucket and zero both counters
+            {
+                loop
+                {
+                    CSendBucket *sendBucket = pendingBuckets.dequeueNow();
+                    if (!sendBucket)
+                        break; // queue drained
+                    ::Release(sendBucket);
+                }
+                bucket.clear();
+                atomic_set(&activeWriters, 0);
+                atomic_set(&senderFinished, 0);
+            }
+            void send(CMessageBuffer &mb) // Send mb to this target via owner.sendBlock(), unless the target is already marked finished
+            {
+                CriticalBlock b(crit); // sends to one target are serialized by crit
+                if (!atomic_read(&senderFinished))
+                {
+                    if (owner.selfPush(target)) // selfPush() is true only when target == owner.self and !owner.pull ...
+                        assertex(target != owner.self); // ... so this fires if a self-push target ever reaches send()
+                    if (!owner.sendBlock(target, mb))
+                        atomic_set(&senderFinished, 1); // sendBlock returned false: stop sending to this target
+                }
+            }
+            inline unsigned getNumPendingBuckets() const // number of buckets queued for this target
+            {
+                return pendingBuckets.ordinality();
+            }
+            inline CSendBucket *dequeuePendingBucket() // next queued bucket, or NULL if none (see the NULL check in reset())
+            {
+                return pendingBuckets.dequeueNow();
+            }
+            inline void enqueuePendingBucket(CSendBucket *bucket) // queue a bucket for a writer to pick up; NB: parameter shadows the 'bucket' member
+            {
+                pendingBuckets.enqueue(bucket);
+            }
+            inline void incActiveWriters() // bump this target's writer count and the owner's total
+            {
+                atomic_inc(&activeWriters);
+                ++owner.totalActiveWriters; // NB: incActiveWriters() is always called within an activeWritersLock crit
+            }
+            inline void decActiveWriters() // drop this target's writer count and the owner's total
+            {
+                atomic_dec(&activeWriters);
+                --owner.totalActiveWriters; // NB: decActiveWriters() is always called within an activeWritersLock crit
+            }
+            inline unsigned getActiveWriters() const // current number of writers on this target
+            {
+                return atomic_read(&activeWriters);
+            }
+            inline bool getSenderFinished() const // true once this target has been marked finished; read without taking crit
+            {
+                return atomic_read(&senderFinished);
+            }
+            inline void checkSenderFinished() // Mark this target finished (at most once) and count it in owner.numFinished
+            {
+                CriticalBlock b(crit);
+                if (!atomic_read(&senderFinished))
+                {
+                    atomic_set(&senderFinished, 1);
+                    atomic_inc(&owner.numFinished);
+                }
+            }
+            inline CSendBucket *queryBucket() // current partial bucket, or NULL if none has been created
+            {
+                return bucket;
+            }
+            inline CSendBucket *queryBucketCreate() // current partial bucket, creating an empty one for this target if needed
+            {
+                if (!bucket)
+                    bucket.setown(new CSendBucket(owner.owner, target));
+                return bucket;
+            }
+            inline CSendBucket *getBucketClear() // returns bucket.getClear(); presumably hands the bucket to the caller and leaves this target without one
+            {
+                return bucket.getClear();
+            }
+        };
         /*
          * CWriterHandler, a per thread class and member of the writerPool
          * a write handler, is given an initial CSendBucket and handles the dedup(if applicable)
@@ -203,21 +300,21 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
             CDistributorBase &distributor;
             Owned<CSendBucket> _sendBucket;
             unsigned nextPending;
-            bool aborted;
+            CTarget *target;
 
         public:
             IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
             CWriteHandler(CSender &_owner) : owner(_owner), distributor(_owner.owner)
             {
-                aborted = false;
+                target = NULL;
             }
             void init(void *startInfo)
             {
                 nextPending = getRandom()%distributor.numnodes;
                 _sendBucket.setown((CSendBucket *)startInfo);
-                owner.setActiveWriter(_sendBucket->queryDestination(), this);
-                aborted = false;
+                target = owner.targets.item(_sendBucket->queryDestination());
+                target->incActiveWriters();
             }
             void main()
             {
@@ -226,7 +323,7 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
                 size32_t writerTotalSz = 0;
                 size32_t sendSz = 0;
                 MemoryBuffer mb;
-                while (!aborted)
+                while (!owner.aborted)
                 {
                     writerTotalSz += sendBucket->querySize();
                     owner.dedup(sendBucket); // conditional
@@ -243,7 +340,7 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
                             // more added to dest I'm processing?
                             {
                                 CriticalBlock b(owner.activeWritersLock);
-                                sendBucket.setown(owner.pendingBuckets.item(dest)->dequeueNow());
+                                sendBucket.setown(target->dequeuePendingBucket());
                             }
                             if (sendBucket)
                             {
@@ -252,13 +349,13 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
                                 continue; // NB: it will flow into else "remote" arm
                             }
                         }
-                        while (!aborted)
+                        while (!owner.aborted)
                         {
                             // JCSMORE check if worth compressing
                             CMessageBuffer msg;
                             fastLZCompressToBuffer(msg, mb.length(), mb.bufferBase());
                             mb.clear();
-                            owner.send(dest, msg);
+                            target->send(msg);
                             sendSz = 0;
                             if (wholeBucket)
                                 break;
@@ -270,79 +367,65 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
                     // see if others to process
                     // NB: this will never start processing a bucket for a destination which already has an active writer.
                     CriticalBlock b(owner.activeWritersLock);
-                    owner.setActiveWriter(dest, NULL);
+                    target->decActiveWriters();
                     sendBucket.setown(owner.getAnotherBucket(nextPending));
                     if (!sendBucket)
+                    {
+                        target = NULL; // will be reinitialized to new target in init(), when thread pool thread is reused
                         break;
+                    }
                     dest = sendBucket->queryDestination();
-                    owner.setActiveWriter(dest, this);
+                    target = owner.targets.item(dest);
+                    target->incActiveWriters();
                     HDSendPrintLog3("CWriteHandler, now dealing with (b=%d), size=%d", sendBucket->queryDestination(), sendBucket->querySize());
                 }
             }
             bool canReuse() { return true; }
             bool stop() { return true; }
-            void abort()
-            {
-                aborted = true;
-            }
-        } **activeWriters;
+        };
 
         CDistributorBase &owner;
-        CriticalSection activeWritersLock;
+        mutable CriticalSection activeWritersLock;
         mutable SpinLock totalSzLock;
         SpinLock doDedupLock;
-        PointerIArrayOf<CSendBucket> buckets;
         UnsignedArray candidates;
         size32_t totalSz;
         bool senderFull, doDedup, aborted, initialized;
         Semaphore senderFullSem;
         Linked<IException> exception;
-        OwnedMalloc<bool> senderFinished;
-        unsigned numFinished, dedupSamples, dedupSuccesses, self;
+        atomic_t numFinished;
+        unsigned dedupSamples, dedupSuccesses, self;
         Owned<IThreadPool> writerPool;
-        PointerArrayOf<CSendBucketQueue> pendingBuckets;
-        unsigned numActiveWriters;
+        unsigned totalActiveWriters;
+        PointerArrayOf<CTarget> targets;
 
         void init()
         {
-            unsigned n;
-            for (unsigned n=0; n<owner.numnodes; n++)
-                buckets.append(NULL);
             totalSz = 0;
             senderFull = false;
-            senderFinished.allocateN(owner.numnodes, true);
-            numFinished = 0;
+            atomic_set(&numFinished, 0);
             dedupSamples = dedupSuccesses = 0;
             doDedup = owner.doDedup;
             writerPool.setown(createThreadPool("HashDist writer pool", this, this, owner.writerPoolSize, 5*60*1000));
             self = owner.activity->queryJob().queryMyRank()-1;
-            for (n=0; n<owner.numnodes; n++)
-                pendingBuckets.append(new CSendBucketQueue);
-            numActiveWriters = 0;
+
+            targets.ensure(owner.numnodes);
+            for (unsigned n=0; n<owner.numnodes; n++)
+                targets.append(new CTarget(*this, n));
+
+            totalActiveWriters = 0;
             aborted = false;
-            activeWriters = new CWriteHandler *[owner.numnodes];
-            memset(activeWriters, 0, owner.numnodes * sizeof(CWriteHandler *));
             initialized = true;
         }
         void reset()
         {
-            assertex(0 == numActiveWriters);
+            assertex(0 == totalActiveWriters);
             // unless it was aborted, there shouldn't be any pending or non-null buckets
             for (unsigned n=0; n<owner.numnodes; n++)
-            {
-                CSendBucketQueue *queue = pendingBuckets.item(n);
-                loop
-                {
-                    CSendBucket *bucket = queue->dequeueNow();
-                    if (!bucket)
-                        break;
-                    ::Release(bucket);
-                }
-                buckets.replace(NULL, n);
-            }
+                targets.item(n)->reset();
             totalSz = 0;
             senderFull = false;
-            numFinished = 0;
+            atomic_set(&numFinished, 0);
             aborted = false;
         }
         void reinit()
@@ -352,6 +435,11 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
             else
                 init();
         }
+        unsigned queryInactiveWriters() const // Number of writer pool slots not currently counted as active
+        {
+            CriticalBlock b(activeWritersLock); // totalActiveWriters is read under activeWritersLock
+            return owner.writerPoolSize - totalActiveWriters;
+        }
         void dedup(CSendBucket *sendBucket)
         {
             {
@@ -380,42 +468,6 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
                 }
             }
         }
-        CSendBucket *queryBucket(unsigned n)
-        {
-            return buckets.item(n);
-        }
-        CSendBucket *queryBucketCreate(unsigned n)
-        {
-            CSendBucket *bucket = buckets.item(n);
-            if (!bucket)
-            {
-                bucket = new CSendBucket(owner, n);
-                buckets.replace(bucket, n);
-            }
-            return bucket;
-        }
-        CSendBucket *getBucketClear(unsigned n)
-        {
-            Linked<CSendBucket> bucket = buckets.item(n);
-            if (!bucket)
-                return NULL;
-            buckets.replace(NULL, n);
-            return bucket.getClear();
-        }
-        void send(unsigned dest, CMessageBuffer &mb)
-        {
-            if (!senderFinished[dest])
-            {
-                if (selfPush(dest))
-                    assertex(dest != self);
-                if (!owner.sendBlock(dest, mb))
-                {
-                    ActPrintLog(owner.activity, "CDistributorBase::sendBlock stopped slave %d", dest+1);
-                    senderFinished[dest] = true;
-                    numFinished++;
-                }
-            }
-        }
         void decTotal(size32_t sz)
         {
             SpinBlock b(totalSzLock);
@@ -432,7 +484,15 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
             SpinBlock b(totalSzLock);
             return totalSz;
         }
-        inline bool selfPush(unsigned i)
+        inline bool sendBlock(unsigned i, CMessageBuffer &msg) // Forward msg to owner.sendBlock(); on a false result count target i as finished and log it
+        {
+            if (owner.sendBlock(i, msg))
+                return true;
+            atomic_inc(&numFinished); // a false result from owner.sendBlock() is treated as "target i has stopped"
+            ActPrintLog(owner.activity, "CSender::sendBlock stopped slave %d (finished=%d)", i+1, atomic_read(&numFinished)); // i is 0-based, logged 1-based
+            return false;
+        }
+        inline bool selfPush(unsigned i) const
         {
             return (i==self)&&!owner.pull;
         }
@@ -447,7 +507,7 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
                     try
                     {
                         nullMsg.clear();
-                        owner.sendBlock(i, nullMsg);
+                        sendBlock(i, nullMsg);
                     }
                     catch (IException *e)
                     {
@@ -469,19 +529,8 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
         {
             if (initialized)
             {
-                delete [] activeWriters;
                 for (unsigned n=0; n<owner.numnodes; n++)
-                {
-                    CSendBucketQueue *queue = pendingBuckets.item(n);
-                    loop
-                    {
-                        CSendBucket *bucket = queue->dequeueNow();
-                        if (!bucket)
-                            break;
-                        ::Release(bucket);
-                    }
-                    delete queue;
-                }
+                    delete targets.item(n);
             }
         }
         CSendBucket *getAnotherBucket(unsigned &next)
@@ -490,15 +539,15 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
             unsigned start = next;
             loop
             {
-                unsigned current=next;
-                unsigned c = pendingBuckets.item(current)->ordinality();
+                CTarget *target = targets.item(next);
+                unsigned c = target->getNumPendingBuckets();
                 ++next;
                 if (next>=owner.numnodes)
                     next = 0;
                 if (c)
                 {
-                    if (NULL == activeWriters[current])
-                        return pendingBuckets.item(current)->dequeueNow();
+                    if (!owner.targetWriterLimit || (target->getActiveWriters() < owner.targetWriterLimit))
+                        return target->dequeuePendingBucket();
                 }
                 if (next == start)
                     return NULL;
@@ -506,13 +555,11 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
         }
         void add(CSendBucket *bucket)
         {
-            if (owner.selfstopped && !senderFinished[self])
-            {
-                senderFinished[self] = true;
-                ++numFinished;
-            }
+            if (owner.selfstopped)
+                targets.item(self)->checkSenderFinished();
             unsigned dest = bucket->queryDestination();
-            if (senderFinished[dest])
+            CTarget *target = targets.item(dest);
+            if (target->getSenderFinished())
             {
                 HDSendPrintLog2("CSender::add disposing of bucket [finished(%d)]", dest);
                 bucket->Release();
@@ -520,71 +567,14 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
             else
             {
                 CriticalBlock b(activeWritersLock);
-                CWriteHandler *writer = activeWriters[dest];
-                if (!writer && (numActiveWriters < owner.writerPoolSize))
+                if ((totalActiveWriters < owner.writerPoolSize) && (!owner.targetWriterLimit || (target->getActiveWriters() < owner.targetWriterLimit)))
                 {
-                    HDSendPrintLog2("CSender::add (new thread), dest=%d", dest);
+                    HDSendPrintLog3("CSender::add (new thread), dest=%d, active=%d", dest, totalActiveWriters);
                     writerPool->start(bucket);
                 }
                 else // an existing writer will pick up
-                    pendingBuckets.item(dest)->enqueue(bucket);
-            }
-        }
-        void setActiveWriter(unsigned n, CWriteHandler *writer)
-        {
-            if (writer)
-            {
-                assertex(!activeWriters[n]);
-                ++numActiveWriters;
-            }
-            else
-            {
-                assertex(activeWriters[n]);
-                --numActiveWriters;
-            }
-            activeWriters[n] = writer;
-        }
-        unsigned getSendCandidate(bool &doSelf)
-        {
-            unsigned i;
-            unsigned maxsz=0;
-            for (i=0;i<owner.numnodes;i++)
-            {
-                CSendBucket *bucket = queryBucket(i);
-                if (bucket)
-                {
-                    size32_t bucketSz = bucket->querySize();
-                    if (bucketSz > maxsz)
-                        maxsz = bucketSz;
-                }
-            }
-            doSelf = false;
-            if (0 == maxsz)
-                return NotFound;
-            candidates.kill();
-            for (i=0; i<owner.numnodes; i++)
-            {
-                CSendBucket *bucket = queryBucket(i);
-                if (bucket)
-                {
-                    size32_t bucketSz = bucket->querySize();
-                    if (bucketSz > maxsz/2)
-                    {
-                        if (i==self)
-                            doSelf = true;
-                        else
-                            candidates.append(i);
-                    }
-                }
+                    target->enqueuePendingBucket(bucket);
             }
-            if (0 == candidates.ordinality())
-                return NotFound;
-            unsigned h;
-            if (candidates.ordinality()==1)
-                h = candidates.item(0);
-            else
-                h = candidates.item(getRandom()%candidates.ordinality());
-            return h;
         }
         void process(IRowStream *input)
         {
@@ -594,7 +584,7 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
             rowcount_t totalSent = 0;
             try
             {
-                while (!aborted && numFinished < owner.numnodes)
+                while (!aborted && atomic_read(&numFinished) < owner.numnodes)
                 {
                     while (queryTotalSz() >= owner.inputBufferSize)
                     {
@@ -602,40 +592,110 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
                             break;
 
                         HDSendPrintLog("process exceeded inputBufferSize");
-                        bool doSelf;
-                        unsigned which = getSendCandidate(doSelf);
-                        if (NotFound != which)
-                        {
-                            HDSendPrintLog3("process exceeded: send to %d, size=%d", which, queryBucket(which)->querySize());
-                            add(getBucketClear(which));
-                        }
-                        if (doSelf)
+
+                        // establish largest partial bucket
+                        unsigned maxSz=0;
+                        if (queryInactiveWriters())
                         {
-                            HDSendPrintLog2("process exceeded: doSelf, size=%d", queryBucket(self)->querySize());
-                            add(getBucketClear(self));
+                            for (unsigned i=0; i<owner.numnodes; i++)
+                            {
+                                CSendBucket *bucket = targets.item(i)->queryBucket();
+                                if (bucket)
+                                {
+                                    size32_t bucketSz = bucket->querySize();
+                                    if (bucketSz > maxSz)
+                                        maxSz = bucketSz;
+                                    HDSendPrintLog4("b[%d], rows=%d, size=%d", i, bucket->count(), bucketSz);
+                                }
+                            }
                         }
-                        else if (NotFound == which) // i.e. none
+                        /* Only add buckets if some inactive writers
+                         * choose larger candidate buckets to targets that are inactive
+                         * and randomize from that list which are queued to writers
+                         */
+                        if (maxSz)
                         {
-                            HDSendPrintLog("process exceeded inputBufferSize, none to send");
+                            // pick candidates that are at >= 50% size of largest
+                            candidates.clear();
+                            bool doSelf = false;
+                            unsigned inactiveWriters = queryInactiveWriters();
+                            for (unsigned i=0; i<owner.numnodes; i++)
                             {
-                                SpinBlock b(totalSzLock);
-                                // some may have been freed after lock
-                                if (totalSz < owner.inputBufferSize)
-                                    break;
-                                senderFull = true;
+                                CTarget *target = targets.item(i);
+                                CSendBucket *bucket = target->queryBucket();
+                                if (bucket)
+                                {
+                                    size32_t bucketSz = bucket->querySize();
+                                    if (bucketSz >= maxSz/2)
+                                    {
+                                        if (0 == target->getActiveWriters()) // only if there are no active writer threads for this target
+                                        {
+                                            if (i==self)
+                                                doSelf = true; // always send to self if candidate
+                                            else
+                                            {
+                                                candidates.append(i);
+                                                HDSendPrintLog4("c[%d], rows=%d, size=%d", i, bucket->count(), bucketSz);
+                                                /* NB: in theory could be more if some finished since checking, but that's okay
+                                                 * some candidates, or free space will be picked up in next section
+                                                 */
+                                                if (candidates.ordinality() >= inactiveWriters)
+                                                    break;
+                                            }
+                                        }
+                                    }
+                                }
                             }
-                            loop
+                            unsigned limit = owner.candidateLimit;
+                            while (candidates.ordinality())
                             {
-                                if (timer.elapsedCycles() >= queryOneSecCycles()*10)
-                                    ActPrintLog(owner.activity, "HD sender, waiting for space");
-                                timer.reset();
-
-                                if (senderFullSem.wait(10000))
-                                    break;
-                                if (aborted)
+                                if (0 == queryInactiveWriters())
                                     break;
+                                else
+                                {
+                                    unsigned pos = getRandom()%candidates.ordinality();
+                                    unsigned c = candidates.item(pos);
+                                    CTarget *target = targets.item(c);
+                                    CSendBucket *bucket = target->queryBucket();
+                                    assertex(bucket);
+                                    HDSendPrintLog3("process exceeded: send to %d, size=%d", c, bucket->querySize());
+                                    add(target->getBucketClear());
+                                    if (limit)
+                                    {
+                                        --limit;
+                                        if (0 == limit)
+                                            break;
+                                    }
+                                    candidates.remove(pos);
+                                }
+                            }
+                            if (doSelf)
+                            {
+                                CTarget *target = targets.item(self);
+                                CSendBucket *bucket = target->queryBucket();
+                                assertex(bucket);
+                                HDSendPrintLog2("process exceeded: doSelf, size=%d", bucket->querySize());
+                                add(target->getBucketClear());
                             }
                         }
+                        {
+                            SpinBlock b(totalSzLock);
+                            // some may have been written by now
+                            if (totalSz < owner.inputBufferSize)
+                                break;
+                            senderFull = true;
+                        }
+                        // Send buffer is still full: wait on senderFullSem until it is signalled or the sender is aborted.
+                        loop
+                        {
+                            // Log if >= 10 seconds elapsed since the timer was last reset.
+                            // NB: the value printed is queryInactiveWriters(), i.e. writerPoolSize - totalActiveWriters,
+                            // so the message is labelled "inactive writers" (it previously said "active writers").
+                            if (timer.elapsedCycles() >= queryOneSecCycles()*10)
+                                ActPrintLog(owner.activity, "HD sender, waiting for space, inactive writers = %d", queryInactiveWriters());
+                            timer.reset();
+
+                            if (senderFullSem.wait(10000)) // signalled (abort() also signals this semaphore)
+                                break;
+                            if (aborted) // wait timed out; give up if aborted meanwhile
+                                break;
+                        }
                     }
                     if (aborted)
                         break;
@@ -643,11 +703,12 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
                     if (!row)
                         break;
                     unsigned dest = owner.ihash->hash(row)%owner.numnodes;
-                    if (senderFinished[dest]) // does this need to be thread safe?
+                    CTarget *target = targets.item(dest);
+                    if (target->getSenderFinished()) // does this need to be thread safe?
                         ReleaseThorRow(row);
                     else
                     {
-                        CSendBucket *bucket = queryBucketCreate(dest);
+                        CSendBucket *bucket = target->queryBucketCreate();
                         size32_t rs;
                         bucket->add(row, rs);
                         totalSent++;
@@ -658,7 +719,7 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
                         if (bucket->querySize() >= owner.bucketSendSize)
                         {
                             HDSendPrintLog3("adding new bucket: %d, size = %d", bucket->queryDestination(), bucket->querySize());
-                            add(getBucketClear(dest));
+                            add(target->getBucketClear());
                         }
                     }
                 }
@@ -678,7 +739,7 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
                 ForEach(*iter)
                 {
                     unsigned dest=iter->get();
-                    Owned<CSendBucket> bucket = getBucketClear(dest);
+                    Owned<CSendBucket> bucket = targets.item(dest)->getBucketClear();
                     HDSendPrintLog4("Looking at last bucket(%d): %d, size = %d", dest, bucket.get()?bucket->queryDestination():0, bucket.get()?bucket->querySize():-1);
                     if (bucket && bucket->querySize())
                     {
@@ -700,16 +761,6 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
                 return;
             aborted = true;
             senderFullSem.signal();
-            if (initialized)
-            {
-                CriticalBlock b(activeWritersLock);
-                for (unsigned w=0; w<owner.numnodes; w++)
-                {
-                    CWriteHandler *writer = activeWriters[w];
-                    if (writer)
-                        writer->abort();
-                }
-            }
         }
     // IThreadFactory impl.
         virtual IPooledThread *createNew()
@@ -798,6 +849,8 @@ protected:
     CriticalSection putsect;
     bool pull, aborted;
     CSender sender;
+    unsigned candidateLimit;
+    unsigned targetWriterLimit;
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
@@ -811,9 +864,9 @@ public:
         iCompare = NULL;
         ihash = NULL;
         fixedEstSize = 0;
-        bucketSendSize = globals->getPropInt("@hd_out_buffer_size", DEFAULT_OUT_BUFFER_SIZE);
+        bucketSendSize = activity->getOptUInt(THOROPT_HDIST_BUCKET_SIZE, DEFAULT_OUT_BUFFER_SIZE);
         istop = _istop;
-        inputBufferSize = globals->getPropInt("@hd_in_buffer_size", DEFAULT_IN_BUFFER_SIZE);
+        inputBufferSize = activity->getOptUInt(THOROPT_HDIST_BUFFER_SIZE, DEFAULT_IN_BUFFER_SIZE);
         pullBufferSize = DISTRIBUTE_PULL_BUFFER_SIZE;
         selfstopped = false;
         pull = false;
@@ -823,9 +876,14 @@ public:
         if (allowSpill)
             ActPrintLog(activity, "Using spilling buffer (will spill if overflows)");
         writerPoolSize = activity->getOptUInt(THOROPT_HDIST_WRITE_POOL_SIZE, DEFAULT_WRITEPOOLSIZE);
-        if (writerPoolSize>numnodes)
-            writerPoolSize = numnodes; // no point in more
+        if (writerPoolSize>(numnodes*2))
+            writerPoolSize = numnodes*2; // limit to 2 per target
         ActPrintLog(activity, "Writer thread pool size : %d", writerPoolSize);
+        candidateLimit = activity->getOptUInt(THOROPT_HDIST_CANDIDATELIMIT);
+        ActPrintLog(activity, "candidateLimit : %d", candidateLimit);
+        ActPrintLog(activity, "inputBufferSize : %d, bucketSendSize = %d", inputBufferSize, bucketSendSize);
+        targetWriterLimit = activity->getOptUInt(THOROPT_HDIST_TARGETWRITELIMIT);
+        ActPrintLog(activity, "targetWriterLimit : %d", targetWriterLimit);
     }
 
     ~CDistributorBase()
@@ -1013,7 +1071,6 @@ public:
             piperd->stop();
         pipewr.clear();
         ActPrintLog(activity, "HDIST: Read loop done");
-
     }
 
     void sendloop()

+ 4 - 0
thorlcr/thorutil/thormisc.hpp

@@ -49,6 +49,10 @@
 #define THOROPT_COMPRESS_SPILLS       "compressInternalSpills"  // Compress internal spills, e.g. spills created by lookahead or sort gathering  (default = true)
 #define THOROPT_HDIST_SPILL           "hdistSpill"              // Allow distribute receiver to spill to disk, rather than blocking              (default = true)
 #define THOROPT_HDIST_WRITE_POOL_SIZE "hdistSendPoolSize"       // Distribute send thread pool size                                              (default = 16)
+#define THOROPT_HDIST_BUCKET_SIZE     "hd_out_buffer_size"      // Distribute target bucket send size                                            (default = 1MB)
+#define THOROPT_HDIST_BUFFER_SIZE     "hd_in_buffer_size"       // Distribute send buffer size (for all targets)                                 (default = 32MB)
+#define THOROPT_HDIST_CANDIDATELIMIT  "hdCandidateLimit"        // Limits # of buckets to push to the writers when send buffer is full           (default = no limit; candidates are buckets >= 50% of the largest)
+#define THOROPT_HDIST_TARGETWRITELIMIT "hdTargetLimit"          // Limit # of writer threads working on a single target                          (default = unbound, but picks round-robin)
 #define THOROPT_SPLITTER_SPILL        "splitterSpill"           // Force splitters to spill or not, default is to adhere to helper setting       (default = -1)
 #define THOROPT_LOOP_MAX_EMPTY        "loopMaxEmpty"            // Max # of iterations that LOOP can cycle through with 0 results before errors  (default = 1000)
 #define THOROPT_SMALLSORT             "smallSortThreshold"      // Use minisort approach, if estimate size of data to sort is below this setting (default = 0)