Преглед на файлове

HPCC-14844 Cache database connections when using Embedded SQL

Cache multiple connections from same thread.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman преди 9 години
родител
ревизия
dc68f68e81
променени са 3 файла, в които са добавени 118 реда и са изтрити 25 реда
  1. 94 4
      plugins/mysql/mysqlembed.cpp
  2. 1 1
      testing/regress/ecl/key/mysqlembed.xml
  3. 23 20
      testing/regress/ecl/mysqlembed.ecl

+ 94 - 4
plugins/mysql/mysqlembed.cpp

@@ -956,6 +956,76 @@ protected:
 
 // Each call to a MySQL function will use a new MySQLEmbedFunctionContext object
 
+static __thread ThreadTermFunc threadHookChain;
+static __thread MySQLConnection *cachedConnection = NULL;
+static __thread const char *cachedServer = NULL;
+static __thread const char *cachedUser = NULL;
+static __thread const char *cachedPassword = NULL;
+static __thread const char *cachedDatabase = NULL;
+static __thread unsigned cachedPort = 0;
+
+static bool cachedConnectionMatches(const char *server, unsigned port, const char *user, const char *password, const char *database)
+{
+    return streq(server, cachedServer) && port==cachedPort && streq(user, cachedUser) && streq(password, cachedPassword) && streq(database, cachedDatabase);
+}
+
+static void clearCache()
+{
+    ::Release(cachedConnection);
+    cachedConnection = NULL;
+    free((void *) cachedServer);
+    free((void *) cachedUser);
+    free((void *) cachedPassword);
+    free((void *) cachedDatabase);
+    cachedServer = cachedUser = cachedPassword = cachedDatabase = NULL;
+    cachedPort = 0;
+}
+
+static bool mysqlInitialized = false;
+static __thread bool mysqlThreadInitialized = false;
+static CriticalSection initCrit;
+
+static void terminateMySqlThread()
+{
+    clearCache();
+    mysql_thread_end();
+    mysqlThreadInitialized = false;  // In case it was a threadpool thread...
+    if (threadHookChain)
+    {
+        (*threadHookChain)();
+        threadHookChain = NULL;
+    }
+}
+
+static void initializeMySqlThread()
+{
+    if (!mysqlThreadInitialized)
+    {
+        {
+            CriticalBlock b(initCrit);
+            if (!mysqlInitialized)
+            {
+                mysqlInitialized = true;
+                mysql_library_init(0, NULL, NULL);
+            }
+        }
+        mysql_thread_init();
+        threadHookChain = addThreadTermFunc(terminateMySqlThread);
+        mysqlThreadInitialized = true;
+    }
+}
+
+static void cacheConnection(MySQLConnection *connection, const char *server, unsigned port, const char *user, const char *password, const char *database)
+{
+    clearCache();
+    cachedServer = strdup(server);
+    cachedUser = strdup(user);
+    cachedPassword = strdup(password);
+    cachedDatabase = strdup(database);
+    cachedPort = port;
+    cachedConnection = LINK(connection);
+}
+
 class MySQLEmbedFunctionContext : public CInterfaceOf<IEmbedFunctionContext>
 {
 public:
@@ -966,6 +1036,7 @@ public:
         const char *user = "";
         const char *password = "";
         const char *database = "";
+        bool caching = true;
         unsigned port = 0;
         StringArray opts;
         opts.appendList(options, ",");
@@ -987,13 +1058,32 @@ public:
                     password = val;
                 else if (stricmp(optName, "database")==0)
                     database = val;
+                else if (stricmp(optName, "cache")==0)
+                    caching = clipStrToBool(val);
             }
         }
-        conn.setown(new MySQLConnection(mysql_init(NULL)));
-        if (!mysql_real_connect(*conn, server, user, password, database, port, NULL, 0))
+        initializeMySqlThread();
+        if (caching && cachedConnection && cachedConnectionMatches(server, port, user, password, database))
+        {
+            conn.set(cachedConnection);
+        }
+        else
         {
-            VStringBuffer err("mysql: failed to connect (%s)", mysql_error(*conn));
-            rtlFail(0, err.str());
+            if (cachedConnection)
+            {
+                ::Release(cachedConnection);
+                cachedConnection = NULL;
+            }
+            conn.setown(new MySQLConnection(mysql_init(NULL)));
+            if (!mysql_real_connect(*conn, server, user, password, database, port, NULL, 0))
+            {
+                VStringBuffer err("mysql: failed to connect (%s)", mysql_error(*conn));
+                rtlFail(0, err.str());
+            }
+            if (caching)
+            {
+                cacheConnection(conn, server, port, user, password, database);
+            }
         }
     }
     virtual bool getBooleanResult()

