Sfoglia il codice sorgente

Merge branch 'candidate-6.0.2'

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 9 anni fa
parent
commit
e95c9b27d9

+ 1 - 0
cmake_modules/commonSetup.cmake

@@ -92,6 +92,7 @@ IF ("${COMMONSETUP_DONE}" STREQUAL "")
   option(GENERATE_COVERAGE_INFO "Generate coverage info for gcov" OFF)
   option(USE_SIGNED_CHAR "Build system with default char type is signed" OFF)
   option(USE_UNSIGNED_CHAR "Build system with default char type is unsigned" OFF)
+  option(USE_MYSQL "Enable mysql support" ON)
   # Generates code that is more efficient, but will cause problems if target platforms do not support it.
   if (CMAKE_SIZEOF_VOID_P EQUAL 8)
     option(USE_INLINE_TSC "Inline calls to read TSC (time stamp counter)" ON)

+ 1 - 1
ecl/hqlcpp/hqlcpp.cpp

@@ -4406,7 +4406,7 @@ void HqlCppTranslator::buildTempExpr(BuildCtx & ctx, BuildCtx & declareCtx, CHql
         }
     }
 
-    typemod_t modifier = (&ctx != &declareCtx) ? typemod_member : typemod_none;
+    typemod_t modifier = !ctx.isSameLocation(declareCtx) ? typemod_member : typemod_none;
     OwnedITypeInfo type = makeModifier(expr->getType(), modifier);
     BuildCtx subctx(ctx);
     switch (type->getTypeCode())

+ 2 - 2
ecl/hqlcpp/hqlhtcpp.cpp

@@ -1529,7 +1529,7 @@ BoundRow * HqlCppTranslator::declareStaticRow(BuildCtx & ctx, IHqlExpression * e
     if (maxRecordSize > options.maxLocalRowSize)
         getInvariantMemberContext(ctx, &declarectx, NULL, false, false);
 
-    if (declarectx != &ctx)
+    if (!declarectx->isSameLocation(ctx))
         rowType.setown(makeModifier(rowType.getClear(), typemod_member, NULL));
     else
         declarectx->setNextPriority(BuildCtx::OutermostScopePrio);
@@ -1552,7 +1552,7 @@ BoundRow * HqlCppTranslator::declareTempRow(BuildCtx & ctx, BuildCtx & codectx,
     bool createRowDynamically = tempRowRequiresFinalize(record) || (maxRecordSize > options.maxLocalRowSize);
     if (createRowDynamically)
     {
-        return declareLinkedRow(ctx, expr, &ctx != &codectx);
+        return declareLinkedRow(ctx, expr, !ctx.isSameLocation(codectx));
     }
     else
     {

+ 11 - 0
ecl/hqlcpp/hqlstmt.cpp

@@ -669,6 +669,17 @@ bool BuildCtx::hasAssociation(HqlExprAssociation & search, bool unconditional)
 }
 
 
+bool BuildCtx::isSameLocation(const BuildCtx & other) const // true when 'other' is this object, or has the same curStmts and nextPriority
+{
+    if (this == &other) // identical object: trivially the same location
+        return true;
+    if (curStmts != other.curStmts) // the two contexts point at different curStmts
+        return false;
+    if (nextPriority != other.nextPriority) // same curStmts, but a different nextPriority
+        return false;
+    return true;
+}
+
 bool BuildCtx::isOuterContext() const
 {
     HqlStmts * searchStmts = curStmts;

+ 1 - 0
ecl/hqlcpp/hqlstmt.hpp

@@ -134,6 +134,7 @@ public:
     HqlExprAssociation *        associateExpr(IHqlExpression * represents, IHqlExpression * expr);
     HqlExprAssociation *        associateExpr(IHqlExpression * represents, const CHqlBoundExpr & bound);
     bool                        hasAssociation(HqlExprAssociation & search, bool unconditional);
+    bool                        isSameLocation(const BuildCtx & other) const;
     HqlExprAssociation *        queryAssociation(IHqlExpression * dataset, AssocKind kind, HqlExprCopyArray * selectors);
     HqlExprAssociation *        queryFirstAssociation(AssocKind kind);
     HqlExprAssociation *        queryFirstCommonAssociation(AssocKind kind);

+ 7 - 2
esp/src/eclwatch/WsWorkunits.js

@@ -32,9 +32,14 @@ define([
             service: "WsWorkunits",
             action: "WUShowScheduled",
             responseQualifier: "WUShowScheduledResponse.Workunits.ScheduledWU",
-            idProperty: "Wuid"
-    });
+            idProperty: "calculatedID",
 
+        preProcessRow: function (row) {
+            lang.mixin(row, {
+                calculatedID: row.Wuid + row.EventText
+            });
+        }
+    });
     return {
         States: {
             0: "unknown",

+ 7 - 5
system/jlib/jexcept.cpp

@@ -988,7 +988,7 @@ void excsighandler(int signum, siginfo_t *info, void *extra)
         return;
 
     excsignal = 0;
-#ifdef NO_LINUX_SEH
+#if defined(NO_LINUX_SEH) && !defined(SA_RESETHAND)
     signal(SIGSEGV, SIG_DFL);
     signal(SIGBUS, SIG_DFL);
     signal(SIGILL, SIG_DFL);
@@ -1191,8 +1191,8 @@ void excsighandler(int signum, siginfo_t *info, void *extra)
     PROGLOG( "ThreadList:\n%s",getThreadList(threadlist).str());
     queryLogMsgManager()->flushQueue(10*1000);
 
+    // MCK - really should not return after recv'ing any of these signals
 
-    
 #ifndef NO_LINUX_SEH
     void (* _P)() = throwSigSegV;
     uc->uc_mcontext.gregs[REG_ESP]-=4;
@@ -1205,10 +1205,8 @@ void excsighandler(int signum, siginfo_t *info, void *extra)
     {
         if ( SEHHandler->fireException(new CSEHException(signum,s.str())) )
             return;
-        else
-            kill(getpid(), SIGABRT);
     }
-
+    raise(signum);
 #endif
     nested--;
 }
@@ -1265,7 +1263,11 @@ void jlib_decl enableSEHtoExceptionMapping()
     sigset_t blockset;
     sigemptyset(&blockset);
     act.sa_mask = blockset;
+#if defined(SA_RESETHAND)
+    act.sa_flags = SA_SIGINFO | SA_RESETHAND;
+#else
     act.sa_flags = SA_SIGINFO;
+#endif
     act.sa_sigaction = &excsighandler; 
     sigaction(SIGSEGV, &act, NULL);
     sigaction(SIGILL, &act, NULL);

+ 2 - 2
testing/regress/ecl-test

@@ -197,10 +197,10 @@ class RegressMain:
 
         # Process config parameter
         self.config = Config(self.args.config).configObj
-        setConfig(self.config)
-        if 'server' in self.args:
+        if ('server' in self.args) and (self.args.server != None):
             self.config.set('espIp',  self.args.server)
             pass
+        setConfig(self.config)
 
         # Process target parameter
         self.targetClusters = []

+ 168 - 96
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -273,7 +273,6 @@ class CBroadcaster : public CSimpleInterface
         unsigned pseudoNode = (myNode<origin) ? nodes-origin+myNode : myNode-origin;
         CMessageBuffer replyMsg;
         // sends to all in 1st pass, then waits for ack from all
-        CriticalBlock b(*broadcastLock); // prevent other channels overlapping, otherwise causes queue ordering issues with MP multi packet messages to same dst.
         for (unsigned sendRecv=0; sendRecv<2 && !activity.queryAbortSoon(); sendRecv++)
         {
             unsigned i = 0;
@@ -294,6 +293,7 @@ class CBroadcaster : public CSimpleInterface
 #endif
                     CMessageBuffer &msg = sendItem->queryMsg();
                     msg.setReplyTag(rt); // simulate sendRecv
+                    CriticalBlock b(*broadcastLock); // prevent other channels overlapping, otherwise causes queue ordering issues with MP multi packet messages to same dst.
                     comm.send(msg, t, mpTag);
                 }
                 else // recv reply
@@ -810,7 +810,6 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper<HELPER>,
     Owned<IException> leftexception;
 
     bool eos, eog, someSinceEog;
-    SpinLock rHSRowSpinLock;
 
 protected:
     typedef CAllOrLookupHelper<HELPER> HELPERBASE;
@@ -911,6 +910,7 @@ protected:
         }
     } *rowProcessor;
 
