浏览代码

Merge pull request #2711 from richardkchapman/roxie-version-gh-2706

gh-2706 Ability to verify same queries are on all roxie nodes

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 13 年之前
父节点
当前提交
b7e01ad008
共有 2 个文件被更改,包括 46 次插入17 次删除
  1. 44 15
      roxie/ccd/ccdstate.cpp
  2. 2 2
      roxie/ccd/ccdstate.hpp

+ 44 - 15
roxie/ccd/ccdstate.cpp

@@ -735,19 +735,22 @@ protected:
     unsigned channelNo;
     bool active;
 
-    void addQuery(const char *id, IQueryFactory *n)
+    void addQuery(const char *id, IQueryFactory *n, hash64_t &hash)
     {
+        hash = rtlHash64Data(sizeof(hash), &hash, n->queryHash());
         queries.setValue(id, n);
         n->Release();  // setValue links
     }
 
-    void addAlias(const char *alias, const char *original)
+    void addAlias(const char *alias, const char *original, hash64_t &hash)
     {
         if (original && alias)
         {
             IQueryFactory *orig = queries.getValue(original);
             if (orig)
             {
+                hash = rtlHash64VStr(alias, hash);
+                hash = rtlHash64Data(sizeof(hash), &hash, orig->queryHash());
                 aliases.setValue(alias, orig);
             }
             else
@@ -772,7 +775,7 @@ public:
         return active;
     }
 
-    virtual void load(const IPropertyTree *querySet, const IPackageMap &packages)
+    virtual void load(const IPropertyTree *querySet, const IPackageMap &packages, hash64_t &hash)
     {
         Owned<IPropertyTreeIterator> queryNames = querySet->getElements("Query");
         ForEach (*queryNames)
@@ -800,7 +803,7 @@ public:
                     if (!package) package = &queryRootPackage();
                 }
                 assertex(package);
-                addQuery(id, loadQueryFromDll(id, queryDll.getClear(), *package, &query));
+                addQuery(id, loadQueryFromDll(id, queryDll.getClear(), *package, &query), hash);
             }
             catch (IException *E)
             {
@@ -821,7 +824,7 @@ public:
             const char *original = item.queryProp("@id");
             try
             {
-                addAlias(alias, original);
+                addAlias(alias, original, hash);
             }
             catch (IException *E)
             {
@@ -832,6 +835,8 @@ public:
             }
         }
         active = packages.isActive();
+        if (active)
+            hash = rtlHash64VStr("active", hash);
     }
 
     virtual void getStats(const char *queryName, const char *graphName, StringBuffer &reply, const IRoxieContextLogger &logctx) const
@@ -1013,11 +1018,11 @@ public:
         return managers[idx];
     }
 
-    virtual void load(const IPropertyTree *querySets, const IPackageMap &packages)
+    virtual void load(const IPropertyTree *querySets, const IPackageMap &packages, hash64_t &hash)
     {
         for (unsigned channel = 0; channel < numChannels; channel++)
             if (managers[channel])
-                managers[channel]->load(querySets, packages);
+                managers[channel]->load(querySets, packages, hash); // MORE - this means the hash depends on the number of channels. Is that desirable?
     }
 
 private:
@@ -1098,6 +1103,7 @@ public:
     CRoxieQueryPackageManager(unsigned _numChannels, const IPackageMap *_packages)
         : numChannels(_numChannels), packages(_packages)
     {
+        queryHash = 0;
     }
 
     ~CRoxieQueryPackageManager()
@@ -1116,6 +1122,12 @@ public:
 
     virtual void load() = 0;
 
+    virtual hash64_t getHash()
+    {
+        CriticalBlock b2(updateCrit);
+        return queryHash;
+    }
+
     IRoxieQuerySetManager* getRoxieServerManager()
     {
         CriticalBlock b2(updateCrit);
@@ -1186,7 +1198,7 @@ public:
         serverManager->getQueries(reply);
     }
 protected:
-    void reloadQueryManagers(CRoxieSlaveQuerySetManagerSet *newSlaveManagers, IRoxieQuerySetManager *newServerManager)
+    void reloadQueryManagers(CRoxieSlaveQuerySetManagerSet *newSlaveManagers, IRoxieQuerySetManager *newServerManager, hash64_t newHash)
     {
         Owned<CRoxieSlaveQuerySetManagerSet> oldSlaveManagers;
         Owned<IRoxieQuerySetManager> oldServerManager;
@@ -1197,6 +1209,7 @@ protected:
             oldServerManager.setown(serverManager.getClear()); // so that the release happens outside the critblock
             slaveManagers.setown(newSlaveManagers);
             serverManager.setown(newServerManager);
+            queryHash = newHash;
         }
     }
 
@@ -1206,6 +1219,7 @@ protected:
 
     Owned<const IPackageMap> packages;
     unsigned numChannels;
