Procházet zdrojové kódy

HPCC-10617 Add support for Cassandra

Added support for SET parameters and dataset fields

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman před 11 roky
rodič
revize
c858701299

+ 419 - 35
plugins/cassandra/cassandraembed.cpp

@@ -266,6 +266,26 @@ private:
     CassIterator *iterator;
 };
 
+class CassandraCollection : public CInterface
+{
+public:
+    CassandraCollection(CassCollection *_collection) : collection(_collection)
+    {
+    }
+    ~CassandraCollection()
+    {
+        if (collection)
+            cass_collection_free(collection);
+    }
+    inline operator CassCollection *() const
+    {
+        return collection;
+    }
+private:
+    CassandraCollection(const CassandraCollection &);
+    CassCollection *collection;
+};
+
 void check(CassError rc)
 {
     if (rc != CASS_OK)
@@ -419,6 +439,26 @@ static bool isInteger(const CassValue *value)
     }
 }
 
+// when extracting elements of a set, field will point at the SET info- we want to get the typeInfo for the element type
+static const RtlTypeInfo *getFieldBaseType(const RtlFieldInfo *field)
+{
+   const RtlTypeInfo *type = field->type;
+   if ((type->fieldType & RFTMkind) == type_set)
+       return type->queryChildType();
+   else
+       return type;
+}
+
+static int getNumFields(const RtlTypeInfo *record)
+{
+    int count = 0;
+    const RtlFieldInfo * const *fields = record->queryFields();
+    assertex(fields);
+    while (*fields++)
+        count++;
+    return count;
+}
+
 static bool getBooleanResult(const RtlFieldInfo *field, const CassValue *value)
 {
     if (cass_value_is_null(value))
@@ -626,7 +666,7 @@ class CassandraRowBuilder : public CInterfaceOf<IFieldSource>
 {
 public:
     CassandraRowBuilder(const CassandraStatementInfo *_stmtInfo)
-    : stmtInfo(_stmtInfo), colIdx(0)
+    : stmtInfo(_stmtInfo), colIdx(0),numIteratorFields(0),nextIteratedField(0)
     {
     }
     virtual bool getBooleanResult(const RtlFieldInfo *field)
@@ -668,30 +708,48 @@ public:
 
     virtual void processBeginSet(const RtlFieldInfo * field, bool &isAll)
     {
-        UNSUPPORTED("SET fields");
+        isAll = false;
+        iterator.setown(new CassandraIterator(cass_iterator_from_collection(nextField(field))));
     }
     virtual bool processNextSet(const RtlFieldInfo * field)
     {
-        throwUnexpected();
+        numIteratorFields = 1;
+        return *iterator && cass_iterator_next(*iterator); // If field was NULL, we'll have a NULL iterator (representing an empty set/list)
+        // Can't distinguish empty set from NULL field, so assume the former (rather than trying to deliver the default value for the set field)
     }
     virtual void processBeginDataset(const RtlFieldInfo * field)
     {
-        UNSUPPORTED("Nested datasets");
+        numIteratorFields = getNumFields(field->type->queryChildType());
+        switch (numIteratorFields)
+        {
+        case 1:
+            iterator.setown(new CassandraIterator(cass_iterator_from_collection(nextField(field))));
+            break;
+        case 2:
+            iterator.setown(new CassandraIterator(cass_iterator_from_map(nextField(field))));
+            break;
+        default:
+            UNSUPPORTED("Nested datasets with > 2 fields");
+        }
     }
     virtual void processBeginRow(const RtlFieldInfo * field)
     {
     }
     virtual bool processNextRow(const RtlFieldInfo * field)
     {
-        throwUnexpected();
+        nextIteratedField = 0;
+        return *iterator && cass_iterator_next(*iterator); // If field was NULL, we'll have a NULL iterator (representing an empty set/list/map)
+        // Can't distinguish empty set from NULL field, so assume the former (rather than trying to deliver the default value for the set field)
     }
     virtual void processEndSet(const RtlFieldInfo * field)
     {
-        throwUnexpected();
+        iterator.clear();
+        numIteratorFields = 0;
     }
     virtual void processEndDataset(const RtlFieldInfo * field)
     {
-        throwUnexpected();
+        iterator.clear();
+        numIteratorFields = 0;
     }
     virtual void processEndRow(const RtlFieldInfo * field)
     {
@@ -699,13 +757,36 @@ public:
 protected:
     const CassValue *nextField(const RtlFieldInfo * field)
     {
-        const CassValue *ret = cass_row_get_column(stmtInfo->queryRow(), colIdx++);
+        const CassValue *ret;
+        if (iterator)
+        {
+            switch (numIteratorFields)
+            {
+            case 1:
+                ret = cass_iterator_get_value(*iterator);
+                break;
+            case 2:
+                if (nextIteratedField==0)
+                    ret = cass_iterator_get_map_key(*iterator);
+                else
+                    ret = cass_iterator_get_map_value(*iterator);
+                nextIteratedField++;
+                break;
+            default:
+                throwUnexpected();
+            }
+        }
+        else
+            ret = cass_row_get_column(stmtInfo->queryRow(), colIdx++);
         if (!ret)
             failx("Too many fields in ECL output row, reading field %s", field->name->getAtomNamePtr());
         return ret;
     }
     const CassandraStatementInfo *stmtInfo;
+    Owned<CassandraIterator> iterator;
     int colIdx;
+    int numIteratorFields;
+    int nextIteratedField;
 };
 
 // Bind Cassandra columns from an ECL record
@@ -736,25 +817,46 @@ public:
         size32_t utf8chars;
         rtlDataAttr utfText;
         rtlStrToUtf8X(utf8chars, utfText.refstr(), len, value);
-        checkBind(cass_statement_bind_string(stmtInfo->queryStatement(),
-                                             checkNextParam(field),
-                                             cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()))),
-                  field);
+        if (collection)
+            checkBind(cass_collection_append_string(*collection,
+                                                 cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()))),
+                      field);
+        else
+            checkBind(cass_statement_bind_string(stmtInfo->queryStatement(),
+                                                 checkNextParam(field),
+                                                 cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()))),
+                      field);
     }
     virtual void processBool(bool value, const RtlFieldInfo * field)
     {
-        checkBind(cass_statement_bind_bool(stmtInfo->queryStatement(), checkNextParam(field), value ? cass_true : cass_false), field);
+        if (collection)
+            checkBind(cass_collection_append_bool(*collection, value ? cass_true : cass_false), field);
+        else
+            checkBind(cass_statement_bind_bool(stmtInfo->queryStatement(), checkNextParam(field), value ? cass_true : cass_false), field);
     }
     virtual void processData(unsigned len, const void *value, const RtlFieldInfo * field)
     {
-        checkBind(cass_statement_bind_bytes(stmtInfo->queryStatement(), checkNextParam(field), cass_bytes_init((const cass_byte_t*) value, len)), field);
+        if (collection)
+            checkBind(cass_collection_append_bytes(*collection, cass_bytes_init((const cass_byte_t*) value, len)), field);
+        else
+            checkBind(cass_statement_bind_bytes(stmtInfo->queryStatement(), checkNextParam(field), cass_bytes_init((const cass_byte_t*) value, len)), field);
     }
     virtual void processInt(__int64 value, const RtlFieldInfo * field)
     {
-        if (field->size(NULL,NULL)>4)
-            checkBind(cass_statement_bind_int64(stmtInfo->queryStatement(), checkNextParam(field), value), field);
+        if (getFieldBaseType(field)->size(NULL,NULL)>4)
+        {
+            if (collection)
+                checkBind(cass_collection_append_int64(*collection, value), field);
+            else
+                checkBind(cass_statement_bind_int64(stmtInfo->queryStatement(), checkNextParam(field), value), field);
+        }
         else
-            checkBind(cass_statement_bind_int32(stmtInfo->queryStatement(), checkNextParam(field), value), field);
+        {
+            if (collection)
+                checkBind(cass_collection_append_int32(*collection, value), field);
+            else
+                checkBind(cass_statement_bind_int32(stmtInfo->queryStatement(), checkNextParam(field), value), field);
+        }
     }
     virtual void processUInt(unsigned __int64 value, const RtlFieldInfo * field)
     {
@@ -762,10 +864,20 @@ public:
     }
     virtual void processReal(double value, const RtlFieldInfo * field)
     {
-        if (field->size(NULL,NULL)>4)
-            checkBind(cass_statement_bind_double(stmtInfo->queryStatement(), checkNextParam(field), value), field);
+        if (getFieldBaseType(field)->size(NULL,NULL)>4)
+        {
+            if (collection)
+                checkBind(cass_collection_append_double(*collection, value), field);
+            else
+                checkBind(cass_statement_bind_double(stmtInfo->queryStatement(), checkNextParam(field), value), field);
+        }
         else
-            checkBind(cass_statement_bind_float(stmtInfo->queryStatement(), checkNextParam(field), (float) value), field);
+        {
+            if (collection)
+                checkBind(cass_collection_append_float(*collection, (float) value), field);
+            else
+                checkBind(cass_statement_bind_float(stmtInfo->queryStatement(), checkNextParam(field), (float) value), field);
+        }
     }
     virtual void processDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo * field)
     {
@@ -785,10 +897,15 @@ public:
         size32_t utf8chars;
         rtlDataAttr utfText;
         rtlUnicodeToUtf8X(utf8chars, utfText.refstr(), chars, value);
-        checkBind(cass_statement_bind_string(stmtInfo->queryStatement(),
-                                                 checkNextParam(field),
-                                                 cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()))),
-                  field);
+        if (collection)
+            checkBind(cass_collection_append_string(*collection,
+                                                        cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()))),
+                      field);
+        else
+            checkBind(cass_statement_bind_string(stmtInfo->queryStatement(),
+                                                     checkNextParam(field),
+                                                     cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr()))),
+                      field);
     }
     virtual void processQString(unsigned len, const char *value, const RtlFieldInfo * field)
     {
@@ -799,16 +916,31 @@ public:
     }
     virtual void processUtf8(unsigned chars, const char *value, const RtlFieldInfo * field)
     {
-        checkBind(cass_statement_bind_string(stmtInfo->queryStatement(), checkNextParam(field), cass_string_init2(value, rtlUtf8Size(chars, value))), field);
+        if (collection)
+            checkBind(cass_collection_append_string(*collection, cass_string_init2(value, rtlUtf8Size(chars, value))), field);
+        else
+            checkBind(cass_statement_bind_string(stmtInfo->queryStatement(), checkNextParam(field), cass_string_init2(value, rtlUtf8Size(chars, value))), field);
     }
 
     virtual bool processBeginSet(const RtlFieldInfo * field, unsigned numElements, bool isAll, const byte *data)
     {
-        UNSUPPORTED("SET fields");
+        if (isAll)
+            UNSUPPORTED("SET(ALL)");
+        collection.setown(new CassandraCollection(cass_collection_new(CASS_COLLECTION_TYPE_SET, numElements)));
+        return true;
     }
     virtual bool processBeginDataset(const RtlFieldInfo * field, unsigned numRows)
     {
-        UNSUPPORTED("Nested datasets");
+        // If there's a single field, assume we are mapping to a SET/LIST
+        // If there are two, assume it's a MAP
+        // Otherwise, fail
+        int numFields = getNumFields(field->type->queryChildType());
+        if (numFields < 1 || numFields > 2)
+        {
+            UNSUPPORTED("Nested datasets with > 2 fields");
+        }
+        collection.setown(new CassandraCollection(cass_collection_new(numFields==1 ? CASS_COLLECTION_TYPE_SET : CASS_COLLECTION_TYPE_MAP, numRows)));
+        return true;
     }
     virtual bool processBeginRow(const RtlFieldInfo * field)
     {
@@ -816,11 +948,13 @@ public:
     }
     virtual void processEndSet(const RtlFieldInfo * field)
     {
-        throwUnexpected();
+        checkBind(cass_statement_bind_collection(stmtInfo->queryStatement(), checkNextParam(field), *collection), field);
+        collection.clear();
     }
     virtual void processEndDataset(const RtlFieldInfo * field)
     {
-        throwUnexpected();
+        checkBind(cass_statement_bind_collection(stmtInfo->queryStatement(), checkNextParam(field), *collection), field);
+        collection.clear();
     }
     virtual void processEndRow(const RtlFieldInfo * field)
     {
@@ -828,6 +962,7 @@ public:
 protected:
     inline unsigned checkNextParam(const RtlFieldInfo * field)
     {
+        DBGLOG("Binding %s to %d", field->name->str(), thisParam);
         return thisParam++;
     }
     inline void checkBind(CassError rc, const RtlFieldInfo * field)
@@ -839,6 +974,7 @@ protected:
     }
     const RtlTypeInfo *typeInfo;
     const CassandraStatementInfo *stmtInfo;
