浏览代码

HPCC-8457 Add hthor support for reading/writing dictionary to workunit

Support getResultDictionary and the dictionary workunit write activity.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 12 年之前
父节点
当前提交
4135585515
共有 7 个文件被更改,包括 132 次插入2 次删除
  1. 7 2
      ecl/eclagent/eclagent.cpp
  2. 2 0
      ecl/eclagent/eclgraph.cpp
  3. 86 0
      ecl/hthor/hthor.cpp
  4. 1 0
      ecl/hthor/hthor.hpp
  5. 12 0
      ecl/hthor/hthor.ipp
  6. 22 0
      rtl/eclrtl/rtlds.cpp
  7. 2 0
      rtl/eclrtl/rtlds_imp.hpp

+ 7 - 2
ecl/eclagent/eclagent.cpp

@@ -1053,11 +1053,16 @@ void EclAgent::getResultRowset(size32_t & tcount, byte * * & tgt, const char * s
     );
 }
 
-void EclAgent::getResultDictionary(size32_t & tcount, byte * * & tgt, IEngineRowAllocator * _rowAllocator, const char * name, unsigned sequence, IOutputRowDeserializer * deserializer, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer, IHThorHashLookupInfo * hasher)
+void EclAgent::getResultDictionary(size32_t & tcount, byte * * & tgt, IEngineRowAllocator * _rowAllocator, const char * stepname, unsigned sequence, IOutputRowDeserializer * deserializer, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer, IHThorHashLookupInfo * hasher)
 {
-    UNIMPLEMENTED;
     tcount = 0;
     tgt = NULL;
+    PROTECTED_GETRESULT(stepname, sequence, "Rowset", "rowset",
+        MemoryBuffer datasetBuffer;
+        MemoryBuffer2IDataVal result(datasetBuffer);
+        r->getResultRaw(result, NULL, NULL);
+        rtlDictionary2RowsetX(tcount, tgt, _rowAllocator, deserializer, datasetBuffer.length(), datasetBuffer.toByteArray());
+    );
 }
 
 const void * EclAgent::fromXml(IEngineRowAllocator * rowAllocator, size32_t len, const char * utf8, IXmlToRowTransformer * xmlTransformer, bool stripWhitespace)

+ 2 - 0
ecl/eclagent/eclgraph.cpp

