Ver código fonte

HPCC-13616 Cache sessions and prepared queries in Cassandra driver

Step 3 - Move the semaphore that limits concurrent futures down into the
session/cluster object.

Remove excessive operator overloading in CassandraClusterSession.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 10 anos atrás
pai
commit
0e0107f65b

+ 20 - 13
plugins/cassandra/cassandraembed.cpp

@@ -123,7 +123,14 @@ void CassandraClusterSession::setOptions(const StringArray &options)
             else if (stricmp(optName, "keyspace")==0)
                 keyspace.set(val);
             else if (stricmp(optName, "maxFutures")==0)
-                maxFutures=getUnsignedOption(val, "maxFutures");
+            {
+                if (!semaphore)
+                {
+                    maxFutures=getUnsignedOption(val, "maxFutures");
+                    if (maxFutures)
+                        semaphore = new Semaphore(maxFutures);
+                }
+            }
             else if (stricmp(optName, "maxRetries")==0)
                 maxRetries=getUnsignedOption(val, "maxRetries");
             else if (stricmp(optName, "port")==0)
@@ -310,6 +317,11 @@ CassandraPrepared *CassandraClusterSession::prepareStatement(const char *query,
     preparedCache.setValue(query, cached); // NOTE - this links parameter
     return cached.getClear();
 }
+CassandraStatementInfo *CassandraClusterSession::createStatementInfo(const char *script, unsigned numParams, CassBatchType batchMode, unsigned pageSize) const
+{
+    Owned<CassandraPrepared> prepared = prepareStatement(script, false); // We could make tracing selectable
+    return new CassandraStatementInfo(session, prepared, numParams, batchMode, pageSize, semaphore, maxRetries);
+}
 
 typedef CassandraClusterSession *CassandraClusterSessionPtr;
 typedef MapBetween<hash64_t, hash64_t, CassandraClusterSessionPtr, CassandraClusterSessionPtr> ClusterSessionMap;
@@ -381,7 +393,7 @@ void CassandraSession::set(CassSession *_session)
 
 //----------------------
 
-CassandraRetryingFuture::CassandraRetryingFuture(CassSession *_session, CassStatement *_statement, LinkedSemaphore *_limiter, unsigned _retries)
+CassandraRetryingFuture::CassandraRetryingFuture(CassSession *_session, CassStatement *_statement, Semaphore *_limiter, unsigned _retries)
 : session(_session), statement(_statement), retries(_retries), limiter(_limiter), future(NULL)
 {
     execute();
@@ -435,20 +447,19 @@ void CassandraRetryingFuture::execute()
         limiter->wait();
     future = cass_session_execute(session, statement);
     if (limiter)
-        cass_future_set_callback(future, signaller, LINK(limiter)); // Note - this will call the callback if the future has already completed
+        cass_future_set_callback(future, signaller, limiter); // Note - this will call the callback if the future has already completed
 }
 
 void CassandraRetryingFuture::signaller(CassFuture *future, void *data)
 {
-    LinkedSemaphore *sem = (LinkedSemaphore *) data;
+    Semaphore *sem = (Semaphore *) data;
     sem->signal();
-    ::Release(sem);
 }
 
 //----------------------
 
-CassandraStatementInfo::CassandraStatementInfo(CassandraSession *_session, CassandraPrepared *_prepared, unsigned _numBindings, CassBatchType _batchMode, unsigned pageSize, unsigned _maxFutures, unsigned _maxRetries)
-    : session(_session), prepared(_prepared), numBindings(_numBindings), batchMode(_batchMode), semaphore(NULL), maxFutures(_maxFutures), maxRetries(_maxRetries)
+CassandraStatementInfo::CassandraStatementInfo(CassandraSession *_session, CassandraPrepared *_prepared, unsigned _numBindings, CassBatchType _batchMode, unsigned pageSize, Semaphore *_semaphore, unsigned _maxRetries)
+    : session(_session), prepared(_prepared), numBindings(_numBindings), batchMode(_batchMode), semaphore(_semaphore), maxRetries(_maxRetries)
 {
     assertex(prepared && *prepared);
     statement.setown(new CassandraStatement(cass_prepared_bind(*prepared)));
@@ -460,7 +471,6 @@ CassandraStatementInfo::~CassandraStatementInfo()
 {
     stop();
     futures.kill();
-    semaphore.clear();  // Note - may live on for a while until all futures associated with it have signalled.
 }
 void CassandraStatementInfo::stop()
 {
@@ -496,8 +506,6 @@ void CassandraStatementInfo::startStream()
 {
     if (batchMode != (CassBatchType) -1)
         batch.setown(new CassandraBatch(cass_batch_new(batchMode)));
-    else if (maxFutures)
-        semaphore.setown(new LinkedSemaphore(maxFutures));
     statement.setown(new CassandraStatement(cass_prepared_bind(*prepared)));
     inBatch = true;
 }
@@ -1719,19 +1727,18 @@ public:
                     break;
                 }
                 CassandraStatement statement(cass_statement_new_n(script, nextScript-script, 0));
-                CassandraFuture future(cass_session_execute(*cluster, statement));
+                CassandraFuture future(cass_session_execute(cluster->querySession(), statement));
                 future.wait("execute statement");
                 script = nextScript;
             }
         }
         else
         {
-            Owned<CassandraPrepared> prepared = cluster->prepareStatement(script, false); // We could make tracing selectable
             if ((flags & EFnoparams) == 0)
                 numParams = countBindings(script);
             else
                 numParams = 0;
-            stmtInfo.setown(new CassandraStatementInfo(*cluster, prepared, numParams, batchMode, pageSize, cluster->maxFutures, cluster->maxRetries));
+            stmtInfo.setown(cluster->createStatementInfo(script, numParams, batchMode, pageSize));
         }
     }
     virtual void callFunction()

