Selaa lähdekoodia

Merge pull request #11097 from ghalliday/issue19455

HPCC-19455 Introduce TAKspillread and TAKspillwrite to aid graph viewer

Reviewed By: Shamser Ahmed <shamser.ahmed@lexisnexis.co.uk>
Reviewed-By: Jake Smith <jake.smith@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 7 vuotta sitten
vanhempi
commit
fd8033ec00

+ 2 - 0
common/thorhelper/commonext.cpp

@@ -208,6 +208,8 @@ MODULE_INIT(INIT_PRIORITY_STANDARD)
     kindArray[TAKsmartdenormalizegroup] = "smartdenormalizegroup";
     kindArray[TAKselfdenormalize] = "selfdenormalize";
     kindArray[TAKselfdenormalizegroup] = "selfdenormalizegroup";
+    kindArray[TAKspillread] = "spillread" ;
+    kindArray[TAKspillwrite] = "spillwrite" ;
 
 //Non standard
     kindArray[TAKsubgraph] = "subgraph";

+ 4 - 0
common/thorhelper/thorcommon.cpp

@@ -797,6 +797,8 @@ extern const char * getActivityText(ThorActivityKind kind)
     case TAKselfdenormalizegroup:   return "Self Denormalize Group";
     case TAKtrace:                  return "Trace";
     case TAKquantile:               return "Quantile";
+    case TAKspillread:              return "Spill Read";
+    case TAKspillwrite:             return "Spill Write";
     }
     throwUnexpected();
 }
@@ -843,6 +845,7 @@ extern bool isActivitySource(ThorActivityKind kind)
     case TAKindexgroupcount:
     case TAKstreamediterator:
     case TAKexternalsource:
+    case TAKspillread:
         return true;
     }
     return false;
@@ -879,6 +882,7 @@ extern bool isActivitySink(ThorActivityKind kind)
     case TAKwhen_action:
     case TAKdictionaryworkunitwrite:
     case TAKdictionaryresultwrite:
+    case TAKspillwrite:
         return true;
     }
     return false;

+ 1 - 1
common/workunit/referencedfilelist.cpp

