Browse Source

HPCC-14871 Add support for stranded inline table

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday 9 years ago
parent
commit
f06fb088c2

File diff suppressed because it is too large
+ 442 - 320
common/thorhelper/thorstrand.cpp


+ 10 - 1
common/thorhelper/thorstrand.hpp

@@ -49,11 +49,20 @@ interface IStrandBranch : extends IInterface
     virtual IStrandJunction * queryOutputJunction() = 0;
 };
 
+interface IOrderedOutputCallback
+{
+    virtual bool noteEndOfInputChunk() = 0;
+    virtual void noteEndOfInput() = 0;
+};
+typedef PointerArrayOf<IOrderedOutputCallback> OrderedCallbackArray;
+
+
 extern THORHELPER_API IStrandJunction * createStrandJunction(roxiemem::IRowManager & _rowManager, unsigned numInputs, unsigned numOutputs, unsigned blockSize, bool isOrdered);
-extern THORHELPER_API IStrandBranch * createStrandBranch(roxiemem::IRowManager & _rowManager, unsigned numStrands, unsigned blockSize, bool isOrdered, bool isGrouped);
+extern THORHELPER_API IStrandBranch * createStrandBranch(roxiemem::IRowManager & _rowManager, unsigned numStrands, unsigned blockSize, bool isOrdered, bool isGrouped, bool inputIsStreamed);
 extern THORHELPER_API void clearRowQueue(IRowQueue * queue);
 
 extern THORHELPER_API IManyToOneRowStream * createManyToOneRowStream(roxiemem::IRowManager & _rowManager, unsigned numInputs, unsigned blockSize, bool isOrdered);
+extern THORHELPER_API const void * queryEndOfSectionMarker();
 
 //---------------------------------------------------------------------------------------------------------------------
 

+ 5 - 2
roxie/ccd/ccddebug.cpp

@@ -77,11 +77,14 @@ public:
         return in->gatherConjunctions(collector);
     }
 
-    virtual IStrandJunction *getOutputStreams(IRoxieSlaveContext *ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, bool multiOk, unsigned flags)
+    virtual IStrandJunction *getOutputStreams(IRoxieSlaveContext *ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, const StrandOptions * consumerOptions, bool consumerOrdered)
     {
         assertex (!idx);
+        //Need to call
+        //extern IEngineRowStream *connectSingleStream(IRoxieSlaveContext *ctx, IFinalRoxieInput *input, unsigned idx, Owned<IStrandJunction> &junction, bool consumerOrdered)
         PointerArrayOf<IEngineRowStream> instreams;
-        Owned<IStrandJunction> junction = in->getOutputStreams(ctx, sourceIdx, instreams, false, flags | SFforceSingle);
+        Owned<IStrandJunction> junction = in->getOutputStreams(ctx, sourceIdx, instreams, NULL, consumerOrdered);
+        //MORE: This needs to create a junction if instreams > 1
         // We forced to single, so should not be getting anything but a single stream back
         assertex(junction==NULL);
         assertex(instreams.length()==1);

+ 2 - 0
roxie/ccd/ccdlistener.cpp

@@ -812,6 +812,7 @@ public:
                 pthread_setaffinity_np(GetCurrentThreadId(), sizeof(cpu_set_t), &cpuMask);
             }
         }
+        clearAffinityCache();
 #endif
     }
 
@@ -872,6 +873,7 @@ extern void updateAffinity(unsigned __int64 affinity)
         }
         if (sched_setaffinity(0, sizeof(cpu_set_t), &cpus))
             throw makeStringException(errno, "Failed to set affinity");
+        clearAffinityCache();
 #endif
     }
     RoxieListener::updateAffinity();

+ 2 - 2
roxie/ccd/ccdmain.cpp

@@ -133,7 +133,7 @@ unsigned defaultKeyedJoinPreload = 0;
 unsigned dafilesrvLookupTimeout = 10000;
 bool defaultCheckingHeap = false;
 unsigned defaultStrandBlockSize = 512;
-unsigned defaultForceNumStrands = 0;
+unsigned defaultForceNumStrands = 1;
 
 unsigned slaveQueryReleaseDelaySeconds = 60;
 unsigned coresPerQuery = 0;
@@ -764,7 +764,7 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
         defaultKeyedJoinPreload = topology->getPropInt("@defaultKeyedJoinPreload", 0);
         defaultPrefetchProjectPreload = topology->getPropInt("@defaultPrefetchProjectPreload", 10);
         defaultStrandBlockSize = topology->getPropInt("@defaultStrandBlockSize", 512);
-        defaultForceNumStrands = topology->getPropInt("@defaultForceNumStrands", 0);
+        defaultForceNumStrands = topology->getPropInt("@defaultForceNumStrands", 1);
         defaultCheckingHeap = topology->getPropBool("@checkingHeap", false);  // NOTE - not in configmgr - too dangerous!
 
         slaveQueryReleaseDelaySeconds = topology->getPropInt("@slaveQueryReleaseDelaySeconds", 60);

+ 99 - 99
roxie/ccd/ccdquery.cpp

@@ -522,146 +522,146 @@ protected:
         case TAKalldenormalizegroup:
             return createRoxieServerAllJoinActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKapply:
-            return createRoxieServerApplyActivityFactory(id, subgraphId, *this, helperFactory, kind, isRootAction(node));
+            return createRoxieServerApplyActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
         case TAKaggregate:
         case TAKexistsaggregate:    // could special case.
         case TAKcountaggregate:
-            return createRoxieServerAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKcase:
         case TAKchildcase:
