瀏覽代碼

HPCC-12251 Create cassandra plugin for workunit storage

More testing, including validateRepository() code

Various refactoring.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 10 年之前
父節點
當前提交
928d4878cf

+ 8 - 22
common/workunit/workunit.cpp

@@ -2535,18 +2535,6 @@ void CWorkUnitFactory::clearAborting(const char *wuid)
     }
 }
 
-unsigned CWorkUnitFactory::numWorkUnitsFiltered(WUSortField *filters,
-                                    const void *filterbuf,
-                                    ISecManager *secmgr,
-                                    ISecUser *secuser)
-{
-    if (!filters && !secuser && !secmgr)
-        return numWorkUnits();
-    unsigned total;
-    Owned<IConstWorkUnitIterator> iter =  getWorkUnitsSorted( NULL,filters,filterbuf,0,0x7fffffff,NULL,NULL,&total,secmgr,secuser);
-    return total;
-}
-
 static CriticalSection deleteDllLock;
 static Owned<IWorkQueueThread> deleteDllWorkQ;
 
@@ -2581,7 +2569,10 @@ public:
     {
         removeShutdownHook(*this);
     }
-
+    virtual unsigned validateRepository(bool fixErrors)
+    {
+        return 0;
+    }
     virtual CLocalWorkUnit *_createWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser)
     {
         StringBuffer wuRoot;
@@ -3029,6 +3020,10 @@ public:
         : baseFactory(_baseFactory), defaultSecMgr(_secMgr), defaultSecUser(_secUser)
     {
     }
+    virtual unsigned validateRepository(bool fix)
+    {
+        return baseFactory->validateRepository(fix);
+    }
     virtual IWorkUnit* createNamedWorkUnit(const char *wuid, const char *app, const char *user, ISecManager *secMgr, ISecUser *secUser)
     {
         if (!secMgr) secMgr = defaultSecMgr.get();
@@ -3144,15 +3139,6 @@ public:
         return baseFactory->numWorkUnits();
     }
 
-    virtual unsigned numWorkUnitsFiltered(WUSortField *filters,
-                                        const void *filterbuf,
-                                        ISecManager *secMgr, ISecUser *secUser)
-    {
-        if (!secMgr) secMgr = defaultSecMgr.get();
-        if (!secUser) secUser = defaultSecUser.get();
-        return baseFactory->numWorkUnitsFiltered(filters, filterbuf, secMgr, secUser);
-    }
-
     virtual bool isAborting(const char *wuid) const
     {
         return baseFactory->isAborting(wuid);

+ 2 - 1
common/workunit/workunit.hpp

@@ -1274,12 +1274,13 @@ interface IWorkUnitFactory : extends IInterface
                                                         unsigned startoffset, unsigned maxnum, const char * queryowner, __int64 * cachehint, unsigned *total,
                                                         ISecManager *secmgr = NULL, ISecUser *secuser = NULL) = 0;
     virtual unsigned numWorkUnits() = 0;
-    virtual unsigned numWorkUnitsFiltered(WUSortField * filters, const void * filterbuf, ISecManager *secmgr = NULL, ISecUser *secuser = NULL) = 0;
     virtual void descheduleAllWorkUnits(ISecManager *secmgr = NULL, ISecUser *secuser = NULL) = 0;
     virtual IConstQuerySetQueryIterator * getQuerySetQueriesSorted(WUQuerySortField *sortorder, WUQuerySortField *filters, const void *filterbuf, unsigned startoffset, unsigned maxnum, __int64 *cachehint, unsigned *total, const MapStringTo<bool> *subset) = 0;
     virtual bool isAborting(const char *wuid) const = 0;
     virtual void clearAborting(const char *wuid) = 0;
     virtual WUState waitForWorkUnit(const char * wuid, unsigned timeout, bool compiled, bool returnOnWaitState) = 0;
+
+    virtual unsigned validateRepository(bool fixErrors) = 0;
 };
 
 interface IWorkflowScheduleConnection : extends IInterface

+ 0 - 1
common/workunit/workunit.ipp

@@ -596,7 +596,6 @@ public:
                                                 ISecManager *secmgr,
                                                 ISecUser *secuser) = 0;
     virtual unsigned numWorkUnits() = 0;
-    virtual unsigned numWorkUnitsFiltered(WUSortField *filters, const void *filterbuf, ISecManager *secmgr, ISecUser *secuser);
     virtual void descheduleAllWorkUnits(ISecManager *secmgr, ISecUser *secuser);
     virtual IConstQuerySetQueryIterator * getQuerySetQueriesSorted(WUQuerySortField *sortorder, WUQuerySortField *filters, const void *filterbuf, unsigned startoffset, unsigned maxnum, __int64 *cachehint, unsigned *total, const MapStringTo<bool> *subset);
     virtual bool isAborting(const char *wuid) const;

+ 0 - 11
dali/dfu/dfuwu.cpp

@@ -3107,17 +3107,6 @@ public:
         IPropertyTree *root = conn->queryRoot();
         return root->numChildren();
     }
-
-    virtual unsigned numWorkUnitsFiltered(DFUsortfield *filters,const void *filterbuf)
-    {
-        if (!filters)
-            return numWorkUnits();
-        unsigned total;
-        Owned<IConstDFUWorkUnitIterator> iter = getWorkUnitsSorted( NULL,filters,filterbuf,0,0x7fffffff,NULL,NULL,&total);
-        return total;
-    }
-
-
 };
 
 IDFUWorkUnitFactory * getDFUWorkUnitFactory()

