소스 검색

HPCC-12251 Create cassandra plugin for workunit storage

Use partition and merge for wuid lookups.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 10 년 전
부모
커밋
7bc7a8dec7
4개의 파일이 변경되었으며, 269개의 추가와 60개의 삭제가 있습니다
  1. 14 0
      common/workunit/workunit.cpp
  2. 2 0
      common/workunit/workunit.hpp
  3. 7 0
      ecl/wutest/wutest.cpp
  4. 246 60
      plugins/cassandra/cassandraembed.cpp

+ 14 - 0
common/workunit/workunit.cpp

@@ -1066,6 +1066,11 @@ protected:
     bool _isAborting;
 };
 
+extern IConstWorkUnitInfo *createConstWorkUnitInfo(IPropertyTree &p)  // Factory declared in workunit.hpp with WORKUNIT_API
+{
+    return new CLightweightWorkunitInfo(p);  // Returns a newly allocated CLightweightWorkunitInfo constructed from p
+}
+
 class CDaliWorkUnit : public CLocalWorkUnit
 {
 public:
@@ -2585,6 +2590,11 @@ public:
     {
         // Nothing to do
     }
+    virtual const char *queryStoreType() const  // Store-type tag; wutest compares the result against "Dali"
+    {
+        return "Dali";
+    }
+
     virtual CLocalWorkUnit *_createWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser)
     {
         StringBuffer wuRoot;
@@ -3042,6 +3052,10 @@ public:
     {
         return baseFactory->createRepository();
     }
+    virtual const char *queryStoreType() const  // Reports whatever the wrapped baseFactory reports
+    {
+        return baseFactory->queryStoreType();
+    }
 
     virtual IWorkUnit* createNamedWorkUnit(const char *wuid, const char *app, const char *user, ISecManager *secMgr, ISecUser *secUser)
     {

+ 2 - 0
common/workunit/workunit.hpp

@@ -1287,6 +1287,7 @@ interface IWorkUnitFactory : extends IInterface
     virtual unsigned validateRepository(bool fixErrors) = 0;
     virtual void deleteRepository(bool recreate) = 0;
     virtual void createRepository() = 0;  // If not already there...
+    virtual const char *queryStoreType() const = 0; // Returns "Dali" or "Cassandra"
 };
 
 interface IWorkflowScheduleConnection : extends IInterface
@@ -1376,6 +1377,7 @@ extern WORKUNIT_API void setWorkUnitFactory(IWorkUnitFactory *_factory);
 extern WORKUNIT_API IWorkUnitFactory * getWorkUnitFactory();
 extern WORKUNIT_API IWorkUnitFactory * getWorkUnitFactory(ISecManager *secmgr, ISecUser *secuser);
 extern WORKUNIT_API ILocalWorkUnit* createLocalWorkUnit(const char *XML);
+extern WORKUNIT_API IConstWorkUnitInfo *createConstWorkUnitInfo(IPropertyTree &p);
 extern WORKUNIT_API StringBuffer &exportWorkUnitToXML(const IConstWorkUnit *wu, StringBuffer &str, bool unpack, bool includeProgress, bool hidePasswords);
 extern WORKUNIT_API void exportWorkUnitToXMLFile(const IConstWorkUnit *wu, const char * filename, unsigned extraXmlFlags, bool unpack, bool includeProgress, bool hidePasswords);
 extern WORKUNIT_API void submitWorkUnit(const char *wuid, const char *username, const char *password);

+ 7 - 0
ecl/wutest/wutest.cpp

@@ -441,7 +441,9 @@ protected:
         DBGLOG("%u workunits created in %d ms (%d total)", testSize, msTick()-start, after);
         ASSERT(after-before==testSize);
         ASSERT(wuids.length() == testSize);
+        start = msTick();
         ASSERT(factory->validateRepository(false)==0);
+        DBGLOG("%u workunits validated in %d ms", after, msTick()-start);
     }
 
     void testCopy()
