Forráskód Böngészése

HPCC-21793 Add rawStats option to roxie queryAggregates

Signed-off-by: wangkx <kevin.wang@lexisnexis.com>
wangkx 6 éve
szülő
commit
e4a1ffae75
3 módosított fájl, 95 hozzáadás és 8 törlés
  1. 92 6
      roxie/ccd/ccdsnmp.cpp
  2. 1 1
      roxie/ccd/ccdsnmp.hpp
  3. 2 1
      roxie/ccd/ccdstate.cpp

+ 92 - 6
roxie/ccd/ccdsnmp.cpp

@@ -624,6 +624,24 @@ class CQueryStatsAggregator : public CInterface, implements IQueryStatsAggregato
                 result.setPropInt("percentile97", 0);
         }
 
+        static void getRawStats(IPropertyTree &result, std::deque<QueryStatsRecord> &useStats)
+        {
+            for (auto r : useStats)
+            {
+                Owned<IPropertyTree> queryStatsRecord = createPTree("QueryStatsRecord", ipt_fast);
+                CDateTime dt;
+                StringBuffer s;
+                dt.set(r.startTime);
+                queryStatsRecord->setProp("@startTime", dt.getString(s.clear(), true).str());
+                queryStatsRecord->setPropInt("elapsedTimeMs", r.elapsedTimeMs);
+                queryStatsRecord->setPropInt("memUsed", r.memUsed);
+                queryStatsRecord->setPropInt("slavesReplyLen", r.slavesReplyLen);
+                queryStatsRecord->setPropInt("bytesOut", r.bytesOut);
+                queryStatsRecord->setPropBool("failed", r.failed);
+                result.addPropTree(queryStatsRecord->queryName(), LINK(queryStatsRecord));
+            }
+        }
+
         static bool checkOlder(const void *_left, const void *_right)
         {
             QueryStatsRecord *left = (QueryStatsRecord *) _left;
@@ -673,6 +691,19 @@ class CQueryStatsAggregator : public CInterface, implements IQueryStatsAggregato
             return (difftime(startTime, from) >= 0 && difftime(to, endTime) > 0);
         }
 
+        bool timeOverlap(time_t from, time_t to)
+        {
+            if (from == startTime)
+                return true;
+
+            double diffFrom = difftime(from, startTime);
+            if (diffFrom > 0 && difftime(endTime, from) > 0)
+                return true;
+            if (diffFrom < 0 && difftime(to, startTime) > 0)
+                return true;
+            return false;
+        }
+
         bool matches(time_t queryTime)
         {
             return (difftime(queryTime, startTime) >= 0 && difftime(queryTime, endTime) < 0);
@@ -785,6 +816,16 @@ class CQueryStatsAggregator : public CInterface, implements IQueryStatsAggregato
                 }
             }
         }
+
+        static void getRawStats(IPropertyTree &result, std::deque<QueryStatsAggregateRecord> &useStats, time_t from, time_t to)
+        {
+            for (auto r : useStats)
+            {
+                Owned<IPropertyTree> queryStatsAggregateRecord = createPTree("QueryStatsAggregateRecord", ipt_fast);
+                r.getStats(*queryStatsAggregateRecord, true);
+                result.addPropTree(queryStatsAggregateRecord->queryName(), LINK(queryStatsAggregateRecord));
+            }
+        }
     };
 
     CriticalSection statsLock;  // Protects multithreaded access to recent and aggregated structures
@@ -877,7 +918,7 @@ public:
     ~CQueryStatsAggregator()
     {
     }
-    static IPropertyTree *getAllQueryStats(bool includeQueries, time_t from, time_t to)
+    static IPropertyTree *getAllQueryStats(bool includeQueries, bool rawStats, time_t from, time_t to)
     {
         Owned<IPTree> result = createPTree("QueryStats", ipt_fast);
         if (includeQueries)
@@ -886,10 +927,16 @@ public:
             ForEachItemIn(idx, queryStatsAggregators)
             {
                 CQueryStatsAggregator &thisQuery = queryStatsAggregators.item(idx);
-                result->addPropTree("Query", thisQuery.getStats(from, to));
+                if (!rawStats)
+                    result->addPropTree("Query", thisQuery.getStats(from, to));
+                else
+                    result->addPropTree("Query", thisQuery.getRawStats(from, to));
             }
         }