+    CriticalSection rhsRowLock;
     Owned<CBroadcaster> broadcaster;
     CBroadcaster *channel0Broadcaster;
     CriticalSection *broadcastLock;
@@ -1059,14 +1059,25 @@ protected:
          * if it never spills, but will make flushing non-locals simpler if spilling occurs.
          */
         CThorSpillableRowArray &rows = *rhsSlaveRows.item(slave);
+        CThorExpandingRowArray rhsInRowsTemp(*this, sharedRightRowInterfaces);
+        CThorExpandingRowArray pending(*this, sharedRightRowInterfaces);
         RtlDynamicRowBuilder rowBuilder(rightAllocator); // NB: rightAllocator is the shared allocator
         CThorStreamDeserializerSource memDeserializer(mb.length(), mb.toByteArray());
         while (!memDeserializer.eos())
         {
             size32_t sz = rightDeserializer->deserialize(rowBuilder, memDeserializer);
-            OwnedConstThorRow fRow = rowBuilder.finalizeRowClear(sz);
-            // NB: If spilt, addRHSRow will filter out non-locals
-            if (!addRHSRow(rows, fRow)) // NB: in SMART case, must succeed
+            pending.append(rowBuilder.finalizeRowClear(sz));
+            if (pending.ordinality() >= 100)
+            {
+                // NB: If spilt, addRHSRows will filter out non-locals
+                if (!addRHSRows(rows, pending, rhsInRowsTemp)) // NB: in SMART case, must succeed
+                    throw MakeActivityException(this, 0, "Out of memory: Unable to add any more rows to RHS");
+            }
+        }
+        if (pending.ordinality())
+        {
+            // NB: If spilt, addRHSRows will filter out non-locals
+            if (!addRHSRows(rows, pending, rhsInRowsTemp)) // NB: in SMART case, must succeed
                 throw MakeActivityException(this, 0, "Out of memory: Unable to add any more rows to RHS");
         }
     }