+    Owned<CassandraCollection> collection;
     int firstParam;
     RtlFieldStrInfo dummyField;
     int thisParam;
@@ -1111,7 +1247,118 @@ public:
     }
     virtual void getSetResult(bool & __isAllResult, size32_t & __resultBytes, void * & __result, int elemType, size32_t elemSize)
     {
-        UNSUPPORTED("SET results");
+        CassandraIterator iterator(cass_iterator_from_collection(getScalarResult()));
+        rtlRowBuilder out;
+        byte *outData = NULL;
+        size32_t outBytes = 0;
+        while (cass_iterator_next(iterator))
+        {
+            const CassValue *value = cass_iterator_get_value(iterator);
+            assertex(value);
+            if (elemSize != UNKNOWN_LENGTH)
+            {
+                out.ensureAvailable(outBytes + elemSize);
+                outData = out.getbytes() + outBytes;
+            }
+            switch ((type_t) elemType)
+            {
+            case type_int:
+                rtlWriteInt(outData, cassandraembed::getSignedResult(NULL, value), elemSize);
+                break;
+            case type_unsigned:
+                rtlWriteInt(outData, cassandraembed::getUnsignedResult(NULL, value), elemSize);
+                break;
+            case type_real:
+                if (elemSize == sizeof(double))
+                    * (double *) outData = cassandraembed::getRealResult(NULL, value);
+                else
+                {
+                    assertex(elemSize == sizeof(float));
+                    * (float *) outData = (float) cassandraembed::getRealResult(NULL, value);
+                }
+                break;
+            case type_boolean:
+                assertex(elemSize == sizeof(bool));
+                * (bool *) outData = cassandraembed::getBooleanResult(NULL, value);
+                break;
+            case type_string:
+            case type_varstring:
+            {
+                rtlDataAttr str;
+                size32_t lenBytes;
+                cassandraembed::getStringResult(NULL, value, lenBytes, str.refstr());
+                if (elemSize == UNKNOWN_LENGTH)
+                {
+                    if (elemType == type_string)
+                    {
+                        out.ensureAvailable(outBytes + lenBytes + sizeof(size32_t));
+                        outData = out.getbytes() + outBytes;
+                        * (size32_t *) outData = lenBytes;
+                        rtlStrToStr(lenBytes, outData+sizeof(size32_t), lenBytes, str.getstr());
+                        outBytes += lenBytes + sizeof(size32_t);
+                    }
+                    else
+                    {
+                        out.ensureAvailable(outBytes + lenBytes + 1);
+                        outData = out.getbytes() + outBytes;
+                        rtlStrToVStr(0, outData, lenBytes, str.getstr());
+                        outBytes += lenBytes + 1;
+                    }
+                }
+                else
+                {
+                    if (elemType == type_string)
+                        rtlStrToStr(elemSize, outData, lenBytes, str.getstr());
+                    else
+                        rtlStrToVStr(elemSize, outData, lenBytes, str.getstr());  // Fixed size null terminated strings... weird.
+                }
+                break;
+            }
+            case type_unicode:
+            case type_utf8:
+            {
+                rtlDataAttr str;
+                size32_t lenChars;
+                cassandraembed::getUTF8Result(NULL, value, lenChars, str.refstr());
+                const char * text =  str.getstr();
+                size32_t lenBytes = rtlUtf8Size(lenChars, text);
+                if (elemType == type_utf8)
+                {
+                    assertex (elemSize == UNKNOWN_LENGTH);
+                    out.ensureAvailable(outBytes + lenBytes + sizeof(size32_t));
+                    outData = out.getbytes() + outBytes;
+                    * (size32_t *) outData = lenChars;
+                    rtlStrToStr(lenBytes, outData+sizeof(size32_t), lenBytes, text);
+                    outBytes += lenBytes + sizeof(size32_t);
+                }
+                else
+                {
+                    if (elemSize == UNKNOWN_LENGTH)
+                    {
+                        // You can't assume that number of chars in utf8 matches number in unicode16 ...
+                        size32_t numchars16;
+                        rtlDataAttr unicode16;
+                        rtlUtf8ToUnicodeX(numchars16, unicode16.refustr(), lenChars, text);
+                        out.ensureAvailable(outBytes + numchars16*sizeof(UChar) + sizeof(size32_t));
+                        outData = out.getbytes() + outBytes;
+                        * (size32_t *) outData = numchars16;
+                        rtlUnicodeToUnicode(numchars16, (UChar *) (outData+sizeof(size32_t)), numchars16, unicode16.getustr());
+                        outBytes += numchars16*sizeof(UChar) + sizeof(size32_t);
+                    }
+                    else
+                        rtlUtf8ToUnicode(elemSize / sizeof(UChar), (UChar *) outData, lenChars, text);
+                }
+                break;
+            }
+            default:
+                fail("type mismatch - unsupported return type");
+            }
+            if (elemSize != UNKNOWN_LENGTH)
+                outBytes += elemSize;
+        }
+        __isAllResult = false;
+        __resultBytes = outBytes;
+        __result = out.detachdata();
     }
     virtual IRowStream *getDatasetResult(IEngineRowAllocator * _resultAllocator)
     {
@@ -1223,7 +1470,133 @@ public:
     }
     virtual void bindSetParam(const char *name, int elemType, size32_t elemSize, bool isAll, size32_t totalBytes, void *setData)
     {
-        UNSUPPORTED("SET parameters");  // Cassandra does support sets ? MIGHT be possible...
+        if (isAll)
+            UNSUPPORTED("SET(ALL)");
+        type_t typecode = (type_t) elemType;
+        const byte *inData = (const byte *) setData;
+        const byte *endData = inData + totalBytes;
+        int numElems;
+        if (elemSize == UNKNOWN_LENGTH)
+        {
+            numElems = 0;
+            // Will need 2 passes to work out how many elements there are in the set :(
+            while (inData < endData)
+            {
+                int thisSize;
+                switch (elemType)
+                {
+                case type_varstring:
+                    thisSize = strlen((const char *) inData) + 1;
+                    break;
+                case type_string:
+                    thisSize = * (size32_t *) inData + sizeof(size32_t);
+                    break;
+                case type_unicode:
+                    thisSize = (* (size32_t *) inData) * sizeof(UChar) + sizeof(size32_t);
+                    break;
+                case type_utf8:
+                    thisSize = rtlUtf8Size(* (size32_t *) inData, inData + sizeof(size32_t)) + sizeof(size32_t);
+                    break;
+                default:
+                    fail("Unsupported parameter type");
+                    break;
+                }
+                inData += thisSize;
+                numElems++;
+            }
+            inData = (const byte *) setData;
+        }
+        else
+            numElems = totalBytes / elemSize;
+        CassandraCollection collection(cass_collection_new(CASS_COLLECTION_TYPE_SET, numElems));
+        while (inData < endData)
+        {
+            size32_t thisSize = elemSize;
+            CassError rc;
+            switch (typecode)
+            {
+            case type_int:
+                if (elemSize > 4)
+                    rc = cass_collection_append_int64(collection, rtlReadInt(inData, elemSize));
+                else
+                    rc = cass_collection_append_int32(collection, rtlReadInt(inData, elemSize));
+                break;
+            case type_unsigned:
+                UNSUPPORTED("UNSIGNED columns");
+                break;
+            case type_varstring:
+            {
+                size32_t numChars = strlen((const char *) inData);
+                if (elemSize == UNKNOWN_LENGTH)
+                    thisSize = numChars + 1;
+                size32_t utf8chars;
+                rtlDataAttr utfText;
+                rtlStrToUtf8X(utf8chars, utfText.refstr(), numChars, (const char *) inData);
+                rc = cass_collection_append_string(collection, cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())));
+                break;
+            }
+            case type_string:
+            {
+                if (elemSize == UNKNOWN_LENGTH)
+                {
+                    thisSize = * (size32_t *) inData;
+                    inData += sizeof(size32_t);
+                }
+                size32_t utf8chars;
+                rtlDataAttr utfText;
+                rtlStrToUtf8X(utf8chars, utfText.refstr(), thisSize, (const char *) inData);
+                rc = cass_collection_append_string(collection, cass_string_init2(utfText.getstr(), rtlUtf8Size(utf8chars, utfText.getstr())));
+                break;
+            }
+            case type_real:
+                if (elemSize == sizeof(double))
+                    rc = cass_collection_append_double(collection, * (double *) inData);
+                else
+                    rc = cass_collection_append_float(collection, * (float *) inData);
+                break;
+            case type_boolean:
+                assertex(elemSize == sizeof(bool));
+                rc = cass_collection_append_bool(collection, *(bool*)inData ? cass_true : cass_false);
+                break;
+            case type_unicode:
+            {
+                if (elemSize == UNKNOWN_LENGTH)
+                {
+                    thisSize = (* (size32_t *) inData) * sizeof(UChar); // NOTE - it's in chars...
+                    inData += sizeof(size32_t);
+                }
+                unsigned unicodeChars;
+                rtlDataAttr unicode;
+                rtlUnicodeToUtf8X(unicodeChars, unicode.refstr(), thisSize / sizeof(UChar), (const UChar *) inData);
+                size32_t sizeBytes = rtlUtf8Size(unicodeChars, unicode.getstr());
+                rc = cass_collection_append_string(collection, cass_string_init2(unicode.getstr(), sizeBytes));
+                break;
+            }
+            case type_utf8:
+            {
+                assertex (elemSize == UNKNOWN_LENGTH);
+                size32_t numChars = * (size32_t *) inData;
+                inData += sizeof(size32_t);
+                thisSize = rtlUtf8Size(numChars, inData);
+                rc = cass_collection_append_string(collection, cass_string_init2((const char *) inData, thisSize));
+                break;
+            }
+            case type_data:
+                if (elemSize == UNKNOWN_LENGTH)
+                {
+                    thisSize = * (size32_t *) inData;
+                    inData += sizeof(size32_t);
+                }
+                rc = cass_collection_append_bytes(collection, cass_bytes_init((const cass_byte_t*) inData, thisSize));
+                break;
+            }
+            checkBind(rc, name);
+            inData += thisSize;
+        }
+        checkBind(cass_statement_bind_collection(stmtInfo->queryStatement(),
+                                                 checkNextParam(name),
+                                                 collection),
+                  name);
     }
 
     virtual void importFunction(size32_t lenChars, const char *text)