+ 0 - 1
dali/dfu/dfuwu.hpp

@@ -450,7 +450,6 @@ interface IDFUWorkUnitFactory : extends IInterface
                                                         __int64 *cachehint,         // set to NULL if caching not required
                                                         unsigned *total) = 0;       // set to NULL if caching not required
     virtual unsigned numWorkUnits()=0;
-    virtual unsigned numWorkUnitsFiltered(DFUsortfield *filters,const void *filterbuf)=0;
     virtual __int64  subscribe(const char *xpath,void *iface) =0;       // internal use
 };
 

+ 45 - 11
ecl/wutest/wutest.cpp

@@ -34,6 +34,8 @@
 #include <cppunit/ui/text/TestRunner.h>
 #endif
 
+static unsigned testSize = 1000;
+
 void usage()
 {
     printf("Usage: WUTEST action [WUID=xxx] [OWNER=xxx]\n\n"
@@ -45,7 +47,8 @@ void usage()
            "   archive [TO=<directory>] [DEL=1] [KEEPFILERESULTS=1]\n"
            "   restore [FROM=<directory>]\n"
            "   pack\n"
-           "   unpack\n");
+           "   unpack\n"
+           "   validate [fix=1]\n");
 }
 
 bool dump(IConstWorkUnit &w, IProperties *globals)
@@ -217,6 +220,7 @@ int main(int argc, const char *argv[])
 #ifdef _USE_CPPUNIT
         else if (action && (stricmp(action, "-selftest")==0))
         {
+            testSize = globals->getPropInt("testSize", 1000);
             queryStderrLogMsgHandler()->setMessageFields(MSGFIELD_time | MSGFIELD_milliTime | MSGFIELD_prefix);
             CppUnit::TextUi::TestRunner runner;
             CppUnit::TestFactoryRegistry &registry = CppUnit::TestFactoryRegistry::getRegistry("WuTest");
@@ -225,6 +229,12 @@ int main(int argc, const char *argv[])
             return wasSucessful;
         }
 #endif
+        else if (action && (stricmp(action, "validate")==0))
+        {
+            bool fix = globals->getPropBool("fix", false);
+            unsigned errors = factory->validateRepository(fix);
+            printf("%u errors %s\n", errors, (fix && errors) ? "fixed" : "found");
+        }
         else if (action && (stricmp(action, "orphans")==0 || stricmp(action, "cleanup")==0))
         {
             factory->setTracingLevel(0);
@@ -401,8 +411,8 @@ class WuTest : public CppUnit::TestFixture
 {
     CPPUNIT_TEST_SUITE(WuTest);
         CPPUNIT_TEST(testCreate);
-        CPPUNIT_TEST(testDelete);
         CPPUNIT_TEST(testList);
+        CPPUNIT_TEST(testDelete);
     CPPUNIT_TEST_SUITE_END();
 protected:
     static StringArray wuids;
@@ -411,37 +421,61 @@ protected:
         Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
         unsigned before = factory->numWorkUnits();
         unsigned start = msTick();
-        for (int i = 0; i < 1000; i++)
+        for (int i = 0; i < testSize; i++)
         {
-            VStringBuffer userId("user%d", i % 10);
+            VStringBuffer userId("WuTestUser%d", i % 10);
             Owned<IWorkUnit>wu = factory->createWorkUnit("WuTest", NULL, NULL, NULL);
             wu->setState(WUStateFailed);
             wu->setUser(userId);
             wuids.append(wu->queryWuid());
         }
         unsigned after = factory->numWorkUnits();
-        DBGLOG("1000 workunits created in %d ms (%d total)", msTick()-start, after);
-        ASSERT(after-before==1000);
-        ASSERT(wuids.length() == 1000);
+        DBGLOG("%u workunits created in %d ms (%d total)", testSize, msTick()-start, after);
+        ASSERT(after-before==testSize);
+        ASSERT(wuids.length() == testSize);
     }
 
     void testDelete()
     {
-        ASSERT(wuids.length() == 1000);
+        ASSERT(wuids.length() == testSize);
         Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
         unsigned before = factory->numWorkUnits();
         unsigned start = msTick();
-        for (int i = 0; i < 1000; i++)
+        for (int i = 0; i < testSize; i++)
         {
             factory->deleteWorkUnit(wuids.item(i));
         }
         unsigned after = factory->numWorkUnits();
-        DBGLOG("1000 workunits deleted in %d ms (%d remain)", msTick()-start, after);
-        ASSERT(before-after==1000);
+        DBGLOG("%u workunits deleted in %d ms (%d remain)", testSize, msTick()-start, after);
+        ASSERT(before-after==testSize);
     }
 
     void testList()
     {
+        Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
+        unsigned before = factory->numWorkUnits();
+        unsigned start = msTick();
+        unsigned numIterated = 0;
+        Owned<IConstWorkUnitIterator> wus = factory->getWorkUnitsByOwner(NULL, NULL, NULL);
+        ForEach(*wus)
+        {
+            IConstWorkUnitInfo &wu = wus->query();
+            numIterated++;
+        }
+        DBGLOG("%d workunits listed in %d ms", numIterated, msTick()-start);
+        ASSERT(numIterated == before);
+        // Now by owner
+        wus.setown(factory->getWorkUnitsByOwner("WuTestUser0", NULL, NULL));
+        start = msTick();
+        numIterated = 0;
+        ForEach(*wus)
+        {
+            IConstWorkUnitInfo &wu = wus->query();
+            ASSERT(streq(wu.queryUser(), "WuTestUser0"));
+            numIterated++;
+        }
+        DBGLOG("%d workunits listed in %d ms", numIterated, msTick()-start);
+        ASSERT(numIterated == (testSize+9)/10);
     }
 };
 StringArray WuTest::wuids;

