thgraph.cpp 111 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "thgraph.hpp"
  14. #include "jptree.hpp"
  15. #include "commonext.hpp"
  16. #include "dasess.hpp"
  17. #include "jhtree.hpp"
  18. #include "thcodectx.hpp"
  19. #include "thbuf.hpp"
  20. #include "thormisc.hpp"
  21. #include "thbufdef.hpp"
  22. #include "thmem.hpp"
  23. #include "rtlformat.hpp"
  24. #include "thorsoapcall.hpp"
  25. #include "thorport.hpp"
  26. #include "roxiehelper.hpp"
  27. #include "hpccconfig.hpp"
  28. PointerArray createFuncs;
  29. void registerCreateFunc(CreateFunc func)
  30. {
  31. createFuncs.append((void *)func);
  32. }
  33. ///////////////////////////////////
  34. //////
  35. /////
  36. class CThorGraphResult : implements IThorResult, implements IRowWriter, public CInterface
  37. {
  38. CActivityBase &activity;
  39. rowcount_t rowStreamCount;
  40. IOutputMetaData *meta;
  41. Owned<IRowWriterMultiReader> rowBuffer;
  42. IThorRowInterfaces *rowIf;
  43. IEngineRowAllocator *allocator;
  44. bool stopped, readers;
  45. ThorGraphResultType resultType;
  46. void init()
  47. {
  48. stopped = readers = false;
  49. allocator = rowIf->queryRowAllocator();
  50. meta = allocator->queryOutputMeta();
  51. rowStreamCount = 0;
  52. }
  53. class CStreamWriter : implements IRowWriterMultiReader, public CSimpleInterface
  54. {
  55. CThorGraphResult &owner;
  56. CThorExpandingRowArray rows;
  57. public:
  58. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  59. CStreamWriter(CThorGraphResult &_owner, EmptyRowSemantics emptyRowSemantics) : owner(_owner), rows(owner.activity, owner.rowIf, emptyRowSemantics)
  60. {
  61. }
  62. //IRowWriterMultiReader
  63. virtual void putRow(const void *row)
  64. {
  65. rows.append(row);
  66. }
  67. virtual void flush() { }
  68. virtual IRowStream *getReader()
  69. {
  70. return rows.createRowStream(0, (rowidx_t)-1, false);
  71. }
  72. };
  73. public:
  74. IMPLEMENT_IINTERFACE;
  75. CThorGraphResult(CActivityBase &_activity, IThorRowInterfaces *_rowIf, ThorGraphResultType _resultType, unsigned spillPriority) : activity(_activity), rowIf(_rowIf), resultType(_resultType)
  76. {
  77. init();
  78. EmptyRowSemantics emptyRowSemantics;
  79. if (isGrouped())
  80. emptyRowSemantics = ers_eogonly;
  81. else if (isSparse())
  82. emptyRowSemantics = ers_allow;
  83. else
  84. emptyRowSemantics = ers_forbidden;
  85. if (SPILL_PRIORITY_DISABLE == spillPriority)
  86. rowBuffer.setown(new CStreamWriter(*this, emptyRowSemantics));
  87. else
  88. rowBuffer.setown(createOverflowableBuffer(activity, rowIf, emptyRowSemantics, true));
  89. }
  90. // IRowWriter
  91. virtual void putRow(const void *row)
  92. {
  93. assertex(!readers);
  94. ++rowStreamCount;
  95. rowBuffer->putRow(row);
  96. }
  97. virtual void flush() { }
  98. virtual offset_t getPosition() { UNIMPLEMENTED; return 0; }
  99. // IThorResult
  100. virtual IRowWriter *getWriter()
  101. {
  102. return LINK(this);
  103. }
  104. virtual void setResultStream(IRowWriterMultiReader *stream, rowcount_t count)
  105. {
  106. assertex(!readers);
  107. rowBuffer.setown(stream);
  108. rowStreamCount = count;
  109. }
  110. virtual IRowStream *getRowStream()
  111. {
  112. readers = true;
  113. return rowBuffer->getReader();
  114. }
  115. virtual IThorRowInterfaces *queryRowInterfaces() { return rowIf; }
  116. virtual CActivityBase *queryActivity() { return &activity; }
  117. virtual bool isDistributed() const { return resultType & thorgraphresult_distributed; }
  118. virtual bool isSparse() const { return resultType & thorgraphresult_sparse; }
  119. virtual bool isGrouped() const { return resultType & thorgraphresult_grouped; }
  120. virtual void serialize(MemoryBuffer &mb)
  121. {
  122. Owned<IRowStream> stream = getRowStream();
  123. bool grouped = meta->isGrouped();
  124. if (grouped)
  125. {
  126. OwnedConstThorRow prev = stream->nextRow();
  127. if (prev)
  128. {
  129. bool eog;
  130. for (;;)
  131. {
  132. eog = false;
  133. OwnedConstThorRow next = stream->nextRow();
  134. if (!next)
  135. eog = true;
  136. size32_t sz = meta->getRecordSize(prev);
  137. mb.append(sz, prev.get());
  138. mb.append(eog);
  139. if (!next)
  140. {
  141. next.setown(stream->nextRow());
  142. if (!next)
  143. break;
  144. }
  145. prev.set(next);
  146. }
  147. }
  148. }
  149. else
  150. {
  151. for (;;)
  152. {
  153. OwnedConstThorRow row = stream->nextRow();
  154. if (!row)
  155. break;
  156. size32_t sz = meta->getRecordSize(row);
  157. mb.append(sz, row.get());
  158. }
  159. }
  160. }
  161. virtual void getResult(size32_t &len, void * & data)
  162. {
  163. MemoryBuffer mb;
  164. serialize(mb);
  165. len = mb.length();
  166. data = mb.detach();
  167. }
  168. virtual void getLinkedResult(unsigned &countResult, const byte * * & result) override
  169. {
  170. assertex(rowStreamCount==((unsigned)rowStreamCount)); // catch, just in case
  171. Owned<IRowStream> stream = getRowStream();
  172. countResult = 0;
  173. OwnedConstThorRow _rowset = allocator->createRowset((unsigned)rowStreamCount);
  174. const void **rowset = (const void **)_rowset.get();
  175. while (countResult < rowStreamCount)
  176. {
  177. OwnedConstThorRow row = stream->nextRow();
  178. rowset[countResult++] = row.getClear();
  179. }
  180. result = (const byte **)_rowset.getClear();
  181. }
  182. virtual const void * getLinkedRowResult()
  183. {
  184. assertex(rowStreamCount==1); // catch, just in case
  185. Owned<IRowStream> stream = getRowStream();
  186. return stream->nextRow();
  187. }
  188. };
  189. /////
  190. IThorResult *CThorGraphResults::createResult(CActivityBase &activity, unsigned id, IThorRowInterfaces *rowIf, ThorGraphResultType resultType, unsigned spillPriority)
  191. {
  192. Owned<IThorResult> result = ::createResult(activity, rowIf, resultType, spillPriority);
  193. setResult(id, result);
  194. return result;
  195. }
  196. /////
  197. IThorResult *createResult(CActivityBase &activity, IThorRowInterfaces *rowIf, ThorGraphResultType resultType, unsigned spillPriority)
  198. {
  199. return new CThorGraphResult(activity, rowIf, resultType, spillPriority);
  200. }
  201. /////
  202. class CThorBoundLoopGraph : implements IThorBoundLoopGraph, public CInterface
  203. {
  204. CGraphBase *graph;
  205. Linked<IOutputMetaData> resultMeta;
  206. Owned<IOutputMetaData> counterMeta, loopAgainMeta;
  207. Owned<IThorRowInterfaces> resultRowIf, countRowIf, loopAgainRowIf;
  208. public:
  209. IMPLEMENT_IINTERFACE;
  210. CThorBoundLoopGraph(CGraphBase *_graph, IOutputMetaData * _resultMeta, unsigned _activityId) : graph(_graph), resultMeta(_resultMeta)
  211. {
  212. counterMeta.setown(createFixedSizeMetaData(sizeof(thor_loop_counter_t)));
  213. loopAgainMeta.setown(createFixedSizeMetaData(sizeof(bool)));
  214. }
  215. virtual void prepareCounterResult(CActivityBase &activity, IThorGraphResults *results, unsigned loopCounter, unsigned pos)
  216. {
  217. if (!countRowIf)
  218. countRowIf.setown(activity.createRowInterfaces(counterMeta));
  219. RtlDynamicRowBuilder counterRow(countRowIf->queryRowAllocator());
  220. thor_loop_counter_t * res = (thor_loop_counter_t *)counterRow.ensureCapacity(sizeof(thor_loop_counter_t),NULL);
  221. *res = loopCounter;
  222. OwnedConstThorRow counterRowFinal = counterRow.finalizeRowClear(sizeof(thor_loop_counter_t));
  223. IThorResult *counterResult = results->createResult(activity, pos, countRowIf, thorgraphresult_nul, SPILL_PRIORITY_DISABLE);
  224. Owned<IRowWriter> counterResultWriter = counterResult->getWriter();
  225. counterResultWriter->putRow(counterRowFinal.getClear());
  226. graph->setLoopCounter(loopCounter);
  227. }
  228. virtual void prepareLoopAgainResult(CActivityBase &activity, IThorGraphResults *results, unsigned pos)
  229. {
  230. if (!loopAgainRowIf)
  231. loopAgainRowIf.setown(activity.createRowInterfaces(loopAgainMeta));
  232. activity.queryGraph().createResult(activity, pos, results, loopAgainRowIf, activity.queryGraph().isLocalChild() ? thorgraphresult_nul : thorgraphresult_distributed, SPILL_PRIORITY_DISABLE);
  233. }
  234. virtual void prepareLoopResults(CActivityBase &activity, IThorGraphResults *results)
  235. {
  236. if (!resultRowIf)
  237. resultRowIf.setown(activity.createRowInterfaces(resultMeta));
  238. ThorGraphResultType resultType = activity.queryGraph().isLocalChild() ? thorgraphresult_nul : thorgraphresult_distributed;
  239. activity.queryGraph().createResult(activity, 0, results, resultRowIf, resultType); // loop output
  240. activity.queryGraph().createResult(activity, 1, results, resultRowIf, resultType); // loop input
  241. }
  242. virtual CGraphBase *queryGraph() { return graph; }
  243. };
  244. IThorBoundLoopGraph *createBoundLoopGraph(CGraphBase *graph, IOutputMetaData *resultMeta, unsigned activityId)
  245. {
  246. return new CThorBoundLoopGraph(graph, resultMeta, activityId);
  247. }
  248. ///////////////////////////////////
  249. bool isDiskInput(ThorActivityKind kind)
  250. {
  251. switch (kind)
  252. {
  253. case TAKcsvread:
  254. case TAKxmlread:
  255. case TAKjsonread:
  256. case TAKdiskread:
  257. case TAKdisknormalize:
  258. case TAKdiskaggregate:
  259. case TAKdiskcount:
  260. case TAKdiskgroupaggregate:
  261. case TAKindexread:
  262. case TAKindexcount:
  263. case TAKindexnormalize:
  264. case TAKindexaggregate:
  265. case TAKindexgroupaggregate:
  266. case TAKindexgroupexists:
  267. case TAKindexgroupcount:
  268. case TAKspillread:
  269. return true;
  270. default:
  271. return false;
  272. }
  273. }
  274. void CIOConnection::connect(unsigned which, CActivityBase *destActivity)
  275. {
  276. destActivity->setInput(which, activity->queryActivity(), index);
  277. }
  278. ///////////////////////////////////
  279. CGraphElementBase *createGraphElement(IPropertyTree &node, CGraphBase &owner, CGraphBase *resultsGraph)
  280. {
  281. CGraphElementBase *container = NULL;
  282. ForEachItemIn(m, createFuncs)
  283. {
  284. CreateFunc func = (CreateFunc)createFuncs.item(m);
  285. container = func(node, owner, resultsGraph);
  286. if (container) break;
  287. }
  288. if (NULL == container)
  289. {
  290. ThorActivityKind tak = (ThorActivityKind)node.getPropInt("att[@name=\"_kind\"]/@value", TAKnone);
  291. throw MakeStringException(TE_UnsupportedActivityKind, "Unsupported activity kind: %s", activityKindStr(tak));
  292. }
  293. container->setResultsGraph(resultsGraph);
  294. return container;
  295. }
  296. ///////////////////////////////////
  297. CActivityCodeContext::CActivityCodeContext()
  298. {
  299. }
  300. IThorChildGraph * CActivityCodeContext::resolveChildQuery(__int64 gid, IHThorArg * colocal)
  301. {
  302. return parent->getChildGraph((graph_id)gid);
  303. }
  304. IEclGraphResults * CActivityCodeContext::resolveLocalQuery(__int64 gid)
  305. {
  306. if (gid == containerGraph->queryGraphId())
  307. return containerGraph;
  308. else
  309. return ctx->resolveLocalQuery(gid);
  310. }
  311. unsigned CActivityCodeContext::getGraphLoopCounter() const
  312. {
  313. return containerGraph->queryLoopCounter(); // only called if value is valid
  314. }
  315. ISectionTimer * CActivityCodeContext::registerTimer(unsigned activityId, const char * name)
  316. {
  317. if (!stats) // if master context and local CQ, there is no activity instance, and hence no setStats call
  318. return queryNullSectionTimer();
  319. CriticalBlock b(contextCrit);
  320. auto it = functionTimers.find(name);
  321. if (it != functionTimers.end())
  322. return it->second;
  323. ISectionTimer *timer = ThorSectionTimer::createTimer(*stats, name);
  324. functionTimers.insert({name, timer}); // owns
  325. return timer;
  326. }
  327. ///////////////////////////////////
  328. CGraphElementBase::CGraphElementBase(CGraphBase &_owner, IPropertyTree &_xgmml, CGraphBase *_resultsGraph)
  329. : owner(&_owner), resultsGraph(_resultsGraph)
  330. {
  331. xgmml.setown(createPTreeFromIPT(&_xgmml));
  332. eclText.set(xgmml->queryProp("att[@name=\"ecl\"]/@value"));
  333. id = xgmml->getPropInt("@id", 0);
  334. kind = (ThorActivityKind)xgmml->getPropInt("att[@name=\"_kind\"]/@value", TAKnone);
  335. sink = isActivitySink(kind);
  336. bool coLocal = xgmml->getPropBool("att[@name=\"coLocal\"]/@value", false);
  337. isLocalData = xgmml->getPropBool("att[@name=\"local\"]/@value", false); // local execute + local data access only
  338. isLocal = isLocalData || coLocal; // local execute
  339. isGrouped = xgmml->getPropBool("att[@name=\"grouped\"]/@value", false);
  340. ownerId = xgmml->getPropInt("att[@name=\"_parentActivity\"]/@value", 0);
  341. onCreateCalled = prepared = haveCreateCtx = nullAct = false;
  342. onlyUpdateIfChanged = xgmml->getPropBool("att[@name=\"_updateIfChanged\"]/@value", false);
  343. isCodeSigned = isActivityCodeSigned(_xgmml);
  344. StringBuffer helperName("fAc");
  345. xgmml->getProp("@id", helperName);
  346. helperFactory = (EclHelperFactory) queryJob().queryDllEntry().getEntry(helperName.str());
  347. if (!helperFactory)
  348. throw makeOsExceptionV(GetLastError(), "Failed to load helper factory method: %s (dll handle = %p)", helperName.str(), queryJob().queryDllEntry().getInstance());
  349. alreadyUpdated = false;
  350. whichBranch = (unsigned)-1;
  351. log = true;
  352. sentActInitData.setown(createThreadSafeBitSet());
  353. maxCores = queryXGMML().getPropInt("hint[@name=\"max_cores\"]/@value", 0);
  354. if (0 == maxCores)
  355. maxCores = queryJob().queryMaxDefaultActivityCores();
  356. baseHelper.setown(helperFactory());
  357. CGraphBase *graphContainer = resultsGraph;
  358. if (!graphContainer)
  359. graphContainer = owner;
  360. activityCodeContext.setContext(owner, graphContainer, &queryJobChannel().queryCodeContext());
  361. }
  362. CGraphElementBase::~CGraphElementBase()
  363. {
  364. activity.clear();
  365. baseHelper.clear(); // clear before dll is unloaded
  366. }
  367. CJobBase &CGraphElementBase::queryJob() const
  368. {
  369. return owner->queryJob();
  370. }
  371. CJobChannel &CGraphElementBase::queryJobChannel() const
  372. {
  373. return owner->queryJobChannel();
  374. }
  375. IGraphTempHandler *CGraphElementBase::queryTempHandler(bool assert) const
  376. {
  377. if (resultsGraph)
  378. return resultsGraph->queryTempHandler(assert);
  379. else
  380. return queryJob().queryTempHandler(assert);
  381. }
  382. void CGraphElementBase::releaseIOs()
  383. {
  384. loopGraph.clear();
  385. if (activity)
  386. activity->releaseIOs();
  387. connectedInputs.kill();
  388. inputs.kill();
  389. outputs.kill();
  390. activity.clear();
  391. }
  392. void CGraphElementBase::addDependsOn(CGraphBase *graph, int controlId)
  393. {
  394. ForEachItemIn(i, dependsOn)
  395. {
  396. if (dependsOn.item(i).graph == graph)
  397. return;
  398. }
  399. dependsOn.append(*new CGraphDependency(graph, controlId));
  400. }
  401. StringBuffer &CGraphElementBase::getOpt(const char *prop, StringBuffer &out) const
  402. {
  403. VStringBuffer path("hint[@name=\"%s\"]/@value", prop);
  404. if (!queryXGMML().getProp(path.toLowerCase().str(), out))
  405. queryJob().getOpt(prop, out);
  406. return out;
  407. }
  408. bool CGraphElementBase::getOptBool(const char *prop, bool defVal) const
  409. {
  410. bool def = queryJob().getOptBool(prop, defVal);
  411. VStringBuffer path("hint[@name=\"%s\"]/@value", prop);
  412. return queryXGMML().getPropBool(path.toLowerCase().str(), def);
  413. }
  414. int CGraphElementBase::getOptInt(const char *prop, int defVal) const
  415. {
  416. int def = queryJob().getOptInt(prop, defVal);
  417. VStringBuffer path("hint[@name=\"%s\"]/@value", prop);
  418. return queryXGMML().getPropInt(path.toLowerCase().str(), def);
  419. }
  420. __int64 CGraphElementBase::getOptInt64(const char *prop, __int64 defVal) const
  421. {
  422. __int64 def = queryJob().getOptInt64(prop, defVal);
  423. VStringBuffer path("hint[@name=\"%s\"]/@value", prop);
  424. return queryXGMML().getPropInt64(path.toLowerCase().str(), def);
  425. }
  426. IThorGraphDependencyIterator *CGraphElementBase::getDependsIterator() const
  427. {
  428. return new ArrayIIteratorOf<const CGraphDependencyArray, CGraphDependency, IThorGraphDependencyIterator>(dependsOn);
  429. }
  430. bool CGraphElementBase::inChildQuery() const
  431. {
  432. return (nullptr != queryOwner().queryOwner()) && !queryOwner().isGlobal();
  433. }
  434. void CGraphElementBase::reset()
  435. {
  436. alreadyUpdated = false;
  437. if (activity)
  438. activity->reset();
  439. }
  440. void CGraphElementBase::ActPrintLog(const char *format, ...)
  441. {
  442. va_list args;
  443. va_start(args, format);
  444. ::ActPrintLogArgs(this, thorlog_null, MCdebugProgress, format, args);
  445. va_end(args);
  446. }
  447. void CGraphElementBase::ActPrintLog(IException *e, const char *format, ...)
  448. {
  449. va_list args;
  450. va_start(args, format);
  451. ::ActPrintLogArgs(this, e, thorlog_all, MCexception(e), format, args);
  452. va_end(args);
  453. }
  454. void CGraphElementBase::ActPrintLog(IException *e)
  455. {
  456. ActPrintLog(e, "%s", "");
  457. }
  458. void CGraphElementBase::abort(IException *e)
  459. {
  460. CActivityBase *activity = queryActivity();
  461. if (activity)
  462. activity->abort();
  463. }
  464. void CGraphElementBase::doconnect()
  465. {
  466. ForEachItemIn(i, connectedInputs)
  467. {
  468. CIOConnection *io = connectedInputs.item(i);
  469. if (io)
  470. io->connect(i, queryActivity());
  471. }
  472. }
  473. void CGraphElementBase::clearConnections()
  474. {
  475. connectedInputs.kill();
  476. connectedOutputs.kill();
  477. if (activity)
  478. activity->clearConnections();
  479. }
  480. void CGraphElementBase::addInput(unsigned input, CGraphElementBase *inputAct, unsigned inputOutIdx)
  481. {
  482. while (inputs.ordinality()<=input) inputs.append(NULL);
  483. inputs.replace(new COwningSimpleIOConnection(LINK(inputAct), inputOutIdx), input);
  484. while (inputAct->outputs.ordinality()<=inputOutIdx) inputAct->outputs.append(NULL);
  485. inputAct->outputs.replace(new CIOConnection(this, input), inputOutIdx);
  486. }
  487. void CGraphElementBase::connectInput(unsigned input, CGraphElementBase *inputAct, unsigned inputOutIdx)
  488. {
  489. ::ActPrintLog(this, thorDetailedLogLevel, "CONNECTING (id=%" ACTPF "d, idx=%d) to (id=%" ACTPF "d, idx=%d)", inputAct->queryId(), inputOutIdx, queryId(), input);
  490. while (connectedInputs.ordinality()<=input) connectedInputs.append(NULL);
  491. connectedInputs.replace(new COwningSimpleIOConnection(LINK(inputAct), inputOutIdx), input);
  492. while (inputAct->connectedOutputs.ordinality()<=inputOutIdx) inputAct->connectedOutputs.append(NULL);
  493. inputAct->connectedOutputs.replace(new CIOConnection(this, input), inputOutIdx);
  494. }
  495. void CGraphElementBase::serializeCreateContext(MemoryBuffer &mb)
  496. {
  497. if (!onCreateCalled) return;
  498. DelayedSizeMarker sizeMark(mb);
  499. queryHelper()->serializeCreateContext(mb);
  500. sizeMark.write();
  501. if (isSink())
  502. mb.append(alreadyUpdated);
  503. }
  504. void CGraphElementBase::deserializeCreateContext(MemoryBuffer &mb)
  505. {
  506. size32_t createCtxLen;
  507. mb.read(createCtxLen);
  508. createCtxMb.clear().append(createCtxLen, mb.readDirect(createCtxLen));
  509. haveCreateCtx = true;
  510. if (isSink())
  511. mb.read(alreadyUpdated);
  512. }
  513. void CGraphElementBase::serializeStartContext(MemoryBuffer &mb)
  514. {
  515. DelayedSizeMarker sizeMark(mb);
  516. queryHelper()->serializeStartContext(mb);
  517. sizeMark.write();
  518. }
  519. void CGraphElementBase::onCreate()
  520. {
  521. CriticalBlock b(crit);
  522. if (onCreateCalled)
  523. return;
  524. onCreateCalled = true;
  525. if (!nullAct)
  526. {
  527. CGraphElementBase *ownerActivity = owner->queryOwner() ? owner->queryOwner()->queryElement(ownerId) : NULL;
  528. if (ownerActivity)
  529. {
  530. ownerActivity->onCreate(); // ensure owner created, might not be if this is child query inside another child query.
  531. baseHelper->onCreate(queryCodeContext(), ownerActivity->queryHelper(), haveCreateCtx?&createCtxMb:NULL);
  532. }
  533. else
  534. baseHelper->onCreate(queryCodeContext(), NULL, haveCreateCtx?&createCtxMb:NULL);
  535. if (isLoopActivity(*this))
  536. {
  537. unsigned loopId = queryXGMML().getPropInt("att[@name=\"_loopid\"]/@value");
  538. Owned<CGraphStub> stub = owner->getChildGraph(loopId);
  539. Owned<IThorBoundLoopGraph> boundLoopGraph = createBoundLoopGraph(&stub->queryOriginalGraph(), baseHelper->queryOutputMeta(), queryId());
  540. setBoundGraph(boundLoopGraph);
  541. }
  542. }
  543. }
  544. void CGraphElementBase::onStart(size32_t parentExtractSz, const byte *parentExtract, MemoryBuffer *startCtx)
  545. {
  546. if (nullAct)
  547. return;
  548. CriticalBlock b(crit);
  549. baseHelper->onStart(parentExtract, startCtx);
  550. }
  551. bool CGraphElementBase::executeDependencies(size32_t parentExtractSz, const byte *parentExtract, int controlId, bool async)
  552. {
  553. Owned<IThorGraphDependencyIterator> deps = getDependsIterator();
  554. ForEach(*deps)
  555. {
  556. CGraphDependency &dep = deps->query();
  557. if (dep.controlId == controlId)
  558. dep.graph->execute(parentExtractSz, parentExtract, true, async);
  559. if (owner->queryJob().queryAborted() || owner->queryAborted()) return false;
  560. }
  561. return true;
  562. }
