Explorar o código

Merge remote-tracking branch 'origin/closedown-4.0.x'

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman %!s(int64=12) %!d(string=hai) anos
pai
achega
0a626bc208

+ 7 - 7
ecl/ecl-bundle/ecl-bundle.cpp

@@ -107,7 +107,7 @@ unsigned doEclCommand(StringBuffer &output, const char *cmd, const char *input)
     {
     {
         Owned<IPipeProcess> pipe = createPipeProcess();
         Owned<IPipeProcess> pipe = createPipeProcess();
         VStringBuffer runcmd("eclcc  --nologfile --nostdinc %s", cmd);
         VStringBuffer runcmd("eclcc  --nologfile --nostdinc %s", cmd);
-        pipe->run("eclcc", runcmd, ".", input != NULL, true, true);
+        pipe->run("eclcc", runcmd, ".", input != NULL, true, true, 1024*1024);
         if (optVerbose)
         if (optVerbose)
         {
         {
             printf("Running %s\n", runcmd.str());
             printf("Running %s\n", runcmd.str());
@@ -119,23 +119,23 @@ unsigned doEclCommand(StringBuffer &output, const char *cmd, const char *input)
             pipe->write(strlen(input), input);
             pipe->write(strlen(input), input);
             pipe->closeInput();
             pipe->closeInput();
         }
         }
-        StringBuffer error;
         char buf[1024];
         char buf[1024];
         while (true)
         while (true)
         {
         {
-            size32_t read = pipe->readError(sizeof(buf), buf);
+            size32_t read = pipe->read(sizeof(buf), buf);
             if (!read)
             if (!read)
                 break;
                 break;
-            error.append(read, buf);
+            output.append(read, buf);
         }
         }
+        int ret = pipe->wait();
+        StringBuffer error;
         while (true)
         while (true)
         {
         {
-            size32_t read = pipe->read(sizeof(buf), buf);
+            size32_t read = pipe->readError(sizeof(buf), buf);
             if (!read)
             if (!read)
                 break;
                 break;
-            output.append(read, buf);
+            error.append(read, buf);
         }
         }
-        int ret = pipe->wait();
         if (optVerbose && (ret > 0 || error.length()))
         if (optVerbose && (ret > 0 || error.length()))
             printf("eclcc return code was %d, output to stderr:\n%s", ret, error.str());
             printf("eclcc return code was %d, output to stderr:\n%s", ret, error.str());
         return ret;
         return ret;

+ 19 - 20
esp/files/scripts/TargetSelectWidget.js

@@ -215,30 +215,29 @@ require([
 
 
         loadTargets: function () {
         loadTargets: function () {
             var context = this;
             var context = this;
-            WsTopology.TpTargetClusterQuery({
-                load: function (response) {
-                    if (lang.exists("TpTargetClusterQueryResponse.TpTargetClusters.TpTargetCluster", response)) {
-                        var targetData = response.TpTargetClusterQueryResponse.TpTargetClusters.TpTargetCluster;
-                        var has_hthor = false;
-                        for (var i = 0; i < targetData.length; ++i) {
-                            context.targetSelectControl.options.push({
-                                label: targetData[i].Name,
-                                value: targetData[i].Name
-                            });
-                            if (targetData[i].Name == "hthor") {
-                                has_hthor = true;
-                            }
+            WsTopology.TpLogicalClusterQuery({
+            }).then(function (response) {
+                if (lang.exists("TpLogicalClusterQueryResponse.TpLogicalClusters.TpLogicalCluster", response)) {
+                    var targetData = response.TpLogicalClusterQueryResponse.TpLogicalClusters.TpLogicalCluster;
+                    var has_hthor = false;
+                    for (var i = 0; i < targetData.length; ++i) {
+                        context.targetSelectControl.options.push({
+                            label: targetData[i].Name,
+                            value: targetData[i].Name
+                        });
+                        if (targetData[i].Name == "hthor") {
+                            has_hthor = true;
                         }
                         }
+                    }
 
 
-                        if (!context.includeBlank && context._value == "") {
-                            if (has_hthor) {
-                                context._value = "hthor";
-                            } else {
-                                context._value = context.targetSelectControl.options[0].value;
-                            }
+                    if (!context.includeBlank && context._value == "") {
+                        if (has_hthor) {
+                            context._value = "hthor";
+                        } else {
+                            context._value = context.targetSelectControl.options[0].value;
                         }
                         }
-                        context.resetDefaultSelection();
                     }
                     }
+                    context.resetDefaultSelection();
                 }
                 }
             });
             });
         }
         }

+ 6 - 3
esp/files/scripts/WsTopology.js

@@ -30,13 +30,16 @@ define([
             lang.mixin(params.request, {
             lang.mixin(params.request, {
                 Type: "ALLSERVICES"
                 Type: "ALLSERVICES"
             });
             });
-            ESPRequest.send("WsTopology", "TpServiceQuery", params);
+            return ESPRequest.send("WsTopology", "TpServiceQuery", params);
         },
         },
         TpTargetClusterQuery: function (params) {
         TpTargetClusterQuery: function (params) {
-            ESPRequest.send("WsTopology", "TpTargetClusterQuery", params);
+            return ESPRequest.send("WsTopology", "TpTargetClusterQuery", params);
         },
         },
         TpGroupQuery: function (params) {
         TpGroupQuery: function (params) {
-            ESPRequest.send("WsTopology", "TpGroupQuery", params);
+            return ESPRequest.send("WsTopology", "TpGroupQuery", params);
+        },
+        TpLogicalClusterQuery: function (params) {
+            return ESPRequest.send("WsTopology", "TpLogicalClusterQuery", params);
         }
         }
     };
     };
 });
 });

+ 14 - 32
initfiles/bash/etc/init.d/hpcc_common.in