+ 19 - 23
plugins/cassandra/cassandraembed.hpp

@@ -76,11 +76,13 @@ private:
     CassSession *session;
 };
 
+class CassandraStatementInfo;
+
 class CassandraClusterSession : public CInterface
 {
 public:
     inline CassandraClusterSession(CassCluster *_cluster)
-    : cluster(_cluster), maxFutures(0), maxRetries(0)
+    : cluster(_cluster), semaphore(NULL), maxFutures(0), maxRetries(0)
     {
     }
     void setOptions(const StringArray &options);
@@ -89,22 +91,25 @@ public:
         session.clear();  // Should do this before freeing cluster
         if (cluster)
             cass_cluster_free(cluster);
+        if (semaphore)
+            delete semaphore;
     }
-    inline operator CassCluster *() const
+    inline CassSession *querySession() const
     {
-        return cluster;
+        return *session;
     }
-    inline operator CassandraSession *() const
+    inline const char *queryKeySpace() const
     {
-        return session;
+        return keyspace;
     }
-    inline operator CassSession *() const
+    inline void setKeySpace(const char *val)
     {
-        return *session;
+        keyspace.set(val);
     }
     void connect();
     void disconnect();
     CassandraPrepared *prepareStatement(const char *query, bool trace) const;
+    CassandraStatementInfo *createStatementInfo(const char *script, unsigned numParams, CassBatchType batchMode, unsigned pageSize) const;
 private:
     void checkSetOption(CassError rc, const char *name);
     cass_bool_t getBoolOption(const char *val, const char *option);
@@ -116,11 +121,9 @@ private:
     Owned<CassandraSession> session;
     mutable MapStringToMyClass<CassandraPrepared> preparedCache;
     mutable CriticalSection cacheCrit;
-public:
-    // These are here as convenient to set from same options string. They are really properties of the session
-    // or query rather than the cluster, but we have one session per cluster so we get away with it at the moment.
-	unsigned maxFutures;
-	unsigned maxRetries;
+    Semaphore *semaphore;
+    unsigned maxFutures;
+    unsigned maxRetries;
     StringAttr keyspace;
 };
 
@@ -227,16 +230,10 @@ private:
     CassStatement *statement;
 };
 
-class LinkedSemaphore : public CInterfaceOf<IInterface>, public Semaphore
-{
-public:
-    LinkedSemaphore(unsigned initialCount) : Semaphore(initialCount) {}
-};
-
 class CassandraRetryingFuture : public CInterface
 {
 public:
-    CassandraRetryingFuture(CassSession *_session, CassStatement *_statement, LinkedSemaphore *_limiter = NULL, unsigned _retries = 10);
+    CassandraRetryingFuture(CassSession *_session, CassStatement *_statement, Semaphore *_limiter, unsigned _retries);
     ~CassandraRetryingFuture();
     inline operator CassFuture *() const
     {
@@ -253,7 +250,7 @@ private:
     CassSession *session;
     CassandraStatement statement;
     unsigned retries;
-    LinkedSemaphore *limiter;
+    Semaphore *limiter;
 };
 
 class CassandraResult : public CInterfaceOf<IInterface>
@@ -326,7 +323,7 @@ class CassandraStatementInfo : public CInterface
 {
 public:
     IMPLEMENT_IINTERFACE;
-    CassandraStatementInfo(CassandraSession *_session, CassandraPrepared *_prepared, unsigned _numBindings, CassBatchType _batchMode, unsigned pageSize, unsigned _maxFutures, unsigned _maxRetries);
+    CassandraStatementInfo(CassandraSession *_session, CassandraPrepared *_prepared, unsigned _numBindings, CassBatchType _batchMode, unsigned pageSize, Semaphore *_semaphore, unsigned _maxRetries);
     ~CassandraStatementInfo();
     void stop();
     bool next();
@@ -360,8 +357,7 @@ protected:
     Owned<CassandraIterator> iterator;
     unsigned numBindings;
     CIArrayOf<CassandraRetryingFuture> futures;
-    Owned<LinkedSemaphore> semaphore;
-    unsigned maxFutures;
+    Semaphore *semaphore;
     unsigned maxRetries;
     bool inBatch;
     CassBatchType batchMode;