thgraph.cpp 97 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
7277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825282628272828282928302831283228332834283528362837283828392840284128422843284428452846284728482849285028512852285328542855285628572858285928602861286228632864286528662867286828692870287128722873287428752876287728782879288028812882288328842885288628872888288928902891289228932894289528962897289828992900290129022903290429052906290729082909291029112912291329142915291629172918291929202921292229232924292529262927292829292930293129322933293429352936293729382939294029412942294329442945294629472948294929502951295229532954295529562957295829592960296129622963296429652966296729682969297029712972297329742975297629772978297929802981298229832984298529862987298829892990299129922993299429952996299729982999300030013002300330043005300630073008300930103011301230133014301530163017301830193020302130223023302430253026302730283029303030313032303330343035303630373038303930403041304230433044304530463047304830493050305130523053305430553056305730583059
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "thgraph.hpp"
  14. #include "jptree.hpp"
  15. #include "commonext.hpp"
  16. #include "dasess.hpp"
  17. #include "jhtree.hpp"
  18. #include "thcodectx.hpp"
  19. #include "thbuf.hpp"
  20. #include "thormisc.hpp"
  21. #include "thbufdef.hpp"
  22. #include "thmem.hpp"
  23. PointerArray createFuncs;
  24. void registerCreateFunc(CreateFunc func)
  25. {
  26. createFuncs.append((void *)func);
  27. }
  28. ///////////////////////////////////
  29. //////
  30. /////
  31. class CThorGraphResult : public CInterface, implements IThorResult, implements IRowWriter
  32. {
  33. CActivityBase &activity;
  34. rowcount_t rowStreamCount;
  35. IOutputMetaData *meta;
  36. Owned<IRowWriterMultiReader> rowBuffer;
  37. IRowInterfaces *rowIf;
  38. IEngineRowAllocator *allocator;
  39. bool stopped, readers, distributed;
  40. void init()
  41. {
  42. stopped = readers = false;
  43. allocator = rowIf->queryRowAllocator();
  44. meta = allocator->queryOutputMeta();
  45. rowStreamCount = 0;
  46. }
  47. class CStreamWriter : public CSimpleInterface, implements IRowWriterMultiReader
  48. {
  49. CThorGraphResult &owner;
  50. CThorExpandingRowArray rows;
  51. public:
  52. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  53. CStreamWriter(CThorGraphResult &_owner) : owner(_owner), rows(owner.activity, owner.rowIf, true)
  54. {
  55. }
  56. //IRowWriterMultiReader
  57. virtual void putRow(const void *row)
  58. {
  59. rows.append(row);
  60. }
  61. virtual void flush() { }
  62. virtual IRowStream *getReader()
  63. {
  64. return rows.createRowStream(0, (rowidx_t)-1, false);
  65. }
  66. };
  67. public:
  68. IMPLEMENT_IINTERFACE;
  69. CThorGraphResult(CActivityBase &_activity, IRowInterfaces *_rowIf, bool _distributed, unsigned spillPriority) : activity(_activity), rowIf(_rowIf), distributed(_distributed)
  70. {
  71. init();
  72. if (SPILL_PRIORITY_DISABLE == spillPriority)
  73. rowBuffer.setown(new CStreamWriter(*this));
  74. else
  75. rowBuffer.setown(createOverflowableBuffer(activity, rowIf, true, true));
  76. }
  77. // IRowWriter
  78. virtual void putRow(const void *row)
  79. {
  80. assertex(!readers);
  81. ++rowStreamCount;
  82. rowBuffer->putRow(row);
  83. }
  84. virtual void flush() { }
  85. virtual offset_t getPosition() { UNIMPLEMENTED; return 0; }
  86. // IThorResult
  87. virtual IRowWriter *getWriter()
  88. {
  89. return LINK(this);
  90. }
  91. virtual void setResultStream(IRowWriterMultiReader *stream, rowcount_t count)
  92. {
  93. assertex(!readers);
  94. rowBuffer.setown(stream);
  95. rowStreamCount = count;
  96. }
  97. virtual IRowStream *getRowStream()
  98. {
  99. readers = true;
  100. return rowBuffer->getReader();
  101. }
  102. virtual IRowInterfaces *queryRowInterfaces() { return rowIf; }
  103. virtual CActivityBase *queryActivity() { return &activity; }
  104. virtual bool isDistributed() const { return distributed; }
  105. virtual void serialize(MemoryBuffer &mb)
  106. {
  107. Owned<IRowStream> stream = getRowStream();
  108. bool grouped = meta->isGrouped();
  109. if (grouped)
  110. {
  111. OwnedConstThorRow prev = stream->nextRow();
  112. if (prev)
  113. {
  114. bool eog;
  115. loop
  116. {
  117. eog = false;
  118. OwnedConstThorRow next = stream->nextRow();
  119. if (!next)
  120. eog = true;
  121. size32_t sz = meta->getRecordSize(prev);
  122. mb.append(sz, prev.get());
  123. mb.append(eog);
  124. if (!next)
  125. {
  126. next.setown(stream->nextRow());
  127. if (!next)
  128. break;
  129. }
  130. prev.set(next);
  131. }
  132. }
  133. }
  134. else
  135. {
  136. loop
  137. {
  138. OwnedConstThorRow row = stream->nextRow();
  139. if (!row)
  140. break;
  141. size32_t sz = meta->getRecordSize(row);
  142. mb.append(sz, row.get());
  143. }
  144. }
  145. }
  146. virtual void getResult(size32_t &len, void * & data)
  147. {
  148. MemoryBuffer mb;
  149. serialize(mb);
  150. len = mb.length();
  151. data = mb.detach();
  152. }
  153. virtual void getLinkedResult(unsigned &countResult, byte * * & result)
  154. {
  155. assertex(rowStreamCount==((unsigned)rowStreamCount)); // catch, just in case
  156. Owned<IRowStream> stream = getRowStream();
  157. countResult = 0;
  158. OwnedConstThorRow _rowset = allocator->createRowset((unsigned)rowStreamCount);
  159. const void **rowset = (const void **)_rowset.get();
  160. while (countResult < rowStreamCount)
  161. {
  162. OwnedConstThorRow row = stream->nextRow();
  163. rowset[countResult++] = row.getClear();
  164. }
  165. result = (byte **)_rowset.getClear();
  166. }
  167. virtual const void * getLinkedRowResult()
  168. {
  169. assertex(rowStreamCount==1); // catch, just in case
  170. Owned<IRowStream> stream = getRowStream();
  171. return stream->nextRow();
  172. }
  173. };
  174. /////
  175. IThorResult *CThorGraphResults::createResult(CActivityBase &activity, unsigned id, IRowInterfaces *rowIf, bool distributed, unsigned spillPriority)
  176. {
  177. Owned<IThorResult> result = ::createResult(activity, rowIf, distributed, spillPriority);
  178. setResult(id, result);
  179. return result;
  180. }
  181. /////
  182. IThorResult *createResult(CActivityBase &activity, IRowInterfaces *rowIf, bool distributed, unsigned spillPriority)
  183. {
  184. return new CThorGraphResult(activity, rowIf, distributed, spillPriority);
  185. }
  186. /////
  187. class CThorBoundLoopGraph : public CInterface, implements IThorBoundLoopGraph
  188. {
  189. CGraphBase *graph;
  190. activity_id activityId;
  191. Linked<IOutputMetaData> resultMeta;
  192. Owned<IOutputMetaData> counterMeta, loopAgainMeta;
  193. Owned<IRowInterfaces> resultRowIf, countRowIf, loopAgainRowIf;
  194. public:
  195. IMPLEMENT_IINTERFACE;
  196. CThorBoundLoopGraph(CGraphBase *_graph, IOutputMetaData * _resultMeta, unsigned _activityId) : graph(_graph), resultMeta(_resultMeta), activityId(_activityId)
  197. {
  198. counterMeta.setown(createFixedSizeMetaData(sizeof(thor_loop_counter_t)));
  199. loopAgainMeta.setown(createFixedSizeMetaData(sizeof(bool)));
  200. }
  201. virtual void prepareCounterResult(CActivityBase &activity, IThorGraphResults *results, unsigned loopCounter, unsigned pos)
  202. {
  203. if (!countRowIf)
  204. countRowIf.setown(createRowInterfaces(counterMeta, activityId, activity.queryCodeContext()));
  205. RtlDynamicRowBuilder counterRow(countRowIf->queryRowAllocator());
  206. thor_loop_counter_t * res = (thor_loop_counter_t *)counterRow.ensureCapacity(sizeof(thor_loop_counter_t),NULL);
  207. *res = loopCounter;
  208. OwnedConstThorRow counterRowFinal = counterRow.finalizeRowClear(sizeof(thor_loop_counter_t));
  209. IThorResult *counterResult = results->createResult(activity, pos, countRowIf, false, SPILL_PRIORITY_DISABLE);
  210. Owned<IRowWriter> counterResultWriter = counterResult->getWriter();
  211. counterResultWriter->putRow(counterRowFinal.getClear());
  212. }
  213. virtual void prepareLoopAgainResult(CActivityBase &activity, IThorGraphResults *results, unsigned pos)
  214. {
  215. if (!loopAgainRowIf)
  216. loopAgainRowIf.setown(createRowInterfaces(loopAgainMeta, activityId, activity.queryCodeContext()));
  217. activity.queryGraph().createResult(activity, pos, results, loopAgainRowIf, !activity.queryGraph().isLocalChild(), SPILL_PRIORITY_DISABLE);
  218. }
  219. virtual void prepareLoopResults(CActivityBase &activity, IThorGraphResults *results)
  220. {
  221. if (!resultRowIf)
  222. resultRowIf.setown(createRowInterfaces(resultMeta, activityId, activity.queryCodeContext()));
  223. IThorResult *loopResult = results->createResult(activity, 0, resultRowIf, !activity.queryGraph().isLocalChild()); // loop output
  224. IThorResult *inputResult = results->createResult(activity, 1, resultRowIf, !activity.queryGraph().isLocalChild()); // loop input
  225. }
  226. virtual void execute(CActivityBase &activity, unsigned counter, IThorGraphResults *results, IRowWriterMultiReader *inputStream, rowcount_t rowStreamCount, size32_t parentExtractSz, const byte *parentExtract)
  227. {
  228. if (counter)
  229. graph->setLoopCounter(counter);
  230. Owned<IThorResult> inputResult = results->getResult(1);
  231. if (inputStream)
  232. inputResult->setResultStream(inputStream, rowStreamCount);
  233. graph->executeChild(parentExtractSz, parentExtract, results, NULL);
  234. }
  235. virtual void execute(CActivityBase &activity, unsigned counter, IThorGraphResults *graphLoopResults, size32_t parentExtractSz, const byte *parentExtract)
  236. {
  237. Owned<IThorGraphResults> results = graph->createThorGraphResults(1);
  238. if (counter)
  239. {
  240. prepareCounterResult(activity, results, counter, 0);
  241. graph->setLoopCounter(counter);
  242. }
  243. try
  244. {
  245. graph->executeChild(parentExtractSz, parentExtract, results, graphLoopResults);
  246. }
  247. catch (IException *e)
  248. {
  249. IThorException *te = QUERYINTERFACE(e, IThorException);
  250. if (!te)
  251. {
  252. Owned<IThorException> e2 = MakeActivityException(&activity, e, "Exception running child graphs");
  253. e->Release();
  254. te = e2.getClear();
  255. }
  256. else if (!te->queryActivityId())
  257. setExceptionActivityInfo(activity.queryContainer(), te);
  258. try { graph->abort(te); }
  259. catch (IException *abortE)
  260. {
  261. Owned<IThorException> e2 = MakeActivityException(&activity, abortE, "Exception whilst aborting graph");
  262. abortE->Release();
  263. EXCLOG(e2, NULL);
  264. }
  265. graph->queryJob().fireException(te);
  266. throw te;
  267. }
  268. }
  269. virtual CGraphBase *queryGraph() { return graph; }
  270. };
  271. IThorBoundLoopGraph *createBoundLoopGraph(CGraphBase *graph, IOutputMetaData *resultMeta, unsigned activityId)
  272. {
  273. return new CThorBoundLoopGraph(graph, resultMeta, activityId);
  274. }
  275. ///////////////////////////////////
  276. bool isDiskInput(ThorActivityKind kind)
  277. {
  278. switch (kind)
  279. {
  280. case TAKcsvread:
  281. case TAKxmlread:
  282. case TAKjsonread:
  283. case TAKdiskread:
  284. case TAKdisknormalize:
  285. case TAKdiskaggregate:
  286. case TAKdiskcount:
  287. case TAKdiskgroupaggregate:
  288. case TAKindexread:
  289. case TAKindexcount:
  290. case TAKindexnormalize:
  291. case TAKindexaggregate:
  292. case TAKindexgroupaggregate:
  293. case TAKindexgroupexists:
  294. case TAKindexgroupcount:
  295. return true;
  296. default:
  297. return false;
  298. }
  299. }
  300. void CIOConnection::connect(unsigned which, CActivityBase *destActivity)
  301. {
  302. destActivity->setInput(which, activity->queryActivity(), index);
  303. }
  304. ///////////////////////////////////
  305. CGraphElementBase *createGraphElement(IPropertyTree &node, CGraphBase &owner, CGraphBase *resultsGraph)
  306. {
  307. CGraphElementBase *container = NULL;
  308. ForEachItemIn(m, createFuncs)
  309. {
  310. CreateFunc func = (CreateFunc)createFuncs.item(m);
  311. container = func(node, owner, resultsGraph);
  312. if (container) break;
  313. }
  314. if (NULL == container)
  315. {
  316. ThorActivityKind tak = (ThorActivityKind)node.getPropInt("att[@name=\"_kind\"]/@value", TAKnone);
  317. throw MakeStringException(TE_UnsupportedActivityKind, "Unsupported activity kind: %s", activityKindStr(tak));
  318. }
  319. container->setResultsGraph(resultsGraph);
  320. return container;
  321. }
  322. CGraphElementBase::CGraphElementBase(CGraphBase &_owner, IPropertyTree &_xgmml) : owner(&_owner)
  323. {
  324. xgmml.setown(createPTreeFromIPT(&_xgmml));
  325. eclText.set(xgmml->queryProp("att[@name=\"ecl\"]/@value"));
  326. id = xgmml->getPropInt("@id", 0);
  327. kind = (ThorActivityKind)xgmml->getPropInt("att[@name=\"_kind\"]/@value", TAKnone);
  328. sink = isActivitySink(kind);
  329. bool coLocal = xgmml->getPropBool("att[@name=\"coLocal\"]/@value", false);
  330. isLocalData = xgmml->getPropBool("att[@name=\"local\"]/@value", false); // local execute + local data access only
  331. isLocal = isLocalData || coLocal; // local execute
  332. isGrouped = xgmml->getPropBool("att[@name=\"grouped\"]/@value", false);
  333. resultsGraph = NULL;
  334. ownerId = xgmml->getPropInt("att[@name=\"_parentActivity\"]/@value", 0);
  335. onCreateCalled = onStartCalled = prepared = haveCreateCtx = haveStartCtx = nullAct = false;
  336. onlyUpdateIfChanged = xgmml->getPropBool("att[@name=\"_updateIfChanged\"]/@value", false);
  337. StringBuffer helperName("fAc");
  338. xgmml->getProp("@id", helperName);
  339. helperFactory = (EclHelperFactory) queryJob().queryDllEntry().getEntry(helperName.str());
  340. if (!helperFactory)
  341. throw makeOsExceptionV(GetLastError(), "Failed to load helper factory method: %s (dll handle = %p)", helperName.str(), queryJob().queryDllEntry().getInstance());
  342. alreadyUpdated = false;
  343. whichBranch = (unsigned)-1;
  344. whichBranchBitSet.setown(createThreadSafeBitSet());
  345. newWhichBranch = false;
  346. hasNullInput = false;
  347. log = true;
  348. sentActInitData.setown(createThreadSafeBitSet());
  349. }
  350. CGraphElementBase::~CGraphElementBase()
  351. {
  352. activity.clear();
  353. baseHelper.clear(); // clear before dll is unloaded
  354. }
  355. CJobBase &CGraphElementBase::queryJob() const
  356. {
  357. return owner->queryJob();
  358. }
  359. IGraphTempHandler *CGraphElementBase::queryTempHandler() const
  360. {
  361. if (resultsGraph)
  362. return resultsGraph->queryTempHandler();
  363. else
  364. return queryJob().queryTempHandler();
  365. }
  366. void CGraphElementBase::releaseIOs()
  367. {
  368. loopGraph.clear();
  369. associatedChildGraphs.kill();
  370. if (activity)
  371. activity->releaseIOs();
  372. connectedInputs.kill();
  373. inputs.kill();
  374. outputs.kill();
  375. activity.clear();
  376. }
  377. void CGraphElementBase::addDependsOn(CGraphBase *graph, int controlId)
  378. {
  379. ForEachItemIn(i, dependsOn)
  380. {
  381. if (dependsOn.item(i).graph == graph)
  382. return;
  383. }
  384. dependsOn.append(*new CGraphDependency(graph, controlId));
  385. }
  386. IThorGraphIterator *CGraphElementBase::getAssociatedChildGraphs() const
  387. {
  388. return new CGraphArrayIterator(associatedChildGraphs);
  389. }
  390. IThorGraphDependencyIterator *CGraphElementBase::getDependsIterator() const
  391. {
  392. return new ArrayIIteratorOf<const CGraphDependencyArray, CGraphDependency, IThorGraphDependencyIterator>(dependsOn);
  393. }
  394. void CGraphElementBase::reset()
  395. {
  396. onStartCalled = false;
  397. // prepared = false;
  398. if (activity)
  399. activity->reset();
  400. }
  401. void CGraphElementBase::ActPrintLog(const char *format, ...)
  402. {
  403. va_list args;
  404. va_start(args, format);
  405. ::ActPrintLogArgs(this, thorlog_null, MCdebugProgress, format, args);
  406. va_end(args);
  407. }
  408. void CGraphElementBase::ActPrintLog(IException *e, const char *format, ...)
  409. {
  410. va_list args;
  411. va_start(args, format);
  412. ::ActPrintLogArgs(this, e, thorlog_all, MCexception(e), format, args);
  413. va_end(args);
  414. }
  415. void CGraphElementBase::ActPrintLog(IException *e)
  416. {
  417. ActPrintLog(e, "%s", "");
  418. }
  419. void CGraphElementBase::abort(IException *e)
  420. {
  421. CActivityBase *activity = queryActivity();
  422. if (activity)
  423. activity->abort();
  424. Owned<IThorGraphIterator> graphIter = getAssociatedChildGraphs();
  425. ForEach (*graphIter)
  426. {
  427. graphIter->query().abort(e);
  428. }
  429. }
  430. void CGraphElementBase::doconnect()
  431. {
  432. ForEachItemIn(i, connectedInputs)
  433. {
  434. CIOConnection *io = connectedInputs.item(i);
  435. if (io)
  436. io->connect(i, queryActivity());
  437. }
  438. }
  439. void CGraphElementBase::clearConnections()
  440. {
  441. connectedInputs.kill();
  442. connectedOutputs.kill();
  443. if (activity)
  444. activity->clearConnections();
  445. }
  446. void CGraphElementBase::addInput(unsigned input, CGraphElementBase *inputAct, unsigned inputOutIdx)
  447. {
  448. while (inputs.ordinality()<=input) inputs.append(NULL);
  449. inputs.replace(new COwningSimpleIOConnection(LINK(inputAct), inputOutIdx), input);
  450. while (inputAct->outputs.ordinality()<=inputOutIdx) inputAct->outputs.append(NULL);
  451. inputAct->outputs.replace(new CIOConnection(this, input), inputOutIdx);
  452. }
  453. void CGraphElementBase::connectInput(unsigned input, CGraphElementBase *inputAct, unsigned inputOutIdx)
  454. {
  455. ActPrintLog("CONNECTING (id=%" ACTPF "d, idx=%d) to (id=%" ACTPF "d, idx=%d)", inputAct->queryId(), inputOutIdx, queryId(), input);
  456. while (connectedInputs.ordinality()<=input) connectedInputs.append(NULL);
  457. connectedInputs.replace(new COwningSimpleIOConnection(LINK(inputAct), inputOutIdx), input);
  458. while (inputAct->connectedOutputs.ordinality()<=inputOutIdx) inputAct->connectedOutputs.append(NULL);
  459. inputAct->connectedOutputs.replace(new CIOConnection(this, input), inputOutIdx);
  460. }
  461. void CGraphElementBase::addAssociatedChildGraph(CGraphBase *childGraph)
  462. {
  463. if (!associatedChildGraphs.contains(*childGraph))
  464. associatedChildGraphs.append(*LINK(childGraph));
  465. }
  466. void CGraphElementBase::serializeCreateContext(MemoryBuffer &mb)
  467. {
  468. if (!onCreateCalled) return;
  469. DelayedSizeMarker sizeMark(mb);
  470. queryHelper()->serializeCreateContext(mb);
  471. sizeMark.write();
  472. }
  473. void CGraphElementBase::serializeStartContext(MemoryBuffer &mb)
  474. {
  475. if (!onStartCalled) return;
  476. DelayedSizeMarker sizeMark(mb);
  477. queryHelper()->serializeStartContext(mb);
  478. sizeMark.write();
  479. }
  480. void CGraphElementBase::deserializeCreateContext(MemoryBuffer &mb)
  481. {
  482. size32_t createCtxLen;
  483. mb.read(createCtxLen);
  484. createCtxMb.clear().append(createCtxLen, mb.readDirect(createCtxLen));
  485. haveCreateCtx = true;
  486. }
  487. void CGraphElementBase::deserializeStartContext(MemoryBuffer &mb)
  488. {
  489. size32_t startCtxLen;
  490. mb.read(startCtxLen);
  491. startCtxMb.append(startCtxLen, mb.readDirect(startCtxLen));
  492. haveStartCtx = true;
  493. onStartCalled = false; // allow to be called again
  494. }
  495. void CGraphElementBase::onCreate()
  496. {
  497. CriticalBlock b(crit);
  498. if (onCreateCalled)
  499. return;
  500. onCreateCalled = true;
  501. baseHelper.setown(helperFactory());
  502. if (!nullAct)
  503. {
  504. CGraphElementBase *ownerActivity = owner->queryOwner() ? owner->queryOwner()->queryElement(ownerId) : NULL;
  505. if (ownerActivity)
  506. {
  507. ownerActivity->onCreate(); // ensure owner created, might not be if this is child query inside another child query.
  508. baseHelper->onCreate(queryCodeContext(), ownerActivity->queryHelper(), haveCreateCtx?&createCtxMb:NULL);
  509. }
  510. else
  511. baseHelper->onCreate(queryCodeContext(), NULL, haveCreateCtx?&createCtxMb:NULL);
  512. }
  513. }
  514. void CGraphElementBase::onStart(size32_t parentExtractSz, const byte *parentExtract)
  515. {
  516. if (onStartCalled)
  517. return;
  518. onStartCalled = true;
  519. if (!nullAct)
  520. {
  521. if (haveStartCtx)
  522. {
  523. baseHelper->onStart(parentExtract, &startCtxMb);
  524. startCtxMb.reset();
  525. haveStartCtx = false;
  526. }
  527. else
  528. baseHelper->onStart(parentExtract, NULL);
  529. }
  530. }
  531. bool CGraphElementBase::executeDependencies(size32_t parentExtractSz, const byte *parentExtract, int controlId, bool async)
  532. {
  533. Owned<IThorGraphDependencyIterator> deps = getDependsIterator();
  534. ForEach(*deps)
  535. {
  536. CGraphDependency &dep = deps->query();
  537. if (dep.controlId == controlId)
  538. dep.graph->execute(parentExtractSz, parentExtract, true, async);
  539. if (owner->queryJob().queryAborted() || owner->queryAborted()) return false;
  540. }
  541. return true;
  542. }
  // Prepares this element, and recursively the inputs it will use, ahead of execution.
  //  - Unconditional dependencies (controlId 0) not yet recorded are added to owner->dependentSubGraphs;
  //    if any of them is incomplete and checkDependencies is set, they are run via executeDependencies().
  //  - whichBranch, hasNullInput and alreadyUpdated are reset, then kind-specific handling runs (see switch).
  // Returns false when preparation stops early: executeDependencies() reported an abort, a write
  // activity's checkUpdate() returned true, or an input's prepareContext() returned false.
  // CGraphBase::prepare() treats a true result from any sink as "graph needs to execute".
  // Any IException raised is rethrown as an IThorException; one that carries no activity id
  // is tagged with this activity, and non-Thor exceptions are wrapped by MakeActivityException().
  543. bool CGraphElementBase::prepareContext(size32_t parentExtractSz, const byte *parentExtract, bool checkDependencies, bool shortCircuit, bool async)
  544. {
  545. try
  546. {
  547. bool _shortCircuit = shortCircuit;
  548. Owned<IThorGraphDependencyIterator> deps = getDependsIterator();
  549. bool depsDone = true;
  550. ForEach(*deps)
  551. {
  552. CGraphDependency &dep = deps->query();
  // only unconditional dependencies that the owner graph has not recorded yet are considered here
  553. if (0 == dep.controlId && NotFound == owner->dependentSubGraphs.find(*dep.graph))
  554. {
  555. owner->dependentSubGraphs.append(*dep.graph);
  556. if (!dep.graph->isComplete())
  557. depsDone = false;
  558. }
  559. }
  // with no incomplete dependency found, this element itself is never short-circuited
  560. if (depsDone) _shortCircuit = false;
  561. if (!depsDone && checkDependencies)
  562. {
  563. if (!executeDependencies(parentExtractSz, parentExtract, 0, async))
  564. return false;
  565. }
  // reset per-run state before the kind-specific handling below recomputes it
  566. whichBranch = (unsigned)-1;
  567. hasNullInput = false;
  568. alreadyUpdated = false;
  569. switch (getKind())
  570. {
  // write activities: skip the rest of preparation when checkUpdate() says the output is already up to date
  571. case TAKindexwrite:
  572. case TAKdiskwrite:
  573. case TAKcsvwrite:
  574. case TAKxmlwrite:
  575. case TAKjsonwrite:
  576. if (_shortCircuit) return true;
  577. onCreate();
  578. alreadyUpdated = checkUpdate();
  579. if (alreadyUpdated)
  580. return false;
  581. break;
  // child-query IFs are additionally recorded in the owner's 'ifs' list
  582. case TAKchildif:
  583. owner->ifs.append(*this);
  584. // fall through
  585. case TAKif:
  586. case TAKifaction:
  587. {
  588. if (_shortCircuit) return true;
  589. onCreate();
  590. onStart(parentExtractSz, parentExtract);
  591. IHThorIfArg *helper = (IHThorIfArg *)baseHelper.get();
  592. whichBranch = helper->getCondition() ? 0 : 1; // True argument precedes false...
  593. /* NB - The executeDependencies code below is only needed if actionLinkInNewGraph=true, which is no longer the default
  594. * It should be removed, once we are positive there are no issues with in-line conditional actions
  595. */
  596. if (TAKifaction == getKind())
  597. {
  598. if (!executeDependencies(parentExtractSz, parentExtract, whichBranch+1, async)) //NB whenId 1 based
  599. return false;
  600. }
  // only the selected branch is prepared; it is prepared with shortCircuit=false
  601. if (inputs.queryItem(whichBranch))
  602. {
  603. if (!whichBranchBitSet->testSet(whichBranch)) // if not set, new
  604. newWhichBranch = true;
  605. return inputs.item(whichBranch)->activity->prepareContext(parentExtractSz, parentExtract, checkDependencies, false, async);
  606. }
  607. return true;
  608. }
  609. case TAKchildcase:
  610. owner->ifs.append(*this);
  611. // fall through
  612. case TAKcase:
  613. {
  614. if (_shortCircuit) return true;
  615. onCreate();
  616. onStart(parentExtractSz, parentExtract);
  617. IHThorCaseArg *helper = (IHThorCaseArg *)baseHelper.get();
  618. whichBranch = helper->getBranch();
  // an out-of-range branch is clamped to the last input
  619. if (whichBranch >= inputs.ordinality())
  620. whichBranch = inputs.ordinality()-1;
  621. if (inputs.queryItem(whichBranch))
  622. return inputs.item(whichBranch)->activity->prepareContext(parentExtractSz, parentExtract, checkDependencies, false, async);
  623. return true;
  624. }
  // filters: when the helper reports that nothing can match, mark the input as null and do not prepare inputs
  625. case TAKfilter:
  626. case TAKfiltergroup:
  627. case TAKfilterproject:
  628. {
  629. if (_shortCircuit) return true;
  630. onCreate();
  631. onStart(parentExtractSz, parentExtract);
  632. switch (getKind())
  633. {
  634. case TAKfilter:
  635. hasNullInput = !((IHThorFilterArg *)baseHelper.get())->canMatchAny();
  636. break;
  637. case TAKfiltergroup:
  638. hasNullInput = !((IHThorFilterGroupArg *)baseHelper.get())->canMatchAny();
  639. break;
  640. case TAKfilterproject:
  641. hasNullInput = !((IHThorFilterProjectArg *)baseHelper.get())->canMatchAny();
  642. break;
  643. }
  644. if (hasNullInput)
  645. return true;
  646. break;
  647. }
  648. case TAKsequential:
  649. case TAKparallel:
  650. {
  651. /* NB - The executeDependencies code below is only needed if actionLinkInNewGraph=true, which is no longer the default
  652. * It should be removed, once we are positive there are no issues with in-line sequential/parallel activities
  653. */
  // control ids are 1-based here; each is executed in ascending order
  654. for (unsigned s=1; s<=dependsOn.ordinality(); s++)
  655. {
  656. if (!executeDependencies(parentExtractSz, parentExtract, s, async))
  657. return false;
  658. }
  659. break;
  660. }
  661. case TAKwhen_dataset:
  662. case TAKwhen_action:
  663. {
  664. if (!executeDependencies(parentExtractSz, parentExtract, WhenBeforeId, async))
  665. return false;
  666. if (!executeDependencies(parentExtractSz, parentExtract, WhenParallelId, async))
  667. return false;
  668. break;
  669. }
  670. }
  // generic path: prepare every input. NB: inputs receive the caller's 'shortCircuit',
  // not the locally adjusted '_shortCircuit'.
  671. ForEachItemIn(i, inputs)
  672. {
  673. CGraphElementBase *input = inputs.item(i)->activity;
  674. if (!input->prepareContext(parentExtractSz, parentExtract, checkDependencies, shortCircuit, async))
  675. return false;
  676. }
  677. return true;
  678. }
  679. catch (IException *_e)
  680. {
  681. IThorException *e = QUERYINTERFACE(_e, IThorException);
  682. if (e)
  683. {
  // already a Thor exception: only fill in activity details if none are present
  684. if (!e->queryActivityId())
  685. setExceptionActivityInfo(*this, e);
  686. }
  687. else
  688. {
  // wrap the foreign exception in an activity exception, then drop the original
  689. e = MakeActivityException(this, _e);
  690. _e->Release();
  691. }
  692. throw e;
  693. }
  694. }
  695. void CGraphElementBase::preStart(size32_t parentExtractSz, const byte *parentExtract)
  696. {
  697. activity->preStart(parentExtractSz, parentExtract);
  698. }
  699. void CGraphElementBase::initActivity()
  700. {
  701. CriticalBlock b(crit);
  702. if (isSink())
  703. owner->addActiveSink(*this);
  704. if (activity) // no need to recreate
  705. return;
  706. activity.setown(factory());
  707. if (isLoopActivity(*this))
  708. {
  709. unsigned loopId = queryXGMML().getPropInt("att[@name=\"_loopid\"]/@value");
  710. Owned<CGraphBase> childGraph = owner->getChildGraph(loopId);
  711. Owned<IThorBoundLoopGraph> boundLoopGraph = createBoundLoopGraph(childGraph, baseHelper->queryOutputMeta(), queryId());
  712. setBoundGraph(boundLoopGraph);
  713. }
  714. }
  // Creates the activity for this element after recursively creating the activities of the
  // inputs it will read from, and connects those inputs. Which inputs are created depends on
  // the activity kind and on 'whichBranch' / 'hasNullInput' as set by prepareContext().
  // Returns immediately if this element already has connected inputs.
  // On IException: the exception is logged, 'activity' is cleared and the exception rethrown.
  715. void CGraphElementBase::createActivity(size32_t parentExtractSz, const byte *parentExtract)
  716. {
  717. if (connectedInputs.ordinality()) // ensure not traversed twice (e.g. via splitter)
  718. return;
  719. try
  720. {
  721. switch (getKind())
  722. {
  // child IF/CASE: create only the selected branch, create this activity, then connect that branch
  723. case TAKchildif:
  724. case TAKchildcase:
  725. {
  726. if (inputs.queryItem(whichBranch))
  727. {
  728. CGraphElementBase *input = inputs.item(whichBranch)->activity;
  729. input->createActivity(parentExtractSz, parentExtract);
  730. }
  731. onCreate();
  732. initActivity();
  733. if (inputs.queryItem(whichBranch))
  734. {
  735. CIOConnection *inputIO = inputs.item(whichBranch);
  736. connectInput(whichBranch, inputIO->activity, inputIO->index);
  737. }
  738. break;
  739. }
  // IF/CASE: only the selected branch is created; initActivity() is not called for this element here.
  // When the selected branch does not exist, onCreate() runs and factorySet(TAKnull) is called if
  // there is no activity yet.
  740. case TAKif:
  741. case TAKcase:
  742. if (inputs.queryItem(whichBranch))
  743. {
  744. CGraphElementBase *input = inputs.item(whichBranch)->activity;
  745. input->createActivity(parentExtractSz, parentExtract);
  746. }
  747. else
  748. {
  749. onCreate();
  750. if (!activity)
  751. factorySet(TAKnull);
  752. }
  753. break;
  // conditional action: create the selected branch only
  754. case TAKifaction:
  755. if (inputs.queryItem(whichBranch))
  756. {
  757. CGraphElementBase *input = inputs.item(whichBranch)->activity;
  758. input->createActivity(parentExtractSz, parentExtract);
  759. }
  760. break;
  // sequential/parallel: create every present input; nothing is created or connected for this element
  761. case TAKsequential:
  762. case TAKparallel:
  763. {
  764. ForEachItemIn(i, inputs)
  765. {
  766. if (inputs.queryItem(i))
  767. {
  768. CGraphElementBase *input = inputs.item(i)->activity;
  769. input->createActivity(parentExtractSz, parentExtract);
  770. }
  771. }
  772. break;
  773. }
  774. default:
  // inputs are neither created nor connected when prepareContext() flagged a null input
  775. if (!hasNullInput)
  776. {
  777. ForEachItemIn(i, inputs)
  778. {
  779. CGraphElementBase *input = inputs.item(i)->activity;
  780. input->createActivity(parentExtractSz, parentExtract);
  781. }
  782. onCreate();
  // disk inputs have onStart() called at creation time
  783. if (isDiskInput(getKind()))
  784. onStart(parentExtractSz, parentExtract);
  785. ForEachItemIn(i2, inputs)
  786. {
  787. CIOConnection *inputIO = inputs.item(i2);
  // follow chains of TAKif/TAKcase elements down to the connection of their selected branch,
  // so that this element is connected to what the conditional resolves to
  788. loop
  789. {
  790. CGraphElementBase *input = inputIO->activity;
  791. switch (input->getKind())
  792. {
  793. case TAKif:
  794. case TAKcase:
  795. {
  796. if (input->whichBranch >= input->getInputs()) // if, will have TAKnull activity, made at create time.
  797. {
  798. input = NULL;
  799. break;
  800. }
  801. inputIO = input->inputs.item(input->whichBranch);
  802. assertex(inputIO);
  803. break;
  804. }
  805. default:
  806. input = NULL;
  807. break;
  808. }
  // input == NULL means inputIO now refers to the connection to use
  809. if (!input)
  810. break;
  811. }
  812. connectInput(i2, inputIO->activity, inputIO->index);
  813. }
  814. }
  815. initActivity();
  816. break;
  817. }
  818. }
  819. catch (IException *e) { ActPrintLog(e); activity.clear(); throw; }
  820. }
  821. ICodeContext *CGraphElementBase::queryCodeContext()
  822. {
  823. return queryOwner().queryCodeContext();
  824. }
  825. /////
  826. // JCSMORE loop - probably need better way to check if any act in graph is global(meaning needs some synchronization between slaves in activity execution)
  // Classifies the activity held by 'container' as global (true) or not (false), by kind.
  // Per the note preceding this function, "global" means the activity needs some
  // synchronization between slaves. The kinds fall into these groups:
  //  - always global; writes to disk are global unless flagged temporary
  //  - csv read: global only outside a non-global child query and when header lines are present
  //  - global unless local/grouped (queryLocalOrGrouped) or, for some joins, local (queryLocal)
  //  - always local
  //  - anything unlisted is treated as global ("if in doubt")
  827. bool isGlobalActivity(CGraphElementBase &container)
  828. {
  829. switch (container.getKind())
  830. {
  831. // always global, but only co-ordinate init/done
  832. case TAKcsvwrite:
  833. case TAKxmlwrite:
  834. case TAKjsonwrite:
  835. case TAKindexwrite:
  836. case TAKkeydiff:
  837. case TAKkeypatch:
  838. case TAKdictionaryworkunitwrite:
  839. return true;
  840. case TAKdiskwrite:
  841. {
  // a fresh helper is created just to read the flags; the Owned<> releases it on scope exit
  842. Owned<IHThorDiskWriteArg> helper = (IHThorDiskWriteArg *)container.helperFactory();
  843. unsigned flags = helper->getFlags();
  844. return (0 == (TDXtemporary & flags)); // global if not temporary
  845. }
  846. case TAKspill:
  847. return false;
  848. case TAKcsvread:
  849. {
  850. Owned<IHThorCsvReadArg> helper = (IHThorCsvReadArg *)container.helperFactory();
  851. // if header lines, then [may] need to co-ordinate across slaves
  852. if (container.queryOwner().queryOwner() && (!container.queryOwner().isGlobal())) // I am in a child query
  853. return false;
  854. return helper->queryCsvParameters()->queryHeaderLen() > 0;
  855. }
  856. // dependent on child acts?
  857. case TAKlooprow:
  858. case TAKloopcount:
  859. case TAKgraphloop:
  860. case TAKparallelgraphloop:
  861. case TAKloopdataset:
  862. return false;
  863. // dependent on local/grouped
  864. case TAKkeyeddistribute:
  865. case TAKhashdistribute:
  866. case TAKhashdistributemerge:
  867. case TAKworkunitwrite:
  868. case TAKdistribution:
  869. case TAKpartition:
  870. case TAKdiskaggregate:
  871. case TAKdiskcount:
  872. case TAKdiskgroupaggregate:
  873. case TAKindexaggregate:
  874. case TAKindexcount:
  875. case TAKindexgroupaggregate:
  876. case TAKindexgroupexists:
  877. case TAKindexgroupcount:
  878. case TAKremoteresult:
  879. case TAKcountproject:
  880. case TAKcreaterowlimit:
  881. case TAKskiplimit:
  882. case TAKlimit:
  883. case TAKsort:
  884. case TAKdedup:
  885. case TAKjoin:
  886. case TAKselfjoin:
  887. case TAKhashjoin:
  888. case TAKsmartjoin:
  889. case TAKkeyeddenormalize:
  890. case TAKhashdenormalize:
  891. case TAKdenormalize:
  892. case TAKlookupdenormalize: //GH->JCS why are these here, and join not?
  893. case TAKalldenormalize:
  894. case TAKsmartdenormalize:
  895. case TAKdenormalizegroup:
  896. case TAKhashdenormalizegroup:
  897. case TAKlookupdenormalizegroup:
  898. case TAKkeyeddenormalizegroup:
  899. case TAKalldenormalizegroup:
  900. case TAKsmartdenormalizegroup:
  901. case TAKaggregate:
  902. case TAKexistsaggregate:
  903. case TAKcountaggregate:
  904. case TAKhashaggregate:
  905. case TAKhashdedup:
  906. case TAKrollup:
  907. case TAKiterate:
  908. case TAKselectn:
  909. case TAKfirstn:
  910. case TAKenth:
  911. case TAKsample:
  912. case TAKgroup:
  913. case TAKchoosesets:
  914. case TAKchoosesetsenth:
  915. case TAKchoosesetslast:
  916. case TAKtopn:
  917. case TAKprocess:
  918. case TAKchildcount:
  919. case TAKwhen_dataset:
  920. case TAKwhen_action:
  921. case TAKnonempty:
  922. if (!container.queryLocalOrGrouped())
  923. return true;
  924. break;
  925. case TAKkeyedjoin:
  926. case TAKalljoin:
  927. case TAKlookupjoin:
  928. if (!container.queryLocal())
  929. return true;
  // NB: no break - a local join falls through into the "always local" list, whose break leads to 'return false'
  930. // always local
  931. case TAKfilter:
  932. case TAKfilterproject:
  933. case TAKfiltergroup:
  934. case TAKsplit:
  935. case TAKpipewrite:
  936. case TAKdegroup:
  937. case TAKproject:
  938. case TAKprefetchproject:
  939. case TAKprefetchcountproject:
  940. case TAKnormalize:
  941. case TAKnormalizechild:
  942. case TAKnormalizelinkedchild:
  943. case TAKpipethrough:
  944. case TAKif:
  945. case TAKchildif:
  946. case TAKchildcase:
  947. case TAKcase:
  948. case TAKparse:
  949. case TAKpiperead:
  950. case TAKxmlparse:
  951. case TAKjoinlight:
  952. case TAKselfjoinlight:
  953. case TAKdiskread:
  954. case TAKdisknormalize:
  955. case TAKchildaggregate:
  956. case TAKchildgroupaggregate:
  957. case TAKchildthroughnormalize:
  958. case TAKchildnormalize:
  959. case TAKindexread:
  960. case TAKindexnormalize:
  961. case TAKxmlread:
  962. case TAKjsonread:
  963. case TAKdiskexists:
  964. case TAKindexexists:
  965. case TAKchildexists:
  966. case TAKthroughaggregate:
  967. case TAKmerge:
  968. case TAKfunnel:
  969. case TAKregroup:
  970. case TAKcombine:
  971. case TAKrollupgroup:
  972. case TAKcombinegroup:
  973. case TAKsoap_rowdataset:
  974. case TAKhttp_rowdataset:
  975. case TAKsoap_rowaction:
  976. case TAKsoap_datasetdataset:
  977. case TAKsoap_datasetaction:
  978. case TAKlinkedrawiterator:
  979. case TAKchilditerator:
  980. case TAKstreamediterator:
  981. case TAKworkunitread:
  982. case TAKchilddataset:
  983. case TAKinlinetable:
  984. case TAKnull:
  985. case TAKemptyaction:
  986. case TAKlocalresultread:
  987. case TAKlocalresultwrite:
  988. case TAKdictionaryresultwrite:
  989. case TAKgraphloopresultread:
  990. case TAKgraphloopresultwrite:
  991. case TAKnwaygraphloopresultread:
  992. case TAKapply:
  993. case TAKsideeffect:
  994. case TAKsimpleaction:
  995. case TAKsorted:
  996. case TAKdistributed:
  997. case TAKtrace:
  998. break;
  999. case TAKnwayjoin:
  1000. case TAKnwaymerge:
  1001. case TAKnwaymergejoin:
  1002. case TAKnwayinput:
  1003. case TAKnwayselect:
  1004. return false; // JCSMORE - I think and/or have to be for now
  1005. // undefined
  1006. case TAKdatasetresult:
  1007. case TAKrowresult:
  1008. case TAKremotegraph:
  1009. case TAKlibrarycall:
  1010. default:
  1011. return true; // if in doubt
  1012. }
  // reached by every case that ended in 'break': not global
  1013. return false;
  1014. }
  1015. bool isLoopActivity(CGraphElementBase &container)
  1016. {
  1017. switch (container.getKind())
  1018. {
  1019. case TAKlooprow:
  1020. case TAKloopcount:
  1021. case TAKloopdataset:
  1022. case TAKgraphloop:
  1023. case TAKparallelgraphloop:
  1024. return true;
  1025. }
  1026. return false;
  1027. }
  1028. /////
  1029. CGraphBase::CGraphBase(CJobBase &_job) : job(_job)
  1030. {
  1031. xgmml = NULL;
  1032. parent = owner = NULL;
  1033. graphId = 0;
  1034. complete = false;
  1035. reinit = false; // should graph reinitialize each time it is called (e.g. in loop graphs)
  1036. // This is currently for 'init' (Create time) info and onStart into
  1037. sentInitData = false;
  1038. // sentStartCtx = false;
  1039. sentStartCtx = true; // JCSMORE - disable for now
  1040. parentActivityId = 0;
  1041. created = connected = started = graphDone = aborted = prepared = false;
  1042. startBarrier = waitBarrier = doneBarrier = NULL;
  1043. mpTag = waitBarrierTag = startBarrierTag = doneBarrierTag = TAG_NULL;
  1044. executeReplyTag = TAG_NULL;
  1045. parentExtractSz = 0;
  1046. counter = 0; // loop/graph counter, will be set by loop/graph activity if needed
  1047. }
  1048. CGraphBase::~CGraphBase()
  1049. {
  1050. clean();
  1051. }
  1052. void CGraphBase::clean()
  1053. {
  1054. ::Release(startBarrier);
  1055. ::Release(waitBarrier);
  1056. ::Release(doneBarrier);
  1057. localResults.clear();
  1058. graphLoopResults.clear();
  1059. childGraphsTable.kill();
  1060. childGraphs.kill();
  1061. disconnectActivities();
  1062. containers.kill();
  1063. sinks.kill();
  1064. activeSinks.kill();
  1065. }
  1066. void CGraphBase::serializeCreateContexts(MemoryBuffer &mb)
  1067. {
  1068. DelayedSizeMarker sizeMark(mb);
  1069. Owned<IThorActivityIterator> iter = getIterator();
  1070. ForEach (*iter)
  1071. {
  1072. CGraphElementBase &element = iter->query();
  1073. if (element.isOnCreated())
  1074. {
  1075. mb.append(element.queryId());
  1076. element.serializeCreateContext(mb);
  1077. }
  1078. }
  1079. mb.append((activity_id)0);
  1080. sizeMark.write();
  1081. }
  1082. void CGraphBase::serializeStartContexts(MemoryBuffer &mb)
  1083. {
  1084. DelayedSizeMarker sizeMark(mb);
  1085. Owned<IThorActivityIterator> iter = getIterator();
  1086. ForEach (*iter)
  1087. {
  1088. CGraphElementBase &element = iter->query();
  1089. if (element.isOnStarted())
  1090. {
  1091. mb.append(element.queryId());
  1092. element.serializeStartContext(mb);
  1093. }
  1094. }
  1095. mb.append((activity_id)0);
  1096. sizeMark.write();
  1097. }
  1098. void CGraphBase::deserializeCreateContexts(MemoryBuffer &mb)
  1099. {
  1100. activity_id id;
  1101. loop
  1102. {
  1103. mb.read(id);
  1104. if (0 == id) break;
  1105. CGraphElementBase *element = queryElement(id);
  1106. assertex(element);
  1107. element->deserializeCreateContext(mb);
  1108. }
  1109. }
  1110. void CGraphBase::deserializeStartContexts(MemoryBuffer &mb)
  1111. {
  1112. activity_id id;
  1113. loop
  1114. {
  1115. mb.read(id);
  1116. if (0 == id) break;
  1117. CGraphElementBase *element = queryElement(id);
  1118. assertex(element);
  1119. element->deserializeStartContext(mb);
  1120. }
  1121. }
  1122. void CGraphBase::reset()
  1123. {
  1124. setCompleteEx(false);
  1125. graphCancelHandler.reset();
  1126. if (0 == containers.count())
  1127. {
  1128. SuperHashIteratorOf<CGraphBase> iter(childGraphsTable);
  1129. ForEach(iter)
  1130. iter.query().reset();
  1131. }
  1132. else
  1133. {
  1134. Owned<IThorActivityIterator> iter = getIterator();
  1135. ForEach(*iter)
  1136. {
  1137. CGraphElementBase &element = iter->query();
  1138. element.reset();
  1139. }
  1140. dependentSubGraphs.kill();
  1141. }
  1142. if (!queryOwner() || isGlobal())
  1143. job.queryTimeReporter().reset();
  1144. if (!queryOwner())
  1145. clearNodeStats();
  1146. }
  1147. void CGraphBase::addChildGraph(CGraphBase &graph)
  1148. {
  1149. CriticalBlock b(crit);
  1150. childGraphs.append(graph);
  1151. childGraphsTable.replace(graph);
  1152. job.associateGraph(graph);
  1153. }
  1154. IThorGraphIterator *CGraphBase::getChildGraphs() const
  1155. {
  1156. CriticalBlock b(crit);
  1157. return new CGraphArrayCopyIterator(childGraphs);
  1158. }
  1159. bool CGraphBase::fireException(IException *e)
  1160. {
  1161. return job.fireException(e);
  1162. }
  1163. bool CGraphBase::preStart(size32_t parentExtractSz, const byte *parentExtract)
  1164. {
  1165. Owned<IThorActivityIterator> iter = getConnectedIterator();
  1166. ForEach(*iter)
  1167. {
  1168. CGraphElementBase &element = iter->query();
  1169. element.preStart(parentExtractSz, parentExtract);
  1170. }
  1171. return true;
  1172. }
  1173. void CGraphBase::executeSubGraph(size32_t parentExtractSz, const byte *parentExtract)
  1174. {
  1175. CriticalBlock b(executeCrit);
  1176. if (job.queryPausing())
  1177. return;
  1178. Owned<IException> exception;
  1179. try
  1180. {
  1181. if (!prepare(parentExtractSz, parentExtract, false, false, false))
  1182. {
  1183. setCompleteEx();
  1184. return;
  1185. }
  1186. try
  1187. {
  1188. if (!queryOwner())
  1189. {
  1190. StringBuffer s;
  1191. toXML(&queryXGMML(), s, 2);
  1192. GraphPrintLog("Running graph [%s] : %s", isGlobal()?"global":"local", s.str());
  1193. }
  1194. create(parentExtractSz, parentExtract);
  1195. }
  1196. catch (IException *e)
  1197. {
  1198. Owned<IThorException> e2 = MakeThorException(e);
  1199. e2->setGraphId(graphId);
  1200. e2->setAction(tea_abort);
  1201. job.fireException(e2);
  1202. throw;
  1203. }
  1204. if (localResults)
  1205. localResults->clear();
  1206. doExecute(parentExtractSz, parentExtract, false);
  1207. }
  1208. catch (IException *e)
  1209. {
  1210. GraphPrintLog(e);
  1211. exception.setown(e);
  1212. }
  1213. if (!queryOwner())
  1214. {
  1215. GraphPrintLog("Graph Done");
  1216. StringBuffer memStr;
  1217. getSystemTraceInfo(memStr, PerfMonStandard | PerfMonExtended);
  1218. GraphPrintLog("%s", memStr.str());
  1219. }
  1220. if (exception)
  1221. throw exception.getClear();
  1222. }
  1223. void CGraphBase::execute(size32_t _parentExtractSz, const byte *parentExtract, bool checkDependencies, bool async)
  1224. {
  1225. parentExtractSz = _parentExtractSz;
  1226. if (isComplete())
  1227. return;
  1228. if (async)
  1229. queryJob().startGraph(*this, queryJob(), checkDependencies, parentExtractSz, parentExtract); // may block if enough running
  1230. else
  1231. {
  1232. if (!prepare(parentExtractSz, parentExtract, checkDependencies, async, async))
  1233. {
  1234. setComplete();
  1235. return;
  1236. }
  1237. executeSubGraph(parentExtractSz, parentExtract);
  1238. }
  1239. }
  // Runs this graph on the calling thread.
  // Phases: (1) bail out if complete, throw if aborted; (2) prepare(), marking the graph complete
  // if nothing needs executing; (3) onStart/preStart/start the connected activities and wait for
  // them; (4) on failure, cancel the wait barrier and, for top-level graphs, fire a tea_abort
  // exception; (5) done() plus optional done-barrier wait; (6) end().
  // The first exception captured in phases 3 or 5 (or a stored abortException) is thrown after
  // end(); otherwise the graph is marked complete unless it was aborted.
  1240. void CGraphBase::doExecute(size32_t parentExtractSz, const byte *parentExtract, bool checkDependencies)
  1241. {
  1242. if (isComplete()) return;
  1243. if (queryAborted())
  1244. {
  1245. if (abortException)
  1246. throw abortException.getLink();
  1247. throw MakeGraphException(this, 0, "subgraph aborted(1)");
  1248. }
  1249. if (!prepare(parentExtractSz, parentExtract, checkDependencies, false, false))
  1250. {
  1251. setComplete();
  1252. return;
  1253. }
  // re-check: the graph may have been aborted while preparing
  1254. if (queryAborted())
  1255. {
  1256. if (abortException)
  1257. throw abortException.getLink();
  1258. throw MakeGraphException(this, 0, "subgraph aborted(2)");
  1259. }
  1260. Owned<IException> exception;
  1261. try
  1262. {
  // a graph that has been started before is reset prior to running again
  1263. if (started)
  1264. reset();
  1265. Owned<IThorActivityIterator> iter = getConnectedIterator();
  1266. ForEach(*iter)
  1267. {
  1268. CGraphElementBase &element = iter->query();
  1269. element.onStart(parentExtractSz, parentExtract);
  1270. }
  // NB: returning here skips done()/end(); CGraphBase::preStart() in this file always returns true
  1271. if (!preStart(parentExtractSz, parentExtract)) return;
  1272. start();
  1273. if (!wait(aborted?MEDIUMTIMEOUT:INFINITE)) // can't wait indefinitely, query may have aborted and stall, but prudent to wait a short time for underlying graphs to unwind.
  1274. GraphPrintLogEx(this, thorlog_null, MCuserWarning, "Graph wait cancelled, aborted=%s", aborted?"true":"false");
  1275. else
  1276. graphDone = true;
  1277. }
  1278. catch (IException *e)
  1279. {
  // hold on to the failure; cleanup below must still run
  1280. GraphPrintLog(e);
  1281. exception.setown(e);
  1282. }
  1283. try
  1284. {
  // an abort recorded elsewhere counts as the failure if nothing was caught above
  1285. if (!exception && abortException)
  1286. exception.setown(abortException.getClear());
  1287. if (exception)
  1288. {
  1289. if (NULL == owner || isGlobal())
  1290. waitBarrier->cancel(exception);
  1291. if (!queryOwner())
  1292. {
  1293. StringBuffer str;
  1294. Owned<IThorException> e = MakeGraphException(this, exception->errorCode(), "%s", exception->errorMessage(str).str());
  1295. e->setGraphId(graphId);
  1296. e->setAction(tea_abort);
  1297. fireException(e);
  1298. }
  1299. }
  1300. }
  1301. catch (IException *e)
  1302. {
  // failures while reporting the abort are logged and discarded
  1303. GraphPrintLog(e, "during abort()");
  1304. e->Release();
  1305. }
  1306. try
  1307. {
  1308. done();
  1309. if (doneBarrier)
  1310. doneBarrier->wait(false);
  1311. }
  1312. catch (IException *e)
  1313. {
  // keep the earliest exception; later ones are only logged
  1314. GraphPrintLog(e);
  1315. if (!exception.get())
  1316. exception.setown(e);
  1317. else
  1318. e->Release();
  1319. }
  1320. end();
  1321. if (exception)
  1322. throw exception.getClear();
  1323. if (!queryAborted())
  1324. setComplete();
  1325. }
  1326. bool CGraphBase::prepare(size32_t parentExtractSz, const byte *parentExtract, bool checkDependencies, bool shortCircuit, bool async)
  1327. {
  1328. if (isComplete()) return false;
  1329. bool needToExecute = false;
  1330. ifs.kill();
  1331. ForEachItemIn(s, sinks)
  1332. {
  1333. CGraphElementBase &sink = sinks.item(s);
  1334. if (sink.prepareContext(parentExtractSz, parentExtract, checkDependencies, shortCircuit, async))
  1335. needToExecute = true;
  1336. }
  1337. // prepared = true;
  1338. return needToExecute;
  1339. }
  1340. void CGraphBase::create(size32_t parentExtractSz, const byte *parentExtract)
  1341. {
  1342. Owned<IThorActivityIterator> iter = getIterator();
  1343. ForEach(*iter)
  1344. {
  1345. CGraphElementBase &element = iter->query();
  1346. element.clearConnections();
  1347. }
  1348. activeSinks.kill(); // NB: activeSinks are added to during activity creation
  1349. ForEachItemIn(s, sinks)
  1350. {
  1351. CGraphElementBase &sink = sinks.item(s);
  1352. sink.createActivity(parentExtractSz, parentExtract);
  1353. }
  1354. created = true;
  1355. }
  1356. void CGraphBase::done()
  1357. {
  1358. if (aborted) return; // activity done methods only called on success
  1359. Owned<IThorActivityIterator> iter = getConnectedIterator();
  1360. ForEach (*iter)
  1361. {
  1362. CGraphElementBase &element = iter->query();
  1363. element.queryActivity()->done();
  1364. }
  1365. }
  1366. bool CGraphBase::syncInitData()
  1367. {
  1368. CGraphElementBase *parentElement = queryOwner() ? queryOwner()->queryElement(queryParentActivityId()) : NULL;
  1369. if (parentElement && isLoopActivity(*parentElement))
  1370. return parentElement->queryLoopGraph()->queryGraph()->isGlobal();
  1371. else
  1372. return !isLocalChild();
  1373. }
  1374. void CGraphBase::end()
  1375. {
  1376. // always called, any final action clear up
  1377. Owned<IThorActivityIterator> iter = getIterator();
  1378. ForEach(*iter)
  1379. {
  1380. CGraphElementBase &element = iter->query();
  1381. try
  1382. {
  1383. if (element.queryActivity())
  1384. element.queryActivity()->kill();
  1385. }
  1386. catch (IException *e)
  1387. {
  1388. Owned<IException> e2 = MakeActivityException(element.queryActivity(), e, "Error calling kill()");
  1389. GraphPrintLog(e2);
  1390. e->Release();
  1391. }
  1392. }
  1393. }
  1394. class CGraphTraverseIteratorBase : public CInterface, implements IThorActivityIterator
  1395. {
  1396. protected:
  1397. CGraphBase &graph;
  1398. Linked<CGraphElementBase> cur;
  1399. CIArrayOf<CGraphElementBase> others;
  1400. CGraphElementArrayCopy covered;
  1401. CGraphElementBase *popNext()
  1402. {
  1403. if (!others.ordinality())
  1404. {
  1405. cur.clear();
  1406. return NULL;
  1407. }
  1408. cur.setown(&others.popGet());
  1409. return cur;
  1410. }
  1411. CGraphElementBase *setNext(CIOConnectionArray &inputs, unsigned whichInput=((unsigned)-1))
  1412. {
  1413. cur.clear();
  1414. unsigned n = inputs.ordinality();
  1415. if (((unsigned)-1) != whichInput)
  1416. {
  1417. CIOConnection *io = inputs.queryItem(whichInput);
  1418. if (io)
  1419. cur.set(io->activity);
  1420. }
  1421. else
  1422. {
  1423. bool first = true;
  1424. unsigned i=0;
  1425. for (; i<n; i++)
  1426. {
  1427. CIOConnection *io = inputs.queryItem(i);
  1428. if (io)
  1429. {
  1430. if (first)
  1431. {
  1432. first = false;
  1433. cur.set(io->activity);
  1434. }
  1435. else
  1436. others.append(*LINK(io->activity));
  1437. }
  1438. }
  1439. }
  1440. if (!cur)
  1441. {
  1442. if (!popNext())
  1443. return NULL;
  1444. }
  1445. // check haven't been here before
  1446. loop
  1447. {
  1448. if (cur->getOutputs() < 2)
  1449. break;
  1450. else if (NotFound == covered.find(*cur))
  1451. {
  1452. if (!cur->alreadyUpdated)
  1453. {
  1454. covered.append(*cur);
  1455. break;
  1456. }
  1457. }
  1458. if (!popNext())
  1459. return NULL;
  1460. }
  1461. return cur.get();
  1462. }
  1463. public:
  1464. IMPLEMENT_IINTERFACE;
  1465. CGraphTraverseIteratorBase(CGraphBase &_graph) : graph(_graph)
  1466. {
  1467. }
  1468. virtual bool first()
  1469. {
  1470. covered.kill();
  1471. others.kill();
  1472. cur.clear();
  1473. Owned<IThorActivityIterator> sinkIter = graph.getSinkIterator();
  1474. if (!sinkIter->first())
  1475. return false;
  1476. loop
  1477. {
  1478. cur.set(& sinkIter->query());
  1479. if (!cur->alreadyUpdated)
  1480. break;
  1481. if (!sinkIter->next())
  1482. return false;
  1483. }
  1484. while (sinkIter->next())
  1485. others.append(sinkIter->get());
  1486. return true;
  1487. }
  1488. virtual bool isValid() { return NULL != cur.get(); }
  1489. virtual CGraphElementBase & query() { return *cur; }
  1490. CGraphElementBase & get() { return *LINK(cur); }
  1491. };
  1492. class CGraphTraverseConnectedIterator : public CGraphTraverseIteratorBase
  1493. {
  1494. public:
  1495. CGraphTraverseConnectedIterator(CGraphBase &graph) : CGraphTraverseIteratorBase(graph) { }
  1496. virtual bool next()
  1497. {
  1498. if (cur->hasNullInput)
  1499. {
  1500. do
  1501. {
  1502. if (!popNext())
  1503. return false;
  1504. }
  1505. while (cur->hasNullInput);
  1506. }
  1507. else
  1508. setNext(cur->connectedInputs);
  1509. return NULL!=cur.get();
  1510. }
  1511. };
  1512. IThorActivityIterator *CGraphBase::getConnectedIterator()
  1513. {
  1514. return new CGraphTraverseConnectedIterator(*this);
  1515. }
  1516. bool CGraphBase::wait(unsigned timeout)
  1517. {
  1518. CTimeMon tm(timeout);
  1519. unsigned remaining = timeout;
  1520. class CWaitException
  1521. {
  1522. CGraphBase *graph;
  1523. Owned<IException> exception;
  1524. public:
  1525. CWaitException(CGraphBase *_graph) : graph(_graph) { }
  1526. IException *get() { return exception; }
  1527. void set(IException *e)
  1528. {
  1529. if (!exception)
  1530. exception.setown(e);
  1531. else
  1532. e->Release();
  1533. }
  1534. void throwException()
  1535. {
  1536. if (exception)
  1537. throw exception.getClear();
  1538. throw MakeGraphException(graph, 0, "Timed out waiting for graph to end");
  1539. }
  1540. } waitException(this);
  1541. Owned<IThorActivityIterator> iter = getConnectedIterator();
  1542. ForEach (*iter)
  1543. {
  1544. CGraphElementBase &element = iter->query();
  1545. CActivityBase *activity = element.queryActivity();
  1546. if (INFINITE != timeout && tm.timedout(&remaining))
  1547. waitException.throwException();
  1548. try
  1549. {
  1550. if (!activity->wait(remaining))
  1551. waitException.throwException();
  1552. }
  1553. catch (IException *e)
  1554. {
  1555. waitException.set(e); // will discard if already set
  1556. if (timeout == INFINITE)
  1557. {
  1558. unsigned e = tm.elapsed();
  1559. if (e >= MEDIUMTIMEOUT)
  1560. waitException.throwException();
  1561. timeout = MEDIUMTIMEOUT-e;
  1562. tm.reset(timeout);
  1563. }
  1564. }
  1565. }
  1566. if (waitException.get())
  1567. waitException.throwException();
  1568. // synchronize all slaves to end of graphs
  1569. if (NULL == owner || isGlobal())
  1570. {
  1571. if (INFINITE != timeout && tm.timedout(&remaining))
  1572. waitException.throwException();
  1573. if (!waitBarrier->wait(true, remaining))
  1574. return false;
  1575. }
  1576. return true;
  1577. }
  1578. void CGraphBase::abort(IException *e)
  1579. {
  1580. if (aborted)
  1581. return;
  1582. {
  1583. CriticalBlock cb(crit);
  1584. abortException.set(e);
  1585. aborted = true;
  1586. graphCancelHandler.cancel(0);
  1587. if (0 == containers.count())
  1588. {
  1589. Owned<IThorGraphIterator> iter = getChildGraphs();
  1590. ForEach(*iter)
  1591. {
  1592. CGraphBase &graph = iter->query();
  1593. graph.abort(e);
  1594. }
  1595. }
  1596. }
  1597. if (started && !graphDone)
  1598. {
  1599. Owned<IThorActivityIterator> iter = getConnectedIterator();
  1600. ForEach (*iter)
  1601. {
  1602. iter->query().abort(e); // JCSMORE - could do in parallel, they can take some time to timeout
  1603. }
  1604. if (startBarrier)
  1605. startBarrier->cancel(e);
  1606. if (waitBarrier)
  1607. waitBarrier->cancel(e);
  1608. if (doneBarrier)
  1609. doneBarrier->cancel(e);
  1610. }
  1611. }
  1612. void CGraphBase::GraphPrintLog(const char *format, ...)
  1613. {
  1614. va_list args;
  1615. va_start(args, format);
  1616. ::GraphPrintLogArgs(this, thorlog_null, MCdebugProgress, format, args);
  1617. va_end(args);
  1618. }
  1619. void CGraphBase::GraphPrintLog(IException *e, const char *format, ...)
  1620. {
  1621. va_list args;
  1622. va_start(args, format);
  1623. ::GraphPrintLogArgs(this, e, thorlog_null, MCdebugProgress, format, args);
  1624. va_end(args);
  1625. }
  1626. void CGraphBase::GraphPrintLog(IException *e)
  1627. {
  1628. GraphPrintLog(e, "%s", "");
  1629. }
  1630. void CGraphBase::setLogging(bool tf)
  1631. {
  1632. Owned<IThorActivityIterator> iter = getIterator();
  1633. ForEach(*iter)
  1634. iter->query().setLogging(tf);
  1635. }
  1636. void CGraphBase::createFromXGMML(IPropertyTree *_node, CGraphBase *_owner, CGraphBase *_parent, CGraphBase *resultsGraph)
  1637. {
  1638. owner = _owner;
  1639. parent = _parent?_parent:owner;
  1640. node.setown(createPTreeFromIPT(_node));
  1641. xgmml = node->queryPropTree("att/graph");
  1642. sink = xgmml->getPropBool("att[@name=\"rootGraph\"]/@value", false);
  1643. sequential = xgmml->getPropBool("@sequential");
  1644. graphId = node->getPropInt("@id");
  1645. global = false;
  1646. localOnly = -1; // unset
  1647. parentActivityId = node->getPropInt("att[@name=\"_parentActivity\"]/@value", 0);
  1648. CGraphBase *graphContainer = this;
  1649. if (resultsGraph)
  1650. graphContainer = resultsGraph; // JCSMORE is this right?
  1651. graphCodeContext.setContext(graphContainer, (ICodeContextExt *)&job.queryCodeContext());
  1652. unsigned numResults = xgmml->getPropInt("att[@name=\"_numResults\"]/@value", 0);
  1653. if (numResults)
  1654. {
  1655. localResults.setown(createThorGraphResults(numResults));
  1656. resultsGraph = this;
  1657. // JCSMORE - it might more sense if this temp handler was owned by parent act., which may finish(get stopped) earlier than the owning graph
  1658. tmpHandler.setown(queryJob().createTempHandler(false));
  1659. }
  1660. localChild = false;
  1661. if (owner && parentActivityId)
  1662. {
  1663. CGraphElementBase *parentElement = owner->queryElement(parentActivityId);
  1664. parentElement->addAssociatedChildGraph(this);
  1665. if (isLoopActivity(*parentElement))
  1666. localChild = parentElement->queryOwner().isLocalChild();
  1667. else
  1668. localChild = true;
  1669. }
  1670. Owned<IPropertyTreeIterator> nodes = xgmml->getElements("node");
  1671. ForEach(*nodes)
  1672. {
  1673. IPropertyTree &e = nodes->query();
  1674. ThorActivityKind kind = (ThorActivityKind) e.getPropInt("att[@name=\"_kind\"]/@value");
  1675. if (TAKsubgraph == kind)
  1676. {
  1677. Owned<CGraphBase> subGraph = job.createGraph();
  1678. subGraph->createFromXGMML(&e, this, parent, resultsGraph);
  1679. addChildGraph(*LINK(subGraph));
  1680. if (!global)
  1681. global = subGraph->isGlobal();
  1682. }
  1683. else
  1684. {
  1685. if (localChild && !e.getPropBool("att[@name=\"coLocal\"]/@value", false))
  1686. {
  1687. IPropertyTree *att = createPTree("att");
  1688. att->setProp("@name", "coLocal");
  1689. att->setPropBool("@value", true);
  1690. e.addPropTree("att", att);
  1691. }
  1692. CGraphElementBase *act = createGraphElement(e, *this, resultsGraph);
  1693. addActivity(act);
  1694. if (!global)
  1695. global = isGlobalActivity(*act);
  1696. }
  1697. }
  1698. Owned<IPropertyTreeIterator> edges = xgmml->getElements("edge");
  1699. ForEach(*edges)
  1700. {
  1701. IPropertyTree &edge = edges->query();
  1702. unsigned sourceOutput = edge.getPropInt("att[@name=\"_sourceIndex\"]/@value", 0);
  1703. unsigned targetInput = edge.getPropInt("att[@name=\"_targetIndex\"]/@value", 0);
  1704. CGraphElementBase *source = queryElement(edge.getPropInt("@source"));
  1705. CGraphElementBase *target = queryElement(edge.getPropInt("@target"));
  1706. target->addInput(targetInput, source, sourceOutput);
  1707. }
  1708. Owned<IThorActivityIterator> iter = getIterator();
  1709. ForEach(*iter)
  1710. {
  1711. CGraphElementBase &element = iter->query();
  1712. if (0 == element.getOutputs())
  1713. {
  1714. /* JCSMORE - Making some outputs conditional, will require:
  1715. * a) Pass through information as to which dependent graph causes this graph (and this sink) to execute)
  1716. * b) Allow the subgraph to re-executed by other dependent subgraphs and avoid re-executing completed sinks
  1717. * c) Keep common points (splitters) around (preferably in memory), re-execution of graph will need them
  1718. */
  1719. sinks.append(*LINK(&element));
  1720. }
  1721. }
  1722. init();
  1723. }
  1724. void CGraphBase::executeChildGraphs(size32_t parentExtractSz, const byte *parentExtract)
  1725. {
  1726. // JCSMORE - would need to respect codegen 'sequential' flag, if these child graphs
  1727. // could be executed in parallel.
  1728. Owned<IThorGraphIterator> iter = getChildGraphs();
  1729. ForEach(*iter)
  1730. {
  1731. CGraphBase &graph = iter->query();
  1732. if (graph.isSink())
  1733. graph.execute(parentExtractSz, parentExtract, true, false);
  1734. }
  1735. }
  1736. void CGraphBase::doExecuteChild(size32_t parentExtractSz, const byte *parentExtract)
  1737. {
  1738. reset();
  1739. if (0 == containers.count())
  1740. executeChildGraphs(parentExtractSz, parentExtract);
  1741. else
  1742. execute(parentExtractSz, parentExtract, false, false);
  1743. queryTempHandler()->clearTemps();
  1744. }
  1745. void CGraphBase::executeChild(size32_t & retSize, void * &ret, size32_t parentExtractSz, const byte *parentExtract)
  1746. {
  1747. reset();
  1748. doExecute(parentExtractSz, parentExtract, false);
  1749. UNIMPLEMENTED;
  1750. /*
  1751. ForEachItemIn(idx1, elements)
  1752. {
  1753. EclGraphElement & cur = elements.item(idx1);
  1754. if (cur.isResult)
  1755. {
  1756. cur.extractResult(retSize, ret);
  1757. return;
  1758. }
  1759. }
  1760. */
  1761. throwUnexpected();
  1762. }
  1763. void CGraphBase::setResults(IThorGraphResults *results) // used by master only
  1764. {
  1765. localResults.set(results);
  1766. }
  1767. void CGraphBase::executeChild(size32_t parentExtractSz, const byte *parentExtract, IThorGraphResults *results, IThorGraphResults *_graphLoopResults)
  1768. {
  1769. localResults.set(results);
  1770. graphLoopResults.set(_graphLoopResults);
  1771. doExecuteChild(parentExtractSz, parentExtract);
  1772. graphLoopResults.clear();
  1773. localResults.clear();
  1774. }
  1775. StringBuffer &getGlobals(CGraphBase &graph, StringBuffer &str)
  1776. {
  1777. bool first = true;
  1778. Owned<IThorActivityIterator> iter = graph.getIterator();
  1779. ForEach(*iter)
  1780. {
  1781. CGraphElementBase &e = iter->query();
  1782. if (isGlobalActivity(e))
  1783. {
  1784. if (first)
  1785. str.append("Graph(").append(graph.queryGraphId()).append("): [");
  1786. else
  1787. str.append(", ");
  1788. first = false;
  1789. ThorActivityKind kind = e.getKind();
  1790. str.append(activityKindStr(kind));
  1791. str.append("(").append(e.queryId()).append(")");
  1792. }
  1793. }
  1794. if (!first)
  1795. str.append("]");
  1796. Owned<IThorGraphIterator> childIter = graph.getChildGraphs();
  1797. ForEach(*childIter)
  1798. {
  1799. CGraphBase &childGraph = childIter->query();
  1800. getGlobals(childGraph, str);
  1801. }
  1802. return str;
  1803. }
  1804. void CGraphBase::executeChild(size32_t parentExtractSz, const byte *parentExtract)
  1805. {
  1806. assertex(localResults);
  1807. localResults->clear();
  1808. if (isGlobal()) // any slave
  1809. {
  1810. StringBuffer str("Global acts = ");
  1811. getGlobals(*this, str);
  1812. throw MakeGraphException(this, 0, "Global child graph? : %s", str.str());
  1813. }
  1814. doExecuteChild(parentExtractSz, parentExtract);
  1815. }
  1816. IThorResult *CGraphBase::getResult(unsigned id, bool distributed)
  1817. {
  1818. return localResults->getResult(id, distributed);
  1819. }
  1820. IThorResult *CGraphBase::getGraphLoopResult(unsigned id, bool distributed)
  1821. {
  1822. return graphLoopResults->getResult(id, distributed);
  1823. }
  1824. IThorResult *CGraphBase::createResult(CActivityBase &activity, unsigned id, IThorGraphResults *results, IRowInterfaces *rowIf, bool distributed, unsigned spillPriority)
  1825. {
  1826. return results->createResult(activity, id, rowIf, distributed, spillPriority);
  1827. }
  1828. IThorResult *CGraphBase::createResult(CActivityBase &activity, unsigned id, IRowInterfaces *rowIf, bool distributed, unsigned spillPriority)
  1829. {
  1830. return localResults->createResult(activity, id, rowIf, distributed, spillPriority);
  1831. }
  1832. IThorResult *CGraphBase::createGraphLoopResult(CActivityBase &activity, IRowInterfaces *rowIf, bool distributed, unsigned spillPriority)
  1833. {
  1834. return graphLoopResults->createResult(activity, rowIf, distributed, spillPriority);
  1835. }
  1836. // IEclGraphResults
  1837. void CGraphBase::getDictionaryResult(unsigned & count, byte * * & ret, unsigned id)
  1838. {
  1839. Owned<IThorResult> result = getResult(id, true); // will get collated distributed result
  1840. result->getLinkedResult(count, ret);
  1841. }
  1842. void CGraphBase::getLinkedResult(unsigned & count, byte * * & ret, unsigned id)
  1843. {
  1844. Owned<IThorResult> result = getResult(id, true); // will get collated distributed result
  1845. result->getLinkedResult(count, ret);
  1846. }
  1847. const void * CGraphBase::getLinkedRowResult(unsigned id)
  1848. {
  1849. Owned<IThorResult> result = getResult(id, true); // will get collated distributed result
  1850. return result->getLinkedRowResult();
  1851. }
  1852. // IThorChildGraph impl.
  1853. IEclGraphResults *CGraphBase::evaluate(unsigned _parentExtractSz, const byte *parentExtract)
  1854. {
  1855. CriticalBlock block(evaluateCrit);
  1856. localResults.setown(createThorGraphResults(xgmml->getPropInt("att[@name=\"_numResults\"]/@value", 0)));
  1857. parentExtractSz = _parentExtractSz;
  1858. executeChild(parentExtractSz, parentExtract);
  1859. return localResults.getClear();
  1860. }
  1861. static bool isLocalOnly(const CGraphElementBase &activity);
  1862. static bool isLocalOnly(const CGraphBase &graph) // checks all dependencies, if something needs to be global, whole body is forced to be execution sync.
  1863. {
  1864. if (0 == graph.activityCount())
  1865. {
  1866. Owned<IThorGraphIterator> iter = graph.getChildGraphs();
  1867. ForEach(*iter)
  1868. {
  1869. CGraphBase &childGraph = iter->query();
  1870. if (childGraph.isSink())
  1871. {
  1872. if (!isLocalOnly(childGraph))
  1873. return false;
  1874. }
  1875. }
  1876. }
  1877. else
  1878. {
  1879. if (graph.isGlobal())
  1880. return false;
  1881. Owned<IThorActivityIterator> sinkIter = graph.getAllSinkIterator();
  1882. ForEach(*sinkIter)
  1883. {
  1884. CGraphElementBase &sink = sinkIter->query();
  1885. if (!isLocalOnly(sink))
  1886. return false;
  1887. }
  1888. }
  1889. return true;
  1890. }
  1891. static bool isLocalOnly(const CGraphElementBase &activity)
  1892. {
  1893. Owned<IThorGraphDependencyIterator> deps = activity.getDependsIterator();
  1894. ForEach(*deps)
  1895. {
  1896. if (!isLocalOnly(*(deps->query().graph)))
  1897. return false;
  1898. }
  1899. StringBuffer match("edge[@target=\"");
  1900. match.append(activity.queryId()).append("\"]");
  1901. Owned<IPropertyTreeIterator> inputs = activity.queryOwner().queryXGMML().getElements(match.str());
  1902. ForEach(*inputs)
  1903. {
  1904. CGraphElementBase *sourceAct = activity.queryOwner().queryElement(inputs->query().getPropInt("@source"));
  1905. if (!isLocalOnly(*sourceAct))
  1906. return false;
  1907. }
  1908. return true;
  1909. }
  1910. bool CGraphBase::isLocalOnly() const // checks all dependencies, if something needs to be global, whole body is forced to be execution sync.
  1911. {
  1912. if (-1 == localOnly)
  1913. localOnly = (int)::isLocalOnly(*this);
  1914. return 1==localOnly;
  1915. }
  1916. IThorGraphResults *CGraphBase::createThorGraphResults(unsigned num)
  1917. {
  1918. return new CThorGraphResults(num);
  1919. }
  1920. ////
  1921. void CGraphTempHandler::registerFile(const char *name, graph_id graphId, unsigned usageCount, bool temp, WUFileKind fileKind, StringArray *clusters)
  1922. {
  1923. assertex(temp);
  1924. LOG(MCdebugProgress, thorJob, "registerTmpFile name=%s, usageCount=%d", name, usageCount);
  1925. CriticalBlock b(crit);
  1926. if (tmpFiles.find(name))
  1927. throw MakeThorException(TE_FileAlreadyUsedAsTempFile, "File already used as temp file (%s)", name);
  1928. tmpFiles.replace(* new CFileUsageEntry(name, graphId, fileKind, usageCount));
  1929. }
  1930. void CGraphTempHandler::deregisterFile(const char *name, bool kept)
  1931. {
  1932. LOG(MCdebugProgress, thorJob, "deregisterTmpFile name=%s", name);
  1933. CriticalBlock b(crit);
  1934. CFileUsageEntry *fileUsage = tmpFiles.find(name);
  1935. if (!fileUsage)
  1936. {
  1937. if (errorOnMissing)
  1938. throw MakeThorException(TE_FileNotFound, "File not found (%s) deregistering tmp file", name);
  1939. return;
  1940. }
  1941. if (0 == fileUsage->queryUsage()) // marked 'not to be deleted' until workunit complete.
  1942. return;
  1943. else if (1 == fileUsage->queryUsage())
  1944. {
  1945. tmpFiles.remove(name);
  1946. try
  1947. {
  1948. if (!removeTemp(name))
  1949. LOG(MCwarning, unknownJob, "Failed to delete tmp file : %s (not found)", name);
  1950. }
  1951. catch (IException *e) { StringBuffer s("Failed to delete tmp file : "); FLLOG(MCwarning, thorJob, e, s.append(name).str()); }
  1952. }
  1953. else
  1954. fileUsage->decUsage();
  1955. }
  1956. void CGraphTempHandler::clearTemps()
  1957. {
  1958. CriticalBlock b(crit);
  1959. Owned<IFileUsageIterator> iter = getIterator();
  1960. ForEach(*iter)
  1961. {
  1962. CFileUsageEntry &entry = iter->query();
  1963. const char *tmpname = entry.queryName();
  1964. try
  1965. {
  1966. if (!removeTemp(tmpname))
  1967. LOG(MCwarning, thorJob, "Failed to delete tmp file : %s (not found)", tmpname);
  1968. }
  1969. catch (IException *e) { StringBuffer s("Failed to delete tmp file : "); FLLOG(MCwarning, thorJob, e, s.append(tmpname).str()); }
  1970. }
  1971. iter.clear();
  1972. tmpFiles.kill();
  1973. }
  1974. /////
  1975. class CGraphExecutor;
  1976. class CGraphExecutorGraphInfo : public CInterface
  1977. {
  1978. public:
  1979. CGraphExecutorGraphInfo(CGraphExecutor &_executor, CGraphBase *_subGraph, IGraphCallback &_callback, const byte *parentExtract, size32_t parentExtractSz) : executor(_executor), subGraph(_subGraph), callback(_callback)
  1980. {
  1981. parentExtractMb.append(parentExtractSz, parentExtract);
  1982. }
  1983. CGraphExecutor &executor;
  1984. IGraphCallback &callback;
  1985. Linked<CGraphBase> subGraph;
  1986. MemoryBuffer parentExtractMb;
  1987. };
  1988. class CGraphExecutor : public CInterface, implements IGraphExecutor
  1989. {
  1990. CJobBase &job;
  1991. CIArrayOf<CGraphExecutorGraphInfo> stack, running, toRun;
  1992. UnsignedArray seen;
  1993. bool stopped;
  1994. unsigned limit;
  1995. unsigned waitOnRunning;
  1996. CriticalSection crit;
  1997. Semaphore runningSem;
  1998. Owned<IThreadPool> graphPool;
  1999. class CGraphExecutorFactory : public CInterface, implements IThreadFactory
  2000. {
  2001. CGraphExecutor &executor;
  2002. public:
  2003. IMPLEMENT_IINTERFACE;
  2004. CGraphExecutorFactory(CGraphExecutor &_executor) : executor(_executor) { }
  2005. // IThreadFactory
  2006. virtual IPooledThread *createNew()
  2007. {
  2008. class CGraphExecutorThread : public CInterface, implements IPooledThread
  2009. {
  2010. Owned<CGraphExecutorGraphInfo> graphInfo;
  2011. public:
  2012. IMPLEMENT_IINTERFACE;
  2013. CGraphExecutorThread()
  2014. {
  2015. }
  2016. void init(void *startInfo)
  2017. {
  2018. graphInfo.setown((CGraphExecutorGraphInfo *)startInfo);
  2019. }
  2020. void main()
  2021. {
  2022. loop
  2023. {
  2024. Linked<CGraphBase> graph = graphInfo->subGraph;
  2025. Owned<IException> e;
  2026. try
  2027. {
  2028. PROGLOG("CGraphExecutor: Running graph, graphId=%" GIDPF "d", graph->queryGraphId());
  2029. graphInfo->callback.runSubgraph(*graph, graphInfo->parentExtractMb.length(), (const byte *)graphInfo->parentExtractMb.toByteArray());
  2030. }
  2031. catch (IException *_e)
  2032. {
  2033. e.setown(_e);
  2034. }
  2035. Owned<CGraphExecutorGraphInfo> nextGraphInfo;
  2036. try
  2037. {
  2038. nextGraphInfo.setown(graphInfo->executor.graphDone(*graphInfo, e));
  2039. }
  2040. catch (IException *e)
  2041. {
  2042. GraphPrintLog(graph, e, "graphDone");
  2043. e->Release();
  2044. }
  2045. if (e)
  2046. throw e.getClear();
  2047. if (!nextGraphInfo)
  2048. return;
  2049. graphInfo.setown(nextGraphInfo.getClear());
  2050. }
  2051. }
  2052. bool canReuse() { return true; }
  2053. bool stop() { return true; }
  2054. };
  2055. return new CGraphExecutorThread();
  2056. }
  2057. } *factory;
  2058. CGraphExecutorGraphInfo *findRunning(graph_id gid)
  2059. {
  2060. ForEachItemIn(r, running)
  2061. {
  2062. CGraphExecutorGraphInfo *graphInfo = &running.item(r);
  2063. if (gid == graphInfo->subGraph->queryGraphId())
  2064. return graphInfo;
  2065. }
  2066. return NULL;
  2067. }
  2068. public:
  2069. IMPLEMENT_IINTERFACE;
  2070. CGraphExecutor(CJobBase &_job) : job(_job)
  2071. {
  2072. limit = (unsigned)job.getWorkUnitValueInt("concurrentSubGraphs", globals->getPropInt("@concurrentSubGraphs", 1));
  2073. PROGLOG("CGraphExecutor: limit = %d", limit);
  2074. waitOnRunning = 0;
  2075. stopped = false;
  2076. factory = new CGraphExecutorFactory(*this);
  2077. graphPool.setown(createThreadPool("CGraphExecutor pool", factory, &job, limit));
  2078. }
  2079. ~CGraphExecutor()
  2080. {
  2081. stopped = true;
  2082. graphPool->joinAll();
  2083. factory->Release();
  2084. }
  2085. CGraphExecutorGraphInfo *graphDone(CGraphExecutorGraphInfo &doneGraphInfo, IException *e)
  2086. {
  2087. CriticalBlock b(crit);
  2088. running.zap(doneGraphInfo);
  2089. if (waitOnRunning)
  2090. {
  2091. runningSem.signal(waitOnRunning);
  2092. waitOnRunning = 0;
  2093. }
  2094. if (e || job.queryAborted())
  2095. {
  2096. stopped = true;
  2097. stack.kill();
  2098. return NULL;
  2099. }
  2100. if (job.queryPausing())
  2101. stack.kill();
  2102. else if (stack.ordinality())
  2103. {
  2104. CICopyArrayOf<CGraphExecutorGraphInfo> toMove;
  2105. ForEachItemIn(s, stack)
  2106. {
  2107. bool dependenciesDone = true;
  2108. CGraphExecutorGraphInfo &graphInfo = stack.item(s);
  2109. ForEachItemIn (d, graphInfo.subGraph->dependentSubGraphs)
  2110. {
  2111. CGraphBase &subGraph = graphInfo.subGraph->dependentSubGraphs.item(d);
  2112. if (!subGraph.isComplete())
  2113. {
  2114. dependenciesDone = false;
  2115. break;
  2116. }
  2117. }
  2118. if (dependenciesDone)
  2119. {
  2120. graphInfo.subGraph->dependentSubGraphs.kill();
  2121. graphInfo.subGraph->prepare(graphInfo.parentExtractMb.length(), (const byte *)graphInfo.parentExtractMb.toByteArray(), true, true, true); // now existing deps done, maybe more to prepare
  2122. ForEachItemIn (d, graphInfo.subGraph->dependentSubGraphs)
  2123. {
  2124. CGraphBase &subGraph = graphInfo.subGraph->dependentSubGraphs.item(d);
  2125. if (!subGraph.isComplete())
  2126. {
  2127. dependenciesDone = false;
  2128. break;
  2129. }
  2130. }
  2131. if (dependenciesDone)
  2132. {
  2133. graphInfo.subGraph->dependentSubGraphs.kill(); // none to track anymore
  2134. toMove.append(graphInfo);
  2135. }
  2136. }
  2137. }
  2138. ForEachItemIn(m, toMove)
  2139. {
  2140. Linked<CGraphExecutorGraphInfo> graphInfo = &toMove.item(m);
  2141. stack.zap(*graphInfo);
  2142. toRun.add(*graphInfo.getClear(), 0);
  2143. }
  2144. }
  2145. job.markWuDirty();
  2146. PROGLOG("CGraphExecutor running=%d, waitingToRun=%d, dependentsWaiting=%d", running.ordinality(), toRun.ordinality(), stack.ordinality());
  2147. while (toRun.ordinality())
  2148. {
  2149. if (job.queryPausing())
  2150. return NULL;
  2151. Linked<CGraphExecutorGraphInfo> nextGraphInfo = &toRun.item(0);
  2152. toRun.remove(0);
  2153. if (!nextGraphInfo->subGraph->isComplete() && (NULL == findRunning(nextGraphInfo->subGraph->queryGraphId())))
  2154. {
  2155. running.append(*nextGraphInfo.getLink());
  2156. return nextGraphInfo.getClear();
  2157. }
  2158. }
  2159. return NULL;
  2160. }
  2161. // IGraphExecutor
  2162. virtual void add(CGraphBase *subGraph, IGraphCallback &callback, bool checkDependencies, size32_t parentExtractSz, const byte *parentExtract)
  2163. {
  2164. bool alreadyRunning;
  2165. {
  2166. CriticalBlock b(crit);
  2167. if (job.queryPausing())
  2168. return;
  2169. if (subGraph->isComplete())
  2170. return;
  2171. alreadyRunning = NULL != findRunning(subGraph->queryGraphId());
  2172. if (alreadyRunning)
  2173. ++waitOnRunning;
  2174. }
  2175. if (alreadyRunning)
  2176. {
  2177. loop
  2178. {
  2179. PROGLOG("Waiting on subgraph %" GIDPF "d", subGraph->queryGraphId());
  2180. if (runningSem.wait(MEDIUMTIMEOUT) || job.queryAborted() || job.queryPausing())
  2181. break;
  2182. }
  2183. return;
  2184. }
  2185. else
  2186. {
  2187. CriticalBlock b(crit);
  2188. if (seen.contains(subGraph->queryGraphId()))
  2189. return; // already queued;
  2190. seen.append(subGraph->queryGraphId());
  2191. }
  2192. if (!subGraph->prepare(parentExtractSz, parentExtract, checkDependencies, true, true))
  2193. {
  2194. subGraph->setComplete();
  2195. return;
  2196. }
  2197. if (subGraph->dependentSubGraphs.ordinality())
  2198. {
  2199. bool dependenciesDone = true;
  2200. ForEachItemIn (d, subGraph->dependentSubGraphs)
  2201. {
  2202. CGraphBase &graph = subGraph->dependentSubGraphs.item(d);
  2203. if (!graph.isComplete())
  2204. {
  2205. dependenciesDone = false;
  2206. break;
  2207. }
  2208. }
  2209. if (dependenciesDone)
  2210. subGraph->dependentSubGraphs.kill(); // none to track anymore
  2211. }
  2212. Owned<CGraphExecutorGraphInfo> graphInfo = new CGraphExecutorGraphInfo(*this, subGraph, callback, parentExtract, parentExtractSz);
  2213. CriticalBlock b(crit);
  2214. if (0 == subGraph->dependentSubGraphs.ordinality())
  2215. {
  2216. if (running.ordinality()<limit)
  2217. {
  2218. running.append(*LINK(graphInfo));
  2219. PROGLOG("Add: Launching graph thread for graphId=%" GIDPF "d", subGraph->queryGraphId());
  2220. graphPool->start(graphInfo.getClear());
  2221. }
  2222. else
  2223. stack.add(*graphInfo.getClear(), 0); // push to front, no dependency, free to run next.
  2224. }
  2225. else
  2226. stack.append(*graphInfo.getClear()); // as dependencies finish, may move up the list
  2227. }
  2228. virtual IThreadPool &queryGraphPool() { return *graphPool; }
  2229. virtual void wait()
  2230. {
  2231. PROGLOG("CGraphExecutor exiting, waiting on graph pool");
  2232. graphPool->joinAll();
  2233. PROGLOG("CGraphExecutor graphPool finished");
  2234. }
  2235. };
  2236. ////
  2237. // IContextLogger
  2238. class CThorContextLogger : CSimpleInterface, implements IContextLogger
  2239. {
  2240. CJobBase &job;
  2241. unsigned traceLevel;
  2242. public:
  2243. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  2244. CThorContextLogger(CJobBase &_job) : job(_job)
  2245. {
  2246. traceLevel = 1;
  2247. }
  2248. virtual void CTXLOGva(const char *format, va_list args) const
  2249. {
  2250. StringBuffer ss;
  2251. ss.valist_appendf(format, args);
  2252. LOG(MCdebugProgress, thorJob, "%s", ss.str());
  2253. }
  2254. virtual void logOperatorExceptionVA(IException *E, const char *file, unsigned line, const char *format, va_list args) const
  2255. {
  2256. StringBuffer ss;
  2257. ss.append("ERROR");
  2258. if (E)
  2259. ss.append(": ").append(E->errorCode());
  2260. if (file)
  2261. ss.appendf(": %s(%d) ", file, line);
  2262. if (E)
  2263. E->errorMessage(ss.append(": "));
  2264. if (format)
  2265. ss.append(": ").valist_appendf(format, args);
  2266. LOG(MCoperatorProgress, thorJob, "%s", ss.str());
  2267. }
  2268. virtual void noteStatistic(StatisticKind kind, unsigned __int64 value) const
  2269. {
  2270. }
  2271. virtual void mergeStats(const CRuntimeStatisticCollection &from) const
  2272. {
  2273. }
  2274. virtual unsigned queryTraceLevel() const
  2275. {
  2276. return traceLevel;
  2277. }
  2278. };
  2279. ////
  2280. CJobBase::CJobBase(const char *_graphName) : graphName(_graphName)
  2281. {
  2282. maxDiskUsage = diskUsage = 0;
  2283. dirty = true;
  2284. aborted = false;
  2285. codeCtx = NULL;
  2286. mpJobTag = TAG_NULL;
  2287. timeReporter = createStdTimeReporter();
  2288. pluginMap = new SafePluginMap(&pluginCtx, true);
  2289. // JCSMORE - Will pass down at job creation time...
  2290. jobGroup.set(&::queryClusterGroup());
  2291. jobComm.setown(createCommunicator(jobGroup));
  2292. slaveGroup.setown(jobGroup->remove(0));
  2293. myrank = jobGroup->rank(queryMyNode());
  2294. globalMemorySize = globals->getPropInt("@globalMemorySize"); // in MB
  2295. oldNodeCacheMem = 0;
  2296. }
  2297. void CJobBase::init()
  2298. {
  2299. StringBuffer tmp;
  2300. tmp.append(wuid);
  2301. tmp.append(graphName);
  2302. key.set(tmp.str());
  2303. SCMStringBuffer tokenUser, password;
  2304. extractToken(token.str(), wuid.str(), tokenUser, password);
  2305. userDesc = createUserDescriptor();
  2306. userDesc->set(user.str(), password.str());
  2307. forceLogGraphIdMin = (graph_id)getWorkUnitValueInt("forceLogGraphIdMin", 0);
  2308. forceLogGraphIdMax = (graph_id)getWorkUnitValueInt("forceLogGraphIdMax", 0);
  2309. logctx.setown(new CThorContextLogger(*this));
  2310. // global setting default on, can be overridden by #option
  2311. timeActivities = 0 != getWorkUnitValueInt("timeActivities", globals->getPropBool("@timeActivities", true));
  2312. maxActivityCores = (unsigned)getWorkUnitValueInt("maxActivityCores", globals->getPropInt("@maxActivityCores", 0)); // NB: 0 means system decides
  2313. pausing = false;
  2314. resumed = false;
  2315. bool crcChecking = 0 != getWorkUnitValueInt("THOR_ROWCRC", globals->getPropBool("@THOR_ROWCRC", false));
  2316. bool usePackedAllocator = 0 != getWorkUnitValueInt("THOR_PACKEDALLOCATOR", globals->getPropBool("@THOR_PACKEDALLOCATOR", true));
  2317. unsigned memorySpillAt = (unsigned)getWorkUnitValueInt("memorySpillAt", globals->getPropInt("@memorySpillAt", 80));
  2318. thorAllocator.setown(createThorAllocator(((memsize_t)globalMemorySize)*0x100000, memorySpillAt, *logctx, crcChecking, usePackedAllocator));
  2319. unsigned defaultMemMB = globalMemorySize*3/4;
  2320. PROGLOG("Global memory size = %d MB, memory spill at = %d%%", globalMemorySize, memorySpillAt);
  2321. StringBuffer tracing("maxActivityCores = ");
  2322. if (maxActivityCores)
  2323. tracing.append(maxActivityCores);
  2324. else
  2325. tracing.append("[unbound]");
  2326. PROGLOG("%s", tracing.str());
  2327. graphExecutor.setown(new CGraphExecutor(*this));
  2328. }
  2329. void CJobBase::beforeDispose()
  2330. {
  2331. endJob();
  2332. }
  2333. CJobBase::~CJobBase()
  2334. {
  2335. clean();
  2336. thorAllocator->queryRowManager()->reportMemoryUsage(false);
  2337. PROGLOG("CJobBase resetting memory manager");
  2338. thorAllocator.clear();
  2339. ::Release(codeCtx);
  2340. ::Release(userDesc);
  2341. timeReporter->Release();
  2342. delete pluginMap;
  2343. StringBuffer memStatsStr;
  2344. roxiemem::memstats(memStatsStr);
  2345. PROGLOG("Roxiemem stats: %s", memStatsStr.str());
  2346. memsize_t heapUsage = getMapInfo("heap");
  2347. if (heapUsage) // if 0, assumed to be unavailable
  2348. PROGLOG("Heap usage : %" I64F "d bytes", (unsigned __int64)heapUsage);
  2349. }
  2350. void CJobBase::startJob()
  2351. {
  2352. LOG(MCdebugProgress, thorJob, "New Graph started : %s", graphName.get());
  2353. ClearTempDirs();
  2354. unsigned pinterval = globals->getPropInt("@system_monitor_interval",1000*60);
  2355. if (pinterval)
  2356. {
  2357. perfmonhook.setown(createThorMemStatsPerfMonHook(*this, getOptInt(THOROPT_MAX_KERNLOG, 3)));
  2358. startPerformanceMonitor(pinterval,PerfMonStandard,perfmonhook);
  2359. }
  2360. PrintMemoryStatusLog();
  2361. logDiskSpace();
  2362. unsigned keyNodeCacheMB = (unsigned)getWorkUnitValueInt("keyNodeCacheMB", 0);
  2363. if (keyNodeCacheMB)
  2364. {
  2365. oldNodeCacheMem = setNodeCacheMem(keyNodeCacheMB * 0x100000);
  2366. PROGLOG("Key node cache size set to: %d MB", keyNodeCacheMB);
  2367. }
  2368. unsigned keyFileCacheLimit = (unsigned)getWorkUnitValueInt("keyFileCacheLimit", 0);
  2369. if (!keyFileCacheLimit)
  2370. keyFileCacheLimit = (querySlaves()+1)*2;
  2371. setKeyIndexCacheSize(keyFileCacheLimit);
  2372. PROGLOG("Key file cache size set to: %d", keyFileCacheLimit);
  2373. }
  2374. void CJobBase::endJob()
  2375. {
  2376. stopPerformanceMonitor();
  2377. LOG(MCdebugProgress, thorJob, "Job ended : %s", graphName.get());
  2378. clearKeyStoreCache(true);
  2379. if (oldNodeCacheMem)
  2380. setNodeCacheMem(oldNodeCacheMem);
  2381. PrintMemoryStatusLog();
  2382. }
  2383. bool CJobBase::queryForceLogging(graph_id graphId, bool def) const
  2384. {
  2385. // JCSMORE, could add comma separated range, e.g. 1-5,10-12
  2386. if ((graphId >= forceLogGraphIdMin) && (graphId <= forceLogGraphIdMax))
  2387. return true;
  2388. return def;
  2389. }
  2390. void CJobBase::clean()
  2391. {
  2392. if (graphExecutor)
  2393. {
  2394. graphExecutor->queryGraphPool().stopAll();
  2395. graphExecutor.clear();
  2396. }
  2397. subGraphs.kill();
  2398. }
  2399. IThorGraphIterator *CJobBase::getSubGraphs()
  2400. {
  2401. CriticalBlock b(crit);
  2402. return new CGraphTableIterator(subGraphs);
  2403. }
  2404. static void getGlobalDeps(CGraphBase &graph, CICopyArrayOf<CGraphDependency> &deps)
  2405. {
  2406. Owned<IThorActivityIterator> iter = graph.getIterator();
  2407. ForEach(*iter)
  2408. {
  2409. CGraphElementBase &elem = iter->query();
  2410. Owned<IThorGraphDependencyIterator> dependIterator = elem.getDependsIterator();
  2411. ForEach(*dependIterator)
  2412. {
  2413. CGraphDependency &dependency = dependIterator->query();
  2414. if (dependency.graph->isGlobal() && NULL==dependency.graph->queryOwner())
  2415. deps.append(dependency);
  2416. getGlobalDeps(*dependency.graph, deps);
  2417. }
  2418. }
  2419. }
  2420. static void noteDependency(CGraphElementBase *targetActivity, CGraphElementBase *sourceActivity, CGraphBase *targetGraph, CGraphBase *sourceGraph, unsigned controlId)
  2421. {
  2422. targetActivity->addDependsOn(sourceGraph, controlId);
  2423. // NB: record dependency in source graph, serialized to slaves, used to decided if should run dependency sinks or not
  2424. Owned<IPropertyTree> dependencyFor = createPTree();
  2425. dependencyFor->setPropInt("@id", sourceActivity->queryId());
  2426. dependencyFor->setPropInt("@graphId", targetGraph->queryGraphId());
  2427. if (controlId)
  2428. dependencyFor->setPropInt("@conditionalId", controlId);
  2429. sourceGraph->queryXGMML().addPropTree("Dependency", dependencyFor.getClear());
  2430. }
  2431. void CJobBase::addDependencies(IPropertyTree *xgmml, bool failIfMissing)
  2432. {
  2433. CGraphArrayCopy childGraphs;
  2434. CGraphElementArrayCopy targetActivities, sourceActivities;
  2435. Owned<IPropertyTreeIterator> iter = xgmml->getElements("edge");
  2436. ForEach(*iter)
  2437. {
  2438. IPropertyTree &edge = iter->query();
  2439. Owned<CGraphBase> source = getGraph(edge.getPropInt("@source"));
  2440. Owned<CGraphBase> target = getGraph(edge.getPropInt("@target"));
  2441. if (!source || !target)
  2442. {
  2443. if (failIfMissing)
  2444. throwUnexpected();
  2445. else
  2446. continue; // expected if assigning dependencies in slaves
  2447. }
  2448. CGraphElementBase *targetActivity = (CGraphElementBase *)target->queryElement(edge.getPropInt("att[@name=\"_targetActivity\"]/@value"));
  2449. CGraphElementBase *sourceActivity = (CGraphElementBase *)source->queryElement(edge.getPropInt("att[@name=\"_sourceActivity\"]/@value"));
  2450. int controlId = 0;
  2451. if (edge.getPropBool("att[@name=\"_dependsOn\"]/@value", false))
  2452. {
  2453. if (!edge.getPropBool("att[@name=\"_childGraph\"]/@value", false)) // JCSMORE - not sure if necess. roxie seem to do.
  2454. controlId = edge.getPropInt("att[@name=\"_when\"]/@value", 0);
  2455. CGraphBase &sourceGraph = sourceActivity->queryOwner();
  2456. unsigned sourceGraphContext = sourceGraph.queryParentActivityId();
  2457. CGraphBase *targetGraph = NULL;
  2458. unsigned targetGraphContext = -1;
  2459. loop
  2460. {
  2461. targetGraph = &targetActivity->queryOwner();
  2462. targetGraphContext = targetGraph->queryParentActivityId();
  2463. if (sourceGraphContext == targetGraphContext)
  2464. break;
  2465. targetActivity = targetGraph->queryElement(targetGraphContext);
  2466. }
  2467. assertex(targetActivity && sourceActivity);
  2468. noteDependency(targetActivity, sourceActivity, target, source, controlId);
  2469. }
  2470. else if (edge.getPropBool("att[@name=\"_conditionSource\"]/@value", false))
  2471. { /* Ignore it */ }
  2472. else if (edge.getPropBool("att[@name=\"_childGraph\"]/@value", false))
  2473. {
  2474. // NB: any dependencies of the child acts. are dependencies of this act.
  2475. childGraphs.append(*source);
  2476. targetActivities.append(*targetActivity);
  2477. sourceActivities.append(*sourceActivity);
  2478. }
  2479. else
  2480. {
  2481. if (!edge.getPropBool("att[@name=\"_childGraph\"]/@value", false)) // JCSMORE - not sure if necess. roxie seem to do.
  2482. controlId = edge.getPropInt("att[@name=\"_when\"]/@value", 0);
  2483. noteDependency(targetActivity, sourceActivity, target, source, controlId);
  2484. }
  2485. }
  2486. ForEachItemIn(c, childGraphs)
  2487. {
  2488. CGraphBase &childGraph = childGraphs.item(c);
  2489. CGraphElementBase &targetActivity = targetActivities.item(c);
  2490. CGraphElementBase &sourceActivity = sourceActivities.item(c);
  2491. if (!childGraph.isGlobal())
  2492. {
  2493. CICopyArrayOf<CGraphDependency> globalChildGraphDeps;
  2494. getGlobalDeps(childGraph, globalChildGraphDeps);
  2495. ForEachItemIn(gcd, globalChildGraphDeps)
  2496. {
  2497. CGraphDependency &globalDep = globalChildGraphDeps.item(gcd);
  2498. noteDependency(&targetActivity, &sourceActivity, globalDep.graph, &childGraph, globalDep.controlId);
  2499. }
  2500. }
  2501. }
  2502. SuperHashIteratorOf<CGraphBase> allIter(allGraphs);
  2503. ForEach(allIter)
  2504. {
  2505. CGraphBase &subGraph = allIter.query();
  2506. if (subGraph.queryOwner() && subGraph.queryParentActivityId())
  2507. {
  2508. CGraphElementBase *parentElement = subGraph.queryOwner()->queryElement(subGraph.queryParentActivityId());
  2509. if (isLoopActivity(*parentElement))
  2510. {
  2511. if (!parentElement->queryOwner().isLocalChild() && !subGraph.isLocalOnly())
  2512. subGraph.setGlobal(true);
  2513. }
  2514. }
  2515. bool log = queryForceLogging(subGraph.queryGraphId(), (NULL == subGraph.queryOwner()) || subGraph.isGlobal());
  2516. subGraph.setLogging(log);
  2517. }
  2518. }
  2519. void CJobBase::startGraph(CGraphBase &graph, IGraphCallback &callback, bool checkDependencies, size32_t parentExtractSize, const byte *parentExtract)
  2520. {
  2521. graphExecutor->add(&graph, callback, checkDependencies, parentExtractSize, parentExtract);
  2522. }
  2523. ICodeContext &CJobBase::queryCodeContext() const
  2524. {
  2525. return *codeCtx;
  2526. }
  2527. bool CJobBase::queryUseCheckpoints() const
  2528. {
  2529. return globals->getPropBool("@checkPointRecovery") || 0 != getWorkUnitValueInt("checkPointRecovery", 0);
  2530. }
  2531. void CJobBase::abort(IException *e)
  2532. {
  2533. aborted = true;
  2534. Owned<IThorGraphIterator> iter = getSubGraphs();
  2535. ForEach (*iter)
  2536. {
  2537. CGraphBase &graph = iter->query();
  2538. graph.abort(e);
  2539. }
  2540. }
  2541. void CJobBase::increase(offset_t usage, const char *key)
  2542. {
  2543. diskUsage += usage;
  2544. if (diskUsage > maxDiskUsage) maxDiskUsage = diskUsage;
  2545. }
  2546. void CJobBase::decrease(offset_t usage, const char *key)
  2547. {
  2548. diskUsage -= usage;
  2549. }
  2550. mptag_t CJobBase::allocateMPTag()
  2551. {
  2552. mptag_t tag = allocateClusterMPTag();
  2553. jobComm->flush(tag);
  2554. PROGLOG("allocateMPTag: tag = %d", (int)tag);
  2555. return tag;
  2556. }
  2557. void CJobBase::freeMPTag(mptag_t tag)
  2558. {
  2559. if (TAG_NULL != tag)
  2560. {
  2561. freeClusterMPTag(tag);
  2562. PROGLOG("freeMPTag: tag = %d", (int)tag);
  2563. jobComm->flush(tag);
  2564. }
  2565. }
  2566. mptag_t CJobBase::deserializeMPTag(MemoryBuffer &mb)
  2567. {
  2568. mptag_t tag;
  2569. deserializeMPtag(mb, tag);
  2570. if (TAG_NULL != tag)
  2571. {
  2572. PROGLOG("deserializeMPTag: tag = %d", (int)tag);
  2573. jobComm->flush(tag);
  2574. }
  2575. return tag;
  2576. }
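  // These getOpt* methods look for the property in the workunit settings first, then in the global settings ("Debug/@<opt>"), defaulting to the provided 'dft' if not present in either (getOpt has no default and leaves 'out' unchanged)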
  2578. StringBuffer &CJobBase::getOpt(const char *opt, StringBuffer &out)
  2579. {
  2580. if (!opt || !*opt)
  2581. return out; // probably error
  2582. VStringBuffer gOpt("Debug/@%s", opt);
  2583. getWorkUnitValue(opt, out);
  2584. if (0 == out.length())
  2585. globals->getProp(gOpt, out);
  2586. return out;
  2587. }
  2588. bool CJobBase::getOptBool(const char *opt, bool dft)
  2589. {
  2590. if (!opt || !*opt)
  2591. return dft; // probably error
  2592. VStringBuffer gOpt("Debug/@%s", opt);
  2593. return getWorkUnitValueBool(opt, globals->getPropBool(gOpt, dft));
  2594. }
  2595. int CJobBase::getOptInt(const char *opt, int dft)
  2596. {
  2597. if (!opt || !*opt)
  2598. return dft; // probably error
  2599. VStringBuffer gOpt("Debug/@%s", opt);
  2600. return (int)getWorkUnitValueInt(opt, globals->getPropInt(gOpt, dft));
  2601. }
  2602. __int64 CJobBase::getOptInt64(const char *opt, __int64 dft)
  2603. {
  2604. if (!opt || !*opt)
  2605. return dft; // probably error
  2606. VStringBuffer gOpt("Debug/@%s", opt);
  2607. return getWorkUnitValueInt(opt, globals->getPropInt64(gOpt, dft));
  2608. }
  2609. // IGraphCallback
  2610. void CJobBase::runSubgraph(CGraphBase &graph, size32_t parentExtractSz, const byte *parentExtract)
  2611. {
  2612. graph.executeSubGraph(parentExtractSz, parentExtract);
  2613. }
  2614. IEngineRowAllocator *CJobBase::getRowAllocator(IOutputMetaData * meta, unsigned activityId, roxiemem::RoxieHeapFlags flags) const
  2615. {
  2616. return thorAllocator->getRowAllocator(meta, activityId, flags);
  2617. }
  2618. roxiemem::IRowManager *CJobBase::queryRowManager() const
  2619. {
  2620. return thorAllocator->queryRowManager();
  2621. }
  2622. IThorResult *CJobBase::getOwnedResult(graph_id gid, activity_id ownerId, unsigned resultId)
  2623. {
  2624. Owned<CGraphBase> graph = getGraph(gid);
  2625. if (!graph)
  2626. {
  2627. Owned<IThorException> e = MakeThorException(0, "getOwnedResult: graph not found");
  2628. e->setGraphId(gid);
  2629. throw e.getClear();
  2630. }
  2631. Owned<IThorResult> result;
  2632. if (ownerId)
  2633. {
  2634. CGraphElementBase *container = graph->queryElement(ownerId);
  2635. assertex(container);
  2636. CActivityBase *activity = container->queryActivity();
  2637. IThorGraphResults *results = activity->queryResults();
  2638. if (!results)
  2639. throw MakeGraphException(graph, 0, "GraphGetResult: no results created (requesting: %d)", resultId);
  2640. result.setown(activity->queryResults()->getResult(resultId));
  2641. }
  2642. else
  2643. result.setown(graph->getResult(resultId));
  2644. if (!result)
  2645. throw MakeGraphException(graph, 0, "GraphGetResult: result not found: %d", resultId);
  2646. return result.getClear();
  2647. }
  2648. static IThorResource *iThorResource = NULL;
  2649. void setIThorResource(IThorResource &r)
  2650. {
  2651. iThorResource = &r;
  2652. }
  2653. IThorResource &queryThor()
  2654. {
  2655. return *iThorResource;
  2656. }
  2657. //
  2658. //
  2659. //
  2660. //
  2661. CActivityBase::CActivityBase(CGraphElementBase *_container) : container(*_container), timeActivities(_container->queryJob().queryTimeActivities())
  2662. {
  2663. mpTag = TAG_NULL;
  2664. abortSoon = receiving = cancelledReceive = reInit = false;
  2665. baseHelper.set(container.queryHelper());
  2666. parentExtractSz = 0;
  2667. parentExtract = NULL;
  2668. // NB: maxCores, currently only used to control # cores used by sorts
  2669. maxCores = container.queryXGMML().getPropInt("hint[@name=\"max_cores\"]/@value", container.queryJob().queryMaxDefaultActivityCores());
  2670. }
  2671. CActivityBase::~CActivityBase()
  2672. {
  2673. }
  2674. void CActivityBase::abort()
  2675. {
  2676. if (!abortSoon) ActPrintLog("Abort condition set");
  2677. abortSoon = true;
  2678. }
  2679. void CActivityBase::kill()
  2680. {
  2681. ownedResults.clear();
  2682. }
  2683. bool CActivityBase::appendRowXml(StringBuffer & target, IOutputMetaData & meta, const void * row) const
  2684. {
  2685. if (!meta.hasXML())
  2686. {
  2687. target.append("<xml-unavailable/>");
  2688. return false;
  2689. }
  2690. try
  2691. {
  2692. CommonXmlWriter xmlWrite(XWFnoindent);
  2693. meta.toXML((byte *) row, xmlWrite);
  2694. target.append(xmlWrite.str());
  2695. return true;
  2696. }
  2697. catch (IException * e)
  2698. {
  2699. e->Release();
  2700. target.append("<invalid-row/>");
  2701. return false;
  2702. }
  2703. }
  2704. void CActivityBase::logRow(const char * prefix, IOutputMetaData & meta, const void * row)
  2705. {
  2706. bool blindLogging = false; // MORE: should check a workunit/global option
  2707. if (meta.hasXML() && !blindLogging)
  2708. {
  2709. StringBuffer xml;
  2710. appendRowXml(xml, meta, row);
  2711. ActPrintLog("%s: %s", prefix, xml.str());
  2712. }
  2713. }
  2714. void CActivityBase::ActPrintLog(const char *format, ...)
  2715. {
  2716. va_list args;
  2717. va_start(args, format);
  2718. ::ActPrintLogArgs(&queryContainer(), thorlog_null, MCdebugProgress, format, args);
  2719. va_end(args);
  2720. }
  2721. void CActivityBase::ActPrintLog(IException *e, const char *format, ...)
  2722. {
  2723. va_list args;
  2724. va_start(args, format);
  2725. ::ActPrintLogArgs(&queryContainer(), e, thorlog_all, MCexception(e), format, args);
  2726. va_end(args);
  2727. }
  2728. void CActivityBase::ActPrintLog(IException *e)
  2729. {
  2730. ActPrintLog(e, "%s", "");
  2731. }
  2732. bool CActivityBase::fireException(IException *e)
  2733. {
  2734. Owned<IThorException> _te;
  2735. IThorException *te = QUERYINTERFACE(e, IThorException);
  2736. if (te)
  2737. {
  2738. if (!te->queryActivityId())
  2739. setExceptionActivityInfo(container, te);
  2740. }
  2741. else
  2742. {
  2743. te = MakeActivityException(this, e);
  2744. te->setAudience(e->errorAudience());
  2745. _te.setown(te);
  2746. }
  2747. return container.queryOwner().fireException(te);
  2748. }
  2749. void CActivityBase::processAndThrowOwnedException(IException * _e)
  2750. {
  2751. IThorException *e = QUERYINTERFACE(_e, IThorException);
  2752. if (e)
  2753. {
  2754. if (!e->queryActivityId())
  2755. setExceptionActivityInfo(container, e);
  2756. }
  2757. else
  2758. {
  2759. e = MakeActivityException(this, _e);
  2760. _e->Release();
  2761. }
  2762. if (!e->queryNotified())
  2763. {
  2764. fireException(e);
  2765. e->setNotified();
  2766. }
  2767. throw e;
  2768. }
  2769. IEngineRowAllocator * CActivityBase::queryRowAllocator()
  2770. {
  2771. if (CABallocatorlock.lock()) {
  2772. if (!rowAllocator)
  2773. rowAllocator.setown(queryJob().getRowAllocator(queryRowMetaData(),queryActivityId()));
  2774. CABallocatorlock.unlock();
  2775. }
  2776. return rowAllocator;
  2777. }
  2778. IOutputRowSerializer * CActivityBase::queryRowSerializer()
  2779. {
  2780. if (CABserializerlock.lock()) {
  2781. if (!rowSerializer)
  2782. rowSerializer.setown(queryRowMetaData()->createDiskSerializer(queryCodeContext(),queryActivityId()));
  2783. CABserializerlock.unlock();
  2784. }
  2785. return rowSerializer;
  2786. }
  2787. IOutputRowDeserializer * CActivityBase::queryRowDeserializer()
  2788. {
  2789. if (CABdeserializerlock.lock()) {
  2790. if (!rowDeserializer)
  2791. rowDeserializer.setown(queryRowMetaData()->createDiskDeserializer(queryCodeContext(),queryActivityId()));
  2792. CABdeserializerlock.unlock();
  2793. }
  2794. return rowDeserializer;
  2795. }
  2796. IRowInterfaces *CActivityBase::getRowInterfaces()
  2797. {
  2798. // create an independent instance, to avoid circular link dependency problems
  2799. return createRowInterfaces(queryRowMetaData(), container.queryId(), queryCodeContext());
  2800. }
  2801. bool CActivityBase::receiveMsg(CMessageBuffer &mb, const rank_t rank, const mptag_t mpTag, rank_t *sender, unsigned timeout)
  2802. {
  2803. BooleanOnOff onOff(receiving);
  2804. CTimeMon t(timeout);
  2805. unsigned remaining = timeout;
  2806. // check 'cancelledReceive' every 10 secs
  2807. while (!cancelledReceive && ((MP_WAIT_FOREVER==timeout) || !t.timedout(&remaining)))
  2808. {
  2809. if (container.queryJob().queryJobComm().recv(mb, rank, mpTag, sender, remaining>10000?10000:remaining))
  2810. return true;
  2811. }
  2812. return false;
  2813. }
  2814. void CActivityBase::cancelReceiveMsg(const rank_t rank, const mptag_t mpTag)
  2815. {
  2816. cancelledReceive = true;
  2817. if (receiving)
  2818. container.queryJob().queryJobComm().cancel(rank, mpTag);
  2819. }
  2820. StringBuffer &CActivityBase::getOpt(const char *prop, StringBuffer &out) const
  2821. {
  2822. VStringBuffer path("hint[@name=\"%s\"]/@value", prop);
  2823. if (!container.queryXGMML().getProp(path.toLowerCase().str(), out))
  2824. queryJob().getOpt(prop, out);
  2825. return out;
  2826. }
  2827. bool CActivityBase::getOptBool(const char *prop, bool defVal) const
  2828. {
  2829. bool def = queryJob().getOptBool(prop, defVal);
  2830. VStringBuffer path("hint[@name=\"%s\"]/@value", prop);
  2831. return container.queryXGMML().getPropBool(path.toLowerCase().str(), def);
  2832. }
  2833. int CActivityBase::getOptInt(const char *prop, int defVal) const
  2834. {
  2835. int def = queryJob().getOptInt(prop, defVal);
  2836. VStringBuffer path("hint[@name=\"%s\"]/@value", prop);
  2837. return container.queryXGMML().getPropInt(path.toLowerCase().str(), def);
  2838. }
  2839. __int64 CActivityBase::getOptInt64(const char *prop, __int64 defVal) const
  2840. {
  2841. __int64 def = queryJob().getOptInt64(prop, defVal);
  2842. VStringBuffer path("hint[@name=\"%s\"]/@value", prop);
  2843. return container.queryXGMML().getPropInt64(path.toLowerCase().str(), def);
  2844. }