|
@@ -19,6 +19,8 @@
|
|
|
#include "jarray.hpp"
|
|
|
#include "dadfs.hpp"
|
|
|
#include "exception_util.hpp"
|
|
|
+#include "workunit.hpp"
|
|
|
+#include "roxiecommlibscm.hpp"
|
|
|
|
|
|
#ifndef eqHoleCluster
|
|
|
#define eqHoleCluster "HoleCluster"
|
|
@@ -72,6 +74,8 @@ static const int THREAD_POOL_SIZE = 40;
|
|
|
static const int THREAD_POOL_STACK_SIZE = 64000;
|
|
|
static const char* FEATURE_URL = "MachineInfoAccess";
|
|
|
|
|
|
+const unsigned ROXIECONTROLSTATETIMEOUT = 5000; //Timeout handed to createRoxieCommunicationClient() when querying roxie control:state: 5 seconds (value presumably in milliseconds - confirm against roxiecommlib).
|
|
|
+
|
|
|
class CMachineInfoThreadParam : public CWsMachineThreadParam
|
|
|
{
|
|
|
public:
|
|
@@ -112,6 +116,24 @@ private:
|
|
|
|
|
|
Mutex CMachineInfoThreadParam::s_mutex;
|
|
|
|
|
|
+class CRoxieStateInfoThreadParam : public CWsMachineThreadParam
|
|
|
+{
|
|
|
+public:
|
|
|
+ StringAttr clusterName;
|
|
|
+ IArrayOf<IEspMachineInfoEx>& machineInfoTable; //For response
|
|
|
+
|
|
|
+ CRoxieStateInfoThreadParam(Cws_machineEx* pService, const char* _clusterName, IArrayOf<IEspMachineInfoEx>& _machineInfoTable)
|
|
|
+ : CWsMachineThreadParam(pService), clusterName(_clusterName), machineInfoTable(_machineInfoTable)
|
|
|
+ {
|
|
|
+ }
|
|
|
+
|
|
|
+ virtual void doWork()
|
|
|
+ {
|
|
|
+ m_pService->getRoxieStateInfo(this);
|
|
|
+ }
|
|
|
+};
|
|
|
+
|
|
|
+
|
|
|
void Cws_machineEx::init(IPropertyTree *cfg, const char *process, const char *service)
|
|
|
{
|
|
|
//Read settings from esp.xml
|
|
@@ -171,10 +193,9 @@ void Cws_machineEx::init(IPropertyTree *cfg, const char *process, const char *se
|
|
|
m_threadPoolStackSize = pServiceNode->getPropInt("ThreadPoolStackSize", THREAD_POOL_STACK_SIZE);
|
|
|
|
|
|
//Start thread pool
|
|
|
- IThreadFactory* pThreadFactory = new CWsMachineThreadFactory();
|
|
|
+ Owned<IThreadFactory> pThreadFactory = new CWsMachineThreadFactory();
|
|
|
m_threadPool.setown(createThreadPool("WsMachine Thread Pool", pThreadFactory,
|
|
|
NULL, m_threadPoolSize, 10000, m_threadPoolStackSize)); //10 sec timeout for available thread; use stack size of 2MB
|
|
|
- pThreadFactory->Release();
|
|
|
|
|
|
setupLegacyFilters();
|
|
|
}
|
|
@@ -318,6 +339,8 @@ void Cws_machineEx::readMachineInfoRequest(IEspContext& context, bool getProcess
|
|
|
}
|
|
|
|
|
|
setProcessRequest(machineInfoData, uniqueProcesses, address1.str(), address2.str(), processType.str(), compName.str(), path.str(), processNumber);
|
|
|
+ if (strieq(processType.str(), eqRoxieServerProcess))
|
|
|
+ machineInfoData.appendRoxieClusters(compName.str());
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -656,6 +679,9 @@ void Cws_machineEx::readTargetClusterProcesses(IPropertyTree &processNode, const
|
|
|
pClusterProcess = pEnvironmentRoot->queryPropTree(path.str());
|
|
|
if (!pClusterProcess)
|
|
|
throw MakeStringException(ECLWATCH_INTERNAL_ERROR, "Process not set for %s in environment setting.", path.str());
|
|
|
+
|
|
|
+ if (strieq(nodeType, eqRoxieCluster))
|
|
|
+ machineInfoData.appendRoxieClusters(process);
|
|
|
}
|
|
|
|
|
|
IPropertyTree *pInfo = targetClustersOut->addPropTree("Process", createPTree("Process"));
|
|
@@ -857,30 +883,174 @@ bool Cws_machineEx::isLegacyFilter(const char* processType, const char* dependen
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
-//////////////////////////////////////////////////////////////////
|
|
|
-// Get Machine Infomation based on Machine Infomation request //
|
|
|
-//////////////////////////////////////////////////////////////////
|
|
|
+//The stateHashes stores different state hashes in one roxie cluster.
|
|
|
+//It also stores how many roxie nodes have the same state hashes.
|
|
|
+int Cws_machineEx::addRoxieStateHash(const char* hash, CIArrayOf<CStateHash>& stateHashes)
|
|
|
+{
|
|
|
+ if (!hash || !*hash)
|
|
|
+ return -1;
|
|
|
+ ForEachItemIn(i, stateHashes)
|
|
|
+ {
|
|
|
+ CStateHash& stateHash = stateHashes.item(i);
|
|
|
+ //if the 'hash' matches this 'stateHash', the matchHash() increases the count for this 'stateHash' by 1.
|
|
|
+ if (stateHash.matchHash(hash))
|
|
|
+ return i;
|
|
|
+ }
|
|
|
+ stateHashes.append(*new CStateHash(hash));
|
|
|
+ return stateHashes.length() - 1;
|
|
|
+}
|
|
|
|
|
|
-void Cws_machineEx::getMachineInfo(IEspContext& context, CGetMachineInfoData& machineInfoData)
|
|
|
+void Cws_machineEx::updateMajorRoxieStateHash(CIArrayOf<CStateHash>& stateHashes, CIArrayOf<CRoxieStateData>& roxieStates)
|
|
|
{
|
|
|
- UnsignedArray threadHandles;
|
|
|
- CIArrayOf<CMachineData>& machines = machineInfoData.getMachineData();
|
|
|
- ForEachItemIn(idx, machines)
|
|
|
+ if (stateHashes.length() < 2)
|
|
|
+ return;
|
|
|
+
|
|
|
+ //Find out which state hash is for the most of the roxie nodes inside this roxie cluster.
|
|
|
+ unsigned majorHash = 0;
|
|
|
+ unsigned majorHashCount = 0;
|
|
|
+ ForEachItemIn(i, stateHashes)
|
|
|
+ {
|
|
|
+ unsigned hashCount = stateHashes.item(i).getCount();
|
|
|
+ if (majorHashCount >= hashCount)
|
|
|
+ continue;
|
|
|
+ majorHashCount = hashCount;
|
|
|
+ majorHash = i;
|
|
|
+ }
|
|
|
+
|
|
|
+ //Set the MajorHash to false if the roxie node has a different hash.
|
|
|
+ ForEachItemIn(ii, roxieStates)
|
|
|
+ {
|
|
|
+ CRoxieStateData& roxieState = roxieStates.item(ii);
|
|
|
+ if (roxieState.getHashID() != majorHash)
|
|
|
+ roxieState.setMajorHash(false);
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+void Cws_machineEx::readRoxieStatus(const Owned<IPropertyTree> controlResp, CIArrayOf<CRoxieStateData>& roxieStates)
|
|
|
+{
|
|
|
+ CIArrayOf<CStateHash> stateHashes;
|
|
|
+ Owned<IPropertyTreeIterator> roxieEndpoints = controlResp->getElements("Endpoint");
|
|
|
+ ForEach(*roxieEndpoints)
|
|
|
+ {
|
|
|
+ IPropertyTree& roxieEndpoint = roxieEndpoints->query();
|
|
|
+ const char *ep = roxieEndpoint.queryProp("@ep");
|
|
|
+ if (!ep || !*ep)
|
|
|
+ continue;
|
|
|
+
|
|
|
+ bool ok = false, attached = false, detached = false;
|
|
|
+ const char *status = roxieEndpoint.queryProp("Status");
|
|
|
+ if (status && strieq(status, "ok"))
|
|
|
+ ok = true;
|
|
|
+ const char *stateHash = roxieEndpoint.queryProp("State/@hash");
|
|
|
+ if (roxieEndpoint.hasProp("Dali/@connected"))
|
|
|
+ {
|
|
|
+ if (roxieEndpoint.getPropBool("Dali/@connected"))
|
|
|
+ attached = true;
|
|
|
+ else
|
|
|
+ detached = true;
|
|
|
+ }
|
|
|
+
|
|
|
+ StringArray locations;
|
|
|
+ locations.appendListUniq(ep, ":");
|
|
|
+ Owned<CRoxieStateData> roxieState = new CRoxieStateData(locations.item(0), addRoxieStateHash(stateHash, stateHashes));
|
|
|
+ roxieState->setState(ok, attached, detached, stateHash);
|
|
|
+ roxieStates.append(*roxieState.getClear());
|
|
|
+ }
|
|
|
+ updateMajorRoxieStateHash(stateHashes, roxieStates);
|
|
|
+}
|
|
|
+
|
|
|
+void Cws_machineEx::getRoxieStateInfo(CRoxieStateInfoThreadParam* param)
|
|
|
+{
|
|
|
+ const char* clusterName = param->clusterName.get();
|
|
|
+ if (!clusterName || !*clusterName)
|
|
|
+ throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Roxie cluster not specified.");
|
|
|
+
|
|
|
+ SocketEndpointArray servers;
|
|
|
+ getRoxieProcessServers(clusterName, servers);
|
|
|
+ if (!servers.length())
|
|
|
+ throw MakeStringException(ECLWATCH_CANNOT_GET_ENV_INFO, "Roxie Process server not found.");
|
|
|
+
|
|
|
+ Owned<IRoxieCommunicationClient> roxieClient = createRoxieCommunicationClient(servers.item(0), ROXIECONTROLSTATETIMEOUT);
|
|
|
+ Owned<IPropertyTree> controlResp = roxieClient->sendRoxieControlAllNodes("<control:state/>", true);
|
|
|
+ if (!controlResp)
|
|
|
+ throw MakeStringException(ECLWATCH_INTERNAL_ERROR, "Failed to get control response from roxie %s.", clusterName);
|
|
|
+
|
|
|
+ CIArrayOf<CRoxieStateData> roxieStates;
|
|
|
+ readRoxieStatus(controlResp, roxieStates);
|
|
|
+
|
|
|
+ ForEachItemIn(i, param->machineInfoTable)
|
|
|
{
|
|
|
- Owned<CMachineInfoThreadParam> pThreadReq = new CMachineInfoThreadParam( this, context, machineInfoData.getOptions(),
|
|
|
- machines.item(idx), machineInfoData.getMachineInfoTable(), machineInfoData.getMachineInfoColumns());
|
|
|
- PooledThreadHandle handle = m_threadPool->start( pThreadReq.getClear() );
|
|
|
- threadHandles.append(handle);
|
|
|
+ IEspMachineInfoEx& machineInfo = param->machineInfoTable.item(i);
|
|
|
+ if (!streq(machineInfo.getProcessType(), eqRoxieServerProcess) || !streq(machineInfo.getComponentName(), clusterName))
|
|
|
+ continue;
|
|
|
+
|
|
|
+ //This method is thread safe because each machineInfo (for one roxie node) belongs to only one Roxie cluster.
|
|
|
+ //It is impossible for different threads to update the same machineInfo.
|
|
|
+ bool foundRoxieState = false;
|
|
|
+ ForEachItemIn(ii, roxieStates)
|
|
|
+ {
|
|
|
+ CRoxieStateData& roxieState = roxieStates.item(ii);
|
|
|
+ if (!roxieState.matchIPAddress(machineInfo.getAddress()))
|
|
|
+ continue;
|
|
|
+
|
|
|
+ StringBuffer state, stateDetails;
|
|
|
+ roxieState.reportState(state, stateDetails);
|
|
|
+ machineInfo.setRoxieState(state.str());
|
|
|
+ machineInfo.setRoxieStateDetails(stateDetails.str());
|
|
|
+ foundRoxieState = true;
|
|
|
+ }
|
|
|
+ if (!foundRoxieState)
|
|
|
+ {
|
|
|
+ machineInfo.setRoxieState("??");
|
|
|
+ machineInfo.setRoxieStateDetails("Roxie state not found");
|
|
|
+ }
|
|
|
}
|
|
|
+}
|
|
|
|
|
|
- //block for worker theads to finish, if necessary and then collect results
|
|
|
- PooledThreadHandle* pThreadHandle = threadHandles.getArray();
|
|
|
- unsigned i=threadHandles.ordinality();
|
|
|
- while (i--)
|
|
|
+void Cws_machineEx::getMachineInfo(IEspContext& context, bool getRoxieState, CGetMachineInfoData& machineInfoData)
|
|
|
+{
|
|
|
+ UnsignedArray threadHandles;
|
|
|
+ if (!getRoxieState)
|
|
|
{
|
|
|
- m_threadPool->join(*pThreadHandle);
|
|
|
- pThreadHandle++;
|
|
|
+ CIArrayOf<CMachineData>& machines = machineInfoData.getMachineData();
|
|
|
+ ForEachItemIn(idx, machines)
|
|
|
+ {
|
|
|
+ Owned<CMachineInfoThreadParam> pThreadReq = new CMachineInfoThreadParam(this, context, machineInfoData.getOptions(),
|
|
|
+ machines.item(idx), machineInfoData.getMachineInfoTable(), machineInfoData.getMachineInfoColumns());
|
|
|
+ PooledThreadHandle handle = m_threadPool->start( pThreadReq.getClear());
|
|
|
+ threadHandles.append(handle);
|
|
|
+ }
|
|
|
}
|
|
|
+ else
|
|
|
+ {
|
|
|
+ StringArray& roxieClusters = machineInfoData.getRoxieClusters();
|
|
|
+ ForEachItemIn(i, roxieClusters)
|
|
|
+ {
|
|
|
+ Owned<CRoxieStateInfoThreadParam> pThreadReq = new CRoxieStateInfoThreadParam(this, roxieClusters.item(i),
|
|
|
+ machineInfoData.getMachineInfoTable());
|
|
|
+ PooledThreadHandle handle = m_threadPool->start( pThreadReq.getClear());
|
|
|
+ threadHandles.append(handle);
|
|
|
+ }
|
|
|
+ machineInfoData.getMachineInfoColumns().append("Roxie State");
|
|
|
+ }
|
|
|
+
|
|
|
+ //Block for worker threads to finish, if necessary and then collect results
|
|
|
+ //Not use joinAll() because multiple threads may call this method. Each call uses the pool to create
|
|
|
+ //its own threads of checking query state. Each call should only join the ones created by that call.
|
|
|
+ ForEachItemIn(i, threadHandles)
|
|
|
+ m_threadPool->join(threadHandles.item(i));
|
|
|
+}
|
|
|
+
|
|
|
+////////////////////////////////////////////////////////////////////
|
|
|
+// Get Machine Information based on Machine Information request //
|
|
|
+////////////////////////////////////////////////////////////////////
|
|
|
+
|
|
|
+void Cws_machineEx::getMachineInfo(IEspContext& context, CGetMachineInfoData& machineInfoData)
|
|
|
+{
|
|
|
+ double version = context.getClientVersion();
|
|
|
+ getMachineInfo(context, false, machineInfoData);
|
|
|
+ if ((version >= 1.13) && !machineInfoData.getRoxieClusters().empty())
|
|
|
+ getMachineInfo(context, true, machineInfoData);
|
|
|
}
|
|
|
|
|
|
// the following method is invoked on worker threads of CMachineInfoThreadParam
|