浏览代码

Merge remote-tracking branch 'origin/candidate-3.10.2' into candidate-3.10.x

Conflicts:
	version.cmake

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 12 年之前
父节点
当前提交
acecc4a180

+ 1 - 0
cmake_modules/dependencies/quantal.cmake

@@ -0,0 +1 @@
+set ( CPACK_DEBIAN_PACKAGE_DEPENDS "libboost-regex1.49.0, libicu48, libxalan110, libxerces-c28, binutils, libldap-2.4-2, openssl, zlib1g, g++, openssh-client, openssh-server, expect, libarchive12, rsync")

+ 25 - 17
dali/base/dadfs.cpp

@@ -671,11 +671,13 @@ public:
         return clusternames.ordinality();
     }
 
-    unsigned find(const char *clustername)
+    unsigned find(const char *_clusterName)
     {
+        StringAttr clusterName = _clusterName;
+        clusterName.toLowerCase();
         StringBuffer name;
         ForEachItem(i)  {
-            if (strcmp(item(i).getClusterLabel(name.clear()).str(),clustername)==0) 
+            if (strcmp(item(i).getClusterLabel(name.clear()).str(),clusterName)==0)
                 return i;
             if (singleclusteroverride)
                 break;
@@ -2773,17 +2775,21 @@ public:
         clusters.kill();
     }
 
-    IFileDescriptor *getFileDescriptor(const char *clustername)
+    IFileDescriptor *getFileDescriptor(const char *_clusterName)
     {
         CriticalBlock block (sect);
         Owned<IFileDescriptor> fdesc = deserializeFileDescriptorTree(root,&queryNamedGroupStore(),0);
         fdesc->setTraceName(logicalName.get());
         StringArray cnames;
-        if (clustername&&*clustername)
-            cnames.append(clustername);
+        if (_clusterName&&*_clusterName)
+        {
+            StringAttr clusterName = _clusterName;
+            clusterName.toLowerCase();
+            cnames.append(clusterName);
+        }
         else
             getClusterNames(cnames);
-        fdesc->setClusterOrder(cnames,clustername&&*clustername);
+        fdesc->setClusterOrder(cnames,_clusterName&&*_clusterName);
         return fdesc.getClear();
     }
 
@@ -2931,10 +2937,8 @@ public:
         CClustersLockedSection cls(CDistributedFileBase<IDistributedFile>::logicalName);
         reloadClusters();
         if (findCluster(clustername)!=NotFound) {
-            if (findCluster(clustername)!=NotFound) {
-                IDFS_Exception *e = new CDFS_Exception(DFSERR_ClusterAlreadyExists,clustername);
-                throw e;
-            }
+            IDFS_Exception *e = new CDFS_Exception(DFSERR_ClusterAlreadyExists,clustername);
+            throw e;
         }
         Owned<IClusterInfo> cluster = createClusterInfo(clustername,NULL,mspec,&queryNamedGroupStore());
         if (cluster->queryGroup(&queryNamedGroupStore())) {
@@ -6792,9 +6796,11 @@ bool CDistributedFileDirectory::doRemovePhysical(CDfsLogicalFileName &dlfn,const
         file.clear();
         return doRemoveEntry(dlfn,user,ignoresub);  
     }
-    StringBuffer clustername(cluster); 
+    StringBuffer clustername(cluster);
     if (clustername.length()==0)
         dlfn.getCluster(clustername); // override
+    else
+        clustername.toLowerCase();
     if ((clustername.length()==0)||((file->findCluster(clustername.str())==0)&&(file->numClusters()==1))) {
         clustername.clear();
         file->detach(); 
@@ -7670,7 +7676,7 @@ class CInitGroups
                 throwUnexpected();
         }
         if (altName)
-            gname.clear().append(altName);
+            gname.clear().append(altName).toLowerCase();
 
         VStringBuffer xpath("Group[@name=\"%s\"]", gname.str());
         IPropertyTree *existingClusterGroup = groupsconnlock.conn->queryRoot()->queryPropTree(xpath.str()); // 'live' cluster group
@@ -7744,13 +7750,15 @@ public:
         defaultTimeout = _defaultTimeout;
     }
 
-    bool doClusterGroup(CgCmd cmd, const char *clusterName, const char *type, bool spares, SocketEndpointArray *eps, StringBuffer &messages)
+    bool doClusterGroup(CgCmd cmd, const char *_clusterName, const char *type, bool spares, SocketEndpointArray *eps, StringBuffer &messages)
     {
         Owned<IRemoteConnection> conn = querySDS().connect("/Environment/Software", myProcessSession(), RTM_LOCK_READ, SDS_CONNECT_TIMEOUT);
         if (!conn)
             return false;
-        if (!clusterName || !*clusterName)
+        if (!_clusterName || !*_clusterName)
             return false;
+        StringAttr clusterName = _clusterName;
+        clusterName.toLowerCase();
         if (!type || !*type)
             return false;
         bool ret = true;
@@ -7759,10 +7767,10 @@ public:
         StringBuffer errMsg;
         const char *clusterType = type;
         if (loadMachineMap()) {
-            VStringBuffer xpath("%s[@name=\"%s\"]", type, clusterName);
+            VStringBuffer xpath("%s[@name=\"%s\"]", type, clusterName.get());
             clusters.setown(root->getElements(xpath.str()));
             if (!clusters || !clusters->first()) {
-                VStringBuffer errMsg("Could not find type %s, %s cluster", type, clusterName);
+                VStringBuffer errMsg("Could not find type %s, %s cluster", type, clusterName.get());
                 WARNLOG("%s", errMsg.str());
                 messages.append(errMsg).newline();
                 ret = false;
@@ -7850,7 +7858,7 @@ public:
                     }
                 }
                 if (clusters->next()) {
-                    VStringBuffer errMsg("resetThorGroup: more than one cluster named: %s", clusterName);
+                    VStringBuffer errMsg("resetThorGroup: more than one cluster named: %s", clusterName.get());
                     WARNLOG("%s", errMsg.str());
                     messages.append(errMsg).newline();
                     ret = false;

+ 7 - 2
dali/base/dafdesc.cpp

@@ -404,6 +404,8 @@ public:
     CClusterInfo(const char *_name,IGroup *_group,const ClusterPartDiskMapSpec &_mspec,IGroupResolver *resolver,const char *_roxielabel)
         : name(_name),group(_group), roxielabel(_roxielabel)
     {
+        name.toLowerCase();
+        roxielabel.toLowerCase();
         mspec =_mspec;
         checkClusterName(resolver);
     }
@@ -507,6 +509,7 @@ public:
     void setGroupName(const char *_name)
     {
         name.set(_name);
+        name.toLowerCase();
     }
 
     void setGroup(IGroup *_group)
@@ -538,6 +541,7 @@ public:
     void setRoxieLabel(const char *_label)
     {
         roxielabel.set(_label);
+        roxielabel.toLowerCase();
     }
 
     const char *queryRoxieLabel()
@@ -549,7 +553,7 @@ public:
     {
         const char * label = queryRoxieLabel();
         if (label)
-            return  ret.append(label);
+            return ret.append(label);
         return getGroupName(ret,NULL);
     }
 
@@ -1879,7 +1883,8 @@ public:
         unsigned done = 0;
         StringBuffer cname;
         ForEachItemIn(i,names) {
-            const char *name = names.item(i);
+            StringAttr name = names.item(i);
+            name.toLowerCase();
             for (unsigned j=done;j<clusters.ordinality();j++) {
                 clusters.item(j).getClusterLabel(cname.clear());
                 if (strcmp(cname.str(),name)==0) {

+ 1 - 1
dali/base/dafdesc.hpp

@@ -217,7 +217,7 @@ if endCluster is not called it will assume only one cluster and not replicated
     virtual void setClusterGroup(unsigned clusternum,IGroup *grp) = 0;              // sets group for cluster
     virtual StringBuffer &getClusterGroupName(unsigned clusternum,StringBuffer &ret,IGroupResolver *resolver=NULL) = 0;                 // returns group name of cluster (if set)
     virtual void setClusterGroupName(unsigned clusternum,const char *name) = 0;     // sets group name of cluster (if set)
-    virtual void setClusterOrder(StringArray &names,bool exclusive) = 0;            // if exclusive set then othe clusters deleted
+    virtual void setClusterOrder(StringArray &names,bool exclusive) = 0;            // if exclusive set then other clusters deleted
     virtual void serializeTree(IPropertyTree &pt,unsigned flags=0) = 0;             // deserialize with deserializeFileDescriptorTree
     virtual IPropertyTree *getFileTree(unsigned flags=0) = 0;                       // flags IFDSF_*
 

+ 2 - 0
ecl/eclagent/agentctx.hpp

@@ -105,6 +105,8 @@ struct IAgentContext : extends IGlobalCodeContext
     virtual unsigned __int64 queryStopAfter() = 0;
     
     virtual const char *queryWuid() = 0;
+
+    virtual void updateWULogfile() = 0;
 };
 
 #endif // AGENTCTX_HPP_INCL

+ 7 - 6
ecl/eclagent/eclagent.cpp

@@ -505,8 +505,8 @@ public:
 
 //=======================================================================================
 
-EclAgent::EclAgent(IConstWorkUnit *wu, const char *_wuid, bool _checkVersion, bool _resetWorkflow, bool _noRetry, char const * _logname, const char *_allowedPipeProgs, IPropertyTree *queryXML, IProperties *_globals, IPropertyTree *_config)
-    : wuRead(wu), wuid(_wuid), checkVersion(_checkVersion), resetWorkflow(_resetWorkflow), noRetry(_noRetry), allowedPipeProgs(_allowedPipeProgs), globals(_globals), config(_config)
+EclAgent::EclAgent(IConstWorkUnit *wu, const char *_wuid, bool _checkVersion, bool _resetWorkflow, bool _noRetry, char const * _logname, const char *_allowedPipeProgs, IPropertyTree *_queryXML, IProperties *_globals, IPropertyTree *_config, ILogMsgHandler * _logMsgHandler)
+    : wuRead(wu), wuid(_wuid), checkVersion(_checkVersion), resetWorkflow(_resetWorkflow), noRetry(_noRetry), allowedPipeProgs(_allowedPipeProgs), globals(_globals), config(_config), logMsgHandler(_logMsgHandler)
 {
     isAborting = false;
     isStandAloneExe = false;
@@ -554,10 +554,10 @@ EclAgent::EclAgent(IConstWorkUnit *wu, const char *_wuid, bool _checkVersion, bo
         SCMStringBuffer jobName;
         debugContext->debugInitialize(wuid, wu->getJobName(jobName).str(), true);
     }
-    if (queryXML)
+    if (_queryXML)
     {
         Owned<IWorkUnit> w = updateWorkUnit();
-        w->setXmlParams(queryXML);
+        w->setXmlParams(_queryXML);
     }
     Owned<const IPropertyTree> xmlParams = wuRead->getXmlParams();
     if (xmlParams)
@@ -3096,12 +3096,13 @@ extern int HTHOR_API eclagent_main(int argc, const char *argv[], StringBuffer *
 
     //Build log file specification
     StringBuffer logfilespec;
+    ILogMsgHandler * logMsgHandler = NULL;
     if (!standAloneExe)
     {
         Owned<IComponentLogFileCreator> lf = createComponentLogFileCreator(agentTopology, "eclagent");
         lf->setMsgFields(MSGFIELD_timeDate | MSGFIELD_msgID | MSGFIELD_process | MSGFIELD_thread | MSGFIELD_code);
         lf->setCreateAliasFile(false);
-        lf->beginLogging();
+        logMsgHandler = lf->beginLogging();
         PROGLOG("Logging to %s", lf->queryLogFileSpec());
         logfilespec.set(lf->queryLogFileSpec());
     }
@@ -3311,7 +3312,7 @@ extern int HTHOR_API eclagent_main(int argc, const char *argv[], StringBuffer *
 
             if (w)
             {
-                EclAgent agent(w, wuid.str(), globals->getPropInt("IGNOREVERSION", 0)==0, globals->getPropBool("WFRESET", false), globals->getPropBool("NORETRY", false), logfilespec.str(), globals->queryProp("allowedPipePrograms"), query.getClear(), globals, agentTopology);
+                EclAgent agent(w, wuid.str(), globals->getPropInt("IGNOREVERSION", 0)==0, globals->getPropBool("WFRESET", false), globals->getPropBool("NORETRY", false), logfilespec.str(), globals->queryProp("allowedPipePrograms"), query.getClear(), globals, agentTopology, logMsgHandler);
                 const bool isRemoteWorkunit = (daliServers.length() != 0);
                 const bool resolveFilesLocally = !isRemoteWorkunit || globals->getPropBool("USELOCALFILES", false);
                 const bool writeResultsToStdout = !isRemoteWorkunit || globals->getPropBool("RESULTSTOSTDOUT", false);

+ 7 - 2
ecl/eclagent/eclagent.ipp

@@ -253,7 +253,9 @@ public:
     {
         return ctx->queryWuid();
     }
-    
+
+    virtual void updateWULogfile()                  { return ctx->updateWULogfile(); }
+
 protected:
     IAgentContext * ctx;
 };
@@ -385,6 +387,7 @@ private:
     SafePluginMap *pluginMap;
     IProperties *globals;
     IPropertyTree *config;
+    ILogMsgHandler *logMsgHandler;
     StringAttr agentTempDir;
     Owned<IOrderedOutputSerializer> outputSerializer;
 
@@ -444,7 +447,7 @@ private:
 public:
     IMPLEMENT_IINTERFACE;
 
-    EclAgent(IConstWorkUnit *wu, const char *_wuid, bool _checkVersion, bool _resetWorkflow, bool _noRetry, char const * _logname, const char *_allowedPipeProgs, IPropertyTree *queryXML, IProperties *globals, IPropertyTree *config);
+    EclAgent(IConstWorkUnit *wu, const char *_wuid, bool _checkVersion, bool _resetWorkflow, bool _noRetry, char const * _logname, const char *_allowedPipeProgs, IPropertyTree *_queryXML, IProperties *_globals, IPropertyTree *_config, ILogMsgHandler * _logMsgHandler);
     ~EclAgent();
 
     void setBlocked();
@@ -676,6 +679,8 @@ public:
     
     IGroup *getHThorGroup(StringBuffer &out);
     
+    virtual void updateWULogfile();
+
 };
 
 //---------------------------------------------------------------------------

+ 27 - 1
ecl/eclagent/eclgraph.cpp

@@ -978,6 +978,7 @@ void EclSubGraph::execute(const byte * parentExtract)
             wu->setTimerInfo(timer.str(), NULL, msTick()-startTime, 1, 0);
         }
     }
+    agent->updateWULogfile();//Update workunit logfile name in case of rollover
 }
 
 
@@ -1708,6 +1709,31 @@ void EclAgent::executeThorGraph(const char * graphName)
     while (resubmit); // if pause interrupted job (i.e. with pausenow action), resubmit graph
 }
 
+//In case of logfile rollover, update workunit logfile name(s) stored
+//in SDS/WorkUnits/{WUID}/Process/EclAgent/myeclagent<log>
+void EclAgent::updateWULogfile()
+{
+    if (logMsgHandler && config->hasProp("@name"))
+    {
+        StringBuffer logname;
+        bool ok = logMsgHandler->getLogName(logname);
+        if (ok)
+        {
+            RemoteFilename rlf;
+            rlf.setLocalPath(logname);
+            rlf.getRemotePath(logname.clear());
+
+            Owned <IWorkUnit> w = updateWorkUnit();
+            w->addProcess("EclAgent", config->queryProp("@name"), logname.str());
+        }
+        else
+        {
+            DBGLOG("ERROR: Unable to query logfile name");
+            assertex(ok);
+        }
+    }
+}
+
 void EclAgent::executeGraph(const char * graphName, bool realThor, size32_t parentExtractSize, const void * parentExtract)
 {
     assertex(parentExtractSize == 0);
@@ -1735,12 +1761,12 @@ void EclAgent::executeGraph(const char * graphName, bool realThor, size32_t pare
             if (guillotineTimeout)
                 abortmonitor->setGuillotineTimeout(guillotineTimeout);
             activeGraph->execute(NULL);
+            updateWULogfile();//Update workunit logfile name in case of rollover
             if (guillotineTimeout)
                 abortmonitor->setGuillotineTimeout(0);
             if (debugContext)
                 debugContext->checkBreakpoint(DebugStateGraphEnd, NULL, graphName);
             activeGraph.clear();
-
             if (debugContext)
             {
                 if (isAborting)

+ 28 - 21
esp/eclwatch/ws_XSLT/wuidcommon.xslt

@@ -1080,12 +1080,6 @@
               <xsl:value-of select="Value"/>
             </a>
           </td>
-          <xsl:variable name="resultname" select="Name"/>
-          <xsl:for-each select="/WUInfoResponse/ResultViews/View">
-            <td>
-              <a href="javascript:void(0);" onclick="getLink(document.getElementById('ECL_Result_{$position}'), '/WsWorkunits/WUResultView?Wuid={$wuid}&amp;ResultName={$resultname}&amp;ViewName={.}');return false;"><xsl:value-of select="."/></a>
-            </td>
-          </xsl:for-each>
           <td>
             <a href="javascript:void(0);" onclick="getLink(document.getElementById('ECL_Result_{position()}'), '/WsWorkunits/WUResultBin?Format=zip&amp;Wuid={$wuid}&amp;Sequence={Link}');return false;">.zip</a>
           </td>
@@ -1102,6 +1096,12 @@
               </a>
             </xsl:if>
           </td>
+          <xsl:variable name="resultname" select="Name"/>
+          <xsl:for-each select="/WUInfoResponse/ResultViews/View">
+            <td>
+              <a href="javascript:void(0);" onclick="getLink(document.getElementById('ECL_Result_{$position}'), '/WsWorkunits/WUResultView?Wuid={$wuid}&amp;ResultName={$resultname}&amp;ViewName={.}');return false;"><xsl:value-of select="."/></a>
+            </td>
+          </xsl:for-each>
         </xsl:when>
         <xsl:when test="number(ShowFileContent) and string-length(FileName)">
           <td>
@@ -1109,6 +1109,9 @@
               <xsl:value-of select="Value"/>
             </a>
           </td>
+          <td/>
+          <td/>
+          <td/>
           <td>
             <xsl:if test="string-length(FileName)">
               <a href="/WsDfu/DFUInfo?Name={FileName}" >
@@ -1123,26 +1126,30 @@
             <xsl:value-of select="Value"/>
             <xsl:text disable-output-escaping="yes"><![CDATA[ </span>]]></xsl:text>
          </td>
-            <td>
-              <xsl:if test="string-length(FileName)">
-                 <a href="/WsDfu/DFUInfo?Name={FileName}" >
+         <td/>
+         <td/>
+         <td/>
+         <td>
+            <xsl:if test="string-length(FileName)">
+                <a href="/WsDfu/DFUInfo?Name={FileName}" >
                     <xsl:value-of select="FileName"/>
-                 </a>
-              </xsl:if>
-            </td>
+                </a>
+            </xsl:if>
+         </td>
        </xsl:when>
        <xsl:otherwise>
-            <td/>
-            <td/>
-            <td/>
+         <td/>
+         <td/>
+         <td/>
+         <td/>
          <td><xsl:value-of select="Value"/></td>
-            <td>
-              <xsl:if test="string-length(FileName)">
-                 <a href="/WsDfu/DFUInfo?Name={FileName}" >
+         <td>
+            <xsl:if test="string-length(FileName)">
+                <a href="/WsDfu/DFUInfo?Name={FileName}" >
                     <xsl:value-of select="FileName"/>
-                 </a>
-              </xsl:if>
-            </td>
+                </a>
+            </xsl:if>
+         </td>
        </xsl:otherwise>
      </xsl:choose>
     </tr>

+ 16 - 12
esp/files/scripts/GraphControl.js

@@ -143,23 +143,27 @@ define([
 		},
 
 		centerOn: function (globalID) {
-			var item = this.obj.getItem(globalID);
-			this.obj.centerOnItem(item, true);
-			var items = [item];
-			this.obj.setSelected(items, true);
+			if (this.obj) {
+				var item = this.obj.getItem(globalID);
+				this.obj.centerOnItem(item, true);
+				var items = [item];
+				this.obj.setSelected(items, true);
+			}
 		},
 
 		watchSelect: function (select) {
-			if (sniff("chrome") && select) {
-				var context = this;
+			if (this.obj) {
+				if (sniff("chrome") && select) {
+					var context = this;
 
-				aspect.before(select, "openDropDown", function () {
-					dojo.style(context.obj, "height", "0px");
-				});
+					aspect.before(select, "openDropDown", function () {
+						dojo.style(context.obj, "height", "0px");
+					});
 
-				aspect.after(select, "closeDropDown", function (focus) {
-					dojo.style(context.obj, "height", "100%");
-				});
+					aspect.after(select, "closeDropDown", function (focus) {
+						dojo.style(context.obj, "height", "100%");
+					});
+				}
 			}
 		},
 

+ 1 - 1
esp/services/ws_workunits/ws_workunitsService.cpp

@@ -3713,7 +3713,7 @@ void deployEclOrArchive(IEspContext &context, IEspWUDeployWorkunitRequest & req,
     wu->commit();
     wu.clear();
 
-    submitWsWorkunit(context, wuid.str(), req.getCluster(), NULL, 0, true, false, NULL, NULL, &req.getDebugValues());
+    submitWsWorkunit(context, wuid.str(), req.getCluster(), NULL, 0, true, false, false, NULL, NULL, &req.getDebugValues());
     waitForWorkUnitToCompile(wuid.str(), req.getWait());
 
     WsWuInfo winfo(context, wuid.str());

+ 3 - 3
initfiles/bin/init_configesp

@@ -31,9 +31,7 @@ ulimit -n 8192
 SNMPID=$$
 
 killed() {
-    rm -f ${SENTINEL}
-    killall -9 configesp 2>/dev/null
-    sleep 1
+    kill_process ${SENTINEL} ${PID_NAME} 3
     exit 255
 }
 
@@ -42,6 +40,7 @@ trap "killed" SIGINT SIGTERM SIGKILL
 nohup configesp 1>/dev/null 2>/dev/null &
 echo $! > $PID_NAME
 wait
+rm $PID_NAME
 
 while [ -e ${SENTINEL} ]; do
     sleep 5
@@ -49,6 +48,7 @@ while [ -e ${SENTINEL} ]; do
         nohup configesp 1>/dev/null 2>/dev/null &
         echo $! > $PID_NAME
         wait
+        rm $PID_NAME
     fi
 done
 

+ 3 - 3
initfiles/bin/init_dafilesrv.in

@@ -41,9 +41,7 @@ export SENTINEL="dafilesrv.sentinel"
 rm -f ${SENTINEL}
 
 killed(){
-    rm -f ${SENTINEL}
-    killall -9 dafilesrv 2> /dev/null
-    sleep 2
+    kill_process ${SENTINEL} ${PID_NAME} 3
     exit 255
 }
 ulimit -n $handlelimit
@@ -51,6 +49,7 @@ trap "killed" SIGINT SIGTERM SIGKILL
 dafilesrv -L $log -I ${INSTANCE_NAME} &
 echo $! > $PID_NAME
 wait
+rm $PID_NAME
 
 while [ -e ${SENTINEL} ]; do
     sleep 5
@@ -58,5 +57,6 @@ while [ -e ${SENTINEL} ]; do
         dafilesrv -L $log -I ${INSTANCE_NAME} &
         echo $! > $PID_NAME
         wait
+        rm $PID_NAME
     fi
 done

+ 3 - 1
initfiles/bin/init_dali

@@ -29,7 +29,7 @@ ulimit -n 8192
 
 killed(){
     dalistop .
-    rm -f ${SENTINEL}
+    kill_process ${SENTINEL} ${PID_NAME} 3
     exit 255
 }
 
@@ -37,6 +37,7 @@ trap "killed" SIGINT SIGTERM SIGKILL
 daserver 1>/dev/null 2>/dev/null &
 echo $! > $PID_NAME
 wait
+rm $PID_NAME
 
 while [ -e ${SENTINEL} ]; do
     sleep 5
@@ -44,5 +45,6 @@ while [ -e ${SENTINEL} ]; do
         daserver 1>/dev/null 2>/dev/null &
         echo $! > $PID_NAME
         wait
+        rm $PID_NAME
     fi
 done

+ 5 - 6
initfiles/bin/init_dfuserver

@@ -30,12 +30,9 @@ ulimit -n 8192
 ulimit -s 512
 
 killed() {
-    if [ -e ${SENTINEL} ]; then
-        rm -f ${SENTINEL}
-        dfuserver stop=1
-        sleep 5
-        killall -9 dfuserver
-    fi
+    dfuserver stop=1
+    sleep 5
+    kill_process ${SENTINEL} ${PID_NAME} 3
     exit 255
 }
 
@@ -44,6 +41,7 @@ trap "killed" SIGINT SIGTERM SIGKILL
 dfuserver 1>/dev/null 2>/dev/null &
 echo $! > $PID_NAME
 wait
+rm $PID_NAME
 
 while [ -e ${SENTINEL} ]; do
     sleep 5
@@ -51,5 +49,6 @@ while [ -e ${SENTINEL} ]; do
         dfuserver 1>/dev/null 2>/dev/null &
         echo $! > $PID_NAME
         wait
+        rm $PID_NAME
     fi
 done

+ 3 - 7
initfiles/bin/init_eclagent.in

@@ -30,13 +30,7 @@ rm -f ${SENTINEL}
 rm -f ${PID_DIR}/hthortemp/*
 
 killed (){
-    rm -f ${SENTINEL}
-
-    killall agentexec
-    killall eclagent
-    sleep 2
-    killall -9 agentexec eclagent 1>/dev/null 2>&1 
-    sleep 2
+    kill_process ${SENTINEL} ${PID_NAME} 3
     exit 255
 }
 
@@ -47,6 +41,7 @@ ulimit -c unlimited
 agentexec 1>/dev/null 2>/dev/null &
 echo $! > $PID_NAME
 wait
+rm $PID_NAME
 
 while [ -e ${SENTINEL} ]; do
     sleep 1
@@ -54,6 +49,7 @@ while [ -e ${SENTINEL} ]; do
         agentexec 1>/dev/null 2>/dev/null &
         echo $! > $PID_NAME
         wait
+        rm $PID_NAME
     fi
 done
 

+ 3 - 11
initfiles/bin/init_eclccserver

@@ -26,17 +26,7 @@ export SENTINEL="eclccserver.sentinel"
 rm -f ${SENTINEL}
 
 killed() {
-    PATH_PRE=`type -path hpcc_setenv`
-    source ${PATH_PRE}
-    PID_NAME="$PID/`basename $PWD`.pid"
-
-    if [ -e ${SENTINEL} ]; then
-            rm -f ${SENTINEL}
-            if [ -e ${PID_NAME} ]; then
-                    pidwait_fn `cat ${PID_NAME}` 3
-            fi
-    fi
-    sleep 2
+    kill_process ${SENTINEL} ${PID_NAME} 3
     exit 255
 }
 
@@ -44,6 +34,7 @@ trap "killed" SIGINT SIGTERM SIGKILL
 eclccserver 1>/dev/null 2>/dev/null &
 echo $! > $PID_NAME
 wait
+rm $PID_NAME
 
 while [ -e ${SENTINEL} ]; do
     sleep 5
@@ -51,6 +42,7 @@ while [ -e ${SENTINEL} ]; do
         eclccserver 1>/dev/null 2>/dev/null & 
         echo $! > $PID_NAME
         wait
+        rm $PID_NAME
     fi
 done
 

+ 3 - 3
initfiles/bin/init_eclscheduler

@@ -26,9 +26,7 @@ export SENTINEL="eclscheduler.sentinel"
 rm -f ${SENTINEL}
 
 killed(){
-    rm -f ${SENTINEL}
-    killall -9 eclscheduler
-    sleep
+    kill_process ${SENTINEL} ${PID_NAME} 3
     exit 255
 }
 
@@ -36,6 +34,7 @@ trap "killed" SIGINT SIGTERM SIGKILL
 eclscheduler 1>/dev/null 2>/dev/null &
 echo $! > $PID_NAME
 wait
+rm $PID_NAME
 
 while [ -e ${SENTINEL} ]; do
     sleep 5
@@ -43,6 +42,7 @@ while [ -e ${SENTINEL} ]; do
         eclscheduler 1>/dev/null 2>/dev/null &
         echo $! > $PID_NAME
         wait
+        rm $PID_NAME
     fi
 done
 

+ 3 - 5
initfiles/bin/init_esp

@@ -30,11 +30,7 @@ ulimit -n 8192
 SNMPID=$$
 
 killed() {
-    rm -f ${SENTINEL}
-    killall esp 1>/dev/null 2>/dev/null
-    sleep 15
-    killall -9 esp 1>/dev/null 2>/dev/null 
-    sleep 2
+    kill_process ${SENTINEL} ${PID_NAME} 15
     exit 255
 }
 
@@ -42,11 +38,13 @@ trap "killed" SIGINT SIGTERM SIGKILL
 esp snmpid=$SNMPID 1>/dev/null 2>/dev/null &
 echo $! > $PID_NAME
 wait
+rm $PID_NAME
 while [ -e ${SENTINEL} ]; do
     sleep 5
     if [ -e ${SENTINEL} ]; then
         esp snmpid=$SNMPID 1>/dev/null 2>/dev/null &
         echo $! > $PID_NAME
         wait
+        rm $PID_NAME
     fi
 done

+ 3 - 5
initfiles/bin/init_roxie

@@ -47,11 +47,7 @@ killed() {
     if [ -n "$1" ]; then
         cd $1
     fi
-    rm -f ${SENTINEL}
-    killall roxie
-    sleep 5
-    killall -9 roxie
-    sleep 2
+    kill_process ${SENTINEL} ${PID_NAME} 3
     exit 255
 }
 
@@ -59,6 +55,7 @@ trap "killed" SIGINT SIGTERM SIGKILL
 nohup roxie --topology=RoxieTopology.xml --logfile=$logfilename --restarts=$restarts --stdlog=0 2>>$logfilename.stderr 1>>$logfilename.stdout &
 echo $! > $PID_NAME 
 wait
+rm $PID_NAME
 
 # Automatically restart roxie when it dies
 while [ -e ${SENTINEL} ]; do
@@ -68,5 +65,6 @@ while [ -e ${SENTINEL} ]; do
     nohup roxie --topology=RoxieTopology.xml --logfile=$logfilename --restarts=$restarts --stdlog=0 2>>$logfilename.stderr 1>>$logfilename.stdout &
     echo $! > $PID_NAME
     wait
+    rm $PID_NAME
 done
 

+ 3 - 2
initfiles/bin/init_sasha

@@ -34,7 +34,6 @@ ulimit -n 8192
 which_pidof
 
 killed() {
-    rm -f ${SENTINEL}
     pid=`${PIDOF} saserver`
     if [ -n "$pid" ]; then
         sasha server=. action=stop
@@ -50,7 +49,7 @@ killed() {
             kill -9 $pid
         fi
     fi
-    sleep 2
+    kill_process ${SENTINEL} ${PID_NAME} 3
     exit 255
 }
 
@@ -58,6 +57,7 @@ trap "killed" SIGINT SIGTERM SIGKILL
 saserver 1>/dev/null 2>/dev/null &
 echo $! > $PID_NAME
 wait
+rm $PID_NAME
 
 while [ -e ${SENTINEL} ]; do
     sleep 5
@@ -65,5 +65,6 @@ while [ -e ${SENTINEL} ]; do
         saserver 1>/dev/null 2>/dev/null &
         echo $! > $PID_NAME
         wait
+        rm $PID_NAME
     fi
 done

+ 4 - 1
initfiles/bin/init_thor

@@ -23,10 +23,13 @@ PID_NAME="$PID/`basename $PWD`.pid"
 INIT_PID_NAME="$PID/init_`basename $PWD`.pid"
 echo $$ > $INIT_PID_NAME
 
+export SENTINEL="thor.sentinel"
+rm -f ${SENTINEL}
+
 killed() {
         echo "Stopping"
         $deploydir/stop_thor $deploydir
-        rm $PID_NAME
+        kill_process ${SENTINEL} ${PID_NAME} 3
         exit 255
 }
 

+ 24 - 10
initfiles/sbin/hpcc_setenv.in

@@ -22,17 +22,31 @@
 
 ###<REPLACE>###
 
+function kill_process () {
+    SENTINEL=$1
+    PID=$2
+    TIMEOUT=$3
+    if [ -e $SENTINEL ]; then
+        rm -f $SENTINEL
+    fi
+    if [ -e $PID ]; then
+        pidwait_fn $PID $TIMEOUT
+    fi
+}
+
 function pidwait_fn () {
-        WATCH_PID=$1
-        TIMEOUT=$(($2*1000))
-        kill $WATCH_PID
-        while [ -d /proc/$WATCH_PID -a $TIMEOUT -gt 0 ]; do
-                sleep 0.2
-                TIMEOUT=$(($TIMEOUT-200))
-        done
-        if [ $TIMEOUT -le 0 ]; then
-                kill -9 $WATCH_PID
-        fi
+    PID=$1
+    WATCH_PID=`cat $PID`
+    TIMEOUT=$(($2*1000))
+    kill $WATCH_PID
+    while [ -d /proc/$WATCH_PID -a $TIMEOUT -gt 0 ]; do
+            sleep 0.2
+            TIMEOUT=$(($TIMEOUT-200))
+    done
+    if [ $TIMEOUT -le 0 ]; then
+            kill -9 $WATCH_PID
+    fi
+    rm -f $PID
 }
 
 platform='unknown'

+ 33 - 131
roxie/ccd/ccdfile.cpp

@@ -398,7 +398,7 @@ public:
     virtual offset_t appendFile(IFile *file,offset_t pos,offset_t len) { throwUnexpected(); return 0; }
 
     virtual const char *queryFilename() { return logical->queryFilename(); }
-    virtual bool IsShared() const { return CInterface::IsShared(); }
+    virtual bool isAlive() const { return CInterface::isAlive(); }
     virtual int getLinkCount() const { return CInterface::getLinkCount(); }
     virtual RoxieFileType getFileType() { return fileType; }
 
@@ -515,7 +515,7 @@ typedef MapStringTo<StringArrayPtr> MapStringToDiffFileUsage;
 
 class CRoxieFileCache : public CInterface, implements ICopyFileProgress, implements IRoxieFileCache
 {
-    ICopyArrayOf<ILazyFileIO> todo; // Might prefer a queue but probably doesn't really matter.
+    mutable ICopyArrayOf<ILazyFileIO> todo; // Might prefer a queue but probably doesn't really matter.
     InterruptableSemaphore toCopy;
     InterruptableSemaphore toClose;
     mutable CopyMapStringToMyClass<ILazyFileIO> files;
@@ -1011,10 +1011,14 @@ public:
                         break;
                     if (todo.ordinality())
                     {
-                        next.set(&todo.pop());
+                        ILazyFileIO *popped = &todo.pop();
+                        if (popped->isAlive())
+                        {
+                            next.set(popped);
+                            if (next)
+                                currentTodoFile.append(next->queryFilename());
+                        }
                         atomic_dec(&numFilesToProcess);    // must decrement counter for SNMP accuracy
-                        if (next)
-                            currentTodoFile.append(next->queryFilename());
                     }
                 }
                 if (next)
@@ -1131,6 +1135,13 @@ public:
         ILazyFileIO *goer = files.getValue(filename);
         if (goer == file)
             files.remove(filename);
+        ForEachItemInRev(idx, todo)
+        {
+            if (file == &todo.item(idx))
+            {
+                todo.remove(idx);
+            }
+        }
     }
 
     virtual ILazyFileIO *lookupFile(const char *id, unsigned partNo, RoxieFileType fileType, const char *localLocation, const char *baseIndexFileName,  ILazyFileIO *patchFile, const StringArray &peerRoxieCopiedLocationInfo, const StringArray &deployedLocationInfo, offset_t size, const CDateTime &modified, bool memFile, bool isRemote, bool startFileCopy, bool doForegroundCopy, unsigned crc, bool isCompressed, const char *lookupDali)
@@ -1140,45 +1151,30 @@ public:
         {
             CriticalBlock b(crit);
             ILazyFileIO *f = files.getValue(localLocation);
-            if (f)
+            if (f && f->isAlive())
             {
                 if ((size != -1 && size != f->getSize()) ||
                     (!modified.isNull() && !modified.equals(*f->queryDateTime(), false)))
                 {
-                    if (!f->IsShared())
-                    {
-                        // kill it
-                        files.remove(localLocation);
-                        ForEachItemInRev(idx, todo)
-                        {
-                            if (f == &todo.item(idx))
-                            {
-                                todo.remove(idx);
-                            }
-                        }
-                    }
-                    else
+                    StringBuffer modifiedDt;
+                    if (!modified.isNull())
+                        modified.getString(modifiedDt);
+                    StringBuffer fileDt;
+                    f->queryDateTime()->getString(fileDt);
+                    if (fileErrorList.find(id) == 0)
                     {
-                        StringBuffer modifiedDt;
-                        if (!modified.isNull())
-                            modified.getString(modifiedDt);
-                        StringBuffer fileDt;
-                        f->queryDateTime()->getString(fileDt);
-                        if (fileErrorList.find(id) == 0)
+                        switch (fileType)
                         {
-                            switch (fileType)
-                            {
-                                case ROXIE_KEY:
-                                    fileErrorList.setValue(id, "Key");
-                                    break;
+                            case ROXIE_KEY:
+                                fileErrorList.setValue(id, "Key");
+                                break;
 
-                                case ROXIE_FILE:
-                                    fileErrorList.setValue(id, "File");
-                                    break;
-                            }
+                            case ROXIE_FILE:
+                                fileErrorList.setValue(id, "File");
+                                break;
                         }
-                        throw MakeStringException(ROXIE_MISMATCH, "Different version of %s already loaded: sizes = %"I64F"d %"I64F"d  Date = %s  %s", id, size, f->getSize(), modifiedDt.str(), fileDt.str());
                     }
+                    throw MakeStringException(ROXIE_MISMATCH, "Different version of %s already loaded: sizes = %"I64F"d %"I64F"d  Date = %s  %s", id, size, f->getSize(), modifiedDt.str(), fileDt.str());
                 }
                 else
                     return LINK(f);
@@ -1252,7 +1248,7 @@ public:
         {
             CriticalBlock b(crit);
             ret = files.getValue(localLocation);
-            if (ret)
+            if (ret && ret->isAlive())
             {
                 if (crc)
                 {
@@ -1299,26 +1295,6 @@ public:
         return ret;
     }
 
-    virtual IFileIO *lookupPluginFile(const char* dllname, const char *localLocation)
-    {
-        // plugins should be copied via deployment tool, not by roxie, so they should already be copied to the node - error if not
-        ILazyFileIO *ret;
-        {
-            CriticalBlock b(crit);
-            ret = files.getValue(localLocation);
-            if (ret) // already opened, valid and increase LINK count
-            {
-                // MORE - need to check version label
-                return LINK(ret);
-            }
-            
-            ret = openPlugin(dllname, localLocation);
-            files.setValue(localLocation, ret);
-        }
-        ret->checkOpen();
-        return ret;
-    }
-
     virtual void closeExpired(bool remote)
     {
         // This schedules a close at the next available opportunity
@@ -1343,7 +1319,7 @@ public:
         ForEach(h)
         {
             ILazyFileIO *f = files.mapToValue(&h.query());
-            if (f->isOpen() && f->isRemote()==remote && !f->isCopying())
+            if (f->isAlive() && f->isOpen() && f->isRemote()==remote && !f->isCopying())
             {
                 unsigned age = msTick() - f->getLastAccessed();
                 if (age > maxFileAge[remote])
@@ -1430,80 +1406,6 @@ public:
         }
     }
 
-    virtual void flushUnused(bool deleteFiles, bool cleanUpOneTimeQueries)
-    {
-        CriticalBlock b(crit);
-        // Everything currently in the queue should be removed if it is only in the queue and the cache.
-        ForEachItemInRev(idx, todo)
-        {
-            ILazyFileIO *f = &todo.item(idx);
-            if (f)  // make sure it hasn't already been blanked out by prior call to flushUnused
-            {
-                if (!f->IsShared())
-                    todo.remove(idx);
-            }
-        }
-
-        IArrayOf<ILazyFileIO> goers;
-        HashIterator h(files);
-        StringAttrMapping metaFileGoers;
-
-        ForEach(h)
-        {
-            // checking if file is shared, but count is 1 and names match - if so it means the only use of this file 
-            // is the copy, so we can remove the file
-            ILazyFileIO *f = files.mapToValue(&h.query());
-            if (!f->IsShared())
-                goers.append(*LINK(f));
-            else if (f->getLinkCount() == 2 && (stricmp(f->queryFilename(), currentTodoFile.str()) == 0) && deleteFiles)
-            {
-                // cannot be in the middle of copying a dll so don't worry about cleanUpOneTimeQueries
-                // this file is in the middle of being copied, get rid of it...
-                needToDeleteFile = true;
-                goers.append(*LINK(f));
-
-                DBGLOG("NAME = %s", f->queryFilename());
-            }
-        }
-
-        ForEachItemInRev(idx1, goers)
-        {
-            ILazyFileIO *item = &goers.item(idx1);
-            StringBuffer goer(item->queryFilename());
-
-            RoxieFileType type = item->getFileType();
-
-            if (cleanUpOneTimeQueries)  // only want to delete dlls
-            {
-                if (type != ROXIE_WU_DLL)
-                    continue;
-            }
-            
-            if (!files.remove(goer))
-                DBGLOG("ERROR - file was not removed from cache %s", goer.str());
-            else if (deleteFiles || type == ROXIE_WU_DLL)  // always want to delete WU dlls
-            {
-                bool isRemote = item->isRemote();
-
-                goers.remove(idx1);   // remove so we can delete the file if needed
-                if ((!isRemote) && (type != ROXIE_PLUGIN_DLL))
-                {
-                    DBGLOG("trying to delete - file %s", goer.str());
-                    try
-                    {
-                        OwnedIFile unneededFile = createIFile(goer.str());
-                        unneededFile->remove();
-                    }
-                    catch (IException *E)
-                    {
-                        EXCLOG(MCoperatorError, E, "While trying to delete a file");
-                        E->Release();
-                    }
-                }
-            }
-        }
-    }
-    
     int numFilesToCopy()
     {
         CriticalBlock b(crit);

+ 1 - 3
roxie/ccd/ccdfile.hpp

@@ -31,7 +31,7 @@ interface ILazyFileIO : extends IFileIO
 {
     virtual const char *queryFilename() = 0;
     virtual void checkOpen() = 0;
-    virtual bool IsShared() const = 0;
+    virtual bool isAlive() const = 0;
     virtual void addSource(IFile *source) = 0;
     virtual bool isRemote() = 0;
     virtual offset_t getSize() = 0;
@@ -66,8 +66,6 @@ interface IRoxieFileCache : extends IInterface
 {
     virtual ILazyFileIO *lookupFile(const char *id, unsigned partNo, RoxieFileType fileType, const char *localLocation, const char *baseIndexFileName, ILazyFileIO *patchFile, const StringArray &peerRoxieCopiedLocationInfo, const StringArray &deployedLocationInfo, offset_t size, const CDateTime &modified, bool memFile, bool isRemote, bool startFileCopy, bool doForegroundCopy, unsigned crc, bool isCompressed, const char *lookupDali) = 0;
     virtual IFileIO *lookupDllFile(const char* dllname, const char *localLocation, const StringArray &remoteNames, unsigned crc, bool isRemote) = 0;
-    virtual IFileIO *lookupPluginFile(const char* dllname, const char *localLocation) = 0;
-    virtual void flushUnused(bool deleteFiles, bool cleanUpOneTimeQueries) = 0;
     virtual RoxieFileStatus fileUpToDate(IFile *f, RoxieFileType fileType, offset_t size, const CDateTime &modified, unsigned crc, const char* id, bool isCompressed) = 0;
     virtual int numFilesToCopy() = 0;
     virtual void closeExpired(bool remote) = 0;

+ 5 - 3
rtl/eclrtl/rtlds.cpp

@@ -63,16 +63,18 @@ void RtlDatasetBuilder::ensure(size32_t required)
     if (required > maxSize)
     {
         maxSize = getNextSize(maxSize, required);
-        buffer = (byte *)realloc(buffer, maxSize);
-        if (!buffer)
+        byte * newbuffer = (byte *)realloc(buffer, maxSize);
+        if (!newbuffer)
             throw MakeStringException(-1, "Failed to allocate temporary dataset (requesting %d bytes)", maxSize);
+        buffer = newbuffer;
     }
+    self = buffer + totalSize;
 }
 
 byte * RtlDatasetBuilder::ensureCapacity(size32_t required, const char * fieldName)
 {
     ensure(totalSize + required);
-    return buffer + totalSize;
+    return self; // self is updated by ensure()
 }
 
 void RtlDatasetBuilder::flushDataset()

+ 1 - 1
system/jlib/jlog.ipp

@@ -604,7 +604,7 @@ public:
     int                       flush() { CriticalBlock block(crit); return fflush(handle); }
     char const *              disable();
     void                      enable();
-    bool                      getLogName(StringBuffer &name) const { name.append(filename); return true; }
+    bool                      getLogName(StringBuffer &name) const { CriticalBlock block(crit); name.append(filename); return true; }
 protected:
     void                      checkRollover() const;
     void                      doRollover(bool daily, const char *forceName = NULL) const;

+ 24 - 9
system/jlib/jstring.cpp

@@ -1175,19 +1175,34 @@ void StringAttr::setown(const char * _text)
   text = (char *)_text;
 }
 
+void StringAttr::toLowerCase()
+{
+    if (text)
+    {
+        char * cur = text;
+        char next;
+        while ((next = *cur) != 0)
+        {
+            if (isupper(next))
+                *cur = tolower(next);
+            cur++;
+        }
+    }
+}
+
 void StringAttr::toUpperCase()
 {
-  if (text)
-  {
-    char * cur = text;
-    char next;
-    while ((next = *cur) != 0)
+    if (text)
     {
-      if (islower(next))
-        *cur = toupper(next);
-      cur++;
+        char * cur = text;
+        char next;
+        while ((next = *cur) != 0)
+        {
+            if (islower(next))
+              *cur = toupper(next);
+            cur++;
+        }
     }
-  }
 }
 
 

+ 1 - 0
system/jlib/jstring.hpp

@@ -252,6 +252,7 @@ public:
     void         set(const char * _text);
     void         setown(const char * _text);
     void         set(const char * _text, unsigned _len);
+    void         toLowerCase();
     void         toUpperCase();
     
 private:

+ 24 - 14
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -287,7 +287,7 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
         PointerIArrayOf<CSendBucket> buckets;
         UnsignedArray candidates;
         size32_t totalSz;
-        bool senderFull, doDedup;
+        bool senderFull, doDedup, aborted;
         Semaphore senderFullSem;
         Linked<IException> exception;
         OwnedMalloc<bool> senderFinished;
@@ -309,11 +309,12 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
             numFinished = 0;
             dedupSamples = dedupSuccesses = 0;
             doDedup = owner.doDedup;
-            writerPool.setown(createThreadPool("HashDist writer pool", this, &owner, owner.writerPoolSize, 5*60*1000));
+            writerPool.setown(createThreadPool("HashDist writer pool", this, this, owner.writerPoolSize, 5*60*1000));
             self = owner.activity->queryJob().queryMyRank()-1;
             for (n=0; n<owner.numnodes; n++)
                 pendingBuckets.append(new CSendBucketQueue);
             numActiveWriters = 0;
+            aborted = false;
         }
 
         void dedup(CSendBucket *sendBucket)
@@ -592,6 +593,8 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
                             }
                         }
                     }
+                    if (aborted)
+                        break;
                     const void *row = input->ungroupedNextRow();
                     if (!row)
                         break;
@@ -619,22 +622,25 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
             }
             catch (IException *e)
             {
-                ActPrintLog(owner.activity, e, "HDIST: sendloop");
+                ActPrintLog(owner.activity, e, "HDIST: sender.process");
                 owner.fireException(e);
             }
 
             ActPrintLog(owner.activity, "Distribute send finishing");
-            // send remainder
-            Owned<IShuffledIterator> iter = createShuffledIterator(owner.numnodes);
-            ForEach(*iter)
-            {
-                unsigned dest=iter->get();
-                Owned<CSendBucket> bucket = getBucketClear(dest);
-                HDSendPrintLog4("Looking at last bucket(%d): %d, size = %d", dest, bucket.get()?bucket->queryDestination():0, bucket.get()?bucket->querySize():-1);
-                if (bucket && bucket->querySize())
+            if (!aborted)
+            {
+                // send remainder
+                Owned<IShuffledIterator> iter = createShuffledIterator(owner.numnodes);
+                ForEach(*iter)
                 {
-                    HDSendPrintLog3("Sending last bucket(s): %d, size = %d", bucket->queryDestination(), bucket->querySize());
-                    add(bucket.getClear());
+                    unsigned dest=iter->get();
+                    Owned<CSendBucket> bucket = getBucketClear(dest);
+                    HDSendPrintLog4("Looking at last bucket(%d): %d, size = %d", dest, bucket.get()?bucket->queryDestination():0, bucket.get()?bucket->querySize():-1);
+                    if (bucket && bucket->querySize())
+                    {
+                        HDSendPrintLog3("Sending last bucket(s): %d, size = %d", bucket->queryDestination(), bucket->querySize());
+                        add(bucket.getClear());
+                    }
                 }
             }
             ActPrintLog(owner.activity, "HDIST: waiting for threads");
@@ -652,8 +658,12 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
     // IExceptionHandler impl.
         virtual bool fireException(IException *e)
         {
-            if (!exception.get())
+            if (!aborted)
+            {
                 exception.set(e);
+                aborted = true;
+                senderFullSem.signal(); // send regardless, because senderFull could be about to be set.
+            }
             return owner.fireException(e);
         }
         friend class CWriteHandler;

+ 63 - 6
thorlcr/graph/thgraphmaster.cpp

@@ -1256,16 +1256,57 @@ CJobMaster::~CJobMaster()
     tmpHandler.clear();
 }
 
+static IException *createBCastException(unsigned slave, const char *errorMsg)
+{
+    // think this should always be fatal, could check link down here, or in general and flag as _shutdown.
+    StringBuffer msg("General failure communicating to slave");
+    if (slave)
+        msg.append("(").append(slave).append(") ");
+    else
+        msg.append("s ");
+    Owned<IThorException> e = MakeThorException(0, "%s", msg.append(" [").append(errorMsg).append("]").str());
+    e->setAction(tea_shutdown);
+    return e.getClear();
+}
+
 void CJobMaster::broadcastToSlaves(CMessageBuffer &msg, mptag_t mptag, unsigned timeout, const char *errorMsg, mptag_t *_replyTag, bool sendOnly)
 {
     mptag_t replyTag = createReplyTag();
     msg.setReplyTag(replyTag);
-    if (!queryJobComm().send(msg, RANK_ALL_OTHER, mptag, timeout))
+    if (globals->getPropBool("@broadcastSendAsync", true)) // only here in case of problems/debugging.
+    {
+        class CSendAsyncfor : public CAsyncFor
+        {
+            CJobMaster &job;
+            CMessageBuffer &msg;
+            mptag_t mptag;
+            unsigned timeout;
+            StringAttr errorMsg;
+        public:
+            CSendAsyncfor(CJobMaster &_job, CMessageBuffer &_msg, mptag_t _mptag, unsigned _timeout, const char *_errorMsg)
+                : job(_job), msg(_msg), mptag(_mptag), timeout(_timeout), errorMsg(_errorMsg)
+            {
+            }
+            void Do(unsigned i)
+            {
+                if (!job.queryJobComm().send(msg, i+1, mptag, timeout))
+                    throw createBCastException(i+1, errorMsg);
+            }
+        } afor(*this, msg, mptag, timeout, errorMsg);
+        try
+        {
+            afor.For(querySlaves(), querySlaves());
+        }
+        catch (IException *e)
+        {
+            EXCLOG(e, "broadcastSendAsync");
+            abort(e);
+            throw;
+        }
+    }
+    else if (!queryJobComm().send(msg, RANK_ALL_OTHER, mptag, timeout))
     {
-        // think this should always be fatal, could check link down here, or in general and flag as _shutdown.
-        StringBuffer msg("General failure communicating to slaves [");
-        Owned<IThorException> e = MakeThorException(0, "%s", msg.append(errorMsg).append("]").str());
-        e->setAction(tea_shutdown);
+        Owned<IException> e = createBCastException(0, errorMsg);
         EXCLOG(e, NULL);
         abort(e);
         throw e.getClear();
@@ -1274,6 +1315,7 @@ void CJobMaster::broadcastToSlaves(CMessageBuffer &msg, mptag_t mptag, unsigned
     if (_replyTag)
         *_replyTag = replyTag;
     unsigned respondents = 0;
+    Owned<IBitSet> bitSet = createBitSet();
     loop
     {
         rank_t sender;
@@ -1282,7 +1324,21 @@ void CJobMaster::broadcastToSlaves(CMessageBuffer &msg, mptag_t mptag, unsigned
         {
             if (_replyTag) _replyTag = NULL;
             StringBuffer tmpStr;
-            Owned<IException> e = MakeThorFatal(NULL, 0, "%s - Timeout receiving from slaves", errorMsg?tmpStr.append(": ").append(errorMsg).str():"");
+            if (errorMsg)
+                tmpStr.append(": ").append(errorMsg).append(" - ");
+            tmpStr.append("Timeout receiving from slaves - no reply from: [");
+            unsigned s = bitSet->scan(0, false);
+            assertex(s<querySlaves()); // must be at least one
+            tmpStr.append(s+1);
+            loop
+            {
+                s = bitSet->scan(s+1, false);
+                if (s>=querySlaves())
+                    break;
+                tmpStr.append(",").append(s+1);
+            }
+            tmpStr.append("]");
+            Owned<IException> e = MakeThorFatal(NULL, 0, " %s", tmpStr.str());
             EXCLOG(e, NULL);
             throw e.getClear();
         }
@@ -1296,6 +1352,7 @@ void CJobMaster::broadcastToSlaves(CMessageBuffer &msg, mptag_t mptag, unsigned
             throw e.getClear();
         }
         ++respondents;
+        bitSet->set((unsigned)sender-1);
         if (respondents == querySlaveGroup().ordinality())
             break;
     }