|
@@ -155,6 +155,7 @@ public:
|
|
|
{
|
|
|
CassandraFuture close_future(cass_session_close(session));
|
|
|
cass_future_wait(close_future);
|
|
|
+ cass_session_free(session);
|
|
|
}
|
|
|
}
|
|
|
inline operator CassSession *() const
|
|
@@ -1106,12 +1107,6 @@ protected:
|
|
|
|
|
|
// Each call to a Cassandra function will use a new CassandraEmbedFunctionContext object
|
|
|
|
|
|
-static void cassandraLogCallback(cass_uint64_t time, CassLogLevel severity, CassString message, void* data)
|
|
|
-{
|
|
|
- const IContextLogger *logctx = (const IContextLogger *) data;
|
|
|
- logctx->CTXLOG("cassandra: %s: %.*s", cass_log_level_string(severity), (int) message.length, message.data);
|
|
|
-}
|
|
|
-
|
|
|
class CassandraEmbedFunctionContext : public CInterfaceOf<IEmbedFunctionContext>
|
|
|
{
|
|
|
public:
|
|
@@ -1119,7 +1114,6 @@ public:
|
|
|
: logctx(_logctx), flags(_flags), nextParam(0), numParams(0), batchMode((CassBatchType) -1)
|
|
|
{
|
|
|
cluster.setown(new CassandraCluster(cass_cluster_new()));
|
|
|
- cass_cluster_set_log_callback(*cluster, cassandraLogCallback, (void *) &logctx);
|
|
|
const char *contact_points = "localhost";
|
|
|
const char *user = "";
|
|
|
const char *password = "";
|
|
@@ -1164,7 +1158,7 @@ public:
|
|
|
else if (stricmp(optName, "num_threads_io")==0)
|
|
|
{
|
|
|
unsigned num_threads_io = getUnsignedOption(val, "num_threads_io");
|
|
|
- checkSetOption(cass_cluster_set_num_threads_io(*cluster, num_threads_io), "num_threads_io");
|
|
|
+ cass_cluster_set_num_threads_io(*cluster, num_threads_io); // No status return
|
|
|
}
|
|
|
else if (stricmp(optName, "queue_size_io")==0)
|
|
|
{
|
|
@@ -1181,35 +1175,35 @@ public:
|
|
|
unsigned max_connections_per_host = getUnsignedOption(val, "max_connections_per_host");
|
|
|
checkSetOption(cass_cluster_set_max_connections_per_host(*cluster, max_connections_per_host), "max_connections_per_host");
|
|
|
}
|
|
|
- else if (stricmp(optName, "max_simultaneous_creation")==0)
|
|
|
+ else if (stricmp(optName, "max_concurrent_creation")==0)
|
|
|
{
|
|
|
- unsigned max_simultaneous_creation = getUnsignedOption(val, "max_simultaneous_creation");
|
|
|
- checkSetOption(cass_cluster_set_max_simultaneous_creation(*cluster, max_simultaneous_creation), "max_simultaneous_creation");
|
|
|
+ unsigned max_concurrent_creation = getUnsignedOption(val, "max_concurrent_creation");
|
|
|
+ checkSetOption(cass_cluster_set_max_concurrent_creation(*cluster, max_concurrent_creation), "max_concurrent_creation");
|
|
|
}
|
|
|
- else if (stricmp(optName, "max_pending_requests")==0)
|
|
|
+ else if (stricmp(optName, "pending_requests_high_water_mark")==0)
|
|
|
{
|
|
|
- unsigned max_pending_requests = getUnsignedOption(val, "max_pending_requests");
|
|
|
- checkSetOption(cass_cluster_set_max_pending_requests(*cluster, max_pending_requests), "max_pending_requests");
|
|
|
+ unsigned pending_requests_high_water_mark = getUnsignedOption(val, "pending_requests_high_water_mark");
|
|
|
+ checkSetOption(cass_cluster_set_pending_requests_high_water_mark(*cluster, pending_requests_high_water_mark), "pending_requests_high_water_mark");
|
|
|
}
|
|
|
- else if (stricmp(optName, "max_simultaneous_requests_threshold")==0)
|
|
|
+ else if (stricmp(optName, "pending_requests_low_water_mark")==0)
|
|
|
{
|
|
|
- unsigned max_simultaneous_requests_threshold = getUnsignedOption(val, "max_simultaneous_requests_threshold");
|
|
|
- checkSetOption(cass_cluster_set_max_simultaneous_requests_threshold(*cluster, max_simultaneous_requests_threshold), "max_simultaneous_requests_threshold");
|
|
|
+ unsigned pending_requests_low_water_mark = getUnsignedOption(val, "pending_requests_low_water_mark");
|
|
|
+ checkSetOption(cass_cluster_set_pending_requests_low_water_mark(*cluster, pending_requests_low_water_mark), "pending_requests_low_water_mark");
|
|
|
+ }
|
|
|
+ else if (stricmp(optName, "max_concurrent_requests_threshold")==0)
|
|
|
+ {
|
|
|
+ unsigned max_concurrent_requests_threshold = getUnsignedOption(val, "max_concurrent_requests_threshold");
|
|
|
+ checkSetOption(cass_cluster_set_max_concurrent_requests_threshold(*cluster, max_concurrent_requests_threshold), "max_concurrent_requests_threshold");
|
|
|
}
|
|
|
else if (stricmp(optName, "connect_timeout")==0)
|
|
|
{
|
|
|
unsigned connect_timeout = getUnsignedOption(val, "connect_timeout");
|
|
|
- checkSetOption(cass_cluster_set_connect_timeout(*cluster, connect_timeout), "connect_timeout");
|
|
|
+ cass_cluster_set_connect_timeout(*cluster, connect_timeout);
|
|
|
}
|
|
|
else if (stricmp(optName, "request_timeout")==0)
|
|
|
{
|
|
|
unsigned request_timeout = getUnsignedOption(val, "request_timeout");
|
|
|
- checkSetOption(cass_cluster_set_request_timeout(*cluster, request_timeout), "request_timeout");
|
|
|
- }
|
|
|
- else if (stricmp(optName, "log_level")==0)
|
|
|
- {
|
|
|
- unsigned log_level = getUnsignedOption(val, "log_level");
|
|
|
- checkSetOption(cass_cluster_set_log_level(*cluster, (CassLogLevel) log_level), "log_level");
|
|
|
+ cass_cluster_set_request_timeout(*cluster, request_timeout);
|
|
|
}
|
|
|
else
|
|
|
failx("Unrecognized option %s", optName.str());
|
|
@@ -1219,9 +1213,9 @@ public:
|
|
|
if (*user || *password)
|
|
|
cass_cluster_set_credentials(*cluster, user, password);
|
|
|
|
|
|
- CassandraFuture future(keyspace ? cass_cluster_connect_keyspace(*cluster, keyspace) : cass_cluster_connect(*cluster));
|
|
|
+ session.setown(new CassandraSession(cass_session_new()));
|
|
|
+ CassandraFuture future(keyspace ? cass_session_connect_keyspace(*session, *cluster, keyspace) : cass_session_connect(*session, *cluster));
|
|
|
future.wait("connect");
|
|
|
- session.setown(new CassandraSession(cass_future_get_session(future)));
|
|
|
}
|
|
|
virtual bool getBooleanResult()
|
|
|
{
|