Browse Source

Merge pull request #7582 from richardkchapman/cassandra-wu-query

HPCC-13917 Make sure that query is only fetched on demand

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 10 years ago
parent
commit
30d9a45687

+ 42 - 12
common/workunit/workunit.cpp

@@ -1539,6 +1539,7 @@ public:
     virtual IStringVal& getQueryResTxtName(IStringVal &str) const;
     virtual IConstWUAssociatedFile * getAssociatedFile(WUFileType type, unsigned index) const;
     virtual IConstWUAssociatedFileIterator& getAssociatedFiles() const;
+    virtual bool isArchive() const;
 
     virtual void        setQueryType(WUQueryType qt);
     virtual void        setQueryText(const char *pstr);
@@ -5258,7 +5259,7 @@ IWUQuery* CLocalWorkUnit::updateQuery()
     {
         IPropertyTree *s = p->queryPropTree("Query");
         if (!s)
-            s = p->addPropTree("Query", createPTreeFromXMLString("<Query fetchEntire='1'/>"));
+            s = p->addPropTree("Query", createPTreeFromXMLString("<Query fetchEntire='1'/>")); // Is this really desirable (the fetchEntire) ?
         s->Link();
         query.setown(new CLocalWUQuery(s)); 
     }
@@ -6848,25 +6849,39 @@ IStringVal& CLocalWUQuery::getQueryText(IStringVal &str) const
 
 IStringVal& CLocalWUQuery::getQueryShortText(IStringVal &str) const
 {
-    const char * text = p->queryProp("Text");
-    if (isArchiveQuery(text))
+    const char * text = p->queryProp("ShortText");
+    if (text)
+        str.set(text);
+    else
     {
-        Owned<IPropertyTree> xml = createPTreeFromXMLString(text, ipt_caseInsensitive);
-        const char * path = xml->queryProp("Query/@attributePath");
-        if (path)
+        text = p->queryProp("Text");
+        if (isArchiveQuery(text))
         {
-            IPropertyTree * resolved = resolveDefinitionInArchive(xml, path);
-            if (resolved)
-                str.set(resolved->queryProp(NULL));
+            Owned<IPropertyTree> xml = createPTreeFromXMLString(text, ipt_caseInsensitive);
+            const char * path = xml->queryProp("Query/@attributePath");
+            if (path)
+            {
+                IPropertyTree * resolved = resolveDefinitionInArchive(xml, path);
+                if (resolved)
+                    str.set(resolved->queryProp(NULL));
+            }
+            else
+                str.set(xml->queryProp("Query"));
         }
         else
-            str.set(xml->queryProp("Query"));
+            str.set(text);
     }
-    else
-        str.set(text);
     return str;
 }
 
+bool CLocalWUQuery::isArchive() const
+{
+    if (p->hasProp("@isArchive"))
+        return p->getPropBool("@isArchive");
+    const char *text = p->queryProp("Text");
+    return isArchiveQuery(text);
+}
+
 IStringVal& CLocalWUQuery::getQueryName(IStringVal &str) const
 {
     str.set(p->queryProp("@name"));
@@ -6914,6 +6929,21 @@ unsigned CLocalWUQuery::getQueryDllCrc() const
 void CLocalWUQuery::setQueryText(const char *text)
 {
     p->setProp("Text", text);
+    bool isArchive = isArchiveQuery(text);
+    if (isArchive)
+    {
+        Owned<IPropertyTree> xml = createPTreeFromXMLString(text, ipt_caseInsensitive);
+        const char * path = xml->queryProp("Query/@attributePath");
+        if (path)
+        {
+            IPropertyTree * resolved = resolveDefinitionInArchive(xml, path);
+            if (resolved)
+                p->setProp("ShortText", resolved->queryProp(NULL));
+        }
+        else
+            p->setProp("ShortText", xml->queryProp("Query"));
+    }
+    p->setPropBool("@isArchive", isArchive);
 }
 
 void CLocalWUQuery::setQueryName(const char *qname)

+ 1 - 0
common/workunit/workunit.hpp

@@ -391,6 +391,7 @@ interface IConstWUQuery : extends IInterface
     virtual IConstWUAssociatedFileIterator & getAssociatedFiles() const = 0;
     virtual IStringVal & getQueryShortText(IStringVal & str) const = 0;
     virtual IStringVal & getQueryMainDefinition(IStringVal & str) const = 0;
+    virtual bool isArchive() const = 0;
 };
 
 

