|
@@ -349,7 +349,8 @@ protected:
|
|
|
// General activity statistics
|
|
|
|
|
|
static const StatisticsMapping actStatistics(StWhenFirstRow, StTimeElapsed, StTimeLocalExecute, StTimeTotalExecute, StSizeMaxRowSize,
|
|
|
- StNumRowsProcessed, StNumSlaves, StNumStarted, StNumStopped, StNumStrands, StKindNone);
|
|
|
+ StNumRowsProcessed, StNumSlaves, StNumStarted, StNumStopped, StNumStrands,
|
|
|
+ StNumScansPerRow, StNumAllocations, StNumAllocationScans, StKindNone);
|
|
|
static const StatisticsMapping joinStatistics(&actStatistics, StNumAtmostTriggered, StKindNone);
|
|
|
static const StatisticsMapping keyedJoinStatistics(&joinStatistics, StNumServerCacheHits, StNumIndexSeeks, StNumIndexScans, StNumIndexWildSeeks,
|
|
|
StNumIndexSkips, StNumIndexNullSkips, StNumIndexMerges, StNumIndexMergeCompares,
|
|
@@ -1069,10 +1070,20 @@ public:
|
|
|
totalCycles.merge(strandCycles);
|
|
|
stats.merge(strandStats);
|
|
|
}
|
|
|
- inline void createRowAllocator()
|
|
|
+ inline void ensureRowAllocator()
|
|
|
{
|
|
|
if (!rowAllocator)
|
|
|
- rowAllocator = ctx->getRowAllocatorEx(meta.queryOriginal(), activityId, factory->getHeapFlags());
|
|
|
+ rowAllocator = createRowAllocator(meta.queryOriginal());
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual IEngineRowAllocator * createRowAllocator(IOutputMetaData * metadata)
|
|
|
+ {
|
|
|
+ return ctx->getRowAllocatorEx(metadata, activityId, factory->getHeapFlags());
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual IEngineRowAllocator * createRowAllocatorEx(IOutputMetaData * metadata, roxiemem::RoxieHeapFlags extraFlags)
|
|
|
+ {
|
|
|
+ return ctx->getRowAllocatorEx(metadata, activityId, (roxiemem::RoxieHeapFlags)(factory->getHeapFlags()|extraFlags));
|
|
|
}
|
|
|
|
|
|
inline ICodeContext *queryCodeContext()
|
|
@@ -1197,7 +1208,7 @@ public:
|
|
|
colocalParent = _colocalParent;
|
|
|
createPending = true;
|
|
|
if (needsAllocator())
|
|
|
- createRowAllocator();
|
|
|
+ ensureRowAllocator();
|
|
|
processed = 0;
|
|
|
if (factory)
|
|
|
factory->onCreateChildQueries(ctx, &basehelper, childGraphs, this, probeManager, *this, _numParallel);
|
|
@@ -1329,6 +1340,11 @@ public:
|
|
|
}
|
|
|
if (inputStream)
|
|
|
inputStream->stop();
|
|
|
+ if (rowAllocator)
|
|
|
+ {
|
|
|
+ stats.reset(heapStatistics);
|
|
|
+ rowAllocator->gatherStats(stats);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -1651,8 +1667,7 @@ public:
|
|
|
{
|
|
|
if (needsAllocator)
|
|
|
{
|
|
|
- roxiemem::RoxieHeapFlags extraFlags = _parent.factory->getHeapFlags();
|
|
|
- rowAllocator = parent.queryContext()->getRowAllocatorEx(parent.queryOutputMeta(), parent.queryId(), (roxiemem::RoxieHeapFlags)(roxiemem::RHFunique|extraFlags));
|
|
|
+ rowAllocator = parent.createRowAllocatorEx(parent.queryOutputMeta(), roxiemem::RHFunique);
|
|
|
}
|
|
|
else
|
|
|
rowAllocator = NULL;
|
|
@@ -1674,6 +1689,10 @@ public:
|
|
|
{
|
|
|
if (inputStream)
|
|
|
inputStream->stop();
|
|
|
+
|
|
|
+ stats.reset(heapStatistics); // Heap stats are always gathered from scratch each time
|
|
|
+ if (rowAllocator)
|
|
|
+ rowAllocator->gatherStats(stats);
|
|
|
parent.stop();
|
|
|
parent.mergeStrandStats(processed, totalCycles, stats);
|
|
|
}
|
|
@@ -4241,7 +4260,7 @@ public:
|
|
|
if (meta.needsSerializeDisk())
|
|
|
{
|
|
|
deserializer.setown(meta.createDiskDeserializer(ctx->queryCodeContext(), activity.queryId()));
|
|
|
- rowAllocator.setown(ctx->queryCodeContext()->getRowAllocator(meta.queryOriginal(), activity.queryId()));
|
|
|
+ rowAllocator.setown(activity.createRowAllocator(meta.queryOriginal()));
|
|
|
}
|
|
|
if (ctx->queryDebugContext() && ctx->queryDebugContext()->getExecuteSequentially())
|
|
|
deferredStart = true;
|
|
@@ -7497,9 +7516,9 @@ class CRoxieServerHashDedupActivity : public CRoxieServerActivity
|
|
|
class HashDedupTable : public SuperHashTable
|
|
|
{
|
|
|
public:
|
|
|
- HashDedupTable(IHThorHashDedupArg & _helper, unsigned _activityId)
|
|
|
+ HashDedupTable(IHThorHashDedupArg & _helper, CRoxieServerHashDedupActivity & _activity)
|
|
|
: helper(_helper),
|
|
|
- activityId(_activityId),
|
|
|
+ activity(_activity),
|
|
|
keySize(helper.queryKeySize())
|
|
|
{
|
|
|
}
|
|
@@ -7529,7 +7548,7 @@ class CRoxieServerHashDedupActivity : public CRoxieServerActivity
|
|
|
|
|
|
void onCreate(IRoxieSlaveContext *ctx)
|
|
|
{
|
|
|
- keyRowAllocator.setown(ctx->queryCodeContext()->getRowAllocator(keySize.queryOriginal(), activityId));
|
|
|
+ keyRowAllocator.setown(activity.createRowAllocator(keySize.queryOriginal()));
|
|
|
}
|
|
|
|
|
|
void reset()
|
|
@@ -7553,12 +7572,12 @@ class CRoxieServerHashDedupActivity : public CRoxieServerActivity
|
|
|
IHThorHashDedupArg & helper;
|
|
|
CachedOutputMetaData keySize;
|
|
|
Owned<IEngineRowAllocator> keyRowAllocator;
|
|
|
- unsigned activityId;
|
|
|
+ CRoxieServerHashDedupActivity & activity;
|
|
|
} table;
|
|
|
|
|
|
public:
|
|
|
CRoxieServerHashDedupActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
|
|
|
- : CRoxieServerActivity(_ctx, _factory, _probeManager), helper((IHThorHashDedupArg &)basehelper), table(helper, activityId)
|
|
|
+ : CRoxieServerActivity(_ctx, _factory, _probeManager), helper((IHThorHashDedupArg &)basehelper), table(helper, *this)
|
|
|
{
|
|
|
eof = false;
|
|
|
}
|
|
@@ -12228,7 +12247,7 @@ class CRoxieServerJoinActivity : public CRoxieServerTwoInputActivity
|
|
|
if (!defaultLeft)
|
|
|
{
|
|
|
if (!defaultLeftAllocator)
|
|
|
- defaultLeftAllocator.setown(ctx->queryCodeContext()->getRowAllocator(input->queryOutputMeta(), activityId));
|
|
|
+ defaultLeftAllocator.setown(createRowAllocator(input->queryOutputMeta()));
|
|
|
|
|
|
RtlDynamicRowBuilder rowBuilder(defaultLeftAllocator);
|
|
|
size32_t thisSize = helper.createDefaultLeft(rowBuilder);
|
|
@@ -12241,7 +12260,7 @@ class CRoxieServerJoinActivity : public CRoxieServerTwoInputActivity
|
|
|
if (!defaultRight)
|
|
|
{
|
|
|
if (!defaultRightAllocator)
|
|
|
- defaultRightAllocator.setown(ctx->queryCodeContext()->getRowAllocator(input1->queryOutputMeta(), activityId));
|
|
|
+ defaultRightAllocator.setown(createRowAllocator(input1->queryOutputMeta()));
|
|
|
|
|
|
RtlDynamicRowBuilder rowBuilder(defaultRightAllocator);
|
|
|
size32_t thisSize = helper.createDefaultRight(rowBuilder);
|
|
@@ -16736,8 +16755,8 @@ public:
|
|
|
{
|
|
|
CRoxieServerNaryActivity::onCreate(_colocalParent);
|
|
|
ICodeContext * codectx = ctx->queryCodeContext();
|
|
|
- inputAllocator.setown(codectx->getRowAllocator(helper.queryInputMeta(), activityId));
|
|
|
- outputAllocator.setown(codectx->getRowAllocator(helper.queryOutputMeta(), activityId));
|
|
|
+ inputAllocator.setown(createRowAllocator(helper.queryInputMeta()));
|
|
|
+ outputAllocator.setown(createRowAllocator(helper.queryOutputMeta()));
|
|
|
}
|
|
|
|
|
|
virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
|
|
@@ -17230,7 +17249,7 @@ public:
|
|
|
virtual void onCreate(IHThorArg *_colocalParent)
|
|
|
{
|
|
|
CRoxieServerActivity::onCreate(_colocalParent);
|
|
|
- rightRowAllocator.setown(ctx->queryCodeContext()->getRowAllocator(QUERYINTERFACE(helper.queryRightRecordSize(), IOutputMetaData), activityId));
|
|
|
+ rightRowAllocator.setown(createRowAllocator(helper.queryRightRecordSize()));
|
|
|
}
|
|
|
|
|
|
virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
|
|
@@ -17547,7 +17566,7 @@ public:
|
|
|
const void *defaultRow()
|
|
|
{
|
|
|
if (!rowAllocator)
|
|
|
- createRowAllocator(); // We delay as often not needed...
|
|
|
+ ensureRowAllocator(); // We delay as often not needed...
|
|
|
RtlDynamicRowBuilder rowBuilder(rowAllocator);
|
|
|
size32_t thisSize = helper.createDefault(rowBuilder);
|
|
|
return rowBuilder.finalizeRowClear(thisSize);
|
|
@@ -17756,7 +17775,7 @@ class CRoxieServerSelfJoinActivity : public CRoxieServerActivity
|
|
|
if (!defaultLeft)
|
|
|
{
|
|
|
if (!defaultAllocator)
|
|
|
- defaultAllocator.setown(ctx->queryCodeContext()->getRowAllocator(input->queryOutputMeta(), activityId));
|
|
|
+ defaultAllocator.setown(createRowAllocator(input->queryOutputMeta()));
|
|
|
|
|
|
RtlDynamicRowBuilder rowBuilder(defaultAllocator);
|
|
|
size32_t thisSize = helper.createDefaultLeft(rowBuilder);
|
|
@@ -17769,7 +17788,7 @@ class CRoxieServerSelfJoinActivity : public CRoxieServerActivity
|
|
|
if (!defaultRight)
|
|
|
{
|
|
|
if (!defaultAllocator)
|
|
|
- defaultAllocator.setown(ctx->queryCodeContext()->getRowAllocator(input->queryOutputMeta(), activityId));
|
|
|
+ defaultAllocator.setown(createRowAllocator(input->queryOutputMeta()));
|
|
|
|
|
|
RtlDynamicRowBuilder rowBuilder(defaultAllocator);
|
|
|
size32_t thisSize = helper.createDefaultRight(rowBuilder);
|
|
@@ -18333,7 +18352,7 @@ private:
|
|
|
if (!defaultRight)
|
|
|
{
|
|
|
if (!defaultRightAllocator)
|
|
|
- defaultRightAllocator.setown(ctx->queryCodeContext()->getRowAllocator(input1->queryOutputMeta(), activityId));
|
|
|
+ defaultRightAllocator.setown(createRowAllocator(input1->queryOutputMeta()));
|
|
|
|
|
|
RtlDynamicRowBuilder rowBuilder(defaultRightAllocator);
|
|
|
size32_t thisSize = helper.createDefaultRight(rowBuilder);
|
|
@@ -18857,7 +18876,7 @@ private:
|
|
|
if (!defaultLeft)
|
|
|
{
|
|
|
if (!defaultLeftAllocator)
|
|
|
- defaultLeftAllocator.setown(ctx->queryCodeContext()->getRowAllocator(input->queryOutputMeta(), activityId));
|
|
|
+ defaultLeftAllocator.setown(createRowAllocator(input->queryOutputMeta()));
|
|
|
|
|
|
RtlDynamicRowBuilder rowBuilder(defaultLeftAllocator);
|
|
|
size32_t thisSize = helper.createDefaultLeft(rowBuilder);
|
|
@@ -18870,7 +18889,7 @@ private:
|
|
|
if (!defaultRight)
|
|
|
{
|
|
|
if (!defaultRightAllocator)
|
|
|
- defaultRightAllocator.setown(ctx->queryCodeContext()->getRowAllocator(input1->queryOutputMeta(), activityId));
|
|
|
+ defaultRightAllocator.setown(createRowAllocator(input1->queryOutputMeta()));
|
|
|
|
|
|
RtlDynamicRowBuilder rowBuilder(defaultRightAllocator);
|
|
|
size32_t thisSize = helper.createDefaultRight(rowBuilder);
|
|
@@ -19536,7 +19555,7 @@ protected:
|
|
|
ctx->queryDebugContext()->checkBreakpoint(DebugStateLimit, NULL, static_cast<IActivityBase *>(this));
|
|
|
if (transformExtra)
|
|
|
{
|
|
|
- createRowAllocator();
|
|
|
+ ensureRowAllocator();
|
|
|
RtlDynamicRowBuilder rowBuilder(rowAllocator);
|
|
|
size32_t outSize = transformExtra->transformOnLimitExceeded(rowBuilder);
|
|
|
if (outSize)
|
|
@@ -19708,7 +19727,7 @@ protected:
|
|
|
ReleaseRoxieRows(buff);
|
|
|
if (createRow)
|
|
|
{
|
|
|
- createRowAllocator();
|
|
|
+ ensureRowAllocator();
|
|
|
RtlDynamicRowBuilder rowBuilder(rowAllocator);
|
|
|
size32_t outSize = helper.transformOnExceptionCaught(rowBuilder, E);
|
|
|
if (outSize)
|
|
@@ -20891,7 +20910,7 @@ public:
|
|
|
}
|
|
|
if (workunit != NULL || (results && protocol->getFlags() & HPCC_PROTOCOL_NATIVE_RAW))
|
|
|
{
|
|
|
- createRowAllocator();
|
|
|
+ ensureRowAllocator();
|
|
|
rowSerializer.setown(rowAllocator->createDiskSerializer(ctx->queryCodeContext()));
|
|
|
}
|
|
|
__int64 initialProcessed = processed;
|
|
@@ -22730,7 +22749,7 @@ public:
|
|
|
protected:
|
|
|
virtual const void * createLimitFailRow(bool isKeyed)
|
|
|
{
|
|
|
- createRowAllocator();
|
|
|
+ ensureRowAllocator();
|
|
|
RtlDynamicRowBuilder rowBuilder(rowAllocator);
|
|
|
size32_t outSize = isKeyed ? limitTransformExtra->transformOnKeyedLimitExceeded(rowBuilder) : limitTransformExtra->transformOnLimitExceeded(rowBuilder);
|
|
|
if (outSize)
|
|
@@ -22902,7 +22921,7 @@ public:
|
|
|
|
|
|
virtual bool processSingleKey(IKeyIndex *key, IRecordLayoutTranslator * trans)
|
|
|
{
|
|
|
- createRowAllocator();
|
|
|
+ ensureRowAllocator();
|
|
|
remote.injectResult(new LazyLocalKeyReader(*this, key, trans));
|
|
|
return false;
|
|
|
}
|
|
@@ -24458,7 +24477,7 @@ public:
|
|
|
partNo = map->mapOffset(rp);
|
|
|
if (needsRHS)
|
|
|
{
|
|
|
- Owned<IEngineRowAllocator> extractAllocator = ctx->queryCodeContext()->getRowAllocator(helper.queryExtractedSize(), activityId);
|
|
|
+ Owned<IEngineRowAllocator> extractAllocator = createRowAllocator(helper.queryExtractedSize());
|
|
|
RtlDynamicRowBuilder rb(extractAllocator, true);
|
|
|
unsigned rhsSize = helper.extractJoinFields(rb, row);
|
|
|
char * block = (char *) remote.getMem(partNo, 0, sizeof(rp) + sizeof(rhsSize) + rhsSize); // MORE - superfiles
|
|
@@ -24917,7 +24936,7 @@ public:
|
|
|
virtual void onCreate(IHThorArg *_colocalArg)
|
|
|
{
|
|
|
CRemoteResultAdaptor::onCreate(_colocalArg);
|
|
|
- ccdRecordAllocator.setown(ctx->queryCodeContext()->getRowAllocator(QUERYINTERFACE(helper.queryJoinFieldsRecordSize(), IOutputMetaData), activityId));
|
|
|
+ ccdRecordAllocator.setown(activity.createRowAllocator(helper.queryJoinFieldsRecordSize()));
|
|
|
}
|
|
|
|
|
|
virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
|
|
@@ -25077,7 +25096,7 @@ public:
|
|
|
{
|
|
|
CRoxieServerActivity::onCreate(_colocalParent);
|
|
|
remote.onCreate(_colocalParent);
|
|
|
- indexReadAllocator.setown(ctx->queryCodeContext()->getRowAllocator(indexReadMeta, activityId));
|
|
|
+ indexReadAllocator.setown(createRowAllocator(indexReadMeta));
|
|
|
}
|
|
|
|
|
|
virtual void setInput(unsigned idx, unsigned _sourceIdx, IFinalRoxieInput *_in)
|
|
@@ -25396,7 +25415,7 @@ protected:
|
|
|
if (!defaultRight)
|
|
|
{
|
|
|
if (!defaultRightAllocator)
|
|
|
- defaultRightAllocator.setown(ctx->queryCodeContext()->getRowAllocator(helper.queryJoinFieldsRecordSize(), activityId));
|
|
|
+ defaultRightAllocator.setown(createRowAllocator(helper.queryJoinFieldsRecordSize()));
|
|
|
|
|
|
RtlDynamicRowBuilder rowBuilder(defaultRightAllocator);
|
|
|
size32_t thisSize = helper.createDefaultRight(rowBuilder);
|
|
@@ -25822,7 +25841,7 @@ public:
|
|
|
CRoxieServerKeyedJoinBase::onCreate(_colocalParent);
|
|
|
head.onCreate(_colocalParent);
|
|
|
fetchInputFields.set(helper.queryFetchInputRecordSize());
|
|
|
- fetchInputAllocator.setown(ctx->queryCodeContext()->getRowAllocator(helper.queryFetchInputRecordSize(), activityId));
|
|
|
+ fetchInputAllocator.setown(createRowAllocator(helper.queryFetchInputRecordSize()));
|
|
|
}
|
|
|
|
|
|
virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
|
|
@@ -25959,11 +25978,11 @@ public:
|
|
|
virtual void onCreate(IHThorArg *_colocalParent)
|
|
|
{
|
|
|
CRoxieServerKeyedJoinBase::onCreate(_colocalParent);
|
|
|
- indexReadAllocator.setown(ctx->queryCodeContext()->getRowAllocator(indexReadMeta, activityId));
|
|
|
+ indexReadAllocator.setown(createRowAllocator(indexReadMeta));
|
|
|
|
|
|
IOutputMetaData *joinFieldsMeta = helper.queryJoinFieldsRecordSize();
|
|
|
joinPrefixedMeta.setown(new CPrefixedOutputMeta(KEYEDJOIN_RECORD_SIZE(0), joinFieldsMeta)); // MORE - not sure if we really need this
|
|
|
- joinFieldsAllocator.setown(ctx->queryCodeContext()->getRowAllocator(joinPrefixedMeta, activityId));
|
|
|
+ joinFieldsAllocator.setown(createRowAllocator(joinPrefixedMeta));
|
|
|
}
|
|
|
|
|
|
virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
|