Kaynağa Gözat

HPCC-23509 Allow master/slave to be decoupled from fixed group

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith 5 yıl önce
ebeveyn
işleme
cb2c6e33fb

+ 15 - 10
thorlcr/master/mawatchdog.cpp

@@ -85,9 +85,12 @@ CMasterWatchdogBase::~CMasterWatchdogBase()
 
 void CMasterWatchdogBase::start()
 {
-    PROGLOG("Starting watchdog");
-    stopped = false;
-    threaded.init(this);
+    if (stopped)
+    {
+        PROGLOG("Starting watchdog");
+        stopped = false;
+        threaded.init(this);
+    }
 }
 
 void CMasterWatchdogBase::addSlave(const SocketEndpoint &slave)
@@ -260,10 +263,11 @@ class CMasterWatchdogUDP : public CMasterWatchdogBase
 {
     ISocket *sock;
 public:
-    CMasterWatchdogUDP()
+    CMasterWatchdogUDP(bool startNow)
     {
         sock = ISocket::udp_create(getFixedPort(TPORT_watchdog));
-        start();
+        if (startNow)
+            start();
     }
     ~CMasterWatchdogUDP()
     {
@@ -307,9 +311,10 @@ public:
 class CMasterWatchdogMP : public CMasterWatchdogBase
 {
 public:
-    CMasterWatchdogMP()
+    CMasterWatchdogMP(bool startNow)
     {
-        start();
+        if (startNow)
+            start();
     }
     virtual unsigned readData(MemoryBuffer &mb)
     {
@@ -327,10 +332,10 @@ public:
 
 /////////////////////
 
-CMasterWatchdogBase *createMasterWatchdog(bool udp)
+CMasterWatchdogBase *createMasterWatchdog(bool udp, bool startNow)
 {
     if (udp)
-        return new CMasterWatchdogUDP();
+        return new CMasterWatchdogUDP(startNow);
     else
-        return new CMasterWatchdogMP();
+        return new CMasterWatchdogMP(startNow);
 }

+ 1 - 1
thorlcr/master/mawatchdog.hpp

@@ -52,7 +52,7 @@ public:
     virtual void stopReading() = 0;
 };
 
-CMasterWatchdogBase *createMasterWatchdog(bool udp=false);
+CMasterWatchdogBase *createMasterWatchdog(bool udp=false, bool startNow=false);
 
 #endif
 

+ 79 - 62
thorlcr/master/thmastermain.cpp

@@ -136,7 +136,7 @@ public:
     Linked<CMasterWatchdogBase> watchdog;
     IBitSet *status;
 
-    CRegistryServer()  : deregistrationWatch(*this)
+    CRegistryServer() : deregistrationWatch(*this)
     {
         status = createThreadSafeBitSet();
         msgDelay = SLAVEREG_VERIFY_DELAY;
@@ -196,70 +196,76 @@ public:
             watchdog->addSlave(ep);
         ++slavesRegistered;
     }
-    bool connect()
+    bool connect(unsigned slaves)
     {
-        unsigned slaves = queryNodeClusterWidth();
         LOG(MCdebugProgress, thorJob, "Waiting for %d slaves to register", slaves);
-        unsigned timeWaited = 0;
-        unsigned connected = 0;
-        Owned<IBitSet> connectedSet = createThreadSafeBitSet();
-        for (;;)
+
+        IPointerArrayOf<INode> connectedSlaves;
+        connectedSlaves.ensure(slaves);
+        unsigned remaining = slaves;
+        INode *_sender = nullptr;
+        CMessageBuffer msg;
+        while (remaining)
         {
-            CTimeMon tm(msgDelay);
-            UnsignedArray todo;
-            unsigned s = 0;
-            while ((s=connectedSet->scan(s, false)) < slaves)
-                todo.append(s++);
-            Owned<IShuffledIterator> shuffled = createShuffledIterator(todo.ordinality());
-            ForEach(*shuffled)
+            if (!queryWorldCommunicator().recv(msg, nullptr, MPTAG_THORREGISTRATION, &_sender, MP_WAIT_FOREVER))
             {
-                s = todo.item(shuffled->get());
-                unsigned remaining;
-                if (tm.timedout(&remaining))
-                    break;
-                PROGLOG("Verifying connection to slave %d", s+1);
-                if (queryWorldCommunicator().verifyConnection(&queryNodeGroup().queryNode(s+1), remaining))
-                {
-                    StringBuffer str;
-                    PROGLOG("verified connection with %s", queryNodeGroup().queryNode(s+1).endpoint().getUrlStr(str.clear()).str());
-                    ++connected;
-                    connectedSet->set(s);
-                }
-                if (stopped)
-                    return false;
+                ::Release(_sender);
+                PROGLOG("Failed to initialize slaves");
+                return false;
+            }
+            Owned<INode> sender = _sender;
+            if (NotFound != connectedSlaves.find(sender))
+            {
+                StringBuffer epStr;
+                PROGLOG("Same slave registered twice!! : %s", sender->endpoint().getUrlStr(epStr).str());
+                return false;
+            }
+
+            /* NB: in base metal setup, the slaves know which slave number they are in advance, and send their slavenum at registration.
+             * In non attached storage setup, they do not send a slave by default and instead are given a # once all are registered
+             */
+            unsigned slaveNum;
+            msg.read(slaveNum);
+            if (NotFound == slaveNum)
+            {
+                connectedSlaves.append(sender.getLink());
+                slaveNum = connectedSlaves.ordinality();
             }
-            timeWaited += tm.elapsed();
-            if (connected == slaves)
-                break;
             else
             {
-                if (timeWaited >= MAX_SLAVEREG_DELAY)
-                    throw MakeThorException(TE_AbortException, "Have waited over %d minutes for all slaves to connect, quitting.", MAX_SLAVEREG_DELAY/1000/60);
-                unsigned outstanding = slaves - connected;
-                PROGLOG("Still Waiting for minimum %d slaves to connect", outstanding);
-                if ((outstanding) <= 5)
-                {
-                    unsigned s=0;
-                    for (;;)
-                    {
-                        unsigned ns = connectedSet->scan(s, false);
-                        if (ns<s || ns >= slaves)
-                            break;
-                        s = ns+1;
-                        StringBuffer str;
-                        PROGLOG("waiting for slave %d (%s)", s, queryNodeGroup().queryNode(s).endpoint().getUrlStr(str.clear()).str());
-                    }
-                }
-                msgDelay = (unsigned) ((float)msgDelay * 1.5);
-                if (timeWaited+msgDelay > MAX_SLAVEREG_DELAY)
-                    msgDelay = MAX_SLAVEREG_DELAY - timeWaited;
+                unsigned pos = slaveNum - 1; // NB: slaveNum is 1 based
+                while (connectedSlaves.ordinality() < pos)
+                    connectedSlaves.append(nullptr);
+                if (connectedSlaves.ordinality() == pos)
+                    connectedSlaves.append(sender.getLink());
+                else
+                    connectedSlaves.replace(sender.getLink(), pos);
             }
+            StringBuffer epStr;
+            PROGLOG("Slave %u connected from %s", slaveNum, sender->endpoint().getUrlStr(epStr).str());
+            --remaining;
         }
-        
+        assertex(slaves == connectedSlaves.ordinality());
+
+        unsigned localThorPortInc = globals->getPropInt("@localThorPortInc", DEFAULT_SLAVEPORTINC);
+        unsigned slaveBasePort = globals->getPropInt("@slaveport", DEFAULT_THORSLAVEPORT);
+        unsigned channelsPerSlave = globals->getPropInt("@channelsPerSlave", 1);
+
+        Owned<IGroup> processGroup;
+
+        // NB: in bare metal Thor is bound to a group and cluster/communicator have alreday been setup (see earlier setClusterGroup call)
+        if (clusterInitialized())
+            processGroup.set(&queryProcessGroup());
+        else
+        {
+            processGroup.setown(createIGroup(connectedSlaves.ordinality(), connectedSlaves.getArray()));
+            setupCluster(queryMyNode(), processGroup, channelsPerSlave, slaveBasePort, localThorPortInc);
+        }
+
         PROGLOG("Slaves connected, initializing..");
-        CMessageBuffer msg;
+        msg.clear();
         msg.append(THOR_VERSION_MAJOR).append(THOR_VERSION_MINOR);
-        queryRawGroup().serialize(msg);
+        processGroup->serialize(msg);
         globals->serialize(msg);
         msg.append(masterSlaveMpTag);
         msg.append(kjServiceMpTag);
@@ -268,6 +274,8 @@ public:
             PROGLOG("Failed to initialize slaves");
             return false;
         }
+
+        // Wait for confirmation from slaves
         PROGLOG("Initialization sent to slave group");
         try
         {
@@ -304,6 +312,8 @@ public:
                 }
                 registerNode(sender-1);
             }
+
+            // this is like a barrier, let slaves know all slaves are now connected
             PROGLOG("Slaves initialized");
             unsigned s=0;
             for (; s<slaves; s++)
@@ -315,6 +325,8 @@ public:
                     return false;
                 }
             }
