12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
7227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #pragma warning (disable : 4786)
- #include "build-config.h"
- #ifdef _USE_OPENLDAP
- #include "ldapsecurity.ipp"
- #endif
- #include "ws_smcService.hpp"
- #include "wshelpers.hpp"
- #include "dasds.hpp"
- #include "daqueue.hpp"
- #include "WUWrapper.hpp"
- #include "dfuwu.hpp"
- #include "exception_util.hpp"
- #include "roxiecontrol.hpp"
- #include "workunit.hpp"
- #define STATUS_SERVER_THOR "ThorMaster" // STATUS_SERVER_*: status-server name strings; presumably the Server/@name values found under /Status/Servers - TODO confirm
- #define STATUS_SERVER_HTHOR "HThorServer"
- #define STATUS_SERVER_ROXIE "RoxieServer"
- #define STATUS_SERVER_DFUSERVER "DFUserver" // compared via strieq() with Server/@name when the remaining status servers are scanned
- #define STATUS_SERVER_ECLSERVER "ECLserver"
- #define STATUS_SERVER_ECLCCSERVER "ECLCCserver"
- #define STATUS_SERVER_ECLAGENT "ECLagent"
- #define CLUSTER_TYPE_THOR "Thor" // CLUSTER_TYPE_*: labels handed to IEspActiveWorkunit::setClusterType() for WUs found on ECL agents
- #define CLUSTER_TYPE_HTHOR "HThor"
- #define CLUSTER_TYPE_ROXIE "Roxie"
- static const char* FEATURE_URL = "SmcAccess"; // *_URL / *_ACCESS / *_FEATURE: presumably security feature names checked by this service; usage not visible in this part of the file
- const char* THORQUEUE_FEATURE = "ThorQueueAccess"; // not static: this pointer has external linkage
- static const char* ROXIE_CONTROL_URL = "RoxieControlAccess";
- static const char* OWN_WU_ACCESS = "OwnWorkunitsAccess";
- static const char* OTHERS_WU_ACCESS = "OthersWorkunitsAccess";
- static const char* SMC_ACCESS_DENIED = "Access Denied"; // message text; presumably reported on permission failures - TODO confirm
- static const char* QUEUE_ACCESS_DENIED = "Failed to access the queue functions. Permission denied.";
- const char* PERMISSIONS_FILENAME = "espsmc_permissions.xml"; // not static: this pointer has external linkage
- void AccessSuccess(IEspContext& context, char const * msg,...) __attribute__((format(printf, 2, 3)));
- void AccessSuccess(IEspContext& context, char const * msg,...)
- {
- StringBuffer buf;
- buf.appendf("User %s: ",context.queryUserId());
- va_list args;
- va_start(args, msg);
- buf.valist_appendf(msg, args);
- va_end(args);
- AUDIT(AUDIT_TYPE_ACCESS_SUCCESS,buf.str());
- }
- void AccessFailure(IEspContext& context, char const * msg,...) __attribute__((format(printf, 2, 3)));
- void AccessFailure(IEspContext& context, char const * msg,...)
- {
- StringBuffer buf;
- buf.appendf("User %s: ",context.queryUserId());
- va_list args;
- va_start(args, msg);
- buf.valist_appendf(msg, args);
- va_end(args);
- AUDIT(AUDIT_TYPE_ACCESS_FAILURE,buf.str());
- }
- struct QueueLock
- {
- QueueLock(IJobQueue* q): queue(q) { queue->lock(); }
- ~QueueLock()
- {
- queue->unlock();
- }
- Linked<IJobQueue> queue;
- };
- static int sortTargetClustersByNameDescending(IInterface * const *L, IInterface * const *R)
- {
- IEspTargetCluster *left = (IEspTargetCluster *) *L;
- IEspTargetCluster *right = (IEspTargetCluster *) *R;
- return strcmp(right->getClusterName(), left->getClusterName());
- }
- static int sortTargetClustersByNameAscending(IInterface * const *L, IInterface * const *R)
- {
- IEspTargetCluster *left = (IEspTargetCluster *) *L;
- IEspTargetCluster *right = (IEspTargetCluster *) *R;
- return strcmp(left->getClusterName(), right->getClusterName());
- }
- static int sortTargetClustersBySizeDescending(IInterface * const *L, IInterface * const *R)
- {
- IEspTargetCluster *left = (IEspTargetCluster *) *L;
- IEspTargetCluster *right = (IEspTargetCluster *) *R;
- return right->getClusterSize() - left->getClusterSize();
- }
- static int sortTargetClustersBySizeAscending(IInterface * const *L, IInterface * const *R)
- {
- IEspTargetCluster *left = (IEspTargetCluster *) *L;
- IEspTargetCluster *right = (IEspTargetCluster *) *R;
- return left->getClusterSize() - right->getClusterSize();
- }
- void CWsSMCEx::init(IPropertyTree *cfg, const char *process, const char *service)
- {
- if (!daliClientActive())
- {
- OERRLOG("No Dali Connection Active.");
- throw MakeStringException(-1, "No Dali Connection Active. Please Specify a Dali to connect to in you configuration file");
- }
- m_BannerAction = 0;
- m_EnableChatURL = false;
- m_BannerSize = "4";
- m_BannerColor = "red";
- m_BannerScroll = "2";
- StringBuffer xpath;
- xpath.appendf("Software/EspProcess[@name='%s']/@portalurl", process);
- const char* portalURL = cfg->queryProp(xpath.str());
- if (portalURL && *portalURL)
- m_PortalURL.append(portalURL);
- xpath.setf("Software/EspProcess[@name=\"%s\"]/EspService[@name=\"%s\"]/ActivityInfoCacheSeconds", process, service);
- unsigned activityInfoCacheSeconds = cfg->getPropInt(xpath.str(), DEFAULTACTIVITYINFOCACHEFORCEBUILDSECOND);
- xpath.setf("Software/EspProcess[@name=\"%s\"]/EspService[@name=\"%s\"]/LogDaliConnection", process, service);
- if (cfg->getPropBool(xpath.str()))
- querySDS().setConfigOpt("Client/@LogConnection", "true");
- xpath.setf("Software/EspProcess[@name=\"%s\"]/EspService[@name=\"%s\"]/ActivityInfoCacheAutoRebuildSeconds", process, service);
- unsigned activityInfoCacheAutoRebuildSeconds = cfg->getPropInt(xpath.str(), DEFAULTACTIVITYINFOCACHEAUTOREBUILDSECOND);
- activityInfoReader.setown(new CActivityInfoReader(activityInfoCacheAutoRebuildSeconds, activityInfoCacheSeconds));
- }
- struct CActiveWorkunitWrapper: public CActiveWorkunit
- {
- CActiveWorkunitWrapper(IEspContext &context, const char* wuid,const char* location = NULL,unsigned index=0): CActiveWorkunit("","")
- {
- CWUWrapper wu(wuid, context);
- setActiveWorkunit(wu, wuid, location, index, context.getClientVersion(), false);
- }
- CActiveWorkunitWrapper(const char* wuid, const char* location = NULL, unsigned index=0): CActiveWorkunit("","")
- {
- CWUWrapper wu(wuid);
- setActiveWorkunit(wu, wuid, location, index, 0.0, true);
- }
- CActiveWorkunitWrapper(const char* wuid,const char* owner, const char* jobname, const char* state, const char* priority): CActiveWorkunit("","")
- {
- setWuid(wuid);
- setState(state);
- setOwner(owner);
- setJobname(jobname);
- setPriority(priority);
- }
- void setActiveWorkunit(CWUWrapper& wu, const char* wuid, const char* location, unsigned index, double version, bool notCheckVersion)
- {
- SCMStringBuffer stateEx;
- setWuid(wuid);
- const char *state = wu->queryStateDesc();
- setStateID(wu->getState());
- if (wu->getState() == WUStateBlocked)
- {
- wu->getStateEx(stateEx);
- if (notCheckVersion || (version > 1.00))
- setExtra(stateEx.str());
- }
- buildAndSetState(state, stateEx.str(), location, index);
- if ((notCheckVersion || (version > 1.09)) && (wu->getState() == WUStateFailed))
- setWarning("The job will ultimately not complete. Please check ECLAgent.");
- setOwner(wu->queryUser());
- setJobname(wu->queryJobName());
- setPriorityStr(wu->getPriority());
- if ((notCheckVersion || (version > 1.08)) && wu->isPausing())
- {
- setIsPausing(true);
- }
- if (notCheckVersion || (version > 1.14))
- {
- setClusterName(wu->queryClusterName());
- }
- }
- void buildAndSetState(const char* state, const char* stateEx, const char* location, unsigned index)
- {
- if (!state || !*state)
- return;
- StringBuffer stateStr;
- if(index && location && *location)
- stateStr.setf("queued(%d) [%s on %s]", index, state, location);
- else if(index)
- stateStr.setf("queued(%d) [%s]", index, state);
- else if(location && *location)
- stateStr.setf("%s [%s]", state, location);
- else
- stateStr.set(state);
- if (stateEx && *stateEx)
- stateStr.appendf(" %s", stateEx);
- setState(stateStr.str());
- }
- void setPriorityStr(unsigned priorityType)
- {
- switch(priorityType)
- {
- case PriorityClassHigh: setPriority("high"); break;
- default:
- case PriorityClassNormal: setPriority("normal"); break;
- case PriorityClassLow: setPriority("low"); break;
- }
- return;
- }
- };
- bool CActivityInfo::isCachedActivityInfoValid(unsigned timeOutSeconds)
- {
- CDateTime timeNow;
- timeNow.setNow();
- return timeNow.getSimple() <= timeCached.getSimple() + timeOutSeconds;;
- }
- void CActivityInfo::createActivityInfo(IEspContext& context)
- {
- Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
- Owned<IConstEnvironment> env = factory->openEnvironment();
- CConstWUClusterInfoArray clusters;
- Owned<IPropertyTree> envRoot= &env->getPTree();
- getEnvironmentClusterInfo(envRoot, clusters);
- try
- {
- jobQueueSnapshot.setown(createJQSnapshot());
- }
- catch(IException* e)
- {
- EXCLOG(e, "CActivityInfo::createActivityInfo");
- e->Release();
- }
- Owned<IRemoteConnection> connStatusServers = querySDS().connect("/Status/Servers",myProcessSession(),RTM_LOCK_READ,30000);
- if (!connStatusServers)
- throw MakeStringException(ECLWATCH_CANNOT_GET_STATUS_INFO, "Failed to get status server information.");
- IPropertyTree* serverStatusRoot = connStatusServers->queryRoot();
- readTargetClusterInfo(clusters, serverStatusRoot);
- readActiveWUsAndQueuedWUs(context, envRoot, serverStatusRoot);
- timeCached.setNow();
- }
- void CActivityInfo::readTargetClusterInfo(CConstWUClusterInfoArray& clusters, IPropertyTree* serverStatusRoot)
- {
- ForEachItemIn(c, clusters)
- {
- IConstWUClusterInfo &cluster = clusters.item(c);
- Owned<CWsSMCTargetCluster> targetCluster = new CWsSMCTargetCluster();
- readTargetClusterInfo(cluster, serverStatusRoot, targetCluster);
- if (cluster.getPlatform() == ThorLCRCluster)
- thorTargetClusters.append(*targetCluster.getClear());
- else if (cluster.getPlatform() == RoxieCluster)
- roxieTargetClusters.append(*targetCluster.getClear());
- else
- hthorTargetClusters.append(*targetCluster.getClear());
- }
- }
- void CActivityInfo::readTargetClusterInfo(IConstWUClusterInfo& cluster, IPropertyTree* serverStatusRoot, CWsSMCTargetCluster* targetCluster)
- {
- SCMStringBuffer clusterName;
- cluster.getName(clusterName);
- targetCluster->clusterName.set(clusterName.str());
- targetCluster->clusterType = cluster.getPlatform();
- targetCluster->clusterSize = cluster.getSize();
- cluster.getServerQueue(targetCluster->serverQueue.queueName);
- cluster.getAgentQueue(targetCluster->agentQueue.queueName);
- StringBuffer statusServerName;
- CWsSMCQueue* smcQueue = NULL;
- if (targetCluster->clusterType == ThorLCRCluster)
- {
- statusServerName.set(getStatusServerTypeName(WsSMCSSTThorLCRCluster));
- smcQueue = &targetCluster->clusterQueue;
- cluster.getThorQueue(smcQueue->queueName);
- }
- else if (targetCluster->clusterType == RoxieCluster)
- {
- statusServerName.set(getStatusServerTypeName(WsSMCSSTRoxieCluster));
- smcQueue = &targetCluster->agentQueue;
- }
- else
- {
- statusServerName.set(getStatusServerTypeName(WsSMCSSTHThorCluster));
- smcQueue = &targetCluster->agentQueue;
- }
- targetCluster->statusServerName.set(statusServerName.str());
- targetCluster->queueName.set(smcQueue->queueName.str());
- bool validQueue = readJobQueue(smcQueue->queueName.str(), targetCluster->queuedWUIDs, smcQueue->queueState, smcQueue->queueStateDetails);
- if (!validQueue)
- smcQueue->notFoundInJobQueues = true;
- if (validQueue && smcQueue->queueState.length())
- targetCluster->queueStatus.set(smcQueue->queueState.str());
- if (serverStatusRoot)
- {
- smcQueue->foundQueueInStatusServer = findQueueInStatusServer(serverStatusRoot, statusServerName.str(), targetCluster->queueName.get());
- if (!smcQueue->foundQueueInStatusServer)
- targetCluster->clusterStatusDetails.appendf("Cluster %s not listening for workunits; ", clusterName.str());
- }
- targetCluster->serverQueue.notFoundInJobQueues = !readJobQueue(targetCluster->serverQueue.queueName.str(), targetCluster->wuidsOnServerQueue, targetCluster->serverQueue.queueState, targetCluster->serverQueue.queueStateDetails);
- }
- bool CActivityInfo::readJobQueue(const char* queueName, StringArray& wuids, StringBuffer& state, StringBuffer& stateDetails)
- {
- if (!queueName || !*queueName)
- {
- state.set("Unknown");
- stateDetails.set("Empty queue name");
- return false;
- }
- if (!jobQueueSnapshot)
- {
- state.set("Unknown");
- stateDetails.set("jobQueueSnapshot not found");
- IWARNLOG("CActivityInfo::readJobQueue: jobQueueSnapshot not found.");
- return false;
- }
- Owned<IJobQueueConst> jobQueue;
- try
- {
- jobQueue.setown(jobQueueSnapshot->getJobQueue(queueName));
- if (!jobQueue)
- {
- IWARNLOG("CActivityInfo::readJobQueue: failed to get info for job queue %s", queueName);
- return false;
- }
- }
- catch(IException* e)
- {
- state.set("Unknown");
- e->errorMessage(stateDetails);
- e->Release();
- return false;
- }
- CJobQueueContents queuedJobs;
- jobQueue->copyItemsAndState(queuedJobs, state, stateDetails);
- Owned<IJobQueueIterator> iter = queuedJobs.getIterator();
- ForEach(*iter)
- {
- const char* wuid = iter->query().queryWUID();
- if (wuid && *wuid)
- wuids.append(wuid);
- }
- return true;
- }
- const char *CActivityInfo::getStatusServerTypeName(WsSMCStatusServerType type)
- {
- return (type < WsSMCSSTterm) ? WsSMCStatusServerTypeNames[type] : NULL;
- }
- bool CActivityInfo::findQueueInStatusServer(IPropertyTree* serverStatusRoot, const char* serverName, const char* queueName)
- {
- VStringBuffer path("Server[@name=\"%s\"]", serverName);
- Owned<IPropertyTreeIterator> it(serverStatusRoot->getElements(path.str()));
- ForEach(*it)
- {
- IPropertyTree& serverStatusNode = it->query();
- const char* queue = serverStatusNode.queryProp("@queue");
- if (!queue || !*queue)
- continue;
- StringArray qlist;
- qlist.appendListUniq(queue, ",");
- ForEachItemIn(q, qlist)
- {
- if (strieq(qlist.item(q), queueName))
- return true;
- }
- }
- return false;
- }
- void CActivityInfo::readActiveWUsAndQueuedWUs(IEspContext& context, IPropertyTree* envRoot, IPropertyTree* serverStatusRoot)
- {
- readRunningWUsOnStatusServer(context, serverStatusRoot, WsSMCSSTThorLCRCluster);
- readWUsInTargetClusterJobQueues(context, thorTargetClusters);
- readRunningWUsOnStatusServer(context, serverStatusRoot, WsSMCSSTRoxieCluster);
- readWUsInTargetClusterJobQueues(context, roxieTargetClusters);
- readRunningWUsOnStatusServer(context, serverStatusRoot, WsSMCSSTHThorCluster);
- readWUsInTargetClusterJobQueues(context, hthorTargetClusters);
- readRunningWUsOnStatusServer(context, serverStatusRoot, WsSMCSSTECLagent);
- readRunningWUsAndJobQueueforOtherStatusServers(context, serverStatusRoot);
- //TODO: add queued WUs for ECLCCServer/ECLServer here. Right now, they are under target clusters.
- getDFUServersAndWUs(context, envRoot, serverStatusRoot);
- getDFURecoveryJobs();
- }
- void CActivityInfo::readRunningWUsOnStatusServer(IEspContext& context, IPropertyTree* serverStatusRoot, WsSMCStatusServerType statusServerType)
- {
- const char* serverName = getStatusServerTypeName(statusServerType);
- if (!serverName || !*serverName)
- return;
- bool isECLAgent = (statusServerType == WsSMCSSTECLagent);
- VStringBuffer path("Server[@name=\"%s\"]", serverName);
- Owned<IPropertyTreeIterator> itrStatusServer(serverStatusRoot->getElements(path.str()));
- ForEach(*itrStatusServer)
- {
- IPropertyTree& serverStatusNode = itrStatusServer->query();
- StringBuffer serverInstance;
- if (statusServerType == WsSMCSSTThorLCRCluster)
- serverStatusNode.getProp("@thorname", serverInstance);
- else if (statusServerType == WsSMCSSTRoxieCluster)
- serverStatusNode.getProp("@cluster", serverInstance);
- else
- serverInstance.appendf("%s on %s", serverName, serverStatusNode.queryProp("@node"));
- Owned<IPropertyTreeIterator> wuids(serverStatusNode.getElements("WorkUnit"));
- ForEach(*wuids)
- {
- const char* wuid=wuids->query().queryProp(NULL);
- if (!wuid || !*wuid || checkSetUniqueECLWUID(wuid))
- continue;
- CWsSMCTargetCluster* targetCluster;
- if (statusServerType == WsSMCSSTRoxieCluster)
- targetCluster = findWUClusterInfo(wuid, isECLAgent, roxieTargetClusters, thorTargetClusters, hthorTargetClusters);
- else if (statusServerType == WsSMCSSTHThorCluster)
- targetCluster = findWUClusterInfo(wuid, isECLAgent, hthorTargetClusters, thorTargetClusters, roxieTargetClusters);
- else
- targetCluster = findWUClusterInfo(wuid, isECLAgent, thorTargetClusters, roxieTargetClusters, hthorTargetClusters);
- if (!targetCluster)
- continue;
- const char* targetClusterName = targetCluster->clusterName.get();
- CWsSMCQueue* jobQueue;
- if (statusServerType == WsSMCSSTThorLCRCluster)
- jobQueue = &targetCluster->clusterQueue;
- else
- jobQueue = &targetCluster->agentQueue;
- Owned<IEspActiveWorkunit> wu;
- if (!isECLAgent)
- {
- const char *cluster = serverStatusNode.queryProp("Cluster");
- StringBuffer queueName;
- if (cluster) // backward compat check.
- getClusterThorQueueName(queueName, cluster);
- else
- queueName.append(targetCluster->queueName.get());
- createActiveWorkUnit(context, wu, wuid, !strieq(targetClusterName, serverInstance.str()) ? serverInstance.str() : NULL, 0, serverName,
- queueName, serverInstance.str(), targetClusterName, false);
- if (wu->getStateID() == WUStateRunning) //'aborting' may be another possible status
- {
- int sgDuration = serverStatusNode.getPropInt("@sg_duration", -1);
- int subgraph = serverStatusNode.getPropInt("@subgraph", -1);
- if (subgraph > -1 && sgDuration > -1)
- {
- const char* graph = serverStatusNode.queryProp("@graph");
- VStringBuffer durationStr("%d min", sgDuration);
- VStringBuffer subgraphStr("%d", subgraph);
- wu->setGraphName(graph);
- wu->setDuration(durationStr.str());
- wu->setGID(subgraphStr.str());
- }
- if (serverStatusNode.getPropInt("@memoryBlocked ", 0) != 0)
- wu->setMemoryBlocked(1);
- }
- }
- else
- {
- createActiveWorkUnit(context, wu, wuid, serverInstance.str(), 0, serverName, serverName, serverInstance.str(), targetClusterName, false);
- if (targetCluster->clusterType == ThorLCRCluster)
- wu->setClusterType(CLUSTER_TYPE_THOR);
- else if (targetCluster->clusterType == RoxieCluster)
- wu->setClusterType(CLUSTER_TYPE_ROXIE);
- else
- wu->setClusterType(CLUSTER_TYPE_HTHOR);
- wu->setClusterQueueName(targetCluster->queueName.get());
- if (wu->getStateID() != WUStateRunning)
- {
- const char *extra = wu->getExtra();
- if (wu->getStateID() != WUStateBlocked || !extra || !*extra) // Blocked on persist treated as running here
- {
- aws.append(*wu.getClear());
- jobQueue->countQueuedJobs++;
- continue;
- }
- }
- //Should this be set only if wu->getStateID() == WUStateRunning?
- if (serverStatusNode.getPropInt("@memoryBlocked ", 0) != 0)
- wu->setMemoryBlocked(1);
- }
- aws.append(*wu.getClear());
- jobQueue->countRunningJobs++;
- }
- }
- }
- bool CActivityInfo::checkSetUniqueECLWUID(const char* wuid)
- {
- bool* idFound = uniqueECLWUIDs.getValue(wuid);
- if (!idFound || !*idFound)
- uniqueECLWUIDs.setValue(wuid, true);
- return idFound && *idFound;
- }
- CWsSMCTargetCluster* CActivityInfo::findWUClusterInfo(const char* wuid, bool isOnECLAgent, CIArrayOf<CWsSMCTargetCluster>& targetClusters,
- CIArrayOf<CWsSMCTargetCluster>& targetClusters1, CIArrayOf<CWsSMCTargetCluster>& targetClusters2)
- {
- StringAttr clusterName;
- try
- {
- Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
- Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid);
- if (!cw)
- return NULL;
- clusterName.set(cw->queryClusterName());
- if (!clusterName.length())
- return NULL;
- }
- catch (IException *e)
- {//Exception may be thrown when the openWorkUnit() is called inside the CWUWrapper
- StringBuffer msg;
- IWARNLOG("Failed to open workunit %s: %s", wuid, e->errorMessage(msg).str());
- e->Release();
- return NULL;
- }
- const char* cluster = clusterName.str();
- CWsSMCTargetCluster* targetCluster = findTargetCluster(cluster, targetClusters);
- if (targetCluster || !isOnECLAgent)
- return targetCluster;
- targetCluster = findTargetCluster(cluster, targetClusters1);
- if (targetCluster)
- return targetCluster;
- return findTargetCluster(cluster, targetClusters2);
- }
- CWsSMCTargetCluster* CActivityInfo::findTargetCluster(const char* clusterName, CIArrayOf<CWsSMCTargetCluster>& targetClusters)
- {
- ForEachItemIn(i, targetClusters)
- {
- CWsSMCTargetCluster& targetCluster = targetClusters.item(i);
- if (strieq(targetCluster.clusterName.get(), clusterName))
- return &targetCluster;
- }
- return NULL;
- }
- void CActivityInfo::createActiveWorkUnit(IEspContext& context, Owned<IEspActiveWorkunit>& ownedWU, const char* wuid, const char* location,
- unsigned index, const char* serverName, const char* queueName, const char* instanceName, const char* targetClusterName, bool useContext)
- {
- try
- {
- if (useContext)
- ownedWU.setown(new CActiveWorkunitWrapper(context, wuid, location, index));
- else
- ownedWU.setown(new CActiveWorkunitWrapper(wuid, location, index));
- }
- catch (IException *e)
- { //if the wu cannot be opened for some reason, the openWorkUnit() inside the CActiveWorkunitWrapper() may throw an exception.
- //We do not want the exception stops this process of retrieving/showing all active WUs. And that WU should still be displayed
- //with the exception.
- StringBuffer msg;
- ownedWU.setown(new CActiveWorkunitWrapper(wuid, "", "", e->errorMessage(msg).str(), "normal"));
- ownedWU->setStateID(WUStateUnknown);
- e->Release();
- }
- ownedWU->setServer(serverName);
- ownedWU->setQueueName(queueName);
- if (instanceName && *instanceName)
- ownedWU->setInstance(instanceName); // JCSMORE In thor case at least, if queued it is unknown which instance it will run on..
- if (targetClusterName && *targetClusterName)
- ownedWU->setTargetClusterName(targetClusterName);
- }
- void CActivityInfo::readWUsInTargetClusterJobQueues(IEspContext& context, CIArrayOf<CWsSMCTargetCluster>& targetClusters)
- {
- ForEachItemIn(i, targetClusters)
- {
- CWsSMCTargetCluster& targetCluster = targetClusters.item(i);
- if (targetCluster.clusterType == ThorLCRCluster)
- readWUsInTargetClusterJobQueue(context, targetCluster, targetCluster.clusterQueue, targetCluster.clusterName.get());
- if (targetCluster.agentQueue.queueName.length())
- readWUsInTargetClusterJobQueue(context, targetCluster, targetCluster.agentQueue, targetCluster.agentQueue.queueName.str());
- if (targetCluster.serverQueue.queueName.length()) //TODO: queued WUs for ECLCCServer/ECLServer should not be here.
- readWUsInTargetClusterJobQueue(context, targetCluster, targetCluster.serverQueue, targetCluster.serverQueue.queueName.str());
- }
- }
- void CActivityInfo::readWUsInTargetClusterJobQueue(IEspContext& context, CWsSMCTargetCluster& targetCluster, CWsSMCQueue& jobQueue, const char* queueName)
- {
- ForEachItemIn(i, targetCluster.queuedWUIDs)
- {
- const char* wuid = targetCluster.queuedWUIDs.item(i);
- if (!wuid || !*wuid || checkSetUniqueECLWUID(wuid))
- continue;
- Owned<IEspActiveWorkunit> wu;
- createActiveWorkUnit(context, wu, wuid, jobQueue.queueName.str(), ++jobQueue.countQueuedJobs, targetCluster.statusServerName.str(),
- queueName, NULL, targetCluster.clusterName.get(), false);
- aws.append(*wu.getClear());
- }
- }
- void CActivityInfo::addQueuedServerQueueJob(IEspContext& context, const char* serverName, const char* queueName, const char* instanceName, CIArrayOf<CWsSMCTargetCluster>& targetClusters)
- {
- ForEachItemIn(i, targetClusters)
- {
- CWsSMCTargetCluster& targetCluster = targetClusters.item(i);
- if (!targetCluster.wuidsOnServerQueue.length() || !strieq(queueName, targetCluster.serverQueue.queueName.str()))
- continue;
- ForEachItemIn(i1, targetCluster.wuidsOnServerQueue)
- {
- const char* wuid = targetCluster.wuidsOnServerQueue.item(i1);
- if (!wuid || !*wuid) //Multiple servers may monitor one queue. The WU may be shown under the multiple servers.
- continue;
- Owned<IEspActiveWorkunit> wu;
- createActiveWorkUnit(context, wu, wuid, NULL, 0, serverName, queueName, instanceName, NULL, false);
- aws.append(*wu.getClear());
- }
- }
- }
- void CActivityInfo::readRunningWUsAndJobQueueforOtherStatusServers(IEspContext& context, IPropertyTree* serverStatusRoot)
- {
- BoolHash uniqueServers;
- Owned<IPropertyTreeIterator> it(serverStatusRoot->getElements("Server"));
- ForEach(*it)
- {
- IPropertyTree& serverNode = it->query();
- const char* cluster = serverNode.queryProp("@cluster");
- const char* serverName = serverNode.queryProp("@name");
- const char* node = serverNode.queryProp("@node");
- const char* queueName = serverNode.queryProp("@queue");
- unsigned port = serverNode.getPropInt("@mpport", 0);
- if (!serverName || !*serverName || !node || !*node || strieq(serverName, STATUS_SERVER_DFUSERVER)
- || strieq(serverName, getStatusServerTypeName(WsSMCSSTThorLCRCluster)) || strieq(serverName, getStatusServerTypeName(WsSMCSSTRoxieCluster))
- || strieq(serverName, getStatusServerTypeName(WsSMCSSTHThorCluster)) || strieq(serverName, getStatusServerTypeName(WsSMCSSTECLagent)))
- continue; //target clusters, ECLAgent, DFUServer already handled separately
- StringBuffer instanceName;
- if (!isEmptyString(cluster))
- instanceName.set(cluster);
- else
- instanceName.setf("%s_on_%s:%d", serverName, node, port); //for legacy
- Owned<IPropertyTreeIterator> wuids(serverNode.getElements("WorkUnit"));
- ForEach(*wuids)
- {
- const char* wuid=wuids->query().queryProp(NULL);
- if (!wuid || !*wuid || checkSetUniqueECLWUID(wuid))
- continue;
- Owned<IEspActiveWorkunit> wu;
- createActiveWorkUnit(context, wu, wuid, NULL, 0, serverName, queueName, instanceName.str(), NULL, false);
- aws.append(*wu.getClear());
- }
- bool* found = uniqueServers.getValue(instanceName);
- if (!found || !*found)
- {
- uniqueServers.setValue(instanceName, true);
- getServerJobQueue(context, queueName, instanceName, serverName, node, port);
- //Now, we found a new server. we need to add queued jobs from the queues the server is monitoring.
- StringArray qList;
- qList.appendListUniq(queueName, ",");
- ForEachItemIn(q, qList)
- {
- addQueuedServerQueueJob(context, serverName, qList.item(q), instanceName.str(), thorTargetClusters);
- addQueuedServerQueueJob(context, serverName, qList.item(q), instanceName.str(), roxieTargetClusters);
- addQueuedServerQueueJob(context, serverName, qList.item(q), instanceName.str(), hthorTargetClusters);
- }
- }
- }
- return;
- }
- void CActivityInfo::getDFUServersAndWUs(IEspContext& context, IPropertyTree* envRoot, IPropertyTree* serverStatusRoot)
- {
- if (!envRoot)
- return;
- VStringBuffer path("Software/%s", eqDfu);
- Owned<IPropertyTreeIterator> services = envRoot->getElements(path);
- ForEach(*services)
- {
- IPropertyTree &serviceTree = services->query();
- const char *qname = serviceTree.queryProp("@queue");
- if (!qname || !*qname)
- continue;
- StringArray queues;
- queues.appendListUniq(qname, ",");
- const char *serverName = serviceTree.queryProp("@name");
- ForEachItemIn(q, queues)
- {
- StringArray wuidList;
- const char *queueName = queues.item(q);
- readDFUWUDetails(queueName, serverName, wuidList, readDFUWUIDs(serverStatusRoot, queueName, wuidList));
- getServerJobQueue(context, queueName, serverName, STATUS_SERVER_DFUSERVER, NULL, 0);
- }
- }
- }
- unsigned CActivityInfo::readDFUWUIDs(IPropertyTree* serverStatusRoot, const char* queueName, StringArray& wuidList)
- {
- if (!queueName || !*queueName)
- {
- IWARNLOG("CActivityInfo::readDFUWUIDs: queue name not specified");
- return 0;
- }
- unsigned runningWUCount = 0;
- VStringBuffer path("Server[@name=\"DFUserver\"]/Queue[@name=\"%s\"]",queueName);
- Owned<IPropertyTreeIterator> iter = serverStatusRoot->getElements(path.str());
- ForEach(*iter)
- {
- Owned<IPropertyTreeIterator> iterj = iter->query().getElements("Job");
- ForEach(*iterj)
- {
- const char *wuid = iterj->query().queryProp("@wuid");
- if (wuid && *wuid && (*wuid!='!')) // filter escapes -- see queuedJobs() in dfuwu.cpp
- {
- wuidList.append(wuid);
- runningWUCount++;
- }
- }
- }
- if (!jobQueueSnapshot)
- return runningWUCount;
- //Read queued jobs
- Owned<IJobQueueConst> jobQueue = jobQueueSnapshot->getJobQueue(queueName);
- if (!jobQueue)
- {
- IWARNLOG("CActivityInfo::readDFUWUIDs: failed to get info for job queue %s.", queueName);
- return runningWUCount;
- }
- CJobQueueContents jobList;
- jobQueue->copyItems(jobList);
- Owned<IJobQueueIterator> iterq = jobList.getIterator();
- ForEach(*iterq)
- {
- const char* wuid = iterq->query().queryWUID();
- if (wuid && *wuid)
- wuidList.append(wuid);
- }
- return runningWUCount;
- }
- void CActivityInfo::readDFUWUDetails(const char* queueName, const char* serverName, StringArray& wuidList, unsigned runningWUCount)
- {
- Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
- ForEachItemIn(i, wuidList)
- {
- StringBuffer jname, uname, state, error;
- const char *wuid = wuidList.item(i);
- if (i<runningWUCount)
- state.set("running");
- else
- state.set("queued");
- try
- {
- Owned<IConstDFUWorkUnit> dfuwu = factory->openWorkUnit(wuid, false);
- dfuwu->getUser(uname);
- dfuwu->getJobName(jname);
- }
- catch (IException *e)
- {
- e->errorMessage(error);
- state.appendf(" (%s)", error.str());
- e->Release();
- }
- Owned<IEspActiveWorkunit> wu(new CActiveWorkunitWrapper(wuid, uname.str(), jname.str(), state.str(), "normal"));
- wu->setServer(STATUS_SERVER_DFUSERVER);
- wu->setInstance(serverName);
- wu->setQueueName(queueName);
- aws.append(*wu.getClear());
- }
- }
- void CActivityInfo::getDFURecoveryJobs()
- {
- Owned<IRemoteConnection> connDFURecovery = querySDS().connect("DFU/RECOVERY",myProcessSession(), RTM_LOCK_READ, 30000);
- if (!connDFURecovery)
- return;
- Owned<IPropertyTreeIterator> it(connDFURecovery->queryRoot()->getElements("job"));
- ForEach(*it)
- {
- IPropertyTree &jb=it->query();
- if (!jb.getPropBool("Running",false))
- continue;
- unsigned done = 0, total = 0;
- Owned<IPropertyTreeIterator> it = jb.getElements("DFT/progress");
- ForEach(*it)
- {
- IPropertyTree &p=it->query();
- if (p.getPropInt("@done",0))
- done++;
- total++;
- }
- StringBuffer cmd;
- cmd.append(jb.queryProp("@command")).append(" ").append(jb.queryProp("@command_parameters"));
- Owned<IEspDFUJob> job = new CDFUJob("","");
- job->setTimeStarted(jb.queryProp("@time_started"));
- job->setDone(done);
- job->setTotal(total);
- job->setCommand(cmd.str());
- DFURecoveryJobs.append(*job.getClear());
- }
- }
- void CActivityInfo::getServerJobQueue(IEspContext &context, const char* queueName, const char* serverName,
- const char* serverType, const char* networkAddress, unsigned port)
- {
- if (!queueName || !*queueName || !serverName || !*serverName || !serverType || !*serverType)
- return;
- double version = context.getClientVersion();
- Owned<IEspServerJobQueue> jobQueue = createServerJobQueue("", "");
- jobQueue->setServerName(serverName);
- jobQueue->setServerType(serverType);
- if (networkAddress && *networkAddress)
- {
- jobQueue->setNetworkAddress(networkAddress);
- jobQueue->setPort(port);
- }
- readServerJobQueueStatus(context, queueName, jobQueue);
- serverJobQueues.append(*jobQueue.getClear());
- }
- void CActivityInfo::readServerJobQueueStatus(IEspContext &context, const char* queueName, IEspServerJobQueue* jobQueue)
- {
- if (!jobQueueSnapshot)
- {
- IWARNLOG("CActivityInfo::readServerJobQueueStatus: jobQueueSnapshot not found.");
- return;
- }
- StringBuffer queueStateDetails;
- bool hasStopped = false, hasPaused = false;
- StringArray queueNames;
- queueNames.appendListUniq(queueName, ",");
- IArrayOf<IEspServerJobQueue> jobQueues;
- ForEachItemIn(i, queueNames)
- readServerJobQueueDetails(context, queueNames.item(i), hasStopped, hasPaused, queueStateDetails, jobQueues);
- double version = context.getClientVersion();
- if (version < 1.20)
- jobQueue->setQueueName(queueName);
- else if (version < 1.21)
- jobQueue->setQueueNames(queueNames);
- else
- jobQueue->setQueues(jobQueues);
- //The hasStopped, hasPaused, and queueStateDetails should be set inside readServerJobQueueDetails().
- if (hasStopped)
- jobQueue->setQueueStatus("stopped"); //Some of its job queues is stopped. So, return a warning here.
- else if (hasPaused)
- jobQueue->setQueueStatus("paused"); //Some of its job queues is paused. So, return a warning here.
- else
- jobQueue->setQueueStatus("running");
- jobQueue->setStatusDetails(queueStateDetails.str());
- }
- void CActivityInfo::readServerJobQueueDetails(IEspContext &context, const char* queueName, bool& hasStopped,
- bool& hasPaused, StringBuffer& queueStateDetails, IArrayOf<IEspServerJobQueue>& jobQueues)
- {
- double version = context.getClientVersion();
- StringBuffer status, details, stateDetailsString;
- Owned<IJobQueueConst> queue = jobQueueSnapshot->getJobQueue(queueName);
- if (queue)
- queue->getState(status, details);
- if (status.isEmpty())
- {
- if (version < 1.21)
- {
- if (!queue)
- queueStateDetails.appendf("%s not found in Status Server list; ", queueName);
- else
- queueStateDetails.appendf("No status set in Status Server list for %s; ", queueName);
- }
- else
- {
- Owned<IEspServerJobQueue> jobQueue = createServerJobQueue();
- jobQueue->setQueueName(queueName);
- if (!queue)
- stateDetailsString.setf("%s not found in Status Server list", queueName);
- else
- stateDetailsString.setf("No status set in Status Server list for %s", queueName);
- queueStateDetails.appendf("%s;", stateDetailsString.str());
- jobQueue->setStatusDetails(stateDetailsString.str());
- jobQueues.append(*jobQueue.getClear());
- }
- return;
- }
- if (version < 1.21)
- {
- if (strieq(status.str(), "paused"))
- hasPaused = true;
- else if (strieq(status.str(), "stopped"))
- hasStopped = true;
-
- if (details && *details)
- queueStateDetails.appendf("%s: queue %s; %s;", queueName, status.str(), details.str());
- else
- queueStateDetails.appendf("%s: queue %s;", queueName, status.str());
- }
- else
- {
- Owned<IEspServerJobQueue> jobQueue = createServerJobQueue();
- jobQueue->setQueueName(queueName);
- if (strieq(status.str(), "paused"))
- {
- hasPaused = true;
- jobQueue->setQueueStatus("paused");
- }
- else if (strieq(status.str(), "stopped"))
- {
- hasStopped = true;
- jobQueue->setQueueStatus("stopped");
- }
- else
- {
- jobQueue->setQueueStatus("running");
- }
-
- if (details && *details)
- {
- queueStateDetails.appendf("%s: queue %s; %s;", queueName, status.str(), details.str());
- stateDetailsString.setf("%s: queue %s; %s;", queueName, status.str(), details.str());
- }
- else
- {
- queueStateDetails.appendf("%s: queue %s;", queueName, status.str());
- stateDetailsString.setf("%s: queue %s;", queueName, status.str());
- }
- jobQueue->setStatusDetails(stateDetailsString.str());
- jobQueues.append(*jobQueue.getClear());
- }
- }
- bool CWsSMCEx::onIndex(IEspContext &context, IEspSMCIndexRequest &req, IEspSMCIndexResponse &resp)
- {
- resp.setRedirectUrl("/");
- return true;
- }
- void CWsSMCEx::readBannerAndChatRequest(IEspContext& context, IEspActivityRequest &req, IEspActivityResponse& resp)
- {
- StringBuffer chatURLStr, bannerStr;
- const char* chatURL = req.getChatURL();
- const char* banner = req.getBannerContent();
- //Filter out invalid chars
- if (chatURL && *chatURL)
- {
- const char* pStr = chatURL;
- unsigned len = strlen(chatURL);
- for (unsigned i = 0; i < len; i++)
- {
- if (isprint(*pStr))
- chatURLStr.append(*pStr);
- pStr++;
- }
- }
- if (banner && *banner)
- {
- const char* pStr = banner;
- unsigned len = strlen(banner);
- for (unsigned i = 0; i < len; i++)
- {
- bannerStr.append(isprint(*pStr) ? *pStr : '.');
- pStr++;
- }
- }
- chatURLStr.trim();
- bannerStr.trim();
- if (!req.getBannerAction_isNull() && req.getBannerAction() && (bannerStr.length() < 1))
- throw MakeStringException(ECLWATCH_MISSING_BANNER_CONTENT, "If a Banner is enabled, the Banner content must be specified.");
- if (!req.getEnableChatURL_isNull() && req.getEnableChatURL() && (chatURLStr.length() < 1))
- throw MakeStringException(ECLWATCH_MISSING_CHAT_URL, "If a Chat is enabled, the Chat URL must be specified.");
- //Now, store the strings since they are valid.
- m_ChatURL = chatURLStr;
- m_Banner = bannerStr;
- const char* bannerSize = req.getBannerSize();
- if (bannerSize && *bannerSize)
- m_BannerSize.set(bannerSize);
- const char* bannerColor = req.getBannerColor();
- if (bannerColor && *bannerColor)
- m_BannerColor.set(bannerColor);
- const char* bannerScroll = req.getBannerScroll();
- if (bannerScroll && *bannerScroll)
- m_BannerScroll.set(bannerScroll);
- m_BannerAction = req.getBannerAction();
- if(!req.getEnableChatURL_isNull())
- m_EnableChatURL = req.getEnableChatURL();
- }
- void CWsSMCEx::setBannerAndChatData(double version, IEspActivityResponse& resp)
- {
- resp.setShowBanner(m_BannerAction);
- resp.setShowChatURL(m_EnableChatURL);
- resp.setBannerContent(m_Banner.str());
- resp.setBannerSize(m_BannerSize.str());
- resp.setBannerColor(m_BannerColor.str());
- resp.setChatURL(m_ChatURL.str());
- if (version >= 1.08)
- resp.setBannerScroll(m_BannerScroll.str());
- }
- void CWsSMCEx::sortTargetClusters(IArrayOf<IEspTargetCluster>& clusters, const char* sortBy, bool descending)
- {
- if (!sortBy || !*sortBy || strieq(sortBy, "name"))
- clusters.sort(descending ? sortTargetClustersByNameDescending : sortTargetClustersByNameAscending);
- else
- clusters.sort(descending ? sortTargetClustersBySizeDescending : sortTargetClustersBySizeAscending);
- }
- void CWsSMCEx::getClusterQueueStatus(const CWsSMCTargetCluster& targetCluster, ClusterStatusType& queueStatusType, StringBuffer& queueStatusDetails)
- {
- const CWsSMCQueue* jobQueue = &targetCluster.clusterQueue;
- if (targetCluster.clusterType != ThorLCRCluster)
- jobQueue = &targetCluster.agentQueue;
- if (!jobQueue->queueName.length())
- return;
- bool queuePausedOrStopped = false;
- //get queueStatusDetails
- if (targetCluster.clusterStatusDetails.length())
- queueStatusDetails.set(targetCluster.clusterStatusDetails.str());
- if (jobQueue->queueState.length())
- {
- const char* queueState = jobQueue->queueState.str();
- queueStatusDetails.appendf("%s: queue %s; ", jobQueue->queueName.str(), queueState);
- if (jobQueue->queueStateDetails.length())
- queueStatusDetails.appendf(" %s;", jobQueue->queueStateDetails.str());
- if (strieq(queueState,"stopped") || strieq(queueState,"paused"))
- queuePausedOrStopped = true;
- }
- //get queueStatusType
- if (!jobQueue->foundQueueInStatusServer)
- {
- if (queuePausedOrStopped)
- queueStatusType = QueuePausedOrStoppedNotFound;
- else
- queueStatusType = QueueRunningNotFound;
- }
- else if (jobQueue->notFoundInJobQueues)
- queueStatusType = QueueNotFound;
- else if (!queuePausedOrStopped)
- queueStatusType = RunningNormal;
- else if (jobQueue->countRunningJobs > 0)
- queueStatusType = QueuePausedOrStoppedWithJobs;
- else
- queueStatusType = QueuePausedOrStoppedWithNoJob;
- return;
- }
- void CWsSMCEx::setClusterStatus(IEspContext& context, const CWsSMCTargetCluster& targetCluster, IEspTargetCluster* returnCluster)
- {
- ClusterStatusType queueStatusType = RunningNormal;
- StringBuffer queueStatusDetails;
- getClusterQueueStatus(targetCluster, queueStatusType, queueStatusDetails);
- returnCluster->setClusterStatus(queueStatusType);
- //Set 'Warning' which may be displayed beside cluster name
- if (queueStatusType == QueueRunningNotFound)
- returnCluster->setWarning("Cluster not listening for workunits");
- else if (queueStatusType == QueuePausedOrStoppedNotFound)
- returnCluster->setWarning("Queue paused or stopped - Cluster not listening for workunits");
- else if (queueStatusType == QueueNotFound)
- returnCluster->setWarning("Queue not found");
- else if (queueStatusType != RunningNormal)
- returnCluster->setWarning("Queue paused or stopped");
- //Set 'StatusDetails' which may be displayed when a mouse is moved over cluster icon
- if (queueStatusDetails.length())
- returnCluster->setStatusDetails(queueStatusDetails.str());
- }
- // This method reads job information from both /Status/Servers and IJobQueue.
- //
- // Each server component (a thor cluster, a dfuserver, or an eclagent) is one 'Server' branch under
- // /Status/Servers. A 'Server' branch has a @queue which indicates the queue name of the server.
- // A 'Server' branch also contains the information about running WUs on that 'Server'. This
- // method reads the information. Those WUs are displays under that server (identified by its queue name)
- // on Activity page.
- //
- // For the WUs list inside /Status/Servers/Server[@name=ECLagent] but not list under other 'Server', the
- // existing code has to find out WUID and @clusterName of the WU. Then, uses @clusterName to find out the
- // queue name in IConstWUClusterInfo. Those WUs list under that server (identified by its queue name) with
- // a note 'on ECLagent'. TBD: the logic here will be simpler if the /Status/Servers/Server is named the
- // instance and/or cluster.
- //
- // In order to get information about queued WUs, this method gets queue names from both IConstWUClusterInfo
- // and other environment functions. Each of those queue names is linked to one IJobQueues. From the
- // IJobQueues, this method reads queued jobs for each server component and list them under the server
- // component (identified by its queue name).
- bool CWsSMCEx::onActivity(IEspContext &context, IEspActivityRequest &req, IEspActivityResponse& resp)
- {
- try
- {
- context.ensureFeatureAccess(FEATURE_URL, SecAccess_Read, ECLWATCH_SMC_ACCESS_DENIED, SMC_ACCESS_DENIED);
- const char* build_ver = getBuildVersion();
- resp.setBuild(build_ver);
- double version = context.getClientVersion();
- bool isSuperUser = true;
- #ifdef _USE_OPENLDAP
- CLdapSecManager* secmgr = dynamic_cast<CLdapSecManager*>(context.querySecManager());
- if(secmgr && !secmgr->isSuperUser(context.queryUser()))
- isSuperUser = false;
- #endif
- if(isSuperUser && req.getFromSubmitBtn())
- readBannerAndChatRequest(context, req, resp);
- if (version >= 1.12)
- resp.setSuperUser(isSuperUser);
- if (version >= 1.06)
- setBannerAndChatData(version, resp);
- Owned<CActivityInfo> activityInfo = activityInfoReader->getActivityInfo();
- if (!activityInfo)
- throw MakeStringException(ECLWATCH_INTERNAL_ERROR, "Failed to get Activity Info. Please try later.");
- setActivityResponse(context, activityInfo, req, resp);
- }
- catch(IException* e)
- {
- FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
- }
- return true;
- }
- void CWsSMCEx::addWUsToResponse(IEspContext &context, const IArrayOf<IEspActiveWorkunit>& aws, IEspActivityResponse& resp)
- {
- const char* user = context.queryUserId();
- IArrayOf<IEspActiveWorkunit> awsReturned;
- ForEachItemIn(i, aws)
- {
- IEspActiveWorkunit& wu = aws.item(i);
- const char* wuid = wu.getWuid();
- if (wuid[0] == 'D')//DFU WU
- {
- awsReturned.append(*LINK(&wu));
- continue;
- }
- try
- {
- //if no access, throw an exception and go to the 'catch' section.
- const char* owner = wu.getOwner();
- context.validateFeatureAccess((!owner || !*owner || (user && streq(user, owner))) ? OWN_WU_ACCESS : OTHERS_WU_ACCESS, SecAccess_Read, true);
- awsReturned.append(*LINK(&wu));
- continue;
- }
- catch (IException *e)
- { //if the wu cannot be opened for some reason, the openWorkUnit() inside the CActiveWorkunitWrapper() may throw an exception.
- //We do not want the exception stops this process of retrieving/showing all active WUs. And that WU should still be displayed
- //with the exception.
- StringBuffer msg;
- Owned<IEspActiveWorkunit> cw = new CActiveWorkunitWrapper(wuid, "", "", e->errorMessage(msg).str(), "normal");
- cw->setStateID(WUStateUnknown);
- cw->setServer(wu.getServer());
- cw->setQueueName(wu.getQueueName());
- const char* instanceName = wu.getInstance();
- const char* targetClusterName = wu.getTargetClusterName();
- if (instanceName && *instanceName)
- cw->setInstance(instanceName); // JCSMORE In thor case at least, if queued it is unknown which instance it will run on..
- if (targetClusterName && *targetClusterName)
- cw->setTargetClusterName(targetClusterName);
- awsReturned.append(*cw.getClear());
- e->Release();
- }
- }
- resp.setRunning(awsReturned);
- return;
- }
- void CWsSMCEx::setActivityResponse(IEspContext &context, CActivityInfo* activityInfo, IEspActivityRequest &req, IEspActivityResponse& resp)
- {
- double version = context.getClientVersion();
- const char* sortBy = req.getSortBy();
- bool descending = req.getDescending();
- if (version >= 1.22)
- {
- StringBuffer s;
- resp.setActivityTime(activityInfo->queryTimeCached(s));
- resp.setDaliDetached(activityInfoReader->isDaliDetached());
- }
- if (version >= 1.16)
- {
- IArrayOf<IEspTargetCluster> thorClusters;
- IArrayOf<IEspTargetCluster> hthorClusters;
- IArrayOf<IEspTargetCluster> roxieClusters;
- setESPTargetClusters(context, activityInfo->queryThorTargetClusters(), thorClusters);
- setESPTargetClusters(context, activityInfo->queryRoxieTargetClusters(), roxieClusters);
- setESPTargetClusters(context, activityInfo->queryHThorTargetClusters(), hthorClusters);
- sortTargetClusters(thorClusters, sortBy, descending);
- sortTargetClusters(roxieClusters, sortBy, descending);
- SecAccessFlags access;
- if (context.authorizeFeature(THORQUEUE_FEATURE, access) && access>=SecAccess_Full)
- resp.setAccessRight("Access_Full");
- resp.setSortBy(sortBy);
- resp.setDescending(descending);
- resp.setThorClusterList(thorClusters);
- resp.setRoxieClusterList(roxieClusters);
- resp.setHThorClusterList(hthorClusters);
- resp.setServerJobQueues(activityInfo->queryServerJobQueues());
- }
- else
- {//for backward compatible
- IArrayOf<IEspThorCluster> thorClusters;
- CIArrayOf<CWsSMCTargetCluster>& thorTargetClusters = activityInfo->queryThorTargetClusters();
- ForEachItemIn(i, thorTargetClusters)
- {
- CWsSMCTargetCluster& targetCluster = thorTargetClusters.item(i);
- Owned<IEspThorCluster> respThorCluster = new CThorCluster("", "");
- respThorCluster->setClusterName(targetCluster.clusterName.get());
- respThorCluster->setQueueStatus(targetCluster.queueStatus.get());
- if (version >= 1.03)
- respThorCluster->setQueueName(targetCluster.queueName.get());
- if (version >= 1.11)
- respThorCluster->setClusterSize(targetCluster.clusterSize);
- thorClusters.append(*respThorCluster.getClear());
- }
- resp.setThorClusters(thorClusters);
- if (version > 1.06)
- {
- IArrayOf<IEspRoxieCluster> roxieClusters;
- CIArrayOf<CWsSMCTargetCluster>& roxieTargetClusters = activityInfo->queryRoxieTargetClusters();
- ForEachItemIn(i, roxieTargetClusters)
- {
- CWsSMCTargetCluster& targetCluster = roxieTargetClusters.item(i);
- Owned<IEspRoxieCluster> respRoxieCluster = new CRoxieCluster("", "");
- respRoxieCluster->setClusterName(targetCluster.clusterName.get());
- respRoxieCluster->setQueueStatus(targetCluster.queueStatus.get());
- respRoxieCluster->setQueueName(targetCluster.queueName.get());
- if (version >= 1.11)
- respRoxieCluster->setClusterSize(targetCluster.clusterSize);
- roxieClusters.append(*respRoxieCluster.getClear());
- }
- resp.setRoxieClusters(roxieClusters);
- }
- if (version > 1.10)
- {
- resp.setSortBy(sortBy);
- resp.setDescending(req.getDescending());
- }
- if (version > 1.11)
- {
- IArrayOf<IEspHThorCluster> hThorClusters;
- CIArrayOf<CWsSMCTargetCluster>& hthorTargetClusters = activityInfo->queryHThorTargetClusters();
- ForEachItemIn(i, hthorTargetClusters)
- {
- CWsSMCTargetCluster& targetCluster = hthorTargetClusters.item(i);
- Owned<IEspHThorCluster> respHThorCluster = new CHThorCluster("", "");
- respHThorCluster->setClusterName(targetCluster.clusterName.get());
- respHThorCluster->setQueueStatus(targetCluster.queueStatus.get());
- respHThorCluster->setQueueName(targetCluster.queueName.get());
- respHThorCluster->setClusterSize(targetCluster.clusterSize);
- hThorClusters.append(*respHThorCluster.getClear());
- }
- resp.setHThorClusters(hThorClusters);
- SecAccessFlags access;
- if (context.authorizeFeature(THORQUEUE_FEATURE, access) && access>=SecAccess_Full)
- resp.setAccessRight("Access_Full");
- }
- if (version > 1.03)
- resp.setServerJobQueues(activityInfo->queryServerJobQueues());
- }
- resp.setDFUJobs(activityInfo->queryDFURecoveryJobs());
- addWUsToResponse(context, activityInfo->queryActiveWUs(), resp);
- return;
- }
- void CWsSMCEx::setESPTargetClusters(IEspContext& context, const CIArrayOf<CWsSMCTargetCluster>& targetClusters, IArrayOf<IEspTargetCluster>& respTargetClusters)
- {
- ForEachItemIn(i, targetClusters)
- {
- Owned<IEspTargetCluster> respTargetCluster = new CTargetCluster("", "");
- setESPTargetCluster(context, targetClusters.item(i), respTargetCluster);
- respTargetClusters.append(*respTargetCluster.getClear());
- }
- }
- void CWsSMCEx::addCapabilities(IPropertyTree* pFeatureNode, const char* access,
- IArrayOf<IEspCapability>& capabilities)
- {
- StringBuffer xpath(access);
- xpath.append("/Capability");
- Owned<IPropertyTreeIterator> it = pFeatureNode->getElements(xpath.str());
- ForEach(*it)
- {
- IPropertyTree* pCapabilityNode = &it->query();
- IEspCapability* pCapability = new CCapability("ws_smc");
- pCapability->setName( pCapabilityNode->queryProp("@name") );
- pCapability->setDescription( pCapabilityNode->queryProp("@description") );
- capabilities.append(*pCapability);
- }
- }
- bool CWsSMCEx::onMoveJobDown(IEspContext &context, IEspSMCJobRequest &req, IEspSMCJobResponse &resp)
- {
- try
- {
- context.ensureFeatureAccess(THORQUEUE_FEATURE, SecAccess_Full, ECLWATCH_THOR_QUEUE_ACCESS_DENIED, QUEUE_ACCESS_DENIED);
- {
- Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
- QueueLock lock(queue);
- unsigned index=queue->findRank(req.getWuid());
- if(index<queue->ordinality())
- {
- Owned<IJobQueueItem> item0 = queue->getItem(index);
- Owned<IJobQueueItem> item = queue->getItem(index+1);
- if(item && item0 && (item0->getPriority() == item->getPriority()))
- queue->moveAfter(req.getWuid(),item->queryWUID());
- }
- }
- AccessSuccess(context, "Changed job priority %s",req.getWuid());
- activityInfoReader->rebuild();
- resp.setRedirectUrl("/WsSMC/");
- }
- catch(IException* e)
- {
- FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
- }
- return true;
- }
- bool CWsSMCEx::onMoveJobUp(IEspContext &context, IEspSMCJobRequest &req, IEspSMCJobResponse &resp)
- {
- try
- {
- context.ensureFeatureAccess(THORQUEUE_FEATURE, SecAccess_Full, ECLWATCH_THOR_QUEUE_ACCESS_DENIED, QUEUE_ACCESS_DENIED);
- {
- Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
- QueueLock lock(queue);
- unsigned index=queue->findRank(req.getWuid());
- if(index>0 && index<queue->ordinality())
- {
- Owned<IJobQueueItem> item0 = queue->getItem(index);
- Owned<IJobQueueItem> item = queue->getItem(index-1);
- if(item && item0 && (item0->getPriority() == item->getPriority()))
- queue->moveBefore(req.getWuid(),item->queryWUID());
- }
- }
- AccessSuccess(context, "Changed job priority %s",req.getWuid());
- activityInfoReader->rebuild();
- resp.setRedirectUrl("/WsSMC/");
- }
- catch(IException* e)
- {
- FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
- }
- return true;
- }
- bool CWsSMCEx::onMoveJobBack(IEspContext &context, IEspSMCJobRequest &req, IEspSMCJobResponse &resp)
- {
- try
- {
- context.ensureFeatureAccess(THORQUEUE_FEATURE, SecAccess_Full, ECLWATCH_THOR_QUEUE_ACCESS_DENIED, QUEUE_ACCESS_DENIED);
- {
- Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
- QueueLock lock(queue);
- unsigned index=queue->findRank(req.getWuid());
- if(index<queue->ordinality())
- {
- Owned<IJobQueueItem> item = queue->getItem(index);
- int priority0 = item->getPriority();
- unsigned biggestIndoxInSamePriority = index;
- unsigned nextIndex = biggestIndoxInSamePriority + 1;
- while (nextIndex<queue->ordinality())
- {
- item.setown(queue->getItem(nextIndex));
- if (priority0 != item->getPriority())
- {
- break;
- }
- biggestIndoxInSamePriority = nextIndex;
- nextIndex++;
- }
- if (biggestIndoxInSamePriority != index)
- {
- item.setown(queue->getItem(biggestIndoxInSamePriority));
- queue->moveAfter(req.getWuid(), item->queryWUID());
- }
- }
- }
- AccessSuccess(context, "Changed job priority %s",req.getWuid());
- activityInfoReader->rebuild();
- resp.setRedirectUrl("/WsSMC/");
- }
- catch(IException* e)
- {
- FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
- }
- return true;
- }
- bool CWsSMCEx::onMoveJobFront(IEspContext &context, IEspSMCJobRequest &req, IEspSMCJobResponse &resp)
- {
- try
- {
- context.ensureFeatureAccess(THORQUEUE_FEATURE, SecAccess_Full, ECLWATCH_THOR_QUEUE_ACCESS_DENIED, QUEUE_ACCESS_DENIED);
- {
- Owned<IJobQueue> queue=createJobQueue(req.getQueueName());
- QueueLock lock(queue);
- unsigned index=queue->findRank(req.getWuid());
- if (index>0 && index<queue->ordinality())
- {
- Owned<IJobQueueItem> item = queue->getItem(index);
- int priority0 = item->getPriority();
- unsigned smallestIndoxInSamePriority = index;
- int nextIndex = smallestIndoxInSamePriority - 1;
- while (nextIndex >= 0)
- {
- item.setown(queue->getItem(nextIndex));
- if (priority0 != item->getPriority())
- {
- break;
- }
- smallestIndoxInSamePriority = nextIndex;
- nextIndex--;
- }
- if (smallestIndoxInSamePriority != index)
- {
- item.setown(queue->getItem(smallestIndoxInSamePriority));
- queue->moveBefore(req.getWuid(), item->queryWUID());
- }
- }
- }
- AccessSuccess(context, "Changed job priority %s",req.getWuid());
- activityInfoReader->rebuild();
- resp.setRedirectUrl("/WsSMC/");
- }
- catch(IException* e)
- {
- FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
- }
- return true;
- }
- bool CWsSMCEx::onRemoveJob(IEspContext &context, IEspSMCJobRequest &req, IEspSMCJobResponse &resp)
- {
- try
- {
- context.ensureFeatureAccess(THORQUEUE_FEATURE, SecAccess_Full, ECLWATCH_THOR_QUEUE_ACCESS_DENIED, QUEUE_ACCESS_DENIED);
- abortWorkUnit(req.getWuid(), context.querySecManager(), context.queryUser());
- {
- Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
- QueueLock lock(queue);
- unsigned index=queue->findRank(req.getWuid());
- if(index<queue->ordinality())
- {
- if(!queue->cancelInitiateConversation(req.getWuid()))
- throw MakeStringException(ECLWATCH_CANNOT_DELETE_WORKUNIT,"Failed to remove the workunit %s",req.getWuid());
- }
- }
- AccessSuccess(context, "Removed job %s",req.getWuid());
- activityInfoReader->rebuild();
- resp.setRedirectUrl("/WsSMC/");
- }
- catch(IException* e)
- {
- FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
- }
- return true;
- }
- bool CWsSMCEx::onStopQueue(IEspContext &context, IEspSMCQueueRequest &req, IEspSMCQueueResponse &resp)
- {
- try
- {
- context.ensureFeatureAccess(THORQUEUE_FEATURE, SecAccess_Full, ECLWATCH_THOR_QUEUE_ACCESS_DENIED, QUEUE_ACCESS_DENIED);
- {
- Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
- StringBuffer info;
- queue->stop(createQueueActionInfo(context, "stopped", req, info));
- }
- AccessSuccess(context, "Stopped queue %s", req.getCluster());
- activityInfoReader->rebuild();
- double version = context.getClientVersion();
- if (version >= 1.19)
- getStatusServerInfo(context, req.getServerType(), req.getCluster(), req.getNetworkAddress(), req.getPort(), resp.updateStatusServerInfo());
- resp.setRedirectUrl("/WsSMC/");
- }
- catch(IException* e)
- {
- FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
- }
- return true;
- }
- bool CWsSMCEx::onResumeQueue(IEspContext &context, IEspSMCQueueRequest &req, IEspSMCQueueResponse &resp)
- {
- try
- {
- context.ensureFeatureAccess(THORQUEUE_FEATURE, SecAccess_Full, ECLWATCH_THOR_QUEUE_ACCESS_DENIED, QUEUE_ACCESS_DENIED);
- {
- Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
- StringBuffer info;
- queue->resume(createQueueActionInfo(context, "resumed", req, info));
- }
- AccessSuccess(context, "Resumed queue %s", req.getCluster());
- activityInfoReader->rebuild();
- double version = context.getClientVersion();
- if (version >= 1.19)
- getStatusServerInfo(context, req.getServerType(), req.getCluster(), req.getNetworkAddress(), req.getPort(), resp.updateStatusServerInfo());
- resp.setRedirectUrl("/WsSMC/");
- }
- catch(IException* e)
- {
- FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
- }
- return true;
- }
- const char* CWsSMCEx::createQueueActionInfo(IEspContext &context, const char* state, IEspSMCQueueRequest &req, StringBuffer& info)
- {
- StringBuffer peer, currentTime;
- context.getPeer(peer);
- const char* userId = context.queryUserId();
- if (!userId || !*userId)
- userId = "Unknown user";
- CDateTime now;
- now.setNow();
- now.getString(currentTime);
- info.appendf("%s by <%s> at <%s> from <%s>", state, userId, currentTime.str(), peer.str());
- const char* comment = req.getComment();
- if (comment && *comment)
- info.append(": ' ").append(comment).append("'");
- return info.str();
- }
- bool CWsSMCEx::onPauseQueue(IEspContext &context, IEspSMCQueueRequest &req, IEspSMCQueueResponse &resp)
- {
- try
- {
- context.ensureFeatureAccess(THORQUEUE_FEATURE, SecAccess_Full, ECLWATCH_THOR_QUEUE_ACCESS_DENIED, QUEUE_ACCESS_DENIED);
- {
- Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
- StringBuffer info;
- queue->pause(createQueueActionInfo(context, "paused", req, info));
- }
- AccessSuccess(context, "Paused queue %s", req.getCluster());
- activityInfoReader->rebuild();
- double version = context.getClientVersion();
- if (version >= 1.19)
- getStatusServerInfo(context, req.getServerType(), req.getCluster(), req.getNetworkAddress(), req.getPort(), resp.updateStatusServerInfo());
- resp.setRedirectUrl("/WsSMC/");
- }
- catch(IException* e)
- {
- FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
- }
- return true;
- }
- bool CWsSMCEx::onClearQueue(IEspContext &context, IEspSMCQueueRequest &req, IEspSMCQueueResponse &resp)
- {
- try
- {
- context.ensureFeatureAccess(THORQUEUE_FEATURE, SecAccess_Full, ECLWATCH_THOR_QUEUE_ACCESS_DENIED, QUEUE_ACCESS_DENIED);
- {
- Owned<IJobQueue> queue = createJobQueue(req.getQueueName());
- QueueLock lock(queue);
- for(unsigned i=0;i<queue->ordinality();i++)
- {
- Owned<IJobQueueItem> item = queue->getItem(i);
- abortWorkUnit(item->queryWUID(), context.querySecManager(), context.queryUser());
- }
- queue->clear();
- }
- AccessSuccess(context, "Cleared queue %s",req.getCluster());
- activityInfoReader->rebuild();
- double version = context.getClientVersion();
- if (version >= 1.19)
- getStatusServerInfo(context, req.getServerType(), req.getCluster(), req.getNetworkAddress(), req.getPort(), resp.updateStatusServerInfo());
- resp.setRedirectUrl("/WsSMC/");
- }
- catch(IException* e)
- {
- FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
- }
- return true;
- }
- void CWsSMCEx::setJobPriority(IEspContext &context, IWorkUnitFactory* factory, const char* wuid, const char* queueName, WUPriorityClass& priority)
- {
- if (!wuid || !*wuid)
- throw MakeStringException(ECLWATCH_INVALID_INPUT, "Workunit ID not specified.");
- if (!queueName || !*queueName)
- throw MakeStringException(ECLWATCH_INVALID_INPUT, "queue not specified.");
- Owned<IWorkUnit> lw = factory->updateWorkUnit(wuid, context.querySecManager(), context.queryUser());
- if (!lw)
- throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT, "Cannot update Workunit %s", wuid);
- lw->setPriority(priority);
- // set job priority to queue
- int priorityValue = lw->getPriorityValue();
- {
- CriticalBlock b(crit);
- Owned<IJobQueue> queue = createJobQueue(queueName);
- QueueLock lock(queue);
- queue->changePriority(wuid,priorityValue);
- }
- return;
- }
- bool CWsSMCEx::onSetJobPriority(IEspContext &context, IEspSMCPriorityRequest &req, IEspSMCPriorityResponse &resp)
- {
- try
- {
- context.ensureFeatureAccess(THORQUEUE_FEATURE, SecAccess_Full, ECLWATCH_THOR_QUEUE_ACCESS_DENIED, QUEUE_ACCESS_DENIED);
- WUPriorityClass priority = PriorityClassNormal;
- if(strieq(req.getPriority(),"high"))
- priority = PriorityClassHigh;
- else if(strieq(req.getPriority(),"low"))
- priority = PriorityClassLow;
- {
- Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
- IArrayOf<IConstSMCJob>& jobs = req.getSMCJobs();
- if (!jobs.length())
- setJobPriority(context, factory, req.getWuid(), req.getQueueName(), priority);
- else
- {
- ForEachItemIn(i, jobs)
- {
- IConstSMCJob &item = jobs.item(i);
- const char *wuid = item.getWuid();
- const char *queueName = item.getQueueName();
- if (wuid && *wuid && queueName && *queueName)
- setJobPriority(context, factory, wuid, queueName, priority);
- }
- }
- }
- activityInfoReader->rebuild();
- resp.setRedirectUrl("/WsSMC/");
- }
- catch(IException* e)
- {
- FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
- }
- return true;
- }
- bool CWsSMCEx::onGetThorQueueAvailability(IEspContext &context, IEspGetThorQueueAvailabilityRequest &req, IEspGetThorQueueAvailabilityResponse& resp)
- {
- try
- {
- context.ensureFeatureAccess(FEATURE_URL, SecAccess_Read, ECLWATCH_THOR_QUEUE_ACCESS_DENIED, QUEUE_ACCESS_DENIED);
- StringArray thorNames, groupNames, targetNames, queueNames;
- getEnvironmentThorClusterNames(thorNames, groupNames, targetNames, queueNames);
- IArrayOf<IEspThorCluster> ThorClusters;
- ForEachItemIn(x, thorNames)
- {
- const char* targetName = targetNames.item(x);
- const char* queueName = queueNames.item(x);
- IEspThorCluster* returnCluster = new CThorCluster("","");
-
- returnCluster->setClusterName(targetName);
- returnCluster->setQueueName(queueName);
- StringBuffer info;
- Owned<IJobQueue> queue = createJobQueue(queueName);
- if(queue->stopped(info))
- returnCluster->setQueueStatus("stopped");
- else if (queue->paused(info))
- returnCluster->setQueueStatus("paused");
- else
- returnCluster->setQueueStatus("running");
- unsigned enqueued=0;
- unsigned connected=0;
- unsigned waiting=0;
- queue->getStats(connected,waiting,enqueued);
- returnCluster->setQueueAvailable(waiting);
- returnCluster->setJobsRunning(connected - waiting);
- returnCluster->setJobsInQueue(enqueued);
- ThorClusters.append(*returnCluster);
- }
- resp.setThorClusters(ThorClusters);
- }
- catch(IException* e)
- {
- FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
- }
- return true;
- }
- bool CWsSMCEx::onSetBanner(IEspContext &context, IEspSetBannerRequest &req, IEspSetBannerResponse& resp)
- {
- try
- {
- #ifdef _USE_OPENLDAP
- CLdapSecManager* secmgr = dynamic_cast<CLdapSecManager*>(context.querySecManager());
- if(secmgr && !secmgr->isSuperUser(context.queryUser()))
- {
- context.setAuthStatus(AUTH_STATUS_NOACCESS);
- throw MakeStringException(ECLWATCH_SUPER_USER_ACCESS_DENIED, "access denied, administrators only.");
- }
- #endif
- StringBuffer chatURLStr, bannerStr;
- const char* chatURL = req.getChatURL();
- const char* banner = req.getBannerContent();
- //Only display valid strings
- if (chatURL)
- {
- const char* pStr = chatURL;
- for (unsigned i = 0; i < strlen(chatURL); i++)
- {
- if ((pStr[0] > 31) && (pStr[0] < 127))
- chatURLStr.append(pStr[0]);
- pStr++;
- }
- }
- if (banner)
- {
- const char* pStr = banner;
- for (unsigned i = 0; i < strlen(banner); i++)
- {
- if ((pStr[0] > 31) && (pStr[0] < 127))
- bannerStr.append(pStr[0]);
- pStr++;
- }
- }
- chatURLStr.trim();
- bannerStr.trim();
- if (!req.getBannerAction_isNull() && req.getBannerAction() && (bannerStr.length() < 1))
- {
- throw MakeStringException(ECLWATCH_MISSING_BANNER_CONTENT, "If a Banner is enabled, the Banner content must be specified.");
- }
- if (!req.getEnableChatURL_isNull() && req.getEnableChatURL() && (!req.getChatURL() || !*req.getChatURL()))
- {
- throw MakeStringException(ECLWATCH_MISSING_CHAT_URL, "If a Chat is enabled, the Chat URL must be specified.");
- }
- m_ChatURL = chatURLStr;
- m_Banner = bannerStr;
- const char* bannerSize = req.getBannerSize();
- if (bannerSize && *bannerSize)
- m_BannerSize.clear().append(bannerSize);
- const char* bannerColor = req.getBannerColor();
- if (bannerColor && *bannerColor)
- m_BannerColor.clear().append(bannerColor);
- const char* bannerScroll = req.getBannerScroll();
- if (bannerScroll && *bannerScroll)
- m_BannerScroll.clear().append(bannerScroll);
- m_BannerAction = 0;
- if(!req.getBannerAction_isNull())
- m_BannerAction = req.getBannerAction();
- m_EnableChatURL = 0;
- if(!req.getEnableChatURL_isNull())
- m_EnableChatURL = req.getEnableChatURL();
- resp.setRedirectUrl("/WsSMC/Activity");
- }
- catch(IException* e)
- {
- FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
- }
- return true;
- }
- bool CWsSMCEx::onNotInCommunityEdition(IEspContext &context, IEspNotInCommunityEditionRequest &req, IEspNotInCommunityEditionResponse &resp)
- {
- return true;
- }
- bool CWsSMCEx::onBrowseResources(IEspContext &context, IEspBrowseResourcesRequest & req, IEspBrowseResourcesResponse & resp)
- {
- try
- {
- context.ensureFeatureAccess(FEATURE_URL, SecAccess_Read, ECLWATCH_SMC_ACCESS_DENIED, SMC_ACCESS_DENIED);
- double version = context.getClientVersion();
- Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
- Owned<IConstEnvironment> constEnv = factory->openEnvironment();
- //The resource files will be downloaded from the same box of ESP (not dali)
- StringBuffer ipStr;
- IpAddress ipaddr = queryHostIP();
- ipaddr.getIpText(ipStr);
- if (ipStr.length() > 0)
- {
- resp.setNetAddress(ipStr.str());
- Owned<IConstMachineInfo> machine = constEnv->getMachineByAddress(ipStr.str());
- if (machine)
- {
- int os = machine->getOS();
- resp.setOS(os);
- }
- }
- if (m_PortalURL.length() > 0)
- resp.setPortalURL(m_PortalURL.str());
- #ifndef USE_RESOURCE
- if (version > 1.12)
- resp.setUseResource(false);
- #else
- if (version > 1.12)
- resp.setUseResource(true);
- //Now, get a list of resources stored inside the ESP box
- IArrayOf<IEspHPCCResourceRepository> resourceRepositories;
- Owned<IPropertyTree> pEnvRoot = &constEnv->getPTree();
- const char* ossInstall = pEnvRoot->queryProp("EnvSettings/path");
- if (!ossInstall || !*ossInstall)
- {
- OWARNLOG("Failed to get EnvSettings/Path in environment settings.");
- return true;
- }
- StringBuffer path;
- path.appendf("%s/componentfiles/files/downloads", ossInstall);
- Owned<IFile> f = createIFile(path.str());
- if(!f->exists() || !f->isDirectory())
- {
- OWARNLOG("Invalid resource folder");
- return true;
- }
- Owned<IDirectoryIterator> di = f->directoryFiles(NULL, false, true);
- if(di.get() == NULL)
- {
- OWARNLOG("Resource folder is empty.");
- return true;
- }
- ForEach(*di)
- {
- if (!di->isDir())
- continue;
- StringBuffer folder, path0, tmpBuf;
- di->getName(folder);
- if (folder.length() == 0)
- continue;
- path0.appendf("%s/%s/description.xml", path.str(), folder.str());
- Owned<IFile> f0 = createIFile(path0.str());
- if(!f0->exists())
- {
- OWARNLOG("Description file not found for %s", folder.str());
- continue;
- }
- OwnedIFileIO rIO = f0->openShared(IFOread,IFSHfull);
- if(!rIO)
- {
- OWARNLOG("Failed to open the description file for %s", folder.str());
- continue;
- }
- offset_t fileSize = f0->size();
- tmpBuf.ensureCapacity((unsigned)fileSize);
- tmpBuf.setLength((unsigned)fileSize);
- size32_t nRead = rIO->read(0, (size32_t) fileSize, (char*)tmpBuf.str());
- if (nRead != fileSize)
- {
- OWARNLOG("Failed to read the description file for %s", folder.str());
- continue;
- }
- Owned<IPropertyTree> desc = createPTreeFromXMLString(tmpBuf.str());
- if (!desc)
- {
- OWARNLOG("Invalid description file for %s", folder.str());
- continue;
- }
- Owned<IPropertyTreeIterator> fileIterator = desc->getElements("file");
- if (!fileIterator->first())
- {
- OWARNLOG("Invalid description file for %s", folder.str());
- continue;
- }
- IArrayOf<IEspHPCCResource> resourcs;
- do {
- IPropertyTree &fileItem = fileIterator->query();
- const char* filename = fileItem.queryProp("filename");
- if (!filename || !*filename)
- continue;
- const char* name0 = fileItem.queryProp("name");
- const char* description0 = fileItem.queryProp("description");
- const char* version0 = fileItem.queryProp("version");
- Owned<IEspHPCCResource> onefile = createHPCCResource();
- onefile->setFileName(filename);
- if (name0 && *name0)
- onefile->setName(name0);
- if (description0 && *description0)
- onefile->setDescription(description0);
- if (version0 && *version0)
- onefile->setVersion(version0);
- resourcs.append(*onefile.getLink());
- } while (fileIterator->next());
- if (resourcs.ordinality())
- {
- StringBuffer path1;
- path1.appendf("%s/%s", path.str(), folder.str());
- Owned<IEspHPCCResourceRepository> oneRepository = createHPCCResourceRepository();
- oneRepository->setName(folder.str());
- oneRepository->setPath(path1.str());
- oneRepository->setHPCCResources(resourcs);
- resourceRepositories.append(*oneRepository.getLink());
- }
- }
- if (resourceRepositories.ordinality())
- resp.setHPCCResourceRepositories(resourceRepositories);
- #endif
- }
- catch(IException* e)
- {
- FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
- }
- return true;
- }
- int CWsSMCSoapBindingEx::onHttpEcho(CHttpRequest* request, CHttpResponse* response)
- {
- StringBuffer xml;
- xml.append(
- "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
- "<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">"
- "<soap:Body>"
- "<HttpEchoResponse xmlns='urn:hpccsystems:ws:httpecho'>");
- appendXMLTag(xml, "Method", request->queryMethod());
- appendXMLTag(xml, "UrlPath", request->queryPath());
- appendXMLTag(xml, "UrlParameters", request->queryParamStr());
- appendXMLOpenTag(xml, "Headers");
- StringArray &headers = request->queryHeaders();
- headers.sortAscii(false);
- ForEachItemIn(i, headers)
- {
- const char *h = headers.item(i);
- if (strnicmp(h, "Authorization", 13))
- appendXMLTag(xml, "Header", h);
- }
- appendXMLCloseTag(xml, "Headers");
- const char *content = request->queryContent();
- if (content && *content)
- appendXMLTag(xml, "Content", content);
- xml.append("</HttpEchoResponse></soap:Body></soap:Envelope>");
- response->setContent(xml);
- response->setContentType("text/xml");
- response->send();
- return 0;
- }
- int CWsSMCSoapBindingEx::onGet(CHttpRequest* request, CHttpResponse* response)
- {
- const char *operation = request->queryServiceMethod();
- if (!operation || !strieq(operation, "HttpEcho"))
- return CWsSMCSoapBinding::onGet(request, response);
- return onHttpEcho(request, response);
- }
- void CWsSMCSoapBindingEx::handleHttpPost(CHttpRequest *request, CHttpResponse *response)
- {
- sub_service sstype;
- StringBuffer operation;
- request->getEspPathInfo(sstype, NULL, NULL, &operation, false);
- if (!operation || !strieq(operation, "HttpEcho"))
- CWsSMCSoapBinding::handleHttpPost(request, response);
- else
- onHttpEcho(request, response);
- }
- int CWsSMCSoapBindingEx::onGetForm(IEspContext &context, CHttpRequest* request, CHttpResponse* response, const char *service, const char *method)
- {
- try
- {
- if(stricmp(method,"NotInCommunityEdition")==0)
- {
- StringBuffer page, url, link;
- request->getParameter("EEPortal", url);
- if (url.length() > 0)
- link.appendf("Further information can be found at <a href=\"%s\" target=\"_blank\">%s</a>.", url.str(), url.str());
- page.append(
- "<html>"
- "<head>"
- "<meta http-equiv=\"Content-Type\" content=\"text/html; charset=utf-8\" />"
- "<link rel=\"stylesheet\" type=\"text/css\" href=\"/esp/files/default.css\"/>"
- "<link rel=\"stylesheet\" type=\"text/css\" href=\"/esp/files/yui/build/fonts/fonts-min.css\" />"
- "<title>Advanced feature in Enterprise Edition</title>"
- "</head>"
- "<body>"
- "<h3 style=\"text-align:centre;\">Advanced feature in the Enterprise Edition</h4>"
- "<p style=\"text-align:centre;\">Support for this feature is coming soon. ");
- if (link.length() > 0)
- page.append(link.str());
- page.append("</p></body>"
- "</html>");
- response->setContent(page.str());
- response->setContentType("text/html");
- response->send();
- return 0;
- }
- else if(stricmp(method,"DisabledInThisVersion")==0)
- {
- StringBuffer page;
- page.append(
- "<html>"
- "<head>"
- "<meta http-equiv=\"Content-Type\" content=\"text/html; charset=utf-8\" />"
- "<link rel=\"stylesheet\" type=\"text/css\" href=\"/esp/files/default.css\"/>"
- "<link rel=\"stylesheet\" type=\"text/css\" href=\"/esp/files/yui/build/fonts/fonts-min.css\" />"
- "<title>Disabled Feature in This Version</title>"
- "</head>"
- "<body>"
- "<h3 style=\"text-align:centre;\">Disabled Feature in This Version</h4>"
- "<p style=\"text-align:centre;\">This feature is disabled in this version. ");
- page.append("</p></body>"
- "</html>");
- response->setContent(page.str());
- response->setContentType("text/html");
- response->send();
- return 0;
- }
- }
- catch(IException* e)
- {
- FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
- }
- return onGetForm(context, request, response, service, method);
- }
- inline const char *controlCmdMessage(int cmd)
- {
- switch (cmd)
- {
- case CRoxieControlCmdType_ATTACH:
- return "<control:unlockDali/>";
- case CRoxieControlCmdType_DETACH:
- return "<control:lockDali/>";
- case CRoxieControlCmdType_RELOAD:
- return "<control:reload/>";
- case CRoxieControlCmdType_RELOAD_RETRY:
- return "<control:reload forceRetry='1' />";
- case CRoxieControlCmdType_STATE:
- return "<control:state/>";
- default:
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Unknown Roxie Control Command.");
- }
- return NULL;
- }
- bool CWsSMCEx::onRoxieControlCmd(IEspContext &context, IEspRoxieControlCmdRequest &req, IEspRoxieControlCmdResponse &resp)
- {
- context.ensureFeatureAccess(ROXIE_CONTROL_URL, SecAccess_Full, ECLWATCH_SMC_ACCESS_DENIED, SMC_ACCESS_DENIED);
- const char *process = req.getProcessCluster();
- if (!process || !*process)
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Process cluster not specified.");
- const char *controlReq = controlCmdMessage(req.getCommand());
- SocketEndpointArray addrs;
- getRoxieProcessServers(process, addrs);
- if (!addrs.length())
- throw MakeStringException(ECLWATCH_CANNOT_GET_ENV_INFO, "Process cluster not found.");
- Owned<IPropertyTree> controlResp = sendRoxieControlAllNodes(addrs.item(0), controlReq, true, req.getWait());
- if (!controlResp)
- throw MakeStringException(ECLWATCH_INTERNAL_ERROR, "Failed to get control response from roxie.");
- IArrayOf<IEspRoxieControlEndpointInfo> respEndpoints;
- Owned<IPropertyTreeIterator> roxieEndpoints = controlResp->getElements("Endpoint");
- ForEach(*roxieEndpoints)
- {
- IPropertyTree &roxieEndpoint = roxieEndpoints->query();
- Owned<IEspRoxieControlEndpointInfo> respEndpoint = createRoxieControlEndpointInfo();
- respEndpoint->setAddress(roxieEndpoint.queryProp("@ep"));
- respEndpoint->setStatus(roxieEndpoint.queryProp("Status"));
- if (roxieEndpoint.hasProp("Dali/@connected"))
- respEndpoint->setAttached(roxieEndpoint.getPropBool("Dali/@connected"));
- if (roxieEndpoint.hasProp("State/@hash"))
- respEndpoint->setStateHash(roxieEndpoint.queryProp("State/@hash"));
- respEndpoints.append(*respEndpoint.getClear());
- }
- resp.setEndpoints(respEndpoints);
- return true;
- }
- bool CWsSMCEx::onGetStatusServerInfo(IEspContext &context, IEspGetStatusServerInfoRequest &req, IEspGetStatusServerInfoResponse &resp)
- {
- context.ensureFeatureAccess(FEATURE_URL, SecAccess_Read, ECLWATCH_SMC_ACCESS_DENIED, SMC_ACCESS_DENIED);
- getStatusServerInfo(context, req.getServerType(), req.getServerName(), req.getNetworkAddress(), req.getPort(), resp.updateStatusServerInfo());
- return true;
- }
- void CWsSMCEx::getStatusServerInfo(IEspContext &context, const char *serverType, const char *server, const char *networkAddress, unsigned port,
- IEspStatusServerInfo& statusServerInfo)
- {
- if (!serverType || !*serverType)
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Server type not specified.");
- Owned<CActivityInfo> activityInfo = activityInfoReader->getActivityInfo();
- if (!activityInfo)
- throw MakeStringException(ECLWATCH_INTERNAL_ERROR, "Failed to get Activity Info. Please try later.");
- if (strieq(serverType,STATUS_SERVER_THOR))
- {
- setTargetClusterInfo(context, serverType, server, activityInfo->queryThorTargetClusters(), activityInfo->queryActiveWUs(), statusServerInfo);
- }
- else if (strieq(serverType,STATUS_SERVER_ROXIE))
- {
- setTargetClusterInfo(context, serverType, server, activityInfo->queryRoxieTargetClusters(), activityInfo->queryActiveWUs(), statusServerInfo);
- }
- else if (strieq(serverType,STATUS_SERVER_HTHOR))
- {
- setTargetClusterInfo(context, serverType, server, activityInfo->queryHThorTargetClusters(), activityInfo->queryActiveWUs(), statusServerInfo);
- }
- else if (strieq(serverType,STATUS_SERVER_DFUSERVER))
- {
- setServerQueueInfo(context, serverType, server, activityInfo->queryServerJobQueues(), activityInfo->queryActiveWUs(), statusServerInfo);
- }
- else
- {
- setServerQueueInfo(context, serverType, networkAddress, port, activityInfo->queryServerJobQueues(), activityInfo->queryActiveWUs(), statusServerInfo);
- }
- }
- void CWsSMCEx::setTargetClusterInfo(IEspContext &context, const char *serverType, const char *clusterName, const CIArrayOf<CWsSMCTargetCluster>& targetClusters,
- const IArrayOf<IEspActiveWorkunit>& aws, IEspStatusServerInfo& statusServerInfo)
- {
- if (!clusterName || !*clusterName)
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Cluster name not specified.");
- IEspTargetCluster& clusterInfo = statusServerInfo.updateTargetClusterInfo();
- ForEachItemIn(i, targetClusters)
- {
- CWsSMCTargetCluster& targetCluster = targetClusters.item(i);
- const char* name = targetCluster.clusterName.get();
- if (name && strieq(name, clusterName))
- {
- setESPTargetCluster(context, targetCluster, &clusterInfo);
- break;
- }
- }
- setActiveWUs(context, serverType, clusterName, clusterInfo.getQueueName(), aws, statusServerInfo);
- }
- void CWsSMCEx::setServerQueueInfo(IEspContext &context, const char *serverType, const char *serverName, const IArrayOf<IEspServerJobQueue>& serverJobQueues,
- const IArrayOf<IEspActiveWorkunit>& aws, IEspStatusServerInfo& statusServerInfo)
- {
- if (!serverName || !*serverName)
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Server name not specified.");
- ForEachItemIn(i, serverJobQueues)
- {
- IEspServerJobQueue& serverJobQueue = serverJobQueues.item(i);
- const char* name = serverJobQueue.getServerName();
- if (name && strieq(name, serverName))
- {
- IEspServerJobQueue& serverQueue = statusServerInfo.updateServerInfo();
- serverQueue.copy(serverJobQueue);
- break;
- }
- }
- setActiveWUs(context, serverType, serverName, aws, statusServerInfo);
- }
- void CWsSMCEx::setServerQueueInfo(IEspContext &context, const char *serverType, const char *networkAddress, unsigned port, const IArrayOf<IEspServerJobQueue>& serverJobQueues,
- const IArrayOf<IEspActiveWorkunit>& aws, IEspStatusServerInfo& statusServerInfo)
- {
- if (!networkAddress || !*networkAddress)
- throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Network address not specified.");
- ForEachItemIn(i, serverJobQueues)
- {
- IEspServerJobQueue& serverJobQueue = serverJobQueues.item(i);
- const char* ipAddress = serverJobQueue.getNetworkAddress();
- unsigned thePort = serverJobQueue.getPort();
- if (ipAddress && strieq(ipAddress, networkAddress) && (thePort == port))
- {
- IEspServerJobQueue& serverQueue = statusServerInfo.updateServerInfo();
- serverQueue.copy(serverJobQueue);
- break;
- }
- }
- VStringBuffer instance("%s_on_%s:%d", serverType, networkAddress, port);
- setActiveWUs(context, serverType, instance.str(), aws, statusServerInfo);
- }
- void CWsSMCEx::setESPTargetCluster(IEspContext &context, const CWsSMCTargetCluster& targetCluster, IEspTargetCluster* espTargetCluster)
- {
- espTargetCluster->setClusterName(targetCluster.clusterName.get());
- espTargetCluster->setClusterSize(targetCluster.clusterSize);
- espTargetCluster->setClusterType(targetCluster.clusterType);
- espTargetCluster->setQueueName(targetCluster.queueName.get());
- espTargetCluster->setQueueStatus(targetCluster.queueStatus.get());
- setClusterStatus(context, targetCluster, espTargetCluster);
- }
- void CWsSMCEx::setActiveWUs(IEspContext &context, const char *serverType, const char *clusterName, const char *queueName, const IArrayOf<IEspActiveWorkunit>& aws, IEspStatusServerInfo& statusServerInfo)
- {
- const char* clusterType = CLUSTER_TYPE_THOR;
- if (strieq(serverType,STATUS_SERVER_ROXIE))
- clusterType = CLUSTER_TYPE_ROXIE;
- else if (strieq(serverType,STATUS_SERVER_HTHOR))
- clusterType = CLUSTER_TYPE_HTHOR;
- IArrayOf<IEspActiveWorkunit> awsOnThisQueue;
- ForEachItemIn(i, aws)
- {
- IEspActiveWorkunit& wu = aws.item(i);
- const char* wuid = wu.getWuid();
- if (!wuid || !*wuid)
- continue;
- const char* wuServerType = wu.getServer();
- const char* wuClusterName = wu.getTargetClusterName();
- if (!wuServerType || !wuClusterName || !strieq(serverType, wuServerType) || !strieq(clusterName, wuClusterName))
- {
- const char* wuClusterType = wu.getClusterType();
- const char* wuClusterQueueName = wu.getClusterQueueName();
- if (!wuClusterType || !wuClusterQueueName || !strieq(clusterType, wuClusterType) || !strieq(queueName, wuClusterQueueName))
- continue;
- }
- Owned<IEspActiveWorkunit> wuOnThisQueue = new CActiveWorkunitWrapper(wuid, "", "", "", "");
- setActiveWUs(context, wu, wuOnThisQueue);
- awsOnThisQueue.append(*wuOnThisQueue.getClear());
- }
- statusServerInfo.setWorkunits(awsOnThisQueue);
- }
- void CWsSMCEx::setActiveWUs(IEspContext &context, const char *serverType, const char *instance, const IArrayOf<IEspActiveWorkunit>& aws, IEspStatusServerInfo& statusServerInfo)
- {
- IArrayOf<IEspActiveWorkunit> awsOnThisQueue;
- ForEachItemIn(i, aws)
- {
- IEspActiveWorkunit& wu = aws.item(i);
- const char* wuid = wu.getWuid();
- if (!wuid || !*wuid)
- continue;
- const char* wuInstance = wu.getInstance();
- if (!wuInstance || !strieq(wuInstance, instance))
- continue;
- Owned<IEspActiveWorkunit> wuOnThisQueue = new CActiveWorkunitWrapper(wuid, "", "", "", "");
- setActiveWUs(context, wu, wuOnThisQueue);
- awsOnThisQueue.append(*wuOnThisQueue.getClear());
- }
- statusServerInfo.setWorkunits(awsOnThisQueue);
- }
- void CWsSMCEx::setActiveWUs(IEspContext &context, IEspActiveWorkunit& wu, IEspActiveWorkunit* wuToSet)
- {
- try
- {
- const char* user = context.queryUserId();
- const char* owner = wu.getOwner();
- //if no access, throw an exception and go to the 'catch' section.
- context.validateFeatureAccess((!owner || !*owner || (user && streq(user, owner))) ? OWN_WU_ACCESS : OTHERS_WU_ACCESS, SecAccess_Read, true);
- wuToSet->copy(wu);
- }
- catch (IException *e)
- { //if the wu cannot be opened for some reason, the openWorkUnit() inside the CActiveWorkunitWrapper() may throw an exception.
- //We do not want the exception stops this process of retrieving/showing all active WUs. And that WU should still be displayed
- //with the exception.
- wuToSet->setStateID(WUStateUnknown);
- wuToSet->setServer(wu.getServer());
- wuToSet->setQueueName(wu.getQueueName());
- const char* instanceName = wu.getInstance();
- const char* targetClusterName = wu.getTargetClusterName();
- if (instanceName && *instanceName)
- wuToSet->setInstance(instanceName); // JCSMORE In thor case at least, if queued it is unknown which instance it will run on..
- if (targetClusterName && *targetClusterName)
- wuToSet->setTargetClusterName(targetClusterName);
- e->Release();
- }
- }
- static const char *LockModeNames[] = { "ALL", "READ", "WRITE", "HOLD", "SUB" };
- void CWsSMCEx::addLockInfo(CLockMetaData& lD, const char* xPath, const char* lfn, unsigned msNow, time_t ttNow, IArrayOf<IEspLock>& locks)
- {
- Owned<IEspLock> lock = createLock();
- if (xPath && *xPath)
- lock->setXPath(xPath);
- else if (lfn && *lfn)
- lock->setLogicalFile(lfn);
- else
- return; //Should not happen
- lock->setEPIP(lD.queryEp());
- lock->setSessionID(lD.sessId);
- unsigned duration = msNow-lD.timeLockObtained;
- lock->setDurationMS(duration);
- CDateTime timeLocked;
- StringBuffer timeStr;
- time_t ttLocked = ttNow - duration/1000;
- timeLocked.set(ttLocked);
- timeLocked.getString(timeStr);
- lock->setTimeLocked(timeStr.str());
- unsigned mode = lD.mode;
- VStringBuffer modeStr("%x", mode);
- lock->setModes(modeStr.str());
- StringArray modes;
- if (RTM_MODE(mode, RTM_LOCK_READ))
- modes.append(LockModeNames[CLockModes_READ]);
- if (RTM_MODE(mode, RTM_LOCK_WRITE))
- modes.append(LockModeNames[CLockModes_WRITE]);
- if (RTM_MODE(mode, RTM_LOCK_HOLD)) // long-term lock
- modes.append(LockModeNames[CLockModes_HOLD]);
- if (RTM_MODE(mode, RTM_LOCK_SUB)) // locks all descendants as well as self
- modes.append(LockModeNames[CLockModes_SUB]);
- lock->setModeNames(modes);
- locks.append(*lock.getClear());
- }
- bool CWsSMCEx::onLockQuery(IEspContext &context, IEspLockQueryRequest &req, IEspLockQueryResponse &resp)
- {
- class CLockPostFilter
- {
- CLockModes mode;
- time_t ttLTLow, ttLTHigh;
- bool checkLTLow, checkLTHigh;
- int durationLow, durationHigh;
- bool checkMode(unsigned lockMode)
- {
- unsigned modeReq;
- switch (mode)
- {
- case CLockModes_READ:
- modeReq = RTM_LOCK_READ;
- break;
- case CLockModes_WRITE:
- modeReq = RTM_LOCK_WRITE;
- break;
- case CLockModes_HOLD:
- modeReq = RTM_LOCK_HOLD;
- break;
- case CLockModes_SUB:
- modeReq = RTM_LOCK_SUB;
- break;
- default:
- return true;
- }
- if (lockMode & modeReq)
- return true;
- return false;
- }
- public:
- CLockPostFilter(IEspLockQueryRequest& req)
- {
- ttLTLow = 0;
- ttLTHigh = 0;
- mode = req.getMode();
- if (mode == LockModes_Undefined)
- throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid Lock Mode");
- if (req.getDurationMSLow_isNull())
- durationLow = -1;
- else
- durationLow = req.getDurationMSLow();
- if (req.getDurationMSHigh_isNull())
- durationHigh = -1;
- else
- durationHigh = req.getDurationMSHigh();
- const char* timeLow = req.getTimeLockedLow();
- if (!timeLow || !*timeLow)
- checkLTLow = false;
- else
- {
- CDateTime dtLow;
- dtLow.setString(timeLow, NULL, false);
- ttLTLow = dtLow.getSimple();
- checkLTLow = true;
- }
- const char* timeHigh = req.getTimeLockedHigh();
- if (!timeHigh || !*timeHigh)
- checkLTHigh = false;
- else
- {
- CDateTime dtHigh;
- dtHigh.setString(timeHigh, NULL, false);
- ttLTHigh = dtHigh.getSimple();
- checkLTHigh = true;
- }
- }
- bool check(CLockMetaData& lD, unsigned msNow, time_t ttNow)
- {
- if (!checkMode(lD.mode))
- return false;
- int duration = msNow-lD.timeLockObtained;
- if (durationLow > duration)
- return false;
- if ((durationHigh >= 0) && (durationHigh < duration))
- return false;
- if (checkLTLow && (ttNow - duration/1000 < ttLTLow))
- return false;
- if (checkLTHigh && (ttNow - duration/1000 > ttLTHigh))
- return false;
- return true;
- }
- };
- try
- {
- context.ensureFeatureAccess(FEATURE_URL, SecAccess_Read, ECLWATCH_SMC_ACCESS_DENIED, SMC_ACCESS_DENIED);
- CLockPostFilter postFilter(req);
- StringBuffer xPath;
- if (req.getAllFileLocks())
- xPath.appendf("/%s/*", querySdsFilesRoot());
- else
- xPath = req.getXPath();
- Owned<ILockInfoCollection> lockInfoCollection = querySDS().getLocks(req.getEPIP(), xPath.str());
- IArrayOf<IEspLock> locks;
- CDateTime time;
- time.setNow();
- time_t ttNow = time.getSimple();
- unsigned msNow = msTick();
- for (unsigned l=0; l<lockInfoCollection->queryLocks(); l++)
- {
- ILockInfo& lockInfo = lockInfoCollection->queryLock(l);
- CDfsLogicalFileName dlfn;
- const char* lfn = NULL;
- const char* xPath = NULL;
- if (dlfn.setFromXPath(lockInfo.queryXPath()))
- lfn = dlfn.get();
- else
- xPath = lockInfo.queryXPath();
- for (unsigned i=0; i<lockInfo.queryConnections(); i++)
- {
- CLockMetaData& lMD = lockInfo.queryLockData(i);
- if (postFilter.check(lMD, msNow, ttNow))
- addLockInfo(lMD, xPath, lfn, msNow, ttNow, locks);
- }
- }
- unsigned numLocks = locks.length();
- if (numLocks)
- resp.setLocks(locks);
- resp.setNumLocks(numLocks);
- }
- catch(IException* e)
- {
- FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
- }
- return true;
- }
- void CActivityInfoReader::threadmain()
- {
- #ifndef _CONTAINERIZED
- PROGLOG("WsSMC CActivityInfoReader Thread started.");
- unsigned int autoRebuildMillSeconds = 1000*autoRebuildSeconds;
- while (!stopping)
- {
- if (!detached)
- {
- try
- {
- EspTimeSection timer("createActivityInfo");
- Owned<IEspContext> espContext = createEspContext();
- Owned<CActivityInfo> activityInfo = new CActivityInfo();
- activityInfo->createActivityInfo(*espContext);
- PROGLOG("WsSMC CActivityInfoReader: ActivityInfo collected.");
- CriticalBlock b(crit);
- activityInfoCache.setown(activityInfo.getClear());
- // if 1st and getActivityInfo blocked, release it.
- if (first)
- {
- first = false;
- if (firstBlocked)
- {
- firstBlocked = false;
- firstSem.signal();
- }
- }
- }
- catch(IException *e)
- {
- StringBuffer msg;
- IERRLOG("Exception %d:%s in WsSMC CActivityInfoReader::run", e->errorCode(), e->errorMessage(msg).str());
- e->Release();
- }
- catch(...)
- {
- IERRLOG("Unknown exception in WsSMC CActivityInfoReader::run");
- }
- }
- waiting = true;
- if (!sem.wait(autoRebuildMillSeconds))
- {
- bool expected = true;
- waiting.compare_exchange_strong(expected, false);
- }
- }
- #endif
- }
|