Преглед изворни кода

HPCC-13796 Add support for filtering WUs by file in Cassandra

Store the fileread information in a child table, and create an index that can
be used to lookup wuids from filenames.

When filtering by filename, use the latter table to generate a list of wuids
and then merge and fetch them.

Also fixed some issues in the behaviour of the paging code and the test
program.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman пре 10 година
родитељ
комит
4800aa2383

+ 16 - 6
common/workunit/workunit.cpp

@@ -5947,7 +5947,6 @@ void CLocalWorkUnit::_loadResults() const
 
 void CLocalWorkUnit::loadResults() const
 {
-    CriticalBlock block(crit);
     if (!resultsCached)
     {
         assertex(results.length() == 0);
@@ -6269,13 +6268,22 @@ static void _noteFileRead(IDistributedFile *file, IPropertyTree *filesRead)
     }
 }
 
+void CLocalWorkUnit::_loadFilesRead() const
+{
+    // Nothing to do
+}
+
 void CLocalWorkUnit::noteFileRead(IDistributedFile *file)
 {
-    CriticalBlock block(crit);
-    IPropertyTree *files = p->queryPropTree("FilesRead");
-    if (!files)
-        files = p->addPropTree("FilesRead", createPTree());
-    _noteFileRead(file, files);
+    if (file)
+    {
+        CriticalBlock block(crit);
+        _loadFilesRead();
+        IPropertyTree *files = p->queryPropTree("FilesRead");
+        if (!files)
+            files = p->addPropTree("FilesRead", createPTree());
+        _noteFileRead(file, files);
+    }
 }
 
 static void addFile(IPropertyTree *files, const char *fileName, const char *cluster, unsigned usageCount, WUFileKind fileKind, const char *graphOwner)
@@ -6409,6 +6417,7 @@ IPropertyTreeIterator & CLocalWorkUnit::getFileIterator() const
 IPropertyTreeIterator & CLocalWorkUnit::getFilesReadIterator() const
 {
     CriticalBlock block(crit);
+    _loadFilesRead();
     return * p->getElements("FilesRead/File");
 }
 
@@ -6559,6 +6568,7 @@ unsigned CLocalWorkUnit::getGraphCount() const
 unsigned CLocalWorkUnit::getSourceFileCount() const
 {
     CriticalBlock block(crit);
+    _loadFilesRead();
     if (p->hasProp("FilesRead"))
     {
         return p->queryPropTree("FilesRead")->numChildren();

+ 1 - 0
common/workunit/workunit.ipp

@@ -558,6 +558,7 @@ protected:
     virtual void _lockRemote() {};
     virtual void _unlockRemote() {};
     virtual void unsubscribe();
+    virtual void _loadFilesRead() const;
     virtual void _loadResults() const;
     virtual void _loadStatistics() const;
     virtual void _loadExceptions() const;

+ 60 - 5
ecl/wutest/wutest.cpp

@@ -423,6 +423,7 @@ class WuTest : public CppUnit::TestFixture
         CPPUNIT_TEST(testList2);
         CPPUNIT_TEST(testListByAppValue);
         CPPUNIT_TEST(testListByAppValueWild);
+        CPPUNIT_TEST(testListByFilesRead);
         CPPUNIT_TEST(testSet);
         CPPUNIT_TEST(testDelete);
         CPPUNIT_TEST(testCopy);
@@ -458,6 +459,17 @@ protected:
             wu->setApplicationValue("appname", "userId", userId.str(), true);
             wu->setApplicationValue("appname2", "clusterName", clusterName.str(), true);
             wuids.append(wu->queryWuid());
+
+            // We should really be doing a noteFileRead here but the API is such a pain that we'll do it this way
+            IPropertyTree *p = queryExtendedWU(wu)->queryPTree();
+            VStringBuffer fileinfo(" <FilesRead>"
+                "  <File name='myfile%02d' useCount='2' cluster = 'mycluster'/>"
+                "  <File name='mysuperfile' useCount='2' cluster = 'mycluster'>"
+                "   <Subfile name='myfile%02d'/>"
+                "  </File>"
+                " </FilesRead>", i % 10, i % 10);
+            p->setPropTree("FilesRead", createPTreeFromXMLString(fileinfo));
+            wu->noteFileRead(NULL); // Make sure we notice that it was modified
         }
         unsigned after = factory->numWorkUnits();
         DBGLOG("%u workunits created in %d ms (%d total)", testSize, msTick()-start, after);
@@ -901,6 +913,23 @@ protected:
             ASSERT(streq(wu->queryWuScope(), "scope"));
             ASSERT(streq(wu->getSnapshot(s).str(),"snap"));
             ASSERT(wu->getWarningSeverity(1234, SeverityInformation) == SeverityFatal);
+
+            ASSERT(wu->getSourceFileCount()==2);
+            Owned<IPropertyTreeIterator> sourceFiles = &wu->getFilesReadIterator();
+            ForEach(*sourceFiles)
+            {
+                IPTree &file = sourceFiles->query();
+                ASSERT(file.getPropInt("@useCount") == 2);
+                if (streq(file.queryProp("@name"), "mysuperfile"))
+                {
+                    ASSERT(strncmp(file.queryProp("Subfile/@name"), "myfile", 6)==0);
+                }
+                else
+                {
+                    ASSERT(strncmp(file.queryProp("@name"), "myfile", 6)==0);
+                    ASSERT(!file.hasProp("Subfile"));
+                }
+            }
         }
         end = msTick();
         DBGLOG("%u workunits reread in %d ms", testSize, end-start);