@@ -890,13 +892,18 @@ protected:
     void testList()
     {
         Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
+        bool isDali = streq(factory->queryStoreType(), "Dali");
         unsigned before = factory->numWorkUnits();
         unsigned start = msTick();
         unsigned numIterated = 0;
         Owned<IConstWorkUnitIterator> wus = factory->getWorkUnitsByOwner(NULL, NULL, NULL);
+        StringBuffer lastWu;
         ForEach(*wus)
         {
             IConstWorkUnitInfo &wu = wus->query();
+            if (lastWu.length() && !isDali)  // Dali does not define the order here
+                ASSERT(strcmp(wu.queryWuid(), lastWu) <= 0);
+            lastWu.set(wu.queryWuid());
             numIterated++;
         }
         DBGLOG("%d workunits listed in %d ms", numIterated, msTick()-start);

+ 246 - 60
plugins/cassandra/cassandraembed.cpp

@@ -29,6 +29,7 @@
 #include "rtlembed.hpp"
 #include "roxiemem.hpp"
 #include "nbcd.hpp"
+#include "jsort.hpp"
 #include "jptree.hpp"
 
 #include "workunit.hpp"
@@ -453,7 +454,7 @@ private:
     const CassPrepared *prepared;
 };
 
-class CassandraResult : public CInterface
+class CassandraResult : public CInterfaceOf<IInterface>
 {
 public:
     CassandraResult(const CassResult *_result) : result(_result)
@@ -473,7 +474,7 @@ private:
     const CassResult *result;
 };
 
-class CassandraIterator : public CInterface
+class CassandraIterator : public CInterfaceOf<IInterface>
 {
 public:
     CassandraIterator(CassIterator *_iterator) : iterator(_iterator)
@@ -484,11 +485,17 @@ public:
         if (iterator)
             cass_iterator_free(iterator);
     }
+    inline void set(CassIterator *_iterator)
+    {
+        if (iterator)
+            cass_iterator_free(iterator);
+        iterator = _iterator;
+    }
     inline operator CassIterator *() const
     {
         return iterator;
     }
-private:
+protected:
     CassandraIterator(const CassandraIterator &);
     CassIterator *iterator;
 };
@@ -2251,6 +2258,7 @@ extern void cassandraToGenericXML()
 //--------------------------------------------
 
 #define CASS_SEARCH_PREFIX_SIZE 2
+#define NUM_PARTITIONS 2
 
 static const CassValue *getSingleResult(const CassResult *result)
 {
@@ -2364,6 +2372,24 @@ public:
     }
 } timestampColumnMapper;
 