+ 1 - 1
testing/regress/ecl/key/mysqlembed.xml

@@ -21,7 +21,7 @@
  <Row><name>name2</name><value>2</value><boolval>false</boolval><r8>5.6</r8><r4>7.800000190734863</r4><d>3030</d><ddd>-1234567.89</ddd><u1>là</u1><u2>là      </u2><dt>2015-12-25 01:23:45</dt></Row>
 </Dataset>
 <Dataset name='Result 7'>
- <Row><Result_7>2</Result_7></Row>
+ <Row><Result_7>4</Result_7></Row>
 </Dataset>
 <Dataset name='Result 8'>
  <Row><Result_8>true</Result_8></Row>

+ 23 - 20
testing/regress/ecl/mysqlembed.ecl

@@ -19,6 +19,7 @@
 
 IMPORT mysql;
 
+myServer := 'localhost' : stored('myServer');
 myUser := 'rchapman' : stored('myUser');
 myDb := 'test' : stored('myDb');
 
@@ -46,31 +47,31 @@ END;
 init := DATASET([{'name1', 1, true, 1.2, 3.4, D'aa55aa55', 1234567.89, U'Straße', U'Straße'},
                  {'name2', 2, false, 5.6, 7.8, D'00', -1234567.89, U'là', U'là', '2015-12-25 01:23:45' }], childrec);
 
-drop() := EMBED(mysql : user('rchapman'),database('test'))
+drop() := EMBED(mysql : server(myServer),user(myUser),database(myDB))
   DROP TABLE IF EXISTS tbl1;
 ENDEMBED;
 
-create() := EMBED(mysql : user('rchapman'),database('test'))
+create() := EMBED(mysql : server(myServer),user(myUser),database(myDB))
   CREATE TABLE tbl1 ( name VARCHAR(20), value INT, boolval TINYINT, r8 DOUBLE, r4 FLOAT, d BLOB, ddd DECIMAL(10,2), u1 VARCHAR(10), u2 VARCHAR(10), dt DATETIME );
 ENDEMBED;
 
-initialize(dataset(childrec) values) := EMBED(mysql : user(myUser),database(myDb))
+initialize(dataset(childrec) values) := EMBED(mysql : server(myServer),user(myUser),database(myDB))
   INSERT INTO tbl1 values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
 ENDEMBED;
 
-initializeNulls() := EMBED(mysql : user('rchapman'),database(myDb))
+initializeNulls() := EMBED(mysql : server(myServer),user(myUser),database(myDB))
   INSERT INTO tbl1 (name) values ('nulls');
 ENDEMBED;
 
-initializeUtf8() := EMBED(mysql : user(myUser),database('test'))
+initializeUtf8() := EMBED(mysql : server(myServer),user(myUser),database(myDB))
   INSERT INTO tbl1 values ('utf8test', 1, 1, 1.2, 3.4, 'aa55aa55', 1234567.89, 'Straße', 'Straße', '2019-02-01 23:59:59');
 ENDEMBED;
 
-dataset(childrec) testMySQLDS() := EMBED(mysql : user('rchapman'),database('test'))
+dataset(childrec) testMySQLDS() := EMBED(mysql : server(myServer),user(myUser),database(myDB))
   SELECT * from tbl1;
 ENDEMBED;
 
-childrec testMySQLRow() := EMBED(mysql : user('rchapman'),database('test'))
+childrec testMySQLRow() := EMBED(mysql : server(myServer),user(myUser),database(myDB))
   SELECT * from tbl1 LIMIT 1;
 ENDEMBED;
 
@@ -82,47 +83,47 @@ childrec testMySQLParms(
    real4 r4,
    DATA d,
    UTF8 u1,
-   UNICODE8 u2) := EMBED(mysql : user('rchapman'),database('test'))
+   UNICODE8 u2) := EMBED(mysql : server(myServer),user(myUser),database(myDB))
   SELECT * from tbl1 WHERE name=? AND value=? AND boolval=? AND r8=? AND r4=? AND d=? AND u1=? AND u2=?;
 ENDEMBED;
 
