Jelajahi Sumber

HPCC-16236 Multiple States for WUQuery - cassandra support

Also refactored the unit test code to be more consistent with existing tests,
refactored dali version in workunit.cpp to bemore efficient (and cleaner) and
test some additional cases.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 8 tahun lalu
induk
melakukan
dc322d730d
3 mengubah file dengan 216 tambahan dan 148 penghapusan
  1. 87 66
      common/workunit/workunit.cpp
  2. 64 4
      plugins/cassandra/cassandrawu.cpp
  3. 65 78
      tools/wutool/wutool.cpp

+ 87 - 66
common/workunit/workunit.cpp

@@ -2871,6 +2871,19 @@ public:
         return root->numChildren();
     }
 
+    /**
+     * Add a filter to an xpath query, with the appropriate filter flags
+     */
+    static void appendFilterToQueryString(StringBuffer& query, int flags, const char* name, const char* value)
+    {
+        query.append('[').append(name).append('=');
+        if (flags & WUSFnocase)
+            query.append('?');
+        if (flags & WUSFwild)
+            query.append('~');
+        query.append('"').append(value).append("\"]");
+    };
+
     IConstWorkUnitIterator* getWorkUnitsSorted( WUSortField sortorder, // field to sort by (and flags for desc sort etc)
                                                 WUSortField *filters,   // NULL or list of fields to filter on (terminated by WUSFterm)
                                                 const void *filterbuf,  // (appended) string values for filters
@@ -2881,66 +2894,68 @@ public:
                                                 ISecManager *secmgr,
                                                 ISecUser *secuser)
     {
-        class CQueryOrFilter : public CInterface, implements IInterface
+        class CQueryOrFilter : public CInterface
         {
-            int fmt;
+            unsigned flags;
             StringAttr name;
             StringArray values;
         public:
             IMPLEMENT_IINTERFACE;
+            CQueryOrFilter(unsigned _flags, const char *_name, const char *_value)
+            : flags(_flags), name(_name)
+            {
+                values.appendListUniq(_value, "|");
+            };
 
-            CQueryOrFilter() {};
-
-            void setName(const char* _name) { name.set(_name); };
-            void setFormat(int _fmt) { fmt = _fmt; };
-            void appendValue(const char* _value) { values.append(_value); };
-            const char* getName() { return name.get(); };
-            int getFormat() { return fmt; };
-            StringArray& getValues() { return values; };
+            const char* queryName() { return name.get(); };
+            unsigned querySearchFlags() { return flags; };
+            const StringArray& queryValues() const { return values; };
         };
-        class CWorkUnitsPagerHelper : public CInterface, implements IInterface
+        class CMultiPTreeIterator : public CInterfaceOf<IPropertyTreeIterator>
         {
         public:
-            IMPLEMENT_IINTERFACE;
-
-            CWorkUnitsPagerHelper() {};
-            void setOrFilter(CQueryOrFilter& orFilter, int fmt, const char* name, const char* value, const char* sep)
+            virtual bool first() override
             {
-                if (!strieq(name, getEnumText(WUSFstate,workunitSortFields)) && !strieq(name, getEnumText(WUSFuser,workunitSortFields))
-                    && !strieq(name, getEnumText(WUSFcluster,workunitSortFields)))
-                    throw MakeStringException(WUERR_InvalidUserInput, "OR filters not allowed for %s", name);
-
-                StringArray vlist;
-                vlist.appendListUniq(value, sep);
-                ForEachItemIn(q, vlist)
+                curSource = 0;
+                while (sources.isItem(curSource))
                 {
-                    const char* v = vlist.item(q);
-                    if (!isEmptyString(v))
-                        orFilter.appendValue(v);
+                    if (sources.item(curSource).first())
+                        return true;
+                    curSource++;
                 }
-                if (orFilter.getValues().length() < 2)
-                    throw MakeStringException(WUERR_InvalidUserInput, "Invalid OR filter %s for %s", value, name);
-
-                orFilter.setName(name);
-                orFilter.setFormat(fmt);
-            };
-            void appendFilterToQueryString(StringBuffer& query, int fmt, const char* name, const char* value)
-            {
-                query.append('[').append(name).append('=');
-                if (fmt&WUSFnocase)
-                    query.append('?');
-                if (fmt&WUSFwild)
-                    query.append('~');
-                query.append('"').append(value).append("\"]");
-            };
-            void addToWUTree(IPropertyTree* wuTree, IPropertyTreeIterator* from)
+                return false;
+            }
+            virtual bool next() override
             {
-                ForEach (*from)
+                if (sources.isItem(curSource))
                 {
-                    IPropertyTree& pt = from->query();
-                    wuTree->addPropTree(pt.queryName(), LINK(&pt));
+                    if (sources.item(curSource).next())
+                        return true;
+                    curSource++;
+                    while (sources.isItem(curSource))
+                    {
+                        if (sources.item(curSource).first())
+                            return true;
+                        curSource++;
+                    }
                 }
-            };
+                return false;
+            }
+            virtual bool isValid() override
+            {
+                return sources.isItem(curSource);
+            }
+            virtual IPropertyTree & query() override
+            {
+                return sources.item(curSource).query();
+            }
+            void addSource(IPropertyTreeIterator &source)
+            {
+                sources.append(source);
+            }
+        private:
+            IArrayOf<IPropertyTreeIterator> sources;
+            unsigned curSource = 0;
         };
         class CWorkUnitsPager : public CSimpleInterface, implements IElementsPager
         {
@@ -2949,7 +2964,7 @@ public:
             StringAttr nameFilterLo;
             StringAttr nameFilterHi;
             StringArray unknownAttributes;
-            CQueryOrFilter* orFilter;
+            Owned<CQueryOrFilter> orFilter;
 
         public:
             IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
@@ -2972,23 +2987,28 @@ public:
                 }
                 else
                 {
-                    CWorkUnitsPagerHelper helper;
-                    Owned<IPropertyTree> wuTree = createPTree("WorkUnits");
-
-                    const char* name = orFilter->getName();
-                    int fmt = orFilter->getFormat();
-                    StringArray& values = orFilter->getValues();
+                    Owned <CMultiPTreeIterator> multi = new CMultiPTreeIterator;
+                    bool added = false;
+                    const char* fieldName = orFilter->queryName();
+                    unsigned flags = orFilter->querySearchFlags();
+                    const StringArray& values = orFilter->queryValues();
                     ForEachItemIn(i, values)
                     {
                         StringBuffer path = xPath.get();
                         const char* value = values.item(i);
-                        helper.appendFilterToQueryString(path, fmt, name, value);
-                        Owned<IPropertyTreeIterator> itr = conn->getElements(path.str());
-                        if (itr)
-                            helper.addToWUTree(wuTree, itr);
+                        if (!isEmptyString(value))
+                        {
+                            appendFilterToQueryString(path, flags, fieldName, value);
+                            IPropertyTreeIterator *itr = conn->getElements(path.str());
+                            if (itr)
+                            {
+                                multi->addSource(*itr);
+                                added = true;
+                            }
+                        }
                     }
-                    if(wuTree->hasChildren())
-                        iter.setown(wuTree->getElements("*"));
+                    if (added)
+                        iter.setown(multi.getClear());
                 }
                 if (!iter)
                     return NULL;
@@ -3032,9 +3052,7 @@ public:
                 return ret;
             }
         };
-        CWorkUnitsPagerHelper helper;
-        bool hasOrFilter = false;
-        CQueryOrFilter orFilter;
+        Owned<CQueryOrFilter> orFilter;
         Owned<ISortedElementsTreeFilter> sc = new CScopeChecker(secmgr,secuser);
         StringBuffer query;
         StringBuffer so;
@@ -3075,13 +3093,16 @@ public:
                 {
                     const char* fieldName = getEnumText(subfmt,workunitSortFields);
                     if (!strchr(fv, '|'))
-                        helper.appendFilterToQueryString(query, fmt, fieldName, fv);
-                    else if (hasOrFilter)
+                        appendFilterToQueryString(query, fmt, fieldName, fv);
+                    else if (orFilter)
                         throw MakeStringException(WUERR_InvalidUserInput, "Multiple OR filters not allowed");
                     else
                     {
-                        helper.setOrFilter(orFilter, fmt, fieldName, fv, "|");
-                        hasOrFilter = true;
+                        if (!strieq(fieldName, getEnumText(WUSFstate,workunitSortFields)) &&
+                            !strieq(fieldName, getEnumText(WUSFuser,workunitSortFields)) &&
+                            !strieq(fieldName, getEnumText(WUSFcluster,workunitSortFields)))
+                            throw MakeStringException(WUERR_InvalidUserInput, "OR filters not allowed for %s", fieldName);
+                        orFilter.setown(new CQueryOrFilter(fmt, fieldName, fv));
                     }
                 }
                 fv = fv + strlen(fv)+1;
@@ -3103,7 +3124,7 @@ public:
             so.append(getEnumText(sortorder&0xff,workunitSortFields));
         }
         IArrayOf<IPropertyTree> results;
-        Owned<IElementsPager> elementsPager = new CWorkUnitsPager(query.str(), hasOrFilter ? &orFilter : nullptr, so.length()?so.str():NULL, namefilterlo.get(), namefilterhi.get(), unknownAttributes);
+        Owned<IElementsPager> elementsPager = new CWorkUnitsPager(query.str(), orFilter.getClear(), so.length()?so.str():NULL, namefilterlo.get(), namefilterhi.get(), unknownAttributes);
         Owned<IRemoteConnection> conn=getElementsPaged(elementsPager,startoffset,maxnum,secmgr?sc:NULL,"",cachehint,results,total,NULL);
         return new CConstWUArrayIterator(results);
     }

+ 64 - 4
plugins/cassandra/cassandrawu.cpp

@@ -1526,7 +1526,7 @@ public:
     {
         return field;
     }
-private:
+protected:
     const char *xpath;
     StringAttr pattern;
     StringAttr value;
@@ -1534,6 +1534,35 @@ private:
     bool wild;
 };
 
+class MultiValuePostFilter : public PostFilter
+{
+public:
+    MultiValuePostFilter(WUSortField _field, const char *_value)
+      : PostFilter(_field, _value, false)
+    {
+        setValue(_value);
+    }
+    virtual bool matches(IPTree &p) const
+    {
+        const char *val = p.queryProp(xpath);
+        if (val)
+        {
+            ForEachItemIn(idx, values)
+            {
+                if (strieq(val, values.item(idx)))
+                    return true;
+            }
+        }
+        return false;
+    }
+    void setValue(const char *_value)
+    {
+        values.appendList(_value, "|");
+    }
+private:
+    StringArray values;
+};
+
 class AppValuePostFilter : public CInterfaceOf<IPostFilter>
 {
 public:
@@ -3417,6 +3446,8 @@ public:
                         if (s.length())
                             remoteWildFilters.append(*new PostFilter(field, s, true));  // Trailing-only wildcards can be done remotely
                     }
+                    else if (strchr(fv, '|'))
+                        goodFilters.append(*new MultiValuePostFilter(field, fv));
                     else
                         goodFilters.append(*new PostFilter(field, fv, false));
                     break;
@@ -3424,7 +3455,10 @@ public:
                 case WUSFpriority:
                 case WUSFprotected:
                     // These can't be wild, but are not very good filters
-                    poorFilters.append(*new PostFilter(field, fv, false));
+                    if (strchr(fv, '|'))
+                        poorFilters.append(*new MultiValuePostFilter(field, fv));
+                    else
+                        poorFilters.append(*new PostFilter(field, fv, false));
                     break;
                 case WUSFwuid: // Acts as wuidLo when specified as a filter
                 case WUSFwuidhigh:
@@ -3515,14 +3549,40 @@ public:
                 merger->addPostFilters(poorFilters, 0);
                 merger->addPostFilters(remoteWildFilters, 0);
                 const IPostFilter &best = goodFilters.item(0);
