Quellcode durchsuchen

Merge branch 'candidate-6.0.6'

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman vor 8 Jahren
Ursprung
Commit
2ff87d2a62

+ 32 - 31
common/workunit/workunit.cpp

@@ -2665,38 +2665,26 @@ void CWorkUnitFactory::clearAborting(const char *wuid)
     }
 }
 
-bool CWorkUnitFactory::checkAbnormalTermination(const char *wuid, WUState &state, SessionId agent)
+void CWorkUnitFactory::reportAbnormalTermination(const char *wuid, WUState &state, SessionId agent)
 {
-    if (queryDaliServerVersion().compare("2.1")>=0)
+    WARNLOG("reportAbnormalTermination: session stopped unexpectedly: %" I64F "d state: %d", (__int64) agent, (int) state);
+    bool isEcl = false;
+    switch (state)
     {
-        if((agent>0) && querySessionManager().sessionStopped(agent, 0))
-        {
-            bool isEcl = false;
-            switch (state)
-            {
-                case WUStateCompiling:
-                    isEcl = true;
-                    // drop into
-                case WUStateRunning:
-                case WUStateBlocked:
-                    state = WUStateFailed;
-                    break;
-                case WUStateAborting:
-                    state = WUStateAborted;
-                    break;
-                default:
-                    return false;
-            }
-            WARNLOG("checkAbnormalTermination: workunit terminated: %" I64F "d state = %d",(__int64) agent, (int) state);
-            Owned<IWorkUnit> wu = updateWorkUnit(wuid, NULL, NULL);
-            wu->setState(state);
-            Owned<IWUException> e = wu->createException();
-            e->setExceptionCode(isEcl ? 1001 : 1000);
-            e->setExceptionMessage(isEcl ? "EclServer terminated unexpectedly" : "Workunit terminated unexpectedly");
-            return true;
-        }
+        case WUStateAborting:
+            state = WUStateAborted;
+            break;
+        case WUStateCompiling:
+            isEcl = true;
+            // drop into
+        default:
+            state = WUStateFailed;
     }
-    return false;
+    Owned<IWorkUnit> wu = updateWorkUnit(wuid, NULL, NULL);
+    wu->setState(state);
+    Owned<IWUException> e = wu->createException();
+    e->setExceptionCode(isEcl ? 1001 : 1000);
+    e->setExceptionMessage(isEcl ? "EclServer terminated unexpectedly" : "Workunit terminated unexpectedly");
 }
 
 static CriticalSection deleteDllLock;
@@ -3019,6 +3007,8 @@ public:
         Owned<IRemoteConnection> conn = sdsManager->connect(wuRoot.str(), session, 0, SDS_LOCK_TIMEOUT);
         if (conn)
         {
+            SessionId agent = -1;
+            bool agentSessionStopped = false;
             unsigned start = msTick();
             loop
             {
@@ -3046,13 +3036,24 @@ public:
                 case WUStateDebugRunning:
                 case WUStateBlocked:
                 case WUStateAborting:
-                    SessionId agent = conn->queryRoot()->getPropInt64("@agentSession", -1);
-                    if (checkAbnormalTermination(wuid, ret, agent))
+                    if (agentSessionStopped)
                     {
+                        reportAbnormalTermination(wuid, ret, agent);
                         return ret;
                     }
+                    if (queryDaliServerVersion().compare("2.1")>=0)
+                    {
+                        agent = conn->queryRoot()->getPropInt64("@agentSession", -1);
+                        if((agent>0) && querySessionManager().sessionStopped(agent, 0))
+                        {
+                            agentSessionStopped = true;
+                            conn->reload();
+                            continue;
+                        }
+                    }
                     break;
                 }
+                agentSessionStopped = false; // reset for state changes such as WUStateWait then WUStateRunning again
                 unsigned waited = msTick() - start;
                 if (timeout==-1)
                 {

+ 1 - 1
common/workunit/workunit.ipp

@@ -630,7 +630,7 @@ public:
     }
 
 protected:
-    bool checkAbnormalTermination(const char *wuid, WUState &state, SessionId agent);
+    void reportAbnormalTermination(const char *wuid, WUState &state, SessionId agent);
 
     // These need to be implemented by the derived classes
     virtual CLocalWorkUnit* _createWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser) = 0;

+ 17 - 2
esp/src/eclwatch/InfoGridWidget.js

@@ -137,7 +137,9 @@ define([
                                     case "Error":
                                         domClass.add(node, "ErrorCell");
                                         break;
-
+                                    case "Alert":
+                                        domClass.add(node, "AlertCell");
+                                        break;
                                     case "Warning":
                                         domClass.add(node, "WarningCell");
                                         break;
@@ -318,6 +320,8 @@ define([
                     var severity = this.store.getValue(item, "Severity", null);
                     if (severity == "Error") {
                         row.customStyles += "background-color: red;";
+                    } else if (severity === "Alert") {
+                        row.customStyles += "background-color: #febe47;";
                     } else if (severity == "Warning") {
                         row.customStyles += "background-color: yellow;";
                     }
@@ -370,7 +374,8 @@ define([
                     error: 0,
                     warning: 0,
                     errorWarning: 0,
-                    info: 0
+                    info: 0,
+                    alert: 0
                 };
                 arrayUtil.forEach(this.infoData, function (item, idx) {
                     lang.mixin(item, {
@@ -398,6 +403,12 @@ define([
                                 data.push(item);
                             }
                             break;
+                        case "Alert":
+                            this._counts.alert++;
+                            if (errorChecked) {
+                                data.push(item);
+                            }
+                            break;
                     }
                 }, this);
                 this.infoStore.setData(data);
@@ -423,6 +434,10 @@ define([
                         return -1;
                     } else if (r.Severity === "Error") {
                         return 1;
+                    } else if (l.Severity === "Alert") {
+                        return -1;
+                    } else if (r.Severity === "Alert") {
+                        return 1;
                     } else if (l.Severity === "Warning") {
                         return -1;
                     } else if (r.Severity === "Warning") {

+ 4 - 0
esp/src/eclwatch/css/hpcc.css

@@ -1055,6 +1055,10 @@ margin-left:-20px;
     color: white;
 }
 
+.AlertCell{
+    background: #febe47;
+}
+
 .WarningCell{
     background: yellow;
 }

+ 0 - 3
initfiles/bash/etc/init.d/hpcc_common.in

@@ -412,9 +412,6 @@ startCmd() {
     printf "Starting %-21s" "$compName ..."
     log "compType = $compType"
 
-    # use less heap when threaded
-    export MALLOC_ARENA_MAX=8
-
     # Creating logfiles for component
     logDir=$log/${compName}
 

+ 14 - 0
initfiles/componentfiles/configxml/roxie.xsd.in

@@ -1071,6 +1071,20 @@
                 </xs:appinfo>
             </xs:annotation>
         </xs:attribute>
+        <xs:attribute name="udpSnifferReadThreadPriority" type="nonNegativeInteger" use="optional" default="3">
+            <xs:annotation>
+                <xs:appinfo>
+                    <tooltip>If non-zero, run the sniffer read thread at elevated priority level</tooltip>
+                </xs:appinfo>
+            </xs:annotation>
+        </xs:attribute>
+        <xs:attribute name="udpSnifferSendThreadPriority" type="nonNegativeInteger" use="optional" default="3">
+            <xs:annotation>
+                <xs:appinfo>
+                    <tooltip>If non-zero, run the sniffer send thread at elevated priority level</tooltip>
+                </xs:appinfo>
+            </xs:annotation>
+        </xs:attribute>
     </xs:attributeGroup>
   <xs:attributeGroup name="Cache">
     <xs:attribute name="blobCacheMem" type="xs:nonNegativeInteger" use="optional" default="0">

+ 3 - 0
initfiles/sbin/hpcc_setenv.in

@@ -80,6 +80,9 @@ source /etc/profile
 IFS="${OIFS}"
 umask ${OUMASK}
 
+# use less heap when threaded
+export MALLOC_ARENA_MAX=8
+
 PATH_PREFIX=`cat ${HPCC_CONFIG} | sed -n "/\[${SECTION}\]/,/\[/p" | grep "^path *= *" | sed -e 's/^path *= *//'`
 
 export PID=`cat ${HPCC_CONFIG} | sed -n "/\[${SECTION}\]/,/\[/p" | grep "^pid *= *" | sed -e 's/^pid *= *//'`

+ 16 - 2
plugins/cassandra/cassandrawu.cpp

@@ -3536,6 +3536,8 @@ public:
         CassandraStatement statement(prepareStatement("select state, agentSession from workunits where partition=? and wuid=?;"));
         statement.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
         statement.bindString(1, wuid);
+        SessionId agent = 0;
+        bool agentSessionStopped = false;
         unsigned start = msTick();
         loop
         {
@@ -3572,11 +3574,23 @@ public:
             case WUStateDebugRunning:
             case WUStateBlocked:
             case WUStateAborting:
-                SessionId agent = getUnsignedResult(NULL, cass_row_get_column(row, 1));
-                if (agent && checkAbnormalTermination(wuid, state, agent))
+                if (agentSessionStopped)
+                {
+                    reportAbnormalTermination(wuid, state, agent);
                     return state;
+                }
+                if (queryDaliServerVersion().compare("2.1")>=0)
+                {
+                    agent = getUnsignedResult(NULL, cass_row_get_column(row, 1));
+                    if(agent && querySessionManager().sessionStopped(agent, 0))
+                    {
+                        agentSessionStopped = true;
+                        continue;
+                    }
+                }
                 break;
             }
+            agentSessionStopped = false; // reset for state changes such as WUStateWait then WUStateRunning again
             unsigned waited = msTick() - start;
             if (timeout==-1)
             {

+ 3 - 0
roxie/ccd/ccdmain.cpp

@@ -735,6 +735,9 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
                 udpRequestToSendTimeout = 5000;
         }
         // MORE: might want to check socket buffer sizes against sys max here instead of udp threads ?
+        udpSnifferReadThreadPriority = topology->getPropInt("@udpSnifferReadThreadPriority", 3);
+        udpSnifferSendThreadPriority = topology->getPropInt("@udpSnifferSendThreadPriority", 3);
+
         udpMulticastBufferSize = topology->getPropInt("@udpMulticastBufferSize", 262142);
         udpFlowSocketsSize = topology->getPropInt("@udpFlowSocketsSize", 131072);
         udpLocalWriteSocketSize = topology->getPropInt("@udpLocalWriteSocketSize", 1024000);

+ 9 - 5
roxie/ccd/ccdquery.cpp

@@ -1070,18 +1070,20 @@ public:
 
     virtual void beforeDispose()
     {
-        SpinBlock b(queriesCrit);
         // NOTE: it's theoretically possible for the final release to happen after a replacement has been inserted into hash table. 
         // So only remove from hash table if what we find there matches the item that is being deleted.
-        CQueryFactory *goer = queryMap.getValue(hashValue+channelNo);
+        hash64_t hv = rtlHash64Data(sizeof(channelNo), &channelNo, hashValue);
+        SpinBlock b(queriesCrit);
+        CQueryFactory *goer = queryMap.getValue(hv);
         if (goer == this)
-            queryMap.remove(hashValue+channelNo);
+            queryMap.remove(hv);
     }
 
     static IQueryFactory *getQueryFactory(hash64_t hashValue, unsigned channelNo)
     {
+        hash64_t hv = rtlHash64Data(sizeof(channelNo), &channelNo, hashValue);
         SpinBlock b(queriesCrit);
-        CQueryFactory *factory = LINK(queryMap.getValue(hashValue+channelNo));
+        CQueryFactory *factory = LINK(queryMap.getValue(hv));
         if (factory && factory->isAlive())
             return factory;
         else
@@ -1157,6 +1159,7 @@ public:
         }
         if (id)
             hashValue = rtlHash64VStr(id, hashValue);
+        hashValue = rtlHash64VStr("Roxie", hashValue);  // Adds some noise into the hash - otherwise adjacent wuids tend to hash very close together
         if (traceLevel > 8)
             DBGLOG("getQueryHash: %s %" I64F "u from id", id, hashValue);
         if (stateInfo)
@@ -1215,8 +1218,9 @@ public:
                 }
             }
         }
+        hash64_t hv = rtlHash64Data(sizeof(channelNo), &channelNo, hashValue);
         SpinBlock b(queriesCrit);
-        queryMap.setValue(hashValue+channelNo, this);
+        queryMap.setValue(hv, this);
     }
 
     virtual unsigned queryChannel() const

+ 4 - 2
roxie/ccd/ccdqueue.cpp

@@ -673,14 +673,16 @@ void doUnload(IRoxieQueryPacket *packet, const IRoxieContextLogger &logctx)
     unsigned channelNo = header.channel;
     logctx.CTXLOG("Unload received for channel %d", channelNo);
     hash64_t hashValue = header.queryHash;
+    hashValue = rtlHash64Data(sizeof(channelNo), &channelNo, hashValue);
     SpinBlock b(onDemandQueriesCrit);
-    onDemandQueryCache.remove(hashValue+channelNo);
+    onDemandQueryCache.remove(hashValue);
 }
 
 void cacheOnDemandQuery(hash64_t hashValue, unsigned channelNo, IQueryFactory *query)
 {
+    hashValue = rtlHash64Data(sizeof(channelNo), &channelNo, hashValue);
     SpinBlock b(onDemandQueriesCrit);
-    onDemandQueryCache.setValue(hashValue+channelNo, query);
+    onDemandQueryCache.setValue(hashValue, query);
 }
 
 //=================================================================================

+ 3 - 0
roxie/udplib/udplib.hpp

@@ -138,4 +138,7 @@ extern UDPLIB_API unsigned udpInlineCollationPacketLimit;
 extern UDPLIB_API bool udpInlineCollation;
 extern UDPLIB_API bool udpSnifferEnabled;
 extern UDPLIB_API bool udpSendCompletedInData;
+extern UDPLIB_API unsigned udpSnifferReadThreadPriority;
+extern UDPLIB_API unsigned udpSnifferSendThreadPriority;
+
 #endif

+ 2 - 0
roxie/udplib/udpsha.cpp

@@ -41,6 +41,8 @@ unsigned udpTraceLevel = 0;
 unsigned udpTraceCategories = (unsigned) -1;
 unsigned udpFlowSocketsSize = 131072;
 unsigned udpLocalWriteSocketSize = 1024000;
+unsigned udpSnifferReadThreadPriority = 3;
+unsigned udpSnifferSendThreadPriority = 3;
 
 unsigned multicastTTL = 1;
 

+ 18 - 1
roxie/udplib/udptrr.cpp

@@ -450,6 +450,14 @@ class CReceiveManager : implements IReceiveManager, public CInterface
         virtual int run() 
         {
             DBGLOG("UdpReceiver: ReceiveFlowManager started");
+            if (udpSnifferSendThreadPriority)
+            {
+#ifdef __linux__
+                setLinuxThreadPriority(udpSnifferSendThreadPriority);
+#else
+                adjustPriority(1);
+#endif
+            }
             while (running)
             {
                 requestPending.wait();
@@ -501,7 +509,8 @@ class CReceiveManager : implements IReceiveManager, public CInterface
             {
                 StringBuffer ipStr;
                 snifferIP.getIpText(ipStr);
-                DBGLOG("UdpReceiver: receive_sniffer port open %s:%i", ipStr.str(), snifferPort);
+                size32_t actualSize = sniffer_socket->get_receive_buffer_size();
+                DBGLOG("UdpReceiver: receive_sniffer port open %s:%i sockbuffsize=%d actual %d", ipStr.str(), snifferPort, udpFlowSocketsSize, actualSize);
             }
         }
 
@@ -529,6 +538,14 @@ class CReceiveManager : implements IReceiveManager, public CInterface
         virtual int run() 
         {
             DBGLOG("UdpReceiver: sniffer started");
+            if (udpSnifferReadThreadPriority)
+            {
+#ifdef __linux__
+                setLinuxThreadPriority(udpSnifferReadThreadPriority);
+#else
+                adjustPriority(1);
+#endif
+            }
             while (running) 
             {
                 try 

+ 183 - 0
system/jlib/jdebug.cpp

@@ -2125,6 +2125,177 @@ static struct CNtKernelInformation
 } NtKernelFunctions;
 #endif
 
+struct PortStats
+{
+    unsigned port;
+    unsigned drops;
+    unsigned rx_queue;
+};
+typedef MapBetween<unsigned, unsigned, PortStats, PortStats> MapPortToPortStats;
+
+class CUdpStatsReporter
+{
+public:
+    CUdpStatsReporter()
+    {
+        dropsCol = -1;
+        portCol = -1;
+        uidCol = -1;
+        queueCol = -1;
+    }
+
+    bool reportUdpInfo(unsigned traceLevel)
+    {
+#ifdef _WIN32
+        return false;
+#else
+        if (uidCol==-1 && columnNames.length())
+            return false;
+        FILE *netfp = fopen("/proc/net/udp", "r");
+        if (!netfp)
+            return false;
+        char ln[512];
+        // Read header
+        if (!fgets(ln, sizeof(ln), netfp)) {
+            fclose(netfp);
+            return false;
+        }
+        if (!columnNames.length())
+        {
+            columnNames.appendList(ln, " ");
+            ForEachItemInRev(idx, columnNames)
+            {
+                if (streq(columnNames.item(idx), "rem_address"))
+                    columnNames.add("rem_port", idx+1);
+                else if (streq(columnNames.item(idx), "local_address"))
+                    columnNames.add("local_port", idx+1);
+            }
+            ForEachItemIn(idx2, columnNames)
+            {
+                if (streq(columnNames.item(idx2), "drops"))
+                    dropsCol = idx2;
+                else if (streq(columnNames.item(idx2), "local_port"))
+                    portCol = idx2;
+                else if (streq(columnNames.item(idx2), "rx_queue"))
+                    queueCol = idx2;
+                else if (streq(columnNames.item(idx2), "uid"))
+                    uidCol = idx2;
+            }
+            if (portCol == -1 || queueCol == -1 || uidCol == -1)
+            {
+                uidCol = -1;
+                fclose(netfp);
+                return false;
+            }
+        }
+        int myUid = geteuid();
+        while (fgets(ln, sizeof(ln), netfp))
+        {
+            StringArray cols;
+            cols.appendList(ln, " :");
+            if (cols.length() >= columnNames.length() && atoi(cols.item(uidCol))==myUid)
+            {
+                unsigned queue = strtoul(cols.item(queueCol), NULL, 16);
+                unsigned drops = 0;
+                if (dropsCol)
+                    drops =  strtoul(cols.item(dropsCol), NULL, 10);
+                if (queue || drops)
+                {
+                    unsigned port = strtoul(cols.item(portCol), NULL, 16);
+                    if (traceLevel > 0)
+                        DBGLOG("From /proc/net/udp: port %d rx_queue=%u drops=%u", port, queue, drops);
+                    PortStats *ret = map.getValue(port);
+                    if (!ret)
+                    {
+                        PortStats e = {port, 0, 0};
+                        map.setValue(port, e);
+                        ret = map.getValue(port);
+                        assertex(ret);
+                    }
+                    if (queue > ret->rx_queue)
+                    {
+                        DBGLOG("UDP queue: new max rx_queue: port %d rx_queue=%u drops=%u", port, queue, drops);
+                        ret->rx_queue = queue;
+                    }
+                    if (drops > ret->drops)
+                    {
+                        LOG(MCoperatorError, unknownJob, "DROPPED UDP PACKETS: port %d rx_queue=%u (peak %u) drops=%u (total %i)", port, queue, ret->rx_queue, drops-ret->drops, drops);
+                        ret->drops = drops;
+                    }
+                }
+            }
+        }
+        fclose(netfp);
+        return true;
+#endif
+    }
+private:
+    MapPortToPortStats map;
+    StringArray columnNames;
+    int dropsCol;
+    int portCol;
+    int uidCol;
+    int queueCol;
+};
+
+class CSnmpStatsReporter
+{
+public:
+    CSnmpStatsReporter()
+    {
+        inErrorsCol = -1;
+        prevErrors = 0;
+    }
+    bool reportSnmpInfo()
+    {
+#ifdef _WIN32
+        return false;
+#else
+        if (inErrorsCol==-1 && columnNames.length())
+            return false;
+        FILE *netfp = fopen("/proc/net/snmp", "r");
+        if (!netfp)
+            return false;
+        char ln[512];
+        bool ok = false;
+        while (fgets(ln, sizeof(ln), netfp))
+        {
+            if (strncmp(ln, "Udp:", 4)==0)
+            {
+                if (!columnNames.length())
+                {
+                    columnNames.appendList(ln, " ");
+                    ForEachItemIn(idx, columnNames)
+                    {
+                        if (streq(columnNames.item(idx), "InErrors"))
+                            inErrorsCol = idx;
+                    }
+                }
+                if (fgets(ln, sizeof(ln), netfp))
+                {
+                    StringArray cols;
+                    cols.appendList(ln, " ");
+                    if (cols.length() >= columnNames.length())
+                    {
+                        ok = true;
+                        unsigned errors = strtoul(cols.item(inErrorsCol), NULL, 10);
+                        if (errors > prevErrors)
+                            LOG(MCoperatorError, unknownJob, "UDP InErrors: %u (total %u)", errors-prevErrors, errors);
+                        prevErrors = errors;
+                    }
+                }
+                break;
+            }
+        }
+        fclose(netfp);
+        return ok;
+#endif
+    }
+private:
+    StringArray columnNames;
+    int inErrorsCol;
+    unsigned prevErrors;
+};
 
 static class CMemoryUsageReporter: public Thread
 {
@@ -2152,6 +2323,8 @@ static class CMemoryUsageReporter: public Thread
     StringBuffer                   secondaryfs;
     CriticalSection                sect; // for getSystemTraceInfo
 
+    CSnmpStatsReporter             snmpStats;
+    CUdpStatsReporter              udpStats;
 
 public:
     CMemoryUsageReporter(unsigned _interval, PerfMonMode _traceMode, IPerfMonHook * _hook, bool printklog)
@@ -2436,6 +2609,11 @@ public:
     {
         StringBuffer str;
         getSystemTraceInfo(str, traceMode&~PerfMonExtended); // initializes the values so that first one we print is meaningful rather than always saying PU=0%
+        if (traceMode&PerfMonUDP)
+        {
+            snmpStats.reportSnmpInfo();
+            udpStats.reportUdpInfo(0);
+        }
         CTimeMon tm(NAMEDCOUNTPERIOD*1000);
         while (!term) {
             if (sem.wait(interval))
@@ -2449,6 +2627,11 @@ public:
                 tm.reset(NAMEDCOUNTPERIOD*1000);
             }
 #endif
+            if (traceMode&PerfMonUDP)
+            {
+                snmpStats.reportSnmpInfo();
+                udpStats.reportUdpInfo(0);
+            }
             if(traceMode&&str.length()) {
                 LOG(MCdebugInfo, unknownJob, "SYS: %s", str.str());
 #ifndef _WIN32

+ 4 - 2
system/jlib/jdebug.hpp

@@ -298,11 +298,13 @@ enum
     PerfMonPackets   = 0x02,
     PerfMonDiskUsage = 0x04,
     //default and full modes:
-    PerfMonExtended  = 0x08,   
+    PerfMonExtended  = 0x08,
+    // UDP packet loss tracing
+    PerfMonUDP       = 0x10,
 #ifdef _WIN32
     PerfMonStandard  = PerfMonProcMem
 #else
-    PerfMonStandard  = PerfMonProcMem|PerfMonExtended
+    PerfMonStandard  = PerfMonProcMem|PerfMonExtended|PerfMonUDP
 #endif
 
 };

+ 2 - 2
thorlcr/activities/csvread/thcsvrslave.cpp

@@ -92,10 +92,10 @@ class CCsvReadSlaveActivity : public CDiskReadSlaveActivityBase
             csvSplitter.init(activity.helper->getMaxColumns(), csvInfo, activity.csvQuote, activity.csvSeparate, activity.csvTerminate, activity.csvEscape);
             maxRowSize = activity.getOptInt(OPT_MAXCSVROWSIZE, defaultMaxCsvRowSize) * 1024 * 1024;
         }
-        virtual void setPart(IPartDescriptor *partDesc, unsigned partNoSerialized)
+        virtual void setPart(IPartDescriptor *partDesc)
         {
             inputCRC.reset();
-            CDiskPartHandlerBase::setPart(partDesc, partNoSerialized);
+            CDiskPartHandlerBase::setPart(partDesc);
         }
         virtual void open() 
         {

+ 3 - 3
thorlcr/activities/diskread/thdiskreadslave.cpp

@@ -780,7 +780,7 @@ public:
         unsigned part = 0;
         while (!abortSoon && part<partDescs.ordinality())
         {
-            partHandler->setPart(&partDescs.item(part), part);
+            partHandler->setPart(&partDescs.item(part));
             ++part;
             loop
             {
@@ -908,7 +908,7 @@ public:
             unsigned part = 0;
             while (!abortSoon && part<partDescs.ordinality())
             {
-                partHandler->setPart(&partDescs.item(part), part);
+                partHandler->setPart(&partDescs.item(part));
                 ++part;
                 loop {
                     OwnedConstThorRow nextrow = partHandler->nextRow();
@@ -1023,7 +1023,7 @@ public:
                 unsigned part = 0;
                 while (!abortSoon && part<partDescs.ordinality())
                 {
-                    partHandler->setPart(&partDescs.item(part), part);
+                    partHandler->setPart(&partDescs.item(part));
                     ++part;
                     loop
                     {

+ 14 - 13
thorlcr/activities/selfjoin/thselfjoinslave.cpp

@@ -93,12 +93,6 @@ private:
         return sorter->startMerge(totalrows);
     }
 
-    IRowStream * doLightweightSelfJoin()
-    {
-        IRowStream *ret = LINK(inputStream);
-        return ret;
-    }
-
 public:
     SelfJoinSlaveActivity(CGraphElementBase *_container, bool _isLocal, bool _isLightweight)
         : CSlaveActivity(_container), spillStats(spillStatistics)
@@ -176,8 +170,14 @@ public:
             CriticalBlock b(joinHelperCrit);
             joinhelper.setown(createSelfJoinHelper(*this, helper, this, hintparallelmatch, hintunsortedoutput));
         }
-        strm.setown(isLightweight? doLightweightSelfJoin() : (isLocal ? doLocalSelfJoin() : doGlobalSelfJoin()));
-        assertex(strm);
+        if (isLightweight)
+            strm.set(inputStream);
+        else
+        {
+            strm.setown(isLocal ? doLocalSelfJoin() : doGlobalSelfJoin());
+            assertex(strm);
+            // NB: PARENT::stop() will now have been called
+        }
 
         joinhelper->init(strm, NULL, ::queryRowAllocator(queryInput(0)), ::queryRowAllocator(queryInput(0)), ::queryRowMetaData(queryInput(0)));
     }
@@ -201,13 +201,14 @@ public:
                 CriticalBlock b(joinHelperCrit);
                 joinhelper.clear();
             }
-            if (strm)
-            {
+            if (isLightweight)
+                PARENT::stop();
+            else if (strm) // if !isLightWeight, PARENT::stop() will have been called in start()
                 strm->stop();
-                strm.clear();
-            }
+            strm.clear();
         }
-        PARENT::stop();
+        else
+            PARENT::stop();
     }
     
     CATCH_NEXTROW()

+ 2 - 2
thorlcr/activities/thactivityutil.cpp

@@ -860,7 +860,7 @@ IRowStream *createSequentialPartHandler(CPartHandler *partHandler, IArrayOf<IPar
             else
             {
                 eof = false;
-                partHandler->setPart(&partDescs.item(0), 0);
+                partHandler->setPart(&partDescs.item(0));
             }
         }
         virtual void stop()
@@ -898,7 +898,7 @@ IRowStream *createSequentialPartHandler(CPartHandler *partHandler, IArrayOf<IPar
                     eof = true;
                     return NULL;
                 }
-                partHandler->setPart(&partDescs.item(part), part);
+                partHandler->setPart(&partDescs.item(part));
             }
         }
     };

+ 1 - 1
thorlcr/activities/thactivityutil.ipp

@@ -44,7 +44,7 @@ public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
     virtual ~CPartHandler() { }
-    virtual void setPart(IPartDescriptor *partDesc, unsigned partNoSerialized) = 0;
+    virtual void setPart(IPartDescriptor *partDesc) = 0;
     virtual void getMetaInfo(ThorDataLinkMetaInfo &info, IPartDescriptor *partDesc) { }
     virtual void stop() = 0;
 };

+ 3 - 3
thorlcr/activities/thdiskbaseslave.cpp

@@ -74,7 +74,7 @@ CDiskPartHandlerBase::CDiskPartHandlerBase(CDiskReadSlaveActivityBase &_activity
 
 }
 
-void CDiskPartHandlerBase::setPart(IPartDescriptor *_partDesc, unsigned partNoSerialized)
+void CDiskPartHandlerBase::setPart(IPartDescriptor *_partDesc)
 {
     partDesc.set(_partDesc);
     compressed = partDesc->queryOwner().isCompressed(&blockCompressed);
@@ -83,6 +83,7 @@ void CDiskPartHandlerBase::setPart(IPartDescriptor *_partDesc, unsigned partNoSe
     checkFileCrc = activity.checkFileCrc?partDesc->getCrc(storedCrc):false;
     fileBaseOffset = partDesc->queryProperties().getPropInt64("@offset");
 
+    which = partDesc->queryPartIndex();
     if (0 != (activity.helper->getFlags() & TDRfilenamecallback)) // only get/serialize if using virtual file name fields
     {
         IFileDescriptor &fileDesc = partDesc->queryOwner();
@@ -91,7 +92,7 @@ void CDiskPartHandlerBase::setPart(IPartDescriptor *_partDesc, unsigned partNoSe
         {
             unsigned subfile;
             unsigned lnum;
-            if (superFDesc->mapSubPart(partNoSerialized, subfile, lnum))
+            if (superFDesc->mapSubPart(which, subfile, lnum))
                 logicalFilename.set(activity.queryLogicalFilename(subfile));
             else
                 logicalFilename.set("UNKNOWN"); // shouldn't happen, but will prevent query fault if did.
@@ -105,7 +106,6 @@ void CDiskPartHandlerBase::setPart(IPartDescriptor *_partDesc, unsigned partNoSe
         logicalFilename.set(activity.logicalFilename);
     eoi = false;
     firstInGroup = true;
-    which = partDesc->queryPartIndex();
 
     activity.helper->setCallback(this); // NB, if we were to have >1 of these objects, would prob. need separate helper instances also
     open();

+ 1 - 1
thorlcr/activities/thdiskbaseslave.ipp

@@ -63,7 +63,7 @@ public:
     virtual const char * queryLogicalFilename(const void * row);
 
 // CPartHandler
-    virtual void setPart(IPartDescriptor *_partDesc, unsigned partNoSerialized);
+    virtual void setPart(IPartDescriptor *_partDesc);
 
     virtual offset_t getLocalOffset()=0;
 };