Browse Source

HPCC-13616 Cache sessions and prepared queries in Cassandra driver

Step 2 - add a hash table to cache previous sessions where options match.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 10 years ago
parent
commit
1a6975614e
2 changed files with 121 additions and 56 deletions
  1. 86 29
      plugins/cassandra/cassandraembed.cpp
  2. 35 27
      plugins/cassandra/cassandraembed.hpp

+ 86 - 29
plugins/cassandra/cassandraembed.cpp

@@ -74,13 +74,6 @@ static void logCallBack(const CassLogMessage *message, void *data)
     DBGLOG("cassandra: %s - %s", cass_log_level_string(message->severity), message->message);
 }
 
-MODULE_INIT(INIT_PRIORITY_STANDARD)
-{
-    cass_log_set_callback(logCallBack, NULL);
-    cass_log_set_level(CASS_LOG_WARN);
-    return true;
-}
-
 extern void failx(const char *message, ...)
 {
     va_list args;
@@ -129,17 +122,6 @@ void CassandraClusterSession::setOptions(const StringArray &options)
                 password = val;
             else if (stricmp(optName, "keyspace")==0)
                 keyspace.set(val);
-            else if (stricmp(optName, "batch")==0)
-            {
-                if (stricmp(val, "LOGGED")==0)
-                    batchMode = CASS_BATCH_TYPE_LOGGED;
-                else if (stricmp(val, "UNLOGGED")==0)
-                    batchMode = CASS_BATCH_TYPE_UNLOGGED;
-                else if (stricmp(val, "COUNTER")==0)
-                    batchMode = CASS_BATCH_TYPE_COUNTER;
-            }
-            else if (stricmp(optName, "pageSize")==0)
-                pageSize = getUnsignedOption(val, "pageSize");
             else if (stricmp(optName, "maxFutures")==0)
                 maxFutures=getUnsignedOption(val, "maxFutures");
             else if (stricmp(optName, "maxRetries")==0)
@@ -310,6 +292,62 @@ void CassandraClusterSession::disconnect()
 {
     session.clear();
 }
+CassandraPrepared *CassandraClusterSession::prepareStatement(const char *query, bool trace) const
+{
+    assertex(session);
+    CriticalBlock b(cacheCrit);
+    Linked<CassandraPrepared> cached = preparedCache.getValue(query);
+    if (cached)
+        return cached.getClear();
+    {
+        // We don't want to block cache lookups while we prepare a new bound statement
+        // Note - if multiple threads try to prepare the same (new) statement at the same time, it's not catastrophic
+        CriticalUnblock b(cacheCrit);
+        CassandraFuture futurePrep(cass_session_prepare(*session, query));
+        futurePrep.wait("prepare statement");
+        cached.setown(new CassandraPrepared(cass_future_get_prepared(futurePrep), trace ? query : NULL));
+    }
+    preparedCache.setValue(query, cached); // NOTE - this links parameter
+    return cached.getClear();
+}
+
+typedef CassandraClusterSession *CassandraClusterSessionPtr;
+typedef MapBetween<hash64_t, hash64_t, CassandraClusterSessionPtr, CassandraClusterSessionPtr> ClusterSessionMap;
+static CriticalSection clusterCacheCrit;
+static ClusterSessionMap cachedSessions;
+
+CassandraClusterSession *lookupCachedSession(hash64_t hash, const StringArray &opts)
+{
+    Owned<CassandraClusterSession> cluster;
+    CassandraClusterSessionPtr *found = cachedSessions.getValue(hash);
+    if (found)
+        cluster.set(*found);
+    if (!cluster)
+    {
+        cluster.setown(new CassandraClusterSession(cass_cluster_new()));
+        cluster->setOptions(opts);
+        cluster->connect();
+        cachedSessions.setValue(hash, cluster.getLink());
+    }
+    return cluster.getClear();
+}
+
+MODULE_INIT(INIT_PRIORITY_STANDARD)
+{
+    cass_log_set_callback(logCallBack, NULL);
+    cass_log_set_level(CASS_LOG_WARN);
+    return true;
+}
+
+MODULE_EXIT()
+{
+    HashIterator i(cachedSessions);
+    ForEach(i)
+    {
+        CassandraClusterSession *session = *cachedSessions.mapToValue(&i.query());
+        ::Release(session);
+    }
+}
 
 //------------------
 
@@ -1228,13 +1266,34 @@ class CassandraEmbedFunctionContext : public CInterfaceOf<IEmbedFunctionContext>
 {
 public:
     CassandraEmbedFunctionContext(const IContextLogger &_logctx, unsigned _flags, const char *options)
-      : logctx(_logctx), flags(_flags), nextParam(0), numParams(0)
+      : logctx(_logctx), flags(_flags), nextParam(0), numParams(0), batchMode((CassBatchType) -1), pageSize(0)
     {
         StringArray opts;
         opts.appendList(options, ",");
-        cluster.setown(new CassandraClusterSession(cass_cluster_new()));
-        cluster->setOptions(opts);
-        cluster->connect();
+        hash64_t hash = 0;
+        ForEachItemInRev(idx, opts)
+        {
+            const char *opt = opts.item(idx);
+            if (strnicmp(opt, "batch=", 6)==0)
+            {
+                const char *val=opt+6;
+                if (stricmp(val, "LOGGED")==0)
+                    batchMode = CASS_BATCH_TYPE_LOGGED;
+                else if (stricmp(val, "UNLOGGED")==0)
+                    batchMode = CASS_BATCH_TYPE_UNLOGGED;
+                else if (stricmp(val, "COUNTER")==0)
+                    batchMode = CASS_BATCH_TYPE_COUNTER;
+                opts.remove(idx);
+            }
+            else if (strnicmp(opt, "pagesize=", 9)==0)
+            {
+                pageSize = atoi(opt+9);
+                opts.remove(idx);
+            }
+            else
+                hash = rtlHash64VStr(opt, hash);
+        }
+        cluster.setown(lookupCachedSession(hash, opts));
     }
     virtual bool getBooleanResult()
     {
@@ -1660,22 +1719,19 @@ public:
                     break;
                 }
                 CassandraStatement statement(cass_statement_new_n(script, nextScript-script, 0));
-                CassandraFuture future(cass_session_execute(*cluster->session, statement));
+                CassandraFuture future(cass_session_execute(*cluster, statement));
                 future.wait("execute statement");
                 script = nextScript;
             }
         }
         else
         {
-            // MORE - can cache this, perhaps, if script is same as last time?
-            CassandraFuture future(cass_session_prepare(*cluster->session, script));
-            future.wait("prepare statement");
-            Owned<CassandraPrepared> prepared = new CassandraPrepared(cass_future_get_prepared(future));
+            Owned<CassandraPrepared> prepared = cluster->prepareStatement(script, false); // We could make tracing selectable
             if ((flags & EFnoparams) == 0)
                 numParams = countBindings(script);
             else
                 numParams = 0;
-            stmtInfo.setown(new CassandraStatementInfo(cluster->session, prepared, numParams, cluster->batchMode, cluster->pageSize, cluster->maxFutures, cluster->maxRetries));
+            stmtInfo.setown(new CassandraStatementInfo(*cluster, prepared, numParams, batchMode, pageSize, cluster->maxFutures, cluster->maxRetries));
         }
     }
     virtual void callFunction()
