Переглянути джерело

HPCC-19620 Add Smart Join value to indicate type of join performed

Allows users to tell if smart join degraded to local lookup or
standed hash join.
Published value equates to either - globallookup, locallookup or
standard join.

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith 7 роки тому
батько
коміт
1908bcc008

+ 2 - 0
system/jlib/jstatcodes.h

@@ -210,6 +210,8 @@ enum StatisticKind
     StNumAttribsProcessed,
     StNumAttribsSimplified,
     StNumAttribsFromCache,
+    StNumSmartJoinDegradedToLocal,      // number of times global smart join degraded to local smart join (<=1 unless in loop)
+    StNumSmartJoinSlavesDegradedToStd,  // number of times a slave in smart join degraded from local smart join to standard hash join
     StMax,
 
     //For any quantity there is potentially the following variants.

+ 5 - 0
system/jlib/jstats.cpp

@@ -417,6 +417,8 @@ StringBuffer & formatStatistic(StringBuffer & out, unsigned __int64 value, Stati
     case SMeasureId:
     case SMeasureFilename:
         return out.append(value);
+    case SMeasureEnum:
+        return out.append("Enum{").append(value).append("}"); // JCS->GH for now, should map to known enum text somehow
     default:
         return out.append(value).append('?');
     }
@@ -729,6 +731,7 @@ extern jlib_decl StatsMergeAction queryMergeMode(StatisticKind kind)
 #define PERSTAT(y) STAT(Per, y, SMeasurePercent)
 #define IPV4STAT(y) STAT(IPV4, y, SMeasureIPV4)
 #define CYCLESTAT(y) St##Cycle##y##Cycles, SMeasureCycle, St##Time##y, St##Cycle##y##Cycles, { NAMES(Cycle, y##Cycles) }, { TAGS(Cycle, y##Cycles) }
+#define ENUMSTAT(y) STAT(Enum, y, SMeasureEnum)
 
 //--------------------------------------------------------------------------------------------------------------------
 
@@ -842,6 +845,8 @@ static const StatisticMeta statsMetaData[StMax] = {
     { NUMSTAT(AttribsProcessed) },
     { NUMSTAT(AttribsSimplified) },
     { NUMSTAT(AttribsFromCache) },
+    { NUMSTAT(SmartJoinDegradedToLocal) },
+    { NUMSTAT(SmartJoinSlavesDegradedToStd) },
 };
 
 

+ 49 - 12
thorlcr/activities/lookupjoin/thlookupjoin.cpp

@@ -20,9 +20,14 @@
 #include "thexception.hpp"
 #include "thbufdef.hpp"
 
