Browse Source

Merge branch 'closedown-4.2.x' into candidate-5.0.0

Conflicts:
	roxie/ccd/ccdserver.cpp
	roxie/ccd/ccdstate.cpp
	thorlcr/graph/thgraph.cpp

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 11 years ago
parent
commit
e939213846

+ 14 - 5
dali/base/dadfs.cpp

@@ -674,11 +674,13 @@ public:
     {
         StringBuffer xpath;
         dlfn.makeFullnameQuery(xpath,DXB_File,true).append("/ClusterLock");
-        conn.setown(querySDS().connect(xpath.str(), myProcessSession(), RTM_CREATE | RTM_LOCK_WRITE | RTM_DELETE_ON_DISCONNECT, SDS_CONNECT_TIMEOUT));
-    }
 
-    ~CClustersLockedSection()
-    {
+        /* Avoid RTM_CREATE_QUERY connect() if possible by making 1st call without. This is to avoid write contention caused by RTM_CREATE*
+         * NB: RTM_CREATE_QUERY should probably only gain exclusive access in Dali if node is missing.
+         */
+        conn.setown(querySDS().connect(xpath.str(), myProcessSession(), RTM_LOCK_WRITE, SDS_CONNECT_TIMEOUT));
+        if (!conn.get()) // NB: ClusterLock is now created at File create time, so this can only be true for pre-existing File's
+            conn.setown(querySDS().connect(xpath.str(), myProcessSession(), RTM_CREATE_QUERY | RTM_LOCK_WRITE, SDS_CONNECT_TIMEOUT));
     }
 };
 
@@ -3306,6 +3308,7 @@ public:
 #endif
         parent = _parent;
         root.setown(createPTree(queryDfsXmlBranchName(DXB_File)));
+        root->setPropTree("ClusterLock", createPTree());
 //      fdesc->serializeTree(*root,IFDSF_EXCLUDE_NODES);
         setFileAttrs(fdesc,true);
         setClusters(fdesc);
@@ -8043,7 +8046,13 @@ void CDistributedFileDirectory::addEntry(CDfsLogicalFileName &dlfn,IPropertyTree
     }
     root->setProp("@name",tail.str());
     root->setProp("OrigName",dlfn.get());
-    sroot->addPropTree(superfile?queryDfsXmlBranchName(DXB_SuperFile):queryDfsXmlBranchName(DXB_File),root); // now owns root  
+    if (superfile)
+        sroot->addPropTree(queryDfsXmlBranchName(DXB_SuperFile), root); // now owns root
+    else
+    {
+        IPropertyTree *file = sroot->addPropTree(queryDfsXmlBranchName(DXB_File), root); // now owns root
+        file->setPropTree("ClusterLock", createPTree());
+    }
 }
 
 IDistributedFileIterator *CDistributedFileDirectory::getIterator(const char *wildname, bool includesuper, IUserDescriptor *user)

+ 1 - 1
dali/base/dasds.cpp

