瀏覽代碼

Merge pull request #7436 from richardkchapman/cassandra-2.0

HPCC-13723 Update Datastax Cassandra library to version 2.0

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 10 年之前
父節點
當前提交
82075237de
共有 2 個文件被更改,包括 145 次插入67 次删除
  1. 144 66
      plugins/cassandra/cassandraembed.cpp
  2. 1 1
      plugins/cassandra/cpp-driver

+ 144 - 66
plugins/cassandra/cassandraembed.cpp

@@ -70,6 +70,18 @@ extern "C" EXPORT bool getECLPluginDefinition(ECLPluginDefinitionBlock *pb)
 
 namespace cassandraembed {
 
+static void logCallBack(const CassLogMessage *message, void *data)
+{
+    DBGLOG("cassandra: %s - %s", cass_log_level_string(message->severity), message->message);
+}
+
+MODULE_INIT(INIT_PRIORITY_STANDARD)
+{
+    cass_log_set_callback(logCallBack, NULL);
+    cass_log_set_level(CASS_LOG_WARN);
+    return true;
+}
+
 static void failx(const char *msg, ...) __attribute__((noreturn))  __attribute__((format(printf, 1, 2)));
 static void fail(const char *msg) __attribute__((noreturn));
 
@@ -133,8 +145,10 @@ public:
         CassError rc = cass_future_error_code(future);
         if(rc != CASS_OK)
         {
-            CassString message = cass_future_error_message(future);
-            VStringBuffer err("cassandra: failed to %s (%.*s)", why, (int)message.length, message.data);
+            const char *message;
+            size_t length;
+            cass_future_error_message(future, &message, &length);
+            VStringBuffer err("cassandra: failed to %s (%.*s)", why, (int) length, message);
             rtlFail(0, err.str());
         }
     }
@@ -487,9 +501,10 @@ static void getDataResult(const RtlFieldInfo *field, const CassValue *value, siz
     // it seems like it could be more useful to support anything
     // if (cass_value_type(value) != CASS_VALUE_TYPE_BLOB)
     //     typeError("blob", value, field);
-    CassBytes bytes;
-    check(cass_value_get_bytes(value, &bytes));
-    rtlStrToDataX(chars, result, bytes.size, bytes.data);
+    const cass_byte_t *bytes;
+    size_t size;
+    check(cass_value_get_bytes(value, &bytes, &size));
+    rtlStrToDataX(chars, result, size, bytes);
 }
 
 static __int64 getSignedResult(const RtlFieldInfo *field, const CassValue *value);
