Pārlūkot izejas kodu

Merge branch 'candidate-6.0.4'

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 9 gadi atpakaļ
vecāks
revīzija
5efc5ddbc0
42 mainītis faili ar 263 papildinājumiem un 99 dzēšanām
  1. 3 1
      common/workunit/workunit.cpp
  2. 3 1
      ecl/hthor/hthor.cpp
  3. 1 0
      ecl/regress/regex4.ecl
  4. 1 0
      ecl/regress/regex5.ecl
  5. 1 0
      esp/esdllib/CMakeLists.txt
  6. 8 5
      esp/esdllib/esdl_transformer2.cpp
  7. 4 2
      initfiles/bash/etc/init.d/dafilesrv.in
  8. 5 4
      initfiles/bash/etc/init.d/hpcc-init.in
  9. 7 4
      initfiles/bash/etc/init.d/hpcc_common.in
  10. 0 2
      initfiles/bin/init_configesp
  11. 0 2
      initfiles/bin/init_dafilesrv.in
  12. 0 2
      initfiles/bin/init_dali
  13. 0 2
      initfiles/bin/init_dfuserver
  14. 0 2
      initfiles/bin/init_eclagent.in
  15. 0 2
      initfiles/bin/init_eclccserver
  16. 0 2
      initfiles/bin/init_eclscheduler
  17. 0 2
      initfiles/bin/init_roxie
  18. 0 2
      initfiles/bin/init_sasha
  19. 2 4
      initfiles/bin/init_thor
  20. 4 0
      initfiles/componentfiles/configxml/buildsetCC.xml.in
  21. 13 0
      initfiles/etc/DIR_NAME/environment.xml.in
  22. 3 0
      roxie/ccd/ccdcontext.cpp
  23. 4 4
      roxie/roxiemem/roxiemem.cpp
  24. 3 3
      rtl/eclrtl/eclregex.cpp
  25. 1 1
      testing/regress/ecl/jsonfetch.ecl
  26. 1 1
      testing/regress/ecl/key/jsonfetch.xml
  27. 5 0
      testing/regress/ecl/key/nwaytest.xml
  28. 1 1
      testing/regress/ecl/key/xmlfetch2.xml
  29. 47 0
      testing/regress/ecl/nwaytest.ecl
  30. 3 0
      testing/regress/ecl/stepping2.ecl
  31. 1 1
      testing/regress/ecl/xmlfetch2.ecl
  32. 43 6
      thorlcr/activities/funnel/thfunnelslave.cpp
  33. 2 2
      thorlcr/activities/join/thjoinslave.cpp
  34. 1 0
      thorlcr/activities/msort/thmsortslave.cpp
  35. 1 1
      thorlcr/activities/selfjoin/thselfjoinslave.cpp
  36. 6 0
      thorlcr/graph/thgraphslave.cpp
  37. 1 0
      thorlcr/graph/thgraphslave.hpp
  38. 2 1
      thorlcr/msort/tsorts.cpp
  39. 1 0
      thorlcr/msort/tsorts.hpp
  40. 15 13
      thorlcr/slave/slave.ipp
  41. 47 25
      thorlcr/thorutil/thmem.cpp
  42. 23 1
      tools/wutool/wutool.cpp

+ 3 - 1
common/workunit/workunit.cpp

@@ -2916,7 +2916,7 @@ public:
                 {
                     unknownAttributes.append(getEnumText(subfmt,workunitSortFields));
                     if (subfmt==WUSFtotalthortime)
-                        sortorder = (WUSortField) (sortorder | WUSFnumeric);
+                        sortorder = (WUSortField) (sortorder & ~WUSFnumeric);
                 }
                 else
                 {
@@ -2930,6 +2930,8 @@ public:
                 fv = fv + strlen(fv)+1;
             }
         }
+        if ((sortorder&0xff)==WUSFtotalthortime)
+            sortorder = (WUSortField) (sortorder & ~WUSFnumeric);
         query.insert(0, namefilter.get());
         if (sortorder)
         {

+ 3 - 1
ecl/hthor/hthor.cpp

@@ -7999,7 +7999,9 @@ void CHThorDiskReadBaseActivity::resolve()
                         }
                         assertex(fdesc);
                         superfile.set(fdesc->querySuperFileDescriptor());
-                        assertex(superfile);
+                        if (!superfile && numsubs>0)
+                            logicalFileName.set(subfileLogicalFilenames.item(0));
+
                     }
                 }
                 if((helper.getFlags() & (TDXtemporary | TDXjobtemp)) == 0)

+ 1 - 0
ecl/regress/regex4.ecl

@@ -0,0 +1 @@
+REGEXREPLACE(u'a[', 'b', 'c');

+ 1 - 0
ecl/regress/regex5.ecl

@@ -0,0 +1 @@
+REGEXREPLACE('a[', 'b', 'c');

+ 1 - 0
esp/esdllib/CMakeLists.txt

@@ -26,6 +26,7 @@ include_directories (
     ${HPCC_SOURCE_DIR}/esp/esdllib
     ${HPCC_SOURCE_DIR}/system/jlib
     ${HPCC_SOURCE_DIR}/system/security/shared
+    ${HPCC_SOURCE_DIR}/rtl/eclrtl
     ${HPCC_SOURCE_DIR}/rtl/include #IXMLWriter
     ${HPCC_SOURCE_DIR}/common/thorhelper #JSONWRITER
 )

+ 8 - 5
esp/esdllib/esdl_transformer2.cpp

@@ -23,6 +23,7 @@
 
 #include "eclhelper.hpp"    //IXMLWriter
 #include "thorxmlwrite.hpp" //JSON WRITER
+#include "eclrtl.hpp"
 
 using namespace std;
 
@@ -160,7 +161,8 @@ void Esdl2LocalContext::handleDataFor(IXmlWriterExt & writer)
             for (it.first(); it.isValid(); it.next())
             {
                 IMapping& et = it.query();
-                writer.outputCString(m_dataFor->mapToValue(&et)->get(), NULL);
+                auto val = m_dataFor->mapToValue(&et)->get();
+                writer.outputUtf8(rtlUtf8Length(strlen(val),val),val, "@xsi:schemaLocation");
             }
         }
     }