+#include "thlookupjoincommon.hpp"
+
 class CLookupJoinActivityMaster : public CMasterActivity
 {
     mptag_t broadcast2MpTag, broadcast3MpTag, lhsDistributeTag, rhsDistributeTag;
+    unsigned failoversToLocal = 0;
+    Owned<CThorStats> localFailoverToStd;
+    bool isGlobal = false;
 
     bool isAll() const
     {
@@ -38,18 +43,23 @@ class CLookupJoinActivityMaster : public CMasterActivity
 public:
     CLookupJoinActivityMaster(CMasterGraphElement * info) : CMasterActivity(info)
     {
-        mpTag = container.queryJob().allocateMPTag(); // NB: base takes ownership and free's
-        if (!isAll())
+        isGlobal = !container.queryLocal() && (queryJob().querySlaves()>1);
+        if (isGlobal)
         {
-            broadcast2MpTag = container.queryJob().allocateMPTag();
-            broadcast3MpTag = container.queryJob().allocateMPTag();
-            lhsDistributeTag = container.queryJob().allocateMPTag();
-            rhsDistributeTag = container.queryJob().allocateMPTag();
+            mpTag = container.queryJob().allocateMPTag(); // NB: base takes ownership and free's
+            if (!isAll())
+            {
+                broadcast2MpTag = container.queryJob().allocateMPTag();
+                broadcast3MpTag = container.queryJob().allocateMPTag();
+                lhsDistributeTag = container.queryJob().allocateMPTag();
+                rhsDistributeTag = container.queryJob().allocateMPTag();
+            }
         }
+        localFailoverToStd.setown(new CThorStats(queryJob(), StNumSmartJoinSlavesDegradedToStd));
     }
     ~CLookupJoinActivityMaster()
     {
-        if (!isAll())
+        if (isGlobal && !isAll())
         {
             container.queryJob().freeMPTag(broadcast2MpTag);
             container.queryJob().freeMPTag(broadcast3MpTag);
@@ -58,8 +68,10 @@ public:
             // NB: if mpTag is allocated, the activity base class frees
         }
     }
-    void serializeSlaveData(MemoryBuffer &dst, unsigned slave)
+    virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave) override
     {
+        if (!isGlobal)
+            return;
         serializeMPtag(dst, mpTag);
         if (!isAll())
         {
@@ -69,12 +81,37 @@ public:
             serializeMPtag(dst, rhsDistributeTag);
         }
     }
+    virtual void deserializeStats(unsigned node, MemoryBuffer &mb) override
+    {
+        CMasterActivity::deserializeStats(node, mb);
+
+        if (isSmartJoin(*this))
+        {
+            if (isGlobal)
+            {
+                unsigned _failoversToLocal;
+                mb.read(_failoversToLocal);
+                dbgassertex(0 == failoversToLocal || (_failoversToLocal == failoversToLocal)); // i.e. sanity check, all slaves must have agreed.
+                failoversToLocal = _failoversToLocal;
+            }
+            unsigned failoversToStd;
+            mb.read(failoversToStd);
+            localFailoverToStd->set(node, failoversToStd);
+        }
+    }
+    virtual void getActivityStats(IStatisticGatherer & stats) override
+    {
+        CMasterActivity::getActivityStats(stats);
+        if (isSmartJoin(*this))
+        {
+            if (isGlobal)
+                stats.addStatistic(StNumSmartJoinDegradedToLocal, failoversToLocal);
+            localFailoverToStd->getTotalStat(stats);
+        }
+    }
 };
 
 CActivityBase *createLookupJoinActivityMaster(CMasterGraphElement *container)
 {
-    if (container->queryLocal() || 1 == container->queryJob().querySlaves())
-        return new CMasterActivity(container);
-    else
-        return new CLookupJoinActivityMaster(container);
+    return new CLookupJoinActivityMaster(container);
 }

+ 40 - 0
thorlcr/activities/lookupjoin/thlookupjoincommon.hpp

@@ -0,0 +1,40 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2018 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#ifndef THLOOKUPSLAVECOMMON_HPP
+#define THLOOKUPSLAVECOMMON_HPP
+
+#include "eclhelper.hpp"
+
+#include "thgraph.hpp"
+
+
+inline bool isSmartJoin(CActivityBase &activity)
+{
+    switch (activity.queryContainer().getKind())
+    {
+        case TAKsmartjoin:
+        case TAKsmartdenormalize:
+        case TAKsmartdenormalizegroup:
+            return true;
+    }
+    return false;
+}
+
+#endif
+
+

+ 41 - 29
thorlcr/activities/lookupjoin/thlookupjoinslave.cpp

@@ -26,6 +26,7 @@
 #include "thorxmlwrite.hpp"
 #include "../hashdistrib/thhashdistribslave.ipp"
 #include "thsortu.hpp"
+#include "thlookupjoincommon.hpp"
 
 #ifdef _DEBUG
 #define _TRACEBROADCAST
@@ -1734,9 +1735,11 @@ protected:
     unsigned spillCompInfo;
     Owned<IHashDistributor> lhsDistributor, rhsDistributor;
     ICompare *compareLeft;
-    atomic_t failedOverToLocal, failedOverToStandard;
+    std::atomic<bool> failedOverToLocal{false}, failedOverToStandard{false};
     CriticalSection broadcastSpillingLock;
     Owned<IJoinHelper> joinHelper;
+    std::atomic<unsigned> aggregateFailoversToLocal{0}; // total number of times this activity has failed over to local smart join (0/1 unless in loop)
+    std::atomic<unsigned> aggregateFailoversToStandard{0}; // total number of times this activity has failed over to standard hash join (0/1 unless in loop)
 
     // NB: Only used by channel 0
     Owned<CFileOwner> overflowWriteFile;