-                merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(best.queryXPath(), best.queryValue(), wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
+                const char *queryValue = best.queryValue();
+                if (strchr(queryValue, '|'))
+                {
+                    StringArray values;
+                    values.appendListUniq(queryValue, "|");
+                    ForEachItemIn(vidx, values)
+                    {
+                        const char *thisValue = values.item(vidx);
+                        if (!isEmptyString(thisValue))
+                            merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(best.queryXPath(), thisValue, wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
+                    }
+                }
+                else
+                    merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(best.queryXPath(), best.queryValue(), wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
             }
             else if (poorFilters.length())
             {
                 merger->addPostFilters(poorFilters, 1);
                 merger->addPostFilters(remoteWildFilters, 0);
                 const IPostFilter &best= poorFilters.item(0);
-                merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(best.queryXPath(), best.queryValue(), wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
+                const char *queryValue =best.queryValue();
+                if (strchr(queryValue, '|'))
+                {
+                    StringArray values;
+                    values.appendListUniq(queryValue, "|");
+                    ForEachItemIn(vidx, values)
+                    {
+                        const char *thisValue = values.item(vidx);
+                        if (!isEmptyString(thisValue))
+                            merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(best.queryXPath(), thisValue, wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
+                    }
+                }
+                else
+                    merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(best.queryXPath(), best.queryValue(), wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
             }
             else if (remoteWildFilters.length())
             {

+ 65 - 78
tools/wutool/wutool.cpp

@@ -474,7 +474,6 @@ class WuTool : public CppUnit::TestFixture
         CPPUNIT_TEST(testValidate);
         CPPUNIT_TEST(testList);
         CPPUNIT_TEST(testList2);
-        CPPUNIT_TEST(testList3);
         CPPUNIT_TEST(testListByAppValue);
         CPPUNIT_TEST(testListByAppValueWild);
         CPPUNIT_TEST(testListByFilesRead);
@@ -488,7 +487,7 @@ class WuTool : public CppUnit::TestFixture
         CPPUNIT_TEST(testQuery);
         CPPUNIT_TEST(testGraph);
         CPPUNIT_TEST(testGraphProgress);
-        CPPUNIT_TEST(testGlobal); 
+        CPPUNIT_TEST(testGlobal);
     CPPUNIT_TEST_SUITE_END();
 protected:
     static StringArray wuids;
@@ -1330,16 +1329,76 @@ protected:
         WUSortField sortByOwner[] = { WUSFuser, WUSFstate, WUSFterm };
         start = msTick();
         wus.setown(factory->getWorkUnitsSorted((WUSortField) (WUSFwuid | WUSFreverse), sortByOwner, "WuTestUser00\0completed", 0, 10000, NULL, NULL));
-        numIterated = 0;
+        unsigned numCompleted = 0;
         ForEach(*wus)
         {
             IConstWorkUnitInfo &wu = wus->query();
             ASSERT(streq(wu.queryUser(), "WuTestUser00"));
             ASSERT(wu.getState()==WUStateCompleted);
-            numIterated++;
+            numCompleted++;
+        }
+        DBGLOG("%d owned,completed workunits listed the hard way in %d ms", numCompleted, msTick()-start);
+        ASSERT(numCompleted > 0);  // Not sure what the exact answer should be! Around 5/6 of 1/50th ?
+
+        start = msTick();
+        wus.setown(factory->getWorkUnitsSorted((WUSortField) (WUSFwuid | WUSFreverse), sortByOwner, "WuTestUser00\0scheduled", 0, 10000, NULL, NULL));
+        unsigned numScheduled = 0;
+        ForEach(*wus)
+        {
+            IConstWorkUnitInfo &wu = wus->query();
+            ASSERT(streq(wu.queryUser(), "WuTestUser00"));
+            ASSERT(wu.getState()==WUStateScheduled);
+            numScheduled++;
+        }
+        DBGLOG("%d owned,scheduled workunits listed the hard way in %d ms", numScheduled, msTick()-start);
+        ASSERT(numScheduled > 0);
+        ASSERT(numScheduled + numCompleted == (testSize+49)/50);
+
+        // Use multiple states
+        DBGLOG("Testing multiple states");
+        start = msTick();
+        wus.setown(factory->getWorkUnitsSorted((WUSortField) (WUSFwuid | WUSFreverse), sortByOwner, "WuTestUser00\0completed|scheduled", 0, 10000, NULL, NULL));
+        unsigned numCompleted2 = 0;
+        unsigned numScheduled2 = 0;
+        ForEach(*wus)
+        {
+            IConstWorkUnitInfo &wu = wus->query();
+            ASSERT(streq(wu.queryUser(), "WuTestUser00"));
+            ASSERT((wu.getState()==WUStateCompleted) || (wu.getState()==WUStateScheduled));
+            if ((wu.getState()==WUStateCompleted))
+                numCompleted2++;
+            else
+                numScheduled2++;
+        }
+        DBGLOG("%d owned,completed and %d owned,scheduled workunits listed via multiple in %d ms", numCompleted2, numScheduled2, msTick()-start);
+        ASSERT(numCompleted == numCompleted2);
+        ASSERT(numScheduled == numScheduled2);
+
+        // Use multiple states only
+        DBGLOG("Testing multiple states only");
+        start = msTick();
+        WUSortField filterByState[] = { WUSFstate, WUSFterm };
+        wus.setown(factory->getWorkUnitsSorted((WUSortField) (WUSFwuid | WUSFreverse), filterByState, "completed|scheduled", 0, 10000, NULL, NULL));
+        unsigned numCompleted3 = 0;
+        unsigned numScheduled3 = 0;
+        ForEach(*wus)
+        {
+            IConstWorkUnitInfo &wu = wus->query();
+            ASSERT((wu.getState()==WUStateCompleted) || (wu.getState()==WUStateScheduled));
+            if (strlen(wu.queryUser()) > 10 && memcmp(wu.queryUser(), "WuTestUser", 10)==0)  // Ignore any wu's not part of the test set
+            {
+                if ((wu.getState()==WUStateCompleted))
+                    numCompleted3++;
+                else
+                    numScheduled3++;
+            }
         }
-        DBGLOG("%d owned workunits listed the hard way in %d ms", numIterated, msTick()-start);
-        ASSERT(numIterated <= (testSize+49)/50);  // Not sure what the exact answer should be!
+        DBGLOG("%d completed and %d scheduled workunits listed via multiple in %d ms", numCompleted3, numScheduled3, msTick()-start);
+        ASSERT(numCompleted3 >= numCompleted2);
+        ASSERT(numScheduled3 >= numScheduled2);
+        ASSERT(numCompleted3 + numScheduled3 == testSize);
+
+
 
         // Get Scheduled Workunits
         start = msTick();
@@ -1571,78 +1630,6 @@ protected:
         ASSERT(numIterated == before);
     }
 
-    void testList3()
-    {
-        // Now by owner and 2 states via generic mechanism
-        Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
-        unsigned start = msTick();
-        Owned<IConstWorkUnitIterator> wus;
-        __int64 cachehint = 0;
-        unsigned startRow = 0;
-
-        unsigned numCompleted = 0;
-        unsigned numFailed = 0;
-        WUSortField sortByOwner[] = { WUSFuser, WUSFstate, WUSFterm };
-
-        unsigned before = factory->numWorkUnits();
-        if (before > 0)
-        {
-            start = msTick();
-            wus.setown(factory->getWorkUnitsSorted((WUSortField) (WUSFwuid | WUSFreverse), sortByOwner, "WuTestUser00\0completed|failed", 0, 10000, NULL, NULL));
-            ForEach(*wus)
-            {
-                IConstWorkUnitInfo &wu = wus->query();
-                ASSERT(streq(wu.queryUser(), "WuTestUser00"));
-                ASSERT((wu.getState()==WUStateCompleted) || (wu.getState()==WUStateFailed));
-                if ((wu.getState()==WUStateCompleted))
-                    numCompleted++;
-                else
-                    numFailed++;
-            }
-            DBGLOG("%d completed workunits listed and %d failed workunits listed in %d ms", numCompleted, numFailed, msTick()-start);
-        }
-
-        StringArray wuidsNew;
-        unsigned newCompleted = 2;
-        unsigned newFailed = 3;
-        for (int i = 0; i < newCompleted + newFailed; i++)
-        {
-            VStringBuffer userId("WuTestUser00");
-            VStringBuffer clusterName("WuTestCluster0");
-            Owned<IWorkUnit>wu = factory->createWorkUnit("WuTest", NULL, NULL, NULL);
-            if (i % 2)
-                wu->setState(WUStateCompleted);
-            else
-                wu->setState(WUStateFailed);
-            wu->setUser(userId);
-            wu->setClusterName(clusterName);
-            wuidsNew.append(wu->queryWuid());
-        }
-        DBGLOG("Temporarily added %d completed workunits and %d failed workunits only for this test.", newCompleted, newFailed);
-
-        unsigned numCompletedWithNew = 0;
-        unsigned numFailedWithNew = 0;
-        start = msTick();
-        wus.setown(factory->getWorkUnitsSorted((WUSortField) (WUSFwuid | WUSFreverse), sortByOwner, "WuTestUser00\0completed|failed", 0, 10000, NULL, NULL));
-        ForEach(*wus)
-        {
-            IConstWorkUnitInfo &wu = wus->query();
-            ASSERT(streq(wu.queryUser(), "WuTestUser00"));
-            ASSERT((wu.getState()==WUStateCompleted) || (wu.getState()==WUStateFailed));
-            if ((wu.getState()==WUStateCompleted))
-                numCompletedWithNew++;
-            else
-                numFailedWithNew++;
-        }
-        DBGLOG("%d completed workunits listed and %d failed workunits listed in %d ms", numCompletedWithNew, numFailedWithNew, msTick()-start);
-        ASSERT((numCompletedWithNew==numCompleted + newCompleted) && (numFailedWithNew==numFailed + newFailed));
-
-        for (int i = 0; i < newCompleted + newFailed; i++)
-        {
-            factory->deleteWorkUnit(wuidsNew.item(i));
-        }
-    }
-
     void testListByAppValue()
     {
         Owned<IWorkUnitFactory> factory = getWorkUnitFactory();