@@ -1074,9 +1085,10 @@ protected:
     {
         Owned<CSendItem> sendItem = broadcaster->newSendItem(bcast_send);
         MemoryBuffer mb;
+        CThorExpandingRowArray rhsInRowsTemp(*this, sharedRightRowInterfaces);
+        CThorExpandingRowArray pending(*this, sharedRightRowInterfaces);
         try
         {
-            CThorSpillableRowArray &localRhsRows = *rhsSlaveRows.item(mySlaveNum);
             CMemoryRowSerializer mbser(mb);
             while (!abortSoon)
             {
@@ -1089,25 +1101,28 @@ protected:
                     /* Add all locally read right rows to channel0 directly
                      * NB: these rows remain on their channel allocator.
                      */
-                    if (0 == queryJobChannelNumber())
-                    {
-                        if (!addRHSRow(localRhsRows, row)) // may cause broadcaster to be told to stop (for isStopping() to become true)
-                            throw MakeActivityException(this, 0, "Out of memory: Unable to add any more rows to RHS");
-                    }
-                    else
-                    {
-                        if (!channels[0]->addRHSRow(mySlaveNum, row))
-                            throw MakeActivityException(this, 0, "Out of memory: Unable to add any more rows to RHS");
-                    }
                     if (numNodes>1)
                     {
                         rightSerializer->serialize(mbser, (const byte *)row.get());
+                        pending.append(row.getClear());
                         if (mb.length() >= MAX_SEND_SIZE || channel0Broadcaster->stopRequested())
                             break;
                     }
+                    else
+                        pending.append(row.getClear());
+                    if (pending.ordinality() >= 100)
+                    {
+                        if (!channels[0]->addRHSRows(mySlaveNum, pending, rhsInRowsTemp)) // may cause broadcaster to be told to stop (for isStopping() to become true)
+                            throw MakeActivityException(this, 0, "Out of memory: Unable to add any more rows to RHS");
+                    }
                     if (channel0Broadcaster->stopRequested())
                         break;
                 }
+                if (pending.ordinality())
+                {
+                    if (!channels[0]->addRHSRows(mySlaveNum, pending, rhsInRowsTemp)) // may cause broadcaster to be told to stop (for isStopping() to become true)
+                        throw MakeActivityException(this, 0, "Out of memory: Unable to add any more rows to RHS");
+                }
                 if (0 == mb.length()) // will always be true if numNodes = 1
                     break;
                 if (channel0Broadcaster->stopRequested())
@@ -1414,7 +1429,7 @@ public:
         leftITDL = queryInput(0);
         rightITDL = queryInput(1);
         rightOutputMeta = rightITDL->queryFromActivity()->queryContainer().queryHelper()->queryOutputMeta();
-        rightAllocator.setown(rightThorAllocator->getRowAllocator(rightOutputMeta, container.queryId()));
+        rightAllocator.setown(rightThorAllocator->getRowAllocator(rightOutputMeta, container.queryId(), (roxiemem::RoxieHeapFlags)(roxiemem::RHFpacked|roxiemem::RHFunique)));
 
         if (isGlobal())
         {
@@ -1556,21 +1571,15 @@ public:
         }
         return (rowidx_t)rhsRows;
     }
-    bool addRHSRow(unsigned slave, const void *row)
+    bool addRHSRows(unsigned slave, CThorExpandingRowArray &inRows, CThorExpandingRowArray &rhsInRowsTemp) // looks up the row array for 'slave' and forwards to the array-based overload
     {
         CThorSpillableRowArray &rows = *rhsSlaveRows.item(slave);
-        return addRHSRow(rows, row);
+        return addRHSRows(rows, inRows, rhsInRowsTemp);
     }
-    virtual bool addRHSRow(CThorSpillableRowArray &rhsRows, const void *row)
+    virtual bool addRHSRows(CThorSpillableRowArray &rhsRows, CThorExpandingRowArray &inRows, CThorExpandingRowArray &rhsInRowsTemp) // appends inRows to rhsRows while holding rhsRowLock; rhsInRowsTemp is not used by this implementation
     {
-        LinkThorRow(row);
-        {
-            SpinBlock b(rHSRowSpinLock);
-            if (rhsRows.append(row))
-                return true;
-        }
-        ReleaseThorRow(row);
-        return false;
+        CriticalBlock b(rhsRowLock); // held until the function returns
+        return rhsRows.appendRows(inRows, true); // result of appendRows is returned unchanged to the caller
     }
 
 // IBCastReceive (only used if global)
@@ -1685,6 +1694,7 @@ protected:
     using PARENT::tableProxy;
     using PARENT::gatheredRHSNodeStreams;
     using PARENT::queryInput;
+    using PARENT::rhsRowLock;
 
     IHash *leftHash, *rightHash;
     ICompare *compareRight, *compareLeftRight;
@@ -1705,7 +1715,6 @@ protected:
     Owned<IJoinHelper> joinHelper;
 
     // NB: Only used by channel 0
-    CriticalSection overflowCrit;
     Owned<CFileOwner> overflowWriteFile;
     Owned<IRowWriter> overflowWriteStream;
     rowcount_t overflowWriteCount;
@@ -1868,7 +1877,7 @@ protected:
             // NB: If spilt after rhsCollated set, callback will have cleared and compacted, rows will still be sorted
             if (rhs.ordinality())
             {
-                CThorSpillableRowArray spillableRHS(*this, queryRowInterfaces(rightITDL));
+                CThorSpillableRowArray spillableRHS(*this, sharedRightRowInterfaces);
                 spillableRHS.transferFrom(rhs);
 
                 /* Set priority higher than std. lookup priority, because any spill will indicate need to
@@ -1926,7 +1935,7 @@ protected:
         }
         return NULL;
     }
-    bool prepareLocalHT(CMarker &marker)
+    bool prepareLocalHT(CMarker &marker, IThorRowCollector &rightCollector)
     {
         try
         {
@@ -1940,8 +1949,21 @@ protected:
             return false;
         }
         // Either was already sorted, or rowLoader->load() sorted on transfer out to rhs
-        rowidx_t uniqueKeys = marker.calculate(rhs, compareRight, false);
-        if (!setupHT(uniqueKeys))
+
+        rowidx_t uniqueKeys = 0;
+        {
+            CThorArrayLockBlock b(rightCollector);
+            if (rightCollector.hasSpilt())
+                return false;
+            /* transfer rows out of collector to perform calc, but we'll keep lock,
+             * so that a request to spill, will block delay, but can still proceed after calculate is done
+             */
+            CThorExpandingRowArray temp(*this);
+            rightCollector.transferRowsOut(temp);
+            uniqueKeys = marker.calculate(temp, compareRight, false);
+            rightCollector.transferRowsIn(temp);
+        }
+        if (!setupHT(uniqueKeys)) // could cause spilling
         {
             if (!isSmart())
                 throw MakeActivityException(this, 0, "Failed to allocate [LOCAL] hash table");
@@ -1978,7 +2000,7 @@ protected:
             if (!hasFailedOverToLocal())
             {
                 if (stable && !globallySorted)
-                    rhs.setup(NULL, false, stableSort_earlyAlloc);
+                    rhs.setup(sharedRightRowInterfaces, false, stableSort_earlyAlloc);
                 bool success=false;
                 try
                 {
@@ -2032,7 +2054,7 @@ protected:
                     if (stable && !globallySorted)
                     {
                         ActPrintLog("Clearing rhs stable ptr table");
-                        rhs.setup(NULL, false, stableSort_none); // don't need stable ptr table anymore
+                        rhs.setup(sharedRightRowInterfaces, false, stableSort_none); // don't need stable ptr table anymore
                     }
                 }
             }
@@ -2089,29 +2111,37 @@ protected:
     /*
      * NB: returned stream or rhs will be sorted
      */
-    IRowStream *handleLocalRHS(IRowStream *right, ICompare *cmp, CThorExpandingRowArray &rhs)
+    IThorRowCollector *handleLocalRHS(IRowStream *right, ICompare *cmp) // creates a row collector, writes every row read from 'right' into it, and returns the collector
     {
-        Owned<IThorRowLoader> rowLoader;
+        Owned<IThorRowCollector> channelCollector;
         if (isSmart())
         {
             dbgassertex(!stable);
             if (getOptBool(THOROPT_LKJOIN_HASHJOINFAILOVER)) // for testing only (force to disk, as if spilt)
-                rowLoader.setown(createThorRowLoader(*this, queryRowInterfaces(rightITDL), cmp, stableSort_none, rc_allDisk, SPILL_PRIORITY_LOOKUPJOIN));
+                channelCollector.setown(createThorRowCollector(*this, queryRowInterfaces(rightITDL), cmp, stableSort_none, rc_allDisk, SPILL_PRIORITY_LOOKUPJOIN));
             else
-                rowLoader.setown(createThorRowLoader(*this, queryRowInterfaces(rightITDL), cmp, stableSort_none, rc_mixed, SPILL_PRIORITY_LOOKUPJOIN));
+                channelCollector.setown(createThorRowCollector(*this, queryRowInterfaces(rightITDL), cmp, stableSort_none, rc_mixed, SPILL_PRIORITY_LOOKUPJOIN));
         }
         else
         {
             // i.e. will fire OOM if runs out of memory loading local right
-            rowLoader.setown(createThorRowLoader(*this, queryRowInterfaces(rightITDL), cmp, stable ? stableSort_lateAlloc : stableSort_none, rc_allMem, SPILL_PRIORITY_DISABLE));
+            channelCollector.setown(createThorRowCollector(*this, queryRowInterfaces(rightITDL), cmp, stable ? stableSort_lateAlloc : stableSort_none, rc_allMem, SPILL_PRIORITY_DISABLE));
         }
-        return rowLoader->load(right, abortSoon, false, &rhs);
+        Owned<IRowWriter> writer = channelCollector->getWriter();
+        while (!abortSoon) // loop is abandoned as soon as abortSoon is set
+        {
+            const void *next = right->nextRow();
+            if (!next) // a NULL row ends the read loop
+                break;
+            writer->putRow(next);
+        }
+        return channelCollector.getClear(); // the collector, not a stream, is returned; callers obtain a stream from it via getStream()
     }
     /*
      * NB: if global attempt fails.
      * Returned stream or rhs will be sorted
      */
-    IRowStream *handleFailoverToLocalRHS(CThorExpandingRowArray &rhs, ICompare *cmp)
+    IThorRowCollector *handleFailoverToLocalRHS(ICompare *cmp)
     {
         class CChannelDistributor : public CSimpleInterfaceOf<IChannelDistributor>, implements roxiemem::IBufferedRowCallback
         {
@@ -2158,11 +2188,6 @@ protected:
                     putRow(row.getClear());
                 }
             }
-            IRowStream *getStream(CThorExpandingRowArray *rhs=NULL)
-            {
-                channelCollectorWriter->flush();
-                return channelCollector->getStream(false, rhs);
-            }
         // roxiemem::IBufferedRowCallback impl.
             virtual bool freeBufferedRows(bool critical)
             {
@@ -2189,6 +2214,7 @@ protected:
             {
                 return owner.queryActivityId();
             }
+            virtual IThorRowCollector *getCollector() { return channelCollector.getLink(); }
         // IChannelDistributor impl.
             virtual void putRow(const void *row)
             {
@@ -2218,8 +2244,8 @@ protected:
          */
         roxiemem::IBufferedRowCallback *callback = ((CLookupJoinActivityBase *)channels[0])->channelDistributors[0]->queryCallback();
         queryRowManager()->addRowBuffer(callback);
-        Owned<IRowStream> stream;
         Owned<IException> exception;
+        Owned<IThorRowCollector> channelCollector;
         try
         {
             if (0 == queryJobChannelNumber())
@@ -2236,18 +2262,23 @@ protected:
 
             Owned<IRowStream> distChannelStream = rhsDistributor->connect(queryRowInterfaces(rightITDL), right.getClear(), rightHash, NULL);
             channelDistributor.processDistRight(distChannelStream);
-            stream.setown(channelDistributor.getStream(&rhs));
         }
         catch (IException *e)
         {
             EXCLOG(e, "During channel distribution");
             exception.setown(e);
         }
+
+        /* Now that channel distribution done, remove its roxiemem memory callback
+         * but allow collector return to continue to spill if there's memory pressure.
+         */
+        channelCollector.setown(channelDistributor.getCollector());
+        channelCollector->setup(cmp, stableSort_none, rc_mixed, SPILL_PRIORITY_LOOKUPJOIN);
         queryRowManager()->removeRowBuffer(callback);
         InterChannelBarrier(); // need barrier point to ensure all have removed callback before channelDistributor is destroyed
         if (exception)
             throw exception.getClear();
-        return stream.getClear();
+        return channelCollector.getClear();
     }
     void setupStandardJoin(IRowStream *right)
     {
@@ -2332,6 +2363,7 @@ protected:
         {
             CMarker marker(*this);
             Owned<IRowStream> rightStream;
+            Owned<IThorRowCollector> rightCollector;
             if (isGlobal())
             {
                 /* All slaves on all channels now know whether any one spilt or not, i.e. whether to perform local hash join or not
@@ -2361,10 +2393,11 @@ protected:
                     }
 
                     ICompare *cmp = rhsCollated ? NULL : compareRight; // if rhsCollated=true, then sorted, otherwise can't rely on any previous order.
-                    rightStream.setown(handleFailoverToLocalRHS(rhs, cmp));
-                    if (rightStream)
+                    rightCollector.setown(handleFailoverToLocalRHS(cmp));
+                    if (rightCollector->hasSpilt())
                     {
                         ActPrintLog("Global SMART JOIN spilt to disk during Distributed Local Lookup handling. Failing over to Standard Join");
+                        rightStream.setown(rightCollector->getStream());
                         setFailoverToStandard(true);
                     }
 
@@ -2382,7 +2415,8 @@ protected:
                     return;
                 }
                 ICompare *cmp = helper->isRightAlreadyLocallySorted() ? NULL : compareRight;
-                rightStream.setown(handleLocalRHS(right, cmp, rhs));
+                rightCollector.setown(handleLocalRHS(right, cmp));
+                rightStream.setown(rightCollector->getStream(&rhs));
                 if (rightStream)
                 {
                     ActPrintLog("Local SMART JOIN spilt to disk. Failing over to regular local join");
@@ -2399,13 +2433,9 @@ protected:
                 {
                     if (hasFailedOverToLocal())
                         marker.reset();
-                    if (!prepareLocalHT(marker))
-                    {
+                    if (!prepareLocalHT(marker, *rightCollector)) // can cause others to spill, but must not be allowed to spill channel rows I'm working on.
                         ActPrintLog("Out of memory trying to prepare [LOCAL] hashtable for a SMART join (%" RIPF "d rows), will now failover to a std hash join", rhs.ordinality());
-                        Owned<IThorRowCollector> collector = createThorRowCollector(*this, queryRowInterfaces(rightITDL), NULL, stableSort_none, rc_mixed, SPILL_PRIORITY_LOOKUPJOIN);
-                        collector->transferRowsIn(rhs); // can spill after this
-                        rightStream.setown(collector->getStream());
-                    }
+                    rightStream.setown(rightCollector->getStream(&rhs));
                 }
             }
             if (rightStream)
@@ -2415,6 +2445,7 @@ protected:
             }
             else
             {
+                // NB: No spilling here on in
                 if (isLocal() || hasFailedOverToLocal())
                 {
                     ActPrintLog("Performing LOCAL LOOKUP JOIN: rhs size=%u, lookup table size = %" RIPF "u", rhs.ordinality(), rhsTableLen);
@@ -2686,51 +2717,92 @@ public:
         // NB: only installed if lookup join and global
         return clearAllNonLocalRows("Out of memory callback", true);
     }
-    // NB: addRHSRow only called on channel 0
-    virtual bool addRHSRow(CThorSpillableRowArray &rhsRows, const void *row)
+    rowidx_t keepLocal(CThorExpandingRowArray &rows, CThorExpandingRowArray &localRows) // moves the rows of 'rows' whose right-hash maps to this node into localRows; returns localRows' ordinality
+    {
+        ForEachItemIn(r, rows)
+        {
+            unsigned hv = rightHash->hash(rows.query(r));
+            if (myNodeNum == (hv % numNodes)) // row hashes to this node: transfer it to localRows
+                localRows.append(rows.getClear(r));
+        }
+        rows.clearRows(); // clears 'rows', including the entries that were not moved to localRows
+        return localRows.ordinality(); // NB: this is the total held by localRows, not just the rows added by this call
+    }
+    virtual bool addRHSRows(CThorSpillableRowArray &rhsRows, CThorExpandingRowArray &inRows, CThorExpandingRowArray &rhsInRowsTemp) // adds inRows to rhsRows; once failed over to local only rows hashing to this node are kept, and rows that cannot be appended are written to an overflow file. Always returns true.
     {
-        /* NB: If PARENT::addRHSRow fails, it will cause clearAllNonLocalRows() to have been triggered and failedOverToLocal to be set
+        dbgassertex(0 == rhsInRowsTemp.ordinality());
+        if (hasFailedOverToLocal())
+        {
+            if (0 == keepLocal(inRows, rhsInRowsTemp))
+                return true;
+        }
+        CriticalBlock b(rhsRowLock);
+        /* NB: If PARENT::addRHSRows fails, it will cause clearAllNonLocalRows() to have been triggered and failedOverToLocal to be set
          * When all is done, a last pass is needed to clear out non-locals
          */
-        if (!overflowWriteFile)
+        if (overflowWriteFile)
+        {
+            /* Tried to do outside crit above, but if empty, and now overflow, need to inside
+             * Will be one off if at all
+             */
+            if (0 == rhsInRowsTemp.ordinality())
+            {
+                if (0 == keepLocal(inRows, rhsInRowsTemp))
+                    return true;
+            }
+            overflowWriteCount += rhsInRowsTemp.ordinality();
+            ForEachItemIn(r, rhsInRowsTemp)
+                overflowWriteStream->putRow(rhsInRowsTemp.getClear(r)); // NOTE(review): rhsInRowsTemp.clearRows() is not called before returning, yet the entry assertion expects ordinality 0 on the next call - confirm getClear() leaves the array empty
+            return true;
+        }
+        if (hasFailedOverToLocal())
+        {
+            /* Tried to do outside crit above, but hasFailedOverToLocal() could be true, since gaining lock
+             * Will be one off if at all
+             */
+            if (0 == rhsInRowsTemp.ordinality())
+            {
+                if (0 == keepLocal(inRows, rhsInRowsTemp))
+                    return true;
+            }
+            if (rhsRows.appendRows(rhsInRowsTemp, true))
+                return true;
+        }
+        else
         {
-            if (!hasFailedOverToLocal() && PARENT::addRHSRow(rhsRows, row))
+            if (rhsRows.appendRows(inRows, true))
                 return true;
             dbgassertex(hasFailedOverToLocal());
-            // keep it only if it hashes to my node
-            unsigned hv = rightHash->hash(row);
-            if (myNodeNum != (hv % numNodes))
-                return true; // throw away non-local row
-            if (PARENT::addRHSRow(rhsRows, row))
+
+            if (0 == keepLocal(inRows, rhsInRowsTemp))
                 return true;
 
-            /* Could OOM whilst still failing over to local lookup again, dealing with last row, or trailing
-             * few rows being received. Unlikely since all local rows will have been cleared, but possible,
-             * particularly if last rows end up causing row ptr table expansion here.
-             *
-             * Need to stash away somewhere to allow it to continue.
-             */
-            CriticalBlock b(overflowCrit); // could be coming from broadcaster or receiver
-            if (!overflowWriteFile)
-            {
-                unsigned rwFlags = DEFAULT_RWFLAGS;
-                if (spillCompInfo)
-                {
-                    rwFlags |= rw_compress;
-                    rwFlags |= spillCompInfo;
-                }
-                StringBuffer tempFilename;
-                GetTempName(tempFilename, "lookup_local", true);
-                ActPrintLog("Overflowing RHS broadcast rows to spill file: %s", tempFilename.str());
-                OwnedIFile iFile = createIFile(tempFilename.str());
-                overflowWriteFile.setown(new CFileOwner(iFile.getLink()));
-                overflowWriteStream.setown(createRowWriter(iFile, queryRowInterfaces(rightITDL), rwFlags));
-            }
-        }
-        ++overflowWriteCount;
-        LinkThorRow(row);
-        CriticalBlock b(overflowCrit); // could be coming from broadcaster or receiver
-        overflowWriteStream->putRow(row);
+            // keep it only if it hashes to my node
+            if (rhsRows.appendRows(rhsInRowsTemp, true))
+                return true;
+        }
+        /* Could OOM whilst still failing over to local lookup again, dealing with last row, or trailing
+         * few rows being received. Unlikely since all local rows will have been cleared, but possible,
+         * particularly if last rows end up causing row ptr table expansion here.
+         *
+         * Need to stash away somewhere to allow it to continue.
+         */
+        unsigned rwFlags = DEFAULT_RWFLAGS;
+        if (spillCompInfo)
+        {
+            rwFlags |= rw_compress;
+            rwFlags |= spillCompInfo;
+        }
+        StringBuffer tempFilename;
+        GetTempName(tempFilename, "lookup_local", true);
+        ActPrintLog("Overflowing RHS broadcast rows to spill file: %s", tempFilename.str());
+        OwnedIFile iFile = createIFile(tempFilename.str());
+        overflowWriteFile.setown(new CFileOwner(iFile.getLink()));
+        overflowWriteStream.setown(createRowWriter(iFile, queryRowInterfaces(rightITDL), rwFlags));
+
+        overflowWriteCount += rhsInRowsTemp.ordinality();
+        ForEachItemIn(r, rhsInRowsTemp)
+            overflowWriteStream->putRow(rhsInRowsTemp.getClear(r)); // NOTE(review): as in the earlier overflow branch, rhsInRowsTemp is not cleared with clearRows() after this loop - confirm this is safe for the next call
         return true;
     }
 };

+ 12 - 16
thorlcr/graph/thgraphslave.cpp

@@ -1080,22 +1080,6 @@ void CSlaveGraph::done()
         throw LINK(exception.get());
 }
 
-void CSlaveGraph::end()
-{
-    CGraphBase::end();
-    if (!queryOwner())
-    {
-        if (nodesLoaded) // wouldn't mean much if parallel jobs running
-            GraphPrintLog("JHTree node stats:\ncacheAdds=%d\ncacheHits=%d\nnodesLoaded=%d\nblobCacheHits=%d\nblobCacheAdds=%d\nleafCacheHits=%d\nleafCacheAdds=%d\nnodeCacheHits=%d\nnodeCacheAdds=%d\n", cacheAdds.load(), cacheHits.load(), nodesLoaded.load(), blobCacheHits.load(), blobCacheAdds.load(), leafCacheHits.load(), leafCacheAdds.load(), nodeCacheHits.load(), nodeCacheAdds.load());
-        JSocketStatistics stats;
-        getSocketStatistics(stats);
-        StringBuffer s;
-        getSocketStatisticsString(stats,s);
-        GraphPrintLog("Socket statistics : %s\n",s.str());
-        resetSocketStatistics();
-    }
-}
-
 bool CSlaveGraph::serializeStats(MemoryBuffer &mb)
 {
     unsigned beginPos = mb.length();
@@ -1548,6 +1532,18 @@ void CJobSlave::startJob()
     }
 }
 
+void CJobSlave::reportGraphEnd(graph_id gid)
+{
+    if (nodesLoaded) // wouldn't mean much if parallel jobs running
+        PROGLOG("Graph[%" GIDPF "u] - JHTree node stats:\ncacheAdds=%d\ncacheHits=%d\nnodesLoaded=%d\nblobCacheHits=%d\nblobCacheAdds=%d\nleafCacheHits=%d\nleafCacheAdds=%d\nnodeCacheHits=%d\nnodeCacheAdds=%d\n", gid, cacheAdds.load(), cacheHits.load(), nodesLoaded.load(), blobCacheHits.load(), blobCacheAdds.load(), leafCacheHits.load(), leafCacheAdds.load(), nodeCacheHits.load(), nodeCacheAdds.load());
+    JSocketStatistics stats;
+    getSocketStatistics(stats);
+    StringBuffer s;
+    getSocketStatisticsString(stats,s);
+    PROGLOG("Graph[%" GIDPF "u] - Socket statistics : %s\n", gid, s.str());
+    resetSocketStatistics();
+}
+
 __int64 CJobSlave::getWorkUnitValueInt(const char *prop, __int64 defVal) const
 {
     StringBuffer propName(prop);

+ 1 - 1
thorlcr/graph/thgraphslave.hpp

@@ -378,7 +378,6 @@ public:
     virtual void start() override;
     virtual void abort(IException *e) override;
     virtual void done() override;
-    virtual void end() override;
     virtual IThorGraphResults *createThorGraphResults(unsigned num);
 
 // IExceptionHandler
@@ -416,6 +415,7 @@ public:
 
     virtual IGraphTempHandler *createTempHandler(bool errorOnMissing);
     ISlaveWatchdog *queryProgressHandler() { return watchdog; }
+    void reportGraphEnd(graph_id gid);
 
     virtual mptag_t deserializeMPTag(MemoryBuffer &mb);
     virtual __int64 getWorkUnitValueInt(const char *prop, __int64 defVal) const;

+ 1 - 0
thorlcr/slave/slavmain.cpp

@@ -460,6 +460,7 @@ public:
                                     msg.append((rank_t)0); // JCSMORE - not sure why this would ever happen
                                 }
                             }
