|
@@ -197,7 +197,6 @@ protected:
|
|
|
|
|
|
VStringBuffer spillPrefixStr("SpillableStream(%d)", SPILL_PRIORITY_SPILLABLE_STREAM); // const for now
|
|
|
rows.save(*spillFile, useCompression, spillPrefixStr.str()); // saves committed rows
|
|
|
- rows.noteSpilled(numRows);
|
|
|
return true;
|
|
|
}
|
|
|
|
|
@@ -369,9 +368,7 @@ public:
|
|
|
if (fetch >= granularity)
|
|
|
fetch = granularity;
|
|
|
// consume 'fetch' rows
|
|
|
- const void **toRead = rows.getBlock(fetch);
|
|
|
- memcpy(readRows, toRead, fetch * sizeof(void *));
|
|
|
- rows.noteSpilled(fetch);
|
|
|
+ rows.readBlock(readRows, fetch);
|
|
|
numReadRows = fetch;
|
|
|
pos = 0;
|
|
|
}
|
|
@@ -1173,6 +1170,7 @@ void CThorSpillableRowArray::clearRows()
|
|
|
|
|
|
void CThorSpillableRowArray::compact()
|
|
|
{
|
|
|
+ CThorArrayLockBlock block(*this);
|
|
|
assertex(0 == firstRow && numRows == commitRows);
|
|
|
CThorExpandingRowArray::compact();
|
|
|
commitRows = numRows;
|
|
@@ -1259,6 +1257,7 @@ rowidx_t CThorSpillableRowArray::save(IFile &iFile, bool useCompression, const c
|
|
|
rows[i] = NULL;
|
|
|
}
|
|
|
writer->flush();
|
|
|
+ firstRow += n;
|
|
|
offset_t bytesWritten = writer->getPosition();
|
|
|
writer.clear();
|
|
|
ActPrintLog(&activity, "%s: CThorSpillableRowArray::save done, bytes = %"I64F"d", tracingPrefix, (__int64)bytesWritten);
|
|
@@ -1333,12 +1332,21 @@ void CThorSpillableRowArray::swap(CThorSpillableRowArray &other)
|
|
|
commitRows = otherCommitRows;
|
|
|
}
|
|
|
|
|
|
+void CThorSpillableRowArray::readBlock(const void **outRows, rowidx_t readRows)
|
|
|
+{
|
|
|
+ CThorArrayLockBlock block(*this);
|
|
|
+ dbgassertex(firstRow + readRows <= commitRows);
|
|
|
+ memcpy(outRows, rows + firstRow, readRows*sizeof(void *));
|
|
|
+ firstRow += readRows;
|
|
|
+}
|
|
|
+
|
|
|
void CThorSpillableRowArray::transferRowsCopy(const void **outRows, bool takeOwnership)
|
|
|
{
|
|
|
+ CThorArrayLockBlock block(*this);
|
|
|
if (0 == numRows)
|
|
|
return;
|
|
|
assertex(numRows == commitRows);
|
|
|
- memcpy(outRows, rows, numRows*sizeof(void **));
|
|
|
+ memcpy(outRows, rows, numRows*sizeof(void *));
|
|
|
if (takeOwnership)
|
|
|
firstRow = commitRows = numRows = 0;
|
|
|
else
|
|
@@ -1385,6 +1393,7 @@ protected:
|
|
|
|
|
|
bool spillRows()
|
|
|
{
|
|
|
+ //This must only be called while a lock is held on spillableRows()
|
|
|
rowidx_t numRows = spillableRows.numCommitted();
|
|
|
if (numRows == 0)
|
|
|
return false;
|
|
@@ -1405,7 +1414,6 @@ protected:
|
|
|
spillFiles.append(new CFileOwner(iFile.getLink()));
|
|
|
VStringBuffer spillPrefixStr("RowCollector(%d)", spillPriority);
|
|
|
spillableRows.save(*iFile, activity.getOptBool(THOROPT_COMPRESS_SPILLS, true), spillPrefixStr.str()); // saves committed rows
|
|
|
- spillableRows.noteSpilled(numRows);
|
|
|
|
|
|
++overflowCount;
|
|
|
|