浏览代码

Merge pull request #9801 from richardkchapman/h15448a

HPCC-16236 Multiple States for WUQuery - cassandra support

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 8 年之前
父节点
当前提交
ceed83cd44
共有 4 个文件被更改,包括 254 次插入19 次删除
  1. 124 10
      common/workunit/workunit.cpp
  2. 1 0
      common/workunit/wuerror.hpp
  3. 64 4
      plugins/cassandra/cassandrawu.cpp
  4. 65 5
      tools/wutool/wutool.cpp

+ 124 - 10
common/workunit/workunit.cpp

@@ -2871,6 +2871,19 @@ public:
         return root->numChildren();
     }
 
+    /**
+     * Add a filter to an xpath query, with the appropriate filter flags
+     */
+    static void appendFilterToQueryString(StringBuffer& query, int flags, const char* name, const char* value)
+    {
+        query.append('[').append(name).append('=');
+        if (flags & WUSFnocase)
+            query.append('?');
+        if (flags & WUSFwild)
+            query.append('~');
+        query.append('"').append(value).append("\"]");
+    };
+
     IConstWorkUnitIterator* getWorkUnitsSorted( WUSortField sortorder, // field to sort by (and flags for desc sort etc)
                                                 WUSortField *filters,   // NULL or list of fields to filter on (terminated by WUSFterm)
                                                 const void *filterbuf,  // (appended) string values for filters
@@ -2881,6 +2894,69 @@ public:
                                                 ISecManager *secmgr,
                                                 ISecUser *secuser)
     {
+        class CQueryOrFilter : public CInterface
+        {
+            unsigned flags;
+            StringAttr name;
+            StringArray values;
+        public:
+            IMPLEMENT_IINTERFACE;
+            CQueryOrFilter(unsigned _flags, const char *_name, const char *_value)
+            : flags(_flags), name(_name)
+            {
+                values.appendListUniq(_value, "|");
+            };
+
+            const char* queryName() { return name.get(); };
+            unsigned querySearchFlags() { return flags; };
+            const StringArray& queryValues() const { return values; };
+        };
+        class CMultiPTreeIterator : public CInterfaceOf<IPropertyTreeIterator>
+        {
+        public:
+            virtual bool first() override
+            {
+                curSource = 0;
+                while (sources.isItem(curSource))
+                {
+                    if (sources.item(curSource).first())
+                        return true;
+                    curSource++;
+                }
+                return false;
+            }
+            virtual bool next() override
+            {
+                if (sources.isItem(curSource))
+                {
+                    if (sources.item(curSource).next())
+                        return true;
+                    curSource++;
+                    while (sources.isItem(curSource))
+                    {
+                        if (sources.item(curSource).first())
+                            return true;
+                        curSource++;
+                    }
+                }
+                return false;
+            }
+            virtual bool isValid() override
+            {
+                return sources.isItem(curSource);
+            }
+            virtual IPropertyTree & query() override
+            {
+                return sources.item(curSource).query();
+            }
+            void addSource(IPropertyTreeIterator &source)
+            {
+                sources.append(source);
+            }
+        private:
+            IArrayOf<IPropertyTreeIterator> sources;
+            unsigned curSource = 0;
+        };
         class CWorkUnitsPager : public CSimpleInterface, implements IElementsPager
         {
             StringAttr xPath;
@@ -2888,12 +2964,13 @@ public:
             StringAttr nameFilterLo;
             StringAttr nameFilterHi;
             StringArray unknownAttributes;
+            Owned<CQueryOrFilter> orFilter;
 
         public:
             IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-            CWorkUnitsPager(const char* _xPath, const char *_sortOrder, const char* _nameFilterLo, const char* _nameFilterHi, StringArray& _unknownAttributes)
-                : xPath(_xPath), sortOrder(_sortOrder), nameFilterLo(_nameFilterLo), nameFilterHi(_nameFilterHi)
+            CWorkUnitsPager(const char* _xPath, CQueryOrFilter* _orFilter, const char *_sortOrder, const char* _nameFilterLo, const char* _nameFilterHi, StringArray& _unknownAttributes)
+                : xPath(_xPath), orFilter(_orFilter), sortOrder(_sortOrder), nameFilterLo(_nameFilterLo), nameFilterHi(_nameFilterHi)
             {
                 ForEachItemIn(x, _unknownAttributes)
                     unknownAttributes.append(_unknownAttributes.item(x));
@@ -2903,7 +2980,36 @@ public:
                 Owned<IRemoteConnection> conn = querySDS().connect("WorkUnits", myProcessSession(), 0, SDS_LOCK_TIMEOUT);
                 if (!conn)
                     return NULL;
-                Owned<IPropertyTreeIterator> iter = conn->getElements(xPath);
+                Owned<IPropertyTreeIterator> iter;
+                if (!orFilter)
+                {
+                    iter.setown(conn->getElements(xPath.get()));
+                }
+                else
+                {
+                    Owned <CMultiPTreeIterator> multi = new CMultiPTreeIterator;
+                    bool added = false;
+                    const char* fieldName = orFilter->queryName();
+                    unsigned flags = orFilter->querySearchFlags();
+                    const StringArray& values = orFilter->queryValues();
+                    ForEachItemIn(i, values)
+                    {
+                        StringBuffer path = xPath.get();
+                        const char* value = values.item(i);
+                        if (!isEmptyString(value))
+                        {
+                            appendFilterToQueryString(path, flags, fieldName, value);
+                            IPropertyTreeIterator *itr = conn->getElements(path.str());
+                            if (itr)
+                            {
+                                multi->addSource(*itr);
+                                added = true;
+                            }
+                        }
+                    }
+                    if (added)
+                        iter.setown(multi.getClear());
+                }
                 if (!iter)
                     return NULL;
                 sortElements(iter, sortOrder.get(), nameFilterLo.get(), nameFilterHi.get(), unknownAttributes, elements);
@@ -2946,6 +3052,7 @@ public:
                 return ret;
             }
         };
+        Owned<CQueryOrFilter> orFilter;
         Owned<ISortedElementsTreeFilter> sc = new CScopeChecker(secmgr,secuser);
         StringBuffer query;
         StringBuffer so;
@@ -2984,12 +3091,19 @@ public:
                 }
                 else
                 {
-                    query.append('[').append(getEnumText(subfmt,workunitSortFields)).append('=');
-                    if (fmt&WUSFnocase)
-                        query.append('?');
-                    if (fmt&WUSFwild)
-                        query.append('~');
-                    query.append('"').append(fv).append("\"]");
+                    const char* fieldName = getEnumText(subfmt,workunitSortFields);
+                    if (!strchr(fv, '|'))
+                        appendFilterToQueryString(query, fmt, fieldName, fv);
+                    else if (orFilter)
+                        throw MakeStringException(WUERR_InvalidUserInput, "Multiple OR filters not allowed");
+                    else
+                    {
+                        if (!strieq(fieldName, getEnumText(WUSFstate,workunitSortFields)) &&
+                            !strieq(fieldName, getEnumText(WUSFuser,workunitSortFields)) &&
+                            !strieq(fieldName, getEnumText(WUSFcluster,workunitSortFields)))
+                            throw MakeStringException(WUERR_InvalidUserInput, "OR filters not allowed for %s", fieldName);
+                        orFilter.setown(new CQueryOrFilter(fmt, fieldName, fv));
+                    }
                 }
                 fv = fv + strlen(fv)+1;
             }
