|
@@ -5277,6 +5277,26 @@ IRoxieServerActivityFactory *createRoxieServerNullActivityFactory(unsigned _id,
|
|
|
|
|
|
//=================================================================================
|
|
|
|
|
|
+class CRoxieServerNullSinkActivity : public CRoxieServerInternalSinkActivity
|
|
|
+{
|
|
|
+public:
|
|
|
+ CRoxieServerNullSinkActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
|
|
|
+ : CRoxieServerInternalSinkActivity(_ctx, _factory, _probeManager, 0)
|
|
|
+ {
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual void onExecute() override
|
|
|
+ {
|
|
|
+ }
|
|
|
+};
|
|
|
+
|
|
|
+IRoxieServerActivity * createRoxieServerNullSinkActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
|
|
|
+{
|
|
|
+ return new CRoxieServerNullSinkActivity(_ctx, _factory, _probeManager);
|
|
|
+}
|
|
|
+
|
|
|
+//=================================================================================
|
|
|
+
|
|
|
class CRoxieServerPassThroughActivity : public CRoxieServerActivity
|
|
|
{
|
|
|
public:
|
|
@@ -12095,13 +12115,18 @@ public:
|
|
|
case 0:
|
|
|
switch (kind)
|
|
|
{
|
|
|
- case TAKdiskwrite: return new CRoxieServerDiskWriteActivity(_ctx, this, _probeManager);
|
|
|
- case TAKcsvwrite: return new CRoxieServerCsvWriteActivity(_ctx, this, _probeManager);
|
|
|
+ case TAKdiskwrite:
|
|
|
+ return new CRoxieServerDiskWriteActivity(_ctx, this, _probeManager);
|
|
|
+ case TAKcsvwrite:
|
|
|
+ return new CRoxieServerCsvWriteActivity(_ctx, this, _probeManager);
|
|
|
case TAKxmlwrite:
|
|
|
case TAKjsonwrite:
|
|
|
return new CRoxieServerXmlWriteActivity(_ctx, this, _probeManager, kind);
|
|
|
+ case TAKspillwrite:
|
|
|
+ return new CRoxieServerNullSinkActivity(_ctx, this, _probeManager);
|
|
|
};
|
|
|
throwUnexpected();
|
|
|
+
|
|
|
case 1:
|
|
|
return new CRoxieServerSpillWriteActivity(_ctx, this, _probeManager);
|
|
|
default:
|
|
@@ -12111,7 +12136,7 @@ public:
|
|
|
|
|
|
virtual bool isSink() const
|
|
|
{
|
|
|
- return numOutputs == 0 && !isTemp; // MORE - check with Gavin if this is right if not a temp but reread in same job...
|
|
|
+ return numOutputs == 0 && (kind==TAKspillwrite || !isTemp); // MORE - check with Gavin if this is right if not a temp but reread in same job...
|
|
|
}
|
|
|
|
|
|
};
|
|
@@ -21675,6 +21700,11 @@ public:
|
|
|
return new CRoxieServerRemoteResultActivity(_ctx, this, _probeManager, usageCount);
|
|
|
}
|
|
|
|
|
|
+ virtual bool isSink() const override
|
|
|
+ {
|
|
|
+ return CRoxieServerInternalSinkFactory::isSink() || dependentCount == 0; // Codegen normally optimizes these away, but if it doesn't we need to treat as a sink rather than a dependency or upstream activities are not stopped properly
|
|
|
+ }
|
|
|
+
|
|
|
};
|
|
|
|
|
|
IRoxieServerActivityFactory *createRoxieServerRemoteResultActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, unsigned _usageCount, bool _isRoot)
|