+class HashRootNameColumnMapper : implements CassandraColumnMapper  // Mapper used for the "partition" column of the mapping tables
+{
+public:
+    virtual IPTree *toXML(IPTree *row, const char *name, const CassValue *value)
+    {
+        throwUnexpected(); // we never return the partition column
+    }
+    virtual bool fromXML(CassStatement *statement, unsigned idx, IPTree *row, const char *name, const char * userVal)
+    {
+        if (statement)  // With a null statement nothing is bound, but true is still returned
+        {
+            int hash = rtlHash32VStr(row->queryName(), 0) % NUM_PARTITIONS;  // Partition number is derived from the row's root name
+            check(cass_statement_bind_int32(statement, idx, hash));
+        }
+        return true;
+    }
+} hashRootNameColumnMapper;
+
 class RootNameColumnMapper : implements CassandraColumnMapper
 {
 public:
@@ -3064,6 +3090,7 @@ struct CassandraTableInfo
 
 static const CassandraXmlMapping workunitsMappings [] =
 {
+    {"partition", "int", NULL, hashRootNameColumnMapper},
     {"wuid", "text", NULL, rootNameColumnMapper},
     {"clustername", "text", "@clusterName", stringColumnMapper},
     {"jobname", "text", "@jobName", stringColumnMapper},
@@ -3087,11 +3114,12 @@ static const CassandraXmlMapping workunitsMappings [] =
     {"elements", "map<text, text>", "@Debug@Exceptions@Graphs@Results@Statistics@Plugins@Query@Variables@Temporaries@Workflow@", elementMapColumnMapper},  // name is the suppression list, note trailing @
     {"subtrees", "map<text, text>", "@Application@Process@Tracing@", subTreeMapColumnMapper},  // name is the INCLUSION list, note trailing @
 
-    { NULL, "workunits", "((wuid))", stringColumnMapper}
+    { NULL, "workunits", "((partition), wuid)|CLUSTERING ORDER BY (wuid DESC)", stringColumnMapper}
 };
 
 static const CassandraXmlMapping workunitInfoMappings [] =  // A cut down version of the workunit mappings - used when querying with no key
 {
+    {"partition", "int", NULL, hashRootNameColumnMapper},
     {"wuid", "text", NULL, rootNameColumnMapper},
     {"clustername", "text", "@clusterName", stringColumnMapper},
     {"jobname", "text", "@jobName", stringColumnMapper},
@@ -3100,7 +3128,7 @@ static const CassandraXmlMapping workunitInfoMappings [] =  // A cut down versio
     {"scope", "text", "@scope", stringColumnMapper},
     {"submitID", "text", "@submitID", stringColumnMapper},
     {"state", "text", "@state", stringColumnMapper},
-    { NULL, "workunits", "((wuid))", stringColumnMapper}
+    { NULL, "workunits", "((partition), wuid)|CLUSTERING ORDER BY (wuid DESC)", stringColumnMapper}
 };
 
 // The following describes the search table - this contains copies of the basic wu information but keyed by different fields
@@ -3151,11 +3179,12 @@ struct ChildTableInfo
 
 static const CassandraXmlMapping wuExceptionsMappings [] =
 {
+    {"partition", "int", NULL, hashRootNameColumnMapper},  // Forms the partition key together with wuid (see final entry)
     {"wuid", "text", NULL, rootNameColumnMapper},
     {"sequence", "int", "@sequence", intColumnMapper},
     {"attributes", "map<text, text>", "", attributeMapColumnMapper},
     {"value", "text", ".", stringColumnMapper},
-    { NULL, "wuExceptions", "((wuid), sequence)", stringColumnMapper}
+    { NULL, "wuExceptions", "((partition, wuid), sequence)", stringColumnMapper}  // Terminator: NULL column name, then table name and primary key
 };
 
 static const ChildTableInfo wuExceptionsTable =
@@ -3167,13 +3196,14 @@ static const ChildTableInfo wuExceptionsTable =
 
 static const CassandraXmlMapping wuStatisticsMappings [] =
 {
+    {"partition", "int", NULL, hashRootNameColumnMapper},  // Forms the partition key together with wuid (see final entry)
     {"wuid", "text", NULL, rootNameColumnMapper},
     {"ts", "bigint", "@ts", bigintColumnMapper},  // MORE - should change this to a timeuuid ?
     {"kind", "text", "@kind", stringColumnMapper},
     {"creator", "text", "@creator", stringColumnMapper},
     {"scope", "text", "@scope", stringColumnMapper},
     {"attributes", "map<text, text>", "@ts@kind@creator@scope@", attributeMapColumnMapper},
-    { NULL, "wuStatistics", "((wuid), ts, kind, creator, scope)", stringColumnMapper}
+    { NULL, "wuStatistics", "((partition, wuid), ts, kind, creator, scope)", stringColumnMapper}  // Terminator: NULL column name, then table name and primary key
 };
 
 static const ChildTableInfo wuStatisticsTable =
@@ -3185,12 +3215,13 @@ static const ChildTableInfo wuStatisticsTable =
 
 static const CassandraXmlMapping wuGraphProgressMappings [] =
 {
+    {"partition", "int", NULL, hashRootNameColumnMapper},  // Forms the partition key together with wuid (see final entry)
     {"wuid", "text", NULL, rootNameColumnMapper},
     {"graphID", "text", NULL, graphIdColumnMapper},
     {"progress", "blob", NULL, progressColumnMapper},  // NOTE - order of these is significant - this creates the subtree that ones below will modify
     {"subgraphID", "text", "@id", subgraphIdColumnMapper},
     {"state", "int", "@_state", intColumnMapper},
-    { NULL, "wuGraphProgress", "((wuid), graphid, subgraphid)", stringColumnMapper}
+    { NULL, "wuGraphProgress", "((partition, wuid), graphid, subgraphid)", stringColumnMapper}  // Terminator: NULL column name, then table name and primary key
 };
 
 static const ChildTableInfo wuGraphProgressTable =
@@ -3202,6 +3233,7 @@ static const ChildTableInfo wuGraphProgressTable =
 
 static const CassandraXmlMapping wuResultsMappings [] =
 {
+    {"partition", "int", NULL, hashRootNameColumnMapper},
     {"wuid", "text", NULL, rootNameColumnMapper},
     {"sequence", "int", "@sequence", intColumnMapper},
     {"name", "text", "@name", stringColumnMapper},
@@ -3213,7 +3245,7 @@ static const CassandraXmlMapping wuResultsMappings [] =
     {"schemaRaw", "blob", "SchemaRaw", blobColumnMapper},
     {"logicalName", "text", "logicalName", stringColumnMapper},  // either this or value will be present once result status is "calculated"
     {"value", "blob", "Value", blobColumnMapper},
-    { NULL, "wuResults", "((wuid), sequence)", stringColumnMapper}
+    { NULL, "wuResults", "((partition, wuid), sequence)", stringColumnMapper}
 };
 
 static const ChildTableInfo wuResultsTable =
@@ -3227,6 +3259,7 @@ static const ChildTableInfo wuResultsTable =
 
 static const CassandraXmlMapping wuVariablesMappings [] =
 {
+    {"partition", "int", NULL, hashRootNameColumnMapper},
     {"wuid", "text", NULL, rootNameColumnMapper},
     {"sequence", "int", "@sequence", defaultedIntColumnMapper},  // Note - should be either variable or temporary...
     {"name", "text", "@name", requiredStringColumnMapper},
@@ -3237,7 +3270,7 @@ static const CassandraXmlMapping wuVariablesMappings [] =
     {"schemaRaw", "blob", "SchemaRaw", blobColumnMapper},
     {"logicalName", "text", "logicalName", stringColumnMapper},  // either this or value will be present once result status is "calculated"
     {"value", "blob", "Value", blobColumnMapper},
-    { NULL, "wuVariables", "((wuid), sequence, name)", stringColumnMapper}
+    { NULL, "wuVariables", "((partition, wuid), sequence, name)", stringColumnMapper}
 };
 
 static const ChildTableInfo wuVariablesTable =
@@ -3260,8 +3293,6 @@ interface ICassandraSession : public IInterface  // MORE - rename!
     virtual void deleteChildByWuid(const CassandraXmlMapping *mappings, const char *wuid, CassBatch *batch) const = 0;
 };
 