+                            job->reportGraphEnd(gid);
                         }
                         else
                         {

+ 11 - 6
thorlcr/slave/thslavemain.cpp

@@ -186,10 +186,10 @@ static bool RegisterSelf(SocketEndpoint &masterEp)
 
 static bool jobListenerStopped = true;
 
-void UnregisterSelf(IException *e)
+bool UnregisterSelf(IException *e)
 {
     if (!hasMPServerStarted())
-        return;
+        return false;
 
     StringBuffer slfStr;
     slfEp.getUrlStr(slfStr);
@@ -202,28 +202,33 @@ void UnregisterSelf(IException *e)
         if (!queryWorldCommunicator().send(msg, masterNode, MPTAG_THORREGISTRATION, 60*1000))
         {
             LOG(MCerror, thorJob, "Failed to unregister slave : %s", slfStr.str());
-            return;
+            return false;
         }
         LOG(MCdebugProgress, thorJob, "Unregistered slave : %s", slfStr.str());
+        return true;
     }
     catch (IException *e) {
         if (!jobListenerStopped)
             FLLOG(MCexception(e), thorJob, e,"slave unregistration error");
         e->Release();
     }
+    return false;
 }
 
 bool ControlHandler(ahType type)
 {
     if (ahInterrupt == type)
-        LOG(MCdebugProgress, thorJob, "CTRL-C pressed");
+        LOG(MCdebugProgress, thorJob, "CTRL-C detected");
+    else if (!jobListenerStopped)
+        LOG(MCdebugProgress, thorJob, "SIGTERM detected");
+    bool unregOK = false;
     if (!jobListenerStopped)
     {
         if (masterNode)
-            UnregisterSelf(NULL);
+            unregOK = UnregisterSelf(NULL);
         abortSlave();
     }
-    return false;
+    return !unregOK;
 }
 
 void usage()

