Browse Source

Merge remote-tracking branch 'origin/candidate-3.10.x'

Conflicts:
	dali/base/dadfs.cpp
	esp/files/scripts/GraphControl.js
	esp/services/ws_workunits/ws_workunitsService.cpp

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman cách đây 12 năm
mục cha
commit
7f2c6a9c5c
43 tập tin đã thay đổi với 1123 bổ sung và 357 xóa
  1. 1 0
      cmake_modules/dependencies/quantal.cmake
  2. 22 16
      dali/base/dadfs.cpp
  3. 7 2
      dali/base/dafdesc.cpp
  4. 1 1
      dali/base/dafdesc.hpp
  5. 1 1
      docs/ECLProgrammersGuide/PRG_Mods/PrG_Complex_ROXIE_Queries.xml
  6. 1 1
      docs/ECLProgrammersGuide/PRG_Mods/PrG_Smart_Stepping.xml
  7. 1 1
      docs/ECLProgrammersGuide/PRG_Mods/PrG_Using_Group_Function.xml
  8. 5 3
      docs/ECLReference/ECLReference.xml
  9. 2 0
      ecl/eclagent/agentctx.hpp
  10. 7 6
      ecl/eclagent/eclagent.cpp
  11. 7 2
      ecl/eclagent/eclagent.ipp
  12. 27 1
      ecl/eclagent/eclgraph.cpp
  13. 28 21
      esp/eclwatch/ws_XSLT/wuidcommon.xslt
  14. 1 1
      esp/services/ws_workunits/ws_workunitsService.cpp
  15. 3 3
      initfiles/bin/init_configesp
  16. 3 3
      initfiles/bin/init_dafilesrv.in
  17. 3 1
      initfiles/bin/init_dali
  18. 5 6
      initfiles/bin/init_dfuserver
  19. 3 7
      initfiles/bin/init_eclagent.in
  20. 3 11
      initfiles/bin/init_eclccserver
  21. 3 3
      initfiles/bin/init_eclscheduler
  22. 3 5
      initfiles/bin/init_esp
  23. 3 5
      initfiles/bin/init_roxie
  24. 3 2
      initfiles/bin/init_sasha
  25. 4 1
      initfiles/bin/init_thor
  26. 147 0
      initfiles/examples/javascript/jsmoz.ecl
  27. 81 0
      initfiles/examples/javascript/jsv8.ecl
  28. BIN
      initfiles/examples/jni/JavaCat.class
  29. 9 0
      initfiles/examples/jni/JavaCat.java
  30. 28 9
      initfiles/examples/jni/java_from_ecl.ecl
  31. 152 0
      initfiles/examples/python/python_embed.ecl
  32. 151 0
      initfiles/examples/python/python_embed2.ecl
  33. 83 68
      initfiles/examples/python/python_from_ecl.ecl
  34. 149 0
      initfiles/examples/python/python_from_ecl2.ecl
  35. 24 10
      initfiles/sbin/hpcc_setenv.in
  36. 33 131
      roxie/ccd/ccdfile.cpp
  37. 1 3
      roxie/ccd/ccdfile.hpp
  38. 5 3
      rtl/eclrtl/rtlds.cpp
  39. 1 1
      system/jlib/jlog.ipp
  40. 24 9
      system/jlib/jstring.cpp
  41. 1 0
      system/jlib/jstring.hpp
  42. 24 14
      thorlcr/activities/hashdistrib/thhashdistribslave.cpp
  43. 63 6
      thorlcr/graph/thgraphmaster.cpp

+ 1 - 0
cmake_modules/dependencies/quantal.cmake

@@ -0,0 +1 @@
+set ( CPACK_DEBIAN_PACKAGE_DEPENDS "libboost-regex1.49.0, libicu48, libxalan110, libxerces-c28, binutils, libldap-2.4-2, openssl, zlib1g, g++, openssh-client, openssh-server, expect, libarchive12, rsync")

+ 22 - 16
dali/base/dadfs.cpp

@@ -674,11 +674,13 @@ public:
         return clusternames.ordinality();
     }
 
-    unsigned find(const char *clustername)
+    unsigned find(const char *_clusterName)
     {
+        StringAttr clusterName = _clusterName;
+        clusterName.toLowerCase();
         StringBuffer name;
         ForEachItem(i)  {
-            if (strcmp(item(i).getClusterLabel(name.clear()).str(),clustername)==0) 
+            if (strcmp(item(i).getClusterLabel(name.clear()).str(),clusterName)==0)
                 return i;
             if (singleclusteroverride)
                 break;
@@ -2933,17 +2935,21 @@ public:
         clusters.kill();
     }
 
-    IFileDescriptor *getFileDescriptor(const char *clustername)
+    IFileDescriptor *getFileDescriptor(const char *_clusterName)
     {
         CriticalBlock block (sect);
         Owned<IFileDescriptor> fdesc = deserializeFileDescriptorTree(root,&queryNamedGroupStore(),0);
         fdesc->setTraceName(logicalName.get());
         StringArray cnames;
-        if (clustername&&*clustername)
-            cnames.append(clustername);
+        if (_clusterName&&*_clusterName)
+        {
+            StringAttr clusterName = _clusterName;
+            clusterName.toLowerCase();
+            cnames.append(clusterName);
+        }
         else
             getClusterNames(cnames);
-        fdesc->setClusterOrder(cnames,clustername&&*clustername);
+        fdesc->setClusterOrder(cnames,_clusterName&&*_clusterName);
         return fdesc.getClear();
     }
 
@@ -3091,10 +3097,8 @@ public:
         CClustersLockedSection cls(CDistributedFileBase<IDistributedFile>::logicalName);
         reloadClusters();
         if (findCluster(clustername)!=NotFound) {
-            if (findCluster(clustername)!=NotFound) {
-                IDFS_Exception *e = new CDFS_Exception(DFSERR_ClusterAlreadyExists,clustername);
-                throw e;
-            }
+            IDFS_Exception *e = new CDFS_Exception(DFSERR_ClusterAlreadyExists,clustername);
+            throw e;
         }
         Owned<IClusterInfo> cluster = createClusterInfo(clustername,NULL,mspec,&queryNamedGroupStore());
         if (cluster->queryGroup(&queryNamedGroupStore())) {
@@ -7881,7 +7885,7 @@ class CInitGroups
                 throwUnexpected();
         }
         if (altName)
-            gname.clear().append(altName);
+            gname.clear().append(altName).toLowerCase();
 
         VStringBuffer xpath("Group[@name=\"%s\"]", gname.str());
         IPropertyTree *existingClusterGroup = groupsconnlock.conn->queryRoot()->queryPropTree(xpath.str()); // 'live' cluster group
@@ -7955,13 +7959,15 @@ public:
         defaultTimeout = _defaultTimeout;
     }
 
