Browse Source

HPCC-14420 Dali should initialize cassandra data store

Introduce a general mechanism for plugins that partially replace dali
functionality.

Don't overwrite version info if existing.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 9 years ago
parent
commit
a13f1c0492

+ 1 - 1
common/dllserver/thorplugin.cpp

@@ -142,7 +142,7 @@ HINSTANCE HelperDll::getInstance() const
 
 void * HelperDll::getEntry(const char * name) const
 {
-    return GetSharedProcedure(so.getInstanceHandle(), name);
+    return so.getEntry(name);
 }
 
 bool HelperDll::IsShared()

+ 17 - 17
common/workunit/workunit.cpp

@@ -2686,6 +2686,10 @@ public:
     {
         removeShutdownHook(*this);
     }
+    virtual bool initializeStore()
+    {
+        throwUnexpected(); // Used when loading a plugin factory - not applicable here
+    }
     virtual IWorkUnitWatcher *getWatcher(IWorkUnitSubscriber *subscriber, WUSubscribeOptions options, const char *wuid) const
     {
         return new CDaliWorkUnitWatcher(subscriber, options, wuid);
@@ -3082,7 +3086,6 @@ extern WORKUNIT_API IConstWorkUnitIterator *createSecureConstWUIterator(IPropert
 
 
 static CriticalSection factoryCrit;
-static Owned<ILoadedDllEntry> workunitServerPlugin;  // NOTE - unload AFTER the factory is released!
 static Owned<IWorkUnitFactory> factory;
 
 void CDaliWorkUnitFactory::clientShutdown()
@@ -3113,24 +3116,17 @@ extern WORKUNIT_API IWorkUnitFactory * getWorkUnitFactory()
         {
             const char *forceEnv = getenv("FORCE_DALI_WORKUNITS");
             bool forceDali = forceEnv && !strieq(forceEnv, "off") && !strieq(forceEnv, "0");
-            Owned<IRemoteConnection> conn = querySDS().connect("/Environment/Software/WorkUnitsServer", myProcessSession(), 0, SDS_LOCK_TIMEOUT);
-            // MORE - arguably should be looking in the config section that corresponds to the dali we connected to. If you want to allow some dalis to be configured to use a WU server and others not.
-            if (conn && !forceDali)
+            Owned<IRemoteConnection> env = querySDS().connect("/Environment", myProcessSession(), 0, SDS_LOCK_TIMEOUT);
+            IPropertyTree *pluginInfo = NULL;
+            if (env)
             {
-                const IPropertyTree *ptree = conn->queryRoot();
-                const char *pluginName = ptree->queryProp("@plugin");
-                if (!pluginName)
-                    throw makeStringException(WUERR_WorkunitPluginError, "WorkUnitsServer information missing plugin name");
-                workunitServerPlugin.setown(createDllEntry(pluginName, false, NULL));
-                if (!workunitServerPlugin)
-                    throw makeStringExceptionV(WUERR_WorkunitPluginError, "WorkUnitsServer: failed to load plugin %s", pluginName);
-                WorkUnitFactoryFactory pf = (WorkUnitFactoryFactory) workunitServerPlugin->getEntry("createWorkUnitFactory");
-                if (!pf)
-                    throw makeStringExceptionV(WUERR_WorkunitPluginError, "WorkUnitsServer: function createWorkUnitFactory not found in plugin %s", pluginName);
-                factory.setown(pf(ptree));
-                if (!factory)
-                    throw makeStringExceptionV(WUERR_WorkunitPluginError, "WorkUnitsServer: createWorkUnitFactory returned NULL in plugin %s", pluginName);
+                SocketEndpoint targetDali = queryCoven().queryGroup().queryNode(0).endpoint();
+                IPropertyTree *daliInfo = findDaliProcess(env->queryRoot(), targetDali);
+                if (daliInfo)
+                    pluginInfo = daliInfo->queryPropTree("Plugin[@type='WorkunitServer']");
             }
+            if (pluginInfo && !forceDali)
+                factory.setown( (IWorkUnitFactory *) loadPlugin(pluginInfo));
             else
                 factory.setown(new CDaliWorkUnitFactory());
         }
@@ -3150,6 +3146,10 @@ public:
         : baseFactory(_baseFactory), defaultSecMgr(_secMgr), defaultSecUser(_secUser)
     {
     }
+    virtual bool initializeStore()
+    {
+        throwUnexpected(); // Used when loading a plugin factory - not applicable here
+    }
     virtual IWorkUnitWatcher *getWatcher(IWorkUnitSubscriber *subscriber, WUSubscribeOptions options, const char *wuid) const
     {
         return baseFactory->getWatcher(subscriber, options, wuid);

+ 1 - 1
common/workunit/workunit.hpp

@@ -1249,7 +1249,7 @@ enum WUQuerySortField
 typedef IIteratorOf<IPropertyTree> IConstQuerySetQueryIterator;
 
 
-interface IWorkUnitFactory : extends IInterface
+interface IWorkUnitFactory : extends IPluggableFactory
 {
     virtual IWorkUnit *createWorkUnit(const char *app, const char *scope, ISecManager *secmgr = NULL, ISecUser *secuser = NULL) = 0;
     virtual bool deleteWorkUnit(const char *wuid, ISecManager *secmgr = NULL, ISecUser *secuser = NULL) = 0;

+ 23 - 2
dali/base/daclient.cpp

@@ -231,6 +231,27 @@ void disconnectLogMsgListenerFromDali()
         disconnectLogMsgListenerFromChild(&servers.queryNode(idx));
 }
 
+IPropertyTree *findDaliProcess(IPropertyTree *env, const SocketEndpoint &dali)
+{
+    Owned<IPropertyTreeIterator> dalis = env->getElements("Software/DaliServerProcess");
+    ForEach(*dalis)
+    {
+        IPropertyTree *cur = &dalis->query();
+        Owned<IPropertyTreeIterator> instances = cur->getElements("Instance");
+        ForEach(*instances)
+        {
+            IPropertyTree *inst = &instances->query();
+            const char *ps = inst->queryProp("@port");
+            unsigned port = ps?atoi(ps):0;
+            if (!port)
+                port = DALI_SERVER_PORT;
+            SocketEndpoint daliep(inst->queryProp("@netAddress"),port);
+            if (dali.equals(daliep))
+                return cur;;
+        }
+    }
+    return NULL;
+}
 
 bool updateDaliEnv(IPropertyTree *env, bool forceGroupUpdate, const char *daliIp)
 {
@@ -241,11 +262,11 @@ bool updateDaliEnv(IPropertyTree *env, bool forceGroupUpdate, const char *daliIp
     }
     SocketEndpoint daliep;
     loop {
-        const char *ps = dalis->get().queryProp("@port");
+        const char *ps = dalis->query().queryProp("@port");
         unsigned port = ps?atoi(ps):0;
         if (!port)
             port = DALI_SERVER_PORT;
-        daliep.set(dalis->get().queryProp("@netAddress"),port);
+        daliep.set(dalis->query().queryProp("@netAddress"),port);
         if (daliIp && *daliIp) {
             SocketEndpoint testep;
             testep.set(daliIp,DALI_SERVER_PORT);

+ 3 - 1
dali/base/daclient.hpp

@@ -59,9 +59,11 @@ extern da_decl void disconnectLogMsgManagerFromDali();
 extern da_decl void connectLogMsgListenerToDali();
 extern da_decl void disconnectLogMsgListenerFromDali();
 
-// initates client session and updates dali pointed to by environment, unless daliIp supplied
+// initiates client session and updates dali pointed to by environment, unless daliIp supplied
 extern da_decl bool updateDaliEnv(IPropertyTree *env, bool updateDaliEnv=false, const char *daliIp=NULL);
 
+// Find the environment section for a given dali instance
+extern da_decl IPropertyTree *findDaliProcess(IPropertyTree *env, const SocketEndpoint &dali);
 
 
 // the class below fills in the Status/Servers branch

+ 13 - 0
dali/base/dasds.cpp

@@ -6015,6 +6015,19 @@ void CCovenSDSManager::loadStore(const char *storeName, const bool *abort)
             if (0 != stricmp("Environment", envTree->queryName()))
                 throw MakeStringException(0, "External environment file '%s', has '%s' as root, expecting a 'Environment' xml node.", environment, envTree->queryName());
 
+            Owned <IMPServer> thisDali = getMPServer();
+            assertex(thisDali);
+            IPropertyTree *thisDaliInfo = findDaliProcess(envTree, thisDali->queryMyNode()->endpoint());
+            assertex(thisDaliInfo);
+            Owned<IPropertyTreeIterator> plugins = thisDaliInfo->getElements("Plugin");
+            ForEach(*plugins)
+            {
+                Owned<IPluggableFactory> factory = loadPlugin(&plugins->query());
+                assertex (factory);
+                if (!factory->initializeStore())
+                    throw MakeStringException(0, "Failed to initialize plugin store '%s'", plugins->query().queryProp("@name"));
+            }
+
             oldEnvironment.setown(root->getPropTree("Environment"));
             root->removeTree(oldEnvironment);
             root->addPropTree("Environment", envTree.getClear());

+ 22 - 12
plugins/cassandra/cassandrawu.cpp

@@ -2973,7 +2973,7 @@ class CCasssandraWorkUnitFactory : public CWorkUnitFactory, implements ICassandr
 {
     IMPLEMENT_IINTERFACE;
 public:
-    CCasssandraWorkUnitFactory(const IPropertyTree *props) : cluster(cass_cluster_new()), randomizeSuffix(0), randState((unsigned) get_cycles_now()), cacheRetirer(*this)
+    CCasssandraWorkUnitFactory(const SharedObject *_dll, const IPropertyTree *props) : cluster(cass_cluster_new()), randomizeSuffix(0), randState((unsigned) get_cycles_now()), cacheRetirer(*this)
     {
         StringArray options;
         Owned<IPTreeIterator> it = props->getElements("Option");
@@ -3030,6 +3030,7 @@ public:
             cluster.disconnect();
         }
         cacheRetirer.start();
+        LINK(_dll);  // Yes, this leaks. Not really sure how to avoid that.
     }
 
     ~CCasssandraWorkUnitFactory()
@@ -3039,6 +3040,11 @@ public:
         if (traceLevel)
             DBGLOG("CCasssandraWorkUnitFactory destroyed");
     }
+    virtual bool initializeStore()
+    {
+        createRepository();
+        return true;
+    }
     virtual IWorkUnitWatcher *getWatcher(IWorkUnitSubscriber *subscriber, WUSubscribeOptions options, const char *wuid) const
     {
         return new CCassandraWorkUnitWatcher(subscriber, options, wuid);
@@ -3628,7 +3634,7 @@ public:
         CassandraSession s(cass_session_new());
         CassandraFuture future(cass_session_connect(s, cluster.queryCluster()));
         future.wait("connect without keyspace");
-        VStringBuffer create("CREATE KEYSPACE %s WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '1' } ;", cluster.queryKeySpace()); // MORE - options from props? Not 100% sure if they are appropriate.
+        VStringBuffer create("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '1' };", cluster.queryKeySpace()); // MORE - options from props? Not 100% sure if they are appropriate.
         executeSimpleCommand(s, create);
         s.set(NULL);
         cluster.connect();
@@ -3661,16 +3667,20 @@ private:
     {
         StringBuffer schema;
         executeSimpleCommand(querySession(), describeTable(versionMappings, schema));
-        VStringBuffer versionInfo("<Version major='%d' minor='%d'/>", majorVersion, minorVersion);
-        CassandraBatch versionBatch(cass_batch_new(CASS_BATCH_TYPE_LOGGED));
-        Owned<IPTree> pt = createPTreeFromXMLString(versionInfo);
-        for (int i = 0; i < NUM_PARTITIONS; i++)
+        Owned<IPTree> oldVersion = getVersionInfo();
+        if (!oldVersion)
         {
-            pt->setPropInt("@partition", i);
-            simpleXMLtoCassandra(this, versionBatch, versionMappings, pt, NULL);
+            VStringBuffer versionInfo("<Version major='%d' minor='%d'/>", majorVersion, minorVersion);
+            CassandraBatch versionBatch(cass_batch_new(CASS_BATCH_TYPE_LOGGED));
+            Owned<IPTree> pt = createPTreeFromXMLString(versionInfo);
+            for (int i = 0; i < NUM_PARTITIONS; i++)
+            {
+                pt->setPropInt("@partition", i);
+                simpleXMLtoCassandra(this, versionBatch, versionMappings, pt, NULL);
+            }
+            CassandraFuture futureBatch(cass_session_execute_batch(querySession(), versionBatch));
+            futureBatch.wait("createVersionTable");
         }
-        CassandraFuture futureBatch(cass_session_execute_batch(querySession(), versionBatch));
-        futureBatch.wait("createVersionTable");
     }
     IPTree *getVersionInfo()
     {
@@ -4212,7 +4222,7 @@ private:
 
 } // namespace
 
-extern "C" EXPORT IWorkUnitFactory *createWorkUnitFactory(const IPropertyTree *props)
+extern "C" EXPORT IWorkUnitFactory *createWorkUnitFactory(const SharedObject *dll, const IPropertyTree *props)
 {
-    return new cassandraembed::CCasssandraWorkUnitFactory(props);
+    return new cassandraembed::CCasssandraWorkUnitFactory(dll, props);
 }

+ 24 - 0
system/jlib/jutil.cpp

@@ -479,6 +479,10 @@ bool SharedObject::loadCurrentExecutable()
     return true;
 }
 
+void *SharedObject::getEntry(const char * name) const
+{
+    return GetSharedProcedure(getInstanceHandle(), name);
+}
 
 void SharedObject::unload()
 {
@@ -488,6 +492,26 @@ void SharedObject::unload()
 
 //-----------------------------------------------------------------------
 
+IPluggableFactory *loadPlugin(const IPropertyTree *pluginInfo)
+{
+    const char *pluginName = pluginInfo->queryProp("@name");
+    const char *entrypoint = pluginInfo->queryProp("@entrypoint");
+    if (!pluginName || !entrypoint)
+        throw makeStringException(0, "Plugin information missing plugin name or entrypoint");
+    Owned<SharedObject> pluginDll = new SharedObject;
+    if (!pluginDll->load(pluginName, false, true))
+        throw makeStringExceptionV(0, "Failed to load plugin %s", pluginName);
+    IPluggableFactoryFactory pf = (IPluggableFactoryFactory) pluginDll->getEntry(entrypoint);
+    if (!pf)
+        throw makeStringExceptionV(0, "Function %s not found in plugin %s", entrypoint,  pluginName);
+    IPluggableFactory *factory =  pf(pluginDll, pluginInfo);
+    if (!factory)
+        throw makeStringExceptionV(0, "Factory function %s returned NULL in plugin %s", entrypoint, pluginName);
+    return factory;
+}
+
+//-----------------------------------------------------------------------
+
 /*
 
   We use a 64 bit number for generating temporaries so that we are unlikely to get any

+ 15 - 1
system/jlib/jutil.hpp

@@ -59,7 +59,7 @@ int jlib_decl numtostr(char *dst, unsigned __int64 _value);
 extern jlib_decl HINSTANCE LoadSharedObject(const char *name, bool isGlobal, bool raiseOnError);
 extern jlib_decl void FreeSharedObject(HINSTANCE h);
 
-class jlib_decl SharedObject
+class jlib_decl SharedObject : public CInterfaceOf<IInterface>
 {
 public:
     SharedObject()      { h = 0; bRefCounted = false; }
@@ -70,11 +70,25 @@ public:
     bool loaded()       { return h != 0; }
     void unload();
     HINSTANCE getInstanceHandle() const { return h; }
+    void *getEntry(const char * name) const;
+
 public:
     HINSTANCE       h;
     bool bRefCounted;
 };
 
+// Interface for dynamically-loadable plugins
+
+interface IPluggableFactory : extends IInterface
+{
+    virtual bool initializeStore() = 0;
+};
+
+typedef IPluggableFactory * (* IPluggableFactoryFactory)(const SharedObject *dll, const IPropertyTree *);
+
+extern jlib_decl IPluggableFactory *loadPlugin(const IPropertyTree* pluginInfo);
+
+
 //---------------------------------------------------------------------------
 
 //functions for generating unique identifiers consisting of 0..9,A..V