|
@@ -75,6 +75,18 @@ extern "C" EXPORT bool getECLPluginDefinition(ECLPluginDefinitionBlock *pb)
|
|
|
|
|
|
namespace cassandraembed {
|
|
|
|
|
|
+static void logCallBack(const CassLogMessage *message, void *data)
|
|
|
+{
|
|
|
+ DBGLOG("cassandra: %s - %s", cass_log_level_string(message->severity), message->message);
|
|
|
+}
|
|
|
+
|
|
|
+MODULE_INIT(INIT_PRIORITY_STANDARD)
|
|
|
+{
|
|
|
+ cass_log_set_callback(logCallBack, NULL);
|
|
|
+ cass_log_set_level(CASS_LOG_WARN);
|
|
|
+ return true;
|
|
|
+}
|
|
|
+
|
|
|
static void failx(const char *msg, ...) __attribute__((noreturn)) __attribute__((format(printf, 1, 2)));
|
|
|
static void fail(const char *msg) __attribute__((noreturn));
|
|
|
|
|
@@ -193,6 +205,60 @@ public:
|
|
|
unsigned request_timeout = getUnsignedOption(val, "request_timeout");
|
|
|
cass_cluster_set_request_timeout(cluster, request_timeout);
|
|
|
}
|
|
|
+ else if (stricmp(optName, "load_balance_round_robin")==0)
|
|
|
+ {
|
|
|
+ cass_bool_t enable = getBoolOption(val, "load_balance_round_robin");
|
|
|
+ if (enable==cass_true)
|
|
|
+ cass_cluster_set_load_balance_round_robin(cluster);
|
|
|
+ }
|
|
|
+ else if (stricmp(optName, "load_balance_dc_aware")==0)
|
|
|
+ {
|
|
|
+ StringArray lbargs;
|
|
|
+ lbargs.appendList(val, "|");
|
|
|
+ if (lbargs.length() != 3)
|
|
|
+ failx("Invalid value '%s' for option %s - expected 3 subvalues (separate with |)", val, optName.str());
|
|
|
+ unsigned usedPerRemote = getUnsignedOption(lbargs.item(2), "load_balance_dc_aware");
|
|
|
+ cass_bool_t allowRemote = getBoolOption(lbargs.item(2), "load_balance_dc_aware");
|
|
|
+ checkSetOption(cass_cluster_set_load_balance_dc_aware(cluster, lbargs.item(0), usedPerRemote, allowRemote), "load_balance_dc_aware");
|
|
|
+ }
|
|
|
+ else if (stricmp(optName, "token_aware_routing")==0)
|
|
|
+ {
|
|
|
+ cass_bool_t enable = getBoolOption(val, "token_aware_routing");
|
|
|
+ cass_cluster_set_token_aware_routing(cluster, enable);
|
|
|
+ }
|
|
|
+ else if (stricmp(optName, "latency_aware_routing")==0)
|
|
|
+ {
|
|
|
+ cass_bool_t enable = getBoolOption(val, "latency_aware_routing");
|
|
|
+ cass_cluster_set_latency_aware_routing(cluster, enable);
|
|
|
+ }
|
|
|
+ else if (stricmp(optName, "latency_aware_routing_settings")==0)
|
|
|
+ {
|
|
|
+ StringArray subargs;
|
|
|
+ subargs.appendList(val, "|");
|
|
|
+ if (subargs.length() != 5)
|
|
|
+ failx("Invalid value '%s' for option %s - expected 5 subvalues (separate with |)", val, optName.str());
|
|
|
+ cass_double_t exclusion_threshold = getDoubleOption(subargs.item(0), "exclusion_threshold");
|
|
|
+ cass_uint64_t scale_ms = getUnsigned64Option(subargs.item(1), "scale_ms");
|
|
|
+ cass_uint64_t retry_period_ms = getUnsigned64Option(subargs.item(2), "retry_period_ms");
|
|
|
+ cass_uint64_t update_rate_ms = getUnsigned64Option(subargs.item(3), "update_rate_ms");
|
|
|
+ cass_uint64_t min_measured = getUnsigned64Option(subargs.item(4), "min_measured");
|
|
|
+ cass_cluster_set_latency_aware_routing_settings(cluster, exclusion_threshold, scale_ms, retry_period_ms, update_rate_ms, min_measured);
|
|
|
+ }
|
|
|
+ else if (stricmp(optName, "tcp_nodelay")==0)
|
|
|
+ {
|
|
|
+ cass_bool_t enable = getBoolOption(val, "tcp_nodelay");
|
|
|
+ cass_cluster_set_tcp_nodelay(cluster, enable);
|
|
|
+ }
|
|
|
+ else if (stricmp(optName, "tcp_keepalive")==0)
|
|
|
+ {
|
|
|
+ StringArray subargs;
|
|
|
+ subargs.appendList(val, "|");
|
|
|
+ if (subargs.length() != 2)
|
|
|
+ failx("Invalid value '%s' for option %s - expected 2 subvalues (separate with |)", val, optName.str());
|
|
|
+ cass_bool_t enabled = getBoolOption(subargs.item(0), "enabled");
|
|
|
+ unsigned delay_secs = getUnsignedOption(subargs.item(0), "delay_secs");
|
|
|
+ cass_cluster_set_tcp_keepalive(cluster, enabled, delay_secs);
|
|
|
+ }
|
|
|
else
|
|
|
failx("Unrecognized option %s", optName.str());
|
|
|
}
|
|
@@ -218,13 +284,30 @@ private:
|
|
|
failx("While setting option %s: %s", name, cass_error_desc(rc));
|
|
|
}
|
|
|
}
|
|
|
+ cass_bool_t getBoolOption(const char *val, const char *option)
|
|
|
+ {
|
|
|
+ return strToBool(val) ? cass_true : cass_false;
|
|
|
+ }
|
|
|
unsigned getUnsignedOption(const char *val, const char *option)
|
|
|
{
|
|
|
char *endp;
|
|
|
long value = strtoul(val, &endp, 0);
|
|
|
- if (endp==val || *endp != '\0' || value > INT_MAX || value < INT_MIN)
|
|
|
+ if (endp==val || *endp != '\0' || value > UINT_MAX || value < 0)
|
|
|
failx("Invalid value '%s' for option %s", val, option);
|
|
|
- return (int) value;
|
|
|
+ return (unsigned) value;
|
|
|
+ }
|
|
|
+ unsigned getDoubleOption(const char *val, const char *option)
|
|
|
+ {
|
|
|
+ char *endp;
|
|
|
+ double value = strtod(val, &endp);
|
|
|
+ if (endp==val || *endp != '\0')
|
|
|
+ failx("Invalid value '%s' for option %s", val, option);
|
|
|
+ return value;
|
|
|
+ }
|
|
|
+ __uint64 getUnsigned64Option(const char *val, const char *option)
|
|
|
+ {
|
|
|
+ // MORE - could check it's all digits (with optional leading spaces...), if we cared.
|
|
|
+ return rtlVStrToUInt8(val);
|
|
|
}
|
|
|
CassandraCluster(const CassandraCluster &);
|
|
|
CassCluster *cluster;
|
|
@@ -256,8 +339,10 @@ public:
|
|
|
CassError rc = cass_future_error_code(future);
|
|
|
if(rc != CASS_OK)
|
|
|
{
|
|
|
- CassString message = cass_future_error_message(future);
|
|
|
- VStringBuffer err("cassandra: failed to %s (%.*s)", why, (int)message.length, message.data);
|
|
|
+ const char *message;
|
|
|
+ size_t length;
|
|
|
+ cass_future_error_message(future, &message, &length);
|
|
|
+ VStringBuffer err("cassandra: failed to %s (%.*s)", why, (int) length, message);
|
|
|
rtlFail(0, err.str());
|
|
|
}
|
|
|
}
|
|
@@ -328,7 +413,7 @@ public:
|
|
|
CassandraStatement(CassStatement *_statement) : statement(_statement)
|
|
|
{
|
|
|
}
|
|
|
- CassandraStatement(const char *simple) : statement(cass_statement_new(cass_string_init(simple), 0))
|
|
|
+ CassandraStatement(const char *simple) : statement(cass_statement_new(simple, 0))
|
|
|
{
|
|
|
}
|
|
|
~CassandraStatement()
|
|
@@ -638,9 +723,10 @@ static void getDataResult(const RtlFieldInfo *field, const CassValue *value, siz
|
|
|
// it seems like it could be more useful to support anything
|
|
|
// if (cass_value_type(value) != CASS_VALUE_TYPE_BLOB)
|
|
|
// typeError("blob", value, field);
|
|
|
- CassBytes bytes;
|
|
|
- check(cass_value_get_bytes(value, &bytes));
|
|
|
- rtlStrToDataX(chars, result, bytes.size, bytes.data);
|
|
|
+ const cass_byte_t *bytes;
|
|
|
+ size_t size;
|
|
|
+ check(cass_value_get_bytes(value, &bytes, &size));
|
|
|
+ rtlStrToDataX(chars, result, size, bytes);
|
|
|
}
|
|
|
|
|
|
static __int64 getSignedResult(const RtlFieldInfo *field, const CassValue *value);
|
|
@@ -725,22 +811,20 @@ static void getStringResult(const RtlFieldInfo *field, const CassValue *value, s
|
|
|
{
|
|
|
case CASS_VALUE_TYPE_ASCII:
|
|
|
{
|
|
|
- CassString output;
|
|
|
- check(cass_value_get_string(value, &output));
|
|
|
- const char *text = output.data;
|
|
|
- unsigned long bytes = output.length;
|
|
|
- rtlStrToStrX(chars, result, bytes, text);
|
|
|
+ const char *output;
|
|
|
+ size_t length;
|
|
|
+ check(cass_value_get_string(value, &output, &length));
|
|
|
+ rtlStrToStrX(chars, result, length, output);
|
|
|
break;
|
|
|
}
|
|
|
case CASS_VALUE_TYPE_VARCHAR:
|
|
|
case CASS_VALUE_TYPE_TEXT:
|
|
|
{
|
|
|
- CassString output;
|
|
|
- check(cass_value_get_string(value, &output));
|
|
|
- const char *text = output.data;
|
|
|
- unsigned long bytes = output.length;
|
|
|
- unsigned numchars = rtlUtf8Length(bytes, text);
|
|
|
- rtlUtf8ToStrX(chars, result, numchars, text);
|
|
|
+ const char *output;
|
|
|
+ size_t length;
|
|
|
+ check(cass_value_get_string(value, &output, &length));
|
|
|
+ unsigned numchars = rtlUtf8Length(length, output);
|
|
|
+ rtlUtf8ToStrX(chars, result, numchars, output);
|
|
|
break;
|
|
|
}
|
|
|
default:
|
|
@@ -760,22 +844,20 @@ static void getUTF8Result(const RtlFieldInfo *field, const CassValue *value, siz
|
|
|
{
|
|
|
case CASS_VALUE_TYPE_ASCII:
|
|
|
{
|
|
|
- CassString output;
|
|
|
- check(cass_value_get_string(value, &output));
|
|
|
- const char *text = output.data;
|
|
|
- unsigned long bytes = output.length;
|
|
|
- rtlStrToUtf8X(chars, result, bytes, text);
|
|
|
+ const char *output;
|
|
|
+ size_t length;
|
|
|
+ check(cass_value_get_string(value, &output, &length));
|
|
|
+ rtlStrToUtf8X(chars, result, length, output);
|
|
|
break;
|
|
|
}
|
|
|
case CASS_VALUE_TYPE_VARCHAR:
|
|
|
case CASS_VALUE_TYPE_TEXT:
|
|
|
{
|
|
|
- CassString output;
|
|
|
- check(cass_value_get_string(value, &output));
|
|
|
- const char *text = output.data;
|
|
|
- unsigned long bytes = output.length;
|
|
|
- unsigned numchars = rtlUtf8Length(bytes, text);
|
|
|
- rtlUtf8ToUtf8X(chars, result, numchars, text);
|
|
|
+ const char * output;
|
|
|
+ size_t length;
|
|
|
+ check(cass_value_get_string(value, &output, &length));
|
|
|
+ unsigned numchars = rtlUtf8Length(length, output);
|
|
|
+ rtlUtf8ToUtf8X(chars, result, numchars, output);
|
|
|
break;
|
|
|
}
|
|
|
default:
|
|
@@ -795,22 +877,20 @@ static void getUnicodeResult(const RtlFieldInfo *field, const CassValue *value,
|
|
|
{
|
|
|
case CASS_VALUE_TYPE_ASCII:
|
|
|
{
|
|
|
- CassString output;
|
|
|
- check(cass_value_get_string(value, &output));
|
|
|
- const char *text = output.data;
|
|
|
- unsigned long bytes = output.length;
|
|
|
- rtlStrToUnicodeX(chars, result, bytes, text);
|
|
|
+ const char * output;
|
|
|
+ size_t length;
|
|
|
+ check(cass_value_get_string(value, &output, &length));
|
|
|
+ rtlStrToUnicodeX(chars, result, length, output);
|
|
|
break;
|
|
|
}
|
|
|
case CASS_VALUE_TYPE_VARCHAR:
|
|
|
case CASS_VALUE_TYPE_TEXT:
|
|
|
{
|
|
|
- CassString output;
|
|
|
- check(cass_value_get_string(value, &output));
|
|
|
- const char *text = output.data;
|
|
|
- unsigned long bytes = output.length;
|
|
|
- unsigned numchars = rtlUtf8Length(bytes, text);
|
|
|
- rtlUtf8ToUnicodeX(chars, result, numchars, text);
|
|
|
+ const char * output;
|
|
|
+ size_t length;
|
|
|
+ check(cass_value_get_string(value, &output, &length));
|
|
|
+ unsigned numchars = rtlUtf8Length(length, output);
|
|
|
+ rtlUtf8ToUnicodeX(chars, result, numchars, output);
|
|
|
break;
|
|
|
}
|
|
|
default:
|
|
@@ -996,13 +1076,12 @@ public:
|
|
|
rtlDataAttr utfText;
|
|
|
rtlStrToUtf8X(utf8chars, utfText.refstr(), len, value);
|
|
|
if (collection)
|
|
|
- checkBind(cass_collection_append_string(*collection,
|
|
|
- cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()))),
|
|
|
+ checkBind(cass_collection_append_string_n(*collection, utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())),
|
|
|
field);
|
|
|
else
|
|
|
- checkBind(cass_statement_bind_string(stmtInfo->queryStatement(),
|
|
|
+ checkBind(cass_statement_bind_string_n(stmtInfo->queryStatement(),
|
|
|
checkNextParam(field),
|
|
|
- cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()))),
|
|
|
+ utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())),
|
|
|
field);
|
|
|
}
|
|
|
virtual void processBool(bool value, const RtlFieldInfo * field)
|
|
@@ -1015,9 +1094,9 @@ public:
|
|
|
virtual void processData(unsigned len, const void *value, const RtlFieldInfo * field)
|
|
|
{
|
|
|
if (collection)
|
|
|
- checkBind(cass_collection_append_bytes(*collection, cass_bytes_init((const cass_byte_t*) value, len)), field);
|
|
|
+ checkBind(cass_collection_append_bytes(*collection, (const cass_byte_t*) value, len), field);
|
|
|
else
|
|
|
- checkBind(cass_statement_bind_bytes(stmtInfo->queryStatement(), checkNextParam(field), cass_bytes_init((const cass_byte_t*) value, len)), field);
|
|
|
+ checkBind(cass_statement_bind_bytes(stmtInfo->queryStatement(), checkNextParam(field), (const cass_byte_t*) value, len), field);
|
|
|
}
|
|
|
virtual void processInt(__int64 value, const RtlFieldInfo * field)
|
|
|
{
|
|
@@ -1076,13 +1155,12 @@ public:
|
|
|
rtlDataAttr utfText;
|
|
|
rtlUnicodeToUtf8X(utf8chars, utfText.refstr(), chars, value);
|
|
|
if (collection)
|
|
|
- checkBind(cass_collection_append_string(*collection,
|
|
|
- cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()))),
|
|
|
+ checkBind(cass_collection_append_string_n(*collection, utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())),
|
|
|
field);
|
|
|
else
|
|
|
- checkBind(cass_statement_bind_string(stmtInfo->queryStatement(),
|
|
|
- checkNextParam(field),
|
|
|
- cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()))),
|
|
|
+ checkBind(cass_statement_bind_string_n(stmtInfo->queryStatement(),
|
|
|
+ checkNextParam(field),
|
|
|
+ utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())),
|
|
|
field);
|
|
|
}
|
|
|
virtual void processQString(unsigned len, const char *value, const RtlFieldInfo * field)
|
|
@@ -1095,9 +1173,9 @@ public:
|
|
|
virtual void processUtf8(unsigned chars, const char *value, const RtlFieldInfo * field)
|
|
|
{
|
|
|
if (collection)
|
|
|
- checkBind(cass_collection_append_string(*collection, cass_string_init2(value, rtlUtf8Size(chars, value))), field);
|
|
|
+ checkBind(cass_collection_append_string_n(*collection, value, rtlUtf8Size(chars, value)), field);
|
|
|
else
|
|
|
- checkBind(cass_statement_bind_string(stmtInfo->queryStatement(), checkNextParam(field), cass_string_init2(value, rtlUtf8Size(chars, value))), field);
|
|
|
+ checkBind(cass_statement_bind_string_n(stmtInfo->queryStatement(), checkNextParam(field), value, rtlUtf8Size(chars, value)), field);
|
|
|
}
|
|
|
|
|
|
virtual bool processBeginSet(const RtlFieldInfo * field, unsigned numElements, bool isAll, const byte *data)
|
|
@@ -1489,7 +1567,7 @@ public:
|
|
|
}
|
|
|
virtual void bindDataParam(const char *name, size32_t len, const void *val)
|
|
|
{
|
|
|
- checkBind(cass_statement_bind_bytes(stmtInfo->queryStatement(), checkNextParam(name), cass_bytes_init((const cass_byte_t*) val, len)), name);
|
|
|
+ checkBind(cass_statement_bind_bytes(stmtInfo->queryStatement(), checkNextParam(name), (const cass_byte_t*) val, len), name);
|
|
|
}
|
|
|
virtual void bindFloatParam(const char *name, float val)
|
|
|
{
|
|
@@ -1523,9 +1601,9 @@ public:
|
|
|
size32_t utf8chars;
|
|
|
rtlDataAttr utfText;
|
|
|
rtlStrToUtf8X(utf8chars, utfText.refstr(), len, val);
|
|
|
- checkBind(cass_statement_bind_string(stmtInfo->queryStatement(),
|
|
|
+ checkBind(cass_statement_bind_string_n(stmtInfo->queryStatement(),
|
|
|
checkNextParam(name),
|
|
|
- cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()))),
|
|
|
+ utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())),
|
|
|
name);
|
|
|
}
|
|
|
virtual void bindVStringParam(const char *name, const char *val)
|
|
@@ -1534,16 +1612,16 @@ public:
|
|
|
}
|
|
|
virtual void bindUTF8Param(const char *name, size32_t chars, const char *val)
|
|
|
{
|
|
|
- checkBind(cass_statement_bind_string(stmtInfo->queryStatement(), checkNextParam(name), cass_string_init2(val, rtlUtf8Size(chars, val))), name);
|
|
|
+ checkBind(cass_statement_bind_string_n(stmtInfo->queryStatement(), checkNextParam(name), val, rtlUtf8Size(chars, val)), name);
|
|
|
}
|
|
|
virtual void bindUnicodeParam(const char *name, size32_t chars, const UChar *val)
|
|
|
{
|
|
|
size32_t utf8chars;
|
|
|
rtlDataAttr utfText;
|
|
|
rtlUnicodeToUtf8X(utf8chars, utfText.refstr(), chars, val);
|
|
|
- checkBind(cass_statement_bind_string(stmtInfo->queryStatement(),
|
|
|
+ checkBind(cass_statement_bind_string_n(stmtInfo->queryStatement(),
|
|
|
checkNextParam(name),
|
|
|
- cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()))),
|
|
|
+ utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())),
|
|
|
name);
|
|
|
}
|
|
|
virtual void bindSetParam(const char *name, int elemType, size32_t elemSize, bool isAll, size32_t totalBytes, void *setData)
|
|
@@ -1610,7 +1688,7 @@ public:
|
|
|
size32_t utf8chars;
|
|
|
rtlDataAttr utfText;
|
|
|
rtlStrToUtf8X(utf8chars, utfText.refstr(), numChars, (const char *) inData);
|
|
|
- rc = cass_collection_append_string(collection, cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())));
|
|
|
+ rc = cass_collection_append_string_n(collection, utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()));
|
|
|
break;
|
|
|
}
|
|
|
case type_string:
|
|
@@ -1623,7 +1701,7 @@ public:
|
|
|
size32_t utf8chars;
|
|
|
rtlDataAttr utfText;
|
|
|
rtlStrToUtf8X(utf8chars, utfText.refstr(), thisSize, (const char *) inData);
|
|
|
- rc = cass_collection_append_string(collection, cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())));
|
|
|
+ rc = cass_collection_append_string_n(collection, utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()));
|
|
|
break;
|
|
|
}
|
|
|
case type_real:
|
|
@@ -1647,7 +1725,7 @@ public:
|
|
|
rtlDataAttr unicode;
|
|
|
rtlUnicodeToUtf8X(unicodeChars, unicode.refstr(), thisSize / sizeof(UChar), (const UChar *) inData);
|
|
|
size32_t sizeBytes = rtlUtf8Size(unicodeChars, unicode.getstr());
|
|
|
- rc = cass_collection_append_string(collection, cass_string_init2(unicode.getstr(), sizeBytes));
|
|
|
+ rc = cass_collection_append_string_n(collection, unicode.getstr(), sizeBytes);
|
|
|
break;
|
|
|
}
|
|
|
case type_utf8:
|
|
@@ -1656,7 +1734,7 @@ public:
|
|
|
size32_t numChars = * (size32_t *) inData;
|
|
|
inData += sizeof(size32_t);
|
|
|
thisSize = rtlUtf8Size(numChars, inData);
|
|
|
- rc = cass_collection_append_string(collection, cass_string_init2((const char *) inData, thisSize));
|
|
|
+ rc = cass_collection_append_string_n(collection, (const char *) inData, thisSize);
|
|
|
break;
|
|
|
}
|
|
|
case type_data:
|
|
@@ -1665,7 +1743,7 @@ public:
|
|
|
thisSize = * (size32_t *) inData;
|
|
|
inData += sizeof(size32_t);
|
|
|
}
|
|
|
- rc = cass_collection_append_bytes(collection, cass_bytes_init((const cass_byte_t*) inData, thisSize));
|
|
|
+ rc = cass_collection_append_bytes(collection, (const cass_byte_t*) inData, thisSize);
|
|
|
break;
|
|
|
}
|
|
|
checkBind(rc, name);
|
|
@@ -1697,7 +1775,7 @@ public:
|
|
|
// script should be pointing at only trailing whitespace, else it's a "missing ;" error
|
|
|
break;
|
|
|
}
|
|
|
- CassandraStatement statement(cass_statement_new(cass_string_init2(script, nextScript-script), 0));
|
|
|
+ CassandraStatement statement(cass_statement_new_n(script, nextScript-script, 0));
|
|
|
CassandraFuture future(cass_session_execute(*session, statement));
|
|
|
future.wait("execute statement");
|
|
|
script = nextScript;
|
|
@@ -1706,7 +1784,7 @@ public:
|
|
|
else
|
|
|
{
|
|
|
// MORE - can cache this, perhaps, if script is same as last time?
|
|
|
- CassandraFuture future(cass_session_prepare(*session, cass_string_init(script)));
|
|
|
+ CassandraFuture future(cass_session_prepare(*session, script));
|
|
|
future.wait("prepare statement");
|
|
|
Owned<CassandraPrepared> prepared = new CassandraPrepared(cass_future_get_prepared(future));
|
|
|
if ((flags & EFnoparams) == 0)
|
|
@@ -1988,7 +2066,7 @@ void bindElement(CassStatement *statement, IPTree *parent, unsigned idx, const c
|
|
|
{
|
|
|
const char *value = parent->queryProp(name);
|
|
|
if (value)
|
|
|
- check(cass_statement_bind_string(statement, idx, cass_string_init(value)));
|
|
|
+ check(cass_statement_bind_string(statement, idx, value));
|
|
|
break;
|
|
|
}
|
|
|
|
|
@@ -2004,7 +2082,7 @@ void bindElement(CassStatement *statement, IPTree *parent, unsigned idx, const c
|
|
|
{
|
|
|
MemoryBuffer buf;
|
|
|
parent->getPropBin(name, buf);
|
|
|
- check(cass_statement_bind_bytes(statement, idx, cass_bytes_init((const cass_byte_t*)buf.toByteArray(), buf.length())));
|
|
|
+ check(cass_statement_bind_bytes(statement, idx, (const cass_byte_t*)buf.toByteArray(), buf.length()));
|
|
|
break;
|
|
|
}
|
|
|
case CASS_VALUE_TYPE_BOOLEAN:
|
|
@@ -2032,7 +2110,7 @@ void bindElement(CassStatement *statement, IPTree *parent, unsigned idx, const c
|
|
|
if (strcmp(name, "list1")==0)
|
|
|
check(cass_collection_append_int32(collection, items->query().getPropInt(NULL)));
|
|
|
else
|
|
|
- check(cass_collection_append_string(collection, cass_string_init(items->query().queryProp(NULL))));
|
|
|
+ check(cass_collection_append_string(collection, items->query().queryProp(NULL)));
|
|
|
}
|
|
|
check(cass_statement_bind_collection(statement, idx, collection));
|
|
|
}
|
|
@@ -2057,8 +2135,8 @@ void bindElement(CassStatement *statement, IPTree *parent, unsigned idx, const c
|
|
|
{
|
|
|
const char *key = attrs->queryName();
|
|
|
const char *value = attrs->queryValue();
|
|
|
- check(cass_collection_append_string(collection, cass_string_init(key+1))); // skip the @
|
|
|
- check(cass_collection_append_string(collection, cass_string_init(value)));
|
|
|
+ check(cass_collection_append_string(collection, key+1)); // skip the @
|
|
|
+ check(cass_collection_append_string(collection, value));
|
|
|
}
|
|
|
check(cass_statement_bind_collection(statement, idx, collection));
|
|
|
}
|
|
@@ -2079,8 +2157,8 @@ void bindElement(CassStatement *statement, IPTree *parent, unsigned idx, const c
|
|
|
const char *value = item.queryProp(NULL);
|
|
|
if (key && value)
|
|
|
{
|
|
|
- check(cass_collection_append_string(collection, cass_string_init(key)));
|
|
|
- check(cass_collection_append_string(collection, cass_string_init(value)));
|
|
|
+ check(cass_collection_append_string(collection, key));
|
|
|
+ check(cass_collection_append_string(collection, value));
|
|
|
}
|
|
|
}
|
|
|
check(cass_statement_bind_collection(statement, idx, collection));
|
|
@@ -2104,7 +2182,7 @@ extern void cassandraToGenericXML()
|
|
|
CassandraSession session(cass_session_new());
|
|
|
CassandraFuture future(cass_session_connect_keyspace(session, cluster, "test"));
|
|
|
future.wait("connect");
|
|
|
- CassandraStatement statement(cass_statement_new(cass_string_init("select * from tbl1 where name = 'name1';"), 0));
|
|
|
+ CassandraStatement statement(cass_statement_new("select * from tbl1 where name = 'name1';", 0));
|
|
|
CassandraFuture future2(cass_session_execute(session, statement));
|
|
|
future2.wait("execute");
|
|
|
CassandraResult result(cass_future_get_result(future2));
|
|
@@ -2112,8 +2190,10 @@ extern void cassandraToGenericXML()
|
|
|
UnsignedArray types;
|
|
|
for (int i = 0; i < cass_result_column_count(result); i++)
|
|
|
{
|
|
|
- CassString column = cass_result_column_name(result, i);
|
|
|
- StringBuffer name(column.length, column.data);
|
|
|
+ const char *column;
|
|
|
+ size_t length;
|
|
|
+ cass_result_column_name(result, i, &column, &length);
|
|
|
+ StringBuffer name(length, column);
|
|
|
names.append(name);
|
|
|
types.append(cass_result_column_type(result, i));
|
|
|
}
|
|
@@ -2153,7 +2233,7 @@ extern void cassandraToGenericXML()
|
|
|
ForEach(*xmlRows)
|
|
|
{
|
|
|
IPropertyTree *xmlrow = &xmlRows->query();
|
|
|
- CassandraStatement update(cass_statement_new(cass_string_init(insertQuery.str()), names.length()));
|
|
|
+ CassandraStatement update(cass_statement_new(insertQuery.str(), names.length()));
|
|
|
ForEachItemIn(idx, names)
|
|
|
{
|
|
|
bindElement(update, xmlrow, idx, names.item(idx), (CassValueType) types.item(idx));
|
|
@@ -2199,7 +2279,7 @@ public:
|
|
|
if (!value)
|
|
|
return false;
|
|
|
if (statement)
|
|
|
- check(cass_statement_bind_string(statement, idx, cass_string_init(value)));
|
|
|
+ check(cass_statement_bind_string(statement, idx, value));
|
|
|
return true;
|
|
|
}
|
|
|
} stringColumnMapper;
|
|
@@ -2213,7 +2293,7 @@ public:
|
|
|
if (!value)
|
|
|
value = "";
|
|
|
if (statement)
|
|
|
- check(cass_statement_bind_string(statement, idx, cass_string_init(value)));
|
|
|
+ check(cass_statement_bind_string(statement, idx, value));
|
|
|
return true;
|
|
|
}
|
|
|
} requiredStringColumnMapper;
|
|
@@ -2236,7 +2316,7 @@ public:
|
|
|
if (value.length())
|
|
|
{
|
|
|
if (statement)
|
|
|
- check(cass_statement_bind_bytes(statement, idx, cass_bytes_init((const cass_byte_t *) value.toByteArray(), value.length())));
|
|
|
+ check(cass_statement_bind_bytes(statement, idx, (const cass_byte_t *) value.toByteArray(), value.length()));
|
|
|
return true;
|
|
|
}
|
|
|
else
|
|
@@ -2276,7 +2356,7 @@ public:
|
|
|
if (statement)
|
|
|
{
|
|
|
const char *value = row->queryName();
|
|
|
- check(cass_statement_bind_string(statement, idx, cass_string_init(value)));
|
|
|
+ check(cass_statement_bind_string(statement, idx, value));
|
|
|
}
|
|
|
return true;
|
|
|
}
|
|
@@ -2322,7 +2402,7 @@ public:
|
|
|
if (!value)
|
|
|
return false;
|
|
|
if (statement)
|
|
|
- check(cass_statement_bind_string(statement, idx, cass_string_init(value)));
|
|
|
+ check(cass_statement_bind_string(statement, idx, value));
|
|
|
return true;
|
|
|
}
|
|
|
} graphIdColumnMapper;
|
|
@@ -2347,7 +2427,7 @@ public:
|
|
|
if (value.length())
|
|
|
{
|
|
|
if (statement)
|
|
|
- check(cass_statement_bind_bytes(statement, idx, cass_bytes_init((const cass_byte_t *) value.str(), value.length())));
|
|
|
+ check(cass_statement_bind_bytes(statement, idx, (const cass_byte_t *) value.str(), value.length()));
|
|
|
return true;
|
|
|
}
|
|
|
else
|
|
@@ -2499,8 +2579,8 @@ public:
|
|
|
const char *value = item.queryProp(NULL);
|
|
|
if (key && value)
|
|
|
{
|
|
|
- check(cass_collection_append_string(collection, cass_string_init(key)));
|
|
|
- check(cass_collection_append_string(collection, cass_string_init(value)));
|
|
|
+ check(cass_collection_append_string(collection, key));
|
|
|
+ check(cass_collection_append_string(collection, value));
|
|
|
}
|
|
|
}
|
|
|
check(cass_statement_bind_collection(statement, idx, collection));
|
|
@@ -2553,8 +2633,8 @@ public:
|
|
|
if (strstr(name, key) == NULL)
|
|
|
{
|
|
|
const char *value = attrs->queryValue();
|
|
|
- check(cass_collection_append_string(collection, cass_string_init(attrs->queryName()+1))); // skip the @
|
|
|
- check(cass_collection_append_string(collection, cass_string_init(value)));
|
|
|
+ check(cass_collection_append_string(collection, attrs->queryName()+1)); // skip the @
|
|
|
+ check(cass_collection_append_string(collection, value));
|
|
|
}
|
|
|
}
|
|
|
check(cass_statement_bind_collection(statement, idx, collection));
|
|
@@ -2626,8 +2706,8 @@ public:
|
|
|
::toXML(&item, value, 0, 0);
|
|
|
if (key && value.length())
|
|
|
{
|
|
|
- check(cass_collection_append_string(collection, cass_string_init(key)));
|
|
|
- check(cass_collection_append_string(collection, cass_string_init(value)));
|
|
|
+ check(cass_collection_append_string(collection, key));
|
|
|
+ check(cass_collection_append_string(collection, value));
|
|
|
}
|
|
|
}
|
|
|
check(cass_statement_bind_collection(statement, idx, collection));
|
|
@@ -2700,7 +2780,7 @@ public:
|
|
|
IPTree &item = items->query();
|
|
|
const char *value = item.queryProp(nameAttr);
|
|
|
if (value)
|
|
|
- check(cass_collection_append_string(collection, cass_string_init(value)));
|
|
|
+ check(cass_collection_append_string(collection, value));
|
|
|
}
|
|
|
check(cass_statement_bind_collection(statement, idx, collection));
|
|
|
}
|
|
@@ -2981,7 +3061,7 @@ const CassResult *fetchDataForKey(const char *key, CassSession *session, const C
|
|
|
selectQuery.append(';');
|
|
|
//if (traceLevel >= 2)
|
|
|
// DBGLOG("%s", selectQuery.str());
|
|
|
- CassandraStatement statement(cass_statement_new(cass_string_init(selectQuery.str()), 0));
|
|
|
+ CassandraStatement statement(cass_statement_new(selectQuery.str(), 0));
|
|
|
return executeQuery(session, statement);
|
|
|
}
|
|
|
|
|
@@ -2994,7 +3074,7 @@ const CassResult *fetchDataForKeyAndWuid(const char *key, const char *wuid, Cass
|
|
|
selectQuery.append(';');
|
|
|
//if (traceLevel >= 2)
|
|
|
// DBGLOG("%s", selectQuery.str());
|
|
|
- CassandraStatement statement(cass_statement_new(cass_string_init(selectQuery.str()), 0));
|
|
|
+ CassandraStatement statement(cass_statement_new(selectQuery.str(), 0));
|
|
|
return executeQuery(session, statement);
|
|
|
}
|
|
|
|
|
@@ -3008,8 +3088,8 @@ void deleteSecondaryByKey(const CassandraXmlMapping *mappings, const char *wuid,
|
|
|
VStringBuffer insertQuery("DELETE from %s where %s=? and wuid=?;", tableName.str(), mappings[0].columnName);
|
|
|
Owned<CassandraPrepared> prepared = sessionCache->prepareStatement(insertQuery);
|
|
|
CassandraStatement update(cass_prepared_bind(*prepared));
|
|
|
- check(cass_statement_bind_string(update, 0, cass_string_init(key)));
|
|
|
- check(cass_statement_bind_string(update, 1, cass_string_init(wuid)));
|
|
|
+ check(cass_statement_bind_string(update, 0, key));
|
|
|
+ check(cass_statement_bind_string(update, 1, wuid));
|
|
|
check(cass_batch_add_statement(batch, update));
|
|
|
}
|
|
|
}
|
|
@@ -3022,13 +3102,13 @@ void deleteChildByWuid(const CassandraXmlMapping *mappings, const char *wuid, co
|
|
|
VStringBuffer insertQuery("DELETE from %s where wuid=?;", tableName.str());
|
|
|
Owned<CassandraPrepared> prepared = sessionCache->prepareStatement(insertQuery);
|
|
|
CassandraStatement update(cass_prepared_bind(*prepared));
|
|
|
- check(cass_statement_bind_string(update, 0, cass_string_init(wuid)));
|
|
|
+ check(cass_statement_bind_string(update, 0, wuid));
|
|
|
check(cass_batch_add_statement(batch, update));
|
|
|
}
|
|
|
|
|
|
void executeSimpleCommand(CassSession *session, const char *command)
|
|
|
{
|
|
|
- CassandraStatement statement(cass_statement_new(cass_string_init(command), 0));
|
|
|
+ CassandraStatement statement(cass_statement_new(command, 0));
|
|
|
CassandraFuture future(cass_session_execute(session, statement));
|
|
|
future.wait("execute");
|
|
|
}
|
|
@@ -3072,7 +3152,7 @@ extern void childXMLtoCassandra(const ICassandraSession *session, CassBatch *bat
|
|
|
VStringBuffer insertQuery("INSERT into %s (%s) values (%s);", tableName.str(), names.str()+1, bindings.str()+1);
|
|
|
Owned<CassandraPrepared> prepared = session->prepareStatement(insertQuery);
|
|
|
CassandraStatement update(cass_prepared_bind(*prepared));
|
|
|
- check(cass_statement_bind_string(update, 0, cass_string_init(wuid)));
|
|
|
+ check(cass_statement_bind_string(update, 0, wuid));
|
|
|
unsigned bindidx = 1; // We already bound wuid
|
|
|
unsigned colidx = 1; // We already bound wuid
|
|
|
while (mappings[colidx].columnName)
|
|
@@ -3192,7 +3272,7 @@ extern void graphProgressXMLtoCassandra(CassSession *session, IPTree *inXML)
|
|
|
VStringBuffer insertQuery("INSERT into %s (%s) values (%s);", tableName.str(), names.str()+1, bindings.str()+1);
|
|
|
DBGLOG("%s", insertQuery.str());
|
|
|
CassandraBatch batch(cass_batch_new(CASS_BATCH_TYPE_UNLOGGED));
|
|
|
- CassandraFuture futurePrep(cass_session_prepare(session, cass_string_init(insertQuery)));
|
|
|
+ CassandraFuture futurePrep(cass_session_prepare(session, insertQuery));
|
|
|
futurePrep.wait("prepare statement");
|
|
|
CassandraPrepared prepared(cass_future_get_prepared(futurePrep));
|
|
|
|
|
@@ -3216,7 +3296,7 @@ extern void graphProgressXMLtoCassandra(CassSession *session, IPTree *inXML)
|
|
|
check(cass_batch_add_statement(batch, update));
|
|
|
}
|
|
|
// And one more with subgraphid = 0 for the graph status
|
|
|
- CassandraStatement update(cass_statement_new(cass_string_init(insertQuery.str()), bindings.length()/2));
|
|
|
+ CassandraStatement update(cass_statement_new(insertQuery.str(), bindings.length()/2));
|
|
|
graphProgressMappings[0].mapper.fromXML(update, 0, inXML, graphProgressMappings[0].xpath);
|
|
|
graphProgressMappings[1].mapper.fromXML(update, 1, &graph, graphProgressMappings[1].xpath);
|
|
|
check(cass_statement_bind_int64(update, 3, 0)); // subgraphId can't be null, as it's in the key
|
|
@@ -3231,7 +3311,7 @@ extern void graphProgressXMLtoCassandra(CassSession *session, IPTree *inXML)
|
|
|
if (inXML->hasProp("Running"))
|
|
|
{
|
|
|
IPTree *running = inXML->queryPropTree("Running");
|
|
|
- CassandraStatement update(cass_statement_new(cass_string_init(insertQuery.str()), bindings.length()/2));
|
|
|
+ CassandraStatement update(cass_statement_new(insertQuery.str(), bindings.length()/2));
|
|
|
graphProgressMappings[0].mapper.fromXML(update, 0, inXML, graphProgressMappings[0].xpath);
|
|
|
graphProgressMappings[1].mapper.fromXML(update, 1, running, graphProgressMappings[1].xpath);
|
|
|
graphProgressMappings[2].mapper.fromXML(update, 2, running, graphProgressMappings[2].xpath);
|
|
@@ -3304,9 +3384,10 @@ static const CassValue *getSingleResult(const CassResult *result)
|
|
|
|
|
|
static StringBuffer &getCassString(StringBuffer &str, const CassValue *value)
|
|
|
{
|
|
|
- CassString output;
|
|
|
- check(cass_value_get_string(value, &output));
|
|
|
- return str.append(output.length, output.data);
|
|
|
+ const char *output;
|
|
|
+ size_t length;
|
|
|
+ check(cass_value_get_string(value, &output, &length));
|
|
|
+ return str.append(length, output);
|
|
|
}
|
|
|
|
|
|
/*
|
|
@@ -3366,7 +3447,7 @@ public:
|
|
|
deleteSecondaries(wuid);
|
|
|
Owned<CassandraPrepared> prepared = sessionCache->prepareStatement("DELETE from workunits where wuid=?;");
|
|
|
CassandraStatement update(cass_prepared_bind(*prepared));
|
|
|
- check(cass_statement_bind_string(update, 0, cass_string_init(wuid)));
|
|
|
+ check(cass_statement_bind_string(update, 0, wuid));
|
|
|
check(cass_batch_add_statement(*batch, update));
|
|
|
CassandraFuture futureBatch(cass_session_execute_batch(sessionCache->querySession(), *batch));
|
|
|
futureBatch.wait("execute");
|
|
@@ -3688,13 +3769,12 @@ public:
|
|
|
}
|
|
|
}
|
|
|
CassandraStatement statement(cass_prepared_bind(*prepared));
|
|
|
- check(cass_statement_bind_string(statement, 0, cass_string_init(useWuid.str())));
|
|
|
+ check(cass_statement_bind_string(statement, 0, useWuid.str()));
|
|
|
if (traceLevel >= 2)
|
|
|
DBGLOG("Try creating %s", useWuid.str());
|
|
|
CassandraFuture future(cass_session_execute(session, statement));
|
|
|
future.wait("execute");
|
|
|
CassandraResult result(cass_future_get_result(future));
|
|
|
- CassString columnName = cass_result_column_name(result, 0);
|
|
|
if (cass_result_column_count(result)==1)
|
|
|
{
|
|
|
// A single column result indicates success, - the single column should be called '[applied]' and have the value 'true'
|
|
@@ -3762,7 +3842,7 @@ public:
|
|
|
virtual WUState waitForWorkUnit(const char * wuid, unsigned timeout, bool compiled, bool returnOnWaitState)
|
|
|
{
|
|
|
VStringBuffer select("select state from workunits where wuid = '%s';", wuid);
|
|
|
- CassandraStatement statement(cass_statement_new(cass_string_init(select.str()), 0));
|
|
|
+ CassandraStatement statement(cass_statement_new(select.str(), 0));
|
|
|
unsigned start = msTick();
|
|
|
loop
|
|
|
{
|
|
@@ -3772,9 +3852,10 @@ public:
|
|
|
const CassValue *value = getSingleResult(result);
|
|
|
if (value == NULL)
|
|
|
return WUStateUnknown;
|
|
|
- CassString output;
|
|
|
- check(cass_value_get_string(value, &output));
|
|
|
- StringBuffer stateStr(output.length, output.data);
|
|
|
+ const char *output;
|
|
|
+ size_t length;
|
|
|
+ check(cass_value_get_string(value, &output, &length));
|
|
|
+ StringBuffer stateStr(length, output);
|
|
|
WUState state = getWorkUnitState(stateStr);
|
|
|
switch (state)
|
|
|
{
|
|
@@ -3889,7 +3970,7 @@ public:
|
|
|
// We don't want to block cache lookups while we prepare a new bound statement
|
|
|
// Note - if multiple threads try to prepare the same (new) statement at the same time, it's not catastrophic
|
|
|
CriticalUnblock b(cacheCrit);
|
|
|
- CassandraFuture futurePrep(cass_session_prepare(session, cass_string_init(query)));
|
|
|
+ CassandraFuture futurePrep(cass_session_prepare(session, query));
|
|
|
futurePrep.wait("prepare statement");
|
|
|
cached.setown(new CassandraPrepared(cass_future_get_prepared(futurePrep)));
|
|
|
}
|
|
@@ -3907,7 +3988,7 @@ private:
|
|
|
{
|
|
|
Owned<CassandraPrepared> prepared = prepareStatement("SELECT COUNT(*) FROM workunits where wuid=?;");
|
|
|
CassandraStatement statement(cass_prepared_bind(*prepared));
|
|
|
- cass_statement_bind_string(statement, 0, cass_string_init(wuid));
|
|
|
+ cass_statement_bind_string(statement, 0, wuid);
|
|
|
CassandraFuture future(cass_session_execute(session, statement));
|
|
|
future.wait("select count(*)");
|
|
|
CassandraResult result(cass_future_get_result(future));
|