Selaa lähdekoodia

Merge pull request #5658 from afishbeck/optControlQueries

HPCC-10924 Merge roxie endpoint results for control:queries

Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 11 vuotta sitten
vanhempi
commit
067355e014

+ 14 - 25
esp/services/ws_workunits/ws_workunitsQuerySets.cpp

@@ -753,42 +753,31 @@ bool CWsWorkunitsEx::onWUQuerysets(IEspContext &context, IEspWUQuerysetsRequest
 
 void addClusterQueryStates(IPropertyTree* queriesOnCluster, const char *target, const char *id, IArrayOf<IEspClusterQueryState>& clusterStates, double version)
 {
+    queriesOnCluster = queriesOnCluster->queryPropTree("Endpoint[1]/Queries[1]");
+    if (!queriesOnCluster)
+        return;
+
+    int reporting = queriesOnCluster->getPropInt("@reporting");
+
     Owned<IEspClusterQueryState> clusterState = createClusterQueryState();
     clusterState->setCluster(target);
 
-    VStringBuffer xpath("Endpoint/Queries/Query[@id='%s']", id);
-    Owned<IPropertyTreeIterator> iter = queriesOnCluster->getElements(xpath.str());
-    bool found = false;
-    bool suspended = false;
-    bool available = false;
-    StringBuffer errors;
-    ForEach (*iter)
-    {
-        found = true;
-        if (iter->query().getPropBool("@suspended", false))
-            suspended = true;
-        else
-            available = true;
-        const char* error = iter->query().queryProp("@error");
-        if (error && *error && (version >=1.46))
-        {
-            if (errors.length())
-                errors.append(";");
-            errors.append(error);
-        }
-    }
-    if (!found)
+    VStringBuffer xpath("Query[@id='%s']", id);
+    IPropertyTree *query = queriesOnCluster->getPropTree(xpath.str());
+    int suspended = query->getPropInt("@suspended");
+    const char* error = query->queryProp("@error");
+    if (!query)
         clusterState->setState("Not Found");
     else if (suspended)
     {
         clusterState->setState("Suspended");
-        if (available)
+        if (suspended<reporting)
             clusterState->setMixedNodeStates(true);
     }
     else
         clusterState->setState("Available");
-    if ((version >=1.46) && errors.length())
-        clusterState->setErrors(errors.str());
+    if ((version >=1.46) && error && *error)
+        clusterState->setErrors(error);
 
     clusterStates.append(*clusterState.getClear());
 }

+ 26 - 12
roxie/ccd/ccdlistener.cpp

@@ -522,21 +522,29 @@ public:
             return locksGot > getNumNodes()/2;
     }
 
+    enum CascadeMergeType { CascadeMergeNone, CascadeMergeStats, CascadeMergeQueries };
+
     void doControlQuery(SocketEndpoint &ep, const char *queryText, StringBuffer &reply)
     {
         if (logctx.queryTraceLevel() > 5)
             logctx.CTXLOG("doControlQuery (%d): %.80s", isMaster, queryText);
         // By this point we should have cascade-connected thanks to a prior <control:lock>
         // So do the query ourselves and in all child threads;
-        Owned<IPropertyTree> mergedStats;
+        CascadeMergeType mergeType=CascadeMergeNone;
         if (strstr(queryText, "querystats"))
-            mergedStats.setown(createPTree("Endpoint"));
+            mergeType=CascadeMergeStats;
+        else if (strstr(queryText, ":queries"))
+            mergeType=CascadeMergeQueries;
+        Owned<IPropertyTree> mergedReply;
+        if (mergeType!=CascadeMergeNone)
+            mergedReply.setown(createPTree("Endpoint"));
 
         class casyncfor: public CAsyncFor
         {
             const char *queryText;
             CascadeManager *parent;
-            IPropertyTree *mergedStats;
+            IPropertyTree *mergedReply;
+            CascadeMergeType mergeType;
             StringBuffer &reply;
             CriticalSection crit;
             SocketEndpoint &ep;
@@ -544,9 +552,9 @@ public:
             const IRoxieContextLogger &logctx;
 
         public:
-            casyncfor(const char *_queryText, CascadeManager *_parent, IPropertyTree *_mergedStats,
+            casyncfor(const char *_queryText, CascadeManager *_parent, IPropertyTree *_mergedReply, CascadeMergeType _mergeType,
                       StringBuffer &_reply, SocketEndpoint &_ep, unsigned _numChildren, const IRoxieContextLogger &_logctx)
-                : queryText(_queryText), parent(_parent), mergedStats(_mergedStats), reply(_reply), ep(_ep), numChildren(_numChildren), logctx(_logctx)
+                : queryText(_queryText), parent(_parent), mergedReply(_mergedReply), mergeType(_mergeType), reply(_reply), ep(_ep), numChildren(_numChildren), logctx(_logctx)
             {
             }
             void Do(unsigned i)
@@ -571,9 +579,12 @@ public:
                     ForEach(*meat)
                     {
                         CriticalBlock cb(crit);
-                        if (mergedStats)
+                        if (mergedReply)
                         {
-                            mergeStats(mergedStats, &meat->query());
+                            if (mergeType == CascadeMergeStats)
+                                mergeStats(mergedReply, &meat->query());
+                            else if (mergeType == CascadeMergeQueries)
+                                mergeQueries(mergedReply, &meat->query());
                         }
                         else
                             toXML(&meat->query(), reply);
@@ -601,19 +612,22 @@ public:
                 }
                 myReply.append("</Endpoint>\n");
                 CriticalBlock cb(crit);
-                if (mergedStats)
+                if (mergedReply)
                 {
                     Owned<IPropertyTree> xml = createPTreeFromXMLString(myReply);
-                    mergeStats(mergedStats, xml);
+                    if (mergeType == CascadeMergeStats)
+                        mergeStats(mergedReply, xml);
+                    else if (mergeType == CascadeMergeQueries)
+                        mergeQueries(mergedReply, xml);
                 }
                 else
                     reply.append(myReply);
             }
-        } afor(queryText, this, mergedStats, reply, ep, activeChildren.ordinality(), logctx);
+        } afor(queryText, this, mergedReply, mergeType, reply, ep, activeChildren.ordinality(), logctx);
         afor.For(activeChildren.ordinality()+(isMaster ? 0 : 1), 10);
         activeChildren.kill();
