Przeglądaj źródła

Merge remote-tracking branch 'origin/candidate-4.2.6' into closedown-4.2.x

Conflicts:
	version.cmake

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 11 lat temu
rodzic
commit
70ae9bf3a3

+ 14 - 5
dali/base/dadfs.cpp

@@ -579,11 +579,13 @@ public:
     {
         StringBuffer xpath;
         dlfn.makeFullnameQuery(xpath,DXB_File,true).append("/ClusterLock");
-        conn.setown(querySDS().connect(xpath.str(), myProcessSession(), RTM_CREATE | RTM_LOCK_WRITE | RTM_DELETE_ON_DISCONNECT, SDS_CONNECT_TIMEOUT));
-    }
 
-    ~CClustersLockedSection()
-    {
+        /* Avoid RTM_CREATE_QUERY connect() if possible by making 1st call without. This is to avoid write contention caused by RTM_CREATE*
+         * NB: RTM_CREATE_QUERY should probably only gain exclusive access in Dali if node is missing.
+         */
+        conn.setown(querySDS().connect(xpath.str(), myProcessSession(), RTM_LOCK_WRITE, SDS_CONNECT_TIMEOUT));
+        if (!conn.get()) // NB: ClusterLock is now created at File create time, so this can only be true for pre-existing File's
+            conn.setown(querySDS().connect(xpath.str(), myProcessSession(), RTM_CREATE_QUERY | RTM_LOCK_WRITE, SDS_CONNECT_TIMEOUT));
     }
 };
 
@@ -3192,6 +3194,7 @@ public:
 #endif
         parent = _parent;
         root.setown(createPTree(queryDfsXmlBranchName(DXB_File)));
+        root->setPropTree("ClusterLock", createPTree());
 //      fdesc->serializeTree(*root,IFDSF_EXCLUDE_NODES);
         setFileAttrs(fdesc,true);
         setClusters(fdesc);
@@ -7969,7 +7972,13 @@ void CDistributedFileDirectory::addEntry(CDfsLogicalFileName &dlfn,IPropertyTree
     }
     root->setProp("@name",tail.str());
     root->setProp("OrigName",dlfn.get());
-    sroot->addPropTree(superfile?queryDfsXmlBranchName(DXB_SuperFile):queryDfsXmlBranchName(DXB_File),root); // now owns root  
+    if (superfile)
+        sroot->addPropTree(queryDfsXmlBranchName(DXB_SuperFile), root); // now owns root
+    else
+    {
+        IPropertyTree *file = sroot->addPropTree(queryDfsXmlBranchName(DXB_File), root); // now owns root
+        file->setPropTree("ClusterLock", createPTree());
+    }
 }
 
 IDistributedFileIterator *CDistributedFileDirectory::getIterator(const char *wildname, bool includesuper, IUserDescriptor *user)

+ 1 - 1
dali/base/dasds.cpp

@@ -2807,7 +2807,7 @@ PDState CServerRemoteTree::checkChange(IPropertyTree &changeTree, CBranchChange
                 Owned<CBranchChange> childChange = new CBranchChange(*(CRemoteTreeBase *)idTree);
                 if (!removeTree(idTree))
                     throw MakeSDSException(-1, "::checkChange - Failed to remove child(%s) from parent(%s) at %s(%d)", idTree->queryName(), queryName(), __FILE__, __LINE__);
-                mergePDState(res, PDS_Deleted);
+                mergePDState(res, PDS_Structure);
                 if (parentBranchChange)
                 {
                     PDState _res = res;

+ 8 - 2
ecl/hqlcpp/hqlhtcpp.cpp

@@ -14750,10 +14750,16 @@ ABoundActivity * HqlCppTranslator::doBuildActivityExecuteWhen(BuildCtx & ctx, IH
         label = "Parallel";
         when = WhenParallelId;
     }
-    else
+    else if (expr->hasAttribute(beforeAtom))
     {
         label = "Before";
-        when = WhenDefaultId;
+        when = WhenBeforeId;
+    }
+    else
+    {
+        //Should WHEN default to BEFORE or PARALLEL??
+        label = "Parallel";
+        when = WhenParallelId;
     }
 
     bool useImplementationClass = options.minimizeActivityClasses;

+ 1 - 0
ecl/hthor/hthor.cpp

@@ -6217,6 +6217,7 @@ CHThorWhenActionActivity::CHThorWhenActionActivity(IAgentContext &_agent, unsign
 void CHThorWhenActionActivity::ready()
 {
     CHThorSimpleActivityBase::ready();
+    graphElement->executeDependentActions(agent, NULL, WhenBeforeId);
     graphElement->executeDependentActions(agent, NULL, WhenParallelId);
 }
 

+ 10 - 2
roxie/ccd/ccddali.cpp

@@ -57,6 +57,8 @@ public:
         CriticalBlock b(crit);
         try
         {
+            if (traceLevel > 5)
+                DBGLOG("Subscribing to %s, %p", xpath.get(), this);
             change = querySDS().subscribe(xpath, *this, true);
         }
         catch (IException *E)
@@ -71,6 +73,8 @@ public:
         notifier = NULL;
         try
         {
+            if (traceLevel > 5)
+                DBGLOG("unsubscribing from %s, %p", xpath.get(), this);
             if (change)
                 querySDS().unsubscribe(change);
         }
@@ -91,21 +95,25 @@ public:
         // Despite the danger of deadlocks (that requires careful code in the notifier to avoid), I think it is neccessary to hold the lock during the call,
         // as otherwise notifier may point to a deleted object.
         CriticalBlock b(crit);
+        if (traceLevel > 5)
+            DBGLOG("resubscribing to %s, %p", xpath.get(), this);
         change = querySDS().subscribe(xpath, *this, true);
         if (notifier)
             notifier->notify(0, NULL, SDSNotify_None);
     }
-    virtual void notify(SubscriptionId subid, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
+    virtual void notify(SubscriptionId subid, const char *daliXpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
     {
         Linked<CDaliPackageWatcher> me = this;  // Ensure that I am not released by the notify call (which would then access freed memory to release the critsec)
         Linked<ISDSSubscription> myNotifier;
         {
             CriticalBlock b(crit);
+            if (traceLevel > 5)
+                DBGLOG("Notification on %s (%s), %p", xpath.get(), daliXpath, this);
             myNotifier.set(notifier);
             // allow crit to be released, allowing this to be unsubscribed, to avoid deadlocking when other threads via notify call unsubscribe
         }
         if (myNotifier)
-            myNotifier->notify(subid, xpath, flags, valueLen, valueData);
+            myNotifier->notify(subid, daliXpath, flags, valueLen, valueData);
     }
 };
 

+ 112 - 47
roxie/ccd/ccdserver.cpp

@@ -887,6 +887,7 @@ protected:
     IRoxieSlaveContext *ctx;
     const IRoxieServerActivityFactory *factory;
     IRoxieServerActivityCopyArray dependencies;
+    IntArray dependencyIndexes;
     IntArray dependencyControlIds;
     IArrayOf<IActivityGraph> childGraphs;
     CachedOutputMetaData meta;
@@ -943,7 +944,13 @@ public:
     {
         CriticalBlock cb(statecrit);
         if (traceStartStop)
-            DBGLOG("%p destroy state=%s", this, queryStateText(state)); // Note- CTXLOG may not be safe
+        {
+            DBGLOG("%p destroy %d state=%s", this, activityId, queryStateText(state)); // Note- CTXLOG may not be safe
+            if (watchActivityId && watchActivityId==activityId)
+            {
+                DBGLOG("WATCH: %p destroy %d state=%s", this, activityId, queryStateText(state)); // Note- CTXLOG may not be safe
+            }
+        }
         if (state!=STATEreset)
         {
             DBGLOG("STATE: Activity %d destroyed but not reset", activityId);
@@ -1150,10 +1157,10 @@ public:
 #ifdef TRACE_STARTSTOP
         if (traceStartStop)
         {
-            CTXLOG("start %d", activityId);
+            CTXLOG("start %p %d", this, activityId);
             if (watchActivityId && watchActivityId==activityId)
             {
-                CTXLOG("WATCH: start %d", activityId);
+                CTXLOG("WATCH: start %p %d", this, activityId);
             }
         }
 #endif
@@ -1230,23 +1237,29 @@ public:
 
     inline void stop(bool aborting)
     {
+        // NOTE - don't be tempted to skip the stop for activities that are reset - splitters need to see the stops
         if (state != STATEstopped)
         {
             CriticalBlock cb(statecrit);
             if (state != STATEstopped)
             {
-                if (state != STATEreset)
-                    state=STATEstopped;
 #ifdef TRACE_STARTSTOP
                 if (traceStartStop)
                 {
-                    CTXLOG("stop %d", activityId);
+                    CTXLOG("stop %p %d (state currently %s)", this, activityId, queryStateText(state));
                     if (watchActivityId && watchActivityId==activityId)
                     {
-                        CTXLOG("WATCH: stop %d", activityId);
+                        CTXLOG("WATCH: stop %p %d", this, activityId);
                     }
                 }
 #endif
+                state=STATEstopped;
+                // NOTE - this is needed to ensure that dependencies which were not used are properly stopped
+                ForEachItemIn(idx, dependencies)
+                {
+                    if (dependencyControlIds.item(idx) == 0)
+                        dependencies.item(idx).stopSink(dependencyIndexes.item(idx));
+                }
                 if (input)
                     input->stop(aborting);
             }
@@ -1282,10 +1295,10 @@ public:
 #ifdef TRACE_STARTSTOP
                 if (traceStartStop)
                 {
-                    CTXLOG("reset %d", activityId);
+                    CTXLOG("reset %p %d", this, activityId);
                     if (watchActivityId && watchActivityId==activityId)
                     {
-                        CTXLOG("WATCH: reset %d", activityId);
+                        CTXLOG("WATCH: reset %p %d", this, activityId);
                     }
                 }
 #endif
@@ -1303,6 +1316,7 @@ public:
     virtual void addDependency(IRoxieServerActivity &source, unsigned sourceIdx, int controlId) 
     {
         dependencies.append(source);
+        dependencyIndexes.append(sourceIdx);
         dependencyControlIds.append(controlId);
     } 
 
@@ -1322,6 +1336,11 @@ public:
         throw MakeStringException(ROXIE_SINK, "Internal error: executeChild() requires a suitable sink");
     }
 
+    virtual void stopSink(unsigned idx)
+    {
+        throw MakeStringException(ROXIE_SINK, "Internal error: stopSink() requires a suitable sink");
+    }
+
     virtual __int64 evaluate() 
     {
         throw MakeStringException(ROXIE_SINK, "Internal error: evaluate() requires a function");
@@ -2116,19 +2135,31 @@ public:
 class CRoxieServerInternalSinkActivity : public CRoxieServerActivity
 {
 protected:
+    unsigned numOutputs;
     bool executed;
+    bool *stopped;
     CriticalSection ecrit;
     Owned<IException> exception;
 
 public:
-    CRoxieServerInternalSinkActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
-        : CRoxieServerActivity(_factory, _probeManager)
+    CRoxieServerInternalSinkActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _numOutputs)
+        : CRoxieServerActivity(_factory, _probeManager), numOutputs(_numOutputs)
     {
         executed = false;
+        stopped = new bool[numOutputs];
+        for (unsigned s = 0; s < numOutputs; s++)
+            stopped[s] = false;
+    }
+
+    ~CRoxieServerInternalSinkActivity()
+    {
+        delete [] stopped;
     }
 
     virtual void reset()
     {
+        for (unsigned s = 0; s < numOutputs; s++)
+            stopped[s] = false;
         executed = false;
         exception.clear();
         CRoxieServerActivity::reset();
@@ -2139,6 +2170,18 @@ public:
         return NULL;
     }
 
+    virtual void stopSink(unsigned outputIdx)
+    {
+        if (!stopped[outputIdx])
+        {
+            stopped[outputIdx] = true;
+            for (unsigned s = 0; s < numOutputs; s++)
+                if (!stopped[s])
+                    return;
+            stop(false); // all outputs stopped - stop parent.
+        }
+    }
+
     virtual const void *nextInGroup()
     {
         throwUnexpected(); // I am nobody's input
@@ -4443,7 +4486,7 @@ class CRoxieServerApplyActivity : public CRoxieServerInternalSinkActivity
 
 public:
     CRoxieServerApplyActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
-        : CRoxieServerInternalSinkActivity(_factory, _probeManager), helper((IHThorApplyArg &) basehelper)
+        : CRoxieServerInternalSinkActivity(_factory, _probeManager, 0), helper((IHThorApplyArg &) basehelper)
     {
     }
 
@@ -5813,8 +5856,8 @@ class CRoxieServerLocalResultWriteActivity : public CRoxieServerInternalSinkActi
     unsigned graphId;
 
 public:
-    CRoxieServerLocalResultWriteActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _graphId)
-        : CRoxieServerInternalSinkActivity(_factory, _probeManager), helper((IHThorLocalResultWriteArg &)basehelper), graphId(_graphId)
+    CRoxieServerLocalResultWriteActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _graphId, unsigned _numOutputs)
+        : CRoxieServerInternalSinkActivity(_factory, _probeManager, _numOutputs), helper((IHThorLocalResultWriteArg &)basehelper), graphId(_graphId)
     {
         graph = NULL;
     }
@@ -5835,12 +5878,17 @@ public:
         graph->setResult(helper.querySequence(), result);
     }
 
+    virtual const void *nextInGroup()
+    {
+        return input->nextInGroup(); // I can act as a passthrough input
+    }
+
     IRoxieInput * querySelectOutput(unsigned id)
     {
         if (id == helper.querySequence())
         {
-            executed = true;
-            return LINK(input);
+            executed = true;  // Ensure that we don't try to pull as a sink as well as via the passthrough
+            return LINK(this);
         }
         return NULL;
     }
@@ -5859,7 +5907,7 @@ public:
 
     virtual IRoxieServerActivity *createActivity(IProbeManager *_probeManager) const
     {
-        return new CRoxieServerLocalResultWriteActivity(this, _probeManager, graphId);
+        return new CRoxieServerLocalResultWriteActivity(this, _probeManager, graphId, usageCount);
     }
 
 };
@@ -5878,8 +5926,8 @@ class CRoxieServerDictionaryResultWriteActivity : public CRoxieServerInternalSin
     unsigned graphId;
 
 public:
-    CRoxieServerDictionaryResultWriteActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _graphId)
-        : CRoxieServerInternalSinkActivity(_factory, _probeManager), helper((IHThorDictionaryResultWriteArg &)basehelper), graphId(_graphId)
+    CRoxieServerDictionaryResultWriteActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _usageCount, unsigned _graphId)
+        : CRoxieServerInternalSinkActivity(_factory, _probeManager, _usageCount), helper((IHThorDictionaryResultWriteArg &)basehelper), graphId(_graphId)
     {
         graph = NULL;
     }
@@ -5913,12 +5961,17 @@ public:
         graph->setResult(helper.querySequence(), result);
     }
 
+    virtual const void *nextInGroup()
+    {
+        return input->nextInGroup(); // I can act as a passthrough input
+    }
+
     IRoxieInput * querySelectOutput(unsigned id)
     {
         if (id == helper.querySequence())
         {
-            executed = true;
-            return LINK(input);
+            executed = true;  // Ensure that we don't try to pull as a sink as well as via the passthrough
+            return LINK(this);
         }
         return NULL;
     }
@@ -5936,7 +5989,7 @@ public:
 
     virtual IRoxieServerActivity *createActivity(IProbeManager *_probeManager) const
     {
-        return new CRoxieServerDictionaryResultWriteActivity(this, _probeManager, graphId);
+        return new CRoxieServerDictionaryResultWriteActivity(this, _probeManager, usageCount, graphId);
     }
 };
 
@@ -6113,8 +6166,8 @@ class CRoxieServerGraphLoopResultWriteActivity : public CRoxieServerInternalSink
     unsigned graphId;
 
 public:
-    CRoxieServerGraphLoopResultWriteActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _graphId)
-        : CRoxieServerInternalSinkActivity(_factory, _probeManager), helper((IHThorGraphLoopResultWriteArg &)basehelper), graphId(_graphId)
+    CRoxieServerGraphLoopResultWriteActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _graphId, unsigned _numOutputs)
+        : CRoxieServerInternalSinkActivity(_factory, _probeManager, _numOutputs), helper((IHThorGraphLoopResultWriteArg &)basehelper), graphId(_graphId)
     {
         graph = NULL;
     }
@@ -6199,7 +6252,7 @@ public:
 
     virtual IRoxieServerActivity *createActivity(IProbeManager *_probeManager) const
     {
-        return new CRoxieServerGraphLoopResultWriteActivity(this, _probeManager, graphId);
+        return new CRoxieServerGraphLoopResultWriteActivity(this, _probeManager, graphId, usageCount);
     }
 
 };
@@ -9019,8 +9072,8 @@ class CRoxieServerPipeWriteActivity : public CRoxieServerInternalSinkActivity
     bool recreate;
     bool inputExhausted;
 public:
-    CRoxieServerPipeWriteActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
-        : CRoxieServerInternalSinkActivity(_factory, _probeManager), helper((IHThorPipeWriteArg &)basehelper)
+    CRoxieServerPipeWriteActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _numOutputs)
+        : CRoxieServerInternalSinkActivity(_factory, _probeManager, _numOutputs), helper((IHThorPipeWriteArg &)basehelper)
     {
         recreate = helper.recreateEachRow();
         firstRead = false;
@@ -9140,7 +9193,7 @@ public:
 
     virtual IRoxieServerActivity *createActivity(IProbeManager *_probeManager) const
     {
-        return new CRoxieServerPipeWriteActivity(this, _probeManager);
+        return new CRoxieServerPipeWriteActivity(this, _probeManager, usageCount);
     }
 };
 
@@ -9663,8 +9716,8 @@ class CRoxieServerActionActivity : public CRoxieServerInternalSinkActivity
     IHThorActionArg &helper;
 public:
 
-    CRoxieServerActionActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
-        : CRoxieServerInternalSinkActivity(_factory, _probeManager), helper((IHThorActionArg &)basehelper)
+    CRoxieServerActionActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _numOutputs)
+        : CRoxieServerInternalSinkActivity(_factory, _probeManager, _numOutputs), helper((IHThorActionArg &)basehelper)
     {
     }
 