@@ -301,7 +301,7 @@ configGenCmd() {
     else 
     else 
         echo $configcmd >> $logFile
         echo $configcmd >> $logFile
     fi
     fi
-    eval $configcmd >> $logFile 2>&1
+    su ${user} -c "$configcmd" >> $logFile 2>&1
 }
 }
 
 
 createRuntime() {
 createRuntime() {
@@ -361,38 +361,20 @@ createRuntime() {
     fi
     fi
 
 
     #change the permission for all component directory under var
     #change the permission for all component directory under var
-    echo `date` "start chown $pid/$compName" >> /var/log/HPCCSystems/temp_log
-    chown -cR $user:$group "$pid/$compName"  1> /dev/null 2>/dev/null
-    echo `date` "stop chown $pid/$compName" >> /var/log/HPCCSystems/temp_log
-    echo `date` "start chown $lock/$compName" >> /var/log/HPCCSystems/temp_log
-    chown -cR $user:$group "$lock/$compName"  1> /dev/null 2>/dev/null
-    echo `date` "stop chown $lock/$compName" >> /var/log/HPCCSystems/temp_log
-    echo `date` "start chown $log/$compName" >> /var/log/HPCCSystems/temp_log
-    chown -cR $user:$group "$log/$compName"  1> /dev/null 2>/dev/null
-    echo `date` "stop chown $log/$compName" >> /var/log/HPCCSystems/temp_log
-    echo `date` "start chown $compPath" >> /var/log/HPCCSystems/temp_log
-    chown -cR $user:$group "$compPath"  1> /dev/null 2>/dev/null
-    echo `date` "stop chown $compPath" >> /var/log/HPCCSystems/temp_log
+    chown -c $user:$group "$pid/$compName"  1> /dev/null 2>/dev/null
+    chown -c $user:$group "$lock/$compName"  1> /dev/null 2>/dev/null
+    chown -c $user:$group "$log/$compName"  1> /dev/null 2>/dev/null
+    chown -c $user:$group "$compPath"  1> /dev/null 2>/dev/null
     dir.getByName data
     dir.getByName data
-    echo `date` "start chown ${dir_return}" >> /var/log/HPCCSystems/temp_log
-    chown -cR $user:$group "${dir_return}"  1> /dev/null 2>/dev/null
-    echo `date` "stop chown ${dir_return}" >> /var/log/HPCCSystems/temp_log
-    echo `date` "start chown ${dir_return}" >> /var/log/HPCCSystems/temp_log
+    chown -c $user:$group "${dir_return}"  1> /dev/null 2>/dev/null
     dir.getByName data2
     dir.getByName data2
-    chown -cR $user:$group "${dir_return}"  1> /dev/null 2>/dev/null
-    echo `date` "stop chown ${dir_return}" >> /var/log/HPCCSystems/temp_log
-    echo `date` "start chown ${dir_return}" >> /var/log/HPCCSystems/temp_log
+    chown -c $user:$group "${dir_return}"  1> /dev/null 2>/dev/null
     dir.getByName data3
     dir.getByName data3
-    chown -cR $user:$group "${dir_return}"  1> /dev/null 2>/dev/null
-    echo `date` "stop chown ${dir_return}" >> /var/log/HPCCSystems/temp_log
-    echo `date` "start chown ${dir_return}" >> /var/log/HPCCSystems/temp_log
+    chown -c $user:$group "${dir_return}"  1> /dev/null 2>/dev/null
     dir.getByName query
     dir.getByName query
-    chown -cR $user:$group "${dir_return}"  1> /dev/null 2>/dev/null
-    echo `date` "stop chown ${dir_return}" >> /var/log/HPCCSystems/temp_log
-    echo `date` "start chown ${dir_return}" >> /var/log/HPCCSystems/temp_log
+    chown -c $user:$group "${dir_return}"  1> /dev/null 2>/dev/null
     dir.getByName mirror
     dir.getByName mirror
-    chown -cR $user:$group "${dir_return}"  1> /dev/null 2>/dev/null
-    echo `date` "stop chown ${dir_return}" >> /var/log/HPCCSystems/temp_log
+    chown -c $user:$group "${dir_return}"  1> /dev/null 2>/dev/null
 
 
     # setting up ulimit for thor and other component which needs it.
     # setting up ulimit for thor and other component which needs it.
     ulimit -n 32768
     ulimit -n 32768
@@ -473,7 +455,7 @@ startCmd() {
     if [ ! -d $lockPath ]; then
     if [ ! -d $lockPath ]; then
         mkdir -p $lockPath >> $logFile 2>&1
         mkdir -p $lockPath >> $logFile 2>&1
     fi
     fi
-    chown -cR $user:$user $lockPath >> /dev/null 2>&1
+    chown -c $user:$user $lockPath >> /dev/null 2>&1
     lock ${lock}/${compName}/${compName}.lock
     lock ${lock}/${compName}/${compName}.lock
 
 
     if [ $__lockCreated -eq 0 ]; then
     if [ $__lockCreated -eq 0 ]; then
@@ -618,12 +600,12 @@ start_component() {
 
 
     if [ ! -d $logDir ]; then
     if [ ! -d $logDir ]; then
         mkdir -p $logDir >> tmp.txt 2>&1
         mkdir -p $logDir >> tmp.txt 2>&1
-        chown -cR $user:$user $logDir >> /dev/null 2>&1
+        chown -c $user:$user $logDir >> /dev/null 2>&1
     fi
     fi
 
 
     if [ ! -f $logFile ]; then
     if [ ! -f $logFile ]; then
         touch $logFile >> tmp.txt 2>&1
         touch $logFile >> tmp.txt 2>&1
-        chown -cR $user:$user $logFile >> /dev/null 2>&1
+        chown -c $user:$user $logFile >> /dev/null 2>&1
     fi 
     fi 
 
 
     # Creating Runtime 
     # Creating Runtime 
@@ -733,7 +715,7 @@ create_dropzone() {
     # Creating DropZone directory
     # Creating DropZone directory
     if [ ! -d ${D} ]; then
     if [ ! -d ${D} ]; then
          mkdir -p $D > /dev/null 2>&1
          mkdir -p $D > /dev/null 2>&1
-         chown -cR $user:$user $D > /dev/null 2>&1
+         chown -c $user:$user $D > /dev/null 2>&1
          chmod 777 $D > /dev/null 2>&1
          chmod 777 $D > /dev/null 2>&1
     fi
     fi
     done
     done

+ 12 - 0
system/jlib/jthread.cpp

@@ -1696,6 +1696,16 @@ class CLinuxPipeProcess: public CInterface, implements IPipeProcess
                 }
                 }
 
 
             }
             }
+            if (hError!=(HANDLE)-1) { // hmm who did that
+                fcntl(hError,F_SETFL,0); // read any remaining data in blocking mode
+                while (bufsize<buf.length()) {
+                    size32_t sizeRead = (size32_t)::read(hError, (byte *)buf.bufferBase()+bufsize, buf.length()-bufsize);
+                    if ((int)sizeRead>0)
+                        bufsize += sizeRead;
+                    else
+                        break;
+                }
+            }
             return 0;
             return 0;
         }
         }
         void stop() 
         void stop() 