+            if (watchdog)
+                watchdog->start();
             deregistrationWatch.start();
             return true;
         }
@@ -605,7 +617,9 @@ int main( int argc, char *argv[]  )
 #endif
     const char *thorname = NULL;
     StringBuffer nodeGroup, logUrl;
-    unsigned channelsPerSlave = 1;
+    unsigned numSlaves = globals->getPropInt("@numSlaves", 0); // >0 in container world, 0 in bare metal
+    unsigned slavesPerNode = globals->getPropInt("@slavesPerNode", 1);
+    unsigned channelsPerSlave = globals->getPropInt("@channelsPerSlave", 1);
 
     ILogMsgHandler *logHandler;
     try
@@ -664,12 +678,6 @@ int main( int argc, char *argv[]  )
             nodeGroup.append(thorname);
             globals->setProp("@nodeGroup", thorname);
         }
-        unsigned slavesPerNode = globals->getPropInt("@slavesPerNode", 1);
-        channelsPerSlave = globals->getPropInt("@channelsPerSlave", 1);
-        unsigned localThorPortInc = globals->getPropInt("@localThorPortInc", DEFAULT_SLAVEPORTINC);
-        unsigned slaveBasePort = globals->getPropInt("@slaveport", DEFAULT_THORSLAVEPORT);
-        Owned<IGroup> rawGroup = getClusterNodeGroup(thorname, "ThorCluster");
-        setClusterGroup(queryMyNode(), rawGroup, slavesPerNode, channelsPerSlave, slaveBasePort, localThorPortInc);
         if (globals->getPropBool("@replicateOutputs")&&globals->getPropBool("@validateDAFS",true)&&!checkClusterRelicateDAFS(queryNodeGroup()))
         {
             FLLOG(MCoperatorError, thorJob, "ERROR: Validate failure(s) detected, exiting Thor");
@@ -840,7 +848,16 @@ int main( int argc, char *argv[]  )
         masterSlaveMpTag = allocateClusterMPTag();
         kjServiceMpTag = allocateClusterMPTag();
 
-        if (registry->connect())
+        if (0 == numSlaves) // bare metal
+        {
+            unsigned localThorPortInc = globals->getPropInt("@localThorPortInc", DEFAULT_SLAVEPORTINC);
+            unsigned slaveBasePort = globals->getPropInt("@slaveport", DEFAULT_THORSLAVEPORT);
+            Owned<IGroup> rawGroup = getClusterNodeGroup(thorname, "ThorCluster");
+            setClusterGroup(queryMyNode(), rawGroup, slavesPerNode, channelsPerSlave, slaveBasePort, localThorPortInc);
+            numSlaves = queryNodeClusterWidth();
+        }
+
+        if (registry->connect(numSlaves))
         {
             unsigned totSlaveProcs = queryNodeClusterWidth();
             for (unsigned s=0; s<totSlaveProcs; s++)

+ 42 - 17
thorlcr/slave/thslavemain.cpp

@@ -90,7 +90,7 @@ static void replyError(unsigned errorCode, const char *errorMsg)
     Owned<IException> e = MakeStringException(errorCode, "%s", str.str());
     CMessageBuffer msg;
     serializeException(e, msg);
-    queryWorldCommunicator().send(msg, 0, MPTAG_THORREGISTRATION);
+    queryNodeComm().send(msg, 0, MPTAG_THORREGISTRATION);
 }
 
 static std::atomic<bool> isRegistered {false};
@@ -106,27 +106,36 @@ static bool RegisterSelf(SocketEndpoint &masterEp)
         ep.port = getFixedPort(getMasterPortBase(), TPORT_mp);
         Owned<INode> masterNode = createINode(ep);
         CMessageBuffer msg;
+        msg.append(mySlaveNum);
+        queryWorldCommunicator().send(msg, masterNode, MPTAG_THORREGISTRATION);
         if (!queryWorldCommunicator().recv(msg, masterNode, MPTAG_THORREGISTRATION))
             return false;
         PROGLOG("Initialization received");
         unsigned vmajor, vminor;
         msg.read(vmajor);
         msg.read(vminor);
-        if (vmajor != THOR_VERSION_MAJOR || vminor != THOR_VERSION_MINOR)
-        {
-            replyError(TE_FailedToRegisterSlave, "Thor master/slave version mismatch");
-            return false;
-        }
-        Owned<IGroup> rawGroup = deserializeIGroup(msg);
+        Owned<IGroup> processGroup = deserializeIGroup(msg);
+        mySlaveNum = (unsigned)processGroup->rank(queryMyNode());
+        assertex(NotFound != mySlaveNum);
+        mySlaveNum++; // 1 based;
+        unsigned configSlaveNum = globals->getPropInt("@SLAVENUM", NotFound);
+        if (NotFound != configSlaveNum)
+            assertex(mySlaveNum == configSlaveNum);
+
         globals->Release();
         globals = createPTree(msg);
         mergeCmdParams(globals); // cmd line
 
-        unsigned slavesPerNode = globals->getPropInt("@slavesPerNode", 1);
         unsigned channelsPerSlave = globals->getPropInt("@channelsPerSlave", 1);
         unsigned localThorPortInc = globals->getPropInt("@localThorPortInc", DEFAULT_SLAVEPORTINC);
         unsigned slaveBasePort = globals->getPropInt("@slaveport", DEFAULT_THORSLAVEPORT);
-        setClusterGroup(masterNode, rawGroup, slavesPerNode, channelsPerSlave, slaveBasePort, localThorPortInc);
+        setupCluster(masterNode, processGroup, channelsPerSlave, slaveBasePort, localThorPortInc);
+
+        if (vmajor != THOR_VERSION_MAJOR || vminor != THOR_VERSION_MINOR)
+        {
+            replyError(TE_FailedToRegisterSlave, "Thor master/slave version mismatch");
+            return false;
+        }
 
         unsigned numStrands, blockSize;
         if (globals->hasProp("Debug/@forceNumStrands"))
@@ -160,14 +169,15 @@ static bool RegisterSelf(SocketEndpoint &masterEp)
         }
         readUnderlyingType<mptag_t>(msg, masterSlaveMpTag);
         readUnderlyingType<mptag_t>(msg, kjServiceMpTag);