-
-
 void getBoundFieldNames(const CassandraXmlMapping *mappings, StringBuffer &names, StringBuffer &bindings, IPTree *inXML, const char *userVal, StringBuffer &tableName)
 {
     while (mappings->columnName)
@@ -3304,7 +3335,22 @@ StringBuffer & describeTable(const CassandraXmlMapping *mappings, StringBuffer &
         fields.appendf("%s %s,", mappings->columnName, mappings->columnType);
         mappings++;
     }
-    return out.appendf("CREATE TABLE IF NOT EXISTS %s (%s PRIMARY KEY %s);", mappings->columnType, fields.str(), mappings->xpath);
+    StringArray options;
+    options.appendList(mappings->xpath, "|");
+    assertex(options.length()); // Primary key at least should be present!
+    out.appendf("CREATE TABLE IF NOT EXISTS %s (%s PRIMARY KEY %s)", mappings->columnType, fields.str(), options.item(0));
+    unsigned idx = 1;
+    while (options.isItem(idx))
+    {
+        if (idx==1)
+            out.append(" WITH ");
+        else
+            out.append(", ");
+        out.append(options.item(idx));
+        idx++;
+    }
+    out.append(';');
+    return out;
 }
 
 const CassResult *executeQuery(CassSession *session, CassStatement *statement)