-    bool doClusterGroup(CgCmd cmd, const char *clusterName, const char *type, bool spares, SocketEndpointArray *eps, StringBuffer &messages)
+    bool doClusterGroup(CgCmd cmd, const char *_clusterName, const char *type, bool spares, SocketEndpointArray *eps, StringBuffer &messages)
     {
         Owned<IRemoteConnection> conn = querySDS().connect("/Environment/Software", myProcessSession(), RTM_LOCK_READ, SDS_CONNECT_TIMEOUT);
         if (!conn)
             return false;
-        if (!clusterName || !*clusterName)
+        if (!_clusterName || !*_clusterName)
             return false;
+        StringAttr clusterName = _clusterName;
+        clusterName.toLowerCase();
         if (!type || !*type)
             return false;
         bool ret = true;
@@ -7970,10 +7976,10 @@ public:
         StringBuffer errMsg;
         const char *clusterType = type;
         if (loadMachineMap()) {
-            VStringBuffer xpath("%s[@name=\"%s\"]", type, clusterName);
+            VStringBuffer xpath("%s[@name=\"%s\"]", type, clusterName.get());
             clusters.setown(root->getElements(xpath.str()));
             if (!clusters || !clusters->first()) {
-                VStringBuffer errMsg("Could not find type %s, %s cluster", type, clusterName);
+                VStringBuffer errMsg("Could not find type %s, %s cluster", type, clusterName.get());
                 WARNLOG("%s", errMsg.str());
                 messages.append(errMsg).newline();
                 ret = false;
@@ -8061,7 +8067,7 @@ public:
                     }
                 }
                 if (clusters->next()) {
-                    VStringBuffer errMsg("resetThorGroup: more than one cluster named: %s", clusterName);
+                    VStringBuffer errMsg("resetThorGroup: more than one cluster named: %s", clusterName.get());
                     WARNLOG("%s", errMsg.str());
                     messages.append(errMsg).newline();
                     ret = false;

+ 7 - 2
dali/base/dafdesc.cpp

@@ -404,6 +404,8 @@ public:
     CClusterInfo(const char *_name,IGroup *_group,const ClusterPartDiskMapSpec &_mspec,IGroupResolver *resolver,const char *_roxielabel)
         : name(_name),group(_group), roxielabel(_roxielabel)
     {
+        name.toLowerCase();
+        roxielabel.toLowerCase();
         mspec =_mspec;
         checkClusterName(resolver);
     }
@@ -507,6 +509,7 @@ public:
     void setGroupName(const char *_name)
     {
         name.set(_name);
+        name.toLowerCase();
     }
 
     void setGroup(IGroup *_group)
@@ -538,6 +541,7 @@ public:
     void setRoxieLabel(const char *_label)
     {
         roxielabel.set(_label);
+        roxielabel.toLowerCase();
     }
 
     const char *queryRoxieLabel()
@@ -549,7 +553,7 @@ public:
     {
         const char * label = queryRoxieLabel();
         if (label)
-            return  ret.append(label);
+            return ret.append(label);
         return getGroupName(ret,NULL);
     }
 
@@ -1879,7 +1883,8 @@ public:
         unsigned done = 0;
         StringBuffer cname;
         ForEachItemIn(i,names) {
-            const char *name = names.item(i);
+            StringAttr name = names.item(i);
+            name.toLowerCase();
             for (unsigned j=done;j<clusters.ordinality();j++) {
                 clusters.item(j).getClusterLabel(cname.clear());
                 if (strcmp(cname.str(),name)==0) {

+ 1 - 1
dali/base/dafdesc.hpp

@@ -217,7 +217,7 @@ if endCluster is not called it will assume only one cluster and not replicated
     virtual void setClusterGroup(unsigned clusternum,IGroup *grp) = 0;              // sets group for cluster
     virtual StringBuffer &getClusterGroupName(unsigned clusternum,StringBuffer &ret,IGroupResolver *resolver=NULL) = 0;                 // returns group name of cluster (if set)
     virtual void setClusterGroupName(unsigned clusternum,const char *name) = 0;     // sets group name of cluster (if set)
-    virtual void setClusterOrder(StringArray &names,bool exclusive) = 0;            // if exclusive set then othe clusters deleted
+    virtual void setClusterOrder(StringArray &names,bool exclusive) = 0;            // if exclusive set then other clusters deleted
     virtual void serializeTree(IPropertyTree &pt,unsigned flags=0) = 0;             // deserialize with deserializeFileDescriptorTree
     virtual IPropertyTree *getFileTree(unsigned flags=0) = 0;                       // flags IFDSF_*
 

+ 1 - 1
docs/ECLProgrammersGuide/PRG_Mods/PrG_Complex_ROXIE_Queries.xml

@@ -73,7 +73,7 @@ END;</programlisting>
     from the function that uses it.</para>
   </sect2>
 
-  <sect2 id="Keyed_Joins">
+  <sect2 id="PG_Keyed_Joins">
     <title>Keyed Joins</title>
 
     <para>Although the FETCH function was specifically designed for indexed

+ 1 - 1
docs/ECLProgrammersGuide/PRG_Mods/PrG_Smart_Stepping.xml

@@ -4,7 +4,7 @@
 <sect1 id="Smart_Stepping">
   <title><emphasis role="bold">Smart Stepping</emphasis></title>
 
-  <sect2 id="Overview">
+  <sect2 id="PG_Overview">
     <title>Overview</title>
 
     <para>Smart Stepping is a set of indexing techniques that, taken together,

+ 1 - 1
docs/ECLProgrammersGuide/PRG_Mods/PrG_Using_Group_Function.xml

@@ -85,7 +85,7 @@ OUTPUT(i2);
     the second.</para>
   </sect2>
 
-  <sect2 id="Performance_Considerations">
+  <sect2 id="PG_Performance_Considerations">
     <title>Performance Considerations</title>
 
     <para>There is also a major performance advantage to using the GROUP

+ 5 - 3
docs/ECLReference/ECLReference.xml

@@ -1,18 +1,20 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!DOCTYPE part PUBLIC "-//OASIS//DTD DocBook XML V4.5//EN"
 "http://www.oasis-open.org/docbook/xml/4.5/docbookx.dtd">
-<set xml:base="../">
+<set >
   <title>ECL Reference</title>
 
   <setinfo>
     <corpauthor>HPCC Systems</corpauthor>
   </setinfo>
 
-  <xi:include href="ECLLanguageReference/ECLR_mods/ECLR_includer.xml" xpointer="element(/1)"
+  <xi:include href="../ECLLanguageReference/ECLR-includer.xml" xpointer="element(/1)"
               xmlns:xi="http://www.w3.org/2001/XInclude" />
               
-  <xi:include href="ECLStandardLibraryReference/SLR-Mods/SLR-includer.xml" xpointer="element(/1)"
+  <xi:include href="../ECLStandardLibraryReference/SLR-includer.xml" xpointer="element(/1)"
               xmlns:xi="http://www.w3.org/2001/XInclude" />
 
+  <xi:include href="../ECLProgrammersGuide/PrGd-Includer.xml" xpointer="element(/1)"
+              xmlns:xi="http://www.w3.org/2001/XInclude" />
   
 </set>

+ 2 - 0
ecl/eclagent/agentctx.hpp

@@ -105,6 +105,8 @@ struct IAgentContext : extends IGlobalCodeContext
     virtual unsigned __int64 queryStopAfter() = 0;
     
     virtual const char *queryWuid() = 0;
+
+    virtual void updateWULogfile() = 0;
 };
 
 #endif // AGENTCTX_HPP_INCL

+ 7 - 6
ecl/eclagent/eclagent.cpp

@@ -505,8 +505,8 @@ public:
 
 //=======================================================================================
 
-EclAgent::EclAgent(IConstWorkUnit *wu, const char *_wuid, bool _checkVersion, bool _resetWorkflow, bool _noRetry, char const * _logname, const char *_allowedPipeProgs, IPropertyTree *queryXML, IProperties *_globals, IPropertyTree *_config)
-    : wuRead(wu), wuid(_wuid), checkVersion(_checkVersion), resetWorkflow(_resetWorkflow), noRetry(_noRetry), allowedPipeProgs(_allowedPipeProgs), globals(_globals), config(_config)
+EclAgent::EclAgent(IConstWorkUnit *wu, const char *_wuid, bool _checkVersion, bool _resetWorkflow, bool _noRetry, char const * _logname, const char *_allowedPipeProgs, IPropertyTree *_queryXML, IProperties *_globals, IPropertyTree *_config, ILogMsgHandler * _logMsgHandler)
+    : wuRead(wu), wuid(_wuid), checkVersion(_checkVersion), resetWorkflow(_resetWorkflow), noRetry(_noRetry), allowedPipeProgs(_allowedPipeProgs), globals(_globals), config(_config), logMsgHandler(_logMsgHandler)
 {
     isAborting = false;
     isStandAloneExe = false;
@@ -554,10 +554,10 @@ EclAgent::EclAgent(IConstWorkUnit *wu, const char *_wuid, bool _checkVersion, bo
         SCMStringBuffer jobName;
         debugContext->debugInitialize(wuid, wu->getJobName(jobName).str(), true);
     }
-    if (queryXML)
+    if (_queryXML)
     {
         Owned<IWorkUnit> w = updateWorkUnit();
-        w->setXmlParams(queryXML);
+        w->setXmlParams(_queryXML);
     }
     Owned<const IPropertyTree> xmlParams = wuRead->getXmlParams();
     if (xmlParams)
@@ -3108,12 +3108,13 @@ extern int HTHOR_API eclagent_main(int argc, const char *argv[], StringBuffer *
 
     //Build log file specification
     StringBuffer logfilespec;
+    ILogMsgHandler * logMsgHandler = NULL;
     if (!standAloneExe)
     {
         Owned<IComponentLogFileCreator> lf = createComponentLogFileCreator(agentTopology, "eclagent");
         lf->setMsgFields(MSGFIELD_timeDate | MSGFIELD_msgID | MSGFIELD_process | MSGFIELD_thread | MSGFIELD_code);
         lf->setCreateAliasFile(false);
-        lf->beginLogging();
+        logMsgHandler = lf->beginLogging();
         PROGLOG("Logging to %s", lf->queryLogFileSpec());
         logfilespec.set(lf->queryLogFileSpec());
     }
@@ -3323,7 +3324,7 @@ extern int HTHOR_API eclagent_main(int argc, const char *argv[], StringBuffer *
 
             if (w)
             {
-                EclAgent agent(w, wuid.str(), globals->getPropInt("IGNOREVERSION", 0)==0, globals->getPropBool("WFRESET", false), globals->getPropBool("NORETRY", false), logfilespec.str(), globals->queryProp("allowedPipePrograms"), query.getClear(), globals, agentTopology);
+                EclAgent agent(w, wuid.str(), globals->getPropInt("IGNOREVERSION", 0)==0, globals->getPropBool("WFRESET", false), globals->getPropBool("NORETRY", false), logfilespec.str(), globals->queryProp("allowedPipePrograms"), query.getClear(), globals, agentTopology, logMsgHandler);
                 const bool isRemoteWorkunit = (daliServers.length() != 0);
                 const bool resolveFilesLocally = !isRemoteWorkunit || globals->getPropBool("USELOCALFILES", false);
                 const bool writeResultsToStdout = !isRemoteWorkunit || globals->getPropBool("RESULTSTOSTDOUT", false);

+ 7 - 2
ecl/eclagent/eclagent.ipp

@@ -253,7 +253,9 @@ public:
     {
         return ctx->queryWuid();
     }
-    
+
+    virtual void updateWULogfile()                  { return ctx->updateWULogfile(); }
+
 protected:
     IAgentContext * ctx;
 };
@@ -385,6 +387,7 @@ private:
     SafePluginMap *pluginMap;
     IProperties *globals;
     IPropertyTree *config;
+    ILogMsgHandler *logMsgHandler;
     StringAttr agentTempDir;
     Owned<IOrderedOutputSerializer> outputSerializer;
 
@@ -444,7 +447,7 @@ private:
 public:
     IMPLEMENT_IINTERFACE;
 
-    EclAgent(IConstWorkUnit *wu, const char *_wuid, bool _checkVersion, bool _resetWorkflow, bool _noRetry, char const * _logname, const char *_allowedPipeProgs, IPropertyTree *queryXML, IProperties *globals, IPropertyTree *config);
+    EclAgent(IConstWorkUnit *wu, const char *_wuid, bool _checkVersion, bool _resetWorkflow, bool _noRetry, char const * _logname, const char *_allowedPipeProgs, IPropertyTree *_queryXML, IProperties *_globals, IPropertyTree *_config, ILogMsgHandler * _logMsgHandler);
     ~EclAgent();
 
     void setBlocked();
@@ -677,6 +680,8 @@ public:
     
     IGroup *getHThorGroup(StringBuffer &out);
     
+    virtual void updateWULogfile();
+
 };
 
 //---------------------------------------------------------------------------

+ 27 - 1
ecl/eclagent/eclgraph.cpp

@@ -982,6 +982,7 @@ void EclSubGraph::execute(const byte * parentExtract)
             wu->setTimerInfo(timer.str(), NULL, msTick()-startTime, 1, 0);
         }
     }
+    agent->updateWULogfile();//Update workunit logfile name in case of rollover
 }
 
 
@@ -1718,6 +1719,31 @@ void EclAgent::executeThorGraph(const char * graphName)
     while (resubmit); // if pause interrupted job (i.e. with pausenow action), resubmit graph
 }
 
+//In case of logfile rollover, update workunit logfile name(s) stored
+//in SDS/WorkUnits/{WUID}/Process/EclAgent/myeclagent<log>
+void EclAgent::updateWULogfile()
+{
+    if (logMsgHandler && config->hasProp("@name"))
+    {
+        StringBuffer logname;
+        bool ok = logMsgHandler->getLogName(logname);
+        if (ok)
+        {
+            RemoteFilename rlf;
+            rlf.setLocalPath(logname);
+            rlf.getRemotePath(logname.clear());
+
+            Owned <IWorkUnit> w = updateWorkUnit();
+            w->addProcess("EclAgent", config->queryProp("@name"), logname.str());
+        }
+        else
+        {
+            DBGLOG("ERROR: Unable to query logfile name");
+            assertex(ok);
+        }
+    }
+}
+
 void EclAgent::executeGraph(const char * graphName, bool realThor, size32_t parentExtractSize, const void * parentExtract)
 {
     assertex(parentExtractSize == 0);
@@ -1745,12 +1771,12 @@ void EclAgent::executeGraph(const char * graphName, bool realThor, size32_t pare
             if (guillotineTimeout)
                 abortmonitor->setGuillotineTimeout(guillotineTimeout);
             activeGraph->execute(NULL);
+            updateWULogfile();//Update workunit logfile name in case of rollover
             if (guillotineTimeout)
                 abortmonitor->setGuillotineTimeout(0);
             if (debugContext)
                 debugContext->checkBreakpoint(DebugStateGraphEnd, NULL, graphName);
             activeGraph.clear();
-
             if (debugContext)
             {
                 if (isAborting)

+ 28 - 21
esp/eclwatch/ws_XSLT/wuidcommon.xslt

@@ -1076,12 +1076,6 @@
               <xsl:value-of select="Value"/>
             </a>
           </td>
-          <xsl:variable name="resultname" select="Name"/>
-          <xsl:for-each select="/WUInfoResponse/ResultViews/View">
-            <td>
-              <a href="javascript:void(0);" onclick="getLink(document.getElementById('ECL_Result_{$position}'), '/WsWorkunits/WUResultView?Wuid={$wuid}&amp;ResultName={$resultname}&amp;ViewName={.}');return false;"><xsl:value-of select="."/></a>
-            </td>
-          </xsl:for-each>
           <td>
             <a href="javascript:void(0);" onclick="getLink(document.getElementById('ECL_Result_{position()}'), '/WsWorkunits/WUResultBin?Format=zip&amp;Wuid={$wuid}&amp;Sequence={Link}');return false;">.zip</a>
           </td>
@@ -1098,6 +1092,12 @@
               </a>
             </xsl:if>
           </td>
+          <xsl:variable name="resultname" select="Name"/>
+          <xsl:for-each select="/WUInfoResponse/ResultViews/View">
+            <td>
+              <a href="javascript:void(0);" onclick="getLink(document.getElementById('ECL_Result_{$position}'), '/WsWorkunits/WUResultView?Wuid={$wuid}&amp;ResultName={$resultname}&amp;ViewName={.}');return false;"><xsl:value-of select="."/></a>
+            </td>
+          </xsl:for-each>
         </xsl:when>
         <xsl:when test="number(ShowFileContent) and string-length(FileName)">
           <td>
@@ -1105,6 +1105,9 @@
               <xsl:value-of select="Value"/>
             </a>
           </td>
+          <td/>
+          <td/>
+          <td/>
           <td>
             <xsl:if test="string-length(FileName)">
               <a href="/WsDfu/DFUInfo?Name={FileName}" >
@@ -1119,26 +1122,30 @@
             <xsl:value-of select="Value"/>
             <xsl:text disable-output-escaping="yes"><![CDATA[ </span>]]></xsl:text>
          </td>
-            <td>
-              <xsl:if test="string-length(FileName)">
-                 <a href="/WsDfu/DFUInfo?Name={FileName}" >
+         <td/>
+         <td/>
+         <td/>
+         <td>
+            <xsl:if test="string-length(FileName)">
+                <a href="/WsDfu/DFUInfo?Name={FileName}" >
                     <xsl:value-of select="FileName"/>
-                 </a>
-              </xsl:if>
-            </td>
+                </a>
+            </xsl:if>
+         </td>
        </xsl:when>
        <xsl:otherwise>
-            <td/>
-            <td/>
-            <td/>
+         <td/>
+         <td/>
+         <td/>
+         <td/>
          <td><xsl:value-of select="Value"/></td>
-            <td>
-              <xsl:if test="string-length(FileName)">
-                 <a href="/WsDfu/DFUInfo?Name={FileName}" >
+         <td>
+            <xsl:if test="string-length(FileName)">
+                <a href="/WsDfu/DFUInfo?Name={FileName}" >
                     <xsl:value-of select="FileName"/>
-                 </a>
-              </xsl:if>
-            </td>
+                </a>
+            </xsl:if>
+         </td>
        </xsl:otherwise>
      </xsl:choose>
     </tr>

+ 1 - 1
esp/services/ws_workunits/ws_workunitsService.cpp

@@ -3713,7 +3713,7 @@ void deployEclOrArchive(IEspContext &context, IEspWUDeployWorkunitRequest & req,
     wu->commit();
     wu.clear();
 
-    submitWsWorkunit(context, wuid.str(), req.getCluster(), NULL, 0, true, false, false, NULL, &req.getDebugValues());
+    submitWsWorkunit(context, wuid.str(), req.getCluster(), NULL, 0, true, false, false, NULL, NULL, &req.getDebugValues());
     waitForWorkUnitToCompile(wuid.str(), req.getWait());
 
     WsWuInfo winfo(context, wuid.str());

+ 3 - 3
initfiles/bin/init_configesp

@@ -31,9 +31,7 @@ ulimit -n 8192
 SNMPID=$$
 
 killed() {
-    rm -f ${SENTINEL}
-    killall -9 configesp 2>/dev/null
-    sleep 1
+    kill_process ${SENTINEL} ${PID_NAME} 3
     exit 255
 }
 
@@ -42,6 +40,7 @@ trap "killed" SIGINT SIGTERM SIGKILL
 nohup configesp 1>/dev/null 2>/dev/null &
 echo $! > $PID_NAME
 wait
+rm $PID_NAME
 
 while [ -e ${SENTINEL} ]; do
     sleep 5
@@ -49,6 +48,7 @@ while [ -e ${SENTINEL} ]; do
         nohup configesp 1>/dev/null 2>/dev/null &
         echo $! > $PID_NAME
         wait
+        rm $PID_NAME
     fi
 done
 

+ 3 - 3
initfiles/bin/init_dafilesrv.in

@@ -41,9 +41,7 @@ export SENTINEL="dafilesrv.sentinel"
 rm -f ${SENTINEL}
 
 killed(){
-    rm -f ${SENTINEL}
-    killall -9 dafilesrv 2> /dev/null
-    sleep 2
+    kill_process ${SENTINEL} ${PID_NAME} 3
     exit 255
 }
 ulimit -n $handlelimit
@@ -51,6 +49,7 @@ trap "killed" SIGINT SIGTERM SIGKILL
 dafilesrv -L $log -I ${INSTANCE_NAME} &
 echo $! > $PID_NAME
 wait
+rm $PID_NAME
 
 while [ -e ${SENTINEL} ]; do
     sleep 5
@@ -58,5 +57,6 @@ while [ -e ${SENTINEL} ]; do
         dafilesrv -L $log -I ${INSTANCE_NAME} &
         echo $! > $PID_NAME
         wait
+        rm $PID_NAME
     fi
 done

+ 3 - 1
initfiles/bin/init_dali

@@ -29,7 +29,7 @@ ulimit -n 8192
 
 killed(){
     dalistop .
-    rm -f ${SENTINEL}
+    kill_process ${SENTINEL} ${PID_NAME} 3
     exit 255
 }
 
@@ -37,6 +37,7 @@ trap "killed" SIGINT SIGTERM SIGKILL
 daserver 1>/dev/null 2>/dev/null &
 echo $! > $PID_NAME
 wait
+rm $PID_NAME
 
 while [ -e ${SENTINEL} ]; do
     sleep 5
@@ -44,5 +45,6 @@ while [ -e ${SENTINEL} ]; do
         daserver 1>/dev/null 2>/dev/null &
         echo $! > $PID_NAME
         wait
+        rm $PID_NAME
     fi
 done

+ 5 - 6
initfiles/bin/init_dfuserver

@@ -30,12 +30,9 @@ ulimit -n 8192
 ulimit -s 512
 
 killed() {
-    if [ -e ${SENTINEL} ]; then
-        rm -f ${SENTINEL}
-        dfuserver stop=1
-        sleep 5
-        killall -9 dfuserver
-    fi
+    dfuserver stop=1
+    sleep 5
+    kill_process ${SENTINEL} ${PID_NAME} 3
     exit 255
 }
 
@@ -44,6 +41,7 @@ trap "killed" SIGINT SIGTERM SIGKILL
 dfuserver 1>/dev/null 2>/dev/null &
 echo $! > $PID_NAME
 wait
+rm $PID_NAME
 
 while [ -e ${SENTINEL} ]; do
     sleep 5
@@ -51,5 +49,6 @@ while [ -e ${SENTINEL} ]; do
         dfuserver 1>/dev/null 2>/dev/null &
         echo $! > $PID_NAME
         wait
+        rm $PID_NAME
     fi
 done

+ 3 - 7
initfiles/bin/init_eclagent.in

@@ -30,13 +30,7 @@ rm -f ${SENTINEL}
 rm -f ${PID_DIR}/hthortemp/*
 
 killed (){
-    rm -f ${SENTINEL}
-
-    killall agentexec
-    killall eclagent
-    sleep 2
-    killall -9 agentexec eclagent 1>/dev/null 2>&1 
-    sleep 2
+    kill_process ${SENTINEL} ${PID_NAME} 3
     exit 255
 }
 
@@ -47,6 +41,7 @@ ulimit -c unlimited
 agentexec 1>/dev/null 2>/dev/null &
 echo $! > $PID_NAME
 wait
+rm $PID_NAME
 
 while [ -e ${SENTINEL} ]; do
     sleep 1
@@ -54,6 +49,7 @@ while [ -e ${SENTINEL} ]; do
         agentexec 1>/dev/null 2>/dev/null &
         echo $! > $PID_NAME
         wait
+        rm $PID_NAME
     fi
 done
 

+ 3 - 11
initfiles/bin/init_eclccserver

@@ -26,17 +26,7 @@ export SENTINEL="eclccserver.sentinel"
 rm -f ${SENTINEL}
 
 killed() {
-    PATH_PRE=`type -path hpcc_setenv`
-    source ${PATH_PRE}
-    PID_NAME="$PID/`basename $PWD`.pid"
-
-    if [ -e ${SENTINEL} ]; then
-            rm -f ${SENTINEL}
-            if [ -e ${PID_NAME} ]; then
-                    pidwait_fn `cat ${PID_NAME}` 3
-            fi
-    fi
-    sleep 2
+    kill_process ${SENTINEL} ${PID_NAME} 3
     exit 255
 }
 
@@ -44,6 +34,7 @@ trap "killed" SIGINT SIGTERM SIGKILL
 eclccserver 1>/dev/null 2>/dev/null &
 echo $! > $PID_NAME
 wait
+rm $PID_NAME
 
 while [ -e ${SENTINEL} ]; do
     sleep 5
@@ -51,6 +42,7 @@ while [ -e ${SENTINEL} ]; do
         eclccserver 1>/dev/null 2>/dev/null & 
         echo $! > $PID_NAME
         wait
+        rm $PID_NAME
     fi
 done
 

+ 3 - 3
initfiles/bin/init_eclscheduler

@@ -26,9 +26,7 @@ export SENTINEL="eclscheduler.sentinel"
 rm -f ${SENTINEL}
 
 killed(){
-    rm -f ${SENTINEL}
-    killall -9 eclscheduler
-    sleep
+    kill_process ${SENTINEL} ${PID_NAME} 3
     exit 255
 }
 
@@ -36,6 +34,7 @@ trap "killed" SIGINT SIGTERM SIGKILL
 eclscheduler 1>/dev/null 2>/dev/null &
 echo $! > $PID_NAME
 wait
+rm $PID_NAME
 
 while [ -e ${SENTINEL} ]; do
     sleep 5
@@ -43,6 +42,7 @@ while [ -e ${SENTINEL} ]; do
         eclscheduler 1>/dev/null 2>/dev/null &
         echo $! > $PID_NAME
         wait
+        rm $PID_NAME
     fi
 done
 

+ 3 - 5
initfiles/bin/init_esp

@@ -30,11 +30,7 @@ ulimit -n 8192
 SNMPID=$$
 
 killed() {
-    rm -f ${SENTINEL}
-    killall esp 1>/dev/null 2>/dev/null
-    sleep 15
-    killall -9 esp 1>/dev/null 2>/dev/null 
-    sleep 2
+    kill_process ${SENTINEL} ${PID_NAME} 15
     exit 255
 }
 
@@ -42,11 +38,13 @@ trap "killed" SIGINT SIGTERM SIGKILL
 esp snmpid=$SNMPID 1>/dev/null 2>/dev/null &
 echo $! > $PID_NAME
 wait
+rm $PID_NAME
 while [ -e ${SENTINEL} ]; do
     sleep 5
     if [ -e ${SENTINEL} ]; then
         esp snmpid=$SNMPID 1>/dev/null 2>/dev/null &
         echo $! > $PID_NAME
         wait
+        rm $PID_NAME
     fi
 done

+ 3 - 5
initfiles/bin/init_roxie

@@ -47,11 +47,7 @@ killed() {
     if [ -n "$1" ]; then
         cd $1
     fi
-    rm -f ${SENTINEL}
-    killall roxie
-    sleep 5
-    killall -9 roxie
-    sleep 2
+    kill_process ${SENTINEL} ${PID_NAME} 3
     exit 255
 }
 
@@ -59,6 +55,7 @@ trap "killed" SIGINT SIGTERM SIGKILL
 nohup roxie --topology=RoxieTopology.xml --logfile=$logfilename --restarts=$restarts --stdlog=0 2>>$logfilename.stderr 1>>$logfilename.stdout &
 echo $! > $PID_NAME 
 wait
+rm $PID_NAME
 
 # Automatically restart roxie when it dies
 while [ -e ${SENTINEL} ]; do
@@ -68,5 +65,6 @@ while [ -e ${SENTINEL} ]; do
     nohup roxie --topology=RoxieTopology.xml --logfile=$logfilename --restarts=$restarts --stdlog=0 2>>$logfilename.stderr 1>>$logfilename.stdout &
     echo $! > $PID_NAME
     wait
+    rm $PID_NAME
 done
 

+ 3 - 2
initfiles/bin/init_sasha

@@ -34,7 +34,6 @@ ulimit -n 8192
 which_pidof
 
 killed() {
-    rm -f ${SENTINEL}
     pid=`${PIDOF} saserver`
     if [ -n "$pid" ]; then
         sasha server=. action=stop
@@ -50,7 +49,7 @@ killed() {
             kill -9 $pid
         fi
     fi
-    sleep 2
+    kill_process ${SENTINEL} ${PID_NAME} 3
     exit 255
 }
 
@@ -58,6 +57,7 @@ trap "killed" SIGINT SIGTERM SIGKILL
 saserver 1>/dev/null 2>/dev/null &
 echo $! > $PID_NAME
 wait
+rm $PID_NAME
 
 while [ -e ${SENTINEL} ]; do
     sleep 5
@@ -65,5 +65,6 @@ while [ -e ${SENTINEL} ]; do
         saserver 1>/dev/null 2>/dev/null &
         echo $! > $PID_NAME
         wait
+        rm $PID_NAME
     fi
 done

+ 4 - 1
initfiles/bin/init_thor

@@ -23,10 +23,13 @@ PID_NAME="$PID/`basename $PWD`.pid"
 INIT_PID_NAME="$PID/init_`basename $PWD`.pid"
 echo $$ > $INIT_PID_NAME
 
+export SENTINEL="thor.sentinel"
+rm -f ${SENTINEL}
+
 killed() {
         echo "Stopping"
         $deploydir/stop_thor $deploydir
-        rm $PID_NAME
+        kill_process ${SENTINEL} ${PID_NAME} 3
         exit 255
 }
 

+ 147 - 0
initfiles/examples/javascript/jsmoz.ecl

@@ -0,0 +1,147 @@
+/* Example of calling JavaScript (Mozilla SpiderMonkey) from ECL code via embedded C++
+*
+* This example evaluates a JavaScript expression within an ECL transform
+*
+*/
+
+#option('compileOptions', '-I/usr/include/js/')
+#option('linkOptions', '-lmozjs185')
+
+// Embedded C++ that makes a call to a JavaScript function
+
+string jseval(varstring script, varstring a, varstring b) := BEGINC++
+
+// This section of the code should probably move to a plugin, or somesuch
+
+/* Include the JSAPI header file to get access to SpiderMonkey. */
+#include "jsapi.h"
+#include <pthread.h>
+
+/* The class of the global object. */
+static JSClass global_class = {
+    "global", JSCLASS_GLOBAL_FLAGS,
+    JS_PropertyStub, JS_PropertyStub, JS_PropertyStub, JS_StrictPropertyStub,
+    JS_EnumerateStub, JS_ResolveStub, JS_ConvertStub, JS_FinalizeStub,
+    JSCLASS_NO_OPTIONAL_MEMBERS
+};
+
+// Owns the SpiderMonkey runtime, context and global object used by one jseval call.
+// The constructor sets everything up and opens a request; if any step fails it
+// releases whatever had been created so far and reports the failure via rtlFail.
+// The destructor closes the request and tears the context and runtime down again.
+class MozJsContext
+{
+    JSRuntime *rt;
+    JSContext *cx;
+    JSObject  *global;
+
+    // Not copyable: a copy would destroy the same context and runtime twice.
+    MozJsContext(const MozJsContext &);
+    MozJsContext &operator=(const MozJsContext &);
+
+public:
+    MozJsContext()
+    {
+        // Initialize to NULL so that initFailed can be used
+        rt = NULL;
+        cx = NULL;
+        global = NULL;
+
+        // We use a separate runtime each time - this may be a bad idea
+        rt = JS_NewRuntime(8 * 1024 * 1024);
+        if (!rt)
+            initFailed("Could not create JavaScript runtime");
+
+        // We need a context per thread - we COULD share between JS calls on a thread (using TLS, for example)
+        cx = JS_NewContext(rt, 8192);
+        if (cx == NULL)
+            initFailed("Could not create JavaScript context");
+        JS_SetOptions(cx, JSOPTION_VAROBJFIX | JSOPTION_JIT | JSOPTION_METHODJIT);
+        JS_SetVersion(cx, JSVERSION_LATEST);
+        JS_SetErrorReporter(cx, reportError);
+        /*
+         * Create the global object in a new compartment.
+         * You always need a global object per context.
+         */
+        global = JS_NewCompartmentAndGlobalObject(cx, &global_class, NULL);
+        if (global == NULL)
+            initFailed("Could not create JavaScript global object");
+        /*
+         * Populate the global object with the standard JavaScript
+         * function and object classes, such as Object, Array, Date.
+         */
+        if (!JS_InitStandardClasses(cx, global))
+            initFailed("Could not populate JavaScript global object");
+        JS_BeginRequest(cx);  // Probably not really necessary with a separate runtime per instance, but will be as soon as we change that.
+    }
+    ~MozJsContext()
+    {
+        // The destructor only runs for a fully constructed object, which is
+        // always inside the request opened at the end of the constructor.
+        if (global)
+            JS_EndRequest(cx);
+        cleanup();
+    }
+    // Allows the wrapper to be passed wherever the JSAPI expects a JSContext *
+    inline operator JSContext *() const { return cx; }
+    inline JSObject *queryGlobal() const { return global; }
+private:
+    // Destroys the context and runtime, whichever of them exist
+    void cleanup()
+    {
+        if (cx)
+            JS_DestroyContext(cx);
+        if (rt)
+            JS_DestroyRuntime(rt);
+    }
+    // Releases partially constructed state, then reports the failure via rtlFail
+    void initFailed(const char *why)
+    {
+        cleanup();
+        rtlFail(0, why);
+    }
+    // Error reporter installed on the context: writes "file:line:message" to stderr
+    static void reportError(JSContext *cx, const char *message, JSErrorReport *report)
+    {
+        // MORE - need to think about what is appropriate here!
+        fprintf(stderr, "%s:%u:%s\n",
+                 report->filename ? report->filename : "<no filename>",
+                 (unsigned int) report->lineno,
+                 message);
+    }
+};
+
+//#body
+
+// extern  void user1(size32_t & __lenResult,char * & __result, const char * script, const char *a, const char *b) {
+{
+    // Evaluates 'script' with the globals a and b bound to the two string arguments,
+    // and returns the result converted to a string.
+    MozJsContext c;
+    JSString * aa = JS_NewStringCopyZ(c, a);
+    JSString * bb = JS_NewStringCopyZ(c, b);
+    if (!aa || !bb)
+        rtlFail(0, "Could not create JavaScript strings");
+    if (!JS_DefineProperty(c, c.queryGlobal(), "a", STRING_TO_JSVAL(aa), NULL, NULL, JSPROP_READONLY) ||
+        !JS_DefineProperty(c, c.queryGlobal(), "b", STRING_TO_JSVAL(bb), NULL, NULL, JSPROP_READONLY))
+        rtlFail(0, "Could not bind JavaScript parameters");
+    jsval rval;
+    // Success is signalled by the return value; rval is only meaningful when ok is true
+    JSBool ok = JS_EvaluateScript(c, c.queryGlobal(), script, strlen(script), __FILE__, __LINE__, &rval);
+    if (!ok)
+        rtlFail(0, "Error in JavaScript evaluation");
+    JSString *str = JS_ValueToString(c, rval);
+    char * chars = str ? JS_EncodeString(c, str) : NULL;
+    if (!chars)
+        rtlFail(0, "Could not convert JavaScript result to a string");
+    __lenResult = strlen(chars);
+    __result = (char *) rtlMalloc(__lenResult);
+    memcpy(__result, chars, __lenResult);
+    // The buffer returned by JS_EncodeString belongs to the caller
+    JS_free(c, chars);
+}
+ENDC++;
+
+//--------------------------------------------------------
+
+// ECL code - an input dataset with 2 records, each containing 2 strings
+// Note that it uses the threaded concat operation, to test that multi-threaded access to the JS engine works
+
+inrec := RECORD
+           string f1;
+           string f2;
+         END;
+infile1 := DATASET([{'a', 'b'}, {'c', 'd'}], inrec);
+infile2 := DATASET([{'e', 'f'}, {'g', 'h'}], inrec);
+
+// Output record has just one string, filled in from the result of the JavaScript function
+outrec := RECORD
+            string c;
+          END;
+
+outrec t(inrec L) := TRANSFORM
+  SELF.c := jseval('a + b', L.f1, L.f2)  // Evaluates JavaScript expression to concatenate strings
+END;
+
+outfile := project(infile1, t(LEFT))+project(infile2, t(LEFT));  // threaded concat operation
+
+outfile;

+ 81 - 0
initfiles/examples/javascript/jsv8.ecl

@@ -0,0 +1,81 @@
+/* Example of calling JavaScript (V8 engine) from ECL code via embedded C++
+*
+* This example evaluates a JS expression within an ECL transform
+*
+*/
+
+#option('linkOptions', '-lv8')
+
+// Embedded C++ that evaluates the script passed to it
+
+string jseval(varstring script, varstring a, varstring b) := BEGINC++
+
+// This section of the code should probably move to a plugin, or somesuch
+
+/* Include the V8 header file to get access to the V8 engine. */
+#include "v8.h"
+
+#body
+
+using namespace v8;
+
+// extern  void user1(size32_t & __lenResult,char * & __result, const char * script, const char *a, const char *b) {
+{
+    // Evaluates 'script' in a fresh isolate and context with the globals a and b bound
+    // to the two string arguments. Any failure is remembered and only reported once the
+    // context and isolate have been disposed, so that nothing is leaked on error.
+    Isolate* isolate = Isolate::New();
+    const char *failure = NULL;
+    {
+        v8::Isolate::Scope iscope(isolate);
+        // Create a stack-allocated handle scope.
+        HandleScope handle_scope;
+        Persistent<Context> context = Context::New();
+        {
+            Context::Scope context_scope(context);
+
+            // Bind the parameters into the context
+            context->Global()->Set(String::New("a"), String::New(a));
+            context->Global()->Set(String::New("b"), String::New(b));
+
+            // Create a string containing the JavaScript source code.
+            Handle<String> source = String::New(script);
+
+            // Compile the source code - an empty handle means it did not compile.
+            Handle<Script> compiled = Script::Compile(source);
+
+            // Run the script to get the result - an empty handle means it threw.
+            Handle<Value> result;
+            if (!compiled.IsEmpty())
+                result = compiled->Run();
+
+            if (result.IsEmpty())
+                failure = "Error in JavaScript evaluation";
+            else
+            {
+                // Convert the result to an ASCII string and return it.
+                String::AsciiValue ascii(result);
+                const char *chars= *ascii;
+                if (!chars)
+                    failure = "Could not convert JavaScript result to a string";
+                else
+                {
+                    __lenResult = strlen(chars);
+                    __result = (char *)rtlMalloc(__lenResult);
+                    memcpy(__result, chars, __lenResult);
+                }
+            }
+        }
+        // Dispose the persistent context.
+        context.Dispose();
+    }
+    isolate->Dispose();
+    if (failure)
+        rtlFail(0, failure);
+}
+ENDC++;
+
+//--------------------------------------------------------
+
+// ECL code - an input dataset with 2 records, each containing 2 strings
+
+inrec := RECORD
+           string f1;
+           string f2;
+         END;
+infile1 := DATASET([{'a', 'b'}, {'c', 'd'}], inrec);
+infile2 := DATASET([{'e', 'f'}, {'g', 'h'}], inrec);
+
+// Output record has just one string, filled in from the result of the JavaScript function
+outrec := RECORD
+            string c;
+          END;
+
+outrec t(inrec L) := TRANSFORM
+  SELF.c := jseval('a + b', L.f1, L.f2)  // Evaluates JavaScript expression to concatenate strings
+END;
+
+outfile := project(infile1, t(LEFT))+project(infile2, t(LEFT));  // threaded concat operation
+
+outfile;

BIN
initfiles/examples/jni/JavaCat.class


+ 9 - 0
initfiles/examples/jni/JavaCat.java

@@ -0,0 +1,9 @@
+public class JavaCat
+{
+    /**
+     * Returns the text of a followed by the text of b.
+     */
+    public static String cat(String a, String b)
+    {
+        StringBuilder joined = new StringBuilder();
+        joined.append(a);
+        joined.append(b);
+        return joined.toString();
+    }
+}

+ 28 - 9
initfiles/examples/jni/java_from_ecl.ecl

@@ -20,13 +20,17 @@
 *
 *   javap -s -p javacat
 *
-* To compile this ECL example, you need to link the JNI libraries:
-*
-* eclcc calljava.ecl -Wc,-I/usr/lib/jvm/java-6-openjdk/include/ -Wl,-L/usr/lib/jvm/java-6-openjdk/jre/lib/amd64/server/ \
-*    -Wl,-L/usr/lib/jvm/java-6-openjdk/jre/lib/amd64/ -Wl,-ljawt -Wl,-ljvm -target=roxie
+* Note that a JVM - once loaded - cannot be successfully unloaded. Therefore this code will work
+* for repeated workunits on hthor, but on Thor and Roxie you will only be able to run once before
+* needing to restart the Thor/Roxie cluster. Moving the code into a plugin will help resolve that.
 *
 */
 
+// Set the compiler/linker options needed to ensure that the jvm is linked
+
+#option ('compileOptions', '-I/usr/lib/jvm/java-6-openjdk/include/');
+#option ('linkOptions', '-L/usr/lib/jvm/java-6-openjdk/jre/lib/amd64/server/ -L/usr/lib/jvm/java-6-openjdk/jre/lib/amd64/ -Wl,-rpath,/usr/lib/jvm/java-6-openjdk/jre/lib/amd64/ -Wl,-rpath,/usr/lib/jvm/java-6-openjdk/jre/lib/amd64/server/');
+
 // Embedded C++ that makes a JNI call
 
 string cat(varstring a, varstring b) := BEGINC++
@@ -35,6 +39,8 @@ string cat(varstring a, varstring b) := BEGINC++
 
 #include <jni.h>
 #include <assert.h>
+#option library jawt
+#option library jvm
 
 static JavaVM *javaVM;       /* denotes a Java VM */
 static pthread_once_t jni_init_flag = PTHREAD_ONCE_INIT;  /* Ensures initialized just once */
@@ -44,17 +50,28 @@ static void initJNI()
     assert (!javaVM);
     JavaVMInitArgs vm_args; /* JDK/JRE 6 VM initialization arguments */
     JavaVMOption* options = new JavaVMOption[2];
-    options[0].optionString = "-Djava.class.path=.";
-    options[1].optionString = "-verbose:jni";
+    char classPath[2048];
+    strcpy(classPath,"-Djava.class.path=.:/opt/HPCCSystems/examples/jni");
+    const char *oldPath = getenv("CLASSPATH");
+    if (oldPath && *oldPath)
+    {
+        strcat(classPath, ":");
+        strcat(classPath, oldPath);
+    }
+    options[0].optionString = classPath;
+    options[1].optionString = (char *) "-verbose:jni";
     vm_args.version = JNI_VERSION_1_6;
     vm_args.nOptions = 1;  // set to 2 if you want the verbose...
     vm_args.options = options;
     vm_args.ignoreUnrecognized = false;
     /* load and initialize a Java VM, return a JNI interface pointer in env */
     JNIEnv *env;       /* receives pointer to native method interface */
-    JNI_CreateJavaVM(&javaVM, (void**)&env, &vm_args);
-
+    jint res =JNI_CreateJavaVM(&javaVM, (void**)&env, &vm_args);
     delete options;
+    if (res < 0)
+    {
+        rtlFail(res, "Couldn't create JVM");
+    }
 }
 
 static JNIEnv *getJNIEnvironment()
@@ -109,6 +126,8 @@ static void resolveJNIMethods()
     pthread_once(&jni_resolve_flag, resolveJNIMethods);
 
     JNIEnv *env = getJNIEnvironment();
+    if (!env || !JavaCat || !JavaCat_cat)
+        rtlFail(0, "Failed to resolve JNI functions");
     jstring jstrA = env->NewStringUTF(a);
     jstring jstrB = env->NewStringUTF(b);
     jstring result = (jstring) env->CallStaticObjectMethod(JavaCat, JavaCat_cat, jstrA, jstrB);
@@ -116,7 +135,7 @@ static void resolveJNIMethods()
 
     __lenResult = env->GetStringUTFLength(result);
     const char * chars =  env->GetStringUTFChars(result, NULL);
-    __result = new char(__lenResult);
+    __result = (char *)rtlMalloc(__lenResult);
     memcpy(__result, chars, __lenResult);
     env->ReleaseStringUTFChars(result, chars);
 // }

+ 152 - 0
initfiles/examples/python/python_embed.ecl

@@ -0,0 +1,152 @@
+/* Example of calling Python from ECL code via embedded C++
+*
+* This example evaluates a python expression within an ECL transform
+* Because the python will be compiled every time the expression is evaluated, it
+* is likely to be less efficient than calling external python code as in the example
+* python_from_ecl
+*
+*/
+
+#option('compileOptions', '-I/usr/include/python2.7/')
+#option('linkOptions', '-lpython2.7')
+
+// Embedded C++ that makes a call to a Python function
+
+
+string pythonCat(varstring a, varstring b) := BEGINC++
+
+// This section of the code should probably move to a plugin, or somesuch
+
+#include <Python.h>
+#include <assert.h>
+#include <pthread.h>
+
+static PyObject *pFunc_cat;
+
+// Starts the embedded Python interpreter on construction and shuts it down on
+// destruction. After start-up the constructing thread gives up the interpreter
+// lock again, keeping its thread state so the destructor can re-acquire it.
+class PythonInitializer
+{
+    PyObject *pModule;
+    PyThreadState *tstate;
+    bool pythonInitialized;
+public:
+    PythonInitializer()
+      : pModule(NULL), tstate(NULL), pythonInitialized(false)
+    {
+        // Bring the interpreter up with thread support enabled
+        Py_Initialize();
+        PyEval_InitThreads();
+        pythonInitialized = true;
+        // Hand the interpreter lock back, remembering this thread's state
+        tstate = PyEval_SaveThread();
+    }
+    ~PythonInitializer()
+    {
+        // Re-acquire the interpreter lock before touching any Python state
+        PyEval_RestoreThread(tstate);
+        // Drop the module reference, if one is held
+        Py_XDECREF(pModule);
+        // Shut the interpreter down
+        if (pythonInitialized)
+            Py_Finalize();
+    }
+
+};
+
+PythonInitializer __initializer;
+
+// Use class OwnedPyObject for any objects that are not 'borrowed references'
+// so that the appropriate Py_DECREF call is made when the OwnedPyObject goes
+// out of scope, even if the function returns prematurely (such as via an exception).
+// In particular, checkPythonError is a lot easier to call safely if this is used.
+
+class OwnedPyObject
+{
+    PyObject *ptr;
+public:
+    // Takes over the reference held by 'owned' (which may be NULL)
+    inline OwnedPyObject(PyObject *owned)  { ptr = owned; }
+    // Gives the reference back when the wrapper goes out of scope
+    inline ~OwnedPyObject()                { Py_XDECREF(ptr); }
+    // Accessors - none of these transfer ownership
+    inline PyObject * get() const          { return ptr; }
+    inline PyObject * operator -> () const { return get(); }
+    inline operator PyObject *() const     { return get(); }
+};
+
+// call checkPythonError to throw an exception if Python error state is set
+
+static void checkPythonError()
+{
+    // Nothing to do unless the Python error indicator is set
+    if (!PyErr_Occurred())
+        return;
+    // PyErr_Occurred only yields the (borrowed) exception type, so fetch the full
+    // error state to get at the message. PyErr_Fetch also clears the indicator and
+    // hands us owned references.
+    PyObject *type = NULL;
+    PyObject *value = NULL;
+    PyObject *traceback = NULL;
+    PyErr_Fetch(&type, &value, &traceback);
+    OwnedPyObject ownedType(type);
+    OwnedPyObject ownedValue(value);
+    OwnedPyObject ownedTraceback(traceback);
+    // Prefer the exception value (the message); fall back to the type if there is none
+    OwnedPyObject errStr(PyObject_Str(value ? value : type));
+    const char *text = errStr.get() ? PyString_AsString(errStr) : NULL;
+    if (!text)
+    {
+        // Converting the error failed as well - don't leave that error pending
+        PyErr_Clear();
+        text = "Unknown Python error";
+    }
+    rtlFail(0, text);
+}
+
+// The Python Global Interpreter Lock (GIL) won't know about C++-created threads, so we need to
+// call PyGILState_Ensure() and PyGILState_Release at the start and end of every function.
+// Wrapping them in a class like this ensures that the release always happens even if
+// the function exits prematurely
+
+class GILstateWrapper
+{
+    PyGILState_STATE gstate;
+public:
+    // Acquire on construction, remembering the state handed back...
+    GILstateWrapper() : gstate(PyGILState_Ensure())
+    {
+    }
+    // ...and release with that same state on destruction
+    ~GILstateWrapper()
+    {
+        PyGILState_Release(gstate);
+    }
+};
+
+//--------------------------------------------------------
+
+#body
+
+// extern  void user1(size32_t & __lenResult,char * & __result, const char *a, const char *b) {
+{
+    // Evaluates the Python expression "a+b" with a and b bound to the two arguments
+    GILstateWrapper gstate;
+    static OwnedPyObject pythonCode = Py_CompileString("a+b", "user1", Py_eval_input);
+
+    // PyEval_EvalCode takes (code, globals, locals); a and b are bound in the globals dictionary
+    OwnedPyObject globals = PyDict_New ();
+    OwnedPyObject locals = PyDict_New ();
+    // PyDict_SetItemString does not steal a reference, so the argument strings
+    // must be owned here or they would leak on every call
+    OwnedPyObject pyA = PyString_FromString(a);
+    OwnedPyObject pyB = PyString_FromString(b);
+    checkPythonError();
+    PyDict_SetItemString(globals, "a", pyA);
+    PyDict_SetItemString(globals, "b", pyB);
+    OwnedPyObject pResult = PyEval_EvalCode((PyCodeObject *) pythonCode.get(), globals, locals);
+    checkPythonError();
+     __lenResult = PyString_Size(pResult);
+    const char * chars =  PyString_AsString(pResult);
+    checkPythonError();
+    __result = (char *)rtlMalloc(__lenResult);
+    memcpy(__result, chars, __lenResult);
+}
+ENDC++;
+
+//--------------------------------------------------------
+
+// ECL code - an input dataset with 2 records, each containing 2 strings
+
+inrec := RECORD
+           string f1;
+           string f2;
+         END;
+infile1 := DATASET([{'a', 'b'}, {'c', 'd'}], inrec);
+infile2 := DATASET([{'e', 'f'}, {'g', 'h'}], inrec);
+
+// Output record has just one string, filled in from the result of the python function
+outrec := RECORD
+            string c;
+          END;
+
+outrec t(inrec L) := TRANSFORM
+  SELF.c := pythonCat(L.f1, L.f2)  // Evaluates python expression to concatenate strings
+END;
+
+outfile := project(infile1, t(LEFT))+project(infile2, t(LEFT));  // threaded concat operation
+
+outfile;

+ 151 - 0
initfiles/examples/python/python_embed2.ecl

@@ -0,0 +1,151 @@
+/* Example of calling Python from ECL code via embedded C++
+*
+* This example evaluates a python expression within an ECL transform
+* Because the python will be compiled every time the expression is evaluated, it
+* is likely to be less efficient than calling external python code as in the example
+* python_from_ecl
+*
+*/
+
+#option('compileOptions', '-I/usr/include/python2.7/')
+#option('linkOptions', '-lpython2.7')
+
+// Embedded C++ that makes a call to a Python function
+
+
+string python(varstring expr, varstring a, varstring b) := BEGINC++
+
+// This section of the code should probably move to a plugin, or somesuch
+
+#include <Python.h>
+#include <assert.h>
+#include <pthread.h>
+
+static PyObject *pFunc_cat;
+
+// Starts the embedded Python interpreter on construction and shuts it down on
+// destruction. After start-up the constructing thread gives up the interpreter
+// lock again, keeping its thread state so the destructor can re-acquire it.
+class PythonInitializer
+{
+    PyObject *pModule;
+    PyThreadState *tstate;
+    bool pythonInitialized;
+public:
+    PythonInitializer()
+      : pModule(NULL), tstate(NULL), pythonInitialized(false)
+    {
+        // Bring the interpreter up with thread support enabled
+        Py_Initialize();
+        PyEval_InitThreads();
+        pythonInitialized = true;
+        // Hand the interpreter lock back, remembering this thread's state
+        tstate = PyEval_SaveThread();
+    }
+    ~PythonInitializer()
+    {
+        // Re-acquire the interpreter lock before touching any Python state
+        PyEval_RestoreThread(tstate);
+        // Drop the module reference, if one is held
+        Py_XDECREF(pModule);
+        // Shut the interpreter down
+        if (pythonInitialized)
+            Py_Finalize();
+    }
+
+};
+
+PythonInitializer __initializer;
+
+// Use class OwnedPyObject for any objects that are not 'borrowed references'
+// so that the appropriate Py_DECREF call is made when the OwnedPyObject goes
+// out of scope, even if the function returns prematurely (such as via an exception).
+// In particular, checkPythonError is a lot easier to call safely if this is used.
+
+class OwnedPyObject
+{
+    PyObject *ptr;
+public:
+    // Takes over the reference held by 'owned' (which may be NULL)
+    inline OwnedPyObject(PyObject *owned)  { ptr = owned; }
+    // Gives the reference back when the wrapper goes out of scope
+    inline ~OwnedPyObject()                { Py_XDECREF(ptr); }
+    // Accessors - none of these transfer ownership
+    inline PyObject * get() const          { return ptr; }
+    inline PyObject * operator -> () const { return get(); }
+    inline operator PyObject *() const     { return get(); }
+};
+
+// call checkPythonError to throw an exception if Python error state is set
+
+static void checkPythonError()
+{
+    // Nothing to do unless the Python error indicator is set
+    if (!PyErr_Occurred())
+        return;
+    // PyErr_Occurred only yields the (borrowed) exception type, so fetch the full
+    // error state to get at the message. PyErr_Fetch also clears the indicator and
+    // hands us owned references.
+    PyObject *type = NULL;
+    PyObject *value = NULL;
+    PyObject *traceback = NULL;
+    PyErr_Fetch(&type, &value, &traceback);
+    OwnedPyObject ownedType(type);
+    OwnedPyObject ownedValue(value);
+    OwnedPyObject ownedTraceback(traceback);
+    // Prefer the exception value (the message); fall back to the type if there is none
+    OwnedPyObject errStr(PyObject_Str(value ? value : type));
+    const char *text = errStr.get() ? PyString_AsString(errStr) : NULL;
+    if (!text)
+    {
+        // Converting the error failed as well - don't leave that error pending
+        PyErr_Clear();
+        text = "Unknown Python error";
+    }
+    rtlFail(0, text);
+}
+
+// The Python Global Interpreter Lock (GIL) won't know about C++-created threads, so we need to
+// call PyGILState_Ensure() and PyGILState_Release at the start and end of every function.
+// Wrapping them in a class like this ensures that the release always happens even if
+// the function exits prematurely
+
+class GILstateWrapper
+{
+    PyGILState_STATE gstate;
+public:
+    // Acquire on construction, remembering the state handed back...
+    GILstateWrapper() : gstate(PyGILState_Ensure())
+    {
+    }
+    // ...and release with that same state on destruction
+    ~GILstateWrapper()
+    {
+        PyGILState_Release(gstate);
+    }
+};
+
+//--------------------------------------------------------
+
+#body
+
+// extern  void user1(size32_t & __lenResult,char * & __result, const char * expr, const char *a, const char *b) {
+{
+    // Evaluates the Python expression in 'expr' with a and b bound to the two arguments
+    GILstateWrapper gstate;
+    checkPythonError();
+    // PyRun_String takes (str, start, globals, locals); a and b are bound in the globals dictionary
+    OwnedPyObject globals = PyDict_New ();
+    OwnedPyObject locals = PyDict_New ();
+    // PyDict_SetItemString does not steal a reference, so the argument strings
+    // must be owned here or they would leak on every call
+    OwnedPyObject pyA = PyString_FromString(a);
+    OwnedPyObject pyB = PyString_FromString(b);
+    checkPythonError();
+    PyDict_SetItemString(globals, "a", pyA);
+    PyDict_SetItemString(globals, "b", pyB);
+    OwnedPyObject pResult = PyRun_String(expr, Py_eval_input, globals, locals);
+    checkPythonError();
+     __lenResult = PyString_Size(pResult);
+    const char * chars =  PyString_AsString(pResult);
+    checkPythonError();
+    __result = (char *)rtlMalloc(__lenResult);
+    memcpy(__result, chars, __lenResult);
+}
+ENDC++;
+
+//--------------------------------------------------------
+
+// ECL code - an input dataset with 2 records, each containing 2 strings
+
+inrec := RECORD
+           string f1;
+           string f2;
+         END;
+infile1 := DATASET([{'a', 'b'}, {'c', 'd'}], inrec);
+infile2 := DATASET([{'e', 'f'}, {'g', 'h'}], inrec);
+
+// Output record has just one string, filled in from the result of the python function
+outrec := RECORD
+            string c;
+          END;
+
+outrec t(inrec L) := TRANSFORM
+  SELF.c := python('a + b', L.f1, L.f2)  // Evaluates python expression to concatenate strings
+END;
+
+outfile := project(infile1, t(LEFT))+project(infile2, t(LEFT));  // threaded concat operation
+
+outfile;

+ 83 - 68
initfiles/examples/python/python_from_ecl.ecl

@@ -5,14 +5,17 @@
 * def cat(a, b):
 *   return a + b
 *
-* To compile this ECL example, you need to link the Python libraries:
+* Note that you may need to change the line that sets the Python sys.path, if you extend this
+* code to use other python examples, or if you are running on a system other than a standard HPCC
+* install on Linux
 *
-* eclcc python_from_ecl.ecl -Wc,-I/usr/include/python2.7/ -Wl,-lpython2.7 -target=hthor
-*
-* To run it, ensure that PYTHONPATH is set such that python_cat.py can be located
 */
 
-// Embedded C++ that makes a JNI call
+#option('compileOptions', '-I/usr/include/python2.7/')
+#option('linkOptions', '-lpython2.7')
+
+// Embedded C++ that makes a call to a Python function
+
 
 string cat(varstring a, varstring b) := BEGINC++
 
@@ -20,64 +23,92 @@ string cat(varstring a, varstring b) := BEGINC++
 
 #include <Python.h>
 #include <assert.h>
+#include <pthread.h>
 
-static pthread_once_t python_resolve_flag = PTHREAD_ONCE_INIT;  /* Ensures called just once */
-static PyObject *pName, *pModule, *pFunc_cat;
-static bool pythonInitialized = false;
+static PyObject *pFunc_cat;
 
-static void resolvePythonFunctions()
+class PythonInitializer
 {
-    /* Do all the function resolution just the once... */
-    PyObject *pDict;
-
-    // Initialize the Python Interpreter
-    Py_Initialize();
-    PyEval_InitThreads();
-    pythonInitialized = true;
-
-    // Build the name object
-    pName = PyString_FromString("python_cat");
-
-    // Load the module object
-    pModule = PyImport_Import(pName);
-    if (pModule == NULL)
+    PyObject *pModule;
+    PyThreadState *tstate;
+    bool pythonInitialized;
+public:
+    PythonInitializer()
     {
-        PyErr_Print();
+        pModule = NULL;
+        tstate = NULL;
+        pythonInitialized = false;
+
+        // Initialize the Python Interpreter
+        Py_Initialize();
+        PyEval_InitThreads();
+        pythonInitialized = true;
+        resolvePythonFunctions();
+        tstate = PyEval_SaveThread();
     }
-    else
+    ~PythonInitializer()
     {
-        // pDict is a borrowed reference
-        pDict = PyModule_GetDict(pModule);
-        // pFunc_cat is also a borrowed reference
-        pFunc_cat = PyDict_GetItemString(pDict, "cat");
-        if (!pFunc_cat || !PyCallable_Check(pFunc_cat))
+        PyEval_RestoreThread(tstate);
+        // Clean up
+        if (pModule)
+            Py_DECREF(pModule);
+        // Finish the Python Interpreter
+        if (pythonInitialized)
+            Py_Finalize();
+    }
+
+    void resolvePythonFunctions()
+    {
+        PySys_SetPath("/opt/HPCCSystems/examples/python/");    // Set this to where you want to pick up python_cat.py from
+        pModule = PyImport_ImportModule("python_cat");
+        if (pModule == NULL)
         {
             PyErr_Print();
-            pFunc_cat = NULL;
+        }
+        else
+        {
+            // pDict is a borrowed reference
+            PyObject *pDict = PyModule_GetDict(pModule);
+            // pFunc_cat is also a borrowed reference
+            pFunc_cat = PyDict_GetItemString(pDict, "cat");
+            if (!pFunc_cat || !PyCallable_Check(pFunc_cat))
+            {
+                PyErr_Print();
+                pFunc_cat = NULL;
+            }
         }
     }
-    PyEval_ReleaseLock();
-}
 
-static void finishPython()
+};
+
+PythonInitializer __initializer;
+
+// Use class OwnedPyObject for any objects that are not 'borrowed references'
+// so that the appropriate Py_DECREF call is made when the OwnedPyObject goes
+// out of scope, even if the function returns prematurely (such as via an exception).
+// In particular, checkPythonError is a lot easier to call safely if this is used.
+
+class OwnedPyObject
 {
-    // Clean up
-    if (pModule)
-        Py_DECREF(pModule);
-    if (pName)
-        Py_DECREF(pName);
-    // Finish the Python Interpreter
-    if (pythonInitialized)
-        Py_Finalize();
-}
+    PyObject *ptr;
+public:
+    inline OwnedPyObject(PyObject *_ptr) : ptr(_ptr) {}
+    inline ~OwnedPyObject()                { if (ptr) Py_DECREF(ptr); }
+    inline PyObject * get() const          { return ptr; }
+    inline PyObject * operator -> () const { return ptr; }
+    inline operator PyObject *() const     { return ptr; }
+};
+
+// call checkPythonError to throw an exception if Python error state is set
 
 static void checkPythonError()
 {
     PyObject* err = PyErr_Occurred();
     if (err)
     {
-        PyErr_Print();
-        rtlFail(0, "Unexpected failure"); // MORE - should probably get some info out of PyError rather than just printing it
+        OwnedPyObject errStr = PyObject_Str(err);
+        PyErr_Clear();
+        rtlFail(0, PyString_AsString(errStr));
     }
 }
 
@@ -100,41 +131,25 @@ public:
     }
 };
 
-// Use class OwnedPyObject for any objects that are not 'borrowed references'
-// so that the appropriate Py_DECREF call is made when the OwnedPyObject goes
-// out of scope, even if the function returns prematurely (such as via an exception).
-// In particular, checkPythonError is a lot easier to call safely if this is used.
-
-class OwnedPyObject
-{
-    PyObject *ptr;
-public:
-    inline OwnedPyObject(PyObject *_ptr) : ptr(_ptr) {}
-    inline ~OwnedPyObject()                { if (ptr) Py_DECREF(ptr); }
-    inline PyObject * operator -> () const { return ptr; }
-    inline operator PyObject *() const     { return ptr; }
-};
-
 //--------------------------------------------------------
 
 #body
 
 // extern  void user1(size32_t & __lenResult,char * & __result,const char * a,const char * b) {
-    pthread_once(&python_resolve_flag, resolvePythonFunctions);
+{
     if (!pFunc_cat)
        rtlFail(0, "Could not resolve python functions");
-    GILstateWrapper gstate; // Ensure that we play nice with Python threads
-
+    GILstateWrapper gstate;
     OwnedPyObject pArgs = Py_BuildValue("s,s", a, b);
     checkPythonError();
     OwnedPyObject pResult = PyObject_CallObject(pFunc_cat, pArgs);
     checkPythonError();
-
-    __lenResult = PyString_Size(pResult);
+     __lenResult = PyString_Size(pResult);
     const char * chars =  PyString_AsString(pResult);
-    __result = new char(__lenResult);
+    checkPythonError();
+    __result = (char *)rtlMalloc(__lenResult);
     memcpy(__result, chars, __lenResult);
-// }
+}
 ENDC++;
 
 //--------------------------------------------------------
@@ -148,7 +163,7 @@ inrec := RECORD
 infile1 := DATASET([{'a', 'b'}, {'c', 'd'}], inrec);
 infile2 := DATASET([{'e', 'f'}, {'g', 'h'}], inrec);
 
-// Output record has just one string, filled in from the result of the java function
+// Output record has just one string, filled in from the result of the python function
 outrec := RECORD
             string c;
           END;

+ 149 - 0
initfiles/examples/python/python_from_ecl2.ecl

@@ -0,0 +1,149 @@
+/* Example of calling Python from ECL code via embedded C++
+*
+* This example evaluates a python expression within an ECL transform
+* Because the python will be compiled every time the expression is evaluated, it
+* is likely to be less efficient than calling external python code as in the example
+* python_from_ecl
+*
+*/
+
+#option('compileOptions', '-I/usr/include/python2.7/')
+#option('linkOptions', '-lpython2.7')
+
+// Embedded C++ that makes a call to a Python function
+
+
+string python(varstring expr) := BEGINC++
+
+// This section of the code should probably move to a plugin, or somesuch
+
+#include <Python.h>
+#include <assert.h>
+#include <pthread.h>
+
+static PyObject *pFunc_cat;
+
+// Starts the embedded Python interpreter on construction and shuts it down on
+// destruction. After start-up the constructing thread gives up the interpreter
+// lock again, keeping its thread state so the destructor can re-acquire it.
+class PythonInitializer
+{
+    PyObject *pModule;
+    PyThreadState *tstate;
+    bool pythonInitialized;
+public:
+    PythonInitializer()
+      : pModule(NULL), tstate(NULL), pythonInitialized(false)
+    {
+        // Bring the interpreter up with thread support enabled
+        Py_Initialize();
+        PyEval_InitThreads();
+        pythonInitialized = true;
+        // Hand the interpreter lock back, remembering this thread's state
+        tstate = PyEval_SaveThread();
+    }
+    ~PythonInitializer()
+    {
+        // Re-acquire the interpreter lock before touching any Python state
+        PyEval_RestoreThread(tstate);
+        // Drop the module reference, if one is held
+        Py_XDECREF(pModule);
+        // Shut the interpreter down
+        if (pythonInitialized)
+            Py_Finalize();
+    }
+
+};
+
+PythonInitializer __initializer;
+
+// Use class OwnedPyObject for any objects that are not 'borrowed references'
+// so that the appropriate Py_DECREF call is made when the OwnedPyObject goes
+// out of scope, even if the function returns prematurely (such as via an exception).
+// In particular, checkPythonError is a lot easier to call safely if this is used.
+
+class OwnedPyObject
+{
+    PyObject *ptr;
+public:
+    // Takes over the reference held by 'owned' (which may be NULL)
+    inline OwnedPyObject(PyObject *owned)  { ptr = owned; }
+    // Gives the reference back when the wrapper goes out of scope
+    inline ~OwnedPyObject()                { Py_XDECREF(ptr); }
+    // Accessors - none of these transfer ownership
+    inline PyObject * get() const          { return ptr; }
+    inline PyObject * operator -> () const { return get(); }
+    inline operator PyObject *() const     { return get(); }
+};
+
+// call checkPythonError to throw an exception if Python error state is set
+
+static void checkPythonError()
+{
+    // Nothing to do unless the Python error indicator is set
+    if (!PyErr_Occurred())
+        return;
+    // PyErr_Occurred only yields the (borrowed) exception type, so fetch the full
+    // error state to get at the message. PyErr_Fetch also clears the indicator and
+    // hands us owned references.
+    PyObject *type = NULL;
+    PyObject *value = NULL;
+    PyObject *traceback = NULL;
+    PyErr_Fetch(&type, &value, &traceback);
+    OwnedPyObject ownedType(type);
+    OwnedPyObject ownedValue(value);
+    OwnedPyObject ownedTraceback(traceback);
+    // Prefer the exception value (the message); fall back to the type if there is none
+    OwnedPyObject errStr(PyObject_Str(value ? value : type));
+    const char *text = errStr.get() ? PyString_AsString(errStr) : NULL;
+    if (!text)
+    {
+        // Converting the error failed as well - don't leave that error pending
+        PyErr_Clear();
+        text = "Unknown Python error";
+    }
+    rtlFail(0, text);
+}
+
+// The Python Global Interpreter Lock (GIL) won't know about C++-created threads, so we need to
+// call PyGILState_Ensure() and PyGILState_Release at the start and end of every function.
+// Wrapping them in a class like this ensures that the release always happens even if
+// the function exits prematurely
+
+class GILstateWrapper
+{
+    PyGILState_STATE gstate;
+public:
+    // Acquire on construction, remembering the state handed back...
+    GILstateWrapper() : gstate(PyGILState_Ensure())
+    {
+    }
+    // ...and release with that same state on destruction
+    ~GILstateWrapper()
+    {
+        PyGILState_Release(gstate);
+    }
+};
+
+//--------------------------------------------------------
+
+#body
+
+// extern  void user1(size32_t & __lenResult,char * & __result, const char * expr) {
+{
+    // Evaluates the Python expression in 'expr' and returns its string value
+    GILstateWrapper gilHeld;
+    checkPythonError();
+    // Two fresh, empty dictionaries; PyRun_String receives them in (globals, locals) order
+    OwnedPyObject evalGlobals = PyDict_New ();
+    OwnedPyObject evalLocals = PyDict_New ();
+    OwnedPyObject evaluated = PyRun_String(expr, Py_eval_input, evalGlobals, evalLocals);
+    checkPythonError();
+    // Fetch length and bytes first, then check whether either call raised an error
+    __lenResult = PyString_Size(evaluated);
+    const char * text = PyString_AsString(evaluated);
+    checkPythonError();
+    // Copy the bytes out into a buffer from rtlMalloc
+    __result = (char *)rtlMalloc(__lenResult);
+    memcpy(__result, text, __lenResult);
+}
+ENDC++;
+
+//--------------------------------------------------------
+
+// ECL code - an input dataset with 2 records, each containing 2 strings
+
+inrec := RECORD
+           string f1;
+           string f2;
+         END;
+infile1 := DATASET([{'a', 'b'}, {'c', 'd'}], inrec);
+infile2 := DATASET([{'e', 'f'}, {'g', 'h'}], inrec);
+
+// Output record has just one string, filled in from the result of the python function
+outrec := RECORD
+            string c;
+          END;
+
+outrec t(inrec L) := TRANSFORM
+  SELF.c := python('"' + L.f1 + '" + "' + L.f2 + '"')  // Evaluates python expression to concatenate strings
+END;
+
+outfile := project(infile1, t(LEFT))+project(infile2, t(LEFT));  // threaded concat operation
+
+outfile;

+ 24 - 10
initfiles/sbin/hpcc_setenv.in

@@ -22,17 +22,31 @@
 
 ###<REPLACE>###
 
+# Removes the sentinel file if it exists, then - if the pid file exists -
+# hands it to pidwait_fn to stop the process it names.
+#   $1 - path of the sentinel file
+#   $2 - path of the pid file
+#   $3 - timeout, passed through unchanged to pidwait_fn
+# Assigns the (global) variables SENTINEL, PID and TIMEOUT.
+function kill_process () {
+    SENTINEL=$1
+    PID=$2
+    TIMEOUT=$3
+    # The expansions are quoted on purpose: with an empty value an unquoted
+    # `[ -e $X ]` collapses to `[ -e ]`, which is TRUE, so pidwait_fn would be
+    # called without a pid file.
+    if [ -e "$SENTINEL" ]; then
+        rm -f "$SENTINEL"
+    fi
+    if [ -e "$PID" ]; then
+        pidwait_fn "$PID" "$TIMEOUT"
+    fi
+}
+
 function pidwait_fn () {
-        WATCH_PID=$1
-        TIMEOUT=$(($2*1000))
-        kill $WATCH_PID
-        while [ -d /proc/$WATCH_PID -a $TIMEOUT -gt 0 ]; do
-                sleep 0.2
-                TIMEOUT=$(($TIMEOUT-200))
-        done
-        if [ $TIMEOUT -le 0 ]; then
-                kill -9 $WATCH_PID
-        fi
+    # $1 - pid file naming the process to stop; $2 - grace period in seconds.
+    # Sends the default kill signal, polls /proc every 0.2s until the process
+    # is gone or the grace period is used up, escalates to kill -9 on timeout,
+    # and finally removes the pid file.
+    PID=$1
+    WATCH_PID=`cat "$PID"`
+    if [ -z "$WATCH_PID" ]; then
+        # Empty or unreadable pid file: there is nothing to signal, and an
+        # empty value would make the /proc test below match /proc/ itself and
+        # wait out the whole timeout.
+        rm -f "$PID"
+        return
+    fi
+    TIMEOUT=$(($2*1000))
+    kill $WATCH_PID
+    while [ -d /proc/$WATCH_PID -a $TIMEOUT -gt 0 ]; do
+            sleep 0.2
+            TIMEOUT=$(($TIMEOUT-200))
+    done
+    if [ $TIMEOUT -le 0 ]; then
+            kill -9 $WATCH_PID
+    fi
+    rm -f "$PID"
 }
 
 platform='unknown'

+ 33 - 131
roxie/ccd/ccdfile.cpp

@@ -398,7 +398,7 @@ public:
     virtual offset_t appendFile(IFile *file,offset_t pos,offset_t len) { throwUnexpected(); return 0; }
 
     virtual const char *queryFilename() { return logical->queryFilename(); }
-    virtual bool IsShared() const { return CInterface::IsShared(); }
+    virtual bool isAlive() const { return CInterface::isAlive(); }
     virtual int getLinkCount() const { return CInterface::getLinkCount(); }
     virtual RoxieFileType getFileType() { return fileType; }
 
@@ -515,7 +515,7 @@ typedef MapStringTo<StringArrayPtr> MapStringToDiffFileUsage;
 
 class CRoxieFileCache : public CInterface, implements ICopyFileProgress, implements IRoxieFileCache
 {
-    ICopyArrayOf<ILazyFileIO> todo; // Might prefer a queue but probably doesn't really matter.
+    mutable ICopyArrayOf<ILazyFileIO> todo; // Might prefer a queue but probably doesn't really matter.
     InterruptableSemaphore toCopy;
     InterruptableSemaphore toClose;
     mutable CopyMapStringToMyClass<ILazyFileIO> files;
@@ -1011,10 +1011,14 @@ public:
                         break;
                     if (todo.ordinality())
                     {
-                        next.set(&todo.pop());
+                        ILazyFileIO *popped = &todo.pop();
+                        if (popped->isAlive())
+                        {
+                            next.set(popped);
+                            if (next)
+                                currentTodoFile.append(next->queryFilename());
+                        }
                         atomic_dec(&numFilesToProcess);    // must decrement counter for SNMP accuracy
-                        if (next)
-                            currentTodoFile.append(next->queryFilename());
                     }
                 }
                 if (next)
@@ -1131,6 +1135,13 @@ public:
         ILazyFileIO *goer = files.getValue(filename);
         if (goer == file)
             files.remove(filename);
+        ForEachItemInRev(idx, todo)
+        {
+            if (file == &todo.item(idx))
+            {
+                todo.remove(idx);
+            }
+        }
     }
 
     virtual ILazyFileIO *lookupFile(const char *id, unsigned partNo, RoxieFileType fileType, const char *localLocation, const char *baseIndexFileName,  ILazyFileIO *patchFile, const StringArray &peerRoxieCopiedLocationInfo, const StringArray &deployedLocationInfo, offset_t size, const CDateTime &modified, bool memFile, bool isRemote, bool startFileCopy, bool doForegroundCopy, unsigned crc, bool isCompressed, const char *lookupDali)