@@ -98,6 +98,8 @@ static IHThorActivity * createActivity(IAgentContext & agent, unsigned activityI
         return createGroupActivity(agent, activityId, subgraphId, (IHThorGroupArg &)arg, kind);
     case TAKworkunitwrite:
         return createWorkUnitWriteActivity(agent, activityId, subgraphId, (IHThorWorkUnitWriteArg &)arg, kind);
+    case TAKdictionaryworkunitwrite:
+        return createDictionaryWorkUnitWriteActivity(agent, activityId, subgraphId, (IHThorDictionaryWorkUnitWriteArg &)arg, kind);
     case TAKfunnel:
         return createConcatActivity(agent, activityId, subgraphId, (IHThorFunnelArg &)arg, kind);
     case TAKapply:

+ 86 - 0
ecl/hthor/hthor.cpp

@@ -5884,6 +5884,91 @@ void CHThorWorkUnitWriteActivity::execute()
 
 //=====================================================================================================
 
+CHThorDictionaryWorkUnitWriteActivity::CHThorDictionaryWorkUnitWriteActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDictionaryWorkUnitWriteArg &_arg, ThorActivityKind _kind)
+ : CHThorActivityBase(_agent, _activityId, _subgraphId, _arg, _kind), helper(_arg)
+{
+}
+
+void CHThorDictionaryWorkUnitWriteActivity::execute()
+{
+    size32_t outputLimit = agent.queryWorkUnit()->getDebugValueInt("outputLimit", defaultWorkUnitWriteLimit) * 0x100000;
+    MemoryBuffer rowdata;
+    IRecordSize * inputMeta = input->queryOutputMeta();
+
+    Owned<IOutputRowSerializer> rowSerializer;
+    if (input->queryOutputMeta()->getMetaFlags() & MDFneedserialize)
+        rowSerializer.setown( input->queryOutputMeta()->createRowSerializer(agent.queryCodeContext(), activityId) );
+
+    int sequence = helper.getSequence();
+    const char *storedName = helper.queryName();
+    assertex(storedName && *storedName);
+    assertex(sequence < 0);
+
+    RtlLinkedDictionaryBuilder builder(rowAllocator, helper.queryHashLookupInfo());
+    loop
+    {
+        const void *row = input->nextInGroup();
+        if (!row)
+        {
+            row = input->nextInGroup();
+            if (!row)
+                break;
+        }
+        builder.appendOwn(row);
+        processed++;
+    }
+
+    size32_t rows = builder.getcount();
+    byte **dict = builder.queryrows();
+    size32_t idx = 0;
+    while (idx < rows)
+    {
+        byte numRows = 0;
+        while (numRows < 255 && idx+numRows < rows && dict[idx+numRows] != NULL)
+            numRows++;
+        rowdata.append(1, &numRows);
+        for (int i = 0; i < numRows; i++)
+        {
+            byte *nextrec = dict[idx+i];
+            assert(nextrec);
+            size32_t thisSize = inputMeta->getRecordSize(nextrec);
+            if(outputLimit && ((rowdata.length() + thisSize) > outputLimit))
+            {
+                StringBuffer errMsg("Dictionary too large to output to workunit (limit ");
+                errMsg.append(outputLimit/0x100000).append(") megabytes, in result (");
+                const char *name = helper.queryName();
+                if (name)
+                    errMsg.append("name=").append(name);
+                else
+                    errMsg.append("sequence=").append(helper.getSequence());
+                errMsg.append(")");
+                throw MakeStringException(0, "%s", errMsg.str());
+            }
+            if (rowSerializer)
+            {
+                CThorDemoRowSerializer serializerTarget(rowdata);
+                rowSerializer->serialize(serializerTarget, nextrec );
+            }
+            else
+                rowdata.append(thisSize, nextrec);
+        }
+        idx += numRows;
+        byte numNulls = 0;
+        while (numNulls < 255 && idx+numNulls < rows && dict[idx+numNulls] == NULL)
+            numNulls++;
+        rowdata.append(1, &numNulls);
+        idx += numNulls;
+    }
+    WorkunitUpdate w = agent.updateWorkUnit();
+    Owned<IWUResult> result = updateWorkUnitResult(w, helper.queryName(), helper.getSequence());
+    result->setResultRaw(rowdata.length(), rowdata.toByteArray(), ResultFormatRaw);
+    result->setResultStatus(ResultStatusCalculated);
+    result->setResultRowCount(rows);
+    result->setResultTotalRowCount(rows); // Is this right??
+}
+
+//=====================================================================================================
+
 CHThorCountActivity::CHThorCountActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorCountArg &_arg, ThorActivityKind _kind)
  : CHThorActivityBase(_agent, _activityId, _subgraphId, _arg, _kind), helper(_arg)
 {
@@ -9767,6 +9852,7 @@ MAKEFACTORY_ARG(SelfJoin, Join);
 MAKEFACTORY_ARG(LookupJoin, HashJoin);
 MAKEFACTORY(AllJoin);
 MAKEFACTORY(WorkUnitWrite);
+MAKEFACTORY(DictionaryWorkUnitWrite);
 MAKEFACTORY(FirstN);
 MAKEFACTORY(Count);
 MAKEFACTORY(TempTable);

+ 1 - 0
ecl/hthor/hthor.hpp

@@ -104,6 +104,7 @@ extern HTHOR_API IHThorActivity *createSelfJoinActivity(IAgentContext &, unsigne
 extern HTHOR_API IHThorActivity *createLookupJoinActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorHashJoinArg & arg, ThorActivityKind kind);
 extern HTHOR_API IHThorActivity *createAllJoinActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorAllJoinArg & arg, ThorActivityKind kind);
 extern HTHOR_API IHThorActivity *createWorkUnitWriteActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorWorkUnitWriteArg &arg, ThorActivityKind kind);
