Browse Source

HPCC-13795 Add support for filtering WUs by application values in Cassandra

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 10 years ago
parent
commit
1d108c9654

+ 17 - 24
common/workunit/workunit.cpp

@@ -2000,6 +2000,7 @@ mapEnums workunitSortFields[] =
    },
    { WUSFwuidhigh, "@" },
    { WUSFwildwuid, "@" },
+   { WUSFappvalue, "Application" },
    { WUSFterm, NULL }
 };
 
@@ -2765,8 +2766,12 @@ public:
                     namefilterhi.set(fv);
                 else if (subfmt==WUSFwildwuid)
                     namefilter.set(fv);
-                else if (subfmt==WUSFcustom)
-                    query.append("[").append(fv).append("]");
+                else if (subfmt==WUSFappvalue)
+                {
+                    query.append("[Application/").append(fv).append("=?\"");
+                    fv = fv + strlen(fv)+1;
+                    query.append(fv).append("\"]");
+                }
                 else if (!fv || !*fv)
                     unknownAttributes.append(getEnumText(subfmt,workunitSortFields));
                 else {
@@ -4211,7 +4216,6 @@ void CLocalWorkUnit::setApplicationValue(const char *app, const char *propname,
     prop.append(app).append('/').append(propname);
     if (overwrite || !p->hasProp(prop.str()))
     {
-        // MORE - not sure these lines should be needed....
         StringBuffer sp;
         p->setProp(sp.append("Application").str(), ""); 
         p->setProp(sp.append('/').append(app).str(), ""); 
@@ -4221,17 +4225,8 @@ void CLocalWorkUnit::setApplicationValue(const char *app, const char *propname,
 
 void CLocalWorkUnit::setApplicationValueInt(const char *app, const char *propname, int value, bool overwrite)
 {
-    CriticalBlock block(crit);
-    StringBuffer prop("Application/");
-    prop.append(app).append('/').append(propname);
-    if (overwrite || !p->hasProp(prop.str()))
-    {
-        // MORE - not sure these lines should be needed....
-        StringBuffer sp;
-        p->setProp(sp.append("Application").str(), ""); 
-        p->setProp(sp.append('/').append(app).str(), ""); 
-        p->setPropInt(prop.str(), value); 
-    }
+    VStringBuffer s("%d", value);
+    setApplicationValue(app, propname, s, overwrite);
 }
 
 void CLocalWorkUnit::setPriorityLevel(int level) 
@@ -8477,30 +8472,28 @@ void CLocalWUException::setExceptionColumn(unsigned c)
 
 //==========================================================================================
 
-CLocalWUAppValue::CLocalWUAppValue(IPropertyTree *props,unsigned child): p(props)
+CLocalWUAppValue::CLocalWUAppValue(IPropertyTree *props, unsigned child) : p(props)
 {
     StringAttrBuilder propPath(prop);
     propPath.append("*[").append(child).append("]");
 }
 
-IStringVal & CLocalWUAppValue::getApplication(IStringVal & str) const
+const char * CLocalWUAppValue::queryApplication() const
 {
-    str.set(p->queryName());
-    return str;
+    return p->queryName();
 }
 
-IStringVal & CLocalWUAppValue::getName(IStringVal & str) const
+const char * CLocalWUAppValue::queryName() const
 {
     IPropertyTree* val=p->queryPropTree(prop.str());
     if(val)
-        str.set(val->queryName());
-    return str;
+        return val->queryName();
+    return ""; // Should not happen in normal usage
 }
 
-IStringVal & CLocalWUAppValue::getValue(IStringVal & str) const
+const char * CLocalWUAppValue::queryValue() const
 {
-    str.set(p->queryProp(prop.str()));
-    return str;
+    return p->queryProp(prop.str());
 }
 
 //==========================================================================================

+ 5 - 4
common/workunit/workunit.hpp

@@ -766,9 +766,9 @@ interface IConstWUTimeStampIterator : extends IScmIterator
 
 interface IConstWUAppValue : extends IInterface
 {
-    virtual IStringVal & getApplication(IStringVal & str) const = 0;
-    virtual IStringVal & getName(IStringVal & str) const = 0;
-    virtual IStringVal & getValue(IStringVal & str) const = 0;
+    virtual const char *queryApplication() const = 0;
+    virtual const char *queryName() const = 0;
+    virtual const char *queryValue() const = 0;
 };
 
 
@@ -1200,7 +1200,8 @@ enum WUSortField
     WUSFtotalthortime = 11,
     WUSFwildwuid = 12,
     WUSFecl = 13,
-    WUSFcustom = 14,
+    // WUSFcustom = 14, obsolete
+    WUSFappvalue=15,
     WUSFterm = 0,
     WUSFreverse = 256,
     WUSFnocase = 512,

+ 3 - 3
common/workunit/workunit.ipp

@@ -28,9 +28,9 @@ public:
     IMPLEMENT_IINTERFACE;
     CLocalWUAppValue(IPropertyTree *p,unsigned child);
 
-    virtual IStringVal & getApplication(IStringVal & str) const;
-    virtual IStringVal & getName(IStringVal & str) const;
-    virtual IStringVal & getValue(IStringVal & dt) const;
+    virtual const char *queryApplication() const;
+    virtual const char *queryName() const;
+    virtual const char *queryValue() const;
 };
 
 

+ 63 - 5
ecl/wutest/wutest.cpp

@@ -147,11 +147,13 @@ bool dump(IConstWorkUnit &w, IProperties *globals)
 extern "C" IWorkUnitFactory *createWorkUnitFactory(const IPropertyTree *props);
 #endif
 
+Owned<IProperties> globals;
+
 int main(int argc, const char *argv[])
 {
     InitModuleObjects();
     unsigned count=0;
-    Owned<IProperties> globals = createProperties("WUTEST.INI", true);
+    globals.setown(createProperties("WUTEST.INI", true));
     for (int i = 1; i < argc; i++)
     {
         if (strchr(argv[i],'='))
@@ -206,8 +208,7 @@ int main(int argc, const char *argv[])
             queryStderrLogMsgHandler()->setMessageFields(MSGFIELD_time | MSGFIELD_milliTime | MSGFIELD_prefix);
             CppUnit::TextUi::TestRunner runner;
             CppUnit::TestFactoryRegistry &registry = CppUnit::TestFactoryRegistry::getRegistry("WuTest");
-            runner.addTest( registry.makeTest() );
-            bool wasSucessful = runner.run( "", false );
+            runner.addTest( registry.makeTest() );            bool wasSucessful = runner.run( "", false );
             return wasSucessful;
         }
         else
@@ -416,8 +417,11 @@ class WuTest : public CppUnit::TestFixture
 {
     CPPUNIT_TEST_SUITE(WuTest);
         CPPUNIT_TEST(testCreate);
+        CPPUNIT_TEST(testValidate);
         CPPUNIT_TEST(testList);
         CPPUNIT_TEST(testList2);
+        CPPUNIT_TEST(testListByAppValue);
+        CPPUNIT_TEST(testListByAppValueWild);
         CPPUNIT_TEST(testSet);
         CPPUNIT_TEST(testDelete);
         CPPUNIT_TEST(testCopy);
@@ -427,6 +431,12 @@ protected:
     void testCreate()
     {
         Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
+        if (globals->getPropBool("entire", false) && globals->getPropBool("repository", false))
+        {
+            factory->deleteRepository(false);
+            factory->createRepository();
+            DBGLOG("Repository recreated\n");
+        }
         unsigned before = factory->numWorkUnits();
         unsigned start = msTick();
         for (int i = 0; i < testSize; i++)
@@ -444,15 +454,22 @@ protected:
             if (i % 3)
                 wu->setJobName(jobName);
             wu->setStatistic(SCTsummary, "thor", SSTglobal, GLOBAL_SCOPE, StTimeElapsed, "Total thor time", ((i+2)/2) * 1000000, 1, 0, StatsMergeReplace);
+            wu->setApplicationValue("appname", "userId", userId.str(), true);
+            wu->setApplicationValue("appname2", "clusterName", clusterName.str(), true);
             wuids.append(wu->queryWuid());
         }
         unsigned after = factory->numWorkUnits();
         DBGLOG("%u workunits created in %d ms (%d total)", testSize, msTick()-start, after);
         ASSERT(after-before==testSize);
         ASSERT(wuids.length() == testSize);
-        start = msTick();
+    }
+
+    void testValidate()
+    {
+        unsigned start = msTick();
+        Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
         ASSERT(factory->validateRepository(false)==0);
-        DBGLOG("%u workunits validated in %d ms", after, msTick()-start);
+        DBGLOG("Repository validated in %d ms", msTick()-start);
     }
 
     void testCopy()
@@ -1181,6 +1198,47 @@ protected:
         DBGLOG("%d workunits descending thortime, page by page in %d ms", numIterated, msTick()-start);
         ASSERT(numIterated == testSize);
     }
+
+    void testListByAppValue()
+    {
+        Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
+        bool isDali = streq(factory->queryStoreType(), "Dali");
+        unsigned start = msTick();
+        unsigned numIterated = 0;
+        // Test filter by appValue
+        WUSortField filterByAppValue[] = { WUSFappvalue, WUSFterm };
+        start = msTick();
+        Owned<IConstWorkUnitIterator> wus = factory->getWorkUnitsSorted((WUSortField)(WUSFwuid|WUSFreverse), filterByAppValue, "appname/userId\0WuTestUser00", 0, 10000, NULL, NULL);
+        ForEach(*wus)
+        {
+            IConstWorkUnitInfo &wu = wus->query();
+            ASSERT(streq(wu.queryUser(), "WuTestUser00"));
+            numIterated++;
+        }
+        DBGLOG("%d workunits by appvalue in %d ms", numIterated, msTick()-start);
+        ASSERT(numIterated == (testSize+49)/50);
+        numIterated++;
+    }
+    void testListByAppValueWild()
+    {
+        Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
+        bool isDali = streq(factory->queryStoreType(), "Dali");
+        unsigned start = msTick();
+        unsigned numIterated = 0;
+        // Test filter by appValue
+        WUSortField filterByAppValueWild[] = { (WUSortField) (WUSFappvalue|WUSFwild), WUSFterm };
+        start = msTick();
+        Owned<IConstWorkUnitIterator> wus = factory->getWorkUnitsSorted((WUSortField)(WUSFwuid|WUSFreverse), filterByAppValueWild, "appname/userId\0WuTestUser*", 0, 10000, NULL, NULL);
+        ForEach(*wus)
+        {
+            IConstWorkUnitInfo &wu = wus->query();
+            ASSERT(streq(wu.queryUser(), "WuTestUser00"));
+            numIterated++;
+        }
+        DBGLOG("%d workunits by appvalue wild in %d ms", numIterated, msTick()-start);
+        ASSERT(numIterated == testSize);
+        numIterated++;
+    }
 };
 StringArray WuTest::wuids;
 

+ 3 - 5
esp/services/ws_workunits/ws_workunitsHelpers.cpp

@@ -595,12 +595,10 @@ void WsWuInfo::getApplicationValues(IEspECLWorkunit &info, unsigned flags)
         ForEach(*app)
         {
             IConstWUAppValue& val=app->query();
-            SCMStringBuffer buf;
-
             Owned<IEspApplicationValue> t= createApplicationValue("","");
-            t->setApplication(val.getApplication(buf).str());
-            t->setName(val.getName(buf).str());
-            t->setValue(val.getValue(buf).str());
+            t->setApplication(val.queryApplication());
+            t->setName(val.queryName());
+            t->setValue(val.queryValue());
             av.append(*t.getLink());
 
         }

+ 9 - 10
esp/services/ws_workunits/ws_workunitsService.cpp

@@ -1654,13 +1654,14 @@ bool addWUQueryFilterTime(WUSortField *filters, unsigned short &count, MemoryBuf
 
 bool addWUQueryFilterApplication(WUSortField *filters, unsigned short &count, MemoryBuffer &buff, const char *appname, const char *appkey, const char *appdata)
 {
-    if (isEmpty(appname) && isEmpty(appkey) && isEmpty(appdata)) //no application filter
+    if (isEmpty(appname))
+        return false;  // appname must be specified
+    if (isEmpty(appkey) && isEmpty(appdata)) //one or other is required ( MORE - see if cassandra can relaax that)
         return false;
-    VStringBuffer path("Application/%s/%s", appname && *appname ? appname : "*", appkey && *appkey ? appkey : "*");
-    if(appdata && *appdata)
-        path.append("=?~\"").append(appdata).append("\"");
-    filters[count++] = WUSFcustom;
+    VStringBuffer path("%s/%s", appname, appkey && *appkey ? appkey : "*");
     buff.append(path.str());
+    buff.append(appdata);
+    filters[count++] = WUSFappvalue;
     return true;
 }
 
@@ -1833,12 +1834,10 @@ void doWUQueryWithSort(IEspContext &context, IEspWUQueryRequest & req, IEspWUQue
             ForEach(*app)
             {
                 IConstWUAppValue& val=app->query();
-                SCMStringBuffer buf;
-
                 Owned<IEspApplicationValue> t= createApplicationValue("","");
-                t->setApplication(val.getApplication(buf).str());
-                t->setName(val.getName(buf).str());
-                t->setValue(val.getValue(buf).str());
+                t->setApplication(val.queryApplication());
+                t->setName(val.queryName());
+                t->setValue(val.queryValue());
                 av.append(*t.getLink());
 
             }

File diff suppressed because it is too large
+ 265 - 130
plugins/cassandra/cassandraembed.cpp