-string testMySQLString() := EMBED(mysql : user('rchapman'),database('test'))
+string testMySQLString() := EMBED(mysql : server(myServer),user(myUser),database(myDB))
   SELECT max(name) from tbl1;
 ENDEMBED;
 
-dataset(childrec) testMySQLStringParam(string filter) := EMBED(mysql : user('rchapman'),database('test'))
+dataset(childrec) testMySQLStringParam(string filter) := EMBED(mysql : server(myServer),user(myUser),database(myDB))
   SELECT * from tbl1 where name = ?;
 ENDEMBED;
 
-dataset(childrec) testMySQLDSParam(dataset(stringrec) inrecs) := EMBED(mysql : user('rchapman'),database('test'))
+dataset(childrec) testMySQLDSParam(dataset(stringrec) inrecs) := EMBED(mysql : server(myServer),user(myUser),database(myDB))
   SELECT * from tbl1 where name = ?;
 ENDEMBED;
 
-integer testMySQLInt() := EMBED(mysql : user('rchapman'),database('test'))
+integer testMySQLInt() := EMBED(mysql : server(myServer),user(myUser),database(myDB))
   SELECT max(value) from tbl1;
 ENDEMBED;
 
-boolean testMySQLBool() := EMBED(mysql : user('rchapman'),database('test'))
+boolean testMySQLBool() := EMBED(mysql : server(myServer),user(myUser),database(myDB))
   SELECT max(boolval) from tbl1;
 ENDEMBED;
 
-real8 testMySQLReal8() := EMBED(mysql : user('rchapman'),database('test'))
+real8 testMySQLReal8() := EMBED(mysql : server(myServer),user(myUser),database(myDB))
   SELECT max(r8) from tbl1;
 ENDEMBED;
 
-real4 testMySQLReal4() := EMBED(mysql : user('rchapman'),database('test'))
+real4 testMySQLReal4() := EMBED(mysql : server(myServer),user(myUser),database(myDB))
   SELECT max(r4) from tbl1;
 ENDEMBED;
 
-data testMySQLData() := EMBED(mysql : user('rchapman'),database('test'))
+data testMySQLData() := EMBED(mysql : server(myServer),user(myUser),database(myDB))
   SELECT max(d) from tbl1;
 ENDEMBED;
 
-UTF8 testMySQLUtf8() := EMBED(mysql : user('rchapman'),database('test'))
+UTF8 testMySQLUtf8() := EMBED(mysql : server(myServer),user(myUser),database(myDB))
   SELECT max(u1) from tbl1;
 ENDEMBED;
 
-UNICODE testMySQLUnicode() := EMBED(mysql : user('rchapman'),database('test'))
+UNICODE testMySQLUnicode() := EMBED(mysql : server(myServer),user(myUser),database(myDB))
   SELECT max(u2) from tbl1;
 ENDEMBED;
 
@@ -131,7 +132,7 @@ datetimerec := RECORD
    STRING19 dt2;
 END;
 
-dataset(datetimerec) testMySQLDateTime() := EMBED(mysql : user('rchapman'),database('test'))
+dataset(datetimerec) testMySQLDateTime() := EMBED(mysql : server(myServer),user(myUser),database(myDB))
   SELECT dt, dt from tbl1;
 ENDEMBED;
 
@@ -142,13 +143,14 @@ sequential (
   initialize(init),
   initializeNulls(),
   initializeUtf8(),
+  PARALLEL (
   OUTPUT(testMySQLDS()),
   OUTPUT(testMySQLRow().name),
   OUTPUT(testMySQLParms('name1', 1, true, 1.2, 3.4, D'aa55aa55', U'Straße', U'Straße')),
   OUTPUT(testMySQLString()),
   OUTPUT(testMySQLStringParam(testMySqlString())),
   OUTPUT(testMySQLDSParam(PROJECT(init, extractName(LEFT)))),
-  OUTPUT(testMySQLInt()),
+    OUTPUT(testMySQLInt()+testMySQLInt()),
   OUTPUT(testMySQLBool()),
   OUTPUT(testMySQLReal8()),
   OUTPUT(testMySQLReal4()),
@@ -156,4 +158,5 @@ sequential (
   OUTPUT(testMySQLUtf8()),
   OUTPUT(testMySQLUnicode()),
   OUTPUT(testMySQLDateTime())
+  )
 );