+ 43 - 1
ecl/wutest/wutest.cpp

@@ -407,6 +407,7 @@ inline int min(int a, int b)
 class WuTest : public CppUnit::TestFixture
 {
     CPPUNIT_TEST_SUITE(WuTest);
+        CPPUNIT_TEST(testInit);
         CPPUNIT_TEST(testCreate);
         CPPUNIT_TEST(testValidate);
         CPPUNIT_TEST(testList);
@@ -418,13 +419,14 @@ class WuTest : public CppUnit::TestFixture
         CPPUNIT_TEST(testResults);
         CPPUNIT_TEST(testDelete);
         CPPUNIT_TEST(testCopy);
+        CPPUNIT_TEST(testQuery);
         CPPUNIT_TEST(testGraph);
         CPPUNIT_TEST(testGraphProgress);
         CPPUNIT_TEST(testGlobal);
     CPPUNIT_TEST_SUITE_END();
 protected:
     static StringArray wuids;
-    void testCreate()
+    void testInit()
     {
         Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
         if (globals->getPropBool("entire", false) && globals->getPropBool("repository", false))
@@ -433,6 +435,10 @@ protected:
             factory->createRepository();
             DBGLOG("Repository recreated\n");
         }
+    }
+    void testCreate()
+    {
+        Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
         unsigned before = factory->numWorkUnits();
         unsigned start = msTick();
         for (int i = 0; i < testSize; i++)
@@ -747,6 +753,42 @@ protected:
         factory->deleteWorkUnit(wuid);
     }
 
+    void testQuery()
+    {
+        Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
+        Owned<IWorkUnit> createWu = factory->createWorkUnit("WuTest", NULL, NULL, NULL);
+        StringBuffer wuid(createWu->queryWuid());
+        {
+            Owned<IWUQuery> query = createWu->updateQuery();
+            ASSERT(query);
+            query->setQueryText("Hello");
+            query->setQueryName("qname");
+            query->setQueryMainDefinition("fred");
+            query->setQueryType(QueryTypeEcl);
+            query->addAssociatedFile(FileTypeCpp, "myfile", "1.2.3.4", "Description", 53);
+            createWu.clear();
+        }
+
+        Owned<IConstWorkUnit> wu = factory->openWorkUnit(wuid);
+        ASSERT(streq(wu->queryWuid(), wuid));
+        Owned<IConstWUQuery> query = wu->getQuery();
+        ASSERT(query);
+        SCMStringBuffer s;
+        ASSERT(streq(query->getQueryText(s).str(), "Hello"));
+        ASSERT(streq(query->getQueryName(s).str(), "qname"));
+        ASSERT(streq(query->getQueryMainDefinition(s).str(),"fred"));
+        ASSERT(query->getQueryType()==QueryTypeEcl);
+        Owned <IConstWUAssociatedFile> file = query->getAssociatedFile(FileTypeCpp, 0);
+        ASSERT(file);
+        ASSERT(streq(file->getDescription(s).str(), "Description"));
+        ASSERT(streq(file->getName(s).str(), "myfile"));
+        ASSERT(file->getCrc()==53);
+        ASSERT(file->getType()==FileTypeCpp);
+        ASSERT(streq(file->getIp(s).str(), "1.2.3.4"));
+        wu.clear();
+        factory->deleteWorkUnit(wuid);
+    }
+
     void testGraph()
     {
         Owned<IWorkUnitFactory> factory = getWorkUnitFactory();

+ 1 - 4
esp/services/ws_workunits/ws_workunitsHelpers.cpp

@@ -507,10 +507,7 @@ void WsWuInfo::getHelpers(IEspECLWorkunit &info, unsigned flags)
 
             if (version > 1.30)
             {
-                SCMStringBuffer qText;
-                query->getQueryText(qText);
-                if ((qText.length() > 0) && isArchiveQuery(qText.str()))
-                    info.setHasArchiveQuery(true);
+                info.setHasArchiveQuery(query->isArchive());
             }
 
             for (unsigned i = 0; i < FileTypeSize; i++)

+ 52 - 33
plugins/cassandra/cassandrawu.cpp

@@ -641,6 +641,7 @@ public:
     }
 } subTreeMapColumnMapper;
 