@@ -1125,7 +1154,8 @@ protected:
                 ASSERT(strcmp(wu.queryStateDesc(), prevValue)<=0);
             prevValue.set(wu.queryStateDesc());
             numIterated++;
-            ASSERT(!wus->next());
+            bool nextSeen = wus->next();
+            ASSERT(!nextSeen);
             startRow++;
         }
         DBGLOG("%d workunits filtered by cluster, descending state, page by page in %d ms", numIterated, msTick()-start);
@@ -1147,7 +1177,8 @@ protected:
                 ASSERT(strcmp(wu.queryWuid(), prevValue)<0);
             prevValue.set(wu.queryWuid());
             numIterated++;
-            ASSERT(!wus->next());
+            bool nextSeen = wus->next();
+            ASSERT(!nextSeen);
             startRow++;
         }
         DBGLOG("%d workunits filtered by cluster, descending wuid, page by page in %d ms", numIterated, msTick()-start);
@@ -1169,7 +1200,8 @@ protected:
                 ASSERT(strcmp(wu.queryWuid(), prevValue)>0);
             prevValue.set(wu.queryWuid());
             numIterated++;
-            ASSERT(!wus->next());
+            bool nextSeen = wus->next();
+            ASSERT(!nextSeen);
             startRow++;
         }
         DBGLOG("%d workunits filtered by cluster, ascending wuid, page by page in %d ms", numIterated, msTick()-start);
@@ -1200,7 +1232,9 @@ protected:
                 ASSERT(wu.getTotalThorTime()<=prevThorTime);
             prevThorTime = wu.getTotalThorTime();
             numIterated++;
-            ASSERT(!wus->next());
+            bool nextSeen = wus->next();
+            ASSERT(!nextSeen);
+            wus.clear();
             startRow++;
         }
         DBGLOG("%d workunits descending thortime, page by page in %d ms", numIterated, msTick()-start);
@@ -1230,7 +1264,6 @@ protected:
     void testListByAppValueWild()
     {
         Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
-        bool isDali = streq(factory->queryStoreType(), "Dali");
         unsigned start = msTick();
         unsigned numIterated = 0;
         // Test filter by appValue
@@ -1250,6 +1283,28 @@ protected:
         ASSERT(numIterated == testSize);
         numIterated++;
     }
+    void testListByFilesRead()
+    {
+        Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
+        unsigned start = msTick();
+        unsigned numIterated = 0;
+        // Test filter by filesRead
+        WUSortField filterByFilesRead[] = { WUSFfileread, WUSFterm };
+        start = msTick();
+        StringAttr prevValue;
+        Owned<IConstWorkUnitIterator> wus = factory->getWorkUnitsSorted((WUSortField)(WUSFwuid|WUSFreverse), filterByFilesRead, "myfile00", 0, 10000, NULL, NULL);
+        ForEach(*wus)
+        {
+            IConstWorkUnitInfo &wu = wus->query();
+            if (numIterated)
+                ASSERT(strcmp(wu.queryWuid(), prevValue)<0);
+            prevValue.set(wu.queryWuid());
+            numIterated++;
+        }
+        DBGLOG("%d workunits by fileread wild in %d ms", numIterated, msTick()-start);
+        ASSERT(numIterated == (testSize+9)/10);
+        numIterated++;
+    }
 };
 StringArray WuTest::wuids;
 