@@ -9684,7 +9737,7 @@ public:
 
     virtual IRoxieServerActivity *createActivity(IProbeManager *_probeManager) const
     {
-        return new CRoxieServerActionActivity(this, _probeManager);
+        return new CRoxieServerActionActivity(this, _probeManager, usageCount);
     }
 };
 
@@ -10711,7 +10764,7 @@ protected:
 
 public:
     CRoxieServerDiskWriteActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
-        : CRoxieServerInternalSinkActivity(_factory, _probeManager), helper((IHThorDiskWriteArg &)basehelper)
+        : CRoxieServerInternalSinkActivity(_factory, _probeManager, 0), helper((IHThorDiskWriteArg &)basehelper)
     {
         extend = ((helper.getFlags() & TDWextend) != 0);
         overwrite = ((helper.getFlags() & TDWoverwrite) != 0);
@@ -10781,10 +10834,15 @@ public:
         }
         else
         {
-            outSeq->flush(&crc);
-            updateWorkUnitResult(processed);
-            uncompressedBytesWritten = outSeq->getPosition();
-            writer->finish(true, this);
+            if (outSeq)
+                outSeq->flush(&crc);
+            if (outSeq)
+                uncompressedBytesWritten = outSeq->getPosition();
+            if (writer)
+            {
+                updateWorkUnitResult(processed);
+                writer->finish(true, this);
+            }
         }
         writer.clear();
         CRoxieServerActivity::stop(aborting);
@@ -11195,7 +11253,7 @@ class CRoxieServerIndexWriteActivity : public CRoxieServerInternalSinkActivity,
 
 public:
     CRoxieServerIndexWriteActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
