|
@@ -2581,7 +2581,6 @@ public:
|
|
|
{
|
|
|
OptimizedRowBuilder rowBuilder(owner.rowAllocator, owner.meta, output, owner.serializer);
|
|
|
helper->clearAggregate(rowBuilder);
|
|
|
- unsigned __int64 totalCount = 0;
|
|
|
while (!aborted && !deserializeSource.eos())
|
|
|
{
|
|
|
prefetcher->readAhead(deserializeSource);
|
|
@@ -3031,8 +3030,6 @@ protected:
|
|
|
Linked<TranslatorArray> layoutTranslators;
|
|
|
Linked<IKeyArray> keyArray;
|
|
|
IDefRecordMeta *activityMeta;
|
|
|
-
|
|
|
- IRecordLayoutTranslator *lastTranslator;
|
|
|
bool createSegmentMonitorsPending;
|
|
|
|
|
|
virtual void createSegmentMonitors() = 0;
|
|
@@ -3088,7 +3085,8 @@ protected:
|
|
|
: CRoxieSlaveActivity(_logctx, _packet, _hFactory, _aFactory),
|
|
|
keyArray(_aFactory->queryKeyArray()),
|
|
|
layoutTranslators(_aFactory->queryLayoutTranslators()),
|
|
|
- activityMeta(_aFactory->queryActivityMeta())
|
|
|
+ activityMeta(_aFactory->queryActivityMeta()),
|
|
|
+ createSegmentMonitorsPending(true)
|
|
|
{
|
|
|
}
|
|
|
|
|
@@ -3169,6 +3167,8 @@ public:
|
|
|
indexHelper = (IHThorIndexReadBaseArg *) basehelper;
|
|
|
variableFileName = (indexHelper->getFlags() & (TIRvarfilename|TIRdynamicfilename)) != 0;
|
|
|
isOpt = (indexHelper->getFlags() & TDRoptional) != 0;
|
|
|
+ inputData = NULL;
|
|
|
+ inputCount = 0;
|
|
|
inputsDone = 0;
|
|
|
processed = 0;
|
|
|
keyprocessed = 0;
|
|
@@ -3179,7 +3179,6 @@ public:
|
|
|
numSeeks = 0;
|
|
|
if (packet->getSmartStepInfoLength())
|
|
|
{
|
|
|
- unsigned smartStepInfoLength = packet->getSmartStepInfoLength();
|
|
|
const byte *smartStepInfoValue = packet->querySmartStepInfoData();
|
|
|
numSkipFields = * (unsigned short *) smartStepInfoValue;
|
|
|
smartStepInfoValue += sizeof(unsigned short);
|
|
@@ -3203,7 +3202,6 @@ public:
|
|
|
}
|
|
|
#endif
|
|
|
}
|
|
|
- createSegmentMonitorsPending = true;
|
|
|
}
|
|
|
|
|
|
virtual void onCreate()
|
|
@@ -3307,10 +3305,7 @@ public:
|
|
|
i++;
|
|
|
}
|
|
|
if (allKeys->numParts())
|
|
|
- {
|
|
|
tlk.setown(::createKeyMerger(allKeys, 0, steppingOffset, &logctx));
|
|
|
- createSegmentMonitorsPending = true;
|
|
|
- }
|
|
|
else
|
|
|
tlk.clear();
|
|
|
createSegmentMonitorsPending = true;
|
|
@@ -4089,7 +4084,6 @@ public:
|
|
|
MTIME_SECTION(timer, "CRoxieIndexGroupAggregateActivity ::process");
|
|
|
Owned<IRowManager> rowManager = roxiemem::createRowManager(0, NULL, logctx, NULL, true); // MORE - should not really use default limits
|
|
|
Owned<IMessagePacker> output = ROQ->createOutputStream(packet->queryHeader(), false, logctx);
|
|
|
- unsigned skipped = 0;
|
|
|
|
|
|
unsigned processedBefore = processed;
|
|
|
try
|
|
@@ -4161,7 +4155,7 @@ public:
|
|
|
if (tlk) // a very early abort can mean it is NULL....
|
|
|
{
|
|
|
logctx.noteStatistic(STATS_ACCEPTED, processed-processedBefore, 1);
|
|
|
- logctx.noteStatistic(STATS_REJECTED, skipped, 1);
|
|
|
+ logctx.noteStatistic(STATS_REJECTED, 0, 1);
|
|
|
}
|
|
|
logctx.flush(true, aborted);
|
|
|
if (aborted)
|
|
@@ -4262,6 +4256,7 @@ public:
|
|
|
{
|
|
|
helper = (IHThorFetchBaseArg *) basehelper;
|
|
|
fetchContext = static_cast<IHThorFetchContext *>(helper->selectInterface(TAIfetchcontext_1));
|
|
|
+ base = 0;
|
|
|
variableFileName = (fetchContext->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) != 0;
|
|
|
isOpt = (fetchContext->getFetchFlags() & FFdatafileoptional) != 0;
|
|
|
onCreate();
|
|
@@ -4354,7 +4349,6 @@ public:
|
|
|
CRoxieFetchActivity(SlaveContextLogger &_logctx, IRoxieQueryPacket *_packet, HelperFactory *_hFactory, const CRoxieFetchActivityFactory *_aFactory)
|
|
|
: CRoxieFetchActivityBase(_logctx, _packet, _hFactory, _aFactory)
|
|
|
{
|
|
|
- IHThorFetchArg *h = (IHThorFetchArg *) helper;
|
|
|
IHThorFetchContext * fetchContext = static_cast<IHThorFetchContext *>(helper->selectInterface(TAIfetchcontext_1));
|
|
|
IOutputMetaData *diskMeta = fetchContext->queryDiskRecordSize();
|
|
|
diskAllocator.setown(getRowAllocator(diskMeta, basefactory->queryId()));
|
|
@@ -4938,9 +4932,7 @@ class CRoxieKeyedJoinFetchActivity : public CRoxieSlaveActivity
|
|
|
IHThorKeyedJoinArg *helper;
|
|
|
Owned<IFileIO> rawFile;
|
|
|
const CRoxieKeyedJoinFetchActivityFactory *factory;
|
|
|
- char *rawBuffer;
|
|
|
offset_t base;
|
|
|
- unsigned rawSize;
|
|
|
const char *inputLimit;
|
|
|
const char *inputData;
|
|
|
Owned<IFileIOArray> varFiles;
|
|
@@ -4961,6 +4953,7 @@ public:
|
|
|
CRoxieSlaveActivity(_logctx, _packet, _hFactory, _aFactory)
|
|
|
{
|
|
|
// MORE - no continuation row support?
|
|
|
+ base = 0;
|
|
|
helper = (IHThorKeyedJoinArg *) basehelper;
|
|
|
variableFileName = (helper->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) != 0;
|
|
|
onCreate();
|
|
@@ -5135,7 +5128,6 @@ public:
|
|
|
MTIME_SECTION(timer, "CRoxieRemoteActivity ::process");
|
|
|
|
|
|
Owned<IMessagePacker> output = ROQ->createOutputStream(packet->queryHeader(), false, logctx);
|
|
|
- unsigned totalSizeSent = 0;
|
|
|
unsigned __int64 rowLimit = remoteHelper->getRowLimit();
|
|
|
|
|
|
rtlRowBuilder remoteExtractBuilder;
|
|
@@ -5148,8 +5140,6 @@ public:
|
|
|
{
|
|
|
remoteGraph->beforeExecute();
|
|
|
Owned<IRoxieInput> input = remoteGraph->startOutput(0, remoteExtractBuilder.size(), remoteExtractBuilder.getbytes(), false);
|
|
|
-
|
|
|
- unsigned processedBefore = processed;
|
|
|
while (!aborted)
|
|
|
{
|
|
|
const void * next = input->nextInGroup();
|
|
@@ -5179,7 +5169,6 @@ public:
|
|
|
output->putBuffer(recBuffer, nextSize, meta.isVariableSize());
|
|
|
}
|
|
|
ReleaseRoxieRow(next);
|
|
|
- totalSizeSent += nextSize;
|
|
|
}
|
|
|
|
|
|
remoteGraph->afterExecute();
|