ソースを参照

Merge pull request #8998 from richardkchapman/mysql-cache-timeout

HPCC-16084 MySQL Plugin connection cache - retire old connections

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 9 年 前
コミット
59ba06c557
1 ファイル変更69 行追加3 行削除
  1. 69 3
      plugins/mysql/mysqlembed.cpp

+ 69 - 3
plugins/mysql/mysqlembed.cpp

@@ -39,6 +39,13 @@
 
 __declspec(noreturn) static void UNSUPPORTED(const char *feature) __attribute__((noreturn));
 
+// These should be made configurable at some point
+
+static const unsigned CACHE_CHECK_PERIOD = 10000;
+static const unsigned CACHE_TIMEOUT_PERIOD = 60000;
+static const unsigned CACHE_SIZE = 10;
+
+
 static void UNSUPPORTED(const char *feature)
 {
     throw MakeStringException(-1, "UNSUPPORTED feature: %s not supported in mysql plugin", feature);
@@ -97,8 +104,6 @@ class MySQLConnection;
 
 static __thread MySQLConnection *threadCachedConnection = nullptr;
 
-#define MAX_GLOBAL_CACHE 10
-
 enum MySQLOptionParamType
 {
     ParamTypeNone,
@@ -192,6 +197,13 @@ static MySQLOptionDefinition &lookupOption(const char *optName)
     failx("Unknown option %s", optName);
 }
 
+class MySQLConnectionCloserThread : public Thread
+{
+    virtual int run() override;
+public:
+    static Semaphore closing;
+} *connectionCloserThread = nullptr;
+
 
 class MySQLConnection : public CInterface
 {
@@ -202,6 +214,7 @@ public:
             cacheOptions = strdup(_cacheOptions);
         else
             cacheOptions = nullptr;
+        created = msTick();
     }
     ~MySQLConnection()
     {
@@ -217,13 +230,18 @@ public:
                 else // globalCached
                 {
                     CriticalBlock b(globalCacheCrit);
-                    if (globalCachedConnections.length()==MAX_GLOBAL_CACHE)
+                    if (globalCachedConnections.length()==CACHE_SIZE)
                     {
                         MySQLConnection &goer = globalCachedConnections.popGet();
                         goer.globalCached = false;  // Make sure we don't recache it!
                         goer.Release();
                     }
                     globalCachedConnections.add(*cacheEntry.getClear(), 0);
+                    if (!connectionCloserThread)
+                    {
+                        connectionCloserThread = new MySQLConnectionCloserThread;
+                        connectionCloserThread->start();
+                    }
                 }
             }
             else
@@ -404,6 +422,21 @@ public:
         return findCachedConnection(cacheOptions, true);
     }
 
+    static void retireCache(unsigned maxAge)
+    {
+        CriticalBlock b(globalCacheCrit);
+        unsigned now = msTick();
+        ForEachItemInRev(idx, globalCachedConnections)
+        {
+            MySQLConnection &cached = globalCachedConnections.item(idx);
+            if (now - cached.created > maxAge)
+            {
+                cached.globalCached = false;  // Make sure we don't re-add it!
+                globalCachedConnections.remove(idx);
+            }
+        }
+    }
+
 private:
     MySQLConnection(const MySQLConnection &from)
     {
@@ -412,6 +445,7 @@ private:
         threadCached = from.threadCached;
         globalCached = from.globalCached;
         reusing = true;
+        created = msTick();
     }
 
     static CIArrayOf<MySQLConnection> globalCachedConnections;
@@ -419,6 +453,7 @@ private:
 
     MYSQL *conn;
     const char *cacheOptions;  // Not done as a StringAttr, in order to avoid reallocation when recaching after use (see copy constructor above)
+    unsigned created;
     bool threadCached;
     bool globalCached;
     bool reusing = false;
@@ -427,6 +462,37 @@ private:
 CIArrayOf<MySQLConnection> MySQLConnection::globalCachedConnections;
 CriticalSection MySQLConnection::globalCacheCrit;
 
+Semaphore MySQLConnectionCloserThread::closing;
+
+int MySQLConnectionCloserThread::run()
+{
+    loop
+    {
+        if (closing.wait(CACHE_CHECK_PERIOD))
+        {
+            break;
+        }
+        MySQLConnection::retireCache(CACHE_TIMEOUT_PERIOD);
+    }
+    return 0;
+}
+
+MODULE_INIT(INIT_PRIORITY_STANDARD)
+{
+    return true;
+}
+
+MODULE_EXIT()
+{
+    if (connectionCloserThread)
+    {
+        MySQLConnectionCloserThread::closing.signal();
+        connectionCloserThread->join();
+        connectionCloserThread->Release();
+    }
+}
+
+
 class MySQLResult : public CInterface
 {
 public: