浏览代码

HPCC-12251 Create cassandra plugin for workunit storage

Add support for WorkUnitExceptions.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 10 年之前
父节点
当前提交
305072a326
共有 5 个文件被更改,包括 108 次插入25 次删除
  1. 20 12
      common/workunit/workunit.cpp
  2. 1 0
      common/workunit/workunit.hpp
  3. 1 0
      common/workunit/workunit.ipp
  4. 47 0
      ecl/wutest/wutest.cpp
  5. 39 13
      plugins/cassandra/cassandraembed.cpp

+ 20 - 12
common/workunit/workunit.cpp

@@ -1103,7 +1103,6 @@ public:
     {
         CLocalWorkUnit::cleanupAndDelete(deldll, deleteOwned, deleteExclusions);
         connection->close(true);
-        PROGLOG("WUID %s removed",p->queryName());
         connection.clear();
     }
 
@@ -1891,6 +1890,7 @@ public:
     virtual IStringVal & getExceptionFileName(IStringVal & str) const;
     virtual unsigned    getExceptionLineNo() const;
     virtual unsigned    getExceptionColumn() const;
+    virtual unsigned    getSequence() const;
     virtual void        setExceptionSource(const char *str);
     virtual void        setExceptionMessage(const char *str);
     virtual void        setExceptionCode(unsigned code);
@@ -5827,22 +5827,24 @@ void CLocalWorkUnit::unsubscribe()
     // Only overriding versions need to do anything
 }
 
+void CLocalWorkUnit::_loadExceptions() const
+{
+    assertex(exceptions.length() == 0);
+    Owned<IPropertyTreeIterator> r = p->getElements("Exceptions/Exception");
+    for (r->first(); r->isValid(); r->next())
+    {
+        IPropertyTree *rp = &r->query();
+        rp->Link();
+        exceptions.append(*new CLocalWUException(rp));
+    }
+}
+
 void CLocalWorkUnit::loadExceptions() const
 {
     CriticalBlock block(crit);
     if (!exceptionsCached)
     {
-        
-        assertex(exceptions.length() == 0);
-        Owned<IPropertyTreeIterator> r = p->getElements("Exceptions/Exception");
-
-
-        for (r->first(); r->isValid(); r->next())
-        {
-            IPropertyTree *rp = &r->query();
-            rp->Link();
-            exceptions.append(*new CLocalWUException(rp));
-        }
+        _loadExceptions();
         exceptionsCached = true;
     }
 }
@@ -5881,6 +5883,7 @@ IWUException* CLocalWorkUnit::createException()
         p->addPropTree("Exceptions", createPTree("Exceptions"));
     IPropertyTree *r = p->queryPropTree("Exceptions");
     IPropertyTree *s = r->addPropTree("Exception", createPTree("Exception"));
+    s->setPropInt("@sequence", exceptions.ordinality());
     IWUException* q = new CLocalWUException(LINK(s)); 
     exceptions.append(*LINK(q));
 
@@ -8563,6 +8566,11 @@ unsigned CLocalWUException::getExceptionColumn() const
     return p->getPropInt("@col", 0);
 }
 
+unsigned CLocalWUException::getSequence() const
+{
+    return p->getPropInt("@sequence", 0);
+}
+
 void CLocalWUException::setExceptionSource(const char *str)
 {
     p->setProp("@source", str);

+ 1 - 0
common/workunit/workunit.hpp

@@ -505,6 +505,7 @@ interface IConstWUException : extends IInterface
     virtual IStringVal & getExceptionFileName(IStringVal & str) const = 0;
     virtual unsigned getExceptionLineNo() const = 0;
     virtual unsigned getExceptionColumn() const = 0;
+    virtual unsigned getSequence() const = 0;
 };
 
 

+ 1 - 0
common/workunit/workunit.ipp

@@ -561,6 +561,7 @@ protected:
     virtual void unsubscribe();
     virtual void _loadResults() const;
     virtual void _loadStatistics() const;
