瀏覽代碼

HPCC-12251 Create cassandra plugin for workunit storage

Start work on supporting filtered workunit lists.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 10 年之前
父節點
當前提交
871bbf2f88
共有 4 個文件被更改,包括 124 次插入13 次删除
  1. 5 0
      common/workunit/workunit.cpp
  2. 2 0
      common/workunit/workunit.hpp
  3. 6 2
      ecl/wutest/wutest.cpp
  4. 111 11
      plugins/cassandra/cassandraembed.cpp

+ 5 - 0
common/workunit/workunit.cpp

@@ -2147,6 +2147,7 @@ public:
         UniqueScopes us;
         UniqueScopes us;
         if (secmgr /* && secmgr->authTypeRequired(RT_WORKUNIT_SCOPE) tbd */)
         if (secmgr /* && secmgr->authTypeRequired(RT_WORKUNIT_SCOPE) tbd */)
         {
         {
+            // MORE - this will defeat any lazy-fetch mechanism in the incoming iterator
             scopes.setown(secmgr->createResourceList("wuscopes"));
             scopes.setown(secmgr->createResourceList("wuscopes"));
             ForEach(*ptreeIter)
             ForEach(*ptreeIter)
             {
             {
@@ -2955,6 +2956,10 @@ protected:
     SessionId session;
     SessionId session;
 };
 };
 
 
+extern WORKUNIT_API IConstWorkUnitIterator *createConstWUIterator(IPropertyTreeIterator *iter, ISecManager *secmgr, ISecUser *secuser)
+{
+    return new CConstWUIterator(iter, secmgr, secuser);
+}
 static CriticalSection factoryCrit;
 static CriticalSection factoryCrit;
 static Owned<ILoadedDllEntry> workunitServerPlugin;  // NOTE - unload AFTER the factory is released!
 static Owned<ILoadedDllEntry> workunitServerPlugin;  // NOTE - unload AFTER the factory is released!
 static Owned<IWorkUnitFactory> factory;
 static Owned<IWorkUnitFactory> factory;

+ 2 - 0
common/workunit/workunit.hpp

@@ -1398,6 +1398,8 @@ extern WORKUNIT_API IPropertyTree * resolveDefinitionInArchive(IPropertyTree * a
 inline bool isLibrary(IConstWorkUnit * wu) { return wu->getApplicationValueInt("LibraryModule", "interfaceHash", 0) != 0; }
 inline bool isLibrary(IConstWorkUnit * wu) { return wu->getApplicationValueInt("LibraryModule", "interfaceHash", 0) != 0; }
 extern WORKUNIT_API bool looksLikeAWuid(const char * wuid, const char firstChar);
 extern WORKUNIT_API bool looksLikeAWuid(const char * wuid, const char firstChar);
 
 
+extern WORKUNIT_API IConstWorkUnitIterator *createConstWUIterator(IPropertyTreeIterator *iter, ISecManager *secmgr, ISecUser *secuser);
+
 enum WUQueryActivationOptions
 enum WUQueryActivationOptions
 {
 {
     DO_NOT_ACTIVATE = 0,
     DO_NOT_ACTIVATE = 0,

+ 6 - 2
ecl/wutest/wutest.cpp

@@ -48,7 +48,11 @@ bool dump(IConstWorkUnit &w, IProperties *globals)
     const char *action = globals->queryProp("#action");
     const char *action = globals->queryProp("#action");
     if (!action || stricmp(action, "list")==0)
     if (!action || stricmp(action, "list")==0)
     {
     {
-        printf("%-30s %-20s %-10s\n", w.queryWuid(), w.queryJobName(), w.queryStateDesc());
+        Owned <IConstWUQuery> query = w.getQuery();
+        SCMStringBuffer queryText;
+        if (query)
+            query->getQueryText(queryText);
+        printf("%-20s %-10s %-10s %s\n", w.queryWuid(), w.queryJobName(), w.queryStateDesc(), queryText.str());
     }
     }
     else if (stricmp(action, "results")==0)
     else if (stricmp(action, "results")==0)
     {
     {
@@ -323,7 +327,7 @@ int main(int argc, const char *argv[])
         }
         }
         else if (globals->hasProp("WUID"))
         else if (globals->hasProp("WUID"))
         {
         {
-            if (stricmp(globals->queryProp("#action"), "restore")==0)
+            if (globals->queryProp("#action") && stricmp(globals->queryProp("#action"), "restore")==0)
             {
             {
                 StringBuffer from;
                 StringBuffer from;
                 globals->getProp("FROM", from);
                 globals->getProp("FROM", from);

+ 111 - 11
plugins/cassandra/cassandraembed.cpp

@@ -2714,6 +2714,33 @@ const CassandraXmlMapping workunitsMappings [] =
     { NULL, "workunits", "((wuid))", stringColumnMapper}
     { NULL, "workunits", "((wuid))", stringColumnMapper}
 };
 };
 
 
+const CassandraXmlMapping ownerMappings [] =
+{
+    {"submitID", "text", "@submitID", stringColumnMapper},
+    {"wuid", "text", NULL, rootNameColumnMapper},
+    {"clustername", "text", "@clusterName", stringColumnMapper},
+    {"jobname", "text", "@jobName", stringColumnMapper},
+    {"priorityclass", "int", "@priorityClass", intColumnMapper},
+    {"protected", "boolean", "@protected", boolColumnMapper},
+    {"scope", "text", "@scope", stringColumnMapper},
+    {"state", "text", "@state", stringColumnMapper},
+    { NULL, "workunitsByOwner", "((submitID), wuid)", stringColumnMapper}
+};
+
+const CassandraXmlMapping workunitInfoMappings [] =  // A cut down version of the workunit mappings - used when querying with no key
+{
+    {"wuid", "text", NULL, rootNameColumnMapper},  // First field is not retrieved when querying...
+    {"wuid", "text", NULL, rootNameColumnMapper},  // But when we are using this mapping, we need it... simplest way to do that is just to repeat it here
+    {"clustername", "text", "@clusterName", stringColumnMapper},
+    {"jobname", "text", "@jobName", stringColumnMapper},
+    {"priorityclass", "int", "@priorityClass", intColumnMapper},
+    {"protected", "boolean", "@protected", boolColumnMapper},
+    {"scope", "text", "@scope", stringColumnMapper},
+    {"submitID", "text", "@submitID", stringColumnMapper},
+    {"state", "text", "@state", stringColumnMapper},
+    { NULL, "workunits", "((wuid))", stringColumnMapper}
+};
+
 const CassandraXmlMapping graphProgressMappings [] =
 const CassandraXmlMapping graphProgressMappings [] =
 {
 {
     {"wuid", "text", NULL, rootNameColumnMapper},
     {"wuid", "text", NULL, rootNameColumnMapper},
@@ -2794,12 +2821,15 @@ StringBuffer & describeTable(const CassandraXmlMapping *mappings, StringBuffer &
     return out.appendf("CREATE TABLE IF NOT EXISTS HPCC.%s (%s PRIMARY KEY %s);", mappings->columnType, fields.str(), mappings->xpath);
     return out.appendf("CREATE TABLE IF NOT EXISTS HPCC.%s (%s PRIMARY KEY %s);", mappings->columnType, fields.str(), mappings->xpath);
 }
 }
 
 
-const CassResult *fetchDataForWu(const char *wuid, CassSession *session, const CassandraXmlMapping *mappings)
+const CassResult *fetchDataForKey(const char *key, CassSession *session, const CassandraXmlMapping *mappings)
 {
 {
     StringBuffer names;
     StringBuffer names;
     StringBuffer tableName;
     StringBuffer tableName;
-    getFieldNames(mappings+1, names, tableName);  // mappings+1 means we don't return the wuid column
-    VStringBuffer selectQuery("select %s from HPCC.%s where wuid='%s';", names.str()+1, tableName.str(), wuid);
+    getFieldNames(mappings+1, names, tableName);  // mappings+1 means we don't return the key column
+    VStringBuffer selectQuery("select %s from HPCC.%s", names.str()+1, tableName.str());
+    if (key)
+        selectQuery.appendf(" where %s='%s'", mappings->columnName, key); // MORE - should consider using prepared for this - is it faster?
+    selectQuery.append(';');
     DBGLOG("%s", selectQuery.str());
     DBGLOG("%s", selectQuery.str());
     CassandraStatement statement(cass_statement_new(cass_string_init(selectQuery.str()), 0));
     CassandraStatement statement(cass_statement_new(cass_string_init(selectQuery.str()), 0));
     CassandraFuture future(cass_session_execute(session, statement));
     CassandraFuture future(cass_session_execute(session, statement));
@@ -2858,6 +2888,21 @@ extern void simpleXMLtoCassandra(const ICassandraSession *session, CassBatch *ba
     check(cass_batch_add_statement(batch, update));
     check(cass_batch_add_statement(batch, update));
 }
 }
 
 
+void updateSecondaryTable(const ICassandraSession *session, CassBatch *batch, const CassandraXmlMapping *mappings, const char *wuid, IPTree *inXML,const char *prevKey)
+{
+    if (prevKey)
+    {
+        StringBuffer names;
+        StringBuffer tableName;
+        getFieldNames(mappings, names, tableName);
+        VStringBuffer insertQuery("DELETE from HPCC.%s where %s=?;", tableName.str(), mappings[0].columnName);
+        Owned<CassandraPrepared> prepared = session->prepareStatement(insertQuery);
+        CassandraStatement update(cass_prepared_bind(*prepared));
+        check(cass_statement_bind_string(update, 0, cass_string_init(prevKey)));
+    }
+    simpleXMLtoCassandra(session, batch, mappings, wuid, inXML);
+}
+
 extern void childXMLtoCassandra(const ICassandraSession *session, CassBatch *batch, const CassandraXmlMapping *mappings, const char *wuid, IPTreeIterator *elements, int defaultValue)
 extern void childXMLtoCassandra(const ICassandraSession *session, CassBatch *batch, const CassandraXmlMapping *mappings, const char *wuid, IPTreeIterator *elements, int defaultValue)
 {
 {
     if (elements->first())
     if (elements->first())
@@ -2889,7 +2934,7 @@ extern void childXMLtoCassandra(const ICassandraSession *session, CassBatch *bat
 
 
 extern void cassandraToChildXML(CassSession *session, const CassandraXmlMapping *mappings, const char *wuid, IPTree *wuTree, const char *parentName, const char *childName)
 extern void cassandraToChildXML(CassSession *session, const CassandraXmlMapping *mappings, const char *wuid, IPTree *wuTree, const char *parentName, const char *childName)
 {
 {
-    CassandraResult result(fetchDataForWu(wuid, session, mappings));
+    CassandraResult result(fetchDataForKey(wuid, session, mappings));
     Owned<IPTree> parent = createPTree(parentName);
     Owned<IPTree> parent = createPTree(parentName);
     CassandraIterator rows(cass_iterator_from_result(result));
     CassandraIterator rows(cass_iterator_from_result(result));
     while (cass_iterator_next(rows))
     while (cass_iterator_next(rows))
@@ -2922,7 +2967,7 @@ extern void wuVariablesXMLtoCassandra(const ICassandraSession *session, CassBatc
 
 
 extern void cassandraToWuResultsXML(CassSession *session, const char *wuid, IPTree *wuTree)
 extern void cassandraToWuResultsXML(CassSession *session, const char *wuid, IPTree *wuTree)
 {
 {
-    CassandraResult result(fetchDataForWu(wuid, session, wuResultsMappings));
+    CassandraResult result(fetchDataForKey(wuid, session, wuResultsMappings));
     Owned<IPTree> results;
     Owned<IPTree> results;
     CassandraIterator rows(cass_iterator_from_result(result));
     CassandraIterator rows(cass_iterator_from_result(result));
     while (cass_iterator_next(rows))
     while (cass_iterator_next(rows))
@@ -2950,7 +2995,7 @@ extern void cassandraToWuResultsXML(CassSession *session, const char *wuid, IPTr
 
 
 extern void cassandraToWuVariablesXML(CassSession *session, const char *wuid, IPTree *wuTree)
 extern void cassandraToWuVariablesXML(CassSession *session, const char *wuid, IPTree *wuTree)
 {
 {
-    CassandraResult result(fetchDataForWu(wuid, session, wuVariablesMappings));
+    CassandraResult result(fetchDataForKey(wuid, session, wuVariablesMappings));
     Owned<IPTree> variables;
     Owned<IPTree> variables;
     Owned<IPTree> temporaries;
     Owned<IPTree> temporaries;
     CassandraIterator rows(cass_iterator_from_result(result));
     CassandraIterator rows(cass_iterator_from_result(result));
@@ -3088,7 +3133,7 @@ extern void cassandraToGraphProgressXML(CassSession *session, const char *wuid)
 
 
 extern IPTree *cassandraToWorkunitXML(CassSession *session, const char *wuid)
 extern IPTree *cassandraToWorkunitXML(CassSession *session, const char *wuid)
 {
 {
-    CassandraResult result(fetchDataForWu(wuid, session, workunitsMappings));
+    CassandraResult result(fetchDataForKey(wuid, session, workunitsMappings));
     CassandraIterator rows(cass_iterator_from_result(result));
     CassandraIterator rows(cass_iterator_from_result(result));
     if (cass_iterator_next(rows)) // should just be one
     if (cass_iterator_next(rows)) // should just be one
     {
     {
@@ -3152,7 +3197,8 @@ public:
         : sessionCache(_sessionCache), CLocalWorkUnit(secmgr, secuser)
         : sessionCache(_sessionCache), CLocalWorkUnit(secmgr, secuser)
     {
     {
         CLocalWorkUnit::loadPTree(wuXML);
         CLocalWorkUnit::loadPTree(wuXML);
-        allDirty = false;   // Debatable...
+        allDirty = false;   // Debatable... depends where the XML came from! If we read it from Cassandra. it's not. Otherwise, it is...
+        basicDirty = false;
         abortDirty = true;
         abortDirty = true;
         abortState = false;
         abortState = false;
     }
     }
@@ -3180,9 +3226,13 @@ public:
         StringBuffer s; toXML(p, s); DBGLOG("%s", s.str());
         StringBuffer s; toXML(p, s); DBGLOG("%s", s.str());
         if (batch)
         if (batch)
         {
         {
-            simpleXMLtoCassandra(sessionCache, *batch, workunitsMappings, queryWuid(), p);  // This just does the parent row
+            const char *wuid = queryWuid();
+            if (basicDirty)
+                updateSecondaryTable(sessionCache, *batch, ownerMappings, wuid, p, prevOwner);
+            simpleXMLtoCassandra(sessionCache, *batch, workunitsMappings, wuid, p);  // This just does the parent row
             if (allDirty)
             if (allDirty)
             {
             {
+                // MORE - should probably delete all prior records in these tables where wuid=wuid
                 wuResultsXMLtoCassandra(sessionCache, *batch, p, "Results/Result");
                 wuResultsXMLtoCassandra(sessionCache, *batch, p, "Results/Result");
                 wuVariablesXMLtoCassandra(sessionCache, *batch, p, "Variables/Variable", ResultSequenceStored);
                 wuVariablesXMLtoCassandra(sessionCache, *batch, p, "Variables/Variable", ResultSequenceStored);
                 wuVariablesXMLtoCassandra(sessionCache, *batch, p, "Temporaries/Variable", ResultSequenceInternal); // NOTE - lookups may also request ResultSequenceOnce
                 wuVariablesXMLtoCassandra(sessionCache, *batch, p, "Temporaries/Variable", ResultSequenceInternal); // NOTE - lookups may also request ResultSequenceOnce
@@ -3192,17 +3242,30 @@ public:
             else
             else
             {
             {
                 ResultPTreeIterator dirtyResultsIterator(dirtyResults);
                 ResultPTreeIterator dirtyResultsIterator(dirtyResults);
-                childXMLtoCassandra(sessionCache, *batch, wuResultsMappings, queryWuid(), &dirtyResultsIterator, 0);  // MORE - all the other dirty subtrees... TBD
+                childXMLtoCassandra(sessionCache, *batch, wuResultsMappings, wuid, &dirtyResultsIterator, 0);  // MORE - all the other dirty subtrees... TBD
             }
             }
             CassandraFuture futureBatch(cass_session_execute_batch(sessionCache->querySession(), *batch));
             CassandraFuture futureBatch(cass_session_execute_batch(sessionCache->querySession(), *batch));
             futureBatch.wait("execute");
             futureBatch.wait("execute");
             batch.setown(new CassandraBatch(cass_batch_new(CASS_BATCH_TYPE_UNLOGGED))); // Commit leaves it locked...
             batch.setown(new CassandraBatch(cass_batch_new(CASS_BATCH_TYPE_UNLOGGED))); // Commit leaves it locked...
+            prevOwner.clear();
             allDirty = false;
             allDirty = false;
         }
         }
         else
         else
             DBGLOG("No batch present??");
             DBGLOG("No batch present??");
     }
     }
 
 
+    virtual void setUser(const char *user)
+    {
+        if (!user)
+            user="";
+        const char *current = queryUser();
+        if (streq(current, user))
+            return;  // No change
+        if (!prevOwner) // We need to record the last _committed_ value so we can update the secondary tables appropriately
+            prevOwner.set(queryUser());
+        basicDirty = true;
+        CLocalWorkUnit::setUser(user);
+    }
     virtual void _lockRemote()
     virtual void _lockRemote()
     {
     {
         // Ignore locking for now!
         // Ignore locking for now!
@@ -3307,6 +3370,8 @@ protected:
     mutable bool abortDirty;
     mutable bool abortDirty;
     mutable bool abortState;
     mutable bool abortState;
     bool allDirty;
     bool allDirty;
+    bool basicDirty;
+    StringAttr prevOwner;
     Owned<CassandraBatch> batch;
     Owned<CassandraBatch> batch;
     IArrayOf<IWUResult> dirtyResults;
     IArrayOf<IWUResult> dirtyResults;
 };
 };
@@ -3337,6 +3402,7 @@ public:
         VStringBuffer create("CREATE KEYSPACE IF NOT EXISTS hpcc WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1' } ;"); // MORE - options from props!
         VStringBuffer create("CREATE KEYSPACE IF NOT EXISTS hpcc WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1' } ;"); // MORE - options from props!
         executeSimpleCommand(session, create);
         executeSimpleCommand(session, create);
         ensureTable(session, workunitsMappings);
         ensureTable(session, workunitsMappings);
+        ensureTable(session, ownerMappings);
         ensureTable(session, wuResultsMappings);
         ensureTable(session, wuResultsMappings);
         ensureTable(session, wuVariablesMappings);
         ensureTable(session, wuVariablesMappings);
         ensureTable(session, wuExceptionsMappings);
         ensureTable(session, wuExceptionsMappings);
@@ -3408,7 +3474,10 @@ public:
     }
     }
 
 
     virtual IWorkUnit * getGlobalWorkUnit(ISecManager *secmgr = NULL, ISecUser *secuser = NULL) { UNIMPLEMENTED; }
     virtual IWorkUnit * getGlobalWorkUnit(ISecManager *secmgr = NULL, ISecUser *secuser = NULL) { UNIMPLEMENTED; }
-    virtual IConstWorkUnitIterator * getWorkUnitsByOwner(const char * owner, ISecManager *secmgr, ISecUser *secuser) { UNIMPLEMENTED; }
+    virtual IConstWorkUnitIterator * getWorkUnitsByOwner(const char * owner, ISecManager *secmgr, ISecUser *secuser)
+    {
+        return getWorkUnitsByXXX(ownerMappings, owner, secmgr, secuser);
+    }
     virtual IConstWorkUnitIterator * getWorkUnitsByState(WUState state, ISecManager *secmgr, ISecUser *secuser) { UNIMPLEMENTED; }
     virtual IConstWorkUnitIterator * getWorkUnitsByState(WUState state, ISecManager *secmgr, ISecUser *secuser) { UNIMPLEMENTED; }
     virtual IConstWorkUnitIterator * getWorkUnitsByECL(const char * ecl, ISecManager *secmgr, ISecUser *secuser) { UNIMPLEMENTED; }
     virtual IConstWorkUnitIterator * getWorkUnitsByECL(const char * ecl, ISecManager *secmgr, ISecUser *secuser) { UNIMPLEMENTED; }
     virtual IConstWorkUnitIterator * getWorkUnitsByCluster(const char * cluster, ISecManager *secmgr, ISecUser *secuser) { UNIMPLEMENTED; }
     virtual IConstWorkUnitIterator * getWorkUnitsByCluster(const char * cluster, ISecManager *secmgr, ISecUser *secuser) { UNIMPLEMENTED; }
@@ -3497,6 +3566,37 @@ public:
         }
         }
     }
     }
 private:
 private:
+    IConstWorkUnitIterator * getWorkUnitsByXXX(const CassandraXmlMapping *mappings, const char *key, ISecManager *secmgr, ISecUser *secuser)
+    {
+        if (!key)
+            mappings=workunitInfoMappings;   // Historically, providing no value on a call to getWorkUnitsByOwner (for example) filter meant unfiltered...
+        CassandraResult result(fetchDataForKey(key, session, mappings));
+        Owned<IPTree> parent = createPTree("WorkUnits");
+        CassandraIterator rows(cass_iterator_from_result(result));
+        while (cass_iterator_next(rows))
+        {
+            CassandraIterator cols(cass_iterator_from_row(cass_iterator_get_row(rows)));
+            Owned<IPTree> child = createPTree("wuid");  // Gets overwritten below when wuid field is processed
+            unsigned colidx = 1;
+            while (cass_iterator_next(cols))
+            {
+                assertex(mappings[colidx].columnName);
+                const CassValue *value = cass_iterator_get_column(cols);
+                if (value && !cass_value_is_null(value))
+                    mappings[colidx].mapper.toXML(child, mappings[colidx].xpath, value);
+                colidx++;
+            }
+            const char *childName = child->queryName();
+            parent->addPropTree(childName, child.getClear());
+        }
+        StringBuffer buf;
+        toXML(parent, buf);
+        DBGLOG("%s", buf.str());
+        Owned<IPropertyTreeIterator> iter = parent->getElements("*");
+        return createConstWUIterator(iter, secmgr, secuser);
+    }
+
+
     CassandraCluster cluster;
     CassandraCluster cluster;
     CassandraSession session;
     CassandraSession session;
     mutable CriticalSection cacheCrit;
     mutable CriticalSection cacheCrit;