@@ -3010,7 +3124,7 @@ public:
             so.append(getEnumText(sortorder&0xff,workunitSortFields));
         }
         IArrayOf<IPropertyTree> results;
-        Owned<IElementsPager> elementsPager = new CWorkUnitsPager(query.str(), so.length()?so.str():NULL, namefilterlo.get(), namefilterhi.get(), unknownAttributes);
+        Owned<IElementsPager> elementsPager = new CWorkUnitsPager(query.str(), orFilter.getClear(), so.length()?so.str():NULL, namefilterlo.get(), namefilterhi.get(), unknownAttributes);
         Owned<IRemoteConnection> conn=getElementsPaged(elementsPager,startoffset,maxnum,secmgr?sc:NULL,"",cachehint,results,total,NULL);
         return new CConstWUArrayIterator(results);
     }

+ 1 - 0
common/workunit/wuerror.hpp

@@ -52,4 +52,5 @@
 #define WUERR_WorkunitPluginError               5027
 #define WUERR_WorkunitVersionMismatch           5028
 #define WUERR_InvalidFieldUsage                 5029
+#define WUERR_InvalidUserInput                  5030
 #endif

+ 64 - 4
plugins/cassandra/cassandrawu.cpp

@@ -1526,7 +1526,7 @@ public:
     {
         return field;
     }
