|
@@ -817,8 +817,10 @@ public:
|
|
|
|
|
|
virtual bool isSink() const
|
|
|
{
|
|
|
+ //If an internal result has as many dependencies (within the graph) as uses (which includes from outside the graph) then don't execute it unconditionally.
|
|
|
+ bool internalSpillAllUsesWithinGraph = (isInternal && dependentCount && dependentCount==usageCount);
|
|
|
//only a sink if a root activity
|
|
|
- return isRoot && !(isInternal && dependentCount && dependentCount==usageCount); // MORE - it's possible for this to get the answer wrong still, since usageCount does not include references from main procedure. Gavin?
|
|
|
+ return isRoot && !internalSpillAllUsesWithinGraph;
|
|
|
}
|
|
|
|
|
|
virtual void getEdgeProgressInfo(unsigned idx, IPropertyTree &edge) const
|
|
@@ -5315,6 +5317,17 @@ public:
|
|
|
{
|
|
|
}
|
|
|
|
|
|
+ //override execute() to ensure that start is nevr called on any input activities.
|
|
|
+ virtual void execute(unsigned parentExtractSize, const byte * parentExtract) override
|
|
|
+ {
|
|
|
+ CriticalBlock b(ecrit);
|
|
|
+ if (!executed)
|
|
|
+ {
|
|
|
+ executed = true;
|
|
|
+ stop();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
virtual void onExecute() override
|
|
|
{
|
|
|
}
|
|
@@ -21646,6 +21659,7 @@ public:
|
|
|
class CRoxieServerWorkUnitWriteActivityFactory : public CRoxieServerInternalSinkFactory
|
|
|
{
|
|
|
bool isReread;
|
|
|
+ bool isUnused = false;
|
|
|
|
|
|
public:
|
|
|
CRoxieServerWorkUnitWriteActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, unsigned _usageCount, bool _isRoot)
|
|
@@ -21654,13 +21668,27 @@ public:
|
|
|
isReread = usageCount > 0;
|
|
|
Owned<IHThorWorkUnitWriteArg> helper = (IHThorWorkUnitWriteArg *) helperFactory();
|
|
|
isInternal = (helper->getSequence()==ResultSequenceInternal);
|
|
|
+ isUnused = isInternal && (usageCount == 0);
|
|
|
}
|
|
|
|
|
|
virtual IRoxieServerActivity *createActivity(IRoxieSlaveContext *_ctx, IProbeManager *_probeManager) const
|
|
|
{
|
|
|
+ if (isUnused && !CRoxieServerInternalSinkFactory::isSink())
|
|
|
+ {
|
|
|
+ if (_ctx->queryTraceLevel() > 2)
|
|
|
+ DBGLOG("Workunit write %u is unused - create null activity", id);
|
|
|
+ //Create a null sink activity that is always executed to ensure that stop() is called on the input.
|
|
|
+ return createRoxieServerNullSinkActivity(_ctx, this, _probeManager);
|
|
|
+ }
|
|
|
+
|
|
|
return new CRoxieServerWorkUnitWriteActivity(_ctx, this, _probeManager, isReread, usageCount);
|
|
|
}
|
|
|
|
|
|
+ virtual bool isSink() const
|
|
|
+ {
|
|
|
+ return isUnused || CRoxieServerInternalSinkFactory::isSink();
|
|
|
+ }
|
|
|
+
|
|
|
};
|
|
|
|
|
|
IRoxieServerActivityFactory *createRoxieServerWorkUnitWriteActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, unsigned _usageCount, bool _isRoot)
|