thormisc.cpp 55 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef _WIN32
  14. #include <sys/types.h>
  15. #include <dirent.h>
  16. #endif
  17. #include <stdio.h>
  18. #include <time.h>
  19. #include "jexcept.hpp"
  20. #include "jfile.hpp"
  21. #include "jmisc.hpp"
  22. #include "jsocket.hpp"
  23. #include "jmutex.hpp"
  24. #include "commonext.hpp"
  25. #include "dadfs.hpp"
  26. #include "dasds.hpp"
  27. #include "dafdesc.hpp"
  28. #include "thor.hpp"
  29. #include "thorport.hpp"
  30. #include "thormisc.hpp"
  31. #include "thgraph.hpp"
  32. #include "thbufdef.hpp"
  33. #include "thmem.hpp"
  34. #include "thcompressutil.hpp"
  35. #include "eclrtl.hpp"
  36. #include "eclhelper.hpp"
  37. #include "eclrtl_imp.hpp"
  38. #include "rtlread_imp.hpp"
  39. #include "rtlfield.hpp"
  40. #include "rtlrecord.hpp"
  41. #include "rtlds_imp.hpp"
  42. #include "rtlformat.hpp"
  43. #include "rmtfile.hpp"
  44. #include "roxiestream.hpp"
  45. #define SDS_LOCK_TIMEOUT 30000
  46. static Owned<INode> masterNode;
  47. static Owned<IGroup> processGroup; // group of slave processes
  48. static Owned<IGroup> nodeGroup; // master + processGroup
  49. static Owned<IGroup> slaveGroup; // group containing all channels
  50. static Owned<IGroup> clusterGroup; // master + slaveGroup
  51. static Owned<IGroup> dfsGroup; // same as slaveGroup, but without ports
  52. static Owned<IGroup> localGroup; // used as a placeholder in IFileDescriptors for local files (spills)
  53. static Owned<ICommunicator> nodeComm; // communicator based on nodeGroup (master+slave processes)
  54. mptag_t masterSlaveMpTag;
  55. mptag_t kjServiceMpTag;
  56. Owned<IPropertyTree> globals;
  57. static Owned<IMPtagAllocator> ClusterMPAllocator;
  58. // stat. mappings shared between master and slave activities
  59. const StatisticsMapping spillStatistics({StTimeSpillElapsed, StTimeSortElapsed, StNumSpills, StSizeSpillFile});
  60. const StatisticsMapping basicActivityStatistics({StTimeLocalExecute, StTimeBlocked});
  61. const StatisticsMapping groupActivityStatistics({StNumGroups, StNumGroupMax}, basicActivityStatistics);
  62. const StatisticsMapping hashJoinActivityStatistics({StNumLeftRows, StNumRightRows}, basicActivityStatistics);
  63. const StatisticsMapping indexReadStatistics({StNumIndexSeeks, StNumIndexScans, StNumPostFiltered, StNumIndexWildSeeks});
  64. const StatisticsMapping indexReadActivityStatistics({StNumRowsProcessed}, diskReadRemoteStatistics, basicActivityStatistics, indexReadStatistics);
  65. const StatisticsMapping indexWriteActivityStatistics({StPerReplicated}, basicActivityStatistics, diskWriteRemoteStatistics);
  66. const StatisticsMapping keyedJoinActivityStatistics({ StNumIndexSeeks, StNumIndexScans, StNumIndexAccepted, StNumPostFiltered, StNumPreFiltered, StNumDiskSeeks, StNumDiskAccepted, StNumDiskRejected, StNumIndexWildSeeks}, basicActivityStatistics);
  67. const StatisticsMapping loopActivityStatistics({StNumIterations}, basicActivityStatistics);
  68. const StatisticsMapping lookupJoinActivityStatistics({StNumSmartJoinSlavesDegradedToStd, StNumSmartJoinDegradedToLocal}, basicActivityStatistics);
  69. const StatisticsMapping joinActivityStatistics({StNumLeftRows, StNumRightRows}, basicActivityStatistics, spillStatistics);
  70. const StatisticsMapping diskReadActivityStatistics({StNumDiskRowsRead}, basicActivityStatistics, diskReadRemoteStatistics);
  71. const StatisticsMapping diskWriteActivityStatistics({StPerReplicated}, basicActivityStatistics, diskWriteRemoteStatistics);
  72. const StatisticsMapping sortActivityStatistics({}, basicActivityStatistics, spillStatistics);
  73. const StatisticsMapping graphStatistics({StNumExecutions}, basicActivityStatistics);
  74. const StatisticsMapping diskReadPartStatistics({StNumDiskRowsRead}, diskReadRemoteStatistics);
  75. MODULE_INIT(INIT_PRIORITY_STANDARD)
  76. {
  77. ClusterMPAllocator.setown(createMPtagRangeAllocator(MPTAG_THORGLOBAL_BASE,MPTAG_THORGLOBAL_COUNT));
  78. return true;
  79. }
  80. MODULE_EXIT()
  81. {
  82. masterNode.clear();
  83. nodeGroup.clear();
  84. processGroup.clear();
  85. clusterGroup.clear();
  86. slaveGroup.clear();
  87. dfsGroup.clear();
  88. localGroup.clear();
  89. nodeComm.clear();
  90. ClusterMPAllocator.clear();
  91. }
  92. #define EXTRAS 1024
  93. #define NL 3
  94. StringBuffer &ActPrintLogArgsPrep(StringBuffer &res, const CGraphElementBase *container, const ActLogEnum flags, const char *format, va_list args)
  95. {
  96. if (format)
  97. res.valist_appendf(format, args).append(" - ");
  98. res.appendf("activity(ch=%d, %s, %" ACTPF "d)", container->queryOwner().queryJobChannelNumber(), activityKindStr(container->getKind()), container->queryId());
  99. if (0 != (flags & thorlog_ecl))
  100. {
  101. StringBuffer ecltext;
  102. container->getEclText(ecltext);
  103. ecltext.trim();
  104. if (ecltext.length() > 0)
  105. res.append(" [ecl=").append(ecltext.str()).append(']');
  106. }
  107. #ifdef _WIN32
  108. #ifdef MEMLOG
  109. MEMORYSTATUS mS;
  110. GlobalMemoryStatus(&mS);
  111. res.appendf(", mem=%ld",mS.dwAvailPhys);
  112. #endif
  113. #endif
  114. return res;
  115. }
  116. void ActPrintLogArgs(const CGraphElementBase *container, const ActLogEnum flags, const LogMsgCategory &logCat, const char *format, va_list args)
  117. {
  118. if ((0 == (flags & thorlog_all)) && !container->doLogging())
  119. return; // suppress logging child activities unless thorlog_all flag
  120. StringBuffer res;
  121. ActPrintLogArgsPrep(res, container, flags, format, args);
  122. LOG(logCat, thorJob, "%s", res.str());
  123. }
  124. void ActPrintLogArgs(const CGraphElementBase *container, IException *e, const ActLogEnum flags, const LogMsgCategory &logCat, const char *format, va_list args)
  125. {
  126. StringBuffer res;
  127. ActPrintLogArgsPrep(res, container, flags, format, args);
  128. if (e)
  129. {
  130. res.append(" : ");
  131. e->errorMessage(res);
  132. }
  133. LOG(logCat, thorJob, "%s", res.str());
  134. }
  135. void ActPrintLogEx(const CGraphElementBase *container, const ActLogEnum flags, const LogMsgCategory &logCat, const char *format, ...)
  136. {
  137. if ((0 == (flags & thorlog_all)) && (NULL != container->queryOwner().queryOwner() && !container->queryOwner().isGlobal()))
  138. return; // suppress logging child activities unless thorlog_all flag
  139. StringBuffer res;
  140. va_list args;
  141. va_start(args, format);
  142. ActPrintLogArgsPrep(res, container, flags, format, args);
  143. va_end(args);
  144. LOG(logCat, thorJob, "%s", res.str());
  145. }
  146. void ActPrintLog(const CActivityBase *activity, const char *format, ...)
  147. {
  148. va_list args;
  149. va_start(args, format);
  150. ActPrintLogArgs(&activity->queryContainer(), thorlog_null, MCdebugProgress, format, args);
  151. va_end(args);
  152. }
  153. void ActPrintLog(const CActivityBase *activity, unsigned traceLevel, const char *format, ...)
  154. {
  155. va_list args;
  156. va_start(args, format);
  157. ActPrintLogArgs(&activity->queryContainer(), thorlog_null, MCdebugInfo(traceLevel), format, args);
  158. va_end(args);
  159. }
  160. void ActPrintLog(const CActivityBase *activity, IException *e, const char *format, ...)
  161. {
  162. va_list args;
  163. va_start(args, format);
  164. ActPrintLogArgs(&activity->queryContainer(), e, thorlog_null, MCexception(e, MSGCLS_error), format, args);
  165. va_end(args);
  166. }
  167. void ActPrintLog(const CActivityBase *activity, IException *e)
  168. {
  169. ActPrintLog(activity, e, "%s", "");
  170. }
  171. void GraphPrintLogArgsPrep(StringBuffer &res, CGraphBase *graph, const ActLogEnum flags, const LogMsgCategory &logCat, const char *format, va_list args)
  172. {
  173. if (format)
  174. res.valist_appendf(format, args).append(" - ");
  175. res.appendf("graph(%s, %" GIDPF "d)", graph->queryJob().queryGraphName(), graph->queryGraphId());
  176. }
  177. void GraphPrintLogArgs(CGraphBase *graph, const ActLogEnum flags, const LogMsgCategory &logCat, const char *format, va_list args)
  178. {
  179. if ((0 == (flags & thorlog_all)) && (NULL != graph->queryOwner() && !graph->isGlobal()))
  180. return; // suppress logging from child graph unless thorlog_all flag
  181. StringBuffer res;
  182. GraphPrintLogArgsPrep(res, graph, flags, logCat, format, args);
  183. LOG(logCat, thorJob, "%s", res.str());
  184. }
  185. void GraphPrintLogArgs(CGraphBase *graph, IException *e, const ActLogEnum flags, const LogMsgCategory &logCat, const char *format, va_list args)
  186. {
  187. if ((0 == (flags & thorlog_all)) && (NULL != graph->queryOwner() && !graph->isGlobal()))
  188. return; // suppress logging from child graph unless thorlog_all flag
  189. StringBuffer res;
  190. GraphPrintLogArgsPrep(res, graph, flags, logCat, format, args);
  191. if (e)
  192. {
  193. res.append(" : ");
  194. e->errorMessage(res);
  195. }
  196. LOG(logCat, thorJob, "%s", res.str());
  197. }
  198. void GraphPrintLog(CGraphBase *graph, IException *e, const char *format, ...)
  199. {
  200. va_list args;
  201. va_start(args, format);
  202. GraphPrintLogArgs(graph, e, thorlog_null, MCexception(e, MSGCLS_error), format, args);
  203. va_end(args);
  204. }
  205. class DECL_EXCEPTION CThorException : public CSimpleInterface, implements IThorException
  206. {
  207. protected:
  208. ThorExceptionAction action;
  209. ThorActivityKind kind;
  210. activity_id id;
  211. graph_id graphId;
  212. StringAttr jobId;
  213. int errorcode;
  214. StringAttr msg;
  215. LogMsgAudience audience;
  216. unsigned slave;
  217. MemoryBuffer data; // extra exception specific data
  218. unsigned line, column;
  219. StringAttr file, origin, graphName;
  220. ErrorSeverity severity;
  221. Linked<IException> originalException;
  222. public:
  223. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  224. CThorException(LogMsgAudience _audience,int code, const char *str)
  225. : audience(_audience), errorcode(code), msg(str), action(tea_null), graphId(0), id(0), slave(0), line(0), column(0), severity(SeverityInformation), kind(TAKnone) { };
  226. CThorException(MemoryBuffer &mb)
  227. {
  228. readUnderlyingType<ThorExceptionAction>(mb, action);
  229. mb.read(jobId);
  230. mb.read(graphName);
  231. mb.read(graphId);
  232. readUnderlyingType(mb, kind);
  233. mb.read(id);
  234. mb.read(slave);
  235. readUnderlyingType(mb, audience);
  236. mb.read(errorcode);
  237. mb.read(msg);
  238. mb.read(file);
  239. mb.read(line);
  240. mb.read(column);
  241. readUnderlyingType(mb, severity);
  242. mb.read(origin);
  243. if (0 == origin.length()) // simpler to clear serialized 0 length terminated string here than check on query
  244. origin.clear();
  245. bool oe;
  246. mb.read(oe);
  247. if (oe)
  248. originalException.setown(deserializeThorException(mb));
  249. size32_t sz;
  250. mb.read(sz);
  251. if (sz)
  252. data.append(sz, mb.readDirect(sz));
  253. }
  254. // IThorException
  255. virtual ThorExceptionAction queryAction() const { return action; }
  256. virtual ThorActivityKind queryActivityKind() const { return kind; }
  257. virtual activity_id queryActivityId() const { return id; }
  258. virtual const char *queryGraphName() const { return graphName; }
  259. virtual graph_id queryGraphId() const { return graphId; }
  260. virtual const char *queryJobId() const { return jobId; }
  261. virtual unsigned querySlave() const { return slave; }
  262. virtual void getAssert(StringAttr &_file, unsigned &_line, unsigned &_column) const { _file.set(file); _line = line; _column = column; }
  263. virtual const char *queryOrigin() const { return origin; }
  264. virtual const char *queryMessage() const { return msg; }
  265. virtual ErrorSeverity querySeverity() const { return severity; }
  266. virtual MemoryBuffer &queryData() { return data; }
  267. virtual IException *queryOriginalException() const { return originalException; }
  268. virtual void setActivityId(activity_id _id) { id = _id; }
  269. virtual void setActivityKind(ThorActivityKind _kind) { kind = _kind; }
  270. virtual void setGraphInfo(const char *_graphName, graph_id _graphId) { graphName.set(_graphName); graphId = _graphId; }
  271. virtual void setJobId(const char *_jobId) { jobId.set(_jobId); }
  272. virtual void setAction(ThorExceptionAction _action) { action = _action; }
  273. virtual void setAudience(MessageAudience _audience) { audience = _audience; }
  274. virtual void setSlave(unsigned _slave) { slave = _slave; }
  275. virtual void setMessage(const char *_msg) { msg.set(_msg); }
  276. virtual void setAssert(const char *_file, unsigned _line, unsigned _column) { file.set(_file); line = _line; column = _column; }
  277. virtual void setOrigin(const char *_origin) { origin.set(_origin); }
  278. virtual void setSeverity(ErrorSeverity _severity) { severity = _severity; }
  279. virtual void setOriginalException(IException *e) { originalException.set(e); }
  280. // IException
  281. int errorCode() const { return errorcode; }
  282. StringBuffer &errorMessage(StringBuffer &str) const
  283. {
  284. if (!origin.length() || 0 != stricmp("user", origin.get())) // don't report slave in user message
  285. {
  286. if (graphId)
  287. str.append("Graph ").append(graphName).append("[").append(graphId).append("], ");
  288. if (kind)
  289. str.append(activityKindStr(kind));
  290. if (id)
  291. {
  292. if (kind) str.append('[');
  293. str.append(id);
  294. if (kind) str.append(']');
  295. str.append(": ");
  296. }
  297. if (slave)
  298. {
  299. str.appendf("SLAVE #%d [", slave);
  300. queryClusterGroup().queryNode(slave).endpoint().getUrlStr(str);
  301. str.append("]: ");
  302. }
  303. }
  304. str.append(msg);
  305. if (originalException)
  306. {
  307. if (msg.length())
  308. str.append(" - ");
  309. str.append("caused by (");
  310. str.append(originalException->errorCode());
  311. str.append(", ");
  312. originalException->errorMessage(str);
  313. str.append(")");
  314. }
  315. return str;
  316. }
  317. MessageAudience errorAudience() const { return audience; }
  318. };
  319. CThorException *_MakeThorException(LogMsgAudience audience,int code, const char *format, va_list args) __attribute__((format(printf,3,0)));
  320. CThorException *_MakeThorException(LogMsgAudience audience, int code, const char *format, va_list args)
  321. {
  322. StringBuffer eStr;
  323. eStr.limited_valist_appendf(1024, format, args);
  324. return new CThorException(audience, code, eStr.str());
  325. }
  326. CThorException *_ThorWrapException(IException *e, const char *format, va_list args) __attribute__((format(printf,2,0)));
  327. CThorException *_ThorWrapException(IException *e, const char *format, va_list args)
  328. {
  329. StringBuffer eStr;
  330. eStr.appendf("%d, ", e->errorCode());
  331. e->errorMessage(eStr).append(" : ");
  332. eStr.limited_valist_appendf(2048, format, args);
  333. CThorException *te = new CThorException(e->errorAudience(), e->errorCode(), eStr.str());
  334. return te;
  335. }
  336. // convert exception (if necessary) to an exception with action=shutdown
  337. IThorException *MakeThorFatal(IException *e, int code, const char *format, ...)
  338. {
  339. CThorException *te = QUERYINTERFACE(e, CThorException);
  340. if (te)
  341. te->Link();
  342. else
  343. {
  344. va_list args;
  345. va_start(args, format);
  346. if (e) te = _ThorWrapException(e, format, args);
  347. else te = _MakeThorException(MSGAUD_user,code, format, args);
  348. va_end(args);
  349. }
  350. te->setAction(tea_shutdown);
  351. return te;
  352. }
  353. IThorException *MakeThorAudienceException(LogMsgAudience audience, int code, const char *format, ...)
  354. {
  355. va_list args;
  356. va_start(args, format);
  357. IThorException *e = _MakeThorException(audience, code, format, args);
  358. va_end(args);
  359. return e;
  360. }
  361. IThorException *MakeThorOperatorException(int code, const char *format, ...)
  362. {
  363. va_list args;
  364. va_start(args, format);
  365. IThorException *e = _MakeThorException(MSGAUD_operator,code, format, args);
  366. va_end(args);
  367. return e;
  368. }
  369. void setExceptionActivityInfo(CGraphElementBase &container, IThorException *e)
  370. {
  371. e->setActivityKind(container.getKind());
  372. e->setActivityId(container.queryId());
  373. e->setGraphInfo(container.queryJob().queryGraphName(), container.queryOwner().queryGraphId());
  374. }
  375. IThorException *_MakeActivityException(CGraphElementBase &container, int code, const char *format, va_list args) __attribute__((format(printf,3,0)));
  376. IThorException *_MakeActivityException(CGraphElementBase &container, int code, const char *format, va_list args)
  377. {
  378. IThorException *e = _MakeThorException(MSGAUD_user, code, format, args);
  379. setExceptionActivityInfo(container, e);
  380. return e;
  381. }
  382. IThorException *_MakeActivityException(CGraphElementBase &container, IException *e, const char *_format, va_list args) __attribute__((format(printf,3,0)));
  383. IThorException *_MakeActivityException(CGraphElementBase &container, IException *e, const char *_format, va_list args)
  384. {
  385. StringBuffer msg;
  386. e->errorMessage(msg);
  387. if (_format)
  388. msg.append(", ").limited_valist_appendf(1024, _format, args);
  389. IThorException *e2 = new CThorException(e->errorAudience(), e->errorCode(), msg.str());
  390. e2->setOriginalException(e);
  391. setExceptionActivityInfo(container, e2);
  392. return e2;
  393. }
  394. IThorException *MakeActivityException(CActivityBase *activity, int code, const char *format, ...)
  395. {
  396. va_list args;
  397. va_start(args, format);
  398. IThorException *e = _MakeActivityException(activity->queryContainer(), code, format, args);
  399. va_end(args);
  400. return e;
  401. }
  402. IThorException *MakeActivityException(CActivityBase *activity, IException *e, const char *format, ...)
  403. {
  404. va_list args;
  405. va_start(args, format);
  406. IThorException *e2 = _MakeActivityException(activity->queryContainer(), e, format, args);
  407. va_end(args);
  408. return e2;
  409. }
  410. IThorException *MakeActivityException(CActivityBase *activity, IException *e)
  411. {
  412. return MakeActivityException(activity, e, "%s", "");
  413. }
  414. IThorException *MakeActivityWarning(CActivityBase *activity, int code, const char *format, ...)
  415. {
  416. va_list args;
  417. va_start(args, format);
  418. IThorException *e = _MakeActivityException(activity->queryContainer(), code, format, args);
  419. e->setAction(tea_warning);
  420. e->setSeverity(SeverityWarning);
  421. va_end(args);
  422. return e;
  423. }
  424. IThorException *MakeActivityWarning(CActivityBase *activity, IException *e, const char *format, ...)
  425. {
  426. va_list args;
  427. va_start(args, format);
  428. IThorException *e2 = _MakeActivityException(activity->queryContainer(), e, format, args);
  429. e2->setAction(tea_warning);
  430. e2->setSeverity(SeverityWarning);
  431. va_end(args);
  432. return e2;
  433. }
  434. IThorException *MakeActivityException(CGraphElementBase *container, int code, const char *format, ...)
  435. {
  436. va_list args;
  437. va_start(args, format);
  438. IThorException *e = _MakeActivityException(*container, code, format, args);
  439. va_end(args);
  440. return e;
  441. }
  442. IThorException *MakeActivityException(CGraphElementBase *container, IException *e, const char *format, ...)
  443. {
  444. va_list args;
  445. va_start(args, format);
  446. IThorException *e2 = _MakeActivityException(*container, e, format, args);
  447. va_end(args);
  448. return e2;
  449. }
  450. IThorException *MakeActivityException(CGraphElementBase *container, IException *e)
  451. {
  452. return MakeActivityException(container, e, "%s", "");
  453. }
  454. IThorException *MakeActivityWarning(CGraphElementBase *container, int code, const char *format, ...)
  455. {
  456. va_list args;
  457. va_start(args, format);
  458. IThorException *e = _MakeActivityException(*container, code, format, args);
  459. e->setAction(tea_warning);
  460. e->setSeverity(SeverityWarning);
  461. va_end(args);
  462. return e;
  463. }
  464. IThorException *MakeActivityWarning(CGraphElementBase *container, IException *e, const char *format, ...)
  465. {
  466. va_list args;
  467. va_start(args, format);
  468. IThorException *e2 = _MakeActivityException(*container, e, format, args);
  469. e2->setAction(tea_warning);
  470. e2->setSeverity(SeverityWarning);
  471. va_end(args);
  472. return e2;
  473. }
  474. IThorException *MakeThorException(int code, const char *format, ...)
  475. {
  476. va_list args;
  477. va_start(args, format);
  478. IThorException *e2 = _MakeThorException(MSGAUD_user,code, format, args);
  479. va_end(args);
  480. return e2;
  481. }
  482. IThorException *MakeThorException(IException *e)
  483. {
  484. IThorException *te = QUERYINTERFACE(e, IThorException);
  485. if (te)
  486. return LINK(te);
  487. StringBuffer msg;
  488. return new CThorException(MSGAUD_user, e->errorCode(), e->errorMessage(msg).str());
  489. }
  490. IThorException *ThorWrapException(IException *e, const char *format, ...)
  491. {
  492. va_list args;
  493. va_start(args, format);
  494. ThorExceptionAction action=tea_null;
  495. if (QUERYINTERFACE(e, ISEH_Exception))
  496. action = tea_shutdown;
  497. CThorException *te = _ThorWrapException(e, format, args);
  498. te->setAction(action);
  499. va_end(args);
  500. return te;
  501. }
  502. IThorException *MakeGraphException(CGraphBase *graph, int code, const char *format, ...)
  503. {
  504. va_list args;
  505. va_start(args, format);
  506. IThorException *e = _MakeThorException(MSGAUD_user, code, format, args);
  507. e->setGraphInfo(graph->queryJob().queryGraphName(), graph->queryGraphId());
  508. va_end(args);
  509. return e;
  510. }
  511. IThorException *MakeGraphException(CGraphBase *graph, IException *e)
  512. {
  513. StringBuffer msg;
  514. IThorException *e2 = new CThorException(MSGAUD_user, e->errorCode(), e->errorMessage(msg).str());
  515. e2->setGraphInfo(graph->queryJob().queryGraphName(), graph->queryGraphId());
  516. return e2;
  517. }
  518. #if 0
  519. void SetLogName(const char *prefix, const char *logdir, const char *thorname, bool master)
  520. {
  521. StringBuffer logname;
  522. if (logdir && *logdir !='\0')
  523. {
  524. if (!recursiveCreateDirectory(logdir))
  525. {
  526. OWARNLOG("Failed to use %s as log directory, using current working directory", logdir); // default working directory should be open already
  527. return;
  528. }
  529. logname.append(logdir);
  530. }
  531. else
  532. {
  533. char cwd[1024];
  534. GetCurrentDirectory(1024, cwd);
  535. logname.append(cwd);
  536. }
  537. if (logname.length() && logname.charAt(logname.length()-1) != PATHSEPCHAR)
  538. logname.append(PATHSEPCHAR);
  539. logname.append(prefix);
  540. #if 0
  541. time_t tNow;
  542. time(&tNow);
  543. char timeStamp[32];
  544. #ifdef _WIN32
  545. struct tm *ltNow;
  546. ltNow = localtime(&tNow);
  547. strftime(timeStamp, 32, ".%m_%d_%y_%H_%M_%S", ltNow);
  548. #else
  549. struct tm ltNow;
  550. localtime_r(&tNow, &ltNow);
  551. strftime(timeStamp, 32, ".%m_%d_%y_%H_%M_%S", &ltNow);
  552. #endif
  553. logname.append(timeStamp);
  554. #endif
  555. logname.append(".log");
  556. StringBuffer lf;
  557. openLogFile(lf, logname.str());
  558. PROGLOG("Opened log file %s", lf.str());
  559. PROGLOG("Build %s", hpccBuildInfo.buildTag);
  560. }
  561. #endif
  562. class CTempNameHandler
  563. {
  564. public:
  565. unsigned num;
  566. StringBuffer rootDir, subDirName, prefix, subDirPath;
  567. CriticalSection crit;
  568. CTempNameHandler()
  569. {
  570. num = 0;
  571. }
  572. const char *queryTempDir()
  573. {
  574. return subDirPath;
  575. }
  576. void clearTempDirectory(bool log)
  577. {
  578. assertex(subDirPath.length());
  579. Owned<IDirectoryIterator> iter = createDirectoryIterator(subDirPath);
  580. ForEach (*iter)
  581. {
  582. IFile &file = iter->query();
  583. if (file.isFile()==fileBool::foundYes)
  584. {
  585. if (log)
  586. LOG(MCdebugInfo, thorJob, "Deleting %s", file.queryFilename());
  587. try { file.remove(); }
  588. catch (IException *e)
  589. {
  590. if (log)
  591. FLLOG(MCwarning, thorJob, e);
  592. e->Release();
  593. }
  594. }
  595. }
  596. }
  597. void setTempDir(const char *_rootDir, const char *_subDirName, const char *_prefix, bool clearDir)
  598. {
  599. assertex(!isEmptyString(_rootDir) && !isEmptyString(_prefix) && !isEmptyString(_subDirName));
  600. CriticalBlock block(crit);
  601. assertex(subDirPath.isEmpty());
  602. rootDir.set(_rootDir);
  603. addPathSepChar(rootDir);
  604. subDirName.set(_subDirName);
  605. prefix.set(_prefix);
  606. subDirPath.setf("%s%s", rootDir.str(), subDirName.str());
  607. recursiveCreateDirectory(subDirPath);
  608. if (clearDir)
  609. clearTempDirectory(true);
  610. }
  611. void clear(bool log)
  612. {
  613. clearTempDirectory(log);
  614. try
  615. {
  616. Owned<IFile> dirIFile = createIFile(subDirPath);
  617. bool success = dirIFile->remove();
  618. if (log)
  619. PROGLOG("%s to delete temp directory: %s", subDirPath.str(), success ? "succeeded" : "failed");
  620. }
  621. catch (IException *e)
  622. {
  623. if (log)
  624. FLLOG(MCwarning, thorJob, e);
  625. e->Release();
  626. }
  627. subDirPath.clear();
  628. }
  629. void getTempName(StringBuffer &name, const char *suffix, bool inTempDir)
  630. {
  631. CriticalBlock block(crit);
  632. assertex(!subDirPath.isEmpty());
  633. if (inTempDir)
  634. {
  635. name.append(rootDir);
  636. name.append(subDirName);
  637. addPathSepChar(name);
  638. }
  639. else
  640. name.append(subDirName).append('_');
  641. name.append(prefix).append('_').append(++num);
  642. if (suffix)
  643. name.append("__").append(suffix);
  644. name.append(".tmp");
  645. }
  646. } TempNameHandler;
  647. void GetTempFileName(StringBuffer &name, const char *suffix)
  648. {
  649. TempNameHandler.getTempName(name, suffix, false);
  650. }
  651. void GetTempFilePath(StringBuffer &name, const char *suffix)
  652. {
  653. TempNameHandler.getTempName(name, suffix, true);
  654. }
  655. void SetTempDir(const char *rootTempDir, const char *uniqueSubDir, const char *tempPrefix, bool clearDir)
  656. {
  657. TempNameHandler.setTempDir(rootTempDir, uniqueSubDir, tempPrefix, clearDir);
  658. LOG(MCdebugProgress, thorJob, "temporary rootTempdir: %s, uniqueSubDir: %s, prefix: %s", rootTempDir, uniqueSubDir, tempPrefix);
  659. }
  660. void ClearTempDir()
  661. {
  662. try
  663. {
  664. TempNameHandler.clear(true);
  665. LOG(MCthorDetailedDebugInfo, thorJob, "temp directory cleared");
  666. }
  667. catch (IException *e)
  668. {
  669. EXCLOG(e, "ClearTempDir");
  670. e->Release();
  671. }
  672. }
  673. const char *queryTempDir()
  674. {
  675. return TempNameHandler.queryTempDir();
  676. }
  677. class DECL_EXCEPTION CBarrierAbortException: public CSimpleInterface, public IBarrierException
  678. {
  679. public:
  680. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  681. // IThorException
  682. int errorCode() const { return -1; }
  683. StringBuffer &errorMessage(StringBuffer &str) const { str.append("Barrier Aborted"); return str; }
  684. MessageAudience errorAudience() const { return MSGAUD_user; }
  685. };
  686. IBarrierException *createBarrierAbortException()
  687. {
  688. return new CBarrierAbortException();
  689. }
  690. void loadCmdProp(IPropertyTree *tree, const char *cmdProp)
  691. {
  692. StringBuffer prop("@"), val;
  693. while (*cmdProp && *cmdProp != '=')
  694. prop.append(*cmdProp++);
  695. if (*cmdProp)
  696. {
  697. cmdProp++;
  698. while (isspace(*cmdProp))
  699. cmdProp++;
  700. while (*cmdProp)
  701. val.append(*cmdProp++);
  702. prop.clip();
  703. val.clip();
  704. if (prop.length())
  705. tree->setProp(prop.str(), val.str());
  706. }
  707. }
  // Job/user identity passed to the LOG/FLLOG calls in this file.
  // Starts out as UnknownJob/UnknownUser.
  LogMsgJobInfo thorJob(UnknownJob, UnknownUser); // configured at job start (in manager and workers)
  709. void ensureDirectoryForFile(const char *fName)
  710. {
  711. if (!recursiveCreateDirectoryForFile(fName))
  712. throw makeOsExceptionV(GetLastError(), "Failed to create directory for file: %s", fName);
  713. }
  714. // Not recommended to be used from slaves as tend to be one or more trying at same time.
  715. void reportExceptionToWorkunit(IConstWorkUnit &workunit,IException *e, ErrorSeverity severity)
  716. {
  717. LOG(MCwarning, thorJob, e, "Reporting exception to WU");
  718. Owned<IWorkUnit> wu = &workunit.lock();
  719. if (wu)
  720. {
  721. Owned<IWUException> we = wu->createException();
  722. StringBuffer s;
  723. we->setExceptionMessage(e->errorMessage(s.clear()).str());
  724. we->setExceptionCode(e->errorCode());
  725. IThorException *te = QUERYINTERFACE(e, IThorException);
  726. if (te)
  727. {
  728. we->setSeverity(te->querySeverity());
  729. if (!te->queryOrigin()) // will have an origin if from slaves already
  730. te->setOrigin("master");
  731. we->setExceptionSource(te->queryOrigin());
  732. StringAttr file;
  733. unsigned line, column;
  734. te->getAssert(file, line, column);
  735. if (file.length())
  736. we->setExceptionFileName(file);
  737. if (line || column)
  738. {
  739. we->setExceptionLineNo(line);
  740. we->setExceptionColumn(column);
  741. }
  742. if (te->queryActivityId())
  743. we->setActivityId(te->queryActivityId());
  744. }
  745. else
  746. we->setSeverity(severity);
  747. }
  748. }
  749. void reportExceptionToWorkunitCheckIgnore(IConstWorkUnit &workunit, IException *e, ErrorSeverity severity)
  750. {
  751. ErrorSeverity mappedSeverity = workunit.getWarningSeverity(e->errorCode(), severity);
  752. if (SeverityIgnore == mappedSeverity)
  753. return;
  754. reportExceptionToWorkunit(workunit, e, mappedSeverity);
  755. }
  756. StringBuffer &getCompoundQueryName(StringBuffer &compoundName, const char *queryName, unsigned version)
  757. {
  758. return compoundName.append('V').append(version).append('_').append(queryName);
  759. }
  760. void setupGroups(INode *_masterNode, IGroup *_processGroup, IGroup *_slaveGroup)
  761. {
  762. masterNode.set(_masterNode);
  763. processGroup.set(_processGroup);
  764. slaveGroup.set(_slaveGroup);
  765. // nodeGroup contains master + all slave processes (excludes virtual slaves)
  766. nodeGroup.setown(processGroup->add(LINK(masterNode), 0));
  767. // clusterGroup contains master + all slaves (including virtuals)
  768. clusterGroup.setown(slaveGroup->add(LINK(masterNode), 0));
  769. // dfsGroup is same as slaveGroup, but stripped of ports. So is a IP group as wide as slaveGroup, used for publishing
  770. IArrayOf<INode> dfsGroupNodes;
  771. Owned<INodeIterator> nodeIter = slaveGroup->getIterator();
  772. ForEach(*nodeIter)
  773. dfsGroupNodes.append(*createINodeIP(nodeIter->query().endpoint(), 0));
  774. dfsGroup.setown(createIGroup(dfsGroupNodes.ordinality(), dfsGroupNodes.getArray()));
  775. Owned<INode> localNode = createINode("localhost");
  776. INode *p = localNode;
  777. localGroup.setown(createIGroup(1, &p));
  778. nodeComm.setown(createCommunicator(nodeGroup));
  779. }
  780. void setupCluster(INode *_masterNode, IGroup *_processGroup, unsigned channelsPerSlave, unsigned portBase, unsigned portInc)
  781. {
  782. IArrayOf<INode> slaveGroupNodes;
  783. for (unsigned s=0; s<channelsPerSlave; s++)
  784. {
  785. for (unsigned p=0; p<_processGroup->ordinality(); p++)
  786. {
  787. INode &processNode = _processGroup->queryNode(p);
  788. SocketEndpoint ep = processNode.endpoint();
  789. ep.port = ep.port + (s * portInc);
  790. Owned<INode> node = createINode(ep);
  791. slaveGroupNodes.append(*node.getClear());
  792. }
  793. }
  794. Owned<IGroup> _slaveGroup = createIGroup(slaveGroupNodes.ordinality(), slaveGroupNodes.getArray());
  795. setupGroups(_masterNode, _processGroup, _slaveGroup);
  796. }
  797. void setClusterGroup(INode *_masterNode, IGroup *rawGroup, unsigned slavesPerNode, unsigned channelsPerSlave, unsigned portBase, unsigned portInc)
  798. {
  799. SocketEndpointArray epa;
  800. OwnedMalloc<unsigned> hostStartPort, hostNextStartPort;
  801. hostStartPort.allocateN(rawGroup->ordinality());
  802. hostNextStartPort.allocateN(rawGroup->ordinality());
  803. for (unsigned n=0; n<rawGroup->ordinality(); n++)
  804. {
  805. SocketEndpoint ep = rawGroup->queryNode(n).endpoint();
  806. unsigned hostPos = epa.find(ep);
  807. if (NotFound == hostPos)
  808. {
  809. hostPos = epa.ordinality();
  810. epa.append(ep);
  811. hostStartPort[n] = portBase;
  812. hostNextStartPort[hostPos] = portBase + (slavesPerNode * channelsPerSlave * portInc);
  813. }
  814. else
  815. {
  816. hostStartPort[n] = hostNextStartPort[hostPos];
  817. hostNextStartPort[hostPos] += (slavesPerNode * channelsPerSlave * portInc);
  818. }
  819. }
  820. IArrayOf<INode> slaveGroupNodes, processGroupNodes;
  821. for (unsigned s=0; s<channelsPerSlave; s++)
  822. {
  823. for (unsigned p=0; p<slavesPerNode; p++)
  824. {
  825. for (unsigned n=0; n<rawGroup->ordinality(); n++)
  826. {
  827. SocketEndpoint ep = rawGroup->queryNode(n).endpoint();
  828. ep.port = hostStartPort[n] + (((p * channelsPerSlave) + s) * portInc);
  829. Owned<INode> node = createINode(ep);
  830. slaveGroupNodes.append(*node.getLink());
  831. if (0 == s)
  832. processGroupNodes.append(*node.getLink());
  833. }
  834. }
  835. }
  836. Owned<IGroup> _processGroup = createIGroup(processGroupNodes.ordinality(), processGroupNodes.getArray());
  837. Owned<IGroup> _slaveGroup = createIGroup(slaveGroupNodes.ordinality(), slaveGroupNodes.getArray());
  838. setupGroups(_masterNode, _processGroup, _slaveGroup);
  839. }
  840. bool clusterInitialized() { return NULL != nodeComm; }
  841. INode &queryMasterNode() { return *masterNode; }
  842. ICommunicator &queryNodeComm() { return *nodeComm; }
  843. IGroup &queryNodeGroup() { return *nodeGroup; }
  844. IGroup &queryProcessGroup() { return *processGroup; }
  845. IGroup &queryClusterGroup() { return *clusterGroup; }
  846. IGroup &querySlaveGroup() { return *slaveGroup; }
  847. IGroup &queryDfsGroup() { return *dfsGroup; }
  848. IGroup &queryLocalGroup() { return *localGroup; }
  849. unsigned queryClusterWidth() { return clusterGroup->ordinality()-1; }
  850. unsigned queryNodeClusterWidth() { return nodeGroup->ordinality()-1; }
  851. mptag_t allocateClusterMPTag()
  852. {
  853. return ClusterMPAllocator->alloc();
  854. }
  855. void freeClusterMPTag(mptag_t tag)
  856. {
  857. ClusterMPAllocator->release(tag);
  858. }
  859. IThorException *deserializeThorException(MemoryBuffer &in)
  860. {
  861. unsigned te;
  862. in.read(te);
  863. if (!te)
  864. {
  865. Owned<IException> e = deserializeException(in);
  866. StringBuffer s;
  867. return new CThorException(e->errorAudience(), e->errorCode(), e->errorMessage(s).str());
  868. }
  869. return new CThorException(in);
  870. }
  871. void serializeThorException(IException *e, MemoryBuffer &out)
  872. {
  873. IThorException *te = QUERYINTERFACE(e, IThorException);
  874. if (!te)
  875. {
  876. out.append(0);
  877. serializeException(e, out);
  878. return;
  879. }
  880. out.append(1);
  881. out.append((unsigned)te->queryAction());
  882. out.append(te->queryJobId());
  883. out.append(te->queryGraphName());
  884. out.append(te->queryGraphId());
  885. out.append((unsigned)te->queryActivityKind());
  886. out.append(te->queryActivityId());
  887. out.append(te->querySlave());
  888. out.append((unsigned)te->errorAudience());
  889. out.append(te->errorCode());
  890. out.append(te->queryMessage());
  891. StringAttr file;
  892. unsigned line, column;
  893. te->getAssert(file, line, column);
  894. out.append(file);
  895. out.append(line);
  896. out.append(column);
  897. out.append((unsigned)te->querySeverity());
  898. out.append(te->queryOrigin());
  899. IException *oe = te->queryOriginalException();
  900. if (oe)
  901. {
  902. out.append(true);
  903. serializeThorException(oe, out);
  904. }
  905. else
  906. out.append(false);
  907. MemoryBuffer &data = te->queryData();
  908. out.append((size32_t)data.length());
  909. if (data.length())
  910. out.append(data.length(), data.toByteArray());
  911. }
  912. bool getBestFilePart(CActivityBase *activity, IPartDescriptor &partDesc, OwnedIFile & ifile, unsigned &location, StringBuffer &path, IExceptionHandler *eHandler)
  913. {
  914. if (0 == partDesc.numCopies()) // not sure this is poss.
  915. return false;
  916. SocketEndpoint slfEp((unsigned short)0);
  917. unsigned l;
  918. RemoteFilename rfn;
  919. StringBuffer locationName, primaryName;
  920. //First check for local matches
  921. for (l=0; l<partDesc.numCopies(); l++)
  922. {
  923. rfn.clear();
  924. partDesc.getFilename(l, rfn);
  925. if (0 == l)
  926. {
  927. rfn.getPath(locationName.clear());
  928. assertex(locationName.length());
  929. primaryName.append(locationName);
  930. locationName.clear();
  931. }
  932. if (rfn.isLocal())
  933. {
  934. rfn.getPath(locationName.clear());
  935. assertex(locationName.length());
  936. Owned<IFile> file;
  937. if (activity->getOptBool("forceDafilesrv"))
  938. {
  939. PROGLOG("Using dafilesrv for: %s", locationName.str());
  940. file.setown(createDaliServixFile(rfn));
  941. }
  942. else
  943. file.setown(createIFile(locationName.str()));
  944. try
  945. {
  946. if (file->exists())
  947. {
  948. ifile.set(file);
  949. location = l;
  950. path.append(locationName);
  951. return true;
  952. }
  953. }
  954. catch (IException *e)
  955. {
  956. ActPrintLog(&activity->queryContainer(), e, "getBestFilePart");
  957. e->Release();
  958. }
  959. }
  960. }
  961. //Now check for a remote match...
  962. for (l=0; l<partDesc.numCopies(); l++)
  963. {
  964. rfn.clear();
  965. partDesc.getFilename(l, rfn);
  966. if (!rfn.isLocal())
  967. {
  968. rfn.getPath(locationName.clear());
  969. assertex(locationName.length());
  970. Owned<IFile> file = createIFile(locationName.str());
  971. try
  972. {
  973. if (file->exists())
  974. {
  975. ifile.set(file);
  976. location = l;
  977. if (0 != l)
  978. {
  979. Owned<IThorException> e = MakeActivityWarning(activity, 0, "Primary file missing: %s, using remote copy: %s", primaryName.str(), locationName.str());
  980. if (!eHandler)
  981. throw e.getClear();
  982. eHandler->fireException(e);
  983. }
  984. path.append(locationName);
  985. return true;
  986. }
  987. }
  988. catch (IException *e)
  989. {
  990. ActPrintLog(&activity->queryContainer(), e, "In getBestFilePart");
  991. e->Release();
  992. }
  993. }
  994. }
  995. return false;
  996. }
  997. StringBuffer &getFilePartLocations(IPartDescriptor &partDesc, StringBuffer &locations)
  998. {
  999. unsigned l;
  1000. for (l=0; l<partDesc.numCopies(); l++)
  1001. {
  1002. RemoteFilename rfn;
  1003. partDesc.getFilename(l, rfn);
  1004. rfn.getRemotePath(locations);
  1005. if (l != partDesc.numCopies()-1)
  1006. locations.append(", ");
  1007. }
  1008. return locations;
  1009. }
  1010. StringBuffer &getPartFilename(IPartDescriptor &partDesc, unsigned copy, StringBuffer &filePath, bool localMount)
  1011. {
  1012. RemoteFilename rfn;
  1013. if (localMount && copy)
  1014. {
  1015. partDesc.getFilename(0, rfn);
  1016. if (!rfn.isLocal())
  1017. localMount = false;
  1018. rfn.clear();
  1019. }
  1020. partDesc.getFilename(copy, rfn);
  1021. rfn.getPath(filePath);
  1022. return filePath;
  1023. }
  1024. // CFifoFileCache impl.
  1025. void CFifoFileCache::deleteFile(IFile &ifile)
  1026. {
  1027. try
  1028. {
  1029. if (!ifile.remove())
  1030. FLLOG(MCoperatorWarning, thorJob, "CFifoFileCache: Failed to remove file (missing) : %s", ifile.queryFilename());
  1031. }
  1032. catch (IException *e)
  1033. {
  1034. StringBuffer s("Failed to remove file: ");
  1035. FLLOG(MCoperatorWarning, thorJob, e, s.append(ifile.queryFilename()));
  1036. }
  1037. }
  1038. void CFifoFileCache::init(const char *cacheDir, unsigned _limit, const char *pattern)
  1039. {
  1040. limit = _limit;
  1041. Owned<IDirectoryIterator> iter = createDirectoryIterator(cacheDir, pattern);
  1042. ForEach (*iter)
  1043. {
  1044. IFile &file = iter->query();
  1045. if (file.isFile()==fileBool::foundYes)
  1046. deleteFile(file);
  1047. }
  1048. }
  1049. void CFifoFileCache::add(const char *filename)
  1050. {
  1051. unsigned pos = files.find(filename);
  1052. if (NotFound != pos)
  1053. files.remove(pos);
  1054. files.add(filename, 0);
  1055. if (files.ordinality() > limit)
  1056. {
  1057. const char *toRemoveFname = files.item(limit);
  1058. PROGLOG("Removing %s from fifo cache", toRemoveFname);
  1059. OwnedIFile ifile = createIFile(toRemoveFname);
  1060. deleteFile(*ifile);
  1061. files.remove(limit);
  1062. }
  1063. }
  1064. bool CFifoFileCache::isAvailable(const char *filename)
  1065. {
  1066. unsigned pos = files.find(filename);
  1067. if (NotFound != pos)
  1068. {
  1069. OwnedIFile ifile = createIFile(filename);
  1070. if (ifile->exists())
  1071. return true;
  1072. }
  1073. return false;
  1074. }
  1075. IOutputMetaData *createFixedSizeMetaData(size32_t sz)
  1076. {
  1077. // sure if this allowed or is cheating!
  1078. return new CFixedOutputMetaData(sz);
  1079. }
  1080. class CRowStreamFromNode : public CSimpleInterface, implements IRowStream
  1081. {
  1082. CActivityBase &activity;
  1083. unsigned node, myNode;
  1084. ICommunicator &comm;
  1085. MemoryBuffer mb;
  1086. bool eos;
  1087. const bool &abortSoon;
  1088. mptag_t mpTag, replyTag;
  1089. Owned<ISerialStream> bufferStream;
  1090. CThorStreamDeserializerSource memDeserializer;
  1091. CMessageBuffer msg;
  1092. public:
  1093. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  1094. CRowStreamFromNode(CActivityBase &_activity, unsigned _node, ICommunicator &_comm, mptag_t _mpTag, const bool &_abortSoon) : activity(_activity), node(_node), comm(_comm), mpTag(_mpTag), abortSoon(_abortSoon)
  1095. {
  1096. bufferStream.setown(createMemoryBufferSerialStream(mb));
  1097. memDeserializer.setStream(bufferStream);
  1098. myNode = comm.queryGroup().rank(activity.queryMPServer().queryMyNode());
  1099. replyTag = activity.queryMPServer().createReplyTag();
  1100. msg.setReplyTag(replyTag);
  1101. eos = false;
  1102. }
  1103. // IRowStream
  1104. const void *nextRow()
  1105. {
  1106. if (eos) return NULL;
  1107. for (;;)
  1108. {
  1109. while (!memDeserializer.eos())
  1110. {
  1111. RtlDynamicRowBuilder rowBuilder(activity.queryRowAllocator());
  1112. size32_t sz = activity.queryRowDeserializer()->deserialize(rowBuilder, memDeserializer);
  1113. return rowBuilder.finalizeRowClear(sz);
  1114. }
  1115. // no msg just give me data
  1116. if (!comm.send(msg, node, mpTag, LONGTIMEOUT)) // should never timeout, unless other end down
  1117. throw MakeStringException(0, "CRowStreamFromNode: Failed to send data request from node %d, to node %d", myNode, node);
  1118. for (;;)
  1119. {
  1120. if (abortSoon)
  1121. break;
  1122. if (comm.recv(msg, node, replyTag, NULL, 60000))
  1123. break;
  1124. ActPrintLog(&activity, "CRowStreamFromNode, request more from node %d, tag %d timedout, retrying", node, mpTag);
  1125. }
  1126. if (!msg.length())
  1127. break;
  1128. if (abortSoon)
  1129. break;
  1130. msg.swapWith(mb);
  1131. msg.clear();
  1132. }
  1133. eos = true;
  1134. return NULL;
  1135. }
  1136. void stop()
  1137. {
  1138. CMessageBuffer msg;
  1139. msg.append(1); // stop
  1140. verifyex(comm.send(msg, node, mpTag));
  1141. }
  1142. };
  1143. IRowStream *createRowStreamFromNode(CActivityBase &activity, unsigned node, ICommunicator &comm, mptag_t mpTag, const bool &abortSoon)
  1144. {
  1145. return new CRowStreamFromNode(activity, node, comm, mpTag, abortSoon);
  1146. }
  1147. #define DEFAULT_ROWSERVER_BUFF_SIZE (0x10000) // 64K
  1148. class CRowServer : public CSimpleInterface, implements IThreaded, implements IRowServer
  1149. {
  1150. CThreaded threaded;
  1151. ICommunicator &comm;
  1152. CActivityBase *activity;
  1153. mptag_t mpTag;
  1154. unsigned fetchBuffSize;
  1155. Linked<IRowStream> seq;
  1156. std::atomic<bool> running;
  1157. public:
  1158. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  1159. CRowServer(CActivityBase *_activity, IRowStream *_seq, ICommunicator &_comm, mptag_t _mpTag)
  1160. : activity(_activity), seq(_seq), comm(_comm), mpTag(_mpTag), threaded("CRowServer")
  1161. {
  1162. fetchBuffSize = DEFAULT_ROWSERVER_BUFF_SIZE;
  1163. running = true;
  1164. threaded.init(this);
  1165. }
  1166. ~CRowServer()
  1167. {
  1168. stop();
  1169. threaded.join();
  1170. }
  1171. virtual void threadmain() override
  1172. {
  1173. CMessageBuffer mb;
  1174. while (running)
  1175. {
  1176. rank_t sender;
  1177. if (comm.recv(mb, RANK_ALL, mpTag, &sender))
  1178. {
  1179. unsigned code;
  1180. if (mb.length())
  1181. {
  1182. mb.read(code);
  1183. if (1 == code) // stop
  1184. {
  1185. seq->stop();
  1186. break;
  1187. }
  1188. else
  1189. throwUnexpected();
  1190. }
  1191. mb.clear();
  1192. CMemoryRowSerializer mbs(mb);
  1193. do
  1194. {
  1195. OwnedConstThorRow row = seq->nextRow();
  1196. if (!row)
  1197. break;
  1198. activity->queryRowSerializer()->serialize(mbs,(const byte *)row.get());
  1199. } while (mb.length() < fetchBuffSize); // NB: allows at least 1
  1200. if (!comm.reply(mb, LONGTIMEOUT))
  1201. throw MakeStringException(0, "CRowStreamFromNode: Failed to send data back to node: %d", activity->queryContainer().queryJobChannel().queryMyRank());
  1202. mb.clear();
  1203. }
  1204. }
  1205. running = false;
  1206. }
  1207. void stop() { running = false; comm.cancel(RANK_ALL, mpTag); }
  1208. };
  1209. IRowServer *createRowServer(CActivityBase *activity, IRowStream *seq, ICommunicator &comm, mptag_t mpTag)
  1210. {
  1211. return new CRowServer(activity, seq, comm, mpTag);
  1212. }
  1213. IEngineRowStream *createUngroupStream(IRowStream *input)
  1214. {
  1215. class CUngroupStream : public CSimpleInterfaceOf<IEngineRowStream>
  1216. {
  1217. IRowStream *input;
  1218. public:
  1219. CUngroupStream(IRowStream *_input) : input(_input) { input->Link(); }
  1220. ~CUngroupStream() { input->Release(); }
  1221. virtual const void *nextRow() override
  1222. {
  1223. const void *ret = input->nextRow();
  1224. if (ret)
  1225. return ret;
  1226. else
  1227. return input->nextRow();
  1228. }
  1229. virtual void stop() override
  1230. {
  1231. input->stop();
  1232. }
  1233. virtual void resetEOF() override { throwUnexpected(); }
  1234. };
  1235. return new CUngroupStream(input);
  1236. }
  1237. void sendInChunks(ICommunicator &comm, rank_t dst, mptag_t mpTag, IRowStream *input, IThorRowInterfaces *rowIf)
  1238. {
  1239. CMessageBuffer msg;
  1240. MemoryBuffer mb;
  1241. CMemoryRowSerializer mbs(mb);
  1242. IOutputRowSerializer *serializer = rowIf->queryRowSerializer();
  1243. for (;;)
  1244. {
  1245. for (;;)
  1246. {
  1247. OwnedConstThorRow row = input->nextRow();
  1248. if (!row)
  1249. {
  1250. row.setown(input->nextRow());
  1251. if (!row)
  1252. break;
  1253. }
  1254. serializer->serialize(mbs, (const byte *)row.get());
  1255. if (mb.length() > 0x80000)
  1256. break;
  1257. }
  1258. msg.clear();
  1259. if (mb.length())
  1260. {
  1261. msg.append(false); // no error
  1262. ThorCompress(mb.toByteArray(), mb.length(), msg);
  1263. mb.clear();
  1264. }
  1265. comm.send(msg, dst, mpTag, LONGTIMEOUT);
  1266. if (0 == msg.length())
  1267. break;
  1268. }
  1269. }
  1270. void logDiskSpace()
  1271. {
  1272. StringBuffer diskSpaceMsg("Disk space: ");
  1273. diskSpaceMsg.append(queryBaseDirectory(grp_unknown, 0)).append(" = ").append(getFreeSpace(queryBaseDirectory(grp_unknown, 0))/0x100000).append(" MB, ");
  1274. diskSpaceMsg.append(queryBaseDirectory(grp_unknown, 1)).append(" = ").append(getFreeSpace(queryBaseDirectory(grp_unknown, 1))/0x100000).append(" MB, ");
  1275. const char *tempDir = globals->queryProp("@thorTempDirectory");
  1276. diskSpaceMsg.append(tempDir).append(" = ").append(getFreeSpace(tempDir)/0x100000).append(" MB");
  1277. PROGLOG("%s", diskSpaceMsg.str());
  1278. }
  1279. IPerfMonHook *createThorMemStatsPerfMonHook(CJobBase &job, int maxLevel, IPerfMonHook *chain)
  1280. {
  1281. class CPerfMonHook : public CSimpleInterfaceOf<IPerfMonHook>
  1282. {
  1283. CJobBase &job;
  1284. int maxLevel;
  1285. Linked<IPerfMonHook> chain;
  1286. public:
  1287. CPerfMonHook(CJobBase &_job, unsigned _maxLevel, IPerfMonHook *_chain) : chain(_chain), maxLevel(_maxLevel), job(_job)
  1288. {
  1289. }
  1290. virtual void processPerfStats(unsigned processorUsage, unsigned memoryUsage, unsigned memoryTotal, unsigned __int64 firstDiskUsage, unsigned __int64 firstDiskTotal, unsigned __int64 secondDiskUsage, unsigned __int64 secondDiskTotal, unsigned threadCount)
  1291. {
  1292. if (chain)
  1293. chain->processPerfStats(processorUsage, memoryUsage, memoryTotal, firstDiskUsage,firstDiskTotal, secondDiskUsage, secondDiskTotal, threadCount);
  1294. }
  1295. virtual StringBuffer &extraLogging(StringBuffer &extra)
  1296. {
  1297. if (chain)
  1298. return chain->extraLogging(extra);
  1299. return extra;
  1300. }
  1301. virtual void log(int level, const char *msg)
  1302. {
  1303. PROGLOG("%s", msg);
  1304. if ((maxLevel != -1) && (level <= maxLevel)) // maxLevel of -1 means disabled
  1305. {
  1306. Owned<IThorException> e = MakeThorException(TE_KERN, "%s", msg);
  1307. e->setSeverity(SeverityAlert);
  1308. e->setAction(tea_warning);
  1309. job.fireException(e);
  1310. }
  1311. }
  1312. };
  1313. return new CPerfMonHook(job, maxLevel, chain);
  1314. }
  1315. bool isOOMException(IException *_e)
  1316. {
  1317. if (_e)
  1318. {
  1319. IThorException *e = QUERYINTERFACE(_e, IThorException);
  1320. IException *oe = e && e->queryOriginalException() ? e->queryOriginalException() : _e;
  1321. int ecode = oe->errorCode();
  1322. if (ecode >= ROXIEMM_ERROR_START && ecode <= ROXIEMM_ERROR_END)
  1323. return true;
  1324. }
  1325. return false;
  1326. }
  1327. IThorException *checkAndCreateOOMContextException(CActivityBase *activity, IException *e, const char *msg, rowcount_t numRows, IOutputMetaData *meta, const void *row)
  1328. {
  1329. VStringBuffer errorMsg("Out of memory whilst %s", msg);
  1330. if (RCUNSET != numRows)
  1331. errorMsg.appendf(", group/set size = %" RCPF "u", numRows);
  1332. if (meta)
  1333. {
  1334. if (meta->isFixedSize())
  1335. errorMsg.appendf(", Fixed rows, size = %d", meta->getFixedSize());
  1336. else
  1337. errorMsg.appendf(", Variable rows, min. size = %d", meta->getMinRecordSize());
  1338. if (row && meta->hasXML())
  1339. {
  1340. CommonXmlWriter xmlwrite(0);
  1341. meta->toXML((byte *) row, xmlwrite);
  1342. errorMsg.newline().append("Leading row of group: ").append(xmlwrite.str());
  1343. }
  1344. }
  1345. Owned<IThorException> te = MakeActivityException(activity, e, "%s", errorMsg.str());
  1346. e->Release();
  1347. return te.getClear();
  1348. }
  1349. RecordTranslationMode getTranslationMode(CActivityBase &activity)
  1350. {
  1351. bool local = true;
  1352. StringBuffer val;
  1353. activity.queryContainer().queryXGMML().getProp("hint[@name=\"layouttranslation\"]/@value", val);
  1354. if (!val.length())
  1355. {
  1356. local = false;
  1357. activity.queryJob().getOpt("layoutTranslation", val);
  1358. if (!val.length())
  1359. globals->getProp("@fieldTranslationEnabled", val);
  1360. }
  1361. return getTranslationMode(val, local);
  1362. }
  1363. void getLayoutTranslations(IConstPointerArrayOf<ITranslator> &translators, const char *fname, IArrayOf<IPartDescriptor> &partDescriptors, RecordTranslationMode translationMode, unsigned expectedFormatCrc, IOutputMetaData *expectedFormat, unsigned projectedFormatCrc, IOutputMetaData *projectedFormat)
  1364. {
  1365. if (0 == partDescriptors.ordinality())
  1366. return;
  1367. IPropertyTree &props = partDescriptors.item(0).queryOwner().queryProperties();
  1368. typedef OwningHTMapping<const ITranslator, unsigned> CITranslatorMapping;
  1369. OwningSimpleHashTableOf<CITranslatorMapping, unsigned> translatorTable;
  1370. ForEachItemIn(p, partDescriptors)
  1371. {
  1372. unsigned publishedFormatCrc = (unsigned)props.getPropInt("@formatCrc", 0);
  1373. Owned<const ITranslator> translatorContainer;
  1374. if (translatorTable.ordinality())
  1375. {
  1376. CITranslatorMapping *entry = translatorTable.find(publishedFormatCrc);
  1377. if (entry)
  1378. translatorContainer.set(&entry->queryElement());
  1379. }
  1380. if (!translatorContainer)
  1381. {
  1382. Owned<IOutputMetaData> publishedFormat = getDaliLayoutInfo(props);
  1383. translatorContainer.setown(getTranslators(fname, expectedFormatCrc, expectedFormat, publishedFormatCrc, publishedFormat, projectedFormatCrc, projectedFormat, translationMode));
  1384. if (translatorContainer)
  1385. translatorTable.replace(*new CITranslatorMapping(*translatorContainer.getLink(), publishedFormatCrc));
  1386. }
  1387. translators.append(translatorContainer.getClear());
  1388. }
  1389. }
  1390. const ITranslator *getLayoutTranslation(const char *fname, IPartDescriptor &partDesc, RecordTranslationMode translationMode, unsigned expectedFormatCrc, IOutputMetaData *expectedFormat, unsigned projectedFormatCrc, IOutputMetaData *projectedFormat)
  1391. {
  1392. IPropertyTree const &props = partDesc.queryOwner().queryProperties();
  1393. Owned<IOutputMetaData> actualFormat = getDaliLayoutInfo(props);
  1394. unsigned publishedFormatCrc = (unsigned)props.getPropInt("@formatCrc", 0);
  1395. return getTranslators(fname, expectedFormatCrc, expectedFormat, publishedFormatCrc, actualFormat, projectedFormatCrc, projectedFormat, translationMode);
  1396. }
  1397. bool isRemoteReadCandidate(const CActivityBase &activity, const RemoteFilename &rfn)
  1398. {
  1399. #ifndef _CONTAINERIZED
  1400. if (!activity.getOptBool(THOROPT_FORCE_REMOTE_DISABLED))
  1401. {
  1402. if (!rfn.isLocal())
  1403. return true;
  1404. StringBuffer localPath;
  1405. rfn.getLocalPath(localPath);
  1406. if (activity.getOptBool(THOROPT_FORCE_REMOTE_READ, testForceRemote(localPath)))
  1407. return true;
  1408. }
  1409. #endif
  1410. return false;
  1411. }
  1412. void checkAndDumpAbortInfo(const char *cmd)
  1413. {
  1414. try
  1415. {
  1416. StringBuffer dumpInfoCmd(cmd);
  1417. if (dumpInfoCmd.length())
  1418. {
  1419. /* add some params that might be useful to script
  1420. * 1) Thor instance name
  1421. * 2) base port
  1422. * 3) exe path
  1423. * 4) PID
  1424. */
  1425. const char *myInstanceName = globals->queryProp("@name");
  1426. unsigned myBasePort = getMachinePortBase();
  1427. StringBuffer exePath(queryCurrentProcessPath());
  1428. if (0 == exePath.length())
  1429. exePath.append("process-name-unknown");
  1430. unsigned pid = GetCurrentProcessId();
  1431. dumpInfoCmd.appendf(" %s %u %s %u", myInstanceName, myBasePort, exePath.str(), pid);
  1432. }
  1433. else
  1434. getDebuggerGetStacksCmd(dumpInfoCmd);
  1435. StringBuffer cmdOutput;
  1436. unsigned retCode = getCommandOutput(cmdOutput, dumpInfoCmd, "slave dump info", globals->queryProp("@allowedPipePrograms"));
  1437. PROGLOG("\n%s, return code = %u\n%s\n", dumpInfoCmd.str(), retCode, cmdOutput.str());
  1438. }
  1439. catch (IException *e)
  1440. {
  1441. EXCLOG(e, nullptr);
  1442. e->Release();
  1443. }
  1444. }
  1445. void checkFileType(CActivityBase *activity, IDistributedFile *file, const char *expectedType, bool throwException)
  1446. {
  1447. if (activity->getOptBool(THOROPT_VALIDATE_FILE_TYPE, true))
  1448. {
  1449. const char *kind = queryFileKind(file);
  1450. if (isEmptyString(kind)) // file has no published kind, can't validate
  1451. return;
  1452. if (!strieq(kind, expectedType))
  1453. {
  1454. Owned<IThorException> e = MakeActivityException(activity, TE_FileTypeMismatch, "File format mismatch reading file: '%s'. Expected type '%s', but file is type '%s'", file->queryLogicalName(), expectedType, kind);
  1455. if (throwException)
  1456. throw e.getClear();
  1457. e->setAction(tea_warning);
  1458. e->setSeverity(SeverityWarning);
  1459. activity->fireException(e); // will propagate to workunit warning
  1460. }
  1461. }
  1462. }
  1463. StringBuffer &getExpertOptPath(const char *opt, StringBuffer &out)
  1464. {
  1465. #ifdef _CONTAINERIZED
  1466. if (opt)
  1467. return out.append("expert/@").append(opt);
  1468. return out.append("expert");
  1469. #else
  1470. if (opt)
  1471. return out.append("Debug/@").append(opt);
  1472. return out.append("Debug");
  1473. #endif
  1474. }
  1475. bool getExpertOptBool(const char *opt, bool dft)
  1476. {
  1477. StringBuffer xpath;
  1478. getExpertOptPath(opt, xpath);
  1479. return globals->getPropBool(xpath, dft);
  1480. }
  1481. __int64 getExpertOptInt64(const char *opt, __int64 dft)
  1482. {
  1483. StringBuffer xpath;
  1484. getExpertOptPath(opt, xpath);
  1485. return globals->getPropInt64(xpath, dft);
  1486. }
  1487. StringBuffer &getExpertOptString(const char *opt, StringBuffer &out)
  1488. {
  1489. StringBuffer xpath;
  1490. getExpertOptPath(opt, xpath);
  1491. globals->getProp(xpath, out);
  1492. return out;
  1493. }