@@ -2033,6 +2043,8 @@ public:
     unsigned wait()
     unsigned wait()
     {
     {
         CriticalBlock block(sect); 
         CriticalBlock block(sect); 
+        if (stderrbufferthread)
+            stderrbufferthread->stop();
         if (forkthread) {
         if (forkthread) {
             {
             {
                 CriticalUnblock unblock(sect);
                 CriticalUnblock unblock(sect);

+ 11 - 1
thorlcr/activities/fetch/thfetchslave.cpp

@@ -174,12 +174,17 @@ public:
     {
     {
         fposHash = new CFPosHandler(*iFetchHandler, offsetCount, offsetTable);
         fposHash = new CFPosHandler(*iFetchHandler, offsetCount, offsetTable);
         keyIn.set(_keyIn);
         keyIn.set(_keyIn);
-        distributor = createHashDistributor(&owner, owner.queryContainer().queryJob().queryJobComm(), tag, abortSoon, false, this);
+        distributor = createHashDistributor(&owner, owner.queryContainer().queryJob().queryJobComm(), tag, false, this);
         keyOutStream.setown(distributor->connect(keyRowIf, keyIn, fposHash, NULL));
         keyOutStream.setown(distributor->connect(keyRowIf, keyIn, fposHash, NULL));
     }
     }
     virtual IRowStream *queryOutput() { return this; }
     virtual IRowStream *queryOutput() { return this; }
     virtual IFileIO *queryPartIO(unsigned part) { assertex(part<files); return fPosMultiPartTable[part].file->queryFileIO(); }
     virtual IFileIO *queryPartIO(unsigned part) { assertex(part<files); return fPosMultiPartTable[part].file->queryFileIO(); }
     virtual StringBuffer &getPartName(unsigned part, StringBuffer &out) { return getPartFilename(parts.item(part), fPosMultiPartTable[part].location, out, true); }
     virtual StringBuffer &getPartName(unsigned part, StringBuffer &out) { return getPartFilename(parts.item(part), fPosMultiPartTable[part].location, out, true); }
+    virtual void abort()
+    {
+        if (distributor)
+            distributor->abort();
+    }
 
 
     // IStopInput
     // IStopInput
     virtual void stopInput()
     virtual void stopInput()
@@ -459,6 +464,11 @@ public:
         fetchStreamOut->stop();
         fetchStreamOut->stop();
         dataLinkStop();
         dataLinkStop();
     }
     }
+    virtual void abort()
+    {
+        if (fetchStream)
+            fetchStream->abort();
+    }
     CATCH_NEXTROW()
     CATCH_NEXTROW()
     {
     {
         ActivityTimer t(totalCycles, timeActivities, NULL);
         ActivityTimer t(totalCycles, timeActivities, NULL);

+ 1 - 0
thorlcr/activities/fetch/thfetchslave.ipp

@@ -36,6 +36,7 @@ interface IFetchStream : extends IInterface
     virtual IRowStream *queryOutput() = 0;
     virtual IRowStream *queryOutput() = 0;
     virtual IFileIO *queryPartIO(unsigned part) = 0;
     virtual IFileIO *queryPartIO(unsigned part) = 0;
     virtual StringBuffer &getPartName(unsigned part, StringBuffer &out) = 0;
     virtual StringBuffer &getPartName(unsigned part, StringBuffer &out) = 0;
+    virtual void abort() = 0;
 };
 };
 
 
 IFetchStream *createFetchStream(CSlaveActivity &owner, IRowInterfaces *keyRowIf, IRowInterfaces *fetchRowIf, bool &abortSoon, CPartDescriptorArray &parts, unsigned offsetCount, size32_t offsetMapSz, const void *offsetMap, IFetchHandler *iFetchHandler, mptag_t tag, IExpander *eexp=NULL);
 IFetchStream *createFetchStream(CSlaveActivity &owner, IRowInterfaces *keyRowIf, IRowInterfaces *fetchRowIf, bool &abortSoon, CPartDescriptorArray &parts, unsigned offsetCount, size32_t offsetMapSz, const void *offsetMap, IFetchHandler *iFetchHandler, mptag_t tag, IExpander *eexp=NULL);

+ 121 - 40
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -82,7 +82,6 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
     IEngineRowAllocator *allocator;
     IEngineRowAllocator *allocator;
     IOutputRowSerializer *serializer;
     IOutputRowSerializer *serializer;
     IOutputMetaData *meta;
     IOutputMetaData *meta;
-    const bool &abort;
     IHash *ihash;
     IHash *ihash;
     Owned<IRowStream> input;
     Owned<IRowStream> input;
 
 
@@ -204,18 +203,21 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
             CDistributorBase &distributor;
             CDistributorBase &distributor;
             Owned<CSendBucket> _sendBucket;
             Owned<CSendBucket> _sendBucket;
             unsigned nextPending;
             unsigned nextPending;
+            bool aborted;
 
 
         public:
         public:
             IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
             IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
 
             CWriteHandler(CSender &_owner) : owner(_owner), distributor(_owner.owner)
             CWriteHandler(CSender &_owner) : owner(_owner), distributor(_owner.owner)
             {
             {
+                aborted = false;
             }
             }
             void init(void *startInfo)
             void init(void *startInfo)
             {
             {
                 nextPending = getRandom()%distributor.numnodes;
                 nextPending = getRandom()%distributor.numnodes;
                 _sendBucket.setown((CSendBucket *)startInfo);
                 _sendBucket.setown((CSendBucket *)startInfo);
                 owner.setActiveWriter(_sendBucket->queryDestination(), this);
                 owner.setActiveWriter(_sendBucket->queryDestination(), this);
+                aborted = false;
             }
             }
             void main()
             void main()
             {
             {
@@ -224,7 +226,7 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
                 size32_t writerTotalSz = 0;
                 size32_t writerTotalSz = 0;
                 size32_t sendSz = 0;
                 size32_t sendSz = 0;
                 MemoryBuffer mb;
                 MemoryBuffer mb;
-                loop
+                while (!aborted)
                 {
                 {
                     writerTotalSz += sendBucket->querySize();
                     writerTotalSz += sendBucket->querySize();
                     owner.dedup(sendBucket); // conditional
                     owner.dedup(sendBucket); // conditional
@@ -250,7 +252,7 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
                                 continue; // NB: it will flow into else "remote" arm
                                 continue; // NB: it will flow into else "remote" arm
                             }
                             }
                         }
                         }
-                        loop
+                        while (!aborted)
                         {
                         {
                             // JCSMORE check if worth compressing
                             // JCSMORE check if worth compressing
                             CMessageBuffer msg;
                             CMessageBuffer msg;
@@ -279,6 +281,10 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
             }
             }
             bool canReuse() { return true; }
             bool canReuse() { return true; }
             bool stop() { return true; }
             bool stop() { return true; }
+            void abort()
+            {
+                aborted = true;
+            }
         } **activeWriters;
         } **activeWriters;
 
 
         CDistributorBase &owner;
         CDistributorBase &owner;
@@ -588,10 +594,13 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
             rowcount_t totalSent = 0;
             rowcount_t totalSent = 0;
             try
             try
             {
             {
-                do
+                while (!aborted && numFinished < owner.numnodes)
                 {
                 {
                     while (queryTotalSz() >= owner.inputBufferSize)
                     while (queryTotalSz() >= owner.inputBufferSize)
                     {
                     {
+                        if (aborted)
+                            break;
+
                         HDSendPrintLog("process exceeded inputBufferSize");
                         HDSendPrintLog("process exceeded inputBufferSize");
                         bool doSelf;
                         bool doSelf;
                         unsigned which = getSendCandidate(doSelf);
                         unsigned which = getSendCandidate(doSelf);
@@ -623,6 +632,8 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
 
 
                                 if (senderFullSem.wait(10000))
                                 if (senderFullSem.wait(10000))
                                     break;
                                     break;
+                                if (aborted)
+                                    break;
                             }
                             }
                         }
                         }
                     }
                     }
@@ -651,7 +662,6 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
                         }
                         }
                     }
                     }
                 }
                 }