-private:
+protected:
     const char *xpath;
     StringAttr pattern;
     StringAttr value;
@@ -1534,6 +1534,35 @@ private:
     bool wild;
 };
 
+class MultiValuePostFilter : public PostFilter
+{
+public:
+    MultiValuePostFilter(WUSortField _field, const char *_value)
+      : PostFilter(_field, _value, false)
+    {
+        setValue(_value);
+    }
+    virtual bool matches(IPTree &p) const
+    {
+        const char *val = p.queryProp(xpath);
+        if (val)
+        {
+            ForEachItemIn(idx, values)
+            {
+                if (strieq(val, values.item(idx)))
+                    return true;
+            }
+        }
+        return false;
+    }
+    void setValue(const char *_value)
+    {
+        values.appendList(_value, "|");
+    }
+private:
+    StringArray values;
+};
+
 class AppValuePostFilter : public CInterfaceOf<IPostFilter>
 {
 public:
@@ -3417,6 +3446,8 @@ public:
                         if (s.length())
                             remoteWildFilters.append(*new PostFilter(field, s, true));  // Trailing-only wildcards can be done remotely
                     }
+                    else if (strchr(fv, '|'))
+                        goodFilters.append(*new MultiValuePostFilter(field, fv));
                     else
                         goodFilters.append(*new PostFilter(field, fv, false));
                     break;
@@ -3424,7 +3455,10 @@ public:
                 case WUSFpriority:
                 case WUSFprotected:
                     // These can't be wild, but are not very good filters
-                    poorFilters.append(*new PostFilter(field, fv, false));
+                    if (strchr(fv, '|'))
+                        poorFilters.append(*new MultiValuePostFilter(field, fv));
+                    else
+                        poorFilters.append(*new PostFilter(field, fv, false));
                     break;
                 case WUSFwuid: // Acts as wuidLo when specified as a filter
                 case WUSFwuidhigh:
@@ -3515,14 +3549,40 @@ public:
                 merger->addPostFilters(poorFilters, 0);
                 merger->addPostFilters(remoteWildFilters, 0);
                 const IPostFilter &best = goodFilters.item(0);
-                merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(best.queryXPath(), best.queryValue(), wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
+                const char *queryValue = best.queryValue();
+                if (strchr(queryValue, '|'))
+                {
+                    StringArray values;
+                    values.appendListUniq(queryValue, "|");
+                    ForEachItemIn(vidx, values)
+                    {
+                        const char *thisValue = values.item(vidx);
+                        if (!isEmptyString(thisValue))
+                            merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(best.queryXPath(), thisValue, wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
+                    }
+                }
+                else
+                    merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(best.queryXPath(), best.queryValue(), wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
             }
             else if (poorFilters.length())
             {
                 merger->addPostFilters(poorFilters, 1);
                 merger->addPostFilters(remoteWildFilters, 0);
                 const IPostFilter &best= poorFilters.item(0);
-                merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(best.queryXPath(), best.queryValue(), wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
+                const char *queryValue =best.queryValue();
+                if (strchr(queryValue, '|'))
+                {
+                    StringArray values;
+                    values.appendListUniq(queryValue, "|");
+                    ForEachItemIn(vidx, values)
+                    {
+                        const char *thisValue = values.item(vidx);
+                        if (!isEmptyString(thisValue))
+                            merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(best.queryXPath(), thisValue, wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
+                    }
+                }
+                else
+                    merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(best.queryXPath(), best.queryValue(), wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
             }
             else if (remoteWildFilters.length())
             {

+ 65 - 5
tools/wutool/wutool.cpp

@@ -487,7 +487,7 @@ class WuTool : public CppUnit::TestFixture
         CPPUNIT_TEST(testQuery);
         CPPUNIT_TEST(testGraph);
         CPPUNIT_TEST(testGraphProgress);
-        CPPUNIT_TEST(testGlobal); 
+        CPPUNIT_TEST(testGlobal);
     CPPUNIT_TEST_SUITE_END();
 protected:
     static StringArray wuids;
@@ -1329,16 +1329,76 @@ protected:
         WUSortField sortByOwner[] = { WUSFuser, WUSFstate, WUSFterm };
         start = msTick();
         wus.setown(factory->getWorkUnitsSorted((WUSortField) (WUSFwuid | WUSFreverse), sortByOwner, "WuTestUser00\0completed", 0, 10000, NULL, NULL));
-        numIterated = 0;
+        unsigned numCompleted = 0;
         ForEach(*wus)
         {
             IConstWorkUnitInfo &wu = wus->query();
             ASSERT(streq(wu.queryUser(), "WuTestUser00"));
             ASSERT(wu.getState()==WUStateCompleted);
-            numIterated++;
+            numCompleted++;
+        }
+        DBGLOG("%d owned,completed workunits listed the hard way in %d ms", numCompleted, msTick()-start);
+        ASSERT(numCompleted > 0);  // Not sure what the exact answer should be! Around 5/6 of 1/50th ?
+
+        start = msTick();
+        wus.setown(factory->getWorkUnitsSorted((WUSortField) (WUSFwuid | WUSFreverse), sortByOwner, "WuTestUser00\0scheduled", 0, 10000, NULL, NULL));
+        unsigned numScheduled = 0;
+        ForEach(*wus)
+        {
+            IConstWorkUnitInfo &wu = wus->query();
+            ASSERT(streq(wu.queryUser(), "WuTestUser00"));
+            ASSERT(wu.getState()==WUStateScheduled);
+            numScheduled++;
+        }
+        DBGLOG("%d owned,scheduled workunits listed the hard way in %d ms", numScheduled, msTick()-start);
+        ASSERT(numScheduled > 0);
+        ASSERT(numScheduled + numCompleted == (testSize+49)/50);
+
+        // Use multiple states
+        DBGLOG("Testing multiple states");
+        start = msTick();
+        wus.setown(factory->getWorkUnitsSorted((WUSortField) (WUSFwuid | WUSFreverse), sortByOwner, "WuTestUser00\0completed|scheduled", 0, 10000, NULL, NULL));
+        unsigned numCompleted2 = 0;
+        unsigned numScheduled2 = 0;
+        ForEach(*wus)
+        {
+            IConstWorkUnitInfo &wu = wus->query();
+            ASSERT(streq(wu.queryUser(), "WuTestUser00"));
+            ASSERT((wu.getState()==WUStateCompleted) || (wu.getState()==WUStateScheduled));
+            if ((wu.getState()==WUStateCompleted))
+                numCompleted2++;
+            else
+                numScheduled2++;
+        }
+        DBGLOG("%d owned,completed and %d owned,scheduled workunits listed via multiple in %d ms", numCompleted2, numScheduled2, msTick()-start);
+        ASSERT(numCompleted == numCompleted2);
+        ASSERT(numScheduled == numScheduled2);
+
+        // Use multiple states only
+        DBGLOG("Testing multiple states only");
+        start = msTick();
+        WUSortField filterByState[] = { WUSFstate, WUSFterm };
+        wus.setown(factory->getWorkUnitsSorted((WUSortField) (WUSFwuid | WUSFreverse), filterByState, "completed|scheduled", 0, 10000, NULL, NULL));
+        unsigned numCompleted3 = 0;
+        unsigned numScheduled3 = 0;
+        ForEach(*wus)
+        {
+            IConstWorkUnitInfo &wu = wus->query();
+            ASSERT((wu.getState()==WUStateCompleted) || (wu.getState()==WUStateScheduled));
+            if (strlen(wu.queryUser()) > 10 && memcmp(wu.queryUser(), "WuTestUser", 10)==0)  // Ignore any wu's not part of the test set
+            {
+                if ((wu.getState()==WUStateCompleted))
+                    numCompleted3++;
+                else
+                    numScheduled3++;
+            }
         }
-        DBGLOG("%d owned workunits listed the hard way in %d ms", numIterated, msTick()-start);
-        ASSERT(numIterated <= (testSize+49)/50);  // Not sure what the exact answer should be!
+        DBGLOG("%d completed and %d scheduled workunits listed via multiple in %d ms", numCompleted3, numScheduled3, msTick()-start);
+        ASSERT(numCompleted3 >= numCompleted2);
+        ASSERT(numScheduled3 >= numScheduled2);
+        ASSERT(numCompleted3 + numScheduled3 == testSize);
+
+
 
         // Get Scheduled Workunits
         start = msTick();