فهرست منبع

HPCC-13724 Review use of batch in cassandra plugin

If batch mode not specified on a bulk insert, use async futures to ensure
optimal performance.

Can set MAXFUTURES=nn option to control how many futures will be used, default
is 100.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 10 سال پیش
والد
کامیت
0aec90a953
2فایلهای تغییر یافته به همراه134 افزوده شده و 16 حذف شده
  1. 118 9
      plugins/cassandra/cassandraembed.cpp
  2. 16 7
      testing/regress/ecl/cassandra-simple.ecl

+ 118 - 9
plugins/cassandra/cassandraembed.cpp

@@ -243,11 +243,90 @@ public:
     {
         return statement;
     }
+    inline CassStatement *getClear()
+    {
+        CassStatement *ret = statement;
+        statement = NULL;
+        return ret;
+    }
 private:
     CassandraStatement(const CassandraStatement &);
     CassStatement *statement;
 };
 
+class CassandraRetryingFuture : public CInterface
+{
+public:
+    CassandraRetryingFuture(CassSession *_session, CassStatement *_statement, Semaphore *_limiter = NULL, unsigned _retries = 10)
+    : session(_session), statement(_statement), retries(_retries), limiter(_limiter), future(NULL)
+    {
+        execute();
+    }
+    ~CassandraRetryingFuture()
+    {
+        if (future)
+            cass_future_free(future);
+    }
+    inline operator CassFuture *() const
+    {
+        return future;
+    }
+    void wait(const char *why)
+    {
+        cass_future_wait(future);
+        CassError rc = cass_future_error_code(future);
+        if(rc != CASS_OK)
+        {
+            switch (rc)
+            {
+            case CASS_ERROR_LIB_NO_HOSTS_AVAILABLE: // MORE - are there others we should retry?
+                if (retry(why))
+                    break;
+                // fall into
+            default:
+                const char *message;
+                size_t length;
+                cass_future_error_message(future, &message, &length);
+                VStringBuffer err("cassandra: failed to %s (%.*s)", why, (int) length, message);
+                rtlFail(0, err.str());
+            }
+        }
+    }
+private:
+    bool retry(const char *why)
+    {
+        for (int i = 0; i < retries; i++)
+        {
+            execute();
+            cass_future_wait(future);
+            CassError rc = cass_future_error_code(future);
+            if(rc == CASS_OK)
+                return true;
+        }
+        return false;
+    }
+    void execute()
+    {
+        if (limiter)
+            limiter->wait();
+        future = cass_session_execute(session, statement);
+        if (limiter)
+            cass_future_set_callback(future, signaller, this); // Note - this will call the callback if the future has already completed
+    }
+    static void signaller(CassFuture *future, void *data)
+    {
+        CassandraRetryingFuture *self = (CassandraRetryingFuture *) data;
+        if (self && self->limiter)
+            self->limiter->signal();
+    }
+    CassandraRetryingFuture(const CassandraFuture &);
+    CassFuture *future;
+    CassSession *session;
+    CassandraStatement statement;
+    unsigned retries;
+    Semaphore *limiter;
+};
+
 class CassandraPrepared : public CInterface
 {
 public:
@@ -340,18 +419,20 @@ class CassandraStatementInfo : public CInterface
 {
 public:
     IMPLEMENT_IINTERFACE;
-    CassandraStatementInfo(CassandraSession *_session, CassandraPrepared *_prepared, unsigned _numBindings, CassBatchType _batchMode, unsigned pageSize)
-    : session(_session), prepared(_prepared), numBindings(_numBindings), batchMode(_batchMode)
+    CassandraStatementInfo(CassandraSession *_session, CassandraPrepared *_prepared, unsigned _numBindings, CassBatchType _batchMode, unsigned pageSize, unsigned _maxFutures, unsigned _maxRetries)
+    : session(_session), prepared(_prepared), numBindings(_numBindings), batchMode(_batchMode), semaphore(NULL), maxFutures(_maxFutures), maxRetries(_maxRetries)
     {
         assertex(prepared && *prepared);
         statement.setown(new CassandraStatement(cass_prepared_bind(*prepared)));
         if (pageSize)
             cass_statement_set_paging_size(*statement, pageSize);
-
+        inBatch = false;
     }
     ~CassandraStatementInfo()
     {
         stop();
+        futures.kill();
+        delete semaphore;
     }
     inline void stop()
     {
@@ -386,10 +467,11 @@ public:
     void startStream()
     {
         if (batchMode != (CassBatchType) -1)
-        {
             batch.setown(new CassandraBatch(cass_batch_new(batchMode)));
-            statement.setown(new CassandraStatement(cass_prepared_bind(*prepared)));
-        }
+        else
+            semaphore = new Semaphore(maxFutures ? maxFutures : 100);
+        statement.setown(new CassandraStatement(cass_prepared_bind(*prepared)));
+        inBatch = true;
     }
     void endStream()
     {
@@ -398,6 +480,13 @@ public:
             result.setown(new CassandraFutureResult (cass_session_execute_batch(*session, *batch)));
             assertex (rowCount() == 0);
         }
+        else
+        {
+            ForEachItemIn(idx, futures)
+            {
+                futures.item(idx).wait("endStream");
+            }
+        }
     }
     void execute()
     {
@@ -407,6 +496,11 @@ public:
             check(cass_batch_add_statement(*batch, *statement));
             statement.setown(new CassandraStatement(cass_prepared_bind(*prepared)));
         }
+        else if (inBatch)
+        {
+            futures.append(*new CassandraRetryingFuture(*session, statement->getClear(), semaphore, maxRetries));
+            statement.setown(new CassandraStatement(cass_prepared_bind(*prepared)));
+        }
         else
         {
             result.setown(new CassandraFutureResult(cass_session_execute(*session, *statement)));
@@ -437,8 +531,13 @@ protected:
     Owned<CassandraStatement> statement;
     Owned<CassandraFutureResult> result;
     Owned<CassandraIterator> iterator;
+    CIArrayOf<CassandraRetryingFuture> futures;
+    Semaphore *semaphore;
     unsigned numBindings;
-    CassBatchType(batchMode);
+    unsigned maxFutures;
+    unsigned maxRetries;
+    bool inBatch;
+    CassBatchType batchMode;
 };
 
 // Conversions from Cassandra values to ECL data
@@ -1162,7 +1261,7 @@ class CassandraEmbedFunctionContext : public CInterfaceOf<IEmbedFunctionContext>
 {
 public:
     CassandraEmbedFunctionContext(const IContextLogger &_logctx, unsigned _flags, const char *options)
-      : logctx(_logctx), flags(_flags), nextParam(0), numParams(0), batchMode((CassBatchType) -1), pageSize(0)
+      : logctx(_logctx), flags(_flags), nextParam(0), numParams(0), batchMode((CassBatchType) -1), pageSize(0), maxFutures(0), maxRetries(10)
     {
         cluster.setown(new CassandraCluster(cass_cluster_new()));
         const char *contact_points = "localhost";
@@ -1200,6 +1299,14 @@ public:
                 {
                     pageSize=getUnsignedOption(val, "pageSize");
                 }
+                else if (stricmp(optName, "maxFutures")==0)
+                {
+                    maxFutures=getUnsignedOption(val, "maxFutures");
+                }
+                else if (stricmp(optName, "maxRetries")==0)
+                {
+                    maxRetries=getUnsignedOption(val, "maxRetries");
+                }
                 else if (stricmp(optName, "port")==0)
                 {
                     unsigned port = getUnsignedOption(val, "port");
@@ -1765,7 +1872,7 @@ public:
                 numParams = countBindings(script);
             else
                 numParams = 0;
-            stmtInfo.setown(new CassandraStatementInfo(session, prepared, numParams, batchMode, pageSize));
+            stmtInfo.setown(new CassandraStatementInfo(session, prepared, numParams, batchMode, pageSize, maxFutures, maxRetries));
         }
     }
     virtual void callFunction()
@@ -1924,6 +2031,8 @@ protected:
     unsigned nextParam;
     unsigned numParams;
     unsigned pageSize;
+    unsigned maxFutures;
+    unsigned maxRetries;
     CassBatchType batchMode;
     StringAttr queryString;
 

+ 16 - 7
testing/regress/ecl/cassandra-simple.ecl

@@ -24,7 +24,7 @@ IMPORT cassandra;
  This example illustrates various calls to embdded Cassandra CQL code
  */
 
-// This is the record structure in ECL that will correspond to the rows in the Cassabdra dataset
+// This is the record structure in ECL that will correspond to the rows in the Cassandra dataset
 // Note that the default values specified in the fields will be used when a NULL value is being
 // returned from Cassandra
 
@@ -93,16 +93,25 @@ createTables() := EMBED(cassandra : server(server),user('rchapman'),keyspace('te
 ENDEMBED;
 
 // Initialize the Cassandra table, passing in the ECL dataset to provide the rows
-// Note the batch option to control how cassandra inserts are batched
-// If not supplied, each insert is executed individually - because Cassandra
-// has restrictions about what can be done in a batch, we can't default to using batch
-// unless told to...
+// When not using batch mode, maxFutures controls how many simultaenous writes to Cassandra are allowed before
+// we start to throttle, and maxRetries controls how many times inserts that fail because Cassandra is too busy 
+// will be retried.
 
-initialize(dataset(childrec) values) := EMBED(cassandra : server(server),user('rchapman'),keyspace('test'),batch('unlogged'))
+initialize(dataset(childrec) values) := EMBED(cassandra : server(server),user('rchapman'),keyspace('test'),maxFutures(100),maxRetries(10))
   INSERT INTO tbl1 (name, value, boolval, r8, r4,d,ddd,u1,u2,a,set1,list1,map1) values (?,?,?,?,?,?,?,?,?,?,?,?,?);
 ENDEMBED;
 
-initialize2(row(childrec) values) := EMBED(cassandra : server(server),user('rchapman'),keyspace('test'))
+// Note the batch option to control how cassandra inserts are batched
+// If not supplied, each insert is executed individually - because Cassandra
+// has restrictions about what can be done in a batch, we can't default to using batch
+// unless told to... Also Cassandra 2.2 and later will fail if batch gets too large. In general
+// best NOT to try to use batch for performance unless you know that 
+//   (a) the resulting batch will be small (default limit is 50k bytes) and
+//   (b) all the records in the batch share the same partition key and
+//   (c) you use 'unlogged' mode
+// Use of batch to ensure all trasactions either fail together of pass together is ok, but subject to the same size restrictions
+
+initialize2(row(childrec) values) := EMBED(cassandra : server(server),user('rchapman'),keyspace('test'),batch('unlogged'))
   INSERT INTO tbl1 (name, value, boolval, r8, r4,d,ddd,u1,u2,a,set1,list1,map1) values (?,?,?,?,?,?,?,?,?,?,?,?,?);
 ENDEMBED;