|
@@ -182,8 +182,8 @@ void doDescheduleWorkkunit(char const * wuid)
|
|
|
* Graph progress support
|
|
|
*/
|
|
|
|
|
|
-CWuGraphStats::CWuGraphStats(IPropertyTree *_progress, StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id)
|
|
|
- : progress(_progress), creatorType(_creatorType), creator(_creator), id(_id)
|
|
|
+CWuGraphStats::CWuGraphStats(StatisticCreatorType _creatorType, const char * _creator, unsigned wfid, const char * _rootScope, unsigned _id, bool _merge)
|
|
|
+ : creatorType(_creatorType), creator(_creator), id(_id), merge(_merge)
|
|
|
{
|
|
|
StatsScopeId graphScopeId;
|
|
|
verifyex(graphScopeId.setScopeText(_rootScope));
|
|
@@ -195,8 +195,25 @@ CWuGraphStats::CWuGraphStats(IPropertyTree *_progress, StatisticCreatorType _cre
|
|
|
|
|
|
void CWuGraphStats::beforeDispose()
|
|
|
{
|
|
|
- Owned<IStatisticCollection> stats = collector->getResult();
|
|
|
+ collector->endScope();
|
|
|
+ StringBuffer tag;
|
|
|
+ tag.append("sg").append(id);
|
|
|
|
|
|
+ IPropertyTree &progress = queryProgressTree();
|
|
|
+ if (merge && progress.hasProp(tag))
|
|
|
+ {
|
|
|
+ VStringBuffer statsPath("%s/Stats", tag.str());
|
|
|
+ MemoryBuffer compressed;
|
|
|
+ progress.getPropBin(statsPath, compressed);
|
|
|
+ if (compressed.length())
|
|
|
+ {
|
|
|
+ MemoryBuffer serialized;
|
|
|
+ decompressToBuffer(serialized, compressed);
|
|
|
+ Owned<IStatisticCollection> prevCollection = createStatisticCollection(serialized);
|
|
|
+ prevCollection->mergeInto(*collector);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ Owned<IStatisticCollection> stats = collector->getResult();
|
|
|
MemoryBuffer compressed;
|
|
|
{
|
|
|
MemoryBuffer serialized;
|
|
@@ -208,18 +225,15 @@ void CWuGraphStats::beforeDispose()
|
|
|
unsigned maxActivity = 0;
|
|
|
stats->getMinMaxActivity(minActivity, maxActivity);
|
|
|
|
|
|
- StringBuffer tag;
|
|
|
- tag.append("sg").append(id);
|
|
|
-
|
|
|
//Replace the particular subgraph statistics added by this creator
|
|
|
- IPropertyTree * subgraph = progress->setPropTree(tag);
|
|
|
+ IPropertyTree * subgraph = progress.setPropTree(tag);
|
|
|
subgraph->setProp("@c", queryCreatorTypeName(creatorType));
|
|
|
subgraph->setProp("@creator", creator);
|
|
|
subgraph->setPropInt("@minActivity", minActivity);
|
|
|
subgraph->setPropInt("@maxActivity", maxActivity);
|
|
|
subgraph->setPropBin("Stats", compressed.length(), compressed.toByteArray());
|
|
|
- if (!progress->getPropBool("@stats", false))
|
|
|
- progress->setPropBool("@stats", true);
|
|
|
+ if (!progress.getPropBool("@stats", false))
|
|
|
+ progress.setPropBool("@stats", true);
|
|
|
}
|
|
|
|
|
|
IStatisticGatherer & CWuGraphStats::queryStatsBuilder()
|
|
@@ -237,10 +251,10 @@ public:
|
|
|
progress.setown(createPTree());
|
|
|
formatVersion = progress->getPropInt("@format", PROGRESS_FORMAT_V);
|
|
|
}
|
|
|
- virtual IPropertyTree * getProgressTree()
|
|
|
+ virtual IPropertyTree * getProgressTree(bool doFormat) override
|
|
|
{
|
|
|
if (progress->getPropBool("@stats"))
|
|
|
- return createProcessTreeFromStats(); // Should we cache that?
|
|
|
+ return createProcessTreeFromStats(doFormat); // Should we cache that?
|
|
|
return LINK(progress);
|
|
|
}
|
|
|
virtual unsigned queryFormatVersion()
|
|
@@ -253,7 +267,7 @@ protected:
|
|
|
{
|
|
|
formatVersion = PROGRESS_FORMAT_V;
|
|
|
}
|
|
|
- static void expandStats(IPropertyTree * target, IStatisticCollection & collection)
|
|
|
+ static void expandStats(IPropertyTree * target, IStatisticCollection & collection, bool doFormat)
|
|
|
{
|
|
|
StringBuffer formattedValue;
|
|
|
unsigned numStats = collection.getNumStatistics();
|
|
@@ -262,13 +276,20 @@ protected:
|
|
|
StatisticKind kind;
|
|
|
unsigned __int64 value;
|
|
|
collection.getStatistic(kind, value, i);
|
|
|
- formatStatistic(formattedValue.clear(), value, kind);
|
|
|
- target->setProp(queryTreeTag(kind), formattedValue);
|
|
|
+ if (doFormat)
|
|
|
+ {
|
|
|
+ formatStatistic(formattedValue.clear(), value, kind);
|
|
|
+ target->setProp(queryTreeTag(kind), formattedValue);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ target->setPropInt64(queryTreeTag(kind), value);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
- void expandProcessTreeFromStats(IPropertyTree * rootTarget, IPropertyTree * target, IStatisticCollection * collection)
|
|
|
+ void expandProcessTreeFromStats(IPropertyTree * rootTarget, IPropertyTree * target, IStatisticCollection * collection, bool doFormat)
|
|
|
{
|
|
|
- expandStats(target, *collection);
|
|
|
+ expandStats(target, *collection, doFormat);
|
|
|
|
|
|
StringBuffer scopeName;
|
|
|
Owned<IStatisticCollectionIterator> activityIter = &collection->getScopes(NULL, false);
|
|
@@ -299,9 +320,11 @@ protected:
|
|
|
case SSTworkflow:
|
|
|
case SSTgraph:
|
|
|
// SSTworkflow and SSTgraph may be safely ignored. They are not required to produce the statistics.
|
|
|
- expandProcessTreeFromStats(rootTarget, target, &cur);
|
|
|
+ expandProcessTreeFromStats(rootTarget, target, &cur, doFormat);
|
|
|
continue;
|
|
|
case SSTfunction:
|
|
|
+ case SSTchannel:
|
|
|
+ case SSTglobal:
|
|
|
//MORE:Should function scopes be included in the graph scope somehow, and if so how?
|
|
|
continue;
|
|
|
default:
|
|
@@ -310,11 +333,11 @@ protected:
|
|
|
|
|
|
IPropertyTree * next = curTarget->addPropTree(tag);
|
|
|
next->setProp("@id", id);
|
|
|
- expandProcessTreeFromStats(rootTarget, next, &cur);
|
|
|
+ expandProcessTreeFromStats(rootTarget, next, &cur, doFormat);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- IPropertyTree * createProcessTreeFromStats()
|
|
|
+ IPropertyTree * createProcessTreeFromStats(bool doFormat)
|
|
|
{
|
|
|
MemoryBuffer compressed;
|
|
|
MemoryBuffer serialized;
|
|
@@ -330,7 +353,7 @@ protected:
|
|
|
decompressToBuffer(serialized.clear(), compressed);
|
|
|
Owned<IStatisticCollection> collection = createStatisticCollection(serialized);
|
|
|
|
|
|
- expandProcessTreeFromStats(progressTree, progressTree, collection);
|
|
|
+ expandProcessTreeFromStats(progressTree, progressTree, collection, doFormat);
|
|
|
}
|
|
|
}
|
|
|
return progressTree.getClear();
|
|
@@ -1608,7 +1631,7 @@ private:
|
|
|
{
|
|
|
//Walk dependencies - should possibly have a different SST e.g., SSTdependency since they do not
|
|
|
//share many characteristics with edges - e.g. no flowing records => few/no stats.
|
|
|
- curGraph.setown(graphIter->query().getXGMMLTree(false));
|
|
|
+ curGraph.setown(graphIter->query().getXGMMLTree(false, false));
|
|
|
Owned<IPropertyTreeIterator> treeIter = curGraph->getElements("edge");
|
|
|
if (treeIter && treeIter->first())
|
|
|
{
|
|
@@ -3638,15 +3661,42 @@ extern IConstWorkUnitInfo *createConstWorkUnitInfo(IPropertyTree &p)
|
|
|
return new CLightweightWorkunitInfo(p);
|
|
|
}
|
|
|
|
|
|
+class CDaliWorkUnit;
|
|
|
class CDaliWuGraphStats : public CWuGraphStats
|
|
|
{
|
|
|
public:
|
|
|
- CDaliWuGraphStats(IRemoteConnection *_conn, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id)
|
|
|
- : CWuGraphStats(LINK(_conn->queryRoot()), _creatorType, _creator, _wfid, _rootScope, _id), conn(_conn)
|
|
|
+ CDaliWuGraphStats(const CDaliWorkUnit* _owner, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge)
|
|
|
+ : CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge), owner(_owner), graphName(_rootScope), wfid(_wfid)
|
|
|
{
|
|
|
}
|
|
|
protected:
|
|
|
+ virtual IPropertyTree &queryProgressTree() override;
|
|
|
+ const CDaliWorkUnit *owner;
|
|
|
Owned<IRemoteConnection> conn;
|
|
|
+ StringAttr graphName;
|
|
|
+ unsigned wfid;
|
|
|
+};
|
|
|
+
|
|
|
+class CLocalWuGraphStats : public CWuGraphStats
|
|
|
+{
|
|
|
+public:
|
|
|
+ CLocalWuGraphStats(IPropertyTree *_p, StatisticCreatorType _creatorType, const char * _creator, unsigned _wfid, const char * _rootScope, unsigned _id, bool _merge)
|
|
|
+ : CWuGraphStats(_creatorType, _creator, _wfid, _rootScope, _id, _merge), graphName(_rootScope), p(_p)
|
|
|
+ {
|
|
|
+ }
|
|
|
+protected:
|
|
|
+ virtual IPropertyTree &queryProgressTree() override
|
|
|
+ {
|
|
|
+ IPropertyTree *progress = p->queryPropTree("GraphProgress");
|
|
|
+ if (!progress)
|
|
|
+ progress = p->addPropTree("GraphProgress");
|
|
|
+ IPropertyTree *graph = progress->queryPropTree(graphName);
|
|
|
+ if (!graph)
|
|
|
+ graph = progress->addPropTree(graphName);
|
|
|
+ return *graph;
|
|
|
+ }
|
|
|
+ StringAttr graphName;
|
|
|
+ Owned<IPropertyTree> p;
|
|
|
};
|
|
|
|
|
|
CWorkUnitWatcher::CWorkUnitWatcher(IWorkUnitSubscriber *_subscriber, WUSubscribeOptions flags, const char *wuid) : subscriber(_subscriber)
|
|
@@ -3751,6 +3801,7 @@ bool CPersistedWorkUnit::aborting() const
|
|
|
|
|
|
class CDaliWorkUnit : public CPersistedWorkUnit
|
|
|
{
|
|
|
+ friend class CDaliWuGraphStats;
|
|
|
public:
|
|
|
CDaliWorkUnit(IRemoteConnection *_conn, ISecManager *secmgr, ISecUser *secuser)
|
|
|
: connection(_conn), CPersistedWorkUnit(secmgr, secuser)
|
|
@@ -3934,9 +3985,9 @@ public:
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph) const override
|
|
|
+ virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const override
|
|
|
{
|
|
|
- return new CDaliWuGraphStats(getWritableProgressConnection(graphName, _wfid), creatorType, creator, _wfid, graphName, subgraph);
|
|
|
+ return new CDaliWuGraphStats(this, creatorType, creator, _wfid, graphName, subgraph, merge);
|
|
|
}
|
|
|
virtual void import(IPropertyTree *wuTree, IPropertyTree *graphProgressTree)
|
|
|
{
|
|
@@ -4031,7 +4082,7 @@ public:
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- virtual IConstWorkUnit * unlock()
|
|
|
+ virtual IConstWorkUnit * unlock() override
|
|
|
{
|
|
|
c->unlockRemote();
|
|
|
return c.getClear();
|
|
@@ -4246,8 +4297,8 @@ public:
|
|
|
{ c->setGraphState(graphName, wfid, state); }
|
|
|
virtual void setNodeState(const char *graphName, WUGraphIDType nodeId, WUGraphState state) const
|
|
|
{ c->setNodeState(graphName, nodeId, state); }
|
|
|
- virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph) const override
|
|
|
- { return c->updateStats(graphName, creatorType, creator, _wfid, subgraph); }
|
|
|
+ virtual IWUGraphStats *updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const override
|
|
|
+ { return c->updateStats(graphName, creatorType, creator, _wfid, subgraph, merge); }
|
|
|
virtual void clearGraphProgress() const
|
|
|
{ c->clearGraphProgress(); }
|
|
|
virtual IStringVal & getAbortBy(IStringVal & str) const
|
|
@@ -4440,6 +4491,12 @@ public:
|
|
|
{ c->setResultDataset(name, sequence, len, val, numRows, extend); }
|
|
|
};
|
|
|
|
|
|
+IPropertyTree &CDaliWuGraphStats::queryProgressTree()
|
|
|
+{
|
|
|
+ conn.setown(owner->getWritableProgressConnection(graphName, wfid));
|
|
|
+ return *conn->queryRoot();
|
|
|
+}
|
|
|
+
|
|
|
class CLocalWUAssociated : implements IConstWUAssociatedFile, public CInterface
|
|
|
{
|
|
|
Owned<IPropertyTree> p;
|
|
@@ -7009,6 +7066,12 @@ IWorkUnit& CLocalWorkUnit::lock()
|
|
|
return lockRemote(true);
|
|
|
}
|
|
|
|
|
|
+IConstWorkUnit *CLocalWorkUnit::unlock()
|
|
|
+{
|
|
|
+ unlockRemote();
|
|
|
+ return this;
|
|
|
+}
|
|
|
+
|
|
|
const char *CLocalWorkUnit::queryWuid() const
|
|
|
{
|
|
|
CriticalBlock block(crit);
|
|
@@ -9468,7 +9531,7 @@ IPropertyTree *CLocalWorkUnit::getUnpackedTree(bool includeProgress) const
|
|
|
ForEach(*graphIter)
|
|
|
{
|
|
|
IConstWUGraph &graph = graphIter->query();
|
|
|
- Owned<IPropertyTree> graphTree = graph.getXGMMLTree(includeProgress);
|
|
|
+ Owned<IPropertyTree> graphTree = graph.getXGMMLTree(includeProgress, false);
|
|
|
SCMStringBuffer gName;
|
|
|
graph.getName(gName);
|
|
|
StringBuffer xpath("Graphs/Graph[@name=\"");
|
|
@@ -9554,7 +9617,7 @@ IStringVal& CLocalWUGraph::getLabel(IStringVal &str) const
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- Owned<IPropertyTree> xgmml = getXGMMLTree(false);
|
|
|
+ Owned<IPropertyTree> xgmml = getXGMMLTree(false, false);
|
|
|
str.set(xgmml->queryProp("@label"));
|
|
|
return str;
|
|
|
}
|
|
@@ -9566,9 +9629,9 @@ WUGraphState CLocalWUGraph::getState() const
|
|
|
}
|
|
|
|
|
|
|
|
|
-IStringVal& CLocalWUGraph::getXGMML(IStringVal &str, bool mergeProgress) const
|
|
|
+IStringVal& CLocalWUGraph::getXGMML(IStringVal &str, bool mergeProgress, bool doFormatStats) const
|
|
|
{
|
|
|
- Owned<IPropertyTree> xgmml = getXGMMLTree(mergeProgress);
|
|
|
+ Owned<IPropertyTree> xgmml = getXGMMLTree(mergeProgress, doFormatStats);
|
|
|
if (xgmml)
|
|
|
{
|
|
|
StringBuffer x;
|
|
@@ -9814,17 +9877,16 @@ void CLocalWorkUnit::createGraph(const char * name, const char *label, WUGraphTy
|
|
|
graphs.append(*q);
|
|
|
}
|
|
|
|
|
|
-IConstWUGraphProgress *CLocalWorkUnit::getGraphProgress(const char *name) const
|
|
|
+IConstWUGraphProgress *CLocalWorkUnit::getGraphProgress(const char *graphName) const
|
|
|
{
|
|
|
-/* Owned<IRemoteConnection> conn = getProgressConnection();
|
|
|
- if (conn)
|
|
|
+ IPTree *graphProgress = p->queryPropTree("GraphProgress");
|
|
|
+ if (graphProgress)
|
|
|
{
|
|
|
- IPTree *progress = conn->queryRoot()->queryPropTree(graphName);
|
|
|
+ IPTree *progress = graphProgress->queryPropTree(graphName);
|
|
|
if (progress)
|
|
|
- return new CConstGraphProgress(p->queryName(), graphName, progress);
|
|
|
+ return new CConstGraphProgress(queryWuid(), graphName, progress);
|
|
|
}
|
|
|
- */
|
|
|
- return NULL;
|
|
|
+ return nullptr;
|
|
|
}
|
|
|
WUGraphState CLocalWorkUnit::queryGraphState(const char *graphName) const
|
|
|
{
|
|
@@ -9842,9 +9904,9 @@ void CLocalWorkUnit::setNodeState(const char *graphName, WUGraphIDType nodeId, W
|
|
|
{
|
|
|
throwUnexpected(); // Should only be used for persisted workunits
|
|
|
}
|
|
|
-IWUGraphStats *CLocalWorkUnit::updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph) const
|
|
|
+IWUGraphStats *CLocalWorkUnit::updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned _wfid, unsigned subgraph, bool merge) const
|
|
|
{
|
|
|
- return new CWuGraphStats(LINK(p), creatorType, creator, _wfid, graphName, subgraph);
|
|
|
+ return new CLocalWuGraphStats(LINK(p), creatorType, creator, _wfid, graphName, subgraph, merge);
|
|
|
}
|
|
|
|
|
|
void CLocalWUGraph::setName(const char *str)
|
|
@@ -9961,7 +10023,7 @@ IPropertyTree * CLocalWUGraph::getXGMMLTreeRaw() const
|
|
|
return p->getPropTree("xgmml");
|
|
|
}
|
|
|
|
|
|
-IPropertyTree * CLocalWUGraph::getXGMMLTree(bool doMergeProgress) const
|
|
|
+IPropertyTree * CLocalWUGraph::getXGMMLTree(bool doMergeProgress, bool doFormatStats) const
|
|
|
{
|
|
|
if (!graph)
|
|
|
{
|
|
@@ -9985,7 +10047,7 @@ IPropertyTree * CLocalWUGraph::getXGMMLTree(bool doMergeProgress) const
|
|
|
{
|
|
|
//MORE: Eventually this should directly access the new stats structure
|
|
|
unsigned progressV = progress->queryFormatVersion();
|
|
|
- Owned<IPropertyTree> progressTree = progress->getProgressTree();
|
|
|
+ Owned<IPropertyTree> progressTree = progress->getProgressTree(doFormatStats);
|
|
|
Owned<IPropertyTreeIterator> nodeIterator = copy->getElements("node");
|
|
|
ForEach (*nodeIterator)
|
|
|
mergeProgress(nodeIterator->query(), *progressTree, progressV);
|
|
@@ -11692,6 +11754,14 @@ extern WORKUNIT_API ILocalWorkUnit * createLocalWorkUnit(const char *xml)
|
|
|
return ret;
|
|
|
}
|
|
|
|
|
|
+extern WORKUNIT_API ILocalWorkUnit * createLocalWorkUnitFromPTree(IPropertyTree *ptree)
|
|
|
+{
|
|
|
+ Owned<CLocalWorkUnit> cw = new CLocalWorkUnit((ISecManager *) NULL, NULL);
|
|
|
+ cw->loadPTree(ptree);
|
|
|
+ ILocalWorkUnit* ret = QUERYINTERFACE(&cw->lockRemote(false), ILocalWorkUnit);
|
|
|
+ return ret;
|
|
|
+}
|
|
|
+
|
|
|
void exportWorkUnitToXMLWithHiddenPasswords(IPropertyTree *p, IIOStream &out, unsigned extraXmlFlags)
|
|
|
{
|
|
|
const char *name = p->queryName();
|
|
@@ -13511,6 +13581,11 @@ public:
|
|
|
StatsScopeId scopeId(SSTchildgraph, id);
|
|
|
beginScope(scopeId);
|
|
|
}
|
|
|
+ virtual void beginChannelScope(unsigned id)
|
|
|
+ {
|
|
|
+ StatsScopeId scopeId(SSTchannel, id);
|
|
|
+ beginScope(scopeId);
|
|
|
+ }
|
|
|
virtual void endScope()
|
|
|
{
|
|
|
scope.setLength(prevLenStack.popGet());
|