// Prepare this element (and, recursively, its inputs) for execution.
//
// connectOnly == true : only recurse into the inputs (with dependency checks
//                       off) and then connect/create this element.
// connectOnly == false: note/execute control-0 dependencies, evaluate
//                       conditional activity kinds to pick 'whichBranch',
//                       recurse into the inputs, then connect/create.
//
// Returns false when this element should not be executed (an input or a
// dependency returned false, a write activity is already up to date, or an
// internal sink has no dependents); true otherwise.
// Any IException raised is rethrown as an IThorException carrying this
// activity's details.
bool CGraphElementBase::prepareContext(size32_t parentExtractSz, const byte *parentExtract, bool checkDependencies, bool shortCircuit, bool async, bool connectOnly)
{
    try
    {
        bool create = true;
        if (connectOnly)
        {
            if (prepared)
                return true;
            // connect-only pass: propagate to every input without dependency checks
            ForEachItemIn(i, inputs)
            {
                if (!queryInput(i)->prepareContext(parentExtractSz, parentExtract, false, false, async, true))
                    return false;
            }
        }
        else
        {
            bool _shortCircuit = shortCircuit;
            Owned<IThorGraphDependencyIterator> deps = getDependsIterator();
            bool depsDone = true;
            // Register each unconditional (controlId 0) dependency graph with the owner once,
            // noting whether any newly registered one has yet to complete.
            ForEach(*deps)
            {
                CGraphDependency &dep = deps->query();
                if (0 == dep.controlId && NotFound == owner->dependentSubGraphs.find(*dep.graph))
                {
                    owner->dependentSubGraphs.append(*dep.graph);
                    if (!dep.graph->isComplete())
                        depsDone = false;
                }
            }
            if (depsDone) _shortCircuit = false;
            if (!depsDone && checkDependencies)
            {
                if (!executeDependencies(parentExtractSz, parentExtract, 0, async))
                    return false;
            }
            whichBranch = (unsigned)-1; // (unsigned)-1 == no specific branch selected
            switch (getKind())
            {
                case TAKindexwrite:
                case TAKdiskwrite:
                case TAKcsvwrite:
                case TAKxmlwrite:
                case TAKjsonwrite:
                case TAKspillwrite:
                    if (_shortCircuit) return true;
                    onCreate();
                    // skip this write if checkUpdate() reports it is already up to date
                    alreadyUpdated = checkUpdate();
                    if (alreadyUpdated)
                        return false;
                    break;
                case TAKchildif:
                case TAKif:
                case TAKifaction:
                {
                    if (_shortCircuit) return true;
                    onCreate();
                    onStart(parentExtractSz, parentExtract);
                    IHThorIfArg *helper = (IHThorIfArg *)baseHelper.get();
                    whichBranch = helper->getCondition() ? 0 : 1; // True argument precedes false...
                    /* NB - The executeDependencies code below is only needed if actionLinkInNewGraph=true, which is no longer the default
                     * It should be removed, once we are positive there are no issues with in-line conditional actions
                     */
                    if (TAKifaction == getKind())
                    {
                        if (!executeDependencies(parentExtractSz, parentExtract, whichBranch+1, async)) //NB whenId 1 based
                            return false;
                        create = false;
                    }
                    break;
                }
                case TAKchildcase:
                case TAKcase:
                {
                    if (_shortCircuit) return true;
                    onCreate();
                    onStart(parentExtractSz, parentExtract);
                    IHThorCaseArg *helper = (IHThorCaseArg *)baseHelper.get();
                    whichBranch = helper->getBranch();
                    // out-of-range branch numbers select the last input
                    if (whichBranch >= inputs.ordinality())
                        whichBranch = inputs.ordinality()-1;
                    break;
                }
                case TAKfilter:
                case TAKfiltergroup:
                case TAKfilterproject:
                {
                    if (_shortCircuit) return true;
                    onCreate();
                    onStart(parentExtractSz, parentExtract);
                    // branch 0 (the only real input) when the filter can match; otherwise 1
                    switch (getKind())
                    {
                        case TAKfilter:
                            whichBranch = ((IHThorFilterArg *)baseHelper.get())->canMatchAny() ? 0 : 1;
                            break;
                        case TAKfiltergroup:
                            whichBranch = ((IHThorFilterGroupArg *)baseHelper.get())->canMatchAny() ? 0 : 1;
                            break;
                        case TAKfilterproject:
                            whichBranch = ((IHThorFilterProjectArg *)baseHelper.get())->canMatchAny() ? 0 : 1;
                            break;
                        default:
                            break;
                    }
                    break;
                }
                case TAKsequential:
                case TAKparallel:
                {
                    /* NB - The executeDependencies code below is only needed if actionLinkInNewGraph=true, which is no longer the default
                     * It should be removed, once we are positive there are no issues with in-line sequential/parallel activities
                     */
                    // NOTE(review): unlike the other cases, the result of executeDependencies() is ignored here - confirm intentional
                    for (unsigned s=1; s<=dependsOn.ordinality(); s++)
                        executeDependencies(parentExtractSz, parentExtract, s, async);
                    create = false;
                    break;
                }
                case TAKwhen_dataset:
                case TAKwhen_action:
                {
                    if (!executeDependencies(parentExtractSz, parentExtract, WhenBeforeId, async))
                        return false;
                    if (!executeDependencies(parentExtractSz, parentExtract, WhenParallelId, async))
                        return false;
                    break;
                }
                default:
                    break;
            }
            if (isActivitySink(getKind()))
            {
                // Suppress executing internal sinks with 0 generated dependencies
                if (queryXGMML().getPropBool("att[@name='_internal']/@value", false) && (0 == owner->queryDependents(id)))
                    return false;
            }
            if (checkDependencies && ((unsigned)-1 != whichBranch))
            {
                // A branch was selected: fully prepare that input (if present) ...
                if (inputs.queryItem(whichBranch))
                {
                    if (!queryInput(whichBranch)->prepareContext(parentExtractSz, parentExtract, true, false, async, connectOnly))
                        return false;
                }
                // ... and prepare every other input in connect-only mode.
                ForEachItemIn(i, inputs)
                {
                    if (i != whichBranch)
                    {
                        if (!queryInput(i)->prepareContext(parentExtractSz, parentExtract, false, false, async, true))
                            return false;
                    }
                }
            }
            else
            {
                // No branch selection applies: prepare all inputs with the caller's settings.
                ForEachItemIn(i, inputs)
                {
                    if (!queryInput(i)->prepareContext(parentExtractSz, parentExtract, checkDependencies, false, async, connectOnly))
                        return false;
                }
            }
        }
        if (create)
        {
            if (prepared) // no need to recreate
                return true;
            prepared = true;
            // wire up the 'connected' arrays from the declared inputs, then create the activity
            ForEachItemIn(i2, inputs)
            {
                CIOConnection *inputIO = inputs.item(i2);
                connectInput(i2, inputIO->activity, inputIO->index);
            }
            createActivity();
        }
        return true;
    }
    catch (IException *_e)
    {
        // Ensure what propagates is an IThorException tagged with this activity.
        IThorException *e = QUERYINTERFACE(_e, IThorException);
        if (e)
        {
            if (!e->queryActivityId())
                setExceptionActivityInfo(*this, e);
        }
        else
        {
            // wrap the foreign exception and release the original
            e = MakeActivityException(this, _e);
            _e->Release();
        }
        throw e;
    }
}
  753. void CGraphElementBase::preStart(size32_t parentExtractSz, const byte *parentExtract)
  754. {
  755. activity->preStart(parentExtractSz, parentExtract);
  756. }
  757. void CGraphElementBase::createActivity()
  758. {
  759. CriticalBlock b(crit);
  760. if (activity)
  761. return;
  762. activity.setown(factory());
  763. activityCodeContext.setStats(&activity->queryStats());
  764. if (isSink())
  765. owner->addActiveSink(*this);
  766. }
  767. ICodeContextExt *CGraphElementBase::queryCodeContext()
  768. {
  769. return &activityCodeContext;
  770. }
  771. /////
