|
@@ -1250,31 +1250,41 @@ rowidx_t CThorSpillableRowArray::save(IFile &iFile, bool useCompression, const c
|
|
nextCBI = nextCB->queryRecordNumber();
|
|
nextCBI = nextCB->queryRecordNumber();
|
|
}
|
|
}
|
|
Owned<IExtRowWriter> writer = createRowWriter(&iFile, rowIf, rwFlags);
|
|
Owned<IExtRowWriter> writer = createRowWriter(&iFile, rowIf, rwFlags);
|
|
- const void **rows = getBlock(n);
|
|
|
|
- for (rowidx_t i=0; i < n; i++)
|
|
|
|
|
|
+ rowidx_t i=0;
|
|
|
|
+ try
|
|
{
|
|
{
|
|
- const void *row = rows[i];
|
|
|
|
- assertex(row || allowNulls);
|
|
|
|
- if (i == nextCBI)
|
|
|
|
|
|
+ const void **rows = getBlock(n);
|
|
|
|
+ while (i<n)
|
|
{
|
|
{
|
|
- writer->flush();
|
|
|
|
- do
|
|
|
|
|
|
+ const void *row = rows[i];
|
|
|
|
+ assertex(row || allowNulls);
|
|
|
|
+ if (i == nextCBI)
|
|
{
|
|
{
|
|
- nextCB->filePosition(writer->getPosition());
|
|
|
|
- if (cbCopy.ordinality())
|
|
|
|
|
|
+ writer->flush();
|
|
|
|
+ do
|
|
{
|
|
{
|
|
- nextCB = &cbCopy.popGet();
|
|
|
|
- nextCBI = nextCB->queryRecordNumber();
|
|
|
|
|
|
+ nextCB->filePosition(writer->getPosition());
|
|
|
|
+ if (cbCopy.ordinality())
|
|
|
|
+ {
|
|
|
|
+ nextCB = &cbCopy.popGet();
|
|
|
|
+ nextCBI = nextCB->queryRecordNumber();
|
|
|
|
+ }
|
|
|
|
+ else
|
|
|
|
+ nextCBI = RCIDXMAX; // indicating no more
|
|
}
|
|
}
|
|
- else
|
|
|
|
- nextCBI = RCIDXMAX; // indicating no more
|
|
|
|
|
|
+ while (i == nextCBI); // loop as may be >1 IWritePosCallback at same pos
|
|
}
|
|
}
|
|
- while (i == nextCBI); // loop as may be >1 IWritePosCallback at same pos
|
|
|
|
|
|
+ rows[i++] = NULL;
|
|
|
|
+ writer->putRow(row); // NB: putRow takes ownership/should avoid leaking if fails
|
|
}
|
|
}
|
|
- writer->putRow(row);
|
|
|
|
- rows[i] = NULL;
|
|
|
|
|
|
+ writer->flush();
|
|
|
|
+ }
|
|
|
|
+ catch (IException *e)
|
|
|
|
+ {
|
|
|
|
+ EXCLOG(e, "CThorSpillableRowArray::save");
|
|
|
|
+ firstRow += i; // ensure released rows are noted.
|
|
|
|
+ throw;
|
|
}
|
|
}
|
|
- writer->flush();
|
|
|
|
firstRow += n;
|
|
firstRow += n;
|
|
offset_t bytesWritten = writer->getPosition();
|
|
offset_t bytesWritten = writer->getPosition();
|
|
writer.clear();
|
|
writer.clear();
|
|
@@ -1437,10 +1447,9 @@ protected:
|
|
tempPrefix.appendf("spill_%d", activity.queryActivityId());
|
|
tempPrefix.appendf("spill_%d", activity.queryActivityId());
|
|
GetTempName(tempName, tempPrefix.str(), true);
|
|
GetTempName(tempName, tempPrefix.str(), true);
|
|
Owned<IFile> iFile = createIFile(tempName.str());
|
|
Owned<IFile> iFile = createIFile(tempName.str());
|
|
- spillFiles.append(new CFileOwner(iFile.getLink()));
|
|
|
|
VStringBuffer spillPrefixStr("RowCollector(%d)", spillPriority);
|
|
VStringBuffer spillPrefixStr("RowCollector(%d)", spillPriority);
|
|
spillableRows.save(*iFile, activity.getOptBool(THOROPT_COMPRESS_SPILLS, true), spillPrefixStr.str()); // saves committed rows
|
|
spillableRows.save(*iFile, activity.getOptBool(THOROPT_COMPRESS_SPILLS, true), spillPrefixStr.str()); // saves committed rows
|
|
-
|
|
|
|
|
|
+ spillFiles.append(new CFileOwner(iFile.getLink()));
|
|
++overflowCount;
|
|
++overflowCount;
|
|
|
|
|
|
return true;
|
|
return true;
|