|
@@ -5687,47 +5687,37 @@ public:
|
|
|
class CGraphResult : public CInterface, implements IGraphResult
|
|
|
{
|
|
|
CriticalSection cs;
|
|
|
- CachedOutputMetaData meta;
|
|
|
- ConstPointerArray data;
|
|
|
- Owned<IEngineRowAllocator> rowsetAllocator;
|
|
|
+ byte **rowset;
|
|
|
+ size32_t count;
|
|
|
bool complete;
|
|
|
+
|
|
|
void clear()
|
|
|
{
|
|
|
CriticalBlock func(cs);
|
|
|
- ReleaseRoxieRowSet(data);
|
|
|
+ rtlReleaseRowset(count, rowset);
|
|
|
+ rowset = NULL;
|
|
|
+ count = 0;
|
|
|
complete = false;
|
|
|
}
|
|
|
|
|
|
public:
|
|
|
+ IMPLEMENT_IINTERFACE
|
|
|
CGraphResult()
|
|
|
{
|
|
|
complete = false; // dummy result is not supposed to be used...
|
|
|
+ rowset = NULL;
|
|
|
+ count = 0;
|
|
|
}
|
|
|
|
|
|
- CGraphResult(IEngineRowAllocator * _ownedRowsetAllocator, ConstPointerArray & input, bool own) : rowsetAllocator(_ownedRowsetAllocator)
|
|
|
+ CGraphResult(size32_t _count, byte **_rowset)
|
|
|
+ : count(_count), rowset(_rowset)
|
|
|
{
|
|
|
- meta.set(rowsetAllocator->queryOutputMeta());
|
|
|
- ForEachItemIn(i, input)
|
|
|
- {
|
|
|
- const void * cur = input.item(i);
|
|
|
- if (!own && cur)
|
|
|
- LinkRoxieRow(cur);
|
|
|
- data.append(cur);
|
|
|
- }
|
|
|
- if (own)
|
|
|
- input.kill();
|
|
|
complete = true;
|
|
|
}
|
|
|
- CGraphResult(IEngineRowAllocator * _ownedRowsetAllocator, void * row, bool own) : rowsetAllocator(_ownedRowsetAllocator)
|
|
|
- {
|
|
|
- meta.set(rowsetAllocator->queryOutputMeta());
|
|
|
- data.append(row);
|
|
|
- if (!own)
|
|
|
- LinkRoxieRow(row);
|
|
|
- complete = true;
|
|
|
+ ~CGraphResult()
|
|
|
+ {
|
|
|
+ clear();
|
|
|
}
|
|
|
- ~CGraphResult() { clear(); }
|
|
|
- IMPLEMENT_IINTERFACE
|
|
|
|
|
|
// interface IGraphResult
|
|
|
virtual IRoxieInput * createIterator()
|
|
@@ -5737,67 +5727,22 @@ public:
|
|
|
return new CGraphResultIterator(this);
|
|
|
}
|
|
|
|
|
|
- virtual void getResult(unsigned & lenResult, void * & result)
|
|
|
- {
|
|
|
- if (!complete)
|
|
|
- throw MakeStringException(ROXIE_GRAPH_PROCESSING_ERROR, "Internal Error: Reading uninitialised graph result");
|
|
|
-
|
|
|
- bool grouped = meta.isGrouped();
|
|
|
- MemoryBuffer rowdata;
|
|
|
- unsigned max = data.ordinality();
|
|
|
- unsigned i;
|
|
|
- for (i = 0; i < max; i++)
|
|
|
- {
|
|
|
- const void * nextrec = data.item(i);
|
|
|
- size32_t thisSize = meta.getRecordSize(nextrec);
|
|
|
- rowdata.append(thisSize, nextrec);
|
|
|
- if (grouped)
|
|
|
- {
|
|
|
- bool eog = false;
|
|
|
- if (data.isItem(i+1))
|
|
|
- {
|
|
|
- if (!data.item(i+1))
|
|
|
- {
|
|
|
- eog = true;
|
|
|
- i++;
|
|
|
- }
|
|
|
- }
|
|
|
- else
|
|
|
- eog = true;
|
|
|
- rowdata.append(eog);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- lenResult = rowdata.length();
|
|
|
- result = rowdata.detach();
|
|
|
- }
|
|
|
-
|
|
|
virtual void getLinkedResult(unsigned & countResult, byte * * & result)
|
|
|
{
|
|
|
if (!complete)
|
|
|
throw MakeStringException(ROXIE_GRAPH_PROCESSING_ERROR, "Internal Error: Reading uninitialised graph result");
|
|
|
|
|
|
- byte * * rowset = rowsetAllocator->createRowset(data.ordinality());
|
|
|
- unsigned max = data.ordinality();
|
|
|
- unsigned i;
|
|
|
- for (i = 0; i < max; i++)
|
|
|
- {
|
|
|
- const void * next = data.item(i);
|
|
|
- if (next) LinkRoxieRow(next);
|
|
|
- rowset[i] = (byte *)next;
|
|
|
- }
|
|
|
-
|
|
|
- countResult = max;
|
|
|
- result = rowset;
|
|
|
- }
|
|
|
+ result = rtlLinkRowset(rowset);
|
|
|
+ countResult = count;
|
|
|
+ }
|
|
|
|
|
|
//other
|
|
|
const void * getRow(unsigned i)
|
|
|
{
|
|
|
CriticalBlock func(cs);
|
|
|
- if (!data.isItem(i))
|
|
|
+ if (i >= count)
|
|
|
return NULL;
|
|
|
- const void * ret = data.item(i);
|
|
|
+ const void * ret = rowset[i];
|
|
|
if (ret) LinkRoxieRow(ret);
|
|
|
return ret;
|
|
|
}
|
|
@@ -6010,6 +5955,8 @@ public:
|
|
|
graph = NULL;
|
|
|
}
|
|
|
|
|
|
+ virtual bool needsAllocator() const { return true; }
|
|
|
+
|
|
|
virtual void onCreate(IRoxieSlaveContext *_ctx, IHThorArg *_colocalParent)
|
|
|
{
|
|
|
CRoxieServerInternalSinkActivity::onCreate(_ctx, _colocalParent);
|
|
@@ -6018,31 +5965,23 @@ public:
|
|
|
|
|
|
virtual void onExecute()
|
|
|
{
|
|
|
- ConstPointerArray rows;
|
|
|
- try
|
|
|
+ RtlLinkedDatasetBuilder builder(rowAllocator);
|
|
|
+ loop
|
|
|
{
|
|
|
- loop
|
|
|
+ const void *nextrec = input->nextInGroup();
|
|
|
+ if (!nextrec)
|
|
|
{
|
|
|
- const void *nextrec = input->nextInGroup();
|
|
|
+ nextrec = input->nextInGroup();
|
|
|
if (!nextrec)
|
|
|
- {
|
|
|
- nextrec = input->nextInGroup();
|
|
|
- if (!nextrec)
|
|
|
- break;
|
|
|
- rows.append(NULL);
|
|
|
- }
|
|
|
- rows.append(nextrec);
|
|
|
+ break;
|
|
|
+ builder.appendEOG();
|
|
|
}
|
|
|
-
|
|
|
- IOutputMetaData *outputMeta = input->queryOutputMeta();
|
|
|
- Owned<CGraphResult> result = new CGraphResult(ctx->queryCodeContext()->getRowAllocator(outputMeta, activityId), rows, true);
|
|
|
- graph->setResult(helper.querySequence(), result);
|
|
|
- }
|
|
|
- catch(...)
|
|
|
- {
|
|
|
- ReleaseRoxieRowSet(rows);
|
|
|
- throw;
|
|
|
+ builder.appendOwn(nextrec);
|
|
|
}
|
|
|
+
|
|
|
+ IOutputMetaData *outputMeta = input->queryOutputMeta();
|
|
|
+ Owned<CGraphResult> result = new CGraphResult(builder.getcount(), builder.linkrows());
|
|
|
+ graph->setResult(helper.querySequence(), result);
|
|
|
}
|
|
|
|
|
|
IRoxieInput * querySelectOutput(unsigned id)
|
|
@@ -6079,6 +6018,83 @@ IRoxieServerActivityFactory *createRoxieServerLocalResultWriteActivityFactory(un
|
|
|
return new CRoxieServerLocalResultWriteActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind, _usageCount, _graphId, _isRoot);
|
|
|
}
|
|
|
|
|
|
+//=====================================================================================================
|
|
|
+
|
|
|
+class CRoxieServerDictionaryResultWriteActivity : public CRoxieServerInternalSinkActivity
|
|
|
+{
|
|
|
+ IHThorDictionaryResultWriteArg &helper;
|
|
|
+ ILocalGraphEx * graph;
|
|
|
+ unsigned graphId;
|
|
|
+
|
|
|
+public:
|
|
|
+ CRoxieServerDictionaryResultWriteActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _graphId)
|
|
|
+ : CRoxieServerInternalSinkActivity(_factory, _probeManager), helper((IHThorDictionaryResultWriteArg &)basehelper), graphId(_graphId)
|
|
|
+ {
|
|
|
+ graph = NULL;
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual bool needsAllocator() const { return true; }
|
|
|
+
|
|
|
+ virtual void onCreate(IRoxieSlaveContext *_ctx, IHThorArg *_colocalParent)
|
|
|
+ {
|
|
|
+ CRoxieServerInternalSinkActivity::onCreate(_ctx, _colocalParent);
|
|
|
+ graph = static_cast<ILocalGraphEx *>(_ctx->queryCodeContext()->resolveLocalQuery(graphId));
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual void onExecute()
|
|
|
+ {
|
|
|
+ unsigned sequence = helper.querySequence();
|
|
|
+
|
|
|
+ RtlLinkedDictionaryBuilder builder(rowAllocator, helper.queryHashLookupInfo());
|
|
|
+ loop
|
|
|
+ {
|
|
|
+ const void *row = input->nextInGroup();
|
|
|
+ if (!row)
|
|
|
+ {
|
|
|
+ row = input->nextInGroup();
|
|
|
+ if (!row)
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ builder.appendOwn(row);
|
|
|
+ processed++;
|
|
|
+ }
|
|
|
+ IOutputMetaData *outputMeta = input->queryOutputMeta();
|
|
|
+ Owned<CGraphResult> result = new CGraphResult(builder.getcount(), builder.linkrows());
|
|
|
+ graph->setResult(helper.querySequence(), result);
|
|
|
+ }
|
|
|
+
|
|
|
+ IRoxieInput * querySelectOutput(unsigned id)
|
|
|
+ {
|
|
|
+ if (id == helper.querySequence())
|
|
|
+ {
|
|
|
+ executed = true;
|
|
|
+ return LINK(input);
|
|
|
+ }
|
|
|
+ return NULL;
|
|
|
+ }
|
|
|
+};
|
|
|
+
|
|
|
+class CRoxieServerDictionaryResultWriteActivityFactory : public CRoxieServerInternalSinkFactory
|
|
|
+{
|
|
|
+ unsigned graphId;
|
|
|
+public:
|
|
|
+ CRoxieServerDictionaryResultWriteActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, unsigned _usageCount, unsigned _graphId, bool _isRoot)
|
|
|
+ : CRoxieServerInternalSinkFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind, _usageCount, _isRoot), graphId(_graphId)
|
|
|
+ {
|
|
|
+ isInternal = true;
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual IRoxieServerActivity *createActivity(IProbeManager *_probeManager) const
|
|
|
+ {
|
|
|
+ return new CRoxieServerDictionaryResultWriteActivity(this, _probeManager, graphId);
|
|
|
+ }
|
|
|
+};
|
|
|
+
|
|
|
+IRoxieServerActivityFactory *createRoxieServerDictionaryResultWriteActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, unsigned _usageCount, unsigned _graphId, bool _isRoot)
|
|
|
+{
|
|
|
+ return new CRoxieServerDictionaryResultWriteActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind, _usageCount, _graphId, _isRoot);
|
|
|
+}
|
|
|
+
|
|
|
//=================================================================================
|
|
|
|
|
|
class CRoxieServerGraphLoopResultReadActivity : public CRoxieServerActivity
|
|
@@ -6241,6 +6257,8 @@ public:
|
|
|
graph = NULL;
|
|
|
}
|
|
|
|
|
|
+ virtual bool needsAllocator() const { return true; }
|
|
|
+
|
|
|
virtual void onCreate(IRoxieSlaveContext *_ctx, IHThorArg *_colocalParent)
|
|
|
{
|
|
|
CRoxieServerInternalSinkActivity::onCreate(_ctx, _colocalParent);
|
|
@@ -6249,31 +6267,24 @@ public:
|
|
|
|
|
|
virtual void onExecute()
|
|
|
{
|
|
|
- ConstPointerArray rows;
|
|
|
- try
|
|
|
+ // MORE - could probably common up this code snippet - it appears in a few places.
|
|
|
+ RtlLinkedDatasetBuilder builder(rowAllocator);
|
|
|
+ loop
|
|
|
{
|
|
|
- loop
|
|
|
+ const void *nextrec = input->nextInGroup();
|
|
|
+ if (!nextrec)
|
|
|
{
|
|
|
- const void *nextrec = input->nextInGroup();
|
|
|
+ nextrec = input->nextInGroup();
|
|
|
if (!nextrec)
|
|
|
- {
|
|
|
- nextrec = input->nextInGroup();
|
|
|
- if (!nextrec)
|
|
|
- break;
|
|
|
- rows.append(NULL);
|
|
|
- }
|
|
|
- rows.append(nextrec);
|
|
|
+ break;
|
|
|
+ builder.appendEOG();
|
|
|
}
|
|
|
-
|
|
|
- IOutputMetaData *outputMeta = input->queryOutputMeta();
|
|
|
- Owned<CGraphResult> result = new CGraphResult(ctx->queryCodeContext()->getRowAllocator(outputMeta, activityId), rows, true);
|
|
|
- graph->setGraphLoopResult(result);
|
|
|
- }
|
|
|
- catch(...)
|
|
|
- {
|
|
|
- ReleaseRoxieRowSet(rows);
|
|
|
- throw;
|
|
|
+ builder.appendOwn(nextrec);
|
|
|
}
|
|
|
+
|
|
|
+ IOutputMetaData *outputMeta = input->queryOutputMeta();
|
|
|
+ Owned<CGraphResult> result = new CGraphResult(builder.getcount(), builder.linkrows());
|
|
|
+ graph->setGraphLoopResult(result);
|
|
|
}
|
|
|
|
|
|
virtual IRoxieInput *queryOutput(unsigned idx)
|
|
@@ -13678,28 +13689,32 @@ class CPointerArrayRoxieInput : public CPseudoRoxieInput
|
|
|
public:
|
|
|
CPointerArrayRoxieInput()
|
|
|
{
|
|
|
- array = NULL;
|
|
|
+ rowset = NULL;
|
|
|
+ rowcount = 0;
|
|
|
curRow = 0;
|
|
|
}
|
|
|
- void init(ConstPointerArray * _array)
|
|
|
+ void init(size32_t _rowcount, byte **_rowset)
|
|
|
{
|
|
|
- array = _array;
|
|
|
+ rowset = _rowset;
|
|
|
+ rowcount = _rowcount;
|
|
|
curRow = 0;
|
|
|
}
|
|
|
virtual const void * nextInGroup()
|
|
|
{
|
|
|
- if (array->isItem(curRow))
|
|
|
+ if (curRow < rowcount)
|
|
|
{
|
|
|
- const void * ret = array->item(curRow);
|
|
|
- array->replace(NULL, curRow);
|
|
|
+ const void * ret = rowset[curRow];
|
|
|
+ if (ret)
|
|
|
+ LinkRoxieRow(ret);
|
|
|
curRow++;
|
|
|
return ret;
|
|
|
}
|
|
|
return NULL;
|
|
|
}
|
|
|
protected:
|
|
|
- ConstPointerArray * array;
|
|
|
- unsigned curRow;
|
|
|
+ byte **rowset;
|
|
|
+ size32_t rowcount;
|
|
|
+ size32_t curRow;
|
|
|
};
|
|
|
|
|
|
class CRoxieServerLoopActivity : public CRoxieServerActivity
|
|
@@ -13727,6 +13742,8 @@ public:
|
|
|
maxIterations = 0;
|
|
|
}
|
|
|
|
|
|
+ virtual bool needsAllocator() const { return true; }
|
|
|
+
|
|
|
virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
|
|
|
{
|
|
|
eof = false;
|
|
@@ -13752,7 +13769,9 @@ public:
|
|
|
{
|
|
|
void * counterRow = ctx->queryRowManager().allocate(sizeof(thor_loop_counter_t), activityId);
|
|
|
*((thor_loop_counter_t *)counterRow) = counter;
|
|
|
- Owned<IGraphResult> counterResult = new CGraphResult(ctx->queryCodeContext()->getRowAllocator(counterMeta, activityId), counterRow, true);
|
|
|
+ RtlLinkedDatasetBuilder builder(rowAllocator);
|
|
|
+ builder.appendOwn(counterRow);
|
|
|
+ Owned<CGraphResult> counterResult = new CGraphResult(builder.getcount(), builder.linkrows());
|
|
|
graph->setInputResult(2, counterResult);
|
|
|
}
|
|
|
}
|
|
@@ -13765,7 +13784,7 @@ class CRoxieServerSequentialLoopActivity : public CRoxieServerLoopActivity
|
|
|
Owned<IActivityGraph> loopQuery;
|
|
|
Owned<IRoxieServerChildGraph> loopGraph;
|
|
|
IRoxieInput * curInput;
|
|
|
- ConstPointerArray loopPending;
|
|
|
+ RtlLinkedDatasetBuilder *builder;
|
|
|
CPointerArrayRoxieInput arrayInput;
|
|
|
Linked<IRoxieInput> resultInput;
|
|
|
unsigned loopCounter;
|
|
@@ -13776,8 +13795,11 @@ public:
|
|
|
{
|
|
|
curInput = NULL;
|
|
|
loopCounter = 0;
|
|
|
+ builder = NULL;
|
|
|
}
|
|
|
|
|
|
+ virtual bool needsAllocator() const { return true; }
|
|
|
+
|
|
|
virtual void onCreate(IRoxieSlaveContext *_ctx, IHThorArg *_colocalParent)
|
|
|
{
|
|
|
CRoxieServerLoopActivity::onCreate(_ctx, _colocalParent);
|
|
@@ -13792,11 +13814,13 @@ public:
|
|
|
|
|
|
//MORE: Not sure about this, should IRoxieServerChildGraph be combined with IActivityGraph?
|
|
|
loopGraph.set(loopQuery->queryLoopGraph());
|
|
|
+ builder = new RtlLinkedDatasetBuilder(rowAllocator);
|
|
|
}
|
|
|
|
|
|
virtual void stop(bool aborting)
|
|
|
{
|
|
|
- ReleaseRoxieRowSet(loopPending);
|
|
|
+ delete builder;
|
|
|
+ builder = NULL;
|
|
|
CRoxieServerLoopActivity::stop(aborting);
|
|
|
}
|
|
|
|
|
@@ -13805,7 +13829,6 @@ public:
|
|
|
ActivityTimer t(totalCycles, timeActivities, ctx->queryDebugContext());
|
|
|
if (eof)
|
|
|
return NULL;
|
|
|
-
|
|
|
unsigned emptyIterations = 0;
|
|
|
loop
|
|
|
{
|
|
@@ -13832,7 +13855,7 @@ public:
|
|
|
processed++;
|
|
|
return ret;
|
|
|
}
|
|
|
- loopPending.append(ret);
|
|
|
+ builder->appendOwn(ret);
|
|
|
}
|
|
|
|
|
|
switch (activityKind)
|
|
@@ -13841,15 +13864,15 @@ public:
|
|
|
{
|
|
|
if (!(flags & IHThorLoopArg::LFnewloopagain))
|
|
|
{
|
|
|
- if (!helper.loopAgain(loopCounter, loopPending.ordinality(), (const void * *)loopPending.getArray()))
|
|
|
+ if (!helper.loopAgain(loopCounter, builder->getcount(), (const void**) builder->linkrows()))
|
|
|
{
|
|
|
- if (loopPending.ordinality() == 0)
|
|
|
+ if (builder->getcount() == 0)
|
|
|
{
|
|
|
eof = true;
|
|
|
return NULL;
|
|
|
}
|
|
|
-
|
|
|
- arrayInput.init(&loopPending);
|
|
|
+ arrayInput.init(builder->getcount(), builder->linkrows());
|
|
|
+ // MORE - should builder be cleared here?
|
|
|
curInput = &arrayInput;
|
|
|
finishedLooping = true;
|
|
|
continue; // back to the input loop again
|
|
@@ -13858,7 +13881,7 @@ public:
|
|
|
break;
|
|
|
}
|
|
|
case TAKlooprow:
|
|
|
- if (loopPending.empty())
|
|
|
+ if (!builder->getcount())
|
|
|
{
|
|
|
finishedLooping = true;
|
|
|
eof = true;
|
|
@@ -13867,7 +13890,7 @@ public:
|
|
|
break;
|
|
|
}
|
|
|
|
|
|
- if (loopPending.ordinality())
|
|
|
+ if (builder->getcount())
|
|
|
emptyIterations = 0;
|
|
|
else
|
|
|
{
|
|
@@ -13882,7 +13905,7 @@ public:
|
|
|
checkAbort();
|
|
|
try
|
|
|
{
|
|
|
- Owned<IRoxieGraphResults> results = executeIteration(loopExtractBuilder.size(), loopExtractBuilder.getbytes(), loopCounter, loopPending);
|
|
|
+ Owned<IRoxieGraphResults> results = executeIteration(loopExtractBuilder.size(), loopExtractBuilder.getbytes(), loopCounter);
|
|
|
resultInput.setown(results->createIterator(0));
|
|
|
|
|
|
if (flags & IHThorLoopArg::LFnewloopagain)
|
|
@@ -13908,13 +13931,14 @@ public:
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- IRoxieGraphResults * executeIteration(unsigned parentExtractSize, const byte *parentExtract, unsigned counter, ConstPointerArray & rows)
|
|
|
+ IRoxieGraphResults * executeIteration(unsigned parentExtractSize, const byte *parentExtract, unsigned counter)
|
|
|
{
|
|
|
try
|
|
|
{
|
|
|
loopGraph->beforeExecute();
|
|
|
|
|
|
- Owned<IGraphResult> inputRowsResult = new CGraphResult(ctx->queryCodeContext()->getRowAllocator(meta.queryOriginal(), activityId), rows, true);
|
|
|
+ Owned<IGraphResult> inputRowsResult = new CGraphResult(builder->getcount(), builder->linkrows());
|
|
|
+ builder->clear();
|
|
|
loopGraph->setInputResult(1, inputRowsResult);
|
|
|
|
|
|
createCounterResult(loopGraph, counter);
|
|
@@ -14455,6 +14479,8 @@ public:
|
|
|
maxIterations = 0;
|
|
|
}
|
|
|
|
|
|
+ virtual bool needsAllocator() const { return true; }
|
|
|
+
|
|
|
virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
|
|
|
{
|
|
|
CRoxieServerActivity::start(parentExtractSize, parentExtract, paused);
|
|
@@ -14475,15 +14501,15 @@ public:
|
|
|
GraphExtractBuilder.clear();
|
|
|
}
|
|
|
|
|
|
- virtual bool needsAllocator() const { return true; }
|
|
|
-
|
|
|
void createCounterResult(IRoxieServerChildGraph * graph, unsigned counter)
|
|
|
{
|
|
|
if (flags & IHThorGraphLoopArg::GLFcounter)
|
|
|
{
|
|
|
void * counterRow = ctx->queryRowManager().allocate(sizeof(thor_loop_counter_t), activityId);
|
|
|
*((thor_loop_counter_t *)counterRow) = counter;
|
|
|
- Owned<IGraphResult> counterResult = new CGraphResult(ctx->queryCodeContext()->getRowAllocator(counterMeta, activityId), counterRow, true);
|
|
|
+ RtlLinkedDatasetBuilder builder(rowAllocator);
|
|
|
+ builder.appendOwn(counterRow);
|
|
|
+ Owned<CGraphResult> counterResult = new CGraphResult(builder.getcount(), builder.linkrows());
|
|
|
graph->setInputResult(0, counterResult);
|
|
|
}
|
|
|
}
|
|
@@ -14564,29 +14590,23 @@ public:
|
|
|
void createInitialGraphInput()
|
|
|
{
|
|
|
loopGraph->clearGraphLoopResults();
|
|
|
- ConstPointerArray rows;
|
|
|
- try
|
|
|
+ // MORE - could probably common up this code snippet - it appears in a few places.
|
|
|
+ RtlLinkedDatasetBuilder builder(rowAllocator);
|
|
|
+ loop
|
|
|
{
|
|
|
- loop
|
|
|
+ const void *nextrec = input->nextInGroup();
|
|
|
+ if (!nextrec)
|
|
|
{
|
|
|
- const void *nextrec = input->nextInGroup();
|
|
|
+ nextrec = input->nextInGroup();
|
|
|
if (!nextrec)
|
|
|
- {
|
|
|
- nextrec = input->nextInGroup();
|
|
|
- if (!nextrec)
|
|
|
- break;
|
|
|
- rows.append(NULL);
|
|
|
- }
|
|
|
- rows.append(nextrec);
|
|
|
+ break;
|
|
|
+ builder.appendEOG();
|
|
|
}
|
|
|
+ builder.appendOwn(nextrec);
|
|
|
}
|
|
|
- catch(IException *)
|
|
|
- {
|
|
|
- ReleaseRoxieRowSet(rows);
|
|
|
- throw;
|
|
|
- }
|
|
|
+
|
|
|
IOutputMetaData *outputMeta = input->queryOutputMeta();
|
|
|
- Owned<CGraphResult> result = new CGraphResult(ctx->queryCodeContext()->getRowAllocator(outputMeta, activityId), rows, true);
|
|
|
+ Owned<CGraphResult> result = new CGraphResult(builder.getcount(), builder.linkrows());
|
|
|
loopGraph->setGraphLoopResult(0, result);
|
|
|
}
|
|
|
void executeEntireGraph()
|
|
@@ -19406,7 +19426,6 @@ public:
|
|
|
assertex(storedName && *storedName);
|
|
|
assertex(sequence < 0);
|
|
|
|
|
|
- __int64 initialProcessed = processed;
|
|
|
RtlLinkedDictionaryBuilder builder(rowAllocator, helper.queryHashLookupInfo());
|
|
|
loop
|
|
|
{
|
|
@@ -25227,7 +25246,7 @@ public:
|
|
|
}
|
|
|
virtual void getResult(unsigned & lenResult, void * & result, unsigned id)
|
|
|
{
|
|
|
- select(id).getResult(lenResult, result);
|
|
|
+ throwUnexpected(); // Only required in legacy thor implementation
|
|
|
}
|
|
|
virtual void getLinkedResult(unsigned & count, byte * * & ret, unsigned id)
|
|
|
{
|
|
@@ -32467,9 +32486,9 @@ bool MergeActivityTest::isDedup = false;
|
|
|
extern "C" IHThorArg * mergeActivityTestFactory() { return new MergeActivityTest; }
|
|
|
|
|
|
struct PrefetchProjectActivityTest : public ccdserver_hqlhelper::CThorPrefetchProjectArg {
|
|
|
- virtual IOutputMetaData * queryOutputMeta()
|
|
|
+ virtual IOutputMetaData * queryOutputMeta()
|
|
|
{
|
|
|
- return &testMeta;
|
|
|
+ return &testMeta;
|
|
|
}
|
|
|
virtual void onCreate(ICodeContext * _ctx, IHThorArg *, MemoryBuffer * in) {
|
|
|
ctx = _ctx;
|