|
@@ -313,7 +313,7 @@ public:
|
|
|
|
|
|
unsigned localThorPortInc = globals->getPropInt("@localThorPortInc", DEFAULT_SLAVEPORTINC);
|
|
|
unsigned slaveBasePort = globals->getPropInt("@slaveport", DEFAULT_THORSLAVEPORT);
|
|
|
- unsigned channelsPerSlave = globals->getPropInt("@channelsPerSlave", 1);
|
|
|
+ unsigned channelsPerWorker = globals->getPropInt("@channelsPerWorker", 1);
|
|
|
|
|
|
Owned<IGroup> processGroup;
|
|
|
|
|
@@ -323,7 +323,7 @@ public:
|
|
|
else
|
|
|
{
|
|
|
processGroup.setown(createIGroup(connectedSlaves.ordinality(), connectedSlaves.getArray()));
|
|
|
- setupCluster(queryMyNode(), processGroup, channelsPerSlave, slaveBasePort, localThorPortInc);
|
|
|
+ setupCluster(queryMyNode(), processGroup, channelsPerWorker, slaveBasePort, localThorPortInc);
|
|
|
}
|
|
|
|
|
|
PROGLOG("Slaves connected, initializing..");
|
|
@@ -640,7 +640,14 @@ int main( int argc, const char *argv[] )
|
|
|
const char *thorname = NULL;
|
|
|
StringBuffer nodeGroup, logUrl;
|
|
|
unsigned slavesPerNode = globals->getPropInt("@slavesPerNode", 1);
|
|
|
- unsigned channelsPerSlave = globals->getPropInt("@channelsPerSlave", 1);
|
|
|
+ unsigned channelsPerWorker;
|
|
|
+ if (globals->hasProp("@channelsPerWorker"))
|
|
|
+ channelsPerWorker = globals->getPropInt("@channelsPerWorker", 1);
|
|
|
+ else
|
|
|
+ { // for backward compatiblity only
|
|
|
+ channelsPerWorker = globals->getPropInt("@channelsPerSlave", 1);
|
|
|
+ globals->setPropInt("@channelsPerWorker", channelsPerWorker);
|
|
|
+ }
|
|
|
|
|
|
installDefaultFileHooks(globals);
|
|
|
ILogMsgHandler *logHandler;
|
|
@@ -892,7 +899,7 @@ int main( int argc, const char *argv[] )
|
|
|
masterSlaveMpTag = allocateClusterMPTag();
|
|
|
kjServiceMpTag = allocateClusterMPTag();
|
|
|
|
|
|
- unsigned numSlaves = 0;
|
|
|
+ unsigned numWorkers = 0;
|
|
|
StringBuffer cloudJobName;
|
|
|
const char *workunit = nullptr;
|
|
|
const char *graphName = nullptr;
|
|
@@ -904,21 +911,21 @@ int main( int argc, const char *argv[] )
|
|
|
if (isEmptyString(graphName))
|
|
|
throw makeStringException(0, "missing --graphName");
|
|
|
|
|
|
- if (!globals->hasProp("@numSlaves"))
|
|
|
- throw makeStringException(0, "Default number of slaves not defined (numSlaves)");
|
|
|
+ if (!globals->hasProp("@numWorkers"))
|
|
|
+ throw makeStringException(0, "Default number of workers not defined (numWorkers)");
|
|
|
else
|
|
|
{
|
|
|
- // check 'numSlaves' workunit option.
|
|
|
+ // check 'numWorkers' workunit option.
|
|
|
Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
|
|
|
Owned<IConstWorkUnit> wuRead = factory->openWorkUnit(workunit);
|
|
|
if (!wuRead)
|
|
|
throw makeStringExceptionV(0, "Cannot open workunit: %s", workunit);
|
|
|
- if (wuRead->hasDebugValue("numSlaves"))
|
|
|
- numSlaves = wuRead->getDebugValueInt("numSlaves", 0);
|
|
|
+ if (wuRead->hasDebugValue("numWorkers"))
|
|
|
+ numWorkers = wuRead->getDebugValueInt("numWorkers", 0);
|
|
|
else
|
|
|
- numSlaves = globals->getPropInt("@numSlaves", 0);
|
|
|
- if (0 == numSlaves)
|
|
|
- throw makeStringException(0, "Number of slaves must be > 0 (numSlaves)");
|
|
|
+ numWorkers = globals->getPropInt("@numWorkers", 0);
|
|
|
+ if (0 == numWorkers)
|
|
|
+ throw makeStringException(0, "Number of workers must be > 0 (numWorkers)");
|
|
|
}
|
|
|
|
|
|
cloudJobName.appendf("%s-%s", workunit, graphName);
|
|
@@ -926,16 +933,16 @@ int main( int argc, const char *argv[] )
|
|
|
StringBuffer myEp;
|
|
|
queryMyNode()->endpoint().getUrlStr(myEp);
|
|
|
|
|
|
- applyK8sYaml("thorslave", workunit, cloudJobName, "jobspec", { { "graphName", graphName}, { "master", myEp.str() }, { "%numSlaves", std::to_string(numSlaves)} }, false);
|
|
|
+ applyK8sYaml("thorworker", workunit, cloudJobName, "jobspec", { { "graphName", graphName}, { "master", myEp.str() }, { "%numWorkers", std::to_string(numWorkers)} }, false);
|
|
|
#else
|
|
|
unsigned localThorPortInc = globals->getPropInt("@localThorPortInc", DEFAULT_SLAVEPORTINC);
|
|
|
unsigned slaveBasePort = globals->getPropInt("@slaveport", DEFAULT_THORSLAVEPORT);
|
|
|
Owned<IGroup> rawGroup = getClusterNodeGroup(thorname, "ThorCluster");
|
|
|
- setClusterGroup(queryMyNode(), rawGroup, slavesPerNode, channelsPerSlave, slaveBasePort, localThorPortInc);
|
|
|
- numSlaves = queryNodeClusterWidth();
|
|
|
+ setClusterGroup(queryMyNode(), rawGroup, slavesPerNode, channelsPerWorker, slaveBasePort, localThorPortInc);
|
|
|
+ numWorkers = queryNodeClusterWidth();
|
|
|
#endif
|
|
|
|
|
|
- if (registry->connect(numSlaves))
|
|
|
+ if (registry->connect(numWorkers))
|
|
|
{
|
|
|
if (globals->getPropBool("@replicateOutputs")&&globals->getPropBool("@validateDAFS",true)&&!checkClusterRelicateDAFS(queryNodeGroup()))
|
|
|
{
|
|
@@ -947,7 +954,7 @@ int main( int argc, const char *argv[] )
|
|
|
for (unsigned s=0; s<totSlaveProcs; s++)
|
|
|
{
|
|
|
StringBuffer slaveStr;
|
|
|
- for (unsigned c=0; c<channelsPerSlave; c++)
|
|
|
+ for (unsigned c=0; c<channelsPerWorker; c++)
|
|
|
{
|
|
|
unsigned o = s + (c * totSlaveProcs);
|
|
|
if (c)
|
|
@@ -955,7 +962,7 @@ int main( int argc, const char *argv[] )
|
|
|
slaveStr.append(o+1);
|
|
|
}
|
|
|
StringBuffer virtStr;
|
|
|
- if (channelsPerSlave>1)
|
|
|
+ if (channelsPerWorker>1)
|
|
|
virtStr.append("virtual slaves:");
|
|
|
else
|
|
|
virtStr.append("slave:");
|
|
@@ -1013,7 +1020,7 @@ int main( int argc, const char *argv[] )
|
|
|
#ifdef _CONTAINERIZED
|
|
|
registry.clear();
|
|
|
if (globals->getPropBool("@deleteJobs", true))
|
|
|
- deleteK8sResource("thorslave", cloudJobName, "job");
|
|
|
+ deleteK8sResource("thorworker", cloudJobName, "job");
|
|
|
setExitCode(0);
|
|
|
#endif
|
|
|
LOG(MCdebugProgress, thorJob, "ThorMaster terminated OK");
|