+extern HTHOR_API IHThorActivity *createDictionaryWorkUnitWriteActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorDictionaryWorkUnitWriteArg &arg, ThorActivityKind kind);
 extern HTHOR_API IHThorActivity *createFirstNActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorFirstNArg &arg, ThorActivityKind kind);
 extern HTHOR_API IHThorActivity *createCountActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorCountArg &arg, ThorActivityKind kind);
 extern HTHOR_API IHThorActivity *createTempTableActivity(IAgentContext &, unsigned _activityId, unsigned _subgraphId, IHThorTempTableArg &arg, ThorActivityKind kind);

+ 12 - 0
ecl/hthor/hthor.ipp

@@ -1468,6 +1468,18 @@ public:
     virtual void execute();
 };
 
+class CHThorDictionaryWorkUnitWriteActivity : public CHThorActivityBase
+{
+    IHThorDictionaryWorkUnitWriteArg &helper;
+
+public:
+    IMPLEMENT_SINKACTIVITY;
+
+    CHThorDictionaryWorkUnitWriteActivity (IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorDictionaryWorkUnitWriteArg &_arg, ThorActivityKind _kind);
+    virtual void execute();
+    virtual bool needsAllocator() const { return true; }
+};
+
 class CHThorCountActivity : public CHThorActivityBase
 {
     IHThorCountArg &helper;

+ 22 - 0
rtl/eclrtl/rtlds.cpp

@@ -950,6 +950,28 @@ extern ECLRTL_API size32_t rtlSerializeToBuilder(ARowBuilder & builder, IOutputR
     return target.length();
 }
 
+extern ECLRTL_API void rtlDictionary2RowsetX(size32_t & count, byte * * & rowset, IEngineRowAllocator * rowAllocator, IOutputRowDeserializer * deserializer, size32_t lenSrc, const void * src)
+{
+    RtlLinkedDatasetBuilder builder(rowAllocator);
+    Owned<ISerialStream> stream = createMemorySerialStream(src, lenSrc);
+    CThorStreamDeserializerSource source(stream);
+
+    byte nullsPending = 0;
+    byte rowsPending = 0;
+    while (!source.finishedNested(lenSrc))
+    {
+        source.read(1, &rowsPending);
+        for (int i = 0; i < rowsPending; i++)
+            builder.deserializeRow(*deserializer, source);
+        source.read(1, &nullsPending);
+        for (int i = 0; i < nullsPending; i++)
+            builder.append(NULL);
+    }
+
+    count = builder.getcount();
+    rowset = builder.linkrows();
+}
+
 //---------------------------------------------------------------------------
 
 RtlDatasetCursor::RtlDatasetCursor(size32_t _len, const void * _data)

+ 2 - 0
rtl/eclrtl/rtlds_imp.hpp

@@ -455,6 +455,8 @@ extern ECLRTL_API void rtlGroupedDataset2RowsetX(size32_t & count, byte * * & ro
 extern ECLRTL_API void rtlRowset2DatasetX(unsigned & tlen, void * & tgt, IOutputRowSerializer * serializer, size32_t count, byte * * rows);
 extern ECLRTL_API void rtlGroupedRowset2DatasetX(unsigned & tlen, void * & tgt, IOutputRowSerializer * serializer, size32_t count, byte * * rows);
 
+extern ECLRTL_API void rtlDictionary2RowsetX(size32_t & count, byte * * & rowset, IEngineRowAllocator * _rowAllocator, IOutputRowDeserializer * deserializer, size32_t lenSrc, const void * src);
+
 extern ECLRTL_API size32_t rtlDeserializeRow(size32_t lenOut, void * out, IOutputRowDeserializer * deserializer, const void * src);
 extern ECLRTL_API size32_t rtlSerializeRow(size32_t lenOut, void * out, IOutputRowSerializer * serializer, const void * src);
 extern ECLRTL_API size32_t rtlDeserializeToBuilder(ARowBuilder & builder, IOutputRowDeserializer * deserializer, const void * src);