@@ -1234,8 +1607,8 @@ public:
     {
         // Incoming script is not necessarily null terminated. Note that the chars refers to utf8 characters and not bytes.
         size32_t len = rtlUtf8Size(chars, _script);
-        StringBuffer zscript(len, _script);
-        const char *script = zscript; // Now null terminated
+        queryString.set(_script, len);
+        const char *script = queryString.get(); // Now null terminated
         if ((flags & (EFnoreturn|EFnoparams)) == (EFnoreturn|EFnoparams))
         {
             loop
@@ -1270,8 +1643,18 @@ public:
         // Does not seem to be a way to check number of parameters expected...
         // if (nextParam != cass_statement_bind_count(stmtInfo))
         //    fail("Not enough parameters");
-        if (stmtInfo && !stmtInfo->hasResult())
-            lazyExecute();
+        try
+        {
+            if (stmtInfo && !stmtInfo->hasResult())
+                lazyExecute();
+        }
+        catch (IException *E)
+        {
+            StringBuffer msg;
+            E->errorMessage(msg);
+            msg.appendf(" (processing query %s)", queryString.get());
+            throw makeStringException(E->errorCode(), msg);
+        }
     }
 protected:
     void lazyExecute()
@@ -1393,6 +1776,7 @@ protected:
     unsigned nextParam;
     unsigned numParams;
     CassBatchType batchMode;
+    StringAttr queryString;
 
 };
 

