Browse Source

Merge pull request #7618 from richardkchapman/cassandra-limiter54

HPCC-13724 Review use of batch in cassandra plugin

Reviewed-By: Jamie Noss <james.noss@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 10 years ago
parent
commit
356e8f242c
1 changed files with 18 additions and 11 deletions
  1. 18 11
      plugins/cassandra/cassandraembed.cpp

+ 18 - 11
plugins/cassandra/cassandraembed.cpp

@@ -254,10 +254,16 @@ private:
     CassStatement *statement;
 };
 
+class LinkedSemaphore : public CInterfaceOf<IInterface>, public Semaphore
+{
+public:
+    LinkedSemaphore(unsigned initialCount) : Semaphore(initialCount) {}
+};
+
 class CassandraRetryingFuture : public CInterface
 {
 public:
-    CassandraRetryingFuture(CassSession *_session, CassStatement *_statement, Semaphore *_limiter = NULL, unsigned _retries = 10)
+    CassandraRetryingFuture(CassSession *_session, CassStatement *_statement, LinkedSemaphore *_limiter = NULL, unsigned _retries = 10)
     : session(_session), statement(_statement), retries(_retries), limiter(_limiter), future(NULL)
     {
         execute();
@@ -300,8 +306,9 @@ private:
             execute();
             cass_future_wait(future);
             CassError rc = cass_future_error_code(future);
-            if(rc == CASS_OK)
+            if (rc == CASS_OK)
                 return true;
+            Sleep(10);
         }
         return false;
     }
@@ -311,20 +318,20 @@ private:
             limiter->wait();
         future = cass_session_execute(session, statement);
         if (limiter)
-            cass_future_set_callback(future, signaller, this); // Note - this will call the callback if the future has already completed
+            cass_future_set_callback(future, signaller, LINK(limiter)); // Note - this will call the callback if the future has already completed
     }
     static void signaller(CassFuture *future, void *data)
     {
-        CassandraRetryingFuture *self = (CassandraRetryingFuture *) data;
-        if (self && self->limiter)
-            self->limiter->signal();
+        LinkedSemaphore *sem = (LinkedSemaphore *) data;
+        sem->signal();
+        ::Release(sem);
     }
     CassandraRetryingFuture(const CassandraFuture &);
     CassFuture *future;
     CassSession *session;
     CassandraStatement statement;
     unsigned retries;
-    Semaphore *limiter;
+    LinkedSemaphore *limiter;
 };
 
 class CassandraPrepared : public CInterface
@@ -432,7 +439,7 @@ public:
     {
         stop();
         futures.kill();
-        delete semaphore;
+        semaphore.clear();  // Note - may live on for a while until all futures associated with it have signalled.
     }
     inline void stop()
     {
@@ -468,8 +475,8 @@ public:
     {
         if (batchMode != (CassBatchType) -1)
             batch.setown(new CassandraBatch(cass_batch_new(batchMode)));
-        else
-            semaphore = new Semaphore(maxFutures ? maxFutures : 100);
+        else if (maxFutures)
+            semaphore.setown(new LinkedSemaphore(maxFutures));
         statement.setown(new CassandraStatement(cass_prepared_bind(*prepared)));
         inBatch = true;
     }
@@ -532,7 +539,7 @@ protected:
     Owned<CassandraFutureResult> result;
     Owned<CassandraIterator> iterator;
     CIArrayOf<CassandraRetryingFuture> futures;
-    Semaphore *semaphore;
+    Owned<LinkedSemaphore> semaphore;
     unsigned numBindings;
     unsigned maxFutures;
     unsigned maxRetries;