@@ -1801,7 +1857,8 @@ protected:
     unsigned nextParam;
     unsigned numParams;
     StringAttr queryString;
-
+    CassBatchType batchMode;
+    unsigned pageSize;
 };
 
 class CassandraEmbedContext : public CInterfaceOf<IEmbedContext>

+ 35 - 27
plugins/cassandra/cassandraembed.hpp

@@ -29,6 +29,32 @@ extern bool isString(CassValueType t);
 
 // Wrappers to Cassandra structures that require corresponding releases
 
+class CassandraPrepared : public CInterfaceOf<IInterface>
+{
+public:
+    inline CassandraPrepared(const CassPrepared *_prepared, const char *_queryString)
+    : prepared(_prepared), queryString(_queryString)
+    {
+    }
+    inline ~CassandraPrepared()
+    {
+        if (prepared)
+            cass_prepared_free(prepared);
+    }
+    inline operator const CassPrepared *() const
+    {
+        return prepared;
+    }
+    inline const char *queryQueryString() const
+    {
+        return queryString;
+    }
+private:
+    CassandraPrepared(const CassandraPrepared &);
+    StringAttr queryString;
+    const CassPrepared *prepared;
+};
+
 class CassandraSession : public CInterface
 {
 public:
@@ -54,7 +80,7 @@ class CassandraClusterSession : public CInterface
 {
 public:
     inline CassandraClusterSession(CassCluster *_cluster)
-    : cluster(_cluster), batchMode((CassBatchType) -1), pageSize(0), maxFutures(0), maxRetries(0)
+    : cluster(_cluster), maxFutures(0), maxRetries(0)
     {
     }
     void setOptions(const StringArray &options);
@@ -68,12 +94,17 @@ public:
     {
         return cluster;
     }
+    inline operator CassandraSession *() const
+    {
+        return session;
+    }
     inline operator CassSession *() const
     {
         return *session;
     }
     void connect();
     void disconnect();
+    CassandraPrepared *prepareStatement(const char *query, bool trace) const;
 private:
     void checkSetOption(CassError rc, const char *name);
     cass_bool_t getBoolOption(const char *val, const char *option);
@@ -82,14 +113,12 @@ private:
     __uint64 getUnsigned64Option(const char *val, const char *option);
     CassandraClusterSession(const CassandraClusterSession &);
     CassCluster *cluster;
-public:
-    Owned<CassandraSession> session;  // Make private later
-
+    Owned<CassandraSession> session;
+    mutable MapStringToMyClass<CassandraPrepared> preparedCache;
+    mutable CriticalSection cacheCrit;
 public:
     // These are here as convenient to set from same options string. They are really properties of the session
     // or query rather than the cluster, but we have one session per cluster so we get away with it at the moment.
-    CassBatchType batchMode;
-    unsigned pageSize;
 	unsigned maxFutures;
 	unsigned maxRetries;
     StringAttr keyspace;
@@ -169,27 +198,6 @@ private:
     CassBatch *batch;
 };
 
-class CassandraPrepared : public CInterfaceOf<IInterface>
-{
-public:
-    inline CassandraPrepared(const CassPrepared *_prepared)
-    : prepared(_prepared)
-    {
-    }
-    inline ~CassandraPrepared()
-    {
-        if (prepared)
-            cass_prepared_free(prepared);
-    }
-    inline operator const CassPrepared *() const
-    {
-        return prepared;
-    }
-private:
-    CassandraPrepared(const CassandraPrepared &);
-    const CassPrepared *prepared;
-};
-
 class CassandraStatement : public CInterface
 {
 public: