ws_workunitsHelpers.cpp 106 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
732783279328032813282328332843285328632873288328932903291
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jlib.hpp"
  14. #include "ws_workunitsHelpers.hpp"
  15. #include "exception_util.hpp"
  16. #include "daclient.hpp"
  17. #include "dalienv.hpp"
  18. #include "daaudit.hpp"
  19. #include "portlist.h"
  20. #include "dadfs.hpp"
  21. #include "fileview.hpp"
  22. #include "wuwebview.hpp"
  23. #include "dllserver.hpp"
  24. #include "wujobq.hpp"
  25. #include "hqlexpr.hpp"
  26. #include "ldapsecurity.ipp"
  27. #ifdef _USE_ZLIB
  28. #include "zcrypt.hpp"
  29. #endif
  30. namespace ws_workunits {
  31. SecAccessFlags chooseWuAccessFlagsByOwnership(const char *user, const char *owner, SecAccessFlags accessOwn, SecAccessFlags accessOthers)
  32. {
  33. return (isEmpty(owner) || (user && streq(user, owner))) ? accessOwn : accessOthers;
  34. }
  35. SecAccessFlags chooseWuAccessFlagsByOwnership(const char *user, IConstWorkUnitInfo& cw, SecAccessFlags accessOwn, SecAccessFlags accessOthers)
  36. {
  37. return chooseWuAccessFlagsByOwnership(user, cw.queryUser(), accessOwn, accessOthers);
  38. }
  39. const char *getWuAccessType(const char *owner, const char *user)
  40. {
  41. return (isEmpty(owner) || (user && streq(user, owner))) ? OWN_WU_ACCESS : OTHERS_WU_ACCESS;
  42. }
  43. const char *getWuAccessType(IConstWorkUnit& cw, const char *user)
  44. {
  45. return getWuAccessType(cw.queryUser(), user);
  46. }
  47. void getUserWuAccessFlags(IEspContext& context, SecAccessFlags& accessOwn, SecAccessFlags& accessOthers, bool except)
  48. {
  49. if (!context.authorizeFeature(OWN_WU_ACCESS, accessOwn))
  50. accessOwn = SecAccess_None;
  51. if (!context.authorizeFeature(OTHERS_WU_ACCESS, accessOthers))
  52. accessOthers = SecAccess_None;
  53. if (except && (accessOwn == SecAccess_None) && (accessOthers == SecAccess_None))
  54. {
  55. AuditSystemAccess(context.queryUserId(), false, "Access Denied: User can't view any workunits");
  56. VStringBuffer msg("Access Denied: User %s does not have rights to access workunits.", context.queryUserId());
  57. throw MakeStringException(ECLWATCH_ECL_WU_ACCESS_DENIED, "%s", msg.str());
  58. }
  59. }
  60. SecAccessFlags getWsWorkunitAccess(IEspContext& cxt, IConstWorkUnit& cw)
  61. {
  62. SecAccessFlags accessFlag = SecAccess_None;
  63. cxt.authorizeFeature(getWuAccessType(cw, cxt.queryUserId()), accessFlag);
  64. return accessFlag;
  65. }
  66. void ensureWsWorkunitAccessByOwnerId(IEspContext& cxt, const char* owner, SecAccessFlags minAccess)
  67. {
  68. if (!cxt.validateFeatureAccess(getWuAccessType(owner, cxt.queryUserId()), minAccess, false))
  69. throw MakeStringException(ECLWATCH_ECL_WU_ACCESS_DENIED, "Failed to access workunit. Permission denied.");
  70. }
  71. void ensureWsWorkunitAccess(IEspContext& cxt, IConstWorkUnit& cw, SecAccessFlags minAccess)
  72. {
  73. if (!cxt.validateFeatureAccess(getWuAccessType(cw, cxt.queryUserId()), minAccess, false))
  74. throw MakeStringException(ECLWATCH_ECL_WU_ACCESS_DENIED, "Failed to access workunit. Permission denied.");
  75. }
  76. void ensureWsWorkunitAccess(IEspContext& context, const char* wuid, SecAccessFlags minAccess)
  77. {
  78. Owned<IWorkUnitFactory> wf = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  79. Owned<IConstWorkUnit> cw = wf->openWorkUnit(wuid);
  80. if (!cw)
  81. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT, "Failed to open workunit %s when ensuring workunit access", wuid);
  82. ensureWsWorkunitAccess(context, *cw, minAccess);
  83. }
  84. void ensureWsCreateWorkunitAccess(IEspContext& cxt)
  85. {
  86. if (!cxt.validateFeatureAccess(OWN_WU_ACCESS, SecAccess_Write, false))
  87. throw MakeStringException(ECLWATCH_ECL_WU_ACCESS_DENIED, "Failed to create workunit. Permission denied.");
  88. }
  89. StringBuffer &getWuidFromLogicalFileName(IEspContext &context, const char *logicalName, StringBuffer &wuid)
  90. {
  91. Owned<IUserDescriptor> userdesc = createUserDescriptor();
  92. userdesc->set(context.queryUserId(), context.queryPassword());
  93. Owned<IDistributedFile> df = queryDistributedFileDirectory().lookup(logicalName, userdesc);
  94. if (!df)
  95. throw MakeStringException(ECLWATCH_FILE_NOT_EXIST,"Cannot find file %s.",logicalName);
  96. return wuid.append(df->queryAttributes().queryProp("@workunit"));
  97. }
  98. void formatDuration(StringBuffer &s, unsigned ms)
  99. {
  100. unsigned days = ms / (1000*60*60*24);
  101. ms %= (1000*60*60*24);
  102. unsigned hours = ms / (1000*60*60);
  103. ms %= (1000*60*60);
  104. unsigned mins = ms / (1000*60);
  105. ms %= (1000*60);
  106. unsigned secs = ms / 1000;
  107. ms %= 1000;
  108. if (days)
  109. s.appendf("%d days ", days);
  110. if (hours || s.length())
  111. s.appendf("%d:", hours);
  112. if (mins || s.length())
  113. s.appendf("%d:", mins);
  114. if (s.length())
  115. s.appendf("%02d.%03d", secs, ms);
  116. else
  117. s.appendf("%d.%03d", secs, ms);
  118. }
  119. WsWUExceptions::WsWUExceptions(IConstWorkUnit& wu): numerr(0), numwrn(0), numinf(0), numalert(0)
  120. {
  121. Owned<IConstWUExceptionIterator> it = &wu.getExceptions();
  122. ForEach(*it)
  123. {
  124. IConstWUException & cur = it->query();
  125. SCMStringBuffer src, msg, file;
  126. Owned<IEspECLException> e= createECLException("","");
  127. e->setCode(cur.getExceptionCode());
  128. e->setSource(cur.getExceptionSource(src).str());
  129. e->setMessage(cur.getExceptionMessage(msg).str());
  130. e->setFileName(cur.getExceptionFileName(file).str());
  131. e->setLineNo(cur.getExceptionLineNo());
  132. e->setColumn(cur.getExceptionColumn());
  133. if (cur.getActivityId())
  134. e->setActivity(cur.getActivityId());
  135. const char * label = "";
  136. switch (cur.getSeverity())
  137. {
  138. default:
  139. case SeverityError: label = "Error"; numerr++; break;
  140. case SeverityWarning: label = "Warning"; numwrn++; break;
  141. case SeverityInformation: label = "Info"; numinf++; break;
  142. case SeverityAlert: label = "Alert"; numalert++; break;
  143. }
  144. e->setSeverity(label);
  145. errors.append(*e.getLink());
  146. }
  147. }
  148. #define SDS_LOCK_TIMEOUT 30000
  149. void getSashaNode(SocketEndpoint &ep)
  150. {
  151. Owned<IEnvironmentFactory> factory = getEnvironmentFactory();
  152. Owned<IConstEnvironment> env = factory->openEnvironment();
  153. if (!env)
  154. throw MakeStringException(ECLWATCH_CANNOT_GET_ENV_INFO,"Cannot get environment information.");
  155. Owned<IPropertyTree> root = &env->getPTree();
  156. IPropertyTree *pt = root->queryPropTree("Software/SashaServerProcess[1]/Instance[1]");
  157. if (!pt)
  158. throw MakeStringException(ECLWATCH_ARCHIVE_SERVER_NOT_FOUND, "Archive Server not found.");
  159. ep.set(pt->queryProp("@netAddress"), pt->getPropInt("@port",DEFAULT_SASHA_PORT));
  160. }
  161. void WsWuInfo::getSourceFiles(IEspECLWorkunit &info, unsigned long flags)
  162. {
  163. if (!(flags & WUINFO_IncludeSourceFiles))
  164. return;
  165. try
  166. {
  167. Owned<IUserDescriptor> userdesc;
  168. StringBuffer username;
  169. context.getUserID(username);
  170. const char* passwd = context.queryPassword();
  171. userdesc.setown(createUserDescriptor());
  172. userdesc->set(username.str(), passwd);
  173. IArrayOf<IEspECLSourceFile> files;
  174. if (version < 1.27)
  175. {
  176. Owned<IPropertyTreeIterator> f=&cw->getFilesReadIterator();
  177. ForEach(*f)
  178. {
  179. IPropertyTree &query = f->query();
  180. const char *clusterName = query.queryProp("@cluster");
  181. const char *fileName = query.queryProp("@name");
  182. int fileCount = query.getPropInt("@useCount");
  183. Owned<IEspECLSourceFile> file= createECLSourceFile("","");
  184. if(clusterName && *clusterName)
  185. {
  186. file->setFileCluster(clusterName);
  187. }
  188. if (version > 1.11)
  189. {
  190. Owned<IPropertyTreeIterator> filetrees= query.getElements("Subfile");
  191. if (filetrees->first())
  192. file->setIsSuperFile(true);
  193. }
  194. if (fileName && *fileName)
  195. {
  196. file->setName(fileName);
  197. }
  198. file->setCount(fileCount);
  199. files.append(*file.getLink());
  200. }
  201. }
  202. else
  203. {
  204. StringArray fileNames;
  205. Owned<IPropertyTreeIterator> f=&cw->getFilesReadIterator();
  206. ForEach(*f)
  207. {
  208. IPropertyTree &query = f->query();
  209. const char *clusterName = query.queryProp("@cluster");
  210. const char *fileName = query.queryProp("@name");
  211. int fileCount = query.getPropInt("@useCount");
  212. bool bFound = false;
  213. if (fileName && *fileName && (fileNames.length() > 0))
  214. {
  215. for (unsigned i = 0; i < fileNames.length(); i++ ) // MORE - unnecessary n^2 process
  216. {
  217. const char *fileName0 = fileNames.item(i);
  218. if (!stricmp(fileName, fileName0))
  219. {
  220. bFound = true;
  221. break;
  222. }
  223. }
  224. }
  225. if (bFound)
  226. continue;
  227. Owned<IEspECLSourceFile> file= createECLSourceFile("","");
  228. if(clusterName && *clusterName)
  229. {
  230. file->setFileCluster(clusterName);
  231. }
  232. if (fileName && *fileName)
  233. {
  234. file->setName(fileName);
  235. }
  236. file->setCount(fileCount);
  237. Owned<IPropertyTreeIterator> filetrees= query.getElements("Subfile");
  238. if (filetrees->first())
  239. {
  240. file->setIsSuperFile(true);
  241. getSubFiles(filetrees, file, fileNames);
  242. }
  243. files.append(*file.getLink());
  244. }
  245. }
  246. info.setSourceFiles(files);
  247. }
  248. catch(IException* e)
  249. {
  250. StringBuffer eMsg;
  251. ERRLOG("%s", e->errorMessage(eMsg).str());
  252. info.setSourceFilesDesc(eMsg.str());
  253. e->Release();
  254. }
  255. }
  256. void WsWuInfo::getExceptions(IEspECLWorkunit &info, unsigned long flags)
  257. {
  258. if ((flags & WUINFO_IncludeExceptions) || version > 1.16)
  259. {
  260. WsWUExceptions errors(*cw);
  261. if (version > 1.16)
  262. {
  263. info.setErrorCount(errors.ErrCount());
  264. info.setWarningCount(errors.WrnCount());
  265. info.setInfoCount(errors.InfCount());
  266. info.setAlertCount(errors.AlertCount());
  267. }
  268. if ((flags & WUINFO_IncludeExceptions))
  269. info.setExceptions(errors);
  270. }
  271. }
  272. void WsWuInfo::getVariables(IEspECLWorkunit &info, unsigned long flags)
  273. {
  274. if (!(flags & WUINFO_IncludeVariables))
  275. return;
  276. try
  277. {
  278. IArrayOf<IEspECLResult> results;
  279. Owned<IConstWUResultIterator> vars = &cw->getVariables();
  280. ForEach(*vars)
  281. getResult(vars->query(), results, flags);
  282. info.setVariables(results);
  283. results.kill();
  284. }
  285. catch(IException* e)
  286. {
  287. StringBuffer eMsg;
  288. ERRLOG("%s", e->errorMessage(eMsg).str());
  289. info.setVariablesDesc(eMsg.str());
  290. e->Release();
  291. }
  292. }
  293. void WsWuInfo::addTimerToList(SCMStringBuffer& name, const char * scope, IConstWUStatistic & stat, IArrayOf<IEspECLTimer>& timers)
  294. {
  295. StringBuffer fd;
  296. formatStatistic(fd, stat.getValue(), stat.getMeasure());
  297. Owned<IEspECLTimer> t= createECLTimer("","");
  298. name.s.replace('_', ' '); // yuk!
  299. t->setName(name.str());
  300. t->setValue(fd.str());
  301. //Theoretically this could overflow, in practice it is unlikely - fix in the new stats interface when implemented
  302. t->setCount((unsigned)stat.getCount());
  303. if (version > 1.19)
  304. {
  305. StringAttr graphName;
  306. unsigned graphNum;
  307. unsigned subGraphNum = 0;
  308. unsigned subId = 0;
  309. if (parseGraphScope(scope, graphName, graphNum, subId) ||
  310. parseGraphTimerLabel(name.str(), graphName, graphNum, subGraphNum, subId)) // leacy
  311. {
  312. if (graphName.length() > 0)
  313. t->setGraphName(graphName);
  314. if (subId > 0)
  315. t->setSubGraphId((int)subId);
  316. }
  317. }
  318. timers.append(*t.getLink());
  319. }
  320. void WsWuInfo::doGetTimers(IArrayOf<IEspECLTimer>& timers)
  321. {
  322. unsigned __int64 totalThorTimeValue = 0;
  323. unsigned __int64 totalThorTimerCount = 0; //Do we need this?
  324. StatisticsFilter filter;
  325. filter.setScopeDepth(1, 2);
  326. filter.setMeasure(SMeasureTimeNs);
  327. Owned<IConstWUStatisticIterator> it = &cw->getStatistics(&filter);
  328. if (it->first())
  329. {
  330. ForEach(*it)
  331. {
  332. IConstWUStatistic & cur = it->query();
  333. SCMStringBuffer name, scope;
  334. cur.getDescription(name, true);
  335. cur.getScope(scope);
  336. bool isThorTiming = false;//Should it be renamed as isClusterTiming?
  337. if ((cur.getCreatorType() == SCTsummary) && (cur.getKind() == StTimeElapsed) && streq(scope.str(), GLOBAL_SCOPE))
  338. {
  339. SCMStringBuffer creator;
  340. cur.getCreator(creator);
  341. if (streq(creator.str(), "thor") || streq(creator.str(), "hthor") ||
  342. streq(creator.str(), "roxie"))
  343. isThorTiming = true;
  344. }
  345. else if (strieq(name.str(), TOTALTHORTIME)) // legacy
  346. isThorTiming = true;
  347. if (isThorTiming)
  348. {
  349. totalThorTimeValue += cur.getValue();
  350. totalThorTimerCount += cur.getCount();
  351. }
  352. else
  353. addTimerToList(name, scope.str(), cur, timers);
  354. }
  355. }
  356. if (totalThorTimeValue > 0)
  357. {
  358. StringBuffer totalThorTimeText;
  359. formatStatistic(totalThorTimeText, totalThorTimeValue, SMeasureTimeNs);
  360. Owned<IEspECLTimer> t= createECLTimer("","");
  361. if (version > 1.52)
  362. t->setName(TOTALCLUSTERTIME);
  363. else
  364. t->setName(TOTALTHORTIME);
  365. t->setValue(totalThorTimeText.str());
  366. t->setCount((unsigned)totalThorTimerCount);
  367. timers.append(*t.getLink());
  368. }
  369. }
  370. void WsWuInfo::getTimers(IEspECLWorkunit &info, unsigned long flags)
  371. {
  372. if (!(flags & WUINFO_IncludeTimers))
  373. return;
  374. try
  375. {
  376. IArrayOf<IEspECLTimer> timers;
  377. doGetTimers(timers);
  378. info.setTimers(timers);
  379. }
  380. catch(IException* e)
  381. {
  382. StringBuffer eMsg;
  383. ERRLOG("%s", e->errorMessage(eMsg).str());
  384. info.setTimersDesc(eMsg.str());
  385. e->Release();
  386. }
  387. }
  388. unsigned WsWuInfo::getTimerCount()
  389. {
  390. unsigned numTimers = 0;
  391. try
  392. {
  393. //This filter must match the filter in the function above, otherwise it will be inconsistent
  394. StatisticsFilter filter;
  395. filter.setScopeDepth(1, 2);
  396. filter.setMeasure(SMeasureTimeNs);
  397. Owned<IConstWUStatisticIterator> it = &cw->getStatistics(&filter);
  398. ForEach(*it)
  399. numTimers++;
  400. }
  401. catch(IException* e)
  402. {
  403. StringBuffer eMsg;
  404. ERRLOG("%s", e->errorMessage(eMsg).str());
  405. e->Release();
  406. }
  407. return numTimers;
  408. }
  409. mapEnums queryFileTypes[] = {
  410. { FileTypeCpp, "cpp" },
  411. { FileTypeDll, "dll" },
  412. { FileTypeResText, "res" },
  413. { FileTypeHintXml, "hint" },
  414. { FileTypeXml, "xml" },
  415. { FileTypeSize, NULL },
  416. };
  417. void WsWuInfo::getHelpers(IEspECLWorkunit &info, unsigned long flags)
  418. {
  419. try
  420. {
  421. IArrayOf<IEspECLHelpFile> helpers;
  422. unsigned helpersCount = 2; // ECL + Workunit XML are also helpers...
  423. Owned <IConstWUQuery> query = cw->getQuery();
  424. if(!query)
  425. {
  426. ERRLOG("Cannot get Query for this workunit.");
  427. info.setHelpersDesc("Cannot get Query for this workunit.");
  428. }
  429. else
  430. {
  431. if (flags & WUINFO_IncludeECL)
  432. {
  433. SCMStringBuffer queryText;
  434. query->getQueryShortText(queryText);
  435. if (queryText.length())
  436. {
  437. if((flags & WUINFO_TruncateEclTo64k) && (queryText.length() > 64000))
  438. queryText.setLen(queryText.str(), 64000);
  439. IEspECLQuery* q=&info.updateQuery();
  440. q->setText(queryText.str());
  441. }
  442. }
  443. if (version > 1.34)
  444. {
  445. SCMStringBuffer mainDefinition;
  446. query->getQueryMainDefinition(mainDefinition);
  447. if(mainDefinition.length())
  448. {
  449. IEspECLQuery* q=&info.updateQuery();
  450. q->setQueryMainDefinition(mainDefinition.str());
  451. }
  452. }
  453. if (version > 1.30)
  454. {
  455. info.setHasArchiveQuery(query->hasArchive());
  456. }
  457. for (unsigned i = 0; i < FileTypeSize; i++)
  458. getHelpFiles(query, (WUFileType) i, helpers, flags, helpersCount);
  459. }
  460. getWorkunitThorLogInfo(helpers, info, flags, helpersCount);
  461. if (cw->getWuidVersion() > 0)
  462. {
  463. Owned<IPropertyTreeIterator> eclAgents = cw->getProcesses("EclAgent", NULL);
  464. ForEach (*eclAgents)
  465. {
  466. StringBuffer logName;
  467. IPropertyTree& eclAgent = eclAgents->query();
  468. eclAgent.getProp("@log",logName);
  469. if (!logName.length())
  470. continue;
  471. helpersCount++;
  472. if (!(flags & WUINFO_IncludeHelpers))
  473. continue;
  474. Owned<IEspECLHelpFile> h= createECLHelpFile("","");
  475. h->setName(logName.str());
  476. h->setType(File_EclAgentLog);
  477. if (version >= 1.43)
  478. {
  479. offset_t fileSize;
  480. if (getFileSize(logName.str(), NULL, fileSize))
  481. h->setFileSize(fileSize);
  482. if (version >= 1.44)
  483. {
  484. if (eclAgent.hasProp("@pid"))
  485. h->setPID(eclAgent.getPropInt("@pid"));
  486. else
  487. h->setPID(cw->getAgentPID());
  488. }
  489. }
  490. helpers.append(*h.getLink());
  491. }
  492. }
  493. else // legacy wuid
  494. {
  495. Owned<IStringIterator> eclAgentLogs = cw->getLogs("EclAgent");
  496. ForEach (*eclAgentLogs)
  497. {
  498. SCMStringBuffer name;
  499. eclAgentLogs->str(name);
  500. if (name.length() < 1)
  501. continue;
  502. helpersCount++;
  503. if (!(flags & WUINFO_IncludeHelpers))
  504. break;
  505. Owned<IEspECLHelpFile> h= createECLHelpFile("","");
  506. h->setName(name.str());
  507. h->setType(File_EclAgentLog);
  508. if (version >= 1.43)
  509. {
  510. offset_t fileSize;
  511. if (getFileSize(name.str(), NULL, fileSize))
  512. h->setFileSize(fileSize);
  513. }
  514. helpers.append(*h.getLink());
  515. break;
  516. }
  517. }
  518. info.setHelpers(helpers);
  519. info.setHelpersCount(helpersCount);
  520. }
  521. catch(IException* e)
  522. {
  523. StringBuffer eMsg;
  524. ERRLOG("%s", e->errorMessage(eMsg).str());
  525. info.setHelpersDesc(eMsg.str());
  526. e->Release();
  527. }
  528. }
  529. void WsWuInfo::getApplicationValues(IEspECLWorkunit &info, unsigned long flags)
  530. {
  531. if (!(flags & WUINFO_IncludeApplicationValues))
  532. return;
  533. try
  534. {
  535. IArrayOf<IEspApplicationValue> av;
  536. Owned<IConstWUAppValueIterator> app(&cw->getApplicationValues());
  537. ForEach(*app)
  538. {
  539. IConstWUAppValue& val=app->query();
  540. Owned<IEspApplicationValue> t= createApplicationValue("","");
  541. t->setApplication(val.queryApplication());
  542. t->setName(val.queryName());
  543. t->setValue(val.queryValue());
  544. av.append(*t.getLink());
  545. }
  546. info.setApplicationValues(av);
  547. }
  548. catch(IException* e)
  549. {
  550. StringBuffer eMsg;
  551. ERRLOG("%s", e->errorMessage(eMsg).str());
  552. info.setApplicationValuesDesc(eMsg.str());
  553. e->Release();
  554. }
  555. }
  556. void WsWuInfo::getDebugValues(IEspECLWorkunit &info, unsigned long flags)
  557. {
  558. if (!(flags & WUINFO_IncludeDebugValues))
  559. {
  560. if (version >= 1.50)
  561. {
  562. unsigned debugValueCount = 0;
  563. Owned<IStringIterator> debugs(&cw->getDebugValues());
  564. ForEach(*debugs)
  565. debugValueCount++;
  566. info.setDebugValueCount(debugValueCount);
  567. }
  568. return;
  569. }
  570. try
  571. {
  572. IArrayOf<IEspDebugValue> dv;
  573. Owned<IStringIterator> debugs(&cw->getDebugValues());
  574. ForEach(*debugs)
  575. {
  576. SCMStringBuffer name, val;
  577. debugs->str(name);
  578. cw->getDebugValue(name.str(),val);
  579. Owned<IEspDebugValue> t= createDebugValue("","");
  580. t->setName(name.str());
  581. t->setValue(val.str());
  582. dv.append(*t.getLink());
  583. }
  584. if (version >= 1.50)
  585. info.setDebugValueCount(dv.length());
  586. info.setDebugValues(dv);
  587. }
  588. catch(IException* e)
  589. {
  590. StringBuffer eMsg;
  591. ERRLOG("%s", e->errorMessage(eMsg).str());
  592. info.setDebugValuesDesc(eMsg.str());
  593. e->Release();
  594. }
  595. }
  // Extracts the first run of decimal digits found in 's'.
  //
  // Skips leading non-digit characters, then accumulates consecutive digits
  // into 'num' (0 when no digit is found). Returns a pointer to the first
  // character after the digits, or to the terminating NUL.
  // A NULL 's' yields num == 0 and a NULL return.
  // Note: 'num' is unsigned, so an over-long digit run wraps around.
  const char *getGraphNum(const char *s,unsigned &num)
  {
      num = 0;
      if (!s)
          return s;

      // isdigit() is only defined for values representable as unsigned char
      // (or EOF); plain char may be negative for bytes >= 0x80, so cast first.
      while (*s && !isdigit(static_cast<unsigned char>(*s)))
          s++;

      while (isdigit(static_cast<unsigned char>(*s)))
      {
          num = num*10 + (unsigned)(*s - '0');
          s++;
      }
      return s;
  }
  608. bool WsWuInfo::hasSubGraphTimings()
  609. {
  610. StatisticsFilter filter;
  611. filter.setScopeType(SSTsubgraph);
  612. filter.setKind(StTimeElapsed);
  613. Owned<IConstWUStatisticIterator> times = &cw->getStatistics(&filter);
  614. return times->first();
  615. }
  616. bool WsWuInfo::legacyHasSubGraphTimings()
  617. {
  618. StatisticsFilter filter;
  619. filter.setScopeDepth(1); // only "global" timers.
  620. filter.setMeasure(SMeasureTimeNs);
  621. Owned<IConstWUStatisticIterator> times = &cw->getStatistics(&filter);
  622. ForEach(*times)
  623. {
  624. IConstWUStatistic & cur = times->query();
  625. SCMStringBuffer name;
  626. cur.getDescription(name, false);
  627. StringAttr graphName;
  628. unsigned graphNum;
  629. unsigned subGraphNum;
  630. unsigned subId;
  631. if (parseGraphTimerLabel(name.str(), graphName, graphNum, subGraphNum, subId))
  632. return true;
  633. }
  634. return false;
  635. }
  636. void WsWuInfo::doGetGraphs(IArrayOf<IEspECLGraph>& graphs)
  637. {
  638. SCMStringBuffer runningGraph;
  639. WUGraphIDType id;
  640. WUState st = cw->getState();
  641. bool running = (!(st==WUStateFailed || st==WUStateAborted || st==WUStateCompleted) && cw->getRunningGraph(runningGraph,id));
  642. Owned<IConstWUGraphMetaIterator> it = &cw->getGraphsMeta(GraphTypeAny);
  643. ForEach(*it)
  644. {
  645. IConstWUGraphMeta &graph = it->query();
  646. SCMStringBuffer name, label, type;
  647. graph.getName(name);
  648. graph.getLabel(label);
  649. graph.getTypeName(type);
  650. WUGraphState graphState = graph.getState();
  651. Owned<IEspECLGraph> g= createECLGraph();
  652. g->setName(name.str());
  653. g->setLabel(label.str());
  654. g->setType(type.str());
  655. if (WUGraphComplete == graphState)
  656. g->setComplete(true);
  657. else if (running && (WUGraphRunning == graphState))
  658. {
  659. g->setRunning(true);
  660. g->setRunningId(id);
  661. }
  662. else if (WUGraphFailed == graphState)
  663. g->setFailed(true);
  664. if (version >= 1.53)
  665. {
  666. SCMStringBuffer s;
  667. Owned<IConstWUStatistic> whenGraphStarted = cw->getStatistic(NULL, name.str(), StWhenGraphStarted);
  668. Owned<IConstWUStatistic> whenGraphFinished = cw->getStatistic(NULL, name.str(), StWhenGraphFinished);
  669. if (whenGraphStarted)
  670. g->setWhenStarted(whenGraphStarted->getFormattedValue(s).str());
  671. if (whenGraphFinished)
  672. g->setWhenFinished(whenGraphFinished->getFormattedValue(s).str());
  673. }
  674. graphs.append(*g.getLink());
  675. }
  676. }
  677. void WsWuInfo::getGraphInfo(IEspECLWorkunit &info, unsigned long flags)
  678. {
  679. if (version > 1.01)
  680. {
  681. info.setHaveSubGraphTimings(false);
  682. if (hasSubGraphTimings() || legacyHasSubGraphTimings())
  683. info.setHaveSubGraphTimings(true);
  684. }
  685. if (!(flags & WUINFO_IncludeGraphs))
  686. return;
  687. try
  688. {
  689. IArrayOf<IEspECLGraph> graphs;
  690. doGetGraphs(graphs);
  691. info.setGraphs(graphs);
  692. }
  693. catch(IException* e)
  694. {
  695. StringBuffer eMsg;
  696. ERRLOG("%s", e->errorMessage(eMsg).str());
  697. info.setGraphsDesc(eMsg.str());
  698. e->Release();
  699. }
  700. }
  701. void WsWuInfo::getWUGraphNameAndTypes(WUGraphType graphType, IArrayOf<IEspNameAndType>& graphNameAndTypes)
  702. {
  703. Owned<IConstWUGraphMetaIterator> it = &cw->getGraphsMeta(graphType);
  704. ForEach(*it)
  705. {
  706. SCMStringBuffer name, type;
  707. IConstWUGraphMeta &graph = it->query();
  708. Owned<IEspNameAndType> nameAndType = createNameAndType();
  709. nameAndType->setName(graph.getName(name).str());
  710. nameAndType->setType(graph.getTypeName(type).str());
  711. graphNameAndTypes.append(*nameAndType.getLink());
  712. }
  713. }
  714. void WsWuInfo::getGraphTimingData(IArrayOf<IConstECLTimingData> &timingData)
  715. {
  716. StatisticsFilter filter(SCTall, SSTsubgraph, SMeasureTimeNs, StTimeElapsed);
  717. Owned<IConstWUStatisticIterator> times = &cw->getStatistics(&filter);
  718. bool matched = false;
  719. ForEach(*times)
  720. {
  721. IConstWUStatistic & cur = times->query();
  722. SCMStringBuffer scope;
  723. cur.getScope(scope);
  724. StringAttr graphName;
  725. unsigned graphNum;
  726. unsigned subGraphId;
  727. if (parseGraphScope(scope.str(), graphName, graphNum, subGraphId))
  728. {
  729. unsigned time = (unsigned)nanoToMilli(cur.getValue());
  730. SCMStringBuffer name;
  731. cur.getDescription(name, true);
  732. Owned<IEspECLTimingData> g = createECLTimingData();
  733. g->setName(name.str());
  734. g->setGraphNum(graphNum);
  735. g->setSubGraphNum(subGraphId); // Use the Id - the number is not known
  736. g->setGID(subGraphId);
  737. g->setMS(time);
  738. g->setMin(time/60000);
  739. timingData.append(*g.getClear());
  740. matched = true;
  741. }
  742. }
  743. if (!matched)
  744. legacyGetGraphTimingData(timingData);
  745. }
  746. void WsWuInfo::legacyGetGraphTimingData(IArrayOf<IConstECLTimingData> &timingData)
  747. {
  748. StatisticsFilter filter;
  749. filter.setScopeDepth(1);
  750. filter.setMeasure(SMeasureTimeNs);
  751. Owned<IConstWUStatisticIterator> times = &cw->getStatistics(&filter);
  752. ForEach(*times)
  753. {
  754. IConstWUStatistic & cur = times->query();
  755. SCMStringBuffer name;
  756. cur.getDescription(name, false); // was previously always filled in.
  757. StringAttr graphName;
  758. unsigned graphNum;
  759. unsigned subGraphNum;
  760. unsigned subId;
  761. if (parseGraphTimerLabel(name.str(), graphName, graphNum, subGraphNum, subId))
  762. {
  763. unsigned time = (unsigned)nanoToMilli(cur.getValue());
  764. Owned<IEspECLTimingData> g = createECLTimingData();
  765. g->setName(name.str());
  766. g->setGraphNum(graphNum);
  767. g->setSubGraphNum(subGraphNum);
  768. g->setGID(subId);
  769. g->setMS(time);
  770. g->setMin(time/60000);
  771. timingData.append(*g.getClear());
  772. }
  773. }
  774. }
  775. void WsWuInfo::getEventScheduleFlag(IEspECLWorkunit &info)
  776. {
  777. info.setEventSchedule(0);
  778. if (info.getState() && !stricmp(info.getState(), "wait"))
  779. {
  780. info.setEventSchedule(2); //Can deschedule
  781. }
  782. else
  783. {
  784. Owned<IConstWorkflowItemIterator> it = cw->getWorkflowItems();
  785. if (it)
  786. {
  787. ForEach(*it)
  788. {
  789. IConstWorkflowItem *r = it->query();
  790. if (!r)
  791. continue;
  792. Owned<IWorkflowEvent> wfevent = r->getScheduleEvent();
  793. if (!wfevent)
  794. continue;
  795. if ((!r->hasScheduleCount() || (r->queryScheduleCountRemaining() > 0))
  796. && info.getState() && !strieq(info.getState(), "scheduled")
  797. && !strieq(info.getState(), "aborting") && !strieq(info.getState(), "aborted")
  798. && !strieq(info.getState(), "failed") && !strieq(info.getState(), "archived"))
  799. {
  800. info.setEventSchedule(1); //Can reschedule
  801. break;
  802. }
  803. }
  804. }
  805. }
  806. }
  807. unsigned WsWuInfo::getTotalThorTime()
  808. {
  809. StatisticsFilter filter;
  810. filter.setCreatorType(SCTsummary);
  811. filter.setScope(GLOBAL_SCOPE);
  812. filter.setKind(StTimeElapsed);
  813. //Should only be a single value
  814. unsigned totalThorTimeMS = 0;
  815. Owned<IConstWUStatisticIterator> times = &cw->getStatistics(&filter);
  816. ForEach(*times)
  817. {
  818. totalThorTimeMS += (unsigned)nanoToMilli(times->query().getValue());
  819. }
  820. return totalThorTimeMS;
  821. }
  822. unsigned WsWuInfo::getLegacyTotalThorTime()
  823. {
  824. //4.2.x backward compatibility - only scope depth and measure filters work
  825. StatisticsFilter filter;
  826. filter.setScopeDepth(1); // only global
  827. filter.setMeasure(SMeasureTimeNs);
  828. Owned<IConstWUStatisticIterator> times = &cw->getStatistics(&filter);
  829. SCMStringBuffer oldname;
  830. ForEach(*times)
  831. {
  832. times->query().getDescription(oldname, false); // description will be set up
  833. if (streq(oldname.str(), TOTALTHORTIME))
  834. return (unsigned)nanoToMilli(times->query().getValue());
  835. }
  836. return 0;
  837. }
  838. void WsWuInfo::getCommon(IEspECLWorkunit &info, unsigned long flags)
  839. {
  840. info.setWuid(cw->queryWuid());
  841. info.setProtected(cw->isProtected() ? 1 : 0);
  842. info.setJobname(cw->queryJobName());
  843. info.setOwner(cw->queryUser());
  844. clusterName.set(cw->queryClusterName());
  845. info.setCluster(clusterName.str());
  846. SCMStringBuffer s;
  847. info.setSnapshot(cw->getSnapshot(s).str());
  848. if ((cw->getState() == WUStateScheduled) && cw->aborting())
  849. {
  850. info.setStateID(WUStateAborting);
  851. info.setState("aborting");
  852. }
  853. else
  854. {
  855. info.setStateID(cw->getState());
  856. info.setState(cw->queryStateDesc());
  857. }
  858. if (cw->isPausing())
  859. info.setIsPausing(true);
  860. getEventScheduleFlag(info);
  861. if (version > 1.27)
  862. {
  863. unsigned totalThorTimeMS = getTotalThorTime();
  864. if (totalThorTimeMS == 0)
  865. totalThorTimeMS = getLegacyTotalThorTime();
  866. if (totalThorTimeMS)
  867. {
  868. StringBuffer totalThorTimeStr;
  869. formatDuration(totalThorTimeStr, totalThorTimeMS);
  870. if (version > 1.52)
  871. info.setTotalClusterTime(totalThorTimeStr.str());
  872. else
  873. info.setTotalThorTime(totalThorTimeStr.str());
  874. }
  875. }
  876. WsWuDateTime dt;
  877. cw->getTimeScheduled(dt);
  878. if(dt.isValid())
  879. info.setDateTimeScheduled(dt.getString(s).str());
  880. }
  881. void WsWuInfo::setWUAbortTime(IEspECLWorkunit &info, unsigned __int64 abortTS)
  882. {
  883. StringBuffer abortTimeStr;
  884. formatStatistic(abortTimeStr, abortTS, SMeasureTimestampUs);
  885. if ((abortTimeStr.length() > 19) && (abortTimeStr.charAt(10) == 'T') && (abortTimeStr.charAt(19) == '.'))
  886. {
  887. abortTimeStr.setCharAt(10, ' ');
  888. abortTimeStr.setLength(19);
  889. }
  890. info.setAbortTime(abortTimeStr.str());
  891. }
  892. void WsWuInfo::getInfo(IEspECLWorkunit &info, unsigned long flags)
  893. {
  894. getCommon(info, flags);
  895. SecAccessFlags accessFlag = getWsWorkunitAccess(context, *cw);
  896. info.setAccessFlag(accessFlag);
  897. SCMStringBuffer s;
  898. info.setStateEx(cw->getStateEx(s).str());
  899. WUState state = cw->getState();
  900. if ((state == WUStateAborting) || (state == WUStateAborted))
  901. {
  902. unsigned __int64 abortTS = cw->getAbortTimeStamp();
  903. if (abortTS > 0) //AbortTimeStamp may not be set in old wu
  904. {
  905. setWUAbortTime(info, abortTS);
  906. cw->getAbortBy(s);
  907. if (s.length())
  908. info.setAbortBy(s.str());
  909. }
  910. }
  911. info.setPriorityClass(cw->getPriority());
  912. info.setPriorityLevel(cw->getPriorityLevel());
  913. if (context.querySecManager())
  914. info.setScope(cw->queryWuScope());
  915. info.setActionEx(cw->queryActionDesc());
  916. info.setDescription(cw->getDebugValue("description", s).str());
  917. if (version > 1.21)
  918. info.setXmlParams(cw->getXmlParams(s, true).str());
  919. info.setResultLimit(cw->getResultLimit());
  920. info.setArchived(false);
  921. info.setGraphCount(cw->getGraphCount());
  922. info.setSourceFileCount(cw->getSourceFileCount());
  923. info.setResultCount(cw->getResultCount());
  924. info.setWorkflowCount(cw->queryEventScheduledCount());
  925. info.setVariableCount(cw->getVariableCount());
  926. info.setTimerCount(getTimerCount());
  927. info.setSourceFileCount(cw->getSourceFileCount());
  928. info.setApplicationValueCount(cw->getApplicationValueCount());
  929. info.setHasDebugValue(cw->hasDebugValue("__calculated__complexity__"));
  930. getClusterInfo(info, flags);
  931. getExceptions(info, flags);
  932. getHelpers(info, flags);
  933. getGraphInfo(info, flags);
  934. getSourceFiles(info, flags);
  935. getResults(info, flags);
  936. getVariables(info, flags);
  937. getTimers(info, flags);
  938. getDebugValues(info, flags);
  939. getApplicationValues(info, flags);
  940. getWorkflow(info, flags);
  941. }
  942. unsigned WsWuInfo::getWorkunitThorLogInfo(IArrayOf<IEspECLHelpFile>& helpers, IEspECLWorkunit &info, unsigned long flags, unsigned& helpersCount)
  943. {
  944. unsigned countThorLog = 0;
  945. IArrayOf<IConstThorLogInfo> thorLogList;
  946. if (cw->getWuidVersion() > 0)
  947. {
  948. StringAttr clusterName(cw->queryClusterName());
  949. if (!clusterName.length()) //Cluster name may not be set yet
  950. return countThorLog;
  951. Owned<IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(clusterName.str());
  952. if (!clusterInfo)
  953. {
  954. WARNLOG("Cannot find TargetClusterInfo for workunit %s", cw->queryWuid());
  955. return countThorLog;
  956. }
  957. unsigned numberOfSlaveLogs = clusterInfo->getNumberOfSlaveLogs();
  958. BoolHash uniqueProcesses;
  959. Owned<IStringIterator> thorInstances = cw->getProcesses("Thor");
  960. ForEach (*thorInstances)
  961. {
  962. SCMStringBuffer processName;
  963. thorInstances->str(processName);
  964. if (processName.length() < 1)
  965. continue;
  966. bool* found = uniqueProcesses.getValue(processName.str());
  967. if (found && *found)
  968. continue;
  969. uniqueProcesses.setValue(processName.str(), true);
  970. StringBuffer groupName;
  971. getClusterThorGroupName(groupName, processName.str());
  972. Owned<IStringIterator> thorLogs = cw->getLogs("Thor", processName.str());
  973. ForEach (*thorLogs)
  974. {
  975. SCMStringBuffer logName;
  976. thorLogs->str(logName);
  977. if (logName.length() < 1)
  978. continue;
  979. countThorLog++;
  980. StringBuffer fileType;
  981. if (countThorLog < 2)
  982. fileType.append(File_ThorLog);
  983. else
  984. fileType.appendf("%s%d", File_ThorLog, countThorLog);
  985. helpersCount++;
  986. if (flags & WUINFO_IncludeHelpers)
  987. {
  988. Owned<IEspECLHelpFile> h= createECLHelpFile("","");
  989. h->setName(logName.str());
  990. h->setDescription(processName.str());
  991. h->setType(fileType.str());
  992. if (version >= 1.43)
  993. {
  994. offset_t fileSize;
  995. if (getFileSize(logName.str(), NULL, fileSize))
  996. h->setFileSize(fileSize);
  997. }
  998. helpers.append(*h.getLink());
  999. }
  1000. if (version < 1.38)
  1001. continue;
  1002. const char* pStr = logName.str();
  1003. const char* ppStr = strstr(pStr, "/thormaster.");
  1004. if (!ppStr)
  1005. {
  1006. WARNLOG("Invalid thorlog entry in workunit xml: %s", logName.str());
  1007. continue;
  1008. }
  1009. ppStr += 12;
  1010. StringBuffer logDate = ppStr;
  1011. logDate.setLength(10);
  1012. Owned<IEspThorLogInfo> thorLog = createThorLogInfo("","");
  1013. thorLog->setProcessName(processName.str());
  1014. thorLog->setClusterGroup(groupName.str());
  1015. thorLog->setLogDate(logDate.str());
  1016. thorLog->setNumberSlaves(numberOfSlaveLogs);
  1017. thorLogList.append(*thorLog.getLink());
  1018. }
  1019. }
  1020. }
  1021. else //legacy wuid
  1022. {
  1023. Owned<IStringIterator> thorLogs = cw->getLogs("Thor");
  1024. ForEach (*thorLogs)
  1025. {
  1026. SCMStringBuffer name;
  1027. thorLogs->str(name);
  1028. if (name.length() < 1)
  1029. continue;
  1030. countThorLog++;
  1031. StringBuffer fileType;
  1032. if (countThorLog < 2)
  1033. fileType.append(File_ThorLog);
  1034. else
  1035. fileType.appendf("%s%d", File_ThorLog, countThorLog);
  1036. helpersCount++;
  1037. if (flags & WUINFO_IncludeHelpers)
  1038. {
  1039. Owned<IEspECLHelpFile> h= createECLHelpFile("","");
  1040. h->setName(name.str());
  1041. h->setType(fileType.str());
  1042. if (version >= 1.43)
  1043. {
  1044. offset_t fileSize;
  1045. if (getFileSize(name.str(), NULL, fileSize))
  1046. h->setFileSize(fileSize);
  1047. }
  1048. helpers.append(*h.getLink());
  1049. }
  1050. }
  1051. StringBuffer logDir;
  1052. Owned<IEnvironmentFactory> envFactory = getEnvironmentFactory();
  1053. Owned<IConstEnvironment> constEnv = envFactory->openEnvironment();
  1054. Owned<IPropertyTree> logTree = &constEnv->getPTree();
  1055. if (logTree)
  1056. logTree->getProp("EnvSettings/log", logDir);
  1057. if (logDir.length() > 0)
  1058. {
  1059. Owned<IStringIterator> debugs = cw->getLogs("Thor");
  1060. ForEach(*debugs)
  1061. {
  1062. SCMStringBuffer val;
  1063. debugs->str(val);
  1064. if (val.length() < 1)
  1065. continue;
  1066. const char* pStr = val.str();
  1067. const char* ppStr = strstr(pStr, logDir.str());
  1068. if (!ppStr)
  1069. {
  1070. WARNLOG("Invalid thorlog entry in workunit xml: %s", val.str());
  1071. continue;
  1072. }
  1073. const char* pProcessName = ppStr + logDir.length();
  1074. char sep = pProcessName[0];
  1075. StringBuffer processName = pProcessName + 1;
  1076. ppStr = strchr(pProcessName + 1, sep);
  1077. if (!ppStr)
  1078. {
  1079. WARNLOG("Invalid thorlog entry in workunit xml: %s", val.str());
  1080. continue;
  1081. }
  1082. processName.setLength(ppStr - pProcessName - 1);
  1083. StringBuffer groupName;
  1084. getClusterThorGroupName(groupName, processName.str());
  1085. StringBuffer logDate = ppStr + 12;
  1086. logDate.setLength(10);
  1087. Owned<IEspThorLogInfo> thorLog = createThorLogInfo("","");
  1088. thorLog->setProcessName(processName.str());
  1089. thorLog->setClusterGroup(groupName.str());
  1090. thorLog->setLogDate(logDate.str());
  1091. //for legacy wuid, the log name does not contain slaveNum. So, a user may not specify
  1092. //a slaveNum and we only display the first slave log if > 1 per IP.
  1093. thorLog->setNumberSlaves(0);
  1094. thorLogList.append(*thorLog.getLink());
  1095. }
  1096. }
  1097. }
  1098. if (thorLogList.length() > 0)
  1099. info.setThorLogList(thorLogList);
  1100. thorLogList.kill();
  1101. return countThorLog;
  1102. }
  1103. bool WsWuInfo::getClusterInfo(IEspECLWorkunit &info, unsigned long flags)
  1104. {
  1105. if ((flags & WUINFO_IncludeAllowedClusters) && (version > 1.04))
  1106. {
  1107. StringArray allowedClusters;
  1108. SCMStringBuffer val;
  1109. cw->getAllowedClusters(val);
  1110. if (val.length() > 0)
  1111. {
  1112. const char* ptr = val.str();
  1113. while(*ptr != '\0')
  1114. {
  1115. StringBuffer onesub;
  1116. while(*ptr != '\0' && *ptr != ',')
  1117. {
  1118. onesub.append((char)(*ptr));
  1119. ptr++;
  1120. }
  1121. if(onesub.length() > 0)
  1122. allowedClusters.append(onesub.str());
  1123. if(*ptr != '\0')
  1124. ptr++;
  1125. }
  1126. }
  1127. if (allowedClusters.length() > 0)
  1128. info.setAllowedClusters(allowedClusters);
  1129. }
  1130. if (version > 1.23 && clusterName.length())
  1131. {
  1132. int clusterTypeFlag = 0;
  1133. Owned<IEnvironmentFactory> envFactory = getEnvironmentFactory();
  1134. Owned<IConstEnvironment> constEnv = envFactory->openEnvironment();
  1135. Owned<IPropertyTree> root = &constEnv->getPTree();
  1136. if (!root)
  1137. throw MakeStringException(ECLWATCH_CANNOT_CONNECT_DALI,"Cannot connect to DALI server.");
  1138. Owned<IConstWUClusterInfo> clusterInfo = getTargetClusterInfo(clusterName.str());
  1139. if (clusterInfo.get())
  1140. {//Set thor flag or roxie flag in order to display some options for thor or roxie
  1141. ClusterType platform = clusterInfo->getPlatform();
  1142. if (isThorCluster(platform))
  1143. {
  1144. clusterTypeFlag=1;
  1145. if (version > 1.29)
  1146. info.setThorLCR(ThorLCRCluster == platform);
  1147. }
  1148. else if (RoxieCluster == platform)
  1149. clusterTypeFlag=2;
  1150. }
  1151. info.setClusterFlag(clusterTypeFlag);
  1152. }
  1153. return true;
  1154. }
  1155. void WsWuInfo::getWorkflow(IEspECLWorkunit &info, unsigned long flags)
  1156. {
  1157. if (!(flags & WUINFO_IncludeWorkflows))
  1158. return;
  1159. try
  1160. {
  1161. Owned<IConstWorkflowItemIterator> it = cw->getWorkflowItems();
  1162. if (!it)
  1163. return;
  1164. IArrayOf<IConstECLWorkflow> workflows;
  1165. ForEach(*it)
  1166. {
  1167. IConstWorkflowItem* r = it->query();
  1168. if (!r)
  1169. continue;
  1170. IWorkflowEvent* wfevent = r->getScheduleEvent();
  1171. if (!wfevent)
  1172. continue;
  1173. StringBuffer id;
  1174. Owned<IEspECLWorkflow> g = createECLWorkflow();
  1175. g->setWFID(id.appendf("%d", r->queryWfid()).str());
  1176. g->setEventName(wfevent->queryName());
  1177. g->setEventText(wfevent->queryText());
  1178. if (r->hasScheduleCount())
  1179. {
  1180. g->setCount(r->queryScheduleCount());
  1181. g->setCountRemaining(r->queryScheduleCountRemaining());
  1182. }
  1183. workflows.append(*g.getLink());
  1184. }
  1185. if (workflows.length() > 0)
  1186. info.setWorkflows(workflows);
  1187. }
  1188. catch(IException* e)
  1189. {
  1190. StringBuffer eMsg;
  1191. ERRLOG("%s", e->errorMessage(eMsg).str());
  1192. info.setWorkflowsDesc(eMsg.str());
  1193. e->Release();
  1194. }
  1195. }
  1196. IDistributedFile* WsWuInfo::getLogicalFileData(IEspContext& context, const char* logicalName, bool& showFileContent)
  1197. {
  1198. StringBuffer username;
  1199. context.getUserID(username);
  1200. Owned<IUserDescriptor> userdesc(createUserDescriptor());
  1201. userdesc->set(username.str(), context.queryPassword());
  1202. Owned<IDistributedFile> df = queryDistributedFileDirectory().lookup(logicalName, userdesc);
  1203. if (!df)
  1204. return NULL;
  1205. bool blocked;
  1206. if (df->isCompressed(&blocked) && !blocked)
  1207. return df.getClear();
  1208. IPropertyTree& properties = df->queryAttributes();
  1209. const char * format = properties.queryProp("@format");
  1210. if (format && (stricmp(format,"csv")==0 || memicmp(format, "utf", 3) == 0))
  1211. {
  1212. showFileContent = true;
  1213. return df.getClear();
  1214. }
  1215. const char * recordEcl = properties.queryProp("ECL");
  1216. if (!recordEcl)
  1217. return df.getClear();
  1218. MultiErrorReceiver errs;
  1219. Owned<IHqlExpression> ret = ::parseQuery(recordEcl, &errs);
  1220. showFileContent = errs.errCount() == 0;
  1221. return df.getClear();
  1222. }
  1223. void WsWuInfo::getEclSchemaChildFields(IArrayOf<IEspECLSchemaItem>& schemas, IHqlExpression * expr, bool isConditional)
  1224. {
  1225. if(!expr)
  1226. return;
  1227. ForEachChild(idx, expr)
  1228. getEclSchemaFields(schemas, expr->queryChild(idx), isConditional);
  1229. }
  1230. void WsWuInfo::getEclSchemaFields(IArrayOf<IEspECLSchemaItem>& schemas, IHqlExpression * expr, bool isConditional)
  1231. {
  1232. if(!expr)
  1233. return;
  1234. int ret = expr->getOperator();
  1235. switch (ret)
  1236. {
  1237. case no_record:
  1238. getEclSchemaChildFields(schemas, expr, isConditional);
  1239. break;
  1240. case no_ifblock:
  1241. {
  1242. getEclSchemaChildFields(schemas, expr->queryChild(1), true);
  1243. break;
  1244. }
  1245. case no_field:
  1246. {
  1247. if (expr->hasAttribute(__ifblockAtom))
  1248. break;
  1249. ITypeInfo * type = expr->queryType();
  1250. IAtom * name = expr->queryName();
  1251. IHqlExpression * nameAttr = expr->queryAttribute(namedAtom);
  1252. StringBuffer outname;
  1253. if (nameAttr && nameAttr->queryChild(0) && nameAttr->queryChild(0)->queryValue())
  1254. nameAttr->queryChild(0)->queryValue()->getStringValue(outname);
  1255. else
  1256. outname.append(name).toLowerCase();
  1257. if(type)
  1258. {
  1259. type_t tc = type->getTypeCode();
  1260. if (tc == type_row)
  1261. {
  1262. getEclSchemaChildFields(schemas, expr->queryRecord(), isConditional);
  1263. }
  1264. else
  1265. {
  1266. if (type->getTypeCode() == type_alien)
  1267. {
  1268. IHqlAlienTypeInfo * alien = queryAlienType(type);
  1269. type = alien->queryPhysicalType();
  1270. }
  1271. Owned<IEspECLSchemaItem> schema = createECLSchemaItem("","");
  1272. StringBuffer eclType;
  1273. type->getECLType(eclType);
  1274. schema->setColumnName(outname);
  1275. schema->setColumnType(eclType.str());
  1276. schema->setColumnTypeCode(tc);
  1277. schema->setIsConditional(isConditional);
  1278. schemas.append(*schema.getClear());
  1279. }
  1280. }
  1281. break;
  1282. }
  1283. }
  1284. }
  1285. bool WsWuInfo::getResultEclSchemas(IConstWUResult &r, IArrayOf<IEspECLSchemaItem>& schemas)
  1286. {
  1287. SCMStringBuffer schema;
  1288. r.getResultEclSchema(schema);
  1289. if (!schema.length())
  1290. return false;
  1291. MultiErrorReceiver errs;
  1292. Owned<IHqlExpression> expr = ::parseQuery(schema.str(), &errs);
  1293. if (errs.errCount() != 0)
  1294. return false;
  1295. getEclSchemaFields(schemas, expr, false);
  1296. return true;
  1297. }
  1298. void WsWuInfo::getResult(IConstWUResult &r, IArrayOf<IEspECLResult>& results, unsigned long flags)
  1299. {
  1300. SCMStringBuffer name;
  1301. r.getResultName(name);
  1302. SCMStringBuffer filename;
  1303. r.getResultLogicalName(filename);
  1304. bool showFileContent = false;
  1305. Owned<IDistributedFile> df = NULL;
  1306. if (filename.length())
  1307. df.setown(getLogicalFileData(context, filename.str(), showFileContent));
  1308. StringBuffer value, link;
  1309. if (r.getResultStatus() == ResultStatusUndefined)
  1310. value.set("[undefined]");
  1311. else if (r.isResultScalar())
  1312. {
  1313. try
  1314. {
  1315. SCMStringBuffer xml;
  1316. r.getResultXml(xml, true);
  1317. Owned<IPropertyTree> props = createPTreeFromXMLString(xml.str(), ipt_caseInsensitive);
  1318. IPropertyTree *val = props->queryPropTree("Row/*");
  1319. if(val)
  1320. value.set(val->queryProp(NULL));
  1321. else
  1322. {
  1323. Owned<IResultSetFactory> resultSetFactory = getSecResultSetFactory(context.querySecManager(), context.queryUser(), context.queryUserId(), context.queryPassword());
  1324. Owned<INewResultSet> result;
  1325. result.setown(resultSetFactory->createNewResultSet(&r, wuid.str()));
  1326. Owned<IResultSetCursor> cursor(result->createCursor());
  1327. cursor->first();
  1328. if (cursor->getIsAll(0))
  1329. {
  1330. value.set("<All/>");
  1331. }
  1332. else
  1333. {
  1334. Owned<IResultSetCursor> childCursor = cursor->getChildren(0);
  1335. if (childCursor)
  1336. {
  1337. ForEach(*childCursor)
  1338. {
  1339. StringBuffer out;
  1340. StringBufferAdaptor adaptor(out);
  1341. childCursor->getDisplayText(adaptor, 0);
  1342. if (!value.length())
  1343. value.append('[');
  1344. else
  1345. value.append(", ");
  1346. value.append('\'').append(out.str()).append('\'');
  1347. }
  1348. if (value.length())
  1349. value.append(']');
  1350. }
  1351. }
  1352. }
  1353. }
  1354. catch(...)
  1355. {
  1356. value.append("[value not available]");
  1357. }
  1358. }
  1359. else
  1360. {
  1361. value.append('[').append(r.getResultTotalRowCount()).append(" rows]");
  1362. if((r.getResultSequence()>=0) && (!filename.length() || (df && df->queryAttributes().hasProp("ECL"))))
  1363. link.append(r.getResultSequence());
  1364. }
  1365. Owned<IEspECLResult> result= createECLResult("","");
  1366. if (flags & WUINFO_IncludeEclSchemas)
  1367. {
  1368. IArrayOf<IEspECLSchemaItem> schemas;
  1369. if (getResultEclSchemas(r, schemas))
  1370. result->setECLSchemas(schemas);
  1371. }
  1372. if (flags & WUINFO_IncludeXmlSchema)
  1373. {
  1374. Owned<IResultSetFactory> resultSetFactory = getSecResultSetFactory(context.querySecManager(), context.queryUser(), context.queryUserId(), context.queryPassword());
  1375. Owned<INewResultSet> rs = resultSetFactory->createNewResultSet(&r, wuid.str());
  1376. Owned<IResultSetCursor> cursor(rs->createCursor());
  1377. SCMStringBuffer xsd;
  1378. const IResultSetMetaData & meta = cursor->queryResultSet()->getMetaData();
  1379. meta.getXmlXPathSchema(xsd, false);
  1380. result->setXmlSchema(xsd.str());
  1381. }
  1382. if (filename.length())
  1383. result->setShowFileContent(showFileContent);
  1384. result->setName(name.str());
  1385. result->setLink(link.str());
  1386. result->setSequence(r.getResultSequence());
  1387. result->setValue(value.str());
  1388. result->setFileName(filename.str());
  1389. result->setIsSupplied(r.getResultStatus() == ResultStatusSupplied);
  1390. result->setTotal(r.getResultTotalRowCount());
  1391. results.append(*result.getLink());
  1392. }
  1393. void WsWuInfo::getResults(IEspECLWorkunit &info, unsigned long flags)
  1394. {
  1395. if (!(flags & WUINFO_IncludeResults))
  1396. return;
  1397. try
  1398. {
  1399. IArrayOf<IEspECLResult> results;
  1400. Owned<IConstWUResultIterator> it = &(cw->getResults());
  1401. ForEach(*it)
  1402. {
  1403. IConstWUResult &r = it->query();
  1404. if(r.getResultSequence()>=0)
  1405. getResult(r, results, flags);
  1406. }
  1407. if (results.length())
  1408. info.setResults(results);
  1409. results.kill();
  1410. }
  1411. catch(IException* e)
  1412. {
  1413. StringBuffer eMsg;
  1414. ERRLOG("%s", e->errorMessage(eMsg).str());
  1415. info.setResultsDesc(eMsg.str());
  1416. e->Release();
  1417. }
  1418. }
  1419. void WsWuInfo::getStats(StatisticsFilter& filter, bool createDescriptions, IArrayOf<IEspWUStatisticItem>& statistics)
  1420. {
  1421. Owned<IConstWUStatisticIterator> stats = &cw->getStatistics(&filter);
  1422. ForEach(*stats)
  1423. {
  1424. IConstWUStatistic & cur = stats->query();
  1425. StringBuffer xmlBuf, tsValue;
  1426. SCMStringBuffer curCreator, curScope, curDescription, curFormattedValue;
  1427. StatisticCreatorType curCreatorType = cur.getCreatorType();
  1428. StatisticScopeType curScopeType = cur.getScopeType();
  1429. StatisticMeasure curMeasure = cur.getMeasure();
  1430. StatisticKind curKind = cur.getKind();
  1431. unsigned __int64 value = cur.getValue();
  1432. unsigned __int64 count = cur.getCount();
  1433. unsigned __int64 max = cur.getMax();
  1434. unsigned __int64 ts = cur.getTimestamp();
  1435. cur.getCreator(curCreator);
  1436. cur.getScope(curScope);
  1437. cur.getDescription(curDescription, createDescriptions);
  1438. cur.getFormattedValue(curFormattedValue);
  1439. Owned<IEspWUStatisticItem> wuStatistic = createWUStatisticItem();
  1440. if (version > 1.61)
  1441. wuStatistic->setWuid(wuid);
  1442. if (curCreatorType != SCTnone)
  1443. wuStatistic->setCreatorType(queryCreatorTypeName(curCreatorType));
  1444. if (curCreator.length())
  1445. wuStatistic->setCreator(curCreator.str());
  1446. if (curScopeType != SSTnone)
  1447. wuStatistic->setScopeType(queryScopeTypeName(curScopeType));
  1448. if (curScope.length())
  1449. wuStatistic->setScope(curScope.str());
  1450. if (curMeasure != SMeasureNone)
  1451. wuStatistic->setMeasure(queryMeasureName(curMeasure));
  1452. if (curKind != StKindNone)
  1453. wuStatistic->setKind(queryStatisticName(curKind));
  1454. wuStatistic->setRawValue(value);
  1455. wuStatistic->setValue(curFormattedValue.str());
  1456. if (count != 1)
  1457. wuStatistic->setCount(count);
  1458. if (max)
  1459. wuStatistic->setMax(max);
  1460. if (ts)
  1461. {
  1462. formatStatistic(tsValue, ts, SMeasureTimestampUs);
  1463. wuStatistic->setTimeStamp(tsValue.str());
  1464. }
  1465. if (curDescription.length())
  1466. wuStatistic->setDescription(curDescription.str());
  1467. statistics.append(*wuStatistic.getClear());
  1468. }
  1469. }
  1470. bool WsWuInfo::getFileSize(const char* fileName, const char* IPAddress, offset_t& fileSize)
  1471. {
  1472. if (!fileName || !*fileName)
  1473. return false;
  1474. Owned<IFile> aFile;
  1475. if (!IPAddress || !*IPAddress)
  1476. {
  1477. aFile.setown(createIFile(fileName));
  1478. }
  1479. else
  1480. {
  1481. RemoteFilename rfn;
  1482. rfn.setRemotePath(fileName);
  1483. SocketEndpoint ep(IPAddress);
  1484. rfn.setIp(ep);
  1485. aFile.setown(createIFile(rfn));
  1486. }
  1487. if (!aFile)
  1488. return false;
  1489. bool isDir;
  1490. CDateTime modtime;
  1491. if (!aFile->getInfo(isDir, fileSize, modtime) || isDir)
  1492. return false;
  1493. return true;
  1494. }
  1495. void WsWuInfo::getHelpFiles(IConstWUQuery* query, WUFileType type, IArrayOf<IEspECLHelpFile>& helpers, unsigned long flags, unsigned& helpersCount)
  1496. {
  1497. if (!query)
  1498. return;
  1499. Owned<IConstWUAssociatedFileIterator> iter = &query->getAssociatedFiles();
  1500. ForEach(*iter)
  1501. {
  1502. SCMStringBuffer name, Ip, description;
  1503. IConstWUAssociatedFile & cur = iter->query();
  1504. if (cur.getType() != type)
  1505. continue;
  1506. helpersCount++;
  1507. if (!(flags & WUINFO_IncludeHelpers))
  1508. continue;
  1509. cur.getName(name);
  1510. Owned<IEspECLHelpFile> h= createECLHelpFile("","");
  1511. h->setName(name.str());
  1512. h->setType(getEnumText(type, queryFileTypes));
  1513. if (version > 1.31)
  1514. {
  1515. cur.getIp(Ip);
  1516. h->setIPAddress(Ip.str());
  1517. cur.getDescription(description);
  1518. if ((description.length() < 1) && (name.length() > 0))
  1519. {
  1520. const char* desc = pathTail(name.str());
  1521. if (desc && *desc)
  1522. description.set(desc);
  1523. }
  1524. if (description.length() < 1)
  1525. description.set("Help File");
  1526. h->setDescription(description.str());
  1527. if (version >= 1.43)
  1528. {
  1529. offset_t fileSize;
  1530. if (getFileSize(name.str(), Ip.str(), fileSize))
  1531. h->setFileSize(fileSize);
  1532. }
  1533. if (version >= 1.58)
  1534. {
  1535. h->setMinActivityId(cur.getMinActivityId());
  1536. h->setMaxActivityId(cur.getMaxActivityId());
  1537. }
  1538. }
  1539. helpers.append(*h.getLink());
  1540. }
  1541. }
  1542. void WsWuInfo::getSubFiles(IPropertyTreeIterator* f, IEspECLSourceFile* eclSuperFile, StringArray& fileNames)
  1543. {
  1544. IArrayOf<IEspECLSourceFile> files;
  1545. ForEach(*f)
  1546. {
  1547. IPropertyTree &query = f->query();
  1548. const char *clusterName = query.queryProp("@cluster");
  1549. const char *fileName = query.queryProp("@name");
  1550. int fileCount = query.getPropInt("@useCount");
  1551. bool bFound = false;
  1552. if (fileName && *fileName && (fileNames.length() > 0)) // MORE - this is an n^2 process and as far as I can tell unnecessary as there will be no dups
  1553. {
  1554. for (unsigned i = 0; i < fileNames.length(); i++ )
  1555. {
  1556. const char *fileName0 = fileNames.item(i);
  1557. if (!stricmp(fileName, fileName0))
  1558. {
  1559. bFound = true;
  1560. break;
  1561. }
  1562. }
  1563. }
  1564. if (bFound)
  1565. continue;
  1566. Owned<IEspECLSourceFile> file= createECLSourceFile("","");
  1567. if(clusterName && *clusterName)
  1568. {
  1569. file->setFileCluster(clusterName);
  1570. }
  1571. if (fileName && *fileName)
  1572. {
  1573. file->setName(fileName);
  1574. fileNames.append(fileName);
  1575. }
  1576. file->setCount(fileCount);
  1577. Owned<IPropertyTreeIterator> filetrees= query.getElements("Subfile"); // We do not store subfiles of subfiles like this - so this code will never be triggered
  1578. if (filetrees->first())
  1579. {
  1580. file->setIsSuperFile(true);
  1581. getSubFiles(filetrees, file, fileNames);
  1582. }
  1583. files.append(*file.getLink());
  1584. }
  1585. eclSuperFile->setECLSourceFiles(files);
  1586. return;
  1587. }
  1588. bool WsWuInfo::getResourceInfo(StringArray &viewnames, StringArray &urls, unsigned long flags)
  1589. {
  1590. if (!(flags & (WUINFO_IncludeResultsViewNames | WUINFO_IncludeResourceURLs)))
  1591. return true;
  1592. try
  1593. {
  1594. Owned<IWuWebView> wv = createWuWebView(*cw, NULL, NULL, NULL, false);
  1595. if (wv)
  1596. {
  1597. if (flags & WUINFO_IncludeResultsViewNames)
  1598. wv->getResultViewNames(viewnames);
  1599. if (flags & WUINFO_IncludeResourceURLs)
  1600. wv->getResourceURLs(urls, NULL);
  1601. }
  1602. return true;
  1603. }
  1604. catch(IException* e)
  1605. {
  1606. StringBuffer eMsg;
  1607. ERRLOG("%s", e->errorMessage(eMsg).str());
  1608. e->Release();
  1609. }
  1610. return false;
  1611. }
  1612. unsigned WsWuInfo::getResourceURLCount()
  1613. {
  1614. try
  1615. {
  1616. Owned<IWuWebView> wv = createWuWebView(*cw, NULL, NULL, NULL, false);
  1617. if (wv)
  1618. return wv->getResourceURLCount();
  1619. }
  1620. catch(IException* e)
  1621. {
  1622. StringBuffer eMsg;
  1623. ERRLOG("%s", e->errorMessage(eMsg).str());
  1624. e->Release();
  1625. }
  1626. return 0;
  1627. }
  1628. void appendIOStreamContent(MemoryBuffer &mb, IFileIOStream *ios, bool forDownload)
  1629. {
  1630. StringBuffer line;
  1631. bool eof = false;
  1632. while (!eof)
  1633. {
  1634. line.clear();
  1635. for (;;)
  1636. {
  1637. char c;
  1638. size32_t numRead = ios->read(1, &c);
  1639. if (!numRead)
  1640. {
  1641. eof = true;
  1642. break;
  1643. }
  1644. line.append(c);
  1645. if (c=='\n')
  1646. break;
  1647. }
  1648. mb.append(line.length(), line.str());
  1649. if (!forDownload && (mb.length() > 640000))
  1650. break;
  1651. }
  1652. }
  1653. void WsWuInfo::getWorkunitEclAgentLog(const char* fileName, const char* agentPid, MemoryBuffer& buf)
  1654. {
  1655. if(!fileName || !*fileName)
  1656. throw MakeStringException(ECLWATCH_ECLAGENT_LOG_NOT_FOUND,"Log file not specified");
  1657. Owned<IFile> rFile = createIFile(fileName);
  1658. if(!rFile)
  1659. throw MakeStringException(ECLWATCH_CANNOT_OPEN_FILE, "Cannot open file %s.", fileName);
  1660. OwnedIFileIO rIO = rFile->openShared(IFOread,IFSHfull);
  1661. if(!rIO)
  1662. throw MakeStringException(ECLWATCH_CANNOT_READ_FILE, "Cannot read file %s.", fileName);
  1663. OwnedIFileIOStream ios = createBufferedIOStream(rIO);
  1664. StringBuffer line;
  1665. bool eof = false;
  1666. bool wuidFound = false;
  1667. StringBuffer pidstr;
  1668. if (agentPid && *agentPid)
  1669. pidstr.appendf(" %s ", agentPid);
  1670. else
  1671. pidstr.appendf(" %5d ", cw->getAgentPID());
  1672. /*
  1673. Scan the master daily logfile for given PID/WUID. We make the following assumptions
  1674. Column ordering (time, date, pid) is unknown, but we must assume it is constant throughout the logfile.
  1675. It is assumed that the first column is the 8 digit workunit logfile line number.
  1676. Rows from concurrent workunits are intermixed.
  1677. Logfiles are searched via PID and WUID. You are not assured of a match until you have both.
  1678. PIDS and TIDS can and are reused. Beware that a TID could match the search PID.
  1679. Once you have both, you know the offset of the PID column. It is assumed this offset remains constant.
  1680. Search stops at EOF, or early exit if the search PID reappears on different WUID.
  1681. */
  1682. char const * pidchars = pidstr.str();
  1683. size32_t pidLen = pidstr.length();
  1684. unsigned pidOffset = 0;//offset of PID in logfile entry
  1685. while(!eof)
  1686. {
  1687. line.clear();
  1688. for (;;)
  1689. {
  1690. char c;
  1691. size32_t numRead = ios->read(1, &c);
  1692. if (!numRead)
  1693. {
  1694. eof = true;
  1695. break;
  1696. }
  1697. line.append(c);
  1698. if (c=='\n')
  1699. break;
  1700. }
  1701. //Retain all rows that match a unique program instance - by retaining all rows that match a pid
  1702. const char * pPid = strstr(line.str() + pidOffset, pidchars);
  1703. if (pPid)
  1704. {
  1705. //Check if this is a new instance using line sequence number (PIDs are often reused)
  1706. if (strncmp(line.str(), "00000000", 8) == 0)
  1707. {
  1708. if (wuidFound) //If the correct instance has been found, return that instance before the next instance.
  1709. break;
  1710. //The last instance is not a correct instance. Clean the buf in order to start a new instance.
  1711. buf.clear();
  1712. }
  1713. //If we spot the workunit id anywhere in the tracing for this pid then assume it is the correct instance.
  1714. if(!wuidFound && strstr(line.str(), wuid.str()))
  1715. {
  1716. pidOffset = pPid - line.str();//remember offset of PID within line
  1717. wuidFound = true;
  1718. }
  1719. if (pidOffset && 0 == strncmp(line.str() + pidOffset, pidchars, pidLen))//this makes sure the match was the PID and not the TID or something else
  1720. buf.append(line.length(), line.str());
  1721. }
  1722. }
  1723. if (buf.length() < 1)
  1724. {
  1725. const char * msg = "(No logfile entries found for this workunit)";
  1726. buf.append(strlen(msg), msg);
  1727. }
  1728. }
  1729. void WsWuInfo::getWorkunitThorLog(const char* fileName, MemoryBuffer& buf)
  1730. {
  1731. if(!fileName || !*fileName)
  1732. throw MakeStringException(ECLWATCH_ECLAGENT_LOG_NOT_FOUND,"Log file not specified");
  1733. Owned<IFile> rFile = createIFile(fileName);
  1734. if (!rFile)
  1735. throw MakeStringException(ECLWATCH_CANNOT_OPEN_FILE,"Cannot open file %s.",fileName);
  1736. OwnedIFileIO rIO = rFile->openShared(IFOread,IFSHfull);
  1737. if (!rIO)
  1738. throw MakeStringException(ECLWATCH_CANNOT_READ_FILE,"Cannot read file %s.",fileName);
  1739. OwnedIFileIOStream ios = createBufferedIOStream(rIO);
  1740. StringBuffer line;
  1741. bool eof = false;
  1742. bool include = false;
  1743. VStringBuffer startwuid("Started wuid=%s", wuid.str());
  1744. VStringBuffer endwuid("Finished wuid=%s", wuid.str());
  1745. const char *sw = startwuid.str();
  1746. const char *ew = endwuid.str();
  1747. while (!eof)
  1748. {
  1749. line.clear();
  1750. for (;;)
  1751. {
  1752. char c;
  1753. size32_t numRead = ios->read(1, &c);
  1754. if (!numRead)
  1755. {
  1756. eof = true;
  1757. break;
  1758. }
  1759. line.append(c);
  1760. if (c=='\n')
  1761. break;
  1762. }
  1763. if (strstr(line.str(), sw))
  1764. include = true;
  1765. if (include)
  1766. buf.append(line.length(), line.str());
  1767. if (strstr(line.str(), ew))
  1768. include = false;
  1769. }
  1770. }
  1771. void WsWuInfo::getWorkunitThorSlaveLog(const char *groupName, const char *ipAddress, const char* logDate, const char* logDir, int slaveNum, MemoryBuffer& buf, bool forDownload)
  1772. {
  1773. if (isEmpty(logDir))
  1774. throw MakeStringException(ECLWATCH_INVALID_INPUT,"ThorSlave log path not specified.");
  1775. if (isEmpty(logDate))
  1776. throw MakeStringException(ECLWATCH_INVALID_INPUT,"ThorSlave log date not specified.");
  1777. StringBuffer slaveIPAddress, logName;
  1778. if (slaveNum > 0)
  1779. {
  1780. if (isEmpty(groupName))
  1781. throw MakeStringException(ECLWATCH_INVALID_INPUT,"Thor group not specified.");
  1782. Owned<IGroup> nodeGroup = queryNamedGroupStore().lookup(groupName);
  1783. if (!nodeGroup || (nodeGroup->ordinality() == 0))
  1784. {
  1785. WARNLOG("Node group %s not found", groupName);
  1786. return;
  1787. }
  1788. nodeGroup->queryNode(slaveNum-1).endpoint().getIpText(slaveIPAddress);
  1789. if (slaveIPAddress.length() < 1)
  1790. throw MakeStringException(ECLWATCH_INVALID_INPUT,"ThorSlave log network address not found.");
  1791. logName.appendf("thorslave.%d.%s.log", slaveNum, logDate);
  1792. }
  1793. else
  1794. {//legacy wuid: a user types in an IP address for a thor slave
  1795. if (isEmpty(ipAddress))
  1796. throw MakeStringException(ECLWATCH_INVALID_INPUT,"ThorSlave address not specified.");
  1797. //thorslave.10.239.219.6_20100.2012_05_23.log
  1798. logName.appendf("thorslave.%s*.%s.log", ipAddress, logDate);
  1799. const char* portPtr = strchr(ipAddress, '_');
  1800. if (!portPtr)
  1801. slaveIPAddress.append(ipAddress);
  1802. else
  1803. {
  1804. StringBuffer ipAddressStr = ipAddress;
  1805. ipAddressStr.setLength(portPtr - ipAddress);
  1806. slaveIPAddress.append(ipAddressStr.str());
  1807. }
  1808. }
  1809. RemoteFilename rfn;
  1810. rfn.setRemotePath(logDir);
  1811. SocketEndpoint ep(slaveIPAddress.str());
  1812. rfn.setIp(ep);
  1813. Owned<IFile> dir = createIFile(rfn);
  1814. Owned<IDirectoryIterator> diriter = dir->directoryFiles(logName.str());
  1815. if (!diriter->first())
  1816. throw MakeStringException(ECLWATCH_FILE_NOT_EXIST,"Cannot find Thor slave log file %s.", logName.str());
  1817. Linked<IFile> logfile = &diriter->query();
  1818. diriter.clear();
  1819. dir.clear();
  1820. // logfile is now the file to load
  1821. OwnedIFileIO rIO = logfile->openShared(IFOread,IFSHfull);
  1822. if (!rIO)
  1823. throw MakeStringException(ECLWATCH_CANNOT_READ_FILE,"Cannot read file %s.",logName.str());
  1824. OwnedIFileIOStream ios = createBufferedIOStream(rIO);
  1825. if (slaveNum > 0)
  1826. {
  1827. StringBuffer line;
  1828. bool eof = false;
  1829. bool include = false;
  1830. VStringBuffer startwuid("Started wuid=%s", wuid.str());
  1831. VStringBuffer endwuid("Finished wuid=%s", wuid.str());
  1832. const char *sw = startwuid.str();
  1833. const char *ew = endwuid.str();
  1834. while (!eof)
  1835. {
  1836. line.clear();
  1837. for (;;)
  1838. {
  1839. char c;
  1840. size32_t numRead = ios->read(1, &c);
  1841. if (!numRead)
  1842. {
  1843. eof = true;
  1844. break;
  1845. }
  1846. line.append(c);
  1847. if (c=='\n')
  1848. break;
  1849. }
  1850. if (strstr(line.str(), sw))
  1851. include = true;
  1852. if (include)
  1853. buf.append(line.length(), line.str());
  1854. if (strstr(line.str(), ew))
  1855. include = false;
  1856. }
  1857. }
  1858. else
  1859. {//legacy wuid
  1860. appendIOStreamContent(buf, ios.get(), forDownload);
  1861. }
  1862. }
  1863. void WsWuInfo::getWorkunitResTxt(MemoryBuffer& buf)
  1864. {
  1865. Owned<IConstWUQuery> query = cw->getQuery();
  1866. if(!query)
  1867. throw MakeStringException(ECLWATCH_QUERY_NOT_FOUND_FOR_WU,"No query for workunit %s.",wuid.str());
  1868. SCMStringBuffer resname;
  1869. queryDllServer().getDll(query->getQueryResTxtName(resname).str(), buf);
  1870. }
  1871. IConstWUQuery* WsWuInfo::getEmbeddedQuery()
  1872. {
  1873. Owned<IWuWebView> wv = createWuWebView(*cw, NULL, NULL, NULL, false);
  1874. if (wv)
  1875. return wv->getEmbeddedQuery();
  1876. return NULL;
  1877. }
  1878. void WsWuInfo::getWorkunitArchiveQuery(MemoryBuffer& buf)
  1879. {
  1880. Owned<IConstWUQuery> query = cw->getQuery();
  1881. if(!query)
  1882. throw MakeStringException(ECLWATCH_QUERY_NOT_FOUND_FOR_WU,"No query for workunit %s.",wuid.str());
  1883. SCMStringBuffer queryText;
  1884. query->getQueryText(queryText);
  1885. if ((queryText.length() < 1) || !isArchiveQuery(queryText.str()))
  1886. {
  1887. if (!query->hasArchive())
  1888. throw MakeStringException(ECLWATCH_CANNOT_GET_WORKUNIT, "Archive query not found for workunit %s.", wuid.str());
  1889. Owned<IConstWUQuery> embeddedQuery = getEmbeddedQuery();
  1890. if (!embeddedQuery)
  1891. throw MakeStringException(ECLWATCH_CANNOT_GET_WORKUNIT, "Embedded query not found for workunit %s.", wuid.str());
  1892. embeddedQuery->getQueryText(queryText);
  1893. if ((queryText.length() < 1) || !isArchiveQuery(queryText.str()))
  1894. throw MakeStringException(ECLWATCH_CANNOT_GET_WORKUNIT, "Archive query not found for workunit %s.", wuid.str());
  1895. }
  1896. buf.append(queryText.length(), queryText.str());
  1897. }
  1898. void WsWuInfo::getWorkunitQueryShortText(MemoryBuffer& buf)
  1899. {
  1900. Owned<IConstWUQuery> query = cw->getQuery();
  1901. if(!query)
  1902. throw MakeStringException(ECLWATCH_QUERY_NOT_FOUND_FOR_WU,"No query for workunit %s.",wuid.str());
  1903. SCMStringBuffer queryText;
  1904. query->getQueryShortText(queryText);
  1905. if (queryText.length() < 1)
  1906. throw MakeStringException(ECLWATCH_QUERY_NOT_FOUND_FOR_WU, "No query for workunit %s.",wuid.str());
  1907. buf.append(queryText.length(), queryText.str());
  1908. }
  1909. void WsWuInfo::getWorkunitDll(StringBuffer &dllname, MemoryBuffer& buf)
  1910. {
  1911. Owned<IConstWUQuery> query = cw->getQuery();
  1912. if(!query)
  1913. throw MakeStringException(ECLWATCH_QUERY_NOT_FOUND_FOR_WU,"No query for workunit %s.",wuid.str());
  1914. StringBufferAdaptor isvName(dllname);
  1915. query->getQueryDllName(isvName);
  1916. queryDllServer().getDll(dllname.str(), buf);
  1917. }
  1918. void WsWuInfo::getWorkunitXml(const char* plainText, MemoryBuffer& buf)
  1919. {
  1920. const char* header;
  1921. if (plainText && (!stricmp(plainText, "yes")))
  1922. header = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>";
  1923. else
  1924. header = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><?xml-stylesheet href=\"../esp/xslt/xmlformatter.xsl\" type=\"text/xsl\"?>";
  1925. StringBuffer xml;
  1926. exportWorkUnitToXML(cw, xml, true, false, true);
  1927. buf.append(strlen(header), header);
  1928. buf.append(xml.length(), xml.str());
  1929. }
  1930. void WsWuInfo::getWorkunitCpp(const char *cppname, const char* description, const char* ipAddress, MemoryBuffer& buf, bool forDownload)
  1931. {
  1932. if (isEmpty(description))
  1933. throw MakeStringException(ECLWATCH_INVALID_INPUT, "File not specified.");
  1934. if (isEmpty(ipAddress))
  1935. throw MakeStringException(ECLWATCH_INVALID_INPUT, "File location not specified.");
  1936. if (isEmpty(cppname))
  1937. throw MakeStringException(ECLWATCH_INVALID_FILE_NAME, "File path not specified.");
  1938. RemoteFilename rfn;
  1939. rfn.setRemotePath(cppname);
  1940. SocketEndpoint ep(ipAddress);
  1941. rfn.setIp(ep);
  1942. Owned<IFile> cppfile = createIFile(rfn);
  1943. if (!cppfile)
  1944. throw MakeStringException(ECLWATCH_CANNOT_OPEN_FILE, "Cannot open %s.", description);
  1945. OwnedIFileIO rIO = cppfile->openShared(IFOread,IFSHfull);
  1946. if (!rIO)
  1947. throw MakeStringException(ECLWATCH_CANNOT_READ_FILE,"Cannot read %s.", description);
  1948. OwnedIFileIOStream ios = createBufferedIOStream(rIO);
  1949. if (!ios)
  1950. throw MakeStringException(ECLWATCH_CANNOT_READ_FILE,"Cannot read %s.", description);
  1951. appendIOStreamContent(buf, ios.get(), forDownload);
  1952. }
  1953. void WsWuInfo::getWorkunitAssociatedXml(const char* name, const char* ipAddress, const char* plainText,
  1954. const char* description, bool forDownload, bool addXMLDeclaration, MemoryBuffer& buf)
  1955. {
  1956. if (isEmpty(description)) //'File Name' as shown in WU Details page
  1957. throw MakeStringException(ECLWATCH_INVALID_INPUT, "File not specified.");
  1958. if (isEmpty(ipAddress))
  1959. throw MakeStringException(ECLWATCH_INVALID_INPUT, "File location not specified.");
  1960. if (isEmpty(name)) //file name with full path
  1961. throw MakeStringException(ECLWATCH_INVALID_FILE_NAME, "File path not specified.");
  1962. RemoteFilename rfn;
  1963. rfn.setRemotePath(name);
  1964. SocketEndpoint ep(ipAddress);
  1965. rfn.setIp(ep);
  1966. Owned<IFile> rFile = createIFile(rfn);
  1967. if (!rFile)
  1968. throw MakeStringException(ECLWATCH_CANNOT_OPEN_FILE, "Cannot open %s.", description);
  1969. OwnedIFileIO rIO = rFile->openShared(IFOread,IFSHfull);
  1970. if (!rIO)
  1971. throw MakeStringException(ECLWATCH_CANNOT_READ_FILE,"Cannot read %s.", description);
  1972. OwnedIFileIOStream ios = createBufferedIOStream(rIO);
  1973. if (!ios)
  1974. throw MakeStringException(ECLWATCH_CANNOT_READ_FILE,"Cannot read %s.", description);
  1975. if (addXMLDeclaration)
  1976. {
  1977. const char* header;
  1978. if (plainText && (!stricmp(plainText, "yes")))
  1979. header = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>";
  1980. else
  1981. header = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><?xml-stylesheet href=\"../esp/xslt/xmlformatter.xsl\" type=\"text/xsl\"?>";
  1982. buf.append(strlen(header), header);
  1983. }
  1984. appendIOStreamContent(buf, ios.get(), forDownload);
  1985. }
  1986. IPropertyTree* WsWuInfo::getWorkunitArchive()
  1987. {
  1988. Owned <IConstWUQuery> query = cw->getQuery();
  1989. if(!query)
  1990. return NULL;
  1991. SCMStringBuffer name, ip;
  1992. Owned<IConstWUAssociatedFileIterator> iter = &query->getAssociatedFiles();
  1993. ForEach(*iter)
  1994. {
  1995. IConstWUAssociatedFile& cur = iter->query();
  1996. if (cur.getType() != FileTypeXml)
  1997. continue;
  1998. cur.getName(name);
  1999. if (name.length() < 15)
  2000. continue;
  2001. const char* pStr = name.str() + name.length() - 15;
  2002. if (strieq(pStr, ".archive.eclxml"))
  2003. {
  2004. cur.getIp(ip);
  2005. break;
  2006. }
  2007. }
  2008. if (!ip.length())
  2009. return NULL;
  2010. MemoryBuffer content;
  2011. getWorkunitAssociatedXml(name.str(), ip.str(), "", "WU archive eclxml", true, false, content);
  2012. if (!content.length())
  2013. return NULL;
  2014. return createPTreeFromXMLString(content.length(), content.toByteArray());
  2015. }
  2016. IEspWUArchiveFile* WsWuInfo::readArchiveFileAttr(IPropertyTree& fileTree, const char* path)
  2017. {
  2018. const char* fileName = fileTree.queryProp("@name");
  2019. if (isEmpty(fileName))
  2020. return NULL;
  2021. Owned<IEspWUArchiveFile> file= createWUArchiveFile();
  2022. file->setName(fileName);
  2023. if (!isEmpty(path))
  2024. file->setPath(path);
  2025. if (fileTree.hasProp("@key"))
  2026. file->setKey(fileTree.queryProp("@key"));
  2027. if (fileTree.hasProp("@sourcePath"))
  2028. file->setSourcePath(fileTree.queryProp("@sourcePath"));
  2029. return file.getClear();
  2030. }
  2031. IEspWUArchiveModule* WsWuInfo::readArchiveModuleAttr(IPropertyTree& moduleTree, const char* path)
  2032. {
  2033. const char* moduleName = moduleTree.queryProp("@name");
  2034. if (isEmpty(moduleName))
  2035. return NULL;
  2036. Owned<IEspWUArchiveModule> module= createWUArchiveModule();
  2037. module->setName(moduleName);
  2038. if (!isEmpty(path))
  2039. module->setPath(path);
  2040. if (moduleTree.hasProp("@fullName"))
  2041. module->setFullName(moduleTree.queryProp("@fullName"));
  2042. if (moduleTree.hasProp("@key"))
  2043. module->setKey(moduleTree.queryProp("@key"));
  2044. if (moduleTree.hasProp("@plugin"))
  2045. module->setPlugin(moduleTree.queryProp("@plugin"));
  2046. if (moduleTree.hasProp("@version"))
  2047. module->setVersion(moduleTree.queryProp("@version"));
  2048. if (moduleTree.hasProp("@sourcePath"))
  2049. module->setSourcePath(moduleTree.queryProp("@sourcePath"));
  2050. if (moduleTree.hasProp("@flags"))
  2051. module->setFlags(moduleTree.getPropInt("@flags", 0));
  2052. return module.getClear();
  2053. }
  2054. void WsWuInfo::readArchiveFiles(IPropertyTree* archiveTree, const char* path, IArrayOf<IEspWUArchiveFile>& files)
  2055. {
  2056. Owned<IPropertyTreeIterator> iter = archiveTree->getElements("Attribute");
  2057. ForEach(*iter)
  2058. {
  2059. IPropertyTree& item = iter->query();
  2060. Owned<IEspWUArchiveFile> file = readArchiveFileAttr(item, path);
  2061. if (file)
  2062. files.append(*file.getClear());
  2063. }
  2064. }
  2065. void WsWuInfo::listArchiveFiles(IPropertyTree* archiveTree, const char* path, IArrayOf<IEspWUArchiveModule>& modules, IArrayOf<IEspWUArchiveFile>& files)
  2066. {
  2067. if (!archiveTree)
  2068. return;
  2069. Owned<IPropertyTreeIterator> iter = archiveTree->getElements("Module");
  2070. ForEach(*iter)
  2071. {
  2072. IPropertyTree& item = iter->query();
  2073. Owned<IEspWUArchiveModule> module = readArchiveModuleAttr(item, path);
  2074. if (!module)
  2075. continue;
  2076. StringBuffer newPath;
  2077. if (isEmpty(path))
  2078. newPath.set(module->getName());
  2079. else
  2080. newPath.setf("%s/%s", path, module->getName());
  2081. IArrayOf<IEspWUArchiveModule> modulesInModule;
  2082. IArrayOf<IEspWUArchiveFile> filesInModule;
  2083. listArchiveFiles(&item, newPath.str(), modulesInModule, filesInModule);
  2084. if (modulesInModule.length())
  2085. module->setArchiveModules(modulesInModule);
  2086. if (filesInModule.length())
  2087. module->setFiles(filesInModule);
  2088. modules.append(*module.getClear());
  2089. }
  2090. readArchiveFiles(archiveTree, path, files);
  2091. }
  2092. void WsWuInfo::getArchiveFile(IPropertyTree* archive, const char* moduleName, const char* attrName, const char* path, StringBuffer& file)
  2093. {
  2094. StringBuffer xPath;
  2095. if (!isEmpty(path))
  2096. {
  2097. StringArray list;
  2098. list.appendListUniq(path, "/");
  2099. ForEachItemIn(m, list)
  2100. {
  2101. const char* module = list.item(m);
  2102. if (!isEmpty(module))
  2103. xPath.appendf("Module[@name=\"%s\"]/", module);
  2104. }
  2105. }
  2106. if (isEmpty(moduleName))
  2107. xPath.appendf("Attribute[@name=\"%s\"]", attrName);
  2108. else
  2109. xPath.appendf("Module[@name=\"%s\"]/Text", moduleName);
  2110. file.set(archive->queryProp(xPath.str()));
  2111. }
  2112. WsWuSearch::WsWuSearch(IEspContext& context,const char* owner,const char* state,const char* cluster,const char* startDate,const char* endDate,const char* jobname)
  2113. {
  2114. SecAccessFlags accessOwn;
  2115. SecAccessFlags accessOthers;
  2116. getUserWuAccessFlags(context, accessOwn, accessOthers, true);
  2117. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  2118. Owned<IConstWorkUnitIterator> it(factory->getWorkUnitsByOwner(owner)); // null owner means fetch all
  2119. StringBuffer wuFrom, wuTo, jobPattern;
  2120. if (startDate && *startDate)
  2121. createWuidFromDate(startDate, wuFrom);
  2122. if (endDate && *endDate)
  2123. createWuidFromDate(endDate, wuTo);
  2124. if (jobname && *jobname)
  2125. jobPattern.appendf("*%s*", jobname);
  2126. ForEach(*it)
  2127. {
  2128. IConstWorkUnitInfo &cw = it->query();
  2129. if (chooseWuAccessFlagsByOwnership(context.queryUserId(), cw, accessOwn, accessOthers) < SecAccess_Read)
  2130. continue;
  2131. if (state && *state && !strieq(cw.queryStateDesc(), state))
  2132. continue;
  2133. if (cluster && *cluster && !strieq(cw.queryClusterName(), cluster))
  2134. continue;
  2135. if (jobPattern.length() && !WildMatch(cw.queryJobName(), jobPattern, true))
  2136. continue;
  2137. const char *wuid = cw.queryWuid();
  2138. if (wuFrom.length() && strcmp(wuid,wuFrom.str())<0)
  2139. continue;
  2140. if (wuTo.length() && strcmp(wuid, wuTo.str())>0)
  2141. continue;
  2142. wuids.push_back(wuid);
  2143. }
  2144. std::sort(wuids.begin(), wuids.end(),std::greater<std::string>());
  2145. }
  2146. StringBuffer& WsWuSearch::createWuidFromDate(const char* timestamp,StringBuffer& s)
  2147. {
  2148. CDateTime wuTime;
  2149. wuTime.setString(timestamp,NULL,true);
  2150. unsigned year, month, day, hour, minute, second, nano;
  2151. wuTime.getDate(year, month, day, true);
  2152. wuTime.getTime(hour, minute, second, nano, true);
  2153. s.appendf("W%4d%02d%02d-%02d%02d%02d",year,month,day,hour,minute,second);
  2154. return s;
  2155. }
  2156. struct CompareData
  2157. {
  2158. CompareData(const char* _filter): filter(_filter) {}
  2159. bool operator()(const Linked<DataCacheElement>& e) const
  2160. {
  2161. return stricmp(e->m_filter.c_str(),filter)==0;
  2162. }
  2163. const char* filter;
  2164. };
  2165. DataCacheElement* DataCache::lookup(IEspContext &context, const char* filter, unsigned timeOutMin)
  2166. {
  2167. CriticalBlock block(crit);
  2168. if (cache.size() < 1)
  2169. return NULL;
  2170. //erase data if it should be
  2171. CDateTime timeNow;
  2172. int timeout = timeOutMin;
  2173. timeNow.setNow();
  2174. timeNow.adjustTime(-timeout);
  2175. while (true)
  2176. {
  2177. std::list<Linked<DataCacheElement> >::iterator list_iter = cache.begin();
  2178. if (list_iter == cache.end())
  2179. break;
  2180. DataCacheElement* awu = list_iter->get();
  2181. if (!awu || (awu->m_timeCached > timeNow))
  2182. break;
  2183. cache.pop_front();
  2184. }
  2185. if (cache.size() < 1)
  2186. return NULL;
  2187. //Check whether we have the data cache for this cluster. If yes, get the version
  2188. std::list<Linked<DataCacheElement> >::iterator it = std::find_if(cache.begin(),cache.end(),CompareData(filter));
  2189. if(it!=cache.end())
  2190. {
  2191. return it->getLink();
  2192. }
  2193. return NULL;
  2194. }
  2195. void DataCache::add(const char* filter, const char* data, const char* name, const char* localName, const char* wuid,
  2196. const char* resultName, unsigned seq, __int64 start, unsigned count, __int64 requested, __int64 total)
  2197. {
  2198. CriticalBlock block(crit);
  2199. //Save new data
  2200. Owned<DataCacheElement> e=new DataCacheElement(filter, data, name, localName, wuid, resultName, seq, start, count, requested, total);
  2201. if (cacheSize > 0)
  2202. {
  2203. if (cache.size() >= cacheSize)
  2204. cache.pop_front();
  2205. cache.push_back(e.get());
  2206. }
  2207. return;
  2208. }
  2209. struct CompareArchivedWUs
  2210. {
  2211. CompareArchivedWUs(const char* _filter): filter(_filter) {}
  2212. bool operator()(const Linked<ArchivedWuCacheElement>& e) const
  2213. {
  2214. return stricmp(e->m_filter.c_str(),filter)==0;
  2215. }
  2216. const char* filter;
  2217. };
  2218. ArchivedWuCacheElement* ArchivedWuCache::lookup(IEspContext &context, const char* filter, const char* sashaUpdatedWhen, unsigned timeOutMin)
  2219. {
  2220. CriticalBlock block(crit);
  2221. if (cache.size() < 1)
  2222. return NULL;
  2223. //erase data if it should be
  2224. CDateTime timeNow;
  2225. int timeout = timeOutMin;
  2226. timeNow.setNow();
  2227. timeNow.adjustTime(-timeout);
  2228. while (true)
  2229. {
  2230. std::list<Linked<ArchivedWuCacheElement> >::iterator list_iter = cache.begin();
  2231. if (list_iter == cache.end())
  2232. break;
  2233. ArchivedWuCacheElement* awu = list_iter->get();
  2234. if (awu && !stricmp(sashaUpdatedWhen, awu->m_sashaUpdatedWhen.c_str()) && (awu->m_timeCached > timeNow))
  2235. break;
  2236. cache.pop_front();
  2237. }
  2238. if (cache.size() < 1)
  2239. return NULL;
  2240. //Check whether we have the data cache for this cluster. If yes, get the version
  2241. std::list<Linked<ArchivedWuCacheElement> >::iterator it = std::find_if(cache.begin(),cache.end(),CompareArchivedWUs(filter));
  2242. if(it!=cache.end())
  2243. return it->getLink();
  2244. return NULL;
  2245. }
  2246. void ArchivedWuCache::add(const char* filter, const char* sashaUpdatedWhen, bool hasNextPage, unsigned numWUsReturned, IArrayOf<IEspECLWorkunit>& wus, IArrayOf<IEspECLWorkunitLW>& lwwus)
  2247. {
  2248. CriticalBlock block(crit);
  2249. //Save new data
  2250. Owned<ArchivedWuCacheElement> e=new ArchivedWuCacheElement(filter, sashaUpdatedWhen, hasNextPage, numWUsReturned, wus, lwwus);
  2251. if (cacheSize > 0)
  2252. {
  2253. if (cache.size() >= cacheSize)
  2254. cache.pop_front();
  2255. cache.push_back(e.get());
  2256. }
  2257. return;
  2258. }
  2259. WsWuJobQueueAuditInfo::WsWuJobQueueAuditInfo(IEspContext &context, const char *cluster, const char *from , const char *to, CHttpResponse* response, const char *xls)
  2260. {
  2261. if(!response)
  2262. return;
  2263. unsigned maxDisplay = 125;
  2264. IArrayOf<IEspThorQueue> items;
  2265. CDateTime fromTime;
  2266. CDateTime toTime;
  2267. StringBuffer fromstr;
  2268. StringBuffer tostr;
  2269. if(from && *from)
  2270. {
  2271. fromTime.setString(from,NULL,false);
  2272. fromTime.getString(fromstr, false);
  2273. }
  2274. if(to && *to)
  2275. {
  2276. toTime.setString(to,NULL,false);
  2277. toTime.getString(tostr, false);
  2278. }
  2279. StringBuffer filter("ThorQueueMonitor");
  2280. if(notEmpty(cluster))
  2281. filter.appendf(",%s", cluster);
  2282. StringAttrArray lines;
  2283. queryAuditLogs(fromTime, toTime, filter.str(), lines);
  2284. unsigned countLines = 0;
  2285. unsigned maxConnected = 0;
  2286. unsigned longestQueue = 0;
  2287. ForEachItemIn(idx, lines)
  2288. {
  2289. const char* line = lines.item(idx).text;
  2290. if(!line || !*line)
  2291. continue;
  2292. if (idx < (lines.length() - 1))
  2293. getAuditLineInfo(line, longestQueue, maxConnected, maxDisplay, 1, items);
  2294. else
  2295. getAuditLineInfo(line, longestQueue, maxConnected, maxDisplay, 2, items);
  2296. countLines++;
  2297. }
  2298. StringBuffer responsebuf;
  2299. if (items.length() < 1)
  2300. {
  2301. responsebuf.append("<script language=\"javascript\">\r\nparent.displayQEnd(\'No data found\')</script>\r\n");
  2302. response->sendChunk(responsebuf.str());
  2303. return;
  2304. }
  2305. unsigned itemCount = items.length();
  2306. if (itemCount > maxDisplay)
  2307. itemCount = maxDisplay;
  2308. responsebuf.append("<script language=\"javascript\">parent.displayQLegend()</script>\r\n");
  2309. response->sendChunk(responsebuf.str());
  2310. responsebuf.clear();
  2311. responsebuf.append("<script language=\"javascript\">parent.displayQBegin(").append(longestQueue).append(",").append(maxConnected).append(",").append(itemCount).append(")</script>\r\n");
  2312. response->sendChunk(responsebuf.str());
  2313. responsebuf.clear();
  2314. responsebuf.append("<script language=\"javascript\">\r\n");
  2315. //bool displayDT = false;
  2316. unsigned count = 0;
  2317. unsigned jobpending=0;
  2318. ForEachItemIn(i,items)
  2319. {
  2320. IEspThorQueue& tq = items.item(i);
  2321. //displayDT = !displayDT;
  2322. count++;
  2323. if (count > maxDisplay)
  2324. break;
  2325. StringBuffer countStr, dtStr;
  2326. countStr.appendulong(count);
  2327. //if (displayDT)
  2328. dtStr = tq.getDT();
  2329. responsebuf.append("parent.displayQueue(\'").append(count).append("\',\'").append(dtStr.str()).append("\',\'").append(tq.getRunningWUs()).append("\',");
  2330. responsebuf.append("\'").append(tq.getQueuedWUs()).append("\',\'").append(tq.getWaitingThors()).append("\',");
  2331. responsebuf.append("\'").append(tq.getConnectedThors()).append("\',\'").append(tq.getIdledThors()).append("\',");
  2332. responsebuf.append("\'").append(tq.getRunningWU1()).append("\',\'").append(tq.getRunningWU2()).append("\')\r\n");
  2333. if(++jobpending>=50)
  2334. {
  2335. responsebuf.append("</script>\r\n");
  2336. response->sendChunk(responsebuf.str());
  2337. responsebuf.clear();
  2338. responsebuf.append("<script language=\"javascript\">\r\n");
  2339. jobpending=0;
  2340. }
  2341. }
  2342. StringBuffer countStr;
  2343. countStr.appendulong(count);
  2344. StringBuffer msg("<table><tr><td>");
  2345. msg.append("Total Records in the Time Period: ").append(items.length()).append(" (<a href=\"/WsWorkunits/WUClusterJobQueueLOG?").append(xls).append("\">txt</a>...<a href=\"/WsWorkunits/WUClusterJobQueueXLS?").append(xls).append("\">xls</a>).");
  2346. msg.append("</td></tr><tr><td>");
  2347. if (count > maxDisplay)
  2348. msg.append("Displayed: First ").append(maxDisplay).append(". ");
  2349. msg.append("Max. Queue Length: ").append(longestQueue).append(".");
  2350. msg.append("</td></tr></table>");
  2351. responsebuf.append("parent.displayQEnd(\'").append(msg).append("\')</script>\r\n");
  2352. response->sendChunk(responsebuf.str());
  2353. }
  2354. void WsWuJobQueueAuditInfo::getAuditLineInfo(const char* line, unsigned& longestQueue, unsigned& maxConnected, unsigned maxDisplay, unsigned showAll, IArrayOf<IEspThorQueue>& items)
  2355. {
  2356. //2009-08-12 02:44:12 ,ThorQueueMonitor,thor400_88_dev,0,0,1,1,114,---,---
  2357. if(!line || !*line)
  2358. return;
  2359. Owned<IEspThorQueue> tq = createThorQueue();
  2360. StringBuffer dt, runningWUs, queuedWUs, waitingThors, connectedThors, idledThors, runningWU1, runningWU2;
  2361. // date/time
  2362. const char* bptr = line;
  2363. const char* eptr = strchr(bptr, ',');
  2364. if(eptr)
  2365. dt.append(eptr - bptr, bptr);
  2366. else
  2367. dt.append(bptr);
  2368. tq->setDT(dt.str());
  2369. if(!eptr)
  2370. {
  2371. if (checkNewThorQueueItem(tq, showAll, items))
  2372. items.append(*tq.getClear());
  2373. return;
  2374. }
  2375. //skip title
  2376. bptr = eptr + 1;
  2377. eptr = strchr(bptr, ',');
  2378. if(!eptr)
  2379. {
  2380. if (checkNewThorQueueItem(tq, showAll, items))
  2381. items.append(*tq.getClear());
  2382. return;
  2383. }
  2384. //skip queue name
  2385. bptr = eptr + 1;
  2386. eptr = strchr(bptr, ',');
  2387. if(!eptr)
  2388. {
  2389. if (checkNewThorQueueItem(tq, showAll, items))
  2390. items.append(*tq.getClear());
  2391. return;
  2392. }
  2393. //running
  2394. bptr = eptr + 1;
  2395. eptr = strchr(bptr, ',');
  2396. if(eptr)
  2397. runningWUs.append(eptr - bptr, bptr);
  2398. else
  2399. runningWUs.append(bptr);
  2400. tq->setRunningWUs(runningWUs.str());
  2401. if(!eptr)
  2402. {
  2403. if (checkNewThorQueueItem(tq, showAll, items))
  2404. items.append(*tq.getClear());
  2405. return;
  2406. }
  2407. //queued
  2408. bptr = eptr + 1;
  2409. eptr = strchr(bptr, ',');
  2410. if(eptr)
  2411. queuedWUs.append(eptr - bptr, bptr);
  2412. else
  2413. queuedWUs.append(bptr);
  2414. if (maxDisplay > items.length())
  2415. {
  2416. unsigned queueLen = atoi(queuedWUs.str());
  2417. if (queueLen > longestQueue)
  2418. longestQueue = queueLen;
  2419. }
  2420. tq->setQueuedWUs(queuedWUs.str());
  2421. if(!eptr)
  2422. {
  2423. if (checkNewThorQueueItem(tq, showAll, items))
  2424. items.append(*tq.getClear());
  2425. return;
  2426. }
  2427. //waiting
  2428. bptr = eptr + 1;
  2429. eptr = strchr(bptr, ',');
  2430. if(eptr)
  2431. waitingThors.append(eptr - bptr, bptr);
  2432. else
  2433. waitingThors.append(bptr);
  2434. tq->setWaitingThors(waitingThors.str());
  2435. if(!eptr)
  2436. {
  2437. if (checkNewThorQueueItem(tq, showAll, items))
  2438. items.append(*tq.getClear());
  2439. return;
  2440. }
  2441. //connected
  2442. bptr = eptr + 1;
  2443. eptr = strchr(bptr, ',');
  2444. if(eptr)
  2445. connectedThors.append(eptr - bptr, bptr);
  2446. else
  2447. connectedThors.append(bptr);
  2448. if (maxDisplay > items.length())
  2449. {
  2450. unsigned connnectedLen = atoi(connectedThors.str());
  2451. if (connnectedLen > maxConnected)
  2452. maxConnected = connnectedLen;
  2453. }
  2454. tq->setConnectedThors(connectedThors.str());
  2455. if(!eptr)
  2456. {
  2457. if (checkNewThorQueueItem(tq, showAll, items))
  2458. items.append(*tq.getClear());
  2459. return;
  2460. }
  2461. //idled
  2462. bptr = eptr + 1;
  2463. eptr = strchr(bptr, ',');
  2464. if(eptr)
  2465. idledThors.append(eptr - bptr, bptr);
  2466. else
  2467. idledThors.append(bptr);
  2468. tq->setIdledThors(idledThors.str());
  2469. if(!eptr)
  2470. {
  2471. items.append(*tq.getClear());
  2472. return;
  2473. }
  2474. //runningWU1
  2475. bptr = eptr + 1;
  2476. eptr = strchr(bptr, ',');
  2477. if(eptr)
  2478. runningWU1.append(eptr - bptr, bptr);
  2479. else
  2480. {
  2481. runningWU1.append(bptr);
  2482. }
  2483. if (!strcmp(runningWU1.str(), "---"))
  2484. runningWU1.clear();
  2485. if (runningWU1.length() > 0)
  2486. tq->setRunningWU1(runningWU1.str());
  2487. if(!eptr)
  2488. {
  2489. if (checkNewThorQueueItem(tq, showAll, items))
  2490. items.append(*tq.getClear());
  2491. return;
  2492. }
  2493. //runningWU2
  2494. bptr = eptr + 1;
  2495. eptr = strchr(bptr, ',');
  2496. if(eptr)
  2497. runningWU2.append(eptr - bptr, bptr);
  2498. else
  2499. {
  2500. runningWU2.append(bptr);
  2501. }
  2502. if (!strcmp(runningWU2.str(), "---"))
  2503. runningWU2.clear();
  2504. if (runningWU2.length() > 0)
  2505. tq->setRunningWU2(runningWU2.str());
  2506. if (checkNewThorQueueItem(tq, showAll, items))
  2507. items.append(*tq.getClear());
  2508. }
  2509. bool WsWuJobQueueAuditInfo::checkSameStrings(const char* s1, const char* s2)
  2510. {
  2511. if (s1)
  2512. {
  2513. if (!s2)
  2514. return false;
  2515. if (strcmp(s1, s2))
  2516. return false;
  2517. }
  2518. else if (s2)
  2519. {
  2520. if (!s1)
  2521. return false;
  2522. }
  2523. return true;
  2524. }
  2525. bool WsWuJobQueueAuditInfo::checkNewThorQueueItem(IEspThorQueue* tq, unsigned showAll, IArrayOf<IEspThorQueue>& items)
  2526. {
  2527. bool bAdd = false;
  2528. if (showAll < 1) //show every lines
  2529. bAdd = true;
  2530. else if (items.length() < 1)
  2531. bAdd = true;
  2532. else if (showAll > 1) //last line now
  2533. {
  2534. IEspThorQueue& tq0 = items.item(items.length()-1);
  2535. if (!checkSameStrings(tq->getDT(), tq0.getDT()))
  2536. bAdd = true;
  2537. }
  2538. else
  2539. {
  2540. IEspThorQueue& tq0 = items.item(items.length()-1);
  2541. if (!checkSameStrings(tq->getRunningWUs(), tq0.getRunningWUs()))
  2542. bAdd = true;
  2543. if (!checkSameStrings(tq->getQueuedWUs(), tq0.getQueuedWUs()))
  2544. bAdd = true;
  2545. if (!checkSameStrings(tq->getConnectedThors(), tq0.getConnectedThors()))
  2546. bAdd = true;
  2547. if (!checkSameStrings(tq->getConnectedThors(), tq0.getConnectedThors()))
  2548. bAdd = true;
  2549. if (!checkSameStrings(tq->getRunningWU1(), tq0.getRunningWU1()))
  2550. bAdd = true;
  2551. if (!checkSameStrings(tq->getRunningWU2(), tq0.getRunningWU2()))
  2552. bAdd = true;
  2553. }
  2554. return bAdd;
  2555. }
  2556. void xsltTransform(const char* xml, const char* sheet, IProperties *params, StringBuffer& ret)
  2557. {
  2558. StringBuffer xsl;
  2559. if(!checkFileExists(sheet))
  2560. throw MakeStringException(ECLWATCH_FILE_NOT_EXIST, "Could not find stylesheet %s.",sheet);
  2561. Owned<IXslProcessor> proc = getXslProcessor();
  2562. Owned<IXslTransform> trans = proc->createXslTransform();
  2563. trans->setXmlSource(xml, strlen(xml));
  2564. trans->loadXslFromFile(sheet);
  2565. trans->copyParameters(params);
  2566. trans->transform(ret);
  2567. }
  2568. bool addToQueryString(StringBuffer &queryString, const char *name, const char *value, const char delim)
  2569. {
  2570. if (isEmpty(name) || isEmpty(value))
  2571. return false;
  2572. if (queryString.length() > 0)
  2573. queryString.append(delim);
  2574. queryString.append(name).append("=").append(value);
  2575. return true;
  2576. }
  2577. int WUSchedule::run()
  2578. {
  2579. try
  2580. {
  2581. PROGLOG("ECLWorkunit WUSchedule Thread started.");
  2582. while(!stopping)
  2583. {
  2584. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  2585. Owned<IConstWorkUnitIterator> itr = factory->getScheduledWorkUnits();
  2586. if (itr)
  2587. {
  2588. ForEach(*itr)
  2589. {
  2590. try
  2591. {
  2592. IConstWorkUnitInfo & cw = itr->query();
  2593. if (factory->isAborting(cw.queryWuid()))
  2594. {
  2595. WorkunitUpdate wu(factory->updateWorkUnit(cw.queryWuid()));
  2596. wu->setState(WUStateAborted);
  2597. continue;
  2598. }
  2599. WsWuDateTime dt, now;
  2600. now.setNow();
  2601. cw.getTimeScheduled(dt);
  2602. if (now.compare(dt)>=0)
  2603. runWorkUnit(cw.queryWuid(), cw.queryClusterName());
  2604. }
  2605. catch(IException *e)
  2606. {
  2607. StringBuffer msg;
  2608. ERRLOG("Exception %d:%s in WsWorkunits Schedule::run", e->errorCode(), e->errorMessage(msg).str());
  2609. e->Release();
  2610. }
  2611. }
  2612. }
  2613. semSchedule.wait(1000*60);
  2614. }
  2615. }
  2616. catch(IException *e)
  2617. {
  2618. StringBuffer msg;
  2619. ERRLOG("Exception %d:%s in WsWorkunits Schedule::run", e->errorCode(), e->errorMessage(msg).str());
  2620. e->Release();
  2621. }
  2622. catch(...)
  2623. {
  2624. ERRLOG("Unknown exception in WsWorkunits Schedule::run");
  2625. }
  2626. if (m_container)
  2627. m_container->exitESP();
  2628. return 0;
  2629. }
  2630. void WsWuHelpers::setXmlParameters(IWorkUnit *wu, const char *xml, bool setJobname)
  2631. {
  2632. if (!xml || !*xml)
  2633. return;
  2634. Owned<IPropertyTree> tree = createPTreeFromXMLString(xml, ipt_none, (PTreeReaderOptions)(ptr_ignoreWhiteSpace | ptr_ignoreNameSpaces));
  2635. IPropertyTree *root = tree.get();
  2636. if (strieq(root->queryName(), "Envelope"))
  2637. root = root->queryPropTree("Body/*[1]");
  2638. if (!root)
  2639. return;
  2640. if (setJobname)
  2641. {
  2642. const char *name = wu->queryJobName();
  2643. if (!name || !*name)
  2644. wu->setJobName(root->queryName());
  2645. }
  2646. wu->setXmlParams(LINK(root));
  2647. }
  2648. void WsWuHelpers::setXmlParameters(IWorkUnit *wu, const char *xml, IArrayOf<IConstNamedValue> *variables, bool setJobname)
  2649. {
  2650. StringBuffer extParamXml;
  2651. if (variables && variables->length())
  2652. {
  2653. Owned<IPropertyTree> paramTree = (xml && *xml) ? createPTreeFromXMLString(xml) : createPTree("input");
  2654. ForEachItemIn(i, *variables)
  2655. {
  2656. IConstNamedValue &item = variables->item(i);
  2657. const char *name = item.getName();
  2658. const char *value = item.getValue();
  2659. if (!name || !*name)
  2660. continue;
  2661. if (!value)
  2662. {
  2663. size_t len = strlen(name);
  2664. char last = name[len-1];
  2665. if (last == '-' || last == '+')
  2666. {
  2667. StringAttr s(name, len-1);
  2668. paramTree->setPropInt(s.get(), last == '+' ? 1 : 0);
  2669. }
  2670. else
  2671. paramTree->setPropInt(name, 1);
  2672. continue;
  2673. }
  2674. paramTree->setProp(name, value);
  2675. }
  2676. toXML(paramTree, extParamXml);
  2677. xml=extParamXml.str();
  2678. }
  2679. setXmlParameters(wu, xml, setJobname);
  2680. }
  2681. void WsWuHelpers::submitWsWorkunit(IEspContext& context, IConstWorkUnit* cw, const char* cluster, const char* snapshot, int maxruntime, bool compile, bool resetWorkflow, bool resetVariables,
  2682. const char *paramXml, IArrayOf<IConstNamedValue> *variables, IArrayOf<IConstNamedValue> *debugs, IArrayOf<IConstApplicationValue> *applications)
  2683. {
  2684. ensureWsWorkunitAccess(context, *cw, SecAccess_Write);
  2685. #ifndef _NO_LDAP
  2686. CLdapSecManager* secmgr = dynamic_cast<CLdapSecManager*>(context.querySecManager());
  2687. // View Scope is checked only when LDAP secmgr is available AND checkViewPermissions config is also enabled.
  2688. // Otherwise, the view permission check is skipped, and WU is submitted as normal.
  2689. if (secmgr && secmgr->getCheckViewPermissions())
  2690. {
  2691. StringArray filenames, columnnames;
  2692. if (cw->getFieldUsageArray(filenames, columnnames, cluster)) // check view permission only for a query with fieldUsage information
  2693. {
  2694. if (!secmgr->authorizeViewScope(*context.queryUser(), filenames, columnnames))
  2695. throw MakeStringException(ECLWATCH_VIEW_ACCESS_DENIED, "View Access denied for a WU: %s", cw->queryWuid());
  2696. }
  2697. }
  2698. #endif
  2699. switch(cw->getState())
  2700. {
  2701. case WUStateRunning:
  2702. case WUStateDebugPaused:
  2703. case WUStateDebugRunning:
  2704. case WUStateCompiling:
  2705. case WUStateAborting:
  2706. case WUStateBlocked:
  2707. throw MakeStringException(ECLWATCH_CANNOT_SUBMIT_WORKUNIT, "Cannot submit the workunit. Workunit state is '%s'.", cw->queryStateDesc());
  2708. }
  2709. StringAttr wuid(cw->queryWuid());
  2710. WorkunitUpdate wu(&cw->lock());
  2711. if(!wu.get())
  2712. throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT, "Cannot update workunit %s.", wuid.str());
  2713. wu->clearExceptions();
  2714. if(notEmpty(cluster))
  2715. wu->setClusterName(cluster);
  2716. if(notEmpty(snapshot))
  2717. wu->setSnapshot(snapshot);
  2718. wu->setState(WUStateSubmitted);
  2719. if (maxruntime)
  2720. wu->setDebugValueInt("maxRunTime",maxruntime,true);
  2721. if (debugs && debugs->length())
  2722. {
  2723. ForEachItemIn(i, *debugs)
  2724. {
  2725. IConstNamedValue &item = debugs->item(i);
  2726. const char *name = item.getName();
  2727. const char *value = item.getValue();
  2728. if (!name || !*name)
  2729. continue;
  2730. if (!value)
  2731. {
  2732. size_t len = strlen(name);
  2733. char last = name[len-1];
  2734. if (last == '-' || last == '+')
  2735. {
  2736. StringAttr s(name, len-1);
  2737. wu->setDebugValueInt(s.get(), last == '+' ? 1 : 0, true);
  2738. }
  2739. else
  2740. wu->setDebugValueInt(name, 1, true);
  2741. continue;
  2742. }
  2743. wu->setDebugValue(name, value, true);
  2744. }
  2745. }
  2746. if (applications)
  2747. {
  2748. ForEachItemIn(ii, *applications)
  2749. {
  2750. IConstApplicationValue& item = applications->item(ii);
  2751. if(notEmpty(item.getApplication()) && notEmpty(item.getName()))
  2752. wu->setApplicationValue(item.getApplication(), item.getName(), item.getValue(), true);
  2753. }
  2754. }
  2755. if (resetWorkflow)
  2756. wu->resetWorkflow();
  2757. if (!compile)
  2758. wu->schedule();
  2759. if (resetVariables)
  2760. {
  2761. SCMStringBuffer varname;
  2762. Owned<IConstWUResultIterator> vars = &wu->getVariables();
  2763. ForEach (*vars)
  2764. {
  2765. vars->query().getResultName(varname);
  2766. Owned<IWUResult> v = wu->updateVariableByName(varname.str());
  2767. if (v)
  2768. v->setResultStatus(ResultStatusUndefined);
  2769. }
  2770. }
  2771. setXmlParameters(wu, paramXml, variables, (wu->getAction()==WUActionExecuteExisting));
  2772. wu->commit();
  2773. wu.clear();
  2774. if (!compile)
  2775. runWorkUnit(wuid.str());
  2776. else if (context.querySecManager())
  2777. secSubmitWorkUnit(wuid.str(), *context.querySecManager(), *context.queryUser());
  2778. else
  2779. submitWorkUnit(wuid.str(), context.queryUserId(), context.queryPassword());
  2780. AuditSystemAccess(context.queryUserId(), true, "Submitted %s", wuid.str());
  2781. }
  2782. void WsWuHelpers::submitWsWorkunit(IEspContext& context, const char *wuid, const char* cluster, const char* snapshot, int maxruntime, bool compile, bool resetWorkflow, bool resetVariables,
  2783. const char *paramXml, IArrayOf<IConstNamedValue> *variables, IArrayOf<IConstNamedValue> *debugs, IArrayOf<IConstApplicationValue> *applications)
  2784. {
  2785. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  2786. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid);
  2787. if(!cw)
  2788. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot open workunit %s.",wuid);
  2789. return submitWsWorkunit(context, cw, cluster, snapshot, maxruntime, compile, resetWorkflow, resetVariables, paramXml, variables, debugs, applications);
  2790. }
  2791. void WsWuHelpers::copyWsWorkunit(IEspContext &context, IWorkUnit &wu, const char *srcWuid)
  2792. {
  2793. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  2794. Owned<IConstWorkUnit> src(factory->openWorkUnit(srcWuid));
  2795. queryExtendedWU(&wu)->copyWorkUnit(src, false);
  2796. SCMStringBuffer token;
  2797. wu.setSecurityToken(createToken(wu.queryWuid(), context.queryUserId(), context.queryPassword(), token).str());
  2798. wu.commit();
  2799. }
  2800. void WsWuHelpers::runWsWorkunit(IEspContext &context, StringBuffer &wuid, const char *srcWuid, const char *cluster, const char *paramXml,
  2801. IArrayOf<IConstNamedValue> *variables, IArrayOf<IConstNamedValue> *debugs, IArrayOf<IConstApplicationValue> *applications)
  2802. {
  2803. NewWsWorkunit wu(context);
  2804. wuid.set(wu->queryWuid());
  2805. copyWsWorkunit(context, *wu, srcWuid);
  2806. wu.clear();
  2807. submitWsWorkunit(context, wuid.str(), cluster, NULL, 0, false, true, true, paramXml, variables, debugs, applications);
  2808. }
  2809. void WsWuHelpers::runWsWorkunit(IEspContext &context, IConstWorkUnit *cw, const char *srcWuid, const char *cluster, const char *paramXml,
  2810. IArrayOf<IConstNamedValue> *variables, IArrayOf<IConstNamedValue> *debugs, IArrayOf<IConstApplicationValue> *applications)
  2811. {
  2812. WorkunitUpdate wu(&cw->lock());
  2813. copyWsWorkunit(context, *wu, srcWuid);
  2814. wu.clear();
  2815. submitWsWorkunit(context, cw, cluster, NULL, 0, false, true, true, paramXml, variables, debugs, applications);
  2816. }
  2817. IException * WsWuHelpers::noteException(IWorkUnit *wu, IException *e, ErrorSeverity level)
  2818. {
  2819. if (wu)
  2820. {
  2821. Owned<IWUException> we = wu->createException();
  2822. StringBuffer s;
  2823. we->setExceptionMessage(e->errorMessage(s).str());
  2824. we->setExceptionSource("WsWorkunits");
  2825. we->setSeverity(level);
  2826. if (level==SeverityError)
  2827. wu->setState(WUStateFailed);
  2828. }
  2829. return e;
  2830. }
  2831. StringBuffer & WsWuHelpers::resolveQueryWuid(StringBuffer &wuid, const char *queryset, const char *query, bool notSuspended, IWorkUnit *wu)
  2832. {
  2833. Owned<IPropertyTree> qs = getQueryRegistry(queryset, true);
  2834. if (!qs)
  2835. throw noteException(wu, MakeStringException(ECLWATCH_QUERYSET_NOT_FOUND, "QuerySet '%s' not found", queryset));
  2836. Owned<IPropertyTree> q = resolveQueryAlias(qs, query);
  2837. if (!q)
  2838. throw noteException(wu, MakeStringException(ECLWATCH_QUERYID_NOT_FOUND, "Query '%s/%s' not found", queryset, query));
  2839. if (notSuspended && q->getPropBool("@suspended"))
  2840. throw noteException(wu, MakeStringException(ECLWATCH_QUERY_SUSPENDED, "Query '%s/%s' is suspended", queryset, query));
  2841. return wuid.append(q->queryProp("@wuid"));
  2842. }
  2843. void WsWuHelpers::runWsWuQuery(IEspContext &context, IConstWorkUnit *cw, const char *queryset, const char *query,
  2844. const char *cluster, const char *paramXml, IArrayOf<IConstApplicationValue> *applications)
  2845. {
  2846. StringBuffer srcWuid;
  2847. WorkunitUpdate wu(&cw->lock());
  2848. resolveQueryWuid(srcWuid, queryset, query, true, wu);
  2849. copyWsWorkunit(context, *wu, srcWuid);
  2850. wu.clear();
  2851. submitWsWorkunit(context, cw, cluster, NULL, 0, false, true, true, paramXml, NULL, NULL, applications);
  2852. }
  2853. void WsWuHelpers::runWsWuQuery(IEspContext &context, StringBuffer &wuid, const char *queryset, const char *query,
  2854. const char *cluster, const char *paramXml, IArrayOf<IConstApplicationValue> *applications)
  2855. {
  2856. StringBuffer srcWuid;
  2857. NewWsWorkunit wu(context);
  2858. wuid.set(wu->queryWuid());
  2859. resolveQueryWuid(srcWuid, queryset, query, true, wu);
  2860. copyWsWorkunit(context, *wu, srcWuid);
  2861. wu.clear();
  2862. submitWsWorkunit(context, wuid.str(), cluster, NULL, 0, false, true, true, paramXml, NULL, NULL, applications);
  2863. }
  2864. void WsWuHelpers::checkAndTrimWorkunit(const char* methodName, StringBuffer& input)
  2865. {
  2866. const char* trimmedInput = input.trim().str();
  2867. if (isEmpty(trimmedInput))
  2868. throw MakeStringException(ECLWATCH_INVALID_INPUT, "%s: Workunit ID not set", methodName);
  2869. if (!looksLikeAWuid(trimmedInput, 'W'))
  2870. throw MakeStringException(ECLWATCH_INVALID_INPUT, "%s: Invalid Workunit ID: %s", methodName, trimmedInput);
  2871. return;
  2872. }
  2873. }