+/*
 static class QueryTextColumnMapper : public StringColumnMapper
 {
 public:
@@ -657,6 +658,7 @@ public:
         return StringColumnMapper::toXML(query, "Text", value);
     }
 } queryTextColumnMapper;
+*/
 
 static class GraphMapColumnMapper : implements CassandraColumnMapper
 {
@@ -715,28 +717,8 @@ public:
 private:
     const char *elemName;
     const char *nameAttr;
-} graphMapColumnMapper("Graph", "@name"), workflowMapColumnMapper("Item", "@wfid");
+} graphMapColumnMapper("Graph", "@name"), workflowMapColumnMapper("Item", "@wfid"), associationsMapColumnMapper("File", "@filename");;
 
-static class AssociationsMapColumnMapper : public GraphMapColumnMapper
-{
-public:
-    AssociationsMapColumnMapper(const char *_elemName, const char *_nameAttr)
-    : GraphMapColumnMapper(_elemName, _nameAttr)
-    {
-    }
-    virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
-    {
-        // Name is "Query/Associated ...
-        IPTree *query = row->queryPropTree("Query");
-        if (!query)
-        {
-            query = createPTree("Query");
-            row->setPropTree("Query", query);
-            row->setProp("Query/@fetchEntire", "1"); // Compatibility...
-        }
-        return GraphMapColumnMapper::toXML(query, "Associated", value);
-    }
-} associationsMapColumnMapper("File", "@filename");
 
 static class WarningsMapColumnMapper : implements CassandraColumnMapper
 {
@@ -877,8 +859,6 @@ static const CassandraXmlMapping workunitsMappings [] =
     {"debug", "map<text, text>", "Debug", simpleMapColumnMapper},
     {"attributes", "map<text, text>", "@wuid@clusterName@jobName@priorityClass@protected@scope@submitID@state@timeScheduled@totalThorTime@", attributeMapColumnMapper},  // name is the suppression list, note trailing @
     {"plugins", "list<text>", "Plugins", pluginListColumnMapper},
-    {"query", "text", "Query/Text", queryTextColumnMapper},        // MORE - make me lazy...
-    {"associations", "map<text, text>", "Query/Associated", associationsMapColumnMapper},
     {"workflow", "map<text, text>", "Workflow", workflowMapColumnMapper},
     {"onWarnings", "map<int, text>", "OnWarnings/OnWarning", warningsMapColumnMapper},
 
@@ -971,7 +951,7 @@ static const CassandraXmlMapping filesReadSearchMappings [] =
 
 // The following describe child tables - all keyed by wuid
 
-enum ChildTablesEnum { WuExceptionsChild, WuStatisticsChild, WuGraphsChild, WuResultsChild, WuVariablesChild, WuTemporariesChild, WuFilesReadChild,ChildTablesSize };
+enum ChildTablesEnum { WuQueryChild, WuExceptionsChild, WuStatisticsChild, WuGraphsChild, WuResultsChild, WuVariablesChild, WuTemporariesChild, WuFilesReadChild,ChildTablesSize };
 
 struct ChildTableInfo
 {
@@ -981,6 +961,28 @@ struct ChildTableInfo
     const CassandraXmlMapping *mappings;
 };
 
+// wuQueries table is slightly unusual among the child tables as is is 1:1 - it is split out for lazy load purposes.
+
+static const CassandraXmlMapping wuQueryMappings [] =
+{
+    {"partition", "int", NULL, hashRootNameColumnMapper},
+    {"wuid", "text", NULL, rootNameColumnMapper},
+    {"associations", "map<text, text>", "Associated", associationsMapColumnMapper},
+    {"attributes", "map<text, text>", "", attributeMapColumnMapper},
+    {"query", "text", "Text", stringColumnMapper}, // May want to make this even lazier...
+    {"shortQuery", "text", "ShortText", stringColumnMapper},
+    { NULL, "wuQueries", "((partition, wuid))", stringColumnMapper}
+};
+
+static const ChildTableInfo wuQueriesTable =
+{
+    "Query", NULL,
+    WuQueryChild,
+    wuQueryMappings
+};
+
+// wuExceptions table holds the exceptions associated with a wuid
+
 static const CassandraXmlMapping wuExceptionsMappings [] =
 {
     {"partition", "int", NULL, hashRootNameColumnMapper},
@@ -1126,7 +1128,7 @@ static const ChildTableInfo wuFilesReadTable =
 };
 
 // Order should match the enum above
-static const ChildTableInfo * const childTables [] = { &wuExceptionsTable, &wuStatisticsTable, &wuGraphsTable, &wuResultsTable, &wuVariablesTable, &wuTemporariesTable, &wuFilesReadTable, NULL };
+static const ChildTableInfo * const childTables [] = { &wuQueriesTable, &wuExceptionsTable, &wuStatisticsTable, &wuGraphsTable, &wuResultsTable, &wuVariablesTable, &wuTemporariesTable, &wuFilesReadTable, NULL };
 
 // Graph progress tables are read directly, XML mappers not used
 
@@ -2114,7 +2116,7 @@ public:
                 // empty newly-created WU, it is unnecessary.
                 //deleteChildren(wuid);
 
-                // MORE can use the table
+                // MORE can use the table?
                 childXMLtoCassandra(sessionCache, *batch, wuGraphsMappings, p, "Graphs/Graph", 0);
                 childXMLtoCassandra(sessionCache, *batch, wuResultsMappings, p, "Results/Result", "0");
                 childXMLtoCassandra(sessionCache, *batch, wuVariablesMappings, p, "Variables/Variable", "-1"); // ResultSequenceStored
@@ -2122,6 +2124,9 @@ public:
                 childXMLtoCassandra(sessionCache, *batch, wuExceptionsMappings, p, "Exceptions/Exception", 0);
                 childXMLtoCassandra(sessionCache, *batch, wuStatisticsMappings, p, "Statistics/Statistic", 0);
                 childXMLtoCassandra(sessionCache, *batch, wuFilesReadMappings, p, "FilesRead/File", 0);
+                IPTree *query = p->queryPropTree("Query");
+                if (query)
+                    childXMLRowtoCassandra(sessionCache, *batch, wuQueryMappings, wuid, *query, 0);
             }
             else
             {
@@ -2271,6 +2276,16 @@ public:
     {
         return noteDirty(CPersistedWorkUnit::updateVariableByName(name));
     }
+    virtual IWUQuery * updateQuery()
+    {
+        noteDirty("Query", wuQueryMappings);
+        return CPersistedWorkUnit::updateQuery();
+    }
+    virtual IConstWUQuery *getQuery() const
+    {
+        checkChildLoaded(wuQueriesTable);
+        return CPersistedWorkUnit::getQuery();
+    }
     virtual IWUException *createException()
     {
         IWUException *result = CPersistedWorkUnit::createException();
@@ -2565,15 +2580,18 @@ protected:
         if (!childLoaded[childTable.index])
         {
             CassandraResult result(sessionCache->fetchDataForWuid(childTable.mappings, queryWuid(), false));
-            Owned<IPTree> results;
+            IPTree *results = p->queryPropTree(childTable.parentElement);
             CassandraIterator rows(cass_iterator_from_result(result));
             while (cass_iterator_next(rows))
             {
                 CassandraIterator cols(cass_iterator_from_row(cass_iterator_get_row(rows)));
                 Owned<IPTree> child;
                 if (!results)
-                    results.setown(createPTree(childTable.parentElement));
-                child.setown(createPTree(childTable.childElement));
+                    results = ensurePTree(p, childTable.parentElement);
+                if (childTable.childElement)
+                    child.setown(createPTree(childTable.childElement));
+                else
+                    child.set(results);
                 unsigned colidx = 2;  // We did not fetch wuid or partition
                 while (cass_iterator_next(cols))
                 {
@@ -2583,11 +2601,12 @@ protected:
                         childTable.mappings[colidx].mapper.toXML(child, childTable.mappings[colidx].xpath, value);
                     colidx++;
                 }
-                const char *childName = child->queryName();
-                results->addPropTree(childName, child.getClear());
+                if (childTable.childElement)
+                {
+                    const char *childName = child->queryName();
+                    results->addPropTree(childName, child.getClear());
+                }
             }
-            if (results)
-                p->addPropTree(childTable.parentElement, results.getClear());
             childLoaded[childTable.index] = true;
         }
     }