Pārlūkot izejas kodu

Merge pull request #8997 from richardkchapman/mysql-cache-stale

HPCC-16085 MySQL plugin connection cache needs to retry on failure

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 9 gadi atpakaļ
vecāks
revīzija
4eb88b037f
1 mainītis faili ar 252 papildinājumiem un 225 dzēšanām
  1. 252 225
      plugins/mysql/mysqlembed.cpp

+ 252 - 225
plugins/mysql/mysqlembed.cpp

@@ -99,6 +99,100 @@ static __thread MySQLConnection *threadCachedConnection = nullptr;
 
 #define MAX_GLOBAL_CACHE 10
 
+enum MySQLOptionParamType
+{
+    ParamTypeNone,
+    ParamTypeString,
+    ParamTypeUInt,
+    ParamTypeULong,
+    ParamTypeBool
+};
+
+struct MySQLOptionDefinition
+{
+    const char *name;
+    enum mysql_option option;
+    MySQLOptionParamType paramType;
+};
+
+#define addoption(a,b) { #a, a, b }
+
+MySQLOptionDefinition options[] =
+{
+    addoption(MYSQL_OPT_COMPRESS, ParamTypeNone),
+    addoption(MYSQL_OPT_CONNECT_TIMEOUT, ParamTypeUInt),
+    addoption(MYSQL_OPT_GUESS_CONNECTION, ParamTypeNone),
+    addoption(MYSQL_OPT_LOCAL_INFILE, ParamTypeUInt),
+    addoption(MYSQL_OPT_NAMED_PIPE, ParamTypeNone),
+    addoption(MYSQL_OPT_PROTOCOL, ParamTypeUInt),
+    addoption(MYSQL_OPT_READ_TIMEOUT, ParamTypeUInt),
+    addoption(MYSQL_OPT_RECONNECT, ParamTypeBool),
+    addoption(MYSQL_OPT_SSL_VERIFY_SERVER_CERT, ParamTypeBool),
+    addoption(MYSQL_OPT_USE_EMBEDDED_CONNECTION, ParamTypeNone),
+    addoption(MYSQL_OPT_USE_REMOTE_CONNECTION, ParamTypeNone),
+    addoption(MYSQL_OPT_USE_RESULT, ParamTypeNone),
+    addoption(MYSQL_OPT_WRITE_TIMEOUT, ParamTypeUInt),
+    addoption(MYSQL_READ_DEFAULT_FILE, ParamTypeString),
+    addoption(MYSQL_READ_DEFAULT_GROUP, ParamTypeString),
+    addoption(MYSQL_REPORT_DATA_TRUNCATION, ParamTypeBool),
+    addoption(MYSQL_SECURE_AUTH, ParamTypeBool),
+    addoption(MYSQL_SET_CHARSET_DIR, ParamTypeString),
+    addoption(MYSQL_SET_CHARSET_NAME, ParamTypeString),
+    addoption(MYSQL_SET_CLIENT_IP, ParamTypeString),
+    addoption(MYSQL_SHARED_MEMORY_BASE_NAME, ParamTypeString),
+#if MYSQL_VERSION_ID >= 50507
+    addoption(MYSQL_DEFAULT_AUTH, ParamTypeString),
+    addoption(MYSQL_PLUGIN_DIR, ParamTypeString),
+#endif
+#if (MYSQL_VERSION_ID >= 50601)
+    addoption(MYSQL_OPT_BIND, ParamTypeString),
+#endif
+#if (MYSQL_VERSION_ID >= 50603)
+    addoption(MYSQL_OPT_SSL_CA, ParamTypeString),
+    addoption(MYSQL_OPT_SSL_CAPATH, ParamTypeString),
+    addoption(MYSQL_OPT_SSL_CERT, ParamTypeString),
+    addoption(MYSQL_OPT_SSL_CIPHER, ParamTypeString),
+    addoption(MYSQL_OPT_SSL_CRL, ParamTypeString),
+    addoption(MYSQL_OPT_SSL_CRLPATH, ParamTypeString),
+    addoption(MYSQL_OPT_SSL_KEY, ParamTypeString),
+#endif
+#if (MYSQL_VERSION_ID >= 50606)
+    addoption(MYSQL_SERVER_PUBLIC_KEY, ParamTypeString),
+#endif
+#if (MYSQL_VERSION_ID >= 50527 && MYSQL_VERSION_ID < 50600) || MYSQL_VERSION_ID >= 50607
+    addoption(MYSQL_ENABLE_CLEARTEXT_PLUGIN, ParamTypeBool),
+#endif
+    addoption(MYSQL_INIT_COMMAND, ParamTypeString),
+#if (MYSQL_VERSION_ID >= 50610)
+    addoption(MYSQL_OPT_CAN_HANDLE_EXPIRED_PASSWORDS, ParamTypeBool),
+#endif
+#if (MYSQL_VERSION_ID >= 50703)
+    addoption(MYSQL_OPT_SSL_ENFORCE, ParamTypeBool),
+#endif
+#if (MYSQL_VERSION_ID >= 50709)
+    addoption(MYSQL_OPT_MAX_ALLOWED_PACKET, ParamTypeULong),
+    addoption(MYSQL_OPT_NET_BUFFER_LENGTH, ParamTypeULong),
+#endif
+#if (MYSQL_VERSION_ID >= 50710)
+    addoption(MYSQL_OPT_TLS_VERSION, ParamTypeString),
+#endif
+#if (MYSQL_VERSION_ID >= 50711)
+    addoption(MYSQL_OPT_SSL_MODE, ParamTypeUInt),
+#endif
+    { nullptr, (enum mysql_option) 0, ParamTypeNone }
+};
+
+static MySQLOptionDefinition &lookupOption(const char *optName)
+{
+    for (MySQLOptionDefinition *optDef = options; optDef->name != nullptr; optDef++)
+    {
+        if (stricmp(optName, optDef->name)==0)
+            return *optDef;
+    }
+    failx("Unknown option %s", optName);
+}
+
+
 class MySQLConnection : public CInterface
 {
 public:
@@ -148,31 +242,142 @@ public:
         return _options && cacheOptions && streq(_options, cacheOptions);
     }
 
-    static MySQLConnection *findCachedConnection(const char *options, bool threadCache, bool globalCache)
+    static MySQLConnection *findCachedConnection(const char *options, bool bypassCache)
     {
-        if (threadCache)
+        const char *server = "localhost";
+        const char *user = "";
+        const char *password = "";
+        const char *database = "";
+        bool hasMySQLOpt = false;
+        bool threadCache = false;
+        bool globalCache = true;
+        unsigned port = 0;
+        StringArray opts;
+        opts.appendList(options, ",");
+        ForEachItemIn(idx, opts)
+        {
+            const char *opt = opts.item(idx);
+            const char *val = strchr(opt, '=');
+            if (val)
+            {
+                StringBuffer optName(val-opt, opt);
+                val++;
+                if (stricmp(optName, "server")==0)
+                    server = val;   // Note that lifetime of val is adequate for this to be safe
+                else if (stricmp(optName, "port")==0)
+                    port = atoi(val);
+                else if (stricmp(optName, "user")==0)
+                    user = val;
+                else if (stricmp(optName, "password")==0)
+                    password = val;
+                else if (stricmp(optName, "database")==0)
+                    database = val;
+                else if (stricmp(optName, "cache")==0)
+                {
+                    if (clipStrToBool(val) || strieq(val, "thread"))
+                    {
+                        threadCache = true;
+                        globalCache = false;
+                    }
+                    else if (strieq(val, "global"))
+                        globalCache = true;
+                    else if (strieq(val, "none") || strieq(val, "false") || strieq(val, "off") || strieq(val, "0"))
+                        globalCache = false;
+                    else
+                        failx("Unknown cache option %s", val);
+                }
+                else if (strnicmp(optName, "MYSQL_", 6)==0)
+                    hasMySQLOpt = true;
+                else
+                    failx("Unknown option %s", optName.str());
+            }
+        }
+        if (!bypassCache)
         {
-            if (threadCachedConnection && threadCachedConnection->matches(options))
+            if (threadCache)
             {
-                MySQLConnection *ret = threadCachedConnection;
-                threadCachedConnection = nullptr;
-                return ret;
+                if (threadCachedConnection && threadCachedConnection->matches(options))
+                {
+                    MySQLConnection *ret = threadCachedConnection;
+                    threadCachedConnection = nullptr;
+                    return ret;
+                }
+            }
+            else if (globalCache)
+            {
+                CriticalBlock b(globalCacheCrit);
+                ForEachItemIn(idx, globalCachedConnections)
+                {
+                    MySQLConnection &cached = globalCachedConnections.item(idx);
+                    if (cached.matches(options))
+                    {
+                        globalCachedConnections.remove(idx, true);
+                        return &cached;
+                    }
+                }
             }
         }
-        else if (globalCache)
+        MySQLConnection::clearThreadCache();
+        Owned<MySQLConnection> newConn = new MySQLConnection(mysql_init(NULL), options, threadCache, globalCache);
+        if (hasMySQLOpt)
         {
-            CriticalBlock b(globalCacheCrit);
-            ForEachItemIn(idx, globalCachedConnections)
+            ForEachItemIn(idx, opts)
             {
-                MySQLConnection &cached = globalCachedConnections.item(idx);
-                if (cached.matches(options))
+                const char *opt = opts.item(idx);
+                if (strnicmp(opt, "MYSQL_", 6)==0)
                 {
-                    globalCachedConnections.remove(idx, true);
-                    return &cached;
+                    const char *val = strchr(opt, '=');
+                    StringBuffer optName(opt);
+                    if (val)
+                    {
+                        optName.setLength(val-opt);
+                        val++;
+                    }
+                    MySQLOptionDefinition &optDef = lookupOption(optName);
+                    int rc;
+                    if (optDef.paramType == ParamTypeNone)
+                    {
+                        if (val)
+                            failx("Option %s does not take a value", optName.str());
+                        rc = mysql_options(*newConn, optDef.option, nullptr);
+                    }
+                    else
+                    {
+                        if (!val)
+                            failx("Option %s requires a value", optName.str());
+                        switch (optDef.paramType)
+                        {
+                        case ParamTypeString:
+                            rc = mysql_options(*newConn, optDef.option, val);
+                            break;
+                        case ParamTypeUInt:
+                            {
+                                unsigned int oval = strtoul(val, nullptr, 10);
+                                rc = mysql_options(*newConn, optDef.option, (const char *) &oval);
+                                break;
+                            }
+                        case ParamTypeULong:
+                            {
+                                unsigned long oval = strtoul(val, nullptr, 10);
+                                rc = mysql_options(*newConn, optDef.option, (const char *) &oval);
+                                break;
+                            }
+                        case ParamTypeBool:
+                            {
+                                my_bool oval = clipStrToBool(val);
+                                rc = mysql_options(*newConn, optDef.option, (const char *) &oval);
+                                break;
+                            }
+                        }
+                    }
+                    if (rc)
+                        failx("Failed to set option %s (%s)", optName.str(), mysql_error(*newConn));
                 }
             }
         }
-        return nullptr;
+        if (!mysql_real_connect(*newConn, server, user, password, database, port, NULL, 0))
+            failx("Failed to connect (%s)", mysql_error(*newConn));
+        return newConn.getClear();
     }
 
     static void clearThreadCache()