// JCSMORE loop - probably need a better way to check if any act in graph is global (meaning it needs some synchronization between slaves in activity execution)
// Classify an activity as "global" or not, based on its kind and, for some
// kinds, on helper flags or the container's local/grouped attributes.
// Returns true for the kinds listed as always global, for local/grouped-
// dependent kinds that are neither local nor grouped, for non-local
// keyed/all/lookup joins, and for any unlisted kind ("if in doubt").
// Returns false otherwise.
bool isGlobalActivity(CGraphElementBase &container)
{
    switch (container.getKind())
    {
// always global, but only co-ordinate init/done
        case TAKcsvwrite:
        case TAKxmlwrite:
        case TAKjsonwrite:
        case TAKindexwrite:
        case TAKkeydiff:
        case TAKkeypatch:
        case TAKdictionaryworkunitwrite:
            return true;
        case TAKdiskwrite:
        {
            // a helper instance is created just to inspect its flags
            Owned<IHThorDiskWriteArg> helper = (IHThorDiskWriteArg *)container.helperFactory();
            unsigned flags = helper->getFlags();
            return (0 == (TDXtemporary & flags)); // global if not temporary
        }
        case TAKspillwrite:
        case TAKspill:
            return false;
        case TAKcsvread:
        {
            Owned<IHThorCsvReadArg> helper = (IHThorCsvReadArg *)container.helperFactory();
            // if header lines, then [may] need to co-ordinate across slaves
            if (container.queryOwner().queryOwner() && (!container.queryOwner().isGlobal())) // I am in a child query
                return false;
            return helper->queryCsvParameters()->queryHeaderLen() > 0;
        }
// dependent on child acts?
        case TAKlooprow:
        case TAKloopcount:
        case TAKgraphloop:
        case TAKparallelgraphloop:
        case TAKloopdataset:
        case TAKexternalsink:
        case TAKexternalsource:
        case TAKexternalprocess:
            return false;
// dependent on local/grouped
        case TAKkeyeddistribute:
        case TAKhashdistribute:
        case TAKhashdistributemerge:
        case TAKnwaydistribute:
        case TAKworkunitwrite:
        case TAKdistribution:
        case TAKpartition:
        case TAKdiskaggregate:
        case TAKdiskcount:
        case TAKdiskgroupaggregate:
        case TAKindexaggregate:
        case TAKindexcount:
        case TAKindexgroupaggregate:
        case TAKindexgroupexists:
        case TAKindexgroupcount:
        case TAKremoteresult:
        case TAKcountproject:
        case TAKcreaterowlimit:
        case TAKskiplimit:
        case TAKlimit:
        case TAKsort:
        case TAKdedup:
        case TAKjoin:
        case TAKselfjoin:
        case TAKhashjoin:
        case TAKsmartjoin:
        case TAKkeyeddenormalize:
        case TAKhashdenormalize:
        case TAKdenormalize:
        case TAKlookupdenormalize: //GH->JCS why are these here, and join not?
        case TAKalldenormalize:
        case TAKsmartdenormalize:
        case TAKdenormalizegroup:
        case TAKhashdenormalizegroup:
        case TAKlookupdenormalizegroup:
        case TAKkeyeddenormalizegroup:
        case TAKalldenormalizegroup:
        case TAKsmartdenormalizegroup:
        case TAKaggregate:
        case TAKexistsaggregate:
        case TAKcountaggregate:
        case TAKhashaggregate:
        case TAKhashdedup:
        case TAKrollup:
        case TAKiterate:
        case TAKselectn:
        case TAKfirstn:
        case TAKenth:
        case TAKsample:
        case TAKgroup:
        case TAKchoosesets:
        case TAKchoosesetsenth:
        case TAKchoosesetslast:
        case TAKtopn:
        case TAKprocess:
        case TAKchildcount:
        case TAKwhen_dataset:
        case TAKwhen_action:
        case TAKnonempty:
        case TAKifaction:
            if (!container.queryLocalOrGrouped())
                return true;
            break;
        case TAKkeyedjoin:
        case TAKalljoin:
        case TAKlookupjoin:
            if (!container.queryLocal())
                return true;
            // local variants fall through into the 'always local' group below, which breaks out and returns false
// always local
        case TAKfilter:
        case TAKfilterproject:
        case TAKfiltergroup:
        case TAKsplit:
        case TAKpipewrite:
        case TAKdegroup:
        case TAKproject:
        case TAKprefetchproject:
        case TAKprefetchcountproject:
        case TAKnormalize:
        case TAKnormalizechild:
        case TAKnormalizelinkedchild:
        case TAKpipethrough:
        case TAKif:
        case TAKchildif:
        case TAKchildcase:
        case TAKcase:
        case TAKparse:
        case TAKpiperead:
        case TAKxmlparse:
        case TAKjoinlight:
        case TAKselfjoinlight:
        case TAKdiskread:
        case TAKdisknormalize:
        case TAKchildaggregate:
        case TAKchildgroupaggregate:
        case TAKchildthroughnormalize:
        case TAKchildnormalize:
        case TAKspillread:
        case TAKindexread:
        case TAKindexnormalize:
        case TAKxmlread:
        case TAKjsonread:
        case TAKdiskexists:
        case TAKindexexists:
        case TAKchildexists:
        case TAKthroughaggregate:
        case TAKmerge:
        case TAKfunnel:
        case TAKregroup:
        case TAKcombine:
        case TAKrollupgroup:
        case TAKcombinegroup:
        case TAKsoap_rowdataset:
        case TAKhttp_rowdataset:
        case TAKsoap_rowaction:
        case TAKsoap_datasetdataset:
        case TAKsoap_datasetaction:
        case TAKlinkedrawiterator:
        case TAKchilditerator:
        case TAKstreamediterator:
        case TAKworkunitread:
        case TAKchilddataset:
        case TAKinlinetable:
        case TAKnull:
        case TAKemptyaction:
        case TAKlocalresultread:
        case TAKlocalresultwrite:
        case TAKdictionaryresultwrite:
        case TAKgraphloopresultread:
        case TAKgraphloopresultwrite:
        case TAKnwaygraphloopresultread:
        case TAKapply:
        case TAKsideeffect:
        case TAKsimpleaction:
        case TAKsorted:
        case TAKdistributed:
        case TAKtrace:
            break;
        case TAKnwayjoin:
        case TAKnwaymerge:
        case TAKnwaymergejoin:
        case TAKnwayinput:
        case TAKnwayselect:
            return false; // JCSMORE - I think and/or have to be for now
// undefined
        case TAKdatasetresult:
        case TAKrowresult:
        case TAKremotegraph:
        case TAKlibrarycall:
        default:
            return true; // if in doubt
    }
    // reached via 'break' above: the activity is treated as not global
    return false;
}
  968. bool isLoopActivity(CGraphElementBase &container)
  969. {
  970. switch (container.getKind())
  971. {
  972. case TAKlooprow:
  973. case TAKloopcount:
  974. case TAKloopdataset:
  975. case TAKgraphloop:
  976. case TAKparallelgraphloop:
  977. return true;
  978. default:
  979. break;
  980. }
  981. return false;
  982. }
  983. static void getGlobalDeps(CGraphBase &graph, CICopyArrayOf<CGraphDependency> &deps)
  984. {
  985. Owned<IThorActivityIterator> iter = graph.getIterator();
  986. ForEach(*iter)
  987. {
  988. CGraphElementBase &elem = iter->query();
  989. Owned<IThorGraphDependencyIterator> dependIterator = elem.getDependsIterator();
  990. ForEach(*dependIterator)
  991. {
  992. CGraphDependency &dependency = dependIterator->query();
  993. if (dependency.graph->isGlobal() && NULL==dependency.graph->queryOwner())
  994. deps.append(dependency);
  995. getGlobalDeps(*dependency.graph, deps);
  996. }
  997. }
  998. }
// Walk every <edge> of 'xgmml' and record graph/activity dependencies between
// the graphs held in 'graphs'.
//   xgmml         - tree whose "edge" children describe source/target links
//   failIfMissing - when true, an edge naming a graph absent from 'graphs'
//                   raises throwUnexpected(); when false such edges are skipped
//   graphs        - table used to resolve the edge's @source/@target graph ids
// After the edge pass, global dependencies of non-global child graphs are
// attributed to the child-graph edge's activities, and sub-graphs owned by
// loop activities may be flagged global.
static void addDependencies(IPropertyTree *xgmml, bool failIfMissing, CGraphTableCopy &graphs)
{
    // parallel arrays: entry c of each describes one deferred child-graph edge
    CGraphArrayCopy dependentchildGraphs;
    CGraphElementArrayCopy targetActivities, sourceActivities;
    Owned<IPropertyTreeIterator> iter = xgmml->getElements("edge");
    ForEach(*iter)
    {
        IPropertyTree &edge = iter->query();
        graph_id sourceGid = edge.getPropInt("@source");
        graph_id targetGid = edge.getPropInt("@target");
        Owned<CGraphBase> source = LINK(graphs.find(sourceGid));
        Owned<CGraphBase> target = LINK(graphs.find(targetGid));
        if (!source || !target)
        {
            if (failIfMissing)
                throwUnexpected();
            else
                continue; // expected if assigning dependencies in slaves
        }
        // NOTE(review): both lookups may presumably return NULL; they are dereferenced below before the assertex - confirm
        CGraphElementBase *targetActivity = (CGraphElementBase *)target->queryElement(edge.getPropInt("att[@name=\"_targetActivity\"]/@value"));
        CGraphElementBase *sourceActivity = (CGraphElementBase *)source->queryElement(edge.getPropInt("att[@name=\"_sourceActivity\"]/@value"));
        if (!edge.getPropBool("att[@name=\"_childGraph\"]/@value"))
        {
            // a local result written in a loop sub-graph but read by something other than a local-result read makes the source graph global
            if (TAKlocalresultwrite == sourceActivity->getKind() && (TAKlocalresultread != targetActivity->getKind()))
            {
                if (source->isLoopSubGraph())
                    source->setGlobal(true);
            }
        }
        int controlId = 0;
        if (edge.getPropBool("att[@name=\"_dependsOn\"]/@value", false))
        {
            if (!edge.getPropBool("att[@name=\"_childGraph\"]/@value", false)) // JCSMORE - not sure if necess. roxie seem to do.
                controlId = edge.getPropInt("att[@name=\"_when\"]/@value", 0);
            CGraphBase &sourceGraph = sourceActivity->queryOwner();
            unsigned sourceGraphContext = sourceGraph.queryParentActivityId();
            CGraphBase *targetGraph = NULL;
            unsigned targetGraphContext = -1;
            // climb from the target activity through parent activities until its graph shares the source graph's parent activity id
            for (;;)
            {
                targetGraph = &targetActivity->queryOwner();
                targetGraphContext = targetGraph->queryParentActivityId();
                if (sourceGraphContext == targetGraphContext)
                    break;
                targetActivity = targetGraph->queryElement(targetGraphContext);
            }
            assertex(targetActivity && sourceActivity);
            source->noteDependency(targetActivity, sourceActivity, controlId, true);
        }
        else if (edge.getPropBool("att[@name=\"_conditionSource\"]/@value", false))
        { /* Ignore it */ }
        else if (edge.getPropBool("att[@name=\"_childGraph\"]/@value", false))
        {
            // NB: any dependencies of the child acts. are dependencies of this act.
            dependentchildGraphs.append(*source);
            targetActivities.append(*targetActivity);
            sourceActivities.append(*sourceActivity);
        }
        else
        {
            if (!edge.getPropBool("att[@name=\"_childGraph\"]/@value", false)) // JCSMORE - not sure if necess. roxie seem to do.
                controlId = edge.getPropInt("att[@name=\"_when\"]/@value", 0);
            source->noteDependency(targetActivity, sourceActivity, controlId, true);
        }
    }
    // Second pass: for each deferred child-graph edge whose child graph is not global,
    // note one dependency per global dependency found beneath that child graph.
    ForEachItemIn(c, dependentchildGraphs)
    {
        CGraphBase &childGraph = dependentchildGraphs.item(c);
        CGraphElementBase &targetActivity = targetActivities.item(c);
        CGraphElementBase &sourceActivity = sourceActivities.item(c);
        if (!childGraph.isGlobal())
        {
            CICopyArrayOf<CGraphDependency> globalChildGraphDeps;
            getGlobalDeps(childGraph, globalChildGraphDeps);
            ForEachItemIn(gcd, globalChildGraphDeps)
            {
                CGraphDependency &globalDep = globalChildGraphDeps.item(gcd);
                childGraph.noteDependency(&targetActivity, &sourceActivity, globalDep.controlId, true);
            }
        }
    }
    // Final pass: a sub-graph whose parent activity is a loop is flagged global unless
    // the parent's graph is a local child or the sub-graph is local-only.
    SuperHashIteratorOf<CGraphBase> allIter(graphs);
    ForEach(allIter)
    {
        CGraphBase &subGraph = allIter.query();
        if (subGraph.queryOwner() && subGraph.queryParentActivityId())
        {
            CGraphElementBase *parentElement = subGraph.queryOwner()->queryElement(subGraph.queryParentActivityId());
            if (isLoopActivity(*parentElement))
            {
                if (!parentElement->queryOwner().isLocalChild() && !subGraph.isLocalOnly())
                    subGraph.setGlobal(true);
            }
        }
    }
}
  1095. void traceMemUsage()
  1096. {
  1097. StringBuffer memStatsStr;
  1098. roxiemem::memstats(memStatsStr);
  1099. LOG(MCthorDetailedDebugInfo, thorJob, "Roxiemem stats: %s", memStatsStr.str());
  1100. memsize_t heapUsage = getMapInfo("heap");
  1101. if (heapUsage) // if 0, assumed to be unavailable
  1102. {
  1103. memsize_t rmtotal = roxiemem::getTotalMemoryLimit();
  1104. LOG(MCthorDetailedDebugInfo, thorJob, "Heap usage (excluding Roxiemem) : %" I64F "d bytes", (unsigned __int64)(heapUsage-rmtotal));
  1105. }
  1106. }
  1107. /////
  1108. CGraphBase::CGraphBase(CJobChannel &_jobChannel) : jobChannel(_jobChannel), job(_jobChannel.queryJob()), progressUpdated(false)
  1109. {
  1110. xgmml = NULL;
  1111. parent = owner = graphResultsContainer = NULL;
  1112. complete = false;
  1113. parentActivityId = 0;
  1114. connected = started = graphDone = aborted = false;
  1115. startBarrier = waitBarrier = doneBarrier = NULL;
  1116. mpTag = waitBarrierTag = startBarrierTag = doneBarrierTag = TAG_NULL;
  1117. executeReplyTag = TAG_NULL;
  1118. parentExtractSz = 0;
  1119. counter = 0; // loop/graph counter, will be set by loop/graph activity if needed
  1120. loopBodySubgraph = false;
  1121. sourceActDependents.setown(createPTree());
  1122. }
  1123. CGraphBase::~CGraphBase()
  1124. {
  1125. clean();
  1126. }
  1127. CGraphBase *CGraphBase::cloneGraph()
  1128. {
  1129. Owned<CGraphBase> subGraph = queryJobChannel().createGraph();
  1130. CGraphTableCopy newGraphs;
  1131. subGraph->createFromXGMML(node, owner, parent, graphResultsContainer, newGraphs);
  1132. addDependencies(queryJob().queryXGMML(), false, newGraphs);
  1133. return subGraph.getClear();
  1134. }
  1135. void CGraphBase::init()
  1136. {
  1137. bool log = queryJob().queryForceLogging(queryGraphId(), (NULL == queryOwner()) || isGlobal());
  1138. setLogging(log);
  1139. }
// Tear down everything this graph holds: barriers, results, child graphs,
// activity connections, contained elements and sink lists. Called from the
// destructor.
void CGraphBase::clean()
{
    // NOTE(review): the barrier pointers are released but not reset to NULL here - confirm clean() is never followed by further use
    ::Release(startBarrier);
    ::Release(waitBarrier);
    ::Release(doneBarrier);
    localResults.clear();
    graphLoopResults.clear();
    childGraphsTable.releaseAll();
    // disconnect activities before the containers that hold them are released
    disconnectActivities();
    containers.releaseAll();
    sinks.kill();
    activeSinks.kill();
}
  1153. void CGraphBase::serializeCreateContexts(MemoryBuffer &mb)
  1154. {
  1155. DelayedSizeMarker sizeMark(mb);
  1156. Owned<IThorActivityIterator> iter = getIterator();
  1157. ForEach (*iter)
  1158. {
  1159. CGraphElementBase &element = iter->query();
  1160. if (element.isOnCreated())
  1161. {
  1162. mb.append(element.queryId());
  1163. element.serializeCreateContext(mb);
  1164. }
  1165. }
  1166. mb.append((activity_id)0);
  1167. sizeMark.write();
  1168. }
  1169. void CGraphBase::deserializeCreateContexts(MemoryBuffer &mb)
  1170. {
  1171. activity_id id;
  1172. for (;;)
  1173. {
  1174. mb.read(id);
  1175. if (0 == id) break;
  1176. CGraphElementBase *element = queryElement(id);
  1177. assertex(element);
  1178. element->deserializeCreateContext(mb);
  1179. }
  1180. }
  1181. void CGraphBase::reset()
  1182. {
  1183. setCompleteEx(false);
  1184. clearProgressUpdated();
  1185. graphCancelHandler.reset();
  1186. if (0 == containers.count())
  1187. {
  1188. Owned<IThorGraphIterator> iter = getChildGraphIterator();
  1189. ForEach(*iter)
  1190. iter->query().reset();
  1191. }
  1192. else
  1193. {
  1194. Owned<IThorActivityIterator> iter = getIterator();
  1195. ForEach(*iter)
  1196. {
  1197. CGraphElementBase &element = iter->query();
  1198. element.reset();
  1199. }
  1200. dependentSubGraphs.kill();
  1201. }
  1202. if (!queryOwner())
  1203. clearNodeStats();
  1204. }
  1205. void CGraphBase::addChildGraph(CGraphStub *stub)
  1206. {
  1207. CriticalBlock b(crit);
  1208. childGraphsTable.replace(*LINK(stub));
  1209. if (sequential)
  1210. orderedChildGraphs.append(*stub);
  1211. }
  1212. IThorGraphStubIterator *CGraphBase::getChildStubIterator() const
  1213. {
  1214. CriticalBlock b(crit);
  1215. class CIter : private SuperHashIteratorOf<CGraphStub>, public CSimpleInterfaceOf<IThorGraphStubIterator>
  1216. {
  1217. typedef SuperHashIteratorOf<CGraphStub> PARENT;
  1218. public:
  1219. CIter(const CChildGraphTable &table) : PARENT(table) { }
  1220. // IIterator
  1221. virtual bool first() { return PARENT::first(); }
  1222. virtual bool next() { return PARENT::next(); }
  1223. virtual bool isValid() { return PARENT::isValid(); }
  1224. virtual CGraphStub &query() { return PARENT::query(); }
  1225. };
  1226. return new CIter(childGraphsTable);
  1227. }
  1228. void CGraphBase::noteDependency(CGraphElementBase *targetActivity, CGraphElementBase *sourceActivity, unsigned controlId, bool interGraph)
  1229. {
  1230. if (interGraph)
  1231. targetActivity->addDependsOn(this, controlId);
  1232. // NB: record dependency in source graph, serialized to slaves, used to decided if should run dependency sinks or not
  1233. VStringBuffer srcActStr("act%u", sourceActivity->queryId());
  1234. sourceActDependents->setPropInt(srcActStr, sourceActDependents->getPropInt(srcActStr)+1);
  1235. }
  1236. unsigned CGraphBase::queryDependents(unsigned sourceActId)
  1237. {
  1238. VStringBuffer srcActStr("act%u", sourceActId);
  1239. return sourceActDependents->getPropInt(srcActStr);
  1240. }
  1241. IThorGraphIterator *CGraphBase::getChildGraphIterator() const
  1242. {
  1243. CriticalBlock b(crit);
  1244. class CIter : public CSimpleInterfaceOf<IThorGraphIterator>
  1245. {
  1246. Owned<IThorGraphStubIterator> iter;
  1247. public:
  1248. CIter(IThorGraphStubIterator *_iter) : iter(_iter)
  1249. {
  1250. }
  1251. // IIterator
  1252. virtual bool first() { return iter->first(); }
  1253. virtual bool next() { return iter->next(); }
  1254. virtual bool isValid() { return iter->isValid(); }
  1255. virtual CGraphBase &query()
  1256. {
  1257. CGraphStub &stub = iter->query();
  1258. return stub.queryOriginalGraph();
  1259. }
  1260. };
  1261. return new CIter(getChildStubIterator());
  1262. }
  1263. bool CGraphBase::fireException(IException *e)
  1264. {
  1265. return queryJobChannel().fireException(e);
  1266. }
  1267. bool CGraphBase::preStart(size32_t parentExtractSz, const byte *parentExtract)
  1268. {
  1269. Owned<IThorActivityIterator> iter = getConnectedIterator();
  1270. ForEach(*iter)
  1271. {
  1272. CGraphElementBase &element = iter->query();
  1273. element.preStart(parentExtractSz, parentExtract);
  1274. }
  1275. return true;
  1276. }
  1277. void CGraphBase::executeSubGraph(size32_t parentExtractSz, const byte *parentExtract)
  1278. {
  1279. CriticalBlock b(executeCrit);
  1280. if (job.queryPausing())
  1281. return;
  1282. Owned<IException> exception;
  1283. try
  1284. {
  1285. if (!queryOwner())
  1286. {
  1287. if (!REJECTLOG(MCthorDetailedDebugInfo))
  1288. {
  1289. StringBuffer s;
  1290. toXML(&queryXGMML(), s, 2);
  1291. MLOG(MCthorDetailedDebugInfo, thorJob, "Running graph [%s] : %s", isGlobal()?"global":"local", s.str());
  1292. }
  1293. }
  1294. if (localResults)
  1295. localResults->clear();
  1296. doExecute(parentExtractSz, parentExtract, false);
  1297. }
  1298. catch (IException *e)
  1299. {
  1300. GraphPrintLog(e);
  1301. exception.setown(e);
  1302. }
  1303. if (!queryOwner())
  1304. {
  1305. GraphPrintLog("Graph Done");
  1306. if (!REJECTLOG(MCthorDetailedDebugInfo))
  1307. {
  1308. StringBuffer memStr;
  1309. getSystemTraceInfo(memStr, PerfMonStandard | PerfMonExtended);
  1310. ::GraphPrintLog(this, thorDetailedLogLevel, "%s", memStr.str());
  1311. }
  1312. }
  1313. if (exception)
  1314. throw exception.getClear();
  1315. }
  1316. void CGraphBase::onCreate()
  1317. {
  1318. Owned<IThorActivityIterator> iter = getConnectedIterator();
  1319. ForEach(*iter)
  1320. {
  1321. CGraphElementBase &element = iter->query();
  1322. element.onCreate();
  1323. }
  1324. }
  1325. void CGraphBase::execute(size32_t _parentExtractSz, const byte *parentExtract, bool checkDependencies, bool async)
  1326. {
  1327. if (isComplete())
  1328. return;
  1329. if (async)
  1330. queryJobChannel().startGraph(*this, checkDependencies, _parentExtractSz, parentExtract); // may block if enough running
  1331. else
  1332. {
  1333. if (!prepare(_parentExtractSz, parentExtract, checkDependencies, false, false))
  1334. {
  1335. setComplete();
  1336. return;
  1337. }
  1338. executeSubGraph(_parentExtractSz, parentExtract);
  1339. }
  1340. }