+
         msg.clear();
-        msg.setReplyTag(MPTAG_THORREGISTRATION);
-        if (!queryNodeComm().reply(msg))
+        if (!queryNodeComm().send(msg, 0, MPTAG_THORREGISTRATION))
             return false;
-
         PROGLOG("Registration confirmation sent");
-        if (!queryNodeComm().recv(msg, 0, MPTAG_THORREGISTRATION)) // when all registered
+
+        if (!queryNodeComm().recv(msg, 0, MPTAG_THORREGISTRATION))
             return false;
+        PROGLOG("Registration confirmation receipt received");
 
         ::masterNode = LINK(masterNode);
 
@@ -353,6 +363,18 @@ int main( int argc, char *argv[]  )
         if (!master)
             usage();
 
+        mySlaveNum = globals->getPropInt("@SLAVENUM", NotFound);
+        /* NB: in cloud/non-local storage mode, slave number is not known until after registration with the master
+        * For the time being log file names are based on their slave number, so can only start when known.
+        */
+        bool loggingStarted = false;
+        if (NotFound != mySlaveNum)
+        {
+            startSlaveLog();
+            loggingStarted = true;
+        }
+
+        // In container world, SLAVE= will not be used
         const char *slave = globals->queryProp("@SLAVE");
         if (slave)
         {
@@ -362,11 +384,10 @@ int main( int argc, char *argv[]  )
         else 
             slfEp.setLocalHost(0);
 
-        mySlaveNum = globals->getPropInt("@SLAVENUM");
-
+        // TBD: use new config/init system for generic handling of init settings vs command line overrides
+        if (0 == slfEp.port) // assume default from config if not on command line
+            slfEp.port = globals->getPropInt("@slaveport", THOR_BASESLAVE_PORT);
         setMachinePortBase(slfEp.port);
-        slfEp.port = getMachinePortBase();
-        startSlaveLog();
 
         setSlaveAffinity(globals->getPropInt("@SLAVEPROCESSNUM"));
 
@@ -383,8 +404,12 @@ int main( int argc, char *argv[]  )
         localHostToNIC(masterEp);
         setMasterPortBase(masterEp.port);
         markNodeCentral(masterEp);
+
         if (RegisterSelf(masterEp))
         {
+            if (!loggingStarted)
+                startSlaveLog();
+
             if (globals->getPropBool("Debug/@slaveDaliClient"))
                 enableThorSlaveAsDaliClient();
 

+ 65 - 58
thorlcr/thorutil/thormisc.cpp

@@ -56,44 +56,36 @@
 
 #define SDS_LOCK_TIMEOUT 30000
 
-static INode *masterNode;
-static IGroup *rawGroup;
-static IGroup *nodeGroup;
-static IGroup *clusterGroup;
-static IGroup *slaveGroup;
-static IGroup *dfsGroup;
-static ICommunicator *nodeComm;
+static Owned<INode> masterNode;
+static Owned<IGroup> processGroup; // group of slave processes
+static Owned<IGroup> nodeGroup;    // master + processGroup
+static Owned<IGroup> slaveGroup;   // group containing all channels
+static Owned<IGroup> clusterGroup; // master + slaveGroup
+static Owned<IGroup> dfsGroup;     // same as slaveGroup, but without ports
+static Owned<ICommunicator> nodeComm; // communicator based on nodeGroup (master+slave processes)
 
 
 mptag_t masterSlaveMpTag;
 mptag_t kjServiceMpTag;
 IPropertyTree *globals;
-static IMPtagAllocator *ClusterMPAllocator = NULL;
+static Owned<IMPtagAllocator> ClusterMPAllocator;
 
 MODULE_INIT(INIT_PRIORITY_STANDARD)
 {
-    masterNode = NULL;
-    globals = NULL;
-    rawGroup = NULL;
-    nodeGroup = NULL;
-    clusterGroup = NULL;
-    slaveGroup = NULL;
-    dfsGroup = NULL;
-    nodeComm = NULL;
-    ClusterMPAllocator = createMPtagRangeAllocator(MPTAG_THORGLOBAL_BASE,MPTAG_THORGLOBAL_COUNT);
+    ClusterMPAllocator.setown(createMPtagRangeAllocator(MPTAG_THORGLOBAL_BASE,MPTAG_THORGLOBAL_COUNT));
     return true;
 }
 
 MODULE_EXIT()
 {
-    ::Release(masterNode);
-    ::Release(rawGroup);
-    ::Release(nodeGroup);
-    ::Release(clusterGroup);
-    ::Release(slaveGroup);
-    ::Release(dfsGroup);
-    ::Release(nodeComm);
-    ::Release(ClusterMPAllocator);
+    masterNode.clear();
+    nodeGroup.clear();
+    processGroup.clear();
+    clusterGroup.clear();
+    slaveGroup.clear();
+    dfsGroup.clear();
+    nodeComm.clear();
+    ClusterMPAllocator.clear();
 }
 
 
@@ -820,18 +812,48 @@ StringBuffer &getCompoundQueryName(StringBuffer &compoundName, const char *query
     return compoundName.append('V').append(version).append('_').append(queryName);
 }
 
-void setClusterGroup(INode *_masterNode, IGroup *_rawGroup, unsigned slavesPerNode, unsigned channelsPerSlave, unsigned portBase, unsigned portInc)
+void setupGroups(INode *_masterNode, IGroup *_processGroup, IGroup *_slaveGroup)
 {
-    ::Release(masterNode);
-    ::Release(rawGroup);
-    ::Release(nodeGroup);
-    ::Release(clusterGroup);
-    ::Release(slaveGroup);
-    ::Release(dfsGroup);
-    ::Release(nodeComm);
-    masterNode = LINK(_masterNode);
-    rawGroup = LINK(_rawGroup);
+    masterNode.set(_masterNode);
+    processGroup.set(_processGroup);
+    slaveGroup.set(_slaveGroup);
 
+    // nodeGroup contains master + all slave processes (excludes virtual slaves)
+    nodeGroup.setown(processGroup->add(LINK(masterNode), 0));
+
+    // clusterGroup contains master + all slaves (including virtuals)
+    clusterGroup.setown(slaveGroup->add(LINK(masterNode), 0));
+
+    // dfsGroup is same as slaveGroup, but stripped of ports. So is a IP group as wide as slaveGroup, used for publishing
+    IArrayOf<INode> dfsGroupNodes;
+    Owned<INodeIterator> nodeIter = slaveGroup->getIterator();
+    ForEach(*nodeIter)
+        dfsGroupNodes.append(*createINodeIP(nodeIter->query().endpoint(), 0));
+    dfsGroup.setown(createIGroup(dfsGroupNodes.ordinality(), dfsGroupNodes.getArray()));
+
+    nodeComm.setown(createCommunicator(nodeGroup));
+}
+    
+void setupCluster(INode *_masterNode, IGroup *_processGroup, unsigned channelsPerSlave, unsigned portBase, unsigned portInc)
+{
+    IArrayOf<INode> slaveGroupNodes;
+    for (unsigned s=0; s<channelsPerSlave; s++)
+    {
+        for (unsigned p=0; p<_processGroup->ordinality(); p++)
+        {
+            INode &processNode = _processGroup->queryNode(p);
+            SocketEndpoint ep = processNode.endpoint();
+            ep.port = ep.port + (s * portInc);
+            Owned<INode> node = createINode(ep);
+            slaveGroupNodes.append(*node.getClear());
+        }
+    }
+    Owned<IGroup> _slaveGroup = createIGroup(slaveGroupNodes.ordinality(), slaveGroupNodes.getArray());
+    setupGroups(_masterNode, _processGroup, _slaveGroup);
+}
+
+void setClusterGroup(INode *_masterNode, IGroup *rawGroup, unsigned slavesPerNode, unsigned channelsPerSlave, unsigned portBase, unsigned portInc)
+{
     SocketEndpointArray epa;
     OwnedMalloc<unsigned> hostStartPort, hostNextStartPort;
     hostStartPort.allocateN(rawGroup->ordinality());
@@ -853,9 +875,7 @@ void setClusterGroup(INode *_masterNode, IGroup *_rawGroup, unsigned slavesPerNo
             hostNextStartPort[hostPos] += (slavesPerNode * channelsPerSlave * portInc);
         }
     }
-    IArrayOf<INode> clusterGroupNodes, nodeGroupNodes;
-    clusterGroupNodes.append(*LINK(masterNode));
-    nodeGroupNodes.append(*LINK(masterNode));
+    IArrayOf<INode> slaveGroupNodes, processGroupNodes;
     for (unsigned s=0; s<channelsPerSlave; s++)
     {
         for (unsigned p=0; p<slavesPerNode; p++)
@@ -865,35 +885,22 @@ void setClusterGroup(INode *_masterNode, IGroup *_rawGroup, unsigned slavesPerNo
                 SocketEndpoint ep = rawGroup->queryNode(n).endpoint();
                 ep.port = hostStartPort[n] + (((p * channelsPerSlave) + s) * portInc);
                 Owned<INode> node = createINode(ep);
-                clusterGroupNodes.append(*node.getLink());
+                slaveGroupNodes.append(*node.getLink());
                 if (0 == s)
-                    nodeGroupNodes.append(*node.getLink());
+                    processGroupNodes.append(*node.getLink());
             }
         }
     }
-    // clusterGroup contains master + all slaves (including virtuals)
-    clusterGroup = createIGroup(clusterGroupNodes.ordinality(), clusterGroupNodes.getArray());
-
-    // nodeGroup container master + all slave processes (excludes virtual slaves)
-    nodeGroup = createIGroup(nodeGroupNodes.ordinality(), nodeGroupNodes.getArray());
-
-    // slaveGroup contains all slaves (including virtuals) but excludes master
-    slaveGroup = clusterGroup->remove(0);
-
-    // dfsGroup is same as slaveGroup, but stripped of ports. So is a IP group as wide as slaveGroup, used for publishing
-    IArrayOf<INode> slaveGroupNodes;
-    Owned<INodeIterator> nodeIter = slaveGroup->getIterator();
-    ForEach(*nodeIter)
-    slaveGroupNodes.append(*createINodeIP(nodeIter->query().endpoint(),0));
-    dfsGroup = createIGroup(slaveGroupNodes.ordinality(), slaveGroupNodes.getArray());
-
-    nodeComm = createCommunicator(nodeGroup);
+    Owned<IGroup> _processGroup = createIGroup(processGroupNodes.ordinality(), processGroupNodes.getArray());
+    Owned<IGroup> _slaveGroup = createIGroup(slaveGroupNodes.ordinality(), slaveGroupNodes.getArray());
+    setupGroups(_masterNode, _processGroup, _slaveGroup);
 }
