|
@@ -19247,6 +19247,8 @@ public:
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ virtual bool needsAllocator() const { return true; }
|
|
|
+
|
|
|
virtual void onExecute()
|
|
|
{
|
|
|
int sequence = helper.getSequence();
|
|
@@ -19272,18 +19274,14 @@ public:
|
|
|
rowSerializer.setown(rowAllocator->createRowSerializer(ctx->queryCodeContext()));
|
|
|
}
|
|
|
__int64 initialProcessed = processed;
|
|
|
- ConstPointerArray *lResult = NULL;
|
|
|
- if (saveInContext)
|
|
|
- lResult = new ConstPointerArray;
|
|
|
+ RtlLinkedDatasetBuilder builder(rowAllocator);
|
|
|
loop
|
|
|
{
|
|
|
const void *row = input->nextInGroup();
|
|
|
- if (lResult)
|
|
|
+ if (saveInContext)
|
|
|
{
|
|
|
- if (row)
|
|
|
- LinkRoxieRow(row);
|
|
|
if (row || grouped)
|
|
|
- lResult->append(row);
|
|
|
+ builder.append(row);
|
|
|
}
|
|
|
if (grouped && (processed != initialProcessed))
|
|
|
{
|
|
@@ -19304,12 +19302,8 @@ public:
|
|
|
row = input->nextInGroup();
|
|
|
if (!row)
|
|
|
break;
|
|
|
- if (lResult)
|
|
|
- {
|
|
|
- if (row)
|
|
|
- LinkRoxieRow(row);
|
|
|
- lResult->append(row);
|
|
|
- }
|
|
|
+ if (saveInContext)
|
|
|
+ builder.append(row);
|
|
|
}
|
|
|
processed++;
|
|
|
if (serverContext->outputResultsToWorkUnit())
|
|
@@ -19346,8 +19340,8 @@ public:
|
|
|
}
|
|
|
ReleaseRoxieRow(row);
|
|
|
}
|
|
|
- if (lResult)
|
|
|
- serverContext->appendResultDeserialized(storedName, sequence, lResult, (helper.getFlags() & POFextend) != 0, LINK(meta.queryOriginal()));
|
|
|
+ if (saveInContext)
|
|
|
+ serverContext->appendResultDeserialized(storedName, sequence, builder.getcount(), builder.linkrows(), (helper.getFlags() & POFextend) != 0, LINK(meta.queryOriginal()));
|
|
|
if (serverContext->outputResultsToWorkUnit())
|
|
|
serverContext->appendResultRawContext(storedName, sequence, result.length(), result.toByteArray(), processed, (helper.getFlags() & POFextend) != 0, false); // MORE - shame to do extra copy...
|
|
|
}
|
|
@@ -19363,7 +19357,7 @@ public:
|
|
|
{
|
|
|
isReread = usageCount > 0;
|
|
|
Owned<IHThorWorkUnitWriteArg> helper = (IHThorWorkUnitWriteArg *) helperFactory();
|
|
|
- isInternal = (helper->getSequence()==-3);
|
|
|
+ isInternal = (helper->getSequence()==ResultSequenceInternal);
|
|
|
}
|
|
|
|
|
|
virtual IRoxieServerActivity *createActivity(IProbeManager *_probeManager) const
|
|
@@ -19379,6 +19373,81 @@ IRoxieServerActivityFactory *createRoxieServerWorkUnitWriteActivityFactory(unsig
|
|
|
|
|
|
}
|
|
|
|
|
|
+//=====================================================================================================
|
|
|
+
|
|
|
+class CRoxieServerWorkUnitWriteDictActivity : public CRoxieServerInternalSinkActivity
|
|
|
+{
|
|
|
+ IHThorDictionaryWorkUnitWriteArg &helper;
|
|
|
+ IRoxieServerContext *serverContext;
|
|
|
+
|
|
|
+public:
|
|
|
+ CRoxieServerWorkUnitWriteDictActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
|
|
|
+ : CRoxieServerInternalSinkActivity(_factory, _probeManager), helper((IHThorDictionaryWorkUnitWriteArg &)basehelper)
|
|
|
+ {
|
|
|
+ serverContext = NULL;
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual void onCreate(IRoxieSlaveContext *_ctx, IHThorArg *_colocalParent)
|
|
|
+ {
|
|
|
+ CRoxieServerInternalSinkActivity::onCreate(_ctx, _colocalParent);
|
|
|
+ serverContext = ctx->queryServerContext();
|
|
|
+ if (!serverContext)
|
|
|
+ {
|
|
|
+ throw MakeStringException(ROXIE_PIPE_ERROR, "Write Dictionary activity cannot be executed in slave context");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual bool needsAllocator() const { return true; }
|
|
|
+
|
|
|
+ virtual void onExecute()
|
|
|
+ {
|
|
|
+ int sequence = helper.getSequence();
|
|
|
+ const char *storedName = helper.queryName();
|
|
|
+ assertex(storedName && *storedName);
|
|
|
+ assertex(sequence < 0);
|
|
|
+
|
|
|
+ __int64 initialProcessed = processed;
|
|
|
+ RtlLinkedDictionaryBuilder builder(rowAllocator, helper.queryHashLookupInfo());
|
|
|
+ loop
|
|
|
+ {
|
|
|
+ const void *row = input->nextInGroup();
|
|
|
+ if (!row)
|
|
|
+ {
|
|
|
+ row = input->nextInGroup();
|
|
|
+ if (!row)
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ builder.appendOwn(row);
|
|
|
+ processed++;
|
|
|
+ }
|
|
|
+ serverContext->appendResultDeserialized(storedName, sequence, builder.getcount(), builder.linkrows(), (helper.getFlags() & POFextend) != 0, LINK(meta.queryOriginal()));
|
|
|
+ }
|
|
|
+};
|
|
|
+
|
|
|
+class CRoxieServerWorkUnitWriteDictActivityFactory : public CRoxieServerInternalSinkFactory
|
|
|
+{
|
|
|
+
|
|
|
+public:
|
|
|
+ CRoxieServerWorkUnitWriteDictActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, unsigned _usageCount, bool _isRoot)
|
|
|
+ : CRoxieServerInternalSinkFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind, _usageCount, _isRoot)
|
|
|
+ {
|
|
|
+ Owned<IHThorDictionaryWorkUnitWriteArg> helper = (IHThorDictionaryWorkUnitWriteArg *) helperFactory();
|
|
|
+ isInternal = (helper->getSequence()==ResultSequenceInternal);
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual IRoxieServerActivity *createActivity(IProbeManager *_probeManager) const
|
|
|
+ {
|
|
|
+ return new CRoxieServerWorkUnitWriteDictActivity(this, _probeManager);
|
|
|
+ }
|
|
|
+
|
|
|
+};
|
|
|
+
|
|
|
+IRoxieServerActivityFactory *createRoxieServerWorkUnitWriteDictActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, unsigned _usageCount, bool _isRoot)
|
|
|
+{
|
|
|
+ return new CRoxieServerWorkUnitWriteDictActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind, _usageCount, _isRoot);
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
//=================================================================================
|
|
|
|
|
|
class CRoxieServerRemoteResultActivity : public CRoxieServerInternalSinkActivity
|
|
@@ -19406,7 +19475,7 @@ public:
|
|
|
: CRoxieServerInternalSinkFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind, _usageCount, _isRoot)
|
|
|
{
|
|
|
Owned<IHThorRemoteResultArg> helper = (IHThorRemoteResultArg *) helperFactory();
|
|
|
- isInternal = (helper->getSequence()==-3);
|
|
|
+ isInternal = (helper->getSequence()==ResultSequenceInternal);
|
|
|
}
|
|
|
|
|
|
virtual IRoxieServerActivity *createActivity(IProbeManager *_probeManager) const
|
|
@@ -25164,6 +25233,10 @@ public:
|
|
|
{
|
|
|
select(id).getLinkedResult(count, ret);
|
|
|
}
|
|
|
+ virtual void getDictionaryResult(unsigned & count, byte * * & ret, unsigned id)
|
|
|
+ {
|
|
|
+ select(id).getLinkedResult(count, ret);
|
|
|
+ }
|
|
|
void setResult(unsigned id, IGraphResult * result)
|
|
|
{
|
|
|
CriticalBlock procedure(cs);
|
|
@@ -27056,6 +27129,10 @@ public:
|
|
|
{
|
|
|
results->getLinkedResult(count, ret, id);
|
|
|
}
|
|
|
+ virtual void getDictionaryResult(unsigned & count, byte * * & ret, unsigned id)
|
|
|
+ {
|
|
|
+ results->getLinkedResult(count, ret, id);
|
|
|
+ }
|
|
|
virtual void setResult(unsigned id, IGraphResult * result)
|
|
|
{
|
|
|
results->setResult(id, result);
|
|
@@ -28171,6 +28248,7 @@ public:
|
|
|
virtual UChar *getResultVarUnicode(const char * name, unsigned sequence) { throwUnexpected(); }
|
|
|
virtual unsigned getResultHash(const char * name, unsigned sequence) { throwUnexpected(); }
|
|
|
virtual void getResultRowset(size32_t & tcount, byte * * & tgt, const char * name, unsigned sequence, IEngineRowAllocator * _rowAllocator, IOutputRowDeserializer * deserializer, bool isGrouped, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) { throwUnexpected(); }
|
|
|
+ virtual void getResultDictionary(size32_t & tcount, byte * * & tgt, IEngineRowAllocator * _rowAllocator, const char * name, unsigned sequence, IOutputRowDeserializer * deserializer, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer, IHThorHashLookupInfo * hasher) { throwUnexpected(); }
|
|
|
|
|
|
virtual void printResults(IXmlWriter *output, const char *name, unsigned sequence) { throwUnexpected(); }
|
|
|
|
|
@@ -28412,13 +28490,13 @@ public:
|
|
|
if (!type || stricmp(type, "temporary"))
|
|
|
{
|
|
|
output->outputBeginNested("Temporary", true);
|
|
|
- ctx->printResults(output, name, (unsigned) -3);
|
|
|
+ ctx->printResults(output, name, (unsigned) ResultSequenceInternal);
|
|
|
output->outputEndNested("Temporary");
|
|
|
}
|
|
|
if (!type || stricmp(type, "global"))
|
|
|
{
|
|
|
output->outputBeginNested("Global", true);
|
|
|
- ctx->printResults(output, name, (unsigned) -1);
|
|
|
+ ctx->printResults(output, name, (unsigned) ResultSequenceStored);
|
|
|
output->outputEndNested("Global");
|
|
|
}
|
|
|
output->outputEndNested("Variables");
|
|
@@ -28426,21 +28504,26 @@ public:
|
|
|
|
|
|
};
|
|
|
|
|
|
+typedef byte *row_t;
|
|
|
+typedef row_t * rowset_t;
|
|
|
+
|
|
|
class DeserializedDataReader : public CInterface, implements IWorkUnitRowReader
|
|
|
{
|
|
|
- const ConstPointerArray &data;
|
|
|
+ const rowset_t data;
|
|
|
+ size32_t count;
|
|
|
unsigned idx;
|
|
|
public:
|
|
|
IMPLEMENT_IINTERFACE;
|
|
|
- DeserializedDataReader(const ConstPointerArray &_data) : data(_data)
|
|
|
+ DeserializedDataReader(size32_t _count, rowset_t _data)
|
|
|
+ : data(_data), count(_count)
|
|
|
{
|
|
|
idx = 0;
|
|
|
}
|
|
|
virtual const void * nextInGroup()
|
|
|
{
|
|
|
- if (data.isItem(idx))
|
|
|
+ if (idx < count)
|
|
|
{
|
|
|
- const void *row = data.item(idx);
|
|
|
+ const void *row = data[idx];
|
|
|
if (row)
|
|
|
LinkRoxieRow(row);
|
|
|
idx++;
|
|
@@ -28448,11 +28531,19 @@ public:
|
|
|
}
|
|
|
return NULL;
|
|
|
}
|
|
|
+ virtual void getResultRowset(size32_t & tcount, byte * * & tgt)
|
|
|
+ {
|
|
|
+ tcount = count;
|
|
|
+ if (data)
|
|
|
+ rtlLinkRowset(data);
|
|
|
+ tgt = data;
|
|
|
+ }
|
|
|
};
|
|
|
|
|
|
class CDeserializedResultStore : public CInterface, implements IDeserializedResultStore
|
|
|
{
|
|
|
- PointerArrayOf<ConstPointerArray> stored;
|
|
|
+ PointerArrayOf<row_t> stored;
|
|
|
+ UnsignedArray counts;
|
|
|
PointerIArrayOf<IOutputMetaData> metas;
|
|
|
mutable SpinLock lock;
|
|
|
public:
|
|
@@ -28461,63 +28552,48 @@ public:
|
|
|
{
|
|
|
ForEachItemIn(idx, stored)
|
|
|
{
|
|
|
- ConstPointerArray *rows = stored.item(idx);
|
|
|
+ rowset_t rows = stored.item(idx);
|
|
|
if (rows)
|
|
|
{
|
|
|
- ReleaseRoxieRowSet(*rows);
|
|
|
- delete rows;
|
|
|
+ rtlReleaseRowset(counts.item(idx), (byte**) rows);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- virtual int addResult(ConstPointerArray *data, IOutputMetaData *meta)
|
|
|
+ virtual int addResult(size32_t count, rowset_t data, IOutputMetaData *meta)
|
|
|
{
|
|
|
SpinBlock b(lock);
|
|
|
stored.append(data);
|
|
|
+ counts.append(count);
|
|
|
metas.append(meta);
|
|
|
return stored.ordinality()-1;
|
|
|
}
|
|
|
- virtual int appendResult(int oldId, ConstPointerArray *data, IOutputMetaData *meta)
|
|
|
+ virtual void queryResult(int id, size32_t &count, rowset_t &data) const
|
|
|
{
|
|
|
- SpinBlock b(lock);
|
|
|
- ConstPointerArray *oldData = stored.item(oldId);
|
|
|
- ConstPointerArray *newData = new ConstPointerArray;
|
|
|
- ForEachItemIn(idx, *oldData)
|
|
|
- {
|
|
|
- const void * row = oldData->item(idx);
|
|
|
- if (row)
|
|
|
- LinkRoxieRow(row);
|
|
|
- newData->append(row);
|
|
|
- }
|
|
|
- ForEachItemIn(idx2, *data)
|
|
|
- {
|
|
|
- const void * row = data->item(idx2);
|
|
|
- newData->append(row); // don't link these rows, because...
|
|
|
- }
|
|
|
- delete data; // ...then we don't need to release them when we delete this array
|
|
|
- // Note that we don't delete the previons ConstPointerArray yet, as it could be in use.
|
|
|
- return addResult(newData, meta);
|
|
|
+ count = counts.item(id);
|
|
|
+ data = stored.item(id);
|
|
|
}
|
|
|
virtual IWorkUnitRowReader *createDeserializedReader(int id) const
|
|
|
{
|
|
|
- return new DeserializedDataReader(*stored.item(id));
|
|
|
+ return new DeserializedDataReader(counts.item(id), stored.item(id));
|
|
|
}
|
|
|
virtual void serialize(unsigned & tlen, void * & tgt, int id, ICodeContext *codectx) const
|
|
|
{
|
|
|
IOutputMetaData *meta = metas.item(id);
|
|
|
- ConstPointerArray *data = stored.item(id);
|
|
|
+ rowset_t data = stored.item(id);
|
|
|
+ size32_t count = counts.item(id);
|
|
|
|
|
|
MemoryBuffer result;
|
|
|
- Owned<IOutputRowSerializer> rowSerializer = meta->createRowSerializer(codectx, 0); // NOTE - we don't have a maningful activity id. Only used for error reporting.
|
|
|
+ Owned<IOutputRowSerializer> rowSerializer = meta->createRowSerializer(codectx, 0); // NOTE - we don't have a meaningful activity id. Only used for error reporting.
|
|
|
bool grouped = meta->isGrouped();
|
|
|
- ForEachItemIn(idx, *data)
|
|
|
+ for (size32_t idx = 0; idx<count; idx++)
|
|
|
{
|
|
|
- const void *row = data->item(idx);
|
|
|
+ const byte *row = data[idx];
|
|
|
if (grouped && idx)
|
|
|
result.append(row == NULL);
|
|
|
if (row)
|
|
|
{
|
|
|
CThorDemoRowSerializer serializerTarget(result);
|
|
|
- rowSerializer->serialize(serializerTarget, (const byte *) row);
|
|
|
+ rowSerializer->serialize(serializerTarget, row);
|
|
|
}
|
|
|
}
|
|
|
tlen = result.length();
|
|
@@ -29058,7 +29134,7 @@ public:
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- virtual void appendResultDeserialized(const char *name, unsigned sequence, ConstPointerArray *data, bool extend, IOutputMetaData *meta)
|
|
|
+ virtual void appendResultDeserialized(const char *name, unsigned sequence, size32_t count, rowset_t data, bool extend, IOutputMetaData *meta)
|
|
|
{
|
|
|
CriticalBlock b(contextCrit);
|
|
|
IPropertyTree &ctx = useContext(sequence);
|
|
@@ -29066,16 +29142,26 @@ public:
|
|
|
IPropertyTree *val = ctx.queryPropTree(name);
|
|
|
if (extend && val)
|
|
|
{
|
|
|
- int old = val->getPropInt("@id", -1);
|
|
|
- assertex(old!=-1);
|
|
|
- val->setPropInt("@id", resultStore.appendResult(old, data, meta));
|
|
|
+ int oldId = val->getPropInt("@id", -1);
|
|
|
+ const char * oldFormat = val->queryProp("@format");
|
|
|
+ assertex(oldId != -1);
|
|
|
+ assertex(oldFormat && strcmp(oldFormat, "deserialized")==0);
|
|
|
+ size32_t oldCount;
|
|
|
+ rowset_t oldData;
|
|
|
+ resultStore.queryResult(oldId, oldCount, oldData);
|
|
|
+ Owned<IEngineRowAllocator> allocator = createRoxieRowAllocator(*rowManager, meta, 0, 0, roxiemem::RHFnone);
|
|
|
+ RtlLinkedDatasetBuilder builder(allocator);
|
|
|
+ builder.appendRows(oldCount, oldData);
|
|
|
+ builder.appendRows(count, data);
|
|
|
+ rtlReleaseRowset(count, data);
|
|
|
+ val->setPropInt("@id", resultStore.addResult(builder.getcount(), builder.linkrows(), meta));
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
if (!val)
|
|
|
val = ctx.addPropTree(name, createPTree());
|
|
|
val->setProp("@format", "deserialized");
|
|
|
- val->setPropInt("@id", resultStore.addResult(data, meta));
|
|
|
+ val->setPropInt("@id", resultStore.addResult(count, data, meta));
|
|
|
}
|
|
|
|
|
|
}
|
|
@@ -29680,24 +29766,8 @@ public:
|
|
|
{
|
|
|
try
|
|
|
{
|
|
|
- bool atEOG = true;
|
|
|
Owned<IWorkUnitRowReader> wuReader = getWorkunitRowReader(stepname, sequence, xmlTransformer, _rowAllocator, isGrouped);
|
|
|
- RtlLinkedDatasetBuilder builder(_rowAllocator);
|
|
|
- loop
|
|
|
- {
|
|
|
- const void *ret = wuReader->nextInGroup();
|
|
|
- if (!ret)
|
|
|
- {
|
|
|
- if (atEOG || !isGrouped)
|
|
|
- break;
|
|
|
- atEOG = true;
|
|
|
- }
|
|
|
- else
|
|
|
- atEOG = false;
|
|
|
- builder.appendOwn(ret);
|
|
|
- }
|
|
|
- tcount = builder.getcount();
|
|
|
- tgt = builder.linkrows();
|
|
|
+ wuReader->getResultRowset(tcount, tgt);
|
|
|
}
|
|
|
catch (IException * e)
|
|
|
{
|
|
@@ -29712,6 +29782,25 @@ public:
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ virtual void getResultDictionary(size32_t & tcount, byte * * & tgt, IEngineRowAllocator * _rowAllocator, const char * stepname, unsigned sequence, IOutputRowDeserializer * deserializer, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer, IHThorHashLookupInfo * hasher)
|
|
|
+ {
|
|
|
+ try
|
|
|
+ {
|
|
|
+ Owned<IWorkUnitRowReader> wuReader = getWorkunitRowReader(stepname, sequence, xmlTransformer, _rowAllocator, false);
|
|
|
+ wuReader->getResultRowset(tcount, tgt);
|
|
|
+ }
|
|
|
+ catch (IException * e)
|
|
|
+ {
|
|
|
+ StringBuffer text;
|
|
|
+ e->errorMessage(text);
|
|
|
+ e->Release();
|
|
|
+ throw MakeStringException(ROXIE_DATA_ERROR, "Failed to retrieve data value \"%s\". [%s]", stepname, text.str());
|
|
|
+ }
|
|
|
+ catch (...)
|
|
|
+ {
|
|
|
+ throw MakeStringException(ROXIE_DATA_ERROR, "Failed to retrieve data value \"%s\"", stepname);
|
|
|
+ }
|
|
|
+ }
|
|
|
virtual void getResultSet(bool & tisAll, unsigned & tlen, void * & tgt, const char *stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer)
|
|
|
{
|
|
|
try
|
|
@@ -29754,7 +29843,43 @@ public:
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- class RawDataReader : public CInterface, implements IWorkUnitRowReader
|
|
|
+ class WorkUnitRowReaderBase : public CInterface, implements IWorkUnitRowReader
|
|
|
+ {
|
|
|
+ protected:
|
|
|
+ Linked<IEngineRowAllocator> rowAllocator;
|
|
|
+ bool isGrouped;
|
|
|
+
|
|
|
+ public:
|
|
|
+ IMPLEMENT_IINTERFACE;
|
|
|
+ WorkUnitRowReaderBase(IEngineRowAllocator *_rowAllocator, bool _isGrouped)
|
|
|
+ : rowAllocator(_rowAllocator), isGrouped(_isGrouped)
|
|
|
+ {
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual void getResultRowset(size32_t & tcount, byte * * & tgt)
|
|
|
+ {
|
|
|
+ bool atEOG = true;
|
|
|
+ RtlLinkedDatasetBuilder builder(rowAllocator);
|
|
|
+ loop
|
|
|
+ {
|
|
|
+ const void *ret = nextInGroup();
|
|
|
+ if (!ret)
|
|
|
+ {
|
|
|
+ if (atEOG || !isGrouped)
|
|
|
+ break;
|
|
|
+ atEOG = true;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ atEOG = false;
|
|
|
+ builder.appendOwn(ret);
|
|
|
+ }
|
|
|
+ tcount = builder.getcount();
|
|
|
+ tgt = builder.linkrows();
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ class RawDataReader : public WorkUnitRowReaderBase
|
|
|
{
|
|
|
protected:
|
|
|
const IRoxieContextLogger &logctx;
|
|
@@ -29764,8 +29889,6 @@ public:
|
|
|
CThorStreamDeserializerSource rowSource;
|
|
|
bool eof;
|
|
|
bool eogPending;
|
|
|
- bool isGrouped;
|
|
|
- Linked<IEngineRowAllocator> rowAllocator;
|
|
|
Owned<IOutputRowDeserializer> rowDeserializer;
|
|
|
|
|
|
virtual bool nextBlock(unsigned & tlen, void * & tgt, void * & base) = 0;
|
|
@@ -29783,9 +29906,8 @@ public:
|
|
|
}
|
|
|
|
|
|
public:
|
|
|
- IMPLEMENT_IINTERFACE;
|
|
|
RawDataReader(CRoxieServerContext *parent, IEngineRowAllocator *_rowAllocator, bool _isGrouped)
|
|
|
- : logctx(*parent), rowAllocator(_rowAllocator), isGrouped(_isGrouped)
|
|
|
+ : WorkUnitRowReaderBase(_rowAllocator, _isGrouped), logctx(*parent)
|
|
|
{
|
|
|
eof = false;
|
|
|
eogPending = false;
|
|
@@ -29928,17 +30050,16 @@ public:
|
|
|
}
|
|
|
};
|
|
|
|
|
|
- class InlineXmlDataReader : public CInterface, implements IWorkUnitRowReader
|
|
|
+ class InlineXmlDataReader : public WorkUnitRowReaderBase
|
|
|
{
|
|
|
Linked<IPropertyTree> xml;
|
|
|
Owned <XmlColumnProvider> columns;
|
|
|
Owned<IPropertyTreeIterator> rows;
|
|
|
IXmlToRowTransformer &rowTransformer;
|
|
|
- Linked<IEngineRowAllocator> rowAllocator;
|
|
|
public:
|
|
|
IMPLEMENT_IINTERFACE;
|
|
|
- InlineXmlDataReader(IXmlToRowTransformer &_rowTransformer, IPropertyTree *_xml, IEngineRowAllocator *_rowAllocator)
|
|
|
- : xml(_xml), rowTransformer(_rowTransformer), rowAllocator(_rowAllocator)
|
|
|
+ InlineXmlDataReader(IXmlToRowTransformer &_rowTransformer, IPropertyTree *_xml, IEngineRowAllocator *_rowAllocator, bool _isGrouped)
|
|
|
+ : WorkUnitRowReaderBase(_rowAllocator, _isGrouped), xml(_xml), rowTransformer(_rowTransformer)
|
|
|
{
|
|
|
columns.setown(new XmlDatasetColumnProvider);
|
|
|
rows.setown(xml->getElements("Row")); // NOTE - the 'hack for Gordon' as found in thorxmlread is not implemented here. Does it need to be ?
|
|
@@ -29988,7 +30109,7 @@ public:
|
|
|
if (!format || strcmp(format, "xml") == 0)
|
|
|
{
|
|
|
if (xmlTransformer)
|
|
|
- return new InlineXmlDataReader(*xmlTransformer, val, rowAllocator);
|
|
|
+ return new InlineXmlDataReader(*xmlTransformer, val, rowAllocator, isGrouped);
|
|
|
}
|
|
|
else if (strcmp(format, "raw") == 0)
|
|
|
{
|