|
@@ -48,7 +48,6 @@
|
|
|
|
|
|
bool shuttingDown = false;
|
|
|
unsigned numChannels;
|
|
|
-unsigned numActiveChannels;
|
|
|
unsigned callbackRetries = 3;
|
|
|
unsigned callbackTimeout = 500;
|
|
|
unsigned lowTimeout = 10000;
|
|
@@ -157,7 +156,6 @@ unsigned minFilesOpen[2] = {2000, 500};
|
|
|
unsigned maxFilesOpen[2] = {4000, 1000};
|
|
|
|
|
|
SocketEndpoint ownEP;
|
|
|
-SocketEndpointArray allRoxieServers;
|
|
|
HardwareInfo hdwInfo;
|
|
|
unsigned parallelAggregate;
|
|
|
bool inMemoryKeysEnabled = true;
|
|
@@ -168,6 +166,8 @@ unsigned nodeCacheMB = 100;
|
|
|
unsigned leafCacheMB = 50;
|
|
|
unsigned blobCacheMB = 0;
|
|
|
|
|
|
// Set in start_query from the first RoxieFarmProcess entry whose port is non-zero
// (the "@port" attribute, defaulting to ROXIE_SERVER_PORT); stays 0 until then.
+unsigned roxiePort = 0;
|
|
|
+
|
|
|
MODULE_INIT(INIT_PRIORITY_STANDARD)
|
|
|
{
|
|
|
topology = NULL;
|
|
@@ -254,15 +254,15 @@ void closedown()
|
|
|
waiter.onAbort();
|
|
|
}
|
|
|
|
|
|
-void getAccessList(const char *aclName, IPropertyTree *topology, IPropertyTree *serverInfo)
|
|
|
+void getAccessList(const char *aclName, const IPropertyTree *topology, IPropertyTree *aclInfo)
|
|
|
{
|
|
|
StringBuffer xpath;
|
|
|
xpath.append("ACL[@name='").append(aclName).append("']");
|
|
|
- if (serverInfo->queryPropTree(xpath))
|
|
|
+ if (aclInfo->queryPropTree(xpath))
|
|
|
throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - recursive ACL definition of %s", aclName);
|
|
|
Owned<IPropertyTree> X = createPTree("ACL");
|
|
|
X->setProp("@name", aclName);
|
|
|
- serverInfo->addPropTree("ACL", X.getClear());
|
|
|
+ aclInfo->addPropTree("ACL", X.getClear());
|
|
|
|
|
|
Owned<IPropertyTree> acl = topology->getPropTree(xpath.str());
|
|
|
if (!acl)
|
|
@@ -274,32 +274,11 @@ void getAccessList(const char *aclName, IPropertyTree *topology, IPropertyTree *
|
|
|
IPropertyTree &child = access->query();
|
|
|
const char *base = child.queryProp("@base");
|
|
|
if (base)
|
|
|
- getAccessList(base, topology, serverInfo);
|
|
|
+ getAccessList(base, topology, aclInfo);
|
|
|
else
|
|
|
- serverInfo->addPropTree(child.queryName(), LINK(&child));
|
|
|
+ aclInfo->addPropTree(child.queryName(), LINK(&child));
|
|
|
}
|
|
|
- serverInfo->removeProp(xpath);
|
|
|
-}
|
|
|
-
|
|
|
-void addServerChannel(unsigned port, unsigned threads, const char *access, IPropertyTree *topology)
|
|
|
-{
|
|
|
- if (!ownEP.port)
|
|
|
- ownEP.set(port, queryHostIP());
|
|
|
- Owned<IPropertyTreeIterator> servers = ccdChannels->getElements("RoxieServerProcess");
|
|
|
- ForEach(*servers)
|
|
|
- {
|
|
|
- IPropertyTree &f = servers->query();
|
|
|
- if (f.getPropInt("@port", 0) == port)
|
|
|
- throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - Roxie server port repeated");
|
|
|
- }
|
|
|
- IPropertyTree *ci = createPTree("RoxieServerProcess");
|
|
|
- ci->setPropInt("@port", port);
|
|
|
- ci->setPropInt("@numThreads", threads);
|
|
|
- if (access && *access)
|
|
|
- {
|
|
|
- getAccessList(access, topology, ci);
|
|
|
- }
|
|
|
- ccdChannels->addPropTree("RoxieServerProcess", ci);
|
|
|
+ aclInfo->removeProp(xpath);
|
|
|
}
|
|
|
|
|
|
bool ipMatch(IpAddress &ip)
|
|
@@ -307,7 +286,7 @@ bool ipMatch(IpAddress &ip)
|
|
|
return ip.isLocal();
|
|
|
}
|
|
|
|
|
|
-void addSlaveChannel(unsigned channel, unsigned level, bool suspended)
|
|
|
+void addSlaveChannel(unsigned channel, unsigned level)
|
|
|
{
|
|
|
StringBuffer xpath;
|
|
|
xpath.appendf("RoxieSlaveProcess[@channel=\"%d\"]", channel);
|
|
@@ -315,24 +294,23 @@ void addSlaveChannel(unsigned channel, unsigned level, bool suspended)
|
|
|
throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - channel %d repeated", channel);
|
|
|
IPropertyTree *ci = createPTree("RoxieSlaveProcess");
|
|
|
ci->setPropInt("@channel", channel);
|
|
|
- ci->setPropBool("@suspended", suspended);
|
|
|
- ci->setPropInt("@subChannel", numSlaves[channel]); // Alternatively could probably use replication level as subchannel ?
|
|
|
- suspendedChannels[channel] = suspended;
|
|
|
- assertex(!replicationLevel[channel]); // implies channel repeated, caught above
|
|
|
- replicationLevel[channel] = level;
|
|
|
+ ci->setPropInt("@subChannel", numSlaves[channel]);
|
|
|
ccdChannels->addPropTree("RoxieSlaveProcess", ci);
|
|
|
}
|
|
|
|
|
|
// addChannel: record that node `nodeNumber` serves `channel` at replication `level`.
//
// New-side ('+') behaviour, as shown in this patch:
//   * numSlaves[channel] is incremented for every call, whichever node it describes.
//   * When nodeNumber is this process's node (myNodeIndex) and channel is non-zero, the
//     channel is range-checked, its replication level is stored, and addSlaveChannel()
//     is called for it.
//   * Unless localSlave is set, addEndpoint() is called for the channel with the node's
//     address (from getNodeAddress) and CCD_MULTICAST_PORT.
//
// The removed ('-') signature took isMe/suspended/slaveIp from the caller; the new one
// derives "is me" and the address from nodeNumber and no longer carries a suspended flag.
//
// NOTE(review): `channel` indexes numSlaves before any range check, and the assertex on
// the range only runs for the local node - confirm numSlaves is sized for every channel
// value the callers can produce.
-void addChannel(unsigned channel, unsigned level, bool isMe, bool suspended, IpAddress& slaveIp)
|
|
|
+void addChannel(unsigned nodeNumber, unsigned channel, unsigned level)
|
|
|
{
|
|
|
// Counted for all nodes, not just this one.
numSlaves[channel]++;
|
|
|
- if (isMe && channel > 0 && channel <= numChannels)
|
|
|
+ if (nodeNumber == myNodeIndex && channel > 0)
|
|
|
{
|
|
|
- addSlaveChannel(channel, level, suspended);
|
|
|
// An out-of-range channel is now an assertion failure rather than being silently skipped.
+ assertex(channel <= numChannels);
|
|
|
// NOTE(review): this only trips if a previous registration stored a non-zero level;
// an earlier registration at level 0 leaves the slot at 0 and is not detected here.
+ assertex(!replicationLevel[channel]);
|
|
|
+ replicationLevel[channel] = level;
|
|
|
+ addSlaveChannel(channel, level);
|
|
|
}
|
|
|
if (!localSlave)
|
|
|
{
|
|
|
- addEndpoint(channel, slaveIp, CCD_MULTICAST_PORT);
|
|
|
+ addEndpoint(channel, getNodeAddress(nodeNumber), CCD_MULTICAST_PORT);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -516,12 +494,12 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
|
|
|
}
|
|
|
topology=createPTreeFromXMLString(
|
|
|
"<RoxieTopology numChannels='1' localSlave='1'>"
|
|
|
- "<RoxieServerProcess netAddress='.'/>"
|
|
|
- "<RoxieSlaveProcess netAddress='.' channel='1' level='0'/>"
|
|
|
+ " <RoxieFarmProcess/>"
|
|
|
+ " <RoxieServerProcess netAddress='.'/>"
|
|
|
"</RoxieTopology>"
|
|
|
);
|
|
|
int port = globals->getPropInt("--port", 9876);
|
|
|
- topology->setPropInt("RoxieServerProcess/@port", port);
|
|
|
+ topology->setPropInt("RoxieFarmProcess/@port", port);
|
|
|
topology->setProp("@daliServers", globals->queryProp("--daliServers"));
|
|
|
topology->setProp("@traceLevel", globals->queryProp("--traceLevel"));
|
|
|
topology->setProp("@memTraceLevel", globals->queryProp("--memTraceLevel"));
|
|
@@ -625,8 +603,6 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
|
|
|
if (!setPreferredSubnet(preferredSubnet, preferredSubnetMask))
|
|
|
throw MakeStringException(ROXIE_INTERNAL_ERROR, "Error setting preferred subnet %s mask %s", preferredSubnet, preferredSubnetMask);
|
|
|
}
|
|
|
- bool multiHostMode = globals->hasProp("--host");
|
|
|
- unsigned myHostNumber = globals->getPropInt("--host", 0);
|
|
|
if (restarts)
|
|
|
{
|
|
|
if (traceLevel)
|
|
@@ -645,7 +621,6 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
|
|
|
|
|
|
headRegionSize = topology->getPropInt("@headRegionSize", 50);
|
|
|
numChannels = topology->getPropInt("@numChannels", 0);
|
|
|
- numActiveChannels = topology->getPropInt("@numActiveChannels", numChannels);
|
|
|
statsExpiryTime = topology->getPropInt("@statsExpiryTime", 3600);
|
|
|
roxiemem::memTraceSizeLimit = (memsize_t) topology->getPropInt64("@memTraceSizeLimit", 0);
|
|
|
callbackRetries = topology->getPropInt("@callbackRetries", 3);
|
|
@@ -846,58 +821,62 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
|
|
|
if (!numChannels)
|
|
|
throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - numChannels attribute must be specified");
|
|
|
|
|
|
- bool myIPadded = false;
|
|
|
Owned<IPropertyTreeIterator> roxieServers = topology->getElements("./RoxieServerProcess");
|
|
|
ForEach(*roxieServers)
|
|
|
{
|
|
|
IPropertyTree &roxieServer = roxieServers->query();
|
|
|
const char *iptext = roxieServer.queryProp("@netAddress");
|
|
|
unsigned nodeIndex = addRoxieNode(iptext);
|
|
|
- unsigned port = roxieServer.getPropInt("@port", ROXIE_SERVER_PORT);
|
|
|
- if (iptext)
|
|
|
+ if (traceLevel > 3)
|
|
|
+ DBGLOG("Roxie server %u is at %s", nodeIndex, iptext);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Generate the slave channels
|
|
|
+ unsigned numDataCopies = topology->getPropInt("@numDataCopies", 1);
|
|
|
+ unsigned cyclicOffset = topology->getPropInt("@cyclicOffset", 0);
|
|
|
+ unsigned numNodes = getNumNodes();
|
|
|
+ if (cyclicOffset)
|
|
|
+ {
|
|
|
+ if (numChannels != numNodes)
|
|
|
+ throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - numChannels does not match number of servers");
|
|
|
+ for (int i=0; i<numNodes; i++)
|
|
|
{
|
|
|
- SocketEndpoint ep(iptext, port);
|
|
|
- unsigned roxieServerHost = roxieServer.getPropInt("@multihost", 0);
|
|
|
- if (ipMatch(ep) && ((roxieServerHost == myHostNumber) || (myHostNumber==-1)))
|
|
|
+ int channel = i+1;
|
|
|
+ for (int copy=0; copy<numDataCopies; copy++)
|
|
|
{
|
|
|
- unsigned numThreads = roxieServer.getPropInt("@numThreads", numServerThreads);
|
|
|
- const char *aclName = roxieServer.queryProp("@aclName");
|
|
|
- addServerChannel(port, numThreads, aclName, topology);
|
|
|
- if (!myIPadded || (myHostNumber==-1))
|
|
|
- {
|
|
|
- myNodeIndex = nodeIndex;
|
|
|
- allRoxieServers.append(ep);
|
|
|
- myIPadded = true;
|
|
|
- }
|
|
|
+ channel = channel + cyclicOffset;
|
|
|
+ if (channel > numNodes)
|
|
|
+ channel = channel - numNodes;
|
|
|
+ addChannel(i, channel, copy);
|
|
|
}
|
|
|
- else if (multiHostMode || !roxieServerHost)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else if (numChannels > numNodes) // overloaded mode
|
|
|
+ {
|
|
|
+ if (numChannels != numNodes * numDataCopies)
|
|
|
+ throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - numChannels does not match expected value");
|
|
|
+ for (int i=0; i<numNodes; i++)
|
|
|
+ {
|
|
|
+ int channel = i+1;
|
|
|
+ for (int copy=0; copy<numDataCopies; copy++)
|
|
|
{
|
|
|
- bool found = false;
|
|
|
- ForEachItemIn(idx, allRoxieServers)
|
|
|
- {
|
|
|
- if (multiHostMode)
|
|
|
- {
|
|
|
- if (ep.equals(allRoxieServers.item(idx)))
|
|
|
- {
|
|
|
- found = true;
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- if (ep.ipequals(allRoxieServers.item(idx)))
|
|
|
- {
|
|
|
- found = true;
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- if (!found)
|
|
|
- allRoxieServers.append(ep);
|
|
|
+ channel = channel + copy*numNodes;
|
|
|
+ addChannel(i, channel, copy);
|
|
|
}
|
|
|
}
|
|
|
- else
|
|
|
- throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - missing netAddress or port specification on RoxieServerProcess element");
|
|
|
+ }
|
|
|
+ else // 'Full redundancy' or 'simple' mode
|
|
|
+ {
|
|
|
+ if (numChannels != numNodes / numDataCopies)
|
|
|
+ throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - numChannels does not match expected value");
|
|
|
+ int channel = 1;
|
|
|
+ for (int i=0; i<numNodes; i++)
|
|
|
+ {
|
|
|
+ addChannel(i, channel, 0);
|
|
|
+ channel++;
|
|
|
+ if (channel > numChannels)
|
|
|
+ channel = 1;
|
|
|
+ }
|
|
|
}
|
|
|
if (!localSlave)
|
|
|
{
|
|
@@ -911,59 +890,10 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
|
|
|
multicastLast.ipset(topology->queryProp("@multicastLast"));
|
|
|
else
|
|
|
throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - multicastLast not set");
|
|
|
+ joinMulticastChannel(0); // all slaves also listen on channel 0
|
|
|
}
|
|
|
}
|
|
|
- Owned<IPropertyTreeIterator> slaves = topology->getElements("./RoxieSlaveProcess");
|
|
|
- IpAddress *primaries = new IpAddress[numChannels+1]; // check each channel has a different primary, if possible. Leaks on fatal errors, but who cares
|
|
|
- ForEach(*slaves)
|
|
|
- {
|
|
|
- IPropertyTree &slave = slaves->query();
|
|
|
- const char *iptext = slave.queryProp("@netAddress");
|
|
|
- if (iptext)
|
|
|
- {
|
|
|
- addRoxieNode(iptext);
|
|
|
- IpAddress slaveIp(iptext);
|
|
|
- bool isMe = ipMatch(slaveIp) && slave.getPropInt("@multihost", 0) == myHostNumber;
|
|
|
- bool suspended = slave.getPropBool("@suspended", false);
|
|
|
- unsigned channel = slave.getPropInt("@channel", 0);
|
|
|
- if (!channel)
|
|
|
- channel = slave.getPropInt("@channels", 0); // legacy support
|
|
|
- unsigned replicationLevel = slave.getPropInt("@level", 0);
|
|
|
- if (channel && channel <= numChannels)
|
|
|
- {
|
|
|
- if (isMe)
|
|
|
- isCCD = true;
|
|
|
- if (!replicationLevel)
|
|
|
- primaries[channel] = slaveIp;
|
|
|
- addChannel(channel, replicationLevel, isMe, suspended, slaveIp);
|
|
|
- if (isMe)
|
|
|
- joinMulticastChannel(channel);
|
|
|
- }
|
|
|
- else
|
|
|
- throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - missing or invalid channel attribute on RoxieSlaveProcess element");
|
|
|
- }
|
|
|
- else
|
|
|
- throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - missing netAddress attribute on RoxieSlaveProcess element");
|
|
|
- }
|
|
|
- if (numActiveChannels)
|
|
|
- joinMulticastChannel(0); // all slaves also listen on channel 0
|
|
|
|
|
|
- for (unsigned n = 1; n < numActiveChannels; n++)
|
|
|
- {
|
|
|
- if (!numSlaves[n])
|
|
|
- throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - no slaves for channel %d", n);
|
|
|
- if (checkPrimaries)
|
|
|
- {
|
|
|
- for (unsigned m = n+1; m <= numChannels; m++)
|
|
|
- if (primaries[n].ipequals(primaries[m]))
|
|
|
- {
|
|
|
- StringBuffer s;
|
|
|
- primaries[n].getIpText(s);
|
|
|
- throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - slave %s is primary for multiple channels", s.str());
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- delete [] primaries;
|
|
|
setDaliServixSocketCaching(true); // enable daliservix caching
|
|
|
loadPlugins();
|
|
|
globalPackageSetManager = createRoxiePackageSetManager(standAloneDll.getClear());
|
|
@@ -1009,35 +939,44 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- Owned<IPropertyTreeIterator> it = ccdChannels->getElements("RoxieServerProcess");
|
|
|
- ForEach(*it)
|
|
|
+ Owned<IPropertyTreeIterator> roxieFarms = topology->getElements("./RoxieFarmProcess");
|
|
|
+ ForEach(*roxieFarms)
|
|
|
{
|
|
|
- // MORE - there are assumptions that everyone is a server (in deployment)
|
|
|
- IPropertyTree &serverInfo = it->query();
|
|
|
- unsigned port = serverInfo.getPropInt("@port", -1);
|
|
|
- bool suspended = serverInfo.getPropBool("@suspended", false);
|
|
|
- unsigned numThreads = serverInfo.getPropInt("@numThreads", -1);
|
|
|
- unsigned listenQueue = serverInfo.getPropInt("@listenQueue", DEFAULT_LISTEN_QUEUE_SIZE);
|
|
|
+ IPropertyTree &roxieFarm = roxieFarms->query();
|
|
|
+ unsigned listenQueue = roxieFarm.getPropInt("@listenQueue", DEFAULT_LISTEN_QUEUE_SIZE);
|
|
|
+ unsigned numThreads = roxieFarm.getPropInt("@numThreads", numServerThreads);
|
|
|
+ unsigned port = roxieFarm.getPropInt("@port", ROXIE_SERVER_PORT);
|
|
|
+ unsigned requestArrayThreads = roxieFarm.getPropInt("@requestArrayThreads", 5);
|
|
|
+ if (!roxiePort)
|
|
|
+ roxiePort = port;
|
|
|
+ bool suspended = roxieFarm.getPropBool("@suspended", false);
|
|
|
Owned <IRoxieListener> roxieServer;
|
|
|
if (port)
|
|
|
roxieServer.setown(createRoxieSocketListener(port, numThreads, listenQueue, suspended));
|
|
|
else
|
|
|
roxieServer.setown(createRoxieWorkUnitListener(numThreads, suspended));
|
|
|
- Owned<IPropertyTreeIterator> accesses = serverInfo.getElements("Access");
|
|
|
- ForEach(*accesses)
|
|
|
+
|
|
|
+ const char *aclName = roxieFarm.queryProp("@aclName");
|
|
|
+ if (aclName)
|
|
|
{
|
|
|
- IPropertyTree &access = accesses->query();
|
|
|
- try
|
|
|
+ Owned<IPropertyTree> aclInfo = createPTree("AccessInfo");
|
|
|
+ getAccessList(aclName, topology, aclInfo);
|
|
|
+ Owned<IPropertyTreeIterator> accesses = aclInfo->getElements("Access");
|
|
|
+ ForEach(*accesses)
|
|
|
{
|
|
|
- roxieServer->addAccess(access.getPropBool("@allow", true), access.getPropBool("@allowBlind", true), access.queryProp("@ip"), access.queryProp("@mask"), access.queryProp("@query"), access.queryProp("@error"), access.getPropInt("@errorCode"));
|
|
|
- }
|
|
|
- catch (IException *E)
|
|
|
- {
|
|
|
- StringBuffer s, x;
|
|
|
- E->errorMessage(s);
|
|
|
- E->Release();
|
|
|
- toXML(&access, x, 0, 0);
|
|
|
- throw MakeStringException(ROXIE_ACL_ERROR, "Error in access statement %s: %s", x.str(), s.str());
|
|
|
+ IPropertyTree &access = accesses->query();
|
|
|
+ try
|
|
|
+ {
|
|
|
+ roxieServer->addAccess(access.getPropBool("@allow", true), access.getPropBool("@allowBlind", true), access.queryProp("@ip"), access.queryProp("@mask"), access.queryProp("@query"), access.queryProp("@error"), access.getPropInt("@errorCode"));
|
|
|
+ }
|
|
|
+ catch (IException *E)
|
|
|
+ {
|
|
|
+ StringBuffer s, x;
|
|
|
+ E->errorMessage(s);
|
|
|
+ E->Release();
|
|
|
+ toXML(&access, x, 0, 0);
|
|
|
+ throw MakeStringException(ROXIE_ACL_ERROR, "Error in access statement %s: %s", x.str(), s.str());
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
socketListeners.append(*roxieServer.getLink());
|
|
@@ -1075,7 +1014,6 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
|
|
|
}
|
|
|
|
|
|
roxieMetrics.clear();
|
|
|
- allRoxieServers.kill();
|
|
|
stopPerformanceMonitor();
|
|
|
::Release(globalPackageSetManager);
|
|
|
globalPackageSetManager = NULL;
|