-            return createRoxieServerCaseActivityFactory(id, subgraphId, *this, helperFactory, kind, isGraphIndependent(node));
+            return createRoxieServerCaseActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isGraphIndependent(node));
         case TAKcatch:
         case TAKskipcatch:
         case TAKcreaterowcatch:
-            return createRoxieServerCatchActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerCatchActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKchilditerator:
-            return createRoxieServerChildIteratorActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerChildIteratorActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKchoosesets:
-            return createRoxieServerChooseSetsActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerChooseSetsActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKchoosesetsenth:
-            return createRoxieServerChooseSetsEnthActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerChooseSetsEnthActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKchoosesetslast:
-            return createRoxieServerChooseSetsLastActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerChooseSetsLastActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKproject:
         case TAKcountproject:
             return createRoxieServerProjectActivityFactory(id, subgraphId, *this, helperFactory, kind, node); // code is common between Project, CountProject
         case TAKfilterproject:
-            return createRoxieServerFilterProjectActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerFilterProjectActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKdatasetresult:
         case TAKrowresult:
-            return createRoxieServerDatasetResultActivityFactory(id, subgraphId, *this, helperFactory, kind, isRootAction(node));
+            return createRoxieServerDatasetResultActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
         case TAKdedup:
-            return createRoxieServerDedupActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerDedupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKdegroup:
-            return createRoxieServerDegroupActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerDegroupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKcsvread:
         case TAKxmlread:
         case TAKjsonread:
         case TAKdiskread:
         {       
             if (node.getPropBool("att[@name='_isSpill']/@value", false) || node.getPropBool("att[@name='_isSpillGlobal']/@value", false))
-                return createRoxieServerSpillReadActivityFactory(id, subgraphId, *this, helperFactory, kind);
+                return createRoxieServerSpillReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
             else
             {
                 RemoteActivityId remoteId(id, hashValue);
-                return createRoxieServerDiskReadActivityFactory(id, subgraphId, *this, helperFactory, kind, remoteId, node);
+                return createRoxieServerDiskReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
             }
         }
         case TAKmemoryspillread:
-            return createRoxieServerSpillReadActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerSpillReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKdisknormalize:
         case TAKdiskcount:
         case TAKdiskaggregate:
         case TAKdiskgroupaggregate:
         {
             RemoteActivityId remoteId(id, hashValue);
-            return createRoxieServerDiskReadActivityFactory(id, subgraphId, *this, helperFactory, kind, remoteId, node);
+            return createRoxieServerDiskReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
         }
         case TAKchildnormalize:
-            return createRoxieServerNewChildNormalizeActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerNewChildNormalizeActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKchildaggregate:
-            return createRoxieServerNewChildAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerNewChildAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKchildgroupaggregate:
-            return createRoxieServerNewChildGroupAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerNewChildGroupAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKchildthroughnormalize:
-            return createRoxieServerNewChildThroughNormalizeActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerNewChildThroughNormalizeActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKcsvwrite:
         case TAKdiskwrite:
         case TAKxmlwrite:
         case TAKjsonwrite:
         case TAKmemoryspillwrite:
-            return createRoxieServerDiskWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, isRootAction(node));
+            return createRoxieServerDiskWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
         case TAKindexwrite:
-            return createRoxieServerIndexWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, isRootAction(node));
+            return createRoxieServerIndexWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
         case TAKenth:
-            return createRoxieServerEnthActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerEnthActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKfetch:
         case TAKcsvfetch:
         case TAKxmlfetch:
         case TAKjsonfetch:
             {
                 RemoteActivityId remoteId(id, hashValue);
-                return createRoxieServerFetchActivityFactory(id, subgraphId, *this, helperFactory, kind, remoteId, node);
+                return createRoxieServerFetchActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
             }
         case TAKfilter:
-            return createRoxieServerFilterActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerFilterActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKfiltergroup:
-            return createRoxieServerFilterGroupActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerFilterGroupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKfirstn:
-            return createRoxieServerFirstNActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerFirstNActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKfunnel:
-            return createRoxieServerConcatActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerConcatActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKgroup:
-            return createRoxieServerGroupActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerGroupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKhashaggregate:
             return createRoxieServerHashAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKif:
         case TAKchildif:
-            return createRoxieServerIfActivityFactory(id, subgraphId, *this, helperFactory, kind, isGraphIndependent(node));
+            return createRoxieServerIfActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isGraphIndependent(node));
         case TAKifaction:
-            return createRoxieServerIfActionActivityFactory(id, subgraphId, *this, helperFactory, kind, isRootAction(node));
+            return createRoxieServerIfActionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
         case TAKparallel:
-            return createRoxieServerParallelActionActivityFactory(id, subgraphId, *this, helperFactory, kind, isRootAction(node));
+            return createRoxieServerParallelActionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
         case TAKsequential:
-            return createRoxieServerSequentialActionActivityFactory(id, subgraphId, *this, helperFactory, kind, isRootAction(node));
+            return createRoxieServerSequentialActionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
         case TAKindexread:
             {
                 RemoteActivityId remoteId(id, hashValue);
-                return createRoxieServerIndexReadActivityFactory(id, subgraphId, *this, helperFactory, kind, remoteId, node);
+                return createRoxieServerIndexReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
             }
         case TAKindexnormalize:
             {
                 RemoteActivityId remoteId(id, hashValue);
-                return createRoxieServerIndexNormalizeActivityFactory(id, subgraphId, *this, helperFactory, kind, remoteId, node);
+                return createRoxieServerIndexNormalizeActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
             }
         case TAKindexcount:
             {
                 RemoteActivityId remoteId(id, hashValue);
-                return createRoxieServerIndexCountActivityFactory(id, subgraphId, *this, helperFactory, kind, remoteId, node);
+                return createRoxieServerIndexCountActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
             }
         case TAKindexaggregate:
             {
                 RemoteActivityId remoteId(id, hashValue);
-                return createRoxieServerIndexAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, remoteId, node);
+                return createRoxieServerIndexAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
             }
         case TAKindexgroupaggregate:
         case TAKindexgroupexists:
         case TAKindexgroupcount:
             {
                 RemoteActivityId remoteId(id, hashValue);
-                return createRoxieServerIndexGroupAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, remoteId, node);
+                return createRoxieServerIndexGroupAggregateActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId);
             }
         case TAKhashdedup:
-            return createRoxieServerHashDedupActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerHashDedupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKhashdenormalize:
         case TAKhashdistribute:
         case TAKhashdistributemerge:
         case TAKhashjoin:
             throwUnexpected();  // Code generator should have removed or transformed
         case TAKiterate:
-            return createRoxieServerIterateActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerIterateActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKprocess:
-            return createRoxieServerProcessActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerProcessActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKjoin:
         case TAKjoinlight:
         case TAKdenormalize:
@@ -675,10 +675,10 @@ protected:
         {
             RemoteActivityId remoteId(id, hashValue);
             RemoteActivityId remoteId2(id | ROXIE_ACTIVITY_FETCH, hashValue);
-            return createRoxieServerKeyedJoinActivityFactory(id, subgraphId, *this, helperFactory, kind, remoteId, remoteId2, node);
+            return createRoxieServerKeyedJoinActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId, remoteId2);
         }
         case TAKlimit:
-            return createRoxieServerLimitActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerLimitActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKlookupjoin:
         case TAKlookupdenormalize:
         case TAKlookupdenormalizegroup:
@@ -687,147 +687,147 @@ protected:
         case TAKsmartdenormalizegroup:
             return createRoxieServerLookupJoinActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKmerge:
-            return createRoxieServerMergeActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerMergeActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKnormalize:
-            return createRoxieServerNormalizeActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerNormalizeActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKnormalizechild:
-            return createRoxieServerNormalizeChildActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerNormalizeChildActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKnormalizelinkedchild:
-            return createRoxieServerNormalizeLinkedChildActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerNormalizeLinkedChildActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKnull:
-            return createRoxieServerNullActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerNullActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKsideeffect:
-            return createRoxieServerSideEffectActivityFactory(id, subgraphId, *this, helperFactory, kind, isRootAction(node));
+            return createRoxieServerSideEffectActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
         case TAKsimpleaction:
-            return createRoxieServerActionActivityFactory(id, subgraphId, *this, helperFactory, kind, usageCount(node), isRootAction(node));
+            return createRoxieServerActionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, usageCount(node), isRootAction(node));
         case TAKparse:
             return createRoxieServerParseActivityFactory(id, subgraphId, *this, helperFactory, kind, node, this);
         case TAKworkunitwrite:
-            return createRoxieServerWorkUnitWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, usageCount(node), isRootAction(node));
+            return createRoxieServerWorkUnitWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, usageCount(node), isRootAction(node));
         case TAKdictionaryworkunitwrite:
-            return createRoxieServerWorkUnitWriteDictActivityFactory(id, subgraphId, *this, helperFactory, kind, usageCount(node), isRootAction(node));
+            return createRoxieServerWorkUnitWriteDictActivityFactory(id, subgraphId, *this, helperFactory, kind, node, usageCount(node), isRootAction(node));
         case TAKpiperead:
-            return createRoxieServerPipeReadActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerPipeReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKpipethrough:
-            return createRoxieServerPipeThroughActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerPipeThroughActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKpipewrite:
-            return createRoxieServerPipeWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, usageCount(node), isRootAction(node));
+            return createRoxieServerPipeWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, usageCount(node), isRootAction(node));
         case TAKpull:
-            return createRoxieServerPullActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerPullActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKtrace:
-            return createRoxieServerTraceActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerTraceActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKlinkedrawiterator:
-            return createRoxieServerLinkedRawIteratorActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerLinkedRawIteratorActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKremoteresult:
-            return createRoxieServerRemoteResultActivityFactory(id, subgraphId, *this, helperFactory, kind, usageCount(node), isRootAction(node));
+            return createRoxieServerRemoteResultActivityFactory(id, subgraphId, *this, helperFactory, kind, node, usageCount(node), isRootAction(node));
         case TAKrollup:
-            return createRoxieServerRollupActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerRollupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKsample:
-            return createRoxieServerSampleActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerSampleActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKselectn:
-            return createRoxieServerSelectNActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerSelectNActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKselfjoin:
         case TAKselfjoinlight:
             return createRoxieServerSelfJoinActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKskiplimit:
         case TAKcreaterowlimit:
-            return createRoxieServerSkipLimitActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerSkipLimitActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKhttp_rowdataset:
         case TAKsoap_rowdataset:
-            return createRoxieServerSoapRowCallActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerSoapRowCallActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKsoap_rowaction:
-            return createRoxieServerSoapRowActionActivityFactory(id, subgraphId, *this, helperFactory, kind, isRootAction(node));
+            return createRoxieServerSoapRowActionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
         case TAKsoap_datasetdataset:
-            return createRoxieServerSoapDatasetCallActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerSoapDatasetCallActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKsoap_datasetaction:
-            return createRoxieServerSoapDatasetActionActivityFactory(id, subgraphId, *this, helperFactory, kind, isRootAction(node));
+            return createRoxieServerSoapDatasetActionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
         case TAKsort:
             return createRoxieServerSortActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKspill:
         case TAKmemoryspillsplit:
-            return createRoxieServerThroughSpillActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerThroughSpillActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKsplit:
-            return createRoxieServerSplitActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerSplitActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKstreamediterator:
-            return createRoxieServerStreamedIteratorActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerStreamedIteratorActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKinlinetable:
-            return createRoxieServerInlineTableActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerInlineTableActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKthroughaggregate:
             throwUnexpected(); // Concept of through aggregates has been proven not to work in Roxie - codegen should not be creating them any more.
         case TAKtopn:
-            return createRoxieServerTopNActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerTopNActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKworkunitread:
-            return createRoxieServerWorkUnitReadActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerWorkUnitReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKxmlparse:
-            return createRoxieServerXmlParseActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerXmlParseActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKquantile:
-            return createRoxieServerQuantileActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerQuantileActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKregroup:
-            return createRoxieServerRegroupActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerRegroupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKcombine:
-            return createRoxieServerCombineActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerCombineActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKcombinegroup:
-            return createRoxieServerCombineGroupActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerCombineGroupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKrollupgroup:
-            return createRoxieServerRollupGroupActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerRollupGroupActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKlocalresultread:
             {
                 unsigned graphId = getGraphId(node);
-                return createRoxieServerLocalResultReadActivityFactory(id, subgraphId, *this, helperFactory, kind, graphId);
+                return createRoxieServerLocalResultReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node, graphId);
             }
         case TAKlocalstreamread:
-            return createRoxieServerLocalResultStreamReadActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerLocalResultStreamReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKlocalresultwrite:
             {
                 unsigned graphId = getGraphId(node);
-                return createRoxieServerLocalResultWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, usageCount(node), graphId, isRootAction(node));
+                return createRoxieServerLocalResultWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, usageCount(node), graphId, isRootAction(node));
             }
         case TAKdictionaryresultwrite:
             {
                 unsigned graphId = getGraphId(node);
-                return createRoxieServerDictionaryResultWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, usageCount(node), graphId, isRootAction(node));
+                return createRoxieServerDictionaryResultWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, usageCount(node), graphId, isRootAction(node));
             }
         case TAKloopcount:
         case TAKlooprow:
         case TAKloopdataset:
             {
                 unsigned loopId = node.getPropInt("att[@name=\"_loopid\"]/@value", 0);
-                return createRoxieServerLoopActivityFactory(id, subgraphId, *this, helperFactory, kind, loopId);
+                return createRoxieServerLoopActivityFactory(id, subgraphId, *this, helperFactory, kind, node, loopId);
             }
         case TAKremotegraph:
             {
                 RemoteActivityId remoteId(id, hashValue);
-                return createRoxieServerRemoteActivityFactory(id, subgraphId, *this, helperFactory, kind, remoteId, isRootAction(node));
+                return createRoxieServerRemoteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, remoteId, isRootAction(node));
             }
         case TAKgraphloopresultread:
             {
                 unsigned graphId = getGraphId(node);
-                return createRoxieServerGraphLoopResultReadActivityFactory(id, subgraphId, *this, helperFactory, kind, graphId);
+                return createRoxieServerGraphLoopResultReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node, graphId);
             }
         case TAKgraphloopresultwrite:
             {
                 unsigned graphId = getGraphId(node);
-                return createRoxieServerGraphLoopResultWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, usageCount(node), graphId);
+                return createRoxieServerGraphLoopResultWriteActivityFactory(id, subgraphId, *this, helperFactory, kind, node, usageCount(node), graphId);
             }
         case TAKnwaygraphloopresultread:
             {
                 unsigned graphId  = node.getPropInt("att[@name=\"_graphId\"]/@value", 0);
-                return createRoxieServerNWayGraphLoopResultReadActivityFactory(id, subgraphId, *this, helperFactory, kind, graphId);
+                return createRoxieServerNWayGraphLoopResultReadActivityFactory(id, subgraphId, *this, helperFactory, kind, node, graphId);
             }
         case TAKnwayinput:
-            return createRoxieServerNWayInputActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerNWayInputActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKnwaymerge:
-            return createRoxieServerNWayMergeActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerNWayMergeActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKnwaymergejoin:
         case TAKnwayjoin:
-            return createRoxieServerNWayMergeJoinActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerNWayMergeJoinActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKsorted:
-            return createRoxieServerSortedActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerSortedActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKgraphloop:
         case TAKparallelgraphloop:
             {
                 unsigned loopId = node.getPropInt("att[@name=\"_loopid\"]/@value", 0);
-                return createRoxieServerGraphLoopActivityFactory(id, subgraphId, *this, helperFactory, kind, loopId);
+                return createRoxieServerGraphLoopActivityFactory(id, subgraphId, *this, helperFactory, kind, node, loopId);
             }
         case TAKlibrarycall:
             {
@@ -848,20 +848,20 @@ protected:
                 ForEach(*iter)
                     extra.outputs.append(iter->query().getPropInt("@value"));
 
-                return createRoxieServerLibraryCallActivityFactory(id, subgraphId, *this, helperFactory, kind, extra);
+                return createRoxieServerLibraryCallActivityFactory(id, subgraphId, *this, helperFactory, kind, node, extra);
             }
         case TAKnwayselect:
-            return createRoxieServerNWaySelectActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerNWaySelectActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKnonempty:
-            return createRoxieServerNonEmptyActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerNonEmptyActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKprefetchproject:
             return createRoxieServerPrefetchProjectActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKwhen_dataset:
-            return createRoxieServerWhenActivityFactory(id, subgraphId, *this, helperFactory, kind);
+            return createRoxieServerWhenActivityFactory(id, subgraphId, *this, helperFactory, kind, node);
         case TAKwhen_action:
-            return createRoxieServerWhenActionActivityFactory(id, subgraphId, *this, helperFactory, kind, isRootAction(node));
+            return createRoxieServerWhenActionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
         case TAKdistribution:
-            return createRoxieServerDistributionActivityFactory(id, subgraphId, *this, helperFactory, kind, isRootAction(node));
+            return createRoxieServerDistributionActivityFactory(id, subgraphId, *this, helperFactory, kind, node, isRootAction(node));
 
         // These are not required in Roxie for the time being - code generator should trap them
         case TAKchilddataset:

File diff suppressed because it is too large
+ 787 - 553
roxie/ccd/ccdserver.cpp


+ 105 - 108
roxie/ccd/ccdserver.hpp

@@ -91,11 +91,7 @@ interface IStrandJunction;
 
 class ClusterWriteHandler;
 
-enum StrandFlags
-{
-    SFforceSingle   = 0x0001,     // Force entire subtree to be single-stranded - eg when debugging or smart-stepping
-    SFpreserveOrder = 0x0002,     // Order must be preserved by any multistranding - returns a  suitable M:1 junction object to restore the order
-};
+class StrandOptions;
 
 interface IFinalRoxieInput : extends IInputBase
 {
@@ -109,10 +105,10 @@ interface IFinalRoxieInput : extends IInputBase
     virtual IRoxieServerActivity *queryActivity() = 0;
     virtual IIndexReadActivityInfo *queryIndexReadActivity() = 0;
 
-    virtual IStrandJunction *getOutputStreams(IRoxieSlaveContext *ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, bool multiOk, unsigned flags) = 0;  // Use StrandFlags values for flags
+    virtual IStrandJunction *getOutputStreams(IRoxieSlaveContext *ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, const StrandOptions * consumerOptions, bool consumerOrdered) = 0;  // Use StrandFlags values for flags
 };
 
-extern IEngineRowStream *connectSingleStream(IRoxieSlaveContext *ctx, IFinalRoxieInput *input, unsigned idx, Owned<IStrandJunction> &junction, unsigned flags);
+extern IEngineRowStream *connectSingleStream(IRoxieSlaveContext *ctx, IFinalRoxieInput *input, unsigned idx, Owned<IStrandJunction> &junction, bool consumerOrdered);
 
 interface ISteppedConjunctionCollector;
 
@@ -150,7 +146,7 @@ interface IRoxieServerActivity : extends IActivityBase
     virtual void execute(unsigned parentExtractSize, const byte *parentExtract) = 0;
     virtual void onCreate(IHThorArg *colocalArg) = 0;
     virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused) = 0;
-    virtual IStrandJunction *getOutputStreams(IRoxieSlaveContext *ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, bool multiOk, unsigned flags) = 0;  // Use StrandFlags values for flags
+    virtual IStrandJunction *getOutputStreams(IRoxieSlaveContext *ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, const StrandOptions * consumerOptions, bool consumerOrdered) = 0;  // Use StrandFlags values for flags
 
     virtual void stop() = 0;
     virtual void abort() = 0;
@@ -164,7 +160,7 @@ interface IRoxieServerActivity : extends IActivityBase
     virtual void serializeCreateStartContext(MemoryBuffer &out) = 0;
     virtual void serializeExtra(MemoryBuffer &out) = 0;
     virtual void stopSink(unsigned idx) = 0;
-    virtual void connectOutputStreams(unsigned flags) = 0;
+    virtual void connectInputStreams(bool consumerOrdered) = 0;
 
     //Functions to support result streaming between parallel loop/graphloop/library implementations
     virtual IFinalRoxieInput * querySelectOutput(unsigned id) = 0;
@@ -219,6 +215,7 @@ interface IRoxieServerActivityFactory : extends IActivityFactory
     virtual IDefRecordMeta *queryActivityMeta() const = 0;
     virtual unsigned numInputs() const = 0;
     virtual const StatisticsMapping &queryStatsMapping() const = 0;
+    virtual bool isInputOrdered(bool consumerOrdered, unsigned idx) const = 0;
 };
 interface IGraphResult : public IInterface
 {
@@ -326,117 +323,117 @@ private:
     CIndexTransformCallback & callback;
 };
 