@@ -651,7 +651,7 @@ bool ReferencedFileList::addFilesFromQuery(IConstWorkUnit *cw, const IHpccPackag
 
             ThorActivityKind kind = (ThorActivityKind) node.getPropInt("att[@name='_kind']/@value", TAKnone);
             //not likely to be part of roxie queries, but for forward compatibility:
-            if(kind==TAKdiskwrite || kind==TAKindexwrite || kind==TAKcsvwrite || kind==TAKxmlwrite || kind==TAKjsonwrite)
+            if(kind==TAKdiskwrite || kind==TAKspillwrite || kind==TAKindexwrite || kind==TAKcsvwrite || kind==TAKxmlwrite || kind==TAKjsonwrite)
                 continue;
             if (node.getPropBool("att[@name='_isSpill']/@value") ||
                 node.getPropBool("att[@name='_isTransformSpill']/@value"))

+ 5 - 1
ecl/eclagent/eclgraph.cpp

@@ -46,7 +46,8 @@ static IHThorActivity * createActivity(IAgentContext & agent, unsigned activityI
 {
     switch (kind)
     {
-    case TAKdiskwrite: 
+    case TAKdiskwrite:
+    case TAKspillwrite:
         return createDiskWriteActivity(agent, activityId, subgraphId, (IHThorDiskWriteArg &)arg, kind);
     case TAKsort: 
         return createGroupSortActivity(agent, activityId, subgraphId, (IHThorSortArg &)arg, kind);
@@ -231,6 +232,7 @@ static IHThorActivity * createActivity(IAgentContext & agent, unsigned activityI
     case TAKchildthroughnormalize:
         return createChildThroughNormalizeActivity(agent, activityId, subgraphId, (IHThorChildThroughNormalizeArg &)arg, kind);
     case TAKdiskread:
+    case TAKspillread:
         return createDiskReadActivity(agent, activityId, subgraphId, (IHThorDiskReadArg &)arg, kind);
     case TAKdisknormalize:
         return createDiskNormalizeActivity(agent, activityId, subgraphId, (IHThorDiskNormalizeArg &)arg, kind);
@@ -377,6 +379,7 @@ bool EclGraphElement::alreadyUpToDate(IAgentContext & agent)
     case TAKcsvwrite:
     case TAKxmlwrite:
     case TAKjsonwrite:
+    case TAKspillwrite:
         {
             IHThorDiskWriteArg * helper = static_cast<IHThorDiskWriteArg *>(arg.get());
             filename.set(helper->getFileName());
@@ -585,6 +588,7 @@ bool EclGraphElement::prepare(IAgentContext & agent, const byte * parentExtract,
         case TAKcsvwrite:
         case TAKxmlwrite:
         case TAKjsonwrite:
+        case TAKspillwrite:
             alreadyUpdated = alreadyUpToDate(agent);
             if (alreadyUpdated)
                 return false;

+ 5 - 8
ecl/hqlcpp/hqlhtcpp.cpp

@@ -10788,6 +10788,8 @@ ABoundActivity * HqlCppTranslator::doBuildActivityOutput(BuildCtx & ctx, IHqlExp
         kind = (isJson) ? TAKjsonwrite : TAKxmlwrite;
         activityArgName = "XmlWrite";
     }
+    else if (expr->hasAttribute(_spill_Atom))
+        kind = TAKspillwrite;
 
     bool useImplementationClass = options.minimizeActivityClasses && targetRoxie() && expr->hasAttribute(_spill_Atom);
     Owned<ActivityInstance> instance = new ActivityInstance(*this, ctx, kind, expr, activityArgName);
@@ -10801,15 +10803,10 @@ ABoundActivity * HqlCppTranslator::doBuildActivityOutput(BuildCtx & ctx, IHqlExp
         StringBuffer s;
         s.append(getActivityText(kind));
         s.append("\n");
-        if (expr->hasAttribute(_spill_Atom))
-            s.append("Spill File");
-        else
-        {
-            if (expr->hasAttribute(_workflowPersist_Atom))
-                s.append("Persist ");
+        if (expr->hasAttribute(_workflowPersist_Atom))
+            s.append("Persist ");
 
-            filename->toString(s);
-        }
+        filename->toString(s);
         instance->graphLabel.set(s.str());
     }
 

+ 9 - 3
ecl/hqlcpp/hqlsource.cpp

@@ -1878,6 +1878,7 @@ static bool expandGraphLabel(ThorActivityKind kind)
     case TAKjsonread:
     case TAKdiskcount:
     case TAKdiskexists:
+    case TAKspillread:
         return true;
     default:
         return false;
@@ -1891,13 +1892,15 @@ ABoundActivity * SourceBuilder::buildActivity(BuildCtx & ctx, IHqlExpression * e
     translator.gatherActiveCursors(ctx, parentCursors);
 
     bool isSpill = tableExpr && tableExpr->hasAttribute(_spill_Atom);
+    if (isSpill && (activityKind == TAKdiskread))
+        activityKind = TAKspillread;
     useImplementationClass = translator.queryOptions().minimizeActivityClasses && translator.targetRoxie() && isSpill;
 
     Owned<ActivityInstance> localInstance = new ActivityInstance(translator, ctx, activityKind, expr, kind);
     if (useImplementationClass)
         localInstance->setImplementationClass(newMemorySpillReadArgId);
 
-    if ((activityKind >= TAKdiskread) && (activityKind <= TAKdiskgroupaggregate))
+    if (((activityKind >= TAKdiskread) && (activityKind <= TAKdiskgroupaggregate)) || (activityKind == TAKspillread))
     {
         IHqlExpression * seq = querySequence(tableExpr);
         translator.noteResultAccessed(ctx, seq, nameExpr);
@@ -2029,7 +2032,10 @@ ABoundActivity * SourceBuilder::buildActivity(BuildCtx & ctx, IHqlExpression * e
     if (nameExpr && nameExpr->queryValue())
     {
         if (isSpill)
-            graphLabel.append("\nSpill");
+        {
+            if (activityKind != TAKspillread)
+                graphLabel.append("\nSpill");
+        }
         else
         {
             graphLabel.newline();
@@ -3080,7 +3086,7 @@ void DiskReadBuilder::buildTransform(IHqlExpression * expr)
     }
 
     MemberFunction func(translator, instance->startctx);
-    if (instance->kind == TAKdiskread)
+    if ((instance->kind == TAKdiskread) || (instance->kind == TAKspillread))
         func.start("virtual size32_t transform(ARowBuilder & crSelf, const void * _left) override");
     else
         func.start("virtual size32_t transform(ARowBuilder & crSelf, const void * _left, IFilePositionProvider * fpp) override");

+ 1 - 1
esp/services/ws_workunits/ws_workunitsQuerySets.cpp

@@ -208,7 +208,7 @@ bool copyWULogicalFiles(IEspContext &context, IConstWorkUnit &cw, const char *cl
                 IPropertyTree &node = iter->query();
                 ThorActivityKind kind = (ThorActivityKind) node.getPropInt("att[@name='_kind']/@value", TAKnone);
 
-                if(kind==TAKdiskwrite || kind==TAKindexwrite || kind==TAKcsvwrite || kind==TAKxmlwrite || kind==TAKjsonwrite)
+                if(kind==TAKdiskwrite || kind==TAKspillwrite || kind==TAKindexwrite || kind==TAKcsvwrite || kind==TAKxmlwrite || kind==TAKjsonwrite)
                     continue;
                 if (node.getPropBool("att[@name='_isSpill']/@value") || node.getPropBool("att[@name='_isTransformSpill']/@value"))
                     continue;

+ 1 - 1
roxie/ccd/ccdactivities.cpp

@@ -4642,7 +4642,7 @@ public:
         try  // operations does not want any missing file errors to be fatal, or throw traps - just log it
         {
             ThorActivityKind kind = getActivityKind(_graphNode);
-            if (kind != TAKdiskwrite && kind != TAKindexwrite && kind != TAKpiperead && kind != TAKpipewrite)
+            if (kind != TAKdiskwrite && kind != TAKspillwrite && kind != TAKindexwrite && kind != TAKpiperead && kind != TAKpipewrite)
             {
                 const char *fileName = queryNodeFileName(_graphNode, kind);
                 const char *indexName = queryNodeIndexName(_graphNode, kind);

+ 3 - 1
roxie/ccd/ccdquery.cpp

@@ -593,6 +593,7 @@ protected:
             else
                 return createRoxieServerDiskReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
         }
+        case TAKspillread:
         case TAKmemoryspillread:
             return createRoxieServerSpillReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKdisknormalize:
@@ -613,6 +614,7 @@ protected:
         case TAKxmlwrite:
         case TAKjsonwrite:
         case TAKmemoryspillwrite:
+        case TAKspillwrite:
             return createRoxieServerDiskWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
         case TAKindexwrite:
             return createRoxieServerIndexWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
@@ -1140,7 +1142,7 @@ public:
                             {
                                 IPropertyTree &node = nodes->query();
                                 ThorActivityKind kind = getActivityKind(node);
-                                if (kind != TAKdiskwrite && kind != TAKindexwrite && kind != TAKpiperead && kind != TAKpipewrite)
+                                if (kind != TAKdiskwrite && kind != TAKspillwrite && kind != TAKindexwrite && kind != TAKpiperead && kind != TAKpipewrite)
                                 {
                                     const char *fileName = queryNodeFileName(node, kind);
                                     const char *indexName = queryNodeIndexName(node, kind);

+ 4 - 3
roxie/ccd/ccdserver.cpp

@@ -11475,7 +11475,7 @@ public:
 
     virtual void addDependency(unsigned source, ThorActivityKind sourceKind, unsigned sourceIdx, int controlId, const char *edgeId)
     {
-        if (sourceKind==TAKspill || sourceKind==TAKdiskwrite) // Bit of a hack - codegen probably should differentiate
+        if (sourceKind==TAKspill || sourceKind==TAKdiskwrite || sourceKind==TAKspillwrite) // Bit of a hack - codegen probably should differentiate
             setInput(0, source, sourceIdx);
         else
             CRoxieServerActivityFactory::addDependency(source, kind, sourceIdx, controlId, edgeId);
@@ -11972,7 +11972,7 @@ public:
         Owned<IHThorDiskWriteArg> helper = (IHThorDiskWriteArg *) helperFactory();
         isTemp = (helper->getFlags() & TDXtemporary) != 0;
         setNumOutputs(helper->getTempUsageCount());
-        if (_kind!=TAKdiskwrite)
+        if (_kind != TAKspillwrite)
             assertex(numOutputs == 0);
     }
 
@@ -22688,6 +22688,7 @@ public:
         case TAKjsonread:
             return new CRoxieServerXmlReadActivity(_ctx, this, _probeManager, remoteId, numParts, isLocal, sorted, maySkip, manager, translators);
         case TAKdiskread:
+        case TAKspillread:
             return new CRoxieServerDiskReadActivity(_ctx, this, _probeManager, remoteId, numParts, isLocal, sorted, maySkip, manager, translators);
         case TAKdisknormalize:
             return new CRoxieServerDiskNormalizeActivity(_ctx, this, _probeManager, remoteId, numParts, isLocal, sorted, manager, translators);
@@ -24858,7 +24859,7 @@ public:
                 return;  // ignore 'spills'
             bool isLocal = _graphNode.getPropBool("att[@name='local']/@value") && queryFactory.queryChannel()!=0;
             ThorActivityKind kind = getActivityKind(_graphNode);
-            if (kind != TAKdiskwrite && kind != TAKindexwrite && kind != TAKpiperead && kind != TAKpipewrite)
+            if (kind != TAKdiskwrite && kind != TAKspillwrite && kind != TAKindexwrite && kind != TAKpiperead && kind != TAKpipewrite)
             {
                 fileName.set(queryNodeFileName(_graphNode, kind));
                 indexName.set(queryNodeIndexName(_graphNode, kind));

+ 2 - 0
rtl/include/eclhelper.hpp

@@ -1033,6 +1033,8 @@ enum ThorActivityKind
     TAKtrace,
     TAKquantile,
     TAKjsonfetch,
+    TAKspillread,
+    TAKspillwrite,
 
     TAKlast
 };

+ 4 - 3
thorlcr/activities/thdiskbase.cpp

@@ -210,7 +210,8 @@ void CWriteMasterBase::init()
         if (blockCompressed)
             props.setPropBool("@blockCompressed", true);
         props.setProp("@kind", "flat");
-        if (TAKdiskwrite == container.getKind() && (0 != (diskHelperBase->getFlags() & TDXtemporary)) && container.queryOwner().queryOwner() && (!container.queryOwner().isGlobal())) // I am in a child query
+        if (((TAKdiskwrite == container.getKind()) || (TAKspillwrite == container.getKind())) &&
+                (0 != (diskHelperBase->getFlags() & TDXtemporary)) && container.queryOwner().queryOwner() && (!container.queryOwner().isGlobal())) // I am in a child query
         { // do early, because this will be local act. and will not come back to master until end of owning graph.
             publish();
         }
@@ -248,7 +249,7 @@ void CWriteMasterBase::publish()
             // create empty parts for a fileDesc being published that is larger than this clusters
             size32_t recordSize = 0;
             IOutputMetaData *diskRowMeta = diskHelperBase->queryDiskRecordSize()->querySerializedDiskMeta();
-            if (diskRowMeta->isFixedSize() && (TAKdiskwrite == container.getKind()))
+            if (diskRowMeta->isFixedSize() && ((TAKdiskwrite == container.getKind()) || (TAKspillwrite == container.getKind())))
             {
                 recordSize = diskRowMeta->getMinRecordSize();
                 if (0 != (diskHelperBase->getFlags() & TDXgrouped))
@@ -397,7 +398,7 @@ void CWriteMasterBase::done()
 {
     CMasterActivity::done();
     publish();
-    if (TAKdiskwrite == container.getKind() && (0 != (diskHelperBase->getFlags() & TDXtemporary)) && container.queryOwner().queryOwner()) // I am in a child query
+    if (((TAKdiskwrite == container.getKind()) || (TAKspillwrite == container.getKind())) && (0 != (diskHelperBase->getFlags() & TDXtemporary)) && container.queryOwner().queryOwner()) // I am in a child query
     {
         published = false;
         recordsProcessed = 0;

+ 1 - 1
thorlcr/activities/thdiskbaseslave.cpp

@@ -338,7 +338,7 @@ void CDiskWriteSlaveActivityBase::open()
      */
     size32_t diskRowMinSz = 0;
     IOutputMetaData *diskRowMeta = diskHelperBase->queryDiskRecordSize()->querySerializedDiskMeta();
-    if (diskRowMeta->isFixedSize() && (TAKdiskwrite == container.getKind()))
+    if (diskRowMeta->isFixedSize() && ((TAKdiskwrite == container.getKind()) || (TAKspillwrite == container.getKind())))
     {
         diskRowMinSz = diskRowMeta->getMinRecordSize();
         if (grouped)

+ 4 - 0
thorlcr/graph/thgraph.cpp

@@ -330,6 +330,7 @@ bool isDiskInput(ThorActivityKind kind)
         case TAKindexgroupaggregate:
         case TAKindexgroupexists:
         case TAKindexgroupcount:
+        case TAKspillread:
             return true;
         default:
             return false;
@@ -658,6 +659,7 @@ bool CGraphElementBase::prepareContext(size32_t parentExtractSz, const byte *par
                 case TAKcsvwrite:
                 case TAKxmlwrite:
                 case TAKjsonwrite:
+                case TAKspillwrite:
                     if (_shortCircuit) return true;
                     onCreate();
                     alreadyUpdated = checkUpdate();
@@ -836,6 +838,7 @@ bool isGlobalActivity(CGraphElementBase &container)
             unsigned flags = helper->getFlags();
             return (0 == (TDXtemporary & flags)); // global if not temporary
         }
+        case TAKspillwrite:
         case TAKspill:
             return false;
         case TAKcsvread:
@@ -952,6 +955,7 @@ bool isGlobalActivity(CGraphElementBase &container)
         case TAKchildgroupaggregate:
         case TAKchildthroughnormalize:
         case TAKchildnormalize:
+        case TAKspillread:
 
         case TAKindexread:
         case TAKindexnormalize:

+ 3 - 0
thorlcr/graph/thgraphmaster.cpp

@@ -581,6 +581,7 @@ bool CMasterGraphElement::checkUpdate()
         case TAKcsvwrite:
         case TAKxmlwrite:
         case TAKjsonwrite:
+        case TAKspillwrite:
         {
             IHThorDiskWriteArg *helper = (IHThorDiskWriteArg *)queryHelper();
             doCheckUpdate = 0 != (helper->getFlags() & TDWupdate);
@@ -588,6 +589,8 @@ bool CMasterGraphElement::checkUpdate()
             helper->getUpdateCRCs(eclCRC, totalCRC);
             if (TAKdiskwrite == getKind())
                 temporary = 0 != (helper->getFlags() & (TDXtemporary|TDXjobtemp));
+            else if (TAKspillwrite == getKind())
+                temporary = true;
             break;
         }
     }

+ 2 - 0
thorlcr/master/thactivitymaster.cpp

@@ -165,6 +165,7 @@ public:
                 break;
             case TAKdiskread:
             case TAKdisknormalize:
+            case TAKspillread:
                 ret = createDiskReadActivityMaster(this);
                 break;
             case TAKdiskaggregate:
@@ -194,6 +195,7 @@ public:
                 ret = createIndexGroupAggregateActivityMaster(this);
                 break;
             case TAKdiskwrite:
+            case TAKspillwrite:
                 ret = createDiskWriteActivityMaster(this);
                 break;
             case TAKcsvwrite:

+ 2 - 0
thorlcr/slave/slave.cpp

@@ -309,6 +309,7 @@ public:
         switch (kind)
         {
             case TAKdiskread:
+            case TAKspillread:
                 ret = createDiskReadSlave(this);
                 break;
             case TAKdisknormalize:
@@ -356,6 +357,7 @@ public:
                 ret = createSpillSlave(this);
                 break;
             case TAKdiskwrite:
+            case TAKspillwrite:
                 ret = createDiskWriteSlave(this);
                 break;
             case TAKsort: