Pārlūkot izejas kodu

HPCC-12251 Create cassandra plugin for workunit storage

Removed unused methods from workunitFactory interface.

Refactored any few remaining places that searched by xpath.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 10 gadi atpakaļ
vecāks
revīzija
1d3ae7939d

+ 5 - 48
common/workunit/workunit.cpp

@@ -2664,41 +2664,19 @@ public:
         Owned<CLocalWorkUnit> cw = new CDaliWorkUnit(conn, (ISecManager *) NULL, NULL);
         return &cw->lockRemote(false);
     }
-
     virtual IConstWorkUnitIterator* getWorkUnitsByOwner(const char * owner, ISecManager *secmgr, ISecUser *secuser)
     {
         StringBuffer path("*");
         if (owner && *owner)
-            path.append("[@submitID=\"").append(owner).append("\"]");
-        return _getWorkUnitsByXPath(path.str(), secmgr, secuser);
-    }
-    IConstWorkUnitIterator* getWorkUnitsByState(WUState state, ISecManager *secmgr, ISecUser *secuser)
-    {
-        StringBuffer path("*");
-        path.append("[@state=\"").append(getEnumText(state, states)).append("\"]");
-        return _getWorkUnitsByXPath(path.str(), secmgr, secuser);
-    }
-    IConstWorkUnitIterator* getWorkUnitsByECL(const char* ecl, ISecManager *secmgr, ISecUser *secuser)
-    {
-        StringBuffer path("*");
-        if (ecl && *ecl)
-            path.append("[Query/Text=~\"*").append(ecl).append("*\"]");
+            path.append("[@submitID=?~\"").append(owner).append("\"]");
         return _getWorkUnitsByXPath(path.str(), secmgr, secuser);
     }
-    IConstWorkUnitIterator* getWorkUnitsByCluster(const char* cluster, ISecManager *secmgr, ISecUser *secuser)
+    IConstWorkUnitIterator* getScheduledWorkUnits(ISecManager *secmgr, ISecUser *secuser)
     {
         StringBuffer path("*");
-        if (cluster && *cluster)
-            path.append("[@clusterName=\"").append(cluster).append("\"]");
+        path.append("[@state=\"").append(getEnumText(WUStateScheduled, states)).append("\"]");
         return _getWorkUnitsByXPath(path.str(), secmgr, secuser);
     }
-
-    IConstWorkUnitIterator* getWorkUnitsByXPath(const char *xpath, ISecManager *secmgr, ISecUser *secuser)
-    {
-        // NOTE - this is deprecated - we want to get rid of it (daliadmin MAY be allowed to use it, but nothing else should)
-        return _getWorkUnitsByXPath(xpath, secmgr, secuser);
-    }
-
     virtual void clientShutdown();
 
     virtual unsigned numWorkUnits()
@@ -3139,33 +3117,12 @@ public:
         if (!secUser) secUser = defaultSecUser.get();
         return baseFactory->getWorkUnitsByOwner(owner, secMgr, secUser);
     }
-    virtual IConstWorkUnitIterator * getWorkUnitsByState(WUState state, ISecManager *secMgr, ISecUser *secUser)
+    virtual IConstWorkUnitIterator * getScheduledWorkUnits(ISecManager *secMgr, ISecUser *secUser)
     {
         if (!secMgr) secMgr = defaultSecMgr.get();
         if (!secUser) secUser = defaultSecUser.get();
-        return baseFactory->getWorkUnitsByState(state, secMgr, secUser);
+        return baseFactory->getScheduledWorkUnits(secMgr, secUser);
     }