-extern IRoxieServerActivityFactory *createRoxieServerApplyActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, bool _isRoot);
-extern IRoxieServerActivityFactory *createRoxieServerNullActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerChildIteratorActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerNewChildNormalizeActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerNewChildAggregateActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerNewChildGroupAggregateActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerNewChildThroughNormalizeActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerDatasetResultActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, bool _isRoot);
-extern IRoxieServerActivityFactory *createRoxieServerInlineTableActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerWorkUnitReadActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerLocalResultReadActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, unsigned graphId);
-extern IRoxieServerActivityFactory *createRoxieServerLocalResultStreamReadActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerLocalResultWriteActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, unsigned _usageCount, unsigned _graphId, bool _isRoot);
-extern IRoxieServerActivityFactory *createRoxieServerDictionaryResultWriteActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, unsigned _usageCount, unsigned _graphId, bool _isRoot);
-extern IRoxieServerActivityFactory *createRoxieServerGraphLoopResultReadActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, unsigned graphId);
-extern IRoxieServerActivityFactory *createRoxieServerGraphLoopResultWriteActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, unsigned _usageCount, unsigned _graphId);
-extern IRoxieServerActivityFactory *createRoxieServerDedupActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerHashDedupActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerRollupActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerNormalizeActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerNormalizeChildActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerNormalizeLinkedChildActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
+extern IRoxieServerActivityFactory *createRoxieServerApplyActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, bool _isRoot);
+extern IRoxieServerActivityFactory *createRoxieServerNullActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerChildIteratorActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerNewChildNormalizeActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerNewChildAggregateActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerNewChildGroupAggregateActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerNewChildThroughNormalizeActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerDatasetResultActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, bool _isRoot);
+extern IRoxieServerActivityFactory *createRoxieServerInlineTableActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerWorkUnitReadActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerLocalResultReadActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, unsigned graphId);
+extern IRoxieServerActivityFactory *createRoxieServerLocalResultStreamReadActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerLocalResultWriteActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, unsigned _usageCount, unsigned _graphId, bool _isRoot);
+extern IRoxieServerActivityFactory *createRoxieServerDictionaryResultWriteActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, unsigned _usageCount, unsigned _graphId, bool _isRoot);
+extern IRoxieServerActivityFactory *createRoxieServerGraphLoopResultReadActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, unsigned graphId);
+extern IRoxieServerActivityFactory *createRoxieServerGraphLoopResultWriteActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, unsigned _usageCount, unsigned _graphId);
+extern IRoxieServerActivityFactory *createRoxieServerDedupActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerHashDedupActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerRollupActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerNormalizeActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerNormalizeChildActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerNormalizeLinkedChildActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
 extern IRoxieServerActivityFactory *createRoxieServerSortActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
-extern IRoxieServerActivityFactory *createRoxieServerThroughSpillActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerSplitActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerPipeReadActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerPipeThroughActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerPipeWriteActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, unsigned _usageCount, bool _isRoot);
-extern IRoxieServerActivityFactory *createRoxieServerFilterActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerFilterGroupActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerSideEffectActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, bool _isRoot);
-extern IRoxieServerActivityFactory *createRoxieServerActionActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, unsigned _usageCount, bool _isRoot);
-extern IRoxieServerActivityFactory *createRoxieServerSampleActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerChooseSetsActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerChooseSetsEnthActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerChooseSetsLastActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerEnthActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerAggregateActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
+extern IRoxieServerActivityFactory *createRoxieServerThroughSpillActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerSplitActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerPipeReadActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerPipeThroughActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerPipeWriteActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, unsigned _usageCount, bool _isRoot);
+extern IRoxieServerActivityFactory *createRoxieServerFilterActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerFilterGroupActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerSideEffectActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, bool _isRoot);
+extern IRoxieServerActivityFactory *createRoxieServerActionActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, unsigned _usageCount, bool _isRoot);
+extern IRoxieServerActivityFactory *createRoxieServerSampleActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerChooseSetsActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerChooseSetsEnthActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerChooseSetsLastActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerEnthActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerAggregateActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
 extern IRoxieServerActivityFactory *createRoxieServerHashAggregateActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
-extern IRoxieServerActivityFactory *createRoxieServerDegroupActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerSpillReadActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerDiskWriteActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, bool _isRoot);
-extern IRoxieServerActivityFactory *createRoxieServerIndexWriteActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, bool _isRoot);
+extern IRoxieServerActivityFactory *createRoxieServerDegroupActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerSpillReadActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerDiskWriteActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, bool _isRoot);
+extern IRoxieServerActivityFactory *createRoxieServerIndexWriteActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, bool _isRoot);
 extern IRoxieServerActivityFactory *createRoxieServerJoinActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
-extern IRoxieServerActivityFactory *createRoxieServerDenormalizeActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerConcatActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerMergeActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerRegroupActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerCombineActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerCombineGroupActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerRollupGroupActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
+extern IRoxieServerActivityFactory *createRoxieServerDenormalizeActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerConcatActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerMergeActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerRegroupActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerCombineActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerCombineGroupActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerRollupGroupActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
 extern IRoxieServerActivityFactory *createRoxieServerProjectActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
-extern IRoxieServerActivityFactory *createRoxieServerFilterProjectActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerLoopActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, unsigned _loopId);
-extern IRoxieServerActivityFactory *createRoxieServerGraphLoopActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, unsigned _loopId);
-extern IRoxieServerActivityFactory *createRoxieServerRemoteActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, const RemoteActivityId &_remoteId, bool _isRoot);
-extern IRoxieServerActivityFactory *createRoxieServerIterateActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerProcessActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerGroupActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerFirstNActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerSelectNActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
+extern IRoxieServerActivityFactory *createRoxieServerFilterProjectActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerLoopActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, unsigned _loopId);
+extern IRoxieServerActivityFactory *createRoxieServerGraphLoopActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, unsigned _loopId);
+extern IRoxieServerActivityFactory *createRoxieServerRemoteActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, const RemoteActivityId &_remoteId, bool _isRoot);
+extern IRoxieServerActivityFactory *createRoxieServerIterateActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerProcessActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerGroupActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerFirstNActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerSelectNActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
 extern IRoxieServerActivityFactory *createRoxieServerSelfJoinActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
 extern IRoxieServerActivityFactory *createRoxieServerLookupJoinActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
 extern IRoxieServerActivityFactory *createRoxieServerAllJoinActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