-                while (numFinished < owner.numnodes);
             }
             }
             catch (IException *e)
             catch (IException *e)
             {
             {
@@ -684,6 +694,23 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
 
 
             ActPrintLog(owner.activity, "HDIST: Send loop %s %"RCPF"d rows sent", exception.get()?"aborted":"finished", totalSent);
             ActPrintLog(owner.activity, "HDIST: Send loop %s %"RCPF"d rows sent", exception.get()?"aborted":"finished", totalSent);
         }
         }
+        void abort()
+        {
+            if (aborted)
+                return;
+            aborted = true;
+            senderFullSem.signal();
+            if (initialized)
+            {
+                CriticalBlock b(activeWritersLock);
+                for (unsigned w=0; w<owner.numnodes; w++)
+                {
+                    CWriteHandler *writer = activeWriters[w];
+                    if (writer)
+                        writer->abort();
+                }
+            }
+        }
     // IThreadFactory impl.
     // IThreadFactory impl.
         virtual IPooledThread *createNew()
         virtual IPooledThread *createNew()
         {
         {
@@ -695,9 +722,9 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
             ActPrintLog(owner.activity, e, "HDIST: CSender");
             ActPrintLog(owner.activity, e, "HDIST: CSender");
             if (!aborted)
             if (!aborted)
             {
             {
+                abort();
                 exception.set(e);
                 exception.set(e);
-                aborted = true;
-                senderFullSem.signal(); // send regardless, because senderFull could be about to be set.
+                senderFullSem.signal();
             }
             }
             return owner.fireException(e);
             return owner.fireException(e);
         }
         }
@@ -769,15 +796,15 @@ protected:
     unsigned self;
     unsigned self;
     unsigned numnodes;
     unsigned numnodes;
     CriticalSection putsect;
     CriticalSection putsect;
-    bool pull;
+    bool pull, aborted;
     CSender sender;
     CSender sender;
 public:
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
 
-    CDistributorBase(CActivityBase *_activity, const bool &_abort,bool _doDedup, IStopInput *_istop)
-        : activity(_activity), abort(_abort), recvthread(this), sendthread(this), sender(*this)
+    CDistributorBase(CActivityBase *_activity, bool _doDedup, IStopInput *_istop)
+        : activity(_activity), recvthread(this), sendthread(this), sender(*this)
     {
     {
-        connected = false;
+        aborted = connected = false;
         doDedup = _doDedup;
         doDedup = _doDedup;
         self = activity->queryJob().queryMyRank() - 1;
         self = activity->queryJob().queryMyRank() - 1;
         numnodes = activity->queryJob().querySlaves();
         numnodes = activity->queryJob().querySlaves();
@@ -857,6 +884,7 @@ public:
         pipewr.set(piperd->queryWriter());
         pipewr.set(piperd->queryWriter());
         connected = true;
         connected = true;
         selfstopped = false;
         selfstopped = false;
+        aborted = false;
 
 
         sendException.clear();
         sendException.clear();
         recvException.clear();
         recvException.clear();
@@ -897,7 +925,14 @@ public:
         ihash = NULL;
         ihash = NULL;
         iCompare = NULL;
         iCompare = NULL;
     }
     }