@@ -187,6 +392,18 @@ public:
         threadCachedConnection = connection;
     }
 
+    bool wasCached() const
+    {
+        return reusing;
+    }
+
+    MySQLConnection *reopen()
+    {
+        threadCached = false;
+        globalCached = false;
+        return findCachedConnection(cacheOptions, true);
+    }
+
 private:
     MySQLConnection(const MySQLConnection &from)
     {
@@ -194,6 +411,7 @@ private:
         cacheOptions = from.cacheOptions;  // Taking over ownership
         threadCached = from.threadCached;
         globalCached = from.globalCached;
+        reusing = true;
     }
 
     static CIArrayOf<MySQLConnection> globalCachedConnections;
@@ -203,6 +421,7 @@ private:
     const char *cacheOptions;  // Not done as a StringAttr, in order to avoid reallocation when recaching after use (see copy constructor above)
     bool threadCached;
     bool globalCached;
+    bool reusing = false;
 };
 
 CIArrayOf<MySQLConnection> MySQLConnection::globalCachedConnections;
@@ -1088,218 +1307,14 @@ static void initializeMySqlThread()
     }
 }
 
-enum MySQLOptionParamType
-{
-    ParamTypeNone,
-    ParamTypeString,
-    ParamTypeUInt,
-    ParamTypeULong,
-    ParamTypeBool
-};
-
-struct MySQLOptionDefinition
-{
-    const char *name;
-    enum mysql_option option;
-    MySQLOptionParamType paramType;
-};
-
-#define addoption(a,b) { #a, a, b }
-
-MySQLOptionDefinition options[] =
-{
-    addoption(MYSQL_OPT_COMPRESS, ParamTypeNone),
-    addoption(MYSQL_OPT_CONNECT_TIMEOUT, ParamTypeUInt),
-    addoption(MYSQL_OPT_GUESS_CONNECTION, ParamTypeNone),
-    addoption(MYSQL_OPT_LOCAL_INFILE, ParamTypeUInt),
-    addoption(MYSQL_OPT_NAMED_PIPE, ParamTypeNone),
-    addoption(MYSQL_OPT_PROTOCOL, ParamTypeUInt),
-    addoption(MYSQL_OPT_READ_TIMEOUT, ParamTypeUInt),
-    addoption(MYSQL_OPT_RECONNECT, ParamTypeBool),
-    addoption(MYSQL_OPT_SSL_VERIFY_SERVER_CERT, ParamTypeBool),
-    addoption(MYSQL_OPT_USE_EMBEDDED_CONNECTION, ParamTypeNone),
-    addoption(MYSQL_OPT_USE_REMOTE_CONNECTION, ParamTypeNone),
-    addoption(MYSQL_OPT_USE_RESULT, ParamTypeNone),
-    addoption(MYSQL_OPT_WRITE_TIMEOUT, ParamTypeUInt),
-    addoption(MYSQL_READ_DEFAULT_FILE, ParamTypeString),
-    addoption(MYSQL_READ_DEFAULT_GROUP, ParamTypeString),
-    addoption(MYSQL_REPORT_DATA_TRUNCATION, ParamTypeBool),
-    addoption(MYSQL_SECURE_AUTH, ParamTypeBool),
-    addoption(MYSQL_SET_CHARSET_DIR, ParamTypeString),
-    addoption(MYSQL_SET_CHARSET_NAME, ParamTypeString),
-    addoption(MYSQL_SET_CLIENT_IP, ParamTypeString),
-    addoption(MYSQL_SHARED_MEMORY_BASE_NAME, ParamTypeString),
-#if MYSQL_VERSION_ID >= 50507
-    addoption(MYSQL_DEFAULT_AUTH, ParamTypeString),
-    addoption(MYSQL_PLUGIN_DIR, ParamTypeString),
-#endif
-#if (MYSQL_VERSION_ID >= 50601)
-    addoption(MYSQL_OPT_BIND, ParamTypeString),
-#endif
-#if (MYSQL_VERSION_ID >= 50603)
-    addoption(MYSQL_OPT_SSL_CA, ParamTypeString),
-    addoption(MYSQL_OPT_SSL_CAPATH, ParamTypeString),
-    addoption(MYSQL_OPT_SSL_CERT, ParamTypeString),
-    addoption(MYSQL_OPT_SSL_CIPHER, ParamTypeString),
-    addoption(MYSQL_OPT_SSL_CRL, ParamTypeString),
-    addoption(MYSQL_OPT_SSL_CRLPATH, ParamTypeString),
-    addoption(MYSQL_OPT_SSL_KEY, ParamTypeString),
-#endif
-#if (MYSQL_VERSION_ID >= 50606)
-    addoption(MYSQL_SERVER_PUBLIC_KEY, ParamTypeString),
-#endif
-#if (MYSQL_VERSION_ID >= 50527 && MYSQL_VERSION_ID < 50600) || MYSQL_VERSION_ID >= 50607
-    addoption(MYSQL_ENABLE_CLEARTEXT_PLUGIN, ParamTypeBool),
-#endif
-    addoption(MYSQL_INIT_COMMAND, ParamTypeString),
-#if (MYSQL_VERSION_ID >= 50610)
-    addoption(MYSQL_OPT_CAN_HANDLE_EXPIRED_PASSWORDS, ParamTypeBool),
-#endif
-#if (MYSQL_VERSION_ID >= 50703)
-    addoption(MYSQL_OPT_SSL_ENFORCE, ParamTypeBool),
-#endif
-#if (MYSQL_VERSION_ID >= 50709)
-    addoption(MYSQL_OPT_MAX_ALLOWED_PACKET, ParamTypeULong),
-    addoption(MYSQL_OPT_NET_BUFFER_LENGTH, ParamTypeULong),
-#endif
-#if (MYSQL_VERSION_ID >= 50710)
-    addoption(MYSQL_OPT_TLS_VERSION, ParamTypeString),
-#endif
-#if (MYSQL_VERSION_ID >= 50711)
-    addoption(MYSQL_OPT_SSL_MODE, ParamTypeUInt),
-#endif
-    { nullptr, (enum mysql_option) 0, ParamTypeNone }
-};
-
-static MySQLOptionDefinition &lookupOption(const char *optName)
-{
-    for (MySQLOptionDefinition *optDef = options; optDef->name != nullptr; optDef++)
-    {
-        if (stricmp(optName, optDef->name)==0)
-            return *optDef;
-    }
-    failx("Unknown option %s", optName);
-}
-
 class MySQLEmbedFunctionContext : public CInterfaceOf<IEmbedFunctionContext>
 {
 public:
     MySQLEmbedFunctionContext(const char *options)
       : nextParam(0)
     {
-        const char *server = "localhost";
-        const char *user = "";
-        const char *password = "";
-        const char *database = "";
-        bool hasMySQLOpt = false;
-        bool threadCache = false;
-        bool globalCache = true;
-        unsigned port = 0;
-        StringArray opts;
-        opts.appendList(options, ",");
-        ForEachItemIn(idx, opts)
-        {
-            const char *opt = opts.item(idx);
-            const char *val = strchr(opt, '=');
-            if (val)
-            {
-                StringBuffer optName(val-opt, opt);
-                val++;
-                if (stricmp(optName, "server")==0)
-                    server = val;   // Note that lifetime of val is adequate for this to be safe
-                else if (stricmp(optName, "port")==0)
-                    port = atoi(val);
-                else if (stricmp(optName, "user")==0)
-                    user = val;
-                else if (stricmp(optName, "password")==0)
-                    password = val;
-                else if (stricmp(optName, "database")==0)
-                    database = val;
-                else if (stricmp(optName, "cache")==0)
-                {
-                    if (clipStrToBool(val) || strieq(val, "thread"))
-                    {
-                        threadCache = true;
-                        globalCache = false;
-                    }
-                    else if (strieq(val, "global"))
-                        globalCache = true;
-                    else if (strieq(val, "none") || strieq(val, "false") || strieq(val, "off") || strieq(val, "0"))
-                        globalCache = false;
-                    else
-                        failx("Unknown cache option %s", val);
-                }
-                else if (strnicmp(optName, "MYSQL_", 6)==0)
-                    hasMySQLOpt = true;
-                else
-                    failx("Unknown option %s", optName.str());
-            }
-        }
         initializeMySqlThread();
-        conn.setown(MySQLConnection::findCachedConnection(options, threadCache, globalCache));
-        if (!conn)
-        {
-            MySQLConnection::clearThreadCache();
-            conn.setown(new MySQLConnection(mysql_init(NULL), options, threadCache, globalCache));
-            if (hasMySQLOpt)
-            {
-                ForEachItemIn(idx, opts)
-                {
-                    const char *opt = opts.item(idx);
-                    if (strnicmp(opt, "MYSQL_", 6)==0)
-                    {
-                        const char *val = strchr(opt, '=');
-                        StringBuffer optName(opt);
-                        if (val)
-                        {
-                            optName.setLength(val-opt);
-                            val++;
-                        }
-                        MySQLOptionDefinition &optDef = lookupOption(optName);
-                        int rc;
-                        if (optDef.paramType == ParamTypeNone)
-                        {
-                            if (val)
-                                failx("Option %s does not take a value", optName.str());
-                            rc = mysql_options(*conn, optDef.option, nullptr);
-                        }
-                        else
-                        {
-                            if (!val)
-                                failx("Option %s requires a value", optName.str());
-                            switch (optDef.paramType)
-                            {
-                            case ParamTypeString:
-                                rc = mysql_options(*conn, optDef.option, val);
-                                break;
-                            case ParamTypeUInt:
-                                {
-                                    unsigned int oval = strtoul(val, nullptr, 10);
-                                    rc = mysql_options(*conn, optDef.option, (const char *) &oval);
-                                    break;
-                                }
-                            case ParamTypeULong:
-                                {
-                                    unsigned long oval = strtoul(val, nullptr, 10);
-                                    rc = mysql_options(*conn, optDef.option, (const char *) &oval);
-                                    break;
-                                }
-                            case ParamTypeBool:
-                                {
-                                    my_bool oval = clipStrToBool(val);
-                                    rc = mysql_options(*conn, optDef.option, (const char *) &oval);
-                                    break;
-                                }
-                            }
-                        }
-                        if (rc)
-                            failx("Failed to set option %s (%s)", optName.str(), mysql_error(*conn));
-                    }
-                }
-            }
-            if (!mysql_real_connect(*conn, server, user, password, database, port, NULL, 0))
-                failx("Failed to connect (%s)", mysql_error(*conn));
-        }
+        conn.setown(MySQLConnection::findCachedConnection(options, false));
     }
     virtual bool getBooleanResult()
     {
@@ -1505,12 +1520,24 @@ public:
     virtual void compileEmbeddedScript(size32_t chars, const char *script)
     {
         size32_t len = rtlUtf8Size(chars, script);
-        Owned<MySQLStatement> stmt  = new MySQLStatement(mysql_stmt_init(*conn));
-        if (!*stmt)
-            fail("failed to create statement");
-        if (mysql_stmt_prepare(*stmt, script, len))
-            fail(mysql_stmt_error(*stmt));
-        stmtInfo.setown(new MySQLPreparedStatement(conn, stmt));
+        loop
+        {
+            Owned<MySQLStatement> stmt  = new MySQLStatement(mysql_stmt_init(*conn));
+            if (!*stmt)
+                fail("failed to create statement");
+            if (mysql_stmt_prepare(*stmt, script, len))
+            {
+                // If we get an error, it could be that the cached connection is stale - retry
+                if (conn->wasCached())
+                {
+                    conn.setown(conn->reopen());
+                    continue;
+                }
+                fail(mysql_stmt_error(*stmt));
+            }
+            stmtInfo.setown(new MySQLPreparedStatement(conn, stmt));
+            break;
+        }
     }
     virtual void callFunction()
     {