Browse Source

Merge pull request #7805 from jakesmith/hpcc-14243

HPCC-14243 Add method to retrieve cluster group

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 9 years ago
parent
commit
08456508c6
2 changed files with 58 additions and 9 deletions
  1. 57 9
      dali/base/dadfs.cpp
  2. 1 0
      dali/base/dadfs.hpp

+ 57 - 9
dali/base/dadfs.cpp

@@ -8976,7 +8976,17 @@ class CInitGroups
     CConnectLock groupsconnlock;
     StringArray clusternames;
     unsigned defaultTimeout;
+    bool machinesLoaded;
 
+    GroupType getGroupType(const char *type) // Map a cluster type string to its GroupType; type is passed straight to strcmp, so it must be non-null
+    {
+        if (0 == strcmp("ThorCluster", type)) // exact, case-sensitive match
+            return grp_thor;
+        else if (0 == strcmp("RoxieCluster", type))
+            return grp_roxie;
+        else
+            throwUnexpected(); // any other type string is rejected; presumably throws, hence no return on this path - TODO confirm
+    }
     bool clusterGroupCompare(IPropertyTree *newClusterGroup, IPropertyTree *oldClusterGroup)
     {
         if (!newClusterGroup && oldClusterGroup)
@@ -9053,13 +9063,20 @@ class CInitGroups
         grp->setProp("@name", name);
     }
 