@@ -2855,7 +2855,7 @@ PDState CServerRemoteTree::checkChange(IPropertyTree &changeTree, CBranchChange
                 Owned<CBranchChange> childChange = new CBranchChange(*(CRemoteTreeBase *)idTree);
                 if (!removeTree(idTree))
                     throw MakeSDSException(-1, "::checkChange - Failed to remove child(%s) from parent(%s) at %s(%d)", idTree->queryName(), queryName(), __FILE__, __LINE__);
-                mergePDState(res, PDS_Deleted);
+                mergePDState(res, PDS_Structure);
                 if (parentBranchChange)
                 {
                     PDState _res = res;

+ 8 - 2
ecl/hqlcpp/hqlhtcpp.cpp

@@ -15032,10 +15032,16 @@ ABoundActivity * HqlCppTranslator::doBuildActivityExecuteWhen(BuildCtx & ctx, IH
         label = "Parallel";
         when = WhenParallelId;
     }
-    else
+    else if (expr->hasAttribute(beforeAtom))
     {
         label = "Before";
-        when = WhenDefaultId;
+        when = WhenBeforeId;
+    }
+    else
+    {
+        //Should WHEN default to BEFORE or PARALLEL??
+        label = "Parallel";
+        when = WhenParallelId;
     }
 
     bool useImplementationClass = options.minimizeActivityClasses;

+ 1 - 0
ecl/hthor/hthor.cpp

@@ -6220,6 +6220,7 @@ CHThorWhenActionActivity::CHThorWhenActionActivity(IAgentContext &_agent, unsign
 void CHThorWhenActionActivity::ready()
 {
     CHThorSimpleActivityBase::ready();
+    graphElement->executeDependentActions(agent, NULL, WhenBeforeId);
     graphElement->executeDependentActions(agent, NULL, WhenParallelId);
 }
 

+ 10 - 2
roxie/ccd/ccddali.cpp

@@ -57,6 +57,8 @@ public:
         CriticalBlock b(crit);
         try
         {
+            if (traceLevel > 5)
+                DBGLOG("Subscribing to %s, %p", xpath.get(), this);
             change = querySDS().subscribe(xpath, *this, true);
         }
         catch (IException *E)
@@ -71,6 +73,8 @@ public:
         notifier = NULL;
         try
         {
+            if (traceLevel > 5)
+                DBGLOG("unsubscribing from %s, %p", xpath.get(), this);
             if (change)
                 querySDS().unsubscribe(change);
         }
@@ -91,21 +95,25 @@ public:
         // Despite the danger of deadlocks (that requires careful code in the notifier to avoid), I think it is neccessary to hold the lock during the call,
         // as otherwise notifier may point to a deleted object.
         CriticalBlock b(crit);
+        if (traceLevel > 5)
+            DBGLOG("resubscribing to %s, %p", xpath.get(), this);
         change = querySDS().subscribe(xpath, *this, true);
         if (notifier)
             notifier->notify(0, NULL, SDSNotify_None);
     }
-    virtual void notify(SubscriptionId subid, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
+    virtual void notify(SubscriptionId subid, const char *daliXpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
     {
         Linked<CDaliPackageWatcher> me = this;  // Ensure that I am not released by the notify call (which would then access freed memory to release the critsec)
         Linked<ISDSSubscription> myNotifier;
         {
             CriticalBlock b(crit);
+            if (traceLevel > 5)
+                DBGLOG("Notification on %s (%s), %p", xpath.get(), daliXpath, this);
             myNotifier.set(notifier);
             // allow crit to be released, allowing this to be unsubscribed, to avoid deadlocking when other threads via notify call unsubscribe
         }
         if (myNotifier)
-            myNotifier->notify(subid, xpath, flags, valueLen, valueData);
+            myNotifier->notify(subid, daliXpath, flags, valueLen, valueData);
     }
 };
 

+ 46 - 18
roxie/ccd/ccdserver.cpp

@@ -951,7 +951,13 @@ public:
     {
         CriticalBlock cb(statecrit);
         if (traceStartStop)
-            DBGLOG("%p destroy state=%s", this, queryStateText(state)); // Note- CTXLOG may not be safe
+        {
+            DBGLOG("%p destroy %d state=%s", this, activityId, queryStateText(state)); // Note- CTXLOG may not be safe
+            if (watchActivityId && watchActivityId==activityId)
+            {
+                DBGLOG("WATCH: %p destroy %d state=%s", this, activityId, queryStateText(state)); // Note- CTXLOG may not be safe
+            }
+        }
         if (state!=STATEreset)
         {
             DBGLOG("STATE: Activity %d destroyed but not reset", activityId);
@@ -1160,10 +1166,10 @@ public:
 #ifdef TRACE_STARTSTOP
         if (traceStartStop)
         {
-            CTXLOG("start %d", activityId);
+            CTXLOG("start %p %d", this, activityId);
             if (watchActivityId && watchActivityId==activityId)
             {
-                CTXLOG("WATCH: start %d", activityId);
+                CTXLOG("WATCH: start %p %d", this, activityId);
             }
         }
 #endif
@@ -1240,23 +1246,23 @@ public:
 
     inline void stop(bool aborting)
     {
+        // NOTE - don't be tempted to skip the stop for activities that are reset - splitters need to see the stops
         if (state != STATEstopped)
         {
             CriticalBlock cb(statecrit);
             if (state != STATEstopped)
             {
-                if (state != STATEreset)
-                    state=STATEstopped;
 #ifdef TRACE_STARTSTOP
                 if (traceStartStop)
                 {
-                    CTXLOG("stop %d", activityId);
+                    CTXLOG("stop %p %d (state currently %s)", this, activityId, queryStateText(state));
                     if (watchActivityId && watchActivityId==activityId)
                     {
-                        CTXLOG("WATCH: stop %d", activityId);
+                        CTXLOG("WATCH: stop %p %d", this, activityId);
                     }
                 }
 #endif
+                state=STATEstopped;
                 // NOTE - this is needed to ensure that dependencies which were not used are properly stopped
                 ForEachItemIn(idx, dependencies)
                 {
@@ -1298,10 +1304,10 @@ public:
 #ifdef TRACE_STARTSTOP
                 if (traceStartStop)
                 {
-                    CTXLOG("reset %d", activityId);
+                    CTXLOG("reset %p %d", this, activityId);
                     if (watchActivityId && watchActivityId==activityId)
                     {
-                        CTXLOG("WATCH: reset %d", activityId);
+                        CTXLOG("WATCH: reset %p %d", this, activityId);
                     }
                 }
 #endif
@@ -5968,12 +5974,17 @@ public:
         graph->setResult(helper.querySequence(), result);
     }
 
+    virtual const void *nextInGroup()
+    {
+        return input->nextInGroup(); // I can act as a passthrough input
+    }
+
     IRoxieInput * querySelectOutput(unsigned id)
     {
         if (id == helper.querySequence())
         {
-            executed = true;
-            return LINK(input);
+            executed = true;  // Ensure that we don't try to pull as a sink as well as via the passthrough
+            return LINK(this);
         }
         return NULL;
     }
@@ -6046,12 +6057,17 @@ public:
         graph->setResult(helper.querySequence(), result);
     }
 
+    virtual const void *nextInGroup()
+    {
+        return input->nextInGroup(); // I can act as a passthrough input
+    }
+
     IRoxieInput * querySelectOutput(unsigned id)
     {
         if (id == helper.querySequence())
         {
-            executed = true;
-            return LINK(input);
+            executed = true;  // Ensure that we don't try to pull as a sink as well as via the passthrough
+            return LINK(this);
         }
         return NULL;
     }
@@ -10914,10 +10930,15 @@ public:
         }
         else
         {
-            outSeq->flush(&crc);
-            updateWorkUnitResult(processed);
-            uncompressedBytesWritten = outSeq->getPosition();
-            writer->finish(true, this);
+            if (outSeq)
+                outSeq->flush(&crc);
+            if (outSeq)
+                uncompressedBytesWritten = outSeq->getPosition();
+            if (writer)
+            {
+                updateWorkUnitResult(processed);
+                writer->finish(true, this);
+            }
         }
         writer.clear();
         CRoxieServerActivity::stop(aborting);
@@ -15341,7 +15362,12 @@ public:
         {
             CRoxieServerActivity::reset();
             libraryGraph->reset();
-            //Call reset on all unused outputs from the graph - no one else will.
+            //Call reset on all unused inputs/outputs from the graph - no one else will.
+            for (unsigned i1 = 0; i1 < numInputs; i1++)
+            {
+                if (!inputUsed[i1])
+                    inputAdaptors[i1]->reset();
+            }
             IRoxieServerChildGraph * graph = libraryGraph->queryLoopGraph();
             ForEachItemIn(i3, extra.unusedOutputs)
             {
@@ -19482,6 +19508,7 @@ public:
         savedExtractSize = parentExtractSize;
         savedExtract = parentExtract;
         CRoxieServerActivity::start(parentExtractSize, parentExtract, paused);
+        executeDependencies(parentExtractSize, parentExtract, WhenBeforeId);
         executeDependencies(parentExtractSize, parentExtract, WhenParallelId);        // MORE: This should probably be done in parallel!
     }
 
@@ -19545,6 +19572,7 @@ public:
         savedExtractSize = parentExtractSize;
         savedExtract = parentExtract;
         CRoxieServerActionBaseActivity::start(parentExtractSize, parentExtract, paused);
+        executeDependencies(parentExtractSize, parentExtract, WhenBeforeId);
         executeDependencies(parentExtractSize, parentExtract, WhenParallelId);        // MORE: This should probably be done in parallel!
     }
 

+ 3 - 4
roxie/ccd/ccdstate.cpp

@@ -634,7 +634,7 @@ public:
             StringBuffer compulsoryMsg;
             if (isCompulsory())
                     compulsoryMsg.append(" (Package is compulsory)");
-            if (!opt)
+            if (!opt && !pretendAllOpt)
                 throw MakeStringException(ROXIE_FILE_ERROR, "Could not resolve filename %s%s", fileName.str(), compulsoryMsg.str());
             if (traceLevel > 4)
                 DBGLOG("Could not resolve OPT filename %s%s", fileName.str(), compulsoryMsg.str());
@@ -1454,9 +1454,6 @@ public:
     CRoxiePackageSetWatcher(IRoxieDaliHelper *_daliHelper, ISDSSubscription *_owner, unsigned numChannels, bool forceReload)
     : stateHash(0), daliHelper(_daliHelper), owner(_owner)
     {
-        Owned<IDaliPackageWatcher> notifier = daliHelper->getPackageSetsSubscription(this);
-        if (notifier)
-            notifiers.append(*notifier.getClear());
         ForEachItemIn(idx, allQuerySetNames)
         {
             createQueryPackageManagers(numChannels, allQuerySetNames.item(idx), forceReload);
@@ -1684,6 +1681,7 @@ private:
 
 class CRoxiePackageSetManager : public CInterface, implements IRoxieQueryPackageManagerSet, implements ISDSSubscription
 {
+    Owned<IDaliPackageWatcher> notifier;
 public:
     IMPLEMENT_IINTERFACE;
     CRoxiePackageSetManager(const IQueryDll *_standAloneDll) :
@@ -1692,6 +1690,7 @@ public:
         daliHelper.setown(connectToDali(ROXIE_DALI_CONNECT_TIMEOUT));
         atomic_set(&autoPending, 0);
         autoReloadThread.start();
+        notifier.setown(daliHelper->getPackageSetsSubscription(this));
     }
 
     ~CRoxiePackageSetManager()

+ 1 - 0
rtl/include/eclhelper.hpp

@@ -1344,6 +1344,7 @@ const int WhenDefaultId = 0;
 const int WhenSuccessId = -1;
 const int WhenFailureId = -2;
 const int WhenParallelId = -3;
+const int WhenBeforeId = -4;
 
 typedef IHThorNullArg IHThorWhenActionArg;
 

+ 15 - 0
testing/regress/ecl/key/when9.xml

@@ -0,0 +1,15 @@
+<Dataset name='Result 1'>
+ <Row><s>3</s></Row>
+ <Row><s>1</s></Row>
+ <Row><s>9</s></Row>
+</Dataset>
+<Dataset name='Result 2'>
+ <Row><f1>1</f1></Row>
+ <Row><f1>9</f1></Row>
+ <Row><f1>3</f1></Row>
+</Dataset>
+<Dataset name='Result 3'>
+ <Row><c>1</c></Row>
+ <Row><c>1</c></Row>
+ <Row><c>1</c></Row>
+</Dataset>

+ 1 - 1
testing/regress/ecl/when8.ecl

@@ -40,7 +40,7 @@ trueValue := true : stored('trueValue');
 
 osumx := IF(trueValue, osum, FAIL('Should not be called'));
 
-x1 := when(simple, osumx);
+x1 := when(simple, osumx, before);
 
 o1 := output(TABLE(x1, { f1 }));
 o2 := output(TABLE(simple, { c := count(group) }, f3));

+ 47 - 0
testing/regress/ecl/when9.ecl

@@ -0,0 +1,47 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+//skip type==thorlcr TBD
+
+r := {unsigned f1, unsigned f2, unsigned f3, unsigned f4 };
+
+r t(unsigned a, unsigned b, unsigned c, unsigned d) := TRANSFORM
+    SELF.f1 := a;
+    SELF.f2 := b;
+    SELF.f3 := c;
+    SELF.f4 := d;
+END;
+
+ds := dataset([
+        t(1,2,3,4),
+        t(1,4,2,5),
+        t(9,3,4,5),
+        t(3,4,2,9)]);
+
+simple := dedup(nofold(ds), f1);
+
+osum := output(TABLE(simple, { s := sum(group, f1) }, f3));
+
+trueValue := true : stored('trueValue');
+
+osumx := IF(trueValue, osum, FAIL('Should not be called'));
+
+x1 := when(simple, osumx, parallel);
+
+o1 := output(TABLE(x1, { f1 }));
+o2 := output(TABLE(simple, { c := count(group) }, f3));
+when(o1, o2, parallel);

+ 9 - 0
thorlcr/graph/thgraph.cpp

@@ -706,6 +706,15 @@ bool CGraphElementBase::prepareContext(size32_t parentExtractSz, const byte *par
                 }
                 break;
             }
+            case TAKwhen_dataset:
+            case TAKwhen_action:
+            {
+                if (!executeDependencies(parentExtractSz, parentExtract, WhenBeforeId, async))
+                    return false;
+                if (!executeDependencies(parentExtractSz, parentExtract, WhenParallelId, async))
+                    return false;
+                break;
+            }
         }
         ForEachItemIn(i, inputs)
         {