Jelajahi Sumber

HPCC-10458 Streamed dataset support for Javascript

Also some minor cleanup in Python support.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 11 tahun lalu
induk
melakukan
db3baad471
3 mengubah file dengan 384 tambahan dan 29 penghapusan
  1. 11 12
      plugins/pyembed/pyembed.cpp
  2. 1 0
      plugins/v8embed/CMakeLists.txt
  3. 372 17
      plugins/v8embed/v8embed.cpp

+ 11 - 12
plugins/pyembed/pyembed.cpp

@@ -829,7 +829,6 @@ public:
     PythonNamedTupleBuilder(PythonThreadContext *_sharedCtx, const RtlFieldInfo *_outerRow)
     : outerRow(_outerRow), sharedCtx(_sharedCtx)
     {
-        argcount = 0;
     }
     virtual void processString(unsigned len, const char *value, const RtlFieldInfo * field)
     {
@@ -857,11 +856,15 @@ public:
     }
     virtual void processDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo * field)
     {
-        UNIMPLEMENTED;
+        Decimal val;
+        val.setDecimal(digits, precision, value);
+        addArg(PyFloat_FromDouble(val.getReal()));
     }
     virtual void processUDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo * field)
     {
-        UNIMPLEMENTED;
+        Decimal val;
+        val.setUDecimal(digits, precision, value);
+        addArg(PyFloat_FromDouble(val.getReal()));
     }
     virtual void processUnicode(unsigned len, const UChar *value, const RtlFieldInfo * field)
     {
@@ -877,7 +880,10 @@ public:
     }
     virtual void processQString(unsigned len, const char *value, const RtlFieldInfo * field)
     {
-        UNIMPLEMENTED;
+        size32_t charCount;
+        rtlDataAttr text;
+        rtlQStrToStrX(charCount, text.refstr(), len, value);
+        processString(charCount, text.getstr(), field);
     }
     virtual void processSetAll(const RtlFieldInfo * field)
     {
@@ -935,30 +941,23 @@ protected:
     void push()
     {
         stack.append(args.getClear());
-        counts.append(argcount);
-        argcount = 0;
     }
     void pop()
     {
         addArg(args.getClear());
         args.setown((PyObject *) stack.pop());
-        argcount = counts.pop();
     }
     void addArg(PyObject *arg)
     {
-        if (!argcount)
+        if (!args)
         {
             args.setown(PyList_New(0));
         }
         PyList_Append(args, arg);
         Py_DECREF(arg);
     }
-    unsigned argcount;
     OwnedPyObject args;
-    OwnedPyObject names;
-    PointerArray namestack;
     PointerArray stack;
-    UnsignedArray counts;
     const RtlFieldInfo *outerRow;
     PythonThreadContext *sharedCtx;
 };

+ 1 - 0
plugins/v8embed/CMakeLists.txt

@@ -36,6 +36,7 @@ if (USE_V8)
              ${V8_INCLUDE_DIR}
              ./../../system/include
              ./../../rtl/eclrtl
+             ./../../rtl/nbcd
              ./../../rtl/include
              ./../../common/deftype
              ./../../system/jlib

+ 372 - 17
plugins/v8embed/v8embed.cpp

@@ -23,6 +23,10 @@
 #include "deftype.hpp"
 #include "eclrtl.hpp"
 #include "eclrtl_imp.hpp"
+#include "rtlds_imp.hpp"
+#include "rtlfield_imp.hpp"
+#include "nbcd.hpp"
+#include <vector>
 
 #ifdef _WIN32
 #define EXPORT __declspec(dllexport)
@@ -54,8 +58,364 @@ extern "C" EXPORT bool getECLPluginDefinition(ECLPluginDefinitionBlock *pb)
     return true;
 }
 
