
HPCC-14429 Add options to specify slave processes and channels

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 9 years ago
parent
commit
52959d3bcd

+ 3 - 1
common/workunit/workunit.cpp

@@ -4336,7 +4336,9 @@ public:
                 unsigned nodes = thor.getCount("ThorSlaveProcess");
                 if (!nodes)
                     throw MakeStringException(WUERR_MismatchClusterSize,"CEnvironmentClusterInfo: Thor cluster can not have 0 slave processes");
-                unsigned ts = nodes * thor.getPropInt("@slavesPerNode", 1);
+                unsigned slavesPerNode = thor.getPropInt("@slavesPerNode", 1);
+                unsigned channelsPerSlave = thor.getPropInt("@channelsPerSlave", 1);
+                unsigned ts = nodes * slavesPerNode * channelsPerSlave;
                 if (clusterWidth && (ts!=clusterWidth)) 
                     throw MakeStringException(WUERR_MismatchClusterSize,"CEnvironmentClusterInfo: mismatched thor sizes in cluster");
                 clusterWidth = ts;

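A note on the new width calculation: the reported Thor cluster width is now the product of the node count, @slavesPerNode and @channelsPerSlave. A minimal standalone sketch of that arithmetic (the sample values below are illustrative, not taken from any real environment):

    #include <cstdio>

    int main()
    {
        unsigned nodes = 4;            // count of ThorSlaveProcess entries
        unsigned slavesPerNode = 2;    // @slavesPerNode
        unsigned channelsPerSlave = 3; // @channelsPerSlave
        unsigned ts = nodes * slavesPerNode * channelsPerSlave;
        printf("cluster width = %u slave channels\n", ts); // prints 24
        return 0;
    }
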
+ 40 - 25
dali/base/dadfs.cpp

@@ -9067,20 +9067,14 @@ class CInitGroups
         grp->setProp("@name", name);
     }
 
-#define DEFAULT_SLAVEBASEPORT 20100 // defaults are in thor.xsl.in AND init_thor at the moment
-#define DEFAULT_LOCALTHORPORTINC 200
     IGroup *getGroupFromCluster(GroupType groupType, IPropertyTree &cluster, bool expand)
     {
         SocketEndpointArray eps;
         const char *processName=NULL;
-        unsigned slavePort = 0;
-        unsigned localThorPortInc = 0;
         switch (groupType)
         {
             case grp_thor:
                 processName = "ThorSlaveProcess";
-                slavePort = cluster.getPropInt("@slaveport", DEFAULT_SLAVEBASEPORT);
-                localThorPortInc = cluster.getPropInt("@localThorPortInc", DEFAULT_LOCALTHORPORTINC);
                 break;
             case grp_thorspares:
                 processName = "ThorSpareProcess";
@@ -9127,7 +9121,6 @@ class CInitGroups
                     break;
                 case grp_thor:
                 case grp_thorspares:
-                    ep.port = slavePort;
                     eps.append(ep);
                     break;
                 default:
@@ -9137,20 +9130,17 @@ class CInitGroups
         if (!eps.ordinality())
             return NULL;
         Owned<IGroup> grp;
-        unsigned slavesPerNode = 0;
-        if (grp_thor == groupType)
-            slavesPerNode = cluster.getPropInt("@slavesPerNode", 1);
-        if (expand && slavesPerNode)
+        if (grp_thor != groupType)
+            expand = false;
+        if (expand)
         {
+            unsigned slavesPerNode = cluster.getPropInt("@slavesPerNode", 1);
+            unsigned channelsPerSlave = cluster.getPropInt("@channelsPerSlave", 1);
             SocketEndpointArray msEps;
-            for (unsigned s=0; s<slavesPerNode; s++)
+            for (unsigned s=0; s<(slavesPerNode*channelsPerSlave); s++)
             {
                 ForEachItemIn(e, eps)
-                {
-                    SocketEndpoint ep = eps.item(e);
-                    ep.port = slavePort + (s * localThorPortInc);
-                    msEps.append(ep);
-                }
+                    msEps.append(eps.item(e));
             }
             grp.setown(createIGroup(msEps));
         }
@@ -9220,7 +9210,6 @@ class CInitGroups
         Owned<IGroup> group = getGroupFromCluster(groupType, cluster, true);
         if (!group)
             return NULL;
-        // NB: creates IP group, ignore any ports in group
         return createClusterGroup(groupType, group, dir, realCluster);
     }
 
@@ -9248,7 +9237,7 @@ class CInitGroups
                 break;
             case grp_thorspares:
                 getClusterSpareGroupName(cluster, gname);
-                realCluster = false;
+                oldRealCluster = realCluster = false;
                 break;
             case grp_roxie:
                 gname.append(cluster.queryProp("@name"));
@@ -9283,13 +9272,18 @@ class CInitGroups
                 VStringBuffer msg("Forcing new group layout for %s [ matched active = %s, matched old environment = %s ]", gname.str(), matchExisting?"true":"false", matchOldEnv?"true":"false");
                 WARNLOG("%s", msg.str());
                 messages.append(msg).newline();
-                matchExisting = matchOldEnv = false;
+                matchOldEnv = false;
             }
             else
             {
                 VStringBuffer msg("Active cluster '%s' group layout does not match environment [matched old environment=%s]", gname.str(), matchOldEnv?"true":"false");
                 LOG(MCoperatorWarning, unknownJob, "%s", msg.str());                                                                        \
                 messages.append(msg).newline();
+                if (existingClusterGroup)
+                {
+                    // NB: not used at the moment, but may help spot clusters that have swapped nodes
+                    existingClusterGroup->setPropBool("@mismatched", true);
+                }
             }
         }
         if (!existingClusterGroup || (!matchExisting && !matchOldEnv))
@@ -9551,14 +9545,35 @@ bool removeClusterSpares(const char *clusterName, const char *type, SocketEndpoi
     return init.removeSpares(clusterName, type, eps, response);
 }
 
-IGroup *getClusterGroup(const char *clusterName, const char *type, bool expand, unsigned timems)
+IGroup *getClusterNodeGroup(const char *clusterName, const char *type, unsigned timems)
 {
-    CInitGroups init(timems);
-    VStringBuffer cluster("/Environment/Software/%s[@name=\"%s\"]", type, clusterName);
-    Owned<IRemoteConnection> conn = querySDS().connect(cluster.str(), myProcessSession(), RTM_LOCK_READ, SDS_CONNECT_TIMEOUT);
+    VStringBuffer clusterPath("/Environment/Software/%s[@name=\"%s\"]", type, clusterName);
+    Owned<IRemoteConnection> conn = querySDS().connect(clusterPath.str(), myProcessSession(), RTM_LOCK_READ, SDS_CONNECT_TIMEOUT);
     if (!conn)
         return NULL;
-    return init.getGroupFromCluster(type, *conn->queryRoot(), expand);
+    IPropertyTree &cluster = *conn->queryRoot();
+    StringBuffer nodeGroupName;
+    getClusterGroupName(cluster, nodeGroupName);
+    if (0 == nodeGroupName.length())
+        throwUnexpected();
+
+    /* NB: Due to the way node groups and swapNode work, we need to return the IPs from the node group corresponding to the cluster,
+     * which may no longer match the cluster IPs due to node swapping.
+     * As the node group is an expanded form of the cluster group (with an IP per partition/slave), with the cluster group repeated
+     * N times, where N is slavesPerNode*channelsPerSlave, return the first M (cluster group width) IPs of the node group.
+     * Ideally the node group representation would change to match the cluster group definition, but that would require a lot of changes
+     * to DFS and elsewhere.
+     */
+    Owned<IGroup> nodeGroup = queryNamedGroupStore().lookup(nodeGroupName);
+    CInitGroups init(timems);
+    Owned<IGroup> expandedClusterGroup = init.getGroupFromCluster(type, cluster, true);
+    if (nodeGroup->ordinality() != expandedClusterGroup->ordinality()) // sanity check
+        throwUnexpected();
+    Owned<IGroup> clusterGroup = init.getGroupFromCluster(type, cluster, false);
+    ICopyArrayOf<INode> nodes;
+    for (unsigned n=0; n<clusterGroup->ordinality(); n++)
+        nodes.append(nodeGroup->queryNode(n));
+    return createIGroup(nodes.ordinality(), nodes.getArray());
 }
 
 
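To make the comment in getClusterNodeGroup above concrete: the stored node group is the cluster group repeated slavesPerNode*channelsPerSlave times, and the function hands back only the first cluster-group-width entries. A small standalone sketch of that relationship, using hypothetical host strings in place of the real IGroup/SocketEndpoint objects:

    #include <cstdio>
    #include <string>
    #include <vector>

    int main()
    {
        std::vector<std::string> clusterGroup = { "10.0.0.1", "10.0.0.2", "10.0.0.3" }; // M = 3 hosts
        unsigned slavesPerNode = 2, channelsPerSlave = 2;                               // N = 4

        // The node group is the cluster group repeated N times (one entry per slave channel).
        std::vector<std::string> nodeGroup;
        for (unsigned r = 0; r < slavesPerNode * channelsPerSlave; r++)
            nodeGroup.insert(nodeGroup.end(), clusterGroup.begin(), clusterGroup.end());

        // getClusterNodeGroup() returns just the first M entries: one IP per physical node,
        // taken from the node group so that any swapped nodes are reflected.
        std::vector<std::string> nodeIPs(nodeGroup.begin(), nodeGroup.begin() + clusterGroup.size());
        for (const std::string &ip : nodeIPs)
            printf("%s\n", ip.c_str());
        return 0;
    }
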

+ 1 - 1
dali/base/dadfs.hpp

@@ -773,7 +773,7 @@ extern da_decl bool removeClusterSpares(const char *clusterName, const char *typ
 // should poss. belong in lib workunit
 extern da_decl StringBuffer &getClusterGroupName(IPropertyTree &cluster, StringBuffer &groupName);
 extern da_decl StringBuffer &getClusterSpareGroupName(IPropertyTree &cluster, StringBuffer &groupName);
-extern da_decl IGroup *getClusterGroup(const char *clusterName, const char *type, bool expand, unsigned timems=INFINITE);
+extern da_decl IGroup *getClusterNodeGroup(const char *clusterName, const char *type, unsigned timems=INFINITE);
 
 extern da_decl IDistributedFileTransaction *createDistributedFileTransaction(IUserDescriptor *user);
 

+ 25 - 24
dali/daliadmin/daliadmin.cpp

@@ -635,7 +635,7 @@ void dfscsv(const char *dali,IUserDescriptor *udesc)
 
 //=============================================================================
 
-static void dfsgroup(const char *name, const char *outputFilename, bool cluster)
+static void writeGroup(IGroup *group, const char *name, const char *outputFilename)
 {
     Owned<IFileIOStream> io;
     if (outputFilename)
@@ -644,27 +644,6 @@ static void dfsgroup(const char *name, const char *outputFilename, bool cluster)
         OwnedIFileIO iFileIO = iFile->open(IFOcreate);
         io.setown(createIOStream(iFileIO));
     }
-    Owned<IGroup> group;
-    if (cluster)
-    {
-        group.setown(getClusterGroup(name, "ThorCluster", false));
-        Owned<INodeIterator> iter = group->getIterator();
-        IArrayOf<INode> nodes;
-        ForEach(*iter)
-        {
-            SocketEndpoint ep = iter->query().endpoint();
-            ep.port = 0;
-            nodes.append(*createINode(ep));
-        }
-        group.setown(createIGroup(nodes.ordinality(), nodes.getArray()));
-    }
-    else
-        group.setown(queryNamedGroupStore().lookup(name));
-    if (!group)
-    {
-        ERRLOG("cannot find group %s",name);
-        return;
-    }
     StringBuffer eps;
     for (unsigned i=0;i<group->ordinality();i++)
     {
@@ -679,6 +658,28 @@ static void dfsgroup(const char *name, const char *outputFilename, bool cluster)
     }
 }
 
+static void dfsGroup(const char *name, const char *outputFilename)
+{
+    Owned<IGroup> group = queryNamedGroupStore().lookup(name);
+    if (!group)
+    {
+        ERRLOG("cannot find group %s",name);
+        return;
+    }
+    writeGroup(group, name, outputFilename);
+}
+
+static void clusterGroup(const char *name, const char *outputFilename)
+{
+    Owned<IGroup> group = getClusterNodeGroup(name, "ThorCluster");
+    if (!group)
+    {
+        ERRLOG("cannot find group %s",name);
+        return;
+    }
+    writeGroup(group, name, outputFilename);
+}
+
 //=============================================================================
 
 static void dfsmap(const char *lname, IUserDescriptor *user)
@@ -2806,11 +2807,11 @@ int main(int argc, char* argv[])
                     }
                     else if (stricmp(cmd,"dfsgroup")==0) {
                         CHECKPARAMS(1,2);
-                        dfsgroup(params.item(1),(np>1)?params.item(2):NULL, false);
+                        dfsGroup(params.item(1),(np>1)?params.item(2):NULL);
                     }
                     else if (stricmp(cmd,"clusternodes")==0) {
                         CHECKPARAMS(1,2);
-                        dfsgroup(params.item(1),(np>1)?params.item(2):NULL, true);
+                        clusterGroup(params.item(1),(np>1)?params.item(2):NULL);
                     }
                     else if (stricmp(cmd,"dfsmap")==0) {
                         CHECKPARAMS(1,1);

+ 6 - 5
initfiles/bash/etc/init.d/hpcc_common.in

@@ -676,11 +676,12 @@ status_component() {
     log "${compName} ---> Stopped"
     printf "%-15s is stopped" "$compName"
   elif [[ ${compType} == "thor" ]]; then
-    if [[ -e ${runtime}/${compName}/thorgroup && -e ${runtime}/${compName}/slaves ]]; then
-      __slavenum=$(cat ${runtime}/${compName}/thorgroup | wc -l)
-      __hostnum=$(cat ${runtime}/${compName}/slaves | sort | uniq | wc -l)
-      log "${compName} ---> Running ( pid ${__pidValue} ) with {__slavenum} slaves across {__hostnum} hosts"
-      printf "%-15s ( pid %8s ) is running with %s slaves across %s hosts ..." "${compName}" "${__pidValue}" "${__slavenum}" "${__hostnum}"
+    if [[ -e ${runtime}/${compName}/slaves && -e ${runtime}/${compName}/setvars ]]; then
+      source ${runtime}/${compName}/setvars
+      __slaves=$(cat ${runtime}/${compName}/slaves | wc -l)
+      __slaveprocesses=$((${__slaves} * ${slavespernode}))
+      log "${compName} ---> Running ( pid ${__pidValue} ) with ${__slaveprocesses} slave process(es)"
+      printf "%-15s ( pid %8s ) is running with %s slave process(es) ..." "${compName}" "${__pidValue}" "${__slaveprocesses}"
     else
       log "${compName} missing file in ${runtime}/${compName} necessary for status_component"
       printf "${compName} missing file in ${runtime}/${compName} necessary for status_component"

+ 18 - 73
initfiles/bin/init_thor

@@ -45,12 +45,8 @@ fi
 
 instancedir=$(pwd -P)
 source $instancedir/setvars
+slaveportinc=$(($channelsperslave * $localthorportinc))
 
-if [[ ! -z ${THORPRIMARY} ]]; then
-    groupName=${THORPRIMARY}
-else
-    groupName=${THORNAME}
-fi
 ln -s -f $deploydir/thormaster_lcr thormaster_$THORNAME
 
 ENV_DIR=$(cat ${HPCC_CONFIG} | sed -n "/\[DEFAULT\]/,/\[/p" | grep "^configs=" | sed -e 's/^configs=//')
@@ -70,44 +66,21 @@ contains()
     return 1
 }
 
-makethorgroup()
-{
-    if [[ -z "$localthorportinc" ]]; then
-        localthorportinc=200
-    fi
-    rm -f thorgroup
-
-    declare -a ports_used
-    declare -a hosts
-    for slave in $(cat slaves); do
-        p=$(contains "${hosts[@]}" "${slave}")
-        if [[ 0 == ${p} ]]; then
-            echo "${slave}:${THORSLAVEPORT}" >> thorgroup
-            p=$((${#hosts[@]}+1))
-            ports[${p}]=${THORSLAVEPORT}
-            hosts=(${hosts[@]} $slave)
-        else
-            newport=$((${ports[${p}]}+${localthorportinc}))
-            echo "${slave}:${newport}" >> thorgroup
-            ports[${p}]=${newport}
-        fi
-    done
-}
-
 kill_slaves()
 {
     log "Killing slaves"
+    # NB: many of the parameters to init_thorslave are not used by the 'stop' command
     if [[ "$localthor" = "true" ]]; then
-        $deploydir/init_thorslave stop localhost $THORMASTER $THORMASTERPORT $LOG_DIR $instancedir $deploydir $THORNAME $PATH_PRE $logredirect
+        $deploydir/init_thorslave stop localhost $slavespernode $THORSLAVEPORT $slaveportinc $THORMASTER $THORMASTERPORT $LOG_DIR $instancedir $deploydir $THORNAME $PATH_PRE $logredirect
     else
         # we want to kill only slaves that have already been started in run_thor
-        if [[ -r $instancedir/uslaves.start ]]; then
-            nslaves=$(cat $instancedir/uslaves.start 2> /dev/null | wc -l)
-            $deploydir/frunssh $instancedir/uslaves.start "/bin/sh -c '$deploydir/init_thorslave stop %a $THORMASTER $THORMASTERPORT $LOG_DIR $instancedir $deploydir $THORNAME $PATH_PRE $logredirect'" -i:$SSHidentityfile -u:$SSHusername -pe:$SSHpassword -t:$SSHtimeout -a:$SSHretries -n:$nslaves 2>&1
+        if [[ -r $instancedir/uslaves ]]; then
+            clusternodes=$(cat $instancedir/uslaves 2> /dev/null | wc -l)
+            $deploydir/frunssh $instancedir/slaves "/bin/sh -c '$deploydir/init_thorslave stop localhost $slavespernode $THORSLAVEPORT $slaveportinc $THORMASTER $THORMASTERPORT $LOG_DIR $instancedir $deploydir $THORNAME $PATH_PRE $logredirect'" -i:$SSHidentityfile -u:$SSHusername -pe:$SSHpassword -t:$SSHtimeout -a:$SSHretries -n:$clusternodes 2>&1
         fi
     fi
 
-    rm -f $instancedir/uslaves.start > /dev/null 2>&1
+    rm -f $instancedir/uslaves > /dev/null 2>&1
 }
 
 killed()
@@ -120,8 +93,8 @@ killed()
         log "$component Stopped"
         unlock /var/lock/HPCCSystems/$component/${component}.lock
         kill_slaves
-        log "removing init.pid file and uslaves.start file"
-        rm -f $INIT_PID_NAME $instancedir/uslaves.start > /dev/null 2>&1
+        log "removing init.pid file and slaves file"
+        rm -f $INIT_PID_NAME $instancedir/slaves > /dev/null 2>&1
     fi
     exit 255
 }
@@ -133,40 +106,10 @@ thorpid=0
 
 while [[ 1 ]]; do
     # update slaves file in case state of environment has been altered since last run
-    if [[ "$processperslave" = "true" ]]; then
-        daliadmin server=$DALISERVER dfsgroup ${groupName} slaves
-    else
-        daliadmin server=$DALISERVER clusternodes ${groupName} slaves
-    fi
+    daliadmin server=$DALISERVER clusternodes ${component} slaves
     errcode=$?
     if [[ 0 != ${errcode} ]]; then
-    log "failed to lookup dali group for $groupName"
-        exit 1
-    fi
-    makethorgroup
-    sort $instancedir/slaves | uniq > $instancedir/uslaves.start
-
-    ${INSTALL_DIR}/sbin/configgen -env ${CONFIG_DIR}/${ENV_XML_FILE} -c ${THORNAME} -listall2 | awk 'BEGIN {FS=",";} /ThorSlaveProcess/ { print $3 }' > $instancedir/configgen.out
-    rm -f $instancedir/hosts.resolve > /dev/null 2>&1
-    errcode=0
-    while read line; do
-        resolved_value=$(python -c "import socket; print socket.gethostbyname('${line}')" 2> /dev/null)
-        if [[ $? -eq 0 ]]; then
-            echo $resolved_value >> $instancedir/hosts.resolve
-        else
-            log "ERROR:  Could not resolve the line -${line}- from configgen -listall2 output"
-            exit 1
-        fi
-    done < $instancedir/configgen.out
-
-    sort $instancedir/hosts.resolve | uniq > $instancedir/hosts.uniq
-    diff $instancedir/uslaves.start $instancedir/hosts.uniq > /dev/null 2>&1
-    DFSGROUP_CHECK=$?
-    rm -f $instancedir/hosts.resolve $instancedir/configgen.out $instancedir/hosts.uniq > /dev/null 2>&1
-    if [[ $DFSGROUP_CHECK -ne 0 ]]; then
-        log "ERROR:  Your environment has changed. dfsgroup $groupName does not match environmnent.xml file"
-        log "If this was intentional, please run 'updtdalienv ${CONFIG_DIR}/${ENV_XML_FILE} -f'"
-        log "Stopping $groupName"
+        log "failed to lookup dali group for ${component}"
         exit 1
     fi
 
@@ -174,12 +117,14 @@ while [[ 1 ]]; do
     log "starting thorslaves ..."
 
     # Would be simpler, if there was simple way to test if ip is local and get rid of 'localthor' setting
+    sort $instancedir/slaves | uniq > $instancedir/uslaves
+
     if [[ "$localthor" = "true" ]]; then
-        slaveip=$(head -n 1 $instancedir/uslaves.start)
-        $deploydir/init_thorslave start $slaveip $THORMASTER $THORMASTERPORT $LOG_DIR $instancedir $deploydir $THORNAME $PATH_PRE $logredirect
+        slaveip=$(head -n 1 $instancedir/uslaves)
+        $deploydir/init_thorslave start $slaveip $slavespernode $THORSLAVEPORT $slaveportinc $THORMASTER $THORMASTERPORT $LOG_DIR $instancedir $deploydir $THORNAME $PATH_PRE $logredirect
     else
-        nslaves=$(cat $instancedir/uslaves.start | wc -l)
-        $deploydir/frunssh $instancedir/uslaves.start "/bin/sh -c '$deploydir/init_thorslave start %a $THORMASTER $THORMASTERPORT $LOG_DIR $instancedir $deploydir $THORNAME $PATH_PRE $logredirect'" -i:$SSHidentityfile -u:$SSHusername -pe:$SSHpassword -t:$SSHtimeout -a:$SSHretries -n:$nslaves 2>&1
+        clusternodes=$(cat $instancedir/uslaves | wc -l)
+        $deploydir/frunssh $instancedir/uslaves "/bin/sh -c '$deploydir/init_thorslave start %a $slavespernode $THORSLAVEPORT $slaveportinc $THORMASTER $THORMASTERPORT $LOG_DIR $instancedir $deploydir $THORNAME $PATH_PRE $logredirect'" -i:$SSHidentityfile -u:$SSHusername -pe:$SSHpassword -t:$SSHtimeout -a:$SSHretries -n:$clusternodes 2>&1
         FRUNSSH_RC=$?
         if [[ ${FRUNSSH_RC} -gt 0 ]]; then
             log "Error ${FRUNSSH_RC} in frunssh"
@@ -202,7 +147,7 @@ while [[ 1 ]]; do
         case $errcode in
         # TEC_Clean
         0)  log "Thormaster ($thorpid) Exited cleanly"
-            rm -f $instancedir/uslaves.start $PID_NAME $INIT_PID_NAME > /dev/null 2>&1
+            rm -f $instancedir/slaves $instancedir/uslaves $PID_NAME $INIT_PID_NAME > /dev/null 2>&1
             exit 0
             ;;
         # TEC_CtrlC

+ 38 - 32
initfiles/bin/init_thorslave

@@ -17,14 +17,18 @@
 
 cmd=$1
 ip=$2
-master=$3
-masterport=$4
-logpth=$5
-instancedir=$6
-deploydir=$7
-hpcc_compname=$8
-hpcc_setenv=$9
-export logfile="${logpth}/${hpcc_compname}/${10}"
+slavespernode=$3
+slaveport=$4
+slaveportinc=$5
+master=$6
+masterport=$7
+logpth=$8
+instancedir=$9
+deploydir=${10}
+hpcc_compname=${11}
+hpcc_setenv=${12}
+
+export logfile="${logpth}/${hpcc_compname}/${13}"
 
 source "$hpcc_setenv"
 source "$(dirname $hpcc_setenv)/../etc/init.d/hpcc_common"
@@ -55,7 +59,6 @@ stop_slaves()
 
 start_slaves()
 {
-
     # insuring dafilesrv is running on the machine as it is a prerequisite
     sudo /etc/init.d/dafilesrv status > /dev/null 2>&1
     if [[ $? -ne 0 ]];then
@@ -78,42 +81,45 @@ start_slaves()
     ulimit -Sn hard > /dev/null 2>&1
     [[ $? -ne 0 ]] && log "Failed to set ulimit for number of file descriptors open"
 
-    log "slave(${ip}) init"
+    log "slave init"
     log "slave(s) starting"
 
     # create symlink for easier identification of slaves by compName
     ln -s -f $deploydir/thorslave_lcr ${slavename}
 
-    # sync to current master thorgroup
-    log "rsync -e ssh -o LogLevel=QUIET -o StrictHostKeyChecking=no ${master}:${instancedir}/thorgroup ${instancedir}/thorgroup.slave"
-    rsync -e "ssh -o LogLevel=QUIET -o StrictHostKeyChecking=no" $master:$instancedir/thorgroup $instancedir/thorgroup.slave
+    # sync to current master slaves list
+    log "rsync -e ssh -o LogLevel=QUIET -o StrictHostKeyChecking=no ${master}:${instancedir}/slaves ${instancedir}/slaves.tmp"
+    rsync -e "ssh -o LogLevel=QUIET -o StrictHostKeyChecking=no" $master:$instancedir/slaves $instancedir/slaves.tmp
 
-    let "slavenum = 1";
-    for slave in $(cat $instancedir/thorgroup.slave); do
-        slaveip=${slave/:*/}
+    # NB: Would simply use slavesPerNode to create N slaves, but for backward compatibility reasons, need to cater for clusters
+    # defined with arbitrary repeated IPs listed in their definitions.
+    clusternodes=$(cat $instancedir/slaves.tmp | wc -l)
+    clusternode=1
+    for slaveip in $(cat $instancedir/slaves.tmp); do
         if [[ ${slaveip} = ${ip} ]]; then
-            slaveport=${slave/*:/}
-            if [[ "$slaveport" = "" ]]; then
-                slaveport=$THORSLAVEPORT
-            fi
-            log "$slavename  master=$master:$masterport slave=.:$slaveport slavenum=$slavenum logDir=$logpth/$hpcc_compname"
-            ./$slavename master=$master:$masterport slave=.:$slaveport slavenum=$slavenum logDir=$logpth/$hpcc_compname 2>/dev/null 1>/dev/null &
-            slavepid=$!
-            if [[ "$slavepid" -eq "0" ]]; then
-                log "failed to start"
-            else
-                log "slave pid $slavepid started"
-                PID_NAME="$PID/${slavename}_${slavenum}.pid"
-                echo $slavepid > $PID_NAME
-            fi
+            for (( slave=0; slave<${slavespernode}; slave++ )); do
+                slavenum=$(( ${clusternode} + (${slave} * ${clusternodes}) ))
+                log "$slavename master=$master:$masterport slave=.:$slaveport slavenum=$slavenum logDir=$logpth/$hpcc_compname"
+                ./$slavename master=$master:$masterport slave=.:$slaveport slavenum=$slavenum logDir=$logpth/$hpcc_compname 2>/dev/null 1>/dev/null &
+                slavepid=$!
+                if [[ "$slavepid" -eq "0" ]]; then
+                    log "failed to start"
+                else
+                    log "slave pid $slavepid started"
+                    PID_NAME="$PID/${slavename}_${slavenum}.pid"
+                    echo $slavepid > $PID_NAME
+                fi
+                slaveport=$(( ${slaveport} + ${slaveportinc} ))
+            done
         fi
-        let "slavenum += 1";
+        clusternode=$(( $clusternode + 1 ))
     done
+    rm -f $instancedir/slaves.tmp > /dev/null 2>&1
 }
 
 print_usage()
 {
-  log "usage: cmd ip master masterport logdir workingdir deploydir hpcc_compname hpcc_setenv logredirect"
+    log "usage: cmd ip slavespernode slaveport slaveportinc master masterport logdir workingdir deploydir hpcc_compname hpcc_setenv logredirect"
 }
 
 ##  Main

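The loop in start_slaves above numbers and ports the slave processes per node. A standalone sketch of the resulting layout (illustrative values; slaveportinc is channelsperslave * localthorportinc, as computed in init_thor):

    #include <cstdio>

    int main()
    {
        unsigned clusternodes = 3;       // lines in the slaves file
        unsigned slavespernode = 2;
        unsigned channelsperslave = 2;
        unsigned baseport = 20100;       // THORSLAVEPORT
        unsigned localthorportinc = 200;
        unsigned slaveportinc = channelsperslave * localthorportinc; // room for each process's channel MP ports

        for (unsigned clusternode = 1; clusternode <= clusternodes; clusternode++)
        {
            unsigned slaveport = baseport;
            for (unsigned slave = 0; slave < slavespernode; slave++)
            {
                unsigned slavenum = clusternode + slave * clusternodes; // global 1-based slave number
                printf("node %u, process %u -> slavenum=%u, base port=%u\n", clusternode, slave, slavenum, slaveport);
                slaveport += slaveportinc;
            }
        }
        return 0;
    }
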
+ 4 - 7
initfiles/componentfiles/configxml/setvars_linux.xsl

@@ -41,9 +41,6 @@ export THORNAME=<xsl:value-of select="@name"/>
 
     <xsl:if test="@nodeGroup">
 export THORPRIMARY=<xsl:value-of select="@nodeGroup"/>
-        <xsl:if test="string(@name) != string(@nodeGroup)">
-export THORSECONDARY=<xsl:value-of select="@name"/>
-        </xsl:if>
     </xsl:if>
 export THORMASTER=<xsl:call-template name="getNetAddress">
                     <xsl:with-param name="computer" select="ThorMasterProcess/@computer"/>
@@ -64,6 +61,10 @@ export slavespernode=<xsl:call-template name="setOrDefault">
                         <xsl:with-param name="attribute" select="@slavesPerNode"/>
                         <xsl:with-param name="default" select="'1'"/>
                      </xsl:call-template>
+export channelsperslave=<xsl:call-template name="setOrDefault">
+                        <xsl:with-param name="attribute" select="@channelsPerSlave"/>
+                        <xsl:with-param name="default" select="'1'"/>
+                     </xsl:call-template>
 export DALISERVER=<xsl:call-template name="getDaliServers">
                     <xsl:with-param name="daliServer" select="@daliServers"/>
                 </xsl:call-template>
@@ -71,10 +72,6 @@ export localthor=<xsl:call-template name="setOrDefault">
                     <xsl:with-param name="attribute" select="@localThor"/>
                     <xsl:with-param name="default" select="'false'"/>
                 </xsl:call-template>
-export processperslave=<xsl:call-template name="setOrDefault">
-                            <xsl:with-param name="attribute" select="@processPerSlave"/>
-                            <xsl:with-param name="default" select="'true'"/>
-                       </xsl:call-template>
 export breakoutlimit=<xsl:call-template name="setOrDefault">
                         <xsl:with-param name="attribute" select="Storage/@breakoutLimit"/>
                         <xsl:with-param name="default" select="'3600'"/>

+ 4 - 4
initfiles/componentfiles/configxml/thor.xsd.in

@@ -392,7 +392,7 @@
       <xs:attribute name="globalMemorySize" type="xs:nonNegativeInteger" use="optional">
         <xs:annotation>
           <xs:appinfo>
-            <tooltip>Memory (in MB) to use for rows. If unset, default = [75% of physical memory] / slavesPerNode</tooltip>
+            <tooltip>Memory (in MB) to use for rows per Thor slave process. If unset, default = [75% of physical memory] / slavesPerNode</tooltip>
           </xs:appinfo>
         </xs:annotation>
       </xs:attribute>
@@ -472,14 +472,14 @@
       <xs:attribute name="slavesPerNode" type="xs:nonNegativeInteger" default="1">
         <xs:annotation>
           <xs:appinfo>
-            <tooltip>This allows multiple slaves to exist on each node</tooltip>
+            <tooltip>Defines how many slave processes there are on each node</tooltip>
           </xs:appinfo>
         </xs:annotation>
       </xs:attribute>
-      <xs:attribute name="processPerSlave" type="xs:boolean" default="true">
+      <xs:attribute name="channelsPerSlave" type="xs:nonNegativeInteger" default="1">
         <xs:annotation>
           <xs:appinfo>
-            <tooltip>Makes each slave run in it's own process</tooltip>
+            <tooltip>Defines how many slave channels per slave process</tooltip>
           </xs:appinfo>
         </xs:annotation>
       </xs:attribute>

+ 1 - 1
initfiles/etc/DIR_NAME/environment.xml.in

@@ -997,6 +997,7 @@
   <ThorCluster autoCopyBackup="false"
                build="_"
                buildSet="thor"
+               channelsPerSlave="1"
                computer="localhost"
                daliServers="mydali"
                description="Thor process"
@@ -1004,7 +1005,6 @@
                monitorDaliFileServer="true"
                name="mythor"
                pluginsPath="${PLUGINS_PATH}"
-               processPerSlave="true"
                replicateAsync="false"
                replicateOutputs="false"
                slavesPerNode="1"

+ 2 - 2
testing/regress/environment.xml.in

@@ -997,6 +997,7 @@
   <ThorCluster autoCopyBackup="false"
                build="_"
                buildSet="thor"
+               channelsPerSlave="1"
                computer="localhost"
                daliServers="mydali"
                description="Thor process"
@@ -1004,7 +1005,6 @@
                monitorDaliFileServer="true"
                name="mythor"
                pluginsPath="${PLUGINS_PATH}"
-               processPerSlave="true"
                replicateAsync="false"
                replicateOutputs="false"
                slavesPerNode="1"
@@ -1024,6 +1024,7 @@
   <ThorCluster autoCopyBackup="false"
                build="_"
                buildSet="thor"
+               channelsPerSlave="1"
                computer="localhost"
                daliServers="mydali"
                description="Thor process"
@@ -1032,7 +1033,6 @@
                monitorDaliFileServer="true"
                name="mythor3way"
                pluginsPath="${PLUGINS_PATH}"
-               processPerSlave="true"
                replicateAsync="false"
                replicateOutputs="false"
                slaveport="21100"

+ 2 - 3
thorlcr/graph/thgraphmaster.cpp

@@ -2563,9 +2563,8 @@ void CMasterGraph::getFinalProgress()
         if (0 == msg.remaining())
             continue;
 
-        bool processPerSlave = globals->getPropBool("@processPerSlave", true);
-        unsigned slavesPerProcess = processPerSlave ? 1 : globals->getPropInt("@slavesPerNode", 1); // JCSMORE - should move somewhere common
-        for (unsigned sc=0; sc<slavesPerProcess; sc++)
+        unsigned channelsPerSlave = globals->getPropInt("@channelsPerSlave", 1); // JCSMORE - should move somewhere common
+        for (unsigned sc=0; sc<channelsPerSlave; sc++)
         {
             unsigned slave;
             msg.read(slave);

+ 1 - 7
thorlcr/graph/thgraphslave.cpp

@@ -1108,13 +1108,7 @@ CJobSlave::CJobSlave(ISlaveWatchdog *_watchdog, IPropertyTree *_workUnitInfo, co
 #endif
     querySo.setown(createDllEntry(_querySo, false, NULL));
     tmpHandler.setown(createTempHandler(true));
-    if (globals->getPropBool("@processPerSlave", true))
-        channelMemorySize = globalMemorySize;
-    else
-    {
-        unsigned slavesPerNode = globals->getPropInt("@slavesPerNode", 1);
-        channelMemorySize = globalMemorySize/slavesPerNode;
-    }
+    channelMemorySize = globalMemorySize / globals->getPropInt("@channelsPerSlave", 1);
 }
 
 void CJobSlave::addChannel(IMPServer *mpServer)

+ 12 - 29
thorlcr/master/thmastermain.cpp

@@ -73,11 +73,6 @@
 #define SHUTDOWN_IN_PARALLEL 20
 
 
-#define DEFAULT_THORMASTERPORT 20000
-#define DEFAULT_THORSLAVEPORT 20100
-#define DEFAULT_SLAVEPORTINC 200
-
-
 class CRegistryServer : public CSimpleInterface
 {
     unsigned msgDelay, slavesRegistered;
@@ -375,13 +370,13 @@ CRegistryServer *CRegistryServer::registryServer = NULL;
 //
 //////////////////
 
-bool checkClusterRelicateDAFS(IGroup *grp)
+bool checkClusterRelicateDAFS(IGroup &grp)
 {
     // check the dafilesrv is running (and right version) 
     unsigned start = msTick();
     PROGLOG("Checking cluster replicate nodes");
     SocketEndpointArray epa;
-    grp->getSocketEndpoints(epa);
+    grp.getSocketEndpoints(epa);
     ForEachItemIn(i1,epa) {
         epa.element(i1).port = getDaliServixPort();
     }
@@ -505,7 +500,6 @@ int main( int argc, char *argv[]  )
     removeSentinelFile(sentinelFile);
 
     setMachinePortBase(thorEp.port);
-    unsigned slavePort = globals->hasProp("@slaveport")?atoi(globals->queryProp("@slaveport")):THOR_BASESLAVE_PORT;
 
     EnableSEHtoExceptionMapping(); 
 #ifndef __64BIT__
@@ -565,22 +559,19 @@ int main( int argc, char *argv[]  )
             globals->setProp("@name", thorname);
         }
 
-        if (!globals->getProp("@nodeGroup", nodeGroup)) {
+        if (!globals->getProp("@nodeGroup", nodeGroup))
+        {
             nodeGroup.append(thorname);
             globals->setProp("@nodeGroup", thorname);
         }
-        bool processPerSlave = globals->getPropBool("@processPerSlave", true);
-        Owned<IGroup> rawGroup = getClusterGroup(thorname, "ThorCluster", processPerSlave);
         unsigned slavesPerNode = globals->getPropInt("@slavesPerNode", 1);
-        if (processPerSlave)
-            setClusterGroup(queryMyNode(), rawGroup);
-        else
+        unsigned channelsPerSlave = globals->getPropInt("@channelsPerSlave", 1);
+        unsigned localThorPortInc = globals->getPropInt("@localThorPortInc", DEFAULT_SLAVEPORTINC);
+        unsigned slaveBasePort = globals->getPropInt("@slaveport", DEFAULT_THORSLAVEPORT);
+        Owned<IGroup> rawGroup = getClusterNodeGroup(thorname, "ThorCluster");
+        setClusterGroup(queryMyNode(), rawGroup, slavesPerNode, channelsPerSlave, slaveBasePort, localThorPortInc);
+        if (globals->getPropBool("@replicateOutputs")&&globals->getPropBool("@validateDAFS",true)&&!checkClusterRelicateDAFS(queryNodeGroup()))
         {
-            unsigned localThorPortInc = globals->getPropInt("@localThorPortInc", DEFAULT_SLAVEPORTINC);
-            unsigned slaveBasePort = globals->getPropInt("@slaveport", DEFAULT_THORSLAVEPORT);
-            setClusterGroup(queryMyNode(), rawGroup, slavesPerNode, slaveBasePort, localThorPortInc);
-        }
-        if (globals->getPropBool("@replicateOutputs")&&globals->getPropBool("@validateDAFS",true)&&!checkClusterRelicateDAFS(rawGroup)) {
             FLLOG(MCoperatorError, thorJob, "ERROR: Validate failure(s) detected, exiting Thor");
             return globals->getPropBool("@validateDAFSretCode"); // default is no recycle!
         }
@@ -633,9 +624,9 @@ int main( int argc, char *argv[]  )
                     mmemSize = gmemSize; // default to same as slaves
             }
             unsigned perSlaveSize = gmemSize;
-            if (processPerSlave && slavesPerNode>1)
+            if (slavesPerNode>1)
             {
-                PROGLOG("Sharing globalMemorySize(%d MB), between %d slave. %d MB each", perSlaveSize, slavesPerNode, perSlaveSize / slavesPerNode);
+                PROGLOG("Sharing globalMemorySize(%d MB), between %d slave processes. %d MB each", perSlaveSize, slavesPerNode, perSlaveSize / slavesPerNode);
                 perSlaveSize /= slavesPerNode;
             }
             globals->setPropInt("@globalMemorySize", perSlaveSize);
@@ -648,14 +639,6 @@ int main( int argc, char *argv[]  )
             }
             if (0 == mmemSize)
                 mmemSize = gmemSize;
-            if (!processPerSlave)
-            {
-                /* Preserving previous semantics, if globalMemorySize defined, it defined how much per slave.
-                 * So if N virtual slaves, the total memory needs to be globalMemorySize * slavesPerNode
-                 * The slave process will give each slave channel row manager a split of the total
-                 */
-                globals->setPropInt("@globalMemorySize", gmemSize * slavesPerNode);
-            }
         }
         bool gmemAllowHugePages = globals->getPropBool("@heapUseHugePages", false);
         gmemAllowHugePages = globals->getPropBool("@heapMasterUseHugePages", gmemAllowHugePages);

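Taken together, the master above now divides @globalMemorySize between the slave processes on a node, and each slave process (CJobSlave in thgraphslave.cpp) divides its share between its channels. A minimal sketch with illustrative numbers:

    #include <cstdio>

    int main()
    {
        unsigned globalMemorySizeMB = 4096;   // row memory per node
        unsigned slavesPerNode = 2;
        unsigned channelsPerSlave = 4;

        unsigned perProcessMB = globalMemorySizeMB / slavesPerNode;   // thmastermain.cpp
        unsigned perChannelMB = perProcessMB / channelsPerSlave;      // channelMemorySize in thgraphslave.cpp
        printf("per slave process: %u MB, per channel: %u MB\n", perProcessMB, perChannelMB);
        return 0;
    }
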
+ 11 - 20
thorlcr/slave/slavmain.cpp

@@ -100,8 +100,7 @@ class CJobListener : public CSimpleInterface
     OwningStringSuperHashTableOf<CJobSlave> jobs;
     CFifoFileCache querySoCache; // used to mirror master cache
     IArrayOf<IMPServer> mpServers;
-    bool processPerSlave;
-    unsigned slavesPerNode;
+    unsigned channelsPerSlave;
 
     class CThreadExceptionCatcher : implements IExceptionHandler
     {
@@ -162,27 +161,19 @@ public:
     CJobListener() : excptHandler(*this)
     {
         stopped = true;
-        processPerSlave = globals->getPropBool("@processPerSlave", true);
-        slavesPerNode = globals->getPropInt("@slavesPerNode", 1);
+        channelsPerSlave = globals->getPropInt("@channelsPerSlave", 1);
+        unsigned localThorPortInc = globals->getPropInt("@localThorPortInc", 200);
         mpServers.append(* getMPServer());
-        if (!processPerSlave)
+        for (unsigned sc=1; sc<channelsPerSlave; sc++)
         {
-            unsigned port = getMachinePortBase();
-            unsigned localThorPortInc = globals->getPropInt("@localThorPortInc", 200);
-            for (unsigned sc=1; sc<slavesPerNode; sc++)
-            {
-                port += localThorPortInc;
-                mpServers.append(*startNewMPServer(port));
-            }
+            unsigned port = getMachinePortBase() + (sc * localThorPortInc);
+            mpServers.append(*startNewMPServer(port));
         }
     }
     ~CJobListener()
     {
-        if (!processPerSlave)
-        {
-            for (unsigned sc=1; sc<slavesPerNode; sc++)
-                mpServers.item(sc).stop();
-        }
+        for (unsigned sc=1; sc<channelsPerSlave; sc++)
+            mpServers.item(sc).stop();
         mpServers.kill();
         stop();
     }
@@ -192,7 +183,7 @@ public:
     }
     virtual void main()
     {
-        if (!processPerSlave)
+        if (channelsPerSlave>1)
         {
             class CVerifyThread : public CInterface, implements IThreaded
             {
@@ -219,7 +210,7 @@ public:
                 }
             };
             CIArrayOf<CInterface> verifyThreads;
-            for (unsigned c=0; c<slavesPerNode; c++)
+            for (unsigned c=0; c<channelsPerSlave; c++)
                 verifyThreads.append(*new CVerifyThread(*this, c));
         }
 
@@ -347,7 +338,7 @@ public:
 
                         Owned<CJobSlave> job = new CJobSlave(watchdog, workUnitInfo, graphName, soPath.str(), mptag, slaveMsgTag);
                         job->setXGMML(deps);
-                        for (unsigned sc=0; sc<mpServers.ordinality(); sc++)
+                        for (unsigned sc=0; sc<channelsPerSlave; sc++)
                             job->addChannel(&mpServers.item(sc));
                         jobs.replace(*job.getLink());
                         job->startJob();

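A small sketch of the per-channel MP server ports that CJobListener above now sets up (illustrative values; channel 0 reuses the slave process's existing MP server):

    #include <cstdio>

    int main()
    {
        unsigned machinePortBase = 20100;   // this slave process's port base
        unsigned localThorPortInc = 200;    // @localThorPortInc
        unsigned channelsPerSlave = 3;      // @channelsPerSlave

        printf("channel 0 -> port %u (existing MP server)\n", machinePortBase);
        for (unsigned sc = 1; sc < channelsPerSlave; sc++)
            printf("channel %u -> port %u (new MP server)\n", sc, machinePortBase + sc * localThorPortInc);
        return 0;
    }
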
+ 5 - 16
thorlcr/slave/thslavemain.cpp

@@ -110,28 +110,17 @@ static bool RegisterSelf(SocketEndpoint &masterEp)
             replyError(TE_FailedToRegisterSlave, "Thor master/slave version mismatch");
             return false;
         }
-        Owned<IGroup> group = deserializeIGroup(msg);
+        Owned<IGroup> rawGroup = deserializeIGroup(msg);
         globals->Release();
         globals = createPTree(msg);
         mergeCmdParams(globals); // cmd line
 
-        bool processPerSlave = globals->getPropBool("@processPerSlave", true);
         unsigned slavesPerNode = globals->getPropInt("@slavesPerNode", 1);
-        unsigned localThorPortInc = globals->getPropInt("@localThorPortInc", 200);
-        unsigned basePort = getMachinePortBase();
-        if (processPerSlave)
-            setClusterGroup(masterNode, group);
-        else
-            setClusterGroup(masterNode, group, slavesPerNode, basePort, localThorPortInc);
+        unsigned channelsPerSlave = globals->getPropInt("@channelsPerSlave", 1);
+        unsigned localThorPortInc = globals->getPropInt("@localThorPortInc", DEFAULT_SLAVEPORTINC);
+        unsigned slaveBasePort = globals->getPropInt("@slaveport", DEFAULT_THORSLAVEPORT);
+        setClusterGroup(masterNode, rawGroup, slavesPerNode, channelsPerSlave, slaveBasePort, localThorPortInc);
 
-        SocketEndpoint myEp = queryMyNode()->endpoint();
-        unsigned _mySlaveNum = group->rank(queryMyNode())+1;
-        assertex(_mySlaveNum == mySlaveNum);
-        if (RANK_NULL == mySlaveNum)
-        {
-            replyError(TE_FailedToRegisterSlave, "Node not part of thorgroup");
-            return false;
-        }
         const char *_masterBuildTag = globals->queryProp("@masterBuildTag");
         const char *masterBuildTag = _masterBuildTag?_masterBuildTag:"no build tag";
         PROGLOG("Master build: %s", masterBuildTag);

+ 46 - 31
thorlcr/thorutil/thormisc.cpp

@@ -777,7 +777,7 @@ StringBuffer &getCompoundQueryName(StringBuffer &compoundName, const char *query
     return compoundName.append('V').append(version).append('_').append(queryName);
 }
 
-void setClusterGroup(INode *_masterNode, IGroup *_rawGroup, unsigned slavesPerProcess, unsigned portBase, unsigned portInc)
+void setClusterGroup(INode *_masterNode, IGroup *_rawGroup, unsigned slavesPerNode, unsigned channelsPerSlave, unsigned portBase, unsigned portInc)
 {
     ::Release(masterNode);
     ::Release(rawGroup);
@@ -788,46 +788,61 @@ void setClusterGroup(INode *_masterNode, IGroup *_rawGroup, unsigned slavesPerPr
     ::Release(nodeComm);
     masterNode = LINK(_masterNode);
     rawGroup = LINK(_rawGroup);
-    IArrayOf<INode> nodes;
-    if (slavesPerProcess) // if 0, then processPerSlave is enabled and slavesPerProcess/portBase/portInc not provided
+
+    SocketEndpointArray epa;
+    OwnedMalloc<unsigned> hostStartPort, hostNextStartPort;
+    hostStartPort.allocateN(rawGroup->ordinality());
+    hostNextStartPort.allocateN(rawGroup->ordinality());
+    for (unsigned n=0; n<rawGroup->ordinality(); n++)
     {
-        nodes.append(*LINK(masterNode));
-        for (unsigned s=0; s<slavesPerProcess; s++)
+        SocketEndpoint ep = rawGroup->queryNode(n).endpoint();
+        unsigned hostPos = epa.find(ep);
+        if (NotFound == hostPos)
         {
-            for (unsigned n=0; n<rawGroup->ordinality(); n++)
-            {
-                SocketEndpoint ep = rawGroup->queryNode(n).endpoint();
-                ep.port = portBase + (s * portInc);
-                nodes.append(*createINode(ep));
-            }
+            hostPos = epa.ordinality();
+            epa.append(ep);
+            hostStartPort[n] = portBase;
+            hostNextStartPort[hostPos] = portBase + (slavesPerNode * channelsPerSlave * portInc);
         }
-        clusterGroup = createIGroup(nodes.ordinality(), nodes.getArray());
-        // slaveGroup contains endpoints with mp ports of slaves
-        slaveGroup = clusterGroup->remove(0);
-        nodes.kill();
-
-        nodes.append(*LINK(masterNode));
-        unsigned n=0;
-        for (n=0; n<rawGroup->ordinality(); n++)
+        else
         {
-            SocketEndpoint ep = rawGroup->queryNode(n).endpoint();
-            ep.port = portBase;
-            nodes.append(*createINode(ep));
+            hostStartPort[n] = hostNextStartPort[hostPos];
+            hostNextStartPort[hostPos] += (slavesPerNode * channelsPerSlave * portInc);
         }
-        nodeGroup = createIGroup(nodes.ordinality(), nodes.getArray());
-        nodes.kill();
     }
-    else
+    IArrayOf<INode> clusterGroupNodes, nodeGroupNodes;
+    clusterGroupNodes.append(*LINK(masterNode));
+    nodeGroupNodes.append(*LINK(masterNode));
+    for (unsigned p=0; p<slavesPerNode; p++)
     {
-        slaveGroup = LINK(rawGroup);
-        clusterGroup = rawGroup->add(masterNode, 0);
-        nodeGroup = LINK(clusterGroup);
+        for (unsigned s=0; s<channelsPerSlave; s++)
+        {
+            for (unsigned n=0; n<rawGroup->ordinality(); n++)
+            {
+                SocketEndpoint ep = rawGroup->queryNode(n).endpoint();
+                ep.port = hostStartPort[n] + (((p * channelsPerSlave) + s) * portInc);
+                Owned<INode> node = createINode(ep);
+                clusterGroupNodes.append(*node.getLink());
+                if (0 == s)
+                    nodeGroupNodes.append(*node.getLink());
+            }
+        }
     }
-    // dfsGroup will match named group in dfs
+    // clusterGroup contains master + all slaves (including virtuals)
+    clusterGroup = createIGroup(clusterGroupNodes.ordinality(), clusterGroupNodes.getArray());
+
+    // nodeGroup contains master + all slave processes (excludes virtual slaves)
+    nodeGroup = createIGroup(nodeGroupNodes.ordinality(), nodeGroupNodes.getArray());
+
+    // slaveGroup contains all slaves (including virtuals) but excludes master
+    slaveGroup = clusterGroup->remove(0);
+
+    // dfsGroup is the same as slaveGroup, but stripped of ports. So it is an IP group as wide as slaveGroup, used for publishing
+    IArrayOf<INode> slaveGroupNodes;
     Owned<INodeIterator> nodeIter = slaveGroup->getIterator();
     ForEach(*nodeIter)
-        nodes.append(*createINodeIP(nodeIter->query().endpoint(),0));
-    dfsGroup = createIGroup(nodes.ordinality(), nodes.getArray());
+        slaveGroupNodes.append(*createINodeIP(nodeIter->query().endpoint(),0));
+    dfsGroup = createIGroup(slaveGroupNodes.ordinality(), slaveGroupNodes.getArray());
 
     nodeComm = createCommunicator(nodeGroup);
 }

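A standalone sketch of the endpoint layout setClusterGroup now computes, using hypothetical host strings instead of IGroup/SocketEndpoint (and omitting the master node, which the real code prepends to clusterGroup and nodeGroup). It shows how a host that repeats in the raw group gets its own block of ports, and how nodeGroup keeps only the channel-0 endpoint of each slave process:

    #include <cstdio>
    #include <map>
    #include <string>
    #include <vector>

    int main()
    {
        std::vector<std::string> rawGroup = { "10.0.0.1", "10.0.0.2", "10.0.0.1" }; // node group, host repeated
        unsigned slavesPerNode = 2, channelsPerSlave = 2;
        unsigned portBase = 20100, portInc = 200;

        // Start port per raw-group position; a repeated host advances by a full node's worth of ports.
        std::vector<unsigned> hostStartPort(rawGroup.size());
        std::map<std::string, unsigned> nextStartPort;
        for (size_t n = 0; n < rawGroup.size(); n++)
        {
            auto it = nextStartPort.find(rawGroup[n]);
            hostStartPort[n] = (it == nextStartPort.end()) ? portBase : it->second;
            nextStartPort[rawGroup[n]] = hostStartPort[n] + slavesPerNode * channelsPerSlave * portInc;
        }

        // clusterGroup order: slave process, then channel, then raw-group position.
        for (unsigned p = 0; p < slavesPerNode; p++)
            for (unsigned s = 0; s < channelsPerSlave; s++)
                for (size_t n = 0; n < rawGroup.size(); n++)
                    printf("%s:%u%s\n", rawGroup[n].c_str(),
                           hostStartPort[n] + ((p * channelsPerSlave) + s) * portInc,
                           (0 == s) ? "  (also in nodeGroup)" : "");
        return 0;
    }
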
+ 4 - 1
thorlcr/thorutil/thormisc.hpp

@@ -276,6 +276,9 @@ public:
 };
 
 
+#define DEFAULT_THORMASTERPORT 20000
+#define DEFAULT_THORSLAVEPORT 20100
+#define DEFAULT_SLAVEPORTINC 200
 #define DEFAULT_QUERYSO_LIMIT 10
 
 class graph_decl CFifoFileCache : public CSimpleInterface
@@ -428,7 +431,7 @@ extern graph_decl const LogMsgJobInfo thorJob;
 
 extern graph_decl StringBuffer &getCompoundQueryName(StringBuffer &compoundName, const char *queryName, unsigned version);
 
-extern graph_decl void setClusterGroup(INode *masterNode, IGroup *group, unsigned slavesPerProcess=0, unsigned portBase=0, unsigned portInc=0);
+extern graph_decl void setClusterGroup(INode *masterNode, IGroup *group, unsigned slavesPerNode, unsigned channelsPerSlave, unsigned portBase, unsigned portInc);
 extern graph_decl bool clusterInitialized();
 extern graph_decl INode &queryMasterNode();
 extern graph_decl IGroup &queryRawGroup();