Browse Source

HPCC-17313 Fix potential smartjoin assert if failing over to local

When failing over to local smartjoin, if memory pressure caused an
OOM callback twice after successfully gathering the whole RHS
without spilling, then an assert was hit whilst reading the
gathered RHS rows.

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith 8 years ago
parent
commit
05d53f5ca4

+ 1 - 0
system/jlib/jstring.hpp

@@ -554,6 +554,7 @@ inline StringBuffer& operator << (StringBuffer& s, const TValue& value)
 
 extern jlib_decl void decodeCppEscapeSequence(StringBuffer & out, const char * in, bool errorIfInvalid);
 extern jlib_decl bool strToBool(const char * text);
+inline const char *boolToStr(bool b) { return b ? "true" : "false"; }
 extern jlib_decl bool strToBool(size_t len, const char * text);
 extern jlib_decl bool clipStrToBool(size_t len, const char * text);
 extern jlib_decl bool clipStrToBool(const char * text);

+ 12 - 0
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -2014,11 +2014,15 @@ protected:
             doBroadcastRHS(stopping);
 
             rowidx_t rhsRows = 0;
+            bool globalBroadcastSpilt = false;
             {
                 CriticalBlock b(broadcastSpillingLock);
                 rhsRows = getGlobalRHSTotal(); // flushes all rhsSlaveRows arrays to calculate total.
                 if (hasFailedOverToLocal())
+                {
                     overflowWriteStream.clear(); // broadcast has finished, no more can be written
+                    globalBroadcastSpilt = true;
+                }
             }
             if (!hasFailedOverToLocal())
             {
@@ -2099,6 +2103,14 @@ protected:
 
                 rightRowManager->removeRowBuffer(this);
 
+                if (!globalBroadcastSpilt && hasFailedOverToLocal()) // i.e. global broadcast didn't spill, but has since
+                {
+                    ForEachItemIn(a, rhsSlaveRows)
+                    {
+                        CThorSpillableRowArray &rows = *rhsSlaveRows.item(a);
+                        rows.flush();
+                    }
+                }
                 ActPrintLog("Broadcasting final spilt status: %s", hasFailedOverToLocal() ? "spilt" : "did not spill");
                 // NB: Will cause other slaves to flush non-local if any have and failedOverToLocal will be set on all
                 doBroadcastStop(broadcast2MpTag, hasFailedOverToLocal() ? bcastflag_spilt : bcastflag_null);

+ 4 - 2
thorlcr/thorutil/thmem.cpp

@@ -1361,7 +1361,7 @@ rowidx_t CThorSpillableRowArray::save(IFile &iFile, unsigned _spillCompInfo, boo
     rowidx_t n = numCommitted();
     if (0 == n)
         return 0;
-    ActPrintLog(&activity, "%s: CThorSpillableRowArray::save %" RIPF "d rows", tracingPrefix, n);
+    ActPrintLog(&activity, "%s: CThorSpillableRowArray::save (skipNulls=%s, allowNulls=%s) max rows = %"  RIPF "u", tracingPrefix, boolToStr(skipNulls), boolToStr(allowNulls), n);
 
     if (_spillCompInfo)
         assertex(0 == writeCallbacks.ordinality()); // incompatible
@@ -1389,6 +1389,7 @@ rowidx_t CThorSpillableRowArray::save(IFile &iFile, unsigned _spillCompInfo, boo
     }
     Owned<IExtRowWriter> writer = createRowWriter(&iFile, rowIf, rwFlags, nullptr, compBlkSz);
     rowidx_t i=0;
+    rowidx_t rowsWritten=0;
     try
     {
         const void **rows = getBlock(n);
@@ -1413,6 +1414,7 @@ rowidx_t CThorSpillableRowArray::save(IFile &iFile, unsigned _spillCompInfo, boo
             }
             if (row)
             {
+                ++rowsWritten;
                 rows[i] = NULL;
                 writer->putRow(row); // NB: putRow takes ownership/should avoid leaking if fails
             }
@@ -1434,7 +1436,7 @@ rowidx_t CThorSpillableRowArray::save(IFile &iFile, unsigned _spillCompInfo, boo
     firstRow += n;
     offset_t bytesWritten = writer->getPosition();
     writer.clear();
-    ActPrintLog(&activity, "%s: CThorSpillableRowArray::save done, bytes = %" I64F "d", tracingPrefix, (__int64)bytesWritten);
+    ActPrintLog(&activity, "%s: CThorSpillableRowArray::save done, rows written = %" RIPF "u, bytes = %" I64F "u", tracingPrefix, rowsWritten, (__int64)bytesWritten);
     return n;
 }