-extern IRoxieServerActivityFactory *createRoxieServerTopNActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerLimitActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerSkipLimitActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerCatchActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerCaseActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, bool _graphInvariant);
-extern IRoxieServerActivityFactory *createRoxieServerIfActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, bool _graphInvariant);
+extern IRoxieServerActivityFactory *createRoxieServerTopNActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerLimitActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerSkipLimitActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerCatchActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerCaseActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, bool _graphInvariant);
+extern IRoxieServerActivityFactory *createRoxieServerIfActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, bool _graphInvariant);
 extern IRoxieServerActivityFactory *createRoxieServerParseActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, IResourceContext *rc);
-extern IRoxieServerActivityFactory *createRoxieServerWorkUnitWriteActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, unsigned _usageCount, bool _isRoot);
-extern IRoxieServerActivityFactory *createRoxieServerWorkUnitWriteDictActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, unsigned _usageCount, bool _isRoot);
-extern IRoxieServerActivityFactory *createRoxieServerRemoteResultActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, unsigned _usageCount, bool _isRoot);
-extern IRoxieServerActivityFactory *createRoxieServerXmlParseActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerDiskReadActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, const RemoteActivityId &_remoteId, IPropertyTree &_graphNode);
-extern IRoxieServerActivityFactory *createRoxieServerIndexReadActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, const RemoteActivityId &_remoteId, IPropertyTree &_graphNode);
-extern IRoxieServerActivityFactory *createRoxieServerIndexCountActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, const RemoteActivityId &_remoteId, IPropertyTree &_graphNode);
-extern IRoxieServerActivityFactory *createRoxieServerIndexAggregateActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, const RemoteActivityId &_remoteId, IPropertyTree &_graphNode);
-extern IRoxieServerActivityFactory *createRoxieServerIndexGroupAggregateActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, const RemoteActivityId &_remoteId, IPropertyTree &_graphNode);
-extern IRoxieServerActivityFactory *createRoxieServerIndexNormalizeActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, const RemoteActivityId &_remoteId, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerWorkUnitWriteActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, unsigned _usageCount, bool _isRoot);
+extern IRoxieServerActivityFactory *createRoxieServerWorkUnitWriteDictActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, unsigned _usageCount, bool _isRoot);
+extern IRoxieServerActivityFactory *createRoxieServerRemoteResultActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, unsigned _usageCount, bool _isRoot);
+extern IRoxieServerActivityFactory *createRoxieServerXmlParseActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerDiskReadActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, const RemoteActivityId &_remoteId);
+extern IRoxieServerActivityFactory *createRoxieServerIndexReadActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, const RemoteActivityId &_remoteId);
+extern IRoxieServerActivityFactory *createRoxieServerIndexCountActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, const RemoteActivityId &_remoteId);
+extern IRoxieServerActivityFactory *createRoxieServerIndexAggregateActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, const RemoteActivityId &_remoteId);
+extern IRoxieServerActivityFactory *createRoxieServerIndexGroupAggregateActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, const RemoteActivityId &_remoteId);
+extern IRoxieServerActivityFactory *createRoxieServerIndexNormalizeActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, const RemoteActivityId &_remoteId);
 extern IRoxieServerActivityFactory *createRoxieServerDiskCountActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
-extern IRoxieServerActivityFactory *createRoxieServerFetchActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, const RemoteActivityId &_remoteId, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerFetchActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, const RemoteActivityId &_remoteId);
 extern IRoxieServerActivityFactory *createRoxieServerDummyActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, bool isLoadDataOnly);
