|
@@ -1474,6 +1474,7 @@ protected:
|
|
|
unsigned overflowCount;
|
|
|
unsigned maxCores;
|
|
|
unsigned outStreams;
|
|
|
+ offset_t sizeSpill;
|
|
|
ICompare *iCompare;
|
|
|
StableSortFlag stableSort;
|
|
|
bool preserveGrouping;
|
|
@@ -1483,6 +1484,8 @@ protected:
|
|
|
Owned<CSharedSpillableRowSet> spillableRowSet;
|
|
|
unsigned options;
|
|
|
bool compressSpills;
|
|
|
+ __uint64 spillCycles;
|
|
|
+ __uint64 sortCycles;
|
|
|
|
|
|
bool spillRows()
|
|
|
{
|
|
@@ -1491,6 +1494,7 @@ protected:
|
|
|
if (numRows == 0)
|
|
|
return false;
|
|
|
|
|
|
+ CCycleTimer spillTimer;
|
|
|
totalRows += numRows;
|
|
|
StringBuffer tempPrefix, tempName;
|
|
|
if (iCompare)
|
|
@@ -1498,6 +1502,7 @@ protected:
|
|
|
ActPrintLog(&activity, "Sorting %" RIPF "d rows", spillableRows.numCommitted());
|
|
|
CCycleTimer timer;
|
|
|
spillableRows.sort(*iCompare, maxCores); // sorts committed rows
|
|
|
+ sortCycles += timer.elapsedCycles();
|
|
|
ActPrintLog(&activity, "Sort took: %f", ((float)timer.elapsedMs())/1000);
|
|
|
tempPrefix.append("srt");
|
|
|
}
|
|
@@ -1508,7 +1513,8 @@ protected:
|
|
|
spillableRows.save(*iFile, compressSpills, spillPrefixStr.str()); // saves committed rows
|
|
|
spillFiles.append(new CFileOwner(iFile.getLink()));
|
|
|
++overflowCount;
|
|
|
-
|
|
|
+ sizeSpill += iFile->size();
|
|
|
+ spillCycles += spillTimer.elapsedCycles();
|
|
|
return true;
|
|
|
}
|
|
|
void setPreserveGrouping(bool _preserveGrouping)
|
|
@@ -1606,7 +1612,11 @@ protected:
|
|
|
{
|
|
|
// Option(rcflag_noAllInMemSort) - avoid sorting allMemRows
|
|
|
if ((NULL == allMemRows) || (0 == (options & rcflag_noAllInMemSort)))
|
|
|
+ {
|
|
|
+ CCycleTimer timer;
|
|
|
spillableRows.sort(*iCompare, maxCores);
|
|
|
+ sortCycles += timer.elapsedCycles();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
if ((rc_allDiskOrAllMem == diskMemMix) || // must supply allMemRows, only here if no spilling (see above)
|
|
@@ -1655,6 +1665,7 @@ protected:
|
|
|
spillFiles.kill();
|
|
|
totalRows = 0;
|
|
|
overflowCount = outStreams = 0;
|
|
|
+ sizeSpill = 0;
|
|
|
}
|
|
|
inline bool spillingEnabled() const { return SPILL_PRIORITY_DISABLE != spillPriority; }
|
|
|
void clearSpillingCallback()
|
|
@@ -1682,6 +1693,7 @@ public:
|
|
|
preserveGrouping = false;
|
|
|
totalRows = 0;
|
|
|
overflowCount = outStreams = 0;
|
|
|
+ sizeSpill = 0;
|
|
|
mmRegistered = false;
|
|
|
if (rc_allMem == diskMemMix)
|
|
|
spillPriority = SPILL_PRIORITY_DISABLE; // all mem, implies no spilling
|
|
@@ -1691,6 +1703,8 @@ public:
|
|
|
options = 0;
|
|
|
spillableRows.setup(rowIf, false, stableSort);
|
|
|
compressSpills = activity.getOptBool(THOROPT_COMPRESS_SPILLS, true);
|
|
|
+ spillCycles = 0;
|
|
|
+ sortCycles = 0;
|
|
|
}
|
|
|
~CThorRowCollectorBase()
|
|
|
{
|
|
@@ -1703,7 +1717,11 @@ public:
|
|
|
spillableRows.flush();
|
|
|
totalRows += spillableRows.numCommitted();
|
|
|
if (sort && iCompare)
|
|
|
+ {
|
|
|
+ CCycleTimer timer;
|
|
|
spillableRows.sort(*iCompare, maxCores);
|
|
|
+ sortCycles += timer.elapsedCycles();
|
|
|
+ }
|
|
|
out.transferFrom(spillableRows);
|
|
|
}
|
|
|
// IThorRowCollectorCommon
|
|
@@ -1773,6 +1791,25 @@ public:
|
|
|
CThorArrayLockBlock block(spillableRows);
|
|
|
return spillRows();
|
|
|
}
|
|
|
+ virtual unsigned __int64 getStatistic(StatisticKind kind)
|
|
|
+ {
|
|
|
+ switch (kind)
|
|
|
+ {
|
|
|
+ case StCycleSpillElapsedCycles:
|
|
|
+ return spillCycles;
|
|
|
+ case StCycleSortElapsedCycles:
|
|
|
+ return sortCycles;
|
|
|
+ case StTimeSpillElapsed:
|
|
|
+ return cycle_to_nanosec(spillCycles);
|
|
|
+ case StTimeSortElapsed:
|
|
|
+ return cycle_to_nanosec(sortCycles);
|
|
|
+ case StNumSpills:
|
|
|
+ return overflowCount;
|
|
|
+ case StSizeSpillFile:
|
|
|
+ return sizeSpill;
|
|
|
+ }
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
};
|
|
|
|
|
|
enum TRLGroupFlag { trl_ungroup, trl_preserveGrouping, trl_stopAtEog };
|
|
@@ -1825,6 +1862,7 @@ public:
|
|
|
}
|
|
|
virtual void resize(rowidx_t max) { CThorRowCollectorBase::resize(max); }
|
|
|
virtual void setOptions(unsigned options) { CThorRowCollectorBase::setOptions(options); }
|
|
|
+ virtual unsigned __int64 getStatistic(StatisticKind kind) { return CThorRowCollectorBase::getStatistic(kind); }
|
|
|
// IThorRowLoader
|
|
|
virtual IRowStream *load(IRowStream *in, const bool &abort, bool preserveGrouping, CThorExpandingRowArray *allMemRows, memsize_t *memUsage, bool doReset)
|
|
|
{
|
|
@@ -1876,6 +1914,7 @@ public:
|
|
|
}
|
|
|
virtual void resize(rowidx_t max) { CThorRowCollectorBase::resize(max); }
|
|
|
virtual void setOptions(unsigned options) { CThorRowCollectorBase::setOptions(options); }
|
|
|
+ virtual unsigned __int64 getStatistic(StatisticKind kind) { return CThorRowCollectorBase::getStatistic(kind); }
|
|
|
// IThorRowCollector
|
|
|
virtual IRowWriter *getWriter()
|
|
|
{
|