12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #include "jlib.hpp"
- #include "ws_workunitsHelpers.hpp"
- #include "exception_util.hpp"
- #include "daclient.hpp"
- #include "dalienv.hpp"
- #include "daaudit.hpp"
- #include "portlist.h"
- #include "dadfs.hpp"
- #include "fileview.hpp"
- #include "wuwebview.hpp"
- #include "dllserver.hpp"
- #include "wujobq.hpp"
- #include "hqlexpr.hpp"
- #ifdef _USE_ZLIB
- #include "zcrypt.hpp"
- #endif
- namespace ws_workunits {
- SecAccessFlags chooseWuAccessFlagsByOwnership(const char *user, const char *owner, SecAccessFlags accessOwn, SecAccessFlags accessOthers)
- {
- return (isEmpty(owner) || (user && streq(user, owner))) ? accessOwn : accessOthers;
- }
- SecAccessFlags chooseWuAccessFlagsByOwnership(const char *user, IConstWorkUnit& cw, SecAccessFlags accessOwn, SecAccessFlags accessOthers)
- {
- SCMStringBuffer owner;
- return chooseWuAccessFlagsByOwnership(user, cw.getUser(owner).str(), accessOwn, accessOthers);
- }
- const char *getWuAccessType(const char *owner, const char *user)
- {
- return (isEmpty(owner) || (user && streq(user, owner))) ? OWN_WU_ACCESS : OTHERS_WU_ACCESS;
- }
- const char *getWuAccessType(IConstWorkUnit& cw, const char *user)
- {
- SCMStringBuffer owner;
- return getWuAccessType(cw.getUser(owner).str(), user);
- }
- void getUserWuAccessFlags(IEspContext& context, SecAccessFlags& accessOwn, SecAccessFlags& accessOthers, bool except)
- {
- if (!context.authorizeFeature(OWN_WU_ACCESS, accessOwn))
- accessOwn = SecAccess_None;
- if (!context.authorizeFeature(OTHERS_WU_ACCESS, accessOthers))
- accessOthers = SecAccess_None;
- if (except && (accessOwn == SecAccess_None) && (accessOthers == SecAccess_None))
- {
- AuditSystemAccess(context.queryUserId(), false, "Access Denied: User can't view any workunits");
- VStringBuffer msg("Access Denied: User %s does not have rights to access workunits.", context.queryUserId());
- throw MakeStringException(ECLWATCH_ECL_WU_ACCESS_DENIED, "%s", msg.str());
- }
- }
- SecAccessFlags getWsWorkunitAccess(IEspContext& cxt, IConstWorkUnit& cw)
- {
- SecAccessFlags accessFlag = SecAccess_None;
- cxt.authorizeFeature(getWuAccessType(cw, cxt.queryUserId()), accessFlag);
- return accessFlag;
- }
- void ensureWsWorkunitAccessByOwnerId(IEspContext& cxt, const char* owner, SecAccessFlags minAccess)
- {
- if (!cxt.validateFeatureAccess(getWuAccessType(owner, cxt.queryUserId()), minAccess, false))
- throw MakeStringException(ECLWATCH_ECL_WU_ACCESS_DENIED, "Failed to access workunit. Permission denied.");
- }
- void ensureWsWorkunitAccess(IEspContext& cxt, IConstWorkUnit& cw, SecAccessFlags minAccess)
- {
- if (!cxt.validateFeatureAccess(getWuAccessType(cw, cxt.queryUserId()), minAccess, false))
- throw MakeStringException(ECLWATCH_ECL_WU_ACCESS_DENIED, "Failed to access workunit. Permission denied.");
- }
- void ensureWsWorkunitAccess(IEspContext& context, const char* wuid, SecAccessFlags minAccess)
- {
- Owned<IWorkUnitFactory> wf = getWorkUnitFactory(context.querySecManager(), context.queryUser());
- Owned<IConstWorkUnit> cw = wf->openWorkUnit(wuid, false);
- if (!cw)
- throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT, "Failed to open workunit %s when ensuring workunit access", wuid);
- ensureWsWorkunitAccess(context, *cw, minAccess);
- }
- void ensureWsCreateWorkunitAccess(IEspContext& cxt)
- {
- if (!cxt.validateFeatureAccess(OWN_WU_ACCESS, SecAccess_Write, false))
- throw MakeStringException(ECLWATCH_ECL_WU_ACCESS_DENIED, "Failed to create workunit. Permission denied.");
- }
- StringBuffer &getWuidFromLogicalFileName(IEspContext &context, const char *logicalName, StringBuffer &wuid)
- {
- Owned<IUserDescriptor> userdesc = createUserDescriptor();
- userdesc->set(context.queryUserId(), context.queryPassword());
- Owned<IDistributedFile> df = queryDistributedFileDirectory().lookup(logicalName, userdesc);
- if (!df)
- throw MakeStringException(ECLWATCH_FILE_NOT_EXIST,"Cannot find file %s.",logicalName);
- return wuid.append(df->queryAttributes().queryProp("@workunit"));
- }
- void formatDuration(StringBuffer &s, unsigned ms)
- {
- unsigned days = ms / (1000*60*60*24);
- ms %= (1000*60*60*24);
- unsigned hours = ms / (1000*60*60);
- ms %= (1000*60*60);
- unsigned mins = ms / (1000*60);
- ms %= (1000*60);
- unsigned secs = ms / 1000;
- ms %= 1000;
- if (days)
- s.appendf("%d days ", days);
- if (hours || s.length())
- s.appendf("%d:", hours);
- if (mins || s.length())
- s.appendf("%d:", mins);
- if (s.length())
- s.appendf("%02d.%03d", secs, ms);
- else
- s.appendf("%d.%03d", secs, ms);
- }
- WsWUExceptions::WsWUExceptions(IConstWorkUnit& wu): numerr(0), numwrn(0), numinf(0), numalert(0)
- {
- Owned<IConstWUExceptionIterator> it = &wu.getExceptions();
- ForEach(*it)
- {
- SCMStringBuffer src, msg, file;
- Owned<IEspECLException> e= createECLException("","");
- e->setCode(it->query().getExceptionCode());
- e->setSource(it->query().getExceptionSource(src).str());
- e->setMessage(it->query().getExceptionMessage(msg).str());
- e->setFileName(it->query().getExceptionFileName(file).str());
- e->setLineNo(it->query().getExceptionLineNo());
- e->setColumn(it->query().getExceptionColumn());
- const char * label = "";
- switch (it->query().getSeverity())
- {
- default:
- case SeverityError: label = "Error"; numerr++; break;
- case SeverityWarning: label = "Warning"; numwrn++; break;
- case SeverityInformation: label = "Info"; numinf++; break;
- case SeverityAlert: label = "Alert"; numalert++; break;
- }
- e->setSeverity(label);
- errors.append(*e.getLink());
- }
- }
- #define SDS_LOCK_TIMEOUT 30000
- void getSashaNode(SocketEndpoint &ep)
- {
- Owned<IEnvironmentFactory> factory = getEnvironmentFactory();
- Owned<IConstEnvironment> env = factory->openEnvironment();
- if (!env)
- throw MakeStringException(ECLWATCH_CANNOT_GET_ENV_INFO,"Cannot get environment information.");
- Owned<IPropertyTree> root = &env->getPTree();
- IPropertyTree *pt = root->queryPropTree("Software/SashaServerProcess[1]/Instance[1]");
- if (!pt)
- throw MakeStringException(ECLWATCH_ARCHIVE_SERVER_NOT_FOUND, "Archive Server not found.");
- ep.set(pt->queryProp("@netAddress"), pt->getPropInt("@port",DEFAULT_SASHA_PORT));
- }
- void WsWuInfo::getSourceFiles(IEspECLWorkunit &info, unsigned flags)
- {
- if (!(flags & WUINFO_IncludeSourceFiles))
- return;
- try
- {
- Owned<IUserDescriptor> userdesc;
- StringBuffer username;
- context.getUserID(username);
- const char* passwd = context.queryPassword();
- userdesc.setown(createUserDescriptor());
- userdesc->set(username.str(), passwd);
- IArrayOf<IEspECLSourceFile> files;
- if (version < 1.27)
- {
- Owned<IPropertyTreeIterator> f=&cw->getFilesReadIterator();
- ForEach(*f)
- {
- IPropertyTree &query = f->query();
- const char *clusterName = query.queryProp("@cluster");
- const char *fileName = query.queryProp("@name");
- int fileCount = query.getPropInt("@useCount");
- Owned<IEspECLSourceFile> file= createECLSourceFile("","");
- if(clusterName && *clusterName)
- {
- file->setFileCluster(clusterName);
- }
- if (version > 1.11)
- {
- Owned<IPropertyTreeIterator> filetrees= query.getElements("Subfile");
- if (filetrees->first())
- file->setIsSuperFile(true);
- }
- if (fileName && *fileName)
- {
- file->setName(fileName);
- }
- file->setCount(fileCount);
- files.append(*file.getLink());
- }
- }
- else
- {
- StringArray fileNames;
- Owned<IPropertyTreeIterator> f=&cw->getFilesReadIterator();
- ForEach(*f)
- {
- IPropertyTree &query = f->query();
- const char *clusterName = query.queryProp("@cluster");
- const char *fileName = query.queryProp("@name");
- int fileCount = query.getPropInt("@useCount");
- bool bFound = false;
- if (fileName && *fileName && (fileNames.length() > 0))
- {
- for (unsigned i = 0; i < fileNames.length(); i++ )
- {
- const char *fileName0 = fileNames.item(i);
- if (!stricmp(fileName, fileName0))
- {
- bFound = true;
- break;
- }
- }
- }
- if (bFound)
- continue;
- Owned<IEspECLSourceFile> file= createECLSourceFile("","");
- if(clusterName && *clusterName)
- {
- file->setFileCluster(clusterName);
- }
- if (fileName && *fileName)
- {
- file->setName(fileName);
- }
- file->setCount(fileCount);
- Owned<IPropertyTreeIterator> filetrees= query.getElements("Subfile");
- if (filetrees->first())
- {
- file->setIsSuperFile(true);
- getSubFiles(filetrees, file, fileNames);
- }
- files.append(*file.getLink());
- }
- }
- info.setSourceFiles(files);
- }
- catch(IException* e)
- {
- StringBuffer eMsg;
- ERRLOG("%s", e->errorMessage(eMsg).str());
- info.setSourceFilesDesc(eMsg.str());
- e->Release();
- }
- }
- void WsWuInfo::getExceptions(IEspECLWorkunit &info, unsigned flags)
- {
- if ((flags & WUINFO_IncludeExceptions) || version > 1.16)
- {
- WsWUExceptions errors(*cw);
- if (version > 1.16)
- {
- info.setErrorCount(errors.ErrCount());
- info.setWarningCount(errors.WrnCount());
- info.setInfoCount(errors.InfCount());
- info.setAlertCount(errors.AlertCount());
- }
- if ((flags & WUINFO_IncludeExceptions))
- info.setExceptions(errors);
- }
- }
- void WsWuInfo::getVariables(IEspECLWorkunit &info, unsigned flags)
- {
- if (!(flags & WUINFO_IncludeVariables))
- return;
- try
- {
- IArrayOf<IEspECLResult> results;
- Owned<IConstWUResultIterator> vars = &cw->getVariables();
- ForEach(*vars)
- getResult(vars->query(), results, flags);
- info.setVariables(results);
- results.kill();
- }
- catch(IException* e)
- {
- StringBuffer eMsg;
- ERRLOG("%s", e->errorMessage(eMsg).str());
- info.setVariablesDesc(eMsg.str());
- e->Release();
- }
- }
- void WsWuInfo::addTimerToList(SCMStringBuffer& name, const char * scope, IConstWUStatistic & stat, IArrayOf<IEspECLTimer>& timers)
- {
- StringBuffer fd;
- formatStatistic(fd, stat.getValue(), stat.getMeasure());
- Owned<IEspECLTimer> t= createECLTimer("","");
- name.s.replace('_', ' '); // yuk!
- t->setName(name.str());
- t->setValue(fd.str());
- //Theoretically this could overflow, in practice it is unlikely - fix in the new stats interface when implemented
- t->setCount((unsigned)stat.getCount());
- if (version > 1.19)
- {
- StringAttr graphName;
- unsigned graphNum;
- unsigned subGraphNum = 0;
- unsigned subId = 0;
- if (parseGraphScope(scope, graphName, graphNum, subId) ||
- parseGraphTimerLabel(name.str(), graphName, graphNum, subGraphNum, subId)) // leacy
- {
- if (graphName.length() > 0)
- t->setGraphName(graphName);
- if (subId > 0)
- t->setSubGraphId((int)subId);
- }
- }
- timers.append(*t.getLink());
- }
- void WsWuInfo::getTimers(IEspECLWorkunit &info, unsigned flags)
- {
- if (!(flags & WUINFO_IncludeTimers))
- return;
- try
- {
- unsigned __int64 totalThorTimeValue = 0;
- unsigned __int64 totalThorTimerCount = 0; //Do we need this?
- IArrayOf<IEspECLTimer> timers;
- StatisticsFilter filter;
- filter.setScopeDepth(1, 2);
- filter.setMeasure(SMeasureTimeNs);
- Owned<IConstWUStatisticIterator> it = &cw->getStatistics(&filter);
- if (it->first())
- {
- ForEach(*it)
- {
- IConstWUStatistic & cur = it->query();
- SCMStringBuffer name, scope;
- cur.getDescription(name, true);
- cur.getScope(scope);
- bool isThorTiming = false;//Should it be renamed as isClusterTiming?
- if ((cur.getCreatorType() == SCTsummary) && (cur.getKind() == StTimeElapsed) && streq(scope.str(), GLOBAL_SCOPE))
- {
- SCMStringBuffer creator;
- cur.getCreator(creator);
- if (streq(creator.str(), "thor") || streq(creator.str(), "hthor") ||
- streq(creator.str(), "roxie"))
- isThorTiming = true;
- }
- else if (strieq(name.str(), TOTALTHORTIME)) // legacy
- isThorTiming = true;
- if (isThorTiming)
- {
- totalThorTimeValue += cur.getValue();
- totalThorTimerCount += cur.getCount();
- }
- else
- addTimerToList(name, scope.str(), cur, timers);
- }
- }
- if (totalThorTimeValue > 0)
- {
- StringBuffer totalThorTimeText;
- formatStatistic(totalThorTimeText, totalThorTimeValue, SMeasureTimeNs);
- Owned<IEspECLTimer> t= createECLTimer("","");
- if (version > 1.52)
- t->setName(TOTALCLUSTERTIME);
- else
- t->setName(TOTALTHORTIME);
- t->setValue(totalThorTimeText.str());
- t->setCount((unsigned)totalThorTimerCount);
- timers.append(*t.getLink());
- }
- info.setTimers(timers);
- }
- catch(IException* e)
- {
- StringBuffer eMsg;
- ERRLOG("%s", e->errorMessage(eMsg).str());
- info.setTimersDesc(eMsg.str());
- e->Release();
- }
- }
- unsigned WsWuInfo::getTimerCount()
- {
- unsigned numTimers = 0;
- try
- {
- //This filter must match the filter in the function above, otherwise it will be inconsistent
- StatisticsFilter filter;
- filter.setScopeDepth(1, 2);
- filter.setMeasure(SMeasureTimeNs);
- Owned<IConstWUStatisticIterator> it = &cw->getStatistics(&filter);
- ForEach(*it)
- numTimers++;
- }
- catch(IException* e)
- {
- StringBuffer eMsg;
- ERRLOG("%s", e->errorMessage(eMsg).str());
- e->Release();
- }
- return numTimers;
- }
- struct mapEnums { int val; const char *str; };
- mapEnums queryFileTypes[] = {
- { FileTypeCpp, "cpp" },
- { FileTypeDll, "dll" },
- { FileTypeResText, "res" },
- { FileTypeHintXml, "hint" },
- { FileTypeXml, "xml" },
- { FileTypeSize, NULL },
- };
- const char *getEnumText(int value, mapEnums *map)
- {
- const char *defval = map->str;
- while (map->str)
- {
- if (value==map->val)
- return map->str;
- map++;
- }
- assertex(!"Unexpected value in setEnum");
- return defval;
- }
- void WsWuInfo::getHelpers(IEspECLWorkunit &info, unsigned flags)
- {
- try
- {
- IArrayOf<IEspECLHelpFile> helpers;
- Owned <IConstWUQuery> query = cw->getQuery();
- if(!query)
- {
- ERRLOG("Cannot get Query for this workunit.");
- info.setHelpersDesc("Cannot get Query for this workunit.");
- }
- else
- {
- SCMStringBuffer qname;
- query->getQueryShortText(qname);
- if(qname.length())
- {
- if((flags & WUINFO_TruncateEclTo64k) && (qname.length() > 64000))
- qname.setLen(qname.str(), 64000);
- IEspECLQuery* q=&info.updateQuery();
- q->setText(qname.str());
- }
- if (version > 1.34)
- {
- SCMStringBuffer mainDefinition;
- query->getQueryMainDefinition(mainDefinition);
- if(mainDefinition.length())
- {
- IEspECLQuery* q=&info.updateQuery();
- q->setQueryMainDefinition(mainDefinition.str());
- }
- }
- if (version > 1.30)
- {
- SCMStringBuffer qText;
- query->getQueryText(qText);
- if ((qText.length() > 0) && isArchiveQuery(qText.str()))
- info.setHasArchiveQuery(true);
- }
- for (unsigned i = 0; i < FileTypeSize; i++)
- getHelpFiles(query, (WUFileType) i, helpers);
- }
- getWorkunitThorLogInfo(helpers, info);
- if (cw->getWuidVersion() > 0)
- {
- Owned<IPropertyTreeIterator> eclAgents = cw->getProcesses("EclAgent", NULL);
- ForEach (*eclAgents)
- {
- StringBuffer logName;
- IPropertyTree& eclAgent = eclAgents->query();
- eclAgent.getProp("@log",logName);
- if (!logName.length())
- continue;
- Owned<IEspECLHelpFile> h= createECLHelpFile("","");
- h->setName(logName.str());
- h->setType(File_EclAgentLog);
- if (version >= 1.43)
- {
- offset_t fileSize;
- if (getFileSize(logName.str(), NULL, fileSize))
- h->setFileSize(fileSize);
- if (version >= 1.44)
- {
- if (eclAgent.hasProp("@pid"))
- h->setPID(eclAgent.getPropInt("@pid"));
- else
- h->setPID(cw->getAgentPID());
- }
- }
- helpers.append(*h.getLink());
- }
- }
- else // legacy wuid
- {
- Owned<IStringIterator> eclAgentLogs = cw->getLogs("EclAgent");
- ForEach (*eclAgentLogs)
- {
- SCMStringBuffer name;
- eclAgentLogs->str(name);
- if (name.length() < 1)
- continue;
- Owned<IEspECLHelpFile> h= createECLHelpFile("","");
- h->setName(name.str());
- h->setType(File_EclAgentLog);
- if (version >= 1.43)
- {
- offset_t fileSize;
- if (getFileSize(name.str(), NULL, fileSize))
- h->setFileSize(fileSize);
- }
- helpers.append(*h.getLink());
- break;
- }
- }
- info.setHelpers(helpers);
- }
- catch(IException* e)
- {
- StringBuffer eMsg;
- ERRLOG("%s", e->errorMessage(eMsg).str());
- info.setHelpersDesc(eMsg.str());
- e->Release();
- }
- }
- void WsWuInfo::getApplicationValues(IEspECLWorkunit &info, unsigned flags)
- {
- if (!(flags & WUINFO_IncludeApplicationValues))
- return;
- try
- {
- IArrayOf<IEspApplicationValue> av;
- Owned<IConstWUAppValueIterator> app(&cw->getApplicationValues());
- ForEach(*app)
- {
- IConstWUAppValue& val=app->query();
- SCMStringBuffer buf;
- Owned<IEspApplicationValue> t= createApplicationValue("","");
- t->setApplication(val.getApplication(buf).str());
- t->setValue(val.getValue(buf).str());
- t->setName(val.getName(buf).str());
- t->setValue(val.getValue(buf).str());
- av.append(*t.getLink());
- }
- info.setApplicationValues(av);
- }
- catch(IException* e)
- {
- StringBuffer eMsg;
- ERRLOG("%s", e->errorMessage(eMsg).str());
- info.setApplicationValuesDesc(eMsg.str());
- e->Release();
- }
- }
- void WsWuInfo::getDebugValues(IEspECLWorkunit &info, unsigned flags)
- {
- if (!(flags & WUINFO_IncludeDebugValues))
- {
- if (version >= 1.50)
- {
- unsigned debugValueCount = 0;
- Owned<IStringIterator> debugs(&cw->getDebugValues());
- ForEach(*debugs)
- debugValueCount++;
- info.setDebugValueCount(debugValueCount);
- }
- return;
- }
- try
- {
- IArrayOf<IEspDebugValue> dv;
- Owned<IStringIterator> debugs(&cw->getDebugValues());
- ForEach(*debugs)
- {
- SCMStringBuffer name, val;
- debugs->str(name);
- cw->getDebugValue(name.str(),val);
- Owned<IEspDebugValue> t= createDebugValue("","");
- t->setName(name.str());
- t->setValue(val.str());
- dv.append(*t.getLink());
- }
- if (version >= 1.50)
- info.setDebugValueCount(dv.length());
- info.setDebugValues(dv);
- }
- catch(IException* e)
- {
- StringBuffer eMsg;
- ERRLOG("%s", e->errorMessage(eMsg).str());
- info.setDebugValuesDesc(eMsg.str());
- e->Release();
- }
- }
// Extracts the first run of decimal digits from a string.
//   s   - NUL-terminated text to scan; must not be NULL
//   num - receives the parsed number, or 0 when the text contains no digits
// Returns a pointer to the first character after the digits (or to the
// terminating NUL when no digits were found), so calls can be chained.
// Overlong digit runs wrap around, as unsigned arithmetic does.
const char *getGraphNum(const char *s,unsigned &num)
{
    // isdigit() is only defined for values representable as unsigned char
    // (or EOF), so plain char must be converted before the call; otherwise a
    // byte >= 0x80 is undefined behaviour where char is signed.
    while (*s && !isdigit(static_cast<unsigned char>(*s)))
        s++;

    num = 0;
    while (isdigit(static_cast<unsigned char>(*s)))
    {
        num = num*10 + static_cast<unsigned>(*s - '0');
        s++;
    }
    return s;
}
- bool WsWuInfo::hasSubGraphTimings()
- {
- StatisticsFilter filter;
- filter.setScopeType(SSTsubgraph);
- filter.setKind(StTimeElapsed);
- Owned<IConstWUStatisticIterator> times = &cw->getStatistics(&filter);
- return times->first();
- }
- bool WsWuInfo::legacyHasSubGraphTimings()
- {
- StatisticsFilter filter;
- filter.setScopeDepth(1); // only "global" timers.
- filter.setMeasure(SMeasureTimeNs);
- Owned<IConstWUStatisticIterator> times = &cw->getStatistics(&filter);
- ForEach(*times)
- {
- IConstWUStatistic & cur = times->query();
- SCMStringBuffer name;
- cur.getDescription(name, false);
- StringAttr graphName;
- unsigned graphNum;
- unsigned subGraphNum;
- unsigned subId;
- if (parseGraphTimerLabel(name.str(), graphName, graphNum, subGraphNum, subId))
- return true;
- }
- return false;
- }
- void WsWuInfo::getGraphInfo(IEspECLWorkunit &info, unsigned flags)
- {
- if (version > 1.01)
- {
- info.setHaveSubGraphTimings(false);
- if (hasSubGraphTimings() || legacyHasSubGraphTimings())
- info.setHaveSubGraphTimings(true);
- }
- if (!(flags & WUINFO_IncludeGraphs))
- return;
- try
- {
- SCMStringBuffer runningGraph;
- WUGraphIDType id;
- WUState st = cw->getState();
- bool running = (!(st==WUStateFailed || st==WUStateAborted || st==WUStateCompleted) && cw->getRunningGraph(runningGraph,id));
- IArrayOf<IEspECLGraph> graphs;
- Owned<IConstWUGraphMetaIterator> it = &cw->getGraphsMeta(GraphTypeAny);
- ForEach(*it)
- {
- IConstWUGraphMeta &graph = it->query();
- SCMStringBuffer name, label, type;
- graph.getName(name);
- graph.getLabel(label);
- graph.getTypeName(type);
- WUGraphState graphState = graph.getState();
- Owned<IEspECLGraph> g= createECLGraph("","");
- g->setName(name.str());
- g->setLabel(label.str());
- g->setType(type.str());
- if (WUGraphComplete == graphState)
- g->setComplete(true);
- else if (running && (WUGraphRunning == graphState))
- {
- g->setRunning(true);
- g->setRunningId(id);
- }
- else if (version > 1.13 && (WUGraphFailed == graphState))
- g->setFailed(true);
- if (version >= 1.53)
- {
- SCMStringBuffer s;
- Owned<IConstWUStatistic> whenGraphStarted = cw->getStatistic(NULL, name.str(), StWhenGraphStarted);
- Owned<IConstWUStatistic> whenGraphFinished = cw->getStatistic(NULL, name.str(), StWhenGraphFinished);
- if (whenGraphStarted)
- g->setWhenStarted(whenGraphStarted->getFormattedValue(s).str());
- if (whenGraphFinished)
- g->setWhenFinished(whenGraphFinished->getFormattedValue(s).str());
- }
- Owned<IConstWUGraphProgress> progress = cw->getGraphProgress(name.str());
- if (progress)
- {
- WUGraphState graphstate= progress->queryGraphState();
- if (graphstate == WUGraphComplete)
- g->setComplete(true);
- if (version > 1.13 && graphstate == WUGraphFailed)
- {
- g->setFailed(true);
- }
- }
- graphs.append(*g.getLink());
- }
- info.setGraphs(graphs);
- }
- catch(IException* e)
- {
- StringBuffer eMsg;
- ERRLOG("%s", e->errorMessage(eMsg).str());
- info.setGraphsDesc(eMsg.str());
- e->Release();
- }
- }
- void WsWuInfo::getGraphTimingData(IArrayOf<IConstECLTimingData> &timingData, unsigned flags)
- {
- StatisticsFilter filter(SCTall, SSTsubgraph, SMeasureTimeNs, StTimeElapsed);
- Owned<IConstWUStatisticIterator> times = &cw->getStatistics(&filter);
- bool matched = false;
- ForEach(*times)
- {
- IConstWUStatistic & cur = times->query();
- SCMStringBuffer scope;
- cur.getScope(scope);
- StringAttr graphName;
- unsigned graphNum;
- unsigned subGraphId;
- if (parseGraphScope(scope.str(), graphName, graphNum, subGraphId))
- {
- unsigned time = (unsigned)nanoToMilli(cur.getValue());
- SCMStringBuffer name;
- cur.getDescription(name, true);
- Owned<IEspECLTimingData> g = createECLTimingData();
- g->setName(name.str());
- g->setGraphNum(graphNum);
- g->setSubGraphNum(subGraphId); // Use the Id - the number is not known
- g->setMS(time);
- g->setMin(time/60000);
- timingData.append(*g.getClear());
- matched = true;
- }
- }
- if (!matched)
- legacyGetGraphTimingData(timingData, flags);
- }
- void WsWuInfo::legacyGetGraphTimingData(IArrayOf<IConstECLTimingData> &timingData, unsigned flags)
- {
- StatisticsFilter filter;
- filter.setScopeDepth(1);
- filter.setMeasure(SMeasureTimeNs);
- Owned<IConstWUStatisticIterator> times = &cw->getStatistics(&filter);
- ForEach(*times)
- {
- IConstWUStatistic & cur = times->query();
- SCMStringBuffer name;
- cur.getDescription(name, false); // was previously always filled in.
- StringAttr graphName;
- unsigned graphNum;
- unsigned subGraphNum;
- unsigned subId;
- if (parseGraphTimerLabel(name.str(), graphName, graphNum, subGraphNum, subId))
- {
- unsigned time = (unsigned)nanoToMilli(cur.getValue());
- Owned<IEspECLTimingData> g = createECLTimingData();
- g->setName(name.str());
- g->setGraphNum(graphNum);
- g->setSubGraphNum(subGraphNum);
- g->setGID(subId);
- g->setMS(time);
- g->setMin(time/60000);
- timingData.append(*g.getClear());
- }
- }
- }
- void WsWuInfo::getRoxieCluster(IEspECLWorkunit &info, unsigned flags)
- {
- if (version > 1.06)
- {
- Owned<IConstWURoxieQueryInfo> roxieQueryInfo = cw->getRoxieQueryInfo();
- if (roxieQueryInfo)
- {
- SCMStringBuffer roxieClusterName;
- roxieQueryInfo->getRoxieClusterName(roxieClusterName);
- info.setRoxieCluster(roxieClusterName.str());
- }
- }
- }
- void WsWuInfo::getEventScheduleFlag(IEspECLWorkunit &info)
- {
- info.setEventSchedule(0);
- if (info.getState() && !stricmp(info.getState(), "wait"))
- {
- info.setEventSchedule(2); //Can deschedule
- }
- else
- {
- Owned<IConstWorkflowItemIterator> it = cw->getWorkflowItems();
- if (it)
- {
- ForEach(*it)
- {
- IConstWorkflowItem *r = it->query();
- if (!r)
- continue;
- Owned<IWorkflowEvent> wfevent = r->getScheduleEvent();
- if (!wfevent)
- continue;
- if (!r->hasScheduleCount() || (r->queryScheduleCountRemaining() > 0))
- {
- info.setEventSchedule(1); //Can reschedule
- break;
- }
- }
- }
- }
- }
- unsigned WsWuInfo::getTotalThorTime()
- {
- StatisticsFilter filter;
- filter.setCreatorType(SCTsummary);
- filter.setScope(GLOBAL_SCOPE);
- filter.setKind(StTimeElapsed);
- //Should only be a single value
- unsigned totalThorTimeMS = 0;
- Owned<IConstWUStatisticIterator> times = &cw->getStatistics(&filter);
- ForEach(*times)
- {
- totalThorTimeMS += (unsigned)nanoToMilli(times->query().getValue());
- }
- return totalThorTimeMS;
- }
- unsigned WsWuInfo::getLegacyTotalThorTime()
- {
- //4.2.x backward compatibility - only scope depth and measure filters work
- StatisticsFilter filter;
- filter.setScopeDepth(1); // only global
- filter.setMeasure(SMeasureTimeNs);
- Owned<IConstWUStatisticIterator> times = &cw->getStatistics(&filter);
- SCMStringBuffer oldname;
- ForEach(*times)
- {
- times->query().getDescription(oldname, false); // description will be set up
- if (streq(oldname.str(), TOTALTHORTIME))
- return (unsigned)nanoToMilli(times->query().getValue());
- }
- return 0;
- }
- void WsWuInfo::getCommon(IEspECLWorkunit &info, unsigned flags)
- {
- SCMStringBuffer s;
- info.setWuid(cw->getWuid(s).str());
- info.setProtected(cw->isProtected() ? 1 : 0);
- info.setJobname(cw->getJobName(s).str());
- info.setOwner(cw->getUser(s).str());
- info.setCluster(cw->getClusterName(clusterName).str());
- info.setSnapshot(cw->getSnapshot(s).str());
- if ((cw->getState() == WUStateScheduled) && cw->aborting())
- {
- info.setStateID(WUStateAborting);
- info.setState("aborting");
- }
- else
- {
- info.setStateID(cw->getState());
- info.setState(cw->getStateDesc(s).str());
- }
- if (cw->isPausing())
- info.setIsPausing(true);
- getEventScheduleFlag(info);
- if (version > 1.27)
- {
- unsigned totalThorTimeMS = getTotalThorTime();
- if (totalThorTimeMS == 0)
- totalThorTimeMS = getLegacyTotalThorTime();
- if (totalThorTimeMS)
- {
- StringBuffer totalThorTimeStr;
- formatDuration(totalThorTimeStr, totalThorTimeMS);
- if (version > 1.52)
- info.setTotalClusterTime(totalThorTimeStr.str());
- else
- info.setTotalThorTime(totalThorTimeStr.str());
- }
- }
- WsWuDateTime dt;
- cw->getTimeScheduled(dt);
- if(dt.isValid())
- info.setDateTimeScheduled(dt.getString(s).str());
- getRoxieCluster(info, flags);
- }
- void WsWuInfo::getInfo(IEspECLWorkunit &info, unsigned flags)
- {
- getCommon(info, flags);
- SecAccessFlags accessFlag = getWsWorkunitAccess(context, *cw);
- info.setAccessFlag(accessFlag);
- SCMStringBuffer s;
- info.setStateEx(cw->getStateEx(s).str());
- info.setPriorityClass(cw->getPriority());
- info.setPriorityLevel(cw->getPriorityLevel());
- if (context.querySecManager())
- info.setScope(cw->getWuScope(s).str());
- info.setActionEx(cw->getActionEx(s).str());
- info.setDescription(cw->getDebugValue("description", s).str());
- if (version > 1.21)
- info.setXmlParams(cw->getXmlParams(s).str());
- info.setResultLimit(cw->getResultLimit());
- info.setArchived(false);
- info.setGraphCount(cw->getGraphCount());
- info.setSourceFileCount(cw->getSourceFileCount());
- info.setVariableCount(cw->getVariableCount());
- info.setTimerCount(getTimerCount());
- info.setSourceFileCount(cw->getSourceFileCount());
- info.setApplicationValueCount(cw->getApplicationValueCount());
- info.setHasDebugValue(cw->hasDebugValue("__calculated__complexity__"));
- getClusterInfo(info, flags);
- getExceptions(info, flags);
- getHelpers(info, flags);
- getGraphInfo(info, flags);
- getSourceFiles(info, flags);
- getResults(info, flags);
- getVariables(info, flags);
- getTimers(info, flags);
- getDebugValues(info, flags);
- getApplicationValues(info, flags);
- getWorkflow(info, flags);
- }
- unsigned WsWuInfo::getWorkunitThorLogInfo(IArrayOf<IEspECLHelpFile>& helpers, IEspECLWorkunit &info)
- {
- unsigned countThorLog = 0;
- IArrayOf<IConstThorLogInfo> thorLogList;
- if (cw->getWuidVersion() > 0)
- {
- SCMStringBuffer clusterName;
- cw->getClusterName(clusterName);
- if (!clusterName.length()) //Cluster name may not be set yet
- return countThorLog;
- Owned<IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(clusterName.str());
- if (!clusterInfo)
- {
- SCMStringBuffer wuid;
- WARNLOG("Cannot find TargetClusterInfo for workunit %s", cw->getWuid(wuid).str());
- return countThorLog;
- }
- unsigned numberOfSlaves = clusterInfo->getSize();
- BoolHash uniqueProcesses;
- Owned<IStringIterator> thorInstances = cw->getProcesses("Thor");
- ForEach (*thorInstances)
- {
- SCMStringBuffer processName;
- thorInstances->str(processName);
- if (processName.length() < 1)
- continue;
- bool* found = uniqueProcesses.getValue(processName.str());
- if (found && *found)
- continue;
- uniqueProcesses.setValue(processName.str(), true);
- StringBuffer groupName;
- getClusterThorGroupName(groupName, processName.str());
- Owned<IStringIterator> thorLogs = cw->getLogs("Thor", processName.str());
- ForEach (*thorLogs)
- {
- SCMStringBuffer logName;
- thorLogs->str(logName);
- if (logName.length() < 1)
- continue;
- countThorLog++;
- StringBuffer fileType;
- if (countThorLog < 2)
- fileType.append(File_ThorLog);
- else
- fileType.appendf("%s%d", File_ThorLog, countThorLog);
- Owned<IEspECLHelpFile> h= createECLHelpFile("","");
- h->setName(logName.str());
- h->setDescription(processName.str());
- h->setType(fileType.str());
- if (version >= 1.43)
- {
- offset_t fileSize;
- if (getFileSize(logName.str(), NULL, fileSize))
- h->setFileSize(fileSize);
- }
- helpers.append(*h.getLink());
- if (version < 1.38)
- continue;
- const char* pStr = logName.str();
- const char* ppStr = strstr(pStr, "/thormaster.");
- if (!ppStr)
- {
- WARNLOG("Invalid thorlog entry in workunit xml: %s", logName.str());
- continue;
- }
- ppStr += 12;
- StringBuffer logDate = ppStr;
- logDate.setLength(10);
- Owned<IEspThorLogInfo> thorLog = createThorLogInfo("","");
- thorLog->setProcessName(processName.str());
- thorLog->setClusterGroup(groupName.str());
- thorLog->setLogDate(logDate.str());
- thorLog->setNumberSlaves(numberOfSlaves);
- thorLogList.append(*thorLog.getLink());
- }
- }
- }
- else //legacy wuid
- {
- Owned<IStringIterator> thorLogs = cw->getLogs("Thor");
- ForEach (*thorLogs)
- {
- SCMStringBuffer name;
- thorLogs->str(name);
- if (name.length() < 1)
- continue;
- countThorLog++;
- StringBuffer fileType;
- if (countThorLog < 2)
- fileType.append(File_ThorLog);
- else
- fileType.appendf("%s%d", File_ThorLog, countThorLog);
- Owned<IEspECLHelpFile> h= createECLHelpFile("","");
- h->setName(name.str());
- h->setType(fileType.str());
- if (version >= 1.43)
- {
- offset_t fileSize;
- if (getFileSize(name.str(), NULL, fileSize))
- h->setFileSize(fileSize);
- }
- helpers.append(*h.getLink());
- }
- StringBuffer logDir;
- Owned<IEnvironmentFactory> envFactory = getEnvironmentFactory();
- Owned<IConstEnvironment> constEnv = envFactory->openEnvironment();
- Owned<IPropertyTree> logTree = &constEnv->getPTree();
- if (logTree)
- logTree->getProp("EnvSettings/log", logDir);
- if (logDir.length() > 0)
- {
- Owned<IStringIterator> debugs = cw->getLogs("Thor");
- ForEach(*debugs)
- {
- SCMStringBuffer val;
- debugs->str(val);
- if (val.length() < 1)
- continue;
- const char* pStr = val.str();
- const char* ppStr = strstr(pStr, logDir.str());
- if (!ppStr)
- {
- WARNLOG("Invalid thorlog entry in workunit xml: %s", val.str());
- continue;
- }
- const char* pProcessName = ppStr + logDir.length();
- char sep = pProcessName[0];
- StringBuffer processName = pProcessName + 1;
- ppStr = strchr(pProcessName + 1, sep);
- if (!ppStr)
- {
- WARNLOG("Invalid thorlog entry in workunit xml: %s", val.str());
- continue;
- }
- processName.setLength(ppStr - pProcessName - 1);
- StringBuffer groupName;
- getClusterThorGroupName(groupName, processName.str());
- StringBuffer logDate = ppStr + 12;
- logDate.setLength(10);
- Owned<IEspThorLogInfo> thorLog = createThorLogInfo("","");
- thorLog->setProcessName(processName.str());
- thorLog->setClusterGroup(groupName.str());
- thorLog->setLogDate(logDate.str());
- //for legacy wuid, the log name does not contain slaveNum. So, a user may not specify
- //a slaveNum and we only display the first slave log if > 1 per IP.
- thorLog->setNumberSlaves(0);
- thorLogList.append(*thorLog.getLink());
- }
- }
- }
- if (thorLogList.length() > 0)
- info.setThorLogList(thorLogList);
- thorLogList.kill();
- return countThorLog;
- }
- bool WsWuInfo::getClusterInfo(IEspECLWorkunit &info, unsigned flags)
- {
- if (version > 1.04)
- {
- StringArray allowedClusters;
- SCMStringBuffer val;
- cw->getAllowedClusters(val);
- if (val.length() > 0)
- {
- const char* ptr = val.str();
- while(*ptr != '\0')
- {
- StringBuffer onesub;
- while(*ptr != '\0' && *ptr != ',')
- {
- onesub.append((char)(*ptr));
- ptr++;
- }
- if(onesub.length() > 0)
- allowedClusters.append(onesub.str());
- if(*ptr != '\0')
- ptr++;
- }
- }
- if (allowedClusters.length() > 0)
- info.setAllowedClusters(allowedClusters);
- }
- if (version > 1.23 && clusterName.length())
- {
- int clusterTypeFlag = 0;
- Owned<IEnvironmentFactory> envFactory = getEnvironmentFactory();
- Owned<IConstEnvironment> constEnv = envFactory->openEnvironment();
- Owned<IPropertyTree> root = &constEnv->getPTree();
- if (!root)
- throw MakeStringException(ECLWATCH_CANNOT_CONNECT_DALI,"Cannot connect to DALI server.");
- Owned<IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(clusterName.str());
- if (clusterInfo.get())
- {//Set thor flag or roxie flag in order to display some options for thor or roxie
- ClusterType platform = clusterInfo->getPlatform();
- if (isThorCluster(platform))
- {
- clusterTypeFlag=1;
- if (version > 1.29)
- info.setThorLCR(ThorLCRCluster == platform);
- }
- else if (RoxieCluster == platform)
- clusterTypeFlag=2;
- }
- info.setClusterFlag(clusterTypeFlag);
- }
- return true;
- }
- void WsWuInfo::getWorkflow(IEspECLWorkunit &info, unsigned flags)
- {
- bool eventCountRemaining = false;
- bool eventCountUnlimited = false;
- try
- {
- info.setEventSchedule(0);
- unsigned workflowsCount = 0;
- IArrayOf<IConstECLWorkflow> workflows;
- Owned<IConstWorkflowItemIterator> it = cw->getWorkflowItems();
- if (it)
- {
- ForEach(*it)
- {
- IConstWorkflowItem *r = it->query();
- if (r)
- {
- IWorkflowEvent *wfevent = r->getScheduleEvent();
- if (wfevent)
- {
- Owned<IEspECLWorkflow> g;
- if (flags & WUINFO_IncludeWorkflows)
- {
- StringBuffer id;
- g.setown(createECLWorkflow("",""));
- g->setWFID(id.appendf("%d", r->queryWfid()).str());
- g->setEventName(wfevent->queryName());
- g->setEventText(wfevent->queryText());
- }
- if (r->hasScheduleCount())
- {
- if (r->queryScheduleCountRemaining() > 0)
- eventCountRemaining = true;
- if (flags & WUINFO_IncludeWorkflows)
- {
- g->setCount(r->queryScheduleCount());
- g->setCountRemaining(r->queryScheduleCountRemaining());
- }
- }
- else
- {
- eventCountUnlimited = true;
- }
- workflowsCount++;
- if (flags & WUINFO_IncludeWorkflows)
- workflows.append(*g.getLink());
- }
- }
- }
- if (workflows.length() > 0)
- info.setWorkflows(workflows);
- workflows.kill();
- }
- if (version >= 1.50)
- info.setWorkflowCount(workflowsCount);
- }
- catch(IException* e)
- {
- StringBuffer eMsg;
- ERRLOG("%s", e->errorMessage(eMsg).str());
- info.setWorkflowsDesc(eMsg.str());
- e->Release();
- }
- if (info.getState() && !stricmp(info.getState(), "wait"))
- info.setEventSchedule(2); //Can deschedule
- else if (eventCountUnlimited || eventCountRemaining)
- info.setEventSchedule(1); //Can reschedule
- }
- IDistributedFile* WsWuInfo::getLogicalFileData(IEspContext& context, const char* logicalName, bool& showFileContent)
- {
- StringBuffer username;
- context.getUserID(username);
- Owned<IUserDescriptor> userdesc(createUserDescriptor());
- userdesc->set(username.str(), context.queryPassword());
- Owned<IDistributedFile> df = queryDistributedFileDirectory().lookup(logicalName, userdesc);
- if (!df)
- return NULL;
- bool blocked;
- if (df->isCompressed(&blocked) && !blocked)
- return df.getClear();
- IPropertyTree& properties = df->queryAttributes();
- const char * format = properties.queryProp("@format");
- if (format && (stricmp(format,"csv")==0 || memicmp(format, "utf", 3) == 0))
- {
- showFileContent = true;
- return df.getClear();
- }
- const char * recordEcl = properties.queryProp("ECL");
- if (!recordEcl)
- return df.getClear();
- MultiErrorReceiver errs;
- Owned<IHqlExpression> ret = ::parseQuery(recordEcl, &errs);
- showFileContent = errs.errCount() == 0;
- return df.getClear();
- }
- void WsWuInfo::getEclSchemaChildFields(IArrayOf<IEspECLSchemaItem>& schemas, IHqlExpression * expr, bool isConditional)
- {
- if(!expr)
- return;
- ForEachChild(idx, expr)
- getEclSchemaFields(schemas, expr->queryChild(idx), isConditional);
- }
- void WsWuInfo::getEclSchemaFields(IArrayOf<IEspECLSchemaItem>& schemas, IHqlExpression * expr, bool isConditional)
- {
- if(!expr)
- return;
- int ret = expr->getOperator();
- switch (ret)
- {
- case no_record:
- getEclSchemaChildFields(schemas, expr, isConditional);
- break;
- case no_ifblock:
- {
- getEclSchemaChildFields(schemas, expr->queryChild(1), true);
- break;
- }
- case no_field:
- {
- if (expr->hasAttribute(__ifblockAtom))
- break;
- ITypeInfo * type = expr->queryType();
- IAtom * name = expr->queryName();
- IHqlExpression * nameAttr = expr->queryAttribute(namedAtom);
- StringBuffer outname;
- if (nameAttr && nameAttr->queryChild(0) && nameAttr->queryChild(0)->queryValue())
- nameAttr->queryChild(0)->queryValue()->getStringValue(outname);
- else
- outname.append(name).toLowerCase();
- if(type)
- {
- type_t tc = type->getTypeCode();
- if (tc == type_row)
- {
- getEclSchemaChildFields(schemas, expr->queryRecord(), isConditional);
- }
- else
- {
- if (type->getTypeCode() == type_alien)
- {
- IHqlAlienTypeInfo * alien = queryAlienType(type);
- type = alien->queryPhysicalType();
- }
- Owned<IEspECLSchemaItem> schema = createECLSchemaItem("","");
- StringBuffer eclType;
- type->getECLType(eclType);
- schema->setColumnName(outname);
- schema->setColumnType(eclType.str());
- schema->setColumnTypeCode(tc);
- schema->setIsConditional(isConditional);
- schemas.append(*schema.getClear());
- }
- }
- break;
- }
- }
- }
- bool WsWuInfo::getResultEclSchemas(IConstWUResult &r, IArrayOf<IEspECLSchemaItem>& schemas)
- {
- SCMStringBuffer schema;
- r.getResultEclSchema(schema);
- if (!schema.length())
- return false;
- MultiErrorReceiver errs;
- Owned<IHqlExpression> expr = ::parseQuery(schema.str(), &errs);
- if (errs.errCount() != 0)
- return false;
- getEclSchemaFields(schemas, expr, false);
- return true;
- }
- void WsWuInfo::getResult(IConstWUResult &r, IArrayOf<IEspECLResult>& results, unsigned flags)
- {
- SCMStringBuffer name;
- r.getResultName(name);
- SCMStringBuffer filename;
- r.getResultLogicalName(filename);
- bool showFileContent = false;
- Owned<IDistributedFile> df = NULL;
- if (filename.length())
- df.setown(getLogicalFileData(context, filename.str(), showFileContent));
- StringBuffer value, link;
- if (r.getResultStatus() == ResultStatusUndefined)
- value.set("[undefined]");
- else if (r.isResultScalar())
- {
- try
- {
- SCMStringBuffer xml;
- r.getResultXml(xml);
- Owned<IPropertyTree> props = createPTreeFromXMLString(xml.str(), ipt_caseInsensitive);
- IPropertyTree *val = props->queryPropTree("Row/*");
- if(val)
- value.set(val->queryProp(NULL));
- else
- {
- Owned<IResultSetFactory> resultSetFactory = getSecResultSetFactory(context.querySecManager(), context.queryUser(), context.queryUserId(), context.queryPassword());
- Owned<INewResultSet> result;
- result.setown(resultSetFactory->createNewResultSet(&r, wuid.str()));
- Owned<IResultSetCursor> cursor(result->createCursor());
- cursor->first();
- if (cursor->getIsAll(0))
- {
- value.set("<All/>");
- }
- else
- {
- Owned<IResultSetCursor> childCursor = cursor->getChildren(0);
- if (childCursor)
- {
- ForEach(*childCursor)
- {
- StringBuffer out;
- StringBufferAdaptor adaptor(out);
- childCursor->getDisplayText(adaptor, 0);
- if (!value.length())
- value.append('[');
- else
- value.append(", ");
- value.append('\'').append(out.str()).append('\'');
- }
- if (value.length())
- value.append(']');
- }
- }
- }
- }
- catch(...)
- {
- value.append("[value not available]");
- }
- }
- else
- {
- value.append('[').append(r.getResultTotalRowCount()).append(" rows]");
- if((r.getResultSequence()>=0) && (!filename.length() || (df && df->queryAttributes().hasProp("ECL"))))
- link.append(r.getResultSequence());
- }
- Owned<IEspECLResult> result= createECLResult("","");
- if (flags & WUINFO_IncludeEclSchemas)
- {
- IArrayOf<IEspECLSchemaItem> schemas;
- if (getResultEclSchemas(r, schemas))
- result->setECLSchemas(schemas);
- }
- if (flags & WUINFO_IncludeXmlSchema)
- {
- Owned<IResultSetFactory> resultSetFactory = getSecResultSetFactory(context.querySecManager(), context.queryUser(), context.queryUserId(), context.queryPassword());
- Owned<INewResultSet> rs = resultSetFactory->createNewResultSet(&r, wuid.str());
- Owned<IResultSetCursor> cursor(rs->createCursor());
- SCMStringBuffer xsd;
- const IResultSetMetaData & meta = cursor->queryResultSet()->getMetaData();
- meta.getXmlXPathSchema(xsd, false);
- result->setXmlSchema(xsd.str());
- }
- if (filename.length())
- result->setShowFileContent(showFileContent);
- result->setName(name.str());
- result->setLink(link.str());
- result->setSequence(r.getResultSequence());
- result->setValue(value.str());
- result->setFileName(filename.str());
- result->setIsSupplied(r.getResultStatus() == ResultStatusSupplied);
- result->setTotal(r.getResultTotalRowCount());
- results.append(*result.getLink());
- }
- void WsWuInfo::getResults(IEspECLWorkunit &info, unsigned flags)
- {
- try
- {
- unsigned count = 0;
- IArrayOf<IEspECLResult> results;
- Owned<IConstWUResultIterator> it = &(cw->getResults());
- ForEach(*it)
- {
- IConstWUResult &r = it->query();
- if(r.getResultSequence()>=0)
- {
- if (flags & WUINFO_IncludeResults)
- getResult(r, results, flags);
- count++;
- }
- }
- if (version >= 1.17)
- info.setResultCount(count);
- if ((flags & WUINFO_IncludeResults) && results.length() > 0)
- info.setResults(results);
- results.kill();
- }
- catch(IException* e)
- {
- StringBuffer eMsg;
- ERRLOG("%s", e->errorMessage(eMsg).str());
- info.setResultsDesc(eMsg.str());
- e->Release();
- }
- }
- void WsWuInfo::getStats(StatisticsFilter& filter, bool createDescriptions, IArrayOf<IEspWUStatisticItem>& statistics)
- {
- Owned<IConstWUStatisticIterator> stats = &cw->getStatistics(&filter);
- ForEach(*stats)
- {
- IConstWUStatistic & cur = stats->query();
- StringBuffer xmlBuf, tsValue;
- SCMStringBuffer curCreator, curScope, curDescription, curFormattedValue;
- StatisticCreatorType curCreatorType = cur.getCreatorType();
- StatisticScopeType curScopeType = cur.getScopeType();
- StatisticMeasure curMeasure = cur.getMeasure();
- StatisticKind curKind = cur.getKind();
- unsigned __int64 value = cur.getValue();
- unsigned __int64 count = cur.getCount();
- unsigned __int64 max = cur.getMax();
- unsigned __int64 ts = cur.getTimestamp();
- cur.getCreator(curCreator);
- cur.getScope(curScope);
- cur.getDescription(curDescription, createDescriptions);
- cur.getFormattedValue(curFormattedValue);
- Owned<IEspWUStatisticItem> wuStatistic = createWUStatisticItem();
- if (curCreatorType != SCTnone)
- wuStatistic->setCreatorType(queryCreatorTypeName(curCreatorType));
- if (curCreator.length())
- wuStatistic->setCreator(curCreator.str());
- if (curScopeType != SSTnone)
- wuStatistic->setScopeType(queryScopeTypeName(curScopeType));
- if (curScope.length())
- wuStatistic->setScope(curScope.str());
- if (curMeasure != SMeasureNone)
- wuStatistic->setMeasure(queryMeasureName(curMeasure));
- if (curKind != StKindNone)
- wuStatistic->setKind(queryStatisticName(curKind));
- wuStatistic->setRawValue(value);
- wuStatistic->setValue(curFormattedValue.str());
- if (count != 1)
- wuStatistic->setCount(count);
- if (max)
- wuStatistic->setMax(max);
- if (ts)
- {
- formatStatistic(tsValue, ts, SMeasureTimestampUs);
- wuStatistic->setTimeStamp(tsValue.str());
- }
- if (curDescription.length())
- wuStatistic->setDescription(curDescription.str());
- statistics.append(*wuStatistic.getClear());
- }
- }
- bool WsWuInfo::getFileSize(const char* fileName, const char* IPAddress, offset_t& fileSize)
- {
- if (!fileName || !*fileName)
- return false;
- Owned<IFile> aFile;
- if (!IPAddress || !*IPAddress)
- {
- aFile.setown(createIFile(fileName));
- }
- else
- {
- RemoteFilename rfn;
- rfn.setRemotePath(fileName);
- SocketEndpoint ep(IPAddress);
- rfn.setIp(ep);
- aFile.setown(createIFile(rfn));
- }
- if (!aFile)
- return false;
- bool isDir;
- CDateTime modtime;
- if (!aFile->getInfo(isDir, fileSize, modtime) || isDir)
- return false;
- return true;
- }
- void WsWuInfo::getHelpFiles(IConstWUQuery* query, WUFileType type, IArrayOf<IEspECLHelpFile>& helpers)
- {
- if (!query)
- return;
- Owned<IConstWUAssociatedFileIterator> iter = &query->getAssociatedFiles();
- ForEach(*iter)
- {
- SCMStringBuffer name, Ip, description;
- IConstWUAssociatedFile & cur = iter->query();
- if (cur.getType() != type)
- continue;
- cur.getName(name);
- Owned<IEspECLHelpFile> h= createECLHelpFile("","");
- h->setName(name.str());
- h->setType(getEnumText(type, queryFileTypes));
- if (version > 1.31)
- {
- cur.getIp(Ip);
- h->setIPAddress(Ip.str());
- cur.getDescription(description);
- if ((description.length() < 1) && (name.length() > 0))
- {
- const char* desc = pathTail(name.str());
- if (desc && *desc)
- description.set(desc);
- }
- if (description.length() < 1)
- description.set("Help File");
- h->setDescription(description.str());
- if (version >= 1.43)
- {
- offset_t fileSize;
- if (getFileSize(name.str(), Ip.str(), fileSize))
- h->setFileSize(fileSize);
- }
- }
- helpers.append(*h.getLink());
- }
- }
- void WsWuInfo::getSubFiles(IPropertyTreeIterator* f, IEspECLSourceFile* eclSuperFile, StringArray& fileNames)
- {
- IArrayOf<IEspECLSourceFile> files;
- ForEach(*f)
- {
- IPropertyTree &query = f->query();
- const char *clusterName = query.queryProp("@cluster");
- const char *fileName = query.queryProp("@name");
- int fileCount = query.getPropInt("@useCount");
- bool bFound = false;
- if (fileName && *fileName && (fileNames.length() > 0))
- {
- for (unsigned i = 0; i < fileNames.length(); i++ )
- {
- const char *fileName0 = fileNames.item(i);
- if (!stricmp(fileName, fileName0))
- {
- bFound = true;
- break;
- }
- }
- }
- if (bFound)
- continue;
- Owned<IEspECLSourceFile> file= createECLSourceFile("","");
- if(clusterName && *clusterName)
- {
- file->setFileCluster(clusterName);
- }
- if (fileName && *fileName)
- {
- file->setName(fileName);
- fileNames.append(fileName);
- }
- file->setCount(fileCount);
- Owned<IPropertyTreeIterator> filetrees= query.getElements("Subfile");
- if (filetrees->first())
- {
- file->setIsSuperFile(true);
- getSubFiles(filetrees, file, fileNames);
- }
- files.append(*file.getLink());
- }
- eclSuperFile->setECLSourceFiles(files);
- return;
- }
- bool WsWuInfo::getResourceInfo(StringArray &viewnames, StringArray &urls, unsigned flags)
- {
- if (!(flags & (WUINFO_IncludeResultsViewNames | WUINFO_IncludeResourceURLs)))
- return true;
- try
- {
- Owned<IWuWebView> wv = createWuWebView(*cw, NULL, NULL, NULL, false);
- if (wv)
- {
- if (flags & WUINFO_IncludeResultsViewNames)
- wv->getResultViewNames(viewnames);
- if (flags & WUINFO_IncludeResourceURLs)
- wv->getResourceURLs(urls, NULL);
- }
- return true;
- }
- catch(IException* e)
- {
- StringBuffer eMsg;
- ERRLOG("%s", e->errorMessage(eMsg).str());
- e->Release();
- }
- return false;
- }
- unsigned WsWuInfo::getResourceURLCount()
- {
- try
- {
- Owned<IWuWebView> wv = createWuWebView(*cw, NULL, NULL, NULL, false);
- if (wv)
- return wv->getResourceURLCount();
- }
- catch(IException* e)
- {
- StringBuffer eMsg;
- ERRLOG("%s", e->errorMessage(eMsg).str());
- e->Release();
- }
- return 0;
- }
- void appendIOStreamContent(MemoryBuffer &mb, IFileIOStream *ios, bool forDownload)
- {
- StringBuffer line;
- bool eof = false;
- while (!eof)
- {
- line.clear();
- loop
- {
- char c;
- size32_t numRead = ios->read(1, &c);
- if (!numRead)
- {
- eof = true;
- break;
- }
- line.append(c);
- if (c=='\n')
- break;
- }
- mb.append(line.length(), line.str());
- if (!forDownload && (mb.length() > 640000))
- break;
- }
- }
- void WsWuInfo::getWorkunitEclAgentLog(const char* fileName, const char* agentPid, MemoryBuffer& buf)
- {
- if(!fileName || !*fileName)
- throw MakeStringException(ECLWATCH_ECLAGENT_LOG_NOT_FOUND,"Log file not specified");
- Owned<IFile> rFile = createIFile(fileName);
- if(!rFile)
- throw MakeStringException(ECLWATCH_CANNOT_OPEN_FILE, "Cannot open file %s.", fileName);
- OwnedIFileIO rIO = rFile->openShared(IFOread,IFSHfull);
- if(!rIO)
- throw MakeStringException(ECLWATCH_CANNOT_READ_FILE, "Cannot read file %s.", fileName);
- OwnedIFileIOStream ios = createBufferedIOStream(rIO);
- StringBuffer line;
- bool eof = false;
- bool wuidFound = false;
- StringBuffer pidstr;
- if (agentPid && *agentPid)
- pidstr.appendf(" %s ", agentPid);
- else
- pidstr.appendf(" %5d ", cw->getAgentPID());
- char const * pidchars = pidstr.str();
- while(!eof)
- {
- line.clear();
- loop
- {
- char c;
- size32_t numRead = ios->read(1, &c);
- if (!numRead)
- {
- eof = true;
- break;
- }
- line.append(c);
- if (c=='\n')
- break;
- }
- //Retain all rows that match a unique program instance - by retaining all rows that match a pid
- if(strstr(line.str(), pidchars))
- {
- //Check if this is a new instance using line sequence number
- if (strncmp(line.str(), "00000000", 8) == 0)
- {
- if (wuidFound) //If the correct instance has been found, return that instance before the next instance.
- break;
- //The last instance is not a correct instance. Clean the buf in order to start a new instance.
- buf.clear();
- }
- //If we spot the workunit id anywhere in the tacing for this pid then assume it is the correct instance.
- if(!wuidFound && strstr(line.str(), wuid.str()))
- wuidFound = true;
- buf.append(line.length(), line.str());
- }
- }
- if (buf.length() < 1)
- buf.append(47, "(Not found a log line related to this workunit)");
- }
- void WsWuInfo::getWorkunitThorLog(const char* fileName, MemoryBuffer& buf)
- {
- if(!fileName || !*fileName)
- throw MakeStringException(ECLWATCH_ECLAGENT_LOG_NOT_FOUND,"Log file not specified");
- Owned<IFile> rFile = createIFile(fileName);
- if (!rFile)
- throw MakeStringException(ECLWATCH_CANNOT_OPEN_FILE,"Cannot open file %s.",fileName);
- OwnedIFileIO rIO = rFile->openShared(IFOread,IFSHfull);
- if (!rIO)
- throw MakeStringException(ECLWATCH_CANNOT_READ_FILE,"Cannot read file %s.",fileName);
- OwnedIFileIOStream ios = createBufferedIOStream(rIO);
- StringBuffer line;
- bool eof = false;
- bool include = false;
- VStringBuffer startwuid("Started wuid=%s", wuid.str());
- VStringBuffer endwuid("Finished wuid=%s", wuid.str());
- const char *sw = startwuid.str();
- const char *ew = endwuid.str();
- while (!eof)
- {
- line.clear();
- loop
- {
- char c;
- size32_t numRead = ios->read(1, &c);
- if (!numRead)
- {
- eof = true;
- break;
- }
- line.append(c);
- if (c=='\n')
- break;
- }
- if (strstr(line.str(), sw))
- include = true;
- if (include)
- buf.append(line.length(), line.str());
- if (strstr(line.str(), ew))
- include = false;
- }
- }
- void WsWuInfo::getWorkunitThorSlaveLog(const char *groupName, const char *ipAddress, const char* logDate, const char* logDir, int slaveNum, MemoryBuffer& buf, bool forDownload)
- {
- if (isEmpty(logDir))
- throw MakeStringException(ECLWATCH_INVALID_INPUT,"ThorSlave log path not specified.");
- if (isEmpty(logDate))
- throw MakeStringException(ECLWATCH_INVALID_INPUT,"ThorSlave log date not specified.");
- StringBuffer slaveIPAddress, logName;
- if (slaveNum > 0)
- {
- if (isEmpty(groupName))
- throw MakeStringException(ECLWATCH_INVALID_INPUT,"Thor group not specified.");
- Owned<IGroup> nodeGroup = queryNamedGroupStore().lookup(groupName);
- if (!nodeGroup || (nodeGroup->ordinality() == 0))
- {
- WARNLOG("Node group %s not found", groupName);
- return;
- }
- nodeGroup->queryNode(slaveNum-1).endpoint().getIpText(slaveIPAddress);
- if (slaveIPAddress.length() < 1)
- throw MakeStringException(ECLWATCH_INVALID_INPUT,"ThorSlave log network address not found.");
- logName.appendf("thorslave.%d.%s.log", slaveNum, logDate);
- }
- else
- {//legacy wuid: a user types in an IP address for a thor slave
- if (isEmpty(ipAddress))
- throw MakeStringException(ECLWATCH_INVALID_INPUT,"ThorSlave address not specified.");
- //thorslave.10.239.219.6_20100.2012_05_23.log
- logName.appendf("thorslave.%s*.%s.log", ipAddress, logDate);
- const char* portPtr = strchr(ipAddress, '_');
- if (!portPtr)
- slaveIPAddress.append(ipAddress);
- else
- {
- StringBuffer ipAddressStr = ipAddress;
- ipAddressStr.setLength(portPtr - ipAddress);
- slaveIPAddress.append(ipAddressStr.str());
- }
- }
- RemoteFilename rfn;
- rfn.setRemotePath(logDir);
- SocketEndpoint ep(slaveIPAddress.str());
- rfn.setIp(ep);
- Owned<IFile> dir = createIFile(rfn);
- Owned<IDirectoryIterator> diriter = dir->directoryFiles(logName.str());
- if (!diriter->first())
- throw MakeStringException(ECLWATCH_FILE_NOT_EXIST,"Cannot find Thor slave log file %s.", logName.str());
- Linked<IFile> logfile = &diriter->query();
- diriter.clear();
- dir.clear();
- // logfile is now the file to load
- OwnedIFileIO rIO = logfile->openShared(IFOread,IFSHfull);
- if (!rIO)
- throw MakeStringException(ECLWATCH_CANNOT_READ_FILE,"Cannot read file %s.",logName.str());
- OwnedIFileIOStream ios = createBufferedIOStream(rIO);
- if (slaveNum > 0)
- {
- StringBuffer line;
- bool eof = false;
- bool include = false;
- VStringBuffer startwuid("Started wuid=%s", wuid.str());
- VStringBuffer endwuid("Finished wuid=%s", wuid.str());
- const char *sw = startwuid.str();
- const char *ew = endwuid.str();
- while (!eof)
- {
- line.clear();
- loop
- {
- char c;
- size32_t numRead = ios->read(1, &c);
- if (!numRead)
- {
- eof = true;
- break;
- }
- line.append(c);
- if (c=='\n')
- break;
- }
- if (strstr(line.str(), sw))
- include = true;
- if (include)
- buf.append(line.length(), line.str());
- if (strstr(line.str(), ew))
- include = false;
- }
- }
- else
- {//legacy wuid
- appendIOStreamContent(buf, ios.get(), forDownload);
- }
- }
- void WsWuInfo::getWorkunitResTxt(MemoryBuffer& buf)
- {
- Owned<IConstWUQuery> query = cw->getQuery();
- if(!query)
- throw MakeStringException(ECLWATCH_QUERY_NOT_FOUND_FOR_WU,"No query for workunit %s.",wuid.str());
- SCMStringBuffer resname;
- queryDllServer().getDll(query->getQueryResTxtName(resname).str(), buf);
- }
- void WsWuInfo::getWorkunitArchiveQuery(MemoryBuffer& buf)
- {
- Owned<IConstWUQuery> query = cw->getQuery();
- if(!query)
- throw MakeStringException(ECLWATCH_QUERY_NOT_FOUND_FOR_WU,"No query for workunit %s.",wuid.str());
- SCMStringBuffer queryText;
- query->getQueryText(queryText);
- if ((queryText.length() < 1) || !isArchiveQuery(queryText.str()))
- throw MakeStringException(ECLWATCH_CANNOT_GET_WORKUNIT, "Archive Query not found for workunit %s.", wuid.str());
- buf.append(queryText.length(), queryText.str());
- }
- void WsWuInfo::getWorkunitQueryShortText(MemoryBuffer& buf)
- {
- Owned<IConstWUQuery> query = cw->getQuery();
- if(!query)
- throw MakeStringException(ECLWATCH_QUERY_NOT_FOUND_FOR_WU,"No query for workunit %s.",wuid.str());
- SCMStringBuffer queryText;
- query->getQueryShortText(queryText);
- if (queryText.length() < 1)
- throw MakeStringException(ECLWATCH_QUERY_NOT_FOUND_FOR_WU, "No query for workunit %s.",wuid.str());
- buf.append(queryText.length(), queryText.str());
- }
- void WsWuInfo::getWorkunitDll(StringBuffer &dllname, MemoryBuffer& buf)
- {
- Owned<IConstWUQuery> query = cw->getQuery();
- if(!query)
- throw MakeStringException(ECLWATCH_QUERY_NOT_FOUND_FOR_WU,"No query for workunit %s.",wuid.str());
- StringBufferAdaptor isvName(dllname);
- query->getQueryDllName(isvName);
- queryDllServer().getDll(dllname.str(), buf);
- }
- void WsWuInfo::getWorkunitXml(const char* plainText, MemoryBuffer& buf)
- {
- const char* header;
- if (plainText && (!stricmp(plainText, "yes")))
- header = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>";
- else
- header = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><?xml-stylesheet href=\"../esp/xslt/xmlformatter.xsl\" type=\"text/xsl\"?>";
- SCMStringBuffer xml;
- exportWorkUnitToXML(cw, xml, true, false);
- buf.append(strlen(header), header);
- buf.append(xml.length(), xml.str());
- }
- void WsWuInfo::getWorkunitCpp(const char *cppname, const char* description, const char* ipAddress, MemoryBuffer& buf, bool forDownload)
- {
- if (isEmpty(description))
- throw MakeStringException(ECLWATCH_INVALID_INPUT, "File not specified.");
- if (isEmpty(ipAddress))
- throw MakeStringException(ECLWATCH_INVALID_INPUT, "File location not specified.");
- if (isEmpty(cppname))
- throw MakeStringException(ECLWATCH_INVALID_FILE_NAME, "File path not specified.");
- RemoteFilename rfn;
- rfn.setRemotePath(cppname);
- SocketEndpoint ep(ipAddress);
- rfn.setIp(ep);
- Owned<IFile> cppfile = createIFile(rfn);
- if (!cppfile)
- throw MakeStringException(ECLWATCH_CANNOT_OPEN_FILE, "Cannot open %s.", description);
- OwnedIFileIO rIO = cppfile->openShared(IFOread,IFSHfull);
- if (!rIO)
- throw MakeStringException(ECLWATCH_CANNOT_READ_FILE,"Cannot read %s.", description);
- OwnedIFileIOStream ios = createBufferedIOStream(rIO);
- if (!ios)
- throw MakeStringException(ECLWATCH_CANNOT_READ_FILE,"Cannot read %s.", description);
- appendIOStreamContent(buf, ios.get(), forDownload);
- }
- void WsWuInfo::getWorkunitAssociatedXml(const char* name, const char* ipAddress, const char* plainText,
- const char* description, bool forDownload, MemoryBuffer& buf)
- {
- if (isEmpty(description)) //'File Name' as shown in WU Details page
- throw MakeStringException(ECLWATCH_INVALID_INPUT, "File not specified.");
- if (isEmpty(ipAddress))
- throw MakeStringException(ECLWATCH_INVALID_INPUT, "File location not specified.");
- if (isEmpty(name)) //file name with full path
- throw MakeStringException(ECLWATCH_INVALID_FILE_NAME, "File path not specified.");
- RemoteFilename rfn;
- rfn.setRemotePath(name);
- SocketEndpoint ep(ipAddress);
- rfn.setIp(ep);
- Owned<IFile> rFile = createIFile(rfn);
- if (!rFile)
- throw MakeStringException(ECLWATCH_CANNOT_OPEN_FILE, "Cannot open %s.", description);
- OwnedIFileIO rIO = rFile->openShared(IFOread,IFSHfull);
- if (!rIO)
- throw MakeStringException(ECLWATCH_CANNOT_READ_FILE,"Cannot read %s.", description);
- OwnedIFileIOStream ios = createBufferedIOStream(rIO);
- if (!ios)
- throw MakeStringException(ECLWATCH_CANNOT_READ_FILE,"Cannot read %s.", description);
- const char* header;
- if (plainText && (!stricmp(plainText, "yes")))
- header = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>";
- else
- header = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><?xml-stylesheet href=\"../esp/xslt/xmlformatter.xsl\" type=\"text/xsl\"?>";
- buf.append(strlen(header), header);
- appendIOStreamContent(buf, ios.get(), forDownload);
- }
- WsWuSearch::WsWuSearch(IEspContext& context,const char* owner,const char* state,const char* cluster,const char* startDate,const char* endDate,const char* ecl,const char* jobname,const char* appname,const char* appkey,const char* appvalue)
- {
- SecAccessFlags accessOwn;
- SecAccessFlags accessOthers;
- getUserWuAccessFlags(context, accessOwn, accessOthers, true);
- Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
- StringBuffer xpath("*");
- if(ecl && *ecl)
- xpath.append("[Query/Text=?~\"*").append(ecl).append("*\"]");
- if(state && *state)
- xpath.append("[@state=\"").append(state).append("\"]");
- if(cluster && *cluster)
- xpath.append("[@clusterName=\"").append(cluster).append("\"]");
- if(owner && *owner)
- xpath.append("[@submitID=?~\"").append(owner).append("\"]");
- if(jobname && *jobname)
- xpath.append("[@jobName=?~\"*").append(jobname).append("*\"]");
- if((appname && *appname) || (appkey && *appkey) || (appvalue && *appvalue))
- {
- xpath.append("[Application/").append(appname && *appname ? appname : "*");
- xpath.append("/").append(appkey && *appkey ? appkey : "*");
- if(appvalue && *appvalue)
- xpath.append("=?~\"").append(appvalue).append("\"");
- xpath.append("]");
- }
- Owned<IConstWorkUnitIterator> it(factory->getWorkUnitsByXPath(xpath.str()));
- StringBuffer wuFrom, wuTo;
- if(startDate && *startDate)
- createWuidFromDate(startDate, wuFrom);
- if(endDate && *endDate)
- createWuidFromDate(endDate, wuTo);
- ForEach(*it)
- {
- IConstWorkUnit &cw = it->query();
- if (chooseWuAccessFlagsByOwnership(context.queryUserId(), cw, accessOwn, accessOthers) < SecAccess_Read)
- continue;
- SCMStringBuffer wuid;
- cw.getWuid(wuid);
- if (wuFrom.length() && strcmp(wuid.str(),wuFrom.str())<0)
- continue;
- if (wuTo.length() && strcmp(wuid.str(),wuTo.str())>0)
- continue;
- if (state && *state)
- {
- SCMStringBuffer descr;
- if(!strieq(cw.getStateDesc(descr).str(),state))
- continue;
- }
- wuids.push_back(wuid.str());
- }
- std::sort(wuids.begin(),wuids.end(),std::greater<std::string>());
- }
- StringBuffer& WsWuSearch::createWuidFromDate(const char* timestamp,StringBuffer& s)
- {
- CDateTime wuTime;
- wuTime.setString(timestamp,NULL,true);
- unsigned year, month, day, hour, minute, second, nano;
- wuTime.getDate(year, month, day, true);
- wuTime.getTime(hour, minute, second, nano, true);
- s.appendf("W%4d%02d%02d-%02d%02d%02d",year,month,day,hour,minute,second);
- return s;
- }
- struct CompareData
- {
- CompareData(const char* _filter): filter(_filter) {}
- bool operator()(const Linked<DataCacheElement>& e) const
- {
- return stricmp(e->m_filter.c_str(),filter)==0;
- }
- const char* filter;
- };
- DataCacheElement* DataCache::lookup(IEspContext &context, const char* filter, unsigned timeOutMin)
- {
- CriticalBlock block(crit);
- if (cache.size() < 1)
- return NULL;
- //erase data if it should be
- CDateTime timeNow;
- int timeout = timeOutMin;
- timeNow.setNow();
- timeNow.adjustTime(-timeout);
- while (true)
- {
- std::list<Linked<DataCacheElement> >::iterator list_iter = cache.begin();
- if (list_iter == cache.end())
- break;
- DataCacheElement* awu = list_iter->get();
- if (!awu || (awu->m_timeCached > timeNow))
- break;
- cache.pop_front();
- }
- if (cache.size() < 1)
- return NULL;
- //Check whether we have the data cache for this cluster. If yes, get the version
- std::list<Linked<DataCacheElement> >::iterator it = std::find_if(cache.begin(),cache.end(),CompareData(filter));
- if(it!=cache.end())
- {
- return it->getLink();
- }
- return NULL;
- }
- void DataCache::add(const char* filter, const char* data, const char* name, const char* localName, const char* wuid,
- const char* resultName, unsigned seq, __int64 start, unsigned count, __int64 requested, __int64 total)
- {
- CriticalBlock block(crit);
- //Save new data
- Owned<DataCacheElement> e=new DataCacheElement(filter, data, name, localName, wuid, resultName, seq, start, count, requested, total);
- if (cacheSize > 0)
- {
- if (cache.size() >= cacheSize)
- cache.pop_front();
- cache.push_back(e.get());
- }
- return;
- }
- struct CompareArchivedWUs
- {
- CompareArchivedWUs(const char* _filter): filter(_filter) {}
- bool operator()(const Linked<ArchivedWuCacheElement>& e) const
- {
- return stricmp(e->m_filter.c_str(),filter)==0;
- }
- const char* filter;
- };
- ArchivedWuCacheElement* ArchivedWuCache::lookup(IEspContext &context, const char* filter, const char* sashaUpdatedWhen, unsigned timeOutMin)
- {
- CriticalBlock block(crit);
- if (cache.size() < 1)
- return NULL;
- //erase data if it should be
- CDateTime timeNow;
- int timeout = timeOutMin;
- timeNow.setNow();
- timeNow.adjustTime(-timeout);
- while (true)
- {
- std::list<Linked<ArchivedWuCacheElement> >::iterator list_iter = cache.begin();
- if (list_iter == cache.end())
- break;
- ArchivedWuCacheElement* awu = list_iter->get();
- if (awu && !stricmp(sashaUpdatedWhen, awu->m_sashaUpdatedWhen.c_str()) && (awu->m_timeCached > timeNow))
- break;
- cache.pop_front();
- }
- if (cache.size() < 1)
- return NULL;
- //Check whether we have the data cache for this cluster. If yes, get the version
- std::list<Linked<ArchivedWuCacheElement> >::iterator it = std::find_if(cache.begin(),cache.end(),CompareArchivedWUs(filter));
- if(it!=cache.end())
- return it->getLink();
- return NULL;
- }
- void ArchivedWuCache::add(const char* filter, const char* sashaUpdatedWhen, bool hasNextPage, unsigned numWUsReturned, IArrayOf<IEspECLWorkunit>& wus)
- {
- CriticalBlock block(crit);
- //Save new data
- Owned<ArchivedWuCacheElement> e=new ArchivedWuCacheElement(filter, sashaUpdatedWhen, hasNextPage, numWUsReturned, wus);
- if (cacheSize > 0)
- {
- if (cache.size() >= cacheSize)
- cache.pop_front();
- cache.push_back(e.get());
- }
- return;
- }
- WsWuJobQueueAuditInfo::WsWuJobQueueAuditInfo(IEspContext &context, const char *cluster, const char *from , const char *to, CHttpResponse* response, const char *xls)
- {
- if(!response)
- return;
- unsigned maxDisplay = 125;
- IArrayOf<IEspThorQueue> items;
- CDateTime fromTime;
- CDateTime toTime;
- StringBuffer fromstr;
- StringBuffer tostr;
- if(from && *from)
- {
- fromTime.setString(from,NULL,false);
- fromTime.getString(fromstr, false);
- }
- if(to && *to)
- {
- toTime.setString(to,NULL,false);
- toTime.getString(tostr, false);
- }
- StringBuffer filter("ThorQueueMonitor");
- if(notEmpty(cluster))
- filter.appendf(",%s", cluster);
- StringAttrArray lines;
- queryAuditLogs(fromTime, toTime, filter.str(), lines);
- unsigned countLines = 0;
- unsigned maxConnected = 0;
- unsigned longestQueue = 0;
- ForEachItemIn(idx, lines)
- {
- const char* line = lines.item(idx).text;
- if(!line || !*line)
- continue;
- if (idx < (lines.length() - 1))
- getAuditLineInfo(line, longestQueue, maxConnected, maxDisplay, 1, items);
- else
- getAuditLineInfo(line, longestQueue, maxConnected, maxDisplay, 2, items);
- countLines++;
- }
- StringBuffer responsebuf;
- if (items.length() < 1)
- {
- responsebuf.append("<script language=\"javascript\">\r\nparent.displayQEnd(\'No data found\')</script>\r\n");
- response->sendChunk(responsebuf.str());
- return;
- }
- unsigned itemCount = items.length();
- if (itemCount > maxDisplay)
- itemCount = maxDisplay;
- responsebuf.append("<script language=\"javascript\">parent.displayQLegend()</script>\r\n");
- response->sendChunk(responsebuf.str());
- responsebuf.clear();
- responsebuf.append("<script language=\"javascript\">parent.displayQBegin(").append(longestQueue).append(",").append(maxConnected).append(",").append(itemCount).append(")</script>\r\n");
- response->sendChunk(responsebuf.str());
- responsebuf.clear();
- responsebuf.append("<script language=\"javascript\">\r\n");
- //bool displayDT = false;
- unsigned count = 0;
- unsigned jobpending=0;
- ForEachItemIn(i,items)
- {
- IEspThorQueue& tq = items.item(i);
- //displayDT = !displayDT;
- count++;
- if (count > maxDisplay)
- break;
- StringBuffer countStr, dtStr;
- countStr.appendulong(count);
- //if (displayDT)
- dtStr = tq.getDT();
- responsebuf.append("parent.displayQueue(\'").append(count).append("\',\'").append(dtStr.str()).append("\',\'").append(tq.getRunningWUs()).append("\',");
- responsebuf.append("\'").append(tq.getQueuedWUs()).append("\',\'").append(tq.getWaitingThors()).append("\',");
- responsebuf.append("\'").append(tq.getConnectedThors()).append("\',\'").append(tq.getIdledThors()).append("\',");
- responsebuf.append("\'").append(tq.getRunningWU1()).append("\',\'").append(tq.getRunningWU2()).append("\')\r\n");
- if(++jobpending>=50)
- {
- responsebuf.append("</script>\r\n");
- response->sendChunk(responsebuf.str());
- responsebuf.clear();
- responsebuf.append("<script language=\"javascript\">\r\n");
- jobpending=0;
- }
- }
- StringBuffer countStr;
- countStr.appendulong(count);
- StringBuffer msg("<table><tr><td>");
- msg.append("Total Records in the Time Period: ").append(items.length()).append(" (<a href=\"/WsWorkunits/WUClusterJobQueueLOG?").append(xls).append("\">txt</a>...<a href=\"/WsWorkunits/WUClusterJobQueueXLS?").append(xls).append("\">xls</a>).");
- msg.append("</td></tr><tr><td>");
- if (count > maxDisplay)
- msg.append("Displayed: First ").append(maxDisplay).append(". ");
- msg.append("Max. Queue Length: ").append(longestQueue).append(".");
- msg.append("</td></tr></table>");
- responsebuf.append("parent.displayQEnd(\'").append(msg).append("\')</script>\r\n");
- response->sendChunk(responsebuf.str());
- }
- void WsWuJobQueueAuditInfo::getAuditLineInfo(const char* line, unsigned& longestQueue, unsigned& maxConnected, unsigned maxDisplay, unsigned showAll, IArrayOf<IEspThorQueue>& items)
- {
- //2009-08-12 02:44:12 ,ThorQueueMonitor,thor400_88_dev,0,0,1,1,114,---,---
- if(!line || !*line)
- return;
- Owned<IEspThorQueue> tq = createThorQueue();
- StringBuffer dt, runningWUs, queuedWUs, waitingThors, connectedThors, idledThors, runningWU1, runningWU2;
- // date/time
- const char* bptr = line;
- const char* eptr = strchr(bptr, ',');
- if(eptr)
- dt.append(eptr - bptr, bptr);
- else
- dt.append(bptr);
- tq->setDT(dt.str());
- if(!eptr)
- {
- if (checkNewThorQueueItem(tq, showAll, items))
- items.append(*tq.getClear());
- return;
- }
- //skip title
- bptr = eptr + 1;
- eptr = strchr(bptr, ',');
- if(!eptr)
- {
- if (checkNewThorQueueItem(tq, showAll, items))
- items.append(*tq.getClear());
- return;
- }
- //skip queue name
- bptr = eptr + 1;
- eptr = strchr(bptr, ',');
- if(!eptr)
- {
- if (checkNewThorQueueItem(tq, showAll, items))
- items.append(*tq.getClear());
- return;
- }
- //running
- bptr = eptr + 1;
- eptr = strchr(bptr, ',');
- if(eptr)
- runningWUs.append(eptr - bptr, bptr);
- else
- runningWUs.append(bptr);
- tq->setRunningWUs(runningWUs.str());
- if(!eptr)
- {
- if (checkNewThorQueueItem(tq, showAll, items))
- items.append(*tq.getClear());
- return;
- }
- //queued
- bptr = eptr + 1;
- eptr = strchr(bptr, ',');
- if(eptr)
- queuedWUs.append(eptr - bptr, bptr);
- else
- queuedWUs.append(bptr);
- if (maxDisplay > items.length())
- {
- unsigned queueLen = atoi(queuedWUs.str());
- if (queueLen > longestQueue)
- longestQueue = queueLen;
- }
- tq->setQueuedWUs(queuedWUs.str());
- if(!eptr)
- {
- if (checkNewThorQueueItem(tq, showAll, items))
- items.append(*tq.getClear());
- return;
- }
- //waiting
- bptr = eptr + 1;
- eptr = strchr(bptr, ',');
- if(eptr)
- waitingThors.append(eptr - bptr, bptr);
- else
- waitingThors.append(bptr);
- tq->setWaitingThors(waitingThors.str());
- if(!eptr)
- {
- if (checkNewThorQueueItem(tq, showAll, items))
- items.append(*tq.getClear());
- return;
- }
- //connected
- bptr = eptr + 1;
- eptr = strchr(bptr, ',');
- if(eptr)
- connectedThors.append(eptr - bptr, bptr);
- else
- connectedThors.append(bptr);
- if (maxDisplay > items.length())
- {
- unsigned connnectedLen = atoi(connectedThors.str());
- if (connnectedLen > maxConnected)
- maxConnected = connnectedLen;
- }
- tq->setConnectedThors(connectedThors.str());
- if(!eptr)
- {
- if (checkNewThorQueueItem(tq, showAll, items))
- items.append(*tq.getClear());
- return;
- }
- //idled
- bptr = eptr + 1;
- eptr = strchr(bptr, ',');
- if(eptr)
- idledThors.append(eptr - bptr, bptr);
- else
- idledThors.append(bptr);
- tq->setIdledThors(idledThors.str());
- if(!eptr)
- {
- items.append(*tq.getClear());
- return;
- }
- //runningWU1
- bptr = eptr + 1;
- eptr = strchr(bptr, ',');
- if(eptr)
- runningWU1.append(eptr - bptr, bptr);
- else
- {
- runningWU1.append(bptr);
- }
- if (!strcmp(runningWU1.str(), "---"))
- runningWU1.clear();
- if (runningWU1.length() > 0)
- tq->setRunningWU1(runningWU1.str());
- if(!eptr)
- {
- if (checkNewThorQueueItem(tq, showAll, items))
- items.append(*tq.getClear());
- return;
- }
- //runningWU2
- bptr = eptr + 1;
- eptr = strchr(bptr, ',');
- if(eptr)
- runningWU2.append(eptr - bptr, bptr);
- else
- {
- runningWU2.append(bptr);
- }
- if (!strcmp(runningWU2.str(), "---"))
- runningWU2.clear();
- if (runningWU2.length() > 0)
- tq->setRunningWU2(runningWU2.str());
- if (checkNewThorQueueItem(tq, showAll, items))
- items.append(*tq.getClear());
- }
- bool WsWuJobQueueAuditInfo::checkSameStrings(const char* s1, const char* s2)
- {
- if (s1)
- {
- if (!s2)
- return false;
- if (strcmp(s1, s2))
- return false;
- }
- else if (s2)
- {
- if (!s1)
- return false;
- }
- return true;
- }
- bool WsWuJobQueueAuditInfo::checkNewThorQueueItem(IEspThorQueue* tq, unsigned showAll, IArrayOf<IEspThorQueue>& items)
- {
- bool bAdd = false;
- if (showAll < 1) //show every lines
- bAdd = true;
- else if (items.length() < 1)
- bAdd = true;
- else if (showAll > 1) //last line now
- {
- IEspThorQueue& tq0 = items.item(items.length()-1);
- if (!checkSameStrings(tq->getDT(), tq0.getDT()))
- bAdd = true;
- }
- else
- {
- IEspThorQueue& tq0 = items.item(items.length()-1);
- if (!checkSameStrings(tq->getRunningWUs(), tq0.getRunningWUs()))
- bAdd = true;
- if (!checkSameStrings(tq->getQueuedWUs(), tq0.getQueuedWUs()))
- bAdd = true;
- if (!checkSameStrings(tq->getConnectedThors(), tq0.getConnectedThors()))
- bAdd = true;
- if (!checkSameStrings(tq->getConnectedThors(), tq0.getConnectedThors()))
- bAdd = true;
- if (!checkSameStrings(tq->getRunningWU1(), tq0.getRunningWU1()))
- bAdd = true;
- if (!checkSameStrings(tq->getRunningWU2(), tq0.getRunningWU2()))
- bAdd = true;
- }
- return bAdd;
- }
- void xsltTransform(const char* xml, const char* sheet, IProperties *params, StringBuffer& ret)
- {
- StringBuffer xsl;
- if(!checkFileExists(sheet))
- throw MakeStringException(ECLWATCH_FILE_NOT_EXIST, "Could not find stylesheet %s.",sheet);
- Owned<IXslProcessor> proc = getXslProcessor();
- Owned<IXslTransform> trans = proc->createXslTransform();
- trans->setXmlSource(xml, strlen(xml));
- trans->loadXslFromFile(sheet);
- trans->copyParameters(params);
- trans->transform(ret);
- }
- bool addToQueryString(StringBuffer &queryString, const char *name, const char *value, const char delim)
- {
- if (isEmpty(name) || isEmpty(value))
- return false;
- if (queryString.length() > 0)
- queryString.append(delim);
- queryString.append(name).append("=").append(value);
- return true;
- }
- int WUSchedule::run()
- {
- try
- {
- while(!stopping)
- {
- Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
- Owned<IConstWorkUnitIterator> itr = factory->getWorkUnitsByState(WUStateScheduled);
- if (itr)
- {
- ForEach(*itr)
- {
- try
- {
- IConstWorkUnit & cw = itr->query();
- if (cw.aborting())
- {
- WorkunitUpdate wu(&cw.lock());
- wu->setState(WUStateAborted);
- continue;
- }
- WsWuDateTime dt, now;
- now.setNow();
- cw.getTimeScheduled(dt);
- if (now.compare(dt)>=0)
- {
- SCMStringBuffer wuid;
- runWorkUnit(cw.getWuid(wuid).str());
- }
- }
- catch(IException *e)
- {
- StringBuffer msg;
- ERRLOG("Exception %d:%s in WsWorkunits Schedule::run", e->errorCode(), e->errorMessage(msg).str());
- e->Release();
- }
- }
- }
- semSchedule.wait(1000*60);
- }
- }
- catch(IException *e)
- {
- StringBuffer msg;
- ERRLOG("Exception %d:%s in WsWorkunits Schedule::run", e->errorCode(), e->errorMessage(msg).str());
- e->Release();
- }
- catch(...)
- {
- ERRLOG("Unknown exception in WsWorkunits Schedule::run");
- }
- if (m_container)
- m_container->exitESP();
- return 0;
- }
- void WsWuHelpers::setXmlParameters(IWorkUnit *wu, const char *xml, bool setJobname)
- {
- if (!xml || !*xml)
- return;
- Owned<IPropertyTree> tree = createPTreeFromXMLString(xml, ipt_none, (PTreeReaderOptions)(ptr_ignoreWhiteSpace | ptr_ignoreNameSpaces));
- IPropertyTree *root = tree.get();
- if (strieq(root->queryName(), "Envelope"))
- root = root->queryPropTree("Body/*[1]");
- if (!root)
- return;
- if (setJobname)
- {
- SCMStringBuffer name;
- wu->getJobName(name);
- if (!name.length())
- wu->setJobName(root->queryName());
- }
- wu->setXmlParams(LINK(root));
- }
- void WsWuHelpers::setXmlParameters(IWorkUnit *wu, const char *xml, IArrayOf<IConstNamedValue> *variables, bool setJobname)
- {
- StringBuffer extParamXml;
- if (variables && variables->length())
- {
- Owned<IPropertyTree> paramTree = (xml && *xml) ? createPTreeFromXMLString(xml) : createPTree("input");
- ForEachItemIn(i, *variables)
- {
- IConstNamedValue &item = variables->item(i);
- const char *name = item.getName();
- const char *value = item.getValue();
- if (!name || !*name)
- continue;
- if (!value)
- {
- size_t len = strlen(name);
- char last = name[len-1];
- if (last == '-' || last == '+')
- {
- StringAttr s(name, len-1);
- paramTree->setPropInt(s.get(), last == '+' ? 1 : 0);
- }
- else
- paramTree->setPropInt(name, 1);
- continue;
- }
- paramTree->setProp(name, value);
- }
- toXML(paramTree, extParamXml);
- xml=extParamXml.str();
- }
- setXmlParameters(wu, xml, setJobname);
- }
- void WsWuHelpers::submitWsWorkunit(IEspContext& context, IConstWorkUnit* cw, const char* cluster, const char* snapshot, int maxruntime, bool compile, bool resetWorkflow, bool resetVariables,
- const char *paramXml, IArrayOf<IConstNamedValue> *variables, IArrayOf<IConstNamedValue> *debugs)
- {
- ensureWsWorkunitAccess(context, *cw, SecAccess_Write);
- switch(cw->getState())
- {
- case WUStateRunning:
- case WUStateDebugPaused:
- case WUStateDebugRunning:
- case WUStateCompiling:
- case WUStateAborting:
- case WUStateBlocked:
- {
- SCMStringBuffer descr;
- throw MakeStringException(ECLWATCH_CANNOT_SUBMIT_WORKUNIT, "Cannot submit the workunit. Workunit state is '%s'.", cw->getStateDesc(descr).str());
- }
- }
- SCMStringBuffer wuid;
- cw->getWuid(wuid);
- WorkunitUpdate wu(&cw->lock());
- if(!wu.get())
- throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT, "Cannot update workunit %s.", wuid.str());
- wu->clearExceptions();
- if(notEmpty(cluster))
- wu->setClusterName(cluster);
- if(notEmpty(snapshot))
- wu->setSnapshot(snapshot);
- wu->setState(WUStateSubmitted);
- if (maxruntime)
- wu->setDebugValueInt("maxRunTime",maxruntime,true);
- if (debugs && debugs->length())
- {
- ForEachItemIn(i, *debugs)
- {
- IConstNamedValue &item = debugs->item(i);
- const char *name = item.getName();
- const char *value = item.getValue();
- if (!name || !*name)
- continue;
- if (!value)
- {
- size_t len = strlen(name);
- char last = name[len-1];
- if (last == '-' || last == '+')
- {
- StringAttr s(name, len-1);
- wu->setDebugValueInt(s.get(), last == '+' ? 1 : 0, true);
- }
- else
- wu->setDebugValueInt(name, 1, true);
- continue;
- }
- wu->setDebugValue(name, value, true);
- }
- }
- if (resetWorkflow)
- wu->resetWorkflow();
- if (!compile)
- wu->schedule();
- if (resetVariables)
- {
- SCMStringBuffer varname;
- Owned<IConstWUResultIterator> vars = &wu->getVariables();
- ForEach (*vars)
- {
- vars->query().getResultName(varname);
- Owned<IWUResult> v = wu->updateVariableByName(varname.str());
- if (v)
- v->setResultStatus(ResultStatusUndefined);
- }
- }
- setXmlParameters(wu, paramXml, variables, (wu->getAction()==WUActionExecuteExisting));
- wu->commit();
- wu.clear();
- if (!compile)
- runWorkUnit(wuid.str());
- else if (context.querySecManager())
- secSubmitWorkUnit(wuid.str(), *context.querySecManager(), *context.queryUser());
- else
- submitWorkUnit(wuid.str(), context.queryUserId(), context.queryPassword());
- AuditSystemAccess(context.queryUserId(), true, "Submitted %s", wuid.str());
- }
- void WsWuHelpers::submitWsWorkunit(IEspContext& context, const char *wuid, const char* cluster, const char* snapshot, int maxruntime, bool compile, bool resetWorkflow, bool resetVariables,
- const char *paramXml, IArrayOf<IConstNamedValue> *variables, IArrayOf<IConstNamedValue> *debugs)
- {
- Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
- Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid, false);
- if(!cw)
- throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open workunit %s.",wuid);
- return submitWsWorkunit(context, cw, cluster, snapshot, maxruntime, compile, resetWorkflow, resetVariables, paramXml, variables, debugs);
- }
- void WsWuHelpers::copyWsWorkunit(IEspContext &context, IWorkUnit &wu, const char *srcWuid)
- {
- Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
- Owned<IConstWorkUnit> src(factory->openWorkUnit(srcWuid, false));
- SCMStringBuffer wuid;
- wu.getWuid(wuid);
- queryExtendedWU(&wu)->copyWorkUnit(src, false);
- SCMStringBuffer token;
- wu.setSecurityToken(createToken(wuid.str(), context.queryUserId(), context.queryPassword(), token).str());
- wu.commit();
- }
- void WsWuHelpers::runWsWorkunit(IEspContext &context, StringBuffer &wuid, const char *srcWuid, const char *cluster, const char *paramXml,
- IArrayOf<IConstNamedValue> *variables, IArrayOf<IConstNamedValue> *debugs)
- {
- StringBufferAdaptor isvWuid(wuid);
- NewWsWorkunit wu(context);
- wu->getWuid(isvWuid);
- copyWsWorkunit(context, *wu, srcWuid);
- wu.clear();
- submitWsWorkunit(context, wuid.str(), cluster, NULL, 0, false, true, true, paramXml, variables, debugs);
- }
- void WsWuHelpers::runWsWorkunit(IEspContext &context, IConstWorkUnit *cw, const char *srcWuid, const char *cluster, const char *paramXml,
- IArrayOf<IConstNamedValue> *variables, IArrayOf<IConstNamedValue> *debugs)
- {
- WorkunitUpdate wu(&cw->lock());
- copyWsWorkunit(context, *wu, srcWuid);
- wu.clear();
- submitWsWorkunit(context, cw, cluster, NULL, 0, false, true, true, paramXml, variables, debugs);
- }
- IException * WsWuHelpers::noteException(IWorkUnit *wu, IException *e, ErrorSeverity level)
- {
- if (wu)
- {
- Owned<IWUException> we = wu->createException();
- StringBuffer s;
- we->setExceptionMessage(e->errorMessage(s).str());
- we->setExceptionSource("WsWorkunits");
- we->setSeverity(level);
- if (level==SeverityError)
- wu->setState(WUStateFailed);
- }
- return e;
- }
- StringBuffer & WsWuHelpers::resolveQueryWuid(StringBuffer &wuid, const char *queryset, const char *query, bool notSuspended, IWorkUnit *wu)
- {
- Owned<IPropertyTree> qs = getQueryRegistry(queryset, true);
- if (!qs)
- throw noteException(wu, MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "QuerySet '%s' not found", queryset));
- Owned<IPropertyTree> q = resolveQueryAlias(qs, query);
- if (!q)
- throw noteException(wu, MakeStringException(ECLWATCH_QUERYID_NOT_FOUND, "Query '%s/%s' not found", queryset, query));
- if (notSuspended && q->getPropBool("@suspended"))
- throw noteException(wu, MakeStringException(ECLWATCH_QUERY_SUSPENDED, "Query '%s/%s' is suspended", queryset, query));
- return wuid.append(q->queryProp("@wuid"));
- }
- void WsWuHelpers::runWsWuQuery(IEspContext &context, IConstWorkUnit *cw, const char *queryset, const char *query, const char *cluster, const char *paramXml)
- {
- StringBuffer srcWuid;
- WorkunitUpdate wu(&cw->lock());
- resolveQueryWuid(srcWuid, queryset, query, true, wu);
- copyWsWorkunit(context, *wu, srcWuid);
- wu.clear();
- submitWsWorkunit(context, cw, cluster, NULL, 0, false, true, true, paramXml);
- }
- void WsWuHelpers::runWsWuQuery(IEspContext &context, StringBuffer &wuid, const char *queryset, const char *query, const char *cluster, const char *paramXml)
- {
- StringBuffer srcWuid;
- StringBufferAdaptor isvWuid(wuid);
- NewWsWorkunit wu(context);
- wu->getWuid(isvWuid);
- resolveQueryWuid(srcWuid, queryset, query, true, wu);
- copyWsWorkunit(context, *wu, srcWuid);
- wu.clear();
- submitWsWorkunit(context, wuid.str(), cluster, NULL, 0, false, true, true, paramXml);
- }
- void WsWuHelpers::checkAndTrimWorkunit(const char* methodName, StringBuffer& input)
- {
- const char* trimmedInput = input.trim().str();
- if (isEmpty(trimmedInput))
- throw MakeStringException(ECLWATCH_INVALID_INPUT, "%s: Workunit ID not set", methodName);
- if (!looksLikeAWuid(trimmedInput))
- throw MakeStringException(ECLWATCH_INVALID_INPUT, "%s: Invalid Workunit ID: %s", methodName, trimmedInput);
- return;
- }
- }
|