-        : CRoxieServerInternalSinkActivity(_factory, _probeManager), helper(static_cast<IHThorIndexWriteArg &>(basehelper))
+        : CRoxieServerInternalSinkActivity(_factory, _probeManager, 0), helper(static_cast<IHThorIndexWriteArg &>(basehelper))
     {
         overwrite = ((helper.getFlags() & TIWoverwrite) != 0);
         reccount = 0;
@@ -15115,7 +15173,12 @@ public:
         {
             CRoxieServerActivity::reset();
             libraryGraph->reset();
-            //Call reset on all unused outputs from the graph - no one else will.
+            //Call reset on all unused inputs/outputs from the graph - no one else will.
+            for (unsigned i1 = 0; i1 < numInputs; i1++)
+            {
+                if (!inputUsed[i1])
+                    inputAdaptors[i1]->reset();
+            }
             IRoxieServerChildGraph * graph = libraryGraph->queryLoopGraph();
             ForEachItemIn(i3, extra.unusedOutputs)
             {
@@ -19050,6 +19113,7 @@ public:
         savedExtractSize = parentExtractSize;
         savedExtract = parentExtract;
         CRoxieServerActivity::start(parentExtractSize, parentExtract, paused);
+        executeDependencies(parentExtractSize, parentExtract, WhenBeforeId);
         executeDependencies(parentExtractSize, parentExtract, WhenParallelId);        // MORE: This should probably be done in parallel!
     }
 
@@ -19113,6 +19177,7 @@ public:
         savedExtractSize = parentExtractSize;
         savedExtract = parentExtract;
         CRoxieServerActionBaseActivity::start(parentExtractSize, parentExtract, paused);
+        executeDependencies(parentExtractSize, parentExtract, WhenBeforeId);
         executeDependencies(parentExtractSize, parentExtract, WhenParallelId);        // MORE: This should probably be done in parallel!
     }
 
@@ -19315,8 +19380,8 @@ class CRoxieServerWorkUnitWriteActivity : public CRoxieServerInternalSinkActivit
     IRoxieServerContext *serverContext;
 
 public:
-    CRoxieServerWorkUnitWriteActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, bool _isReread)
-        : CRoxieServerInternalSinkActivity(_factory, _probeManager), helper((IHThorWorkUnitWriteArg &)basehelper), isReread(_isReread)
+    CRoxieServerWorkUnitWriteActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, bool _isReread, unsigned _numOutputs)
+        : CRoxieServerInternalSinkActivity(_factory, _probeManager, _numOutputs), helper((IHThorWorkUnitWriteArg &)basehelper), isReread(_isReread)
     {
         grouped = (helper.getFlags() & POFgrouped) != 0;
         serverContext = NULL;
@@ -19460,7 +19525,7 @@ public:
 
     virtual IRoxieServerActivity *createActivity(IProbeManager *_probeManager) const
     {
-        return new CRoxieServerWorkUnitWriteActivity(this, _probeManager, isReread);
+        return new CRoxieServerWorkUnitWriteActivity(this, _probeManager, isReread, usageCount);
     }
 
 };
@@ -19479,8 +19544,8 @@ class CRoxieServerWorkUnitWriteDictActivity : public CRoxieServerInternalSinkAct
     IRoxieServerContext *serverContext;
 
 public:
-    CRoxieServerWorkUnitWriteDictActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
-        : CRoxieServerInternalSinkActivity(_factory, _probeManager), helper((IHThorDictionaryWorkUnitWriteArg &)basehelper)
+    CRoxieServerWorkUnitWriteDictActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _usageCount)
+        : CRoxieServerInternalSinkActivity(_factory, _probeManager, _usageCount), helper((IHThorDictionaryWorkUnitWriteArg &)basehelper)
     {
         serverContext = NULL;
     }
@@ -19534,7 +19599,7 @@ public:
 
     virtual IRoxieServerActivity *createActivity(IProbeManager *_probeManager) const
     {
-        return new CRoxieServerWorkUnitWriteDictActivity(this, _probeManager);
+        return new CRoxieServerWorkUnitWriteDictActivity(this, _probeManager, usageCount);
     }
 
 };
@@ -19552,8 +19617,8 @@ class CRoxieServerRemoteResultActivity : public CRoxieServerInternalSinkActivity
     IHThorRemoteResultArg &helper;
 
 public:
-    CRoxieServerRemoteResultActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
-        : CRoxieServerInternalSinkActivity(_factory, _probeManager), helper((IHThorRemoteResultArg &)basehelper)
+    CRoxieServerRemoteResultActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _numOutputs)
+        : CRoxieServerInternalSinkActivity(_factory, _probeManager, _numOutputs), helper((IHThorRemoteResultArg &)basehelper)
     {
     }
 
