|
@@ -308,6 +308,8 @@ QueryOptions::QueryOptions()
|
|
|
allSortsMaySpill = false; // No global default for this
|
|
|
failOnLeaks = false;
|
|
|
collectFactoryStatistics = defaultCollectFactoryStatistics;
|
|
|
+ parallelWorkflow = false;
|
|
|
+ numWorkflowThreads = 1;
|
|
|
}
|
|
|
|
|
|
QueryOptions::QueryOptions(const QueryOptions &other)
|
|
@@ -340,6 +342,9 @@ QueryOptions::QueryOptions(const QueryOptions &other)
|
|
|
allSortsMaySpill = other.allSortsMaySpill;
|
|
|
failOnLeaks = other.failOnLeaks;
|
|
|
collectFactoryStatistics = other.collectFactoryStatistics;
|
|
|
+
|
|
|
+ parallelWorkflow = other.parallelWorkflow;
|
|
|
+ numWorkflowThreads = other.numWorkflowThreads;
|
|
|
}
|
|
|
|
|
|
void QueryOptions::setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stateInfo)
|
|
@@ -382,6 +387,9 @@ void QueryOptions::setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stat
|
|
|
updateFromWorkUnit(failOnLeaks, wu, "failOnLeaks");
|
|
|
updateFromWorkUnit(noSeekBuildIndex, wu, "noSeekBuildIndex");
|
|
|
updateFromWorkUnit(collectFactoryStatistics, wu, "collectFactoryStatistics");
|
|
|
+
|
|
|
+ updateFromWorkUnit(parallelWorkflow, wu, "parallelWorkflow");
|
|
|
+ updateFromWorkUnit(numWorkflowThreads, wu, "numWorkflowthreads");
|
|
|
}
|
|
|
|
|
|
void QueryOptions::updateFromWorkUnitM(memsize_t &value, IConstWorkUnit &wu, const char *name)
|
|
@@ -442,6 +450,9 @@ void QueryOptions::setFromContext(const IPropertyTree *ctx)
|
|
|
// Note: allSortsMaySpill is not permitted at context level (too late anyway, unless I refactored)
|
|
|
updateFromContext(failOnLeaks, ctx, "@failOnLeaks", "_FailOnLeaks");
|
|
|
updateFromContext(collectFactoryStatistics, ctx, "@collectFactoryStatistics", "_CollectFactoryStatistics");
|
|
|
+
|
|
|
+ updateFromContext(parallelWorkflow, ctx, "@parallelWorkflow", "_parallelWorkflow");
|
|
|
+ updateFromContext(numWorkflowThreads, ctx, "@numWorkflowThreads", "_numWorkflowThreads");
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -897,13 +908,13 @@ protected:
|
|
|
return NULL;
|
|
|
}
|
|
|
|
|
|
- virtual IRoxieServerActivityFactory *getRoxieServerActivityFactory(unsigned id) const
|
|
|
+ virtual IRoxieServerActivityFactory *getRoxieServerActivityFactory(unsigned id) const override
|
|
|
{
|
|
|
checkSuspended();
|
|
|
return LINK(QUERYINTERFACE(findActivity(id), IRoxieServerActivityFactory));
|
|
|
}
|
|
|
|
|
|
- virtual ISlaveActivityFactory *getSlaveActivityFactory(unsigned id) const
|
|
|
+ virtual ISlaveActivityFactory *getSlaveActivityFactory(unsigned id) const override
|
|
|
{
|
|
|
checkSuspended();
|
|
|
IActivityFactory *f = findActivity(id);
|
|
@@ -913,7 +924,7 @@ protected:
|
|
|
ActivityArray *loadChildGraph(IPropertyTree &graph)
|
|
|
{
|
|
|
// MORE - this is starting to look very much like loadGraph (on Roxie server side)
|
|
|
- ActivityArray *activities = new ActivityArray(true, graph.getPropBool("@delayed"), graph.getPropBool("@library"), graph.getPropBool("@sequential"));
|
|
|
+ ActivityArray *activities = new ActivityArray(true, graph.getPropBool("@delayed"), graph.getPropBool("@library"), graph.getPropBool("@sequential"), graph.getPropBool("@wfid"));
|
|
|
unsigned subgraphId = graph.getPropInt("@id");
|
|
|
try
|
|
|
{
|
|
@@ -1101,12 +1112,12 @@ public:
|
|
|
package.Release();
|
|
|
}
|
|
|
|
|
|
- virtual IQueryFactory *lookupLibrary(const char *libraryName, unsigned expectedInterfaceHash, const IRoxieContextLogger &logctx) const
|
|
|
+ virtual IQueryFactory *lookupLibrary(const char *libraryName, unsigned expectedInterfaceHash, const IRoxieContextLogger &logctx) const override
|
|
|
{
|
|
|
return globalPackageSetManager->lookupLibrary(libraryName, expectedInterfaceHash, logctx);
|
|
|
}
|
|
|
|
|
|
- virtual void beforeDispose()
|
|
|
+ virtual void beforeDispose() override
|
|
|
{
|
|
|
// NOTE: it's theoretically possible for the final release to happen after a replacement has been inserted into hash table.
|
|
|
// So only remove from hash table if what we find there matches the item that is being deleted.
|
|
@@ -1263,28 +1274,28 @@ public:
|
|
|
queryMap.setValue(hv, this);
|
|
|
}
|
|
|
|
|
|
- virtual unsigned queryChannel() const
|
|
|
+ virtual unsigned queryChannel() const override
|
|
|
{
|
|
|
return channelNo;
|
|
|
}
|
|
|
|
|
|
- virtual hash64_t queryHash() const
|
|
|
+ virtual hash64_t queryHash() const override
|
|
|
{
|
|
|
return hashValue;
|
|
|
}
|
|
|
|
|
|
- virtual ISharedOnceContext *querySharedOnceContext() const
|
|
|
+ virtual ISharedOnceContext *querySharedOnceContext() const override
|
|
|
{
|
|
|
return sharedOnceContext;
|
|
|
}
|
|
|
|
|
|
- virtual IDeserializedResultStore &queryOnceResultStore() const
|
|
|
+ virtual IDeserializedResultStore &queryOnceResultStore() const override
|
|
|
{
|
|
|
assertex(sharedOnceContext);
|
|
|
return sharedOnceContext->queryOnceResultStore();
|
|
|
}
|
|
|
|
|
|
- virtual IPropertyTree &queryOnceContext(const IRoxieContextLogger &logctx) const
|
|
|
+ virtual IPropertyTree &queryOnceContext(const IRoxieContextLogger &logctx) const override
|
|
|
{
|
|
|
assertex(sharedOnceContext);
|
|
|
return sharedOnceContext->queryOnceContext(this, logctx);
|
|
@@ -1295,12 +1306,12 @@ public:
|
|
|
return (const char *) queryDll()->getResource(id);
|
|
|
}
|
|
|
|
|
|
- virtual ActivityArray *lookupGraphActivities(const char *name) const
|
|
|
+ virtual ActivityArray *lookupGraphActivities(const char *name) const override
|
|
|
{
|
|
|
return *graphMap.getValue(name);
|
|
|
}
|
|
|
|
|
|
- virtual IActivityGraph *lookupGraph(IRoxieSlaveContext *ctx, const char *name, IProbeManager *probeManager, const IRoxieContextLogger &logctx, IRoxieServerActivity *parentActivity) const
|
|
|
+ virtual IActivityGraph *lookupGraph(IRoxieSlaveContext *ctx, const char *name, IProbeManager *probeManager, const IRoxieContextLogger &logctx, IRoxieServerActivity *parentActivity) const override
|
|
|
{
|
|
|
assertex(name && *name);
|
|
|
ActivityArrayPtr *graph = graphMap.getValue(name);
|
|
@@ -1336,7 +1347,7 @@ public:
|
|
|
toXML(graph, reply);
|
|
|
}
|
|
|
|
|
|
- virtual IPropertyTree* cloneQueryXGMML() const
|
|
|
+ virtual IPropertyTree* cloneQueryXGMML() const override
|
|
|
{
|
|
|
assertex(dll && dll->queryWorkUnit());
|
|
|
Owned<IPropertyTree> tree = createPTree("Query", ipt_lowmem);
|
|
@@ -1354,7 +1365,7 @@ public:
|
|
|
return tree.getClear();
|
|
|
}
|
|
|
|
|
|
- virtual void getStats(StringBuffer &reply, const char *graphName) const
|
|
|
+ virtual void getStats(StringBuffer &reply, const char *graphName) const override
|
|
|
{
|
|
|
if (dll)
|
|
|
{
|
|
@@ -1376,7 +1387,7 @@ public:
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- virtual void getActivityMetrics(StringBuffer &reply) const
|
|
|
+ virtual void getActivityMetrics(StringBuffer &reply) const override
|
|
|
{
|
|
|
HashIterator i(allActivities);
|
|
|
StringBuffer myReply;
|
|
@@ -1392,7 +1403,7 @@ public:
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- virtual void getQueryInfo(StringBuffer &reply, bool full, IArrayOf<IQueryFactory> *slaveQueries, const IRoxieContextLogger &logctx) const
|
|
|
+ virtual void getQueryInfo(StringBuffer &reply, bool full, IArrayOf<IQueryFactory> *slaveQueries, const IRoxieContextLogger &logctx) const override
|
|
|
{
|
|
|
Owned<IPropertyTree> xref = createPTree("Query", ipt_fast);
|
|
|
xref->setProp("@id", id);
|
|
@@ -1423,7 +1434,7 @@ public:
|
|
|
}
|
|
|
toXML(xref, reply, 1, XML_Embed|XML_LineBreak|XML_SortTags);
|
|
|
}
|
|
|
- virtual void resetQueryTimings()
|
|
|
+ virtual void resetQueryTimings() override
|
|
|
{
|
|
|
HashIterator i(allActivities);
|
|
|
ForEach(i)
|
|
@@ -1432,56 +1443,56 @@ public:
|
|
|
f->resetNodeProgressInfo();
|
|
|
}
|
|
|
}
|
|
|
- virtual const char *queryErrorMessage() const
|
|
|
+ virtual const char *queryErrorMessage() const override
|
|
|
{
|
|
|
return errorMessage.str();
|
|
|
}
|
|
|
- virtual const char *queryQueryName() const
|
|
|
+ virtual const char *queryQueryName() const override
|
|
|
{
|
|
|
return id;
|
|
|
}
|
|
|
- virtual bool isQueryLibrary() const
|
|
|
+ virtual bool isQueryLibrary() const override
|
|
|
{
|
|
|
return libraryInterfaceHash != 0;
|
|
|
}
|
|
|
- virtual unsigned getQueryLibraryInterfaceHash() const
|
|
|
+ virtual unsigned getQueryLibraryInterfaceHash() const override
|
|
|
{
|
|
|
return libraryInterfaceHash;
|
|
|
}
|
|
|
- virtual void suspend(const char* errMsg)
|
|
|
+ virtual void suspend(const char* errMsg) override
|
|
|
{
|
|
|
isSuspended = true;
|
|
|
isLoadFailed = true;
|
|
|
errorMessage.append(errMsg);
|
|
|
}
|
|
|
|
|
|
- virtual bool loadFailed() const
|
|
|
+ virtual bool loadFailed() const override
|
|
|
{
|
|
|
return isLoadFailed;
|
|
|
}
|
|
|
- virtual bool suspended() const
|
|
|
+ virtual bool suspended() const override
|
|
|
{
|
|
|
return isSuspended;
|
|
|
}
|
|
|
- virtual const QueryOptions &queryOptions() const
|
|
|
+ virtual const QueryOptions &queryOptions() const override
|
|
|
{
|
|
|
return options;
|
|
|
}
|
|
|
- virtual ILoadedDllEntry *queryDll() const
|
|
|
+ virtual ILoadedDllEntry *queryDll() const override
|
|
|
{
|
|
|
assertex(dll);
|
|
|
return dll->queryDll();
|
|
|
}
|
|
|
- virtual IConstWorkUnit *queryWorkUnit() const
|
|
|
+ virtual IConstWorkUnit *queryWorkUnit() const override
|
|
|
{
|
|
|
assertex(dll);
|
|
|
return dll->queryWorkUnit();
|
|
|
}
|
|
|
- virtual const IRoxiePackage &queryPackage() const
|
|
|
+ virtual const IRoxiePackage &queryPackage() const override
|
|
|
{
|
|
|
return package;
|
|
|
}
|
|
|
- virtual CRoxieWorkflowMachine *createWorkflowMachine(IConstWorkUnit *wu, bool isOnce, const IRoxieContextLogger &logctx) const
|
|
|
+ virtual CRoxieWorkflowMachine *createWorkflowMachine(IConstWorkUnit *wu, bool isOnce, const IRoxieContextLogger &logctx) const override
|
|
|
{
|
|
|
throwUnexpected(); // only on server...
|
|
|
}
|
|
@@ -1504,28 +1515,28 @@ public:
|
|
|
return strdup(result ? result : defaultValue);
|
|
|
}
|
|
|
|
|
|
- virtual IRoxieSlaveContext *createSlaveContext(const SlaveContextLogger &logctx, IRoxieQueryPacket *packet, bool hasChildren) const
|
|
|
+ virtual IRoxieSlaveContext *createSlaveContext(const SlaveContextLogger &logctx, IRoxieQueryPacket *packet, bool hasChildren) const override
|
|
|
{
|
|
|
throwUnexpected(); // only implemented in derived slave class
|
|
|
}
|
|
|
|
|
|
- virtual IRoxieServerContext *createContext(IPropertyTree *xml, IHpccProtocolResponse *protocol, unsigned flags, const ContextLogger &_logctx, PTreeReaderOptions xmlReadFlags, const char *querySetName) const
|
|
|
+ virtual IRoxieServerContext *createContext(IPropertyTree *xml, IHpccProtocolResponse *protocol, unsigned flags, const ContextLogger &_logctx, PTreeReaderOptions xmlReadFlags, const char *querySetName) const override
|
|
|
{
|
|
|
throwUnexpected(); // only implemented in derived server class
|
|
|
}
|
|
|
- virtual IRoxieServerContext *createContext(IConstWorkUnit *wu, const ContextLogger &_logctx) const
|
|
|
+ virtual IRoxieServerContext *createContext(IConstWorkUnit *wu, const ContextLogger &_logctx) const override
|
|
|
{
|
|
|
throwUnexpected(); // only implemented in derived server class
|
|
|
}
|
|
|
- virtual void noteQuery(time_t startTime, bool failed, unsigned elapsed, unsigned memused, unsigned slavesReplyLen, unsigned bytesOut)
|
|
|
+ virtual void noteQuery(time_t startTime, bool failed, unsigned elapsed, unsigned memused, unsigned slavesReplyLen, unsigned bytesOut) override
|
|
|
{
|
|
|
throwUnexpected(); // only implemented in derived server class
|
|
|
}
|
|
|
- virtual IPropertyTree *getQueryStats(time_t from, time_t to)
|
|
|
+ virtual IPropertyTree *getQueryStats(time_t from, time_t to) override
|
|
|
{
|
|
|
throwUnexpected(); // only implemented in derived server class
|
|
|
}
|
|
|
- virtual void getGraphNames(StringArray &ret) const
|
|
|
+ virtual void getGraphNames(StringArray &ret) const override
|
|
|
{
|
|
|
Owned<IConstWUGraphIterator> graphs = &dll->queryWorkUnit()->getGraphs(GraphTypeActivities);
|
|
|
ForEach(*graphs)
|
|
@@ -1536,7 +1547,7 @@ public:
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- virtual bool isDynamic() const
|
|
|
+ virtual bool isDynamic() const override
|
|
|
{
|
|
|
return dynamic;
|
|
|
}
|
|
@@ -1557,7 +1568,7 @@ protected:
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
- virtual void checkSuspended() const
|
|
|
+ virtual void checkSuspended() const override
|
|
|
{
|
|
|
if (isSuspended)
|
|
|
{
|
|
@@ -1604,7 +1615,7 @@ public:
|
|
|
queryGlobalQueryStatsAggregator()->noteQuery(startTime, failed, elapsed, memused, slavesReplyLen, bytesOut);
|
|
|
}
|
|
|
|
|
|
- virtual void addDependency(unsigned sourceIdx, unsigned sourceId, unsigned targetId, int controlId, const char *edgeId, ActivityArray * activities)
|
|
|
+ virtual void addDependency(unsigned sourceIdx, unsigned sourceId, unsigned targetId, int controlId, const char *edgeId, ActivityArray * activities) override
|
|
|
{
|
|
|
// addDependency is expected to fail occasionally on slave, but never on Roxie server
|
|
|
if (!doAddDependency(sourceIdx, sourceId, targetId, controlId, edgeId, activities))
|
|
@@ -1615,7 +1626,8 @@ public:
|
|
|
{
|
|
|
bool isLibraryGraph = graph.getPropBool("@library");
|
|
|
bool isSequential = graph.getPropBool("@sequential");
|
|
|
- ActivityArray *activities = new ActivityArray(isLibraryGraph, false, isLibraryGraph, isSequential);
|
|
|
+ unsigned wfid = graph.getPropInt("@wfid");
|
|
|
+ ActivityArray *activities = new ActivityArray(isLibraryGraph, false, isLibraryGraph, isSequential, wfid);
|
|
|
if (isLibraryGraph)
|
|
|
activities->setLibraryGraphId(graph.getPropInt("node/@id"));
|
|
|
try
|
|
@@ -1650,7 +1662,7 @@ public:
|
|
|
return createWorkUnitServerContext(wu, this, _logctx);
|
|
|
}
|
|
|
|
|
|
- virtual CRoxieWorkflowMachine *createWorkflowMachine(IConstWorkUnit *wu, bool isOnce, const IRoxieContextLogger &logctx) const
|
|
|
+ virtual CRoxieWorkflowMachine *createWorkflowMachine(IConstWorkUnit *wu, bool isOnce, const IRoxieContextLogger &logctx) const override
|
|
|
{
|
|
|
IPropertyTree *workflow = queryWorkflowTree();
|
|
|
if (workflow)
|
|
@@ -1929,7 +1941,8 @@ public:
|
|
|
// MORE: common up with loadGraph for the Roxie server..
|
|
|
bool isLibraryGraph = graph.getPropBool("@library");
|
|
|
bool isSequential = graph.getPropBool("@sequential");
|
|
|
- ActivityArray *activities = new ActivityArray(isLibraryGraph, false, isLibraryGraph, isSequential);
|
|
|
+ unsigned wfid = graph.getPropInt("@wfid");
|
|
|
+ ActivityArray *activities = new ActivityArray(isLibraryGraph, false, isLibraryGraph, isSequential, wfid);
|
|
|
if (isLibraryGraph)
|
|
|
activities->setLibraryGraphId(graph.getPropInt("node/@id"));
|
|
|
try
|