+ 24 - 32
thorlcr/thorutil/thmem.cpp

@@ -1253,7 +1253,6 @@ void CThorSpillableRowArray::initCommon()
 {
     commitRows = 0;
     firstRow = 0;
-    resizing = resize_nop;
 }
 
 void CThorSpillableRowArray::clearRows()
@@ -1416,19 +1415,10 @@ bool CThorSpillableRowArray::_flush(bool force)
 
 bool CThorSpillableRowArray::shrink() // NB: if read active should be protected inside a CThorArrayLockBlock
 {
-    CToggleResizingState toggle(resizing);
-    if (!toggle.tryState(resize_shrinking)) // resize() may be in progress, in which case give up this attempt
-        return false;
+    // NB: Should only be called from writer thread
     _flush(true);
     rowidx_t prevMaxRows = maxRows;
-    {
-        /* NB: This method may be called via the roxiemem OOM callback.
-         * As this is shrinking the table, it will not itself invoke an OOM callback.
-         * The CS prevents another thread resizing (see resize()) until this is done.
-         */
-        CriticalBlock b(shrinkingCrit); // can block resize(), but should never be blocked by resize() as have checked/toggled resizing state to get here
-        shrink(numRows);
-    }
+    shrink(numRows);
     return maxRows != prevMaxRows;
 }
 
