浏览代码

Merge remote-tracking branch 'origin/candidate-5.0.4'

Conflicts:
	system/security/LdapSecurity/ldapconnection.cpp

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 10 年之前
父节点
当前提交
7e1b022820

+ 6 - 3
dali/ft/filecopy.cpp

@@ -1172,13 +1172,16 @@ void FileSprayer::calculateSprayPartition()
     ForEachItemIn(idx2, partitioners)
         partitioners.item(idx2).getResults(partition);
 
-    if (partitioners.ordinality() > 0)
+    if ((partitioners.ordinality() > 0) && !srcAttr->hasProp("ECL"))
     {
         // Store discovered CSV record structure into target logical file.
         StringBuffer recStru;
         partitioners.item(0).getRecordStructure(recStru);
-        IDistributedFile * target = distributedTarget.get();
-        target->setECL(recStru.str());
+        if (recStru.length() > 0)
+        {
+            if (distributedTarget)
+                distributedTarget->setECL(recStru.str());
+        }
     }
 
 }

+ 10 - 10
ecllibrary/std/File.ecl

@@ -631,7 +631,7 @@ EXPORT RemotePull(varstring remoteEspFsURL, varstring sourceLogicalName, varstri
  * Creates a file monitor job in the DFU Server. If an appropriately named file arrives in this interval it will fire
  * the event with the name of the triggering object as the event subtype (see the EVENT function).
  *
- * @param eventName     The user-defined name of the event to fire when the filename appears. This value is used as
+ * @param eventToFire   The user-defined name of the event to fire when the filename appears. This value is used as
  *                      the first parameter to the EVENT function.
  * @param name          The name of the logical file to monitor.  This may contain wildcard characters ( * and ?)
  * @param shotCount     The number of times to generate the event before the monitoring job completes. A value
@@ -640,8 +640,8 @@ EXPORT RemotePull(varstring remoteEspFsURL, varstring sourceLogicalName, varstri
  * @return              The DFU workunit id for the job.
  */
 
-EXPORT varstring fMonitorLogicalFileName(varstring eventName, varstring name, integer4 shotCount=1, varstring espServerIpPort=GETENV('ws_fs_server')) :=
-    lib_fileservices.FileServices.fMonitorLogicalFileName(eventName, name, shotCount, espServerIpPort);
+EXPORT varstring fMonitorLogicalFileName(varstring eventToFire, varstring name, integer4 shotCount=1, varstring espServerIpPort=GETENV('ws_fs_server')) :=
+    lib_fileservices.FileServices.fMonitorLogicalFileName(eventToFire, name, shotCount, espServerIpPort);
 
 /**
  * Same as fMonitorLogicalFileName, but does not return the DFU Workunit ID.
@@ -649,14 +649,14 @@ EXPORT varstring fMonitorLogicalFileName(varstring eventName, varstring name, in
  * @see fMonitorLogicalFileName
  */
 
-EXPORT MonitorLogicalFileName(varstring eventName, varstring name, integer4 shotCount=1, varstring espServerIpPort=GETENV('ws_fs_server')) :=
-    lib_fileservices.FileServices.MonitorLogicalFileName(eventName, name, shotCount, espServerIpPort);
+EXPORT MonitorLogicalFileName(varstring eventToFire, varstring name, integer4 shotCount=1, varstring espServerIpPort=GETENV('ws_fs_server')) :=
+    lib_fileservices.FileServices.MonitorLogicalFileName(eventToFire, name, shotCount, espServerIpPort);
 
 /**
  * Creates a file monitor job in the DFU Server. If an appropriately named file arrives in this interval it will fire
  * the event with the name of the triggering object as the event subtype (see the EVENT function).
  *
- * @param eventName     The user-defined name of the event to fire when the filename appears. This value is used as
+ * @param eventToFire   The user-defined name of the event to fire when the filename appears. This value is used as
  *                      the first parameter to the EVENT function.
  * @param ip            The IP address for the file to monitor. This may be omitted if the filename parameter
  *                      contains a complete URL.
@@ -668,8 +668,8 @@ EXPORT MonitorLogicalFileName(varstring eventName, varstring name, integer4 shot
  * @return              The DFU workunit id for the job.
  */
 
-EXPORT varstring fMonitorFile(varstring eventName, varstring ip, varstring filename, boolean subDirs=FALSE, integer4 shotCount=1, varstring espServerIpPort=GETENV('ws_fs_server')) :=
-    lib_fileservices.FileServices.fMonitorFile(eventName, ip, filename, subDirs, shotCount, espServerIpPort);
+EXPORT varstring fMonitorFile(varstring eventToFire, varstring ip, varstring filename, boolean subDirs=FALSE, integer4 shotCount=1, varstring espServerIpPort=GETENV('ws_fs_server')) :=
+    lib_fileservices.FileServices.fMonitorFile(eventToFire, ip, filename, subDirs, shotCount, espServerIpPort);
 
 /**
  * Same as fMonitorFile, but does not return the DFU Workunit ID.
@@ -677,8 +677,8 @@ EXPORT varstring fMonitorFile(varstring eventName, varstring ip, varstring filen
  * @see fMonitorFile
  */
 
-EXPORT MonitorFile(varstring eventName, varstring ip, varstring filename, boolean subdirs=FALSE, integer4 shotCount=1, varstring espServerIpPort=GETENV('ws_fs_server')) :=
-    lib_fileservices.FileServices.MonitorFile(eventName, ip, filename, subdirs, shotCount, espServerIpPort);
+EXPORT MonitorFile(varstring eventToFire, varstring ip, varstring filename, boolean subdirs=FALSE, integer4 shotCount=1, varstring espServerIpPort=GETENV('ws_fs_server')) :=
+    lib_fileservices.FileServices.MonitorFile(eventToFire, ip, filename, subdirs, shotCount, espServerIpPort);
 
 /**
  * Waits for the specified DFU workunit to finish.

+ 1 - 1
esp/services/ws_access/ws_accessService.cpp

@@ -3448,7 +3448,7 @@ bool Cws_accessEx::onFilePermission(IEspContext &context, IEspFilePermissionRequ
             resp.setFileName(fileName);
             resp.setUserName(userName);
 
-            ISecUser* sec_user = secmgr->findUser(userName);
+            Owned<ISecUser> sec_user = secmgr->findUser(userName);
             if (sec_user)
             {
                 StringBuffer accessStr;

+ 1 - 1
esp/src/eclwatch/WsTopology.js

@@ -38,7 +38,7 @@ define([
                     arrayUtil.forEach(response.TpServiceQueryResponse.ServiceList.TpEspServers.TpEspServer, function (item, idx) {
                         if (lang.exists("TpBindings.TpBinding", item)) {
                             arrayUtil.forEach(item.TpBindings.TpBinding, function (binding, idx) {
-                                if (binding.Name === type) {
+                                if (binding.Service === type) {
                                     retVal = ESPRequest.getURL({
                                         port: binding.Port,
                                         pathname: ""

+ 86 - 56
initfiles/sbin/hpcc-run.sh.in

@@ -31,21 +31,35 @@ print_usage(){
         echo "usage: hpcc-run.sh [-c component] [-a {hpcc-init|dafilesrv}] [-n concurrent] [-s] [-S] {start|stop|restart|status|setup}"
         echo "  -a|--action: HPCC service name. Either hpcc-init (default) or dafilesrv."
         echo "  -c|--comp: HPCC component. For example, mydali, myroxie, mythor, etc."
-        echo "  -n|--concurrent: How many concurrent instances to run. The default is 5."
-        echo "  -S|--sequentially: For the command to run sequentially. i.e. one host a time."
+        echo "  -n|--concurrent: How many concurrent instances to run. The default is equal to the number of nodes present."
+        echo "  -S|--sequentially: For the command to run sequentially. i.e. one host a time. (overrides -n)"
         echo "  -s|--save: Save the result to a file named by ip."
         echo
         end 1
 }
 
 getIPS(){
+    if [ -z "${comp}" ]; then
         IPS=`${INSTALL_DIR}/sbin/configgen -env ${envfile} -machines | awk -F, '{print \$1}'  | sort | uniq`
+    else
+        IPS=`${INSTALL_DIR}/sbin/configgen -env ${envfile} -listall | grep -e "${comp}" | awk -F, '{ print \$3 }' | sort | uniq`
+        if [ -z "${IPS}" ]; then
+            log_failure_msg "Component ${comp} not found"
+            print_usage
+            end 1
+        fi
+    fi
 }
 
 getDali(){
         DIP=`${INSTALL_DIR}/sbin/configgen -env ${envfile} -listall | grep Dali | awk -F, '{print \$3}'  | sort | uniq`
 }
 
+createIPListFile(){
+    _file=$1
+    echo "$IPS" > $_file
+}
+
 createIPListFileExcludeDIP(){
   _file=$1
   echo "$IPS" | grep -v $DIP  > $_file
@@ -88,7 +102,6 @@ doOneIP(){
        echo "$_ip: Cannot Ping host? (Host Alive?)"
        return 1
    fi
-
 }
 
 createScript(){
@@ -115,7 +128,7 @@ if ping -c 1 -w 5 -n \$IP > /dev/null 2>&1; then
               CMD="sudo /etc/init.d/$_action -c $_comp $_cmd"
           fi
           echo "\$IP: Running \$CMD";
-          if [ $run_in_seq -eq 1 ]
+          if [ $concurrent -ne 1 ]
           then
               CMD="\$CMD | tee $hpccStatusFile"
           else
@@ -142,22 +155,20 @@ SCRIPTFILE
 }
 
 runScript() {
-
-   if [ $run_in_seq -eq 0 ] && [ $hasPython -eq 1 ]
-   then
-      eval ${INSTALL_DIR}/sbin/cluster_script.py -f ${scriptFile} "$OPTIONS"
-      rc=$?
-   else
-      if [ $run_in_seq -eq 0 ] 
-      then
-         echo ""
-         echo "Cannot detect python version ${expected_python_version}+. Will run on the cluster hosts sequentially."
-         echo ""
-      fi
-      run_cluster ${scriptFile} 0 $1
-      rc=$?
-   fi
-   rm -rf $scriptFile
+    if [ $concurrent -ne 1 ] && [ $hasPython -eq 1 ]; then
+        OPTIONS="${OPTIONS:+"$OPTIONS "}-n ${concurrent}"
+        eval ${INSTALL_DIR}/sbin/cluster_script.py -f ${scriptFile} "$OPTIONS"
+        rc=$?
+    else
+        if [ $hasPython -eq 0 ]; then
+            echo ""
+            echo "Cannot detect python version ${expected_python_version}+. Will run on the cluster hosts sequentially."
+            echo ""
+        fi
+        run_cluster ${scriptFile} 0 $1
+        rc=$?
+    fi
+    rm -rf $scriptFile
 }
 
 doSetup() {
@@ -165,7 +176,7 @@ doSetup() {
      scriptFile=/tmp/${action}_setup_$$
      createScript $scriptFile $action "setup" $comp
      runScript
-     [ $run_in_seq -eq 0 ] && report "${action} setup"
+     [ $concurrent -ne 1 ] && report "${action} setup"
 }
 
 doStatus() {
@@ -173,7 +184,7 @@ doStatus() {
      scriptFile=/tmp/${action}_status_$$
      createScript $scriptFile $action "status" $comp
      runScript
-     [ $run_in_seq -eq 0 ] && report "${action} status"
+     [ $concurrent -ne 1 ] && report "${action} status"
 }
 
 doStop() {
@@ -181,36 +192,63 @@ doStop() {
     init stop
     scriptFile=/tmp/${action}_stop_$$
     createScript $scriptFile $action "stop" $comp
-    OPTIONS="${OPTIONS:+"$OPTIONS "}-h $IPsExcludeDIP"
-    runScript $IPsExcludeDIP
-    [ $run_in_seq -eq 0 ] && report "${action} stop" $DIP
-
-    doOneIP $DIP $action "stop" $comp || end 0
+    if [ -n "${comp}" ]; then
+        OPTIONS="${OPTIONS:+"$OPTIONS "}-h $IPsFile"
+        runScript $IPsFile
+        report "${action} stop"
+    else
+        OPTIONS="${OPTIONS:+"$OPTIONS "}-h $IPsExcludeDIP"
+        runScript $IPsExcludeDIP
+        report "${action} stop" $DIP
+        doOneIP $DIP $action "stop" $comp
+        if [ "${action}" = "hpcc-init" ]; then
+            echo "Service dafilesrv is still running".
+            echo "To stop it, run \"service dafilesrv stop\"."
+        fi
+    fi
 }
 
 
 doStart() {
     init start
-    doOneIP $DIP $action "start" $comp || end 1  
-
+    if [ -n "${comp}" ]; then
+        startFile=$IPsFile
+    else
+        doOneIP $DIP $action "start" $comp || end 1
+        startFile=$IPsExcludeDIP
+    fi
     echo "$action start in the cluster ..."
     scriptFile=/tmp/${action}_start_$$
     createScript $scriptFile $action "start" $comp
-    OPTIONS="${OPTIONS:+"$OPTIONS "}-h $IPsExcludeDIP"
-    runScript $IPsExcludeDIP
+    OPTIONS="${OPTIONS:+"$OPTIONS "}-h $startFile"
+    runScript $startFile
+    if [ -n "${comp}" ]; then
+        report "${action} start"
+    else
+        report "${action} start" $DIP
+    fi
     [ $rc -ne 0 ] && end $rc
-    [ $run_in_seq -eq 0 ] && report "${action} start" $DIP
 }
 
 init() {
-     dateTime=$(date +"%Y%m%d_%H%M%S")
-     reportDir=/var/log/HPCCSystems/cluster/$1/${dateTime}
-     mkdir -p $reportDir
-     chown -R ${user}:${user} ${reportDir}/..
+    getIPS
+    getDali
+    IPsFile=/tmp/ip_list_$$
+    createIPListFile $IPsFile
+    IPsExcludeDIP=/tmp/ip_list_exclude_dip_$$
+    createIPListFileExcludeDIP $IPsExcludeDIP
+
+    if [ $concurrent -eq 0 ]; then
+        concurrent=$( wc -l $IPsFile | awk '{ print $1 }')
+    fi
+
+    dateTime=$(date +"%Y%m%d_%H%M%S")
+    reportDir=/var/log/HPCCSystems/cluster/$1/${dateTime}
+    mkdir -p $reportDir
+    chown -R ${user}:${user} ${reportDir}/..
 }
 
 report() {
-
     _title=$1
     hostToSkip=$2
 
@@ -221,8 +259,6 @@ report() {
        cat ${reportDir}/$_host | grep -v "ervice dafilesrv" | grep -v -e "^[[:space:]]*$" 
        echo
     done
-
-
 }
 
 
@@ -235,6 +271,7 @@ end() {
         rm -rf $reportDir
    fi
    [ -e "${IPsExcludeDIP}" ] &&  rm -rf ${IPsExcludeDIP}
+   [ -e "${IPsFile}" ] && rm -rf ${IPsFile}
    exit $1
 }
 
@@ -257,17 +294,12 @@ envfile=$configs/$environment
 configfile=${CONFIG_DIR}/${ENV_CONF_FILE}
 
 
-getIPS
-getDali
-IPsExcludeDIP=/tmp/ip_list_exclude_dip_$$
-createIPListFileExcludeDIP $IPsExcludeDIP
-
 hasPython=0
 save=0
-run_in_seq=0
 expected_python_version=2.6
 is_python_installed $expected_python_version
 [ $? -eq 0 ] && hasPython=1
+concurrent=0
 
 OPTIONS="-e $configfile -s ${SECTION:-DEFAULT}"
 
@@ -281,20 +313,18 @@ while true ; do
         -a|--action) action=$2
             shift 2 ;;
         -n|--concurrent) 
-            if [ -n "$2" ] && [[ $2 =~ ^[0-9]+$ ]]
-            then
-                [ $2 -gt 0 ] && OPTIONS="${OPTIONS:+"$OPTIONS "}-n $2"
-
+            if [ -n "$2" ] && [[ $2 =~ ^[1-9][0-9]*$ ]] && [ $concurrent -ne 1 ]; then
+                concurrent=$2
             fi
-
             shift 2 ;;
-        -S|--sequentially) run_in_seq=1
-                   RUN_CLUSTER_DISPLAY_OUTPUT=TRUE
-                   shift ;;
+        -S|--sequentially)
+            concurrent=1
+            RUN_CLUSTER_DISPLAY_OUTPUT=TRUE
+            shift ;;
         -s|--save) save=1
-                   shift ;;
+            shift ;;
         -h|--help) print_usage
-                   shift ;;
+            shift ;;
         --) shift ; break ;;
         *) print_usage ;;
     esac
@@ -320,7 +350,7 @@ for arg; do
         stop) 
             doStop
             ;;
-        restart) 
+        restart)
             doStop
             doStart
             ;;

文件差异内容过多而无法显示
+ 4 - 4
plugins/fileservices/fileservices.cpp


+ 51 - 4
system/mp/mpcomm.cpp

@@ -60,8 +60,11 @@
 #define CANCELTIMEOUT       1000        // 1 sec
 
 #define CONNECT_TIMEOUT         (5*60*1000) // 5 mins
-#define CONNECT_READ_TIMEOUT    (3*60*1000) // 3 min    
-#define CONFIRM_TIMEOUT         (CONNECT_READ_TIMEOUT/2)    
+#define CONNECT_READ_TIMEOUT    (3*60*1000) // 3 mins
+#define CONNECT_TIMEOUT_INTERVAL 5000 // 5 secs
+#define CONFIRM_TIMEOUT         (CONNECT_READ_TIMEOUT/2) // 1.5 mins
+#define CONFIRM_TIMEOUT_INTERVAL 5000 // 5 secs
+#define CONFIRM_TRACESLOW_THRESHOLD 1000 // 1 sec
 
 #define VERIFY_DELAY            (1*60*1000)  // 1 Minute
 #define VERIFY_TIMEOUT          (1*60*1000)  // 1 Minute
@@ -607,6 +610,50 @@ public:
 };
 
 
+void traceSlowReadTms(const char *msg, ISocket *sock, void *dst, size32_t minSize, size32_t maxSize, size32_t &sizeRead, unsigned timeoutMs, unsigned timeoutChkIntervalMs)
+{
+    dbgassertex(timeoutChkIntervalMs < timeoutMs);
+    StringBuffer epStr;
+    CCycleTimer readTmsTimer;
+    unsigned intervalTimeoutMs = timeoutChkIntervalMs;
+    loop
+    {
+        try
+        {
+            sock->readtms(dst, minSize, maxSize, sizeRead, intervalTimeoutMs);
+            break;
+        }
+        catch (IJSOCK_Exception *e)
+        {
+            if (JSOCKERR_timeout_expired != e->errorCode())
+                throw;
+            unsigned elapsedMs = readTmsTimer.elapsedMs();
+            if (elapsedMs >= timeoutMs)
+                throw;
+            unsigned remainingMs = timeoutMs-elapsedMs;
+            if (remainingMs < timeoutChkIntervalMs)
+                intervalTimeoutMs = remainingMs;
+            if (0 == epStr.length())
+            {
+                SocketEndpoint ep;
+                sock->getPeerEndpoint(ep);
+                ep.getUrlStr(epStr);
+            }
+            WARNLOG("%s %s, stalled for %d ms so far", msg, epStr.str(), elapsedMs);
+        }
+    }
+    if (readTmsTimer.elapsedMs() >= CONFIRM_TRACESLOW_THRESHOLD)
+    {
+        if (0 == epStr.length())
+        {
+            SocketEndpoint ep;
+            sock->getPeerEndpoint(ep);
+            ep.getUrlStr(epStr);
+        }
+        WARNLOG("%s %s, took: %d ms", msg, epStr.str(), readTmsTimer.elapsedMs());
+    }
+}
+
 class CMPPacketReader;
 
 class CMPChannel: public CInterface
@@ -679,7 +726,7 @@ protected: friend class CMPPacketReader;
 #endif
                 size32_t reply;
                 size32_t rd;
-                newsock->readtms(&reply,sizeof(reply),sizeof(reply),rd,CONNECT_READ_TIMEOUT); 
+                traceSlowReadTms("MP: connect to", newsock, &reply, sizeof(reply), sizeof(reply), rd, CONNECT_READ_TIMEOUT, CONNECT_TIMEOUT_INTERVAL);
 #ifdef _FULLTRACE
                 LOG(MCdebugInfo(100), unknownJob, "MP: connect after socket read %d",reply);
 #endif
@@ -1664,7 +1711,7 @@ int CMPConnectThread::run()
                 SocketEndpoint remoteep;
                 SocketEndpoint hostep;
                 SocketEndpointV4 id[2];
-                sock->readtms(&id[0],sizeof(id),sizeof(id),rd,CONFIRM_TIMEOUT); 
+                traceSlowReadTms("MP: initial accept packet from", sock, &id[0], sizeof(id), sizeof(id), rd, CONFIRM_TIMEOUT, CONFIRM_TIMEOUT_INTERVAL);
                 if (rd != sizeof(id))
                 {
                     StringBuffer errMsg("MP Connect Thread: invalid number of connection bytes serialized from ");

文件差异内容过多而无法显示
+ 375 - 421
system/security/LdapSecurity/ldapconnection.cpp


+ 3 - 1
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -3399,7 +3399,7 @@ public:
                     throwUnexpected();
             }
         }
-        joinhelper->init(strmL, strmR, ::queryRowAllocator(inL), ::queryRowAllocator(inR), ::queryRowMetaData(inL), &abortSoon);
+        joinhelper->init(strmL, strmR, ::queryRowAllocator(inL), ::queryRowAllocator(inR), ::queryRowMetaData(inL));
         dataLinkStart();
     }
     void stopInput()
@@ -3452,6 +3452,8 @@ public:
             lhsDistributor->abort();
         if (rhsDistributor)
             rhsDistributor->abort();
+        if (joinhelper)
+            joinhelper->stop();
     }
     CATCH_NEXTROW()
     {

+ 7 - 1
thorlcr/activities/join/thjoinslave.cpp

@@ -306,7 +306,7 @@ public:
         }
         if (!leftStream.get()||!rightStream.get())
             throw MakeActivityException(this, TE_FailedToStartJoinStreams, "Failed to start join streams");
-        joinhelper->init(leftStream, rightStream, ::queryRowAllocator(inputs.item(0)),::queryRowAllocator(inputs.item(1)),::queryRowMetaData(inputs.item(0)), &abortSoon);
+        joinhelper->init(leftStream, rightStream, ::queryRowAllocator(inputs.item(0)),::queryRowAllocator(inputs.item(1)),::queryRowMetaData(inputs.item(0)));
     }
     void stopLeftInput()
     {
@@ -336,6 +336,12 @@ public:
         else
             stopRightInput();
     }
+    void abort()
+    {
+        CSlaveActivity::abort();
+        if (joinhelper)
+            joinhelper->stop();
+    }
     void stop() 
     {
         stopLeftInput();

+ 3 - 1
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -1712,7 +1712,7 @@ protected:
                     default:
                         throwUnexpected();
                 }
-                joinHelper->init(left, rightStream, leftAllocator, rightAllocator, ::queryRowMetaData(leftITDL), &abortSoon);
+                joinHelper->init(left, rightStream, leftAllocator, rightAllocator, ::queryRowMetaData(leftITDL));
                 return;
             }
             else
@@ -1879,6 +1879,8 @@ public:
             rhsDistributor->abort();
         if (lhsDistributor)
             lhsDistributor->abort();
+        if (joinHelper)
+            joinHelper->stop();
     }
     virtual void stop()
     {

+ 33 - 25
thorlcr/activities/msort/thsortu.cpp

@@ -319,7 +319,7 @@ class CJoinHelper : public CSimpleInterface, implements IJoinHelper
     OwnedConstThorRow defaultRight;
     Linked<IRowStream> strmL;
     Linked<IRowStream> strmR;
-    bool *abort;
+    bool abort;
     bool nextleftgot;
     bool nextrightgot;
     unsigned atmost;
@@ -390,7 +390,6 @@ public:
             IEngineRowAllocator *_allocatorL,
             IEngineRowAllocator *_allocatorR,
             IOutputMetaData * _outputmeta,
-            bool *_abort,
             IMulticoreIntercept *_mcoreintercept)
     {
         //DebugBreak();
@@ -462,7 +461,7 @@ public:
             size32_t sz = helper->createDefaultRight(r);
             defaultRight.setown(r.finalizeRowClear(sz));
         }
-        abort = _abort;
+        abort = false;
         atmost = helper->getJoinLimit();
         if (atmost)
             assertex(!rightouter);
@@ -803,7 +802,7 @@ public:
     retry:
             ret.clear();
             do {
-                if (*abort) 
+                if (abort)
                     return NULL;
                 switch (state) {
                 case JSonfail:
@@ -938,7 +937,11 @@ public:
                             if (cmp>0) 
                                 state = JSrightgrouponly;
                             else if (cmp<0) 
+                            {
+                                activity.logRow("prev: ", *allocatorL->queryOutputMeta(), prevleft);
+                                activity.logRow("next: ", *allocatorL->queryOutputMeta(), nextleft);
                                 throw MakeStringException(-1,"JOIN LHS not in sorted order");
+                            }
                         }
                         else
                             state = JSrightgrouponly;
@@ -961,7 +964,7 @@ public:
         CATCH_MEMORY_EXCEPTIONS
         return ret.getClear();;
     }
-    virtual void stop() { }
+    virtual void stop() { abort = true; }
     virtual rowcount_t getLhsProgress() const { return lhsProgressCount; }
     virtual rowcount_t getRhsProgress() const { return rhsProgressCount; }
 };
@@ -989,7 +992,7 @@ class SelfJoinHelper: public CSimpleInterface, implements IJoinHelper
     OwnedConstThorRow defaultLeft;
     OwnedConstThorRow defaultRight;
     Owned<IRowStream> strm;
-    bool *abort;
+    bool abort;
     unsigned atmost;
     rowcount_t progressCount;
     unsigned joinCounter;
@@ -1021,7 +1024,6 @@ public:
             IEngineRowAllocator *_allocatorL,
             IEngineRowAllocator *,
             IOutputMetaData * _outputmeta,
-            bool *_abort,
             IMulticoreIntercept *_mcoreintercept)
     {
         //DebugBreak();
@@ -1066,7 +1068,7 @@ public:
             size32_t sz = helper->createDefaultRight(r);
             defaultRight.setown(r.finalizeRowClear(sz));
         }
-        abort = _abort;
+        abort = false;
         atmost = helper->getJoinLimit();
         if (atmost)
             assertex(!rightouter);
@@ -1109,7 +1111,7 @@ public:
 retry:
             ret.clear();
             do {
-                if (*abort) 
+                if (abort)
                     return NULL;
                 switch (state) {
                 case JSonfail:
@@ -1300,7 +1302,7 @@ retry:
         CATCH_MEMORY_EXCEPTIONS
         return ret.getClear();
     }
-    virtual void stop() { }
+    virtual void stop() { abort = true; }
     virtual rowcount_t getLhsProgress() const { return progressCount; }
     virtual rowcount_t getRhsProgress() const { return progressCount; }
 };
@@ -1590,11 +1592,10 @@ public:
             IEngineRowAllocator *allocatorL,
             IEngineRowAllocator *allocatorR,
             IOutputMetaData * outputmetaL,   // for XML output 
-            bool *_abort,
             IMulticoreIntercept *_mcoreintercept
         )
     {
-        if (!jhelper->init(strmL,strmR,allocatorL,allocatorR,outputmetaL,_abort,this))
+        if (!jhelper->init(strmL,strmR,allocatorL,allocatorR,outputmetaL,this))
             return false;
         if (rightouter) {
             RtlDynamicRowBuilder r(allocatorL);
@@ -1622,6 +1623,10 @@ public:
             return jhelper->getRhsProgress();
         return 0;
     }
+    virtual void stop()
+    {
+        jhelper->stop();
+    }
 };
 
 
@@ -1750,11 +1755,10 @@ public:
             IEngineRowAllocator *allocatorL,
             IEngineRowAllocator *allocatorR,
             IOutputMetaData * outputmetaL,   // for XML output 
-            bool *_abort,
             IMulticoreIntercept *_mcoreintercept
         )
     {
-        if (!CMultiCoreJoinHelperBase::init(strmL,strmR,allocatorL,allocatorR,outputmetaL,_abort,this))
+        if (!CMultiCoreJoinHelperBase::init(strmL,strmR,allocatorL,allocatorR,outputmetaL,this))
             return false;
         stopped = false;
         for (unsigned i=0;i<numworkers;i++)
@@ -1798,6 +1802,7 @@ public:
         if (stopped)
             return;
         stopped = true;
+        CMultiCoreJoinHelperBase::stop();
         for (unsigned i=0;i<numworkers;i++)
             workers[i]->rowStream->stop();
     }
@@ -1926,20 +1931,12 @@ public:
 
     ~CMultiCoreUnorderedJoinHelper()
     {
-        if (!reader.join(1000*60))
-            ERRLOG("~CMulticoreUnorderedJoinHelper reader join timed out");
-        for (unsigned i=0;i<numworkers;i++) {
-            if (!workers[i]->join(1000*60))
-                ERRLOG("~CMulticoreUnorderedJoinHelper worker[%d] join timed out",i);
-        }
-        while (workqueue.ordinality())
-            delete workqueue.dequeue();
+        stop();
         for (unsigned i=0;i<numworkers;i++) 
             delete workers[i];
         delete [] workers;
         ::Release(jhelper);
     }
-
     void stopWorkers()
     {
         for (unsigned i=0;i<numworkers;i++)
@@ -1954,11 +1951,10 @@ public:
             IEngineRowAllocator *allocatorL,
             IEngineRowAllocator *allocatorR,
             IOutputMetaData * outputmetaL,   // for XML output 
-            bool *_abort,
             IMulticoreIntercept *_mcoreintercept
         )
     {
-        if (!CMultiCoreJoinHelperBase::init(strmL,strmR,allocatorL,allocatorR,outputmetaL,_abort,this))
+        if (!CMultiCoreJoinHelperBase::init(strmL,strmR,allocatorL,allocatorR,outputmetaL,this))
             return false;
         workqueue.setLimit(numworkers+1);
         rowWriter.setown(multiWriter->getWriter());
@@ -1985,6 +1981,18 @@ public:
     }
     virtual void stop()
     {
+        CMultiCoreJoinHelperBase::stop();
+        workqueue.stop();
+        multiWriter->abort();
+        if (!reader.join(1000*60))
+            ERRLOG("~CMulticoreUnorderedJoinHelper reader join timed out");
+        for (unsigned i=0;i<numworkers;i++)
+        {
+            if (!workers[i]->join(1000*60))
+                ERRLOG("~CMulticoreUnorderedJoinHelper worker[%d] join timed out",i);
+        }
+        while (workqueue.ordinality())
+            delete workqueue.dequeueNow();
     }
 
 // IMulticoreIntercept impl.

+ 0 - 1
thorlcr/activities/msort/thsortu.hpp

@@ -53,7 +53,6 @@ interface IJoinHelper: public IRowStream
             IEngineRowAllocator *allocatorL,
             IEngineRowAllocator *allocatorR,
             IOutputMetaData * outputmetaL,   // for XML output 
-            bool *_abort,
             IMulticoreIntercept *mcoreintercept=NULL
         )=0;
 

+ 7 - 1
thorlcr/activities/selfjoin/thselfjoinslave.cpp

@@ -180,9 +180,15 @@ public:
         strm.setown(isLightweight? doLightweightSelfJoin() : (isLocal ? doLocalSelfJoin() : doGlobalSelfJoin()));
         assertex(strm);
 
-        joinhelper->init(strm, NULL, ::queryRowAllocator(inputs.item(0)), ::queryRowAllocator(inputs.item(0)), ::queryRowMetaData(inputs.item(0)), &abortSoon);
+        joinhelper->init(strm, NULL, ::queryRowAllocator(inputs.item(0)), ::queryRowAllocator(inputs.item(0)), ::queryRowMetaData(inputs.item(0)));
     }
 
+    virtual void abort()
+    {
+        CSlaveActivity::abort();
+        if (joinhelper)
+            joinhelper->stop();
+    }
     virtual void stop()
     {
         if (input)

+ 33 - 0
thorlcr/graph/thgraph.cpp

@@ -2909,6 +2909,39 @@ void CActivityBase::kill()
     ownedResults.clear();
 }
 
+bool CActivityBase::appendRowXml(StringBuffer & target, IOutputMetaData & meta, const void * row) const
+{
+    if (!meta.hasXML())
+    {
+        target.append("<xml-unavailable/>");
+        return false;
+    }
+
+    try
+    {
+        CommonXmlWriter xmlWrite(XWFnoindent);
+        meta.toXML((byte *) row, xmlWrite);
+        target.append(xmlWrite.str());
+        return true;
+    }
+    catch (IException * e)
+    {
+        e->Release();
+        target.append("<invalid-row/>");
+        return false;
+    }
+}
+
+void CActivityBase::logRow(const char * prefix, IOutputMetaData & meta, const void * row)
+{
+    bool blindLogging = false; // MORE: should check a workunit/global option
+    if (meta.hasXML() && !blindLogging)
+    {
+        StringBuffer xml;
+        appendRowXml(xml, meta, row);
+    }
+}
+
 void CActivityBase::ActPrintLog(const char *format, ...)
 {
     va_list args;

+ 3 - 0
thorlcr/graph/thgraph.hpp

@@ -938,6 +938,9 @@ public:
     unsigned queryMaxCores() const { return maxCores; }
     IRowInterfaces *getRowInterfaces();
 
+    bool appendRowXml(StringBuffer & target, IOutputMetaData & meta, const void * row) const;
+    void logRow(const char * prefix, IOutputMetaData & meta, const void * row);
+
     virtual void setInput(unsigned index, CActivityBase *inputActivity, unsigned inputOutIdx) { }
     virtual void clearConnections() { }
     virtual void releaseIOs() { }

+ 23 - 25
thorlcr/thorutil/thbuf.cpp

@@ -1602,11 +1602,8 @@ class CRowMultiWriterReader : public CSimpleInterface, implements IRowMultiWrite
                 {
                     // NB: allowed to go over limit, by as much as inRows.ordinality()-1
                     rows.appendRows(inRows, true);
-                    if (readerBlocked && (rows.numCommitted() >= readGranularity))
-                    {
-                        emptySem.signal();
-                        readerBlocked = false;
-                    }
+                    if (rows.numCommitted() >= readGranularity)
+                        checkReleaseReader();
                     return;
                 }
                 writersBlocked++;
@@ -1614,6 +1611,22 @@ class CRowMultiWriterReader : public CSimpleInterface, implements IRowMultiWrite
             fullSem.wait();
         }
     }
+    inline void checkReleaseReader()
+    {
+        if (readerBlocked)
+        {
+            emptySem.signal();
+            readerBlocked = false;
+        }
+    }
+    inline void checkReleaseWriters()
+    {
+        if (writersBlocked)
+        {
+            fullSem.signal(writersBlocked);
+            writersBlocked = 0;
+        }
+    }
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
@@ -1646,11 +1659,7 @@ public:
         {
             rows.flush();
             eow = true;
-            if (readerBlocked)
-            {
-                emptySem.signal();
-                readerBlocked = false;
-            }
+            checkReleaseReader();
         }
     }
 // ISharedWriteBuffer impl.
@@ -1663,16 +1672,8 @@ public:
     {
         CThorArrayLockBlock block(rows);
         eos = true;
-        if (writersBlocked)
-        {
-            fullSem.signal(writersBlocked);
-            writersBlocked = 0;
-        }
-        if (readerBlocked)
-        {
-            emptySem.signal();
-            readerBlocked = false;
-        }
+        checkReleaseWriters();
+        checkReleaseReader();
     }
 // IRowStream impl.
     virtual const void *nextRow()
@@ -1695,11 +1696,7 @@ public:
                         }
                         rows.readBlock(readRows, rowsToRead);
                         rowPos = 0;
-                        if (writersBlocked)
-                        {
-                            fullSem.signal(writersBlocked);
-                            writersBlocked = 0;
-                        }
+                        checkReleaseWriters();
                         break; // fall through to return a row
                     }
                     readerBlocked = true;
@@ -1717,6 +1714,7 @@ public:
     virtual void stop()
     {
         eos = true;
+        checkReleaseWriters();
     }
 };