// Core execution of this graph, in phases:
//   1. bail out if complete; throw if already aborted
//   2. start/init every connected element, preStart(), start() and wait()
//   3. on failure, cancel the wait barrier (top-level/global graphs) and, for
//      top-level graphs, fire a tea_abort graph exception
//   4. done() plus the optional done barrier, then end()
//   5. rethrow the first exception captured, else mark complete if not aborted
// NOTE(review): the checkDependencies parameter is not referenced in this body.
void CGraphBase::doExecute(size32_t parentExtractSz, const byte *parentExtract, bool checkDependencies)
{
    if (isComplete()) return;
    if (queryAborted())
    {
        // prefer the recorded abort exception; otherwise raise a generic one
        if (abortException)
            throw abortException.getLink();
        throw MakeGraphException(this, 0, "subgraph aborted");
    }
    GraphPrintLog("Processing graph");
    setParentCtx(parentExtractSz, parentExtract);
    Owned<IException> exception;
    try
    {
        // a graph that has run before is reset prior to re-execution
        if (started)
            reset();
        else
            started = true;
        ++numExecuted;
        Owned<IThorActivityIterator> iter = getConnectedIterator();
        ForEach(*iter)
        {
            CGraphElementBase &element = iter->query();
            element.onStart(parentExtractSz, parentExtract);
            element.initActivity();
        }
        initialized = true;
        // NOTE(review): returning here skips the done()/end() calls below - confirm intended
        if (!preStart(parentExtractSz, parentExtract)) return;
        start();
        if (!wait(aborted?MEDIUMTIMEOUT:INFINITE)) // can't wait indefinitely, query may have aborted and stall, but prudent to wait a short time for underlying graphs to unwind.
            GraphPrintLogEx(this, thorlog_null, MCuserWarning, "Graph wait cancelled, aborted=%s", aborted?"true":"false");
        else
            graphDone = true;
    }
    catch (IException *e)
    {
        GraphPrintLog(e);
        exception.setown(e);
    }
    try
    {
        // if nothing was thrown but an abort exception was recorded, adopt it
        if (!exception && abortException)
            exception.setown(abortException.getClear());
        if (exception)
        {
            if (NULL == owner || isGlobal())
                waitBarrier->cancel(exception);
            if (!queryOwner())
            {
                // top-level graph: report the failure as a graph exception flagged tea_abort
                StringBuffer str;
                Owned<IThorException> e = MakeGraphException(this, exception->errorCode(), "%s", exception->errorMessage(str).str());
                e->setAction(tea_abort);
                fireException(e);
            }
        }
    }
    catch (IException *e)
    {
        // failures while reporting the abort are logged and dropped
        GraphPrintLog(e, "during abort()");
        e->Release();
    }
    try
    {
        done();
        if (doneBarrier)
            doneBarrier->wait(false);
    }
    catch (IException *e)
    {
        // keep the first exception seen; later ones are only logged
        GraphPrintLog(e);
        if (!exception.get())
            exception.setown(e);
        else
            e->Release();
    }
    end();
    if (exception)
        throw exception.getClear();
    if (!queryAborted())
        setComplete();
}
  1422. bool CGraphBase::prepare(size32_t parentExtractSz, const byte *parentExtract, bool checkDependencies, bool shortCircuit, bool async)
  1423. {
  1424. if (isComplete()) return false;
  1425. bool needToExecute = false;
  1426. ForEachItemIn(s, sinks)
  1427. {
  1428. CGraphElementBase &sink = sinks.item(s);
  1429. if (sink.prepareContext(parentExtractSz, parentExtract, checkDependencies, shortCircuit, async, false))
  1430. needToExecute = true;
  1431. }
  1432. onCreate();
  1433. return needToExecute;
  1434. }
  1435. void CGraphBase::done()
  1436. {
  1437. if (aborted) return; // activity done methods only called on success
  1438. if (isLocalChild()) // CQ master activities are created on demand, call done() on any created
  1439. {
  1440. Owned<IThorActivityIterator> iter = getIterator();
  1441. ForEach(*iter)
  1442. {
  1443. CGraphElementBase &element = iter->query();
  1444. if (element.queryActivity())
  1445. element.queryActivity()->done();
  1446. }
  1447. }
  1448. else
  1449. {
  1450. Owned<IThorActivityIterator> iter = getConnectedIterator();
  1451. ForEach (*iter)
  1452. {
  1453. CGraphElementBase &element = iter->query();
  1454. element.queryActivity()->done();
  1455. }
  1456. }
  1457. Owned<IThorGraphIterator> childIter = getChildGraphIterator();
  1458. ForEach(*childIter)
  1459. childIter->query().done();
  1460. }
  1461. unsigned CGraphBase::queryJobChannelNumber() const
  1462. {
  1463. return queryJobChannel().queryChannel();
  1464. }
  1465. IMPServer &CGraphBase::queryMPServer() const
  1466. {
  1467. return jobChannel.queryMPServer();
  1468. }
  1469. bool CGraphBase::syncInitData()
  1470. {
  1471. if (loopBodySubgraph)
  1472. {
  1473. CGraphElementBase *parentElement = queryOwner() ? queryOwner()->queryElement(queryParentActivityId()) : nullptr;
  1474. assertex(parentElement);
  1475. return parentElement->queryLoopGraph()->queryGraph()->isGlobal();
  1476. }
  1477. else
  1478. return !isLocalChild();
  1479. }
  1480. void CGraphBase::end()
  1481. {
  1482. // always called, any final action clear up
  1483. Owned<IThorActivityIterator> iter = getIterator();
  1484. ForEach(*iter)
  1485. {
  1486. CGraphElementBase &element = iter->query();
  1487. try
  1488. {
  1489. if (element.queryActivity())
  1490. element.queryActivity()->kill();
  1491. }
  1492. catch (IException *e)
  1493. {
  1494. Owned<IException> e2 = MakeActivityException(element.queryActivity(), e, "Error calling kill()");
  1495. GraphPrintLog(e2);
  1496. e->Release();
  1497. }
  1498. }
  1499. }
  1500. class CGraphTraverseIteratorBase : implements IThorActivityIterator, public CInterface
  1501. {
  1502. protected:
  1503. CGraphBase &graph;
  1504. Linked<CGraphElementBase> cur;
  1505. CIArrayOf<CGraphElementBase> others;
  1506. CGraphElementArrayCopy covered;
  1507. CGraphElementBase *popNext()
  1508. {
  1509. if (!others.ordinality())
  1510. {
  1511. cur.clear();
  1512. return NULL;
  1513. }
  1514. cur.setown(&others.popGet());
  1515. return cur;
  1516. }
  1517. void setNext(bool branchOnConditional)
  1518. {
  1519. if (branchOnConditional && ((unsigned)-1) != cur->whichBranch)
  1520. {
  1521. CIOConnection *io = cur->connectedInputs.queryItem(cur->whichBranch);
  1522. if (io)
  1523. cur.set(io->activity);
  1524. else
  1525. cur.clear();
  1526. }
  1527. else
  1528. {
  1529. CIOConnectionArray &inputs = cur->connectedInputs;
  1530. cur.clear();
  1531. unsigned n = inputs.ordinality();
  1532. bool first = true;
  1533. for (unsigned i=0; i<n; i++)
  1534. {
  1535. CIOConnection *io = inputs.queryItem(i);
  1536. if (io)
  1537. {
  1538. if (first)
  1539. {
  1540. first = false;
  1541. cur.set(io->activity);
  1542. }
  1543. else
  1544. others.append(*LINK(io->activity));
  1545. }
  1546. }
  1547. }
  1548. if (!cur)
  1549. {
  1550. if (!popNext())
  1551. return;
  1552. }
  1553. // check haven't been here before
  1554. for (;;)
  1555. {
  1556. if (cur->getOutputs() < 2)
  1557. break;
  1558. else if (NotFound == covered.find(*cur))
  1559. {
  1560. if (!cur->alreadyUpdated)
  1561. {
  1562. covered.append(*cur);
  1563. break;
  1564. }
  1565. }
  1566. if (!popNext())
  1567. return;
  1568. }
  1569. }
  1570. public:
  1571. IMPLEMENT_IINTERFACE;
  1572. CGraphTraverseIteratorBase(CGraphBase &_graph) : graph(_graph)
  1573. {
  1574. }
  1575. virtual bool first()
  1576. {
  1577. covered.kill();
  1578. others.kill();
  1579. cur.clear();
  1580. Owned<IThorActivityIterator> sinkIter = graph.getSinkIterator();
  1581. if (!sinkIter->first())
  1582. return false;
  1583. for (;;)
  1584. {
  1585. cur.set(& sinkIter->query());
  1586. if (!cur->alreadyUpdated)
  1587. break;
  1588. if (!sinkIter->next())
  1589. return false;
  1590. }
  1591. while (sinkIter->next())
  1592. others.append(sinkIter->get());
  1593. return true;
  1594. }
  1595. virtual bool isValid() { return NULL != cur.get(); }
  1596. virtual CGraphElementBase & query() { return *cur; }
  1597. CGraphElementBase & get() { return *LINK(cur); }
  1598. };
  1599. class CGraphTraverseConnectedIterator : public CGraphTraverseIteratorBase
  1600. {
  1601. bool branchOnConditional;
  1602. public:
  1603. CGraphTraverseConnectedIterator(CGraphBase &graph, bool _branchOnConditional) : CGraphTraverseIteratorBase(graph), branchOnConditional(_branchOnConditional) { }
  1604. virtual bool next()
  1605. {
  1606. setNext(branchOnConditional);
  1607. return NULL!=cur.get();
  1608. }
  1609. };
  1610. IThorActivityIterator *CGraphBase::getConnectedIterator(bool branchOnConditional)
  1611. {
  1612. return new CGraphTraverseConnectedIterator(*this, branchOnConditional);
  1613. }
  1614. bool CGraphBase::wait(unsigned timeout)
  1615. {
  1616. CTimeMon tm(timeout);
  1617. unsigned remaining = timeout;
  1618. class CWaitException
  1619. {
  1620. CGraphBase *graph;
  1621. Owned<IException> exception;
  1622. public:
  1623. CWaitException(CGraphBase *_graph) : graph(_graph) { }
  1624. IException *get() { return exception; }
  1625. void set(IException *e)
  1626. {
  1627. if (!exception)
  1628. exception.setown(e);
  1629. else
  1630. e->Release();
  1631. }
  1632. void throwException()
  1633. {
  1634. if (exception)
  1635. throw exception.getClear();
  1636. throw MakeGraphException(graph, 0, "Timed out waiting for graph to end");
  1637. }
  1638. } waitException(this);
  1639. Owned<IThorActivityIterator> iter = getConnectedIterator();
  1640. ForEach (*iter)
  1641. {
  1642. CGraphElementBase &element = iter->query();
  1643. CActivityBase *activity = element.queryActivity();
  1644. if (INFINITE != timeout && tm.timedout(&remaining))
  1645. waitException.throwException();
  1646. try
  1647. {
  1648. if (!activity->wait(remaining))
  1649. waitException.throwException();
  1650. }
  1651. catch (IException *e)
  1652. {
  1653. waitException.set(e); // will discard if already set
  1654. if (timeout == INFINITE)
  1655. {
  1656. unsigned e = tm.elapsed();
  1657. if (e >= MEDIUMTIMEOUT)
  1658. waitException.throwException();
  1659. timeout = MEDIUMTIMEOUT-e;
  1660. tm.reset(timeout);
  1661. }
  1662. }
  1663. }
  1664. if (waitException.get())
  1665. waitException.throwException();
  1666. // synchronize all slaves to end of graphs
  1667. if (NULL == owner || isGlobal())
  1668. {
  1669. if (INFINITE != timeout && tm.timedout(&remaining))
  1670. waitException.throwException();
  1671. if (!waitBarrier->wait(true, remaining))
  1672. return false;
  1673. }
  1674. return true;
  1675. }
  1676. void CGraphBase::abort(IException *e)
  1677. {
  1678. if (aborted)
  1679. return;
  1680. GraphPrintLog("Aborted");
  1681. {
  1682. CriticalBlock cb(crit);
  1683. abortException.set(e);
  1684. aborted = true;
  1685. if (!job.getOptBool("coreOnAbort", false))
  1686. {
  1687. // prevent dtors throwing ... __verbose_terminate_handler()->abort()->raise()->core files
  1688. setProcessAborted(true);
  1689. }
  1690. graphCancelHandler.cancel(0);
  1691. if (0 == containers.count())
  1692. {
  1693. Owned<IThorGraphStubIterator> iter = getChildStubIterator();
  1694. ForEach(*iter)
  1695. {
  1696. CGraphStub &graph = iter->query();
  1697. graph.abort(e);
  1698. }
  1699. }
  1700. }
  1701. if (started && !graphDone)
  1702. {
  1703. Owned<IThorActivityIterator> iter = getConnectedIterator();
  1704. ForEach (*iter)
  1705. {
  1706. iter->query().abort(e); // JCSMORE - could do in parallel, they can take some time to timeout
  1707. }
  1708. if (startBarrier)
  1709. startBarrier->cancel(e);
  1710. if (waitBarrier)
  1711. waitBarrier->cancel(e);
  1712. if (doneBarrier)
  1713. doneBarrier->cancel(e);
  1714. }
  1715. }
  1716. void CGraphBase::GraphPrintLog(const char *format, ...)
  1717. {
  1718. va_list args;
  1719. va_start(args, format);
  1720. ::GraphPrintLogArgs(this, thorlog_null, MCdebugProgress, format, args);
  1721. va_end(args);
  1722. }
  1723. void CGraphBase::GraphPrintLog(IException *e, const char *format, ...)
  1724. {
  1725. va_list args;
  1726. va_start(args, format);
  1727. ::GraphPrintLogArgs(this, e, thorlog_null, MCdebugProgress, format, args);
  1728. va_end(args);
  1729. }
  1730. void CGraphBase::GraphPrintLog(IException *e)
  1731. {
  1732. GraphPrintLog(e, "%s", "");
  1733. }
  1734. void CGraphBase::setLogging(bool tf)
  1735. {
  1736. Owned<IThorActivityIterator> iter = getIterator();
  1737. ForEach(*iter)
  1738. iter->query().setLogging(tf);
  1739. }
  1740. void CGraphBase::createFromXGMML(IPropertyTree *_node, CGraphBase *_owner, CGraphBase *_parent, CGraphBase *resultsGraph, CGraphTableCopy &newGraphs)
  1741. {
  1742. class CChildParallelFactory : public CGraphStub
  1743. {
  1744. Linked<CGraphBase> originalChildGraph;
  1745. CriticalSection crit;
  1746. CIArrayOf<CGraphBase> stack;
  1747. CIArrayOf<CGraphBase> active;
  1748. bool originalAvailable = true;
  1749. CGraphBase *getGraph()
  1750. {
  1751. Owned<CGraphBase> childGraph;
  1752. {
  1753. CriticalBlock b(crit);
  1754. if (originalAvailable)
  1755. {
  1756. originalAvailable = false;
  1757. active.append(*originalChildGraph.getLink());
  1758. return originalChildGraph.getLink();
  1759. }
  1760. if (stack.length())
  1761. childGraph.setown(&stack.popGet());
  1762. }
  1763. if (!childGraph)
  1764. childGraph.setown(originalChildGraph->cloneGraph());
  1765. if (originalChildGraph->queryAborted())
  1766. throw MakeGraphException(originalChildGraph, 0, "Job aborted");
  1767. {
  1768. CriticalBlock b(crit);
  1769. active.append(*childGraph.getLink());
  1770. }
  1771. return childGraph.getClear();
  1772. }
  1773. void pushGraph(CGraphBase *childGraph)
  1774. {
  1775. CriticalBlock b(crit);
  1776. verifyex(active.zap(*childGraph));
  1777. if (childGraph == originalChildGraph)
  1778. originalAvailable = true;
  1779. else
  1780. stack.append(*LINK(childGraph));
  1781. }
  1782. public:
  1783. CChildParallelFactory(CGraphBase *_originalChildGraph) : originalChildGraph(_originalChildGraph)
  1784. {
  1785. graphId = originalChildGraph->queryGraphId();
  1786. }
  1787. virtual CGraphBase &queryOriginalGraph() override { return *originalChildGraph; }
  1788. virtual void abort(IException *e) override
  1789. {
  1790. for (;;)
  1791. {
  1792. Owned<CGraphBase> activeChildGraph;
  1793. {
  1794. CriticalBlock b(crit);
  1795. activeChildGraph.setown(&active.popGet());
  1796. if (!activeChildGraph)
  1797. break;
  1798. }
  1799. activeChildGraph->abort(e);
  1800. }
  1801. }
  1802. virtual bool serializeStats(MemoryBuffer &mb) override
  1803. {
  1804. // JCSMORE - need to merge other instances
  1805. return originalChildGraph->serializeStats(mb);
  1806. }
  1807. virtual IEclGraphResults * evaluate(unsigned parentExtractSz, const byte * parentExtract) override
  1808. {
  1809. Owned<CGraphBase> childGraph = getGraph();
  1810. Owned<IEclGraphResults> results = childGraph->evaluate(parentExtractSz, parentExtract);
  1811. pushGraph(childGraph);
  1812. return results.getClear();
  1813. }
  1814. };
  1815. owner = _owner;
  1816. parent = _parent?_parent:owner;
  1817. node.setown(createPTreeFromIPT(_node));
  1818. xgmml = node->queryPropTree("att/graph");
  1819. sink = xgmml->getPropBool("att[@name=\"rootGraph\"]/@value", false);
  1820. sequential = xgmml->getPropBool("@sequential");
  1821. graphId = node->getPropInt("@id");
  1822. global = false;
  1823. localOnly = -1; // unset
  1824. parentActivityId = node->getPropInt("att[@name=\"_parentActivity\"]/@value", 0);
  1825. graphResultsContainer = resultsGraph;
  1826. unsigned numResults = xgmml->getPropInt("att[@name=\"_numResults\"]/@value", 0);
  1827. if (numResults)
  1828. {
  1829. localResults.setown(createThorGraphResults(numResults));
  1830. resultsGraph = this;
  1831. // JCSMORE - it might more sense if this temp handler was owned by parent act., which may finish(get stopped) earlier than the owning graph
  1832. tmpHandler.setown(queryJob().createTempHandler(false));
  1833. }
  1834. localChild = false;
  1835. if (owner && parentActivityId)
  1836. {
  1837. CGraphElementBase *parentElement = owner->queryElement(parentActivityId);
  1838. if (isLoopActivity(*parentElement))
  1839. {
  1840. localChild = parentElement->queryOwner().isLocalChild();
  1841. unsigned loopId = parentElement->queryXGMML().getPropInt("att[@name=\"_loopid\"]/@value");
  1842. if ((graphId == loopId) || (owner->queryGraphId() == loopId))
  1843. loopBodySubgraph = true;
  1844. else
  1845. localChild = true;
  1846. }
  1847. else
  1848. localChild = true;
  1849. }
  1850. Owned<IPropertyTreeIterator> nodes = xgmml->getElements("node");
  1851. ForEach(*nodes)
  1852. {
  1853. IPropertyTree &e = nodes->query();
  1854. ThorActivityKind kind = (ThorActivityKind) e.getPropInt("att[@name=\"_kind\"]/@value");
  1855. if (TAKsubgraph == kind)
  1856. {
  1857. Owned<CGraphBase> subGraph = queryJobChannel().createGraph();
  1858. subGraph->createFromXGMML(&e, this, parent, resultsGraph, newGraphs);
  1859. activity_id subGraphParentActivityId = e.getPropInt("att[@name=\"_parentActivity\"]/@value", 0);
  1860. if (subGraphParentActivityId) // JCS - not sure if ever false
  1861. {
  1862. Owned<CGraphStub> stub = new CChildParallelFactory(subGraph);
  1863. addChildGraph(stub);
  1864. }
  1865. else
  1866. addChildGraph(subGraph);
  1867. if (!global)
  1868. global = subGraph->isGlobal();
  1869. newGraphs.replace(*subGraph);
  1870. }
  1871. else
  1872. {
  1873. if (localChild && !e.getPropBool("att[@name=\"coLocal\"]/@value", false))
  1874. {
  1875. IPropertyTree *att = createPTree("att");
  1876. att->setProp("@name", "coLocal");
  1877. att->setPropBool("@value", true);
  1878. e.addPropTree("att", att);
  1879. }
  1880. CGraphElementBase *act = createGraphElement(e, *this, resultsGraph);
  1881. addActivity(act);
  1882. if (!global)
  1883. global = isGlobalActivity(*act);
  1884. }
  1885. }
  1886. Owned<IPropertyTreeIterator> edges = xgmml->getElements("edge");
  1887. ForEach(*edges)
  1888. {
  1889. IPropertyTree &edge = edges->query();
  1890. //Ignore edges that represent dependencies from parent activities to child activities.
  1891. if (edge.getPropBool("att[@name=\"_childGraph\"]/@value", false))
  1892. continue;
  1893. unsigned sourceOutput = edge.getPropInt("att[@name=\"_sourceIndex\"]/@value", 0);
  1894. unsigned targetInput = edge.getPropInt("att[@name=\"_targetIndex\"]/@value", 0);
  1895. CGraphElementBase *source = queryElement(edge.getPropInt("@source"));
  1896. CGraphElementBase *target = queryElement(edge.getPropInt("@target"));
  1897. target->addInput(targetInput, source, sourceOutput);
  1898. int controlId = edge.getPropInt("att[@name=\"_when\"]/@value", 0);
  1899. if (controlId != 0)
  1900. noteDependency(target, source, controlId, false);
  1901. }
  1902. Owned<IThorActivityIterator> iter = getIterator();
  1903. ForEach(*iter)
  1904. {
  1905. CGraphElementBase &element = iter->query();
  1906. if (0 == element.getOutputs())
  1907. {
  1908. /* JCSMORE - Making some outputs conditional, will require:
  1909. * a) Pass through information as to which dependent graph causes this graph (and this sink) to execute)
  1910. * b) Allow the subgraph to re-executed by other dependent subgraphs and avoid re-executing completed sinks
  1911. * c) Keep common points (splitters) around (preferably in memory), re-execution of graph will need them
  1912. */
  1913. sinks.append(*LINK(&element));
  1914. }
  1915. }
  1916. init();
  1917. }
  1918. void CGraphBase::executeChildGraphs(size32_t parentExtractSz, const byte *parentExtract)
  1919. {
  1920. if (sequential)
  1921. {
  1922. // JCSMORE - would need to re-think how this is done if these sibling child queries could be executed in parallel
  1923. ForEachItemIn(o, orderedChildGraphs)
  1924. {
  1925. CGraphBase &graph = orderedChildGraphs.item(o).queryOriginalGraph();
  1926. if (graph.isSink())
  1927. graph.execute(parentExtractSz, parentExtract, true, false);
  1928. }
  1929. }
  1930. else
  1931. {
  1932. Owned<IThorGraphIterator> iter = getChildGraphIterator();
  1933. ForEach(*iter)
  1934. {
  1935. CGraphBase &graph = iter->query();
  1936. if (graph.isSink())
  1937. graph.execute(parentExtractSz, parentExtract, true, false);
  1938. }
  1939. }
  1940. }
  1941. void CGraphBase::doExecuteChild(size32_t parentExtractSz, const byte *parentExtract)
  1942. {
  1943. reset();
  1944. if (0 == containers.count())
  1945. executeChildGraphs(parentExtractSz, parentExtract);
  1946. else
  1947. execute(parentExtractSz, parentExtract, false, false);
  1948. IGraphTempHandler *tempHandler = queryTempHandler(false);
  1949. if (tempHandler)
  1950. tempHandler->clearTemps();
  1951. }
  1952. void CGraphBase::executeChild(size32_t & retSize, void * &ret, size32_t parentExtractSz, const byte *parentExtract)
  1953. {
  1954. reset();
  1955. doExecute(parentExtractSz, parentExtract, false);
  1956. UNIMPLEMENTED;
  1957. /*
  1958. ForEachItemIn(idx1, elements)
  1959. {
  1960. EclGraphElement & cur = elements.item(idx1);
  1961. if (cur.isResult)
  1962. {
  1963. cur.extractResult(retSize, ret);
  1964. return;
  1965. }
  1966. }
  1967. */
  1968. throwUnexpected();
  1969. }
  1970. void CGraphBase::setResults(IThorGraphResults *results) // used by master only
  1971. {
  1972. localResults.set(results);
  1973. }
  1974. void CGraphBase::executeChild(size32_t parentExtractSz, const byte *parentExtract, IThorGraphResults *results, IThorGraphResults *_graphLoopResults)
  1975. {
  1976. localResults.set(results);
  1977. graphLoopResults.set(_graphLoopResults);
  1978. doExecuteChild(parentExtractSz, parentExtract);
  1979. graphLoopResults.clear();
  1980. localResults.clear();
  1981. }
  1982. StringBuffer &getGlobals(CGraphBase &graph, StringBuffer &str)
  1983. {
  1984. bool first = true;
  1985. Owned<IThorActivityIterator> iter = graph.getIterator();
  1986. ForEach(*iter)
  1987. {
  1988. CGraphElementBase &e = iter->query();
  1989. if (isGlobalActivity(e))
  1990. {
  1991. if (first)
  1992. str.append("Graph(").append(graph.queryGraphId()).append("): [");
  1993. else
  1994. str.append(", ");
  1995. first = false;
  1996. ThorActivityKind kind = e.getKind();
  1997. str.append(activityKindStr(kind));
  1998. str.append("(").append(e.queryId()).append(")");
  1999. }
  2000. }
  2001. if (!first)
  2002. str.append("]");
  2003. Owned<IThorGraphIterator> childIter = graph.getChildGraphIterator();
  2004. ForEach(*childIter)
  2005. {
  2006. CGraphBase &childGraph = childIter->query();
  2007. getGlobals(childGraph, str);
  2008. }
  2009. return str;
  2010. }
  2011. void CGraphBase::executeChild(size32_t parentExtractSz, const byte *parentExtract)
  2012. {
  2013. assertex(localResults);
  2014. localResults->clear();
  2015. if (isGlobal()) // any slave
  2016. {
  2017. StringBuffer str("Global acts = ");
  2018. getGlobals(*this, str);
  2019. throw MakeGraphException(this, 0, "Global child graph? : %s", str.str());
  2020. }
  2021. doExecuteChild(parentExtractSz, parentExtract);
  2022. }
  2023. IThorResult *CGraphBase::getResult(unsigned id, bool distributed)
  2024. {
  2025. return localResults->getResult(id, distributed);
  2026. }
  2027. IThorResult *CGraphBase::getGraphLoopResult(unsigned id, bool distributed)
  2028. {
  2029. return graphLoopResults->getResult(id, distributed);
  2030. }
  2031. IThorResult *CGraphBase::createResult(CActivityBase &activity, unsigned id, IThorGraphResults *results, IThorRowInterfaces *rowIf, ThorGraphResultType resultType, unsigned spillPriority)
  2032. {
  2033. return results->createResult(activity, id, rowIf, resultType, spillPriority);
  2034. }
  2035. IThorResult *CGraphBase::createResult(CActivityBase &activity, unsigned id, IThorRowInterfaces *rowIf, ThorGraphResultType resultType, unsigned spillPriority)
  2036. {
  2037. return localResults->createResult(activity, id, rowIf, resultType, spillPriority);
  2038. }
  2039. IThorResult *CGraphBase::createGraphLoopResult(CActivityBase &activity, IThorRowInterfaces *rowIf, ThorGraphResultType resultType, unsigned spillPriority)
  2040. {
  2041. return graphLoopResults->createResult(activity, rowIf, resultType, spillPriority);
  2042. }
  2043. // IEclGraphResults
  2044. void CGraphBase::getDictionaryResult(unsigned & count, const byte * * & ret, unsigned id)
  2045. {
  2046. Owned<IThorResult> result = getResult(id, true); // will get collated distributed result
  2047. result->getLinkedResult(count, ret);
  2048. }
  2049. void CGraphBase::getLinkedResult(unsigned & count, const byte * * & ret, unsigned id)
  2050. {
  2051. Owned<IThorResult> result = getResult(id, true); // will get collated distributed result
  2052. result->getLinkedResult(count, ret);
  2053. }
  2054. const void * CGraphBase::getLinkedRowResult(unsigned id)
  2055. {
  2056. Owned<IThorResult> result = getResult(id, true); // will get collated distributed result
  2057. return result->getLinkedRowResult();
  2058. }
  2059. // IThorChildGraph impl.
  2060. IEclGraphResults *CGraphBase::evaluate(unsigned _parentExtractSz, const byte *parentExtract)
  2061. {
  2062. CriticalBlock block(evaluateCrit);
  2063. localResults.setown(createThorGraphResults(xgmml->getPropInt("att[@name=\"_numResults\"]/@value", 0)));
  2064. parentExtractSz = _parentExtractSz;
  2065. executeChild(parentExtractSz, parentExtract);
  2066. return localResults.getClear();
  2067. }
  2068. static bool isLocalOnly(const CGraphElementBase &activity);
  2069. static bool isLocalOnly(const CGraphBase &graph) // checks all dependencies, if something needs to be global, whole body is forced to be execution sync.
  2070. {
  2071. if (0 == graph.activityCount())
  2072. {
  2073. Owned<IThorGraphIterator> iter = graph.getChildGraphIterator();
  2074. ForEach(*iter)
  2075. {
  2076. CGraphBase &childGraph = iter->query();
  2077. if (childGraph.isSink())
  2078. {
  2079. if (!isLocalOnly(childGraph))
  2080. return false;
  2081. }
  2082. }
  2083. }
  2084. else
  2085. {
  2086. if (graph.isGlobal())
  2087. return false;
  2088. Owned<IThorActivityIterator> sinkIter = graph.getAllSinkIterator();
  2089. ForEach(*sinkIter)
  2090. {
  2091. CGraphElementBase &sink = sinkIter->query();
  2092. if (!isLocalOnly(sink))
  2093. return false;
  2094. }
  2095. }
  2096. return true;
  2097. }
  2098. static bool isLocalOnly(const CGraphElementBase &activity)
  2099. {
  2100. Owned<IThorGraphDependencyIterator> deps = activity.getDependsIterator();
  2101. ForEach(*deps)
  2102. {
  2103. if (!isLocalOnly(*(deps->query().graph)))
  2104. return false;
  2105. }
  2106. StringBuffer match("edge[@target=\"");
  2107. match.append(activity.queryId()).append("\"]");
  2108. Owned<IPropertyTreeIterator> inputs = activity.queryOwner().queryXGMML().getElements(match.str());
  2109. ForEach(*inputs)
  2110. {
  2111. IPropertyTree &edge = inputs->query();
  2112. //Ignore edges that represent dependencies from parent activities to child activities.
  2113. if (edge.getPropBool("att[@name=\"_childGraph\"]/@value", false))
  2114. continue;
  2115. CGraphElementBase *sourceAct = activity.queryOwner().queryElement(edge.getPropInt("@source"));
  2116. if (!isLocalOnly(*sourceAct))
  2117. return false;
  2118. }
  2119. return true;
  2120. }
  2121. bool CGraphBase::isLocalOnly() const // checks all dependencies, if something needs to be global, whole body is forced to be execution sync.
  2122. {
  2123. if (-1 == localOnly)
  2124. localOnly = (int)::isLocalOnly(*this);
  2125. return 1==localOnly;
  2126. }
  2127. IThorGraphResults *CGraphBase::createThorGraphResults(unsigned num)
  2128. {
  2129. return new CThorGraphResults(num);
  2130. }
  2131. ////
  2132. void CGraphTempHandler::registerFile(const char *name, graph_id graphId, unsigned usageCount, bool temp, WUFileKind fileKind, StringArray *clusters)
  2133. {
  2134. assertex(temp);
  2135. LOG(MCdebugProgress, thorJob, "registerTmpFile name=%s, usageCount=%d", name, usageCount);
  2136. CriticalBlock b(crit);
  2137. if (tmpFiles.find(name))
  2138. throw MakeThorException(TE_FileAlreadyUsedAsTempFile, "File already used as temp file (%s)", name);
  2139. tmpFiles.replace(* new CFileUsageEntry(name, graphId, fileKind, usageCount));
  2140. }
  2141. void CGraphTempHandler::deregisterFile(const char *name, bool kept)
  2142. {
  2143. LOG(MCdebugProgress, thorJob, "deregisterTmpFile name=%s", name);
  2144. CriticalBlock b(crit);
  2145. CFileUsageEntry *fileUsage = tmpFiles.find(name);
  2146. if (!fileUsage)
  2147. {
  2148. if (errorOnMissing)
  2149. throw MakeThorException(TE_FileNotFound, "File not found (%s) deregistering tmp file", name);
  2150. return;
  2151. }
  2152. if (0 == fileUsage->queryUsage()) // marked 'not to be deleted' until workunit complete.
  2153. return;
  2154. else if (1 == fileUsage->queryUsage())
  2155. {
  2156. tmpFiles.remove(name);
  2157. try
  2158. {
  2159. if (!removeTemp(name))
  2160. LOG(MCwarning, thorJob, "Failed to delete tmp file : %s (not found)", name);
  2161. }
  2162. catch (IException *e) { StringBuffer s("Failed to delete tmp file : "); FLLOG(MCwarning, thorJob, e, s.append(name).str()); }
  2163. }
  2164. else
  2165. fileUsage->decUsage();
  2166. }
  2167. void CGraphTempHandler::clearTemps()
  2168. {
  2169. CriticalBlock b(crit);
  2170. Owned<IFileUsageIterator> iter = getIterator();
  2171. ForEach(*iter)
  2172. {
  2173. CFileUsageEntry &entry = iter->query();
  2174. const char *tmpname = entry.queryName();
  2175. try
  2176. {
  2177. if (!removeTemp(tmpname))
  2178. LOG(MCwarning, thorJob, "Failed to delete tmp file : %s (not found)", tmpname);
  2179. }
  2180. catch (IException *e) { StringBuffer s("Failed to delete tmp file : "); FLLOG(MCwarning, thorJob, e, s.append(tmpname).str()); }
  2181. }
  2182. iter.clear();
  2183. tmpFiles.kill();
  2184. }
  2185. /////
  2186. class CGraphExecutor;
  2187. class CGraphExecutorGraphInfo : public CInterface
  2188. {
  2189. public:
  2190. CGraphExecutorGraphInfo(CGraphExecutor &_executor, CGraphBase *_subGraph, IGraphCallback &_callback, const byte *parentExtract, size32_t parentExtractSz) : executor(_executor), subGraph(_subGraph), callback(_callback)
  2191. {
  2192. parentExtractMb.append(parentExtractSz, parentExtract);
  2193. }
  2194. CGraphExecutor &executor;
  2195. IGraphCallback &callback;
  2196. Linked<CGraphBase> subGraph;
  2197. MemoryBuffer parentExtractMb;
  2198. };
  2199. class CGraphExecutor : implements IGraphExecutor, public CInterface
  2200. {
  2201. CJobChannel &jobChannel;
  2202. CJobBase &job;
  2203. CIArrayOf<CGraphExecutorGraphInfo> stack, running, toRun;
  2204. UnsignedArray seen;
  2205. bool stopped;
  2206. unsigned limit;
  2207. unsigned waitOnRunning;
  2208. CriticalSection crit;
  2209. Semaphore runningSem;
  2210. Owned<IThreadPool> graphPool;
  2211. class CGraphExecutorFactory : implements IThreadFactory, public CInterface
  2212. {
  2213. public:
  2214. IMPLEMENT_IINTERFACE;
  2215. CGraphExecutorFactory() { }
  2216. // IThreadFactory
  2217. virtual IPooledThread *createNew()
  2218. {
  2219. class CGraphExecutorThread : implements IPooledThread, public CInterface
  2220. {
  2221. Owned<CGraphExecutorGraphInfo> graphInfo;
  2222. public:
  2223. IMPLEMENT_IINTERFACE;
  2224. CGraphExecutorThread()
  2225. {
  2226. }
  2227. virtual void init(void *startInfo) override
  2228. {
  2229. graphInfo.setown((CGraphExecutorGraphInfo *)startInfo);
  2230. }
  2231. virtual void threadmain() override
  2232. {
  2233. for (;;)
  2234. {
  2235. Linked<CGraphBase> graph = graphInfo->subGraph;
  2236. Owned<IException> e;
  2237. try
  2238. {
  2239. PROGLOG("CGraphExecutor: Running graph, graphId=%" GIDPF "d", graph->queryGraphId());
  2240. graphInfo->callback.runSubgraph(*graph, graphInfo->parentExtractMb.length(), (const byte *)graphInfo->parentExtractMb.toByteArray());
  2241. }
  2242. catch (IException *_e)
  2243. {
  2244. e.setown(_e);
  2245. }
  2246. Owned<CGraphExecutorGraphInfo> nextGraphInfo;
  2247. try
  2248. {
  2249. nextGraphInfo.setown(graphInfo->executor.graphDone(*graphInfo, e));
  2250. }
  2251. catch (IException *e)
  2252. {
  2253. GraphPrintLog(graph, e, "graphDone");
  2254. e->Release();
  2255. }
  2256. graphInfo.clear(); // NB: at this point the graph will be destroyed
  2257. if (e)
  2258. throw e.getClear();
  2259. if (!nextGraphInfo)
  2260. return;
  2261. graphInfo.setown(nextGraphInfo.getClear());
  2262. }
  2263. }
  2264. virtual bool canReuse() const override { return true; }
  2265. virtual bool stop() override { return true; }
  2266. };
  2267. return new CGraphExecutorThread();
  2268. }
  2269. } *factory;
  2270. CGraphExecutorGraphInfo *findRunning(graph_id gid)
  2271. {
  2272. ForEachItemIn(r, running)
  2273. {
  2274. CGraphExecutorGraphInfo *graphInfo = &running.item(r);
  2275. if (gid == graphInfo->subGraph->queryGraphId())
  2276. return graphInfo;
  2277. }
  2278. return NULL;
  2279. }
  2280. public:
  2281. IMPLEMENT_IINTERFACE;
  2282. CGraphExecutor(CJobChannel &_jobChannel) : jobChannel(_jobChannel), job(_jobChannel.queryJob())
  2283. {
  2284. limit = (unsigned)job.getWorkUnitValueInt("concurrentSubGraphs", globals->getPropInt("@concurrentSubGraphs", 1));
  2285. PROGLOG("CGraphExecutor: limit = %d", limit);
  2286. waitOnRunning = 0;
  2287. stopped = false;
  2288. factory = new CGraphExecutorFactory();
  2289. graphPool.setown(createThreadPool("CGraphExecutor pool", factory, &jobChannel, limit));
  2290. }
  2291. ~CGraphExecutor()
  2292. {
  2293. stopped = true;
  2294. graphPool->joinAll();
  2295. factory->Release();
  2296. }
  2297. CGraphExecutorGraphInfo *graphDone(CGraphExecutorGraphInfo &doneGraphInfo, IException *e)
  2298. {
  2299. CriticalBlock b(crit);
  2300. running.zap(doneGraphInfo);
  2301. if (waitOnRunning)
  2302. {
  2303. runningSem.signal(waitOnRunning);
  2304. waitOnRunning = 0;
  2305. }
  2306. if (e || job.queryAborted())
  2307. {
  2308. stopped = true;
  2309. stack.kill();
  2310. return NULL;
  2311. }
  2312. if (job.queryPausing())
  2313. stack.kill();
  2314. else if (stack.ordinality())
  2315. {
  2316. CICopyArrayOf<CGraphExecutorGraphInfo> toMove;
  2317. ForEachItemIn(s, stack)
  2318. {
  2319. bool dependenciesDone = true;
  2320. CGraphExecutorGraphInfo &graphInfo = stack.item(s);
  2321. ForEachItemIn (d, graphInfo.subGraph->dependentSubGraphs)
  2322. {
  2323. CGraphBase &subGraph = graphInfo.subGraph->dependentSubGraphs.item(d);
  2324. if (!subGraph.isComplete())
  2325. {
  2326. dependenciesDone = false;
  2327. break;
  2328. }
  2329. }
  2330. if (dependenciesDone)
  2331. {
  2332. graphInfo.subGraph->dependentSubGraphs.kill();
  2333. graphInfo.subGraph->prepare(graphInfo.parentExtractMb.length(), (const byte *)graphInfo.parentExtractMb.toByteArray(), true, true, true); // now existing deps done, maybe more to prepare
  2334. ForEachItemIn (d, graphInfo.subGraph->dependentSubGraphs)
  2335. {
  2336. CGraphBase &subGraph = graphInfo.subGraph->dependentSubGraphs.item(d);
  2337. if (!subGraph.isComplete())
  2338. {
  2339. dependenciesDone = false;
  2340. break;
  2341. }
  2342. }
  2343. if (dependenciesDone)
  2344. {
  2345. graphInfo.subGraph->dependentSubGraphs.kill(); // none to track anymore
  2346. toMove.append(graphInfo);
  2347. }
  2348. }
  2349. }
  2350. ForEachItemIn(m, toMove)
  2351. {
  2352. Linked<CGraphExecutorGraphInfo> graphInfo = &toMove.item(m);
  2353. stack.zap(*graphInfo);
  2354. toRun.add(*graphInfo.getClear(), 0);
  2355. }
  2356. }
  2357. job.markWuDirty();
  2358. PROGLOG("CGraphExecutor running=%d, waitingToRun=%d, dependentsWaiting=%d", running.ordinality(), toRun.ordinality(), stack.ordinality());
  2359. while (toRun.ordinality())
  2360. {
  2361. if (job.queryPausing())
  2362. return NULL;
  2363. Linked<CGraphExecutorGraphInfo> nextGraphInfo = &toRun.item(0);
  2364. toRun.remove(0);
  2365. if (!nextGraphInfo->subGraph->isComplete() && (NULL == findRunning(nextGraphInfo->subGraph->queryGraphId())))
  2366. {
  2367. running.append(*nextGraphInfo.getLink());
  2368. return nextGraphInfo.getClear();
  2369. }
  2370. }
  2371. return NULL;
  2372. }
  2373. // IGraphExecutor
  2374. virtual void add(CGraphBase *subGraph, IGraphCallback &callback, bool checkDependencies, size32_t parentExtractSz, const byte *parentExtract)
  2375. {
  2376. bool alreadyRunning;
  2377. {
  2378. CriticalBlock b(crit);
  2379. if (job.queryPausing())
  2380. return;
  2381. if (subGraph->isComplete())
  2382. return;
  2383. alreadyRunning = NULL != findRunning(subGraph->queryGraphId());
  2384. if (alreadyRunning)
  2385. ++waitOnRunning;
  2386. }
  2387. if (alreadyRunning)
  2388. {
  2389. for (;;)
  2390. {
  2391. PROGLOG("Waiting on subgraph %" GIDPF "d", subGraph->queryGraphId());
  2392. if (runningSem.wait(MEDIUMTIMEOUT) || job.queryAborted() || job.queryPausing())
  2393. break;
  2394. }
  2395. return;
  2396. }
  2397. else
  2398. {
  2399. CriticalBlock b(crit);
  2400. if (seen.contains(subGraph->queryGraphId()))
  2401. return; // already queued;
  2402. seen.append(subGraph->queryGraphId());
  2403. }
  2404. if (!subGraph->prepare(parentExtractSz, parentExtract, checkDependencies, true, true))
  2405. {
  2406. subGraph->setComplete();
  2407. return;
  2408. }
  2409. if (subGraph->dependentSubGraphs.ordinality())
  2410. {
  2411. bool dependenciesDone = true;
  2412. ForEachItemIn (d, subGraph->dependentSubGraphs)
  2413. {
  2414. CGraphBase &graph = subGraph->dependentSubGraphs.item(d);
  2415. if (!graph.isComplete())
  2416. {
  2417. dependenciesDone = false;
  2418. break;
  2419. }
  2420. }
  2421. if (dependenciesDone)
  2422. subGraph->dependentSubGraphs.kill(); // none to track anymore
  2423. }
  2424. Owned<CGraphExecutorGraphInfo> graphInfo = new CGraphExecutorGraphInfo(*this, subGraph, callback, parentExtract, parentExtractSz);
  2425. CriticalBlock b(crit);
  2426. if (0 == subGraph->dependentSubGraphs.ordinality())
  2427. {
  2428. if (running.ordinality()<limit)
  2429. {
  2430. running.append(*LINK(graphInfo));
  2431. PROGLOG("Add: Launching graph thread for graphId=%" GIDPF "d", subGraph->queryGraphId());
  2432. graphPool->start(graphInfo.getClear());
  2433. }
  2434. else
  2435. stack.add(*graphInfo.getClear(), 0); // push to front, no dependency, free to run next.
  2436. }
  2437. else
  2438. stack.append(*graphInfo.getClear()); // as dependencies finish, may move up the list
  2439. }
  2440. virtual IThreadPool &queryGraphPool() { return *graphPool; }
  2441. virtual void wait()
  2442. {
  2443. PROGLOG("CGraphExecutor exiting, waiting on graph pool");
  2444. graphPool->joinAll();
  2445. PROGLOG("CGraphExecutor graphPool finished");
  2446. }
  2447. };
  2448. ////
  2449. // IContextLogger
  2450. class CThorContextLogger : implements IContextLogger, public CSimpleInterface
  2451. {
  2452. unsigned traceLevel = 1;
  2453. StringAttr globalIdHeader;
  2454. StringAttr callerIdHeader;
  2455. StringAttr globalId;
  2456. StringAttr callerId;
  2457. StringBuffer localId;
  2458. public:
  2459. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  2460. CThorContextLogger()
  2461. {
  2462. if (globals->hasProp("@httpGlobalIdHeader"))
  2463. setHttpIdHeaders(globals->queryProp("@httpGlobalIdHeader"), globals->queryProp("@httpCallerIdHeader"));
  2464. }
  2465. virtual void CTXLOGva(const char *format, va_list args) const __attribute__((format(printf,2,0)))
  2466. {
  2467. StringBuffer ss;
  2468. ss.valist_appendf(format, args);
  2469. LOG(MCdebugProgress, thorJob, "%s", ss.str());
  2470. }
  2471. virtual void logOperatorExceptionVA(IException *E, const char *file, unsigned line, const char *format, va_list args) const __attribute__((format(printf,5,0)))
  2472. {
  2473. StringBuffer ss;
  2474. ss.append("ERROR");
  2475. if (E)
  2476. ss.append(": ").append(E->errorCode());
  2477. if (file)
  2478. ss.appendf(": %s(%d) ", file, line);
  2479. if (E)
  2480. E->errorMessage(ss.append(": "));
  2481. if (format)
  2482. ss.append(": ").valist_appendf(format, args);
  2483. LOG(MCoperatorProgress, thorJob, "%s", ss.str());
  2484. }
  2485. virtual void noteStatistic(StatisticKind kind, unsigned __int64 value) const
  2486. {
  2487. }
  2488. virtual void setStatistic(StatisticKind kind, unsigned __int64 value) const
  2489. {
  2490. }
  2491. virtual void mergeStats(const CRuntimeStatisticCollection &from) const
  2492. {
  2493. }
  2494. virtual unsigned queryTraceLevel() const
  2495. {
  2496. return traceLevel;
  2497. }
  2498. virtual void setGlobalId(const char *id, SocketEndpoint &ep, unsigned pid) override
  2499. {
  2500. globalId.set(id);
  2501. appendGloballyUniqueId(localId.clear());
  2502. }
  2503. virtual void setCallerId(const char *id) override
  2504. {
  2505. callerId.set(id);
  2506. }
  2507. virtual const char *queryGlobalId() const
  2508. {
  2509. return globalId.get();
  2510. }
  2511. virtual const char *queryLocalId() const
  2512. {
  2513. return localId.str();
  2514. }
  2515. virtual const char *queryCallerId() const override
  2516. {
  2517. return callerId.str();
  2518. }
  2519. virtual void setHttpIdHeaders(const char *global, const char *caller)
  2520. {
  2521. if (global && *global)
  2522. globalIdHeader.set(global);
  2523. if (caller && *caller)
  2524. callerIdHeader.set(caller);
  2525. }
  2526. virtual const char *queryGlobalIdHttpHeader() const
  2527. {
  2528. return globalIdHeader.str();
  2529. }
  2530. virtual const char *queryCallerIdHttpHeader() const
  2531. {
  2532. return callerIdHeader.str();
  2533. }
  2534. };
  2535. ////
  2536. CJobBase::CJobBase(ILoadedDllEntry *_querySo, const char *_graphName) : querySo(_querySo), graphName(_graphName)
  2537. {
  2538. dirty = true;
  2539. aborted = false;
  2540. queryMemoryMB = 0;
  2541. channelsPerSlave = globals->getPropInt("@channelsPerSlave", 1);
  2542. numChannels = channelsPerSlave;
  2543. pluginMap = new SafePluginMap(&pluginCtx, true);
  2544. // JCSMORE - Will pass down at job creation time...
  2545. jobGroup.set(&::queryClusterGroup());
  2546. slaveGroup.setown(jobGroup->remove(0));
  2547. nodeGroup.set(&queryNodeGroup());
  2548. myNodeRank = nodeGroup->rank(::queryMyNode());
  2549. unsigned channelsPerSlave = globals->getPropInt("@channelsPerSlave", 1);
  2550. jobChannelSlaveNumbers.allocateN(channelsPerSlave, true); // filled when channels are added.
  2551. jobSlaveChannelNum.allocateN(querySlaves()); // filled when channels are added.
  2552. for (unsigned s=0; s<querySlaves(); s++)
  2553. jobSlaveChannelNum[s] = NotFound;
  2554. Owned<ILocalWorkUnit> localWU = createLocalWorkUnit(querySo);
  2555. if (!localWU)
  2556. throw MakeStringException(0, "Failed to locate workunit info in query : %s", querySo->queryName());
  2557. Owned<IConstWUGraph> graph = localWU->getGraph(graphName);
  2558. graphXGMML.setown(graph->getXGMMLTree(false, false));
  2559. if (!graphXGMML)
  2560. throwUnexpected();
  2561. }
  2562. void CJobBase::applyMemorySettings(const char *context)
  2563. {
  2564. // NB: 'total' memory has been calculated in advance from either resource settings or from system memory.
  2565. VStringBuffer memoryContext("%sMemory", context);
  2566. unsigned totalMemoryMB = globals->getPropInt(VStringBuffer("%s/@total", memoryContext.str()));
  2567. dbgassertex(totalMemoryMB);
  2568. auto getWorkUnitValueFunc = [this](const char *prop, StringBuffer &result) { getWorkUnitValue(prop, result); return result.length()>0;};
  2569. std::unordered_map<std::string, __uint64> memorySpecifications;
  2570. getMemorySpecifications(memorySpecifications, globals, memoryContext, totalMemoryMB, getWorkUnitValueFunc);
  2571. queryMemoryMB = (unsigned)(memorySpecifications["query"] / 0x100000);
  2572. if (0 == queryMemoryMB)
  2573. {
  2574. unsigned totalRequirementsMB = (unsigned)(memorySpecifications["total"] / 0x100000);
  2575. unsigned recommendedMaxMB = (unsigned)(memorySpecifications["recommendedMaxMemory"] / 0x100000);
  2576. queryMemoryMB = recommendedMaxMB - totalRequirementsMB;
  2577. unsigned sharedInstances = globals->getPropInt(VStringBuffer("%s/@sharedInstances", memoryContext.str()), 1);
  2578. queryMemoryMB /= sharedInstances;
  2579. }
  2580. // a simple helper used below, to fetch bool from workunit, or the memory settings (either managerMemory or workerMemory) or legacy location
  2581. auto getBoolSetting = [&](const char *setting, bool defaultValue)
  2582. {
  2583. VStringBuffer attrSetting("@%s", setting);
  2584. return getWorkUnitValueBool(setting,
  2585. globals->getPropBool(VStringBuffer("%s/%s", memoryContext.str(), attrSetting.str()),
  2586. globals->getPropBool(attrSetting,
  2587. defaultValue
  2588. )));
  2589. };
  2590. bool gmemAllowHugePages = getBoolSetting("heapUseHugePages", false);
  2591. gmemAllowHugePages = getBoolSetting("heapMasterUseHugePages", gmemAllowHugePages);
  2592. bool gmemAllowTransparentHugePages = getBoolSetting("heapUseTransparentHugePages", true);
  2593. bool gmemRetainMemory = getBoolSetting("heapRetainMemory", false);
  2594. roxiemem::setTotalMemoryLimit(gmemAllowHugePages, gmemAllowTransparentHugePages, gmemRetainMemory, ((memsize_t)queryMemoryMB) * 0x100000, 0, thorAllocSizes, NULL);
  2595. PROGLOG("Total memory = %u MB, query memory = %u MB, memory spill at = %u", totalMemoryMB, queryMemoryMB, memorySpillAtPercentage);
  2596. }
  2597. void CJobBase::init()
  2598. {
  2599. StringBuffer tmp;
  2600. tmp.append(wuid);
  2601. tmp.append(graphName);
  2602. key.set(tmp.str());
  2603. StringBuffer user;
  2604. extractFromWorkunitDAToken(token.str(), nullptr, &user, nullptr);
  2605. userDesc = createUserDescriptor();
  2606. userDesc->set(user.str(), token.str());//use workunit token as password
  2607. forceLogGraphIdMin = (graph_id)getWorkUnitValueInt("forceLogGraphIdMin", 0);
  2608. forceLogGraphIdMax = (graph_id)getWorkUnitValueInt("forceLogGraphIdMax", 0);
  2609. logctx.setown(new CThorContextLogger());
  2610. // global setting default on, can be overridden by #option
  2611. timeActivities = 0 != getWorkUnitValueInt("timeActivities", globals->getPropBool("@timeActivities", true));
  2612. maxActivityCores = (unsigned)getWorkUnitValueInt("maxActivityCores", 0); // NB: 0 means system decides
  2613. if (0 == maxActivityCores)
  2614. maxActivityCores = getAffinityCpus();
  2615. pausing = false;
  2616. resumed = false;
  2617. crcChecking = 0 != getWorkUnitValueInt("THOR_ROWCRC", globals->getPropBool("@THOR_ROWCRC", false));
  2618. usePackedAllocator = 0 != getWorkUnitValueInt("THOR_PACKEDALLOCATOR", globals->getPropBool("@THOR_PACKEDALLOCATOR", true));
  2619. memorySpillAtPercentage = (unsigned)getWorkUnitValueInt("memorySpillAt", globals->getPropInt("@memorySpillAt", 80));
  2620. failOnLeaks = getOptBool("failOnLeaks");
  2621. maxLfnBlockTimeMins = getOptInt(THOROPT_MAXLFN_BLOCKTIME_MINS, DEFAULT_MAXLFN_BLOCKTIME_MINS);
  2622. soapTraceLevel = getOptInt("soapTraceLevel", 1);
  2623. StringBuffer tracing("maxActivityCores = ");
  2624. if (maxActivityCores)
  2625. tracing.append(maxActivityCores);
  2626. else
  2627. tracing.append("[unbound]");
  2628. PROGLOG("%s", tracing.str());
  2629. }
  2630. void CJobBase::beforeDispose()
  2631. {
  2632. endJob();
  2633. }
  2634. CJobChannel &CJobBase::queryJobChannel(unsigned c) const
  2635. {
  2636. return jobChannels.item(c);
  2637. }
  2638. CActivityBase &CJobBase::queryChannelActivity(unsigned c, graph_id gid, activity_id id) const
  2639. {
  2640. CJobChannel &channel = queryJobChannel(c);
  2641. Owned<CGraphBase> graph = channel.getGraph(gid);
  2642. dbgassertex(graph);
  2643. CGraphElementBase *container = graph->queryElement(id);
  2644. dbgassertex(container);
  2645. return *container->queryActivity();
  2646. }
  2647. void CJobBase::startJob()
  2648. {
  2649. LOG(MCdebugProgress, thorJob, "New Graph started : %s", graphName.get());
  2650. perfmonhook.setown(createThorMemStatsPerfMonHook(*this, getOptInt(THOROPT_MAX_KERNLOG, 3)));
  2651. setPerformanceMonitorHook(perfmonhook);
  2652. PrintMemoryStatusLog();
  2653. logDiskSpace();
  2654. unsigned keyNodeCacheMB = getWorkUnitValueInt("keyNodeCacheMB", DEFAULT_KEYNODECACHEMB * queryJobChannels());
  2655. unsigned keyLeafCacheMB = getWorkUnitValueInt("keyLeafCacheMB", DEFAULT_KEYLEAFCACHEMB * queryJobChannels());
  2656. unsigned keyBlobCacheMB = getWorkUnitValueInt("keyBlobCacheMB", DEFAULT_KEYBLOBCACHEMB * queryJobChannels());
  2657. bool legacyNodeCache = getWorkUnitValueBool("legacyNodeCache", false);
  2658. keyNodeCacheBytes = ((memsize_t)0x100000) * keyNodeCacheMB;
  2659. keyLeafCacheBytes = ((memsize_t)0x100000) * keyLeafCacheMB;
  2660. keyBlobCacheBytes = ((memsize_t)0x100000) * keyBlobCacheMB;
  2661. setNodeCacheMem(keyNodeCacheBytes);
  2662. setLeafCacheMem(keyLeafCacheBytes);
  2663. setBlobCacheMem(keyBlobCacheBytes);
  2664. setLegacyNodeCache(legacyNodeCache);
  2665. PROGLOG("Key node caching setting: node=%u MB, leaf=%u MB, blob=%u MB", keyNodeCacheMB, keyLeafCacheMB, keyBlobCacheMB);
  2666. unsigned keyFileCacheLimit = (unsigned)getWorkUnitValueInt("keyFileCacheLimit", 0);
  2667. if (!keyFileCacheLimit)
  2668. keyFileCacheLimit = (querySlaves()+1)*2;
  2669. setKeyIndexCacheSize(keyFileCacheLimit);
  2670. PROGLOG("Key file cache size set to: %d", keyFileCacheLimit);
  2671. if (getOptBool("dumpStacks")) // mainly as an example of printAllStacks() usage
  2672. {
  2673. StringBuffer output;
  2674. if (getAllStacks(output))
  2675. {
  2676. IERRLOG("%s", output.str());
  2677. }
  2678. else
  2679. IWARNLOG("Failed to capture process stacks: %s", output.str());
  2680. }
  2681. }
  2682. void CJobBase::endJob()
  2683. {
  2684. if (jobEnded)
  2685. return;
  2686. jobEnded = true;
  2687. setPerformanceMonitorHook(nullptr);
  2688. LOG(MCdebugProgress, thorJob, "Job ended : %s", graphName.get());
  2689. clearKeyStoreCache(true);
  2690. PrintMemoryStatusLog();
  2691. Owned<IMultiException> exceptions;
  2692. ForEachItemIn(c, jobChannels)
  2693. {
  2694. try
  2695. {
  2696. jobChannels.item(c).clean();
  2697. }
  2698. catch (IException *e)
  2699. {
  2700. if (!exceptions)
  2701. exceptions.setown(makeMultiException());
  2702. exceptions->append(*LINK(e));
  2703. }
  2704. }
  2705. try
  2706. {
  2707. jobChannels.kill(); // avoiding circular references. Kill before other CJobBase components are destroyed that channels reference.
  2708. ::Release(userDesc);
  2709. callThreadTerminationHooks(true); // must call any installed thread termination functions, before unloading plugins
  2710. ::Release(pluginMap);
  2711. if (!REJECTLOG(MCthorDetailedDebugInfo))
  2712. traceMemUsage();
  2713. if (numChannels > 1) // if only 1 - then channel allocator is same as sharedAllocator, leaks will be reported by the single channel
  2714. checkAndReportLeaks(sharedAllocator->queryRowManager());
  2715. }
  2716. catch (IException *e)
  2717. {
  2718. if (!exceptions)
  2719. exceptions.setown(makeMultiException());
  2720. exceptions->append(*LINK(e));
  2721. }
  2722. ClearTempDir();
  2723. if (exceptions && exceptions->ordinality())
  2724. throw exceptions.getClear();
  2725. }
  2726. void CJobBase::checkAndReportLeaks(roxiemem::IRowManager *rowManager)
  2727. {
  2728. if (!failOnLeaks) // NB: leaks reported by row manager destructor anyway
  2729. return;
  2730. if (rowManager->allocated())
  2731. {
  2732. rowManager->reportLeaks();
  2733. throw MakeThorException(TE_RowLeaksDetected, "Row leaks detected");
  2734. }
  2735. }
  2736. bool CJobBase::queryForceLogging(graph_id graphId, bool def) const
  2737. {
  2738. // JCSMORE, could add comma separated range, e.g. 1-5,10-12
  2739. if ((graphId >= forceLogGraphIdMin) && (graphId <= forceLogGraphIdMax))
  2740. return true;
  2741. return def;
  2742. }
  2743. void CJobBase::addSubGraph(IPropertyTree &xgmml)
  2744. {
  2745. CriticalBlock b(crit);
  2746. for (unsigned c=0; c<queryJobChannels(); c++)
  2747. {
  2748. CJobChannel &jobChannel = queryJobChannel(c);
  2749. Owned<CGraphBase> subGraph = jobChannel.createGraph();
  2750. subGraph->createFromXGMML(&xgmml, NULL, NULL, NULL, jobChannel.queryAllGraphs());
  2751. jobChannel.addSubGraph(*subGraph.getClear());
  2752. }
  2753. }
  2754. void CJobBase::addDependencies(IPropertyTree *xgmml, bool failIfMissing)
  2755. {
  2756. for (unsigned c=0; c<queryJobChannels(); c++)
  2757. {
  2758. CJobChannel &jobChannel = queryJobChannel(c);
  2759. jobChannel.addDependencies(xgmml, failIfMissing);
  2760. }
  2761. }
  2762. bool CJobBase::queryUseCheckpoints() const
  2763. {
  2764. return globals->getPropBool("@checkPointRecovery") || 0 != getWorkUnitValueInt("checkPointRecovery", 0);
  2765. }
  2766. void CJobBase::abort(IException *e)
  2767. {
  2768. aborted = true;
  2769. for (unsigned c=0; c<queryJobChannels(); c++)
  2770. {
  2771. CJobChannel &jobChannel = queryJobChannel(c);
  2772. jobChannel.abort(e);
  2773. }
  2774. }
// These getX methods look the option up in the workunit settings first, then in the global (expert) settings, and fall back to the supplied 'dft' if it is present in neither.
  2776. StringBuffer &CJobBase::getOpt(const char *opt, StringBuffer &out)
  2777. {
  2778. if (!opt || !*opt)
  2779. return out; // probably error
  2780. getWorkUnitValue(opt, out);
  2781. if (0 == out.length())
  2782. getExpertOptString(opt, out);
  2783. return out;
  2784. }
  2785. bool CJobBase::getOptBool(const char *opt, bool dft)
  2786. {
  2787. if (!opt || !*opt)
  2788. return dft; // probably error
  2789. return getWorkUnitValueBool(opt, getExpertOptBool(opt, dft));
  2790. }
  2791. int CJobBase::getOptInt(const char *opt, int dft)
  2792. {
  2793. return (int)getOptInt64(opt, dft);
  2794. }
  2795. __int64 CJobBase::getOptInt64(const char *opt, __int64 dft)
  2796. {
  2797. if (!opt || !*opt)
  2798. return dft; // probably error
  2799. return getWorkUnitValueInt(opt, getExpertOptInt64(opt, dft));
  2800. }
  2801. IThorAllocator *CJobBase::getThorAllocator(unsigned channel)
  2802. {
  2803. return sharedAllocator.getLink();
  2804. }
  2805. /// CJobChannel
  2806. CJobChannel::CJobChannel(CJobBase &_job, IMPServer *_mpServer, unsigned _channel)
  2807. : job(_job), mpServer(_mpServer), channel(_channel)
  2808. {
  2809. aborted = false;
  2810. thorAllocator.setown(job.getThorAllocator(channel));
  2811. jobComm.setown(mpServer->createCommunicator(&job.queryJobGroup()));
  2812. myrank = job.queryJobGroup().rank(queryMyNode());
  2813. graphExecutor.setown(new CGraphExecutor(*this));
  2814. myBasePort = mpServer->queryMyNode()->endpoint().port;
  2815. portMap.setown(createBitSet());
  2816. }
  2817. CJobChannel::~CJobChannel()
  2818. {
  2819. if (!cleaned)
  2820. clean();
  2821. }
  2822. INode *CJobChannel::queryMyNode()
  2823. {
  2824. return mpServer->queryMyNode();
  2825. }
  2826. void CJobChannel::wait()
  2827. {
  2828. if (graphExecutor)
  2829. graphExecutor->wait();
  2830. }
  2831. ICodeContextExt &CJobChannel::queryCodeContext() const
  2832. {
  2833. return *codeCtx;
  2834. }
  2835. ICodeContext &CJobChannel::querySharedMemCodeContext() const
  2836. {
  2837. return *sharedMemCodeCtx;
  2838. }
  2839. mptag_t CJobChannel::deserializeMPTag(MemoryBuffer &mb)
  2840. {
  2841. mptag_t tag;
  2842. deserializeMPtag(mb, tag);
  2843. if (TAG_NULL != tag)
  2844. {
  2845. LOG(MCthorDetailedDebugInfo, thorJob, "deserializeMPTag: tag = %d", (int)tag);
  2846. jobComm->flush(tag);
  2847. }
  2848. return tag;
  2849. }
  2850. IEngineRowAllocator *CJobChannel::getRowAllocator(IOutputMetaData * meta, activity_id activityId, roxiemem::RoxieHeapFlags flags) const
  2851. {
  2852. return thorAllocator->getRowAllocator(meta, activityId, flags);
  2853. }
  2854. roxiemem::IRowManager *CJobChannel::queryRowManager() const
  2855. {
  2856. return thorAllocator->queryRowManager();
  2857. }
  2858. void CJobChannel::addDependencies(IPropertyTree *xgmml, bool failIfMissing)
  2859. {
  2860. ::addDependencies(xgmml, failIfMissing, allGraphs);
  2861. }
  2862. IThorGraphIterator *CJobChannel::getSubGraphs()
  2863. {
  2864. CriticalBlock b(crit);
  2865. return new CGraphTableIterator(subGraphs);
  2866. }
  2867. void CJobChannel::clean()
  2868. {
  2869. if (cleaned)
  2870. return;
  2871. cleaned = true;
  2872. wait();
  2873. if (!REJECTLOG(MCthorDetailedDebugInfo))
  2874. {
  2875. queryRowManager()->reportMemoryUsage(false);
  2876. LOG(MCthorDetailedDebugInfo, thorJob, "CJobBase resetting memory manager");
  2877. }
  2878. if (graphExecutor)
  2879. {
  2880. graphExecutor->queryGraphPool().stopAll();
  2881. graphExecutor.clear();
  2882. }
  2883. subGraphs.kill();
  2884. job.checkAndReportLeaks(thorAllocator->queryRowManager());
  2885. thorAllocator.clear();
  2886. codeCtx.clear();
  2887. }
  2888. void CJobChannel::startGraph(CGraphBase &graph, bool checkDependencies, size32_t parentExtractSize, const byte *parentExtract)
  2889. {
  2890. graphExecutor->add(&graph, *this, checkDependencies, parentExtractSize, parentExtract);
  2891. }
  2892. IThorResult *CJobChannel::getOwnedResult(graph_id gid, activity_id ownerId, unsigned resultId)
  2893. {
  2894. Owned<CGraphBase> graph = getGraph(gid);
  2895. if (!graph)
  2896. {
  2897. Owned<IThorException> e = MakeThorException(0, "getOwnedResult: graph not found");
  2898. e->setGraphInfo(queryJob().queryGraphName(), gid);
  2899. throw e.getClear();
  2900. }
  2901. Owned<IThorResult> result;
  2902. if (ownerId)
  2903. {
  2904. CGraphElementBase *container = graph->queryElement(ownerId);
  2905. assertex(container);
  2906. CActivityBase *activity = container->queryActivity();
  2907. IThorGraphResults *results = activity->queryResults();
  2908. if (!results)
  2909. throw MakeGraphException(graph, 0, "GraphGetResult: no results created (requesting: %d)", resultId);
  2910. result.setown(activity->queryResults()->getResult(resultId));
  2911. }
  2912. else
  2913. result.setown(graph->getResult(resultId));
  2914. if (!result)
  2915. throw MakeGraphException(graph, 0, "GraphGetResult: result not found: %d", resultId);
  2916. return result.getClear();
  2917. }
  2918. unsigned short CJobChannel::allocPort(unsigned num)
  2919. {
  2920. CriticalBlock block(portAllocCrit);
  2921. if (num==0)
  2922. num = 1;
  2923. unsigned sp=0;
  2924. unsigned p;
  2925. for (;;)
  2926. {
  2927. p = portMap->scan(sp, false);
  2928. unsigned q;
  2929. for (q=p+1; q<p+num; q++)
  2930. {
  2931. if (portMap->test(q))
  2932. break;
  2933. }
  2934. if (q == p+num)
  2935. {
  2936. while (q != p)
  2937. portMap->set(--q);
  2938. break;
  2939. }
  2940. sp = p+1;
  2941. }
  2942. return (unsigned short)(p+queryMyBasePort());
  2943. }
  2944. void CJobChannel::freePort(unsigned short p, unsigned num)
  2945. {
  2946. CriticalBlock block(portAllocCrit);
  2947. if (!p)
  2948. return;
  2949. if (num == 0)
  2950. num = 1;
  2951. while (num--)
  2952. portMap->set(p-queryMyBasePort()+num, false);
  2953. }
  2954. void CJobChannel::reservePortKind(ThorPortKind kind)
  2955. {
  2956. CriticalBlock block(portAllocCrit);
  2957. portMap->set(getPortOffset(kind), true);
  2958. }
  2959. void CJobChannel::abort(IException *e)
  2960. {
  2961. aborted = true;
  2962. Owned<IThorGraphIterator> iter = getSubGraphs();
  2963. ForEach (*iter)
  2964. {
  2965. CGraphBase &graph = iter->query();
  2966. graph.abort(e);
  2967. }
  2968. }
  2969. // IGraphCallback
  2970. void CJobChannel::runSubgraph(CGraphBase &graph, size32_t parentExtractSz, const byte *parentExtract)
  2971. {
  2972. graph.executeSubGraph(parentExtractSz, parentExtract);
  2973. }
  2974. static IThorResource *iThorResource = NULL;
  2975. void setIThorResource(IThorResource &r)
  2976. {
  2977. iThorResource = &r;
  2978. }
  2979. IThorResource &queryThor()
  2980. {
  2981. return *iThorResource;
  2982. }
  2983. //
  2984. //
  2985. //
  2986. //
  2987. CActivityBase::CActivityBase(CGraphElementBase *_container, const StatisticsMapping &statsMapping)
  2988. : container(*_container), timeActivities(_container->queryJob().queryTimeActivities()), stats(statsMapping)
  2989. {
  2990. mpTag = TAG_NULL;
  2991. abortSoon = receiving = cancelledReceive = initialized = reInit = false;
  2992. baseHelper.set(container.queryHelper());
  2993. defaultRoxieMemHeapFlags = (roxiemem::RoxieHeapFlags)container.getOptInt("heapflags", defaultHeapFlags);
  2994. if (container.queryJob().queryUsePackedAllocators())
  2995. defaultRoxieMemHeapFlags = (roxiemem::RoxieHeapFlags)(defaultRoxieMemHeapFlags | roxiemem::RHFpacked);
  2996. }
  2997. CActivityBase::~CActivityBase()
  2998. {
  2999. }
  3000. void CActivityBase::abort()
  3001. {
  3002. if (!abortSoon)
  3003. {
  3004. ::ActPrintLog(this, thorDetailedLogLevel, "Abort condition set");
  3005. abortSoon = true;
  3006. }
  3007. }
  3008. void CActivityBase::kill()
  3009. {
  3010. ownedResults.clear();
  3011. }
  3012. bool CActivityBase::appendRowXml(StringBuffer & target, IOutputMetaData & meta, const void * row) const
  3013. {
  3014. if (!meta.hasXML())
  3015. {
  3016. target.append("<xml-unavailable/>");
  3017. return false;
  3018. }
  3019. try
  3020. {
  3021. CommonXmlWriter xmlWrite(XWFnoindent);
  3022. meta.toXML((byte *) row, xmlWrite);
  3023. target.append(xmlWrite.str());
  3024. return true;
  3025. }
  3026. catch (IException * e)
  3027. {
  3028. e->Release();
  3029. target.append("<invalid-row/>");
  3030. return false;
  3031. }
  3032. }
  3033. void CActivityBase::logRow(const char * prefix, IOutputMetaData & meta, const void * row) const
  3034. {
  3035. bool blindLogging = false; // MORE: should check a workunit/global option
  3036. if (meta.hasXML() && !blindLogging)
  3037. {
  3038. StringBuffer xml;
  3039. appendRowXml(xml, meta, row);
  3040. ActPrintLog("%s: %s", prefix, xml.str());
  3041. }
  3042. }
  3043. void CActivityBase::ActPrintLog(const char *format, ...) const
  3044. {
  3045. va_list args;
  3046. va_start(args, format);
  3047. ::ActPrintLogArgs(&queryContainer(), thorlog_null, MCdebugProgress, format, args);
  3048. va_end(args);
  3049. }
  3050. void CActivityBase::ActPrintLog(IException *e, const char *format, ...) const
  3051. {
  3052. va_list args;
  3053. va_start(args, format);
  3054. ::ActPrintLogArgs(&queryContainer(), e, thorlog_all, MCexception(e), format, args);
  3055. va_end(args);
  3056. }
  3057. void CActivityBase::ActPrintLog(IException *e) const
  3058. {
  3059. ActPrintLog(e, "%s", "");
  3060. }
  3061. IThorRowInterfaces * CActivityBase::createRowInterfaces(IOutputMetaData * meta, byte seq)
  3062. {
  3063. activity_id id = createCompoundActSeqId(queryId(), seq);
  3064. return createThorRowInterfaces(queryRowManager(), meta, id, queryHeapFlags(), queryCodeContext());
  3065. }
  3066. IThorRowInterfaces * CActivityBase::createRowInterfaces(IOutputMetaData * meta, roxiemem::RoxieHeapFlags heapFlags, byte seq)
  3067. {
  3068. activity_id id = createCompoundActSeqId(queryId(), seq);
  3069. return createThorRowInterfaces(queryRowManager(), meta, id, heapFlags, queryCodeContext());
  3070. }
  3071. bool CActivityBase::fireException(IException *e)
  3072. {
  3073. Owned<IThorException> _te;
  3074. IThorException *te = QUERYINTERFACE(e, IThorException);
  3075. if (te)
  3076. {
  3077. if (!te->queryActivityId())
  3078. setExceptionActivityInfo(container, te);
  3079. }
  3080. else
  3081. {
  3082. te = MakeActivityException(this, e);
  3083. te->setAudience(e->errorAudience());
  3084. _te.setown(te);
  3085. }
  3086. return container.queryOwner().fireException(te);
  3087. }
  3088. void CActivityBase::processAndThrowOwnedException(IException * _e)
  3089. {
  3090. IThorException *e = QUERYINTERFACE(_e, IThorException);
  3091. if (e)
  3092. {
  3093. if (!e->queryActivityId())
  3094. setExceptionActivityInfo(container, e);
  3095. }
  3096. else
  3097. {
  3098. e = MakeActivityException(this, _e);
  3099. _e->Release();
  3100. }
  3101. throw e;
  3102. }
  3103. IEngineRowAllocator * CActivityBase::queryRowAllocator()
  3104. {
  3105. if (CABallocatorlock.lock()) {
  3106. if (!rowAllocator)
  3107. {
  3108. roxiemem::RoxieHeapFlags heapFlags = queryHeapFlags();
  3109. rowAllocator.setown(getRowAllocator(queryRowMetaData(), heapFlags));
  3110. }
  3111. CABallocatorlock.unlock();
  3112. }
  3113. return rowAllocator;
  3114. }
  3115. IOutputRowSerializer * CActivityBase::queryRowSerializer()
  3116. {
  3117. if (CABserializerlock.lock()) {
  3118. if (!rowSerializer)
  3119. rowSerializer.setown(queryRowMetaData()->createDiskSerializer(queryCodeContext(),queryId()));
  3120. CABserializerlock.unlock();
  3121. }
  3122. return rowSerializer;
  3123. }
  3124. IOutputRowDeserializer * CActivityBase::queryRowDeserializer()
  3125. {
  3126. if (CABdeserializerlock.lock()) {
  3127. if (!rowDeserializer)
  3128. rowDeserializer.setown(queryRowMetaData()->createDiskDeserializer(queryCodeContext(),queryId()));
  3129. CABdeserializerlock.unlock();
  3130. }
  3131. return rowDeserializer;
  3132. }
  3133. IThorRowInterfaces *CActivityBase::getRowInterfaces()
  3134. {
  3135. // create an independent instance, to avoid circular link dependency problems
  3136. return createThorRowInterfaces(queryRowManager(), queryRowMetaData(), container.queryId(), queryHeapFlags(), queryCodeContext());
  3137. }
  3138. IEngineRowAllocator *CActivityBase::getRowAllocator(IOutputMetaData * meta, roxiemem::RoxieHeapFlags flags, byte seq) const
  3139. {
  3140. activity_id actId = createCompoundActSeqId(queryId(), seq);
  3141. return queryJobChannel().getRowAllocator(meta, actId, flags);
  3142. }
  3143. bool CActivityBase::receiveMsg(ICommunicator &comm, CMessageBuffer &mb, const rank_t rank, const mptag_t mpTag, rank_t *sender, unsigned timeout)
  3144. {
  3145. BooleanOnOff onOff(receiving);
  3146. CTimeMon t(timeout);
  3147. unsigned remaining = timeout;
  3148. // check 'cancelledReceive' every 10 secs
  3149. while (!cancelledReceive && ((MP_WAIT_FOREVER==timeout) || !t.timedout(&remaining)))
  3150. {
  3151. if (comm.recv(mb, rank, mpTag, sender, remaining>10000?10000:remaining))
  3152. return true;
  3153. }
  3154. return false;
  3155. }
  3156. bool CActivityBase::receiveMsg(CMessageBuffer &mb, const rank_t rank, const mptag_t mpTag, rank_t *sender, unsigned timeout)
  3157. {
  3158. return receiveMsg(queryJobChannel().queryJobComm(), mb, rank, mpTag, sender, timeout);
  3159. }
  3160. void CActivityBase::cancelReceiveMsg(ICommunicator &comm, const rank_t rank, const mptag_t mpTag)
  3161. {
  3162. cancelledReceive = true;
  3163. if (receiving)
  3164. comm.cancel(rank, mpTag);
  3165. }
  3166. void CActivityBase::cancelReceiveMsg(const rank_t rank, const mptag_t mpTag)
  3167. {
  3168. cancelReceiveMsg(queryJobChannel().queryJobComm(), rank, mpTag);
  3169. }