Sfoglia il codice sorgente

Merge latest changes from candidate-3.6.x to master

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 13 anni fa
parent
commit
573ba888bc

+ 3 - 0
ecl/hqlcpp/hqlcppds.cpp

@@ -1969,6 +1969,8 @@ void HqlCppTranslator::doBuildDataset(BuildCtx & ctx, IHqlExpression * expr, CHq
             return;
         }
     case no_limit:
+        if (expr->hasProperty(skipAtom) || expr->hasProperty(onFailAtom))
+            break;
         doBuildDatasetLimit(ctx, expr, tgt, format);
         return;
     case no_compound_childread:
@@ -2285,6 +2287,7 @@ void HqlCppTranslator::buildDatasetAssign(BuildCtx & ctx, const CHqlBoundTarget
     switch (op)
     {
     case no_limit:
+        assertex(!expr->hasProperty(skipAtom) && !expr->hasProperty(onFailAtom));
         //Do the limit check as a post test.  
         //It means we may read more records than we need to, but the code is inline, and the code is generally much better.
         if (target.count)

+ 6 - 4
thorlcr/activities/join/thjoin.cpp

@@ -38,7 +38,7 @@ class JoinActivityMaster : public CMasterActivity
     bool islocal;
     bool rightpartition;
     unsigned selfJoinWarnLevel, lastMsgTime;
-    mptag_t mpTagRPC;
+    mptag_t mpTagRPC, barrierMpTag;
     Owned<IBarrier> barrier;
     Owned<ProgressInfo> lhsProgress, rhsProgress;
 
@@ -91,12 +91,14 @@ public:
         selfJoinWarnLevel = INITIAL_SELFJOIN_MATCH_WARNING_LEVEL;
         lastMsgTime = 0;
         mpTagRPC = container.queryJob().allocateMPTag();
-        barrier.setown(container.queryJob().createBarrier(container.queryJob().allocateMPTag()));
+        barrierMpTag = container.queryJob().allocateMPTag();
+        barrier.setown(container.queryJob().createBarrier(barrierMpTag));
         climitedcmp = NULL;
     }
     ~JoinActivityMaster()
     {
         container.queryJob().freeMPTag(mpTagRPC);
+        container.queryJob().freeMPTag(barrierMpTag);
         delete climitedcmp;
     }
     void init()
@@ -106,8 +108,8 @@ public:
     {
         if (!islocal)
         {
-            dst.append((int)mpTagRPC);
-            dst.append((int)barrier->queryTag());
+            serializeMPtag(dst, mpTagRPC);
+            serializeMPtag(dst, barrierMpTag);
         }
     }
     void preStart(size32_t parentExtractSz, const byte *parentExtract)

+ 7 - 4
thorlcr/activities/msort/thmsort.cpp

@@ -55,19 +55,22 @@ public:
 class CMSortActivityMaster : public CMasterActivity
 {
     IThorSorterMaster *imaster;
-    mptag_t mpTagRPC;
+    mptag_t mpTagRPC, barrierMpTag;
     Owned<IBarrier> barrier;
+    
 public:
     CMSortActivityMaster(CMasterGraphElement *info)
       : CMasterActivity(info)
     {
         mpTagRPC = container.queryJob().allocateMPTag();
-        barrier.setown(container.queryJob().createBarrier(container.queryJob().allocateMPTag()));
+        barrierMpTag = container.queryJob().allocateMPTag();
+        barrier.setown(container.queryJob().createBarrier(barrierMpTag));
     }
 
     ~CMSortActivityMaster()
     {
         container.queryJob().freeMPTag(mpTagRPC);
+        container.queryJob().freeMPTag(barrierMpTag);
     }
 
 protected:
@@ -85,8 +88,8 @@ protected:
     }
     virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave)
     {
-        dst.append((int)mpTagRPC);
-        dst.append((int)barrier->queryTag());
+        serializeMPtag(dst, mpTagRPC);
+        serializeMPtag(dst, barrierMpTag);
     }   
     virtual void preStart(size32_t parentExtractSz, const byte *parentExtract)
     {

+ 10 - 1
thorlcr/activities/nsplitter/thnsplitterslave.cpp

@@ -295,7 +295,16 @@ public:
         grouped = false;
         eofHit = false;
         recsReady = 0;
-        inputsConfigured = false;
+        if (inputsConfigured)
+        {
+            // ensure old inputs cleared, to avoid being reused before re-setup on subsequent executions
+            ForEachItemIn(o, outputs)
+            {
+                CDelayedInput *delayedInput = (CDelayedInput *)outputs.item(o);
+                delayedInput->setInput(NULL);
+            }
+            inputsConfigured = false;
+        }
     }
     void init(MemoryBuffer &data, MemoryBuffer &slaveData)
     {

+ 33 - 23
thorlcr/graph/thgraph.cpp

@@ -539,6 +539,12 @@ void CGraphElementBase::connectInput(unsigned input, CGraphElementBase *inputAct
     inputAct->connectedOutputs.replace(new CIOConnection(this, input), inputOutIdx);
 }
 
+void CGraphElementBase::addAssociatedChildGraph(CGraphBase *childGraph)
+{
+    if (!associatedChildGraphs.contains(*childGraph))
+        associatedChildGraphs.append(*LINK(childGraph));
+}
+
 void CGraphElementBase::serializeCreateContext(MemoryBuffer &mb)
 {
     if (!onCreateCalled) return;
@@ -1673,7 +1679,28 @@ void CGraphBase::createFromXGMML(IPropertyTree *_node, CGraphBase *_owner, CGrap
         resultsGraph = this;
         tmpHandler.setown(queryJob().createTempHandler());
     }
-    
+
+    bool localChild = false;
+    if (owner && parentActivityId)
+    {
+        CGraphElementBase *parentElement = owner->queryElement(parentActivityId);
+        parentElement->addAssociatedChildGraph(this);
+        switch (parentElement->getKind())
+        {
+            case TAKlooprow:
+            case TAKloopcount:
+            case TAKloopdataset:
+            case TAKgraphloop:
+            case TAKparallelgraphloop:
+                if (!parentElement->queryLocal())
+                    global = true;
+                break;
+            default:
+                localChild = true;
+                break;
+        }
+    }
+
     Owned<IPropertyTreeIterator> nodes = xgmml->getElements("node");
     ForEach(*nodes)
     {
@@ -1689,29 +1716,12 @@ void CGraphBase::createFromXGMML(IPropertyTree *_node, CGraphBase *_owner, CGrap
         }
         else
         {
-            if (owner && parentActivityId)
+            if (localChild && !e.getPropBool("att[@name=\"coLocal\"]/@value", false))
             {
-                CGraphElementBase *parentElement = owner->queryElement(parentActivityId);
-                parentElement->addAssociatedChildGraph(this);
-                switch (parentElement->getKind())
-                {
-                    case TAKlooprow:
-                    case TAKloopcount:
-                    case TAKloopdataset:
-                    case TAKgraphloop:
-                    case TAKparallelgraphloop:
-                        break;
-                    default:
-                        // not a loop graph, force it to be local child graph
-                        if (!e.getPropBool("att[@name=\"coLocal\"]/@value", false))
-                        {
-                            IPropertyTree *att = createPTree("att");
-                            att->setProp("@name", "coLocal");
-                            att->setPropBool("@value", true);
-                            e.addPropTree("att", att);
-                        }
-                        break;
-                }
+                IPropertyTree *att = createPTree("att");
+                att->setProp("@name", "coLocal");
+                att->setPropBool("@value", true);
+                e.addPropTree("att", att);
             }
             CGraphElementBase *act = createGraphElement(e, *this, resultsGraph);
             addActivity(act);

+ 1 - 1
thorlcr/graph/thgraph.hpp

@@ -270,7 +270,7 @@ public:
     void clearConnections();
     virtual void connectInput(unsigned which, CGraphElementBase *input, unsigned inputOutIdx);
     void setResultsGraph(CGraphBase *_resultsGraph) { resultsGraph = _resultsGraph; }
-    void addAssociatedChildGraph(CGraphBase *childGraph) { associatedChildGraphs.append(*LINK(childGraph)); }
+    void addAssociatedChildGraph(CGraphBase *childGraph);
     void releaseIOs();
     void addDependsOn(CGraphBase *graph, int controlId);
     IThorGraphDependencyIterator *getDependsIterator() const;

+ 22 - 16
thorlcr/graph/thgraphmaster.cpp

@@ -192,14 +192,17 @@ void CSlaveMessageHandler::main()
                         DBGLOG("%s", msg.str());
                         parentExtract = graph->setParentCtx(parentExtractSz, parentExtract);
                     }
-                    Owned<IThorActivityIterator> iter = graph->getIterator();
-                    // onCreate all
-                    ForEach (*iter)
                     {
-                        CMasterGraphElement &element = (CMasterGraphElement &)iter->query();
-                        element.onCreate();
-                        if (isDiskInput(element.getKind()))
-                            element.onStart(parentExtractSz, parentExtract);
+                        CriticalBlock b(graph->queryCreateLock());
+                        Owned<IThorActivityIterator> iter = graph->getIterator();
+                        // onCreate all
+                        ForEach (*iter)
+                        {
+                            CMasterGraphElement &element = (CMasterGraphElement &)iter->query();
+                            element.onCreate();
+                            if (isDiskInput(element.getKind()))
+                                element.onStart(parentExtractSz, parentExtract);
+                        }
                     }
                     msg.clear();
                     graph->serializeCreateContexts(msg);
@@ -213,16 +216,19 @@ void CSlaveMessageHandler::main()
                     Owned<CMasterGraph> graph = (CMasterGraph *)job.getGraph(gid);
                     assertex(graph);
                     CGraphElementArray toSerialize;
-                    loop
                     {
-                        activity_id id;
-                        msg.read(id);
-                        if (!id)
-                            break;
-                        CMasterGraphElement *element = (CMasterGraphElement *)graph->queryElement(id);
-                        assertex(element);
-                        element->doCreateActivity();
-                        toSerialize.append(*LINK(element));
+                        CriticalBlock b(graph->queryCreateLock());
+                        loop
+                        {
+                            activity_id id;
+                            msg.read(id);
+                            if (!id)
+                                break;
+                            CMasterGraphElement *element = (CMasterGraphElement *)graph->queryElement(id);
+                            assertex(element);
+                            element->doCreateActivity();
+                            toSerialize.append(*LINK(element));
+                        }
                     }
                     msg.clear();
                     CMessageBuffer replyMsg;

+ 1 - 0
thorlcr/graph/thgraphmaster.ipp

@@ -63,6 +63,7 @@ public:
 
     virtual void init();
     virtual void executeSubGraph(size32_t parentExtractSz, const byte *parentExtract);
+    CriticalSection &queryCreateLock() { return createdCrit; }
     void handleSlaveDone(unsigned node, MemoryBuffer &mb);
     void serializeCreateContexts(MemoryBuffer &mb);
     void serializeStartCtxs(MemoryBuffer &mb);

+ 3 - 0
thorlcr/thorutil/thbuf.cpp

@@ -648,6 +648,7 @@ class COverflowableBuffer : public CSimpleInterface, implements IRowWriterMultiR
     Owned<IRowWriter> diskout;
     Owned<IFile> tmpfile;
     unsigned readersInUse;
+    SpinLock readerLock;
 
     void diskSwitch()
     {
@@ -707,6 +708,7 @@ public:
     }
     void readerStop()
     {
+        SpinBlock b(readerLock);
         --readersInUse;
     }
     void flush()
@@ -719,6 +721,7 @@ public:
     }
     IRowStream *getReader()
     {
+        SpinBlock b(readerLock);
         flush();
         class COverflowReader : public CSimpleInterface, implements IRowStream
         {