@@ -574,22 +589,20 @@ static void getStringResult(const RtlFieldInfo *field, const CassValue *value, s
     {
     case CASS_VALUE_TYPE_ASCII:
     {
-        CassString output;
-        check(cass_value_get_string(value, &output));
-        const char *text = output.data;
-        unsigned long bytes = output.length;
-        rtlStrToStrX(chars, result, bytes, text);
+        const char *output;
+        size_t length;
+        check(cass_value_get_string(value, &output, &length));
+        rtlStrToStrX(chars, result, length, output);
         break;
     }
     case CASS_VALUE_TYPE_VARCHAR:
     case CASS_VALUE_TYPE_TEXT:
     {
-        CassString output;
-        check(cass_value_get_string(value, &output));
-        const char *text = output.data;
-        unsigned long bytes = output.length;
-        unsigned numchars = rtlUtf8Length(bytes, text);
-        rtlUtf8ToStrX(chars, result, numchars, text);
+        const char *output;
+        size_t length;
+        check(cass_value_get_string(value, &output, &length));
+        unsigned numchars = rtlUtf8Length(length, output);
+        rtlUtf8ToStrX(chars, result, numchars, output);
         break;
     }
     default:
@@ -609,22 +622,20 @@ static void getUTF8Result(const RtlFieldInfo *field, const CassValue *value, siz
     {
     case CASS_VALUE_TYPE_ASCII:
     {
-        CassString output;
-        check(cass_value_get_string(value, &output));
-        const char *text = output.data;
-        unsigned long bytes = output.length;
-        rtlStrToUtf8X(chars, result, bytes, text);
+        const char *output;
+        size_t length;
+        check(cass_value_get_string(value, &output, &length));
+        rtlStrToUtf8X(chars, result, length, output);
         break;
     }
     case CASS_VALUE_TYPE_VARCHAR:
     case CASS_VALUE_TYPE_TEXT:
     {
-        CassString output;
-        check(cass_value_get_string(value, &output));
-        const char *text = output.data;
-        unsigned long bytes = output.length;
-        unsigned numchars = rtlUtf8Length(bytes, text);
-        rtlUtf8ToUtf8X(chars, result, numchars, text);
+        const char * output;
+        size_t length;
+        check(cass_value_get_string(value, &output, &length));
+        unsigned numchars = rtlUtf8Length(length, output);
+        rtlUtf8ToUtf8X(chars, result, numchars, output);
         break;
     }
     default:
@@ -644,22 +655,20 @@ static void getUnicodeResult(const RtlFieldInfo *field, const CassValue *value,
     {
     case CASS_VALUE_TYPE_ASCII:
     {
-        CassString output;
-        check(cass_value_get_string(value, &output));
-        const char *text = output.data;
-        unsigned long bytes = output.length;
-        rtlStrToUnicodeX(chars, result, bytes, text);
+        const char * output;
+        size_t length;
+        check(cass_value_get_string(value, &output, &length));
+        rtlStrToUnicodeX(chars, result, length, output);
         break;
     }
     case CASS_VALUE_TYPE_VARCHAR:
     case CASS_VALUE_TYPE_TEXT:
     {
-        CassString output;
-        check(cass_value_get_string(value, &output));
-        const char *text = output.data;
-        unsigned long bytes = output.length;
-        unsigned numchars = rtlUtf8Length(bytes, text);
-        rtlUtf8ToUnicodeX(chars, result, numchars, text);
+        const char * output;
+        size_t length;
+        check(cass_value_get_string(value, &output, &length));
+        unsigned numchars = rtlUtf8Length(length, output);
+        rtlUtf8ToUnicodeX(chars, result, numchars, output);
         break;
     }
     default:
@@ -845,13 +854,12 @@ public:
         rtlDataAttr utfText;
         rtlStrToUtf8X(utf8chars, utfText.refstr(), len, value);
         if (collection)
-            checkBind(cass_collection_append_string(*collection,
-                                                    cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()))),
+            checkBind(cass_collection_append_string_n(*collection, utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())),
                       field);
         else
-            checkBind(cass_statement_bind_string(stmtInfo->queryStatement(),
+            checkBind(cass_statement_bind_string_n(stmtInfo->queryStatement(),
                                                  checkNextParam(field),
-                                                 cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()))),
+                                                 utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())),
                       field);
     }
     virtual void processBool(bool value, const RtlFieldInfo * field)
@@ -864,9 +872,9 @@ public:
     virtual void processData(unsigned len, const void *value, const RtlFieldInfo * field)
     {
         if (collection)
-            checkBind(cass_collection_append_bytes(*collection, cass_bytes_init((const cass_byte_t*) value, len)), field);
+            checkBind(cass_collection_append_bytes(*collection, (const cass_byte_t*) value, len), field);
         else
-            checkBind(cass_statement_bind_bytes(stmtInfo->queryStatement(), checkNextParam(field), cass_bytes_init((const cass_byte_t*) value, len)), field);
+            checkBind(cass_statement_bind_bytes(stmtInfo->queryStatement(), checkNextParam(field), (const cass_byte_t*) value, len), field);
     }
     virtual void processInt(__int64 value, const RtlFieldInfo * field)
     {
@@ -925,13 +933,12 @@ public:
         rtlDataAttr utfText;
         rtlUnicodeToUtf8X(utf8chars, utfText.refstr(), chars, value);
         if (collection)
-            checkBind(cass_collection_append_string(*collection,
-                                                    cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()))),
+            checkBind(cass_collection_append_string_n(*collection, utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())),
                       field);
         else
-            checkBind(cass_statement_bind_string(stmtInfo->queryStatement(),
-                                                 checkNextParam(field),
-                                                 cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()))),
+            checkBind(cass_statement_bind_string_n(stmtInfo->queryStatement(),
+                                                   checkNextParam(field),
+                                                   utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())),
                       field);
     }
     virtual void processQString(unsigned len, const char *value, const RtlFieldInfo * field)
@@ -944,9 +951,9 @@ public:
     virtual void processUtf8(unsigned chars, const char *value, const RtlFieldInfo * field)
     {
         if (collection)
-            checkBind(cass_collection_append_string(*collection, cass_string_init2(value, rtlUtf8Size(chars, value))), field);
+            checkBind(cass_collection_append_string_n(*collection, value, rtlUtf8Size(chars, value)), field);
         else
-            checkBind(cass_statement_bind_string(stmtInfo->queryStatement(), checkNextParam(field), cass_string_init2(value, rtlUtf8Size(chars, value))), field);
+            checkBind(cass_statement_bind_string_n(stmtInfo->queryStatement(), checkNextParam(field), value, rtlUtf8Size(chars, value)), field);
     }
 
     virtual bool processBeginSet(const RtlFieldInfo * field, unsigned numElements, bool isAll, const byte *data)
@@ -1207,6 +1214,60 @@ public:
                     unsigned request_timeout = getUnsignedOption(val, "request_timeout");
                     cass_cluster_set_request_timeout(*cluster, request_timeout);
                 }
+                else if (stricmp(optName, "load_balance_round_robin")==0)
+                {
+                    cass_bool_t enable = getBoolOption(val, "load_balance_round_robin");
+                    if (enable==cass_true)
+                        cass_cluster_set_load_balance_round_robin(*cluster);
+                }
+                else if (stricmp(optName, "load_balance_dc_aware")==0)
+                {
+                    StringArray lbargs;
+                    lbargs.appendList(val, "|");
+                    if (lbargs.length() != 3)
+                        failx("Invalid value '%s' for option %s - expected 3 subvalues (separate with |)", val, optName.str());
+                    unsigned usedPerRemote = getUnsignedOption(lbargs.item(2), "load_balance_dc_aware");
+                    cass_bool_t allowRemote = getBoolOption(lbargs.item(2), "load_balance_dc_aware");
+                    checkSetOption(cass_cluster_set_load_balance_dc_aware(*cluster, lbargs.item(0), usedPerRemote, allowRemote), "load_balance_dc_aware");
+                }
+                else if (stricmp(optName, "token_aware_routing")==0)
+                {
+                    cass_bool_t enable = getBoolOption(val, "token_aware_routing");
+                    cass_cluster_set_token_aware_routing(*cluster, enable);
+                }
+                else if (stricmp(optName, "latency_aware_routing")==0)
+                {
+                    cass_bool_t enable = getBoolOption(val, "latency_aware_routing");
+                    cass_cluster_set_latency_aware_routing(*cluster, enable);
+                }
+                else if (stricmp(optName, "latency_aware_routing_settings")==0)
+                {
+                    StringArray subargs;
+                    subargs.appendList(val, "|");
+                    if (subargs.length() != 5)
+                        failx("Invalid value '%s' for option %s - expected 5 subvalues (separate with |)", val, optName.str());
+                    cass_double_t exclusion_threshold = getDoubleOption(subargs.item(0), "exclusion_threshold");
+                    cass_uint64_t scale_ms = getUnsigned64Option(subargs.item(1), "scale_ms");
+                    cass_uint64_t retry_period_ms = getUnsigned64Option(subargs.item(2), "retry_period_ms");
+                    cass_uint64_t update_rate_ms = getUnsigned64Option(subargs.item(3), "update_rate_ms");
+                    cass_uint64_t min_measured = getUnsigned64Option(subargs.item(4), "min_measured");
+                    cass_cluster_set_latency_aware_routing_settings(*cluster, exclusion_threshold, scale_ms, retry_period_ms, update_rate_ms, min_measured);
+                }
+                else if (stricmp(optName, "tcp_nodelay")==0)
+                {
+                    cass_bool_t enable = getBoolOption(val, "tcp_nodelay");
+                    cass_cluster_set_tcp_nodelay(*cluster, enable);
+                }
+                else if (stricmp(optName, "tcp_keepalive")==0)
+                {
+                    StringArray subargs;
+                    subargs.appendList(val, "|");
+                    if (subargs.length() != 2)
+                        failx("Invalid value '%s' for option %s - expected 2 subvalues (separate with |)", val, optName.str());
+                    cass_bool_t enabled = getBoolOption(subargs.item(0), "enabled");
+                    unsigned delay_secs = getUnsignedOption(subargs.item(0), "delay_secs");
+                    cass_cluster_set_tcp_keepalive(*cluster, enabled, delay_secs);
+                }
                 else
                     failx("Unrecognized option %s", optName.str());
             }
