
Merge pull request #7515 from richardkchapman/cassandra-logging

HPCC-13865 Add CQL tracing to Cassandra WU support

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 10 years ago
commit 197d08c3ad

+ 1 - 6
common/workunit/workunit.cpp

@@ -1141,8 +1141,6 @@ public:
     {
         try
         {
-            //MORE: I'm not convinced this is useful...
-            setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTglobal, NULL, StWhenWorkunitModified, NULL, getTimeStampNowValue(), 1, 0, StatsMergeReplace);
             try
             {
                 connection->commit();
@@ -3763,10 +3761,7 @@ void CLocalWorkUnit::unlockRemote()
 {
     CriticalBlock block(crit);
     locked.unlock();
-    if (IsShared())  // Is this right? Doesn't feel right! Commit on last unlock would seem smarter
-    {
-        _unlockRemote();
-    }
+    _unlockRemote();
 }
 
 IWorkUnit &CLocalWorkUnit::lockRemote(bool commit)

+ 4 - 3
ecl/wutest/wutest.cpp

@@ -151,6 +151,7 @@ Owned<IProperties> globals;
 
 int main(int argc, const char *argv[])
 {
+    int ret = 0;
     InitModuleObjects();
     unsigned count=0;
     globals.setown(createProperties("WUTEST.INI", true));
@@ -209,8 +210,7 @@ int main(int argc, const char *argv[])
             CppUnit::TextUi::TestRunner runner;
             CppUnit::TestFactoryRegistry &registry = CppUnit::TestFactoryRegistry::getRegistry("WuTest");
             runner.addTest( registry.makeTest() );
-            bool wasSucessful = runner.run( "", false );
-            return wasSucessful;
+            ret = runner.run( "", false );
         }
         else
 #endif
@@ -394,6 +394,7 @@ int main(int argc, const char *argv[])
     }
     closeDllServer();   
     closeEnvironment(); 
+    clientShutdownWorkUnit();
     closedownClientProcess();   // dali client closedown
     if (queryActiveTimer())
     {
@@ -401,7 +402,7 @@ int main(int argc, const char *argv[])
         queryActiveTimer()->reset();
     }
     releaseAtoms();
-    return 0;
+    return ret;
 }
 
 #ifdef _USE_CPPUNIT

+ 6 - 4
plugins/cassandra/CMakeLists.txt

@@ -95,10 +95,12 @@ if (USE_CASSANDRA)
     ADD_DEFINITIONS( -D_USRDLL -DCASSANDRAEMBED_EXPORTS)
 
     HPCC_ADD_LIBRARY( cassandraembed SHARED ${SRCS} )
-    if (${CMAKE_VERSION} VERSION_LESS "2.8.9")
-      message("WARNING: Cannot set NO_SONAME. shlibdeps will give warnings when package is installed")
-    elseif(NOT APPLE)
-      set_target_properties( cassandraembed PROPERTIES NO_SONAME 1 )
+    if ( NOT FORCE_WORKUNITS_TO_CASSANDRA )
+      if (${CMAKE_VERSION} VERSION_LESS "2.8.9")
+        message("WARNING: Cannot set NO_SONAME. shlibdeps will give warnings when package is installed")
+      elseif(NOT APPLE)
+        set_target_properties( cassandraembed PROPERTIES NO_SONAME 1 )
+      endif()
     endif()
 
     install ( TARGETS cassandraembed DESTINATION plugins COMPONENT Runtime )

+ 1 - 1
plugins/cassandra/cassandraembed.cpp

@@ -1582,7 +1582,7 @@ public:
             // MORE - can cache this, perhaps, if script is same as last time?
             CassandraFuture future(cass_session_prepare(*session, script));
             future.wait("prepare statement");
-            Owned<CassandraPrepared> prepared = new CassandraPrepared(cass_future_get_prepared(future));
+            Owned<CassandraPrepared> prepared = new CassandraPrepared(cass_future_get_prepared(future), NULL);
             if ((flags & EFnoparams) == 0)
                 numParams = countBindings(script);
             else

+ 99 - 23
plugins/cassandra/cassandraembed.hpp

@@ -156,6 +156,32 @@ private:
     CassBatch *batch;
 };
 
+class CassandraPrepared : public CInterfaceOf<IInterface>
+{
+public:
+    inline CassandraPrepared(const CassPrepared *_prepared, const char *_queryString)
+    : prepared(_prepared), queryString(_queryString)
+    {
+    }
+    inline ~CassandraPrepared()
+    {
+        if (prepared)
+            cass_prepared_free(prepared);
+    }
+    inline operator const CassPrepared *() const
+    {
+        return prepared;
+    }
+    inline const char *queryQueryString() const
+    {
+        return queryString;
+    }
+private:
+    CassandraPrepared(const CassandraPrepared &);
+    StringAttr queryString;
+    const CassPrepared *prepared;
+};
+
 class CassandraStatement : public CInterface
 {
 public:
@@ -165,48 +191,98 @@ public:
     inline CassandraStatement(const char *simple) : statement(cass_statement_new(simple, 0))
     {
     }
+    inline CassandraStatement(const CassandraPrepared *_prepared) : statement(cass_prepared_bind(*_prepared))
+    {
+        const char *queryString = _prepared->queryQueryString(); // Only set when tracing..
+        if (queryString)
+            query.set(queryString);
+        _prepared->Release();
+    }
     inline ~CassandraStatement()
     {
         if (statement)
             cass_statement_free(statement);
     }
-    inline operator CassStatement *() const
+    operator CassStatement *() const
     {
+        if (query.length())
+            DBGLOG("Executing %s", query.str());
         return statement;
     }
-    inline void bindString(unsigned idx, const char *value)
+    void bindBool(unsigned idx, cass_bool_t value)
     {
-        //DBGLOG("bind %d %s", idx, value);
-        check(cass_statement_bind_string(statement, idx, value));
+        if (query.length())
+            traceBind(idx, "%s", value ? "true" : "false");
+        check(cass_statement_bind_bool(statement, idx, value));
     }
-    inline void bindString_n(unsigned idx, const char *value, unsigned len)
+    void bindInt32(unsigned idx, __int32 value)
     {
-        //DBGLOG("bind %d %.*s", idx, len, value);
-        check(cass_statement_bind_string_n(statement, idx, value, len));
+        if (query.length())
+            traceBind(idx, "%d", value);
+        check(cass_statement_bind_int32(statement, idx, value));
     }
-private:
-    CassandraStatement(const CassandraStatement &);
-    CassStatement *statement;
-};
-
-class CassandraPrepared : public CInterfaceOf<IInterface>
-{
-public:
-    inline CassandraPrepared(const CassPrepared *_prepared) : prepared(_prepared)
+    void bindInt64(unsigned idx, __int64 value)
     {
+        if (query.length())
+            traceBind(idx, "%" I64F "d", value);
+        check(cass_statement_bind_int64(statement, idx, value));
     }
-    inline ~CassandraPrepared()
+    void bindString(unsigned idx, const char *value)
     {
-        if (prepared)
-            cass_prepared_free(prepared);
+        if (query.length())
+            traceBind(idx, "'%s'", value);
+        check(cass_statement_bind_string(statement, idx, value));
     }
-    inline operator const CassPrepared *() const
+    void bindString_n(unsigned idx, const char *value, unsigned len)
     {
-        return prepared;
+        if (strlen(value)<len)
+        {
+            if (query.length())
+                traceBind(idx, "'%s'", value);
+            check(cass_statement_bind_string(statement, idx, value));
+        }
+        else
+        {
+            if (query.length())
+                traceBind(idx, "'%.*s'", len, value);
+            check(cass_statement_bind_string_n(statement, idx, value, len));
+        }
+    }
+    void bindBytes(unsigned idx, const cass_byte_t *value, unsigned len)
+    {
+        if (query.length())
+            traceBind(idx, "(bytes)");
+        check(cass_statement_bind_bytes(statement, idx, value, len));
+    }
+    void bindCollection(unsigned idx, const CassCollection *value)
+    {
+        if (query.length())
+            traceBind(idx, "(collection)");
+        check(cass_statement_bind_collection(statement, idx, value));
     }
 private:
-    CassandraPrepared(const CassandraPrepared &);
-    const CassPrepared *prepared;
+    void traceBind(unsigned idx, const char *format, ...)
+    {
+        assert(query.length());
+        StringBuffer bound;
+        va_list args;
+        va_start(args, format);
+        bound.valist_appendf(format, args);
+        va_end(args);
+        const char *pos = query;
+        do
+        {
+            pos = strchr(pos, '?');
+            assert(pos);
+            if (!pos)
+                return;
+            pos++;
+        } while (idx--);
+        query.insert(pos-query, bound);
+    }
+    CassandraStatement(const CassandraStatement &);
+    StringBuffer query;
+    CassStatement *statement;
 };
 
 class CassandraResult : public CInterfaceOf<IInterface>
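
The tracing added here works by carrying the CQL text on the statement (only when the prepared statement was created with a query string, which the factory does when traceLevel >= 2) and splicing each bound value into that text immediately after its '?' placeholder; the accumulated string is logged via DBGLOG("Executing %s", ...) when the statement is handed to the driver. The following is a minimal, self-contained sketch of that splice, using std::string in place of jlib's StringBuffer; the function name, sample query and the main() driver are illustrative only, not part of the commit.

#include <cstdio>
#include <string>

// Simplified stand-in for CassandraStatement::traceBind: the formatted value is
// inserted immediately after the idx'th (0-based) '?' placeholder in the traced
// query text.
static void traceBindSketch(std::string &query, unsigned idx, const std::string &bound)
{
    size_t pos = 0;
    do
    {
        pos = query.find('?', pos);
        if (pos == std::string::npos)
            return;        // placeholder/index mismatch - trace nothing, as the original does
        pos++;             // insertion point is just after the '?'
    } while (idx--);
    query.insert(pos, bound);
}

int main()
{
    std::string query("INSERT into workunits (partition, wuid) values (?,?);");
    traceBindSketch(query, 0, "7");          // what bindInt32(0, 7) would trace
    traceBindSketch(query, 1, "'W1234'");    // what bindString(1, "W1234") would trace
    printf("Executing %s\n", query.c_str()); // mirrors operator CassStatement *() above
    return 0;
}

Compiled and run, this prints Executing INSERT into workunits (partition, wuid) values (?7,?'W1234'); that is, each traced value appears directly after its placeholder rather than replacing it, which is how traceBind above behaves.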

+ 134 - 422
plugins/cassandra/cassandrawu.cpp

@@ -47,304 +47,6 @@
 
 namespace cassandraembed {
 
-#define ATTRIBUTES_NAME "attributes"
-
-void addElement(IPTree *parent, const char *name, const CassValue *value)
-{
-    switch (cass_value_type(value))
-    {
-    case CASS_VALUE_TYPE_UNKNOWN:
-        // It's a NULL - ignore it (or we could add empty element...)
-        break;
-
-    case CASS_VALUE_TYPE_ASCII:
-    case CASS_VALUE_TYPE_TEXT:
-    case CASS_VALUE_TYPE_VARCHAR:
-    {
-        rtlDataAttr str;
-        unsigned chars;
-        getUTF8Result(NULL, value, chars, str.refstr());
-        StringAttr s(str.getstr(), rtlUtf8Size(chars, str.getstr()));
-        parent->addProp(name, s);
-        break;
-    }
-
-    case CASS_VALUE_TYPE_INT:
-    case CASS_VALUE_TYPE_BIGINT:
-    case CASS_VALUE_TYPE_VARINT:
-        parent->addPropInt64(name, getSignedResult(NULL, value));
-        break;
-
-    case CASS_VALUE_TYPE_BLOB:
-    {
-        rtlDataAttr data;
-        unsigned bytes;
-        getDataResult(NULL, value, bytes, data.refdata());
-        parent->addPropBin(name, bytes, data.getbytes());
-        break;
-    }
-    case CASS_VALUE_TYPE_BOOLEAN:
-        parent->addPropBool(name, getBooleanResult(NULL, value));
-        break;
-
-    case CASS_VALUE_TYPE_DOUBLE:
-    case CASS_VALUE_TYPE_FLOAT:
-    {
-        double v = getRealResult(NULL, value);
-        StringBuffer s;
-        s.append(v);
-        parent->addProp(name, s);
-        break;
-    }
-    case CASS_VALUE_TYPE_LIST:
-    case CASS_VALUE_TYPE_SET:
-    {
-        CassandraIterator elems(cass_iterator_from_collection(value));
-        Owned<IPTree> list = createPTree(name);
-        while (cass_iterator_next(elems))
-            addElement(list, "item", cass_iterator_get_value(elems));
-        parent->addPropTree(name, list.getClear());
-        break;
-    }
-    case CASS_VALUE_TYPE_MAP:
-    {
-        CassandraIterator elems(cass_iterator_from_map(value));
-        if (strcmp(name, ATTRIBUTES_NAME)==0 && isString(cass_value_primary_sub_type(value)))
-        {
-            while (cass_iterator_next(elems))
-            {
-                rtlDataAttr str;
-                unsigned chars;
-                getStringResult(NULL, cass_iterator_get_map_key(elems), chars, str.refstr());
-                StringBuffer s("@");
-                s.append(chars, str.getstr());
-                addElement(parent, s, cass_iterator_get_map_value(elems));
-            }
-        }
-        else
-        {
-            Owned<IPTree> map = createPTree(name);
-            while (cass_iterator_next(elems))
-            {
-                if (isString(cass_value_primary_sub_type(value)))
-                {
-                    rtlDataAttr str;
-                    unsigned chars;
-                    getStringResult(NULL, cass_iterator_get_map_key(elems), chars, str.refstr());
-                    StringAttr s(str.getstr(), chars);
-                    addElement(map, s, cass_iterator_get_map_value(elems));
-                }
-                else
-                {
-                    Owned<IPTree> mapping = createPTree("mapping");
-                    addElement(mapping, "key", cass_iterator_get_map_key(elems));
-                    addElement(mapping, "value", cass_iterator_get_map_value(elems));
-                    map->addPropTree("mapping", mapping.getClear());
-                }
-            }
-            parent->addPropTree(name, map.getClear());
-        }
-        break;
-    }
-    default:
-        DBGLOG("Column type %d not supported", cass_value_type(value));
-        UNSUPPORTED("Column type");
-    }
-}
-
-void bindElement(CassStatement *statement, IPTree *parent, unsigned idx, const char *name, CassValueType type)
-{
-    if (parent->hasProp(name) || strcmp(name, ATTRIBUTES_NAME)==0)
-    {
-        switch (type)
-        {
-        case CASS_VALUE_TYPE_ASCII:
-        case CASS_VALUE_TYPE_TEXT:
-        case CASS_VALUE_TYPE_VARCHAR:
-        {
-            const char *value = parent->queryProp(name);
-            if (value)
-                check(cass_statement_bind_string(statement, idx, value));
-            break;
-        }
-
-        case CASS_VALUE_TYPE_INT:
-            check(cass_statement_bind_int32(statement, idx, parent->getPropInt(name)));
-            break;
-        case CASS_VALUE_TYPE_BIGINT:
-        case CASS_VALUE_TYPE_VARINT:
-            check(cass_statement_bind_int64(statement, idx, parent->getPropInt64(name)));
-            break;
-
-        case CASS_VALUE_TYPE_BLOB:
-        {
-            MemoryBuffer buf;
-            parent->getPropBin(name, buf);
-            check(cass_statement_bind_bytes(statement, idx, (const cass_byte_t*)buf.toByteArray(), buf.length()));
-            break;
-        }
-        case CASS_VALUE_TYPE_BOOLEAN:
-            check(cass_statement_bind_bool(statement, idx, (cass_bool_t) parent->getPropBool(name)));
-            break;
-
-        case CASS_VALUE_TYPE_DOUBLE:
-            check(cass_statement_bind_double(statement, idx, atof(parent->queryProp(name))));
-            break;
-        case CASS_VALUE_TYPE_FLOAT:
-            check(cass_statement_bind_float(statement, idx, atof(parent->queryProp(name))));
-            break;
-        case CASS_VALUE_TYPE_LIST:
-        case CASS_VALUE_TYPE_SET:
-        {
-            Owned<IPTree> child = parent->getPropTree(name);
-            unsigned numItems = child->getCount("item");
-            if (numItems)
-            {
-                CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_SET, numItems));
-                Owned<IPTreeIterator> items = child->getElements("item");
-                ForEach(*items)
-                {
-                    // We don't know the subtypes - we can assert that we only support string, for most purposes, I suspect
-                    if (strcmp(name, "list1")==0)
-                        check(cass_collection_append_int32(collection, items->query().getPropInt(NULL)));
-                    else
-                        check(cass_collection_append_string(collection, items->query().queryProp(NULL)));
-                }
-                check(cass_statement_bind_collection(statement, idx, collection));
-            }
-            break;
-        }
-
-        case CASS_VALUE_TYPE_MAP:
-        {
-            // We don't know the subtypes - we can assert that we only support string, for most purposes, I suspect
-            if (strcmp(name, ATTRIBUTES_NAME)==0)
-            {
-                Owned<IAttributeIterator> attrs = parent->getAttributes();
-                unsigned numItems = attrs->count();
-                ForEach(*attrs)
-                {
-                    numItems++;
-                }
-                if (numItems)
-                {
-                    CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, numItems));
-                    ForEach(*attrs)
-                    {
-                        const char *key = attrs->queryName();
-                        const char *value = attrs->queryValue();
-                        check(cass_collection_append_string(collection, key+1));  // skip the @
-                        check(cass_collection_append_string(collection, value));
-                    }
-                    check(cass_statement_bind_collection(statement, idx, collection));
-                }
-            }
-            else
-            {
-                Owned<IPTree> child = parent->getPropTree(name);
-                unsigned numItems = child->numChildren();
-                // MORE - if the cassandra driver objects to there being fewer than numItems supplied, we may need to recode using a second pass.
-                if (numItems)
-                {
-                    CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_MAP, numItems));
-                    Owned<IPTreeIterator> items = child->getElements("*");
-                    ForEach(*items)
-                    {
-                        IPTree &item = items->query();
-                        const char *key = item.queryName();
-                        const char *value = item.queryProp(NULL);
-                        if (key && value)
-                        {
-                            check(cass_collection_append_string(collection, key));
-                            check(cass_collection_append_string(collection, value));
-                        }
-                    }
-                    check(cass_statement_bind_collection(statement, idx, collection));
-                }
-            }
-            break;
-        }
-        default:
-            DBGLOG("Column type %d not supported", type);
-            UNSUPPORTED("Column type");
-        }
-    }
-}
-
-
-extern void cassandraToGenericXML()
-{
-    CassandraCluster cluster(cass_cluster_new());
-    cass_cluster_set_contact_points(cluster, "127.0.0.1");
-
-    CassandraSession session(cass_session_new());
-    CassandraFuture future(cass_session_connect_keyspace(session, cluster, "test"));
-    future.wait("connect");
-    CassandraStatement statement(cass_statement_new("select * from tbl1 where name = 'name1';", 0));
-    CassandraFuture future2(cass_session_execute(session, statement));
-    future2.wait("execute");
-    CassandraResult result(cass_future_get_result(future2));
-    StringArray names;
-    UnsignedArray types;
-    for (int i = 0; i < cass_result_column_count(result); i++)
-    {
-        const char *column;
-        size_t length;
-        cass_result_column_name(result, i, &column, &length);
-        StringBuffer name(length, column);
-        names.append(name);
-        types.append(cass_result_column_type(result, i));
-    }
-    // Now fetch the rows
-    Owned<IPTree> xml = createPTree("tbl1");
-    CassandraIterator rows(cass_iterator_from_result(result));
-    while (cass_iterator_next(rows))
-    {
-        CassandraIterator cols(cass_iterator_from_row(cass_iterator_get_row(rows)));
-        Owned<IPTree> row = createPTree("row");
-        unsigned colidx = 0;
-        while (cass_iterator_next(cols))
-        {
-            const CassValue *value = cass_iterator_get_column(cols);
-            const char *name = names.item(colidx);
-            addElement(row, name, value);
-            colidx++;
-        }
-        xml->addPropTree("row", row.getClear());
-    }
-    xml->setProp("row[1]/name", "newname");
-    StringBuffer buf;
-    toXML(xml, buf);
-    DBGLOG("%s", buf.str());
-
-    // Now try going the other way...
-    // For this we need to know the expected names (can fetch them from system table) and types (ditto, potentially, though a dummy select may be easier)
-    StringBuffer colNames;
-    StringBuffer values;
-    ForEachItemIn(idx, names)
-    {
-        colNames.append(",").append(names.item(idx));
-        values.append(",?");
-    }
-    VStringBuffer insertQuery("INSERT into tbl1 (%s) values (%s);", colNames.str()+1, values.str()+1);
-    Owned<IPTreeIterator> xmlRows = xml->getElements("row");
-    ForEach(*xmlRows)
-    {
-        IPropertyTree *xmlrow = &xmlRows->query();
-        CassandraStatement update(cass_statement_new(insertQuery.str(), names.length()));
-        ForEachItemIn(idx, names)
-        {
-            bindElement(update, xmlrow, idx, names.item(idx), (CassValueType) types.item(idx));
-        }
-        // MORE - use a batch
-        CassandraFuture future3(cass_session_execute(session, update));
-        future2.wait("insert");
-    }
-
-}
-
-//--------------------------------------------
-
 #define CASS_WU_QUERY_EXPIRES  (1000*60*5)
 #define CASS_WORKUNIT_POSTSORT_LIMIT 10000
 #define CASS_SEARCH_PREFIX_SIZE 2
@@ -440,7 +142,7 @@ public:
         if (value.length())
         {
             if (statement)
-                check(cass_statement_bind_bytes(*statement, idx, (const cass_byte_t *) value.toByteArray(), value.length()));
+                statement->bindBytes(idx, (const cass_byte_t *) value.toByteArray(), value.length());
             return true;
         }
         else
@@ -475,7 +177,7 @@ public:
         if (statement)
         {
             int hash = rtlHash32VStr(row->queryName(), 0) % NUM_PARTITIONS;
-            check(cass_statement_bind_int32(*statement, idx, hash));
+            statement->bindInt32(idx, hash);
         }
         return true;
     }
@@ -569,7 +271,7 @@ public:
         if (value.length())
         {
             if (statement)
-                check(cass_statement_bind_bytes(*statement, idx, (const cass_byte_t *) value.str(), value.length()));
+                statement->bindBytes(idx, (const cass_byte_t *) value.str(), value.length());
             return true;
         }
         else
@@ -592,7 +294,7 @@ public:
             if (statement)
             {
                 bool value = row->getPropBool(name, false);
-                check(cass_statement_bind_bool(*statement, idx, value ? cass_true : cass_false));
+                statement->bindBool(idx, value ? cass_true : cass_false);
             }
             return true;
         }
@@ -623,7 +325,7 @@ protected:
                 StringBuffer buf(columnVal);
                 if (uc)
                     buf.toUpperCase();
-                if (prefixLength && prefixLength < buf.length())
+                if (prefixLength)
                     statement->bindString_n(idx, buf, prefixLength);
                 else
                     statement->bindString(idx, buf);
@@ -669,7 +371,7 @@ public:
             if (statement)
             {
                 int value = row->getPropInt(name);
-                check(cass_statement_bind_int32(*statement, idx, value));
+                statement->bindInt32(idx, value);
             }
             return true;
         }
@@ -686,7 +388,7 @@ public:
         if (statement)
         {
             int value = row->getPropInt(name, atoi(defaultValue));
-            check(cass_statement_bind_int32(*statement, idx, value));
+            statement->bindInt32(idx, value);
         }
         return true;
     }
@@ -707,7 +409,7 @@ public:
             if (statement)
             {
                 __int64 value = row->getPropInt64(name);
-                check(cass_statement_bind_int64(*statement, idx, value));
+                statement->bindInt64(idx, value);
             }
             return true;
         }
@@ -731,7 +433,7 @@ public:
         if (statement)
         {
             int value = row->getPropInt(name);
-            check(cass_statement_bind_int64(*statement, idx, value));
+            statement->bindInt64(idx, value);
         }
         return true;
     }
@@ -778,7 +480,7 @@ public:
                             check(cass_collection_append_string(collection, value));
                         }
                     }
-                    check(cass_statement_bind_collection(*statement, idx, collection));
+                    statement->bindCollection(idx, collection);
                 }
                 return true;
             }
@@ -832,7 +534,7 @@ public:
                         check(cass_collection_append_string(collection, value));
                     }
                 }
-                check(cass_statement_bind_collection(*statement, idx, collection));
+                statement->bindCollection(idx, collection);
             }
             return true;
         }
@@ -896,7 +598,7 @@ public:
                         }
                     }
                 }
-                check(cass_statement_bind_collection(*statement, idx, collection));
+                statement->bindCollection(idx, collection);
             }
             return true;
         }
@@ -921,7 +623,10 @@ public:
             StringBuffer valStr;
             getCassString(valStr, value);
             if (valStr.length() && valStr.charAt(0)== '<')
-                row->setPropTree(elemName, createPTreeFromXMLString(valStr));
+            {
+                IPTree *sub = createPTreeFromXMLString(valStr);
+                row->setPropTree(elemName, sub);
+            }
         }
         return row;
     }
@@ -964,7 +669,7 @@ public:
                         }
                     }
                 }
-                check(cass_statement_bind_collection(*statement, idx, collection));
+                statement->bindCollection(idx, collection);
             }
             return true;
         }
@@ -1037,7 +742,7 @@ public:
                             check(cass_collection_append_string(collection, value));
                         }
                     }
-                    check(cass_statement_bind_collection(*statement, idx, collection));
+                    statement->bindCollection(idx, collection);
                 }
                 return true;
             }
@@ -1116,7 +821,7 @@ public:
                         check(cass_collection_append_string(collection, value));
                     }
                 }
-                check(cass_statement_bind_collection(*statement, idx, collection));
+                statement->bindCollection(idx, collection);
             }
             return true;
         }
@@ -1163,7 +868,7 @@ public:
                         if (value)
                             check(cass_collection_append_string(collection, value));
                     }
-                    check(cass_statement_bind_collection(*statement, idx, collection));
+                    statement->bindCollection(idx, collection);
                 }
                 return true;
             }
@@ -1373,9 +1078,7 @@ static const ChildTableInfo wuGraphProgressTable =
         {"wuid", "text", NULL, rootNameColumnMapper},          \
         {"sequence", "int", "@sequence", defaultedIntColumnMapper},     \
         {"name", "text", "@name", stringColumnMapper},         \
-        {"format", "text", "@format", stringColumnMapper},     /* xml, xmlset, csv, or null to mean raw. Could probably switch to int if we wanted, or drop altogether since included in attributes? */ \
-        {"status", "text", "@status", stringColumnMapper},     \
-        {"attributes", "map<text, text>", "@sequence@name@format@status@", attributeMapColumnMapper},  /* name is the suppression list. We could consider folding format/status into this? */ \
+        {"attributes", "map<text, text>", "@sequence@name@", attributeMapColumnMapper},  /* name is the suppression list */ \
         {"rowcount", "int", "rowCount", intColumnMapper},      /* This is the number of rows in result (which may be stored in a file rather than in value) */ \
         {"totalrowcount", "bigint", "totalRowCount", bigintColumnMapper},  /* This is the number of rows in value */ \
         {"schemaRaw", "blob", "SchemaRaw", blobColumnMapper},              \
@@ -1533,13 +1236,9 @@ void deleteSecondaryByKey(const char * xpath, const char *key, const char *wuid,
         StringBuffer tableName;
         getFieldNames(searchMappings, names, tableName);
         VStringBuffer deleteQuery("DELETE from %s where xpath=? and fieldPrefix=? and fieldValue=? and wuid=?;", tableName.str());
-        Owned<CassandraPrepared> prepared = sessionCache->prepareStatement(deleteQuery);
-        CassandraStatement update(cass_prepared_bind(*prepared));
+        CassandraStatement update(sessionCache->prepareStatement(deleteQuery));
         update.bindString(0, xpath);
-        if (ucKey.length() < CASS_SEARCH_PREFIX_SIZE)
-            update.bindString(1, ucKey);
-        else
-            update.bindString_n(1, ucKey, CASS_SEARCH_PREFIX_SIZE);
+        update.bindString_n(1, ucKey, CASS_SEARCH_PREFIX_SIZE);
         update.bindString(2, ucKey);
         update.bindString(3, wuid);
         check(cass_batch_add_statement(batch, update));
@@ -1566,8 +1265,7 @@ extern void simpleXMLtoCassandra(const ICassandraSession *session, CassBatch *ba
     StringBuffer tableName;
     getBoundFieldNames(mappings, names, bindings, inXML, userVal, tableName);
     VStringBuffer insertQuery("INSERT into %s (%s) values (%s);", tableName.str(), names.str()+1, bindings.str()+1);
-    Owned<CassandraPrepared> prepared = session->prepareStatement(insertQuery);
-    CassandraStatement update(cass_prepared_bind(*prepared));
+    CassandraStatement update(session->prepareStatement(insertQuery));
     unsigned bindidx = 0;
     while (mappings->columnName)
     {
@@ -1584,8 +1282,7 @@ extern void deleteSimpleXML(const ICassandraSession *session, CassBatch *batch,
     StringBuffer tableName;
     getFieldNames(mappings, names, tableName);
     VStringBuffer deleteQuery("DELETE from %s where name=? and wuid=?", tableName.str());
-    Owned<CassandraPrepared> prepared = session->prepareStatement(deleteQuery);
-    CassandraStatement update(cass_prepared_bind(*prepared));
+    CassandraStatement update(session->prepareStatement(deleteQuery));
     unsigned bindidx = 0;
     while (mappings->columnName)
     {
@@ -1603,15 +1300,11 @@ extern void addUniqueValue(const ICassandraSession *session, CassBatch *batch, c
     StringBuffer tableName;
     getBoundFieldNames(uniqueSearchMappings, names, bindings, NULL, NULL, tableName);
     VStringBuffer insertQuery("INSERT into %s (%s) values (%s);", tableName.str(), names.str()+1, bindings.str()+1);
-    Owned<CassandraPrepared> prepared = session->prepareStatement(insertQuery);
-    CassandraStatement update(cass_prepared_bind(*prepared));
+    CassandraStatement update(session->prepareStatement(insertQuery));
     update.bindString(0, xpath);
     StringBuffer ucValue(value);
     ucValue.toUpperCase();
-    if (ucValue.length() < CASS_SEARCH_PREFIX_SIZE)
-        update.bindString(1, ucValue);
-    else
-        update.bindString_n(1, ucValue, CASS_SEARCH_PREFIX_SIZE);
+    update.bindString_n(1, ucValue, CASS_SEARCH_PREFIX_SIZE);
     update.bindString(2, ucValue);
     update.bindString(3, value);
     check(cass_batch_add_statement(batch, update));
@@ -1624,9 +1317,8 @@ extern void childXMLRowtoCassandra(const ICassandraSession *session, CassBatch *
     StringBuffer tableName;
     getBoundFieldNames(mappings, names, bindings, &row, userVal, tableName);
     VStringBuffer insertQuery("INSERT into %s (%s) values (%s);", tableName.str(), names.str()+1, bindings.str()+1);
-    Owned<CassandraPrepared> prepared = session->prepareStatement(insertQuery);
-    CassandraStatement update(cass_prepared_bind(*prepared));
-    check(cass_statement_bind_int32(update, 0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS));
+    CassandraStatement update(session->prepareStatement(insertQuery));
+    update.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
     update.bindString(1, wuid);
     unsigned bindidx = 2; // We already bound wuid and partition
     unsigned colidx = 2; // We already bound wuid and partition
@@ -2436,9 +2128,10 @@ private:
 
 class CCassandraWorkUnit : public CLocalWorkUnit
 {
+    // IMPLEMENT_IINTERFACE;
 public:
     IMPLEMENT_IINTERFACE;
-    CCassandraWorkUnit(ICassandraSession *_sessionCache, IPTree *wuXML, ISecManager *secmgr, ISecUser *secuser)
+    CCassandraWorkUnit(ICassandraSession *_sessionCache, IPTree *wuXML, ISecManager *secmgr, ISecUser *secuser, bool write)
         : sessionCache(_sessionCache), CLocalWorkUnit(secmgr, secuser)
     {
         CLocalWorkUnit::loadPTree(wuXML);
@@ -2446,6 +2139,8 @@ public:
         memset(childLoaded, 0, sizeof(childLoaded));
         abortDirty = true;
         abortState = false;
+        if (write)
+            _lockRemote();
     }
     ~CCassandraWorkUnit()
     {
@@ -2466,9 +2161,8 @@ public:
             batch.setown(new CassandraBatch(cass_batch_new(CASS_BATCH_TYPE_UNLOGGED)));
         deleteChildren(wuid);
         deleteSecondaries(wuid);
-        Owned<CassandraPrepared> prepared = sessionCache->prepareStatement("DELETE from workunits where partition=? and wuid=?;");
-        CassandraStatement update(cass_prepared_bind(*prepared));
-        check(cass_statement_bind_int32(update, 0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS));
+        CassandraStatement update(sessionCache->prepareStatement("DELETE from workunits where partition=? and wuid=?;"));
+        update.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
         update.bindString(1, wuid);
         check(cass_batch_add_statement(*batch, update));
         CassandraFuture futureBatch(cass_session_execute_batch(sessionCache->querySession(), *batch));
@@ -2587,9 +2281,6 @@ public:
 
     virtual void _lockRemote()
     {
-        // Ignore locking for now!
-//        printStackReport();
-//        UNIMPLEMENTED;
         batch.setown(new CassandraBatch(cass_batch_new(CASS_BATCH_TYPE_UNLOGGED)));
     }
 
@@ -2775,7 +2466,8 @@ protected:
     {
         if (prevKey && *prevKey)
             deleteSecondaryByKey(xpath, prevKey, wuid, sessionCache, *batch);
-        if (p->hasProp(xpath))
+        const char *value = p->queryProp(xpath);
+        if (value && *value)
             simpleXMLtoCassandra(sessionCache, *batch, searchMappings, p, xpath);
     }
 
@@ -2826,7 +2518,7 @@ protected:
         for (search = wildSearchPaths; *search; search++)
         {
             const char *value = p->queryProp(*search);
-            if (value)
+            if (value && *value)
                 addUniqueValue(sessionCache, *batch, *search, value);
         }
         deleteAppSecondaries(*prev, wuid);
@@ -2940,6 +2632,8 @@ public:
     {
         cacheRetirer.stop();
         cacheRetirer.join();
+        if (traceLevel)
+            DBGLOG("CCasssandraWorkUnitFactory destroyed");
     }
 
     virtual CLocalWorkUnit* _createWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser)
@@ -2970,8 +2664,8 @@ public:
                     suffix /= 10;
                 }
             }
-            CassandraStatement statement(cass_prepared_bind(*prepared));
-            check(cass_statement_bind_int32(statement, 0, rtlHash32VStr(useWuid.str(), 0) % NUM_PARTITIONS));
+            CassandraStatement statement(prepared.getLink());
+            statement.bindInt32(0, rtlHash32VStr(useWuid.str(), 0) % NUM_PARTITIONS);
             statement.bindString(1, useWuid.str());
             if (traceLevel >= 2)
                 DBGLOG("Try creating %s", useWuid.str());
@@ -2984,8 +2678,7 @@ public:
                 // If there are multiple columns it will be '[applied]' (value false) and the fields of the existing row
                 Owned<IPTree> wuXML = createPTree(useWuid);
                 wuXML->setProp("@xmlns:xsi", "http://www.w3.org/1999/XMLSchema-instance");
-                Owned<CLocalWorkUnit> wu = new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser);
-                wu->lockRemote(true);
+                Owned<CLocalWorkUnit> wu = new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser, true);
                 return wu.getClear();
             }
             suffix = rand_r(&randState);
@@ -2995,20 +2688,16 @@ public:
     }
     virtual CLocalWorkUnit* _openWorkUnit(const char *wuid, bool lock, ISecManager *secmgr, ISecUser *secuser)
     {
-        // MORE - what to do about lock?
         Owned<IPTree> wuXML = cassandraToWorkunitXML(wuid);
         if (wuXML)
-            return new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser);
+            return new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser, lock);
         else
             return NULL;
     }
     virtual CLocalWorkUnit* _updateWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser)
     {
-        // Ignore locking for now
-        // Note - in Dali, this would lock for write, whereas _openWorkUnit would either lock for read (if lock set) or not lock at all
         Owned<IPTree> wuXML = cassandraToWorkunitXML(wuid);
-        Owned<CLocalWorkUnit> wu = new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser);
-        wu->lockRemote(true);
+        Owned<CLocalWorkUnit> wu = new CCassandraWorkUnit(this, wuXML.getClear(), secmgr, secuser, true);
         return wu.getClear();
     }
 
@@ -3280,7 +2969,7 @@ public:
         {
             *cachehint = cached->queryHint();
             CriticalBlock b(cacheCrit);
-            cacheIdMap.setValue(*cachehint, cached.getClear());
+            cacheIdMap.setValue(*cachehint, cached); // Links its parameter
         }
         if (total)
             *total = 0; // We don't know
@@ -3292,8 +2981,7 @@ public:
     }
     virtual unsigned numWorkUnits()
     {
-        Owned<CassandraPrepared> prepared = prepareStatement("SELECT COUNT(*) FROM workunits;");
-        CassandraStatement statement(cass_prepared_bind(*prepared));
+        CassandraStatement statement(prepareStatement("SELECT COUNT(*) FROM workunits;"));
         CassandraFuture future(cass_session_execute(session, statement));
         future.wait("select count(*)");
         CassandraResult result(cass_future_get_result(future));
@@ -3307,8 +2995,9 @@ public:
     */
     virtual WUState waitForWorkUnit(const char * wuid, unsigned timeout, bool compiled, bool returnOnWaitState)
     {
-        VStringBuffer select("select state from workunits where wuid = '%s';", wuid);
-        CassandraStatement statement(cass_statement_new(select.str(), 0));
+        CassandraStatement statement(prepareStatement("select state from workunits where partition=? and wuid = ?;"));
+        statement.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
+        statement.bindString(1, wuid);
         unsigned start = msTick();
         loop
         {
@@ -3437,19 +3126,19 @@ public:
         Linked<CassandraPrepared> cached = preparedCache.getValue(query);
         if (cached)
         {
-            if (traceLevel >= 2)
+            if (traceLevel >= 3)
                 DBGLOG("prepareStatement: Reusing %s", query);
             return cached.getClear();
         }
         {
-            if (traceLevel >= 2)
+            if (traceLevel >= 3)
                 DBGLOG("prepareStatement: Binding %s", query);
             // We don't want to block cache lookups while we prepare a new bound statement
             // Note - if multiple threads try to prepare the same (new) statement at the same time, it's not catastrophic
             CriticalUnblock b(cacheCrit);
             CassandraFuture futurePrep(cass_session_prepare(session, query));
             futurePrep.wait("prepare statement");
-            cached.setown(new CassandraPrepared(cass_future_get_prepared(futurePrep)));
+            cached.setown(new CassandraPrepared(cass_future_get_prepared(futurePrep), traceLevel>=2?query:NULL));
         }
         preparedCache.setValue(query, cached); // NOTE - this links parameter
         return cached.getClear();
@@ -3463,10 +3152,9 @@ private:
     }
     bool checkWuExists(const char *wuid)
     {
-        Owned<CassandraPrepared> prepared = prepareStatement("SELECT COUNT(*) FROM workunits where partition=? and wuid=?;");
-        CassandraStatement statement(cass_prepared_bind(*prepared));
-        cass_statement_bind_int32(statement, 0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
-        cass_statement_bind_string(statement, 1, wuid);
+        CassandraStatement statement(prepareStatement("SELECT COUNT(*) FROM workunits where partition=? and wuid=?;"));
+        statement.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
+        statement.bindString(1, wuid);
         CassandraFuture future(cass_session_execute(session, statement));
         future.wait("select count(*)");
         CassandraResult result(cass_future_get_result(future));
@@ -3640,11 +3328,11 @@ private:
         StringBuffer names;
         StringBuffer tableName;
         getFieldNames(mappings+1, names, tableName); // Don't fetch partition column
-        VStringBuffer selectQuery("select %s from %s where partition=%d", names.str()+1, tableName.str(), partition);
+        VStringBuffer selectQuery("select %s from %s where partition=?", names.str()+1, tableName.str());
         ForEachItemIn(idx, wuidFilters)
         {
             const IPostFilter &wuidFilter = wuidFilters.item(idx);
-            selectQuery.appendf(" and wuid %s '%s'", wuidFilter.queryField()==WUSFwuidhigh ? "<=" : ">=", wuidFilter.queryValue());
+            selectQuery.appendf(" and wuid %s ?", wuidFilter.queryField()==WUSFwuidhigh ? "<=" : ">=");
         }
         switch (sortOrder)
         {
@@ -3665,10 +3353,14 @@ private:
         if (limit)
             selectQuery.appendf(" LIMIT %u", limit);
         selectQuery.append(';');
-        if (traceLevel >= 2)
-            DBGLOG("%s", selectQuery.str());
-        CassandraStatement statement(cass_statement_new(selectQuery.str(), 0));
-        return executeQuery(session, statement);
+        CassandraStatement select(prepareStatement(selectQuery));
+        select.bindInt32(0, partition);
+        ForEachItemIn(idx2, wuidFilters)
+        {
+            const IPostFilter &wuidFilter = wuidFilters.item(idx2);
+            select.bindString(idx2+1, wuidFilter.queryValue());
+        }
+        return executeQuery(session, select);
     }
 
     // Fetch matching rows from a child table, or the main wu table
@@ -3679,11 +3371,12 @@ private:
         StringBuffer names;
         StringBuffer tableName;
         getFieldNames(mappings + (includeWuid ? 1 : 2), names, tableName);  // mappings+2 means we don't return the partition or wuid columns
-        VStringBuffer selectQuery("select %s from %s where partition=%d and wuid='%s';", names.str()+1, tableName.str(), rtlHash32VStr(wuid, 0) % NUM_PARTITIONS, wuid); // MORE - should consider using prepared/bind for this - is it faster?
-        if (traceLevel >= 2)
-            DBGLOG("%s", selectQuery.str());
-        CassandraStatement statement(cass_statement_new(selectQuery.str(), 0));
-        return executeQuery(session, statement);
+        VStringBuffer selectQuery("select %s from %s where partition=? and wuid=?;", names.str()+1, tableName.str());
+        CassandraStatement select(prepareStatement(selectQuery));
+        select.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
+        select.bindString(1, wuid);
+        return executeQuery(session, select);
+
     }
 
     // Fetch matching rows from the search table, for all wuids, sorted by wuid
@@ -3696,12 +3389,13 @@ private:
         StringBuffer ucKey(key);
         ucKey.toUpperCase();
         getFieldNames(searchMappings+3, names, tableName);  // mappings+3 means we don't return the key columns (xpath, upper(keyPrefix), upper(key))
-        VStringBuffer selectQuery("select %s from %s where xpath='%s' and fieldPrefix='%.*s' and fieldValue ='%s'", names.str()+1, tableName.str(), xpath, CASS_SEARCH_PREFIX_SIZE, ucKey.str(), ucKey.str()); // MORE - should consider using prepared/bind for this - is it faster?
+        VStringBuffer selectQuery("select %s from %s where xpath=? and fieldPrefix=? and fieldValue=?", names.str()+1, tableName.str());
         selectQuery.append(" ORDER BY fieldValue ASC, WUID desc;");
-        if (traceLevel >= 2)
-            DBGLOG("%s", selectQuery.str());
-        CassandraStatement statement(cass_statement_new(selectQuery.str(), 0));
-        return executeQuery(session, statement);
+        CassandraStatement select(prepareStatement(selectQuery));
+        select.bindString(0, xpath);
+        select.bindString_n(1, ucKey, CASS_SEARCH_PREFIX_SIZE);
+        select.bindString(2, ucKey);
+        return executeQuery(session, select);
     }
 
     // Fetch matching rows from the search table, for all wuids, sorted by wuid
@@ -3713,11 +3407,11 @@ private:
         StringBuffer ucKey(key);
         ucKey.toUpperCase();
         getFieldNames(searchMappings+3, names, tableName);  // mappings+3 means we don't return the key columns (xpath, upper(keyPrefix), upper(key))
-        VStringBuffer selectQuery("select %s from %s where xpath='%s' and fieldPrefix='%.*s' and fieldValue ='%s'", names.str()+1, tableName.str(), xpath, CASS_SEARCH_PREFIX_SIZE, ucKey.str(), ucKey.str());
+        VStringBuffer selectQuery("select %s from %s where xpath=? and fieldPrefix=? and fieldValue=?", names.str()+1, tableName.str());
         ForEachItemIn(idx, wuidFilters)
         {
             const IPostFilter &wuidFilter = wuidFilters.item(idx);
-            selectQuery.appendf(" and wuid %s '%s'", wuidFilter.queryField()==WUSFwuidhigh ? "<=" : ">=", wuidFilter.queryValue());
+            selectQuery.appendf(" and wuid %s ?", wuidFilter.queryField()==WUSFwuidhigh ? "<=" : ">=");
         }
         switch (sortOrder)
         {
@@ -3735,10 +3429,16 @@ private:
         }
         if (limit)
             selectQuery.appendf(" LIMIT %u", limit);
-        if (traceLevel >= 2)
-            DBGLOG("%s", selectQuery.str());
-        CassandraStatement statement(cass_statement_new(selectQuery.str(), 0));
-        return executeQuery(session, statement);
+        CassandraStatement select(prepareStatement(selectQuery));
+        select.bindString(0, xpath);
+        select.bindString_n(1, ucKey, CASS_SEARCH_PREFIX_SIZE);
+        select.bindString(2, ucKey);
+        ForEachItemIn(idx2, wuidFilters)
+        {
+            const IPostFilter &wuidFilter = wuidFilters.item(idx2);
+            select.bindString(3+idx2, wuidFilter.queryValue());
+        }
+        return executeQuery(session, select);
     }
 
     // Fetch matching rows from the search or uniqueSearch table, for a given prefix
@@ -3755,11 +3455,13 @@ private:
         assertex(len);
         ucKeyEnd.setCharAt(len-1, ucKeyEnd.charAt(len-1)+1);
         getFieldNames(mappings+3, names, tableName);  // mappings+3 means we don't return the key columns (xpath, upper(keyPrefix), upper(key))
-        VStringBuffer selectQuery("select %s from %s where xpath='%s' and fieldPrefix='%.*s' and fieldValue >='%s' and fieldValue < '%s';", names.str()+1, tableName.str(), xpath, CASS_SEARCH_PREFIX_SIZE, ucKey.str(), ucKey.str(), ucKeyEnd.str()); // MORE - should consider using prepared/bind for this - is it faster?
-        if (traceLevel >= 2)
-            DBGLOG("%s", selectQuery.str());
-        CassandraStatement statement(cass_statement_new(selectQuery.str(), 0));
-        return executeQuery(session, statement);
+        VStringBuffer selectQuery("select %s from %s where xpath=? and fieldPrefix=? and fieldValue>=? and fieldValue<?;", names.str()+1, tableName.str());
+        CassandraStatement select(prepareStatement(selectQuery));
+        select.bindString(0, xpath);
+        select.bindString_n(1, ucKey, CASS_SEARCH_PREFIX_SIZE);
+        select.bindString(2, ucKey);
+        select.bindString(3, ucKeyEnd);
+        return executeQuery(session, select);
     }
 
     // Fetch rows from the search table, by thorTime, above a threshold
@@ -3769,9 +3471,9 @@ private:
         StringBuffer names;
         StringBuffer tableName;
         getFieldNames(searchMappings+3, names, tableName);  // mappings+3 means we don't return the key columns (xpath, upper(keyPrefix), upper(key))
-        VStringBuffer selectQuery("select %s from %s where xpath='@totalThorTime' and fieldPrefix='%*s'", names.str()+1, tableName.str(), CASS_SEARCH_PREFIX_SIZE, "");
+        VStringBuffer selectQuery("select %s from %s where xpath=? and fieldPrefix=?", names.str()+1, tableName.str());
         if (threshold && *threshold)
-            selectQuery.appendf(" where fieldValue >= '%s'", threshold);
+            selectQuery.appendf(" where fieldValue >= ?");
         if (descending)
             selectQuery.append(" ORDER BY fieldValue DESC, wuid ASC");
         else
@@ -3779,10 +3481,12 @@ private:
         if (limit)
             selectQuery.appendf(" LIMIT %u", limit);
         selectQuery.append(';');
-        if (traceLevel >= 2)
-            DBGLOG("%s", selectQuery.str());
-        CassandraStatement statement(cass_statement_new(selectQuery.str(), 0));
-        return executeQuery(session, statement);
+        CassandraStatement select(prepareStatement(selectQuery));
+        select.bindString(0, "@totalThorTime");
+        select.bindString_n(1, "        ", CASS_SEARCH_PREFIX_SIZE);  // This would stop working if we ever set the search prefix to > 8 chars. So don't.
+        if (threshold && *threshold)
+            select.bindString(2, threshold);
+        return executeQuery(session, select);
     }
 
     // Fetch rows from the search table, continuing a previous query that was sorted by thor time - part one
@@ -3805,9 +3509,9 @@ private:
             wuidTest = "<";
             fieldTest = wuid ? "=" : ">";
         }
-        VStringBuffer selectQuery("select %s from %s where xpath='@totalThorTime' and fieldPrefix='%*s' and fieldValue %s '%s'", names.str()+1, tableName.str(), CASS_SEARCH_PREFIX_SIZE, "", fieldTest, threshold);
+        VStringBuffer selectQuery("select %s from %s where xpath=? and fieldPrefix=? and fieldValue %s ?", names.str()+1, tableName.str(), fieldTest);
         if (wuid)
-            selectQuery.appendf(" and wuid %s '%s'", wuidTest, wuid);
+            selectQuery.appendf(" and wuid %s ?", wuidTest);
         if (descending)
             selectQuery.append(" ORDER BY fieldValue DESC, WUID ASC");
         else
@@ -3815,10 +3519,13 @@ private:
         if (limit)
             selectQuery.appendf(" LIMIT %u", limit);
         selectQuery.append(';');
-        if (traceLevel >= 2)
-            DBGLOG("%s", selectQuery.str());
-        CassandraStatement statement(cass_statement_new(selectQuery.str(), 0));
-        return executeQuery(session, statement);
+        CassandraStatement select(prepareStatement(selectQuery));
+        select.bindString(0, "@totalThorTime");
+        select.bindString_n(1, threshold, CASS_SEARCH_PREFIX_SIZE);
+        select.bindString(2, threshold);
+        if (wuid)
+            select.bindString(3, wuid);
+        return executeQuery(session, select);
     }
 
     // Fetch rows from the file search table
@@ -3828,18 +3535,22 @@ private:
         StringBuffer names;
         StringBuffer tableName;
         getFieldNames(filesReadSearchMappings+1, names, tableName);  // mappings+3 means we don't return the key column (name)
-        VStringBuffer selectQuery("select %s from %s where name='%s'", names.str()+1, tableName.str(), name);
+        VStringBuffer selectQuery("select %s from %s where name=?", names.str()+1, tableName.str());
         ForEachItemIn(idx, wuidFilters)
         {
             const IPostFilter &wuidFilter = wuidFilters.item(idx);
-            selectQuery.appendf(" and wuid %s '%s'", wuidFilter.queryField()==WUSFwuidhigh ? "<=" : ">=", wuidFilter.queryValue());
+            selectQuery.appendf(" and wuid %s ?", wuidFilter.queryField()==WUSFwuidhigh ? "<=" : ">=");
         }
         if (limit)
             selectQuery.appendf(" LIMIT %u", limit);
-        if (traceLevel >= 2)
-            DBGLOG("%s", selectQuery.str());
-        CassandraStatement statement(cass_statement_new(selectQuery.str(), 0));
-        return executeQuery(session, statement);
+        CassandraStatement select(prepareStatement(selectQuery));
+        select.bindString(0, name);
+        ForEachItemIn(idx2, wuidFilters)
+        {
+            const IPostFilter &wuidFilter = wuidFilters.item(idx2);
+            select.bindString(idx2+1, wuidFilter.queryValue());
+        }
+        return executeQuery(session, select);
     }
 
     // Fetch matching rows from the search table, for a single wuid
@@ -3852,11 +3563,13 @@ private:
         StringBuffer ucKey(key);
         ucKey.toUpperCase();
         getFieldNames(searchMappings+4, names, tableName);  // mappings+4 means we don't return the key columns (xpath, upper(keyPrefix), upper(key), and wuid)
-        VStringBuffer selectQuery("select %s from %s where xpath='%s' and fieldPrefix='%.*s' and fieldValue ='%s' and wuid='%s';", names.str()+1, tableName.str(), xpath, CASS_SEARCH_PREFIX_SIZE, ucKey.str(), ucKey.str(), wuid); // MORE - should consider using prepared/bind for this - is it faster?
-        if (traceLevel >= 2)
-            DBGLOG("%s", selectQuery.str());
-        CassandraStatement statement(cass_statement_new(selectQuery.str(), 0));
-        return executeQuery(session, statement);
+        VStringBuffer selectQuery("select %s from %s where xpath=? and fieldPrefix=? and fieldValue =? and wuid=?;", names.str()+1, tableName.str());
+        CassandraStatement select(prepareStatement(selectQuery));
+        select.bindString(0, xpath);
+        select.bindString_n(1, ucKey, CASS_SEARCH_PREFIX_SIZE);
+        select.bindString(2, ucKey);
+        select.bindString(3, wuid);
+        return executeQuery(session, select);
     }
 
     // Delete matching rows from a child table
@@ -3867,10 +3580,9 @@ private:
         StringBuffer tableName;
         getFieldNames(mappings, names, tableName);
         VStringBuffer insertQuery("DELETE from %s where partition=? and wuid=?;", tableName.str());
-        Owned<CassandraPrepared> prepared = prepareStatement(insertQuery);
-        CassandraStatement update(cass_prepared_bind(*prepared));
-        check(cass_statement_bind_int32(update, 0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS));
-        check(cass_statement_bind_string(update, 1, wuid));
+        CassandraStatement update(prepareStatement(insertQuery));
+        update.bindInt32(0, rtlHash32VStr(wuid, 0) % NUM_PARTITIONS);
+        update.bindString(1, wuid);
         check(cass_batch_add_statement(batch, update));
     }