|
@@ -26,6 +26,10 @@
|
|
|
#include "eclrtl.hpp"
|
|
|
#include "eclrtl_imp.hpp"
|
|
|
|
|
|
+#include <map>
|
|
|
+#include <mutex>
|
|
|
+#include <thread>
|
|
|
+
|
|
|
static const char *g_moduleName = "couchbase";
|
|
|
static const char *g_moduleDescription = "Couchbase Embed Helper";
|
|
|
static const char *g_version = "Couchbase Embed Helper 1.0.0";
|
|
@@ -45,7 +49,7 @@ extern "C" COUCHBASEEMBED_PLUGIN_API bool getECLPluginDefinition(ECLPluginDefini
|
|
|
pb->magicVersion = PLUGIN_VERSION;
|
|
|
pb->version = g_version;
|
|
|
pb->moduleName = g_moduleName;
|
|
|
- pb->ECL = NULL;
|
|
|
+ pb->ECL = nullptr;
|
|
|
pb->flags = PLUGIN_IMPLICIT_MODULE;
|
|
|
pb->description = g_moduleDescription;
|
|
|
return true;
|
|
@@ -53,6 +57,9 @@ extern "C" COUCHBASEEMBED_PLUGIN_API bool getECLPluginDefinition(ECLPluginDefini
|
|
|
|
|
|
namespace couchbaseembed
|
|
|
{
|
|
|
+ const time_t OBJECT_EXPIRE_TIMEOUT_SECONDS = 60 * 2; // Two minutes
|
|
|
+ static std::once_flag connectionCacheInitFlag;
|
|
|
+
|
|
|
//--------------------------------------------------------------------------
|
|
|
// Plugin Classes
|
|
|
//--------------------------------------------------------------------------
|
|
@@ -97,7 +104,7 @@ namespace couchbaseembed
|
|
|
|
|
|
const void * CouchbaseRowStream::nextRow()
|
|
|
{
|
|
|
- const void * result = NULL;
|
|
|
+ const void * result = nullptr;
|
|
|
if (m_shouldRead && m_currentRow < m_Rows.length())
|
|
|
{
|
|
|
auto json = m_Rows.item(m_currentRow++);
|
|
@@ -360,6 +367,324 @@ namespace couchbaseembed
|
|
|
return thisParam++;
|
|
|
}
|
|
|
|
|
|
+ static class ConnectionCacheObj
|
|
|
+ {
|
|
|
+ private:
|
|
|
+
|
|
|
+ typedef std::vector<CouchbaseConnection*> ConnectionList;
|
|
|
+ typedef std::map<hash64_t, ConnectionList> ObjMap;
|
|
|
+
|
|
|
+ public:
|
|
|
+
|
|
|
+ ConnectionCacheObj(int _traceLevel)
|
|
|
+ : traceLevel(_traceLevel)
|
|
|
+ {
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ ~ConnectionCacheObj()
|
|
|
+ {
|
|
|
+ deleteAll();
|
|
|
+ }
|
|
|
+
|
|
|
+ void deleteAll()
|
|
|
+ {
|
|
|
+ CriticalBlock block(cacheLock);
|
|
|
+
|
|
|
+ // Delete all idle connection objects
|
|
|
+ for (ObjMap::iterator keyIter = idleConnections.begin(); keyIter != idleConnections.end(); keyIter++)
|
|
|
+ {
|
|
|
+ for (ConnectionList::iterator connectionIter = keyIter->second.begin(); connectionIter != keyIter->second.end(); connectionIter++)
|
|
|
+ {
|
|
|
+ if (*connectionIter)
|
|
|
+ {
|
|
|
+ delete(*connectionIter);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ idleConnections.clear();
|
|
|
+
|
|
|
+ // Delete all active connection objects
|
|
|
+ for (ObjMap::iterator keyIter = activeConnections.begin(); keyIter != activeConnections.end(); keyIter++)
|
|
|
+ {
|
|
|
+ for (ConnectionList::iterator connectionIter = keyIter->second.begin(); connectionIter != keyIter->second.end(); connectionIter++)
|
|
|
+ {
|
|
|
+ if (*connectionIter)
|
|
|
+ {
|
|
|
+ delete(*connectionIter);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ activeConnections.clear();
|
|
|
+ }
|
|
|
+
|
|
|
+ void releaseActive(CouchbaseConnection* connectionPtr)
|
|
|
+ {
|
|
|
+ CriticalBlock block(cacheLock);
|
|
|
+
|
|
|
+ // Find given connection in our active list and move it to our
|
|
|
+ // idle list
|
|
|
+ for (ObjMap::iterator keyIter = activeConnections.begin(); keyIter != activeConnections.end(); keyIter++)
|
|
|
+ {
|
|
|
+ for (ConnectionList::iterator connectionIter = keyIter->second.begin(); connectionIter != keyIter->second.end(); connectionIter++)
|
|
|
+ {
|
|
|
+ if (*connectionIter == connectionPtr)
|
|
|
+ {
|
|
|
+ connectionPtr->updateTimeTouched();
|
|
|
+ keyIter->second.erase(connectionIter);
|
|
|
+ idleConnections[keyIter->first].push_back(connectionPtr);
|
|
|
+
|
|
|
+ if (traceLevel > 4)
|
|
|
+ {
|
|
|
+ DBGLOG("Couchbase: Released connection object %p", connectionPtr);
|
|
|
+ }
|
|
|
+
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ void expire()
|
|
|
+ {
|
|
|
+ if (!idleConnections.empty())
|
|
|
+ {
|
|
|
+ CriticalBlock block(cacheLock);
|
|
|
+
|
|
|
+ time_t oldestAllowedTime = time(NULL) - OBJECT_EXPIRE_TIMEOUT_SECONDS;
|
|
|
+ __int32 expireCount = 0;
|
|
|
+
|
|
|
+ for (ObjMap::iterator keyIter = idleConnections.begin(); keyIter != idleConnections.end(); keyIter++)
|
|
|
+ {
|
|
|
+ ConnectionList::iterator connectionIter = keyIter->second.begin();
|
|
|
+
|
|
|
+ while (connectionIter != keyIter->second.end())
|
|
|
+ {
|
|
|
+ if (*connectionIter)
|
|
|
+ {
|
|
|
+ if ((*connectionIter)->getTimeTouched() < oldestAllowedTime)
|
|
|
+ {
|
|
|
+ delete(*connectionIter);
|
|
|
+ connectionIter = keyIter->second.erase(connectionIter);
|
|
|
+ ++expireCount;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ ++connectionIter;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ connectionIter = keyIter->second.erase(connectionIter);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (traceLevel > 4 && expireCount > 0)
|
|
|
+ {
|
|
|
+ DBGLOG("Couchbase: Expired %d cached connection%s", expireCount, (expireCount == 1 ? "" : "s"));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ CouchbaseConnection* getConnection(bool useSSL, const char * host, unsigned port, const char * bucketname, const char * password, const char * connOptions, unsigned int maxConnections)
|
|
|
+ {
|
|
|
+ CouchbaseConnection* connectionObjPtr = nullptr;
|
|
|
+ StringBuffer connectionString;
|
|
|
+
|
|
|
+ CouchbaseConnection::makeConnectionString(useSSL, host, port, bucketname, connOptions, connectionString);
|
|
|
+
|
|
|
+ // Use a hash of the connection string as the key to finding
|
|
|
+ // any idle connection objects
|
|
|
+ hash64_t key = rtlHash64VStr(connectionString.str(), 0);
|
|
|
+
|
|
|
+ while (true)
|
|
|
+ {
|
|
|
+ {
|
|
|
+ CriticalBlock block(cacheLock);
|
|
|
+ ConnectionList& idleConnectionList = idleConnections[key];
|
|
|
+
|
|
|
+ if (!idleConnectionList.empty())
|
|
|
+ {
|
|
|
+ // We have at least one idle connection; use that
|
|
|
+ connectionObjPtr = idleConnectionList.back();
|
|
|
+ idleConnectionList.pop_back();
|
|
|
+
|
|
|
+ connectionObjPtr->updateTimeTouched();
|
|
|
+
|
|
|
+ // Push the connection object onto our active list
|
|
|
+ activeConnections[key].push_back(connectionObjPtr);
|
|
|
+
|
|
|
+ if (traceLevel > 4)
|
|
|
+ {
|
|
|
+ DBGLOG("Couchbase: Using cached connection object %p: %s", connectionObjPtr, connectionString.str());
|
|
|
+ }
|
|
|
+
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ else if (maxConnections == 0 || activeConnections[key].size() < maxConnections)
|
|
|
+ {
|
|
|
+ // No idle connections but we don't have to wait for
|
|
|
+ // one; exit the loop and create a new connection
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // We can't exit the loop and allow a new connection to
|
|
|
+ // be created because there are too many active
|
|
|
+ // connections already; wait for a short while
|
|
|
+ // and try again
|
|
|
+ std::this_thread::sleep_for(std::chrono::microseconds(10));
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!connectionObjPtr)
|
|
|
+ {
|
|
|
+ // An idle connection for that particular combination of
|
|
|
+ // options does not exist so we need to create one;
|
|
|
+ // use a small loop to retry connections if necessary
|
|
|
+ unsigned int connectAttempt = 0;
|
|
|
+ unsigned int MAX_ATTEMPTS = 10;
|
|
|
+ useconds_t SLEEP_TIME = 100 + (rand() % 200);
|
|
|
+
|
|
|
+ while (true)
|
|
|
+ {
|
|
|
+ connectionObjPtr = new CouchbaseConnection(connectionString, password);
|
|
|
+ connectionObjPtr->connect();
|
|
|
+
|
|
|
+ if (connectionObjPtr->getConnectionStatus().success())
|
|
|
+ {
|
|
|
+ {
|
|
|
+ // Push new connection object onto our active list
|
|
|
+ CriticalBlock block(cacheLock);
|
|
|
+
|
|
|
+ connectionObjPtr->updateTimeTouched();
|
|
|
+ ConnectionList& activeConnectionList = activeConnections[key];
|
|
|
+ activeConnectionList.push_back(connectionObjPtr);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (traceLevel > 4)
|
|
|
+ {
|
|
|
+ DBGLOG("Couchbase: Created and cached new connection object %p: %s", connectionObjPtr, connectionString.str());
|
|
|
+ }
|
|
|
+
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ else if (connectionObjPtr->getConnectionStatus().isTemporary())
|
|
|
+ {
|
|
|
+ ++connectAttempt;
|
|
|
+ if (connectAttempt < MAX_ATTEMPTS)
|
|
|
+ {
|
|
|
+ // According to libcouchbase-cxx, we need
|
|
|
+ // to destroy the connection object if
|
|
|
+ // there has been a failure of any kind
|
|
|
+ delete(connectionObjPtr);
|
|
|
+ connectionObjPtr = nullptr;
|
|
|
+ std::this_thread::sleep_for(std::chrono::microseconds(SLEEP_TIME));
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ // Capture the final failure reason and
|
|
|
+ // destroy the connection object before
|
|
|
+ // throwing an error
|
|
|
+ std::string reason = connectionObjPtr->getConnectionStatus().description();
|
|
|
+
|
|
|
+ delete(connectionObjPtr);
|
|
|
+ connectionObjPtr = nullptr;
|
|
|
+
|
|
|
+ failx("Failed to connect to couchbase instance: %s Reason: '%s'", connectionString.str(), reason.c_str());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ // Capture the final failure reason and
|
|
|
+ // destroy the connection object before
|
|
|
+ // throwing an error
|
|
|
+ std::string reason = connectionObjPtr->getConnectionStatus().description();
|
|
|
+
|
|
|
+ delete(connectionObjPtr);
|
|
|
+ connectionObjPtr = nullptr;
|
|
|
+
|
|
|
+ failx("Failed to connect to couchbase instance: %s Reason: '%s'", connectionString.str(), reason.c_str());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!connectionObjPtr)
|
|
|
+ {
|
|
|
+ failx("Couchbase: Unable to create connection: %s", connectionString.str());
|
|
|
+ }
|
|
|
+
|
|
|
+ return connectionObjPtr;
|
|
|
+ }
|
|
|
+
|
|
|
+ private:
|
|
|
+
|
|
|
+ ObjMap idleConnections; //!< std::map of created CouchbaseConnection object pointers
|
|
|
+ ObjMap activeConnections; //!< std::map of created CouchbaseConnection object pointers
|
|
|
+ CriticalSection cacheLock; //!< Mutex guarding modifications to connection pools
|
|
|
+ int traceLevel; //!< The current logging level
|
|
|
+ } *connectionCache;
|
|
|
+
|
|
|
+ static class ConnectionCacheExpirerObj : public Thread
|
|
|
+ {
|
|
|
+ public:
|
|
|
+
|
|
|
+ ConnectionCacheExpirerObj()
|
|
|
+ : Thread("Couchbase::ConnectionCacheExpirer"),
|
|
|
+ shouldRun(false)
|
|
|
+ {
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual void start()
|
|
|
+ {
|
|
|
+ if (!isAlive())
|
|
|
+ {
|
|
|
+ shouldRun = true;
|
|
|
+ Thread::start();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual void stop()
|
|
|
+ {
|
|
|
+ if (isAlive())
|
|
|
+ {
|
|
|
+ shouldRun = false;
|
|
|
+ join();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual int run()
|
|
|
+ {
|
|
|
+ // Periodically delete connections that have been idle too long
|
|
|
+ while (shouldRun)
|
|
|
+ {
|
|
|
+ if (connectionCache)
|
|
|
+ {
|
|
|
+ connectionCache->expire();
|
|
|
+ }
|
|
|
+
|
|
|
+ std::this_thread::sleep_for(std::chrono::microseconds(1000));
|
|
|
+ }
|
|
|
+
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ private:
|
|
|
+
|
|
|
+ std::atomic_bool shouldRun; //!< If true, we should execute our thread's main event loop
|
|
|
+ } *connectionCacheExpirer;
|
|
|
+
|
|
|
+ static void setupConnectionCache(int traceLevel)
|
|
|
+ {
|
|
|
+ couchbaseembed::connectionCache = new couchbaseembed::ConnectionCacheObj(traceLevel);
|
|
|
+
|
|
|
+ couchbaseembed::connectionCacheExpirer = new couchbaseembed::ConnectionCacheExpirerObj;
|
|
|
+ couchbaseembed::connectionCacheExpirer->start();
|
|
|
+ }
|
|
|
+
|
|
|
CouchbaseEmbedFunctionContext::CouchbaseEmbedFunctionContext(const IContextLogger &_logctx, const char *options, unsigned _flags)
|
|
|
: logctx(_logctx), m_NextRow(), m_nextParam(0), m_numParams(0), m_scriptFlags(_flags)
|
|
|
{
|
|
@@ -373,6 +698,7 @@ namespace couchbaseembed
|
|
|
unsigned port = 8091;
|
|
|
bool useSSL = false;
|
|
|
StringBuffer connectionOptions;
|
|
|
+ unsigned int maxConnections = 0;
|
|
|
|
|
|
StringArray inputOptions;
|
|
|
inputOptions.appendList(options, ",");
|
|
@@ -389,28 +715,31 @@ namespace couchbaseembed
|
|
|
else if (stricmp(optName, "port")==0)
|
|
|
port = atoi(val);
|
|
|
else if (stricmp(optName, "user")==0)
|
|
|
- user = val;
|
|
|
+ user = val; // This is not used but retained for backwards-compatibility
|
|
|
else if (stricmp(optName, "password")==0)
|
|
|
password = val;
|
|
|
else if (stricmp(optName, "bucket")==0)
|
|
|
bucketname = val;
|
|
|
else if (stricmp(optName, "useSSL")==0)
|
|
|
useSSL = clipStrToBool(val);
|
|
|
+ else if (stricmp(optName, "max_connections")==0)
|
|
|
+ maxConnections = atoi(val);
|
|
|
|
|
|
//Connection String options
|
|
|
else if (stricmp(optName, "detailed_errcodes")==0
|
|
|
|| stricmp(optName, "operation_timeout")==0
|
|
|
|| stricmp(optName, "config_total_timeout")==0
|
|
|
- || stricmp(optName, "http_poolsize")==0
|
|
|
- || stricmp(optName, "detailed_errcodes")==0)
|
|
|
+ || stricmp(optName, "http_poolsize")==0)
|
|
|
connectionOptions.appendf("%s%s=%s", connectionOptions.length() == 0 ? "?" : "&", optName.str(), val);
|
|
|
else
|
|
|
failx("Unknown option %s", optName.str());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- m_oCBConnection.setown(new CouchbaseConnection(useSSL, server, port, bucketname, user, password, connectionOptions.str()));
|
|
|
- m_oCBConnection->connect();
|
|
|
+ std::call_once(connectionCacheInitFlag, setupConnectionCache, logctx.queryTraceLevel());
|
|
|
+
|
|
|
+ // Get a cached idle connection or create a new one
|
|
|
+ m_oCBConnection = connectionCache->getConnection(useSSL, server, port, bucketname, password, connectionOptions.str(), maxConnections);
|
|
|
}
|
|
|
|
|
|
CouchbaseEmbedFunctionContext::~CouchbaseEmbedFunctionContext()
|
|
@@ -426,6 +755,14 @@ namespace couchbaseembed
|
|
|
delete m_pQuery;
|
|
|
m_pQuery = nullptr;
|
|
|
}
|
|
|
+
|
|
|
+ if (m_oCBConnection)
|
|
|
+ {
|
|
|
+ // When the context is deleted we should return any connection
|
|
|
+ // object back to idle status
|
|
|
+ connectionCache->releaseActive(m_oCBConnection);
|
|
|
+ m_oCBConnection = nullptr;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
IPropertyTree * CouchbaseEmbedFunctionContext::nextResultRowTree()
|
|
@@ -1195,3 +1532,30 @@ namespace couchbaseembed
|
|
|
return true; // TO-DO
|
|
|
}
|
|
|
} // namespace
|
|
|
+
|
|
|
+MODULE_INIT(INIT_PRIORITY_STANDARD)
|
|
|
+{
|
|
|
+ couchbaseembed::connectionCache = nullptr;
|
|
|
+ couchbaseembed::connectionCacheExpirer = nullptr;
|
|
|
+
|
|
|
+ return true;
|
|
|
+}
|
|
|
+
|
|
|
+MODULE_EXIT()
|
|
|
+{
|
|
|
+ // Delete the background thread expiring items from the CouchbaseConnection
|
|
|
+ // cache before deleting the connection cache
|
|
|
+ if (couchbaseembed::connectionCacheExpirer)
|
|
|
+ {
|
|
|
+ couchbaseembed::connectionCacheExpirer->stop();
|
|
|
+ delete(couchbaseembed::connectionCacheExpirer);
|
|
|
+ couchbaseembed::connectionCacheExpirer = nullptr;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (couchbaseembed::connectionCache)
|
|
|
+ {
|
|
|
+ couchbaseembed::connectionCache->deleteAll();
|
|
|
+ delete(couchbaseembed::connectionCache);
|
|
|
+ couchbaseembed::connectionCache = nullptr;
|
|
|
+ }
|
|
|
+}
|