@@ -1434,7 +1495,7 @@ public:
     }
     virtual void bindDataParam(const char *name, size32_t len, const void *val)
     {
-        checkBind(cass_statement_bind_bytes(stmtInfo->queryStatement(), checkNextParam(name), cass_bytes_init((const cass_byte_t*) val, len)), name);
+        checkBind(cass_statement_bind_bytes(stmtInfo->queryStatement(), checkNextParam(name), (const cass_byte_t*) val, len), name);
     }
     virtual void bindFloatParam(const char *name, float val)
     {
@@ -1468,9 +1529,9 @@ public:
         size32_t utf8chars;
         rtlDataAttr utfText;
         rtlStrToUtf8X(utf8chars, utfText.refstr(), len, val);
-        checkBind(cass_statement_bind_string(stmtInfo->queryStatement(),
+        checkBind(cass_statement_bind_string_n(stmtInfo->queryStatement(),
                                              checkNextParam(name),
-                                             cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()))),
+                                             utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())),
                   name);
     }
     virtual void bindVStringParam(const char *name, const char *val)
@@ -1479,16 +1540,16 @@ public:
     }
     virtual void bindUTF8Param(const char *name, size32_t chars, const char *val)
     {
-        checkBind(cass_statement_bind_string(stmtInfo->queryStatement(), checkNextParam(name), cass_string_init2(val, rtlUtf8Size(chars, val))), name);
+        checkBind(cass_statement_bind_string_n(stmtInfo->queryStatement(), checkNextParam(name), val, rtlUtf8Size(chars, val)), name);
     }
     virtual void bindUnicodeParam(const char *name, size32_t chars, const UChar *val)
     {
         size32_t utf8chars;
         rtlDataAttr utfText;
         rtlUnicodeToUtf8X(utf8chars, utfText.refstr(), chars, val);
-        checkBind(cass_statement_bind_string(stmtInfo->queryStatement(),
+        checkBind(cass_statement_bind_string_n(stmtInfo->queryStatement(),
                                                  checkNextParam(name),
-                                                 cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()))),
+                                                 utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())),
                   name);
     }
     virtual void bindSetParam(const char *name, int elemType, size32_t elemSize, bool isAll, size32_t totalBytes, void *setData)
@@ -1555,7 +1616,7 @@ public:
                 size32_t utf8chars;
                 rtlDataAttr utfText;
                 rtlStrToUtf8X(utf8chars, utfText.refstr(), numChars, (const char *) inData);
-                rc = cass_collection_append_string(collection, cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())));
+                rc = cass_collection_append_string_n(collection, utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()));
                 break;
             }
             case type_string:
@@ -1568,7 +1629,7 @@ public:
                 size32_t utf8chars;
                 rtlDataAttr utfText;
                 rtlStrToUtf8X(utf8chars, utfText.refstr(), thisSize, (const char *) inData);