@@ -1745,10 +1748,20 @@ protected:
     OwnedMalloc<IChannelDistributor *> channelDistributors;
 
     inline bool isSmart() const { return smart; }
-    inline void setFailoverToLocal(bool tf) { atomic_set(&failedOverToLocal, (int)tf); }
-    inline void setFailoverToStandard(bool tf) { atomic_set(&failedOverToStandard, (int)tf); }
-    inline bool hasFailedOverToLocal() const { return 0 != atomic_read(&failedOverToLocal); }
-    inline bool hasFailedOverToStandard() const { return 0 != atomic_read(&failedOverToStandard); }
+    inline void setFailoverToLocal()
+    {
+        bool expectedState = false;
+        if (failedOverToLocal.compare_exchange_strong(expectedState, true))
+            ++aggregateFailoversToLocal;
+    }
+    inline void setFailoverToStandard()
+    {
+        bool expectedState = false;
+        if (failedOverToStandard.compare_exchange_strong(expectedState, true))
+            ++aggregateFailoversToStandard;
+    }
+    inline bool hasFailedOverToLocal() const { return failedOverToLocal; }
+    inline bool hasFailedOverToStandard() const { return failedOverToStandard; }
     inline bool isRhsCollated() const { return rhsCollated; }
     rowidx_t clearNonLocalRows(CThorRowArrayWithFlushMarker &rows, unsigned slave)
     {
@@ -1794,7 +1807,7 @@ protected:
         CriticalBlock b(broadcastSpillingLock);
         if (!hasFailedOverToLocal())
         {
-            setFailoverToLocal(true);
+            setFailoverToLocal();
             ActPrintLog("Clearing non-local rows - cause: %s", msg);
 
             broadcaster->stop(myNodeNum, bcastflag_spilt); // signals to broadcast to start stopping immediately and to signal spilt to others
@@ -1999,7 +2012,7 @@ protected:
      * handleGlobalRHS() attempts to broadcast and gather RHS rows and setup HT on channel 0
      * Checks at various stages if spilt and bails out.
      * Side effect of setting 'rhsCollated' based on ch0 value on all channels
-     * and setting setFailoverToLocal(true) if fails over.
+     * and setting setFailoverToLocal() if fails over.
      */
     bool handleGlobalRHS(CMarker &marker, bool globallySorted, bool stopping)
     {
@@ -2136,7 +2149,7 @@ protected:
             if (isSmart())
                 rightRowManager->removeRowBuffer(lkJoinCh0);
             if (lkJoinCh0->hasFailedOverToLocal())
-                setFailoverToLocal(true);
+                setFailoverToLocal();
             rhsCollated = lkJoinCh0->isRhsCollated();
         }
         ActPrintLog("Channel memory manager report");
@@ -2265,8 +2278,10 @@ protected:
         // roxiemem::IBufferedRowCallback impl.
             virtual bool freeBufferedRows(bool critical)
             {
-                CriticalBlock b(crit);
+                CriticalBlock b(crit); // JCSMORE no idea why this crit is here, looks like should be deleted.
+
                 owner.ActPrintLog("CChannelDistributor free memory callback called");
+                owner.setFailoverToStandard(); // mark early so statistics / eclwatch can see that activity has failed over to hash join asap.
                 unsigned startSpillChannel = nextSpillChannel;
                 for (;;)
                 {
@@ -2474,7 +2489,7 @@ protected:
                     {
                         ActPrintLog("Global SMART JOIN spilt to disk during Distributed Local Lookup handling. Failing over to Standard Join");
                         rightStream.setown(rightCollector->getStream());
-                        setFailoverToStandard(true);
+                        setFailoverToStandard();
                     }
 
                     // start LHS distributor, needed by local lookup or full join
@@ -2496,7 +2511,7 @@ protected:
                 {
                     rightStream.setown(rightCollector->getStream());
                     ActPrintLog("Local SMART JOIN spilt to disk. Failing over to regular local join");
-                    setFailoverToStandard(true);
+                    setFailoverToStandard();
                 }
                 else
                     ActPrintLog("RHS local rows fitted in memory in this channel, count: %" RIPF "d", rhs.ordinality());
@@ -2511,7 +2526,7 @@ protected:
                         marker.reset();
                     if (!prepareLocalHT(marker, *rightCollector)) // can cause others to spill, but must not be allowed to spill channel rows I'm working on.
                     {
-                        setFailoverToStandard(true);
+                        setFailoverToStandard();
                         ActPrintLog("Out of memory trying to prepare [LOCAL] hashtable for a SMART join (%" RIPF "d rows), will now failover to a std hash join", rhs.ordinality());
                     }
                     rightStream.setown(rightCollector->getStream(false, &rhs));
@@ -2520,7 +2535,7 @@ protected:
             if (rightStream)
             {
                 ActPrintLog("Performing STANDARD JOIN");
-                setFailoverToStandard(true);
+                setFailoverToStandard();
                 setupStandardJoin(rightStream); // NB: rightStream is sorted
             }
             else
@@ -2583,8 +2598,6 @@ public:
     {
         rhsCollated = rhsCompacted = false;
         broadcast2MpTag = broadcast3MpTag = lhsDistributeTag = rhsDistributeTag = TAG_NULL;
-        setFailoverToLocal(false);
-        setFailoverToStandard(false);
         leftHash = helper->queryHashLeft();
         rightHash = helper->queryHashRight();
         compareRight = helper->queryCompareRight();
@@ -2600,18 +2613,7 @@ public:
             atMost = (unsigned)-1;
         if (abortLimit < atMost)
             atMost = abortLimit;
-
-        switch (container.getKind())
-        {
-            case TAKsmartjoin:
-            case TAKsmartdenormalize:
-            case TAKsmartdenormalizegroup:
-                smart = true;
-                break;
-            default:
-                smart = false;
-                break;
-        }
+        smart = isSmartJoin(*this);
         overflowWriteCount = 0;
         spillCompInfo = 0x0;
         if (getOptBool(THOROPT_COMPRESS_SPILLS, true))
@@ -2703,11 +2705,11 @@ public:
             {
                 if (isGlobal())
                 {
-                    setFailoverToLocal(false);
+                    failedOverToLocal = false;
                     rhsCollated = rhsCompacted = false;
                 }
             }
-            setFailoverToStandard(false);
+            failedOverToStandard = false;
         }
     }
     CATCH_NEXTROW()
@@ -2900,6 +2902,16 @@ public:
             overflowWriteStream->putRow(rhsInRowsTemp.getClear(r));
         return true;
     }
+    virtual void serializeStats(MemoryBuffer &mb) override
+    {
+        CSlaveActivity::serializeStats(mb);
+        if (isSmart())
+        {
+            if (isGlobal())
+                mb.append(aggregateFailoversToLocal); // NB: is going to be same for all slaves.
+            mb.append(aggregateFailoversToStandard);
+        }
+    }
 };
 
 class CTableCommon : public CInterfaceOf<IInterface>

+ 12 - 1
thorlcr/graph/thgraphmaster.cpp

@@ -2828,7 +2828,7 @@ void CThorStats::tallyValue(unsigned __int64 thiscount, unsigned n)
     }
 }
 
-void CThorStats::processInfo()
+void CThorStats::processTotal()
 {
     reset();
     ForEachItemIn(n, counts)
@@ -2836,9 +2836,20 @@ void CThorStats::processInfo()
         unsigned __int64 thiscount = counts.item(n);
         tallyValue(thiscount, n+1);
     }
+}
+
+void CThorStats::processInfo()
+{
+    processTotal();
     calculateSkew();
 }
 
+void CThorStats::getTotalStat(IStatisticGatherer & stats)
+{
+    processTotal();
+    stats.addStatistic(kind, tot);
+}
+
 void CThorStats::getStats(IStatisticGatherer & stats, bool suppressMinMaxWhenEqual)
 {
     processInfo();

+ 2 - 0
thorlcr/graph/thgraphmaster.ipp

@@ -210,9 +210,11 @@ public:
 
     void extract(unsigned node, const CRuntimeStatisticCollection & stats);
     void set(unsigned node, unsigned __int64 count);
+    void getTotalStat(IStatisticGatherer & stats);
     void getStats(IStatisticGatherer & stats, bool suppressMinMaxWhenEqual);
 
 protected:
+    void processTotal();
     void calculateSkew();
     void tallyValue(unsigned __int64 value, unsigned node);
 };