Jelajahi Sumber

HPCC-13735 Add support for sorting/filtering WU lists in Cassandra

Added cache hint support.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 10 tahun lalu
induk
melakukan
8bd2a62f07
2 mengubah file dengan 437 tambahan dan 123 penghapusan
  1. 77 2
      ecl/wutest/wutest.cpp
  2. 360 121
      plugins/cassandra/cassandraembed.cpp

+ 77 - 2
ecl/wutest/wutest.cpp

@@ -404,7 +404,14 @@ int main(int argc, const char *argv[])
 
 #ifdef _USE_CPPUNIT
 #include "unittests.hpp"
-
+inline int max(int a, int b)
+{
+    return a > b ? a : b;
+}
+inline int min(int a, int b)
+{
+    return a < b ? a : b;
+}
 class WuTest : public CppUnit::TestFixture
 {
     CPPUNIT_TEST_SUITE(WuTest);
@@ -1045,7 +1052,7 @@ protected:
         // Check wild user
         WUSortField filterByWildUser[] = { (WUSortField) (WUSFuser|WUSFwild), WUSFterm };
         start = msTick();
-        wus.setown(factory->getWorkUnitsSorted((WUSortField)(WUSFwuid|WUSFwild), filterByWildUser, "WuTestUser0", 0, 10000, NULL, NULL));
+        wus.setown(factory->getWorkUnitsSorted((WUSortField)(WUSFwuid|WUSFreverse), filterByWildUser, "WuTestUser0*", 0, 10000, NULL, NULL));
         numIterated = 0;
         ForEach(*wus)
         {
@@ -1057,6 +1064,74 @@ protected:
             numIterated++;
         }
         DBGLOG("%d workunits listed by wild user, descending wuid in %d ms", numIterated, msTick()-start);
+        ASSERT(numIterated == (testSize/50)*10 + min(testSize % 50, 10));
+
+        // Test use of cache/page mechanism - on something needing a postsort
+        start = msTick();
+        __int64 cachehint = 0;
+        numIterated = 0;
+        int startRow = 0;
+        loop
+        {
+            wus.setown(factory->getWorkUnitsSorted((WUSortField)(WUSFstate|WUSFreverse), filterByCluster, "WuTestCluster0", startRow, 1, &cachehint, NULL));
+            if (!wus->first())
+                break;
+            IConstWorkUnitInfo &wu = wus->query();
+            ASSERT(streq(wu.queryClusterName(), "WuTestCluster0"));
+            if (numIterated)
+                ASSERT(strcmp(wu.queryStateDesc(), prevValue)<=0);
+            prevValue.set(wu.queryStateDesc());
+            numIterated++;
+            ASSERT(!wus->next());
+            startRow++;
+        }
+        DBGLOG("%d workunits filtered by cluster, descending state, page by page in %d ms", numIterated, msTick()-start);
+        ASSERT(numIterated == (testSize+4)/5);
+
+        // Test use of cache/page mechanism - on something NOT needing a postsort
+        start = msTick();
+        cachehint = 0;
+        numIterated = 0;
+        startRow = 0;
+        loop
+        {
+            wus.setown(factory->getWorkUnitsSorted((WUSortField)(WUSFwuid|WUSFreverse), filterByCluster, "WuTestCluster0", startRow, 1, &cachehint, NULL));
+            if (!wus->first())
+                break;
+            IConstWorkUnitInfo &wu = wus->query();
+            ASSERT(streq(wu.queryClusterName(), "WuTestCluster0"));
+            if (numIterated)
+                ASSERT(strcmp(wu.queryWuid(), prevValue)<0);
+            prevValue.set(wu.queryWuid());
+            numIterated++;
+            ASSERT(!wus->next());
+            startRow++;
+        }
+        DBGLOG("%d workunits filtered by cluster, descending wuid, page by page in %d ms", numIterated, msTick()-start);
+        ASSERT(numIterated == (testSize+4)/5);
+
+        // Test use of cache/page mechanism - on something NOT needing a postsort, ascending
+        start = msTick();
+        cachehint = 0;
+        numIterated = 0;
+        startRow = 0;
+        loop
+        {
+            wus.setown(factory->getWorkUnitsSorted((WUSortField)(WUSFwuid), filterByCluster, "WuTestCluster0", startRow, 1, &cachehint, NULL));
+            if (!wus->first())
+                break;
+            IConstWorkUnitInfo &wu = wus->query();
+            ASSERT(streq(wu.queryClusterName(), "WuTestCluster0"));
+            if (numIterated)
+                ASSERT(strcmp(wu.queryWuid(), prevValue)>0);
+            prevValue.set(wu.queryWuid());
+            numIterated++;
+            ASSERT(!wus->next());
+            startRow++;
+        }
+        DBGLOG("%d workunits filtered by cluster, ascending wuid, page by page in %d ms", numIterated, msTick()-start);
+        ASSERT(numIterated == (testSize+4)/5);
+
     }
 };
 StringArray WuTest::wuids;