-    virtual IConstWorkUnitIterator * getWorkUnitsByECL(const char* ecl, ISecManager *secMgr, ISecUser *secUser)
-    {
-        if (!secMgr) secMgr = defaultSecMgr.get();
-        if (!secUser) secUser = defaultSecUser.get();
-        return baseFactory->getWorkUnitsByECL(ecl, secMgr, secUser);
-    }
-
-    virtual IConstWorkUnitIterator * getWorkUnitsByCluster(const char* cluster, ISecManager *secMgr, ISecUser *secUser)
-    {   
-        if (!secMgr) secMgr = defaultSecMgr.get();
-        if (!secUser) secUser = defaultSecUser.get();
-        return baseFactory->getWorkUnitsByCluster(cluster, secMgr, secUser);
-    }
-
-    virtual IConstWorkUnitIterator * getWorkUnitsByXPath(const char * xpath, ISecManager *secMgr, ISecUser *secUser)
-    {
-        if (!secMgr) secMgr = defaultSecMgr.get();
-        if (!secUser) secUser = defaultSecUser.get();
-        return baseFactory->getWorkUnitsByXPath(xpath, secMgr, secUser);
-    }
-
     virtual void descheduleAllWorkUnits(ISecManager *secMgr, ISecUser *secUser)
     {
         if (!secMgr) secMgr = defaultSecMgr.get();

+ 1 - 4
common/workunit/workunit.hpp

@@ -1270,14 +1270,11 @@ interface IWorkUnitFactory : extends IInterface
     virtual int setTracingLevel(int newlevel) = 0;
     virtual IWorkUnit * createNamedWorkUnit(const char * wuid, const char * app, const char * scope, ISecManager *secmgr = NULL, ISecUser *secuser = NULL) = 0;
     virtual IWorkUnit * getGlobalWorkUnit(ISecManager *secmgr = NULL, ISecUser *secuser = NULL) = 0;
-    virtual IConstWorkUnitIterator * getWorkUnitsByState(WUState state, ISecManager *secmgr = NULL, ISecUser *secuser = NULL) = 0;
-    virtual IConstWorkUnitIterator * getWorkUnitsByECL(const char * ecl, ISecManager *secmgr = NULL, ISecUser *secuser = NULL) = 0;
-    virtual IConstWorkUnitIterator * getWorkUnitsByCluster(const char * cluster, ISecManager *secmgr = NULL, ISecUser *secuser = NULL) = 0;
-    virtual IConstWorkUnitIterator * getWorkUnitsByXPath(const char * xpath, ISecManager *secmgr = NULL, ISecUser *secuser = NULL) = 0;
     virtual IConstWorkUnitIterator * getWorkUnitsSorted(WUSortField sortorder, WUSortField * filters, const void * filterbuf,
                                                         unsigned startoffset, unsigned maxnum, const char * queryowner, __int64 * cachehint, unsigned *total,
                                                         ISecManager *secmgr = NULL, ISecUser *secuser = NULL) = 0;
     virtual unsigned numWorkUnits() = 0;
+    virtual IConstWorkUnitIterator *getScheduledWorkUnits(ISecManager *secmgr = NULL, ISecUser *secuser = NULL) = 0;
     virtual void descheduleAllWorkUnits(ISecManager *secmgr = NULL, ISecUser *secuser = NULL) = 0;
     virtual IConstQuerySetQueryIterator * getQuerySetQueriesSorted(WUQuerySortField *sortorder, WUQuerySortField *filters, const void *filterbuf, unsigned startoffset, unsigned maxnum, __int64 *cachehint, unsigned *total, const MapStringTo<bool> *subset) = 0;
     virtual bool isAborting(const char *wuid) const = 0;

+ 1 - 4
common/workunit/workunit.ipp

@@ -584,10 +584,6 @@ public:
     virtual IWorkUnit * createNamedWorkUnit(const char * wuid, const char * app, const char *scope, ISecManager *secmgr, ISecUser *secuser);
     virtual IWorkUnit * getGlobalWorkUnit(ISecManager *secmgr, ISecUser *secuser) = 0;
     virtual IConstWorkUnitIterator * getWorkUnitsByOwner(const char * owner, ISecManager *secmgr, ISecUser *secuser) = 0;
-    virtual IConstWorkUnitIterator * getWorkUnitsByState(WUState state, ISecManager *secmgr = NULL, ISecUser *secuser = NULL) = 0;
-    virtual IConstWorkUnitIterator * getWorkUnitsByECL(const char * ecl, ISecManager *secmgr = NULL, ISecUser *secuser = NULL) = 0;
-    virtual IConstWorkUnitIterator * getWorkUnitsByCluster(const char * cluster, ISecManager *secmgr = NULL, ISecUser *secuser = NULL) = 0;
-    virtual IConstWorkUnitIterator * getWorkUnitsByXPath(const char * xpath, ISecManager *secmgr = NULL, ISecUser *secuser = NULL) = 0;  // deprecated
     virtual IConstWorkUnitIterator* getWorkUnitsSorted(WUSortField sortorder, // field to sort by
                                                 WUSortField *filters,   // NULL or list of fields to filter on (terminated by WUSFterm)
                                                 const void *filterbuf,  // (appended) string values for filters
@@ -599,6 +595,7 @@ public:
                                                 ISecManager *secmgr,
                                                 ISecUser *secuser) = 0;
     virtual unsigned numWorkUnits() = 0;
+    virtual IConstWorkUnitIterator *getScheduledWorkUnits(ISecManager *secmgr = NULL, ISecUser *secuser = NULL) = 0;
     virtual void descheduleAllWorkUnits(ISecManager *secmgr, ISecUser *secuser);
     virtual IConstQuerySetQueryIterator * getQuerySetQueriesSorted(WUQuerySortField *sortorder, WUQuerySortField *filters, const void *filterbuf, unsigned startoffset, unsigned maxnum, __int64 *cachehint, unsigned *total, const MapStringTo<bool> *subset);
     virtual bool isAborting(const char *wuid) const;

+ 7 - 6
dali/daliadmin/daliadmin.cpp

@@ -2510,15 +2510,16 @@ static void wuidCompress(const char *match, const char *type, bool compress)
         WARNLOG("Currently, only type=='graph' supported.");
         return;
     }
