Sfoglia il codice sorgente

HPCC-13616 Cache sessions and prepared queries in Cassandra driver

Step 1 - move code around so that cluster and session are stored together.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 10 anni fa
parent
commit
55b4148080
2 ha cambiato i file con 58 aggiunte e 40 eliminazioni
  1. 23 15
      plugins/cassandra/cassandraembed.cpp
  2. 35 25
      plugins/cassandra/cassandraembed.hpp

+ 23 - 15
plugins/cassandra/cassandraembed.cpp

@@ -108,7 +108,7 @@ void check(CassError rc)
 
 // Wrappers to Cassandra structures that require corresponding releases
 
-void CassandraCluster::setOptions(const StringArray &options)
+void CassandraClusterSession::setOptions(const StringArray &options)
 {
     const char *contact_points = "localhost";
     const char *user = "";
@@ -267,18 +267,18 @@ void CassandraCluster::setOptions(const StringArray &options)
         cass_cluster_set_credentials(cluster, user, password);
 }
 
-void CassandraCluster::checkSetOption(CassError rc, const char *name)
+void CassandraClusterSession::checkSetOption(CassError rc, const char *name)
 {
     if (rc != CASS_OK)
     {
         failx("While setting option %s: %s", name, cass_error_desc(rc));
     }
 }
-cass_bool_t CassandraCluster::getBoolOption(const char *val, const char *option)
+cass_bool_t CassandraClusterSession::getBoolOption(const char *val, const char *option)
 {
     return strToBool(val) ? cass_true : cass_false;
 }
-unsigned CassandraCluster::getUnsignedOption(const char *val, const char *option)
+unsigned CassandraClusterSession::getUnsignedOption(const char *val, const char *option)
 {
     char *endp;
     long value = strtoul(val, &endp, 0);
@@ -286,7 +286,7 @@ unsigned CassandraCluster::getUnsignedOption(const char *val, const char *option
         failx("Invalid value '%s' for option %s", val, option);
     return (unsigned) value;
 }
-unsigned CassandraCluster::getDoubleOption(const char *val, const char *option)
+unsigned CassandraClusterSession::getDoubleOption(const char *val, const char *option)
 {
     char *endp;
     double value = strtod(val, &endp);
@@ -294,11 +294,22 @@ unsigned CassandraCluster::getDoubleOption(const char *val, const char *option)
         failx("Invalid value '%s' for option %s", val, option);
     return value;
 }
-__uint64 CassandraCluster::getUnsigned64Option(const char *val, const char *option)
+__uint64 CassandraClusterSession::getUnsigned64Option(const char *val, const char *option)
 {
     // MORE - could check it's all digits (with optional leading spaces...), if we cared.
     return rtlVStrToUInt8(val);
 }
+void CassandraClusterSession::connect()
+{
+    assertex(cluster && !session);
+    session.setown(new CassandraSession(cass_session_new()));
+    CassandraFuture future(keyspace.isEmpty() ? cass_session_connect(*session, cluster) : cass_session_connect_keyspace(*session, cluster, keyspace));
+    future.wait("connect");
+}
+void CassandraClusterSession::disconnect()
+{
+    session.clear();
+}
 
 //------------------
 
@@ -1221,11 +1232,9 @@ public:
     {
         StringArray opts;
         opts.appendList(options, ",");
-        cluster.setown(new CassandraCluster(cass_cluster_new()));
+        cluster.setown(new CassandraClusterSession(cass_cluster_new()));
         cluster->setOptions(opts);
-        session.setown(new CassandraSession(cass_session_new()));
-        CassandraFuture future(cluster->keyspace.isEmpty() ? cass_session_connect(*session, *cluster) : cass_session_connect_keyspace(*session, *cluster, cluster->keyspace));
-        future.wait("connect");
+        cluster->connect();
     }
     virtual bool getBooleanResult()
     {
@@ -1651,7 +1660,7 @@ public:
                     break;
                 }
                 CassandraStatement statement(cass_statement_new_n(script, nextScript-script, 0));
-                CassandraFuture future(cass_session_execute(*session, statement));
+                CassandraFuture future(cass_session_execute(*cluster->session, statement));
                 future.wait("execute statement");
                 script = nextScript;
             }
