|
@@ -338,7 +338,7 @@ void CassandraSession::set(CassSession *_session)
|
|
|
|
|
|
//----------------------
|
|
|
|
|
|
-CassandraRetryingFuture::CassandraRetryingFuture(CassSession *_session, CassStatement *_statement, Semaphore *_limiter, unsigned _retries)
|
|
|
+CassandraRetryingFuture::CassandraRetryingFuture(CassSession *_session, CassStatement *_statement, LinkedSemaphore *_limiter, unsigned _retries)
|
|
|
: session(_session), statement(_statement), retries(_retries), limiter(_limiter), future(NULL)
|
|
|
{
|
|
|
execute();
|
|
@@ -379,8 +379,9 @@ bool CassandraRetryingFuture::retry(const char *why)
|
|
|
execute();
|
|
|
cass_future_wait(future);
|
|
|
CassError rc = cass_future_error_code(future);
|
|
|
- if(rc == CASS_OK)
|
|
|
+ if (rc == CASS_OK)
|
|
|
return true;
|
|
|
+ Sleep(10);
|
|
|
}
|
|
|
return false;
|
|
|
}
|
|
@@ -391,14 +392,14 @@ void CassandraRetryingFuture::execute()
|
|
|
limiter->wait();
|
|
|
future = cass_session_execute(session, statement);
|
|
|
if (limiter)
|
|
|
- cass_future_set_callback(future, signaller, this); // Note - this will call the callback if the future has already completed
|
|
|
+ cass_future_set_callback(future, signaller, LINK(limiter)); // Note - this will call the callback if the future has already completed
|
|
|
}
|
|
|
|
|
|
void CassandraRetryingFuture::signaller(CassFuture *future, void *data)
|
|
|
{
|
|
|
- CassandraRetryingFuture *self = (CassandraRetryingFuture *) data;
|
|
|
- if (self && self->limiter)
|
|
|
- self->limiter->signal();
|
|
|
+ LinkedSemaphore *sem = (LinkedSemaphore *) data;
|
|
|
+ sem->signal();
|
|
|
+ ::Release(sem);
|
|
|
}
|
|
|
|
|
|
//----------------------
|
|
@@ -416,7 +417,7 @@ CassandraStatementInfo::~CassandraStatementInfo()
|
|
|
{
|
|
|
stop();
|
|
|
futures.kill();
|
|
|
- delete semaphore;
|
|
|
+ semaphore.clear(); // Note - may live on for a while until all futures associated with it have signalled.
|
|
|
}
|
|
|
void CassandraStatementInfo::stop()
|
|
|
{
|
|
@@ -452,8 +453,8 @@ void CassandraStatementInfo::startStream()
|
|
|
{
|
|
|
if (batchMode != (CassBatchType) -1)
|
|
|
batch.setown(new CassandraBatch(cass_batch_new(batchMode)));
|
|
|
- else
|
|
|
- semaphore = new Semaphore(maxFutures ? maxFutures : 100);
|
|
|
+ else if (maxFutures)
|
|
|
+ semaphore.setown(new LinkedSemaphore(maxFutures));
|
|
|
statement.setown(new CassandraStatement(cass_prepared_bind(*prepared)));
|
|
|
inBatch = true;
|
|
|
}
|