-    IGroup *getGroupFromCluster(GroupType groupType, IPropertyTree &cluster)
+#define DEFAULT_SLAVEBASEPORT 20100 // fallback when the cluster has no @slaveport; the same defaults are in thor.xsl.in AND init_thor at the moment
+#define DEFAULT_LOCALTHORPORTINC 200 // fallback when the cluster has no @localThorPortInc; port step applied per additional slave on a node when expanding
+    IGroup *getGroupFromCluster(GroupType groupType, IPropertyTree &cluster, bool expand)
     {
         SocketEndpointArray eps;
         const char *processName=NULL;
-        switch (groupType) {
+        unsigned slavePort = 0;
+        unsigned localThorPortInc = 0;
+        switch (groupType)
+        {
             case grp_thor:
                 processName = "ThorSlaveProcess";
+                slavePort = cluster.getPropInt("@slaveport", DEFAULT_SLAVEBASEPORT);
+                localThorPortInc = cluster.getPropInt("@localThorPortInc", DEFAULT_LOCALTHORPORTINC);
                 break;
             case grp_thorspares:
                 processName = "ThorSpareProcess";
@@ -9072,7 +9089,8 @@ class CInitGroups
         }
         SocketEndpoint nullep;
         Owned<IPropertyTreeIterator> nodes = cluster.getElements(processName);
-        ForEach(*nodes) {
+        ForEach(*nodes)
+        {
             IPropertyTree &node = nodes->query();
             SocketEndpoint ep;
             const char *computer = node.queryProp("@computer");
@@ -9080,7 +9098,8 @@ class CInitGroups
             if (computer && *computer)
             {
                 CMachineEntryPtr *m = machinemap.getValue(computer);
-                if (!m) {
+                if (!m)
+                {
                     ERRLOG("Cannot construct %s, computer name %s not found\n", cluster.queryProp("@name"), computer);
                     return NULL;
                 }
@@ -9095,7 +9114,8 @@ class CInitGroups
                 ERRLOG("Cannot construct %s, missing computer spec on node\n", cluster.queryProp("@name"));
                 return NULL;
             }
-            switch (groupType) {
+            switch (groupType)
+            {
                 case grp_roxie:
                     // Redundant copies are located via the flags.
                     // Old environments may contain duplicated sever information for multiple ports
@@ -9103,6 +9123,7 @@ class CInitGroups
                     break;
                 case grp_thor:
                 case grp_thorspares:
+                    ep.port = slavePort;
                     eps.append(ep);
                     break;
                 default:
@@ -9115,12 +9136,17 @@ class CInitGroups
         unsigned slavesPerNode = 0;
         if (grp_thor == groupType)
             slavesPerNode = cluster.getPropInt("@slavesPerNode");
-        if (slavesPerNode)
+        if (expand && slavesPerNode)
         {
             SocketEndpointArray msEps;
-            for (unsigned s=0; s<slavesPerNode; s++) {
+            for (unsigned s=0; s<slavesPerNode; s++)
+            {
                 ForEachItemIn(e, eps)
-                    msEps.append(eps.item(e));
+                {
+                    SocketEndpoint ep = eps.item(e);
+                    ep.port = slavePort + (s * localThorPortInc);
+                    msEps.append(ep);
+                }
             }
             grp.setown(createIGroup(msEps));
         }
@@ -9131,6 +9157,8 @@ class CInitGroups
 
     bool loadMachineMap()
     {
+        if (machinesLoaded)
+            return true;
         //GH->JCS This can't be changed to use getEnvironmentFactory() unless that moved inside dalibase;
         Owned<IRemoteConnection> conn = querySDS().connect("/Environment/Hardware", myProcessSession(), RTM_LOCK_READ, SDS_CONNECT_TIMEOUT);
         if (!conn) {
@@ -9147,6 +9175,7 @@ class CInitGroups
             machinemap.setValue(name, entry);
             machinelist.append(*entry);
         }
+        machinesLoaded = true;
         return true;
     }
 
@@ -9184,9 +9213,10 @@ class CInitGroups
 
     IPropertyTree *createClusterGroupFromEnvCluster(GroupType groupType, IPropertyTree &cluster, const char *dir, bool realCluster) // Resolve the cluster's IGroup and hand it to createClusterGroup; returns NULL if the group cannot be resolved
     {
-        Owned<IGroup> group = getGroupFromCluster(groupType, cluster);
+        Owned<IGroup> group = getGroupFromCluster(groupType, cluster, true); // expand=true: for thor, endpoints are repeated per slave when @slavesPerNode is set
         if (!group)
             return NULL;
+        // NB: createClusterGroup creates an IP group, so any ports set on the endpoints in 'group' are ignored
         return createClusterGroup(groupType, group, dir, realCluster);
     }
 
@@ -9300,6 +9330,7 @@ public:
         : groupsconnlock("constructGroup",SDS_GROUPSTORE_ROOT,true,false,false,_defaultTimeout)
     {
         defaultTimeout = _defaultTimeout;
+        machinesLoaded = false;
     }
 
     bool doClusterGroup(CgCmd cmd, const char *_clusterName, const char *type, bool spares, SocketEndpointArray *eps, StringBuffer &messages)
@@ -9484,6 +9515,12 @@ public:
             }
         }
     }
+    IGroup *getGroupFromCluster(const char *type, IPropertyTree &cluster, bool expand) // Public overload: takes the cluster type as a string ("ThorCluster"/"RoxieCluster") instead of a GroupType
+    {
+        loadMachineMap(); // returns immediately once machinesLoaded is set; NOTE(review): the bool result is ignored here
+        GroupType gt = getGroupType(type); // unrecognised type strings hit throwUnexpected() inside getGroupType
+        return getGroupFromCluster(gt, cluster, expand); // may be NULL, e.g. when a node's computer name is not found in machinemap
+    }
 };
 
 void initClusterGroups(bool force, StringBuffer &response, IPropertyTree *oldEnvironment, unsigned timems)
@@ -9510,6 +9547,17 @@ bool removeClusterSpares(const char *clusterName, const char *type, SocketEndpoi
     return init.removeSpares(clusterName, type, eps, response);
 }
 
+IGroup *getClusterGroup(const char *clusterName, const char *type, bool expand, unsigned timems) // Look up <type>[@name=clusterName] under /Environment/Software and return its IGroup, or NULL
+{
+    CInitGroups init(timems); // timems becomes the CInitGroups default timeout
+    VStringBuffer cluster("/Environment/Software/%s[@name=\"%s\"]", type, clusterName); // NOTE(review): type and clusterName are inserted into the xpath unescaped
+    Owned<IRemoteConnection> conn = querySDS().connect(cluster.str(), myProcessSession(), RTM_LOCK_READ, SDS_CONNECT_TIMEOUT); // read-lock connect; uses SDS_CONNECT_TIMEOUT rather than timems
+    if (!conn)
+        return NULL; // no connection was returned for that xpath
+    return init.getGroupFromCluster(type, *conn->queryRoot(), expand); // type must be "ThorCluster" or "RoxieCluster" (see getGroupType); result may be NULL
+}
+
+
 class CDaliDFSServer: public Thread, public CTransactionLogTracker, implements IDaliServer, implements IExceptionHandler
 {  // Coven size
     

+ 1 - 0
dali/base/dadfs.hpp

@@ -772,6 +772,7 @@ extern da_decl bool removeClusterSpares(const char *clusterName, const char *typ
 // should poss. belong in lib workunit
 extern da_decl StringBuffer &getClusterGroupName(IPropertyTree &cluster, StringBuffer &groupName);
 extern da_decl StringBuffer &getClusterSpareGroupName(IPropertyTree &cluster, StringBuffer &groupName);
+extern da_decl IGroup *getClusterGroup(const char *clusterName, const char *type, bool expand, unsigned timems=INFINITE); // returns the IGroup for the named environment cluster, or NULL; 'type' is the cluster element name under /Environment/Software, e.g. "ThorCluster"
 
 extern da_decl IDistributedFileTransaction *createDistributedFileTransaction(IUserDescriptor *user);