+ 290 - 96
plugins/cassandra/cassandraembed.cpp

@@ -322,6 +322,9 @@ public:
     CassandraStatement(CassStatement *_statement) : statement(_statement)
     {
     }
+    CassandraStatement(const char *simple) : statement(cass_statement_new(cass_string_init(simple), 0))
+    {
+    }
     ~CassandraStatement()
     {
         if (statement)
@@ -2273,6 +2276,22 @@ public:
     }
 } rootNameColumnMapper;
 
+// WuidColumnMapper is used for columns containing a wuid that is NOT in the resulting XML - it
+// is an error to try to map such a column to/from the XML representation
+
+class WuidColumnMapper : implements CassandraColumnMapper
+{
+public:
+    virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
+    {
+        throwUnexpected();
+    }
+    virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, int userVal)
+    {
+        throwUnexpected();
+    }
+} wuidColumnMapper;
+
 class GraphIdColumnMapper : implements CassandraColumnMapper
 {
 public:
@@ -2679,7 +2698,7 @@ struct CassandraXmlMapping
 
 const CassandraXmlMapping wuExceptionsMappings [] =
 {
-    {"wuid", "text", NULL, rootNameColumnMapper},
+    {"wuid", "text", NULL, wuidColumnMapper},
     {"attributes", "map<text, text>", "", attributeMapColumnMapper},
     {"value", "text", ".", stringColumnMapper},
     {"ts", "timeuuid", NULL, timestampColumnMapper}, // must be last since we don't bind it, so it would throw out the colidx values of following fields
@@ -2688,7 +2707,7 @@ const CassandraXmlMapping wuExceptionsMappings [] =
 
 const CassandraXmlMapping wuStatisticsMappings [] =
 {
-    {"wuid", "text", NULL, rootNameColumnMapper},
+    {"wuid", "text", NULL, wuidColumnMapper},
     {"kind", "text", "@kind", stringColumnMapper},
     {"attributes", "map<text, text>", "@kind", attributeMapColumnMapper},
     { NULL, "wuStatistics", "((wuid), kind)", stringColumnMapper}
@@ -2717,8 +2736,8 @@ const CassandraXmlMapping workunitsMappings [] =
 
 const CassandraXmlMapping ownerMappings [] =
 {
-    {"wuid", "text", NULL, rootNameColumnMapper},
     {"submitID", "text", "@submitID", stringColumnMapper},
+    {"wuid", "text", NULL, rootNameColumnMapper},
     {"clustername", "text", "@clusterName", stringColumnMapper},
     {"jobname", "text", "@jobName", stringColumnMapper},
     {"priorityclass", "int", "@priorityClass", intColumnMapper},
@@ -2730,8 +2749,7 @@ const CassandraXmlMapping ownerMappings [] =
 
 const CassandraXmlMapping workunitInfoMappings [] =  // A cut down version of the workunit mappings - used when querying with no key
 {
-    {"wuid", "text", NULL, rootNameColumnMapper},  // First field is not retrieved when querying...
-    {"wuid", "text", NULL, rootNameColumnMapper},  // But when we are using this mapping, we need it... simplest way to do that is just to repeat it here
+    {"wuid", "text", NULL, rootNameColumnMapper},
     {"clustername", "text", "@clusterName", stringColumnMapper},
     {"jobname", "text", "@jobName", stringColumnMapper},
     {"priorityclass", "int", "@priorityClass", intColumnMapper},
@@ -2744,7 +2762,7 @@ const CassandraXmlMapping workunitInfoMappings [] =  // A cut down version of th
 
 const CassandraXmlMapping graphProgressMappings [] =
 {
-    {"wuid", "text", NULL, rootNameColumnMapper},
+    {"wuid", "text", NULL, wuidColumnMapper},
     {"graphID", "text", NULL, graphIdColumnMapper},
     {"progress", "blob", NULL, progressColumnMapper},  // NOTE - order of these is significant - this creates the subtree that ones below will modify
     {"subgraphID", "text", "@id", subgraphIdColumnMapper},
@@ -2754,7 +2772,7 @@ const CassandraXmlMapping graphProgressMappings [] =
 
 const CassandraXmlMapping wuResultsMappings [] =
 {
-    {"wuid", "text", NULL, rootNameColumnMapper},
+    {"wuid", "text", NULL, wuidColumnMapper},
     {"sequence", "int", "@sequence", intColumnMapper},
     {"name", "text", "@name", stringColumnMapper},
     {"format", "text", "@format", stringColumnMapper},  // xml, xmlset, csv, or null to mean raw. Could probably switch to int if we wanted
@@ -2771,7 +2789,7 @@ const CassandraXmlMapping wuResultsMappings [] =
 
 const CassandraXmlMapping wuVariablesMappings [] =
 {
-    {"wuid", "text", NULL, rootNameColumnMapper},
+    {"wuid", "text", NULL, wuidColumnMapper},
     {"sequence", "int", "@sequence", defaultedIntColumnMapper},  // Note - should be either variable or temporary...
     {"name", "text", "@name", requiredStringColumnMapper},
     {"format", "text", "@format", stringColumnMapper},  // xml, xmlset, csv, or null to mean raw. Could probably switch to int if we wanted
@@ -2811,6 +2829,13 @@ void getFieldNames(const CassandraXmlMapping *mappings, StringBuffer &names, Str
     tableName.append(mappings->columnType);
 }
 
+const char *queryTableName(const CassandraXmlMapping *mappings)
+{
+    while (mappings->columnName)
+        mappings++;
+    return mappings->columnType;
+}
+
 StringBuffer & describeTable(const CassandraXmlMapping *mappings, StringBuffer &out)
 {
     StringBuffer fields;
@@ -2822,11 +2847,18 @@ StringBuffer & describeTable(const CassandraXmlMapping *mappings, StringBuffer &
     return out.appendf("CREATE TABLE IF NOT EXISTS HPCC.%s (%s PRIMARY KEY %s);", mappings->columnType, fields.str(), mappings->xpath);
 }
 
+const CassResult *executeQuery(CassSession *session, CassStatement *statement)
+{
+    CassandraFuture future(cass_session_execute(session, statement));
+    future.wait("executeQuery");
+    return cass_future_get_result(future);
+}
+
 const CassResult *fetchDataForKey(const char *key, CassSession *session, const CassandraXmlMapping *mappings)
 {
     StringBuffer names;
     StringBuffer tableName;
-    getFieldNames(mappings+1, names, tableName);  // mappings+1 means we don't return the key column
+    getFieldNames(mappings+(key?1:0), names, tableName);  // when a key is supplied, mappings+1 means we don't return the key column; with no key, every column is returned
     VStringBuffer selectQuery("select %s from HPCC.%s", names.str()+1, tableName.str());
     if (key)
         selectQuery.appendf(" where %s='%s'", mappings->columnName, key); // MORE - should consider using prepared for this - is it faster?
@@ -2834,9 +2866,48 @@ const CassResult *fetchDataForKey(const char *key, CassSession *session, const C
     //if (traceLevel >= 2)
     //    DBGLOG("%s", selectQuery.str());
     CassandraStatement statement(cass_statement_new(cass_string_init(selectQuery.str()), 0));
-    CassandraFuture future(cass_session_execute(session, statement));
-    future.wait("execute");
-    return cass_future_get_result(future);
+    return executeQuery(session, statement);
+}
+
+const CassResult *fetchDataForKeyAndWuid(const char *key, const char *wuid, CassSession *session, const CassandraXmlMapping *mappings)
+{
+    StringBuffer names;
+    StringBuffer tableName;
+    getFieldNames(mappings+2, names, tableName);  // mappings+2 means we don't return the first two mapped columns - the key column and (in ownerMappings) the wuid column that the query filters on
+    VStringBuffer selectQuery("select %s from HPCC.%s where %s='%s' and wuid='%s'", names.str()+1, tableName.str(), mappings->columnName, key, wuid); // MORE - should consider using prepared/bind for this - is it faster?
+    selectQuery.append(';');
+    //if (traceLevel >= 2)
+    //    DBGLOG("%s", selectQuery.str());
+    CassandraStatement statement(cass_statement_new(cass_string_init(selectQuery.str()), 0));
+    return executeQuery(session, statement);
+}
+
+void deleteSecondaryByKey(const CassandraXmlMapping *mappings, const char *wuid, const char *key, const ICassandraSession *sessionCache, CassBatch *batch)
+{
+    if (key && *key)
+    {
+        StringBuffer names;
+        StringBuffer tableName;
+        getFieldNames(mappings, names, tableName);
+        VStringBuffer insertQuery("DELETE from HPCC.%s where %s=? and wuid=?;", tableName.str(), mappings[0].columnName);
+        Owned<CassandraPrepared> prepared = sessionCache->prepareStatement(insertQuery);
+        CassandraStatement update(cass_prepared_bind(*prepared));
+        check(cass_statement_bind_string(update, 0, cass_string_init(key)));
+        check(cass_statement_bind_string(update, 1, cass_string_init(wuid)));
+        check(cass_batch_add_statement(batch, update));
+    }
+}
+
+void deleteChildByWuid(const CassandraXmlMapping *mappings, const char *wuid, const ICassandraSession *sessionCache, CassBatch *batch)
+{
+    StringBuffer names;
+    StringBuffer tableName;
+    getFieldNames(mappings, names, tableName);
+    VStringBuffer insertQuery("DELETE from HPCC.%s where wuid=?;", tableName.str());
+    Owned<CassandraPrepared> prepared = sessionCache->prepareStatement(insertQuery);
+    CassandraStatement update(cass_prepared_bind(*prepared));
+    check(cass_statement_bind_string(update, 0, cass_string_init(wuid)));
+    check(cass_batch_add_statement(batch, update));
 }
 
 void executeSimpleCommand(CassSession *session, const char *command)
@@ -2866,8 +2937,8 @@ static void getBindingsString(const CassandraXmlMapping *mappings, StringBuffer
 
 static void bindFromXML(const CassandraXmlMapping *mappings, CassStatement *statement, IPTree *inXML, int defaultValue)
 {
-    unsigned colidx = 1;
-    unsigned bindidx = 1;
+    unsigned colidx = 0;
+    unsigned bindidx = 0;
     while (mappings[colidx].columnName)
     {
         if (mappings[colidx].mapper.fromXML(statement, bindidx, inXML, mappings[colidx].xpath, defaultValue))
@@ -2876,7 +2947,7 @@ static void bindFromXML(const CassandraXmlMapping *mappings, CassStatement *stat
     }
 }
 
-extern void simpleXMLtoCassandra(const ICassandraSession *session, CassBatch *batch, const CassandraXmlMapping *mappings, const char *wuid, IPTree *inXML)
+extern void simpleXMLtoCassandra(const ICassandraSession *session, CassBatch *batch, const CassandraXmlMapping *mappings, IPTree *inXML)
 {
     StringBuffer names;
     StringBuffer bindings;
@@ -2885,35 +2956,10 @@ extern void simpleXMLtoCassandra(const ICassandraSession *session, CassBatch *ba
     VStringBuffer insertQuery("INSERT into HPCC.%s (%s) values (%s);", tableName.str(), names.str()+1, bindings.str()+1);
     Owned<CassandraPrepared> prepared = session->prepareStatement(insertQuery);
     CassandraStatement update(cass_prepared_bind(*prepared));
-    check(cass_statement_bind_string(update, 0, cass_string_init(wuid))); // wuid is first column of all tables
     bindFromXML(mappings, update, inXML, 0);
     check(cass_batch_add_statement(batch, update));
 }
 
-void deleteSecondaryByKey(const ICassandraSession *session, CassBatch *batch, const CassandraXmlMapping *mappings, const char *wuid, const char *key)
-{
-    if (key && *key)
-    {
-        StringBuffer names;
-        StringBuffer tableName;
-        getFieldNames(mappings, names, tableName);
-        VStringBuffer insertQuery("DELETE from HPCC.%s where wuid=? and %s=?;", tableName.str(), mappings[1].columnName);
-        Owned<CassandraPrepared> prepared = session->prepareStatement(insertQuery);
-        CassandraStatement update(cass_prepared_bind(*prepared));
-        check(cass_statement_bind_string(update, 0, cass_string_init(wuid)));
-        check(cass_statement_bind_string(update, 1, cass_string_init(key)));
-        check(cass_batch_add_statement(batch, update));
-    }
-}
-
-void updateSecondaryTable(const ICassandraSession *session, CassBatch *batch, const CassandraXmlMapping *mappings, const char *wuid, IPTree *inXML,const char *prevKey)
-{
-    if (prevKey && *prevKey)
-        deleteSecondaryByKey(session, batch, mappings, wuid, prevKey);
-    if (inXML->hasProp(mappings[1].xpath))
-        simpleXMLtoCassandra(session, batch, mappings, wuid, inXML);
-}
-
 extern void childXMLtoCassandra(const ICassandraSession *session, CassBatch *batch, const CassandraXmlMapping *mappings, const char *wuid, IPTreeIterator *elements, int defaultValue)
 {
     if (elements->first())
@@ -2942,30 +2988,6 @@ extern void childXMLtoCassandra(const ICassandraSession *session, CassBatch *bat
     childXMLtoCassandra(session, batch, mappings, inXML->queryName(), elements, defaultValue);
 }
 
-
-extern void cassandraToChildXML(CassSession *session, const CassandraXmlMapping *mappings, const char *wuid, IPTree *wuTree, const char *parentName, const char *childName)
-{
-    CassandraResult result(fetchDataForKey(wuid, session, mappings));
-    Owned<IPTree> parent = createPTree(parentName);
-    CassandraIterator rows(cass_iterator_from_result(result));
-    while (cass_iterator_next(rows))
-    {
-        CassandraIterator cols(cass_iterator_from_row(cass_iterator_get_row(rows)));
-        Owned<IPTree> child = createPTree(childName);
-        unsigned colidx = 1;
-        while (cass_iterator_next(cols))
-        {
-            assertex(mappings[colidx].columnName);
-            const CassValue *value = cass_iterator_get_column(cols);
-            if (value && !cass_value_is_null(value))
-                mappings[colidx].mapper.toXML(child, mappings[colidx].xpath, value);
-            colidx++;
-        }
-        parent->addPropTree(childName, child.getClear());
-    }
-    wuTree->addPropTree(parentName, parent.getClear());
-}
-
 extern void wuResultsXMLtoCassandra(const ICassandraSession *session, CassBatch *batch, IPTree *inXML, const char *xpath)
 {
     childXMLtoCassandra(session, batch, wuResultsMappings, inXML, xpath, 0);
@@ -3176,6 +3198,12 @@ static const CassValue *getSingleResult(const CassResult *result)
         return NULL;
 }
 
+static StringBuffer &getCassString(StringBuffer &str, const CassValue *value)
+{
+    CassString output;
+    check(cass_value_get_string(value, &output));
+    return str.append(output.length, output.data);
+}
 
 /*
 extern void cassandraTestGraphProgressXML()
@@ -3230,11 +3258,12 @@ public:
         CLocalWorkUnit::cleanupAndDelete(deldll, deleteOwned, deleteExclusions);
         if (!batch)
             batch.setown(new CassandraBatch(cass_batch_new(CASS_BATCH_TYPE_UNLOGGED)));
+        deleteChildren(wuid);
+        deleteSecondaries(wuid);
         Owned<CassandraPrepared> prepared = sessionCache->prepareStatement("DELETE from HPCC.workunits where wuid=?;");
         CassandraStatement update(cass_prepared_bind(*prepared));
         check(cass_statement_bind_string(update, 0, cass_string_init(wuid)));
         check(cass_batch_add_statement(*batch, update));
-        deleteSecondaryByKey(sessionCache, *batch, ownerMappings, wuid, queryUser());
         CassandraFuture futureBatch(cass_session_execute_batch(sessionCache->querySession(), *batch));
         futureBatch.wait("execute");
         batch.clear();
@@ -3252,12 +3281,15 @@ public:
             const char *wuid = queryWuid();
             if (basicDirty)
             {
-                updateSecondaryTable(sessionCache, *batch, ownerMappings, wuid, p, prevOwner);
+                updateSecondaries(wuid);
             }
-            simpleXMLtoCassandra(sessionCache, *batch, workunitsMappings, wuid, p);  // This just does the parent row
+            simpleXMLtoCassandra(sessionCache, *batch, workunitsMappings, p);  // This just does the parent row
             if (allDirty)
             {
-                // MORE - should probably delete all prior records in these tables where wuid=wuid
+                // MORE - these deletes are technically correct, but if we assert that the only place that copyWorkUnit is used is to populate an
+                // empty WU from XML text, they are unnecessary.
+                deleteChildren(wuid);
+
                 wuResultsXMLtoCassandra(sessionCache, *batch, p, "Results/Result");
                 wuVariablesXMLtoCassandra(sessionCache, *batch, p, "Variables/Variable", ResultSequenceStored);
                 wuVariablesXMLtoCassandra(sessionCache, *batch, p, "Temporaries/Variable", ResultSequenceInternal); // NOTE - lookups may also request ResultSequenceOnce
@@ -3281,16 +3313,10 @@ public:
 
     virtual void setUser(const char *user)
     {
-        if (!user)
-            user="";
-        const char *current = queryUser();
-        if (streq(current, user))
-            return;  // No change
-        if (!prevOwner) // We need to record the last _committed_ value so we can update the secondary tables appropriately
-            prevOwner.set(queryUser());
-        basicDirty = true;
-        CLocalWorkUnit::setUser(user);
+        if (trackSecondaryChange(user, queryUser(), prevOwner))
+            CLocalWorkUnit::setUser(user);
     }
+
     virtual void _lockRemote()
     {
         // Ignore locking for now!
@@ -3346,6 +3372,8 @@ public:
 
     virtual void copyWorkUnit(IConstWorkUnit *cached, bool all)
     {
+        // Make sure that any required updates to the secondary tables happen
+        trackSecondaryChange(cached->queryUser(), queryUser(), prevOwner);
         // This populates entire XML tree - so we need to note that we did so to ensure everything is flushed by commit
         CLocalWorkUnit::copyWorkUnit(cached, all);
         allDirty = true;
@@ -3361,6 +3389,51 @@ public:
     }
 
 protected:
+    // Delete child table rows
+
+    void deleteChildren(const char *wuid)
+    {
+        deleteChildByWuid(wuResultsMappings, wuid, sessionCache, *batch);
+        deleteChildByWuid(wuVariablesMappings, wuid, sessionCache, *batch);
+        deleteChildByWuid(wuExceptionsMappings, wuid, sessionCache, *batch);
+        deleteChildByWuid(wuStatisticsMappings, wuid, sessionCache, *batch);
+    }
+
+    // Update secondary tables (used to search wuids by owner, state, jobname etc)
+
+    void updateSecondaryTable(const CassandraXmlMapping *mappings, const char *wuid, const char *prevKey)
+    {
+        deleteSecondaryByKey(mappings, wuid, prevKey, sessionCache, *batch);
+        if (p->hasProp(mappings[0].xpath))
+            simpleXMLtoCassandra(sessionCache, *batch, mappings, p);
+    }
+
+    void deleteSecondaries(const char *wuid)
+    {
+        deleteSecondaryByKey(ownerMappings, wuid, queryUser(), sessionCache, *batch);
+    }
+
+    void updateSecondaries(const char *wuid)
+    {
+        updateSecondaryTable(ownerMappings, wuid, prevOwner);
+    }
+
+    // Keep track of previously committed values for fields that we have a secondary table for, so that we can update them appropriately when we commit
+
+    bool trackSecondaryChange(const char *newval, const char *oldval, StringAttr &tracker)
+    {
+        if (!newval)
+            newval = "";
+        if (streq(newval, oldval))
+            return false;  // No change
+        if (!tracker) // We need to record the last _committed_ value so we can update the secondary tables appropriately
+            tracker.set(oldval);
+        basicDirty = true;
+        return true;
+    }
+
+    // Allows us to iterate over an array of IPTrees - MORE this could be in jptree? Should save the trees not the results I suspect.
+
     class ResultPTreeIterator : implements CInterfaceOf<IPTreeIterator>
     {
     public:
@@ -3550,7 +3623,6 @@ public:
         return getUnsignedResult(NULL, getSingleResult(result));
     }
     /*
-    virtual unsigned numWorkUnitsFiltered(WUSortField * filters, const void * filterbuf, ISecManager *secmgr, ISecUser *secuser) { UNIMPLEMENTED; }
     virtual void descheduleAllWorkUnits(ISecManager *secmgr, ISecUser *secuser) { UNIMPLEMENTED; }
     virtual IConstQuerySetQueryIterator * getQuerySetQueriesSorted(WUQuerySortField *sortorder, WUQuerySortField *filters, const void *filterbuf, unsigned startoffset, unsigned maxnum, __int64 *cachehint, unsigned *total, const MapStringTo<bool> *subset) { UNIMPLEMENTED; }
     virtual bool isAborting(const char *wuid) const { UNIMPLEMENTED; }
@@ -3631,34 +3703,160 @@ public:
             Sleep(1000); // MORE - may want to back off as waited gets longer...
         }
     }
+
+    // Checks the repository for inconsistencies between the main workunit table and the
+    // secondary/child tables.
+    //   fix - when true, a logged batch is created, the helpers are given it to record
+    //         repairs in, and it is executed at the end; when false no batch is created.
+    // Returns the sum of the counts reported by validateSecondary and checkOrphans.
+    unsigned validateRepository(bool fix)
+    {
+        unsigned problems = 0;
+        // MORE - if the batch gets too big you may need to flush it occasionally
+        CassandraBatch batch(fix ? cass_batch_new(CASS_BATCH_TYPE_LOGGED) : NULL);
+
+        // Pass 1: every row of the main workunit table must have matching secondary data
+        CassandraResult allWorkunits(fetchDataForKey(NULL, session, workunitInfoMappings));
+        CassandraIterator wuRows(cass_iterator_from_result(allWorkunits));
+        while (cass_iterator_next(wuRows))
+        {
+            Owned<IPTree> wuXML = rowToPTree(NULL, workunitInfoMappings, cass_iterator_get_row(wuRows));
+            const char *wuid = wuXML->queryName();
+            problems += validateSecondary(ownerMappings, wuid, wuXML, batch);
+        }
+
+        // Pass 2: no secondary or child table may hold rows for a workunit that does not exist.
+        // The owner table is checked with wuid index 1, the child tables with index 0.
+        problems += checkOrphans(ownerMappings, 1, batch);
+        const CassandraXmlMapping * const childTables[] =
+        {
+            wuResultsMappings,
+            wuVariablesMappings,
+            wuExceptionsMappings,
+            wuStatisticsMappings
+        };
+        for (unsigned idx = 0; idx < sizeof(childTables)/sizeof(childTables[0]); idx++)
+            problems += checkOrphans(childTables[idx], 0, batch);
+
+        // Pass 3: commit whatever fixes were gathered (only when a batch was created)
+        if (batch)
+        {
+            CassandraFuture commit(cass_session_execute_batch(session, batch));
+            commit.wait("Fix_repository");
+        }
+        return problems;
+    }
+
 private:
+    // Returns true when the HPCC.workunits table holds at least one row for the given wuid.
+    bool checkWuExists(const char *wuid)
+    {
+        Owned<CassandraPrepared> countQuery = prepareStatement("SELECT COUNT(*) FROM HPCC.workunits where wuid=?;");
+        CassandraStatement bound(cass_prepared_bind(*countQuery));
+        cass_statement_bind_string(bound, 0, cass_string_init(wuid));
+        CassandraFuture pending(cass_session_execute(session, bound));
+        pending.wait("select count(*)");
+        CassandraResult counted(cass_future_get_result(pending));
+        // Shouldn't be more than 1, either
+        const bool found = (getUnsignedResult(NULL, getSingleResult(counted)) != 0);
+        return found;
+    }
+
     IConstWorkUnitIterator * getWorkUnitsByXXX(const CassandraXmlMapping *mappings, const char *key, ISecManager *secmgr, ISecUser *secuser)
     {
-        if (!key)
+        if (!key || !*key)  // An empty key is treated the same as a NULL one
             mappings=workunitInfoMappings;   // Historically, providing no value on a call to getWorkUnitsByOwner (for example) filter meant unfiltered...
         CassandraResult result(fetchDataForKey(key, session, mappings));
         Owned<IPTree> parent = createPTree("WorkUnits");
         CassandraIterator rows(cass_iterator_from_result(result));
         while (cass_iterator_next(rows))
         {
-            CassandraIterator cols(cass_iterator_from_row(cass_iterator_get_row(rows)));
-            Owned<IPTree> child = createPTree("wuid");  // Gets overwritten below when wuid field is processed
-            unsigned colidx = 1;
-            while (cass_iterator_next(cols))
-            {
-                assertex(mappings[colidx].columnName);
-                const CassValue *value = cass_iterator_get_column(cols);
-                if (value && !cass_value_is_null(value))
-                    mappings[colidx].mapper.toXML(child, mappings[colidx].xpath, value);
-                colidx++;
-            }
-            const char *childName = child->queryName();
-            parent->addPropTree(childName, child.getClear());
+            Owned<IPTree> wuXML = rowToPTree(key, mappings, cass_iterator_get_row(rows));  // One tree per row; a non-empty key is set at the first mapping's xpath
+            const char *wuid = wuXML->queryName();
+            parent->addPropTree(wuid, wuXML.getClear());  // Row is added to the parent under the name read from queryName()
         }
         Owned<IPropertyTreeIterator> iter = parent->getElements("*");
         return createConstWUIterator(iter, secmgr, secuser);
     }
 
+    // Checks that the secondary table described by mappings holds exactly one row for this
+    // workunit, and that the row matches the data read from the main workunit table.
+    //   mappings - secondary table description; its first entry's xpath names the key field
+    //   wuid     - workunit being checked
+    //   wuXML    - the workunit's data as read from the main table
+    //   batch    - when non-NULL, repairs are added to it; when NULL problems are only logged
+    // Returns the number of problems found (0 or 1). Nothing is checked when the workunit
+    // has no value (or an empty value) for the key field.
+    unsigned validateSecondary(const CassandraXmlMapping *mappings, const char *wuid, IPTree *wuXML, CassBatch *batch)
+    {
+        unsigned errCount = 0;
+        const char *childKey = wuXML->queryProp(mappings->xpath);
+        if (childKey && *childKey)
+        {
+            CassandraResult result(fetchDataForKeyAndWuid(childKey, wuid, session, mappings));
+            switch (cass_result_row_count(result))
+            {
+            case 0:
+                DBGLOG("Missing secondary data in %s for wuid=%s %s=%s", queryTableName(mappings), wuid, mappings->columnName, childKey);
+                if (batch)
+                    simpleXMLtoCassandra(this, batch, mappings, wuXML);
+                errCount++;
+                break;
+            case 1:
+            {
+                Owned<IPTree> secXML = rowToPTree(NULL, mappings+2, cass_result_first_row(result));   // wuid and key not returned
+                // Put the key and the wuid back so the tree is comparable with wuXML
+                secXML->setProp(mappings->xpath, childKey);
+                secXML->renameProp("/", wuid);
+                if (!areMatchingPTrees(wuXML, secXML))
+                {
+                    DBGLOG("Mismatched data in %s for wuid %s", queryTableName(mappings), wuid);
+                    if (batch)
+                        simpleXMLtoCassandra(this, batch, mappings, wuXML);
+                    errCount++;
+                }
+                break;
+            }
+            default:
+                DBGLOG("Multiple secondary data %d in %s for wuid %s", (int) cass_result_row_count(result), queryTableName(mappings), wuid); // This should be impossible!
+                if (batch)
+                {
+                    // Remove the existing rows for this key/wuid, then write the data from the main table
+                    deleteSecondaryByKey(mappings, wuid, childKey, this, batch);
+                    simpleXMLtoCassandra(this, batch, mappings, wuXML);
+                }
+                // Count this like the other inconsistencies so the caller's total is not under-reported
+                errCount++;
+                break;
+            }
+        }
+        return errCount;
+    }
+
+    // Scans every row of the table described by mappings and reports rows whose workunit
+    // does not exist in the main workunit table.
+    //   mappings  - description of the table to scan
+    //   wuidIndex - column index of the wuid in each row; when non-zero, column 0 is read
+    //               as the key and the row is removed via deleteSecondaryByKey, otherwise
+    //               removal goes through deleteChildByWuid
+    //   batch     - when non-NULL, removals are added to it; when NULL orphans are only logged
+    // Returns the number of orphaned rows found.
+    unsigned checkOrphans(const CassandraXmlMapping *mappings, unsigned wuidIndex, CassBatch *batch)
+    {
+        unsigned orphans = 0;
+        CassandraResult tableRows(fetchDataForKey(NULL, session, mappings));
+        CassandraIterator rowIter(cass_iterator_from_result(tableRows));
+        while (cass_iterator_next(rowIter))
+        {
+            const CassRow *current = cass_iterator_get_row(rowIter);
+            StringBuffer wuid;
+            getCassString(wuid, cass_row_get_column(current, wuidIndex));
+            if (checkWuExists(wuid))
+                continue;   // Parent workunit is present - nothing to report
+
+            DBGLOG("Orphaned data in %s for wuid=%s", queryTableName(mappings), wuid.str());
+            if (batch)
+            {
+                if (wuidIndex == 0)
+                    deleteChildByWuid(mappings, wuid, this, batch);
+                else
+                {
+                    StringBuffer key;
+                    getCassString(key, cass_row_get_column(current, 0));
+                    deleteSecondaryByKey(mappings, wuid, key, this, batch);
+                }
+            }
+            orphans++;
+        }
+        return orphans;
+    }
+
+
+    // Converts one Cassandra row into a property tree, mapping each column in turn with the
+    // successive entries of mappings.
+    //   key      - when non-empty, it is stored at the first mapping's xpath and the row's
+    //              columns are matched against the mappings that follow it
+    //   mappings - column descriptions, in the same order as the columns of the row
+    //   row      - the row to convert; null column values are skipped
+    // Returns a new tree which the caller is expected to take charge of.
+    IPTree *rowToPTree(const char *key, const CassandraXmlMapping *mappings, const CassRow *row)
+    {
+        CassandraIterator columns(cass_iterator_from_row(row));
+        Owned<IPTree> tree = createPTree("row");  // May be overwritten below if wuid field is processed
+        const CassandraXmlMapping *mapping = mappings;
+        if (key && *key)
+        {
+            tree->setProp(mapping->xpath, key);
+            mapping++;
+        }
+        for (; cass_iterator_next(columns); mapping++)
+        {
+            assertex(mapping->columnName);
+            const CassValue *cell = cass_iterator_get_column(columns);
+            if (!cell || cass_value_is_null(cell))
+                continue;
+            mapping->mapper.toXML(tree, mapping->xpath, cell);
+        }
+        return tree.getClear();
+    }
+
     unsigned randomizeSuffix;
     unsigned traceLevel;
     CassandraCluster cluster;
@@ -3675,7 +3873,3 @@ extern "C" EXPORT IWorkUnitFactory *createWorkUnitFactory(const IPropertyTree *p
     return new cassandraembed::CCasssandraWorkUnitFactory(props);
 }
 
-extern EXPORT void forceLinkCassandraEmbed()
-{
-    // Makes debugging easier...
-}