@@ -1659,14 +1668,14 @@ public:
         else
         {
             // MORE - can cache this, perhaps, if script is same as last time?
-            CassandraFuture future(cass_session_prepare(*session, script));
+            CassandraFuture future(cass_session_prepare(*cluster->session, script));
             future.wait("prepare statement");
             Owned<CassandraPrepared> prepared = new CassandraPrepared(cass_future_get_prepared(future));
             if ((flags & EFnoparams) == 0)
                 numParams = countBindings(script);
             else
                 numParams = 0;
-            stmtInfo.setown(new CassandraStatementInfo(session, prepared, numParams, cluster->batchMode, cluster->pageSize, cluster->maxFutures, cluster->maxRetries));
+            stmtInfo.setown(new CassandraStatementInfo(cluster->session, prepared, numParams, cluster->batchMode, cluster->pageSize, cluster->maxFutures, cluster->maxRetries));
         }
     }
     virtual void callFunction()
@@ -1784,8 +1793,7 @@ protected:
             failx("While binding parameter %s: %s", name, cass_error_desc(rc));
         }
     }
-    Owned<CassandraCluster> cluster;
-    Owned<CassandraSession> session;
+    Owned<CassandraClusterSession> cluster;
     Owned<CassandraStatementInfo> stmtInfo;
     Owned<CassandraDatasetBinder> inputStream;
     const IContextLogger &logctx;

+ 35 - 25
plugins/cassandra/cassandraembed.hpp

@@ -29,16 +29,38 @@ extern bool isString(CassValueType t);
 
 // Wrappers to Cassandra structures that require corresponding releases
 
-class CassandraCluster : public CInterface
+class CassandraSession : public CInterface
+{
+public:
+    inline CassandraSession() : session(NULL) {}
+    inline CassandraSession(CassSession *_session) : session(_session)
+    {
+    }
+    inline ~CassandraSession()
+    {
+        set(NULL);
+    }
+    void set(CassSession *_session);
+    inline operator CassSession *() const
+    {
+        return session;
+    }
+private:
+    CassandraSession(const CassandraSession &);
+    CassSession *session;
+};
+
+class CassandraClusterSession : public CInterface
 {
 public:
-    inline CassandraCluster(CassCluster *_cluster)
+    inline CassandraClusterSession(CassCluster *_cluster)
     : cluster(_cluster), batchMode((CassBatchType) -1), pageSize(0), maxFutures(0), maxRetries(0)
     {
     }
     void setOptions(const StringArray &options);
-    inline ~CassandraCluster()
+    inline ~CassandraClusterSession()
     {
+        session.clear();  // Should do this before freeing cluster
         if (cluster)
             cass_cluster_free(cluster);
     }
@@ -46,15 +68,24 @@ public:
     {
         return cluster;
     }
+    inline operator CassSession *() const
+    {
+        return *session;
+    }
+    void connect();
+    void disconnect();
 private:
     void checkSetOption(CassError rc, const char *name);
     cass_bool_t getBoolOption(const char *val, const char *option);
     unsigned getUnsignedOption(const char *val, const char *option);
     unsigned getDoubleOption(const char *val, const char *option);
     __uint64 getUnsigned64Option(const char *val, const char *option);
-    CassandraCluster(const CassandraCluster &);
+    CassandraClusterSession(const CassandraClusterSession &);
     CassCluster *cluster;
 public:
+    Owned<CassandraSession> session;  // Make private later
+
+public:
     // These are here as convenient to set from same options string. They are really properties of the session
     // or query rather than the cluster, but we have one session per cluster so we get away with it at the moment.
     CassBatchType batchMode;
@@ -118,27 +149,6 @@ private:
 
 };
 
-class CassandraSession : public CInterface
-{
-public:
-    inline CassandraSession() : session(NULL) {}
-    inline CassandraSession(CassSession *_session) : session(_session)
-    {
-    }
-    inline ~CassandraSession()
-    {
-        set(NULL);
-    }
-    void set(CassSession *_session);
-    inline operator CassSession *() const
-    {
-        return session;
-    }
-private:
-    CassandraSession(const CassandraSession &);
-    CassSession *session;
-};
-
 class CassandraBatch : public CInterface
 {
 public: