Quellcode durchsuchen

HPCC-22498 Filter WUs on Total Cluster Time

Similar function has been implemented in cassandrawu.
One bug is fixed in the fetchDataByThorTime() of the
cassandrawu.cpp: the 'where fieldValue' in the existing
line 4302 should be changed to 'and fieldValue'.
wangkx vor 6 Jahren
Ursprung
Commit
9310ea5225

+ 6 - 0
common/workunit/workunit.cpp

@@ -5705,6 +5705,12 @@ public:
                         query.append("=?~\"").append(fv).append('\"');
                     query.append("]");
                 }
+                else if (subfmt==WUSFtotalthortime)
+                {
+                    query.append("[@totalThorTime>=\"");
+                    formatTimeCollatable(query, milliToNano(atoi(fv)), false);
+                    query.append("\"]");
+                }
                 else if (!*fv)
                 {
                     unknownAttributes.append(getEnumText(subfmt,workunitSortFields));

+ 1 - 0
esp/scm/ws_workunits.ecm

@@ -507,6 +507,7 @@ ESPrequest [nil_remove] WUQueryRequest
     [min_ver("1.57")] ESParray<ESPstruct ApplicationValue> ApplicationValues;
     [min_ver("1.72")] string BeforeWU;
     [min_ver("1.72")] string AfterWU;
+    [min_ver("1.77")] unsigned TotalClusterTimeThresholdMilliSec(0);
 
     [depr_ver("1.02")] string After; //Not used since 1.02
     [depr_ver("1.02")] string Before; //Not used since 1.02

+ 13 - 0
esp/services/ws_workunits/ws_workunitsService.cpp

@@ -1751,6 +1751,17 @@ bool addWUQueryFilterTime(WUSortField *filters, unsigned short &count, MemoryBuf
     return true;
 }
 
+bool addWUQueryFilterTotalClusterTime(WUSortField *filters, unsigned short &count, MemoryBuffer &filterBuf, unsigned milliseconds, WUSortField value)
+{
+    if (milliseconds == 0)
+        return false;
+
+    VStringBuffer vBuf("%u", milliseconds);
+    filters[count++] = value;
+    filterBuf.append(vBuf);
+    return true;
+}
+
 bool addWUQueryFilterApplication(WUSortField *filters, unsigned short &count, MemoryBuffer &buff, const char *appname, const char *appkey, const char *appdata)
 {
     if (isEmpty(appname))
@@ -1858,6 +1869,8 @@ void doWUQueryWithSort(IEspContext &context, IEspWUQueryRequest & req, IEspWUQue
     addWUQueryFilter(filters, filterCount, filterbuf, req.getJobname(), (WUSortField) (WUSFjob | WUSFnocase));
     addWUQueryFilter(filters, filterCount, filterbuf, req.getECL(), (WUSortField) (WUSFecl | WUSFwild));
 
+    addWUQueryFilterTotalClusterTime(filters, filterCount, filterbuf, req.getTotalClusterTimeThresholdMilliSec(), WUSFtotalthortime);
+
     addWUQueryFilterTime(filters, filterCount, filterbuf, req.getStartDate(), WUSFwuid);
     addWUQueryFilterTime(filters, filterCount, filterbuf, req.getEndDate(), WUSFwuidhigh);
     if (version < 1.55)

+ 20 - 4
plugins/cassandra/cassandrawu.cpp

@@ -3417,6 +3417,22 @@ public:
         bool sortByThorTime = (baseSort == WUSFtotalthortime);
         bool needsPostSort = (baseSort != WUSFwuid && baseSort != WUSFtotalthortime);
         bool sortDescending = (sortorder & WUSFreverse) || needsPostSort;
+
+        bool keepThorTimeFilter = sortByThorTime;
+        if (!keepThorTimeFilter)
+        {
+            const WUSortField *filterPtr = filters;
+            while (filterPtr && *filterPtr)
+            {
+                WUSortField field = (WUSortField) (*filterPtr & 0xff);
+                if (field == WUSFtotalthortime)
+                {
+                    keepThorTimeFilter = true;
+                    break;
+                }
+            }
+        }
+
         if (!result)
         {
             Owned<CassMultiIterator> merger = new CassMultiIterator(needsPostSort ? NULL : cached, 0, 0, sortDescending); // We always merge by wuid (except when we merge by thor time... we turn the compare off then to make it an appender)
@@ -3426,7 +3442,7 @@ public:
                 unsigned found = cached->lookupStartRow(startWuid, thorTimeThreshold, startOffset);
                 if (found)
                 {
-                    if (!sortByThorTime)
+                    if (!keepThorTimeFilter)
                     {
                         if (sortDescending)
                             startWuid.setCharAt(startWuid.length()-1, startWuid.charAt(startWuid.length()-1)-1);  // we want to find the last wuid BEFORE
@@ -3492,7 +3508,7 @@ public:
                 case WUSFwuid: // Acts as wuidLo when specified as a filter
                 case WUSFwuidhigh:
                     // Wuid filters can be added to good and poor filters, and to remoteWild if they are done via merged sets rather than ranges...
-                    if (sortByThorTime)
+                    if (keepThorTimeFilter)
                         remoteWildFilters.append(*new PostFilter(field, fv, true));
                     else
                         mergeFilter(wuidFilters, field, fv);
@@ -3554,7 +3570,7 @@ public:
                     merger->addResult(*new CassandraResult(fetchDataForWuid(workunitInfoMappings, value, true)));
                 }
             }
-            else if (sortByThorTime)
+            else if (sortByThorTime || !thorTimeThreshold.isEmpty())
             {
                 merger->addPostFilters(goodFilters, 0);
                 merger->addPostFilters(poorFilters, 0);
@@ -4299,7 +4315,7 @@ private:
         getFieldNames(searchMappings+3, names, tableName);  // mappings+3 means we don't return the key columns (xpath, upper(keyPrefix), upper(key))
         VStringBuffer selectQuery("select %s from %s where xpath=? and fieldPrefix=?", names.str()+1, tableName.str());
         if (threshold && *threshold)
-            selectQuery.appendf(" where fieldValue >= ?");
+            selectQuery.appendf(" and fieldValue >= ?");
         if (descending)
             selectQuery.append(" ORDER BY fieldValue DESC, wuid ASC");
         else