@@ -1505,14 +1495,6 @@ bool CThorSpillableRowArray::shrink(rowidx_t requiredRows)
 
 bool CThorSpillableRowArray::resize(rowidx_t requiredRows, unsigned maxSpillCost)
 {
-    CToggleResizingState toggle(resizing);
-    loop
-    {
-        if (toggle.tryState(resize_resizing)) // prevent shrink callback clashing
-            break;
-        shrinkingCrit.enter(); // will block if shrinking
-        shrinkingCrit.leave();
-    }
     if (needToMoveRows(false))
     {
         CThorArrayLockBlock block(*this);
@@ -1583,14 +1565,7 @@ protected:
         //This must only be called while a lock is held on spillableRows
         rowidx_t numRows = spillableRows.numCommitted();
         if (numRows == 0)
-        {
-            if (!critical)
-                return false;
-            bool res = spillableRows.shrink();
-            if (res)
-                return true;
-            return false;
-        }
+            return false; // cannot shrink(), as it requires a flush and only the writer thread can do that.
 
         CCycleTimer spillTimer;
         totalRows += numRows;
@@ -1629,7 +1604,7 @@ protected:
         rowidx_t maxRows = spillableRows.queryMaxRows();
         bool ret = spillableRows.shrink();
         if (traceInfo)
-            traceInfo->append("shink() - previous maxRows=").append(maxRows).append(", new maxRows=").append(spillableRows.queryMaxRows());
+            traceInfo->append("shrink() - previous maxRows=").append(maxRows).append(", new maxRows=").append(spillableRows.queryMaxRows());
         return ret;
     }
     void putRow(const void *row)
@@ -1648,9 +1623,11 @@ protected:
                     flush();
                     spillRows(false);
                 }