+ 3 - 3
esp/services/ws_workunits/ws_workunitsHelpers.cpp

@@ -247,7 +247,7 @@ void WsWuInfo::getSourceFiles(IEspECLWorkunit &info, unsigned flags)
                 bool bFound = false;
                 if (fileName && *fileName && (fileNames.length() > 0))
                 {
-                    for (unsigned i = 0; i < fileNames.length(); i++ )
+                    for (unsigned i = 0; i < fileNames.length(); i++ ) // MORE - unnecessary n^2 process
                     {
                         const char *fileName0 = fileNames.item(i);
                         if (!stricmp(fileName, fileName0))
@@ -1706,7 +1706,7 @@ void WsWuInfo::getSubFiles(IPropertyTreeIterator* f, IEspECLSourceFile* eclSuper
         int fileCount = query.getPropInt("@useCount");
 
         bool bFound = false;
-        if (fileName && *fileName && (fileNames.length() > 0))
+        if (fileName && *fileName && (fileNames.length() > 0)) // MORE - this is an n^2 process and as far as I can tell unnecessary as there will be no dups
         {
             for (unsigned i = 0; i < fileNames.length(); i++ )
             {
@@ -1736,7 +1736,7 @@ void WsWuInfo::getSubFiles(IPropertyTreeIterator* f, IEspECLSourceFile* eclSuper
 
         file->setCount(fileCount);
 
-        Owned<IPropertyTreeIterator> filetrees= query.getElements("Subfile");
+        Owned<IPropertyTreeIterator> filetrees= query.getElements("Subfile"); // We do not store subfiles of subfiles like this - so this code will never be triggered
         if (filetrees->first())
         {
             file->setIsSuperFile(true);

+ 190 - 30
plugins/cassandra/cassandrawu.cpp

@@ -32,6 +32,7 @@
 #include "jsort.hpp"
 #include "jptree.hpp"
 #include "jregexp.hpp"
+#include "dadfs.hpp"
 
 #include "workunit.hpp"
 #include "workunit.ipp"
@@ -1131,7 +1132,7 @@ public:
     }
     virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
     {
-        Owned<IPTree> map = createPTree(name);
+        Owned<IPTree> map = name ? createPTree(name) : LINK(row);
         CassandraIterator elems(cass_iterator_from_collection(value));
         while (cass_iterator_next(elems))
         {
@@ -1139,7 +1140,8 @@ public:
             stringColumnMapper.toXML(child, nameAttr, cass_iterator_get_value(elems));
             map->addPropTree(elemName, child.getClear());
         }
-        row->addPropTree(name, map.getClear());
+        if (name)
+            row->addPropTree(name, map.getClear());
         return row;
     }
     virtual bool fromXML(CassandraStatement *statement, unsigned idx, IPTree *row, const char *name, const char *userVal)
@@ -1171,7 +1173,7 @@ public:
 private:
     const char *elemName;
     const char *nameAttr;
-} pluginListColumnMapper("Plugin", "@dllname");
+} pluginListColumnMapper("Plugin", "@dllname"), subfileListColumnMapper("Subfile", "@name");
 
 struct CassandraXmlMapping
 {
@@ -1215,7 +1217,7 @@ static const CassandraXmlMapping workunitsMappings [] =
 
     // These are catchalls for anything not processed above or in a child table
 
-    {"elements", "map<text, text>", "@Action@Application@Debug@Exceptions@Graphs@Results@Statistics@Plugins@Query@Variables@Temporaries@Workflow@", elementMapColumnMapper},  // name is the suppression list, note trailing @
+    {"elements", "map<text, text>", "@Action@Application@Debug@Exceptions@FilesRead@Graphs@Results@Statistics@Plugins@Query@Variables@Temporaries@Workflow@", elementMapColumnMapper},  // name is the suppression list, note trailing @
     {"subtrees", "map<text, text>", "@Process@Tracing@", subTreeMapColumnMapper},  // name is the INCLUSION list, note trailing @
 
     { NULL, "workunits", "((partition), wuid)|CLUSTERING ORDER BY (wuid DESC)", stringColumnMapper}
@@ -1236,7 +1238,7 @@ static const CassandraXmlMapping workunitInfoMappings [] =  // A cut down versio
     {"protected", "boolean", "@protected", boolColumnMapper},
     {"scheduled", "text", "@timeScheduled", stringColumnMapper},   // Should store as a date?
     {"totalThorTime", "text", "@totalThorTime", stringColumnMapper},  // We store in the wu ptree as a collatable string. Need to force to one partition too
-    {"appvalues", "map<text, text>", "@Application@", subTreeMapColumnMapper}, // MORE - change to a custom map to make searchable
+    {"appvalues", "map<text, text>", "@Application@", subTreeMapColumnMapper},
     { NULL, "workunits", "((partition), wuid)|CLUSTERING ORDER BY (wuid DESC)", stringColumnMapper}
 };
 
@@ -1259,7 +1261,7 @@ static const CassandraXmlMapping searchMappings [] =
     {"protected", "boolean", "@protected", boolColumnMapper},
     {"scheduled", "text", "@timeScheduled", stringColumnMapper},   // Should store as a date?
     {"totalThorTime", "text", "@totalThorTime", stringColumnMapper},  // We store in the wu ptree as a collatable string. Need to force to one partition too
-    {"appvalues", "map<text, text>", "@Application@", subTreeMapColumnMapper}, // MORE - change to a custom map to make searchable
+    {"appvalues", "map<text, text>", "@Application@", subTreeMapColumnMapper},
     { NULL, "workunitsSearch", "((xpath, fieldPrefix), fieldValue, wuid)|CLUSTERING ORDER BY (fieldValue ASC, wuid DESC)", stringColumnMapper}
 };
 
@@ -1281,6 +1283,12 @@ static const CassandraXmlMapping uniqueSearchMappings [] =
 
 const char * wildSearchPaths[] = { "@submitID", "@clusterName", "@jobName", NULL};
 
+static const CassandraXmlMapping filesReadSearchMappings [] =
+{
+    {"name", "text", "@name", stringColumnMapper},
+    {"wuid", "text", NULL, suppliedStringColumnMapper},
+    { NULL, "filesReadSearchValues", "((name), wuid)|CLUSTERING ORDER BY (wuid DESC)", stringColumnMapper}
+};
 
 /*
  * Some thoughts on the secondary tables:
@@ -1296,7 +1304,7 @@ const char * wildSearchPaths[] = { "@submitID", "@clusterName", "@jobName", NULL
 
 // The following describe child tables - all keyed by wuid
 
-enum ChildTablesEnum { WuExceptionsChild, WuStatisticsChild, WuGraphProgressChild, WuResultsChild, WuVariablesChild, ChildTablesSize };
+enum ChildTablesEnum { WuExceptionsChild, WuStatisticsChild, WuGraphProgressChild, WuResultsChild, WuVariablesChild, WuFilesReadChild,ChildTablesSize };
 
 struct ChildTableInfo
 {
@@ -1409,8 +1417,26 @@ static const ChildTableInfo wuVariablesTable =
     wuVariablesMappings
 };
 
+static const CassandraXmlMapping wuFilesReadMappings [] =
+{
+    {"partition", "int", NULL, hashRootNameColumnMapper},
+    {"wuid", "text", NULL, rootNameColumnMapper},
+    {"name", "text", "@name", stringColumnMapper},
+    {"cluster", "text", "@cluster", stringColumnMapper},
+    {"useCount", "int", "@useCount", intColumnMapper}, // MORE - could think about using a counter column, but would mess up the commit paradigm
+    {"subfiles", "list<text>", NULL, subfileListColumnMapper},
+    { NULL, "wuFilesRead", "((partition, wuid), name)", stringColumnMapper}
+};
+
+static const ChildTableInfo wuFilesReadTable =
+{
+    "FilesRead", "File",
+    WuFilesReadChild,
+    wuFilesReadMappings
+};
+
 // Order should match the enum above
-static const ChildTableInfo * const childTables [] = { &wuExceptionsTable, &wuStatisticsTable, &wuGraphProgressTable, &wuResultsTable, &wuVariablesTable, NULL };
+static const ChildTableInfo * const childTables [] = { &wuExceptionsTable, &wuStatisticsTable, &wuGraphProgressTable, &wuResultsTable, &wuVariablesTable, &wuFilesReadTable, NULL };
 
 interface ICassandraSession : public IInterface  // MORE - rename!
 {
@@ -1418,7 +1444,7 @@ interface ICassandraSession : public IInterface  // MORE - rename!
     virtual CassandraPrepared *prepareStatement(const char *query) const = 0;
     virtual unsigned queryTraceLevel() const = 0;
 
-    virtual const CassResult *fetchDataForWuid(const CassandraXmlMapping *mappings, const char *wuid) const = 0;
+    virtual const CassResult *fetchDataForWuid(const CassandraXmlMapping *mappings, const char *wuid, bool includeWuid) const = 0;
     virtual void deleteChildByWuid(const CassandraXmlMapping *mappings, const char *wuid, CassBatch *batch) const = 0;
 };
 
@@ -1544,6 +1570,24 @@ extern void simpleXMLtoCassandra(const ICassandraSession *session, CassBatch *ba
     check(cass_batch_add_statement(batch, update));
 }
 
+extern void deleteSimpleXML(const ICassandraSession *session, CassBatch *batch, const CassandraXmlMapping *mappings, IPTree *inXML, const char *userVal = NULL)
+{
+    StringBuffer names;
+    StringBuffer tableName;
+    getFieldNames(mappings, names, tableName);
+    VStringBuffer deleteQuery("DELETE from %s where name=? and wuid=?", tableName.str());
+    Owned<CassandraPrepared> prepared = session->prepareStatement(deleteQuery);
+    CassandraStatement update(cass_prepared_bind(*prepared));
+    unsigned bindidx = 0;
+    while (mappings->columnName)
+    {
+        if (mappings->mapper.fromXML(&update, bindidx, inXML, mappings->xpath, userVal))
+            bindidx++;
+        mappings++;
+    }
+    check(cass_batch_add_statement(batch, update));
+}
+
 extern void addUniqueValue(const ICassandraSession *session, CassBatch *batch, const char *xpath, const char *value)
 {
     StringBuffer bindings;
@@ -1904,8 +1948,8 @@ private:
 class CassSortableIterator : public CassandraIterator
 {
 public:
-    CassSortableIterator(CassIterator *_iterator, unsigned _compareColumn, bool _descending)
-    : CassandraIterator(_iterator), compareColumn(_compareColumn), descending(_descending)
+    CassSortableIterator(CassIterator *_iterator, unsigned _idx, int _compareColumn, bool _descending)
+    : CassandraIterator(_iterator), idx(_idx), compareColumn(_compareColumn), descending(_descending)
     {
 
     }
@@ -1913,8 +1957,11 @@ public:
     {
         if (iterator && cass_iterator_next(iterator))
         {
-            const CassRow *row = cass_iterator_get_row(iterator);
-            getCassString(value.clear(), cass_row_get_column(row, compareColumn));
+            if (compareColumn != -1)
+            {
+                const CassRow *row = cass_iterator_get_row(iterator);
+                getCassString(value.clear(), cass_row_get_column(row, compareColumn));
+            }
             return this;
         }
         else
@@ -1927,12 +1974,15 @@ public:
     }
     int compare(const CassSortableIterator *to) const
     {
+        if (compareColumn==-1)
+            return idx - to->idx;  // concat mode
         int ret = strcmp(value, to->value); // Note - empty StringBuffer always returns ""
         return descending ? -ret : ret;
     }
 private:
     StringBuffer value;
-    unsigned compareColumn;
+    unsigned idx;
+    int compareColumn;
     bool descending;
 };
 
@@ -1940,6 +1990,7 @@ interface IConstWorkUnitIteratorEx : public IConstWorkUnitIterator
 {
     virtual bool hasPostFilters() const = 0;
     virtual bool isMerging() const = 0;
+    virtual void notePosition() const = 0;
 };
 
 /*
@@ -1970,7 +2021,12 @@ public:
         ForEachItemInRev(idx, rows)
         {
             unsigned foundRow = rows.item(idx);
-            assertex(foundRow != row);
+            if (foundRow==row)
+            {
+                assert(streq(wuids.item(idx), wuid));
+                assert(streq(fieldValues.item(idx), fieldValue));
+                return;
+            }
             if (foundRow < row)
                 break;
         }
@@ -2026,7 +2082,7 @@ class CassMultiIterator : public CInterface, implements IRowProvider, implements
 {
 public:
     IMPLEMENT_IINTERFACE;
-    CassMultiIterator(CCassandraWuUQueryCacheEntry *_cache, unsigned _startRowNum, unsigned _compareColumn, bool _descending)
+    CassMultiIterator(CCassandraWuUQueryCacheEntry *_cache, unsigned _startRowNum, int _compareColumn, bool _descending)
     : cache(_cache)
     {
         compareColumn = _compareColumn;
@@ -2037,6 +2093,11 @@ public:
     {
         startRowNum = start; // we managed to do a seek forward via a filter
     }
+    void setCompareColumn(int _compareColumn)
+    {
+        assert(!inputs.length());
+        compareColumn = _compareColumn;
+    }
     void addResult(CassandraResult &result)
     {
         results.append(result);
@@ -2064,12 +2125,19 @@ public:
         inputs.kill();
         ForEachItemIn(idx, results)
         {
-            inputs.append(*new CassSortableIterator(cass_iterator_from_result(results.item(idx)), compareColumn, descending));
+            inputs.append(*new CassSortableIterator(cass_iterator_from_result(results.item(idx)), idx, compareColumn, descending));
         }
         merger.setown(createRowStreamMerger(inputs.length(), *this, this, false));
         rowNum = startRowNum;
         return next();
     }
+    virtual void notePosition() const
+    {
+        if (cache && current)
+        {
+            cache->noteWuid(current->queryWuid(), lastThorTime, rowNum);
+        }
+    }
     virtual bool next()
     {
         Owned<IConstWorkUnitInfo> last = current.getClear();
@@ -2143,7 +2211,7 @@ private:
     Owned<IConstWorkUnitInfo> current;
     Linked<CCassandraWuUQueryCacheEntry> cache;
     StringAttr lastThorTime;
-    unsigned compareColumn;
+    int compareColumn;
     unsigned startRowNum;
     unsigned rowNum;
     bool descending;
@@ -2170,7 +2238,12 @@ public:
     virtual bool next()
     {
         idx++;
-        return sorted.isItem(idx);
+        if (sorted.isItem(idx))
+            return true;
+        return false;
+    }
+    virtual void notePosition() const
+    {
     }
     virtual bool isValid()
     {
@@ -2271,11 +2344,18 @@ public:
     }
     virtual bool next()
     {
+        idx++;
         if (idx >= pageSize)
+        {
+            input->notePosition();
             return false;
-        idx++;
+        }
         return input->next();
     }
+    virtual void notePosition() const
+    {
+        input->notePosition();
+    }
     virtual bool isValid()
     {
         return idx < pageSize && input->isValid();
@@ -2326,7 +2406,7 @@ public:
         inputs.kill();
         ForEachItemIn(idx, results)
         {
-            Owned <CassSortableIterator> input = new CassSortableIterator(cass_iterator_from_result(results.item(idx)), compareColumn, descending);
+            Owned <CassSortableIterator> input = new CassSortableIterator(cass_iterator_from_result(results.item(idx)), idx, compareColumn, descending);
             if (!input->nextRow())
                 return false;
             inputs.append(*input.getClear());
@@ -2480,6 +2560,7 @@ public:
                 wuVariablesXMLtoCassandra(sessionCache, *batch, p, "Temporaries/Variable", "-3"); // ResultSequenceInternal // NOTE - lookups may also request ResultSequenceOnce
                 childXMLtoCassandra(sessionCache, *batch, wuExceptionsMappings, p, "Exceptions/Exception", 0);
                 childXMLtoCassandra(sessionCache, *batch, wuStatisticsMappings, p, "Statistics/Statistic", 0);
+                childXMLtoCassandra(sessionCache, *batch, wuFilesReadMappings, p, "FilesRead/File", 0);
             }
             else
             {
@@ -2618,6 +2699,31 @@ public:
         memset(childLoaded, 1, sizeof(childLoaded));
         allDirty = true;
     }
+    virtual void noteFileRead(IDistributedFile *file)
+    {
+        if (file)
+        {
+            CLocalWorkUnit::noteFileRead(file);
+            VStringBuffer xpath("FilesRead/File[@name='%s']", file->queryLogicalName());
+            noteDirty(xpath, wuFilesReadMappings);
+        }
+        else
+        {
+            // A hack for testing!
+            Owned<IPropertyTreeIterator> files = p->getElements("FilesRead/File");
+            ForEach(*files)
+            {
+                VStringBuffer xpath("FilesRead/File[@name='%s']", files->query().queryProp("@name"));
+                noteDirty(xpath, wuFilesReadMappings);
+            }
+        }
+    }
+
+    virtual void _loadFilesRead() const
+    {
+        checkChildLoaded(wuFilesReadTable);        // Lazy populate the FilesRead branch of p from Cassandra
+        CLocalWorkUnit::_loadFilesRead();
+    }
 
     virtual void _loadResults() const
     {
@@ -2666,7 +2772,7 @@ protected:
         // NOTE - should be called inside critsec
         if (!childLoaded[childTable.index])
         {
-            CassandraResult result(sessionCache->fetchDataForWuid(childTable.mappings, queryWuid()));
+            CassandraResult result(sessionCache->fetchDataForWuid(childTable.mappings, queryWuid(), false));
             Owned<IPTree> results;
             CassandraIterator rows(cass_iterator_from_result(result));
             while (cass_iterator_next(rows))
@@ -2734,6 +2840,13 @@ protected:
         for (const char * const *search = searchPaths; *search; search++)
             deleteSecondaryByKey(*search, p->queryProp(*search), wuid, sessionCache, *batch);
         deleteAppSecondaries(*p, wuid);
+        Owned<IPropertyTreeIterator> filesRead = &getFilesReadIterator();
+        ForEach(*filesRead)
+        {
+            IPTree &file = filesRead->query();
+            deleteSimpleXML(sessionCache, *batch, filesReadSearchMappings, &file, wuid);
+        }
+        // MORE deleteFilesReadSecondaries(*p, wuid);
     }
 
     void updateSecondaries(const char *wuid)
@@ -2759,6 +2872,12 @@ protected:
             addUniqueValue(sessionCache, *batch, xpath, val.queryValue());  // Used to get lists of values for a given app and name, and for filtering
             simpleXMLtoCassandra(sessionCache, *batch, searchMappings, p, xpath);
         }
+        Owned<IPropertyTreeIterator> filesRead = &getFilesReadIterator();
+        ForEach(*filesRead)
+        {
+            IPTree &file = filesRead->query();
+            simpleXMLtoCassandra(sessionCache, *batch, filesReadSearchMappings, &file, wuid);
+        }
     }
 
     // Keep track of previously committed values for fields that we have a secondary table for, so that we can update them appropriately when we commit
@@ -2987,6 +3106,7 @@ public:
         IArrayOf<IPostFilter> goodFilters;
         IArrayOf<IPostFilter> wuidFilters;
         IArrayOf<IPostFilter> poorFilters;
+        IArrayOf<IPostFilter> fileFilters;
         IArrayOf<IPostFilter> remoteWildFilters;
         Owned<IConstWorkUnitIteratorEx> result;
         WUSortField baseSort = (WUSortField) (sortorder & 0xff);
@@ -3069,7 +3189,8 @@ public:
                         mergeFilter(wuidFilters, field, fv);
                     break;
                 case WUSFfileread:
-                    UNIMPLEMENTED;
+                    fileFilters.append(*new PostFilter(field, fv, true));
+                    break;
                 case WUSFtotalthortime:
                     // This should be treated as a low value - i.e. return only wu's that took longer than the supplied value
                     if (thorTimeThreshold.isEmpty()) // If not a continuation
@@ -3101,8 +3222,25 @@ public:
                 thisFilter++;
                 fv = fv + strlen(fv)+1;
             }
-
-            if (sortByThorTime)
+            if (fileFilters.length())
+            {
+                // We can't postfilter by these - we COULD in some cases do a join between these and some other filtered set
+                // but we will leave that as an exercise to the reader. So if there is a fileFilter, read it first, and turn it into a merge set of the resulting wus.
+                assertex(fileFilters.length()==1);  // If we supported more there would be a join phase here
+                merger->addPostFilters(goodFilters, 0);
+                merger->addPostFilters(poorFilters, 0);
+                merger->addPostFilters(remoteWildFilters, 0);
+                CassandraResult wuids(fetchDataForFileRead(fileFilters.item(0).queryValue(), wuidFilters, 0));
+                CassandraIterator rows(cass_iterator_from_result(wuids));
+                StringBuffer value;
+                while (cass_iterator_next(rows))
+                {
+                    const CassRow *row = cass_iterator_get_row(rows);
+                    getCassString(value.clear(), cass_row_get_column(row, 0));
+                    merger->addResult(*new CassandraResult(fetchDataForWuid(workunitInfoMappings, value, true)));
+                }
+            }
+            else if (sortByThorTime)
             {
                 merger->addPostFilters(goodFilters, 0);
                 merger->addPostFilters(poorFilters, 0);
@@ -3116,6 +3254,7 @@ public:
                     assertex(wuidFilters.length()==1);
                     merger->addResult(*new CassandraResult(fetchMoreDataByThorTime(thorTimeThreshold, wuidFilters.item(0).queryValue(), sortDescending, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
                     merger->addResult(*new CassandraResult(fetchMoreDataByThorTime(thorTimeThreshold, NULL, sortDescending, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
+                    merger->setCompareColumn(-1);  // we want to preserve the order of these two results
                 }
                 else
                     merger->addResult(*new CassandraResult(fetchDataByThorTime(thorTimeThreshold, sortDescending, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
@@ -3276,7 +3415,7 @@ public:
         // 2. Check that there are no orphaned entries in search or child tables
         errCount += checkOrphans(searchMappings, 3, batch);
         for (const ChildTableInfo * const * table = childTables; *table != NULL; table++)
-            errCount += checkOrphans(table[0]->mappings, 0, batch);
+            errCount += checkOrphans(table[0]->mappings, 1, batch);
         // 3. Commit fixes
         if (batch)
         {
@@ -3311,6 +3450,7 @@ public:
         ensureTable(session, workunitsMappings);
         ensureTable(session, searchMappings);
         ensureTable(session, uniqueSearchMappings);
+        ensureTable(session, filesReadSearchMappings);
         for (const ChildTableInfo * const * table = childTables; *table != NULL; table++)
             ensureTable(session, table[0]->mappings);
     }
@@ -3490,7 +3630,7 @@ private:
 
     IPTree *cassandraToWorkunitXML(const char *wuid) const
     {
-        CassandraResult result(fetchDataForWuid(workunitsMappings, wuid));
+        CassandraResult result(fetchDataForWuid(workunitsMappings, wuid, false));
         CassandraIterator rows(cass_iterator_from_result(result));
         if (cass_iterator_next(rows)) // should just be one
         {
@@ -3520,7 +3660,6 @@ private:
         StringBuffer tableName;
         getFieldNames(mappings, names, tableName);
         VStringBuffer selectQuery("select %s from %s;", names.str()+1, tableName.str());
-        selectQuery.append(';');
         if (traceLevel >= 2)
             DBGLOG("%s", selectQuery.str());
         CassandraStatement statement(cass_statement_new(selectQuery.str(), 0));
@@ -3567,12 +3706,12 @@ private:
 
     // Fetch matching rows from a child table, or the main wu table
 
-    const CassResult *fetchDataForWuid(const CassandraXmlMapping *mappings, const char *wuid) const
+    const CassResult *fetchDataForWuid(const CassandraXmlMapping *mappings, const char *wuid, bool includeWuid) const
     {
         assertex(wuid && *wuid);
         StringBuffer names;
         StringBuffer tableName;
-        getFieldNames(mappings+2, names, tableName);  // mappings+1 means we don't return the partition or wuid columns
+        getFieldNames(mappings + (includeWuid ? 1 : 2), names, tableName);  // skip the partition column, and also the wuid column unless includeWuid is set
         VStringBuffer selectQuery("select %s from %s where partition=%d and wuid='%s';", names.str()+1, tableName.str(), rtlHash32VStr(wuid, 0) % NUM_PARTITIONS, wuid); // MORE - should consider using prepared/bind for this - is it faster?
         if (traceLevel >= 2)
             DBGLOG("%s", selectQuery.str());
@@ -3715,6 +3854,27 @@ private:
         return executeQuery(session, statement);
     }
 
+    // Fetch rows from the file search table
+
+    const CassResult *fetchDataForFileRead(const char *name, const IArrayOf<IPostFilter> &wuidFilters, unsigned limit) const
+    {
+        StringBuffer names;
+        StringBuffer tableName;
+        getFieldNames(filesReadSearchMappings+1, names, tableName);  // mappings+1 means we don't return the key column (name)
+        VStringBuffer selectQuery("select %s from %s where name='%s'", names.str()+1, tableName.str(), name);
+        ForEachItemIn(idx, wuidFilters)
+        {
+            const IPostFilter &wuidFilter = wuidFilters.item(idx);
+            selectQuery.appendf(" and wuid %s '%s'", wuidFilter.queryField()==WUSFwuidhigh ? "<=" : ">=", wuidFilter.queryValue());
+        }
+        if (limit)
+            selectQuery.appendf(" LIMIT %u", limit);
+        if (traceLevel >= 2)
+            DBGLOG("%s", selectQuery.str());
+        CassandraStatement statement(cass_statement_new(selectQuery.str(), 0));
+        return executeQuery(session, statement);
+    }
+
     // Fetch matching rows from the search table, for a single wuid
 
     const CassResult *fetchDataForKeyAndWuid(const char *xpath, const char *key, const char *wuid) const