@@ -19577,7 +19642,7 @@ public:
 
     virtual IRoxieServerActivity *createActivity(IProbeManager *_probeManager) const
     {
-        return new CRoxieServerRemoteResultActivity(this, _probeManager);
+        return new CRoxieServerRemoteResultActivity(this, _probeManager, usageCount);
     }
 
 };

+ 1 - 0
roxie/ccd/ccdserver.hpp

@@ -159,6 +159,7 @@ interface IRoxieServerActivity : extends IActivityBase
     virtual void executeChild(size32_t & retSize, void * & ret, unsigned parentExtractSize, const byte * parentExtract) = 0;
     virtual void serializeCreateStartContext(MemoryBuffer &out) = 0;
     virtual void serializeExtra(MemoryBuffer &out) = 0;
+    virtual void stopSink(unsigned idx) = 0;
 //Functions to support result streaming between parallel loop/graphloop/library implementations
     virtual IRoxieInput * querySelectOutput(unsigned id) = 0;
     virtual bool querySetStreamInput(unsigned id, IRoxieInput * _input) = 0;

+ 3 - 4
roxie/ccd/ccdstate.cpp

@@ -483,7 +483,7 @@ public:
             StringBuffer compulsoryMsg;
             if (isCompulsory())
                     compulsoryMsg.append(" (Package is compulsory)");
-            if (!opt)
+            if (!opt && !pretendAllOpt)
                 throw MakeStringException(ROXIE_FILE_ERROR, "Could not resolve filename %s%s", fileName.str(), compulsoryMsg.str());
             if (traceLevel > 4)
                 DBGLOG("Could not resolve OPT filename %s%s", fileName.str(), compulsoryMsg.str());