-
+    virtual void abort()
+    {
+        if (!aborted)
+        {
+            aborted = true;
+            sender.abort();
+        }
+    }
     virtual void recvloop()
     virtual void recvloop()
     {
     {
         CCycleTimer timer;
         CCycleTimer timer;
@@ -910,7 +945,7 @@ public:
             CThorStreamDeserializerSource rowSource;
             CThorStreamDeserializerSource rowSource;
             rowSource.setStream(stream);
             rowSource.setStream(stream);
             unsigned left=numnodes-1;
             unsigned left=numnodes-1;
-            while (left)
+            while (left && !aborted)
             {
             {
 #ifdef _FULL_TRACE
 #ifdef _FULL_TRACE
                 ActPrintLog("HDIST: Receiving block");
                 ActPrintLog("HDIST: Receiving block");
@@ -934,7 +969,7 @@ public:
                     }
                     }
                     {
                     {
                         CriticalBlock block(putsect);
                         CriticalBlock block(putsect);
-                        while (!rowSource.eos())
+                        while (!rowSource.eos() && !aborted)
                         {
                         {
                             timer.reset();
                             timer.reset();
                             RtlDynamicRowBuilder rowBuilder(allocator);
                             RtlDynamicRowBuilder rowBuilder(allocator);
@@ -1048,11 +1083,23 @@ public:
     void setRecvExc(IException *e)
     void setRecvExc(IException *e)
     {
     {
         ActPrintLog(activity, e, "HDIST: recvloop");
         ActPrintLog(activity, e, "HDIST: recvloop");
+        abort();
         if (recvException.get())
         if (recvException.get())
             e->Release();
             e->Release();
         else
         else
             recvException.setown(e);
             recvException.setown(e);
     }
     }
+    bool sendRecv(ICommunicator &comm, CMessageBuffer &mb, rank_t r, mptag_t tag)
+    {
+        loop
+        {
+            if (aborted)
+                return false;
+            if (comm.sendRecv(mb, r, tag, MEDIUMTIMEOUT))
+                return true;
+            // try again
+        }
+    }
     virtual unsigned recvBlock(CMessageBuffer &mb,unsigned i=(unsigned)-1) = 0;
     virtual unsigned recvBlock(CMessageBuffer &mb,unsigned i=(unsigned)-1) = 0;
     virtual void stopRecv() = 0;
     virtual void stopRecv() = 0;
     virtual bool sendBlock(unsigned i,CMessageBuffer &mb) = 0;
     virtual bool sendBlock(unsigned i,CMessageBuffer &mb) = 0;
@@ -1086,8 +1133,8 @@ public:
 
 
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
 
-    CRowDistributor(CActivityBase *activity, ICommunicator &_comm, mptag_t _tag, const bool &abort, bool doDedup, IStopInput *istop)
-        : CDistributorBase(activity, abort, doDedup, istop), comm(_comm), tag(_tag)
+    CRowDistributor(CActivityBase *activity, ICommunicator &_comm, mptag_t _tag, bool doDedup, IStopInput *istop)
+        : CDistributorBase(activity, doDedup, istop), comm(_comm), tag(_tag)
     {
     {
         stopping = false;
         stopping = false;
     }
     }
@@ -1164,7 +1211,8 @@ Restart:
             ActPrintLog(activity, "HDIST MP sending RTS to %d",i+1);
             ActPrintLog(activity, "HDIST MP sending RTS to %d",i+1);
 #endif
 #endif
 
 
-            comm.sendRecv(rts, i+1, tag);
+            if (!sendRecv(comm, rts, i+1, tag))
+                return false;
             rts.read(flag);
             rts.read(flag);
 #ifdef _FULL_TRACE
 #ifdef _FULL_TRACE
             ActPrintLog(activity, "HDIST MP got CTS from %d, %d",i+1,(int)flag);
             ActPrintLog(activity, "HDIST MP got CTS from %d, %d",i+1,(int)flag);
@@ -1181,7 +1229,8 @@ Restart:
         }
         }
         // this branch not yet used
         // this branch not yet used
         assertex(false);
         assertex(false);
-        comm.sendRecv(msg, i+1, tag);
+        if (!sendRecv(comm, msg, i+1, tag))
+            return false;
         msg.read(flag);             // whether stopped
         msg.read(flag);             // whether stopped
         return flag!=0;
         return flag!=0;
     }
     }
@@ -1198,6 +1247,12 @@ Restart:
     {
     {
         stopping = false;
         stopping = false;
     }
     }
+    virtual void abort()
+    {
+        CDistributorBase::abort();
+        stopRecv();
+        comm.cancel(RANK_ALL, tag);
+    }
 };
 };
 
 
 class CRowPullDistributor: public CDistributorBase
 class CRowPullDistributor: public CDistributorBase
@@ -1338,8 +1393,8 @@ class CRowPullDistributor: public CDistributorBase
         selfdone.reinit();
         selfdone.reinit();
     }
     }
 public:
 public:
-    CRowPullDistributor(CActivityBase *activity, ICommunicator &_comm, mptag_t _tag, const bool &abort, bool doDedup, IStopInput *istop)
-        : CDistributorBase(activity, abort, doDedup, istop), comm(_comm), tag(_tag)
+    CRowPullDistributor(CActivityBase *activity, ICommunicator &_comm, mptag_t _tag, bool doDedup, IStopInput *istop)
+        : CDistributorBase(activity, doDedup, istop), comm(_comm), tag(_tag)
     {
     {
         pull = true;
         pull = true;
         tag = _tag;
         tag = _tag;
@@ -1385,7 +1440,7 @@ public:
         {
         {
             Owned<cSortedDistributeMerger> merger = new cSortedDistributeMerger(*this, numnodes, queryCompare(), queryAllocator(), deserializer);
             Owned<cSortedDistributeMerger> merger = new cSortedDistributeMerger(*this, numnodes, queryCompare(), queryAllocator(), deserializer);
             ActPrintLog(activity, "Read loop start");
             ActPrintLog(activity, "Read loop start");
-            loop
+            while (!aborted)
             {
             {
                 const void *row = merger->merged().nextRow();
                 const void *row = merger->merged().nextRow();
                 if (!row)
                 if (!row)
@@ -1465,11 +1520,13 @@ public:
         {
         {
             msg.clear();
             msg.clear();
             selfready.wait();
             selfready.wait();
+            if (aborted)
+                return (unsigned)-1;
         }
         }
         else
         else
         {
         {
             msg.clear().append((byte)1); // rts
             msg.clear().append((byte)1); // rts
-            if (!comm.sendRecv(msg, i+1, tag))
+            if (!sendRecv(comm, msg, i+1, tag))
             {
             {
                 return i;
                 return i;
             }
             }
@@ -1614,6 +1671,11 @@ public:
         stopping = true;
         stopping = true;
         selfready.signal();
         selfready.signal();
     }
     }
+    virtual void abort()
+    {
+        CDistributorBase::abort();
+        comm.cancel(RANK_ALL, tag);
+    }
 };
 };
 
 
 //==================================================================================================
 //==================================================================================================
@@ -1621,14 +1683,14 @@ public:
 //==================================================================================================
 //==================================================================================================
 
 
 
 
-IHashDistributor *createHashDistributor(CActivityBase *activity, ICommunicator &comm, mptag_t tag, const bool &abort,bool doDedup,IStopInput *istop)
+IHashDistributor *createHashDistributor(CActivityBase *activity, ICommunicator &comm, mptag_t tag, bool doDedup, IStopInput *istop)
 {
 {
-    return new CRowDistributor(activity, comm, tag, abort, doDedup, istop);
+    return new CRowDistributor(activity, comm, tag, doDedup, istop);
 }
 }
 
 
-IHashDistributor *createPullHashDistributor(CActivityBase *activity, ICommunicator &comm, mptag_t tag, const bool &abort,bool doDedup,IStopInput *istop)
+IHashDistributor *createPullHashDistributor(CActivityBase *activity, ICommunicator &comm, mptag_t tag, bool doDedup, IStopInput *istop)
 {
 {
-    return new CRowPullDistributor(activity, comm, tag,  abort, doDedup, istop);
+    return new CRowPullDistributor(activity, comm, tag, doDedup, istop);
 }
 }
 
 
 
 
@@ -1687,9 +1749,9 @@ public:
         ActPrintLog("HASHDISTRIB: %sinit tag %d",mergecmp?"merge, ":"",(int)mptag);
         ActPrintLog("HASHDISTRIB: %sinit tag %d",mergecmp?"merge, ":"",(int)mptag);
 
 
         if (mergecmp)
         if (mergecmp)
-            distributor = createPullHashDistributor(this, container.queryJob().queryJobComm(), mptag, abortSoon, false, this);
+            distributor = createPullHashDistributor(this, container.queryJob().queryJobComm(), mptag, false, this);
         else
         else
-            distributor = createHashDistributor(this, container.queryJob().queryJobComm(), mptag, abortSoon, false, this);
+            distributor = createHashDistributor(this, container.queryJob().queryJobComm(), mptag, false, this);
         inputstopped = true;
         inputstopped = true;
     }
     }
     void stopInput()
     void stopInput()
@@ -1751,7 +1813,12 @@ public:
         ActPrintLog("HASHDISTRIB: kill");
         ActPrintLog("HASHDISTRIB: kill");
         CSlaveActivity::kill();
         CSlaveActivity::kill();
     }
     }