-extern IRoxieServerActivityFactory *createRoxieServerKeyedJoinActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, const RemoteActivityId &_remoteId, const RemoteActivityId &_remoteId2, IPropertyTree &_graphNode);
-extern IRoxieServerActivityFactory *createRoxieServerSoapRowCallActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerSoapRowActionActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, bool _isRoot);
-extern IRoxieServerActivityFactory *createRoxieServerSoapDatasetCallActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerSoapDatasetActionActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, bool _isRoot);
-extern IRoxieServerActivityFactory *createRoxieServerLinkedRawIteratorActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerQuantileActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-
-extern IRoxieServerActivityFactory *createRoxieServerNWayGraphLoopResultReadActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, unsigned graphId);
-extern IRoxieServerActivityFactory *createRoxieServerNWayInputActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerNWayMergeActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerNWayMergeJoinActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerSortedActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerNWaySelectActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerLibraryCallActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, LibraryCallFactoryExtra & extra);
-extern IRoxieServerActivityFactory *createRoxieServerNonEmptyActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerIfActionActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, bool _isRoot);
-extern IRoxieServerActivityFactory *createRoxieServerSequentialActionActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, bool _isRoot);
-extern IRoxieServerActivityFactory *createRoxieServerParallelActionActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, bool _isRoot);
+extern IRoxieServerActivityFactory *createRoxieServerKeyedJoinActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, const RemoteActivityId &_remoteId, const RemoteActivityId &_remoteId2);
+extern IRoxieServerActivityFactory *createRoxieServerSoapRowCallActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerSoapRowActionActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, bool _isRoot);
+extern IRoxieServerActivityFactory *createRoxieServerSoapDatasetCallActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerSoapDatasetActionActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, bool _isRoot);
+extern IRoxieServerActivityFactory *createRoxieServerLinkedRawIteratorActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerQuantileActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+
+extern IRoxieServerActivityFactory *createRoxieServerNWayGraphLoopResultReadActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, unsigned graphId);
+extern IRoxieServerActivityFactory *createRoxieServerNWayInputActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerNWayMergeActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerNWayMergeJoinActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerSortedActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerNWaySelectActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerLibraryCallActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, LibraryCallFactoryExtra & extra);
+extern IRoxieServerActivityFactory *createRoxieServerNonEmptyActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerIfActionActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, bool _isRoot);
+extern IRoxieServerActivityFactory *createRoxieServerSequentialActionActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, bool _isRoot);
+extern IRoxieServerActivityFactory *createRoxieServerParallelActionActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, bool _isRoot);
 extern IRoxieServerActivityFactory *createRoxieServerPrefetchProjectActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
-extern IRoxieServerActivityFactory *createRoxieServerStreamedIteratorActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerWhenActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerWhenActionActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, bool _isRoot);
+extern IRoxieServerActivityFactory *createRoxieServerStreamedIteratorActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerWhenActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerWhenActionActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, bool _isRoot);
 
-extern IRoxieServerActivityFactory *createRoxieServerDistributionActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, bool _isRoot);
-extern IRoxieServerActivityFactory *createRoxieServerPullActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
-extern IRoxieServerActivityFactory *createRoxieServerTraceActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind);
+extern IRoxieServerActivityFactory *createRoxieServerDistributionActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, bool _isRoot);
+extern IRoxieServerActivityFactory *createRoxieServerPullActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
+extern IRoxieServerActivityFactory *createRoxieServerTraceActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode);
 
 extern void throwRemoteException(IMessageUnpackCursor *extra);
 

+ 16 - 2
system/jlib/jdebug.cpp

@@ -26,6 +26,7 @@
 #include "jtime.hpp"
 #include <stdio.h>
 #include <time.h>
+#include <atomic>
 
 #if defined(_DEBUG) && defined(_WIN32) && !defined(USING_MPATROL)
  #undef new
@@ -903,7 +904,7 @@ void getCpuInfo(unsigned &numCPUs, unsigned &CPUSpeed)
 
 }
 
-unsigned getAffinityCpus()
+static unsigned evalAffinityCpus()
 {
     unsigned numCpus = 0;
     DWORD ProcessAffinityMask, SystemAffinityMask;
@@ -988,7 +989,7 @@ void getCpuInfo(unsigned &numCPUs, unsigned &CPUSpeed)
     close(cpufd);
 }
 
-unsigned getAffinityCpus()
+static unsigned evalAffinityCpus()
 {
 #ifdef __APPLE__
     // MORE - could do better
@@ -1180,6 +1181,19 @@ void getDiskUsage(char const * path, unsigned __int64 & total, unsigned __int64
 
 #endif
 
+static std::atomic<unsigned> cachedNumCpus;
+unsigned getAffinityCpus()
+{
+    if (cachedNumCpus.load(std::memory_order_acquire) == 0)
+        cachedNumCpus.store(evalAffinityCpus(), std::memory_order_release);
+    return cachedNumCpus.load(std::memory_order_acquire);
+}
+void clearAffinityCache()
+{
+    cachedNumCpus.store(0, std::memory_order_release);
+}
+
+
 static bool matchExtract(const char * prefix, const char * line, memsize_t & value)
 {
     size32_t len = strlen(prefix);

+ 2 - 0
system/jlib/jdebug.hpp

@@ -314,6 +314,8 @@ extern jlib_decl memsize_t getMapInfo(const char *type);
 extern jlib_decl void getCpuInfo(unsigned &numCPUs, unsigned &CPUSpeed);
 extern jlib_decl void getPeakMemUsage(memsize_t &peakVm,memsize_t &peakResident);
 extern jlib_decl unsigned getAffinityCpus();
+extern jlib_decl void clearAffinityCache(); // should be called whenever the process affinity is changed to reset the cache
+
 extern jlib_decl void printProcMap(const char *fn, bool printbody, bool printsummary, StringBuffer *lnout, MemoryBuffer *mb, bool useprintf);
 extern jlib_decl void PrintMemoryReport(bool full=true);
 extern jlib_decl void printAllocationSummary();

+ 2 - 0
system/jlib/jthread.cpp

@@ -2421,6 +2421,7 @@ void setProcessAffinity(const char * cpuList)
         throw makeStringException(errno, "Failed to set affinity");
     DBGLOG("Process affinity set to %s", cpuList);
 #endif
+    clearAffinityCache();
 }
 
 void setAutoAffinity(unsigned curProcess, unsigned processPerMachine, const char * optNodes)
@@ -2462,6 +2463,7 @@ void setAutoAffinity(unsigned curProcess, unsigned processPerMachine, const char
 
     DBGLOG("Process bound to numa node %u of %u", curNode, numNumaNodes);
 #endif
+    clearAffinityCache();
 }
 
 void bindMemoryToLocalNodes()