@@ -1295,9 +1295,6 @@ public:
     CRoxiePackageSetWatcher(IRoxieDaliHelper *_daliHelper, ISDSSubscription *_owner, unsigned numChannels)
     : stateHash(0), daliHelper(_daliHelper), owner(_owner)
     {
-        Owned<IDaliPackageWatcher> notifier = daliHelper->getPackageSetsSubscription(this);
-        if (notifier)
-            notifiers.append(*notifier.getClear());
         ForEachItemIn(idx, allQuerySetNames)
         {
             createQueryPackageManagers(numChannels, allQuerySetNames.item(idx));
@@ -1525,12 +1522,14 @@ private:
 
 class CRoxiePackageSetManager : public CInterface, implements IRoxieQueryPackageManagerSet, implements ISDSSubscription
 {
+    Owned<IDaliPackageWatcher> notifier;
 public:
     IMPLEMENT_IINTERFACE;
     CRoxiePackageSetManager(const IQueryDll *_standAloneDll) :
         standAloneDll(_standAloneDll)
     {
         daliHelper.setown(connectToDali(ROXIE_DALI_CONNECT_TIMEOUT));
+        notifier.setown(daliHelper->getPackageSetsSubscription(this));
     }
 
     ~CRoxiePackageSetManager()

+ 1 - 0
rtl/include/eclhelper.hpp

@@ -1282,6 +1282,7 @@ const int WhenDefaultId = 0;
 const int WhenSuccessId = -1;
 const int WhenFailureId = -2;
 const int WhenParallelId = -3;
+const int WhenBeforeId = -4;
 
 typedef IHThorNullArg IHThorWhenActionArg;
 

+ 1 - 1
testing/ecl/when8.ecl

@@ -40,7 +40,7 @@ trueValue := true : stored('trueValue');
 
 osumx := IF(trueValue, osum, FAIL('Should not be called'));
 
-x1 := when(simple, osumx);
+x1 := when(simple, osumx, before);
 
 o1 := output(TABLE(x1, { f1 }));
 o2 := output(TABLE(simple, { c := count(group) }, f3));

+ 12 - 0
testing/regress/ecl/key/resetsplitter.xml

@@ -0,0 +1,12 @@
+<Dataset name='Result 1'>
+ <Row><id>64</id></Row>
+ <Row><id>65</id></Row>
+ <Row><id>66</id></Row>
+ <Row><id>67</id></Row>
+ <Row><id>68</id></Row>
+ <Row><id>69</id></Row>
+ <Row><id>70</id></Row>
+ <Row><id>71</id></Row>
+ <Row><id>72</id></Row>
+ <Row><id>73</id></Row>
+</Dataset>

+ 15 - 0
testing/regress/ecl/key/when9.xml

@@ -0,0 +1,15 @@
+<Dataset name='Result 1'>
+ <Row><s>3</s></Row>
+ <Row><s>1</s></Row>
+ <Row><s>9</s></Row>
+</Dataset>
+<Dataset name='Result 2'>
+ <Row><f1>1</f1></Row>
+ <Row><f1>9</f1></Row>
+ <Row><f1>3</f1></Row>
+</Dataset>
+<Dataset name='Result 3'>
+ <Row><c>1</c></Row>
+ <Row><c>1</c></Row>
+ <Row><c>1</c></Row>
+</Dataset>

+ 36 - 0
testing/regress/ecl/resetsplitter.ecl

@@ -0,0 +1,36 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+idRecord := { unsigned id; };
+
+myDataset := DATASET(100, TRANSFORM(idRecord, SELF.id := COUNTER), DISTRIBUTED);
+
+filtered := NOFOLD(myDataset)(id % 20 != 0);
+
+filter1 := NOFOLD(filtered)(id % 3 != 0);
+
+filter2 := NOFOLD(filtered)(id % 3 != 1);
+
+p1 := PROJECT(NOFOLD(filtered), TRANSFORM(idRecord, SELF.id := LEFT.id + COUNT(filter1)));
+
+p2 := PROJECT(NOFOLD(filtered), TRANSFORM(idRecord, SELF.id := LEFT.id + COUNT(filter2)));
+
+boolean test := false : stored('test');
+
+r := IF(test, NOFOLD(p1), NOFOLD(p2));
+
+output(CHOOSEN(NOFOLD(r), 10));

+ 47 - 0
testing/regress/ecl/when9.ecl

@@ -0,0 +1,47 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+//skip type==thorlcr TBD
+
+r := {unsigned f1, unsigned f2, unsigned f3, unsigned f4 };
+
+r t(unsigned a, unsigned b, unsigned c, unsigned d) := TRANSFORM
+    SELF.f1 := a;
+    SELF.f2 := b;
+    SELF.f3 := c;
+    SELF.f4 := d;
+END;
+
+ds := dataset([
+        t(1,2,3,4),
+        t(1,4,2,5),
+        t(9,3,4,5),
+        t(3,4,2,9)]);
+
+simple := dedup(nofold(ds), f1);
+
+osum := output(TABLE(simple, { s := sum(group, f1) }, f3));
+
+trueValue := true : stored('trueValue');
+
+osumx := IF(trueValue, osum, FAIL('Should not be called'));
+
+x1 := when(simple, osumx, parallel);
+
+o1 := output(TABLE(x1, { f1 }));
+o2 := output(TABLE(simple, { c := count(group) }, f3));
+when(o1, o2, parallel);

+ 23 - 0
thorlcr/graph/thgraph.cpp

@@ -678,6 +678,29 @@ bool CGraphElementBase::prepareContext(size32_t parentExtractSz, const byte *par
                     return true;
                 break;
             }
+
+            case TAKsequential:
+            case TAKparallel:
+            {
+                /* NB - The executeDependencies code below is only needed if actionLinkInNewGraph=true, which is no longer the default
+                 * It should be removed, once we are positive there are no issues with in-line sequential/parallel activities
+                 */
+                for (unsigned s=1; s<=dependsOn.ordinality(); s++)
+                {
+                    if (!executeDependencies(parentExtractSz, parentExtract, s, async))
+                        return false;
+                }
+                break;
+            }
+            case TAKwhen_dataset:
+            case TAKwhen_action:
+            {
+                if (!executeDependencies(parentExtractSz, parentExtract, WhenBeforeId, async))
+                    return false;
+                if (!executeDependencies(parentExtractSz, parentExtract, WhenParallelId, async))
+                    return false;
+                break;
+            }
         }
         ForEachItemIn(i, inputs)
         {