@@ -1140,45 +1151,30 @@ public:
         {
             CriticalBlock b(crit);
             ILazyFileIO *f = files.getValue(localLocation);
-            if (f)
+            if (f && f->isAlive())
             {
                 if ((size != -1 && size != f->getSize()) ||
                     (!modified.isNull() && !modified.equals(*f->queryDateTime(), false)))
                 {
-                    if (!f->IsShared())
-                    {
-                        // kill it
-                        files.remove(localLocation);
-                        ForEachItemInRev(idx, todo)
-                        {
-                            if (f == &todo.item(idx))
-                            {
-                                todo.remove(idx);
-                            }
-                        }
-                    }
-                    else
+                    StringBuffer modifiedDt;
+                    if (!modified.isNull())
+                        modified.getString(modifiedDt);
+                    StringBuffer fileDt;
+                    f->queryDateTime()->getString(fileDt);
+                    if (fileErrorList.find(id) == 0)
                     {
-                        StringBuffer modifiedDt;
-                        if (!modified.isNull())
-                            modified.getString(modifiedDt);
-                        StringBuffer fileDt;
-                        f->queryDateTime()->getString(fileDt);
-                        if (fileErrorList.find(id) == 0)
+                        switch (fileType)
                         {
-                            switch (fileType)
-                            {
-                                case ROXIE_KEY:
-                                    fileErrorList.setValue(id, "Key");
-                                    break;
+                            case ROXIE_KEY:
+                                fileErrorList.setValue(id, "Key");
+                                break;
 
-                                case ROXIE_FILE:
-                                    fileErrorList.setValue(id, "File");
-                                    break;
-                            }
+                            case ROXIE_FILE:
+                                fileErrorList.setValue(id, "File");
+                                break;
                         }
-                        throw MakeStringException(ROXIE_MISMATCH, "Different version of %s already loaded: sizes = %"I64F"d %"I64F"d  Date = %s  %s", id, size, f->getSize(), modifiedDt.str(), fileDt.str());
                     }
+                    throw MakeStringException(ROXIE_MISMATCH, "Different version of %s already loaded: sizes = %"I64F"d %"I64F"d  Date = %s  %s", id, size, f->getSize(), modifiedDt.str(), fileDt.str());
                 }
                 else
                     return LINK(f);
@@ -1252,7 +1248,7 @@ public:
         {
             CriticalBlock b(crit);
             ret = files.getValue(localLocation);
-            if (ret)
+            if (ret && ret->isAlive())
             {
                 if (crc)
                 {
@@ -1299,26 +1295,6 @@ public:
         return ret;
     }
 
-    virtual IFileIO *lookupPluginFile(const char* dllname, const char *localLocation)
-    {
-        // plugins should be copied via deployment tool, not by roxie, so they should already be copied to the node - error if not
-        ILazyFileIO *ret;
-        {
-            CriticalBlock b(crit);
-            ret = files.getValue(localLocation);
-            if (ret) // already opened, valid and increase LINK count
-            {
-                // MORE - need to check version label
-                return LINK(ret);
-            }
-            
-            ret = openPlugin(dllname, localLocation);
-            files.setValue(localLocation, ret);
-        }
-        ret->checkOpen();
-        return ret;
-    }
-
     virtual void closeExpired(bool remote)
     {
         // This schedules a close at the next available opportunity
@@ -1343,7 +1319,7 @@ public:
         ForEach(h)
         {
             ILazyFileIO *f = files.mapToValue(&h.query());
-            if (f->isOpen() && f->isRemote()==remote && !f->isCopying())
+            if (f->isAlive() && f->isOpen() && f->isRemote()==remote && !f->isCopying())
             {
                 unsigned age = msTick() - f->getLastAccessed();
                 if (age > maxFileAge[remote])
@@ -1430,80 +1406,6 @@ public:
         }
     }
 
-    virtual void flushUnused(bool deleteFiles, bool cleanUpOneTimeQueries)
-    {
-        CriticalBlock b(crit);
-        // Everything currently in the queue should be removed if it is only in the queue and the cache.
-        ForEachItemInRev(idx, todo)
-        {
-            ILazyFileIO *f = &todo.item(idx);
-            if (f)  // make sure it hasn't already been blanked out by prior call to flushUnused
-            {
-                if (!f->IsShared())
-                    todo.remove(idx);
-            }
-        }
-
-        IArrayOf<ILazyFileIO> goers;
-        HashIterator h(files);
-        StringAttrMapping metaFileGoers;
-
-        ForEach(h)
-        {
-            // checking if file is shared, but count is 1 and names match - if so it means the only use of this file 
-            // is the copy, so we can remove the file
-            ILazyFileIO *f = files.mapToValue(&h.query());
-            if (!f->IsShared())
-                goers.append(*LINK(f));
-            else if (f->getLinkCount() == 2 && (stricmp(f->queryFilename(), currentTodoFile.str()) == 0) && deleteFiles)
-            {
-                // cannot be in the middle of copying a dll so don't worry about cleanUpOneTimeQueries
-                // this file is in the middle of being copied, get rid of it...
-                needToDeleteFile = true;
-                goers.append(*LINK(f));
-
-                DBGLOG("NAME = %s", f->queryFilename());
-            }
-        }
-
-        ForEachItemInRev(idx1, goers)
-        {
-            ILazyFileIO *item = &goers.item(idx1);
-            StringBuffer goer(item->queryFilename());
-
-            RoxieFileType type = item->getFileType();
-
-            if (cleanUpOneTimeQueries)  // only want to delete dlls
-            {
-                if (type != ROXIE_WU_DLL)
-                    continue;
-            }
-            
-            if (!files.remove(goer))
-                DBGLOG("ERROR - file was not removed from cache %s", goer.str());
-            else if (deleteFiles || type == ROXIE_WU_DLL)  // always want to delete WU dlls
-            {
-                bool isRemote = item->isRemote();
-
-                goers.remove(idx1);   // remove so we can delete the file if needed
-                if ((!isRemote) && (type != ROXIE_PLUGIN_DLL))
-                {
-                    DBGLOG("trying to delete - file %s", goer.str());
-                    try
-                    {
-                        OwnedIFile unneededFile = createIFile(goer.str());
-                        unneededFile->remove();
-                    }
-                    catch (IException *E)
-                    {
-                        EXCLOG(MCoperatorError, E, "While trying to delete a file");
-                        E->Release();
-                    }
-                }
-            }
-        }
-    }
-    
     int numFilesToCopy()
     {
         CriticalBlock b(crit);

+ 1 - 3
roxie/ccd/ccdfile.hpp

@@ -31,7 +31,7 @@ interface ILazyFileIO : extends IFileIO
 {
     virtual const char *queryFilename() = 0;
     virtual void checkOpen() = 0;
-    virtual bool IsShared() const = 0;
+    virtual bool isAlive() const = 0;
     virtual void addSource(IFile *source) = 0;
     virtual bool isRemote() = 0;
     virtual offset_t getSize() = 0;
@@ -66,8 +66,6 @@ interface IRoxieFileCache : extends IInterface
 {
     virtual ILazyFileIO *lookupFile(const char *id, unsigned partNo, RoxieFileType fileType, const char *localLocation, const char *baseIndexFileName, ILazyFileIO *patchFile, const StringArray &peerRoxieCopiedLocationInfo, const StringArray &deployedLocationInfo, offset_t size, const CDateTime &modified, bool memFile, bool isRemote, bool startFileCopy, bool doForegroundCopy, unsigned crc, bool isCompressed, const char *lookupDali) = 0;
     virtual IFileIO *lookupDllFile(const char* dllname, const char *localLocation, const StringArray &remoteNames, unsigned crc, bool isRemote) = 0;
-    virtual IFileIO *lookupPluginFile(const char* dllname, const char *localLocation) = 0;
-    virtual void flushUnused(bool deleteFiles, bool cleanUpOneTimeQueries) = 0;
     virtual RoxieFileStatus fileUpToDate(IFile *f, RoxieFileType fileType, offset_t size, const CDateTime &modified, unsigned crc, const char* id, bool isCompressed) = 0;
     virtual int numFilesToCopy() = 0;
     virtual void closeExpired(bool remote) = 0;

+ 5 - 3
rtl/eclrtl/rtlds.cpp

@@ -63,16 +63,18 @@ void RtlDatasetBuilder::ensure(size32_t required)
     if (required > maxSize)
     {
         maxSize = getNextSize(maxSize, required);
-        buffer = (byte *)realloc(buffer, maxSize);
-        if (!buffer)
+        byte * newbuffer = (byte *)realloc(buffer, maxSize);
+        if (!newbuffer)
             throw MakeStringException(-1, "Failed to allocate temporary dataset (requesting %d bytes)", maxSize);
+        buffer = newbuffer;
     }
+    self = buffer + totalSize;
 }
 
 byte * RtlDatasetBuilder::ensureCapacity(size32_t required, const char * fieldName)
 {
     ensure(totalSize + required);
-    return buffer + totalSize;
+    return self; // self is updated by ensure() (set to buffer + totalSize after any reallocation)
 }
 
 void RtlDatasetBuilder::flushDataset()

+ 1 - 1
system/jlib/jlog.ipp

@@ -604,7 +604,7 @@ public:
     int                       flush() { CriticalBlock block(crit); return fflush(handle); }
     char const *              disable();
     void                      enable();
-    bool                      getLogName(StringBuffer &name) const { name.append(filename); return true; }
+    bool                      getLogName(StringBuffer &name) const { CriticalBlock block(crit); name.append(filename); return true; }
 protected:
     void                      checkRollover() const;
     void                      doRollover(bool daily, const char *forceName = NULL) const;

+ 24 - 9
system/jlib/jstring.cpp

@@ -1175,19 +1175,34 @@ void StringAttr::setown(const char * _text)
   text = (char *)_text;
 }
 
+// Lower-cases the characters of text in place; does nothing when text is null.
+void StringAttr::toLowerCase()
+{
+    if (text)
+    {
+        // The <cctype> functions require a value representable as unsigned
+        // char (or EOF). Plain char may be negative, so convert before calling.
+        for (char * cur = text; *cur != 0; cur++)
+        {
+            unsigned char next = (unsigned char)*cur;
+            if (isupper(next))
+                *cur = (char)tolower(next);
+        }
+    }
+}
+
 void StringAttr::toUpperCase()
 {
-  if (text)
-  {
-    char * cur = text;
-    char next;
-    while ((next = *cur) != 0)
+    if (text)
     {
-      if (islower(next))
-        *cur = toupper(next);
-      cur++;
+        // Upper-cases text in place. The <cctype> functions require a value
+        // representable as unsigned char (or EOF); plain char may be negative,
+        // so each character is converted before the calls.
+        char * cur = text;
+        unsigned char next;
+        while ((next = (unsigned char)*cur) != 0)
+        {
+            if (islower(next))
+                *cur = (char)toupper(next);
+            cur++;
+        }
     }
-  }
 }
 
 

+ 1 - 0
system/jlib/jstring.hpp

@@ -252,6 +252,7 @@ public:
     void         set(const char * _text);
     void         setown(const char * _text);
     void         set(const char * _text, unsigned _len);
+    void         toLowerCase();
     void         toUpperCase();
     
 private:

+ 24 - 14
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -287,7 +287,7 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
         PointerIArrayOf<CSendBucket> buckets;
         UnsignedArray candidates;
         size32_t totalSz;
-        bool senderFull, doDedup;
+        bool senderFull, doDedup, aborted;
         Semaphore senderFullSem;
         Linked<IException> exception;
         OwnedMalloc<bool> senderFinished;
@@ -309,11 +309,12 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
             numFinished = 0;
             dedupSamples = dedupSuccesses = 0;
             doDedup = owner.doDedup;
-            writerPool.setown(createThreadPool("HashDist writer pool", this, &owner, owner.writerPoolSize, 5*60*1000));
+            writerPool.setown(createThreadPool("HashDist writer pool", this, this, owner.writerPoolSize, 5*60*1000));
             self = owner.activity->queryJob().queryMyRank()-1;
             for (n=0; n<owner.numnodes; n++)
                 pendingBuckets.append(new CSendBucketQueue);
             numActiveWriters = 0;
+            aborted = false;
         }
 
         void dedup(CSendBucket *sendBucket)
@@ -592,6 +593,8 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
                             }
                         }
                     }
+                    if (aborted)
+                        break;
                     const void *row = input->ungroupedNextRow();
                     if (!row)
                         break;
@@ -619,22 +622,25 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
             }
             catch (IException *e)
             {
-                ActPrintLog(owner.activity, e, "HDIST: sendloop");
+                ActPrintLog(owner.activity, e, "HDIST: sender.process");
                 owner.fireException(e);
             }
 
             ActPrintLog(owner.activity, "Distribute send finishing");
-            // send remainder
-            Owned<IShuffledIterator> iter = createShuffledIterator(owner.numnodes);
-            ForEach(*iter)
-            {
-                unsigned dest=iter->get();
-                Owned<CSendBucket> bucket = getBucketClear(dest);
-                HDSendPrintLog4("Looking at last bucket(%d): %d, size = %d", dest, bucket.get()?bucket->queryDestination():0, bucket.get()?bucket->querySize():-1);
-                if (bucket && bucket->querySize())
+            if (!aborted)
+            {
+                // send remainder
+                Owned<IShuffledIterator> iter = createShuffledIterator(owner.numnodes);
+                ForEach(*iter)
                 {
-                    HDSendPrintLog3("Sending last bucket(s): %d, size = %d", bucket->queryDestination(), bucket->querySize());
-                    add(bucket.getClear());
+                    unsigned dest=iter->get();
+                    Owned<CSendBucket> bucket = getBucketClear(dest);
+                    HDSendPrintLog4("Looking at last bucket(%d): %d, size = %d", dest, bucket.get()?bucket->queryDestination():0, bucket.get()?bucket->querySize():-1);
+                    if (bucket && bucket->querySize())
+                    {
+                        HDSendPrintLog3("Sending last bucket(s): %d, size = %d", bucket->queryDestination(), bucket->querySize());
+                        add(bucket.getClear());
+                    }
                 }
             }
             ActPrintLog(owner.activity, "HDIST: waiting for threads");
@@ -652,8 +658,12 @@ class CDistributorBase : public CSimpleInterface, implements IHashDistributor, i
     // IExceptionHandler impl.
         virtual bool fireException(IException *e)
         {
-            if (!exception.get())
+            if (!aborted)
+            {
                 exception.set(e);
+                aborted = true;
+                senderFullSem.signal(); // send regardless, because senderFull could be about to be set.
+            }
             return owner.fireException(e);
         }
         friend class CWriteHandler;

+ 63 - 6
thorlcr/graph/thgraphmaster.cpp

@@ -1268,16 +1268,57 @@ CJobMaster::~CJobMaster()
     tmpHandler.clear();
 }
 
+// Builds the exception used when a broadcast send fails. A non-zero slave is
+// named in the text; zero produces the plural "slaves" form. The exception is
+// always flagged tea_shutdown and ownership passes to the caller.
+static IException *createBCastException(unsigned slave, const char *errorMsg)
+{
+    StringBuffer text("General failure communicating to slave");
+    if (0 == slave)
+        text.append("s ");
+    else
+    {
+        text.append("(");
+        text.append(slave);
+        text.append(") ");
+    }
+    text.append(" [").append(errorMsg).append("]");
+    // Treated as fatal in every case; link-down detection could be added here.
+    Owned<IThorException> bcastErr = MakeThorException(0, "%s", text.str());
+    bcastErr->setAction(tea_shutdown);
+    return bcastErr.getClear();
+}
+
 void CJobMaster::broadcastToSlaves(CMessageBuffer &msg, mptag_t mptag, unsigned timeout, const char *errorMsg, mptag_t *_replyTag, bool sendOnly)
 {
     mptag_t replyTag = createReplyTag();
     msg.setReplyTag(replyTag);
-    if (!queryJobComm().send(msg, RANK_ALL_OTHER, mptag, timeout))
+    if (globals->getPropBool("@broadcastSendAsync", true)) // only here in case of problems/debugging.
+    {
+        class CSendAsyncfor : public CAsyncFor
+        {
+            CJobMaster &job;
+            CMessageBuffer &msg;
+            mptag_t mptag;
+            unsigned timeout;
+            StringAttr errorMsg;
+        public:
+            CSendAsyncfor(CJobMaster &_job, CMessageBuffer &_msg, mptag_t _mptag, unsigned _timeout, const char *_errorMsg)
+                : job(_job), msg(_msg), mptag(_mptag), timeout(_timeout), errorMsg(_errorMsg)
+            {
+            }
+            void Do(unsigned i)
+            {
+                if (!job.queryJobComm().send(msg, i+1, mptag, timeout))
+                    throw createBCastException(i+1, errorMsg);
+            }
+        } afor(*this, msg, mptag, timeout, errorMsg);
+        try
+        {
+            afor.For(querySlaves(), querySlaves());
+        }
+        catch (IException *e)
+        {
+            EXCLOG(e, "broadcastSendAsync");
+            abort(e);
+            throw;
+        }
+    }
+    else if (!queryJobComm().send(msg, RANK_ALL_OTHER, mptag, timeout))
     {
-        // think this should always be fatal, could check link down here, or in general and flag as _shutdown.
-        StringBuffer msg("General failure communicating to slaves [");
-        Owned<IThorException> e = MakeThorException(0, "%s", msg.append(errorMsg).append("]").str());
-        e->setAction(tea_shutdown);
+        Owned<IException> e = createBCastException(0, errorMsg);
         EXCLOG(e, NULL);
         abort(e);
         throw e.getClear();
@@ -1286,6 +1327,7 @@ void CJobMaster::broadcastToSlaves(CMessageBuffer &msg, mptag_t mptag, unsigned
     if (_replyTag)
         *_replyTag = replyTag;
     unsigned respondents = 0;
+    Owned<IBitSet> bitSet = createBitSet();
     loop
     {
         rank_t sender;
@@ -1294,7 +1336,21 @@ void CJobMaster::broadcastToSlaves(CMessageBuffer &msg, mptag_t mptag, unsigned
         {
             if (_replyTag) _replyTag = NULL;
             StringBuffer tmpStr;
-            Owned<IException> e = MakeThorFatal(NULL, 0, "%s - Timeout receiving from slaves", errorMsg?tmpStr.append(": ").append(errorMsg).str():"");
+            if (errorMsg)
+                tmpStr.append(": ").append(errorMsg).append(" - ");
+            tmpStr.append("Timeout receiving from slaves - no reply from: [");
+            unsigned s = bitSet->scan(0, false);
+            assertex(s<querySlaves()); // must be at least one
+            tmpStr.append(s+1);
+            loop
+            {
+                s = bitSet->scan(s+1, false);
+                if (s>=querySlaves())
+                    break;
+                tmpStr.append(",").append(s+1);
+            }
+            tmpStr.append("]");
+            Owned<IException> e = MakeThorFatal(NULL, 0, " %s", tmpStr.str());
             EXCLOG(e, NULL);
             throw e.getClear();
         }
@@ -1308,6 +1364,7 @@ void CJobMaster::broadcastToSlaves(CMessageBuffer &msg, mptag_t mptag, unsigned
             throw e.getClear();
         }
         ++respondents;
+        bitSet->set((unsigned)sender-1);
         if (respondents == querySlaveGroup().ordinality())
             break;
     }