-
+    void abort()
+    {
+        CSlaveActivity::abort();
+        if (distributor)
+            distributor->abort();
+    }
     CATCH_NEXTROW()
     CATCH_NEXTROW()
     {
     {
         ActivityTimer t(totalCycles, timeActivities, NULL); // careful not to call again in derivatives
         ActivityTimer t(totalCycles, timeActivities, NULL); // careful not to call again in derivatives
@@ -3146,7 +3213,6 @@ public:
         distributor = NULL;
         distributor = NULL;
         mptag = TAG_NULL;
         mptag = TAG_NULL;
     }
     }
-
     ~GlobalHashDedupSlaveActivity()
     ~GlobalHashDedupSlaveActivity()
     {
     {
         instrm.clear();
         instrm.clear();
@@ -3157,20 +3223,17 @@ public:
             distributor->Release();
             distributor->Release();
         }
         }
     }
     }
-
     void stopInput()
     void stopInput()
     {
     {
         CriticalBlock block(stopsect);  // can be called async by distribute
         CriticalBlock block(stopsect);  // can be called async by distribute
         HashDedupSlaveActivityBase::stopInput();
         HashDedupSlaveActivityBase::stopInput();
     }
     }
-
     void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {
     {
         HashDedupSlaveActivityBase::init(data, slaveData);
         HashDedupSlaveActivityBase::init(data, slaveData);
         mptag = container.queryJob().deserializeMPTag(data);
         mptag = container.queryJob().deserializeMPTag(data);
-        distributor = createHashDistributor(this, container.queryJob().queryJobComm(), mptag, abortSoon,true, this);
+        distributor = createHashDistributor(this, container.queryJob().queryJobComm(), mptag, true, this);
     }
     }
-
     void start()
     void start()
     {
     {
         HashDedupSlaveActivityBase::start();
         HashDedupSlaveActivityBase::start();
@@ -3179,7 +3242,6 @@ public:
         instrm.setown(distributor->connect(myRowIf, input, iHash, iCompare));
         instrm.setown(distributor->connect(myRowIf, input, iHash, iCompare));
         input = instrm.get();
         input = instrm.get();
     }
     }
-
     void stop()
     void stop()
     {
     {
         ActPrintLog("stopping");
         ActPrintLog("stopping");
@@ -3193,7 +3255,12 @@ public:
         stopInput();
         stopInput();
         dataLinkStop();
         dataLinkStop();
     }
     }
-
+    void abort()
+    {
+        HashDedupSlaveActivityBase::abort();
+        if (distributor)
+            distributor->abort();
+    }
     void getMetaInfo(ThorDataLinkMetaInfo &info)
     void getMetaInfo(ThorDataLinkMetaInfo &info)
     {
     {
         initMetaInfo(info);
         initMetaInfo(info);
@@ -3270,7 +3337,7 @@ public:
         ICompare *icompareL = joinargs->queryCompareLeft();
         ICompare *icompareL = joinargs->queryCompareLeft();
         ICompare *icompareR = joinargs->queryCompareRight();
         ICompare *icompareR = joinargs->queryCompareRight();
         if (!lhsDistributor)
         if (!lhsDistributor)
-            lhsDistributor.setown(createHashDistributor(this, container.queryJob().queryJobComm(), mptag, abortSoon,false, this));
+            lhsDistributor.setown(createHashDistributor(this, container.queryJob().queryJobComm(), mptag, false, this));
         Owned<IRowStream> reader = lhsDistributor->connect(queryRowInterfaces(inL), inL, ihashL, icompareL);
         Owned<IRowStream> reader = lhsDistributor->connect(queryRowInterfaces(inL), inL, ihashL, icompareL);
         Owned<IThorRowLoader> loaderL = createThorRowLoader(*this, ::queryRowInterfaces(inL), icompareL, true, rc_allDisk, SPILL_PRIORITY_HASHJOIN);
         Owned<IThorRowLoader> loaderL = createThorRowLoader(*this, ::queryRowInterfaces(inL), icompareL, true, rc_allDisk, SPILL_PRIORITY_HASHJOIN);
         strmL.setown(loaderL->load(reader, abortSoon));
         strmL.setown(loaderL->load(reader, abortSoon));
@@ -3281,7 +3348,7 @@ public:
         lhsDistributor->join();
         lhsDistributor->join();
         leftdone = true;
         leftdone = true;
         if (!rhsDistributor)
         if (!rhsDistributor)
-            rhsDistributor.setown(createHashDistributor(this, container.queryJob().queryJobComm(), mptag2, abortSoon,false, this));
+            rhsDistributor.setown(createHashDistributor(this, container.queryJob().queryJobComm(), mptag2, false, this));
         reader.setown(rhsDistributor->connect(queryRowInterfaces(inR), inR, ihashR, icompareR));
         reader.setown(rhsDistributor->connect(queryRowInterfaces(inR), inR, ihashR, icompareR));
         Owned<IThorRowLoader> loaderR = createThorRowLoader(*this, ::queryRowInterfaces(inR), icompareR, true, rc_mixed, SPILL_PRIORITY_HASHJOIN);;
         Owned<IThorRowLoader> loaderR = createThorRowLoader(*this, ::queryRowInterfaces(inR), icompareR, true, rc_mixed, SPILL_PRIORITY_HASHJOIN);;
         strmR.setown(loaderR->load(reader, abortSoon));
         strmR.setown(loaderR->load(reader, abortSoon));
@@ -3354,6 +3421,14 @@ public:
         ActPrintLog("HASHJOIN: kill");
         ActPrintLog("HASHJOIN: kill");
         CSlaveActivity::kill();
         CSlaveActivity::kill();
     }
     }