+ 360 - 121
plugins/cassandra/cassandraembed.cpp

@@ -2258,6 +2258,7 @@ extern void cassandraToGenericXML()
 
 //--------------------------------------------
 
+#define CASS_WU_QUERY_EXPIRES  (1000*60*5)
 #define CASS_WORKUNIT_POSTSORT_LIMIT 10000
 #define CASS_SEARCH_PREFIX_SIZE 2
 #define NUM_PARTITIONS 2
@@ -3790,16 +3791,101 @@ private:
     bool descending;
 };
 
-class CassMultiIterator : public CInterface, implements IRowProvider, implements ICompare, implements IConstWorkUnitIterator
+interface IConstWorkUnitIteratorEx : public IConstWorkUnitIterator
+{
+    virtual bool hasPostFilters() const = 0;
+};
+
+/*
+ *
+ * The cache entries serve two purposes:
+ *
+ * 1. They allow us to map row numbers to values for the end of each page returned, which can make forward paging efficient when not post-sorting
+ * 2. They allow us to preserve post-sort results in order to avoid having to re-retrieve them.
+ */
+
+class CCassandraWuUQueryCacheEntry : public CInterfaceOf<IInterface>
+{
+public:
+    CCassandraWuUQueryCacheEntry()
+    {
+        hint = get_cycles_now(); // MORE - should do better perhaps?
+        lastAccess = msTick();
+    }
+    __int64 queryHint() const
+    {
+        return hint;
+    }
+    void noteWuid(const char *wuid, unsigned row)
+    {
+        CriticalBlock b(crit);
+        ForEachItemInRev(idx, rows)
+        {
+            unsigned foundRow = rows.item(idx);
+            assertex(foundRow != row);
+            if (foundRow < row)
+                break;
+        }
+        rows.add(row, idx+1);
+        wuids.add(wuid, idx+1);
+    }
+    IConstWorkUnitIteratorEx *getResult() const
+    {
+        CriticalBlock b(crit);
+        return result.getLink();
+    }
+    void setResult(IConstWorkUnitIteratorEx *_result)
+    {
+        CriticalBlock b(crit);
+        result.set(_result);
+    }
+    unsigned lookupStartRow(StringBuffer &wuid, unsigned startOffset)
+    {
+        // See if we can provide a base wuid to search above/below
+        CriticalBlock b(crit);
+        ForEachItemInRev(idx, rows)
+        {
+            unsigned foundRow = rows.item(idx);
+            if (foundRow <= startOffset)
+            {
+                wuid.set(wuids.item(idx));
+                return foundRow;
+            }
+        }
+        return 0;
+    }
+    void touch()
+    {
+        lastAccess = msTick();
+    }
+    inline unsigned queryLastAccess() const
+    {
+        return lastAccess;
+    }
+private:
+    mutable CriticalSection crit;  // It's POSSIBLE that we could get two queries in hitting the cache at the same time, I think...
+    UnsignedArray rows;
+    StringArray wuids;
+    Owned<IConstWorkUnitIteratorEx> result;
+    __uint64 hint;
+    unsigned lastAccess;
+};
+
+class CassMultiIterator : public CInterface, implements IRowProvider, implements ICompare, implements IConstWorkUnitIteratorEx
 {
 public:
     IMPLEMENT_IINTERFACE;
-    CassMultiIterator(unsigned _compareColumn, bool _descending)
+    CassMultiIterator(CCassandraWuUQueryCacheEntry *_cache, unsigned _startRowNum, unsigned _compareColumn, bool _descending)
+    : cache(_cache)
     {
         compareColumn = _compareColumn;
         descending = _descending;
+        startRowNum = _startRowNum;
+    }
+    void setStartOffset(unsigned start)
+    {
+        startRowNum = start; // we managed to do a seek forward via a filter
     }
-
     void addResult(CassandraResult &result)
     {
         results.append(result);
@@ -3814,7 +3900,7 @@ public:
     {
         postFilters.append(filter);
     }
-    bool hasPostFilters() const
+    virtual bool hasPostFilters() const
     {
         return postFilters.length() != 0;
     }
@@ -3826,16 +3912,21 @@ public:
             inputs.append(*new CassSortableIterator(cass_iterator_from_result(results.item(idx)), compareColumn, descending));
         }
         merger.setown(createRowStreamMerger(inputs.length(), *this, this, false));
+        rowNum = startRowNum;
         return next();
     }
     virtual bool next()
     {
-        current.clear();
+        Owned<IConstWorkUnitInfo> last = current.getClear();
         loop
         {
             const CassandraIterator *nextSource = get_row();
             if (!nextSource)
+            {
+                if (cache && last)
+                    cache->noteWuid(last->queryWuid(), rowNum);
                 return false;
+            }
             Owned<IPTree> wuXML = rowToPTree(NULL, NULL, workunitInfoMappings+1, cass_iterator_get_row(*nextSource));
             bool postFiltered = false;
             ForEachItemIn(pfIdx, postFilters)
@@ -3849,6 +3940,7 @@ public:
             if (!postFiltered)
             {
                 current.setown(createConstWorkUnitInfo(*wuXML));
+                rowNum++;
                 return true;
             }
         }
@@ -3862,7 +3954,6 @@ public:
         assertex(current);
         return *current.get();
     }
-
     const CassandraIterator *get_row()
     {
         return (const CassSortableIterator *) merger->nextRow();
@@ -3892,11 +3983,14 @@ private:
     IArrayOf<CassSortableIterator> inputs;
     CIArrayOf<PostFilter> postFilters;
     Owned<IConstWorkUnitInfo> current;
+    Linked<CCassandraWuUQueryCacheEntry> cache;
     unsigned compareColumn;
+    unsigned startRowNum;
+    unsigned rowNum;
     bool descending;
 };
 
-class CassPostSortIterator : public CInterfaceOf<IConstWorkUnitIterator>, implements ICompare
+class CassPostSortIterator : public CInterfaceOf<IConstWorkUnitIteratorEx>, implements ICompare
 {
 public:
     CassPostSortIterator(IConstWorkUnitIterator * _input, unsigned _sortorder, unsigned _limit)
@@ -3927,6 +4021,10 @@ public:
     {
         return sorted.item(idx);
     }
+    virtual bool hasPostFilters() const
+    {
+        return false;  // they are done by my input. But we may want to rename this function to indicate "may return more than asked" in which case would be true
+    }
 private:
     void readFirst()
     {
@@ -3984,7 +4082,57 @@ private:
     unsigned limit;
 };
 
-class CassJoinIterator : public CInterface, implements IConstWorkUnitIterator
+class SubPageIterator : public CInterfaceOf<IConstWorkUnitIteratorEx>
+{
+public:
+    SubPageIterator(IConstWorkUnitIteratorEx *_input, unsigned _startOffset, unsigned _pageSize)
+    : input(_input), startOffset(_startOffset), pageSize(_pageSize), idx(0)
+    {
+    }
+    virtual bool first()
+    {
+        idx = 0;
+
+        // MORE - put a seek into the Ex interface
+        if (input->first())
+        {
+            for (int i = 0; i < startOffset;i++)
+            {
+                if (!input->next())
+                    return false;
+            }
+            return true;
+        }
+        else
+            return false;
+    }
+    virtual bool next()
+    {
+        if (idx >= pageSize)
+            return false;
+        idx++;
+        return input->next();
+    }
+    virtual bool isValid()
+    {
+        return idx < pageSize && input->isValid();
+    }
+    virtual IConstWorkUnitInfo & query()
+    {
+        return input->query();
+    }
+    virtual bool hasPostFilters() const
+    {
+        return false;
+    }
+private:
+    Owned<IConstWorkUnitIteratorEx> input;
+    unsigned startOffset;
+    unsigned pageSize;
+    unsigned idx;
+};
+
+class CassJoinIterator : public CInterface, implements IConstWorkUnitIteratorEx
 {
 public:
     IMPLEMENT_IINTERFACE;
@@ -4442,21 +4590,11 @@ protected:
     MapStringTo<const CassandraXmlMapping *> dirtyPaths;
 };
 
-/*
-Streams from N sources...
-Loop through round and round seeing if all the same
-if we get N matches in a row, we have success
-if we are lower than the current seek value, keep on me until >=
-if we are higher, reset match count and use me as new target
-if equal, increase match count and go on to next
-if any reach eof, we are done.
-*/
-
 class CCasssandraWorkUnitFactory : public CWorkUnitFactory, implements ICassandraSession
 {
     IMPLEMENT_IINTERFACE;
 public:
-    CCasssandraWorkUnitFactory(const IPropertyTree *props) : cluster(cass_cluster_new()), randomizeSuffix(0), randState((unsigned) get_cycles_now())
+    CCasssandraWorkUnitFactory(const IPropertyTree *props) : cluster(cass_cluster_new()), randomizeSuffix(0), randState((unsigned) get_cycles_now()), cacheRetirer(*this)
     {
         StringArray options;
         Owned<IPTreeIterator> it = props->getElements("Option");
@@ -4482,10 +4620,13 @@ public:
         if (cluster.keyspace.isEmpty())
             cluster.keyspace.set("hpcc");
         connect();
+        cacheRetirer.start();
     }
 
     ~CCasssandraWorkUnitFactory()
     {
+        cacheRetirer.stop();
+        cacheRetirer.join();
     }
 
     virtual CLocalWorkUnit* _createWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser)
@@ -4568,7 +4709,7 @@ public:
         return getWorkUnitsByXXX("@state", getWorkunitStateStr(WUStateScheduled), secmgr, secuser); // MORE - there may be more efficient ways to do this?
     }
     virtual IConstWorkUnitIterator * getWorkUnitsSorted(WUSortField sortorder, WUSortField * filters, const void * filterbuf,
-                                                        unsigned startoffset, unsigned pageSize, __int64 * cachehint, unsigned *total,
+                                                        unsigned startOffset, unsigned pageSize, __int64 * cachehint, unsigned *total,
                                                         ISecManager *secmgr, ISecUser *secuser)
     {
         // Note that we only support a single sort order.
@@ -4584,125 +4725,172 @@ public:
         // Searching by application values are done as if each was a new (user-defined) attribute.
 
         // At present we assume that the order in which filters are provided indicates the best order to apply them - this may not be smart
-        Owned<IConstWorkUnitIterator> result;
+        Owned<CCassandraWuUQueryCacheEntry> cached;
+        if (cachehint && *cachehint)
+        {
+            CriticalBlock b(cacheCrit);
+            cached.set(cacheIdMap.getValue(*cachehint));
+        }
+        if (cached)
+            cached->touch();
+        else
+            cached.setown(new CCassandraWuUQueryCacheEntry());
         const WUSortField *thisFilter = filters;
         CIArrayOf<PostFilter> goodFilters;
         CIArrayOf<PostFilter> wuidFilters;
         CIArrayOf<PostFilter> poorFilters;
         CIArrayOf<PostFilter> remoteWildFilters;
-        Owned<CassMultiIterator> merger = new CassMultiIterator(0, sortorder!=WUSFwuid); // Merge by wuid. We always fetch ordered by wuid desc, except if the order requested is wuid asc.
-        pageSize += startoffset; // Nasty!
-        const char *fv = (const char *) filterbuf;
-        while (*thisFilter)
+        Owned<IConstWorkUnitIteratorEx> result;
+        bool needsPostSort = ((sortorder & 0xff) != WUSFwuid);
+        result.setown(cached->getResult());
+        if (!result)
         {
-            WUSortField field = (WUSortField) (*thisFilter & 0xff);
-            bool isWild = (*thisFilter & WUSFwild) != 0;
-            switch (field)
+            Owned<CassMultiIterator> merger = new CassMultiIterator(needsPostSort ? NULL : cached, 0, 0, sortorder!=WUSFwuid); // Merge by wuid. We always fetch ordered by wuid desc, except if the order requested is wuid asc.
+            if (startOffset > 4)
             {
-            case WUSFuser:
-            case WUSFcluster:
-            case WUSFjob:
-                if (isWild)
-                    remoteWildFilters.append(*new PostFilter(field, fv, true));  // Trailing-only wildcards can be done remotely
-                else
-                    goodFilters.append(*new PostFilter(field, fv, false));
-                break;
-            case WUSFstate:
-            case WUSFpriority:
-            case WUSFprotected:
-                // These can't be wild, but are not very good filters
-                poorFilters.append(*new PostFilter(field, fv, false));
-                break;
-            case WUSFwuid: // Acts as wuidLo when specified as a filter
-            case WUSFwuidhigh:
-                // Wuid filters can be added to good and poor filters, and to remoteWild if they are done via merged sets rather than ranges...
-                mergeFilter(wuidFilters, field, fv);
-                break;
-            case WUSFfileread:
-                UNIMPLEMENTED;
-            case WUSFtotalthortime:
-                UNIMPLEMENTED; // Numeric sort makes it hard to do the filter remotely. And ESP does not presently use. Though it would be pretty handy...
-                break;
-            case WUSFwildwuid:
-                // Translate into a range - note that we only support trailing * wildcard.
-                if (fv && *fv)
+                StringBuffer startWuid;
+                unsigned found = cached->lookupStartRow(startWuid, startOffset);
+                if (found)
+                {
+                    if (sortorder==WUSFwuid)
+                        startWuid.append('\x21');  // we want to find the first wuid AFTER. This is printable but not going to be in any wuid
+                    else
+                        startWuid.setCharAt(startWuid.length()-1, startWuid.charAt(startWuid.length()-1)-1);  // we want to find the last wuid BEFORE
+
+                    wuidFilters.append(*new PostFilter(sortorder==WUSFwuid ? WUSFwuid : WUSFwuidhigh, startWuid, true));
+                    startOffset -= found;
+                    merger->setStartOffset(found);
+                }
+            }
+            const char *fv = (const char *) filterbuf;
+            while (*thisFilter)
+            {
+                WUSortField field = (WUSortField) (*thisFilter & 0xff);
+                bool isWild = (*thisFilter & WUSFwild) != 0;
+
+                switch (field)
                 {
-                    StringBuffer s(fv);
-                    if (s.charAt(s.length()-1)== '*')
-                        s.remove(s.length()-1, 1);
-                    if (s.length())
+                case WUSFuser:
+                case WUSFcluster:
+                case WUSFjob:
+                    if (isWild)
+                    {
+                        StringBuffer s(fv);
+                        if (s.charAt(s.length()-1)== '*')
+                            s.remove(s.length()-1, 1);
+                        if (s.length())
+                            remoteWildFilters.append(*new PostFilter(field, s, true));  // Trailing-only wildcards can be done remotely
+                    }
+                    else
+                        goodFilters.append(*new PostFilter(field, fv, false));
+                    break;
+                case WUSFstate:
+                case WUSFpriority:
+                case WUSFprotected:
+                    // These can't be wild, but are not very good filters
+                    poorFilters.append(*new PostFilter(field, fv, false));
+                    break;
+                case WUSFwuid: // Acts as wuidLo when specified as a filter
+                case WUSFwuidhigh:
+                    // Wuid filters can be added to good and poor filters, and to remoteWild if they are done via merged sets rather than ranges...
+                    mergeFilter(wuidFilters, field, fv);
+                    break;
+                case WUSFfileread:
+                    UNIMPLEMENTED;
+                case WUSFtotalthortime:
+                    UNIMPLEMENTED; // Numeric sort makes it hard to do the filter remotely. And ESP does not presently use. Though it would be pretty handy...
+                    break;
+                case WUSFwildwuid:
+                    // Translate into a range - note that we only support trailing * wildcard.
+                    if (fv && *fv)
                     {
-                        mergeFilter(wuidFilters, WUSFwuid, s);
-                        s.append('\x7e');  // '~' - higher than anything that should occur in a wuid (but still printable)
-                        mergeFilter(wuidFilters, WUSFwuidhigh, s);
+                        StringBuffer s(fv);
+                        if (s.charAt(s.length()-1)== '*')
+                            s.remove(s.length()-1, 1);
+                        if (s.length())
+                        {
+                            mergeFilter(wuidFilters, WUSFwuid, s);
+                            s.append('\x7e');  // '~' - higher than anything that should occur in a wuid (but still printable)
+                            mergeFilter(wuidFilters, WUSFwuidhigh, s);
+                        }
                     }
+                    break;
+                case WUSFcustom:
+                    UNIMPLEMENTED;
+                case WUSFecl: // This is different...
+                    if (isWild)
+                        merger->addPostFilter(*new PostFilter(field, fv, true)); // Wildcards on ECL are trailing and leading - no way to do remotely
+                    else
+                        goodFilters.append(*new PostFilter(field, fv, false)); // A hard filter on exact ecl match is possible but very unlikely
                 }
-                break;
-            case WUSFcustom:
-                UNIMPLEMENTED;
-            case WUSFecl: // This is different...
-                if (isWild)
-                    merger->addPostFilter(*new PostFilter(field, fv, true)); // Wildcards on ECL are trailing and leading - no way to do remotely
-                else
-                    goodFilters.append(*new PostFilter(field, fv, false)); // A hard filter on exact ecl match is possible but very unlikely
+                thisFilter++;
+                fv = fv + strlen(fv)+1;
             }
-            thisFilter++;
-            fv = fv + strlen(fv)+1;
-        }
 
-        if (goodFilters.length())
-        {
-            merger->addPostFilters(goodFilters, 1);
-            merger->addPostFilters(poorFilters, 0);
-            merger->addPostFilters(remoteWildFilters, 0);
-            merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(goodFilters.item(0), wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize)));
-        }
-        else if (poorFilters.length())
-        {
-            merger->addPostFilters(poorFilters, 1);
-            merger->addPostFilters(remoteWildFilters, 0);
-            merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(poorFilters.item(0), wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize)));
-        }
-        else if (remoteWildFilters.length())
-        {
-            merger->addPostFilters(remoteWildFilters, 1);  // Any other filters have to be done locally
-            // Convert into a value IN [] which we do via a merge
-            // MORE - If we want sorted by filter (or don't care about sort order), we could do directly as a range - but the wuid range filters then don't work, and the merger would be invalid
-            StringArray fieldValues;
-            PostFilter &best= remoteWildFilters.item(0);
-            _getUniqueValues(best.queryXPath(), best.queryValue(), fieldValues);
-            ForEachItemIn(idx, fieldValues)
+            if (goodFilters.length())
             {
-                PostFilter p(best.queryField(), best.queryValue(), false);
-                merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(p, wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize)));
+                merger->addPostFilters(goodFilters, 1);
+                merger->addPostFilters(poorFilters, 0);
+                merger->addPostFilters(remoteWildFilters, 0);
+                merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(goodFilters.item(0), wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
             }
-        }
-        else
-        {
-            // If all we have is a wuid range (or nothing), search the wuid table and/or return everything
-            for (int i = 0; i < NUM_PARTITIONS; i++)
+            else if (poorFilters.length())
             {
-                merger->addResult(*new CassandraResult(fetchDataByPartition(workunitInfoMappings, i, wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize)));
+                merger->addPostFilters(poorFilters, 1);
+                merger->addPostFilters(remoteWildFilters, 0);
+                merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(poorFilters.item(0), wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
+            }
+            else if (remoteWildFilters.length())
+            {
+                merger->addPostFilters(remoteWildFilters, 1);  // Any other filters have to be done locally
+                // Convert into a value IN [] which we do via a merge
+                // MORE - If we want sorted by filter (or don't care about sort order), we could do directly as a range - but the wuid range filters then don't work, and the merger would be invalid
+                StringArray fieldValues;
+                PostFilter &best= remoteWildFilters.item(0);
+                _getUniqueValues(best.queryXPath(), best.queryValue(), fieldValues);
+                ForEachItemIn(idx, fieldValues)
+                {
+                    PostFilter p(best.queryField(), fieldValues.item(idx), false);
+                    merger->addResult(*new CassandraResult(fetchDataForKeyWithFilter(p, wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
+                }
+            }
+            else
+            {
+                // If all we have is a wuid range (or nothing), search the wuid table and/or return everything
+                for (int i = 0; i < NUM_PARTITIONS; i++)
+                {
+                    merger->addResult(*new CassandraResult(fetchDataByPartition(workunitInfoMappings, i, wuidFilters, sortorder, merger->hasPostFilters() ? 0 : pageSize+startOffset)));
+                }
             }
-        }
-        result.setown(merger.getClear());
 
-        // The result we have will be sorted by wuid (ascending or descending)
-        if ((sortorder & 0xff) != WUSFwuid)
-        {
-            // A post-sort will be required.
-            // Result should be limited in (to CASS_WORKUNIT_POSTSORT_LIMIT * number of results being merged)
-            result.setown(new CassPostSortIterator(result.getClear(), sortorder, pageSize > CASS_WORKUNIT_POSTSORT_LIMIT ? pageSize : CASS_WORKUNIT_POSTSORT_LIMIT));
+            // The result we have will be sorted by wuid (ascending or descending)
+            if (needsPostSort)
+            {
+                // A post-sort will be required.
+                // Result should be limited in (to CASS_WORKUNIT_POSTSORT_LIMIT * number of results being merged)
+                result.setown(new CassPostSortIterator(merger.getClear(), sortorder, pageSize > CASS_WORKUNIT_POSTSORT_LIMIT ? pageSize : CASS_WORKUNIT_POSTSORT_LIMIT));
+                cached->setResult(result);
+            }
+            else
+                result.setown(merger.getClear());
         }
-        if (startoffset && result->first())
+        if (startOffset || needsPostSort || result->hasPostFilters()) // we need a subpage if we have fetched anything other than exactly the rows requested
+            result.setown(new SubPageIterator(result.getClear(), startOffset, pageSize));
+        if (cachehint)
         {
-            while (startoffset && result->next())
-                startoffset--;
+            *cachehint = cached->queryHint();
+            CriticalBlock b(cacheCrit);
+            cacheIdMap.setValue(*cachehint, cached.getClear());
         }
-        if (cachehint)
-            *cachehint = 0;
+        // Caching/continuation...
+        // For continuation purposes, it's useful to pass the client something that allows us to give the next page, accurately and efficiently
+        // This will typically be the wuid of the last row (if not sorting) or the sortfield (if sorting)
+        // We would also prefer NOT to recalculate the postsort/postfilter cases if we don't have to - the cachehint allows us to reuse a previous case
+        // We can use a cachehint to retrieve a piece of user state info, that will contain this info.
+        // For caching purposes, we would prefer NOT to recalculate the searches for other users, but I suspect that we really should (since there is no way to know what is a reasonable expiry).
+        // For commonly used pages in eclwatch, esp should itself cache the page (e.g. activity page).
         if (total)
-            *total = 0; // we don't know... (unless we postsort)
+            *total = 0; // We don't know
         return result.getClear();
     }
     virtual StringArray &getUniqueValues(WUSortField field, const char *prefix, StringArray &result) const
@@ -4910,7 +5098,7 @@ private:
     }
     IConstWorkUnitIterator * getWorkUnitsByXXX(const char *xpath, const char *key, ISecManager *secmgr, ISecUser *secuser)
     {
-        Owned<CassMultiIterator> merger = new CassMultiIterator(0, true); // Merge by wuid
+        Owned<CassMultiIterator> merger = new CassMultiIterator(NULL, 0, 0, true); // Merge by wuid
         if (!key || !*key)
         {
             CIArrayOf<PostFilter> wuidFilters;
@@ -5152,8 +5340,7 @@ private:
         default:
             // If not wuid, descending, we will have to post-sort. We still need in wuid desc order for the merge though.
             selectQuery.appendf(" ORDER BY fieldvalue ASC, WUID DESC");
-            if (!limit)
-                limit = CASS_WORKUNIT_POSTSORT_LIMIT;
+            limit = CASS_WORKUNIT_POSTSORT_LIMIT;
             break;
         }
         if (limit)
@@ -5217,14 +5404,66 @@ private:
         check(cass_batch_add_statement(batch, update));
     }
 
+    unsigned retireCache()
+    {
+        CriticalBlock b(cacheCrit); // Is this too coarse-grained?
+        unsigned expires = CASS_WU_QUERY_EXPIRES;
+        unsigned now = msTick();
+        ICopyArrayOf<CCassandraWuUQueryCacheEntry> goers;
+        HashIterator iter(cacheIdMap);
+        ForEach(iter)
+        {
+            CCassandraWuUQueryCacheEntry *entry = cacheIdMap.mapToValue(&iter.query());
+            unsigned age = now - entry->queryLastAccess();
+            int ttl = CASS_WU_QUERY_EXPIRES-age;
+            if (ttl<= 0)
+                goers.append(*entry);
+            else if (ttl< expires)
+                expires = ttl;
+        }
+        ForEachItemIn(idx, goers)
+        {
+            DBGLOG("Expiring cache entry %p", &goers.item(idx));
+            cacheIdMap.remove(goers.item(idx).queryHint());
+        }
+        return expires;
+    }
+
+    class CacheRetirer : public Thread
+    {
+    public:
+        CacheRetirer(CCasssandraWorkUnitFactory &_parent) : Thread("WorkunitListCacheRetirer"), parent(_parent)
+        {
+            stopping = false;
+        }
+        virtual int run()
+        {
+            while (!stopping)
+            {
+                unsigned delay = parent.retireCache();
+                sem.wait(delay);
+            }
+            return 0;
+        }
+        void stop()
+        {
+            stopping = true;
+            sem.signal();
+        }
+    private:
+        Semaphore sem;
+        CCasssandraWorkUnitFactory &parent;
+        bool stopping;
+    } cacheRetirer;
 
     unsigned randomizeSuffix;
     unsigned traceLevel;
     unsigned randState;
     CassandraCluster cluster;
     CassandraSession session;
-    mutable CriticalSection cacheCrit;
+    mutable CriticalSection cacheCrit;  // protects both of the caches below... we could separate
     mutable MapStringToMyClass<CassandraPrepared> preparedCache;
+    mutable MapXToMyClass<__uint64, __uint64, CCassandraWuUQueryCacheEntry> cacheIdMap;
 };