-        result->addPropTree("Global", globalStatsAggregator.getStats(from, to));
+        if (!rawStats)
+            result->addPropTree("Global", globalStatsAggregator.getStats(from, to));
+        else
+            result->addPropTree("Global", globalStatsAggregator.getRawStats(from, to));
         return result.getClear();
     }
 
@@ -950,6 +997,39 @@ public:
         }
         return result.getClear();
     }
+    virtual IPropertyTree *getRawStats(time_t from, time_t to)
+    {
+        Owned<IPropertyTree> result = createPTree("Query", ipt_fast);
+        result->setProp("@id", queryName);
+
+        std::deque<QueryStatsRecord> useStats;
+        {
+            CriticalBlock b(statsLock);
+            for (auto rec : recent)
+            {
+                if (rec.inRange(from, to))
+                    useStats.push_back(rec);
+            }
+            // lock is released here, and we process the useStats array at our leisure...
+        }
+        QueryStatsRecord::getRawStats(*result, useStats);
+
+        std::deque<QueryStatsAggregateRecord> aggregatedStats;
+        {
+            CriticalBlock b(statsLock);
+            for (auto thisSlot: aggregated)
+            {
+                if (thisSlot.timeOverlap(from, to))
+                    aggregatedStats.push_back(thisSlot);
+                else if (thisSlot.older(from))
+                    break;
+            }
+            // lock is released here, and we process the aggregator at our leisure...
+        }
+        QueryStatsAggregateRecord::getRawStats(*result, aggregatedStats, from, to);
+
+        return result.getClear();
+    }
     static inline IQueryStatsAggregator *queryGlobalStatsAggregator()
     {
         return &globalStatsAggregator;
@@ -970,9 +1050,9 @@ IQueryStatsAggregator *createQueryStatsAggregator(const char *_queryName, unsign
     return new CQueryStatsAggregator(_queryName, _expirySeconds);
 }
 
-IPropertyTree *getAllQueryStats(bool includeQueries, time_t from, time_t to)
+IPropertyTree *getAllQueryStats(bool includeQueries, bool rawStats, time_t from, time_t to)
 {
-     return CQueryStatsAggregator::getAllQueryStats(includeQueries, from, to);
+     return CQueryStatsAggregator::getAllQueryStats(includeQueries, rawStats, from, to);
 }
 
 //=======================================================================================================
@@ -1020,7 +1100,13 @@ protected:
             DBGLOG("%s", stats.str());
         }
         {
-            Owned<IPropertyTree> p = getAllQueryStats(true, start, end);
+            Owned<IPropertyTree> p = getAllQueryStats(true, false, start, end);
+            StringBuffer stats; 
+            toXML(p, stats);
+            DBGLOG("%s", stats.str());
+        }
+        {
+            Owned<IPropertyTree> p = getAllQueryStats(true, true, start, end);
             StringBuffer stats; 
             toXML(p, stats);
             DBGLOG("%s", stats.str());

+ 1 - 1
roxie/ccd/ccdsnmp.hpp

@@ -75,7 +75,7 @@ interface IQueryStatsAggregator : public IInterface
 
 extern IQueryStatsAggregator *queryGlobalQueryStatsAggregator();
 extern IQueryStatsAggregator *createQueryStatsAggregator(const char *queryName, unsigned expirySeconds);
-extern IPropertyTree *getAllQueryStats(bool includeQueries, time_t from, time_t to);
+extern IPropertyTree *getAllQueryStats(bool includeQueries, bool rawStats, time_t from, time_t to);
 
 extern RelaxedAtomic<unsigned> queryCount;
 extern RoxieQueryStats unknownQueryStats;

+ 2 - 1
roxie/ccd/ccdstate.cpp

@@ -2478,7 +2478,8 @@ private:
                 else
                 {
                     bool includeAllQueries = control->getPropBool("@all", true);
-                    Owned<const IPropertyTree> stats = getAllQueryStats(includeAllQueries, from, to);
+                    bool rawStats = control->getPropBool("@rawStats", false);
+                    Owned<const IPropertyTree> stats = getAllQueryStats(includeAllQueries, rawStats, from, to);
                     toXML(stats, reply);
                 }
             }