ws_workunitsHelpers.cpp 70 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295
  1. /*##############################################################################
  2. Copyright (C) 2011 HPCC Systems.
  3. All rights reserved. This program is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU Affero General Public License as
  5. published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Affero General Public License for more details.
  11. You should have received a copy of the GNU Affero General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. ############################################################################## */
  14. #include "jlib.hpp"
  15. #include "ws_workunitsHelpers.hpp"
  16. #include "exception_util.hpp"
  17. #include "daclient.hpp"
  18. #include "dalienv.hpp"
  19. #include "daaudit.hpp"
  20. #include "portlist.h"
  21. #include "dadfs.hpp"
  22. #include "fileview.hpp"
  23. #include "wuwebview.hpp"
  24. #include "dllserver.hpp"
  25. #include "wujobq.hpp"
  26. #ifdef _USE_ZLIB
  27. #include "zcrypt.hpp"
  28. #endif
  29. //#include "workunit.hpp"
  30. //#include "daclient.hpp"
  31. //#include "dalienv.hpp"
  32. //#include "exception_util.hpp"
  33. //#include "wujobq.hpp"
  34. //#include "eventqueue.hpp"
  35. //#include "hqlerror.hpp"
  36. //#include "sacmd.hpp"
  37. //#include "portlist.h"
  38. //#define OWN_WU_ACCESS "OwnWorkunitsAccess"
  39. //#define OTHERS_WU_ACCESS "OthersWorkunitsAccess"
  40. //const unsigned MAXTHORS = 1024;
  41. //#define File_Cpp "cpp"
  42. //#define File_ThorLog "ThorLog"
  43. //#define File_ThorSlaveLog "ThorSlaveLog"
  44. //#define File_EclAgentLog "EclAgentLog"
  45. //#define File_XML "XML"
  46. //#define File_Res "res"
  47. //#define File_DLL "dll"
  48. //#define File_ArchiveQuery "ArchiveQuery"
  49. namespace ws_workunits {
  50. SecAccessFlags chooseWuAccessFlagsByOwnership(const char *user, const char *owner, SecAccessFlags accessOwn, SecAccessFlags accessOthers)
  51. {
  52. return (isEmpty(owner) || (user && streq(user, owner))) ? accessOwn : accessOthers;
  53. }
  54. SecAccessFlags chooseWuAccessFlagsByOwnership(const char *user, IConstWorkUnit& cw, SecAccessFlags accessOwn, SecAccessFlags accessOthers)
  55. {
  56. SCMStringBuffer owner;
  57. return chooseWuAccessFlagsByOwnership(user, cw.getUser(owner).str(), accessOwn, accessOthers);
  58. }
  59. const char *getWuAccessType(const char *owner, const char *user)
  60. {
  61. return (isEmpty(owner) || (user && streq(user, owner))) ? OWN_WU_ACCESS : OTHERS_WU_ACCESS;
  62. }
  63. const char *getWuAccessType(IConstWorkUnit& cw, const char *user)
  64. {
  65. SCMStringBuffer owner;
  66. return getWuAccessType(cw.getUser(owner).str(), user);
  67. }
  68. void getUserWuAccessFlags(IEspContext& context, SecAccessFlags& accessOwn, SecAccessFlags& accessOthers, bool except)
  69. {
  70. if (!context.authorizeFeature(OWN_WU_ACCESS, accessOwn))
  71. accessOwn = SecAccess_None;
  72. if (!context.authorizeFeature(OTHERS_WU_ACCESS, accessOthers))
  73. accessOthers = SecAccess_None;
  74. if (except && (accessOwn == SecAccess_None) && (accessOthers == SecAccess_None))
  75. {
  76. AuditSystemAccess(context.queryUserId(), false, "Access Denied: User can't view any workunits");
  77. VStringBuffer msg("Access Denied: User %s does not have rights to access workunits.", context.queryUserId());
  78. throw MakeStringException(ECLWATCH_ECL_WU_ACCESS_DENIED, "%s", msg.str());
  79. }
  80. }
  81. SecAccessFlags getWsWorkunitAccess(IEspContext& cxt, IConstWorkUnit& cw)
  82. {
  83. SecAccessFlags accessFlag = SecAccess_None;
  84. cxt.authorizeFeature(getWuAccessType(cw, cxt.queryUserId()), accessFlag);
  85. return accessFlag;
  86. }
  87. void ensureWsWorkunitAccessByOwnerId(IEspContext& cxt, const char* owner, SecAccessFlags minAccess)
  88. {
  89. if (!cxt.validateFeatureAccess(getWuAccessType(owner, cxt.queryUserId()), minAccess, false))
  90. throw MakeStringException(ECLWATCH_ECL_WU_ACCESS_DENIED, "Failed to access workunit. Permission denied.");
  91. }
  92. void ensureWsWorkunitAccess(IEspContext& cxt, IConstWorkUnit& cw, SecAccessFlags minAccess)
  93. {
  94. if (!cxt.validateFeatureAccess(getWuAccessType(cw, cxt.queryUserId()), minAccess, false))
  95. throw MakeStringException(ECLWATCH_ECL_WU_ACCESS_DENIED, "Failed to access workunit. Permission denied.");
  96. }
  97. void ensureWsWorkunitAccess(IEspContext& context, const char* wuid, SecAccessFlags minAccess)
  98. {
  99. Owned<IWorkUnitFactory> wf = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  100. Owned<IConstWorkUnit> cw = wf->openWorkUnit(wuid, false);
  101. if (!cw)
  102. throw MakeStringException(ECLWATCH_ECL_WU_ACCESS_DENIED, "Failed to open workunit %s", wuid);
  103. ensureWsWorkunitAccess(context, *cw, minAccess);
  104. }
  105. void ensureWsCreateWorkunitAccess(IEspContext& cxt)
  106. {
  107. if (!cxt.validateFeatureAccess(OWN_WU_ACCESS, SecAccess_Write, false))
  108. throw MakeStringException(ECLWATCH_ECL_WU_ACCESS_DENIED, "Failed to create workunit. Permission denied.");
  109. }
  110. StringBuffer &getWuidFromLogicalFileName(IEspContext &context, const char *logicalName, StringBuffer &wuid)
  111. {
  112. Owned<IUserDescriptor> userdesc = createUserDescriptor();
  113. userdesc->set(context.queryUserId(), context.queryPassword());
  114. Owned<IDistributedFile> df = queryDistributedFileDirectory().lookup(logicalName, userdesc);
  115. if (!df)
  116. throw MakeStringException(ECLWATCH_FILE_NOT_EXIST,"Cannot find file %s.",logicalName);
  117. return wuid.append(df->queryProperties().queryProp("@workunit"));
  118. }
  119. const char *getThorQueueName(const char *cluster)
  120. {
  121. SCMStringBuffer thorQueues;
  122. getThorQueueNames(thorQueues, cluster);
  123. return thorQueues.str();
  124. }
  125. void formatDuration(StringBuffer &s, unsigned ms)
  126. {
  127. unsigned days = ms / (1000*60*60*24);
  128. ms %= (1000*60*60*24);
  129. unsigned hours = ms / (1000*60*60);
  130. ms %= (1000*60*60);
  131. unsigned mins = ms / (1000*60);
  132. ms %= (1000*60);
  133. unsigned secs = ms / 1000;
  134. ms %= 1000;
  135. if (days)
  136. s.appendf("%d days ", days);
  137. if (hours || s.length())
  138. s.appendf("%d:", hours);
  139. if (mins || s.length())
  140. s.appendf("%d:", mins);
  141. if (s.length())
  142. s.appendf("%02d.%03d", secs, ms);
  143. else
  144. s.appendf("%d.%03d", secs, ms);
  145. }
  146. WsWUExceptions::WsWUExceptions(IConstWorkUnit& wu): numerr(0), numwrn(0), numinf(0)
  147. {
  148. Owned<IConstWUExceptionIterator> it = &wu.getExceptions();
  149. ForEach(*it)
  150. {
  151. SCMStringBuffer src, msg, file;
  152. Owned<IEspECLException> e= createECLException("","");
  153. e->setCode(it->query().getExceptionCode());
  154. e->setSource(it->query().getExceptionSource(src).str());
  155. e->setMessage(it->query().getExceptionMessage(msg).str());
  156. e->setFileName(it->query().getExceptionFileName(file).str());
  157. e->setLineNo(it->query().getExceptionLineNo());
  158. e->setColumn(it->query().getExceptionColumn());
  159. const char * label = "";
  160. switch (it->query().getSeverity())
  161. {
  162. default:
  163. case ExceptionSeverityError: label = "Error"; numerr++; break;
  164. case ExceptionSeverityWarning: label = "Warning"; numwrn++; break;
  165. case ExceptionSeverityInformation: label = "Info"; numinf++; break;
  166. }
  167. e->setSeverity(label);
  168. errors.append(*e.getLink());
  169. }
  170. }
  171. #define SDS_LOCK_TIMEOUT 30000
  172. void getSashaNode(SocketEndpoint &ep)
  173. {
  174. Owned<IRemoteConnection> econn = querySDS().connect("/Environment", myProcessSession(), 0, SDS_LOCK_TIMEOUT);
  175. if (!econn)
  176. throw MakeStringException(ECLWATCH_CANNOT_CONNECT_DALI,"Cannot connect to DALI server.");
  177. IPropertyTree *root = econn->queryRoot();
  178. if (!root)
  179. throw MakeStringException(ECLWATCH_CANNOT_GET_ENV_INFO,"Cannot get environment information.");
  180. IPropertyTree *pt = root->queryPropTree("Software/SashaServerProcess/Instance[1]");
  181. if (!pt)
  182. throw MakeStringException(ECLWATCH_ARCHIVE_SERVER_NOT_FOUND, "Archive Server not found.");
  183. ep.set(pt->queryProp("@netAddress"), pt->getPropInt("@port",DEFAULT_SASHA_PORT));
  184. }
  185. bool WsWuInfo::getSourceFiles(IEspECLWorkunit &info, unsigned flags)
  186. {
  187. if (!(flags & WUINFO_IncludeSourceFiles))
  188. return true;
  189. try
  190. {
  191. Owned<IUserDescriptor> userdesc;
  192. StringBuffer username;
  193. context.getUserID(username);
  194. const char* passwd = context.queryPassword();
  195. userdesc.setown(createUserDescriptor());
  196. userdesc->set(username.str(), passwd);
  197. IArrayOf<IEspECLSourceFile> files;
  198. if (version < 1.27)
  199. {
  200. Owned<IPropertyTreeIterator> f=&cw->getFilesReadIterator();
  201. ForEach(*f)
  202. {
  203. IPropertyTree &query = f->query();
  204. const char *clusterName = query.queryProp("@cluster");
  205. const char *fileName = query.queryProp("@name");
  206. int fileCount = query.getPropInt("@useCount");
  207. Owned<IEspECLSourceFile> file= createECLSourceFile("","");
  208. if(clusterName && *clusterName)
  209. {
  210. file->setFileCluster(clusterName);
  211. }
  212. if (version > 1.11)
  213. {
  214. Owned<IPropertyTreeIterator> filetrees= query.getElements("Subfile");
  215. if (filetrees->first())
  216. file->setIsSuperFile(true);
  217. }
  218. if (fileName && *fileName)
  219. {
  220. file->setName(fileName);
  221. }
  222. file->setCount(fileCount);
  223. files.append(*file.getLink());
  224. }
  225. }
  226. else
  227. {
  228. StringArray fileNames;
  229. Owned<IPropertyTreeIterator> f=&cw->getFilesReadIterator();
  230. ForEach(*f)
  231. {
  232. IPropertyTree &query = f->query();
  233. const char *clusterName = query.queryProp("@cluster");
  234. const char *fileName = query.queryProp("@name");
  235. int fileCount = query.getPropInt("@useCount");
  236. bool bFound = false;
  237. if (fileName && *fileName && (fileNames.length() > 0))
  238. {
  239. for (unsigned i = 0; i < fileNames.length(); i++ )
  240. {
  241. const char *fileName0 = fileNames.item(i);
  242. if (!stricmp(fileName, fileName0))
  243. {
  244. bFound = true;
  245. break;
  246. }
  247. }
  248. }
  249. if (bFound)
  250. continue;
  251. Owned<IEspECLSourceFile> file= createECLSourceFile("","");
  252. if(clusterName && *clusterName)
  253. {
  254. file->setFileCluster(clusterName);
  255. }
  256. if (fileName && *fileName)
  257. {
  258. file->setName(fileName);
  259. }
  260. file->setCount(fileCount);
  261. Owned<IPropertyTreeIterator> filetrees= query.getElements("Subfile");
  262. if (filetrees->first())
  263. {
  264. file->setIsSuperFile(true);
  265. getSubFiles(filetrees, file, fileNames);
  266. }
  267. files.append(*file.getLink());
  268. }
  269. }
  270. info.setSourceFiles(files);
  271. return true;
  272. }
  273. catch(IException* e)
  274. {
  275. StringBuffer eMsg;
  276. ERRLOG("%s", e->errorMessage(eMsg).str()); //log original exception
  277. e->Release();
  278. }
  279. return false;
  280. }
  281. void WsWuInfo::getExceptions(IEspECLWorkunit &info, unsigned flags)
  282. {
  283. if ((flags & WUINFO_IncludeExceptions) || version > 1.16)
  284. {
  285. WsWUExceptions errors(*cw);
  286. if (version > 1.16)
  287. {
  288. info.setErrorCount(errors.ErrCount());
  289. info.setWarningCount(errors.WrnCount());
  290. info.setInfoCount(errors.InfCount());
  291. }
  292. if ((flags & WUINFO_IncludeExceptions))
  293. info.setExceptions(errors);
  294. }
  295. }
  296. bool WsWuInfo::getVariables(IEspECLWorkunit &info, unsigned flags)
  297. {
  298. if (!(flags & WUINFO_IncludeVariables))
  299. return true;
  300. try
  301. {
  302. IArrayOf<IEspECLResult> results;
  303. Owned<IConstWUResultIterator> vars = &cw->getVariables();
  304. ForEach(*vars)
  305. getResult(vars->query(), results, flags);
  306. info.setVariables(results);
  307. results.kill();
  308. return true;
  309. }
  310. catch(IException* e)
  311. {
  312. StringBuffer eMsg;
  313. ERRLOG("%s", e->errorMessage(eMsg).str());
  314. e->Release();
  315. }
  316. return false;
  317. }
  318. bool WsWuInfo::getTimers(IEspECLWorkunit &info, unsigned flags)
  319. {
  320. if (!(flags & WUINFO_IncludeTimers))
  321. return true;
  322. try
  323. {
  324. IArrayOf<IEspECLTimer> timers;
  325. Owned<IStringIterator> it = &cw->getTimers();
  326. ForEach(*it)
  327. {
  328. SCMStringBuffer name;
  329. it->str(name);
  330. SCMStringBuffer value;
  331. unsigned count = cw->getTimerCount(name.str(), NULL);
  332. unsigned duration = cw->getTimerDuration(name.str(), NULL);
  333. StringBuffer fd;
  334. formatDuration(fd, duration);
  335. for (unsigned i = 0; i < name.length(); i++)
  336. if (name.s.charAt(i)=='_')
  337. name.s.setCharAt(i, ' ');
  338. Owned<IEspECLTimer> t= createECLTimer("","");
  339. t->setName(name.str());
  340. t->setValue(fd.str());
  341. t->setCount(count);
  342. if (version > 1.19)
  343. {
  344. StringBuffer graphName;
  345. unsigned subGraphNum;
  346. unsigned __int64 subId;
  347. if (parseGraphTimerLabel(name.str(), graphName, subGraphNum, subId))
  348. {
  349. if (graphName.length() > 0)
  350. {
  351. t->setGraphName(graphName.str());
  352. }
  353. if (subId > 0)
  354. {
  355. t->setSubGraphId((int)subId);
  356. }
  357. }
  358. }
  359. timers.append(*t.getLink());
  360. }
  361. info.setTimers(timers);
  362. return true;
  363. }
  364. catch(IException* e)
  365. {
  366. StringBuffer eMsg;
  367. e->errorMessage(eMsg);
  368. ERRLOG("%s", eMsg.str()); //log original exception
  369. e->Release();
  370. }
  371. return false;
  372. }
  373. const unsigned MAXTHORS = 1024;
  374. bool WsWuInfo::getHelpers(IEspECLWorkunit &info, unsigned flags)
  375. {
  376. try
  377. {
  378. Owned <IConstWUQuery> query = cw->getQuery();
  379. if(query)
  380. {
  381. SCMStringBuffer qname;
  382. query->getQueryShortText(qname);
  383. if(qname.length())
  384. {
  385. if((flags & WUINFO_TruncateEclTo64k) && (qname.length() > 64000))
  386. qname.setLen(qname.str(), 64000);
  387. IEspECLQuery* q=&info.updateQuery();
  388. q->setText(qname.str());
  389. }
  390. if (version > 1.30)
  391. {
  392. SCMStringBuffer qText;
  393. query->getQueryText(qText);
  394. if ((qText.length() > 0) && isArchiveQuery(qText.str()))
  395. info.setHasArchiveQuery(true);
  396. }
  397. IArrayOf<IEspECLHelpFile> helpers;
  398. getHelpFiles(query, FileTypeCpp, helpers);
  399. getHelpFiles(query, FileTypeDll, helpers);
  400. getHelpFiles(query, FileTypeResText, helpers);
  401. SCMStringBuffer name;
  402. for (int i0 = 1; i0 < MAXTHORS; i0++)
  403. {
  404. StringBuffer fileType;
  405. if (i0 < 2)
  406. fileType.append(File_ThorLog);
  407. else
  408. fileType.appendf("%s%d", File_ThorLog, i0);
  409. cw->getDebugValue(fileType.str(), name);
  410. if(name.length() < 1)
  411. break;
  412. Owned<IEspECLHelpFile> h= createECLHelpFile("","");
  413. h->setName(name.str());
  414. h->setType(fileType.str());
  415. helpers.append(*h.getLink());
  416. name.clear();
  417. }
  418. cw->getDebugValue("EclAgentLog", name);
  419. if(name.length())
  420. {
  421. Owned<IEspECLHelpFile> h= createECLHelpFile("","");
  422. h->setName(name.str());
  423. h->setType("EclAgentLog");
  424. helpers.append(*h.getLink());
  425. name.clear();
  426. }
  427. info.setHelpers(helpers);
  428. return true;
  429. }
  430. }
  431. catch(IException* e)
  432. {
  433. StringBuffer eMsg;
  434. e->errorMessage(eMsg);
  435. ERRLOG("%s", eMsg.str()); //log original exception
  436. e->Release();
  437. }
  438. return false;
  439. }
  440. bool WsWuInfo::getApplicationValues(IEspECLWorkunit &info, unsigned flags)
  441. {
  442. if (!(flags & WUINFO_IncludeApplicationValues))
  443. return true;
  444. try
  445. {
  446. IArrayOf<IEspApplicationValue> av;
  447. Owned<IConstWUAppValueIterator> app(&cw->getApplicationValues());
  448. ForEach(*app)
  449. {
  450. IConstWUAppValue& val=app->query();
  451. SCMStringBuffer buf;
  452. Owned<IEspApplicationValue> t= createApplicationValue("","");
  453. t->setApplication(val.getApplication(buf).str());
  454. t->setValue(val.getValue(buf).str());
  455. t->setName(val.getName(buf).str());
  456. t->setValue(val.getValue(buf).str());
  457. av.append(*t.getLink());
  458. }
  459. info.setApplicationValues(av);
  460. return true;
  461. }
  462. catch(IException* e)
  463. {
  464. StringBuffer eMsg;
  465. e->errorMessage(eMsg);
  466. ERRLOG("%s", eMsg.str()); //log original exception
  467. e->Release();
  468. }
  469. return false;
  470. }
  471. bool WsWuInfo::getDebugValues(IEspECLWorkunit &info, unsigned flags)
  472. {
  473. if (!(flags & WUINFO_IncludeDebugValues))
  474. return true;
  475. try
  476. {
  477. IArrayOf<IEspDebugValue> dv;
  478. Owned<IStringIterator> debugs(&cw->getDebugValues());
  479. ForEach(*debugs)
  480. {
  481. SCMStringBuffer name, val;
  482. debugs->str(name);
  483. cw->getDebugValue(name.str(),val);
  484. Owned<IEspDebugValue> t= createDebugValue("","");
  485. t->setName(name.str());
  486. t->setValue(val.str());
  487. dv.append(*t.getLink());
  488. }
  489. info.setDebugValues(dv);
  490. return true;
  491. }
  492. catch(IException* e)
  493. {
  494. StringBuffer eMsg;
  495. e->errorMessage(eMsg);
  496. ERRLOG("%s", eMsg.str()); //log original exception
  497. e->Release();
  498. }
  499. return false;
  500. }
  501. const char *getGraphNum(const char *s,unsigned &num)
  502. {
  503. while (*s && !isdigit(*s))
  504. s++;
  505. num = 0;
  506. while (isdigit(*s))
  507. {
  508. num = num*10+*s-'0';
  509. s++;
  510. }
  511. return s;
  512. }
  513. bool WsWuInfo::getGraphInfo(IEspECLWorkunit &info, unsigned flags)
  514. {
  515. if (version > 1.01)
  516. {
  517. info.setHaveSubGraphTimings(false);
  518. StringBuffer xpath("/WorkUnits/");
  519. xpath.append(wuid.str());
  520. Owned<IRemoteConnection> conn = querySDS().connect(xpath.str(), myProcessSession(), 0, 5*60*1000);
  521. if (!conn)
  522. {
  523. DBGLOG("Could not connect to SDS");
  524. return false;
  525. }
  526. IPropertyTree *wpt = conn->queryRoot();
  527. if (!wpt)
  528. return false;
  529. Owned<IPropertyTreeIterator> iter = wpt->getElements("Timings/Timing");
  530. StringBuffer name;
  531. IArrayOf<IConstECLTimingData> timingdatarray;
  532. ForEach(*iter)
  533. {
  534. if (iter->query().getProp("@name",name.clear()))
  535. {
  536. if ((name.length()>11) && (strncmp("Graph graph", name.str(), 11)==0))
  537. {
  538. unsigned gn;
  539. const char *s = getGraphNum(name.str()+11, gn);
  540. unsigned sn;
  541. s = getGraphNum(s,sn);
  542. if (gn && sn)
  543. {
  544. info.setHaveSubGraphTimings(true);
  545. break;
  546. }
  547. }
  548. }
  549. }
  550. }
  551. if (!(flags & WUINFO_IncludeGraphs))
  552. return true;
  553. try
  554. {
  555. SCMStringBuffer runningGraph;
  556. WUGraphIDType id;
  557. WUState st = cw->getState();
  558. bool running = (!(st==WUStateFailed || st==WUStateAborted || st==WUStateCompleted) && cw->getRunningGraph(runningGraph,id));
  559. IArrayOf<IEspECLGraph> graphs;
  560. Owned<IConstWUGraphIterator> it = &cw->getGraphs(GraphTypeAny);
  561. ForEach(*it)
  562. {
  563. IConstWUGraph &graph = it->query();
  564. if(!graph.isValid())
  565. continue;
  566. SCMStringBuffer name, label, type;
  567. graph.getName(name);
  568. graph.getLabel(label);
  569. graph.getTypeName(type);
  570. Owned<IEspECLGraph> g= createECLGraph("","");
  571. g->setName(name.str());
  572. g->setLabel(label.str());
  573. g->setType(type.str());
  574. if(running && strcmp(name.str(),runningGraph.str())==0)
  575. {
  576. g->setRunning(true);
  577. g->setRunningId(id);
  578. }
  579. Owned<IConstWUGraphProgress> progress = cw->getGraphProgress(name.str());
  580. if (progress)
  581. {
  582. WUGraphState graphstate= progress->queryGraphState();
  583. if (graphstate == WUGraphComplete)
  584. g->setComplete(true);
  585. if (version > 1.13 && graphstate == WUGraphFailed)
  586. {
  587. g->setFailed(true);
  588. }
  589. }
  590. graphs.append(*g.getLink());
  591. }
  592. info.setGraphs(graphs);
  593. return true;
  594. }
  595. catch(IException* e)
  596. {
  597. StringBuffer eMsg;
  598. e->errorMessage(eMsg);
  599. ERRLOG("%s", eMsg.str()); //log original exception
  600. e->Release();
  601. }
  602. return false;
  603. }
  604. void WsWuInfo::getGraphTimingData(IArrayOf<IConstECLTimingData> &timingData, unsigned flags)
  605. {
  606. StringBuffer xpath("/WorkUnits/");
  607. xpath.append(wuid.str());
  608. Owned<IRemoteConnection> conn = querySDS().connect(xpath.str(), myProcessSession(), 0, 5*60*1000);
  609. if (!conn)
  610. {
  611. DBGLOG("Could not connect to SDS");
  612. throw MakeStringException(ECLWATCH_CANNOT_CONNECT_DALI, "Cannot connect to dali server.");
  613. }
  614. IPropertyTree *wpt = conn->queryRoot();
  615. Owned<IPropertyTreeIterator> iter = wpt->getElements("Timings/Timing");
  616. ForEach(*iter)
  617. {
  618. StringBuffer name;
  619. if (iter->query().getProp("@name", name))
  620. {
  621. if ((name.length()>11)&&(strncmp("Graph graph", name.str(), 11)==0))
  622. {
  623. unsigned gn;
  624. const char *s = getGraphNum(name.str(),gn);
  625. unsigned sn;
  626. s = getGraphNum(s, sn);
  627. if (gn && sn)
  628. {
  629. const char *gs = strchr(name.str(),'(');
  630. unsigned gid = 0;
  631. if (gs)
  632. getGraphNum(gs+1, gid);
  633. unsigned time = iter->query().getPropInt("@duration");
  634. Owned<IEspECLTimingData> g = createECLTimingData();
  635. g->setName(name.str());
  636. g->setGraphNum(gn);
  637. g->setSubGraphNum(sn);
  638. g->setGID(gid);
  639. g->setMS(time);
  640. g->setMin(time/60000);
  641. timingData.append(*g.getClear());
  642. }
  643. }
  644. }
  645. }
  646. }
  647. void WsWuInfo::getRoxieCluster(IEspECLWorkunit &info, unsigned flags)
  648. {
  649. if (version > 1.06)
  650. {
  651. Owned<IConstWURoxieQueryInfo> roxieQueryInfo = cw->getRoxieQueryInfo();
  652. if (roxieQueryInfo)
  653. {
  654. SCMStringBuffer roxieClusterName;
  655. roxieQueryInfo->getRoxieClusterName(roxieClusterName);
  656. info.setRoxieCluster(roxieClusterName.str());
  657. }
  658. }
  659. }
  660. void WsWuInfo::getCommon(IEspECLWorkunit &info, unsigned flags)
  661. {
  662. SCMStringBuffer s;
  663. info.setWuid(cw->getWuid(s).str());
  664. info.setProtected(cw->isProtected() ? 1 : 0);
  665. info.setJobname(cw->getJobName(s).str());
  666. info.setOwner(cw->getUser(s).str());
  667. info.setCluster(cw->getClusterName(clusterName).str());
  668. info.setSnapshot(cw->getSnapshot(s).str());
  669. if ((cw->getState() == WUStateScheduled) && cw->aborting())
  670. {
  671. info.setStateID(WUStateAborting);
  672. info.setState("aborting");
  673. }
  674. else
  675. {
  676. info.setStateID(cw->getState());
  677. info.setState(cw->getStateDesc(s).str());
  678. }
  679. if (cw->isPausing())
  680. info.setIsPausing(true);
  681. if (version > 1.27)
  682. {
  683. StringBuffer totalThorTimeStr;
  684. unsigned totalThorTimeMS = cw->getTimerDuration("Total thor time", NULL);
  685. formatDuration(totalThorTimeStr, totalThorTimeMS);
  686. info.setTotalThorTime(totalThorTimeStr.str());
  687. }
  688. WsWuDateTime dt;
  689. cw->getTimeScheduled(dt);
  690. if(dt.isValid())
  691. info.setDateTimeScheduled(dt.getString(s).str());
  692. getRoxieCluster(info, flags);
  693. }
  694. void WsWuInfo::getInfo(IEspECLWorkunit &info, unsigned flags)
  695. {
  696. getCommon(info, flags);
  697. SecAccessFlags accessFlag = getWsWorkunitAccess(context, *cw);
  698. info.setAccessFlag(accessFlag);
  699. SCMStringBuffer s;
  700. info.setStateEx(cw->getStateEx(s).str());
  701. info.setPriorityClass(cw->getPriority());
  702. info.setPriorityLevel(cw->getPriorityLevel());
  703. info.setScope(cw->getWuScope(s).str());
  704. info.setActionEx(cw->getActionEx(s).str());
  705. info.setDescription(cw->getDebugValue("description", s).str());
  706. if (version > 1.21)
  707. info.setXmlParams(cw->getXmlParams(s).str());
  708. info.setResultLimit(cw->getResultLimit());
  709. info.setArchived(false);
  710. info.setGraphCount(cw->getGraphCount());
  711. info.setSourceFileCount(cw->getSourceFileCount());
  712. info.setVariableCount(cw->getVariableCount());
  713. info.setTimerCount(cw->getTimerCount());
  714. info.setSourceFileCount(cw->getSourceFileCount());
  715. info.setApplicationValueCount(cw->getApplicationValueCount());
  716. info.setHasDebugValue(cw->hasDebugValue("__calculated__complexity__"));
  717. getClusterInfo(info, flags);
  718. getExceptions(info, flags);
  719. const char* msg = "This section cannot be dispayed due to an exception.";
  720. if (!getHelpers(info, flags))
  721. info.setHelpersDesc(msg);
  722. if (!getGraphInfo(info, flags))
  723. info.setGraphsDesc(msg);
  724. if (!getSourceFiles(info, flags))
  725. info.setSourceFilesDesc(msg);
  726. if (!getResults(info, flags))
  727. info.setResultsDesc(msg);
  728. if (!getVariables(info, flags))
  729. info.setVariablesDesc(msg);
  730. if (!getTimers(info, flags))
  731. info.setTimersDesc(msg);
  732. if (!getDebugValues(info, flags))
  733. info.setDebugValuesDesc(msg);
  734. if (!getApplicationValues(info, flags))
  735. info.setApplicationValuesDesc(msg);
  736. if (!getWorkflow(info, flags))
  737. info.setWorkflowsDesc(msg);
  738. }
  739. bool WsWuInfo::getClusterInfo(IEspECLWorkunit &info, unsigned flags)
  740. {
  741. if (version > 1.04)
  742. {
  743. StringArray allowedClusters;
  744. SCMStringBuffer val;
  745. cw->getAllowedClusters(val);
  746. if (val.length() > 0)
  747. {
  748. const char* ptr = val.str();
  749. while(*ptr != '\0')
  750. {
  751. StringBuffer onesub;
  752. while(*ptr != '\0' && *ptr != ',')
  753. {
  754. onesub.append((char)(*ptr));
  755. ptr++;
  756. }
  757. if(onesub.length() > 0)
  758. allowedClusters.append(onesub.str());
  759. if(*ptr != '\0')
  760. ptr++;
  761. }
  762. }
  763. if (allowedClusters.length() > 0)
  764. info.setAllowedClusters(allowedClusters);
  765. }
  766. if (version > 1.23 && clusterName.length())
  767. {
  768. int clusterTypeFlag = 0;
  769. StringBuffer clusterProcess = clusterName.str();
  770. Owned<IEnvironmentFactory> envFactory = getEnvironmentFactory();
  771. Owned<IConstEnvironment> constEnv = envFactory->openEnvironmentByFile();
  772. Owned<IPropertyTree> root = &constEnv->getPTree();
  773. if (!root)
  774. throw MakeStringException(ECLWATCH_CANNOT_CONNECT_DALI,"Cannot connect to DALI server.");
  775. VStringBuffer xpath("Software/Topology/Cluster[@name='%s']", clusterName.str());
  776. IPropertyTree *ptTopCluster = root->queryPropTree(xpath.str());
  777. if (ptTopCluster)
  778. {
  779. IPropertyTree *ptProcCluster = ptTopCluster->queryPropTree("ThorCluster");
  780. if (ptProcCluster)
  781. {
  782. clusterProcess.set(ptProcCluster->queryProp("@process"));
  783. clusterTypeFlag=1;
  784. }
  785. else
  786. {
  787. ptProcCluster = ptTopCluster->queryPropTree("RoxieCluster");
  788. if (ptProcCluster)
  789. {
  790. clusterProcess.set(ptProcCluster->queryProp("@process"));
  791. clusterTypeFlag=2;
  792. }
  793. }
  794. }
  795. if (clusterTypeFlag==0)
  796. {
  797. SCMStringBuffer val;
  798. cw->getDebugValue("targetclustertype", val);
  799. if (strieq(val.str(), "thor"))
  800. clusterTypeFlag=1;
  801. else if (strieq(val.str(), "roxie"))
  802. clusterTypeFlag = 2;
  803. }
  804. info.setClusterFlag(clusterTypeFlag);
  805. if (version > 1.29 && (clusterTypeFlag == 1))
  806. {
  807. VStringBuffer xpath("Software/ThorCluster[@name='%s']", clusterProcess.str());
  808. IPropertyTree *ptCluster = root->queryPropTree(xpath.str());
  809. if (ptCluster)
  810. info.setThorLCR(!ptCluster->getPropBool("@Legacy", false));
  811. }
  812. }
  813. return true;
  814. }
  815. bool WsWuInfo::getWorkflow(IEspECLWorkunit &info, unsigned flags)
  816. {
  817. bool success=true;
  818. bool eventCountRemaining = false;
  819. bool eventCountUnlimited = false;
  820. try
  821. {
  822. info.setEventSchedule(0);
  823. IArrayOf<IConstECLWorkflow> workflows;
  824. Owned<IConstWorkflowItemIterator> it = cw->getWorkflowItems();
  825. if (it)
  826. {
  827. ForEach(*it)
  828. {
  829. IConstWorkflowItem *r = it->query();
  830. if (r)
  831. {
  832. IWorkflowEvent *wfevent = r->getScheduleEvent();
  833. if (wfevent)
  834. {
  835. Owned<IEspECLWorkflow> g;
  836. if (flags & WUINFO_IncludeWorkflows)
  837. {
  838. StringBuffer id;
  839. g.setown(createECLWorkflow("",""));
  840. g->setWFID(id.appendf("%d", r->queryWfid()).str());
  841. g->setEventName(wfevent->queryName());
  842. g->setEventText(wfevent->queryText());
  843. }
  844. if (r->hasScheduleCount())
  845. {
  846. if (r->queryScheduleCountRemaining() > 0)
  847. eventCountRemaining = true;
  848. if (flags & WUINFO_IncludeWorkflows)
  849. {
  850. g->setCount(r->queryScheduleCount());
  851. g->setCountRemaining(r->queryScheduleCountRemaining());
  852. }
  853. }
  854. else
  855. {
  856. eventCountUnlimited = true;
  857. }
  858. if (flags & WUINFO_IncludeWorkflows)
  859. workflows.append(*g.getLink());
  860. }
  861. }
  862. }
  863. if (workflows.length() > 0)
  864. info.setWorkflows(workflows);
  865. workflows.kill();
  866. }
  867. }
  868. catch(IException* e)
  869. {
  870. success = false;
  871. StringBuffer eMsg;
  872. ERRLOG("%s", e->errorMessage(eMsg).str());
  873. e->Release();
  874. }
  875. if (info.getState() && !stricmp(info.getState(), "wait"))
  876. info.setEventSchedule(2); //Can deschedule
  877. else if (eventCountUnlimited || eventCountRemaining)
  878. info.setEventSchedule(1); //Can reschedule
  879. return success;
  880. }
  881. bool shouldFileContentBeShown(IEspContext &context, const char * logicalName)
  882. {
  883. StringBuffer username;
  884. context.getUserID(username);
  885. if(username.length() < 0)
  886. return true; //??TBD
  887. Owned<IUserDescriptor> userdesc(createUserDescriptor());
  888. userdesc->set(username.str(), context.queryPassword());
  889. Owned<IDistributedFile> df = queryDistributedFileDirectory().lookup(logicalName, userdesc);
  890. if (!df)
  891. return false;
  892. bool blocked;
  893. if (df->isCompressed(&blocked) && !blocked)
  894. return false;
  895. IPropertyTree & properties = df->queryProperties();
  896. const char * format = properties.queryProp("@format");
  897. if (format && (stricmp(format,"csv")==0 || memicmp(format, "utf", 3) == 0))
  898. {
  899. return true;
  900. }
  901. const char * recordEcl = properties.queryProp("ECL");
  902. if (!recordEcl)
  903. return false;
  904. MultiErrorReceiver errs;
  905. Owned<IHqlExpression> ret = ::parseQuery(recordEcl, &errs);
  906. return errs.errCount() == 0;
  907. }
  908. void WsWuInfo::getEclSchemaChildFields(IArrayOf<IEspECLSchemaItem>& schemas, IHqlExpression * expr, bool isConditional)
  909. {
  910. if(!expr)
  911. return;
  912. ForEachChild(idx, expr)
  913. getEclSchemaFields(schemas, expr->queryChild(idx), isConditional);
  914. }
  915. void WsWuInfo::getEclSchemaFields(IArrayOf<IEspECLSchemaItem>& schemas, IHqlExpression * expr, bool isConditional)
  916. {
  917. if(!expr)
  918. return;
  919. int ret = expr->getOperator();
  920. switch (ret)
  921. {
  922. case no_record:
  923. getEclSchemaChildFields(schemas, expr, isConditional);
  924. break;
  925. case no_ifblock:
  926. {
  927. getEclSchemaChildFields(schemas, expr->queryChild(1), true);
  928. break;
  929. }
  930. case no_field:
  931. {
  932. if (expr->hasProperty(__ifblockAtom))
  933. break;
  934. ITypeInfo * type = expr->queryType();
  935. IAtom * name = expr->queryName();
  936. IHqlExpression * nameAttr = expr->queryProperty(namedAtom);
  937. StringBuffer outname;
  938. if (nameAttr && nameAttr->queryChild(0) && nameAttr->queryChild(0)->queryValue())
  939. nameAttr->queryChild(0)->queryValue()->getStringValue(outname);
  940. else
  941. outname.append(name).toLowerCase();
  942. if(type)
  943. {
  944. type_t tc = type->getTypeCode();
  945. if (tc == type_row)
  946. {
  947. getEclSchemaChildFields(schemas, expr->queryRecord(), isConditional);
  948. }
  949. else
  950. {
  951. if (type->getTypeCode() == type_alien)
  952. {
  953. IHqlAlienTypeInfo * alien = queryAlienType(type);
  954. type = alien->queryPhysicalType();
  955. }
  956. Owned<IEspECLSchemaItem> schema = createECLSchemaItem("","");
  957. StringBuffer eclType;
  958. type->getECLType(eclType);
  959. schema->setColumnName(outname);
  960. schema->setColumnType(eclType.str());
  961. schema->setColumnTypeCode(tc);
  962. schema->setIsConditional(isConditional);
  963. schemas.append(*schema.getClear());
  964. }
  965. }
  966. break;
  967. }
  968. }
  969. }
  970. bool WsWuInfo::getResultEclSchemas(IConstWUResult &r, IArrayOf<IEspECLSchemaItem>& schemas)
  971. {
  972. SCMStringBuffer schema;
  973. r.getResultEclSchema(schema);
  974. if (!schema.length())
  975. return false;
  976. MultiErrorReceiver errs;
  977. Owned<IHqlExpression> expr = ::parseQuery(schema.str(), &errs);
  978. if (errs.errCount() != 0)
  979. return false;
  980. getEclSchemaFields(schemas, expr, false);
  981. return true;
  982. }
  983. void WsWuInfo::getResult(IConstWUResult &r, IArrayOf<IEspECLResult>& results, unsigned flags)
  984. {
  985. SCMStringBuffer name;
  986. r.getResultName(name);
  987. SCMStringBuffer filename;
  988. r.getResultLogicalName(filename);
  989. StringBuffer value, link;
  990. if (r.getResultStatus() == ResultStatusUndefined)
  991. value.set("[undefined]");
  992. else if (r.isResultScalar())
  993. {
  994. try
  995. {
  996. SCMStringBuffer xml;
  997. r.getResultXml(xml);
  998. Owned<IPropertyTree> props = createPTreeFromXMLString(xml.str(), ipt_caseInsensitive);
  999. IPropertyTree *val = props->queryPropTree("Row/*");
  1000. if(val)
  1001. value.set(val->queryProp(NULL));
  1002. else
  1003. {
  1004. StringBuffer user, password;
  1005. context.getUserID(user);
  1006. context.getPassword(password);
  1007. Owned<IResultSetFactory> resultSetFactory;
  1008. if (context.querySecManager())
  1009. resultSetFactory.setown(getSecResultSetFactory(*context.querySecManager(), *context.queryUser()));
  1010. else
  1011. resultSetFactory.setown(getResultSetFactory(user, password));
  1012. Owned<INewResultSet> result;
  1013. result.setown(resultSetFactory->createNewResultSet(&r, wuid.str()));
  1014. Owned<IResultSetCursor> cursor(result->createCursor());
  1015. cursor->first();
  1016. if (cursor->getIsAll(0))
  1017. {
  1018. value.set("<All/>");
  1019. }
  1020. else
  1021. {
  1022. Owned<IResultSetCursor> childCursor = cursor->getChildren(0);
  1023. if (childCursor)
  1024. {
  1025. ForEach(*childCursor)
  1026. {
  1027. StringBuffer out;
  1028. StringBufferAdaptor adaptor(out);
  1029. childCursor->getDisplayText(adaptor, 0);
  1030. if (!value.length())
  1031. value.append('[');
  1032. else
  1033. value.append(", ");
  1034. value.append('\'').append(out.str()).append('\'');
  1035. }
  1036. if (value.length())
  1037. value.append(']');
  1038. }
  1039. }
  1040. }
  1041. }
  1042. catch(...)
  1043. {
  1044. value.append("[value not available]");
  1045. }
  1046. }
  1047. else
  1048. {
  1049. value.append('[').append(r.getResultTotalRowCount()).append(" rows]");
  1050. if(r.getResultSequence()>=0)
  1051. {
  1052. if(filename.length())
  1053. {
  1054. StringBuffer username;
  1055. context.getUserID(username);
  1056. Owned<IUserDescriptor> userdesc(createUserDescriptor());
  1057. userdesc->set(username.str(), context.queryPassword());
  1058. Owned<IDistributedFile> df = queryDistributedFileDirectory().lookup(filename.str(), userdesc);
  1059. if(df && df->queryProperties().hasProp("ECL"))
  1060. link.append(r.getResultSequence());
  1061. }
  1062. else
  1063. link.append(r.getResultSequence());
  1064. }
  1065. }
  1066. Owned<IEspECLResult> result= createECLResult("","");
  1067. if (flags & WUINFO_IncludeEclSchemas)
  1068. {
  1069. IArrayOf<IEspECLSchemaItem> schemas;
  1070. if (getResultEclSchemas(r, schemas))
  1071. result->setECLSchemas(schemas);
  1072. }
  1073. if (filename.length())
  1074. result->setShowFileContent(shouldFileContentBeShown(context, filename.str()));
  1075. result->setName(name.str());
  1076. result->setLink(link.str());
  1077. result->setSequence(r.getResultSequence());
  1078. result->setValue(value.str());
  1079. result->setFileName(filename.str());
  1080. result->setIsSupplied(r.getResultStatus() == ResultStatusSupplied);
  1081. result->setTotal(r.getResultTotalRowCount());
  1082. results.append(*result.getLink());
  1083. }
  1084. bool WsWuInfo::getResults(IEspECLWorkunit &info, unsigned flags)
  1085. {
  1086. try
  1087. {
  1088. unsigned count = 0;
  1089. IArrayOf<IEspECLResult> results;
  1090. Owned<IConstWUResultIterator> it = &(cw->getResults());
  1091. ForEach(*it)
  1092. {
  1093. IConstWUResult &r = it->query();
  1094. if(r.getResultSequence()>=0)
  1095. {
  1096. if (flags & WUINFO_IncludeResults)
  1097. getResult(r, results, flags);
  1098. count++;
  1099. }
  1100. }
  1101. if (version >= 1.17)
  1102. info.setResultCount(count);
  1103. if ((flags & WUINFO_IncludeResults) && results.length() > 0)
  1104. info.setResults(results);
  1105. results.kill();
  1106. return true;
  1107. }
  1108. catch(IException* e)
  1109. {
  1110. StringBuffer eMsg;
  1111. ERRLOG("%s", e->errorMessage(eMsg).str());
  1112. e->Release();
  1113. }
  1114. return false;
  1115. }
  1116. void WsWuInfo::getHelpFiles(IConstWUQuery* query, WUFileType type, IArrayOf<IEspECLHelpFile>& helpers)
  1117. {
  1118. if (!query)
  1119. return;
  1120. SCMStringBuffer name, Ip, description;
  1121. Owned<IConstWUAssociatedFileIterator> iter = &query->getAssociatedFiles();
  1122. ForEach(*iter)
  1123. {
  1124. IConstWUAssociatedFile & cur = iter->query();
  1125. if (cur.getType() != type)
  1126. continue;
  1127. cur.getName(name);
  1128. Owned<IEspECLHelpFile> h= createECLHelpFile("","");
  1129. h->setName(name.str());
  1130. switch (type)
  1131. {
  1132. case FileTypeCpp:
  1133. h->setType("cpp");
  1134. break;
  1135. case FileTypeDll:
  1136. h->setType("dll");
  1137. break;
  1138. default:
  1139. h->setType("res");
  1140. break;
  1141. }
  1142. if (version > 1.31)
  1143. {
  1144. cur.getIp(Ip);
  1145. h->setIPAddress(Ip.str());
  1146. Ip.clear();
  1147. cur.getDescription(description);
  1148. if ((description.length() < 1) && (name.length() > 0))
  1149. {
  1150. const char* desc = pathTail(name.str());
  1151. if (desc && *desc)
  1152. description.set(desc);
  1153. }
  1154. if (description.length() < 1)
  1155. description.set("Help File");
  1156. h->setDescription(description.str());
  1157. description.clear();
  1158. }
  1159. helpers.append(*h.getLink());
  1160. name.clear();
  1161. }
  1162. }
  1163. void WsWuInfo::getSubFiles(IPropertyTreeIterator* f, IEspECLSourceFile* eclSuperFile, StringArray& fileNames)
  1164. {
  1165. IArrayOf<IEspECLSourceFile> files;
  1166. ForEach(*f)
  1167. {
  1168. IPropertyTree &query = f->query();
  1169. const char *clusterName = query.queryProp("@cluster");
  1170. const char *fileName = query.queryProp("@name");
  1171. int fileCount = query.getPropInt("@useCount");
  1172. bool bFound = false;
  1173. if (fileName && *fileName && (fileNames.length() > 0))
  1174. {
  1175. for (unsigned i = 0; i < fileNames.length(); i++ )
  1176. {
  1177. const char *fileName0 = fileNames.item(i);
  1178. if (!stricmp(fileName, fileName0))
  1179. {
  1180. bFound = true;
  1181. break;
  1182. }
  1183. }
  1184. }
  1185. if (bFound)
  1186. continue;
  1187. Owned<IEspECLSourceFile> file= createECLSourceFile("","");
  1188. if(clusterName && *clusterName)
  1189. {
  1190. file->setFileCluster(clusterName);
  1191. }
  1192. if (fileName && *fileName)
  1193. {
  1194. file->setName(fileName);
  1195. fileNames.append(fileName);
  1196. }
  1197. file->setCount(fileCount);
  1198. Owned<IPropertyTreeIterator> filetrees= query.getElements("Subfile");
  1199. if (filetrees->first())
  1200. {
  1201. file->setIsSuperFile(true);
  1202. getSubFiles(filetrees, file, fileNames);
  1203. }
  1204. files.append(*file.getLink());
  1205. }
  1206. eclSuperFile->setECLSourceFiles(files);
  1207. return;
  1208. }
  1209. bool WsWuInfo::getResultViews(StringArray &viewnames, unsigned flags)
  1210. {
  1211. if (!(flags & WUINFO_IncludeResultsViewNames))
  1212. return true;
  1213. try
  1214. {
  1215. Owned<IWuWebView> wv = createWuWebView(*cw, NULL, NULL, false);
  1216. if (wv)
  1217. wv->getResultViewNames(viewnames);
  1218. return true;
  1219. }
  1220. catch(IException* e)
  1221. {
  1222. StringBuffer eMsg;
  1223. ERRLOG("%s", e->errorMessage(eMsg).str());
  1224. e->Release();
  1225. }
  1226. return false;
  1227. }
  1228. void appendIOStreamContent(MemoryBuffer &mb, IFileIOStream *ios)
  1229. {
  1230. StringBuffer line;
  1231. bool eof = false;
  1232. while (!eof)
  1233. {
  1234. line.clear();
  1235. loop
  1236. {
  1237. char c;
  1238. size32_t numRead = ios->read(1, &c);
  1239. if (!numRead)
  1240. {
  1241. eof = true;
  1242. break;
  1243. }
  1244. line.append(c);
  1245. if (c=='\n')
  1246. break;
  1247. }
  1248. mb.append(line.length(), line.str());
  1249. if (mb.length() > 640000)
  1250. break;
  1251. }
  1252. }
  1253. void WsWuInfo::getWorkunitEclAgentLog(MemoryBuffer& buf)
  1254. {
  1255. SCMStringBuffer logname;
  1256. cw->getDebugValue("EclAgentLog", logname);
  1257. unsigned pid = cw->getAgentPID();
  1258. if(logname.length() == 0)
  1259. throw MakeStringException(ECLWATCH_ECLAGENT_LOG_NOT_FOUND,"EclAgent log file not available for workunit %s.", wuid.str());
  1260. Owned<IFile> rFile = createIFile(logname.str());
  1261. if(!rFile)
  1262. throw MakeStringException(ECLWATCH_CANNOT_OPEN_FILE, "Cannot open file %s.", logname.str());
  1263. OwnedIFileIO rIO = rFile->openShared(IFOread,IFSHfull);
  1264. if(!rIO)
  1265. throw MakeStringException(ECLWATCH_CANNOT_READ_FILE, "Cannot read file %s.", logname.str());
  1266. OwnedIFileIOStream ios = createBufferedIOStream(rIO);
  1267. StringBuffer line;
  1268. bool eof = false;
  1269. bool wuidFound = false;
  1270. VStringBuffer pidstr(" %5d ", pid);
  1271. char const * pidchars = pidstr.str();
  1272. while(!eof)
  1273. {
  1274. line.clear();
  1275. loop
  1276. {
  1277. char c;
  1278. size32_t numRead = ios->read(1, &c);
  1279. if (!numRead)
  1280. {
  1281. eof = true;
  1282. break;
  1283. }
  1284. line.append(c);
  1285. if (c=='\n')
  1286. break;
  1287. }
  1288. //Retain all rows that match a unique program instance - by retaining all rows that match a pid
  1289. if(strstr(line.str(), pidchars))
  1290. {
  1291. //Check if this is a new instance using line sequence number
  1292. if (strncmp(line.str(), "00000000", 8) == 0)
  1293. {
  1294. if (wuidFound) //If the correct instance has been found, return that instance before the next instance.
  1295. break;
  1296. //The last instance is not a correct instance. Clean the buf in order to start a new instance.
  1297. buf.clear();
  1298. }
  1299. //If we spot the workunit id anywhere in the tacing for this pid then assume it is the correct instance.
  1300. if(!wuidFound && strstr(line.str(), wuid.str()))
  1301. wuidFound = true;
  1302. buf.append(line.length(), line.str());
  1303. }
  1304. }
  1305. }
  1306. void WsWuInfo::getWorkunitThorLog(MemoryBuffer& buf)
  1307. {
  1308. SCMStringBuffer logname;
  1309. cw->getDebugValue(File_ThorLog, logname);
  1310. Owned<IFile> rFile = createIFile(logname.str());
  1311. if (!rFile)
  1312. throw MakeStringException(ECLWATCH_CANNOT_OPEN_FILE,"Cannot open file %s.",logname.str());
  1313. OwnedIFileIO rIO = rFile->openShared(IFOread,IFSHfull);
  1314. if (!rIO)
  1315. throw MakeStringException(ECLWATCH_CANNOT_READ_FILE,"Cannot read file %s.",logname.str());
  1316. OwnedIFileIOStream ios = createBufferedIOStream(rIO);
  1317. StringBuffer line;
  1318. bool eof = false;
  1319. bool include = false;
  1320. VStringBuffer startwuid("Started wuid=%s", wuid.str());
  1321. VStringBuffer endwuid("Finished wuid=%s", wuid.str());
  1322. const char *sw = startwuid.str();
  1323. const char *ew = endwuid.str();
  1324. while (!eof)
  1325. {
  1326. line.clear();
  1327. loop
  1328. {
  1329. char c;
  1330. size32_t numRead = ios->read(1, &c);
  1331. if (!numRead)
  1332. {
  1333. eof = true;
  1334. break;
  1335. }
  1336. line.append(c);
  1337. if (c=='\n')
  1338. break;
  1339. }
  1340. if (strstr(line.str(), sw))
  1341. include = true;
  1342. if (include)
  1343. buf.append(line.length(), line.str());
  1344. if (strstr(line.str(), ew))
  1345. include = false;
  1346. }
  1347. }
  1348. void WsWuInfo::getWorkunitThorSlaveLog(const char *slaveip, MemoryBuffer& buf)
  1349. {
  1350. if (isEmpty(slaveip))
  1351. throw MakeStringException(ECLWATCH_INVALID_INPUT,"ThorSlave IP not specified.");
  1352. SCMStringBuffer logname;
  1353. cw->getDebugValue(File_ThorLog, logname);
  1354. StringBuffer logdir;
  1355. splitDirTail(logname.str(),logdir);
  1356. RemoteFilename rfn;
  1357. rfn.setRemotePath(logdir.str());
  1358. SocketEndpoint ep(slaveip);
  1359. rfn.setIp(ep);
  1360. Owned<IFile> dir = createIFile(rfn);
  1361. Owned<IDirectoryIterator> diriter = dir->directoryFiles("*.log");
  1362. if (!diriter->first())
  1363. throw MakeStringException(ECLWATCH_FILE_NOT_EXIST,"Cannot find Thor slave log file %s.", logdir.str());
  1364. Linked<IFile> logfile = &diriter->query();
  1365. diriter.clear();
  1366. dir.clear();
  1367. // logfile is now the file to load
  1368. OwnedIFileIO rIO = logfile->openShared(IFOread,IFSHfull);
  1369. if (!rIO)
  1370. throw MakeStringException(ECLWATCH_CANNOT_READ_FILE,"Cannot read file %s.",logdir.str());
  1371. OwnedIFileIOStream ios = createBufferedIOStream(rIO);
  1372. appendIOStreamContent(buf, ios.get());
  1373. }
  1374. void WsWuInfo::getWorkunitResTxt(MemoryBuffer& buf)
  1375. {
  1376. Owned<IConstWUQuery> query = cw->getQuery();
  1377. if(!query)
  1378. throw MakeStringException(ECLWATCH_QUERY_NOT_FOUND_FOR_WU,"No query for workunit %s.",wuid.str());
  1379. SCMStringBuffer resname;
  1380. queryDllServer().getDll(query->getQueryResTxtName(resname).str(), buf);
  1381. }
  1382. void WsWuInfo::getWorkunitArchiveQuery(MemoryBuffer& buf)
  1383. {
  1384. Owned<IConstWUQuery> query = cw->getQuery();
  1385. if(!query)
  1386. throw MakeStringException(ECLWATCH_QUERY_NOT_FOUND_FOR_WU,"No query for workunit %s.",wuid.str());
  1387. SCMStringBuffer queryText;
  1388. query->getQueryText(queryText);
  1389. if ((queryText.length() < 1) || !isArchiveQuery(queryText.str()))
  1390. throw MakeStringException(ECLWATCH_CANNOT_GET_WORKUNIT, "Archive Query not found for workunit %s.", wuid.str());
  1391. buf.append(queryText.length(), queryText.str());
  1392. }
  1393. void WsWuInfo::getWorkunitDll(MemoryBuffer& buf)
  1394. {
  1395. Owned<IConstWUQuery> query = cw->getQuery();
  1396. if(!query)
  1397. throw MakeStringException(ECLWATCH_QUERY_NOT_FOUND_FOR_WU,"No query for workunit %s.",wuid.str());
  1398. SCMStringBuffer dllname;
  1399. query->getQueryDllName(dllname);
  1400. queryDllServer().getDll(dllname.str(), buf);
  1401. }
  1402. void WsWuInfo::getWorkunitXml(const char* plainText, MemoryBuffer& buf)
  1403. {
  1404. const char* header;
  1405. if (plainText && (!stricmp(plainText, "yes")))
  1406. header = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>";
  1407. else
  1408. header = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><?xml-stylesheet href=\"../esp/xslt/xmlformatter.xsl\" type=\"text/xsl\"?>";
  1409. SCMStringBuffer xml;
  1410. exportWorkUnitToXML(cw, xml);
  1411. buf.append(strlen(header), header);
  1412. buf.append(xml.length(), xml.str());
  1413. }
  1414. void WsWuInfo::getWorkunitCpp(const char *cppname, const char* description, const char* ipAddress, MemoryBuffer& buf)
  1415. {
  1416. if (isEmpty(description))
  1417. throw MakeStringException(ECLWATCH_INVALID_INPUT, "File not specified.");
  1418. if (isEmpty(ipAddress))
  1419. throw MakeStringException(ECLWATCH_INVALID_INPUT, "File location not specified.");
  1420. if (isEmpty(cppname))
  1421. throw MakeStringException(ECLWATCH_INVALID_FILE_NAME, "File path not specified.");
  1422. RemoteFilename rfn;
  1423. rfn.setRemotePath(cppname);
  1424. SocketEndpoint ep(ipAddress);
  1425. rfn.setIp(ep);
  1426. Owned<IFile> cppfile = createIFile(rfn);
  1427. if (!cppfile)
  1428. throw MakeStringException(ECLWATCH_CANNOT_OPEN_FILE, "Cannot open %s.", description);
  1429. OwnedIFileIO rIO = cppfile->openShared(IFOread,IFSHfull);
  1430. if (!rIO)
  1431. throw MakeStringException(ECLWATCH_CANNOT_READ_FILE,"Cannot read %s.", description);
  1432. OwnedIFileIOStream ios = createBufferedIOStream(rIO);
  1433. if (!ios)
  1434. throw MakeStringException(ECLWATCH_CANNOT_READ_FILE,"Cannot read %s.", description);
  1435. appendIOStreamContent(buf, ios.get());
  1436. }
  1437. WsWuSearch::WsWuSearch(IEspContext& context,const char* owner,const char* state,const char* cluster,const char* startDate,const char* endDate,const char* ecl,const char* jobname,const char* appname,const char* appkey,const char* appvalue)
  1438. {
  1439. SecAccessFlags accessOwn;
  1440. SecAccessFlags accessOthers;
  1441. getUserWuAccessFlags(context, accessOwn, accessOthers, true);
  1442. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1443. StringBuffer xpath("*");
  1444. if(ecl && *ecl)
  1445. xpath.append("[Query/Text=?~\"*").append(ecl).append("*\"]");
  1446. if(state && *state)
  1447. xpath.append("[@state=\"").append(state).append("\"]");
  1448. if(cluster && *cluster)
  1449. xpath.append("[@clusterName=\"").append(cluster).append("\"]");
  1450. if(owner && *owner)
  1451. xpath.append("[@submitID=?~\"").append(owner).append("\"]");
  1452. if(jobname && *jobname)
  1453. xpath.append("[@jobName=?~\"*").append(jobname).append("*\"]");
  1454. if(appname && *appname || appkey && *appkey || appvalue && *appvalue)
  1455. {
  1456. xpath.append("[Application/").append(appname && *appname ? appname : "*");
  1457. xpath.append("/").append(appkey && *appkey ? appkey : "*");
  1458. if(appvalue && *appvalue)
  1459. xpath.append("=?~\"").append(appvalue).append("\"");
  1460. xpath.append("]");
  1461. }
  1462. Owned<IConstWorkUnitIterator> it(factory->getWorkUnitsByXPath(xpath.str()));
  1463. StringBuffer wuFrom, wuTo;
  1464. if(startDate && *startDate)
  1465. createWuidFromDate(startDate, wuFrom);
  1466. if(endDate && *endDate)
  1467. createWuidFromDate(endDate, wuTo);
  1468. ForEach(*it)
  1469. {
  1470. IConstWorkUnit &cw = it->query();
  1471. if (chooseWuAccessFlagsByOwnership(context.queryUserId(), cw, accessOwn, accessOthers) < SecAccess_Read)
  1472. continue;
  1473. SCMStringBuffer wuid;
  1474. cw.getWuid(wuid);
  1475. if (wuFrom.length() && strcmp(wuid.str(),wuFrom.str())<0)
  1476. continue;
  1477. if (wuTo.length() && strcmp(wuid.str(),wuTo.str())>0)
  1478. continue;
  1479. if (state && *state)
  1480. {
  1481. SCMStringBuffer descr;
  1482. if(!strieq(cw.getStateDesc(descr).str(),state))
  1483. continue;
  1484. }
  1485. SCMStringBuffer parent;
  1486. if (!cw.getParentWuid(parent).length())
  1487. {
  1488. parent.clear();
  1489. wuids.push_back(cw.getWuid(parent).str());
  1490. }
  1491. }
  1492. std::sort(wuids.begin(),wuids.end(),std::greater<std::string>());
  1493. }
  1494. StringBuffer& WsWuSearch::createWuidFromDate(const char* timestamp,StringBuffer& s)
  1495. {
  1496. CDateTime wuTime;
  1497. wuTime.setString(timestamp,NULL,true);
  1498. unsigned year, month, day, hour, minute, second, nano;
  1499. wuTime.getDate(year, month, day, true);
  1500. wuTime.getTime(hour, minute, second, nano, true);
  1501. s.appendf("W%4d%02d%02d-%02d%02d%02d",year,month,day,hour,minute,second);
  1502. return s;
  1503. }
  1504. struct CompareData
  1505. {
  1506. CompareData(const char* _filter): filter(_filter) {}
  1507. bool operator()(const StlLinked<DataCacheElement>& e) const
  1508. {
  1509. return stricmp(e->m_filter.c_str(),filter)==0;
  1510. }
  1511. const char* filter;
  1512. };
  1513. DataCacheElement* DataCache::lookup(IEspContext &context, const char* filter, unsigned timeOutMin)
  1514. {
  1515. CriticalBlock block(crit);
  1516. if (cache.size() < 1)
  1517. return NULL;
  1518. //erase data if it should be
  1519. CDateTime timeNow;
  1520. int timeout = timeOutMin;
  1521. timeNow.setNow();
  1522. timeNow.adjustTime(-timeout);
  1523. while (true)
  1524. {
  1525. std::list<StlLinked<DataCacheElement> >::iterator list_iter = cache.begin();
  1526. if (list_iter == cache.end())
  1527. break;
  1528. DataCacheElement* awu = list_iter->getLink();
  1529. if (!awu || (awu->m_timeCached > timeNow))
  1530. break;
  1531. cache.pop_front();
  1532. }
  1533. if (cache.size() < 1)
  1534. return NULL;
  1535. //Check whether we have the data cache for this cluster. If yes, get the version
  1536. std::list<StlLinked<DataCacheElement> >::iterator it = std::find_if(cache.begin(),cache.end(),CompareData(filter));
  1537. if(it!=cache.end())
  1538. {
  1539. return it->getLink();
  1540. }
  1541. return NULL;
  1542. }
  1543. void DataCache::add(const char* filter, const char* data, const char* name, const char* localName, const char* wuid,
  1544. const char* resultName, unsigned seq, __int64 start, unsigned count, __int64 requested, __int64 total)
  1545. {
  1546. CriticalBlock block(crit);
  1547. //Save new data
  1548. Owned<DataCacheElement> e=new DataCacheElement(filter, data, name, localName, wuid, resultName, seq, start, count, requested, total);
  1549. if (cacheSize > 0)
  1550. {
  1551. if (cache.size() >= cacheSize)
  1552. cache.pop_front();
  1553. cache.push_back(e.get());
  1554. }
  1555. return;
  1556. }
  1557. struct CompareArchivedWUs
  1558. {
  1559. CompareArchivedWUs(const char* _filter): filter(_filter) {}
  1560. bool operator()(const StlLinked<ArchivedWuCacheElement>& e) const
  1561. {
  1562. return stricmp(e->m_filter.c_str(),filter)==0;
  1563. }
  1564. const char* filter;
  1565. };
  1566. ArchivedWuCacheElement* ArchivedWuCache::lookup(IEspContext &context, const char* filter, const char* sashaUpdatedWhen, unsigned timeOutMin)
  1567. {
  1568. CriticalBlock block(crit);
  1569. if (cache.size() < 1)
  1570. return NULL;
  1571. //erase data if it should be
  1572. CDateTime timeNow;
  1573. int timeout = timeOutMin;
  1574. timeNow.setNow();
  1575. timeNow.adjustTime(-timeout);
  1576. while (true)
  1577. {
  1578. std::list<StlLinked<ArchivedWuCacheElement> >::iterator list_iter = cache.begin();
  1579. if (list_iter == cache.end())
  1580. break;
  1581. ArchivedWuCacheElement* awu = list_iter->getLink();
  1582. if (awu && !stricmp(sashaUpdatedWhen, awu->m_sashaUpdatedWhen.c_str()) && (awu->m_timeCached > timeNow))
  1583. break;
  1584. cache.pop_front();
  1585. }
  1586. if (cache.size() < 1)
  1587. return NULL;
  1588. //Check whether we have the data cache for this cluster. If yes, get the version
  1589. std::list<StlLinked<ArchivedWuCacheElement> >::iterator it = std::find_if(cache.begin(),cache.end(),CompareArchivedWUs(filter));
  1590. if(it!=cache.end())
  1591. return it->getLink();
  1592. return NULL;
  1593. }
  1594. void ArchivedWuCache::add(const char* filter, const char* sashaUpdatedWhen, bool hasNextPage, IArrayOf<IEspECLWorkunit>& wus)
  1595. {
  1596. CriticalBlock block(crit);
  1597. //Save new data
  1598. Owned<ArchivedWuCacheElement> e=new ArchivedWuCacheElement(filter, sashaUpdatedWhen, hasNextPage, /*data.str(),*/ wus);
  1599. if (cacheSize > 0)
  1600. {
  1601. if (cache.size() >= cacheSize)
  1602. cache.pop_front();
  1603. cache.push_back(e.get());
  1604. }
  1605. return;
  1606. }
  1607. WsWuJobQueueAuditInfo::WsWuJobQueueAuditInfo(IEspContext &context, const char *cluster, const char *from , const char *to, CHttpResponse* response, const char *xls)
  1608. {
  1609. if(!response)
  1610. return;
  1611. unsigned maxDisplay = 125;
  1612. IArrayOf<IEspThorQueue> items;
  1613. CDateTime fromTime;
  1614. CDateTime toTime;
  1615. StringBuffer fromstr;
  1616. StringBuffer tostr;
  1617. if(from && *from)
  1618. {
  1619. fromTime.setString(from,NULL,false);
  1620. fromTime.getString(fromstr, false);
  1621. }
  1622. if(to && *to)
  1623. {
  1624. toTime.setString(to,NULL,false);
  1625. toTime.getString(tostr, false);
  1626. }
  1627. StringBuffer filter("ThorQueueMonitor");
  1628. if(notEmpty(cluster))
  1629. {
  1630. Owned<IEnvironmentFactory> factory = getEnvironmentFactory();
  1631. Owned<IConstEnvironment> environment = factory->openEnvironmentByFile();
  1632. Owned<IPropertyTree> root = &environment->getPTree();
  1633. if (!root)
  1634. throw MakeStringException(ECLWATCH_CANNOT_GET_ENV_INFO, "Failed to get environment information.");
  1635. VStringBuffer xpath("Software/ThorCluster[@name='%s']/@queueName", cluster);
  1636. const char* queuename = root->queryProp(xpath.str());
  1637. filter.appendf(",%s", notEmpty(queuename) ? queuename : cluster);
  1638. }
  1639. StringAttrArray lines;
  1640. queryAuditLogs(fromTime, toTime, filter.str(), lines);
  1641. unsigned countLines = 0;
  1642. unsigned maxConnected = 0;
  1643. unsigned longestQueue = 0;
  1644. ForEachItemIn(idx, lines)
  1645. {
  1646. const char* line = lines.item(idx).text;
  1647. if(!line || !*line)
  1648. continue;
  1649. if (idx < (lines.length() - 1))
  1650. getAuditLineInfo(line, longestQueue, maxConnected, maxDisplay, 1, items);
  1651. else
  1652. getAuditLineInfo(line, longestQueue, maxConnected, maxDisplay, 2, items);
  1653. countLines++;
  1654. }
  1655. StringBuffer responsebuf;
  1656. if (items.length() < 1)
  1657. {
  1658. responsebuf.append("<script language=\"javascript\">\r\nparent.displayQEnd(\'No data found\')</script>\r\n");
  1659. response->sendChunk(responsebuf.str());
  1660. return;
  1661. }
  1662. unsigned itemCount = items.length();
  1663. if (itemCount > maxDisplay)
  1664. itemCount = maxDisplay;
  1665. responsebuf.append("<script language=\"javascript\">parent.displayQLegend()</script>\r\n");
  1666. response->sendChunk(responsebuf.str());
  1667. responsebuf.clear();
  1668. responsebuf.append("<script language=\"javascript\">parent.displayQBegin(").append(longestQueue).append(",").append(maxConnected).append(",").append(itemCount).append(")</script>\r\n");
  1669. response->sendChunk(responsebuf.str());
  1670. responsebuf.clear();
  1671. responsebuf.append("<script language=\"javascript\">\r\n");
  1672. //bool displayDT = false;
  1673. unsigned count = 0;
  1674. unsigned jobpending=0;
  1675. ForEachItemIn(i,items)
  1676. {
  1677. IEspThorQueue& tq = items.item(i);
  1678. //displayDT = !displayDT;
  1679. count++;
  1680. if (count > maxDisplay)
  1681. break;
  1682. StringBuffer countStr, dtStr;
  1683. countStr.appendulong(count);
  1684. //if (displayDT)
  1685. dtStr = tq.getDT();
  1686. responsebuf.append("parent.displayQueue(\'").append(count).append("\',\'").append(dtStr.str()).append("\',\'").append(tq.getRunningWUs()).append("\',");
  1687. responsebuf.append("\'").append(tq.getQueuedWUs()).append("\',\'").append(tq.getWaitingThors()).append("\',");
  1688. responsebuf.append("\'").append(tq.getConnectedThors()).append("\',\'").append(tq.getIdledThors()).append("\',");
  1689. responsebuf.append("\'").append(tq.getRunningWU1()).append("\',\'").append(tq.getRunningWU2()).append("\')\r\n");
  1690. if(++jobpending>=50)
  1691. {
  1692. responsebuf.append("</script>\r\n");
  1693. response->sendChunk(responsebuf.str());
  1694. responsebuf.clear();
  1695. responsebuf.append("<script language=\"javascript\">\r\n");
  1696. jobpending=0;
  1697. }
  1698. }
  1699. StringBuffer countStr;
  1700. countStr.appendulong(count);
  1701. StringBuffer msg("<table><tr><td>");
  1702. msg.append("Total Records in the Time Period: ").append(items.length()).append(" (<a href=\"/WsWorkunits/WUClusterJobQueueLOG?").append(xls).append("\">txt</a>...<a href=\"/WsWorkunits/WUClusterJobQueueXLS?").append(xls).append("\">xls</a>).");
  1703. msg.append("</td></tr><tr><td>");
  1704. if (count > maxDisplay)
  1705. msg.append("Displayed: First ").append(maxDisplay).append(". ");
  1706. msg.append("Max. Queue Length: ").append(longestQueue).append(".");
  1707. msg.append("</td></tr></table>");
  1708. responsebuf.append("parent.displayQEnd(\'").append(msg).append("\')</script>\r\n");
  1709. response->sendChunk(responsebuf.str());
  1710. }
  1711. void WsWuJobQueueAuditInfo::getAuditLineInfo(const char* line, unsigned& longestQueue, unsigned& maxConnected, unsigned maxDisplay, unsigned showAll, IArrayOf<IEspThorQueue>& items)
  1712. {
  1713. //2009-08-12 02:44:12 ,ThorQueueMonitor,thor400_88_dev,0,0,1,1,114,---,---
  1714. if(!line || !*line)
  1715. return;
  1716. Owned<IEspThorQueue> tq = createThorQueue();
  1717. StringBuffer dt, runningWUs, queuedWUs, waitingThors, connectedThors, idledThors, runningWU1, runningWU2;
  1718. // date/time
  1719. const char* bptr = line;
  1720. const char* eptr = strchr(bptr, ',');
  1721. if(eptr)
  1722. dt.append(eptr - bptr, bptr);
  1723. else
  1724. dt.append(bptr);
  1725. tq->setDT(dt.str());
  1726. if(!eptr)
  1727. {
  1728. if (checkNewThorQueueItem(tq, showAll, items))
  1729. items.append(*tq.getClear());
  1730. return;
  1731. }
  1732. //skip title
  1733. bptr = eptr + 1;
  1734. eptr = strchr(bptr, ',');
  1735. if(!eptr)
  1736. {
  1737. if (checkNewThorQueueItem(tq, showAll, items))
  1738. items.append(*tq.getClear());
  1739. return;
  1740. }
  1741. //skip queue name
  1742. bptr = eptr + 1;
  1743. eptr = strchr(bptr, ',');
  1744. if(!eptr)
  1745. {
  1746. if (checkNewThorQueueItem(tq, showAll, items))
  1747. items.append(*tq.getClear());
  1748. return;
  1749. }
  1750. //running
  1751. bptr = eptr + 1;
  1752. eptr = strchr(bptr, ',');
  1753. if(eptr)
  1754. runningWUs.append(eptr - bptr, bptr);
  1755. else
  1756. runningWUs.append(bptr);
  1757. tq->setRunningWUs(runningWUs.str());
  1758. if(!eptr)
  1759. {
  1760. if (checkNewThorQueueItem(tq, showAll, items))
  1761. items.append(*tq.getClear());
  1762. return;
  1763. }
  1764. //queued
  1765. bptr = eptr + 1;
  1766. eptr = strchr(bptr, ',');
  1767. if(eptr)
  1768. queuedWUs.append(eptr - bptr, bptr);
  1769. else
  1770. queuedWUs.append(bptr);
  1771. if (maxDisplay > items.length())
  1772. {
  1773. unsigned queueLen = atoi(queuedWUs.str());
  1774. if (queueLen > longestQueue)
  1775. longestQueue = queueLen;
  1776. }
  1777. tq->setQueuedWUs(queuedWUs.str());
  1778. if(!eptr)
  1779. {
  1780. if (checkNewThorQueueItem(tq, showAll, items))
  1781. items.append(*tq.getClear());
  1782. return;
  1783. }
  1784. //waiting
  1785. bptr = eptr + 1;
  1786. eptr = strchr(bptr, ',');
  1787. if(eptr)
  1788. waitingThors.append(eptr - bptr, bptr);
  1789. else
  1790. waitingThors.append(bptr);
  1791. tq->setWaitingThors(waitingThors.str());
  1792. if(!eptr)
  1793. {
  1794. if (checkNewThorQueueItem(tq, showAll, items))
  1795. items.append(*tq.getClear());
  1796. return;
  1797. }
  1798. //connected
  1799. bptr = eptr + 1;
  1800. eptr = strchr(bptr, ',');
  1801. if(eptr)
  1802. connectedThors.append(eptr - bptr, bptr);
  1803. else
  1804. connectedThors.append(bptr);
  1805. if (maxDisplay > items.length())
  1806. {
  1807. unsigned connnectedLen = atoi(connectedThors.str());
  1808. if (connnectedLen > maxConnected)
  1809. maxConnected = connnectedLen;
  1810. }
  1811. tq->setConnectedThors(connectedThors.str());
  1812. if(!eptr)
  1813. {
  1814. if (checkNewThorQueueItem(tq, showAll, items))
  1815. items.append(*tq.getClear());
  1816. return;
  1817. }
  1818. //idled
  1819. bptr = eptr + 1;
  1820. eptr = strchr(bptr, ',');
  1821. if(eptr)
  1822. idledThors.append(eptr - bptr, bptr);
  1823. else
  1824. idledThors.append(bptr);
  1825. tq->setIdledThors(idledThors.str());
  1826. if(!eptr)
  1827. {
  1828. items.append(*tq.getClear());
  1829. return;
  1830. }
  1831. //runningWU1
  1832. bptr = eptr + 1;
  1833. eptr = strchr(bptr, ',');
  1834. if(eptr)
  1835. runningWU1.append(eptr - bptr, bptr);
  1836. else
  1837. {
  1838. runningWU1.append(bptr);
  1839. }
  1840. if (!strcmp(runningWU1.str(), "---"))
  1841. runningWU1.clear();
  1842. if (runningWU1.length() > 0)
  1843. tq->setRunningWU1(runningWU1.str());
  1844. if(!eptr)
  1845. {
  1846. if (checkNewThorQueueItem(tq, showAll, items))
  1847. items.append(*tq.getClear());
  1848. return;
  1849. }
  1850. //runningWU2
  1851. bptr = eptr + 1;
  1852. eptr = strchr(bptr, ',');
  1853. if(eptr)
  1854. runningWU2.append(eptr - bptr, bptr);
  1855. else
  1856. {
  1857. runningWU2.append(bptr);
  1858. }
  1859. if (!strcmp(runningWU2.str(), "---"))
  1860. runningWU2.clear();
  1861. if (runningWU2.length() > 0)
  1862. tq->setRunningWU2(runningWU2.str());
  1863. if (checkNewThorQueueItem(tq, showAll, items))
  1864. items.append(*tq.getClear());
  1865. DBGLOG("Queue log: [%s]", line);
  1866. }
  1867. bool WsWuJobQueueAuditInfo::checkSameStrings(const char* s1, const char* s2)
  1868. {
  1869. if (s1)
  1870. {
  1871. if (!s2)
  1872. return false;
  1873. if (strcmp(s1, s2))
  1874. return false;
  1875. }
  1876. else if (s2)
  1877. {
  1878. if (!s1)
  1879. return false;
  1880. }
  1881. return true;
  1882. }
  1883. bool WsWuJobQueueAuditInfo::checkNewThorQueueItem(IEspThorQueue* tq, unsigned showAll, IArrayOf<IEspThorQueue>& items)
  1884. {
  1885. bool bAdd = false;
  1886. if (showAll < 1) //show every lines
  1887. bAdd = true;
  1888. else if (items.length() < 1)
  1889. bAdd = true;
  1890. else if (showAll > 1) //last line now
  1891. {
  1892. IEspThorQueue& tq0 = items.item(items.length()-1);
  1893. if (!checkSameStrings(tq->getDT(), tq0.getDT()))
  1894. bAdd = true;
  1895. }
  1896. else
  1897. {
  1898. IEspThorQueue& tq0 = items.item(items.length()-1);
  1899. if (!checkSameStrings(tq->getRunningWUs(), tq0.getRunningWUs()))
  1900. bAdd = true;
  1901. if (!checkSameStrings(tq->getQueuedWUs(), tq0.getQueuedWUs()))
  1902. bAdd = true;
  1903. if (!checkSameStrings(tq->getConnectedThors(), tq0.getConnectedThors()))
  1904. bAdd = true;
  1905. if (!checkSameStrings(tq->getConnectedThors(), tq0.getConnectedThors()))
  1906. bAdd = true;
  1907. if (!checkSameStrings(tq->getRunningWU1(), tq0.getRunningWU1()))
  1908. bAdd = true;
  1909. if (!checkSameStrings(tq->getRunningWU2(), tq0.getRunningWU2()))
  1910. bAdd = true;
  1911. }
  1912. return bAdd;
  1913. }
  1914. void xsltTransform(const char* xml, const char* sheet, IProperties *params, StringBuffer& ret)
  1915. {
  1916. StringBuffer xsl;
  1917. if(!checkFileExists(sheet))
  1918. throw MakeStringException(ECLWATCH_FILE_NOT_EXIST, "Could not find stylesheet %s.",sheet);
  1919. Owned<IXslProcessor> proc = getXslProcessor();
  1920. Owned<IXslTransform> trans = proc->createXslTransform();
  1921. trans->setXmlSource(xml, strlen(xml));
  1922. trans->loadXslFromFile(sheet);
  1923. trans->copyParameters(params);
  1924. trans->transform(ret);
  1925. }
  1926. bool addToQueryString(StringBuffer &queryString, const char *name, const char *value, const char delim)
  1927. {
  1928. if (isEmpty(name) || isEmpty(value))
  1929. return false;
  1930. if (queryString.length() > 0)
  1931. queryString.append(delim);
  1932. queryString.append(name).append("=").append(value);
  1933. return true;
  1934. }
  1935. int WUSchedule::run()
  1936. {
  1937. try
  1938. {
  1939. while(true)
  1940. {
  1941. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  1942. Owned<IConstWorkUnitIterator> itr = factory->getWorkUnitsByState(WUStateScheduled);
  1943. if (itr)
  1944. {
  1945. ForEach(*itr)
  1946. {
  1947. try
  1948. {
  1949. IConstWorkUnit & cw = itr->query();
  1950. if (cw.aborting())
  1951. {
  1952. WorkunitUpdate wu(&cw.lock());
  1953. wu->setState(WUStateAborted);
  1954. continue;
  1955. }
  1956. WsWuDateTime dt, now;
  1957. now.setNow();
  1958. cw.getTimeScheduled(dt);
  1959. if (now.compare(dt)>=0)
  1960. {
  1961. SCMStringBuffer wuid;
  1962. runWorkUnit(cw.getWuid(wuid).str());
  1963. }
  1964. }
  1965. catch(IException *e)
  1966. {
  1967. StringBuffer msg;
  1968. ERRLOG("Exception %d:%s in WsWorkunits Schedule::run", e->errorCode(), e->errorMessage(msg).str());
  1969. e->Release();
  1970. }
  1971. }
  1972. }
  1973. sleep(60);
  1974. }
  1975. }
  1976. catch(IException *e)
  1977. {
  1978. StringBuffer msg;
  1979. ERRLOG("Exception %d:%s in WsWorkunits Schedule::run", e->errorCode(), e->errorMessage(msg).str());
  1980. e->Release();
  1981. }
  1982. catch(...)
  1983. {
  1984. ERRLOG("Unknown exception in WsWorkunits Schedule::run");
  1985. }
  1986. if (m_container)
  1987. m_container->exitESP();
  1988. return 0;
  1989. }
  1990. }