+    hash64_t queryHash;
 };
 
 /**
@@ -1262,13 +1276,14 @@ public:
 
     virtual void reload()
     {
+        hash64_t newHash = numChannels;
         const char *querySetId = packages->queryQuerySetId();
         Owned<IPropertyTree> newQuerySet = daliHelper->getQuerySet(querySetId);
         Owned<CRoxieSlaveQuerySetManagerSet> newSlaveManagers = new CRoxieSlaveQuerySetManagerSet(numChannels);
         Owned<IRoxieQuerySetManager> newServerManager = createServerManager();
-        newServerManager->load(newQuerySet, *packages);
-        newSlaveManagers->load(newQuerySet, *packages);
-        reloadQueryManagers(newSlaveManagers.getClear(), newServerManager.getClear());
+        newServerManager->load(newQuerySet, *packages, newHash);
+        newSlaveManagers->load(newQuerySet, *packages, newHash);
+        reloadQueryManagers(newSlaveManagers.getClear(), newServerManager.getClear(), newHash);
         clearKeyStoreCache(false);   // Allows us to fully release files we no longer need because of unloaded queries
     }
 
@@ -1291,16 +1306,17 @@ public:
     {
     }
 
-    void load()
+    virtual void load()
     {
+        hash64_t newHash = numChannels;
         Owned<IPropertyTree> newQuerySet = createPTree("QuerySet");
         newQuerySet->setProp("@name", "_standalone");
         newQuerySet->addPropTree("Query", standaloneDll.getLink());
         Owned<CRoxieSlaveQuerySetManagerSet> newSlaveManagers = new CRoxieSlaveQuerySetManagerSet(numChannels);
         Owned<IRoxieQuerySetManager> newServerManager = createServerManager();
-        newServerManager->load(newQuerySet, *packages);
-        newSlaveManagers->load(newQuerySet, *packages);
-        reloadQueryManagers(newSlaveManagers.getClear(), newServerManager.getClear());
+        newServerManager->load(newQuerySet, *packages, newHash);
+        newSlaveManagers->load(newQuerySet, *packages, newHash);
+        reloadQueryManagers(newSlaveManagers.getClear(), newServerManager.getClear(), newHash);
     }
 };
 
@@ -1410,6 +1426,7 @@ private:
     InterruptableSemaphore controlSem;
     IArrayOf<IDaliPackageWatcher> notifiers;
     Owned<IRoxieDaliHelper> daliHelper;
+    hash64_t stateHash;
 
     void reload()
     {
@@ -1972,6 +1989,7 @@ private:
                     reply.appendf("<Dali connected='1'/>");
                 else
                     reply.appendf("<Dali connected='0'/>");
+                reply.appendf("<State hash='%"I64F"u'/>", (unsigned __int64) stateHash);
             }
             else if (stricmp(queryName, "control:resetindexmetrics")==0)
             {
@@ -2053,6 +2071,14 @@ private:
                 socketCheckInterval = (unsigned) control->getPropInt64("@val", 0);
                 topology->setPropInt64("@socketCheckInterval", socketCheckInterval);
             }
+            else if (stricmp(queryName, "control:state")==0)
+            {
+                if (daliHelper && daliHelper->connected())
+                    reply.appendf("<Dali connected='1'/>");
+                else
+                    reply.appendf("<Dali connected='0'/>");
+                reply.appendf("<State hash='%"I64F"u'/>", (unsigned __int64) stateHash);
+            }
             else if (stricmp(queryName, "control:status")==0)
             {
                 CriticalBlock b(ccdChannelsCrit);
@@ -2274,6 +2300,7 @@ private:
         // Called from reload inside critical block
         Owned<CRoxieQueryPackageManager> qpm = new CRoxieDaliQueryPackageManager(numChannels, packageMap);
         qpm->load();
+        stateHash = rtlHash64Data(sizeof(stateHash), &stateHash, qpm->getHash());
         allQueryPackages.append(*qpm.getClear());
     }
 
@@ -2286,6 +2313,7 @@ private:
         CIArrayOf<CRoxieQueryPackageManager> oldQueryPackages;
         appendArray(oldQueryPackages, allQueryPackages);
         allQueryPackages.kill();
+        stateHash = 0;
 
         Owned<IDaliPackageWatcher> notifier = daliHelper->getPackageSetSubscription(roxieName, this);
         if (notifier)
@@ -2340,6 +2368,7 @@ private:
         standAloneDllTree->setProp("@dll", standAloneDll->queryDll()->queryName());
         Owned<CRoxieQueryPackageManager> qpm = new CStandaloneQueryPackageManager(numChannels, LINK(&queryEmptyPackageMap()), standAloneDllTree.getClear());
         qpm->load();
+        stateHash = rtlHash64Data(sizeof(stateHash), &stateHash, qpm->getHash());
         allQueryPackages.append(*qpm.getClear());
     }
 

+ 2 - 2
roxie/ccd/ccdstate.hpp

@@ -106,7 +106,7 @@ interface IRoxieQuerySetManager : extends IInterface
     virtual bool isActive() const = 0;
     virtual IQueryFactory *lookupLibrary(const char * libraryName, unsigned expectedInterfaceHash, const IRoxieContextLogger &logctx) const = 0;
     virtual IQueryFactory *getQuery(const char *id, const IRoxieContextLogger &ctx) const = 0;
-    virtual void load(const IPropertyTree *querySet, const IPackageMap &packages) = 0;
+    virtual void load(const IPropertyTree *querySet, const IPackageMap &packages, hash64_t &hash) = 0;
     virtual void getStats(const char *queryName, const char *graphName, StringBuffer &reply, const IRoxieContextLogger &logctx) const = 0;
     virtual void resetQueryTimings(const char *queryName, const IRoxieContextLogger &logctx) = 0;
     virtual void resetAllQueryTimings() = 0;
@@ -123,7 +123,7 @@ interface IRoxieDebugSessionManager : extends IInterface
 
 interface IRoxieQuerySetManagerSet : extends IInterface
 {
-    virtual void load(const IPropertyTree *querySets, const IPackageMap &packages) = 0;
+    virtual void load(const IPropertyTree *querySets, const IPackageMap &packages, hash64_t &hash) = 0;
 };
 
 interface IRoxieQueryPackageManagerSet : extends IInterface