+    void abort()
+    {
+        CSlaveActivity::abort();
+        if (lhsDistributor)
+            lhsDistributor->abort();
+        if (rhsDistributor)
+            rhsDistributor->abort();
+    }
     CATCH_NEXTROW()
     CATCH_NEXTROW()
     {
     {
         ActivityTimer t(totalCycles, timeActivities, NULL);
         ActivityTimer t(totalCycles, timeActivities, NULL);
@@ -3454,7 +3529,7 @@ CThorRowAggregator *mergeLocalAggs(Owned<IHashDistributor> &distributor, CActivi
             }
             }
         } nodeCompare(helperExtra.queryHashElement());
         } nodeCompare(helperExtra.queryHashElement());
         if (!distributor)
         if (!distributor)
-            distributor.setown(createPullHashDistributor(&activity, activity.queryContainer().queryJob().queryJobComm(), mptag, activity.queryAbortSoon(), false, NULL));
+            distributor.setown(createPullHashDistributor(&activity, activity.queryContainer().queryJob().queryJobComm(), mptag, false, NULL));
         strm.setown(distributor->connect(nodeRowMetaRowIf, localAggregatedStream, &nodeCompare, &nodeCompare));
         strm.setown(distributor->connect(nodeRowMetaRowIf, localAggregatedStream, &nodeCompare, &nodeCompare));
         loop
         loop
         {
         {
@@ -3488,7 +3563,7 @@ CThorRowAggregator *mergeLocalAggs(Owned<IHashDistributor> &distributor, CActivi
         };
         };
         Owned<IRowStream> localAggregatedStream = new CRowAggregatedStream(localAggTable);
         Owned<IRowStream> localAggregatedStream = new CRowAggregatedStream(localAggTable);
         if (!distributor)
         if (!distributor)
-            distributor.setown(createHashDistributor(&activity, activity.queryContainer().queryJob().queryJobComm(), mptag, activity.queryAbortSoon(), false, NULL));
+            distributor.setown(createHashDistributor(&activity, activity.queryContainer().queryJob().queryJobComm(), mptag, false, NULL));
         Owned<IRowInterfaces> rowIf = activity.getRowInterfaces(); // create new rowIF / avoid using activities IRowInterface, otherwise suffer from circular link
         Owned<IRowInterfaces> rowIf = activity.getRowInterfaces(); // create new rowIF / avoid using activities IRowInterface, otherwise suffer from circular link
         strm.setown(distributor->connect(rowIf, localAggregatedStream, helperExtra.queryHashElement(), NULL));
         strm.setown(distributor->connect(rowIf, localAggregatedStream, helperExtra.queryHashElement(), NULL));
         loop
         loop
@@ -3588,6 +3663,12 @@ public:
         stopInput(input);
         stopInput(input);
         dataLinkStop();
         dataLinkStop();
     }
     }
+    void abort()
+    {
+        CSlaveActivity::abort();
+        if (distributor)
+            distributor->abort();
+    }
     CATCH_NEXTROW()
     CATCH_NEXTROW()
     {
     {
         ActivityTimer t(totalCycles, timeActivities, NULL);
         ActivityTimer t(totalCycles, timeActivities, NULL);

+ 2 - 6
thorlcr/activities/hashdistrib/thhashdistribslave.ipp

@@ -23,17 +23,14 @@
 #include "slave.ipp"
 #include "slave.ipp"
 #include "thactivityutil.ipp"
 #include "thactivityutil.ipp"
 
 
-interface IRowStreamWithMetaData: extends IRowStream
-{   // currently fixed size data only
-    virtual bool nextRow(const void *&row,void *meta)=0;
-};
 
 
-interface IHashDistributor: extends IInterface
+interface IHashDistributor : extends IInterface
 {
 {
     virtual IRowStream *connect(IRowInterfaces *rowIf, IRowStream *in, IHash *ihash, ICompare *icompare)=0;
     virtual IRowStream *connect(IRowInterfaces *rowIf, IRowStream *in, IHash *ihash, ICompare *icompare)=0;
     virtual void disconnect(bool stop)=0;
     virtual void disconnect(bool stop)=0;
     virtual void join()=0;
     virtual void join()=0;
     virtual void setBufferSizes(unsigned sendBufferSize, unsigned outputBufferSize, unsigned pullBufferSize) = 0;
     virtual void setBufferSizes(unsigned sendBufferSize, unsigned outputBufferSize, unsigned pullBufferSize) = 0;
+    virtual void abort()=0;
 };
 };
 
 
 interface IStopInput;
 interface IStopInput;
@@ -41,7 +38,6 @@ IHashDistributor *createHashDistributor(
     CActivityBase *activity,
     CActivityBase *activity,
     ICommunicator &comm, 
     ICommunicator &comm, 
     mptag_t tag, 
     mptag_t tag, 
-    const bool &abort,
     bool dedup,
     bool dedup,
     IStopInput *istop);
     IStopInput *istop);
 
 

+ 1 - 2
thorlcr/activities/loop/thloop.cpp

@@ -61,7 +61,6 @@ protected:
             if (0 == slaveEmptyIterations) // either 1st or has been reset, i.e. non-empty
             if (0 == slaveEmptyIterations) // either 1st or has been reset, i.e. non-empty
                 allEmptyIterations = false;
                 allEmptyIterations = false;
         }
         }
-        assertex(loopEnds==0 || loopEnds==nodes); // Not sure possible in global graph, for some to finish and not others 
         bool final = loopEnds == nodes; // final
         bool final = loopEnds == nodes; // final
         msg.clear();
         msg.clear();
         if (allEmptyIterations)
         if (allEmptyIterations)
