Browse Source

Merge branch 'candidate-7.12.x' into candidate-8.0.x

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 4 years ago
parent
commit
29792b730d

+ 1 - 1
esp/services/ws_machine/ws_machineService.cpp

@@ -2397,7 +2397,7 @@ void Cws_machineEx::readThorUsageReq(const char* name, IConstEnvironment* constE
 
     Owned<IPropertyTree> machineReq = createMachineUsageReq(constEnv, computer);
     xpath.setf("Machine[@netAddress='%s']", machineReq->queryProp("@netAddress"));
-    if (componentReq->queryPropTree(xpath))
+    if (componentReq->hasProp(xpath))
     {   //ThorMasterProcess is running on one of the ThorSlaveProcess machines.
         //So, we do not add this machine again.
         usageReq->addPropTree(componentReq->queryName(), LINK(componentReq));

+ 52 - 17
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -957,6 +957,7 @@ protected:
     OwnedMalloc<CInMemJoinBase *> channels;
 
     std::atomic<unsigned> interChannelToNotifyCount{0}; // only used on channel 0
+    Owned<IBitSet> rhsChannelStop; // only used on channel 0
     InterruptableSemaphore interChannelBarrierSem;
     bool channelActivitiesAssigned;
 
@@ -1382,6 +1383,7 @@ public:
         return broadcastLock;
     }
     HTHELPER *queryTable() { return table; }
+    IBitSet *queryRhsChannelStopSet() { dbgassertex(0 == queryJobChannelNumber()); return rhsChannelStop; }
     void startLeftInput()
     {
         try
@@ -1440,6 +1442,8 @@ public:
             {
                 rowProcessor = new CRowProcessor(*this);
                 broadcastLock = new CriticalSection;
+                if (queryJob().queryJobChannels()>1)
+                    rhsChannelStop.setown(createThreadSafeBitSet());
             }
             sharedRightRowInterfaces.setown(createThorRowInterfaces(rightRowManager, rightOutputMeta, queryId(), queryHeapFlags(), &queryJobChannel().querySharedMemCodeContext()));
             rhs.setup(sharedRightRowInterfaces);
@@ -1543,6 +1547,9 @@ public:
             broadcaster->reset();
         stopInput(0, "(L)");
         left.clear();
+        if (isGlobal() && (queryJob().queryJobChannels()>1) && (0 == queryJobChannelNumber()))
+            rhsChannelStop->reset();
+
         dataLinkStop();
     }
     virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override
@@ -2496,11 +2503,25 @@ protected:
                  * If any have spilt, still need to distribute rest of RHS..
                  */
                 bool ok = handleGlobalRHS(marker, helper->isRightAlreadySorted(), stopping);
-                if (stopping)
+
+                if (queryJob().queryJobChannels()>1)
+                {
+                    if (stopping)
+                        channels[0]->queryRhsChannelStopSet()->set(queryJobChannelNumber(), true);
+                    InterChannelBarrier();
+                    // if all stopping, then scan will == # channels
+                    if (queryJob().queryJobChannels() == channels[0]->queryRhsChannelStopSet()->scan(0, false))
+                    {
+                        ActPrintLog("All channels in global getRHS stopped");
+                        return;
+                    }
+                }
+                else if (stopping)
                 {
                     ActPrintLog("Global getRHS stopped");
                     return;
                 }
+
                 if (ok)
                     ActPrintLog("RHS global rows fitted in memory in this channel, count: %" RIPF "d", rhs.ordinality());
                 else
@@ -2787,15 +2808,14 @@ public:
         {
             if (isGlobal())
             {
-                if (gotRHS)
-                {
-                    // Other channels sharing HT. So do not reset until all here
-                    // NB: See handleGlobalRHS, if any channel failed over to local, all will have been marked failed over to local.
-                    if (!hasFailedOverToLocal() && queryJob().queryJobChannels()>1)
-                        InterChannelBarrier();
-                }
-                else
+                if (!gotRHS)
                     getRHS(true); // If global, need to handle RHS until all are slaves stop
+
+                // Other channels sharing HT. So do not reset until all here
+                // NB: See handleGlobalRHS, if any channel failed over to local, all will have been marked failed over to local.
+                // Also need to wait for other channels, if some proceeded to use RHS when others did not.
+                if (!hasFailedOverToLocal() && queryJob().queryJobChannels()>1)
+                    InterChannelBarrier();
             }
 
             if (rhsDistributor)
@@ -3256,8 +3276,24 @@ protected:
             if (isGlobal())
             {
                 doBroadcastRHS(stopping);
-                if (stopping) // broadcast done and no-one spilt, this node can now stop
+
+                if (queryJob().queryJobChannels()>1)
+                {
+                    if (stopping)
+                        channels[0]->queryRhsChannelStopSet()->set(queryJobChannelNumber(), true);
+                    InterChannelBarrier();
+                    // if all stopping, then scan will == # channels
+                    if (queryJob().queryJobChannels() == channels[0]->queryRhsChannelStopSet()->scan(0, false))
+                    {
+                        ActPrintLog("All channels in global getRHS stopped");
+                        return;
+                    }
+                }
+                else if (stopping)
+                {
+                    ActPrintLog("Global getRHS stopped");
                     return;
+                }
 
                 if (0 == queryJobChannelNumber()) // All channels broadcast, but only channel 0 receives
                 {
@@ -3343,14 +3379,13 @@ public:
         {
             if (isGlobal())
             {
-                if (gotRHS)
-                {
-                    // Other channels sharing HT. So do not reset until all here
-                    if (queryJob().queryJobChannels()>1)
-                        InterChannelBarrier();
-                }
-                else
+                if (!gotRHS)
                     getRHS(true); // If global, need to handle RHS until all are slaves stop
+
+                // Other channels sharing HT. So do not reset until all here
+                // Also need to wait for other channels, if some proceeded to use RHS when others did not.
+                if (queryJob().queryJobChannels()>1)
+                    InterChannelBarrier();
             }
         }
         PARENT::stop();