+    Owned<IRemoteConnection> conn = querySDS().connect("/WorkUnits", myProcessSession(), 0, daliConnectTimeoutMs);
     Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
-    Owned<IConstWorkUnitIterator> iter = factory->getWorkUnitsByXPath(match);
+    Owned<IPropertyTreeIterator> iter = conn->queryRoot()->getElements(match?match:"*", iptiter_remote);
     ForEach(*iter)
     {
-        IConstWorkUnitInfo &wuidInfo = iter->query();
-        IConstWorkUnit &wuid = *factory->openWorkUnit(wuidInfo.queryWuid(), false);
+        const char *wuid = iter->query().queryName();
+        IConstWorkUnit &wu = *factory->openWorkUnit(wuid, false);
 
         StringArray graphNames;
-        Owned<IConstWUGraphIterator> graphIter = &wuid.getGraphs(GraphTypeAny);
+        Owned<IConstWUGraphIterator> graphIter = &wu.getGraphs(GraphTypeAny);
         ForEach(*graphIter)
         {
             SCMStringBuffer graphName;
@@ -2534,8 +2535,8 @@ static void wuidCompress(const char *match, const char *type, bool compress)
         if (graphNames.ordinality())
         {
             const char *msg = compress ? "Compressing" : "Uncompressing";
-            PROGLOG("%s graphs for workunit: %s", msg, wuidInfo.queryWuid());
-            Owned<IWorkUnit> wWuid = &wuid.lock();
+            PROGLOG("%s graphs for workunit: %s", msg, wuid);
+            Owned<IWorkUnit> wWuid = &wu.lock();
             ForEachItemIn(n, graphNames)
             {
                 Owned<IWUGraph> wGraph = wWuid->updateGraph(graphNames.item(n));

+ 5 - 18
ecl/wutest/wutest.cpp

@@ -430,7 +430,7 @@ protected:
             if (i % 6)
                 wu->setState(WUStateCompleted);
             else
-                wu->setState(WUStateFailed);
+                wu->setState(WUStateScheduled);
             wu->setUser(userId);
             wu->setClusterName(clusterName);
             if (i % 3)
@@ -932,30 +932,17 @@ protected:
         DBGLOG("%d non-existent workunits listed in %d ms", numIterated, msTick()-start);
         ASSERT(numIterated == 0);
 
-        // And by cluster
-        wus.setown(factory->getWorkUnitsByCluster("WuTestCluster0", NULL, NULL));
+        // Get Scheduled Workunits
+        wus.setown(factory->getScheduledWorkUnits(NULL, NULL));
         start = msTick();
         numIterated = 0;
         ForEach(*wus)
         {
             IConstWorkUnitInfo &wu = wus->query();
-            ASSERT(streq(wu.queryClusterName(), "WuTestCluster0"));
+            ASSERT(wu.getState() == WUStateScheduled);
             numIterated++;
         }
-        DBGLOG("%d cluster workunits listed in %d ms", numIterated, msTick()-start);
-        ASSERT(numIterated == (testSize+4)/5);
-
-        // And by state
-        wus.setown(factory->getWorkUnitsByState(WUStateFailed, NULL, NULL));
-        start = msTick();
-        numIterated = 0;
-        ForEach(*wus)
-        {
-            IConstWorkUnitInfo &wu = wus->query();
-            ASSERT(wu.getState() == WUStateFailed);
-            numIterated++;
-        }
-        DBGLOG("%d failed workunits listed in %d ms", numIterated, msTick()-start);
+        DBGLOG("%d scheduled workunits listed in %d ms", numIterated, msTick()-start);
         ASSERT(numIterated == (testSize+5)/6);
 
     }

+ 2 - 2
esp/scm/ws_workunits.ecm

@@ -905,7 +905,7 @@ ESPrequest WUExportRequest
     string State;
     string StartDate;
     string EndDate;
-    string ECL;
+    [max_ver("1.55")] string ECL;
     string Jobname;
 };
 ESPresponse [exceptions_inline] WUExportResponse
@@ -1610,7 +1610,7 @@ ESPresponse [exceptions_inline] WUGetStatsResponse
 };
 
 ESPservice [
-    version("1.55"), default_client_version("1.55"),
+    version("1.56"), default_client_version("1.56"),
     noforms,exceptions_inline("./smc_xslt/exceptions.xslt"),use_method_name] WsWorkunits
 {
     ESPmethod [resp_xsl_default("/esp/xslt/workunits.xslt")]     WUQuery(WUQueryRequest, WUQueryResponse);

+ 14 - 27
esp/services/ws_workunits/ws_workunitsHelpers.cpp

@@ -2201,47 +2201,34 @@ void WsWuInfo::getWorkunitAssociatedXml(const char* name, const char* ipAddress,
 
 
 
-WsWuSearch::WsWuSearch(IEspContext& context,const char* owner,const char* state,const char* cluster,const char* startDate,const char* endDate,const char* ecl,const char* jobname,const char* appname,const char* appkey,const char* appvalue)
+WsWuSearch::WsWuSearch(IEspContext& context,const char* owner,const char* state,const char* cluster,const char* startDate,const char* endDate,const char* jobname)
 {
     SecAccessFlags accessOwn;
     SecAccessFlags accessOthers;
     getUserWuAccessFlags(context, accessOwn, accessOthers, true);
 
     Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
+    Owned<IConstWorkUnitIterator> it(factory->getWorkUnitsByOwner(owner)); // null owner means fetch all
 
-    StringBuffer xpath("*");
-    if(ecl && *ecl)
-        xpath.append("[Query/Text=?~\"*").append(ecl).append("*\"]");
-    if(state && *state)
-        xpath.append("[@state=\"").append(state).append("\"]");
-    if(cluster && *cluster)
-        xpath.append("[@clusterName=\"").append(cluster).append("\"]");
-    if(owner && *owner)
-        xpath.append("[@submitID=?~\"").append(owner).append("\"]");
-    if(jobname && *jobname)
-        xpath.append("[@jobName=?~\"*").append(jobname).append("*\"]");
-    if((appname && *appname) || (appkey && *appkey) || (appvalue && *appvalue))
-    {
-        xpath.append("[Application/").append(appname && *appname ? appname : "*");
-        xpath.append("/").append(appkey && *appkey ? appkey : "*");
-        if(appvalue && *appvalue)
-            xpath.append("=?~\"").append(appvalue).append("\"");
-        xpath.append("]");
-    }
-
-    Owned<IConstWorkUnitIterator> it(factory->getWorkUnitsByXPath(xpath.str()));
-
-    StringBuffer wuFrom, wuTo;
-    if(startDate && *startDate)
+    StringBuffer wuFrom, wuTo, jobPattern;
+    if (startDate && *startDate)
         createWuidFromDate(startDate, wuFrom);
-    if(endDate && *endDate)
+    if (endDate && *endDate)
         createWuidFromDate(endDate, wuTo);
+    if (jobname && *jobname)
+        jobPattern.appendf("*%s*", jobname);
 
     ForEach(*it)
     {
         IConstWorkUnitInfo &cw = it->query();
         if (chooseWuAccessFlagsByOwnership(context.queryUserId(), cw, accessOwn, accessOthers) < SecAccess_Read)
             continue;
+        if (state && *state && !strieq(cw.queryStateDesc(), state))
+            continue;
+        if (cluster && *cluster && !strieq(cw.queryClusterName(), cluster))
+            continue;
+        if (jobPattern.length() && !WildMatch(cw.queryJobName(), jobPattern, true))
+            continue;
 
         const char *wuid = cw.queryWuid();
         if (wuFrom.length() && strcmp(wuid,wuFrom.str())<0)
@@ -2781,7 +2768,7 @@ int WUSchedule::run()
         while(!stopping)
         {
             Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
-            Owned<IConstWorkUnitIterator> itr = factory->getWorkUnitsByState(WUStateScheduled);
+            Owned<IConstWorkUnitIterator> itr = factory->getScheduledWorkUnits();
             if (itr)
             {
                 ForEach(*itr)

+ 1 - 1
esp/services/ws_workunits/ws_workunitsHelpers.hpp

@@ -211,7 +211,7 @@ void getSashaNode(SocketEndpoint &ep);
 
 struct WsWuSearch
 {
-    WsWuSearch(IEspContext& context,const char* owner=NULL,const char* state=NULL,const char* cluster=NULL,const char* startDate=NULL,const char* endDate=NULL,const char* ecl=NULL,const char* jobname=NULL,const char* appname=NULL,const char* appkey=NULL,const char* appvalue=NULL);
+    WsWuSearch(IEspContext& context,const char* owner=NULL,const char* state=NULL,const char* cluster=NULL,const char* startDate=NULL,const char* endDate=NULL,const char* jobname=NULL);
 
     typedef std::vector<std::string>::iterator iterator;
 

+ 3 - 64
esp/services/ws_workunits/ws_workunitsService.cpp

@@ -1644,69 +1644,6 @@ void doWUQueryByFile(IEspContext &context, const char *logicalFile, IEspWUQueryR
     resp.setCount(1);
 }
 
-void doWUQueryByXPath(IEspContext &context, IEspWUQueryRequest & req, IEspWUQueryResponse & resp)
-{
-    IArrayOf<IEspECLWorkunit> results;
-
-    WsWuSearch wlist(context,req.getOwner(),req.getState(),req.getCluster(),req.getStartDate(),req.getEndDate(),req.getECL(),req.getJobname(),req.getApplicationName(),req.getApplicationKey(),req.getApplicationData());
-
-    int count=(int)req.getPageSize();
-    if (!count)
-        count=100;
-
-    if (wlist.getSize() < 1)
-    {
-        resp.setNumWUs(0);
-        return;
-    }
-
-    if (wlist.getSize() < count)
-        count = (int) wlist.getSize() - 1;
-
-    WsWuSearch::iterator begin, end;
-
-    if(notEmpty(req.getAfter()))
-    {
-        begin=wlist.locate(req.getAfter());
-        end=min(begin+count,wlist.end());
-    }
-    else if (notEmpty(req.getBefore()))
-    {
-        end=wlist.locate(req.getBefore());
-        begin=max(end-count,wlist.begin());
-    }
-    else
-    {
-        begin=wlist.begin();
-        end=min(begin+count,wlist.end());
-    }
-
-    if(begin>wlist.begin() && begin<wlist.end())
-        resp.setCurrent(begin->c_str());
-
-    if (context.getClientVersion() > 1.02)
-    {
-        resp.setPageStartFrom(begin - wlist.begin() + 1);
-        resp.setNumWUs((int)wlist.getSize());
-        resp.setCount(end - begin);
-    }
-
-    if(end<wlist.end())
-        resp.setNext(end->c_str());
-
-    for(;begin!=end;begin++)
-    {
-        Owned<IEspECLWorkunit> info = createECLWorkunit("","");
-        WsWuInfo winfo(context, begin->c_str());
-        winfo.getCommon(*info, 0);
-        results.append(*info.getClear());
-    }
-    resp.setPageSize(abs(count));
-    resp.setWorkunits(results);
-
-    return;
-}
-
 bool addWUQueryFilter(WUSortField *filters, unsigned short &count, MemoryBuffer &buff, const char *name, WUSortField value)
 {
     if (isEmpty(name))
@@ -3112,8 +3049,10 @@ bool CWsWorkunitsEx::onWUExport(IEspContext &context, IEspWUExportRequest &req,
 {
     try
     {
+        if (req.getECL() && *req.getECL())
+            throw makeStringException(0, "WUExport no longer supports filtering by ECL text");
         Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
-        WsWuSearch ws(context, req.getOwner(), req.getState(), req.getCluster(), req.getStartDate(), req.getEndDate(), req.getECL(), req.getJobname());
+        WsWuSearch ws(context, req.getOwner(), req.getState(), req.getCluster(), req.getStartDate(), req.getEndDate(), req.getJobname());
 
         StringBuffer xml("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Workunits>");
         for(WsWuSearch::iterator it=ws.begin(); it!=ws.end(); it++)

+ 2 - 8
plugins/cassandra/cassandraembed.cpp

@@ -4209,16 +4209,10 @@ public:
     {
         return getWorkUnitsByXXX("@submitID", owner, secmgr, secuser);
     }
-    virtual IConstWorkUnitIterator * getWorkUnitsByState(WUState state, ISecManager *secmgr, ISecUser *secuser)
+    virtual IConstWorkUnitIterator * getScheduledWorkUnits(ISecManager *secmgr, ISecUser *secuser)
     {
-        return getWorkUnitsByXXX("@state", getWorkunitStateStr(state), secmgr, secuser);
+        return getWorkUnitsByXXX("@state", getWorkunitStateStr(WUStateScheduled), secmgr, secuser);
     }
-    virtual IConstWorkUnitIterator * getWorkUnitsByECL(const char * ecl, ISecManager *secmgr, ISecUser *secuser) { UNIMPLEMENTED; }
-    virtual IConstWorkUnitIterator * getWorkUnitsByCluster(const char * cluster, ISecManager *secmgr, ISecUser *secuser)
-    {
-        return getWorkUnitsByXXX("@clusterName", cluster, secmgr, secuser);
-    }
-    virtual IConstWorkUnitIterator * getWorkUnitsByXPath(const char * xpath, ISecManager *secmgr, ISecUser *secuser) { UNIMPLEMENTED; }
     virtual IConstWorkUnitIterator * getWorkUnitsSorted(WUSortField sortorder, WUSortField * filters, const void * filterbuf,
                                                         unsigned startoffset, unsigned maxnum, const char * queryowner, __int64 * cachehint, unsigned *total,
                                                         ISecManager *secmgr, ISecUser *secuser)