@@ -69,7 +68,7 @@ protected:
         else
         else
             emptyIterations = 0;
             emptyIterations = 0;
         bool ok = emptyIterations <= maxEmptyLoopIterations;
         bool ok = emptyIterations <= maxEmptyLoopIterations;
-        msg.append(ok);
+        msg.append(ok && !final); // This is to tell slave whether it should continue or not
         n = nodes;
         n = nodes;
         while (n--) // a barrier really
         while (n--) // a barrier really
             container.queryJob().queryJobComm().send(msg, n+1, mpTag, LONGTIMEOUT);
             container.queryJob().queryJobComm().send(msg, n+1, mpTag, LONGTIMEOUT);

+ 22 - 18
thorlcr/activities/loop/thloopslave.cpp

@@ -309,14 +309,7 @@ public:
                     {
                     {
                         ret.setown(curInput->nextRow()); // more cope with groups somehow....
                         ret.setown(curInput->nextRow()); // more cope with groups somehow....
                         if (!ret)
                         if (!ret)
-                        {
-                            if (finishedLooping)
-                            {
-                                eof = true;
-                                return NULL;
-                            }
                             break;
                             break;
-                        }
                     }
                     }
 
 
                     if (finishedLooping || 
                     if (finishedLooping || 
@@ -335,15 +328,14 @@ public:
                 {
                 {
                 case TAKloopdataset:
                 case TAKloopdataset:
                     assertex(flags & IHThorLoopArg::LFnewloopagain);
                     assertex(flags & IHThorLoopArg::LFnewloopagain);
+                    // NB: finishedLooping set at end of loop, based on loopAgain result
                     break;
                     break;
                 case TAKlooprow:
                 case TAKlooprow:
                     if (0 == loopPendingCount)
                     if (0 == loopPendingCount)
-                    {
-                        sendEndLooping();
-                        finishedLooping = true;
-                        eof = true;
-                        return NULL;
-                    }
+                        finishedLooping = true; // This slave has finished
+                    break;
+                case TAKloopcount:
+                    // NB: finishedLooping set at end of loop, so that last getNextRow() iteration spits out final rows
                     break;
                     break;
                 }
                 }
 
 
@@ -364,8 +356,22 @@ public:
                     }
                     }
                 }
                 }
 
 
-                if (!sendLoopingCount(loopCounter, emptyIterations)) // only if global
+                if (global)
+                {
+                    // 0 signals this slave has finished, but don't stop until all have
+                    if (!sendLoopingCount(finishedLooping ? 0 : loopCounter, finishedLooping ? 0 : emptyIterations))
+                    {
+                        sentEndLooping = true; // prevent sendEndLooping() sending end again
+                        eof = true;
+                        return NULL;
+                    }
+                }
+                else if (finishedLooping)
+                {
+                    eof = true;
                     return NULL;
                     return NULL;
+                }
+
                 loopPending->flush();
                 loopPending->flush();
 
 
                 IThorBoundLoopGraph *boundGraph = queryContainer().queryLoopGraph();
                 IThorBoundLoopGraph *boundGraph = queryContainer().queryLoopGraph();
@@ -401,15 +407,13 @@ public:
                     assertex(row);
                     assertex(row);
                     //Result is a row which contains a single boolean field.
                     //Result is a row which contains a single boolean field.
                     if (!((const bool *)row.get())[0])
                     if (!((const bool *)row.get())[0])
-                        finishedLooping = true;
+                        finishedLooping = true; // NB: will finish when loopPending has been consumed
                 }
                 }
                 loopPending.setown(createOverflowableBuffer(*this, this, false, true));
                 loopPending.setown(createOverflowableBuffer(*this, this, false, true));
                 loopPendingCount = 0;
                 loopPendingCount = 0;
                 ++loopCounter;
                 ++loopCounter;
                 if ((container.getKind() == TAKloopcount) && (loopCounter > maxIterations))
                 if ((container.getKind() == TAKloopcount) && (loopCounter > maxIterations))
-                    finishedLooping = true;
-                if (finishedLooping)
-                    sendEndLooping();
+                    finishedLooping = true; // NB: will finish when loopPending has been consumed
             }
             }
         }
         }
         return NULL;
         return NULL;

+ 1 - 1
thorlcr/activities/msort/thsortu.cpp

@@ -1817,7 +1817,7 @@ class CMultiCoreUnorderedJoinHelper: public CMultiCoreJoinHelperBase
                     break;
                     break;
                 }
                 }
                 try {
                 try {
-                    parent->doMatch(*work,outqueue);
+                    parent->doMatch(*work,parent->outqueue);
                     delete work;
                     delete work;
                 }
                 }
                 catch (IException *e) {
                 catch (IException *e) {

+ 2 - 1
thorlcr/graph/thgraph.cpp

@@ -1276,7 +1276,8 @@ void CGraphBase::doExecute(size32_t parentExtractSz, const byte *parentExtract,
         start();
         start();
         if (!wait(aborted?MEDIUMTIMEOUT:INFINITE)) // can't wait indefinitely, query may have aborted and stall, but prudent to wait a short time for underlying graphs to unwind.
         if (!wait(aborted?MEDIUMTIMEOUT:INFINITE)) // can't wait indefinitely, query may have aborted and stall, but prudent to wait a short time for underlying graphs to unwind.
             GraphPrintLogEx(this, thorlog_null, MCuserWarning, "Graph wait cancelled, aborted=%s", aborted?"true":"false");
             GraphPrintLogEx(this, thorlog_null, MCuserWarning, "Graph wait cancelled, aborted=%s", aborted?"true":"false");
-        graphDone = true;
+        else
+            graphDone = true;
     }
     }
     catch (IException *e)
     catch (IException *e)
     {
     {

+ 1 - 1
thorlcr/graph/thgraphslave.cpp

@@ -632,7 +632,7 @@ void CSlaveGraph::done()
         progressActive = false;
         progressActive = false;
         progressToCollect = true; // NB: ensure collected after end of graph
         progressToCollect = true; // NB: ensure collected after end of graph
     }
     }
-    if (!aborted && (!queryOwner() || isGlobal()))
+    if (!aborted && graphDone && (!queryOwner() || isGlobal()))
         getDoneSem.wait(); // must wait on master
         getDoneSem.wait(); // must wait on master
     if (!queryOwner())
     if (!queryOwner())
     {
     {