|
@@ -146,6 +146,10 @@ void CassandraCluster::setOptions(const StringArray &options)
|
|
|
}
|
|
|
else if (stricmp(optName, "pageSize")==0)
|
|
|
pageSize = getUnsignedOption(val, "pageSize");
|
|
|
+ else if (stricmp(optName, "maxFutures")==0)
|
|
|
+ maxFutures=getUnsignedOption(val, "maxFutures");
|
|
|
+ else if (stricmp(optName, "maxRetries")==0)
|
|
|
+ maxRetries=getUnsignedOption(val, "maxRetries");
|
|
|
else if (stricmp(optName, "port")==0)
|
|
|
{
|
|
|
unsigned port = getUnsignedOption(val, "port");
|
|
@@ -332,20 +336,96 @@ void CassandraSession::set(CassSession *_session)
|
|
|
session = _session;
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+class CassandraRetryingFuture : public CInterface
|
|
|
+{
|
|
|
+public:
|
|
|
+ CassandraRetryingFuture(CassSession *_session, CassStatement *_statement, Semaphore *_limiter = NULL, unsigned _retries = 10)
|
|
|
+ : session(_session), statement(_statement), retries(_retries), limiter(_limiter), future(NULL)
|
|
|
+ {
|
|
|
+ execute();
|
|
|
+ }
|
|
|
+ ~CassandraRetryingFuture()
|
|
|
+ {
|
|
|
+ if (future)
|
|
|
+ cass_future_free(future);
|
|
|
+ }
|
|
|
+ inline operator CassFuture *() const
|
|
|
+ {
|
|
|
+ return future;
|
|
|
+ }
|
|
|
+ void wait(const char *why)
|
|
|
+ {
|
|
|
+ cass_future_wait(future);
|
|
|
+ CassError rc = cass_future_error_code(future);
|
|
|
+ if(rc != CASS_OK)
|
|
|
+ {
|
|
|
+ switch (rc)
|
|
|
+ {
|
|
|
+ case CASS_ERROR_LIB_NO_HOSTS_AVAILABLE: // MORE - are there others we should retry?
|
|
|
+ if (retry(why))
|
|
|
+ break;
|
|
|
+ // fall into
|
|
|
+ default:
|
|
|
+ const char *message;
|
|
|
+ size_t length;
|
|
|
+ cass_future_error_message(future, &message, &length);
|
|
|
+ VStringBuffer err("cassandra: failed to %s (%.*s)", why, (int) length, message);
|
|
|
+ rtlFail(0, err.str());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+private:
|
|
|
+ bool retry(const char *why)
|
|
|
+ {
|
|
|
+ for (int i = 0; i < retries; i++)
|
|
|
+ {
|
|
|
+ execute();
|
|
|
+ cass_future_wait(future);
|
|
|
+ CassError rc = cass_future_error_code(future);
|
|
|
+ if(rc == CASS_OK)
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ void execute()
|
|
|
+ {
|
|
|
+ if (limiter)
|
|
|
+ limiter->wait();
|
|
|
+ future = cass_session_execute(session, statement);
|
|
|
+ if (limiter)
|
|
|
+ cass_future_set_callback(future, signaller, this); // Note - this will call the callback if the future has already completed
|
|
|
+ }
|
|
|
+ static void signaller(CassFuture *future, void *data)
|
|
|
+ {
|
|
|
+ CassandraRetryingFuture *self = (CassandraRetryingFuture *) data;
|
|
|
+ if (self && self->limiter)
|
|
|
+ self->limiter->signal();
|
|
|
+ }
|
|
|
+ CassandraRetryingFuture(const CassandraFuture &);
|
|
|
+ CassFuture *future;
|
|
|
+ CassSession *session;
|
|
|
+ CassandraStatement statement;
|
|
|
+ unsigned retries;
|
|
|
+ Semaphore *limiter;
|
|
|
+};
|
|
|
+
|
|
|
//----------------------
|
|
|
|
|
|
-CassandraStatementInfo::CassandraStatementInfo(CassandraSession *_session, CassandraPrepared *_prepared, unsigned _numBindings, CassBatchType _batchMode, unsigned pageSize)
|
|
|
-: session(_session), prepared(_prepared), numBindings(_numBindings), batchMode(_batchMode)
|
|
|
+CassandraStatementInfo::CassandraStatementInfo(CassandraSession *_session, CassandraPrepared *_prepared, unsigned _numBindings, CassBatchType _batchMode, unsigned pageSizee, unsigned _maxFutures, unsigned _maxRetries)
|
|
|
+ : session(_session), prepared(_prepared), numBindings(_numBindings), batchMode(_batchMode), semaphore(NULL), maxFutures(_maxFutures), maxRetries(_maxRetries)
|
|
|
{
|
|
|
assertex(prepared && *prepared);
|
|
|
statement.setown(new CassandraStatement(cass_prepared_bind(*prepared)));
|
|
|
if (pageSize)
|
|
|
cass_statement_set_paging_size(*statement, pageSize);
|
|
|
-
|
|
|
+ inBatch = false;
|
|
|
}
|
|
|
CassandraStatementInfo::~CassandraStatementInfo()
|
|
|
{
|
|
|
stop();
|
|
|
+ futures.kill();
|
|
|
+ delete semaphore;
|
|
|
}
|
|
|
void CassandraStatementInfo::stop()
|
|
|
{
|
|
@@ -380,10 +460,11 @@ bool CassandraStatementInfo::next()
|
|
|
void CassandraStatementInfo::startStream()
|
|
|
{
|
|
|
if (batchMode != (CassBatchType) -1)
|
|
|
- {
|
|
|
batch.setown(new CassandraBatch(cass_batch_new(batchMode)));
|
|
|
- statement.setown(new CassandraStatement(cass_prepared_bind(*prepared)));
|
|
|
- }
|
|
|
+ else
|
|
|
+ semaphore = new Semaphore(maxFutures ? maxFutures : 100);
|
|
|
+ statement.setown(new CassandraStatement(cass_prepared_bind(*prepared)));
|
|
|
+ inBatch = true;
|
|
|
}
|
|
|
void CassandraStatementInfo::endStream()
|
|
|
{
|
|
@@ -392,6 +473,13 @@ void CassandraStatementInfo::endStream()
|
|
|
result.setown(new CassandraFutureResult (cass_session_execute_batch(*session, *batch)));
|
|
|
assertex (rowCount() == 0);
|
|
|
}
|
|
|
+ else
|
|
|
+ {
|
|
|
+ ForEachItemIn(idx, futures)
|
|
|
+ {
|
|
|
+ futures.item(idx).wait("endStream");
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
void CassandraStatementInfo::execute()
|
|
|
{
|
|
@@ -401,6 +489,11 @@ void CassandraStatementInfo::execute()
|
|
|
check(cass_batch_add_statement(*batch, *statement));
|
|
|
statement.setown(new CassandraStatement(cass_prepared_bind(*prepared)));
|
|
|
}
|
|
|
+ else if (inBatch)
|
|
|
+ {
|
|
|
+ futures.append(*new CassandraRetryingFuture(*session, statement->getClear(), semaphore, maxRetries));
|
|
|
+ statement.setown(new CassandraStatement(cass_prepared_bind(*prepared)));
|
|
|
+ }
|
|
|
else
|
|
|
{
|
|
|
result.setown(new CassandraFutureResult(cass_session_execute(*session, *statement)));
|
|
@@ -1138,7 +1231,7 @@ class CassandraEmbedFunctionContext : public CInterfaceOf<IEmbedFunctionContext>
|
|
|
{
|
|
|
public:
|
|
|
CassandraEmbedFunctionContext(const IContextLogger &_logctx, unsigned _flags, const char *options)
|
|
|
- : logctx(_logctx), flags(_flags), nextParam(0), numParams(0)
|
|
|
+ : logctx(_logctx), flags(_flags), nextParam(0), numParams(0), batchMode((CassBatchType) -1), pageSize(0)
|
|
|
{
|
|
|
StringArray opts;
|
|
|
opts.appendList(options, ",");
|
|
@@ -1587,7 +1680,7 @@ public:
|
|
|
numParams = countBindings(script);
|
|
|
else
|
|
|
numParams = 0;
|
|
|
- stmtInfo.setown(new CassandraStatementInfo(session, prepared, numParams, cluster->batchMode, cluster->pageSize));
|
|
|
+ stmtInfo.setown(new CassandraStatementInfo(session, prepared, numParams, cluster->batchMode, cluster->pageSize, cluster->maxFutures, cluster->maxRetries));
|
|
|
}
|
|
|
}
|
|
|
virtual void callFunction()
|