@@ -3378,9 +3424,10 @@ extern void childXMLRowtoCassandra(const ICassandraSession *session, CassBatch *
     VStringBuffer insertQuery("INSERT into %s (%s) values (%s);", tableName.str(), names.str()+1, bindings.str()+1);
     Owned<CassandraPrepared> prepared = session->prepareStatement(insertQuery);
     CassandraStatement update(cass_prepared_bind(*prepared));
-    check(cass_statement_bind_string(update, 0, wuid));
-    unsigned bindidx = 1; // We already bound wuid
-    unsigned colidx = 1; // We already bound wuid
+    check(cass_statement_bind_int32(update, 0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS));
+    check(cass_statement_bind_string(update, 1, wuid));
+    unsigned bindidx = 2; // We already bound wuid and partition
+    unsigned colidx = 2; // We already bound wuid and partition
     while (mappings[colidx].columnName)
     {
         if (mappings[colidx].mapper.fromXML(update, bindidx, &row, mappings[colidx].xpath, userVal))
@@ -3581,6 +3628,136 @@ extern void cassandraTest()
 }
 */
 
+static IPTree *rowToPTree(const char *xpath, const char *key, const CassandraXmlMapping *mappings, const CassRow *row)  // Build a property tree from one Cassandra row, one mapping per column
+{
+    CassandraIterator cols(cass_iterator_from_row(row));
+    Owned<IPTree> xml = createPTree("row");  // May be overwritten below if wuid field is processed
+    if (xpath && *xpath && key && *key)
+        xml->setProp(xpath, key);  // Only set when both xpath and key are non-empty
+    while (cass_iterator_next(cols))
+    {
+        assertex(mappings->columnName);  // Fails if the row has more columns than there are mappings before the terminator
+        const CassValue *value = cass_iterator_get_column(cols);
+        if (value && !cass_value_is_null(value))
+            mappings->mapper.toXML(xml, mappings->xpath, value);  // Missing or null columns are skipped
+        mappings++;  // Columns and mappings advance in lockstep
+    }
+    return xml.getClear();
+}
+
+class CassSortableIterator : public CassandraIterator  // Result iterator that caches one column of its current row so rows can be compared
+{
+public:
+    CassSortableIterator(CassIterator *_iterator, unsigned _compareColumn, bool _descending)
+    : CassandraIterator(_iterator), compareColumn(_compareColumn), descending(_descending)
+    {
+
+    }
+    const CassSortableIterator *nextRow()  // Advance one row; returns this, or NULL when there is no iterator or no further row
+    {
+        if (iterator && cass_iterator_next(iterator))
+        {
+            const CassRow *row = cass_iterator_get_row(iterator);
+            getCassString(value.clear(), cass_row_get_column(row, compareColumn));  // Cache the compare column as a string
+            return this;
+        }
+        else
+            return NULL;
+    }
+    void stop()  // Drop the cached value and free the underlying iterator via set(NULL)
+    {
+        value.clear();
+        set(NULL);
+    }
+    int compare(const CassSortableIterator *to) const  // strcmp of the cached values, negated when descending
+    {
+        int ret = strcmp(value, to->value); // Note - empty StringBuffer always returns ""
+        return descending ? -ret : ret;
+    }
+private:
+    StringBuffer value;
+    unsigned compareColumn;
+    bool descending;
+};
+
+// Presents several Cassandra result sets as a single IConstWorkUnitIterator.
+// Each result is wrapped in a CassSortableIterator and the set is handed to a
+// row stream merger, with this object acting as both the row provider and the
+// comparator. The "rows" that flow through the merger are the
+// CassSortableIterator objects themselves, each positioned on its current row.
+class CassMultiIterator : public CInterface, implements IRowProvider, implements ICompare, implements IConstWorkUnitIterator
+{
+public:
+    IMPLEMENT_IINTERFACE;
+    // _compareColumn is the index (within the fetched columns) of the column to
+    // compare on; _descending reverses the comparison result.
+    CassMultiIterator(unsigned _compareColumn, bool _descending)
+    {
+        compareColumn = _compareColumn;
+        descending = _descending;
+    }
+
+    // Add one result set to be merged. Must be called before first().
+    // NOTE(review): presumably the array takes over the caller's reference - confirm against IArrayOf::append
+    void addResult(CassandraResult &result)
+    {
+        results.append(result);
+    }
+
+    // (Re)start iteration: rebuild one input iterator per result, create a fresh
+    // merger over them, and position on the first row. Returns false if there are no rows.
+    virtual bool first()
+    {
+        inputs.kill();
+        ForEachItemIn(idx, results)
+        {
+            inputs.append(*new CassSortableIterator(cass_iterator_from_result(results.item(idx)), compareColumn, descending));
+        }
+        merger.setown(createRowStreamMerger(inputs.length(), *this, this, false));
+        return next();
+    }
+    // Advance to the next row in merged order; returns false (and leaves the
+    // iterator invalid) once the merger is exhausted.
+    virtual bool next()
+    {
+        current.clear();
+        const CassandraIterator *nextSource = get_row();
+        if (!nextSource)
+            return false;
+        // workunitInfoMappings+1 skips the partition mapping, so the mappings start at wuid
+        Owned<IPTree> wuXML = rowToPTree(NULL, NULL, workunitInfoMappings+1, cass_iterator_get_row(*nextSource));
+        current.setown(createConstWorkUnitInfo(*wuXML));
+        return true;
+    }
+    virtual bool isValid()
+    {
+        return current != NULL;
+    }
+    // Only valid after first()/next() has returned true.
+    virtual IConstWorkUnitInfo & query()
+    {
+        assertex(current);
+        return *current.get();
+    }
+
+    // Pull the next input iterator from the merger, or NULL at end of stream.
+    const CassandraIterator *get_row()
+    {
+        return (const CassSortableIterator *) merger->nextRow();
+    }
+protected:
+    virtual void linkRow(const void *row) { throwUnexpected(); }  // The 'rows' we pass around are CassSortableIterator objects - we CAN link them if we have to
+    virtual void releaseRow(const void *row) { throwUnexpected(); }
+    virtual const void *nextRow(unsigned idx)
+    {
+        CassSortableIterator &it = inputs.item(idx);
+        return it.nextRow(); // returns either a pointer to the iterator, or NULL
+    }
+    virtual void stop(unsigned idx)
+    {
+        inputs.item(idx).stop();
+    }
+    virtual int docompare(const void *a, const void *b) const
+    {
+        // a and b point to CassSortableIterator objects
+        const CassSortableIterator *aa = (const CassSortableIterator *) a;
+        const CassSortableIterator *bb = (const CassSortableIterator *) b;
+        return aa->compare(bb);
+    }
+private:
+    Owned<IRowStream> merger;
+    IArrayOf<CassandraResult> results;
+    IArrayOf<CassSortableIterator> inputs;
+    Owned<IConstWorkUnitInfo> current;
+    unsigned compareColumn;
+    bool descending;
+};
+
 class CCassandraWorkUnit : public CLocalWorkUnit
 {
 public:
@@ -3613,9 +3790,10 @@ public:
             batch.setown(new CassandraBatch(cass_batch_new(CASS_BATCH_TYPE_UNLOGGED)));
         deleteChildren(wuid);
         deleteSecondaries(wuid);
-        Owned<CassandraPrepared> prepared = sessionCache->prepareStatement("DELETE from workunits where wuid=?;");
+        Owned<CassandraPrepared> prepared = sessionCache->prepareStatement("DELETE from workunits where partition=? and wuid=?;");
         CassandraStatement update(cass_prepared_bind(*prepared));
-        check(cass_statement_bind_string(update, 0, wuid));
+        check(cass_statement_bind_int32(update, 0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS));
+        check(cass_statement_bind_string(update, 1, wuid));
         check(cass_batch_add_statement(*batch, update));
         CassandraFuture futureBatch(cass_session_execute_batch(sessionCache->querySession(), *batch));
         futureBatch.wait("execute");
@@ -3836,7 +4014,7 @@ protected:
                 if (!results)
                     results.setown(createPTree(childTable.parentElement));
                 child.setown(createPTree(childTable.childElement));
-                unsigned colidx = 1;  // We did not fetch wuid
+                unsigned colidx = 2;  // We did not fetch wuid or partition
                 while (cass_iterator_next(cols))
                 {
                     assertex(childTable.mappings[colidx].columnName);
@@ -3970,7 +4148,7 @@ public:
             suffix = 0;
             suffixLength = 0;
         }
-        Owned<CassandraPrepared> prepared = prepareStatement("INSERT INTO workunits (wuid) VALUES (?) IF NOT EXISTS;");
+        Owned<CassandraPrepared> prepared = prepareStatement("INSERT INTO workunits (partition, wuid) VALUES (?,?) IF NOT EXISTS;");
         loop
         {
             // Create a unique WUID by adding suffixes until we managed to add a new value
@@ -3985,7 +4163,8 @@ public:
                 }
             }
             CassandraStatement statement(cass_prepared_bind(*prepared));
-            check(cass_statement_bind_string(statement, 0, useWuid.str()));
+            check(cass_statement_bind_int32(statement, 0, rtlHash32VStr(useWuid.str(), 0) % NUM_PARTITIONS));
+            check(cass_statement_bind_string(statement, 1, useWuid.str()));
             if (traceLevel >= 2)
                 DBGLOG("Try creating %s", useWuid.str());
             CassandraFuture future(cass_session_execute(session, statement));
@@ -4120,11 +4299,11 @@ public:
         // MORE - if the batch gets too big you may need to flush it occasionally
         CassandraBatch batch(fix ? cass_batch_new(CASS_BATCH_TYPE_LOGGED) : NULL);
         // 1. Check that every entry in main wu table has matching entries in secondary tables
-        CassandraResult result(fetchData(workunitInfoMappings));
+        CassandraResult result(fetchData(workunitInfoMappings+1));
         CassandraIterator rows(cass_iterator_from_result(result));
         while (cass_iterator_next(rows))
         {
-            Owned<IPTree> wuXML = rowToPTree(NULL, NULL, workunitInfoMappings, cass_iterator_get_row(rows));
+            Owned<IPTree> wuXML = rowToPTree(NULL, NULL, workunitInfoMappings+1, cass_iterator_get_row(rows));
             const char *wuid = wuXML->queryName();
             // For each search entry, check that we get matching XML
             for (const char * const *search = searchPaths; *search; search++)
@@ -4173,6 +4352,11 @@ public:
             ensureTable(session, table[0]->mappings);
     }
 
+    virtual const char *queryStoreType() const  // Store-type tag for this Cassandra-backed implementation
+    {
+        return "Cassandra";
+    }
+
     // Interface ICassandraSession
     virtual CassSession *querySession() const { return session; };
     virtual unsigned queryTraceLevel() const { return traceLevel; };
@@ -4209,30 +4393,34 @@ private:
     }
     bool checkWuExists(const char *wuid)
     {
-        Owned<CassandraPrepared> prepared = prepareStatement("SELECT COUNT(*) FROM workunits where wuid=?;");
+        Owned<CassandraPrepared> prepared = prepareStatement("SELECT COUNT(*) FROM workunits where partition=? and wuid=?;");
         CassandraStatement statement(cass_prepared_bind(*prepared));
-        cass_statement_bind_string(statement, 0, wuid);
+        cass_statement_bind_int32(statement, 0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
+        cass_statement_bind_string(statement, 1, wuid);
         CassandraFuture future(cass_session_execute(session, statement));
         future.wait("select count(*)");
         CassandraResult result(cass_future_get_result(future));
         return getUnsignedResult(NULL, getSingleResult(result)) != 0; // Shouldn't be more than 1, either
     }
 
-    IConstWorkUnitIterator * getWorkUnitsByXXX(const char *xpath, const char *key, ISecManager *secmgr, ISecUser *secuser)
+    IConstWorkUnitIterator * getSortedWorkUnits(ISecManager *secmgr, ISecUser *secuser)  // Returns an iterator over all workunits, one query per partition, merged by wuid; secmgr and secuser are not used here
     {
-        Owned<CassandraResult> result;
-        const CassandraXmlMapping *mappings;
-        if (!key || !*key)
-        {
-            xpath = NULL;
-            mappings = workunitInfoMappings;
-            result.setown(new CassandraResult(fetchData(mappings)));   // Historically, providing no value on a call to getWorkUnitsByOwner (for example) filter meant unfiltered...
-        }
-        else
+        // Hack in some test code to test stream merging
+        Owned<CassMultiIterator> merger = new CassMultiIterator(0, true); // Merge by wuid (note that we didn't fetch partition...)
+        for (int i = 0; i < NUM_PARTITIONS; i++)
         {
-            mappings = searchMappings+3;  // Don't return the xpath, searhPrefix or searchValue fields
-            result.setown(new CassandraResult(fetchDataForKey(xpath, key)));
+            merger->addResult(*new CassandraResult(fetchDataByPartition(workunitInfoMappings, i)));
         }
+        return merger.getClear();
+    }
+
+    IConstWorkUnitIterator * getWorkUnitsByXXX(const char *xpath, const char *key, ISecManager *secmgr, ISecUser *secuser)
+    {
+        if (!key || !*key)
+            return getSortedWorkUnits(secmgr, secuser);
+        const CassandraXmlMapping *mappings = searchMappings+3;  // Don't return the xpath, searchPrefix or searchValue fields
+        Owned<CassandraResult> result = new CassandraResult(fetchDataForKey(xpath, key));
         Owned<IPTree> parent = createPTree("WorkUnits");
         CassandraIterator rows(cass_iterator_from_result(*result));
         while (cass_iterator_next(rows))
@@ -4317,24 +4505,6 @@ private:
         return errCount;
     }
 
-
-    IPTree *rowToPTree(const char *xpath, const char *key, const CassandraXmlMapping *mappings, const CassRow *row)
-    {
-        CassandraIterator cols(cass_iterator_from_row(row));
-        Owned<IPTree> xml = createPTree("row");  // May be overwritten below if wuid field is processed
-        if (xpath && *xpath && key && *key)
-            xml->setProp(xpath, key);
-        while (cass_iterator_next(cols))
-        {
-            assertex(mappings->columnName);
-            const CassValue *value = cass_iterator_get_column(cols);
-            if (value && !cass_value_is_null(value))
-                mappings->mapper.toXML(xml, mappings->xpath, value);
-            mappings++;
-        }
-        return xml.getClear();
-    }
-
     IPTree *cassandraToWorkunitXML(const char *wuid) const
     {
         CassandraResult result(fetchDataForWuid(workunitsMappings, wuid));
@@ -4344,7 +4514,7 @@ private:
             Owned<IPTree> wuXML = createPTree(wuid);
             wuXML->setProp("@xmlns:xsi", "http://www.w3.org/1999/XMLSchema-instance");
             CassandraIterator cols(cass_iterator_from_row(cass_iterator_get_row(rows)));
-            unsigned colidx = 1;  // wuid is not returned
+            unsigned colidx = 2;  // wuid and partition are not returned
             while (cass_iterator_next(cols))
             {
                 assertex(workunitsMappings[colidx].columnName);
@@ -4374,6 +4544,21 @@ private:
         return executeQuery(session, statement);
     }
 
+    // Fetch all rows from a single partition of a table. mappings[0] must be the partition column: it is used in the where clause but left out of the select list. Returns the result of executeQuery.
+
+    const CassResult *fetchDataByPartition(const CassandraXmlMapping *mappings, int partition) const
+    {
+        StringBuffer names;
+        StringBuffer tableName;
+        getFieldNames(mappings+1, names, tableName); // Don't fetch partition column
+        VStringBuffer selectQuery("select %s from %s where partition=%d", names.str()+1, tableName.str(), partition);  // No terminator here - it is appended exactly once below
+        selectQuery.append(';');
+        if (traceLevel >= 2)
+            DBGLOG("%s", selectQuery.str());
+        CassandraStatement statement(cass_statement_new(selectQuery.str(), 0));
+        return executeQuery(session, statement);
+    }
+
     // Fetch matching rows from a child table, or the main wu table
 
     const CassResult *fetchDataForWuid(const CassandraXmlMapping *mappings, const char *wuid) const
@@ -4381,8 +4566,8 @@ private:
         assertex(wuid && *wuid);
         StringBuffer names;
         StringBuffer tableName;
-        getFieldNames(mappings+1, names, tableName);  // mappings+1 means we don't return the wuid column
-        VStringBuffer selectQuery("select %s from %s where wuid='%s';", names.str()+1, tableName.str(), wuid); // MORE - should consider using prepared/bind for this - is it faster?
+        getFieldNames(mappings+2, names, tableName);  // mappings+2 means we don't return the partition or wuid columns
+        VStringBuffer selectQuery("select %s from %s where partition=%d and wuid='%s';", names.str()+1, tableName.str(), rtlHash32VStr(wuid, 0) % NUM_PARTITIONS, wuid); // MORE - should consider using prepared/bind for this - is it faster?
         if (traceLevel >= 2)
             DBGLOG("%s", selectQuery.str());
         CassandraStatement statement(cass_statement_new(selectQuery.str(), 0));
@@ -4430,10 +4615,11 @@ private:
         StringBuffer names;
         StringBuffer tableName;
         getFieldNames(mappings, names, tableName);
-        VStringBuffer insertQuery("DELETE from %s where wuid=?;", tableName.str());
+        VStringBuffer insertQuery("DELETE from %s where partition=? and wuid=?;", tableName.str());
         Owned<CassandraPrepared> prepared = prepareStatement(insertQuery);
         CassandraStatement update(cass_prepared_bind(*prepared));
-        check(cass_statement_bind_string(update, 0, wuid));
+        check(cass_statement_bind_int32(update, 0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS));
+        check(cass_statement_bind_string(update, 1, wuid));
         check(cass_batch_add_statement(batch, update));
     }