-                rc = cass_collection_append_string(collection, cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())));
+                rc = cass_collection_append_string_n(collection, utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()));
                 break;
             }
             case type_real:
@@ -1592,7 +1653,7 @@ public:
                 rtlDataAttr unicode;
                 rtlUnicodeToUtf8X(unicodeChars, unicode.refstr(), thisSize / sizeof(UChar), (const UChar *) inData);
                 size32_t sizeBytes = rtlUtf8Size(unicodeChars, unicode.getstr());
-                rc = cass_collection_append_string(collection, cass_string_init2(unicode.getstr(), sizeBytes));
+                rc = cass_collection_append_string_n(collection, unicode.getstr(), sizeBytes);
                 break;
             }
             case type_utf8:
@@ -1601,7 +1662,7 @@ public:
                 size32_t numChars = * (size32_t *) inData;
                 inData += sizeof(size32_t);
                 thisSize = rtlUtf8Size(numChars, inData);
-                rc = cass_collection_append_string(collection, cass_string_init2((const char *) inData, thisSize));
+                rc = cass_collection_append_string_n(collection, (const char *) inData, thisSize);
                 break;
             }
             case type_data:
@@ -1610,7 +1671,7 @@ public:
                     thisSize = * (size32_t *) inData;
                     inData += sizeof(size32_t);
                 }
-                rc = cass_collection_append_bytes(collection, cass_bytes_init((const cass_byte_t*) inData, thisSize));
+                rc = cass_collection_append_bytes(collection, (const cass_byte_t*) inData, thisSize);
                 break;
             }
             checkBind(rc, name);
@@ -1642,7 +1703,7 @@ public:
                     // script should be pointing at only trailing whitespace, else it's a "missing ;" error
                     break;
                 }
-                CassandraStatement statement(cass_statement_new(cass_string_init2(script, nextScript-script), 0));
+                CassandraStatement statement(cass_statement_new_n(script, nextScript-script, 0));
                 CassandraFuture future(cass_session_execute(*session, statement));
                 future.wait("execute statement");
                 script = nextScript;
@@ -1651,7 +1712,7 @@ public:
         else
         {
             // MORE - can cache this, perhaps, if script is same as last time?
-            CassandraFuture future(cass_session_prepare(*session, cass_string_init(script)));
+            CassandraFuture future(cass_session_prepare(*session, script));
             future.wait("prepare statement");
             Owned<CassandraPrepared> prepared = new CassandraPrepared(cass_future_get_prepared(future));
             if ((flags & EFnoparams) == 0)
@@ -1783,13 +1844,30 @@ protected:
             failx("While setting option %s: %s", name, cass_error_desc(rc));
         }
     }
+    cass_bool_t getBoolOption(const char *val, const char *option)
+    {
+        return strToBool(val) ? cass_true : cass_false;
+    }
     unsigned getUnsignedOption(const char *val, const char *option)
     {
         char *endp;
         long value = strtoul(val, &endp, 0);
-        if (endp==val || *endp != '\0' || value > INT_MAX || value < INT_MIN)
+        if (endp==val || *endp != '\0' || value > UINT_MAX || value < 0)
             failx("Invalid value '%s' for option %s", val, option);
-        return (int) value;
+        return (unsigned) value;
+    }
+    unsigned getDoubleOption(const char *val, const char *option)
+    {
+        char *endp;
+        double value = strtod(val, &endp);
+        if (endp==val || *endp != '\0')
+            failx("Invalid value '%s' for option %s", val, option);
+        return value;
+    }
+    __uint64 getUnsigned64Option(const char *val, const char *option)
+    {
+        // MORE - could check it's all digits (with optional leading spaces...), if we cared.
+        return rtlVStrToUInt8(val);
     }
     Owned<CassandraCluster> cluster;
     Owned<CassandraSession> session;

+ 1 - 1
plugins/cassandra/cpp-driver

@@ -1 +1 @@
-Subproject commit 189e46e3386cc2be0fd7428c1072339b9c611d6e
+Subproject commit 4c009fc8e56177173f11a738802be5e6b98bccb9