@@ -320,7 +322,7 @@ void Esdl2Base::output_content(Esdl2TransformerContext &ctx, const char * conten
                         break;
                     case ESDLT_STRING:
                     default:
-                        ctx.writer->outputCString(content, tagname);
+                        ctx.writer->outputUtf8(rtlUtf8Length(strlen(content),content), content, tagname);
                         break;
                 }
             }
@@ -945,7 +947,8 @@ void Esdl2Struct::process(Esdl2TransformerContext &ctx, const char *out_name, Es
                             StringBuffer attname("@");
                             attname.append(local_in->m_startTag->getLocalName(idx));
 
-                            ctx.writer->outputCString(local_in->m_startTag->getValue(idx), attname.str());
+                            auto val = local_in->m_startTag->getValue(idx);
+                            ctx.writer->outputUtf8(rtlUtf8Length(strlen(val),val),val,attname.str());
                         }
                     }
                 }
@@ -1042,7 +1045,7 @@ void Esdl2Struct::process(Esdl2TransformerContext &ctx, const char *out_name, Es
         }
 
         if (completeContent.length()>0)
-            ctx.writer->outputCString(completeContent.str(), NULL);
+            ctx.writer->outputUtf8(rtlUtf8Length(completeContent.length(),completeContent.str()),completeContent.str(),NULL);
 
         local.handleDataFor(*(ctx.writer));
 
@@ -1224,7 +1227,7 @@ void Esdl2Response::process(Esdl2TransformerContext &ctx, const char *out_name,
                     if (ctx.schemaLocation.length() > 0 )
                     {
                         ctx.writer->outputXmlns("xsi", "http://www.w3.org/2001/XMLSchema-instance");
-                        ctx.writer->outputCString(ctx.schemaLocation.str(), "@xsi:schemaLocation");
+                        ctx.writer->outputUtf8(rtlUtf8Length(ctx.schemaLocation.length(),ctx.schemaLocation.str()),ctx.schemaLocation.str(),"@xsi:schemaLocation");
                     }
 
                     ctx.do_output_ns=false;

+ 4 - 2
initfiles/bash/etc/init.d/dafilesrv.in

@@ -71,9 +71,9 @@ bin_path=${INSTALL_DIR}/bin
 component=""
 runSetupOnly=0
 source ${configgen_path}/hpcc_setenv
+createDir ${LOG_DIR}
 
 which_service
-get_commondirs
 
 #Check for existance of user
 check_user ${user}
@@ -92,7 +92,6 @@ if [ $? -ne 1 ];then
 fi
 
 
-validate_configuration
 COMPS=$(${configgen_path}/configgen -env ${envfile} -ip "127.0.0.1" -list)
 if [[ $rc -ne 0 ]]; then
     log "hpcc-init: failure to build COMPS from configgen call"
@@ -147,6 +146,9 @@ case "$arg" in
         ;;
 esac
 
+if [ "${cmd}" = "start" ] || [ "${cmd}" = "restart" ]; then
+    validate_configuration
+fi
 
 unset IFS
 

+ 5 - 4
initfiles/bash/etc/init.d/hpcc-init.in

@@ -92,6 +92,7 @@ source  ${INSTALL_DIR}/etc/init.d/export-path
 
 export logfile=${LOG_DIR}/hpcc-init.log
 
+createDir ${LOG_DIR}
 [ ! -e ${logfile}  ] && touch $logfile
 
 ## Debug variable allowing verbose debug output
@@ -117,7 +118,6 @@ bin_path=${path}/bin
 source ${configgen_path}/hpcc_setenv
 
 which_service
-get_commondirs
 
 log "--------------------------"
 log "--------------------------"
@@ -142,7 +142,6 @@ chown -cR $user:$group ${home}/${user} 1>/dev/null 2>/dev/null
 
 basepath=`pwd`
 
-validate_configuration
 COMPS=$(${configgen_path}/configgen -env ${envfile} -list)
 if [[ $rc -ne 0 ]]; then
     log "hpcc-init: failure to build COMPS from configgen call"
@@ -318,14 +317,16 @@ esac
 
 log "Attempting to execute ${cmd} argument on specified components"
 
-unset IFS
 
 # Create dropzone on a full system start
-if [ ${cmd} = "start" ] || [ "${cmd}" = "restart" ]; then
+if [ "${cmd}" = "start" ] || [ "${cmd}" = "restart" ]; then
+    validate_configuration
+    get_commondirs
     log "Creating dropzone"
     create_dropzone
 fi
 
+unset IFS
 #Before we start/stop any component we should check dafilesrv.
 if [ ! -z "${compDafilesrv}" ];then
     case "$1" in

+ 7 - 4
initfiles/bash/etc/init.d/hpcc_common.in

@@ -89,6 +89,7 @@ dir.parser() {
         if [ $# -lt 1 ]; then
             [[ "${VERBOSE:-0}" -eq 1 ]] && log_end_msg 1 && return 1 || return 1
         fi
+        OIFS=${IFS}
         IFS=$'\n' && cmp=( $@ )
         cmp=( ${cmp[*]/#/dir_} )
         cmp=( ${cmp[*]/=/= dirItem=} )
@@ -98,6 +99,7 @@ dir.parser() {
         for i in ${!dir_*}; do
             dirArray=( ${dirArray[@]} $i )
         done
+        IFS=${OIFS}
     [[ "${VERBOSE:-0}" -eq 1 ]] && log_end_msg 0
 }
 
@@ -273,7 +275,7 @@ set_componentvars() {
 
 validate_configuration() {
     if ! validation_error=$(${configgen_path}/configgen -env ${envfile} -validateonly 2>&1); then
-        log  "get_commondirs(): validation failure ${envfile}"
+        log  "validate_configuration(): validation failure ${envfile}"
         log  "${validation_error}"
         echo -e "\033[31merror\033[0m: configgen xml validation failure"
         exit 1
@@ -282,7 +284,6 @@ validate_configuration() {
 
 get_commondirs() {
     componentFile="${path}/componentfiles/configxml"
-    validate_configuration
     DIRS=$(${configgen_path}/configgen -env ${envfile} -id ${componentFile} -listcommondirs)
     rc=$?
     if [[ $rc -ne 0 ]]; then
@@ -299,7 +300,6 @@ configGenCmd() {
     # Creating logfiles for component
     logDir=$log/${compName}
 
-    validate_configuration
     configcmd="${configgen_path}/configgen -env ${envfile} -od ${runtime} -id ${componentFile} -c ${compName}"
     log "$configcmd"
     if [ "$(whoami)" != "${user}" ]; then
@@ -369,6 +369,7 @@ createRuntime() {
     chown -c $user:$group "$lock/$compName"  1> /dev/null 2>/dev/null
     chown -c $user:$group "$log/$compName"  1> /dev/null 2>/dev/null
     chown -c $user:$group "$compPath"  1> /dev/null 2>/dev/null
+
     dir.getByName data
     chown -c $user:$group "${dir_return}"  1> /dev/null 2>/dev/null
     dir.getByName data2
@@ -696,7 +697,8 @@ setup_component() {
 }
 
 create_dropzone() {
-    validate_configuration
+    OIFS=${IFS}
+    unset IFS
     dropzones=$(${configgen_path}/configgen -env ${envfile} -listdirs)
     rc=$?
     if [[ $rc -ne 0 ]]; then
@@ -712,6 +714,7 @@ create_dropzone() {
             chmod 777 $D > /dev/null 2>&1
         fi
     done
+    IFS=${OIFS}
 }
 
 check_user(){

+ 0 - 2
initfiles/bin/init_configesp

@@ -22,8 +22,6 @@ PID_NAME="$PID/$(basename $PWD).pid"
 source ${INSTALL_DIR}/etc/init.d/hpcc_common
 
 component=$(basename $PWD)
-dir.getByName lock
-lock="$dir_return"
 export logfile="${LOG_DIR}/${component}/init_${component}_$(date +%Y_%m_%d_%H_%M_%S).log"
 
 export SENTINEL="configesp.sentinel"

+ 0 - 2
initfiles/bin/init_dafilesrv.in

@@ -33,8 +33,6 @@ source ${INSTALL_DIR}/etc/init.d/hpcc_common
 export handlelimit=32768
 
 component=$(basename $PWD)
-dir.getByName lock
-lock="$dir_return"
 export logfile="${LOG_DIR}/${component}/init_${component}_$(date +%Y_%m_%d_%H_%M_%S).log"
 
 export SENTINEL="dafilesrv.sentinel"

+ 0 - 2
initfiles/bin/init_dali

@@ -21,8 +21,6 @@ PID_NAME="$PID/$(basename $PWD).pid"
 source ${INSTALL_DIR}/etc/init.d/hpcc_common
 
 component=$(basename $PWD)
-dir.getByName lock
-lock="$dir_return"
 export logfile="${LOG_DIR}/${component}/init_${component}_$(date +%Y_%m_%d_%H_%M_%S).log"
 
 export SENTINEL="daserver.sentinel"

+ 0 - 2
initfiles/bin/init_dfuserver

@@ -21,8 +21,6 @@ PID_NAME="$PID/$(basename $PWD).pid"
 source ${INSTALL_DIR}/etc/init.d/hpcc_common
 
 component=$(basename $PWD)
-dir.getByName lock
-lock="$dir_return"
 export logfile="${LOG_DIR}/${component}/init_${component}_$(date +%Y_%m_%d_%H_%M_%S).log"
 
 export SENTINEL="dfuserver.sentinel"

+ 0 - 2
initfiles/bin/init_eclagent.in

@@ -23,8 +23,6 @@ PID_NAME="$PID/$(basename $PWD).pid"
 source ${INSTALL_DIR}/etc/init.d/hpcc_common
 
 component=$(basename $PWD)
-dir.getByName lock
-lock="$dir_return"
 export logfile="${LOG_DIR}/${component}/init_${component}_$(date +%Y_%m_%d_%H_%M_%S).log"
 
 export SENTINEL="agentexec.sentinel"

+ 0 - 2
initfiles/bin/init_eclccserver

@@ -21,8 +21,6 @@ PID_NAME="$PID/$(basename $PWD).pid"
 source ${INSTALL_DIR}/etc/init.d/hpcc_common
 
 component=$(basename $PWD)
-dir.getByName lock
-lock="$dir_return"
 export logfile="${LOG_DIR}/${component}/init_${component}_$(date +%Y_%m_%d_%H_%M_%S).log"
 
 export SENTINEL="eclccserver.sentinel"

+ 0 - 2
initfiles/bin/init_eclscheduler

@@ -21,8 +21,6 @@ PID_NAME="$PID/$(basename $PWD).pid"
 source ${INSTALL_DIR}/etc/init.d/hpcc_common
 
 component=$(basename $PWD)
-dir.getByName lock
-lock="$dir_return"
 export logfile="${LOG_DIR}/${component}/init_${component}_$(date +%Y_%m_%d_%H_%M_%S).log"
 
 export SENTINEL="eclscheduler.sentinel"

+ 0 - 2
initfiles/bin/init_roxie

@@ -21,8 +21,6 @@ PID_NAME="$PID/$(basename $PWD).pid"
 source ${INSTALL_DIR}/etc/init.d/hpcc_common
 
 component=$(basename $PWD)
-dir.getByName lock
-lock="$dir_return"
 export logfile="${LOG_DIR}/${component}/init_${component}_$(date +%Y_%m_%d_%H_%M_%S).log"
 
 export SENTINEL="roxie.sentinel"

+ 0 - 2
initfiles/bin/init_sasha

@@ -23,8 +23,6 @@ INSTALL_DIR="$(dirname ${PATH_PRE})/.."
 source  ${INSTALL_DIR}/etc/init.d/hpcc_common
 
 component=$(basename $PWD)
-dir.getByName lock
-lock="$dir_return"
 export logfile="${LOG_DIR}/${component}/init_${component}_$(date +%Y_%m_%d_%H_%M_%S).log"
 
 export SENTINEL="saserver.sentinel"

+ 2 - 4
initfiles/bin/init_thor

@@ -22,8 +22,6 @@ source ${PATH_PRE}
 INSTALL_DIR=$(dirname ${PATH_PRE})/..
 source  ${INSTALL_DIR}/etc/init.d/hpcc_common
 component=$(basename $PWD)
-dir.getByName lock
-lock="$dir_return"
 
 PID_NAME="$PID/${component}.pid"
 
@@ -80,7 +78,7 @@ kill_slaves()
             FRUNSSH_RC=$?
             if [[ ${FRUNSSH_RC} -gt 0 ]]; then
                 log "Error ${FRUNSSH_RC} in frunssh"
-                log "Please check $(dirname ${LOG_DIR})/frunssh for more details"
+                log "Please check ${LOG_DIR}/frunssh for more details"
                 # clean up any slaves it was able to reach
                 log "Stopping ${component}"
                 kill_process ${PID_NAME} thormaster_${component} 30
@@ -140,7 +138,7 @@ while [[ 1 ]]; do
         FRUNSSH_RC=$?
         if [[ ${FRUNSSH_RC} -gt 0 ]]; then
             log "Error ${FRUNSSH_RC} in frunssh"
-            log "Please check $(dirname ${LOG_DIR})/frunssh for more details"
+            log "Please check ${LOG_DIR}/frunssh for more details"
             # clean up any slaves it was able to reach
             killed
         fi

+ 4 - 0
initfiles/componentfiles/configxml/buildsetCC.xml.in

@@ -162,6 +162,10 @@
                           path="FileDkcAccess"
                           resource="FileDkcAccess"
                           service="ws_fs"/>
+     <AuthenticateFeature description="Access to upload files to dropzone"
+                          path="FileUploadAccess"
+                          resource="FileUploadAccess"
+                          service="ws_fs" />
      <AuthenticateFeature description="Access to files in dropzone"
                           path="FileIOAccess"
                           resource="FileIOAccess"

+ 13 - 0
initfiles/etc/DIR_NAME/environment.xml.in

@@ -362,6 +362,11 @@
                          resource="FileDkcAccess"
                          service="ws_fs"/>
     <AuthenticateFeature authenticate="Yes"
+                         description="Access to upload files to dropzone"
+                         path="FileUploadAccess"
+                         resource="FileUploadAccess"
+                         service="ws_fs"/>
+    <AuthenticateFeature authenticate="Yes"
                          description="Access to files in dropzone"
                          path="FileIOAccess"
                          resource="FileIOAccess"
@@ -607,6 +612,10 @@
                          path="FileDkcAccess"
                          resource="FileDkcAccess"
                          service="ws_fs"/>
+    <AuthenticateFeature description="Access to upload files to dropzone"
+                         path="FileUploadAccess"
+                         resource="FileUploadAccess"
+                         service="ws_fs"/>
     <AuthenticateFeature description="Access to files in dropzone"
                          path="FileIOAccess"
                          resource="FileIOAccess"
@@ -904,6 +913,10 @@
                           path="FileDkcAccess"
                           resource="FileDkcAccess"
                           service="ws_fs"/>
+     <AuthenticateFeature description="Access to upload files to dropzone"
+                          path="FileUploadAccess"
+                          resource="FileUploadAccess"
+                          service="ws_fs"/>
      <AuthenticateFeature description="Access to files in dropzone"
                           path="FileIOAccess"
                           resource="FileIOAccess"

+ 3 - 0
roxie/ccd/ccdcontext.cpp

@@ -2663,6 +2663,7 @@ protected:
         isBlocked = false;
         isNative = true;
         sendHeartBeats = false;
+        trim = false;
 
         lastSocketCheckTime = startTime;
         lastHeartBeat = startTime;
@@ -2761,6 +2762,8 @@ public:
         isNative = (flags & HPCC_PROTOCOL_NATIVE);
         isRaw = (flags & HPCC_PROTOCOL_NATIVE_RAW);
         isBlocked = (flags & HPCC_PROTOCOL_BLOCKED);
+        trim = (flags & HPCC_PROTOCOL_TRIM);
+
         xmlStoredDatasetReadFlags = _xmlReadFlags;
         sendHeartBeats = enableHeartBeat && isRaw && isBlocked && options.priority==0;
 

+ 4 - 4
roxie/roxiemem/roxiemem.cpp

@@ -132,6 +132,7 @@ static bool heapNotifyUnusedEachFree = true;
 static bool heapNotifyUnusedEachBlock = false;
 static unsigned __int64 lastStatsCycles;
 static unsigned __int64 statsCyclesInterval;
+static std::atomic<unsigned> activeRowManagers;
 
 static unsigned heapAllocated;
 static std::atomic_uint dataBufferPages;
@@ -549,14 +550,14 @@ static StringBuffer &memmap(StringBuffer &stats)
 
 static void throwHeapExhausted(unsigned allocatorId, unsigned pages)
 {
-    VStringBuffer msg("Memory pool exhausted: pool id %u (%u pages) exhausted, requested %u", allocatorId, heapTotalPages, pages);
+    VStringBuffer msg("Memory pool exhausted: pool id %u (%u pages) exhausted, requested %u active(%u) heap(%u/%u)", allocatorId, heapTotalPages, pages, activeRowManagers.load(), heapAllocated, heapTotalPages);
     DBGLOG("%s", msg.str());
     throw MakeStringExceptionDirect(ROXIEMM_MEMORY_POOL_EXHAUSTED, msg.str());
 }
 
 static void throwHeapExhausted(unsigned allocatorId, unsigned newPages, unsigned oldPages)
 {
-    VStringBuffer msg("Memory pool exhausted: pool id %u (%u pages) exhausted, requested %u, had %u", allocatorId, heapTotalPages, newPages, oldPages);
+    VStringBuffer msg("Memory pool exhausted: pool id %u (%u pages) exhausted, requested %u, had %u active(%u) heap(%u/%u)", allocatorId, heapTotalPages, newPages, oldPages, activeRowManagers.load(), heapAllocated, heapTotalPages);
     DBGLOG("%s", msg.str());
     throw MakeStringExceptionDirect(ROXIEMM_MEMORY_POOL_EXHAUSTED, msg.str());
 }
@@ -3548,7 +3549,6 @@ void initAllocSizeMappings(const unsigned * sizes)
 
 //---------------------------------------------------------------------------------------------------------------------
 
-static std::atomic<unsigned> activeRowManagers;
 class CChunkingRowManager : public CRowManager
 {
     friend class CRoxieFixedRowHeap;
@@ -4152,7 +4152,7 @@ public:
                     releaseEmptyPages(querySlaveId(), true);
                     if (numHeapPages == totalHeapPages.load(std::memory_order_relaxed))
                     {
-                        VStringBuffer msg("Memory limit exceeded: current %u, requested %u, limit %u", pageCount, numRequested, pageLimit);
+                        VStringBuffer msg("Memory limit exceeded: current %u, requested %u, limit %u active(%u) heap(%u/%u)", pageCount, numRequested, pageLimit, activeRowManagers.load(), heapAllocated, heapTotalPages);
                         logctx.CTXLOG("%s", msg.str());
 
                         //Avoid a stack trace if the allocation is optional

+ 3 - 3
rtl/eclrtl/eclregex.cpp

@@ -344,7 +344,7 @@ public:
         else
             pattern = RegexPattern::compile(_UregExp, UREGEX_CASE_INSENSITIVE, uperr, uerr);
 
-        matcher = pattern->matcher(uerr);
+        matcher = pattern ? pattern->matcher(uerr) : NULL;
         if (U_FAILURE(uerr))
         {
             char * expAscii;
@@ -357,8 +357,8 @@ public:
             rtlFree(expAscii);
             delete matcher;
             delete pattern;
-            matcher = 0;
-            pattern = 0;
+            matcher = NULL;
+            pattern = NULL;
             rtlFail(0, msg.c_str());  //throws
         }
     }

+ 1 - 1
testing/regress/ecl/jsonfetch.ecl

@@ -69,6 +69,6 @@ BUILD(dsJsonFetchWithPos, {surname, RecPtr}, 'REGRESS::TEMP::jsonfetch.json.inde
 
 jsonFetchIndex := INDEX(dsJsonFetchWithPos, {surname, RecPtr}, DYNAMIC('REGRESS::TEMP::jsonfetch.json.index'));
 
-fetcheddata := LIMIT(FETCH(dsJsonFetchWithPos, jsonFetchIndex(surname = 'Mitchell'), RIGHT.RecPtr), 10);
+fetcheddata := LIMIT(SORT(FETCH(dsJsonFetchWithPos, jsonFetchIndex(surname = 'Mitchell'), RIGHT.RecPtr), RECORD), 10);
 fetchednopos := project(fetcheddata, personRecord); //don't output positions
 output(fetchednopos, named('fetched'));

Failā izmaiņas netiks attēlotas, jo tās ir par lielu
+ 1 - 1
testing/regress/ecl/key/jsonfetch.xml


+ 5 - 0
testing/regress/ecl/key/nwaytest.xml

@@ -0,0 +1,5 @@
+<Dataset name='Result 1'>
+ <Row><key>1</key><children1><Row><key>1</key><l>a1</l></Row><Row><key>2</key><l>a2</l></Row></children1><children2><Row><key>1</key><l>a1</l></Row><Row><key>2</key><l>a2</l></Row></children2><joinres><Row><key>1</key><l>a1c1</l></Row><Row><key>2</key><l>a2c2</l></Row></joinres></Row>
+ <Row><key>2</key><children1><Row><key>1</key><l>c1</l></Row><Row><key>2</key><l>c2</l></Row></children1><children2><Row><key>1</key><l>t_b1</l></Row><Row><key>2</key><l>t_b2</l></Row></children2><joinres><Row><key>1</key><l>t_b1c1</l></Row><Row><key>2</key><l>t_b2c2</l></Row></joinres></Row>
+ <Row><key>2</key><children1><Row><key>1</key><l>c1</l></Row><Row><key>2</key><l>c2</l></Row></children1><children2><Row><key>1</key><l>t_b1</l></Row><Row><key>2</key><l>t_b2</l></Row></children2><joinres><Row><key>1</key><l>t_b1c1</l></Row><Row><key>2</key><l>t_b2c2</l></Row></joinres></Row>
+</Dataset>

Failā izmaiņas netiks attēlotas, jo tās ir par lielu
+ 1 - 1
testing/regress/ecl/key/xmlfetch2.xml


+ 47 - 0
testing/regress/ecl/nwaytest.ecl

@@ -0,0 +1,47 @@
+
+
+rec := RECORD
+ unsigned key;
+ string l;
+END;
+
+input1 := DATASET([{1, 'a1'}, {2, 'a2'}], rec);
+input2 := DATASET([{1, 'b1'}, {2, 'b2'}], rec);
+input3 := DATASET([{1, 'c1'}, {2, 'c2'}], rec);
+
+p2 := PROJECT(NOFOLD(input2), TRANSFORM(rec, SELF.l := 't_' + LEFT.l; SELF := LEFT), PARALLEL(4));
+
+inputs := [input1, p2, input3];
+
+rec dojoin(DATASET(rec) m) := TRANSFORM
+ SELF.key := m[1].key;
+ SELF.l := m[1].l + m[2].l + m[3].l;
+END;
+
+
+
+parentrec := RECORD
+ unsigned key;
+ DATASET(rec) children1;
+ DATASET(rec) children2;
+ DATASET(rec) joinres;
+END;
+
+inrec := RECORD
+ unsigned key;
+ unsigned which;
+END;
+
+parentrec trans(inrec l) := TRANSFORM
+ r := RANGE(inputs, [l.which, 3]);
+ SELF.children1 := r[l.which];
+ SELF.children2 := inputs[l.which];
+ SELF.joinres := JOIN(r, LEFT.key=RIGHT.key, dojoin(ROWS(LEFT)), SORTED(key));
+ SELF := l;
+END;
+
+ds := DATASET([{1, 1}, {2, 2}, {2, 2}], inrec);
+t1 := PROJECT(ds, trans(LEFT), PARALLEL(4));
+
+OUTPUT(t1);
+

+ 3 - 0
testing/regress/ecl/stepping2.ecl

@@ -23,6 +23,9 @@ multiPart := #IFDEFINED(root.multiPart, false);
 
 //--- end of version configuration ---
 
+//nothor
+//Stepped Thor support
+
 import $.Setup;
 import $.Setup.TS;
 wordIndex := Setup.Files(multiPart, false).getWordIndex();

+ 1 - 1
testing/regress/ecl/xmlfetch2.ecl

@@ -68,6 +68,6 @@ BUILD(xmlWithPos, {surname, RecPtr}, 'REGRESS::TEMP::xmlfetch2.xml.index', OVERW
 
 xmlIndex := INDEX(xmlWithPos, {surname, RecPtr}, DYNAMIC('REGRESS::TEMP::xmlfetch2.xml.index'));
 
-fetcheddata := LIMIT(FETCH(xmlWithPos, xmlIndex(surname = 'Mitchell'), RIGHT.RecPtr), 10);
+fetcheddata := LIMIT(SORT(FETCH(xmlWithPos, xmlIndex(surname = 'Mitchell'), RIGHT.RecPtr), RECORD), 10);
 fetchednopos := project(fetcheddata, personRecord); //don't output positions
 output(fetchednopos, named('fetched'));

+ 43 - 6
thorlcr/activities/funnel/thfunnelslave.cpp

@@ -769,7 +769,7 @@ class CNWaySelectActivity : public CSlaveActivity, public CThorSteppable
     IHThorNWaySelectArg *helper;
     IThorDataLink *selectedInputITDL = nullptr;
     IEngineRowStream *selectedStream = nullptr;
-    Owned<IStrandJunction> selectedJunction;
+    IStrandJunction *selectedJunction = nullptr;
 public:
     IMPLEMENT_IINTERFACE_USING(CSlaveActivity);
 
@@ -782,11 +782,10 @@ public:
     {
         ActivityTimer s(totalCycles, timeActivities);
 
-        PARENT::start();
-
         unsigned whichInput = helper->getInputIndex();
-        selectedInputITDL = NULL;
-        selectedStream = NULL;
+        selectedInputITDL = nullptr;
+        selectedStream = nullptr;
+        selectedJunction = nullptr;
         if (whichInput--)
         {
             ForEachItemIn(i, inputs)
@@ -795,20 +794,36 @@ public:
                 IThorNWayInput *nWayInput = dynamic_cast<IThorNWayInput *>(cur);
                 if (nWayInput)
                 {
+                    cur->start();
                     unsigned numRealInputs = nWayInput->numConcreteOutputs();
                     if (whichInput < numRealInputs)
                     {
                         selectedInputITDL = nWayInput->queryConcreteInput(whichInput);
-                        selectedStream = connectSingleStream(*this, selectedInputITDL, 0, selectedJunction, true);  // Should this be passing whichInput??
+                        selectedStream = nWayInput->queryConcreteInputStream(whichInput);
+                        selectedJunction = nWayInput->queryConcreteInputJunction(whichInput);
                         break;
                     }
                     whichInput -= numRealInputs;
                 }
+                else
+                {
+                    if (whichInput == 0)
+                    {
+                        selectedInputITDL = cur;
+                        selectedStream = queryInputStream(i);
+                        selectedJunction = queryInputJunction(i);
+                        break;
+                    }
+                    whichInput -= 1;
+                }
+                if (selectedInputITDL)
+                    break;
             }
         }
         if (selectedInputITDL)
             selectedInputITDL->start();
         startJunction(selectedJunction);
+        dataLinkStart();
     }
     virtual void stop() override
     {
@@ -877,6 +892,8 @@ class CThorNWayInputSlaveActivity : public CSlaveActivity, implements IThorNWayI
 {
     IHThorNWayInputArg *helper;
     PointerArrayOf<IThorDataLink> selectedInputs;
+    PointerArrayOf<IEngineRowStream> selectedInputStreams;
+    PointerArrayOf<IStrandJunction> selectedInputJunctions;
     bool grouped;
 
 public:
@@ -896,10 +913,16 @@ public:
         rtlDataAttr selection;
         helper->getInputSelection(selectionIsAll, selectionLen, selection.refdata());
         selectedInputs.kill();
+        selectedInputStreams.kill();
+        selectedInputJunctions.kill();
         if (selectionIsAll)
         {
             ForEachItemIn(i, inputs)
+            {
                 selectedInputs.append(queryInput(i));
+                selectedInputStreams.append(queryInputStream(i));
+                selectedInputJunctions.append(queryInputJunction(i));
+            }
         }
         else
         {
@@ -918,6 +941,8 @@ public:
                     throw MakeStringException(100, "Index %d in RANGE selection list is out of range", nextIndex);
 
                 selectedInputs.append(queryInput(nextIndex-1));
+                selectedInputStreams.append(queryInputStream(nextIndex-1));
+                selectedInputJunctions.append(queryInputJunction(nextIndex-1));
             }
         }
         // NB: Whatever pulls this IThorNWayInput, starts and stops the selectedInputs
@@ -947,6 +972,18 @@ public:
             return selectedInputs.item(idx);
         return NULL;
     }
+    virtual IEngineRowStream *queryConcreteInputStream(unsigned idx) const
+    {
+        if (selectedInputStreams.isItem(idx))
+            return selectedInputStreams.item(idx);
+        return NULL;
+    }
+    virtual IStrandJunction *queryConcreteInputJunction(unsigned idx) const
+    {
+        if (selectedInputJunctions.isItem(idx))
+            return selectedInputJunctions.item(idx);
+        return NULL;
+    }
 };
 
 

+ 2 - 2
thorlcr/activities/join/thjoinslave.cpp

@@ -546,7 +546,7 @@ public:
         }
         else
         {
-            sorter->Gather(primaryRowIf, primaryInputStream, primaryCompare, NULL, NULL, primaryKeySerializer, NULL, false, isUnstable(), abortSoon, NULL);
+            sorter->Gather(primaryRowIf, primaryInputStream, primaryCompare, NULL, NULL, primaryKeySerializer, NULL, NULL, false, isUnstable(), abortSoon, NULL);
             stopPartitionInput();
             if (abortSoon)
             {
@@ -571,7 +571,7 @@ public:
             sorter->stopMerge();
         }
         // NB: on secondary sort, the primaryKeySerializer is used
-        sorter->Gather(secondaryRowIf, secondaryInputStream, secondaryCompare, primarySecondaryCompare, primarySecondaryUpperCompare, primaryKeySerializer, partitionRow, noSortOtherSide(), isUnstable(), abortSoon, primaryRowIf); // primaryKeySerializer *is* correct
+        sorter->Gather(secondaryRowIf, secondaryInputStream, secondaryCompare, primarySecondaryCompare, primarySecondaryUpperCompare, primaryKeySerializer, primaryCompare, partitionRow, noSortOtherSide(), isUnstable(), abortSoon, primaryRowIf); // primaryKeySerializer *is* correct
         mergeStats(spillStats, sorter);
         //MORE: Stats from spilling the primaryStream??
         partitionRow.clear();

+ 1 - 0
thorlcr/activities/msort/thmsortslave.cpp

@@ -113,6 +113,7 @@ public:
                 helper->queryCompareLeftRight(),
                 NULL,helper->querySerialize(),
                 NULL,
+                NULL,
                 false,
                 isUnstable(),
                 abortSoon,

+ 1 - 1
thorlcr/activities/selfjoin/thselfjoinslave.cpp

@@ -77,7 +77,7 @@ private:
 #if THOR_TRACE_LEVEL > 5
         ActPrintLog("SELFJOIN: Performing global self-join");
 #endif
-        sorter->Gather(::queryRowInterfaces(input), inputStream, compare, NULL, NULL, keyserializer, NULL, false, isUnstable(), abortSoon, NULL);
+        sorter->Gather(::queryRowInterfaces(input), inputStream, compare, NULL, NULL, keyserializer, NULL, NULL, false, isUnstable(), abortSoon, NULL);
         PARENT::stop();
         if (abortSoon)
         {

+ 6 - 0
thorlcr/graph/thgraphslave.cpp

@@ -251,6 +251,12 @@ IEngineRowStream *CSlaveActivity::queryInputStream(unsigned index) const
     return inputs.item(index).stream;
 }
 
+IStrandJunction *CSlaveActivity::queryInputJunction(unsigned index) const
+{
+    if (index>=inputs.ordinality()) return nullptr;
+    return inputs.item(index).junction;
+}
+
 IEngineRowStream *CSlaveActivity::queryOutputStream(unsigned index) const
 {
     if (index>=outputStreams.ordinality()) return nullptr;

+ 1 - 0
thorlcr/graph/thgraphslave.hpp

@@ -171,6 +171,7 @@ public:
     IThorDataLink *queryOutput(unsigned index) const;
     IThorDataLink *queryInput(unsigned index) const;
     IEngineRowStream *queryInputStream(unsigned index) const;
+    IStrandJunction *queryInputJunction(unsigned index) const;
     IEngineRowStream *queryOutputStream(unsigned index) const;
     inline bool queryInputStarted(unsigned input) const { return inputs.item(input).isStarted(); }
     inline bool queryInputStopped(unsigned input) const { return inputs.item(input).isStopped(); }

+ 2 - 1
thorlcr/msort/tsorts.cpp

@@ -1177,6 +1177,7 @@ public:
         ICompare *_primarySecondaryCompare,
         ICompare *_primarySecondaryUpperCompare,
         ISortKeySerializer *_keyserializer,
+        ICompare *primaryCompare, // needed if no key serializer
         const void *_partitionrow,
         bool _nosort,
         bool _unstable,
@@ -1219,7 +1220,7 @@ public:
             ActPrintLog(activity, "No key serializer");
             keyIf.set(auxrowif);
             rowToKeySerializer.set(keyIf->queryRowSerializer());
-            keyRowCompare = rowCompare;
+            keyRowCompare = primaryCompare ? primaryCompare : rowCompare;
         }
         nosort = _nosort;
         if (nosort)

+ 1 - 0
thorlcr/msort/tsorts.hpp

@@ -42,6 +42,7 @@ public:
         ICompare *icollate,
         ICompare *icollateupper,
         ISortKeySerializer *keyserializer, 
+        ICompare *primaryCompare,
         const void *partitionrow, 
         bool nosort, 
         bool unstable, 

+ 15 - 13
thorlcr/slave/slave.ipp

@@ -83,6 +83,8 @@ interface IThorNWayInput
 {
     virtual unsigned numConcreteOutputs() const = 0;
     virtual IThorDataLink *queryConcreteInput(unsigned idx) const = 0;
+    virtual IEngineRowStream *queryConcreteInputStream(unsigned idx) const = 0;
+    virtual IStrandJunction *queryConcreteInputJunction(unsigned idx) const = 0;
 };
 
 
@@ -92,17 +94,13 @@ class CThorNarySlaveActivity : public CSlaveActivity
     
 protected:
     PointerArrayOf<IThorDataLink> expandedInputs;
-    Owned<IStrandJunction> *expandedJunctions = nullptr;
     PointerArrayOf<IEngineRowStream> expandedStreams;
+    PointerArrayOf<IStrandJunction> expandedJunctions;
 
 public:
     CThorNarySlaveActivity(CGraphElementBase *container) : CSlaveActivity(container)
     {
     }
-    ~CThorNarySlaveActivity()
-    {
-        delete [] expandedJunctions;
-    }
     virtual void start() override
     {
         ForEachItemIn(i, inputs)
@@ -112,24 +110,29 @@ public:
             IThorNWayInput *nWayInput = dynamic_cast<IThorNWayInput *>(cur);
             if (nWayInput)
             {
+                cur->start();
                 unsigned numRealInputs = nWayInput->numConcreteOutputs();
                 for (unsigned i=0; i < numRealInputs; i++)
                 {
                     IThorDataLink *curReal = nWayInput->queryConcreteInput(i);
+                    IEngineRowStream *curRealStream = nWayInput->queryConcreteInputStream(i);
+                    IStrandJunction *curRealJunction = nWayInput->queryConcreteInputJunction(i);
                     expandedInputs.append(curReal);
+                    expandedStreams.append(curRealStream);
+                    expandedJunctions.append(curRealJunction);
                 }
             }
             else
+            {
                 expandedInputs.append(cur);
+                expandedStreams.append(queryInputStream(i));
+                expandedJunctions.append(queryInputJunction(i));
+            }
         }
         ForEachItemIn(ei, expandedInputs)
             expandedInputs.item(ei)->start();
-        expandedJunctions = new Owned<IStrandJunction> [expandedInputs.ordinality()];
         ForEachItemIn(idx, expandedInputs)
-        {
-            expandedStreams.append(connectSingleStream(*this, expandedInputs.item(idx), 0, expandedJunctions[idx], true));  // MORE - is the index 0 right?
-            startJunction(expandedJunctions[idx]);
-        }
+            startJunction(expandedJunctions.item(idx));
         dataLinkStart();
     }
     void stop()
@@ -137,11 +140,10 @@ public:
         ForEachItemIn(ei, expandedStreams)
             expandedStreams.item(ei)->stop();
         ForEachItemIn(idx, expandedInputs)
-            resetJunction(expandedJunctions[idx]);
+            resetJunction(expandedJunctions.item(idx));
         expandedInputs.kill();
         expandedStreams.kill();
-        delete [] expandedJunctions;
-        expandedJunctions = nullptr;
+        expandedJunctions.kill();
     }
 };
 

+ 47 - 25
thorlcr/thorutil/thmem.cpp

@@ -266,18 +266,19 @@ class CSharedSpillableRowSet : public CSpillableStreamBase, implements IInterfac
 {
     class CStream : public CSimpleInterface, implements IRowStream, implements IWritePosCallback
     {
-        rowidx_t pos;
-        offset_t outputOffset;
+        rowidx_t pos = 0;
+        offset_t outputOffset = (offset_t)-1;
         Owned<IRowStream> spillStream;
         Linked<CSharedSpillableRowSet> owner;
+        rowidx_t toRead = 0;
+        bool eos = false;
     public:
         IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-        CStream(CSharedSpillableRowSet &_owner) : owner(&_owner)
+        CStream(CSharedSpillableRowSet &_owner, rowidx_t _toRead) : owner(&_owner), toRead(_toRead)
         {
-            pos = 0;
-            outputOffset = (offset_t)-1;
-            owner->rows.registerWriteCallback(*this); // NB: CStream constructor called within rows lock
+            // NB: CStream constructor called within rows lock and only called if not yet spilled
+            owner->rows.registerWriteCallback(*this);
         }
         ~CStream()
         {
@@ -288,26 +289,43 @@ class CSharedSpillableRowSet : public CSpillableStreamBase, implements IInterfac
     // IRowStream
         virtual const void *nextRow()
         {
-            if (spillStream)
-                return spillStream->nextRow();
-            CRowsLockBlock block(*owner);
-            if (owner->spillFile) // i.e. has spilt
-            {
-                block.clearCB = true;
-                assertex(((offset_t)-1) != outputOffset);
-                unsigned rwFlags = DEFAULT_RWFLAGS;
-                if (owner->preserveNulls)
-                    rwFlags |= rw_grouped;
-                spillStream.setown(::createRowStreamEx(owner->spillFile, owner->rowIf, outputOffset, (offset_t)-1, (unsigned __int64)-1, rwFlags));
-                owner->rows.unregisterWriteCallback(*this); // no longer needed
-                return spillStream->nextRow();
-            }
-            else if (pos == owner->rows.numCommitted())
+            if (!eos)
             {
-                owner->rows.unregisterWriteCallback(*this); // no longer needed
-                return NULL;
+                const void *ret;
+                if (spillStream)
+                    ret = spillStream->nextRow();
+                else
+                {
+                    CRowsLockBlock block(*owner);
+                    if (owner->spillFile) // i.e. has spilt
+                    {
+                        block.clearCB = true;
+                        assertex(((offset_t)-1) != outputOffset);
+                        unsigned rwFlags = DEFAULT_RWFLAGS;
+                        if (owner->preserveNulls)
+                            rwFlags |= rw_grouped;
+                        spillStream.setown(::createRowStreamEx(owner->spillFile, owner->rowIf, outputOffset, (offset_t)-1, (unsigned __int64)-1, rwFlags));
+                        owner->rows.unregisterWriteCallback(*this); // no longer needed
+                        ret = spillStream->nextRow();
+                    }
+                    else
+                    {
+                        // NB: would not reach here if nothing left to read
+                        ret = owner->rows.get(pos++);
+                        if (pos == toRead)
+                        {
+                            owner->rows.unregisterWriteCallback(*this); // no longer needed
+                            eos = true; // for any subsequent calls
+                        }
+                        return ret;
+                    }
+                }
+                if (ret)
+                    return ret;
+                if (!owner->preserveNulls)
+                    eos = true;
             }
-            return owner->rows.get(pos++);
+            return nullptr;
         }
         virtual void stop()
         {
@@ -344,7 +362,11 @@ public:
                 rwFlags |= rw_grouped;
             return ::createRowStream(spillFile, rowIf, rwFlags);
         }
-        return new CStream(*this);
+        rowidx_t toRead = rows.numCommitted();
+        if (toRead)
+            return new CStream(*this, toRead);
+        else
+            return createNullRowStream();
     }
 };
 

+ 23 - 1
tools/wutool/wutool.cpp

@@ -486,7 +486,8 @@ class WuTool : public CppUnit::TestFixture
         CPPUNIT_TEST(testQuery);
         CPPUNIT_TEST(testGraph);
         CPPUNIT_TEST(testGraphProgress);
-        CPPUNIT_TEST(testGlobal);
+        CPPUNIT_TEST(testGlobal); 
+        CPPUNIT_TEST(testSortByThorTime);
     CPPUNIT_TEST_SUITE_END();
 protected:
     static StringArray wuids;
@@ -1646,6 +1647,27 @@ protected:
         ASSERT(numIterated == (testSize+9)/10);
         numIterated++;
     }
+    void testSortByThorTime()
+    {
+        Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
+        unsigned start = msTick();
+        unsigned numIterated = 0;
+        // Test filter by filesRead
+        WUSortField nofilter[] = { WUSFterm };
+        unsigned prevValue = 0;
+        Owned<IConstWorkUnitIterator> wus = factory->getWorkUnitsSorted((WUSortField)(WUSFtotalthortime+WUSFnumeric), nofilter, nullptr, 0, 10000, NULL, NULL);
+        ForEach(*wus)
+        {
+            IConstWorkUnitInfo &wu = wus->query();
+            if (numIterated)
+                ASSERT(wu.getTotalThorTime() >= prevValue);
+            prevValue=wu.getTotalThorTime();
+            numIterated++;
+        }
+        DBGLOG("%d workunits by totalThorTime in %d ms", numIterated, msTick()-start);
+        ASSERT(numIterated == (testSize+9)/10);
+        numIterated++;
+    }
     void testGlobal()
     {
         // Is global workunit ever actually used any more? For scalar persists, perhaps