-        if (mergedStats)
-            toXML(mergedStats, reply);
+        if (mergedReply)
+            toXML(mergedReply, reply);
         if (logctx.queryTraceLevel() > 5)
             logctx.CTXLOG("doControlQuery (%d) finished: %.80s", isMaster, queryText);
     }

+ 30 - 1
roxie/ccd/ccdstate.cpp

@@ -1601,7 +1601,7 @@ private:
     void getQueryInfo(IPropertyTree *control, StringBuffer &reply, bool full, const IRoxieContextLogger &logctx) const
     {
         Owned<IPropertyTreeIterator> ids = control->getElements("Query");
-        reply.append("<Queries>\n");
+        reply.append("<Queries reporting='1'>\n");
         if (ids->first())
         {
             ForEach(*ids)
@@ -2630,6 +2630,35 @@ void mergeStats(IPropertyTree *s1, IPropertyTree *s2)
     mergeStats(s1, s2, 0);
 }
 
+void mergeQueries(IPropertyTree *dest, IPropertyTree *src)
+{
+    IPropertyTree *destQueries = ensurePTree(dest, "Queries");
+    IPropertyTree *srcQueries = src->queryPropTree("Queries");
+    if (!srcQueries)
+        return;
+    destQueries->setPropInt("@reporting", destQueries->getPropInt("@reporting") + srcQueries->getPropInt("@reporting"));
+
+    Owned<IPropertyTreeIterator> it = srcQueries->getElements("Query");
+    ForEach(*it)
+    {
+        IPropertyTree *srcQuery = &it->query();
+        const char *id = srcQuery->queryProp("@id");
+        if (!id || !*id)
+            continue;
+        VStringBuffer xpath("Query[@id='%s']", id);
+        IPropertyTree *destQuery = destQueries->queryPropTree(xpath);
+        if (!destQuery)
+        {
+            destQueries->addPropTree("Query", LINK(srcQuery));
+            continue;
+        }
+        int suspended = destQuery->getPropInt("@suspended") + srcQuery->getPropInt("@suspended"); //keep count to recognize "partially suspended" queries
+        mergePTree(destQuery, srcQuery);
+        if (suspended)
+            destQuery->setPropInt("@suspended", suspended);
+    }
+}
+
 #ifdef _USE_CPPUNIT
 #include <cppunit/extensions/HelperMacros.h>
 

+ 1 - 0
roxie/ccd/ccdstate.hpp

@@ -137,6 +137,7 @@ extern void cleanupPlugins();
 
 extern void mergeStats(IPropertyTree *s1, IPropertyTree *s2, unsigned level);
 extern void mergeStats(IPropertyTree *s1, IPropertyTree *s2);
+extern void mergeQueries(IPropertyTree *s1, IPropertyTree *s2);
 
 extern const char *queryNodeFileName(const IPropertyTree &graphNode);
 extern const char *queryNodeIndexName(const IPropertyTree &graphNode);