|
@@ -1740,6 +1740,7 @@ protected:
|
|
|
Owned<IRowWriter> overflowWriteStream;
|
|
|
rowcount_t overflowWriteCount;
|
|
|
OwnedMalloc<IChannelDistributor *> channelDistributors;
|
|
|
+ unsigned nextRhsToSpill = 0;
|
|
|
|
|
|
inline bool isSmart() const { return smart; }
|
|
|
inline void setFailoverToLocal()
|
|
@@ -1759,25 +1760,36 @@ protected:
|
|
|
inline bool isRhsCollated() const { return rhsCollated; }
|
|
|
rowidx_t clearNonLocalRows(CThorRowArrayWithFlushMarker &rows, unsigned slave)
|
|
|
{
|
|
|
- CThorArrayLockBlock block(rows);
|
|
|
rowidx_t clearedRows = 0;
|
|
|
rowidx_t committedRows = rows.numCommitted();
|
|
|
ActPrintLog("clearNonLocalRows[slave=%u], numCommitted=%" RIPF "u, totalRows(inc uncommitted)=%" RIPF "u, flushMarker=%" RIPF "u", slave, committedRows, rows.queryTotalRows(), rows.flushMarker);
|
|
|
+
|
|
|
+ const void **_rows = rows.getBlock(committedRows); // raw access to the committed row pointers, so non-local rows can be released and nulled in place. NB: no lock is taken here; clearNonLocalRowsProtected wraps this call in a CThorArrayLockBlock
|
|
|
for (rowidx_t r=rows.flushMarker; r<committedRows; r++)
|
|
|
{
|
|
|
- unsigned hv = rightHash->hash(rows.query(r));
|
|
|
- if (myNodeNum != (hv % numNodes))
|
|
|
+ const void *row = _rows[r];
|
|
|
+ if (row) // NB: entries can be null if an OOM event flushed and saved the row arrays, in which case flushMarker will have been reset
|
|
|
{
|
|
|
- OwnedConstThorRow row = rows.getClear(r); // dispose of
|
|
|
- ++clearedRows;
|
|
|
+ unsigned hv = rightHash->hash(row);
|
|
|
+ if (myNodeNum != (hv % numNodes)) // row hashes to a different node, i.e. it is non-local to this one
|
|
|
+ {
|
|
|
+ ReleaseThorRow(row); // dispose of
|
|
|
+ _rows[r] = nullptr; // leave a null in the slot; the null check above skips such entries
|
|
|
+ ++clearedRows;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
- /* Record point to which clearNonLocalRows will reach
|
|
|
- * so that can resume from that point, when recalled.
|
|
|
+ /* Record the point that clearNonLocalRows reached,
|
|
|
+ * so that the next call can resume scanning from there.
|
|
|
*/
|
|
|
rows.flushMarker = committedRows;
|
|
|
return clearedRows;
|
|
|
}
|
|
|
+ rowidx_t clearNonLocalRowsProtected(CThorRowArrayWithFlushMarker &rows, unsigned slave) // locked variant of clearNonLocalRows(rows, slave); returns its result (the number of rows cleared)
|
|
|
+ {
|
|
|
+ CThorArrayLockBlock block(rows); // scoped guard on the row array for the duration of the call (presumably because rows may still be being added to it - confirm against callers)
|
|
|
+ return clearNonLocalRows(rows, slave);
|
|
|
+ }
|
|
|
// Annoyingly similar to above, used post broadcast when rhsSlaveRows collated into 'rhs'
|
|
|
rowidx_t clearNonLocalRows(CThorExpandingRowArray &rows)
|
|
|
{
|
|
@@ -1822,7 +1834,7 @@ protected:
|
|
|
ForEachItemIn(slave, rhsSlaveRows)
|
|
|
{
|
|
|
CThorRowArrayWithFlushMarker &rows = *rhsSlaveRows.item(slave);
|
|
|
- clearedRows += clearNonLocalRows(rows, slave);
|
|
|
+ clearedRows += clearNonLocalRowsProtected(rows, slave);
|
|
|
}
|
|
|
}
|
|
|
ActPrintLog("handleLowMem: clearedRows = %" RIPF "d", clearedRows);
|
|
@@ -1831,22 +1843,33 @@ protected:
|
|
|
}
|
|
|
if (spillRowArrays) // only do if have to due to memory pressure. Not via foreign node notification.
|
|
|
{
|
|
|
- // no non-locals left to spill, so flush a rhsSlaveRows array
|
|
|
- ForEachItemIn(slave, rhsSlaveRows)
|
|
|
+ // NB: round robin through the row arrays, to avoid repeatedly spilling the same row arrays after only a few new rows have been added to them
|
|
|
+ unsigned startRhsToSpill = nextRhsToSpill;
|
|
|
+ do
|
|
|
{
|
|
|
- CThorRowArrayWithFlushMarker &rows = *rhsSlaveRows.item(slave);
|
|
|
+ unsigned curRhsToSpill = nextRhsToSpill;
|
|
|
+ if (++nextRhsToSpill == rhsSlaveRows.ordinality())
|
|
|
+ nextRhsToSpill = 0;
|
|
|
+
|
|
|
+ CThorRowArrayWithFlushMarker &rows = *rhsSlaveRows.item(curRhsToSpill);
|
|
|
if (rows.numCommitted())
|
|
|
{
|
|
|
- clearNonLocalRows(rows, slave);
|
|
|
- rows.flushMarker = 0; // reset marker, since save will cause numCommitted to shrink
|
|
|
- VStringBuffer tempPrefix("spill_%d", container.queryId());
|
|
|
- StringBuffer tempName;
|
|
|
- GetTempName(tempName, tempPrefix.str(), true);
|
|
|
- Owned<CFileOwner> file = new CFileOwner(createIFile(tempName.str()));
|
|
|
- VStringBuffer spillPrefixStr("clearAllNonLocalRows(%d)", SPILL_PRIORITY_SPILLABLE_STREAM);
|
|
|
-
|
|
|
- // 3rd param. is skipNulls = true, the row arrays may have had the non-local rows delete already.
|
|
|
- rows.save(file->queryIFile(), spillCompInfo, true, spillPrefixStr.str()); // saves committed rows
|
|
|
+ Owned<CFileOwner> file;
|
|
|
+ {
|
|
|
+ // NB: rows may still be added to the row arrays, so protect array whilst saving
|
|
|
+ CThorArrayLockBlock block(rows);
|
|
|
+ if (rows.numCommitted() == clearNonLocalRows(rows, curRhsToSpill)) // this is to clear out any stragglers that can get added whilst initial fail over to local was happening.
|
|
|
+ continue; // all rows were cleared, no local rows remain, so nothing to save. Skip to next row set
|
|
|
+
|
|
|
+ VStringBuffer tempPrefix("spill_%d", container.queryId());
|
|
|
+ StringBuffer tempName;
|
|
|
+ GetTempName(tempName, tempPrefix.str(), true);
|
|
|
+ file.setown(new CFileOwner(createIFile(tempName.str())));
|
|
|
+ VStringBuffer spillPrefixStr("clearAllNonLocalRows(%d)", SPILL_PRIORITY_SPILLABLE_STREAM);
|
|
|
+ // 3rd param. is skipNulls = true, the row arrays may have had the non-local rows deleted already.
|
|
|
+ rows.save(file->queryIFile(), spillCompInfo, true, spillPrefixStr.str()); // saves committed rows
|
|
|
+ rows.flushMarker = 0; // reset because array will be moved as a consequence of further adds, so next scan must be from start
|
|
|
+ }
|
|
|
|
|
|
unsigned rwFlags = DEFAULT_RWFLAGS;
|
|
|
if (spillCompInfo)
|
|
@@ -1858,6 +1881,7 @@ protected:
|
|
|
return true;
|
|
|
}
|
|
|
}
|
|
|
+ while (nextRhsToSpill != startRhsToSpill);
|
|
|
}
|
|
|
return false;
|
|
|
}
|
|
@@ -1921,20 +1945,7 @@ protected:
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- /* NB: If cleared before rhsCollated, then need to clear non-locals that were added after spill
|
|
|
- * There should not be many, as broadcast starts to stop as soon as a slave notifies it is spilling
|
|
|
- * and ignores all non-locals.
|
|
|
- */
|
|
|
-
|
|
|
- rhsSlaveRows.sort(sortBySize); // because want biggest compacted/consumed 1st
|
|
|
- ForEachItemIn(slave, rhsSlaveRows)
|
|
|
- {
|
|
|
- CThorRowArrayWithFlushMarker &rows = *rhsSlaveRows.item(slave);
|
|
|
- clearNonLocalRows(rows, slave);
|
|
|
-
|
|
|
- ActPrintLog("Compacting rhsSlaveRows[%u], has %" RIPF "u rows", slave, rows.numCommitted());
|
|
|
- rows.compact();
|
|
|
- }
|
|
|
+ rhsSlaveRows.sort(sortBySize); // because want biggest consumed 1st
|
|
|
|
|
|
// NB: Some streams may have already been added to gatheredRHSNodeStreams, as a result of previous spilling
|
|
|
for (unsigned a=0; a<rhsSlaveRows.ordinality(); a++)
|
|
@@ -2022,15 +2033,11 @@ protected:
|
|
|
doBroadcastRHS(stopping);
|
|
|
|
|
|
rowidx_t rhsRows = 0;
|
|
|
- bool globalBroadcastSpilt = false;
|
|
|
{
|
|
|
CriticalBlock b(broadcastSpillingLock);
|
|
|
rhsRows = getGlobalRHSTotal(); // flushes all rhsSlaveRows arrays to calculate total.
|
|
|
if (hasFailedOverToLocal())
|
|
|
- {
|
|
|
overflowWriteStream.clear(); // broadcast has finished, no more can be written
|
|
|
- globalBroadcastSpilt = true;
|
|
|
- }
|
|
|
}
|
|
|
if (!hasFailedOverToLocal())
|
|
|
{
|
|
@@ -2111,17 +2118,35 @@ protected:
|
|
|
|
|
|
rightRowManager->removeRowBuffer(this);
|
|
|
|
|
|
- if (!globalBroadcastSpilt && hasFailedOverToLocal()) // i.e. global broadcast didn't spill, but has since
|
|
|
+ ActPrintLog("Broadcasting final spilt status: %s", hasFailedOverToLocal() ? "spilt" : "did not spill");
|
|
|
+ // NB: Will cause other slaves to flush non-local if any have and failedOverToLocal will be set on all
|
|
|
+ doBroadcastStop(broadcast2MpTag, hasFailedOverToLocal() ? bcastflag_spilt : bcastflag_null);
|
|
|
+
|
|
|
+ if (hasFailedOverToLocal())
|
|
|
{
|
|
|
+ // If HT sized already and now spilt, it's too big. Clear for re-use by handleLocalRHS()
|
|
|
+ clearHT();
|
|
|
+ marker.reset();
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!rhsCollated) // NB: could have spilt after collated
|
|
|
+ {
|
|
|
+ // Can now clean/prepare remaining row arrays for next stages
|
|
|
ForEachItemIn(a, rhsSlaveRows)
|
|
|
{
|
|
|
- CThorSpillableRowArray &rows = *rhsSlaveRows.item(a);
|
|
|
- rows.flush();
|
|
|
+ CThorRowArrayWithFlushMarker &rows = *rhsSlaveRows.item(a);
|
|
|
+ rows.flush(true); // If the row array was spilt, force to relocate so it can be compacted
|
|
|
+
|
|
|
+ /* NB: need to clear non-locals that were added after spill
|
|
|
+ * There should not be many, as broadcast starts to stop as soon as a slave notifies it is spilling
|
|
|
+ * and ignores all non-locals.
|
|
|
+ */
|
|
|
+ clearNonLocalRows(rows, a);
|
|
|
+
|
|
|
+ ActPrintLog("Compacting rhsSlaveRows[%u], has %" RIPF "u rows", a, rows.numCommitted());
|
|
|
+ rows.compact();
|
|
|
}
|
|
|
}
|
|
|
- ActPrintLog("Broadcasting final spilt status: %s", hasFailedOverToLocal() ? "spilt" : "did not spill");
|
|
|
- // NB: Will cause other slaves to flush non-local if any have and failedOverToLocal will be set on all
|
|
|
- doBroadcastStop(broadcast2MpTag, hasFailedOverToLocal() ? bcastflag_spilt : bcastflag_null);
|
|
|
}
|
|
|
InterChannelBarrier();
|
|
|
ActPrintLog("Shared memory manager memory report");
|
|
@@ -2212,7 +2237,7 @@ protected:
|
|
|
atomic_set(&spilt, 0);
|
|
|
//NB: all channels will have done this, before rows are added
|
|
|
}
|
|
|
-#define HPCC_17331 // Whilst under investigation
|
|
|
+#define HPCC_17331 // Whilst under investigation. Should be solved by fix for HPCC-21091
|
|
|
void process(IRowStream *right)
|
|
|
{
|
|
|
#ifdef HPCC_17331
|
|
@@ -2473,13 +2498,6 @@ protected:
|
|
|
if (grouped)
|
|
|
throw MakeActivityException(this, 0, "Degraded to Distributed Local Lookup, but input is marked as grouped and cannot preserve LHS order");
|
|
|
|
|
|
- if (0 == queryJobChannelNumber())
|
|
|
- {
|
|
|
- // If HT sized already and now spilt, it's too big. Clear for re-use by handleLocalRHS()
|
|
|
- clearHT();
|
|
|
- marker.reset();
|
|
|
- }
|
|
|
-
|
|
|
ICompare *cmp = rhsCollated ? NULL : compareRight; // if rhsCollated=true, then sorted, otherwise can't rely on any previous order.
|
|
|
rightCollector.setown(handleFailoverToLocalRHS(cmp));
|
|
|
if (rightCollector->hasSpilt())
|
|
@@ -2707,6 +2725,7 @@ public:
|
|
|
}
|
|
|
}
|
|
|
failedOverToStandard = false;
|
|
|
+ nextRhsToSpill = 0;
|
|
|
}
|
|
|
}
|
|
|
CATCH_NEXTROW()
|
|
@@ -2822,7 +2841,7 @@ public:
|
|
|
rows.clearRows();
|
|
|
return localRows.ordinality();
|
|
|
}
|
|
|
- virtual bool addRHSRows(CThorSpillableRowArray &rhsRows, CThorExpandingRowArray &inRows, CThorExpandingRowArray &rhsInRowsTemp)
|
|
|
+ virtual bool addRHSRows(CThorSpillableRowArray &rhsRows, CThorExpandingRowArray &inRows, CThorExpandingRowArray &rhsInRowsTemp) override
|
|
|
{
|
|
|
dbgassertex(0 == rhsInRowsTemp.ordinality());
|
|
|
if (hasFailedOverToLocal())
|
|
@@ -2831,9 +2850,6 @@ public:
|
|
|
return true;
|
|
|
}
|
|
|
CriticalBlock b(rhsRowLock);
|
|
|
- /* NB: If PARENT::addRHSRows fails, it will cause clearAllNonLocalRows() to have been triggered and failedOverToLocal to be set
|
|
|
- * When all is done, a last pass is needed to clear out non-locals
|
|
|
- */
|
|
|
if (overflowWriteFile)
|
|
|
{
|
|
|
/* Tried to do outside crit above, but if empty, and now overflow, need to inside
|