+    virtual void _loadExceptions() const;
 };
 
 interface ISDSManager; // MORE - can remove once dali split out

+ 47 - 0
ecl/wutest/wutest.cpp

@@ -760,6 +760,19 @@ protected:
         for (i = 0; i < testSize; i++)
         {
             Owned<IWorkUnit> wu = factory->updateWorkUnit(wuids.item(i));
+            for (int exNum = 0; exNum < 10; exNum++)
+            {
+                Owned <IWUException> ex = wu->createException();
+                ex->setExceptionCode(77 + exNum);
+                ex->setExceptionColumn(88 + exNum);
+                ex->setExceptionFileName("exfile");
+                ex->setExceptionLineNo(99);
+                ex->setExceptionMessage("exMessage");
+                ex->setExceptionSource("exSource");
+                ex->setSeverity(SeverityFatal);
+                ex->setTimeStamp("2001");
+            }
+
             wu->addProcess("ptype", "pInstance", 54321, "mylog");
             wu->setAction(WUActionCompile);
             wu->setApplicationValue("app1", "av1", "value", true);
@@ -798,6 +811,22 @@ protected:
                 exportWorkUnitToXML(wu, wuXML, true, false, false);
                 DBGLOG("%s", wuXML.str());
             }
+            ASSERT(wu->getExceptionCount() == 10);
+            Owned<IConstWUExceptionIterator> exceptions = &wu->getExceptions();
+            int exNo = 0;
+            ForEach(*exceptions)
+            {
+                IConstWUException &ex = exceptions->query();
+                ASSERT(ex.getExceptionCode()==77 + exNo);
+                ASSERT(ex.getExceptionColumn()==88 + exNo);
+                ASSERT(streq(ex.getExceptionFileName(s).str(),"exfile"));
+                ASSERT(ex.getExceptionLineNo()==99);
+                ASSERT(streq(ex.getExceptionMessage(s).str(),"exMessage"));
+                ASSERT(streq(ex.getExceptionSource(s).str(),"exSource"));
+                ASSERT(ex.getSeverity()==SeverityFatal);
+                ASSERT(streq(ex.getTimeStamp(s).str(),"2001"));
+                exNo++;
+            }
 
             Owned<IPTreeIterator> processes = wu->getProcesses("ptype", "pInstance");
             ASSERT(processes->first());
@@ -805,6 +834,7 @@ protected:
             ASSERT(process.getPropInt("@pid", 0)==54321);
             ASSERT(streq(process.queryProp("@log"), "mylog"));
             ASSERT(!processes->next());
+
             ASSERT(wu->getAction() == WUActionCompile);
             ASSERT(streq(wu->getApplicationValue("app1", "av1", s).str(), "value"));
             ASSERT(wu->getApplicationValueInt("app2", "av2", 0) == 42);
@@ -838,6 +868,23 @@ protected:
         }
         end = msTick();
         DBGLOG("%u workunits reread in %d ms", testSize, end-start);
+        start = end;
+        for (i = 0; i < testSize; i++)
+        {
+            Owned<IWorkUnit> wu = factory->updateWorkUnit(wuids.item(i));
+            wu->clearExceptions();
+        }
+        end = msTick();
+        DBGLOG("%u * 10 workunit exceptions cleared in %d ms", testSize, end-start);
+        start = end;
+        for (i = 0; i < testSize; i++)
+        {
+            Owned<IConstWorkUnit> wu = factory->openWorkUnit(wuids.item(i), false);
+            ASSERT(wu->getExceptionCount() == 0);
+        }
+        end = msTick();
+        DBGLOG("%u workunits reread in %d ms", testSize, end-start);
+        start = end;
     }
 
     void testList()

+ 39 - 13
plugins/cassandra/cassandraembed.cpp

@@ -2359,8 +2359,8 @@ public:
     }
     virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, const char * userVal)
     {
-        // never bound
-        return false;
+        // never bound, but does need to be included in the ?
+        return true;
     }
 } timestampColumnMapper;
 