+
 bool clusterInitialized() { return NULL != nodeComm; }
 INode &queryMasterNode() { return *masterNode; }
 ICommunicator &queryNodeComm() { return *nodeComm; }
-IGroup &queryRawGroup() { return *rawGroup; }
 IGroup &queryNodeGroup() { return *nodeGroup; }
+IGroup &queryProcessGroup() { return *processGroup; }
 IGroup &queryClusterGroup() { return *clusterGroup; }
 IGroup &querySlaveGroup() { return *slaveGroup; }
 IGroup &queryDfsGroup() { return *dfsGroup; }

+ 2 - 1
thorlcr/thorutil/thormisc.hpp

@@ -478,11 +478,12 @@ extern graph_decl const LogMsgJobInfo thorJob;
 
 extern graph_decl StringBuffer &getCompoundQueryName(StringBuffer &compoundName, const char *queryName, unsigned version);
 
+extern graph_decl void setupCluster(INode *masterNode, IGroup *processGroup, unsigned channelsPerSlave, unsigned portBase, unsigned portInc);
 extern graph_decl void setClusterGroup(INode *masterNode, IGroup *group, unsigned slavesPerNode, unsigned channelsPerSlave, unsigned portBase, unsigned portInc);
 extern graph_decl bool clusterInitialized();
 extern graph_decl INode &queryMasterNode();
-extern graph_decl IGroup &queryRawGroup();
 extern graph_decl IGroup &queryNodeGroup();
+extern graph_decl IGroup &queryProcessGroup();
 extern graph_decl ICommunicator &queryNodeComm();
 extern graph_decl IGroup &queryClusterGroup();
 extern graph_decl IGroup &querySlaveGroup();