+static void typeError(const char *expected, const RtlFieldInfo *field) __attribute__((noreturn));
+
+static void typeError(const char *expected, const RtlFieldInfo *field)
+{
+    VStringBuffer msg("v8embed: type mismatch - %s expected", expected);
+    if (field)
+        msg.appendf(" for field %s", field->name->str());
+    rtlFail(0, msg.str());
+}
+
 namespace javascriptLanguageHelper {
 
+// A JSRowBuilder object is used to construct an ECL row from a javascript object
+
+class JSRowBuilder : public CInterfaceOf<IFieldSource>
+{
+public:
+    JSRowBuilder(v8::Local<v8::Object> _row, const RtlFieldInfo *_outerRow)
+    : row(_row), outerRow(_outerRow), named(true), idx(0)
+    {
+    }
+    virtual bool getBooleanResult(const RtlFieldInfo *field)
+    {
+        return nextField(field)->BooleanValue();
+    }
+    virtual void getDataResult(const RtlFieldInfo *field, size32_t &len, void * &result)
+    {
+        UNIMPLEMENTED;
+    }
+    virtual double getRealResult(const RtlFieldInfo *field)
+    {
+        return v8::Number::Cast(*nextField(field))->Value();
+    }
+    virtual __int64 getSignedResult(const RtlFieldInfo *field)
+    {
+        return v8::Integer::Cast(*nextField(field))->Value();
+    }
+    virtual unsigned __int64 getUnsignedResult(const RtlFieldInfo *field)
+    {
+        return v8::Integer::Cast(*nextField(field))->Value();
+    }
+    virtual void getStringResult(const RtlFieldInfo *field, size32_t &chars, char * &result)
+    {
+        v8::String::AsciiValue ascii(nextField(field));
+        rtlStrToStrX(chars, result, ascii.length(), *ascii);
+    }
+    virtual void getUTF8Result(const RtlFieldInfo *field, size32_t &chars, char * &result)
+    {
+        v8::Local<v8::Value> value = nextField(field);
+        if (!value->IsString())
+            typeError("string", field);
+        v8::String::Utf8Value utf8(value);
+        unsigned numchars = rtlUtf8Length(utf8.length(), *utf8);
+        rtlUtf8ToUtf8X(chars, result, numchars, *utf8);
+    }
+    virtual void getUnicodeResult(const RtlFieldInfo *field, size32_t &chars, UChar * &result)
+    {
+        v8::Local<v8::Value> value = nextField(field);
+        if (!value->IsString())
+            typeError("string", field);
+        v8::String::Utf8Value utf8(value);
+        unsigned numchars = rtlUtf8Length(utf8.length(), *utf8);
+        rtlUtf8ToUnicodeX(chars, result, numchars, *utf8);
+    }
+    virtual void getDecimalResult(const RtlFieldInfo *field, Decimal &value)
+    {
+        value.setReal(getRealResult(field));
+    }
+
+    virtual void processBeginSet(const RtlFieldInfo * field, bool &isAll)
+    {
+        isAll = false;
+        v8::Local<v8::Value> value = nextField(field);
+        if (!value->IsArray())
+            typeError("array", field);
+        push(false);
+        row = v8::Array::Cast(*value);
+    }
+    virtual bool processNextSet(const RtlFieldInfo * field)
+    {
+        assertex(!named);
+        return row->Has(idx);
+    }
+    virtual void processBeginDataset(const RtlFieldInfo * field)
+    {
+        v8::Local<v8::Value> value = nextField(field);
+        if (!value->IsArray())
+            typeError("array", field);
+        push(false);
+        row = v8::Array::Cast(*value);
+    }
+    virtual void processBeginRow(const RtlFieldInfo * field)
+    {
+        if (field != outerRow)
+        {
+            v8::Local<v8::Value> value = nextField(field);
+            if (!value->IsObject())
+                typeError("object", field);
+            push(true);
+            row = v8::Object::Cast(*value);
+        }
+    }
+    virtual bool processNextRow(const RtlFieldInfo * field)
+    {
+        assertex(!named);
+        return row->Has(idx);
+    }
+    virtual void processEndSet(const RtlFieldInfo * field)
+    {
+        pop();
+    }
+    virtual void processEndDataset(const RtlFieldInfo * field)
+    {
+        pop();
+    }
+    virtual void processEndRow(const RtlFieldInfo * field)
+    {
+        if (field != outerRow)
+            pop();
+    }
+protected:
+    void pop()
+    {
+        named = namedStack.pop();
+        idx = idxStack.pop();
+        row = stack.back();
+        stack.pop_back();
+    }
+    void push(bool _named)
+    {
+        namedStack.append(named);
+        idxStack.append(idx);
+        stack.push_back(row);
+        named = _named;
+        idx = 0;
+    }
+    v8::Local<v8::Value> nextField(const RtlFieldInfo * field)
+    {
+        v8::Local<v8::Value> v;
+        if (named)
+        {
+            v8::Local<v8::String> name = v8::String::New(field->name->str());
+            if (!row->Has(name))
+            {
+                VStringBuffer msg("v8embed: No value for field %s", field->name->str());
+                rtlFail(0, msg.str());
+            }
+            v = row->Get(name);
+        }
+        else
+        {
+            assertex(row->Has(idx));  // Logic in processNextXXX should have ensured
+            v = row->Get(idx++);
+        }
+        return v;
+    }
+    v8::Local<v8::Object> row; // current row, set, or dataset...
+    std::vector< v8::Local<v8::Object> > stack;
+    IntArray idxStack;
+    BoolArray namedStack;
+    const RtlFieldInfo *outerRow;
+    int idx;
+    bool named;
+};
+
+// A JSObjectBuilder object is used to construct a JS Object from an ECL row
+
+class JSObjectBuilder : public CInterfaceOf<IFieldProcessor>
+{
+public:
+    JSObjectBuilder(const RtlFieldInfo *_outerRow)
+    : outerRow(_outerRow)
+    {
+    }
+    virtual void processString(unsigned len, const char *value, const RtlFieldInfo * field)
+    {
+        size32_t utfCharCount;
+        rtlDataAttr utfText;
+        rtlStrToUtf8X(utfCharCount, utfText.refstr(), len, value);
+        processUtf8(utfCharCount, utfText.getstr(), field);
+    }
+    virtual void processBool(bool value, const RtlFieldInfo * field)
+    {
+        addProp(field, v8::Boolean::New(value));
+    }
+    virtual void processData(unsigned len, const void *value, const RtlFieldInfo * field)
+    {
+        v8::Local<v8::Array> array = v8::Array::New(len);
+        const byte *vval = (const byte *) value;
+        for (int i = 0; i < len; i++)
+        {
+            array->Set(v8::Number::New(i), v8::Integer::New(vval[i])); // feels horridly inefficient, but seems to be the expected approach
+        }
+        addProp(field, array);
+    }
+    virtual void processInt(__int64 value, const RtlFieldInfo * field)
+    {
+        addProp(field, v8::Integer::New(value));
+    }
+    virtual void processUInt(unsigned __int64 value, const RtlFieldInfo * field)
+    {
+        addProp(field, v8::Integer::NewFromUnsigned(value));
+    }
+    virtual void processReal(double value, const RtlFieldInfo * field)
+    {
+        addProp(field, v8::Number::New(value));
+    }
+    virtual void processDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo * field)
+    {
+        Decimal val;
+        val.setDecimal(digits, precision, value);
+        addProp(field, v8::Number::New(val.getReal()));
+    }
+    virtual void processUDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo * field)
+    {
+        Decimal val;
+        val.setUDecimal(digits, precision, value);
+        addProp(field, v8::Number::New(val.getReal()));
+    }
+    virtual void processUnicode(unsigned len, const UChar *value, const RtlFieldInfo * field)
+    {
+        addProp(field, v8::String::New(value, len));
+    }
+    virtual void processQString(unsigned len, const char *value, const RtlFieldInfo * field)
+    {
+        size32_t charCount;
+        rtlDataAttr text;
+        rtlQStrToStrX(charCount, text.refstr(), len, value);
+        processString(charCount, text.getstr(), field);
+    }
+    virtual void processSetAll(const RtlFieldInfo * field)
+    {
+        rtlFail(0, "v8embed: ALL sets are not supported");
+    }
+    virtual void processUtf8(unsigned len, const char *value, const RtlFieldInfo * field)
+    {
+        addProp(field, v8::String::New(value, rtlUtf8Size(len, value)));
+    }
+
+    virtual bool processBeginSet(const RtlFieldInfo * field)
+    {
+        push();
+        return true;
+    }
+    virtual bool processBeginDataset(const RtlFieldInfo * field)
+    {
+        push();
+        return true;
+    }
+    virtual bool processBeginRow(const RtlFieldInfo * field)
+    {
+        if (field != outerRow)
+            push();
+        return true;
+    }
+    virtual void processEndSet(const RtlFieldInfo * field)
+    {
+        pop(field);
+    }
+    virtual void processEndDataset(const RtlFieldInfo * field)
+    {
+        pop(field);
+    }
+    virtual void processEndRow(const RtlFieldInfo * field)
+    {
+        if (field != outerRow)
+        {
+            pop(field);
+        }
+    }
+    v8::Local<v8::Object> getObject()
+    {
+        return obj;
+    }
+protected:
+    void push()
+    {
+        stack.push_back(obj);
+    }
+    void pop(const RtlFieldInfo * field)
+    {
+        addProp(field, obj);
+        obj = stack.back();
+        stack.pop_back();
+    }
+    void addProp(const RtlFieldInfo * field, v8::Handle<v8::Value> value)
+    {
+        if (obj.IsEmpty())
+            obj = v8::Object::New();
+        obj->Set(v8::String::New(field->name->str()), value);
+    }
+    v8::Local<v8::Object> obj;
+    std::vector< v8::Local<v8::Object> > stack;
+    const RtlFieldInfo *outerRow;
+};
+
+static size32_t getRowResult(v8::Handle<v8::Value> result, ARowBuilder &builder)
+{
+    if (result.IsEmpty() || !result->IsObject())
+        typeError("object", NULL);
+    v8::HandleScope scope;  // Probably not needed
+    v8::Local<v8::Object> row = v8::Object::Cast(*result);
+    const RtlTypeInfo *typeInfo = builder.queryAllocator()->queryOutputMeta()->queryTypeInfo();
+    assertex(typeInfo);
+    RtlFieldStrInfo dummyField("<row>", NULL, typeInfo);
+    JSRowBuilder jsRowBuilder(row, &dummyField);
+    return typeInfo->build(builder, 0, &dummyField, jsRowBuilder);
+}
+
+// An embedded javascript function that returns a dataset will return a JSRowStream object that can be
+// interrogated to return each row of the result in turn
+
+class JSRowStream : public CInterfaceOf<IRowStream>
+{
+public:
+    JSRowStream(v8::Handle<v8::Value> _result, IEngineRowAllocator *_resultAllocator)
+    : rowIdx(0), resultAllocator(_resultAllocator)
+    {
+        if (_result.IsEmpty() || !_result->IsArray())
+            typeError("array", NULL);
+        result = v8::Persistent<v8::Array>(v8::Array::Cast(*_result));
+    }
+    ~JSRowStream()
+    {
+        result.Dispose();
+    }
+    virtual const void *nextRow()
+    {
+        if (result.IsEmpty())
+            return NULL;
+        v8::HandleScope scope;
+        if (!result->Has(rowIdx))
+        {
+            stop();
+            return NULL;
+        }
+        v8::Local<v8::Value> row = result->Get(rowIdx);
+        rowIdx++;
+        if (!row->IsObject())
+            typeError("object", NULL);
+        v8::Local<v8::Object> rowObject = v8::Object::Cast(*row);
+        RtlDynamicRowBuilder rowBuilder(resultAllocator);
+        size32_t len = javascriptLanguageHelper::getRowResult(rowObject, rowBuilder);
+        return rowBuilder.finalizeRowClear(len);
+    }
+    virtual void stop()
+    {
+        resultAllocator.clear();
+        result.Clear();
+    }
+
+protected:
+    Linked<IEngineRowAllocator> resultAllocator;
+    unsigned numRows;
+    unsigned rowIdx;
+    v8::Persistent<v8::Array> result;
+};
+
 class V8JavascriptEmbedFunctionContext : public CInterfaceOf<IEmbedFunctionContext>
 {
 public:
@@ -248,7 +608,13 @@ public:
     }
     virtual void bindRowParam(const char *name, IOutputMetaData & metaVal, byte *val)
     {
-        UNIMPLEMENTED;
+        v8::HandleScope handle_scope;
+        const RtlTypeInfo *typeInfo = metaVal.queryTypeInfo();
+        assertex(typeInfo);
+        RtlFieldStrInfo dummyField("<row>", NULL, typeInfo);
+        JSObjectBuilder objBuilder(&dummyField);
+        typeInfo->process(val, val, &dummyField, objBuilder); // Creates a JS object from the incoming ECL row
+        context->Global()->Set(v8::String::New(name), objBuilder.getObject());
     }
     virtual void bindDatasetParam(const char *name, IOutputMetaData & metaVal, IRowStream * val)
     {
@@ -258,7 +624,6 @@ public:
     virtual bool getBooleanResult()
     {
         assertex (!result.IsEmpty());
-        v8::HandleScope handle_scope;
         return result->BooleanValue();
     }
     virtual void getDataResult(size32_t &__len, void * &__result)
@@ -445,27 +810,17 @@ public:
 
     virtual IRowStream *getDatasetResult(IEngineRowAllocator * _resultAllocator)
     {
-        assertex (!result.IsEmpty());
-        if (!result->IsArray())
-            rtlFail(0, "v8embed: type mismatch - return value was not an array");
-        UNIMPLEMENTED;
-//        resultIterator.setown(new ArrayIterator(result);
-//        resultAllocator.set(_resultAllocator);
-//        return LINK(this);
+        return new JSRowStream(result, _resultAllocator);
     }
     virtual byte * getRowResult(IEngineRowAllocator * _resultAllocator)
     {
-        UNIMPLEMENTED;
+        RtlDynamicRowBuilder rowBuilder(_resultAllocator);
+        size32_t len = javascriptLanguageHelper::getRowResult(result, rowBuilder);
+        return (byte *) rowBuilder.finalizeRowClear(len);
     }
     virtual size32_t getTransformResult(ARowBuilder & builder)
     {
-        UNIMPLEMENTED;
-    }
-    virtual const void *nextRow()
-    {
-//        assertex(resultAllocator);
-//        assertex(resultIterator);
-        UNIMPLEMENTED;
+        return javascriptLanguageHelper::getRowResult(result, builder);
     }
     virtual void compileEmbeddedScript(size32_t lenChars, const char *utf)
     {