
HPCC-21049 Thor implementation of DISTRIBUTE,ALL

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith committed 6 years ago
commit b1f73305bb

+ 1 - 2
testing/regress/ecl/distriball1.ecl

@@ -17,7 +17,6 @@
 
 //Some of these tests need to be verified by looking at the generated code
 //MORE: Remove these lines when the code has been implemented in the engines.
-//nothor
 //noroxie
 //nohthor
 
@@ -31,7 +30,7 @@ o1 := output(count(d1) - numRows * CLUSTERSIZE);
 
 //Check combinations of distributes are not combined
 d2 := distribute(d1, ALL);
-o2 := output(count(d1) - numRows * CLUSTERSIZE * CLUSTERSIZE);
+o2 := output(count(d2) - numRows * CLUSTERSIZE * CLUSTERSIZE);
 
 //Check that distribute does not remove distribute,set
 d3a := distribute(d1, hash(id));
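
As a sanity check on the arithmetic these tests rely on: DISTRIBUTE(x, ALL) replicates every row to every node, so each application multiplies the row count by the cluster width. With illustrative values numRows = 100 and CLUSTERSIZE = 4, count(d1) = 100 * 4 = 400 and count(d2) = 400 * 4 = 1600 = numRows * CLUSTERSIZE * CLUSTERSIZE, so o1 and o2 both output 0 when the engine is correct. The one-line fix above makes o2 count d2 rather than d1; as previously written it would have reported a spurious nonzero difference even on a correct implementation.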

+ 0 - 1
testing/regress/ecl/distriball2.ecl

@@ -17,7 +17,6 @@
 
 //Some of these tests need to be verified by looking at the generated code
 //MORE: Remove these lines when the code has been implemented in the engines.
-//nothor
 //noroxie
 //nohthor
 

+ 1 - 1
thorlcr/activities/fetch/thfetchslave.cpp

@@ -177,7 +177,7 @@ public:
     {
         fposHash = new CFPosHandler(*iFetchHandler, offsetCount, offsetTable);
         keyIn.set(_keyIn);
-        distributor = createHashDistributor(&owner, owner.queryContainer().queryJobChannel().queryJobComm(), tag, false, this, "FetchStream");
+        distributor = createHashDistributor(&owner, owner.queryContainer().queryJobChannel().queryJobComm(), tag, false, false, this, "FetchStream");
         keyOutStream.setown(distributor->connect(keyRowIf, keyIn, fposHash, NULL, NULL));
     }
     virtual IRowStream *queryOutput() override { return this; }

+ 5 - 3
thorlcr/activities/hashdistrib/thhashdistrib.cpp

@@ -37,9 +37,6 @@ class HashDistributeMasterBase : public CMasterActivity
     DistributeMode mode;
     mptag_t mptag;
     mptag_t mptag2; // for tag 2
-    bool redistribute;
-    double skew;
-    double targetskew;
 public:
     HashDistributeMasterBase(DistributeMode _mode, CMasterGraphElement *info) 
         : CMasterActivity(info), mode(_mode) 
@@ -254,6 +251,11 @@ CActivityBase *createHashDistributeActivityMaster(CMasterGraphElement *container
         return new HashDistributeActivityMaster(DM_distrib, container);
 }
 
+CActivityBase *createNWayDistributeActivityMaster(CMasterGraphElement *container)
+{
+    return new HashDistributeActivityMaster(DM_distrib, container);
+}
+
 CActivityBase *createDistributeMergeActivityMaster(CMasterGraphElement *container)
 {
     return new HashDistributeActivityMaster(DM_distribmerge, container);

+ 1 - 0
thorlcr/activities/hashdistrib/thhashdistrib.ipp

@@ -20,6 +20,7 @@
 
 #include "thactivitymaster.ipp"
 
+CActivityBase *createNWayDistributeActivityMaster(CMasterGraphElement *info);
 CActivityBase *createHashDistributeActivityMaster(CMasterGraphElement *info);
 CActivityBase *createDistributeMergeActivityMaster(CMasterGraphElement *info);
 CActivityBase *createHashDedupMergeActivityMaster(CMasterGraphElement *info);

+ 280 - 178
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -89,7 +89,7 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
     Semaphore distribDoneSem, localFinishedSem;
     ICompare *iCompare, *keepBestCompare;
     size32_t bucketSendSize;
-    bool doDedup, allowSpill, connected, selfstopped;
+    bool doDedup, allowSpill, connected, selfstopped, isAll;
     Owned<IException> sendException, recvException;
     IStopInput *istop;
     size32_t fixedEstSize;
@@ -98,19 +98,20 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
 
 protected:
     /*
-     * CSendBucket - a collection of rows destined for a particular destination target(slave)
+     * CSendBucket - a collection of rows destined for a particular target. A target can be a slave, or ALL
      */
+    class CTarget;
     class CSendBucket : implements IRowStream, public CSimpleInterface
     {
         CDistributorBase &owner;
         size32_t total;
         ThorRowQueue rows;
-        unsigned destination;
+        CTarget *target;
         CThorExpandingRowArray dedupList;
     public:
         IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-        CSendBucket(CDistributorBase &_owner, unsigned _destination) : owner(_owner), destination(_destination),
+        CSendBucket(CDistributorBase &_owner, CTarget *_target): owner(_owner), target(_target),
                 dedupList(*owner.activity, owner.rowIf)
         {
             total = 0;
@@ -126,6 +127,12 @@ protected:
             }
         }
         unsigned count() const { return rows.ordinality(); }
+        const void *get(unsigned r) const
+        {
+            const void *row = rows.item(r);
+            LinkThorRow(row);
+            return row;
+        }
         bool dedup(ICompare *iCompare, ICompare *keepBestCompare) // returns true if reduces by >= 10%
         {
             unsigned c = rows.ordinality();
@@ -192,7 +199,7 @@ protected:
             rows.enqueue(row);
             return rs;
         }
-        unsigned queryDestination() const { return destination; }
+        CTarget *queryTarget() const { return target; }
         size32_t querySize() const { return total; }
         size32_t serializeClear(MemoryBuffer &dstMb)
         {
@@ -284,103 +291,88 @@ protected:
         virtual void stop() { }
     };
     typedef SimpleInterThreadQueueOf<CSendBucket, false> CSendBucketQueue;
+
+    class CSender;
+
+    /* A CTarget exists per destination slave
+     * OR in the case of ALL, a single CTarget exists that sends to all slaves except self.
+     */
+    class CTarget
+    {
+        CSender &owner;
+        unsigned destination = 0; // NB: not used in ALL case
+        std::atomic<unsigned> activeWriters{0};
+        CSendBucketQueue pendingBuckets;
+        mutable CriticalSection crit;
+        Owned<CSendBucket> bucket;
+        StringAttr info;
+        bool self = false;
+    public:
+        CTarget(CSender &_owner, bool _self, const char *_info) : owner(_owner), self(_self), info(_info)
+        {
+        }
+        CTarget(CSender &_owner, unsigned _destination, bool _self, const char *_info) : CTarget(_owner, _self, _info)
+        {
+            destination = _destination;
+        }
+        ~CTarget()
+        {
+            reset();
+        }
+        void reset()
+        {
+            for (;;)
+            {
+                CSendBucket *sendBucket = pendingBuckets.dequeueNow();
+                if (!sendBucket)
+                    break;
+                ::Release(sendBucket);
+            }
+            bucket.clear();
+            activeWriters = 0;
+        }
+        void send(CMessageBuffer &mb); // Not used for ALL
+        void sendToOthers(CMessageBuffer &mb); // Only used by ALL
+        inline unsigned getNumPendingBuckets() const
+        {
+            return pendingBuckets.ordinality();
+        }
+        inline CSendBucket *dequeuePendingBucket()
+        {
+            return pendingBuckets.dequeueNow();
+        }
+        inline void enqueuePendingBucket(CSendBucket *bucket)
+        {
+            pendingBuckets.enqueue(bucket);
+        }
+        void incActiveWriters();
+        void decActiveWriters();
+        inline unsigned getActiveWriters() const
+        {
+            return activeWriters;
+        }
+        inline CSendBucket *queryBucket()
+        {
+            return bucket;
+        }
+        CSendBucket *queryBucketCreate();
+        inline CSendBucket *getBucketClear()
+        {
+            return bucket.getClear();
+        }
+        bool isSelf() const
+        {
+            return self;
+        }
+        const char *queryInfo() const { return info; }
+    };
+
     /*
      * CSender, main send loop functionality
     * processes input, constructs CSendBuckets and manages the creation of CWriteHandler threads
      */
     class CSender : implements IThreadFactory, implements IExceptionHandler, public CSimpleInterface
     {
-        class CTarget
-        {
-            CSender &owner;
-            unsigned target;
-            atomic_t activeWriters;
-            atomic_t senderFinished;
-            CSendBucketQueue pendingBuckets;
-            mutable CriticalSection crit;
-            Owned<CSendBucket> bucket;
-        public:
-            CTarget(CSender &_owner, unsigned _target) : owner(_owner), target(_target)
-            {
-                atomic_set(&activeWriters, 0);
-                atomic_set(&senderFinished, 0);
-            }
-            ~CTarget()
-            {
-                reset();
-            }
-            void reset()
-            {
-                for (;;)
-                {
-                    CSendBucket *sendBucket = pendingBuckets.dequeueNow();
-                    if (!sendBucket)
-                        break;
-                    ::Release(sendBucket);
-                }
-                bucket.clear();
-                atomic_set(&activeWriters, 0);
-                atomic_set(&senderFinished, 0);
-            }
-            void send(CMessageBuffer &mb)
-            {
-                CriticalBlock b(crit); // protects against multiple senders to the same target
-                if (!atomic_read(&senderFinished))
-                {
-                    if (owner.selfPush(target))
-                        assertex(target != owner.self);
-                    owner.sendBlock(target, mb);
-                }
-            }
-            inline unsigned getNumPendingBuckets() const
-            {
-                return pendingBuckets.ordinality();
-            }
-            inline CSendBucket *dequeuePendingBucket()
-            {
-                return pendingBuckets.dequeueNow();
-            }
-            inline void enqueuePendingBucket(CSendBucket *bucket)
-            {
-                pendingBuckets.enqueue(bucket);
-            }
-            inline void incActiveWriters()
-            {
-                atomic_inc(&activeWriters);
-                ++owner.totalActiveWriters; // NB: incActiveWriters() is always called within a activeWritersLock crit
-            }
-            inline void decActiveWriters()
-            {
-                atomic_dec(&activeWriters);
-                --owner.totalActiveWriters; // NB: decActiveWriters() is always called within a activeWritersLock crit
-            }
-            inline unsigned getActiveWriters() const
-            {
-                return atomic_read(&activeWriters);
-            }
-            inline bool getSenderFinished() const
-            {
-                return atomic_read(&senderFinished) != 0;
-            }
-            inline CSendBucket *queryBucket()
-            {
-                return bucket;
-            }
-            inline CSendBucket *queryBucketCreate()
-            {
-                if (!bucket)
-                    bucket.setown(new CSendBucket(owner.owner, target));
-                return bucket;
-            }
-            inline CSendBucket *getBucketClear()
-            {
-                return bucket.getClear();
-            }
-            bool queryMarkSenderFinished()
-            {
-                return atomic_cas(&senderFinished, 1, 0);
-            }
-        };
         /*
         * CWriterHandler, a per-thread class and member of the writerPool;
         * a write handler is given an initial CSendBucket and handles the dedup (if applicable)
@@ -388,7 +380,7 @@ protected:
          * If the serialized size is below a threshold, it will check whether more has been queued
          * in the interim, and serialize and compress that too within the same send/recv cycle.
          * When done, it will check whether more buckets are queued and available.
-         * NB: There will be at most 1 writer per destination target (up to thread pool limit)
+         * NB: There will be at most 1 writer per target (up to thread pool limit)
          */
         class CWriteHandler : implements IPooledThread, public CSimpleInterface
         {
@@ -408,15 +400,14 @@ protected:
             }
             virtual void init(void *startInfo) override
             {
-                nextPending = getRandom()%distributor.numnodes;
+                nextPending = getRandom()%owner.targets.ordinality();
                 _sendBucket.setown((CSendBucket *)startInfo);
-                target = owner.targets.item(_sendBucket->queryDestination());
+                target = _sendBucket->queryTarget();
                 target->incActiveWriters();
             }
             virtual void threadmain() override
             {
                 Owned<CSendBucket> sendBucket = _sendBucket.getClear();
-                unsigned dest = sendBucket->queryDestination();
                 size32_t writerTotalSz = 0;
                 size32_t sendSz = 0;
                 CMessageBuffer msg;
@@ -425,14 +416,19 @@ protected:
                     writerTotalSz += sendBucket->querySize(); // NB: This size is pre-dedup, and is the correct amount to pass to decTotal
                     owner.dedup(sendBucket); // conditional
 
-                    if (owner.selfPush(dest))
+                    if (target->isSelf())
                     {
                         HDSendPrintLog2("CWriteHandler, sending raw=%d to LOCAL", writerTotalSz);
-                        if (!target->getSenderFinished())
-                            distributor.addLocal(sendBucket);
+                        if (!owner.getSelfFinished())
+                            distributor.addLocalClear(sendBucket);
                     }
                     else // remote
                     {
+                        if (owner.owner.isAll)
+                        {
+                            if (!owner.getSelfFinished())
+                                distributor.addLocal(sendBucket);
+                        }
                         if (compressor)
                             sendSz += sendBucket->serializeCompressClear(msg, *compressor);
                         else
@@ -440,22 +436,24 @@ protected:
                         // NB: buckets will typically be large enough already, if not check pending buckets
                         if (sendSz < distributor.bucketSendSize)
                         {
-                            // more added to dest I'm processing?
+                            // more added to target I'm processing?
                             sendBucket.setown(target->dequeuePendingBucket());
                             if (sendBucket)
                             {
-                                HDSendPrintLog3("CWriteHandler, pending(b=%d) rolled, size=%d", sendBucket->queryDestination(), sendBucket->querySize());
+                                HDSendPrintLog3("CWriteHandler, pending(target=%s) rolled, size=%d", sendBucket->queryTarget()->queryInfo(), sendBucket->querySize());
                                // NB: if was just < bucketSendSize and pending is ~ bucketSendSize, could mean we send ~2*bucketSendSize, but that's ok.
                                 continue; // NB: it will flow into else "remote" arm
                             }
                         }
-                        if (!target->getSenderFinished())
+                        if (owner.owner.isAll)
+                            target->sendToOthers(msg);
+                        else
                             target->send(msg);
                         sendSz = 0;
                         msg.clear();
                     }
                     // see if others to process
-                    // NB: this will never start processing a bucket for a destination which already has an active writer.
+                    // NB: this will never start processing a bucket for a target which already has an active writer.
                     CriticalBlock b(owner.activeWritersLock);
                     owner.decTotal(writerTotalSz);
                     target->decActiveWriters();
@@ -466,10 +464,9 @@ protected:
                         break;
                     }
                     writerTotalSz = 0; // now reset for new bucket to send
-                    dest = sendBucket->queryDestination();
-                    target = owner.targets.item(dest);
+                    target = sendBucket->queryTarget();
                     target->incActiveWriters();
-                    HDSendPrintLog3("CWriteHandler, now dealing with (b=%d), size=%d", sendBucket->queryDestination(), sendBucket->querySize());
+                    HDSendPrintLog3("CWriteHandler, now dealing with (target=%s), size=%d", sendBucket->queryTarget()->queryInfo(), sendBucket->querySize());
                 }
             }
             virtual bool canReuse() const override { return true; }
@@ -481,7 +478,7 @@ protected:
         mutable SpinLock totalSzLock; // MORE: Could possibly use an atomic to reduce the scope of this spin lock
         SpinLock doDedupLock;
         IPointerArrayOf<CSendBucket> buckets;
-        UnsignedArray candidates;
+        PointerArrayOf<CTarget> candidates;
         size32_t totalSz;
         bool senderFull, doDedup, aborted, initialized;
         Semaphore senderFullSem;
@@ -492,6 +489,7 @@ protected:
         Owned<IThreadPool> writerPool;
         unsigned totalActiveWriters;
         PointerArrayOf<CTarget> targets;
+        std::atomic<bool> *sendersFinished = nullptr;
 
         void init()
         {
@@ -504,9 +502,20 @@ protected:
             writerPool.setown(createThreadPool("HashDist writer pool", this, this, owner.writerPoolSize, 5*60*1000));
             self = owner.activity->queryJobChannel().queryMyRank()-1;
 
-            targets.ensure(owner.numnodes);
-            for (unsigned n=0; n<owner.numnodes; n++)
-                targets.append(new CTarget(*this, n));
+            sendersFinished = new std::atomic<bool>[owner.numnodes];
+            for (unsigned dest=0; dest<owner.numnodes; dest++)
+                sendersFinished[dest] = false;
+            if (owner.isAll)
+                targets.append(new CTarget(*this, 0, false, "DESTINATION=ALL"));
+            else
+            {
+                targets.ensure(owner.numnodes);
+                for (unsigned n=0; n<owner.numnodes; n++)
+                {
+                    VStringBuffer info("DESTINATION=%u", n);
+                    targets.append(new CTarget(*this, n, self==n && !owner.pull, info.str()));
+                }
+            }
 
             totalActiveWriters = 0;
             aborted = false;
@@ -516,8 +525,10 @@ protected:
         {
             assertex(0 == totalActiveWriters);
             // unless it was aborted, there shouldn't be any pending or non-null buckets
-            for (unsigned n=0; n<owner.numnodes; n++)
-                targets.item(n)->reset();
+            ForEachItemIn(t, targets)
+                targets.item(t)->reset();
+            for (unsigned dest=0; dest<owner.numnodes; dest++)
+                sendersFinished[dest] = false;
             totalSz = 0;
             senderFull = false;
             atomic_set(&numFinished, 0);
@@ -529,6 +540,19 @@ protected:
             CriticalBlock b(activeWritersLock);
             return owner.writerPoolSize - totalActiveWriters;
         }
+        inline bool getSenderFinished(unsigned dest) const
+        {
+            return sendersFinished[dest];
+        }
+        inline bool getSelfFinished() const
+        {
+            return sendersFinished[self];
+        }
+        bool queryMarkSenderFinished(unsigned dest)
+        {
+            bool expectedState = false;
+            return sendersFinished[dest].compare_exchange_strong(expectedState, true);
+        }
         void dedup(CSendBucket *sendBucket)
         {
             {
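
The finished-flag bookkeeping moves out of the old per-target CTarget members (atomic_t plus atomic_cas, removed above) into this shared array of std::atomic<bool>, indexed by destination slave. A minimal standalone sketch of the mark-once CAS idiom behind queryMarkSenderFinished (names and the driver are illustrative only):

    #include <atomic>
    #include <cstdio>

    std::atomic<bool> finished{false};

    // compare_exchange_strong flips false -> true exactly once; only the
    // first caller sees it succeed, every later caller gets false back.
    bool markFinishedOnce()
    {
        bool expected = false;
        return finished.compare_exchange_strong(expected, true);
    }

    int main()
    {
        printf("%d\n", markFinishedOnce()); // 1: this caller marked it
        printf("%d\n", markFinishedOnce()); // 0: already marked
    }
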
@@ -580,22 +604,18 @@ protected:
             markStopped(target); // Probably a bit pointless if target is 'self' - process loop will have done so already
             owner.ActPrintLog("CSender::sendBlock stopped slave %d (finished=%d)", target+1, atomic_read(&numFinished));
         }
-        inline bool selfPush(unsigned i) const
-        {
-            return (i==self)&&!owner.pull;
-        }
         void closeWrite()
         {
             unsigned i;
             CMessageBuffer nullMsg;
-            for (i=0; i<owner.numnodes; i++)
+            for (unsigned dest=0; dest<owner.numnodes; dest++)
             {
-                if (!selfPush(i))
+                if (owner.pull || dest != self)
                 {
                     try
                     {
                         nullMsg.clear();
-                        sendBlock(i, nullMsg);
+                        sendBlock(dest, nullMsg);
                     }
                     catch (IException *e)
                     {
@@ -617,8 +637,9 @@ protected:
         {
             if (initialized)
             {
-                for (unsigned n=0; n<owner.numnodes; n++)
-                    delete targets.item(n);
+                ForEachItemIn(t, targets)
+                    delete targets.item(t);
+                delete [] sendersFinished; // array form: allocated with new[]
             }
         }
         void reinit()
@@ -637,7 +658,7 @@ protected:
                 CTarget *target = targets.item(next);
                 unsigned c = target->getNumPendingBuckets();
                 ++next;
-                if (next>=owner.numnodes)
+                if (next>=targets.ordinality())
                     next = 0;
                 if (c)
                 {
@@ -650,12 +671,11 @@ protected:
         }
         void add(CSendBucket *bucket)
         {
-            unsigned dest = bucket->queryDestination();
-            CTarget *target = targets.item(dest);
+            CTarget *target = bucket->queryTarget();
             CriticalBlock b(activeWritersLock);
             if ((totalActiveWriters < owner.writerPoolSize) && (!owner.targetWriterLimit || (target->getActiveWriters() < owner.targetWriterLimit)))
             {
-                HDSendPrintLog3("CSender::add (new thread), dest=%d, active=%d", dest, totalActiveWriters);
+                HDSendPrintLog3("CSender::add (new thread), target=%s, active=%u", target->queryInfo(), totalActiveWriters);
                 writerPool->start(bucket);
             }
             else // an existing writer will pick up
@@ -672,11 +692,11 @@ protected:
                 /* this will be infrequent, scan all.
                  * NB: This may pick up / clear more than 'numStopped', but that's okay, all it will mean is that another call to checkSendersFinished() will enter here.
                  */
-                ForEachItemIn(t, targets)
+                for (unsigned dest=0; dest<owner.numnodes; dest++)
                 {
-                    CTarget *target = targets.item(t);
-                    if (target->getSenderFinished())
+                    if (getSenderFinished(dest))
                     {
+                        CTarget *target = targets.item(dest);
                         Owned<CSendBucket> bucket = target->getBucketClear();
                         if (bucket)
                             decTotal(bucket->querySize());
@@ -711,15 +731,15 @@ protected:
                         unsigned maxSz=0;
                         if (queryInactiveWriters())
                         {
-                            for (unsigned i=0; i<owner.numnodes; i++)
+                            ForEachItemIn(t, targets)
                             {
-                                CSendBucket *bucket = targets.item(i)->queryBucket();
+                                CSendBucket *bucket = targets.item(t)->queryBucket();
                                 if (bucket)
                                 {
                                     size32_t bucketSz = bucket->querySize();
                                     if (bucketSz > maxSz)
                                         maxSz = bucketSz;
-                                    HDSendPrintLog4("b[%d], rows=%d, size=%d", i, bucket->count(), bucketSz);
+                                    HDSendPrintLog4("b[%d], rows=%d, size=%d", t, bucket->count(), bucketSz);
                                 }
                             }
                         }
@@ -733,9 +753,9 @@ protected:
                             candidates.clear();
                             bool doSelf = false;
                             unsigned inactiveWriters = queryInactiveWriters();
-                            for (unsigned i=0; i<owner.numnodes; i++)
+                            ForEachItemIn(t, targets)
                             {
-                                CTarget *target = targets.item(i);
+                                CTarget *target = targets.item(t);
                                 CSendBucket *bucket = target->queryBucket();
                                 if (bucket)
                                 {
@@ -744,12 +764,12 @@ protected:
                                     {
                                         if (0 == target->getActiveWriters()) // only if there are no active writer threads for this target
                                         {
-                                            if (i==self)
+                                            if (target->isSelf())
                                                 doSelf = true; // always send to self if candidate
                                             else
                                             {
-                                                candidates.append(i);
-                                                HDSendPrintLog4("c[%d], rows=%d, size=%d", i, bucket->count(), bucketSz);
+                                                candidates.append(target);
+                                                HDSendPrintLog4("c[%d], rows=%d, size=%d", t, bucket->count(), bucketSz);
                                                 /* NB: in theory could be more if some finished since checking, but that's okay
                                                  * some candidates, or free space will be picked up in next section
                                                  */
@@ -768,11 +788,10 @@ protected:
                                 else
                                 {
                                     unsigned pos = getRandom()%candidates.ordinality();
-                                    unsigned c = candidates.item(pos);
-                                    CTarget *target = targets.item(c);
+                                    CTarget *target = candidates.item(pos);
                                     CSendBucket *bucket = target->queryBucket();
                                     assertex(bucket);
-                                    HDSendPrintLog3("process exceeded: send to %d, size=%d", c, bucket->querySize());
+                                    HDSendPrintLog3("process exceeded: sending to %s, size=%u", target->queryInfo(), bucket->querySize());
                                     add(target->getBucketClear());
                                     if (limit)
                                     {
@@ -816,12 +835,20 @@ protected:
                     const void *row = input->ungroupedNextRow();
                     if (!row)
                         break;
-                    unsigned dest = owner.ihash->hash(row)%owner.numnodes;
-                    CTarget *target = targets.item(dest);
-                    if (target->getSenderFinished())
-                        ReleaseThorRow(row);
+
+                    CTarget *target = nullptr;
+                    if (owner.isAll)
+                        target = targets.item(0);
                     else
                     {
+                        unsigned dest = owner.ihash->hash(row)%owner.numnodes;
+                        if (getSenderFinished(dest))
+                            ReleaseThorRow(row);
+                        else
+                            target = targets.item(dest);
+                    }
+                    if (target)
+                    {
                         CSendBucket *bucket = target->queryBucketCreate();
                         size32_t rs = bucket->add(row);
                         totalSent++;
@@ -831,11 +858,12 @@ protected:
                         }
                         if (bucket->querySize() >= owner.bucketSendSize)
                         {
-                            HDSendPrintLog3("adding new bucket: %d, size = %d", bucket->queryDestination(), bucket->querySize());
+                            HDSendPrintLog3("adding new bucket: target=%s, size = %d", bucket->queryTarget()->queryInfo(), bucket->querySize());
                             add(target->getBucketClear());
                         }
                     }
-                    checkSendersFinished(); // clears out defunct target buckets if any have stopped
+                    if (!owner.isAll) // in the ALL case, the single ALL CTarget must keep sending to every slave that has not yet finished, until all have.
+                        checkSendersFinished(); // clears out defunct target buckets if any have stopped
                 }
             }
             catch (IException *e)
@@ -849,15 +877,16 @@ protected:
             if (!aborted)
             {
                 // send remainder
-                Owned<IShuffledIterator> iter = createShuffledIterator(owner.numnodes);
+                Owned<IShuffledIterator> iter = createShuffledIterator(targets.ordinality());
                 ForEach(*iter)
                 {
                     unsigned dest=iter->get();
-                    Owned<CSendBucket> bucket = targets.item(dest)->getBucketClear();
-                    HDSendPrintLog4("Looking at last bucket(%d): %d, size = %d", dest, bucket.get()?bucket->queryDestination():0, bucket.get()?bucket->querySize():-1);
+                    CTarget *target = targets.item(dest);
+                    Owned<CSendBucket> bucket = target->getBucketClear();
+                    HDSendPrintLog3("Looking at last bucket: target=%s, size=%u", target->queryInfo(), bucket.get()?bucket->querySize():-1);
                     if (bucket && bucket->querySize())
                     {
-                        HDSendPrintLog3("Sending last bucket(s): %d, size = %d", bucket->queryDestination(), bucket->querySize());
+                        HDSendPrintLog3("Sending last bucket(s): target=%s, size=%u", target->queryInfo(), bucket->querySize());
                         add(bucket.getClear());
                     }
                 }
@@ -878,7 +907,7 @@ protected:
         }
         void markStopped(unsigned target)
         {
-            if (targets.item(target)->queryMarkSenderFinished())
+            if (queryMarkSenderFinished(target))
             {
                 atomic_inc(&numFinished);
                 atomic_inc(&stoppedTargets);
@@ -903,6 +932,7 @@ protected:
             return owner.fireException(e);
         }
         friend class CWriteHandler;
+        friend class CTarget;
     };
 
     IOutputRowDeserializer *deserializer;
@@ -950,10 +980,10 @@ protected:
         }
     } sendthread;
 
-    void addLocal(CSendBucket *bucket)
+    void addLocalClear(CSendBucket *bucket)
     {
         CriticalBlock block(putsect); // JCSMORE - probably doesn't need for this long
-        HDSendPrintLog3("addLocal (b=%d), size=%d", bucket->queryDestination(), bucket->querySize());
+        HDSendPrintLog3("addLocalClear (target=%s), size=%u", bucket->queryTarget()->queryInfo(), bucket->querySize());
         for (;;)
         {
             const void *row = bucket->nextRow();
@@ -962,6 +992,18 @@ protected:
             pipewr->putRow(row);
         }
     }
+    void addLocal(CSendBucket *bucket)
+    {
+        CriticalBlock block(putsect); // JCSMORE - probably doesn't need for this long
+        HDSendPrintLog3("addLocal (target=%s), size=%u", bucket->queryTarget()->queryInfo(), bucket->querySize());
+        for (unsigned r=0; r<bucket->count(); r++)
+        {
+            const void *row = bucket->get(r);
+            if (!row)
+                break;
+            pipewr->putRow(row);
+        }
+    }
     void ActPrintLog(const char *format, ...)  __attribute__((format(printf, 2, 3)))
     {
         StringBuffer msg;
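
Note the ownership difference between the two local-push paths: the renamed addLocalClear drains the bucket via nextRow, handing each row to the local pipe and leaving the bucket empty, while the new addLocal (used on the ALL path) reads rows with CSendBucket::get, which links each row, so the bucket keeps its contents and the very same bucket can still be serialized and sent to the other slaves. A rough analogy with std::shared_ptr standing in for Thor's link-counted rows (illustrative only, not the engine's row classes):

    #include <memory>
    #include <vector>

    using Row = std::shared_ptr<int>;

    // "Clear" variant: ownership moves to the pipe; the bucket is emptied.
    void addLocalClear(std::vector<Row> &bucket, std::vector<Row> &pipe)
    {
        for (Row &row : bucket)
            pipe.push_back(std::move(row));
        bucket.clear();
    }

    // "Link" variant: the handle is copied (refcount bumped); the bucket
    // keeps its rows, so it can afterwards be sent remotely as well.
    void addLocal(const std::vector<Row> &bucket, std::vector<Row> &pipe)
    {
        for (const Row &row : bucket)
            pipe.push_back(row);
    }
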
@@ -1001,11 +1043,12 @@ protected:
 public:
     IMPLEMENT_IINTERFACE_USING(CInterface);
 
-    CDistributorBase(CActivityBase *_activity, bool _doDedup, IStopInput *_istop, const char *_id)
+    CDistributorBase(CActivityBase *_activity, bool _doDedup, bool _isall, IStopInput *_istop, const char *_id)
         : activity(_activity), recvthread(this), sendthread(this), sender(*this), id(_id)
     {
         aborted = connected = false;
         doDedup = _doDedup;
+        isAll = _isall;
         self = activity->queryJobChannel().queryMyRank() - 1;
         numnodes = activity->queryJob().querySlaves();
         iCompare = NULL;
@@ -1370,6 +1413,47 @@ public:
 };
 
 
+void CDistributorBase::CTarget::send(CMessageBuffer &mb) // Not used for ALL
+{
+    dbgassertex(!self);
+    CriticalBlock b(crit); // protects against multiple senders to the same target
+    if (!owner.getSenderFinished(destination))
+        owner.sendBlock(destination, mb);
+}
+
+void CDistributorBase::CTarget::sendToOthers(CMessageBuffer &mb) // Only used by ALL
+{
+    CriticalBlock b(crit); // protects against multiple parallel sender threads sending to ALL clashing
+    for (unsigned dest=0; dest<owner.owner.numnodes; dest++)
+    {
+        if (dest != owner.self)
+        {
+            if (!owner.getSenderFinished(dest))
+                owner.sendBlock(dest, mb);
+        }
+    }
+}
+
+void CDistributorBase::CTarget::incActiveWriters()
+{
+    ++activeWriters;
+    ++owner.totalActiveWriters; // NB: incActiveWriters() is always called within an activeWritersLock crit
+}
+
+void CDistributorBase::CTarget::decActiveWriters()
+{
+    --activeWriters;
+    --owner.totalActiveWriters; // NB: decActiveWriters() is always called within an activeWritersLock crit
+}
+
+CDistributorBase::CSendBucket *CDistributorBase::CTarget::queryBucketCreate()
+{
+    if (!bucket)
+        bucket.setown(new CSendBucket(owner.owner, this));
+    return bucket;
+}
+
+
 
 // protocol is:
 // 1) 0 byte block              - indicates end of input - no ack required
@@ -1386,8 +1470,8 @@ class CRowDistributor: public CDistributorBase
     ICommunicator &comm;
     bool stopping;
 public:
-    CRowDistributor(CActivityBase *activity, ICommunicator &_comm, mptag_t _tag, bool doDedup, IStopInput *istop, const char *id)
-        : CDistributorBase(activity, doDedup, istop, id), comm(_comm), tag(_tag)
+    CRowDistributor(CActivityBase *activity, ICommunicator &_comm, mptag_t _tag, bool doDedup, bool isAll, IStopInput *istop, const char *id)
+        : CDistributorBase(activity, doDedup, isAll, istop, id), comm(_comm), tag(_tag)
     {
         stopping = false;
     }
@@ -1472,20 +1556,15 @@ Restart:
 #endif
             if (flag==0)
                 return false;           // other end stopped
+            size32_t preAppendAckLen = msg.length();
             flag = 0;                   // no ack
-            msg.append(flag);
-        }
-        if (flag==0)
-        {                  // no ack
+            msg.append(flag); // JCSMORE - not great that it's altering msg, but protocol demands it at the moment
             comm.send(msg, i+1, tag);
-            return true;
+            msg.setLength(preAppendAckLen);
         }
-        // this branch not yet used
-        assertex(false);
-        if (!sendRecv(comm, msg, i+1, tag))
-            return false;
-        msg.read(flag);             // whether stopped
-        return flag!=0;
+        else
+            comm.send(msg, i+1, tag);
+        return true;
     }
 
     virtual void stopRecv()
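
The rewritten no-ack branch records the buffer length, appends the protocol's trailing flag, sends, and then truncates back with setLength. This matters for DISTRIBUTE,ALL because the same serialized block is sent to several destinations in turn, so mutating msg permanently (as the old code did) would corrupt it for the next send; the old, never-reached sendRecv branch is dropped at the same time. The shape of the pattern, sketched with a plain byte vector and a hypothetical wireSend, hoisted around a fan-out loop for brevity:

    #include <cstdint>
    #include <vector>

    void wireSend(unsigned dest, const std::vector<uint8_t> &buf); // hypothetical transport

    void sendToAll(std::vector<uint8_t> &msg, unsigned numDests, unsigned self)
    {
        size_t preLen = msg.size();
        msg.push_back(0);             // append the "no ack required" flag
        for (unsigned d = 0; d < numDests; d++)
        {
            if (d != self)
                wireSend(d, msg);     // same block goes to every other slave
        }
        msg.resize(preLen);           // restore the buffer: flag removed again
    }
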
@@ -1653,7 +1732,7 @@ class CRowPullDistributor: public CDistributorBase
     }
 public:
     CRowPullDistributor(CActivityBase *activity, ICommunicator &_comm, mptag_t _tag, bool doDedup, IStopInput *istop, const char *id)
-        : CDistributorBase(activity, doDedup, istop, id), comm(_comm), tag(_tag)
+        : CDistributorBase(activity, doDedup, false, istop, id), comm(_comm), tag(_tag)
     {
         pull = true;
         targetWriterLimit = 1; // >1 target writer can cause packets to be received out of order
@@ -1947,9 +2026,9 @@ public:
 //==================================================================================================
 
 
-IHashDistributor *createHashDistributor(CActivityBase *activity, ICommunicator &comm, mptag_t tag, bool doDedup, IStopInput *istop, const char *id)
+IHashDistributor *createHashDistributor(CActivityBase *activity, ICommunicator &comm, mptag_t tag, bool doDedup, bool isAll, IStopInput *istop, const char *id)
 {
-    return new CRowDistributor(activity, comm, tag, doDedup, istop, id);
+    return new CRowDistributor(activity, comm, tag, doDedup, isAll, istop, id);
 }
 
 IHashDistributor *createPullHashDistributor(CActivityBase *activity, ICommunicator &comm, mptag_t tag, bool doDedup, IStopInput *istop, const char *id=NULL)
@@ -1984,6 +2063,7 @@ protected:
     ICompare *mergecmp = nullptr;     // if non-null is merge distribute
     bool eofin = false;
     bool setupDist = true;
+    bool isAll = false;
 public:
     HashDistributeSlaveBase(CGraphElementBase *_container)
         : CSlaveActivity(_container)
@@ -2008,7 +2088,7 @@ public:
         if (mergecmp)
             distributor = createPullHashDistributor(this, queryJobChannel().queryJobComm(), mptag, false, this);
         else
-            distributor = createHashDistributor(this, queryJobChannel().queryJobComm(), mptag, false, this);
+            distributor = createHashDistributor(this, queryJobChannel().queryJobComm(), mptag, false, isAll, this);
     }
     void stopInput()
     {
@@ -2098,6 +2178,22 @@ public:
 
 //===========================================================================
 
+class NWayDistributeSlaveActivity : public HashDistributeSlaveBase
+{
+    typedef HashDistributeSlaveBase PARENT;
+public:
+    NWayDistributeSlaveActivity(CGraphElementBase *container) : PARENT(container)
+    {
+        IHThorNWayDistributeArg *distribargs = (IHThorNWayDistributeArg *)queryHelper();
+        if (!distribargs->isAll())
+            UNIMPLEMENTED;
+        isAll = true;
+        ihash = nullptr;
+    }
+};
+
+//===========================================================================
+
 class HashDistributeMergeSlaveActivity : public HashDistributeSlaveActivity
 {
     typedef HashDistributeSlaveActivity PARENT;
@@ -3589,7 +3685,7 @@ CBucketHandler *CBucketHandler::getNextBucketHandler(Owned<IRowStream> &nextInpu
         if (bucket->isSpilt() && !bucket->areRowsInBucketDeduped())
         {
             rowcount_t keyCount, count;
-            /* If each key and row stream were to use a unique allocator per destination bucket
+            /* If each key and row stream were to use a unique allocator per target bucket
              * thereby keeping rows/keys together in pages, it would make it easier to free pages on spill requests.
              * However, it would also mean a lot of allocators with at least one page per allocate, which ties up a lot of memory
              */
@@ -3661,7 +3757,7 @@ public:
     {
         HashDedupSlaveActivityBase::init(data, slaveData);
         mptag = container.queryJobChannel().deserializeMPTag(data);
-        distributor = createHashDistributor(this, queryJobChannel().queryJobComm(), mptag, true, this);
+        distributor = createHashDistributor(this, queryJobChannel().queryJobComm(), mptag, true, false, this);
     }
     virtual void start() override
     {
@@ -3774,7 +3870,7 @@ public:
         ICompare *icompareL = joinargs->queryCompareLeft();
         ICompare *icompareR = joinargs->queryCompareRight();
         if (!lhsDistributor)
-            lhsDistributor.setown(createHashDistributor(this, queryJobChannel().queryJobComm(), mptag, false, this, "LHS"));
+            lhsDistributor.setown(createHashDistributor(this, queryJobChannel().queryJobComm(), mptag, false, false, this, "LHS"));
         Owned<IRowStream> reader = lhsDistributor->connect(queryRowInterfaces(inL), leftInputStream, ihashL, icompareL, nullptr);
         Owned<IThorRowLoader> loaderL = createThorRowLoader(*this, ::queryRowInterfaces(inL), icompareL, stableSort_earlyAlloc, rc_allDisk, SPILL_PRIORITY_HASHJOIN);
         loaderL->setTracingPrefix("Join left");
@@ -3786,7 +3882,7 @@ public:
         lhsDistributor->join();
         leftdone = true;
         if (!rhsDistributor)
-            rhsDistributor.setown(createHashDistributor(this, queryJobChannel().queryJobComm(), mptag2, false, this, "RHS"));
+            rhsDistributor.setown(createHashDistributor(this, queryJobChannel().queryJobComm(), mptag2, false, false, this, "RHS"));
         reader.setown(rhsDistributor->connect(queryRowInterfaces(inR), rightInputStream, ihashR, icompareR, nullptr));
         Owned<IThorRowLoader> loaderR = createThorRowLoader(*this, ::queryRowInterfaces(inR), icompareR, stableSort_earlyAlloc, rc_mixed, SPILL_PRIORITY_HASHJOIN);
         loaderR->setTracingPrefix("Join right");
@@ -4448,6 +4544,12 @@ CActivityBase *createHashDistributeSlave(CGraphElementBase *container)
     return new HashDistributeSlaveActivity(container);
 }
 
+CActivityBase *createNWayDistributeSlave(CGraphElementBase *container)
+{
+    ActPrintLog(container, "NWAYDISTRIB: createNWayDistributeSlave");
+    return new NWayDistributeSlaveActivity(container);
+}
+
 CActivityBase *createHashDistributeMergeSlave(CGraphElementBase *container)
 {
     ActPrintLog(container, "HASHDISTRIB: createHashDistributeMergeSlave");

+ 2 - 0
thorlcr/activities/hashdistrib/thhashdistribslave.ipp

@@ -39,6 +39,7 @@ IHashDistributor *createHashDistributor(
     ICommunicator &comm, 
     mptag_t tag, 
     bool dedup,
+    bool isAll,
     IStopInput *istop, const char *id=NULL); // id optional, used for tracing to identify which distributor if >1 in activity
 
 // IAggregateTable allows rows to be added and aggregated and retrieved via a IRowStream
@@ -54,6 +55,7 @@ IAggregateTable *createRowAggregator(CActivityBase &activity, IHThorHashAggregat
 IRowStream *mergeLocalAggs(Owned<IHashDistributor> &distributor, CActivityBase &activity, IHThorRowAggregator &helper, IHThorHashAggregateExtra &helperExtra, IRowStream *localAggTable, mptag_t mptag);
 
 activityslaves_decl CActivityBase *createHashDistributeSlave(CGraphElementBase *container);
+activityslaves_decl CActivityBase *createNWayDistributeSlave(CGraphElementBase *container);
 activityslaves_decl CActivityBase *createHashDistributeMergeSlave(CGraphElementBase *container);
 activityslaves_decl CActivityBase *createHashDedupSlave(CGraphElementBase *container);
 activityslaves_decl CActivityBase *createHashLocalDedupSlave(CGraphElementBase *container);

+ 2 - 2
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -2675,8 +2675,8 @@ public:
             broadcast3MpTag = queryJobChannel().deserializeMPTag(data);
             lhsDistributeTag = queryJobChannel().deserializeMPTag(data);
             rhsDistributeTag = queryJobChannel().deserializeMPTag(data);
-            rhsDistributor.setown(createHashDistributor(this, queryJobChannel().queryJobComm(), rhsDistributeTag, false, NULL, "RHS"));
-            lhsDistributor.setown(createHashDistributor(this, queryJobChannel().queryJobComm(), lhsDistributeTag, false, NULL, "LHS"));
+            rhsDistributor.setown(createHashDistributor(this, queryJobChannel().queryJobComm(), rhsDistributeTag, false, false, NULL, "RHS"));
+            lhsDistributor.setown(createHashDistributor(this, queryJobChannel().queryJobComm(), lhsDistributeTag, false, false, NULL, "LHS"));
         }
         if (isSmart())
         {

+ 1 - 0
thorlcr/graph/thgraph.cpp

@@ -821,6 +821,7 @@ bool isGlobalActivity(CGraphElementBase &container)
         case TAKkeyeddistribute:
         case TAKhashdistribute:
         case TAKhashdistributemerge:
+        case TAKnwaydistribute:
         case TAKworkunitwrite:
         case TAKdistribution:
         case TAKpartition:

+ 3 - 0
thorlcr/master/thactivitymaster.cpp

@@ -234,6 +234,9 @@ public:
             case TAKcountaggregate:
                 ret = createAggregateActivityMaster(this);
                 break;
+            case TAKnwaydistribute:
+                ret = createNWayDistributeActivityMaster(this);
+                break;
             case TAKhashdistribute:
             case TAKpartition:
                 ret = createHashDistributeActivityMaster(this);

+ 3 - 0
thorlcr/slave/slave.cpp

@@ -518,6 +518,9 @@ public:
             case TAKhashdistribute:
                 ret = createHashDistributeSlave(this);
                 break;
+            case TAKnwaydistribute:
+                ret = createNWayDistributeSlave(this);
+                break;
             case TAKdistributed:
                 ret = createHashDistributedSlave(this);
                 break;