-                //Ensure new rows are written to the head of the array.  It needs to be a separate call because
-                //spillRows() cannot shift active row pointer since it can be called from any thread
-                flush();
+                // This is a good time to shrink the row table back. shrink() forces a flush.
+                StringBuffer info;
+                if (shrink(&info))
+                    activity.ActPrintLog("CThorRowCollectorBase: shrink - %s", info.str());
+
                 if (!spillableRows.append(row))
                     oom = true;
             }
@@ -1896,6 +1873,12 @@ public:
     {
         options = _options;
     }
+    virtual bool hasSpilt() const { return overflowCount >= 1; }
+
+// IThorArrayLock
+    virtual void lock() const { spillableRows.lock(); }
+    virtual void unlock() const { spillableRows.unlock(); }
+
 // IBufferedRowCallback
     virtual unsigned getSpillCost() const
     {
@@ -1985,6 +1968,11 @@ public:
     virtual void resize(rowidx_t max) { CThorRowCollectorBase::resize(max); }
     virtual void setOptions(unsigned options)  { CThorRowCollectorBase::setOptions(options); }
     virtual unsigned __int64 getStatistic(StatisticKind kind) { return CThorRowCollectorBase::getStatistic(kind); }
+    virtual bool hasSpilt() const { return CThorRowCollectorBase::hasSpilt(); }
+
+// IThorArrayLock
+    virtual void lock() const { CThorRowCollectorBase::lock(); }
+    virtual void unlock() const { CThorRowCollectorBase::unlock(); }
 // IThorRowLoader
     virtual IRowStream *load(IRowStream *in, const bool &abort, bool preserveGrouping, CThorExpandingRowArray *allMemRows, memsize_t *memUsage, bool doReset)
     {
@@ -2038,6 +2026,10 @@ public:
     virtual void resize(rowidx_t max) { CThorRowCollectorBase::resize(max); }
     virtual void setOptions(unsigned options) { CThorRowCollectorBase::setOptions(options); }
     virtual unsigned __int64 getStatistic(StatisticKind kind) { return CThorRowCollectorBase::getStatistic(kind); }
+    virtual bool hasSpilt() const { return CThorRowCollectorBase::hasSpilt(); }
+// IThorArrayLock
+    virtual void lock() const { CThorRowCollectorBase::lock(); }
+    virtual void unlock() const { CThorRowCollectorBase::unlock(); }
 // IThorRowCollector
     virtual IRowWriter *getWriter()
     {

+ 3 - 28
thorlcr/thorutil/thmem.hpp

@@ -289,7 +289,7 @@ protected:
     StableSortFlag stableSort;
     rowidx_t maxRows;  // Number of rows that can fit in the allocated memory.
     rowidx_t numRows;  // High water mark of rows added
-    unsigned defaultMaxSpillCost;
+    unsigned defaultMaxSpillCost = roxiemem::SpillAllCost;
 
     const void *allocateRowTable(rowidx_t num);
     const void *allocateRowTable(rowidx_t num, unsigned maxSpillCost);
@@ -413,33 +413,7 @@ class graph_decl CThorSpillableRowArray : private CThorExpandingRowArray, implem
     rowidx_t commitRows;  // can only be updated by writing thread within a critical section
     mutable CriticalSection cs;
     ICopyArrayOf<IWritePosCallback> writeCallbacks;
-    CriticalSection shrinkingCrit;
-    enum ResizeState { resize_nop, resize_shrinking, resize_resizing };
-    std::atomic<ResizeState> resizing;
 
-    class CToggleResizingState
-    {
-        ResizeState state;
-        std::atomic<ResizeState> &resizing;
-    public:
-        CToggleResizingState(std::atomic<ResizeState> &_resizing) : resizing(_resizing)
-        {
-            state = resize_nop;
-        }
-        ~CToggleResizingState()
-        {
-            if (state != resize_nop)
-                verify(resizing.compare_exchange_strong(state, resize_nop));
-        }
-        bool tryState(ResizeState newState)
-        {
-            ResizeState expected = resize_nop;
-            if (!resizing.compare_exchange_strong(expected, newState))
-                return false;
-            state = newState;
-            return true;
-        }
-    };
     void initCommon();
     bool _flush(bool force);
     void doFlush();
@@ -556,7 +530,7 @@ private:
 
 enum RowCollectorSpillFlags { rc_mixed, rc_allMem, rc_allDisk, rc_allDiskOrAllMem };
 enum RowCollectorOptionFlags { rcflag_noAllInMemSort=0x01 };
-interface IThorRowCollectorCommon : extends IInterface
+interface IThorRowCollectorCommon : extends IInterface, extends IThorArrayLock
 {
     virtual rowcount_t numRows() const = 0;
     virtual unsigned numOverflows() const = 0;
@@ -569,6 +543,7 @@ interface IThorRowCollectorCommon : extends IInterface
     virtual void resize(rowidx_t max) = 0;
     virtual void setOptions(unsigned options) = 0;
     virtual unsigned __int64 getStatistic(StatisticKind kind) = 0;
+    virtual bool hasSpilt() const = 0; // equivalent to numOverflows() >= 1
 };
 
 interface IThorRowLoader : extends IThorRowCollectorCommon