+ 1 - 0
rtl/eclrtl/CMakeLists.txt

@@ -45,6 +45,7 @@ set (    SRCS
          rtlbcd.hpp
          rtldistr.hpp
          rtlds_imp.hpp
+         rtlembed.hpp
          rtlfield_imp.hpp
          rtlkey2.hpp
          rtlkey.hpp

+ 2 - 2
rtl/eclrtl/rtlembed.hpp

@@ -15,8 +15,8 @@
     limitations under the License.
 ############################################################################## */
 
-#ifndef rtlfield_hpp
-#define rtlfield_hpp
+#ifndef rtlembed_hpp
+#define rtlembed_hpp
 
 // NOTE - not included from generated code (see rtlfield_imp.hpp)
 

+ 71 - 18
testing/regress/ecl/cassandra-simple.ecl

@@ -25,6 +25,10 @@ IMPORT cassandra;
 // Note that the default values specified in the fields will be used when a NULL value is being
 // returned from Cassandra
 
+maprec := RECORD
+   string fromVal => string toVal
+ END;
+
 childrec := RECORD
    string name,
    integer4 value { default(99999) },
@@ -34,28 +38,48 @@ childrec := RECORD
    DATA d {default (D'999999')},
    DECIMAL10_2 ddd {default(9.99)},
    UTF8 u1 {default(U'9999 ß')},
-   UNICODE8 u2 {default(U'9999 ßßßß')}
+   UNICODE u2 {default(U'9999 ßßßß')},
+   SET OF STRING set1,
+   SET OF INTEGER4 list1,
+   LINKCOUNTED DICTIONARY(maprec) map1{linkcounted};
 END;
 
 // Some data we will use to initialize the Cassandra table
 
-init := DATASET([{'name1', 1, true, 1.2, 3.4, D'aa55aa55', 1234567.89, U'Straße', U'Straße'},
-                 {'name2', 2, false, 5.6, 7.8, D'00', -1234567.89, U'là', U'là'}], childrec);
+init := DATASET([{'name1', 1, true, 1.2, 3.4, D'aa55aa55', 1234567.89, U'Straße', U'Straße',['one','two','two','three'],[5,4,4,3],[{'a'=>'apple'},{'b'=>'banana'}]},
+                 {'name2', 2, false, 5.6, 7.8, D'00', -1234567.89, U'là', U'là',[],[],[]}], childrec);
 
-init2 := ROW({'name4' , 3, true, 9.10, 11.12, D'aa55aa55', 987.65, U'Baße', U'Baße'}, childrec);
+init2 := ROW({'name4' , 3, true, 9.10, 11.12, D'aa55aa55', 987.65, U'Baße', U'Baße',[],[],[]}, childrec);
 
 // Set up the Cassandra database
 // Note that we can execute multiple statements in a single embed, provided that there are
 // no parameters and no result type
 
-createks() := EMBED(cassandra : user('rchapman'))
+// Note that server will default to localhost if not specified...
+
+createks() := EMBED(cassandra : server('127.0.0.1'),user('rchapman'))
   CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3' } ;
 ENDEMBED;
 
-createTables() := EMBED(cassandra : user('rchapman'),keyspace('test'))
+createTables() := EMBED(cassandra : server('127.0.0.1'),user('rchapman'),keyspace('test'))
   DROP TABLE IF EXISTS tbl1;
 
-  CREATE TABLE tbl1 ( name VARCHAR, value INT, boolval boolean , r8 DOUBLE, r4 FLOAT, d BLOB, ddd VARCHAR, u1 VARCHAR, u2 VARCHAR, PRIMARY KEY (name) );
+  // Note that an ECL SET can map to either a SET or a LIST in Cassandra (it's actually closer to a LIST since repeated values are allowed and order is preserved)
+  // When stored in a Cassandra SET field, duplicates will be discarded and order lost.
+  // You can also use an ECL child dataset (with a single field) to map to a Cassandra SET or LIST.
+  CREATE TABLE tbl1 ( name VARCHAR,
+                      value INT,
+                      boolval boolean,
+                      r8 DOUBLE,
+                      r4 FLOAT,
+                      d BLOB,
+                      ddd VARCHAR,
+                      u1 VARCHAR,
+                      u2 VARCHAR,
+                      set1 SET<varchar>,
+                      list1 LIST<INT>,
+                      map1 MAP<VARCHAR, VARCHAR>,
+                      PRIMARY KEY (name) );
   CREATE INDEX tbl1_value  ON tbl1 (value);
   CREATE INDEX tbl1_boolval  ON tbl1 (boolval);
   INSERT INTO tbl1 (name, u1) values ('nulls', 'ß');  // Creates some null fields. Also note that query is utf8
@@ -68,27 +92,31 @@ ENDEMBED;
 // unless told to...
 
 initialize(dataset(childrec) values) := EMBED(cassandra : user('rchapman'),keyspace('test'),batch('unlogged'))
-  INSERT INTO tbl1 (name, value, boolval, r8, r4,d,ddd,u1,u2) values (?,?,?,?,?,?,?,?,?);
+  INSERT INTO tbl1 (name, value, boolval, r8, r4,d,ddd,u1,u2,set1,list1,map1) values (?,?,?,?,?,?,?,?,?,?,?,?);
 ENDEMBED;
 
 initialize2(row(childrec) values) := EMBED(cassandra : user('rchapman'),keyspace('test'))
-  INSERT INTO tbl1 (name, value, boolval, r8, r4,d,ddd,u1,u2) values (?,?,?,?,?,?,?,?,?);
+  INSERT INTO tbl1 (name, value, boolval, r8, r4,d,ddd,u1,u2,set1,list1,map1) values (?,?,?,?,?,?,?,?,?,?,?,?);
 ENDEMBED;
 
 // Returning a dataset
 
 dataset(childrec) testCassandraDS() := EMBED(cassandra : user('rchapman'),keyspace('test'))
-  SELECT name, value, boolval, r8, r4,d,ddd,u1,u2 from tbl1;
+  SELECT name, value, boolval, r8, r4,d,ddd,u1,u2,set1,list1,map1 from tbl1;
 ENDEMBED;
 
 // Returning a single row
 
 childrec testCassandraRow() := EMBED(cassandra : user('rchapman'),keyspace('test'))
-  SELECT name, value, boolval, r8, r4,d,ddd,u1,u2 from tbl1 LIMIT 1;
+  SELECT name, value, boolval, r8, r4,d,ddd,u1,u2,set1,list1,map1 from tbl1 LIMIT 1;
 ENDEMBED;
 
 // Passing in parameters
 
+mapwrapper := RECORD
+  LINKCOUNTED DICTIONARY(maprec) map1;
+END;
+
 testCassandraParms(
    string name,
    integer4 value,
@@ -98,8 +126,14 @@ testCassandraParms(
    DATA d,
 //   DECIMAL10_2 ddd,
    UTF8 u1,
-   UNICODE8 u2) := EMBED(cassandra : user('rchapman'),keyspace('test'))
-  INSERT INTO tbl1 (name, value, boolval, r8, r4,d,ddd,u1,u2) values (?,?,?,?,?,?,'8.76543',?,?);
+   UNICODE u2,
+   SET OF STRING set1,
+   SET OF INTEGER4 list1,
+   // Note we can't pass a dataset as a paramter to bind to a collection field - it would be interpreted as 'execute once per value in the dataset'
+   // You have to pass a record containing the field as a child dataset
+   ROW(mapwrapper) map1
+   ) := EMBED(cassandra : user('rchapman'),keyspace('test'))
+  INSERT INTO tbl1 (name, value, boolval, r8, r4,d,ddd,u1,u2,set1,list1,map1) values (?,?,?,?,?,?,'8.76543',?,?,?,?,?);
 ENDEMBED;
 
 // Returning scalars
@@ -109,7 +143,7 @@ string testCassandraString() := EMBED(cassandra : user('rchapman'),keyspace('tes
 ENDEMBED;
 
 dataset(childrec) testCassandraStringParam(string filter) := EMBED(cassandra : user('rchapman'),keyspace('test'))
-  SELECT name, value, boolval, r8, r4,d,ddd,u1,u2 from tbl1 where name = ?;
+  SELECT name, value, boolval, r8, r4,d,ddd,u1,u2,set1,list1,map1 from tbl1 where name = ?;
 ENDEMBED;
 
 integer testCassandraInt() := EMBED(cassandra : user('rchapman'),keyspace('test'))
@@ -140,6 +174,21 @@ UNICODE testCassandraUnicode() := EMBED(cassandra : user('rchapman'),keyspace('t
   SELECT u2 from tbl1 WHERE name='name1';
 ENDEMBED;
 
+SET OF STRING testCassandraSet() := EMBED(cassandra : user('rchapman'),keyspace('test'))
+  SELECT set1 from tbl1 WHERE name='name1';
+ENDEMBED;
+
+SET OF INTEGER4 testCassandraList() := EMBED(cassandra : user('rchapman'),keyspace('test'))
+  SELECT list1 from tbl1 WHERE name='name1';
+ENDEMBED;
+
+// Just as you can't pass a dataset parameter to bind to a map column (only a child dataset of a record),
+// if you wanted to return just a map column you have to do so via a child dataset
+
+MapWrapper testCassandraMap() := EMBED(cassandra : user('rchapman'),keyspace('test'))
+  SELECT map1 from tbl1 WHERE name='name1';
+ENDEMBED;
+
 // Coding a TRANSFORM to call Cassandra - this ends up acting a little like a join
 
 stringrec := RECORD
@@ -147,7 +196,7 @@ stringrec := RECORD
 END;
 
 TRANSFORM(childrec) t(stringrec L) := EMBED(cassandra : user('rchapman'),keyspace('test'))
-  SELECT name, value, boolval, r8, r4,d,ddd,u1,u2 from tbl1 where name = ?;
+  SELECT name, value, boolval, r8, r4,d,ddd,u1,u2,set1,list1,map1 from tbl1 where name = ?;
 ENDEMBED;
 
 init3 := DATASET([{'name1'},
@@ -162,7 +211,7 @@ stringrec extractName(childrec l) := TRANSFORM
 END;
 
 dataset(childrec) testCassandraDSParam(dataset(stringrec) inrecs) := EMBED(cassandra : user('rchapman'),keyspace('test'))
-  SELECT name, value, boolval, r8, r4,d,ddd,u1,u2 from tbl1 where name = ?;
+  SELECT name, value, boolval, r8, r4,d,ddd,u1,u2,set1,list1,map1 from tbl1 where name = ?;
 ENDEMBED;
 
 // Testing performance of batch inserts
@@ -190,7 +239,7 @@ sequential (
   createTables(),
   initialize(init),
 
-  testCassandraParms('name3', 1, true, 1.2, 3.4, D'aa55aa55', U'Straße', U'Straße'),
+  testCassandraParms('name3', 1, true, 1.2, 3.4, D'aa55aa55', U'Straße', U'Straße', ['four','five'], [2,2,3,1], ROW({[{'f'=>'fish'}]},MapWrapper)),
   initialize2(init2),
   OUTPUT(SORT(testCassandraDS(), name)),
   OUTPUT(testCassandraRow().name),
@@ -203,8 +252,12 @@ sequential (
   OUTPUT(testCassandraData()),
   OUTPUT(testCassandraUtf8()),
   OUTPUT(testCassandraUnicode()),
+  OUTPUT(testCassandraSet()),
+  OUTPUT(testCassandraList()),
+  OUTPUT(testCassandraMap().map1),
   OUTPUT(testCassandraTransform()),
   OUTPUT(testCassandraDSParam(PROJECT(init, extractName(LEFT)))),
   testCassandraBulk,
-  OUTPUT(testCassandraCount())
+  OUTPUT(testCassandraCount()),
+  OUTPUT('Done');
 );

+ 23 - 10
testing/regress/ecl/key/cassandra-simple.xml

@@ -1,9 +1,9 @@
 <Dataset name='Result 1'>
- <Row><name>name1</name><value>1</value><boolval>true</boolval><r8>1.2</r8><r4>3.400000095367432</r4><d>6161353561613535</d><ddd>1234567.89</ddd><u1>Straße</u1><u2>Straße  </u2></Row>
- <Row><name>name2</name><value>2</value><boolval>false</boolval><r8>5.6</r8><r4>7.800000190734863</r4><d>3030</d><ddd>-1234567.89</ddd><u1>là</u1><u2>là      </u2></Row>
- <Row><name>name3</name><value>1</value><boolval>true</boolval><r8>1.2</r8><r4>3.400000095367432</r4><d>6161353561613535</d><ddd>8.76</ddd><u1>Straße</u1><u2>Straße  </u2></Row>
- <Row><name>name4</name><value>3</value><boolval>true</boolval><r8>9.1</r8><r4>11.11999988555908</r4><d>6161353561613535</d><ddd>987.65</ddd><u1>Baße</u1><u2>Baße    </u2></Row>
- <Row><name>nulls</name><value>99999</value><boolval>true</boolval><r8>99.98999999999999</r8><r4>999.989990234375</r4><d>393939393939</d><ddd>9.99</ddd><u1>ß</u1><u2>9999 ßßß</u2></Row>
+ <Row><name>name1</name><value>1</value><boolval>true</boolval><r8>1.2</r8><r4>3.400000095367432</r4><d>6161353561613535</d><ddd>1234567.89</ddd><u1>Straße</u1><u2>Straße</u2><set1><Item>one</Item><Item>three</Item><Item>two</Item></set1><list1><Item>5</Item><Item>4</Item><Item>4</Item><Item>3</Item></list1><map1><Row><fromval>b</fromval><toval>banana</toval></Row><Row><fromval>a</fromval><toval>apple</toval></Row></map1></Row>
+ <Row><name>name2</name><value>2</value><boolval>false</boolval><r8>5.6</r8><r4>7.800000190734863</r4><d>3030</d><ddd>-1234567.89</ddd><u1>là</u1><u2>là</u2><set1/><list1/><map1/></Row>
+ <Row><name>name3</name><value>1</value><boolval>true</boolval><r8>1.2</r8><r4>3.400000095367432</r4><d>6161353561613535</d><ddd>8.76</ddd><u1>Straße</u1><u2>Straße</u2><set1><Item>five</Item><Item>four</Item></set1><list1><Item>2</Item><Item>2</Item><Item>3</Item><Item>1</Item></list1><map1><Row><fromval>f</fromval><toval>fish</toval></Row></map1></Row>
+ <Row><name>name4</name><value>3</value><boolval>true</boolval><r8>9.1</r8><r4>11.11999988555908</r4><d>6161353561613535</d><ddd>987.65</ddd><u1>Baße</u1><u2>Baße</u2><set1/><list1/><map1/></Row>
+ <Row><name>nulls</name><value>99999</value><boolval>true</boolval><r8>99.98999999999999</r8><r4>999.989990234375</r4><d>393939393939</d><ddd>9.99</ddd><u1>ß</u1><u2>9999 ßßßß</u2><set1/><list1/><map1/></Row>
 </Dataset>
 <Dataset name='Result 2'>
  <Row><Result_2>name1</Result_2></Row>
@@ -12,7 +12,7 @@
  <Row><Result_3>name1</Result_3></Row>
 </Dataset>
 <Dataset name='Result 4'>
- <Row><name>name1</name><value>1</value><boolval>true</boolval><r8>1.2</r8><r4>3.400000095367432</r4><d>6161353561613535</d><ddd>1234567.89</ddd><u1>Straße</u1><u2>Straße  </u2></Row>
+ <Row><name>name1</name><value>1</value><boolval>true</boolval><r8>1.2</r8><r4>3.400000095367432</r4><d>6161353561613535</d><ddd>1234567.89</ddd><u1>Straße</u1><u2>Straße</u2><set1><Item>one</Item><Item>three</Item><Item>two</Item></set1><list1><Item>5</Item><Item>4</Item><Item>4</Item><Item>3</Item></list1><map1><Row><fromval>b</fromval><toval>banana</toval></Row><Row><fromval>a</fromval><toval>apple</toval></Row></map1></Row>
 </Dataset>
 <Dataset name='Result 5'>
  <Row><Result_5>1</Result_5></Row>
@@ -33,14 +33,27 @@
  <Row><Result_10>Straße</Result_10></Row>
 </Dataset>
 <Dataset name='Result 11'>
- <Row><Result_11>Straße  </Result_11></Row>
+ <Row><Result_11>Straße</Result_11></Row>
 </Dataset>
 <Dataset name='Result 12'>
- <Row><name>name1</name><value>1</value><boolval>true</boolval><r8>1.2</r8><r4>3.400000095367432</r4><d>6161353561613535</d><ddd>1234567.89</ddd><u1>Straße</u1><u2>Straße  </u2></Row>
- <Row><name>name2</name><value>2</value><boolval>false</boolval><r8>5.6</r8><r4>7.800000190734863</r4><d>3030</d><ddd>-1234567.89</ddd><u1>là</u1><u2>là      </u2></Row>
+ <Row><Result_12><Item>one</Item><Item>three</Item><Item>two</Item></Result_12></Row>
 </Dataset>
 <Dataset name='Result 13'>
+ <Row><Result_13><Item>5</Item><Item>4</Item><Item>4</Item><Item>3</Item></Result_13></Row>
 </Dataset>
 <Dataset name='Result 14'>
- <Row><Result_14>25001</Result_14></Row>
+ <Row><fromval>b</fromval><toval>banana</toval></Row>
+ <Row><fromval>a</fromval><toval>apple</toval></Row>
+</Dataset>
+<Dataset name='Result 15'>
+ <Row><name>name1</name><value>1</value><boolval>true</boolval><r8>1.2</r8><r4>3.400000095367432</r4><d>6161353561613535</d><ddd>1234567.89</ddd><u1>Straße</u1><u2>Straße</u2><set1><Item>one</Item><Item>three</Item><Item>two</Item></set1><list1><Item>5</Item><Item>4</Item><Item>4</Item><Item>3</Item></list1><map1><Row><fromval>b</fromval><toval>banana</toval></Row><Row><fromval>a</fromval><toval>apple</toval></Row></map1></Row>
+ <Row><name>name2</name><value>2</value><boolval>false</boolval><r8>5.6</r8><r4>7.800000190734863</r4><d>3030</d><ddd>-1234567.89</ddd><u1>là</u1><u2>là</u2><set1/><list1/><map1/></Row>
+</Dataset>
+<Dataset name='Result 16'>
+</Dataset>
+<Dataset name='Result 17'>
+ <Row><Result_17>25001</Result_17></Row>
+</Dataset>
+<Dataset name='Result 18'>
+ <Row><Result_18>Done</Result_18></Row>
 </Dataset>