@@ -3152,10 +3152,10 @@ struct ChildTableInfo
 static const CassandraXmlMapping wuExceptionsMappings [] =
 {
     {"wuid", "text", NULL, rootNameColumnMapper},
+    {"sequence", "int", "@sequence", intColumnMapper},
     {"attributes", "map<text, text>", "", attributeMapColumnMapper},
     {"value", "text", ".", stringColumnMapper},
-    {"ts", "timeuuid", NULL, timestampColumnMapper}, // must be last since we don't bind it, so it would throw out the colidx values of following fields
-    { NULL, "wuExceptions", "((wuid), ts)", stringColumnMapper}
+    { NULL, "wuExceptions", "((wuid), sequence)", stringColumnMapper}
 };
 
 static const ChildTableInfo wuExceptionsTable =
@@ -3656,14 +3656,22 @@ public:
                     const CassandraXmlMapping *table = *dirtyPaths.mapToValue(&iter.query());
                     if (sessionCache->queryTraceLevel()>2)
                         DBGLOG("Updating dirty path %s", path);
-                    IPTree *dirty = p->queryPropTree(path);
-                    if (dirty)
-                        childXMLRowtoCassandra(sessionCache, *batch, table, wuid, *dirty, 0);
-                    else if (sessionCache->queryTraceLevel())
+                    if (*path == '*')
+                    {
+                        sessionCache->deleteChildByWuid(table, wuid, *batch);
+                        childXMLtoCassandra(sessionCache, *batch, table, p, path+1, 0);
+                    }
+                    else
                     {
-                        StringBuffer xml;
-                        toXML(p, xml);
-                        DBGLOG("Missing dirty element %s in %s", path, xml.str());
+                        IPTree *dirty = p->queryPropTree(path);
+                        if (dirty)
+                            childXMLRowtoCassandra(sessionCache, *batch, table, wuid, *dirty, 0);
+                        else if (sessionCache->queryTraceLevel())
+                        {
+                            StringBuffer xml;
+                            toXML(p, xml);
+                            DBGLOG("Missing dirty element %s in %s", path, xml.str());
+                        }
                     }
                 }
             }
@@ -3751,14 +3759,19 @@ public:
     {
         return noteDirty(CLocalWorkUnit::updateVariableByName(name));
     }
+    virtual IWUException *createException()
+    {
+        IWUException *result = CLocalWorkUnit::createException();
+        VStringBuffer xpath("Exceptions/Exception[@sequence='%d']", result->getSequence());
+        noteDirty(xpath, wuExceptionsMappings);
+        return result;
+    }
     virtual void copyWorkUnit(IConstWorkUnit *cached, bool all)
     {
         // Make sure that any required updates to the secondary files happen
         IPropertyTree *fromP = queryExtendedWU(cached)->queryPTree();
         for (const char * const *search = searchPaths; *search; search++)
             trackSecondaryChange(fromP->queryProp(*search), *search);
-        //for (const CassandraXmlMapping * const * mapping = secondaryTables; *mapping; mapping++)
-        //    trackSecondaryChange(fromP->queryProp(mapping[0]->xpath), *mapping);
         for (const ChildTableInfo * const * table = childTables; *table != NULL; table++)
             checkChildLoaded(**table);
         CLocalWorkUnit::copyWorkUnit(cached, all);
@@ -3778,6 +3791,19 @@ public:
         CLocalWorkUnit::_loadStatistics();
     }
 
+    virtual void _loadExceptions() const
+    {
+        checkChildLoaded(wuExceptionsTable);        // Lazy populate the Exceptions branch of p from Cassandra
+        CLocalWorkUnit::_loadExceptions();
+    }
+
+    virtual void clearExceptions()
+    {
+        CriticalBlock b(crit);
+        noteDirty("*Exceptions/Exception", wuExceptionsMappings);
+        CLocalWorkUnit::clearExceptions();
+    }
+
     virtual IPropertyTree *queryPTree() const
     {
         // If anyone wants the whole ptree, we'd better make sure we have fully loaded it...