Browse Source

HPCC-10461 Add additional plugins for external databases

Additional MySQL fixes, including lazy streamed inputs, and minor fixup
spotted in code review.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 11 years ago
parent
commit
464ad2cafc

+ 41 - 0
cmake_modules/FindSQLITE3.cmake

@@ -0,0 +1,41 @@
+################################################################################
+#    HPCC SYSTEMS software Copyright (C) 2014 HPCC Systems.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License");
+#    you may not use this file except in compliance with the License.
+#    You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+################################################################################
+
+# - Try to find the Sqlite3 headers and libraries
+# Once done this will define
+#
+#  SQLITE3_FOUND - system has the SQLITE3 headers and library
+#  SQLITE3_INCLUDE_DIR - the SQLITE3 include directory
+#  SQLITE3_LIBRARIES - The libraries needed to use SQLITE3
+
+IF (NOT SQLITE3_FOUND)
+  IF (WIN32)
+    SET (sqlite3_lib "libsqlite3")
+  ELSE()
+    SET (sqlite3_lib "sqlite3")
+  ENDIF()
+
+  FIND_PATH (SQLITE3_INCLUDE_DIR NAMES sqlite3.h)
+  FIND_LIBRARY (SQLITE3_LIBRARIES NAMES ${sqlite3_lib})
+
+  include(FindPackageHandleStandardArgs)
+  find_package_handle_standard_args(SQLITE3 DEFAULT_MSG
+    SQLITE3_LIBRARIES
+    SQLITE3_INCLUDE_DIR
+  )
+
+  MARK_AS_ADVANCED(SQLITE3_INCLUDE_DIR SQLITE3_LIBRARIES)
+ENDIF()

+ 3 - 2
plugins/mysql/CMakeLists.txt

@@ -26,8 +26,8 @@
 project( mysqlembed )
 
 if (USE_MYSQL)
-  ADD_PLUGIN(mysqlembed PACKAGES mysql-client OPTION MAKE_MYSQL)
-  if ( 1 ) # MAKE_MYSQL )
+  ADD_PLUGIN(mysqlembed PACKAGES MYSQL OPTION MAKE_MYSQLEMBED)
+  if ( MAKE_MYSQLEMBED )
     set ( SRCS
           mysqlembed.cpp
         )
@@ -35,6 +35,7 @@ if (USE_MYSQL)
     include_directories (
          ./../../system/include
          ./../../rtl/eclrtl
+         ./../../roxie/roxiemem
          ./../../rtl/include
          ./../../rtl/nbcd
          ./../../common/deftype

+ 102 - 48
plugins/mysql/mysqlembed.cpp

@@ -26,6 +26,7 @@
 #include "eclrtl_imp.hpp"
 #include "rtlds_imp.hpp"
 #include "rtlfield_imp.hpp"
+#include "roxiemem.hpp"
 #include "nbcd.hpp"
 
 #ifdef _WIN32
@@ -291,6 +292,10 @@ public:
     {
         return inputBindings;
     }
+    inline bool hasResult() const
+    {
+        return *res != NULL;
+    }
 protected:
     Linked<MySQLConnection> conn;
     Linked<MySQLStatement> stmt;
@@ -547,42 +552,6 @@ protected:
     int colIdx;
 };
 
-// A MySQL function that returns a dataset will return a MySQLRowStream object that can be
-// interrogated to return each row of the result in turn
-
-class MySQLRowStream : public CInterfaceOf<IRowStream>
-{
-public:
-    MySQLRowStream(MySQLPreparedStatement *_stmtInfo, IEngineRowAllocator *_resultAllocator)
-    : stmtInfo(_stmtInfo), resultAllocator(_resultAllocator)
-    {
-    }
-    virtual const void *nextRow()
-    {
-        if (!stmtInfo->next())
-        {
-            stop();
-            return NULL;
-        }
-        RtlDynamicRowBuilder rowBuilder(resultAllocator);
-        MySQLRowBuilder mysqlRowBuilder(stmtInfo->queryResultBindings());
-        const RtlTypeInfo *typeInfo = resultAllocator->queryOutputMeta()->queryTypeInfo();
-        assertex(typeInfo);
-        RtlFieldStrInfo dummyField("<row>", NULL, typeInfo);
-        size32_t len = typeInfo->build(rowBuilder, 0, &dummyField, mysqlRowBuilder);
-        return rowBuilder.finalizeRowClear(len);
-    }
-    virtual void stop()
-    {
-        resultAllocator.clear();
-        stmtInfo->stop();
-    }
-
-protected:
-    Linked<MySQLPreparedStatement> stmtInfo;
-    Linked<IEngineRowAllocator> resultAllocator;
-};
-
 // Bind MySQL variables from an ECL record
 
 class MySQLRecordBinder : public CInterfaceOf<IFieldProcessor>
@@ -743,6 +712,7 @@ protected:
 };
 
 //
+
 class MySQLDatasetBinder : public MySQLRecordBinder
 {
 public:
@@ -750,14 +720,18 @@ public:
       : input(_input), MySQLRecordBinder(_typeInfo, _bindings, _firstParam)
     {
     }
+    bool bindNext()
+    {
+        roxiemem::OwnedConstRoxieRow nextRow = (const byte *) input->ungroupedNextRow();
+        if (!nextRow)
+            return false;
+        processRow((const byte *) nextRow.get());   // Bind the variables for the current row
+        return true;
+    }
     void executeAll(MySQLPreparedStatement *stmtInfo)
     {
-        loop
+        while (bindNext())
         {
-            const byte *nextRow = (const byte *) input->ungroupedNextRow();
-            if (!nextRow)
-                break;
-            processRow(nextRow);   // Bind the variables for the current row
             stmtInfo->execute();
         }
     }
@@ -765,6 +739,75 @@ protected:
     Owned<IRowStream> input;
 };
 
+// A MySQL function that returns a dataset will return a MySQLRowStream object that can be
+// interrogated to return each row of the result in turn
+
+class MySQLRowStream : public CInterfaceOf<IRowStream>
+{
+public:
+    MySQLRowStream(MySQLDatasetBinder *_inputStream, MySQLPreparedStatement *_stmtInfo, IEngineRowAllocator *_resultAllocator)
+    : inputStream(_inputStream), stmtInfo(_stmtInfo), resultAllocator(_resultAllocator)
+    {
+        executePending = true;
+        eof = false;
+    }
+    virtual const void *nextRow()
+    {
+        // A little complex when streaming data in as well as out - want to execute for every input record
+        if (eof)
+            return NULL;
+        loop
+        {
+            if (executePending)
+            {
+                executePending = false;
+                if (inputStream && !inputStream->bindNext())
+                {
+                    noteEOF();
+                    return NULL;
+                }
+                stmtInfo->execute();
+            }
+            if (stmtInfo->next())
+                break;
+            if (inputStream)
+                executePending = true;
+            else
+            {
+                noteEOF();
+                return NULL;
+            }
+        }
+        RtlDynamicRowBuilder rowBuilder(resultAllocator);
+        MySQLRowBuilder mysqlRowBuilder(stmtInfo->queryResultBindings());
+        const RtlTypeInfo *typeInfo = resultAllocator->queryOutputMeta()->queryTypeInfo();
+        assertex(typeInfo);
+        RtlFieldStrInfo dummyField("<row>", NULL, typeInfo);
+        size32_t len = typeInfo->build(rowBuilder, 0, &dummyField, mysqlRowBuilder);
+        return rowBuilder.finalizeRowClear(len);
+    }
+    virtual void stop()
+    {
+        resultAllocator.clear();
+        stmtInfo->stop();
+    }
+
+protected:
+    void noteEOF()
+    {
+        if (!eof)
+        {
+            eof = true;
+            stop();
+        }
+    }
+    Linked<MySQLDatasetBinder> inputStream;
+    Linked<MySQLPreparedStatement> stmtInfo;
+    Linked<IEngineRowAllocator> resultAllocator;
+    bool executePending;
+    bool eof;
+};
+
 // Each call to a MySQL function will use a new MySQLEmbedFunctionContext object
 
 class MySQLEmbedFunctionContext : public CInterfaceOf<IEmbedFunctionContext>
@@ -859,17 +902,20 @@ public:
     }
     virtual IRowStream *getDatasetResult(IEngineRowAllocator * _resultAllocator)
     {
-        return new MySQLRowStream(stmtInfo, _resultAllocator);
+        return new MySQLRowStream(inputStream, stmtInfo, _resultAllocator);
     }
     virtual byte * getRowResult(IEngineRowAllocator * _resultAllocator)
     {
-        MySQLRowStream stream(stmtInfo, _resultAllocator);
-        byte * ret = (byte *) stream.nextRow();
-        byte * ret2 = (byte *) stream.nextRow();
+        if (!stmtInfo->hasResult())
+            typeError("row", NULL);
+        lazyExecute();
+        MySQLRowStream stream(NULL, stmtInfo, _resultAllocator);
+        roxiemem::OwnedConstRoxieRow ret = stream.nextRow();
+        roxiemem::OwnedConstRoxieRow ret2 = stream.nextRow();
         stream.stop();
         if (ret ==  NULL || ret2 != NULL)  // Check for exactly one returned row
             typeError("row", NULL);
-        return ret;
+        return (byte *) ret.getClear();
     }
     virtual size32_t getTransformResult(ARowBuilder & rowBuilder)
     {
@@ -990,15 +1036,23 @@ public:
     {
         if (nextParam != stmtInfo->queryInputBindings().numColumns())
             fail("Not enough parameters");
+        if (!stmtInfo->hasResult())
+            lazyExecute();
+    }
+protected:
+    void lazyExecute()
+    {
         if (inputStream)
             inputStream->executeAll(stmtInfo);
         else
             stmtInfo->execute();
     }
-protected:
     const MYSQL_BIND &getScalarResult()
     {
-        if (!stmtInfo->next() || stmtInfo->queryResultBindings().numColumns() != 1)
+        if (!stmtInfo->hasResult() || stmtInfo->queryResultBindings().numColumns() != 1)
+            typeError("scalar", NULL);
+        lazyExecute();
+        if (!stmtInfo->next())
             typeError("scalar", NULL);
         return stmtInfo->queryResultBindings().queryColumn(0);
     }

+ 4 - 3
plugins/sqlite3/CMakeLists.txt

@@ -26,8 +26,8 @@
 project( sqlite3embed )
 
 if (USE_SQLITE3)
-  ADD_PLUGIN(sqlite3embed PACKAGES sqlite3 OPTION MAKE_SQLITE3)
-  if ( 1 ) # MAKE_SQLITE3 )
+  ADD_PLUGIN(sqlite3embed PACKAGES SQLITE3 OPTION MAKE_SQLITEEMBED)
+  if ( MAKE_SQLITEEMBED )
     set ( SRCS
           sqlite3.cpp
         )
@@ -39,6 +39,7 @@ if (USE_SQLITE3)
          ./../../rtl/nbcd
          ./../../common/deftype
          ./../../system/jlib
+         ${SQLITE3_INCLUDE_DIR}
        )
 
     ADD_DEFINITIONS( -D_USRDLL -DSQLITE3_EXPORTS )
@@ -51,7 +52,7 @@ if (USE_SQLITE3)
     endif()
 
     install ( TARGETS sqlite3embed DESTINATION plugins )
-    target_link_libraries ( sqlite3embed sqlite3 )
+    target_link_libraries ( sqlite3embed ${SQLITE3_LIBRARIES} )
 
     target_link_libraries ( sqlite3embed
         eclrtl

+ 13 - 0
testing/ecl/mysqlembed.ecl

@@ -29,6 +29,14 @@ childrec := RECORD
    UNICODE8 u2
 END;
 
+stringrec := RECORD
+   string name
+END;
+
+stringrec extractName(childrec l) := TRANSFORM
+  SELF := l;
+END;
+
 init := DATASET([{'name1', 1, true, 1.2, 3.4, D'aa55aa55', 1234567.89, U'Straße', U'Straße'},
                  {'name2', 2, false, 5.6, 7.8, D'00', -1234567.89, U'là', U'là'}], childrec);
 
@@ -72,6 +80,10 @@ dataset(childrec) testMySQLStringParam(string filter) := EMBED(mysql : user('rch
   SELECT * from tbl1 where name = ?;
 ENDEMBED;
 
+dataset(childrec) testMySQLDSParam(dataset(stringrec) inrecs) := EMBED(mysql : user('rchapman'),database('test'))
+  SELECT * from tbl1 where name = ?;
+ENDEMBED;
+
 integer testMySQLInt() := EMBED(mysql : user('rchapman'),database('test'))
   SELECT max(value) from tbl1;
 ENDEMBED;
@@ -109,6 +121,7 @@ sequential (
   OUTPUT(testMySQLParms('name1', 1, true, 1.2, 3.4, D'aa55aa55', U'Straße', U'Straße')),
   OUTPUT(testMySQLString()),
   OUTPUT(testMySQLStringParam(testMySqlString())),
+  OUTPUT(testMySQLDSParam(PROJECT(init, extractName(LEFT)))),
   OUTPUT(testMySQLInt()),
   OUTPUT(testMySQLBool()),
   OUTPUT(testMySQLReal8()),