Browse Source

HPCC-23343 Support multiple options for searching Suspended Queries

Options:
1. all queries
2. Not suspended (slow)
3. Suspended (slow)
4. suspended by first node
5. suspended by any node (slow)
6. suspended by user

Signed-off-by: wangkx <kevin.wang@lexisnexis.com>
wangkx 5 years ago
parent
commit
cfabcc0ec0

+ 49 - 14
common/workunit/workunit.cpp

@@ -5330,16 +5330,17 @@ IConstQuerySetQueryIterator* CWorkUnitFactory::getQuerySetQueriesSorted( WUQuery
                                             unsigned maxnum,
                                             __int64 *cachehint,
                                             unsigned *total,
-                                            const MapStringTo<bool> *_subset)
+                                            const MapStringTo<bool> *_subset,
+                                            const MapStringTo<bool> *_suspendedQueriesByCluster)
 {
     struct PostFilters
     {
         WUQueryFilterBoolean activatedFilter;
-        WUQueryFilterBoolean suspendedByUserFilter;
+        WUQueryFilterSuspended suspendedFilter;
         PostFilters()
         {
             activatedFilter = WUQFSAll;
-            suspendedByUserFilter = WUQFSAll;
+            suspendedFilter = WUQFAllQueries;
         };
     } postFilters;
 
@@ -5351,6 +5352,7 @@ IConstQuerySetQueryIterator* CWorkUnitFactory::getQuerySetQueriesSorted( WUQuery
         PostFilters postFilters;
         StringArray unknownAttributes;
         const MapStringTo<bool> *subset;
+        const MapStringTo<bool> *suspendedQueriesByCluster;
 
         void populateQueryTree(const IPropertyTree* querySetTree, IPropertyTree* queryTree)
         {
@@ -5388,9 +5390,30 @@ IConstQuerySetQueryIterator* CWorkUnitFactory::getQuerySetQueriesSorted( WUQuery
                     continue;
                 if (!activated && (postFilters.activatedFilter == WUQFSYes))
                     continue;
-                if ((postFilters.suspendedByUserFilter == WUQFSNo) && query.hasProp(getEnumText(WUQSFSuspendedByUser,querySortFields)))
-                    continue;
-                if ((postFilters.suspendedByUserFilter == WUQFSYes) && !query.hasProp(getEnumText(WUQSFSuspendedByUser,querySortFields)))
+
+                bool isSuspendedByUser = query.hasProp(getEnumText(WUQSFSuspendedByUser,querySortFields));
+                bool skip = false;
+                switch(postFilters.suspendedFilter)
+                {
+                case WUQFSUSPDNo:
+                    if (isSuspendedByUser || checkSuspendedByCluster(querySetId, queryId))
+                        skip = true;
+                    break;
+                case WUQFSUSPDYes:
+                    if (!isSuspendedByUser && !checkSuspendedByCluster(querySetId, queryId))
+                        skip = true;
+                    break;
+                case WUQFSUSPDByUser:
+                    if (!isSuspendedByUser)
+                        skip = true;
+                    break;
+                case WUQFSUSPDByFirstNode:
+                case WUQFSUSPDByAnyNode:
+                    if (!checkSuspendedByCluster(querySetId, queryId))
+                        skip = true;
+                    break;
+                }
+                if (skip)
                     continue;
 
                 IPropertyTree *queryWithSetId = queryTree->addPropTree("Query", createPTreeFromIPT(&query));
@@ -5417,14 +5440,24 @@ IConstQuerySetQueryIterator* CWorkUnitFactory::getQuerySetQueriesSorted( WUQuery
                 populateQueryTree(root, queryTree);
             return conn.getClear();
         }
+        bool checkSuspendedByCluster(const char *querySetId, const char *queryId)
+        {
+            if (!suspendedQueriesByCluster)
+                return false;
+
+            VStringBuffer match("%s/%s", querySetId, queryId);
+            bool *found = suspendedQueriesByCluster->getValue(match);
+            return (found && *found);
+        }
     public:
         IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-        CQuerySetQueriesPager(const char* _querySet, const char* _xPath, const char *_sortOrder, PostFilters& _postFilters, StringArray& _unknownAttributes, const MapStringTo<bool> *_subset)
-            : querySet(_querySet), xPath(_xPath), sortOrder(_sortOrder), subset(_subset)
+        CQuerySetQueriesPager(const char* _querySet, const char* _xPath, const char *_sortOrder, PostFilters& _postFilters,
+            StringArray& _unknownAttributes, const MapStringTo<bool> *_subset, const MapStringTo<bool> *_suspendedQueriesByCluster)
+            : querySet(_querySet), xPath(_xPath), sortOrder(_sortOrder), subset(_subset), suspendedQueriesByCluster(_suspendedQueriesByCluster)
         {
             postFilters.activatedFilter = _postFilters.activatedFilter;
-            postFilters.suspendedByUserFilter = _postFilters.suspendedByUserFilter;
+            postFilters.suspendedFilter = _postFilters.suspendedFilter;
             ForEachItemIn(x, _unknownAttributes)
                 unknownAttributes.append(_unknownAttributes.item(x));
         }
@@ -5460,8 +5493,8 @@ IConstQuerySetQueryIterator* CWorkUnitFactory::getQuerySetQueriesSorted( WUQuery
                 xPath.append('[').append(getEnumText(subfmt,querySortFields)).append("<=").append(fv).append("]");
             else if (subfmt==WUQSFActivited)
                 postFilters.activatedFilter = (WUQueryFilterBoolean) atoi(fv);
-            else if (subfmt==WUQSFSuspendedByUser)
-                postFilters.suspendedByUserFilter = (WUQueryFilterBoolean) atoi(fv);
+            else if (subfmt==WUQSFSuspendedFilter)
+                postFilters.suspendedFilter = (WUQueryFilterSuspended) atoi(fv);
             else if (!fv || !*fv)
                 unknownAttributes.append(getEnumText(subfmt,querySortFields));
             else {
@@ -5492,7 +5525,8 @@ IConstQuerySetQueryIterator* CWorkUnitFactory::getQuerySetQueriesSorted( WUQuery
         }
     }
     IArrayOf<IPropertyTree> results;
-    Owned<IElementsPager> elementsPager = new CQuerySetQueriesPager(querySet.get(), xPath.str(), so.length()?so.str():NULL, postFilters, unknownAttributes, _subset);
+    Owned<IElementsPager> elementsPager = new CQuerySetQueriesPager(querySet.get(), xPath.str(), so.length()?so.str():NULL,
+        postFilters, unknownAttributes, _subset, _suspendedQueriesByCluster);
     Owned<IRemoteConnection> conn=getElementsPaged(elementsPager,startoffset,maxnum,NULL,"",cachehint,results,total,NULL);
     return new CConstQuerySetQueryIterator(results);
 }
@@ -6367,10 +6401,11 @@ public:
                                                 unsigned maxnum,
                                                 __int64 *cachehint,
                                                 unsigned *total,
-                                                const MapStringTo<bool> *subset)
+                                                const MapStringTo<bool> *subset,
+                                                const MapStringTo<bool> *suspendedByCluster)
     {
         // MORE - why no security?
-        return baseFactory->getQuerySetQueriesSorted(sortorder,filters,filterbuf,startoffset,maxnum,cachehint,total,subset);
+        return baseFactory->getQuerySetQueriesSorted(sortorder,filters,filterbuf,startoffset,maxnum,cachehint,total,subset,suspendedByCluster);
     }
 
     virtual unsigned numWorkUnits()

+ 13 - 1
common/workunit/workunit.hpp

@@ -1415,6 +1415,16 @@ enum WUQueryFilterBoolean
     WUQFSAll = 2
 };
 
+enum WUQueryFilterSuspended
+{
+    WUQFAllQueries = 0,//all queries including Suspended and not suspended
+    WUQFSUSPDNo = 1,
+    WUQFSUSPDYes = 2,
+    WUQFSUSPDByUser = 3,
+    WUQFSUSPDByFirstNode = 4,
+    WUQFSUSPDByAnyNode = 5
+};
+
 enum WUQuerySortField
 {
     WUQSFId = 1,
@@ -1434,6 +1444,7 @@ enum WUQuerySortField
     WUQSFSuspendedByUser = 15,
     WUQSFLibrary = 16,
     WUQSFPublishedBy = 17,
+    WUQSFSuspendedFilter = 18,
     WUQSFterm = 0,
     WUQSFreverse = 256,
     WUQSFnocase = 512,
@@ -1464,7 +1475,8 @@ interface IWorkUnitFactory : extends IPluggableFactory
     virtual unsigned numWorkUnits() = 0;
     virtual IConstWorkUnitIterator *getScheduledWorkUnits(ISecManager *secmgr = NULL, ISecUser *secuser = NULL) = 0;
     virtual void descheduleAllWorkUnits(ISecManager *secmgr = NULL, ISecUser *secuser = NULL) = 0;
-    virtual IConstQuerySetQueryIterator * getQuerySetQueriesSorted(WUQuerySortField *sortorder, WUQuerySortField *filters, const void *filterbuf, unsigned startoffset, unsigned maxnum, __int64 *cachehint, unsigned *total, const MapStringTo<bool> *subset) = 0;
+    virtual IConstQuerySetQueryIterator * getQuerySetQueriesSorted(WUQuerySortField *sortorder, WUQuerySortField *filters, const void *filterbuf,
+        unsigned startoffset, unsigned maxnum, __int64 *cachehint, unsigned *total, const MapStringTo<bool> *subset, const MapStringTo<bool> *suspendedByCluster) = 0;
     virtual bool isAborting(const char *wuid) const = 0;
     virtual void clearAborting(const char *wuid) = 0;
     virtual WUState waitForWorkUnit(const char * wuid, unsigned timeout, bool compiled, std::list<WUState> expectedStates) = 0;

+ 2 - 1
common/workunit/workunit.ipp

@@ -648,7 +648,8 @@ public:
     virtual unsigned numWorkUnits() = 0;
     virtual IConstWorkUnitIterator *getScheduledWorkUnits(ISecManager *secmgr = NULL, ISecUser *secuser = NULL) = 0;
     virtual void descheduleAllWorkUnits(ISecManager *secmgr, ISecUser *secuser);
-    virtual IConstQuerySetQueryIterator * getQuerySetQueriesSorted(WUQuerySortField *sortorder, WUQuerySortField *filters, const void *filterbuf, unsigned startoffset, unsigned maxnum, __int64 *cachehint, unsigned *total, const MapStringTo<bool> *subset);
+    virtual IConstQuerySetQueryIterator * getQuerySetQueriesSorted(WUQuerySortField *sortorder, WUQuerySortField *filters, const void *filterbuf,
+        unsigned startoffset, unsigned maxnum, __int64 *cachehint, unsigned *total, const MapStringTo<bool> *subset, const MapStringTo<bool> *suspendedByCluster);
     virtual bool isAborting(const char *wuid) const;
     virtual void clearAborting(const char *wuid);
     virtual WUState waitForWorkUnit(const char * wuid, unsigned timeout, bool compiled,  std::list<WUState> expectedStates) = 0;

+ 1 - 1
esp/scm/ws_workunits.ecm

@@ -25,7 +25,7 @@ EspInclude(ws_workunits_queryset_req_resp);
 
 ESPservice [
     auth_feature("DEFERRED"), //This declares that the method logic handles feature level authorization
-    version("1.77"), default_client_version("1.77"), cache_group("ESPWsWUs"),
+    version("1.78"), default_client_version("1.78"), cache_group("ESPWsWUs"),
     noforms,exceptions_inline("./smc_xslt/exceptions.xslt"),use_method_name] WsWorkunits
 {
     ESPmethod [cache_seconds(60), resp_xsl_default("/esp/xslt/workunits.xslt")]     WUQuery(WUQueryRequest, WUQueryResponse);

+ 2 - 1
esp/scm/ws_workunits_queryset_req_resp.ecm

@@ -200,7 +200,8 @@ ESPrequest [nil_remove] WUListQueriesRequest
     nonNegativeInteger PriorityLow;
     nonNegativeInteger PriorityHigh;
     [min_ver("1.48")] bool Activated;
-    [min_ver("1.48")] bool SuspendedByUser;
+    [min_ver("1.48"), depr_ver("1.78")] bool SuspendedByUser;
+    [min_ver("1.78")] ESPEnum WUQueryFilterSuspendedType SuspendedFilter;
     [min_ver("1.50")] string WUID;
     [min_ver("1.51")] string QueryID;
     [min_ver("1.51")] string QueryName;

+ 10 - 0
esp/scm/ws_workunits_struct.ecm

@@ -99,6 +99,16 @@ ESPenum WUQuerySetFilterType : string
     STATUS("Status")
 };
 
+ESPenum WUQueryFilterSuspendedType : string
+{
+    ALL("All queries"),
+    NOTSUSPD("Not suspended"),
+    SUSPD("Suspended"),
+    SUSPDBYUSER("Suspended by user"),
+    SUSPDBYFirstNode("Suspended by first node"),
+    SUSPDBYAnyNode("Suspended by any node")
+};
+
 ESPenum QuerysetImportActivation : string
 {
     None("None"),

+ 66 - 3
esp/services/ws_workunits/ws_workunitsQuerySets.cpp

@@ -1492,8 +1492,24 @@ bool CWsWorkunitsEx::onWUListQueries(IEspContext &context, IEspWUListQueriesRequ
         addWUQSQueryFilterInt(filters, filterCount, filterBuf, req.getPriorityHigh(), (WUQuerySortField) (WUQSFpriorityHi | WUQSFnumeric));
     if (!req.getActivated_isNull())
         addWUQSQueryFilterInt(filters, filterCount, filterBuf, req.getActivated(), (WUQuerySortField) (WUQSFActivited | WUQSFnumeric));
-    if (!req.getSuspendedByUser_isNull())
-        addWUQSQueryFilterInt(filters, filterCount, filterBuf, req.getSuspendedByUser(), (WUQuerySortField) (WUQSFSuspendedByUser | WUQSFnumeric));
+
+    MapStringTo<bool> suspendedQueriesByCluster;
+    CWUQueryFilterSuspendedType suspendedType = req.getSuspendedFilter();
+    if (suspendedType != WUQueryFilterSuspendedType_Undefined)
+    {
+        addWUQSQueryFilterInt(filters, filterCount, filterBuf, suspendedType, (WUQuerySortField) (WUQSFSuspendedFilter | WUQSFnumeric));
+        if (suspendedType == CWUQueryFilterSuspendedType_SUSPDBYFirstNode)
+        {
+            getSuspendedQueriesByCluster(suspendedQueriesByCluster, req.getQuerySetName(), req.getQueryID(), false);
+        }
+        else if ((suspendedType == CWUQueryFilterSuspendedType_SUSPDBYAnyNode) || (suspendedType == CWUQueryFilterSuspendedType_NOTSUSPD)
+            || (suspendedType == CWUQueryFilterSuspendedType_SUSPD))
+        {
+            getSuspendedQueriesByCluster(suspendedQueriesByCluster, req.getQuerySetName(), req.getQueryID(), true);
+        }
+    }
+    else if (!req.getSuspendedByUser_isNull()) //For the client before version 1.78
+        addWUQSQueryFilterInt(filters, filterCount, filterBuf, CWUQueryFilterSuspendedType_SUSPDBYUSER, (WUQuerySortField) (WUQSFSuspendedFilter | WUQSFnumeric));
     filters[filterCount] = WUQSFterm;
 
     unsigned numberOfQueries = 0;
@@ -1527,7 +1543,8 @@ bool CWsWorkunitsEx::onWUListQueries(IEspContext &context, IEspWUListQueriesRequ
 
     PROGLOG("WUListQueries: getQuerySetQueriesSorted");
     Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
-    Owned<IConstQuerySetQueryIterator> it = factory->getQuerySetQueriesSorted(sortOrder, filters, filterBuf.bufferBase(), pageStartFrom, pageSize, &cacheHint, &numberOfQueries, queriesUsingFileMap);
+    Owned<IConstQuerySetQueryIterator> it = factory->getQuerySetQueriesSorted(sortOrder, filters, filterBuf.bufferBase(),
+        pageStartFrom, pageSize, &cacheHint, &numberOfQueries, queriesUsingFileMap, &suspendedQueriesByCluster);
     resp.setCacheHint(cacheHint);
     PROGLOG("WUListQueries: getQuerySetQueriesSorted done");
 
@@ -1582,6 +1599,52 @@ bool CWsWorkunitsEx::onWUListQueries(IEspContext &context, IEspWUListQueriesRequ
     return true;
 }
 
+void CWsWorkunitsEx::getSuspendedQueriesByCluster(MapStringTo<bool> &suspendedQueries, const char *querySet, const char *queryID, bool checkAllNodes)
+{
+    StringArray queryIDs;
+    if (!isEmptyString(queryID))
+        queryIDs.append(queryID);
+
+    if (!isEmptyString(querySet))
+    {
+        Owned<IPropertyTree> queriesOnCluster = getQueriesOnCluster(querySet, querySet, &queryIDs, checkAllNodes);
+        addSuspendedQueryIDs(suspendedQueries, queriesOnCluster, querySet);
+    }
+    else
+    {
+        Owned<IStringIterator> targets = getTargetClusters("RoxieCluster", nullptr);
+        ForEach(*targets)
+        {
+            SCMStringBuffer target;
+            targets->str(target);
+
+            Owned<IPropertyTree> queriesOnCluster = getQueriesOnCluster(target.str(), target.str(), &queryIDs, checkAllNodes);
+            addSuspendedQueryIDs(suspendedQueries, queriesOnCluster, target.str());
+        }
+    }
+}
+
+void CWsWorkunitsEx::addSuspendedQueryIDs(MapStringTo<bool> &suspendedQueryIDs, IPropertyTree *queriesOnCluster, const char *querySet)
+{
+    if (!queriesOnCluster)
+        throw makeStringExceptionV(ECLWATCH_INTERNAL_ERROR, "getQueriesOnCluster() returns nullptr for target <%s>", querySet);
+
+    Owned<IPropertyTreeIterator> queries = queriesOnCluster->getElements("Endpoint/Queries/Query");
+    ForEach(*queries)
+    {
+        IPropertyTree &query = queries->query();
+        const char *id = query.queryProp("@id");
+        if (isEmptyString(id))
+            continue; //Should not happen
+
+        if (query.getPropInt("@suspended"))
+        {
+            VStringBuffer suspendedID("%s/%s", querySet, id);
+            suspendedQueryIDs.setValue(suspendedID, true);
+        }
+    }
+}
+
 bool CWsWorkunitsEx::onWUListQueriesUsingFile(IEspContext &context, IEspWUListQueriesUsingFileRequest &req, IEspWUListQueriesUsingFileResponse &resp)
 {
     const char *target = req.getTarget();

+ 4 - 1
esp/services/ws_workunits/ws_workunitsService.hpp

@@ -360,12 +360,15 @@ private:
         IArrayOf<IEspQueryStats> &queryStatsList);
     void readQueryStatsList(IPropertyTree *queryStatsTree, const char *status, const char *ep,
         bool all, IArrayOf<IEspEndpointQueryStats> &endpointQueryStatsList);
+
     void getWsWuResult(IEspContext &context, const char *wuid, const char *name, const char *logical, unsigned index, __int64 start,
         unsigned &count, __int64 &total, IStringVal &resname, bool bin, IArrayOf<IConstNamedValue> *filterBy, MemoryBuffer &mb,
         WUState &wuState, bool xsd=true);
     void getFileResults(IEspContext &context, const char *logicalName, const char *cluster, __int64 start, unsigned &count, __int64 &total,
         IStringVal &resname, bool bin, IArrayOf<IConstNamedValue> *filterBy, MemoryBuffer &buf, bool xsd);
-    IDistributedFile *lookupLogicalName(IEspContext &context, const char *logicalName);
+    IDistributedFile *lookupLogicalName(IEspContext &contcontext, const char *logicalName);
+    void getSuspendedQueriesByCluster(MapStringTo<bool> &suspendedByCluster, const char *querySet, const char *queryID, bool checkAllNodes);
+    void addSuspendedQueryIDs(MapStringTo<bool> &suspendedQueryIDs, IPropertyTree *queriesOnCluster, const char *target);
 
     unsigned awusCacheMinutes;
     StringBuffer queryDirectory;