// thgraph.hpp (53 KB)
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef _THGRAPH_HPP
  14. #define _THGRAPH_HPP
  15. #ifdef _WIN32
  16. #ifdef GRAPH_EXPORTS
  17. #define graph_decl __declspec(dllexport)
  18. #else
  19. #define graph_decl __declspec(dllimport)
  20. #endif
  21. #else
  22. #define graph_decl
  23. #endif
  24. #undef barrier
  25. #define LONGTIMEOUT (25*60*1000)
  26. #define MEDIUMTIMEOUT 30000
  27. #include "jlib.hpp"
  28. #include "jarray.tpp"
  29. #include "jexcept.hpp"
  30. #include "jhash.hpp"
  31. #include "jsuperhash.hpp"
  32. #include "jset.hpp"
  33. #include "mpcomm.hpp"
  34. #include "mptag.hpp"
  35. #include "roxiemem.hpp"
  36. #include "thormisc.hpp"
  37. #include "workunit.hpp"
  38. #include "thorcommon.hpp"
  39. #include "thmem.hpp"
  40. #include "thor.hpp"
  41. #include "eclhelper.hpp"
  42. #include "thorplugin.hpp"
  43. #define THORDATALINK_STOPPED (RCMAX&~(RCMAX>>1)) // dataLinkStop() was called
  44. #define THORDATALINK_STARTED (RCMAX&~THORDATALINK_STOPPED&~(RCMAX>>2)) // dataLinkStart() was called
  45. #define THORDATALINK_COUNT_MASK (RCMAX>>2) // mask to extract count value only
  46. enum ActivityAttributes { ActAttr_Source=1, ActAttr_Sink=2 };
  47. #define INVALID_UNIQ_ID -1;
  48. typedef activity_id unique_id;
  49. enum msgids
  50. {
  51. QueryInit,
  52. QueryDone,
  53. Shutdown,
  54. GraphInit,
  55. GraphEnd,
  56. GraphAbort,
  57. GraphGetResult
  58. };
  59. interface ICodeContextExt : extends ICodeContext
  60. {
  61. virtual IConstWUResult *getExternalResult(const char * wuid, const char *name, unsigned sequence) = 0;
  62. virtual IConstWUResult *getResultForGet(const char *name, unsigned sequence) = 0;
  63. };
  64. interface IDiskUsage : extends IInterface
  65. {
  66. virtual void increase(offset_t usage, const char *key=NULL) = 0;
  67. virtual void decrease(offset_t usage, const char *key=NULL) = 0;
  68. };
  69. interface IBackup;
  70. interface IFileInProgressHandler;
  71. interface IThorFileCache;
  72. interface IThorResource
  73. {
  74. virtual IThorFileCache &queryFileCache() = 0;
  75. virtual IBackup &queryBackup() = 0;
  76. virtual IFileInProgressHandler &queryFileInProgressHandler() = 0;
  77. };
  78. interface IBarrier : extends IInterface
  79. {
  80. virtual const mptag_t queryTag() const = 0;
  81. virtual bool wait(bool exception, unsigned timeout=INFINITE) = 0;
  82. virtual void cancel() = 0;
  83. };
  84. graph_decl IThorResource &queryThor();
  85. graph_decl void setIThorResource(IThorResource &r);
  86. interface IRowWriterMultiReader;
  87. interface IThorResult : extends IInterface
  88. {
  89. virtual IRowWriter *getWriter() = 0;
  90. virtual void setResultStream(IRowWriterMultiReader *stream, rowcount_t count) = 0;
  91. virtual IRowStream *getRowStream() = 0;
  92. virtual IRowInterfaces *queryRowInterfaces() = 0;
  93. virtual CActivityBase *queryActivity() = 0;
  94. virtual bool isDistributed() const = 0;
  95. virtual void serialize(MemoryBuffer &mb) = 0;
  96. virtual void getLinkedResult(unsigned & count, byte * * & ret) = 0;
  97. };
  98. class CActivityBase;
  99. // JCSMORE - based on IHThorGraphResults
  100. interface IThorGraphResults : extends IEclGraphResults
  101. {
  102. virtual void clear() = 0;
  103. virtual IThorResult *getResult(unsigned id, bool distributed=false) = 0;
  104. virtual IThorResult *createResult(CActivityBase &activity, unsigned id, IRowInterfaces *rowIf, bool distributed, unsigned spillPriority=SPILL_PRIORITY_RESULT) = 0;
  105. virtual IThorResult *createResult(CActivityBase &activity, IRowInterfaces *rowIf, bool distributed, unsigned spillPriority=SPILL_PRIORITY_RESULT) = 0;
  106. virtual unsigned addResult(IThorResult *result) = 0;
  107. virtual void setResult(unsigned id, IThorResult *result) = 0;
  108. virtual unsigned count() = 0;
  109. virtual void setOwner(activity_id id) = 0;
  110. virtual activity_id queryOwnerId() const = 0;
  111. };
  112. class CGraphBase;
  113. interface IThorBoundLoopGraph : extends IInterface
  114. {
  115. virtual void prepareLoopResults(CActivityBase &activity, IThorGraphResults *results) = 0;
  116. virtual void prepareCounterResult(CActivityBase &activity, IThorGraphResults *results, unsigned loopCounter, unsigned pos) = 0;
  117. virtual void prepareLoopAgainResult(CActivityBase &activity, IThorGraphResults *results, unsigned pos) = 0;
  118. virtual void execute(CActivityBase &activity, unsigned counter, IThorGraphResults *results, IRowWriterMultiReader *rowStream, rowcount_t rowStreamCount, size32_t parentExtractSz, const byte * parentExtract) = 0;
  119. virtual void execute(CActivityBase &activity, unsigned counter, IThorGraphResults * graphLoopResults, size32_t parentExtractSz, const byte * parentExtract) = 0;
  120. virtual CGraphBase *queryGraph() = 0;
  121. };
  122. class CFileUsageEntry : public CInterface
  123. {
  124. StringAttr name;
  125. unsigned usage;
  126. graph_id graphId;
  127. WUFileKind fileKind;
  128. public:
  129. CFileUsageEntry(const char *_name, graph_id _graphId, WUFileKind _fileKind, unsigned _usage) :name(_name), graphId(_graphId), fileKind(_fileKind), usage(_usage) { }
  130. unsigned queryUsage() const { return usage; }
  131. const graph_id queryGraphId() const { return graphId; }
  132. const WUFileKind queryKind() const { return fileKind; }
  133. const char *queryName() const { return name.get(); }
  134. void decUsage() { --usage; }
  135. const char *queryFindString() const { return name; }
  136. };
  137. typedef IIteratorOf<CFileUsageEntry> IFileUsageIterator;
  138. interface IGraphTempHandler : extends IInterface
  139. {
  140. virtual void registerFile(const char *name, graph_id graphId, unsigned usageCount, bool temp, WUFileKind fileKind=WUFileStandard, StringArray *clusters=NULL) = 0;
  141. virtual void deregisterFile(const char *name, bool kept=false) = 0;
  142. virtual void clearTemps() = 0;
  143. virtual IFileUsageIterator *getIterator() = 0;
  144. };
  145. class CGraphDependency : public CInterface
  146. {
  147. public:
  148. Linked<CGraphBase> graph;
  149. int controlId;
  150. CGraphDependency(CGraphBase *_graph, int _controlId) : graph(_graph), controlId(_controlId) { }
  151. void connect(CActivityBase *activity);
  152. };
  153. typedef CIArrayOf<CGraphDependency> CGraphDependencyArray;
  154. typedef IIteratorOf<CGraphDependency> IThorGraphDependencyIterator;
  155. class CGraphElementBase;
  156. class CIOConnection : public CInterface
  157. {
  158. public:
  159. CGraphElementBase *activity;
  160. unsigned index;
  161. void connect(unsigned which, CActivityBase *activity);
  162. CIOConnection(CGraphElementBase *_activity, unsigned _index) : activity(_activity), index(_index) { }
  163. };
  164. inline CIOConnection *Array__Member2Param(CIOConnection * src) { return src; }
  165. inline void Array__Assign(CIOConnection * & dest, CIOConnection * src) { dest = src; }
  166. inline void Array__Destroy(CIOConnection * & next) { if (next) next->Release(); }
  167. inline CIOConnection * Array__Member2ParamPtr(CIOConnection * src) { return src; }
  168. MAKEArrayOf(CIOConnection *, CIOConnection *, _CIOConnectionArray);
  169. class COwningSimpleIOConnection : public CIOConnection
  170. {
  171. public:
  172. COwningSimpleIOConnection(CGraphElementBase *_activity, unsigned index) : CIOConnection(_activity, index) { }
  173. ~COwningSimpleIOConnection() { ::Release(activity); }
  174. };
  175. class CIOConnectionArray : public _CIOConnectionArray
  176. {
  177. public:
  178. CIOConnection *queryItem(unsigned i)
  179. {
  180. if (!isItem(i))
  181. return NULL;
  182. return item(i);
  183. }
  184. unsigned getCount() const
  185. {
  186. unsigned c = 0;
  187. ForEachItemIn(i, *this)
  188. {
  189. CIOConnection *io = item(i);
  190. if (io)
  191. ++c;
  192. }
  193. return c;
  194. }
  195. };
  196. typedef SimpleHashTableOf<CGraphBase, graph_id> CGraphTableCopy;
  197. typedef OwningSimpleHashTableOf<CGraphBase, graph_id> CGraphTable;
  198. typedef CIArrayOf<CGraphBase> CGraphArray;
  199. typedef CopyCIArrayOf<CGraphBase> CGraphArrayCopy;
  200. typedef IIteratorOf<CGraphBase> IThorGraphIterator;
  201. typedef ArrayIIteratorOf<const CGraphArray, CGraphBase, IThorGraphIterator> CGraphArrayIterator;
  202. typedef ArrayIIteratorOf<const CGraphArrayCopy, CGraphBase, IThorGraphIterator> CGraphArrayCopyIterator;
  203. class CJobBase;
  204. class graph_decl CGraphElementBase : public CInterface, implements IInterface
  205. {
  206. protected:
  207. CriticalSection crit;
  208. Owned<IHThorArg> baseHelper;
  209. ThorActivityKind kind;
  210. activity_id id, ownerId;
  211. StringAttr eclText;
  212. Owned<IPropertyTree> xgmml;
  213. bool isLocal, isLocalData, isGrouped, sink, prepared, onCreateCalled, onStartCalled, onlyUpdateIfChanged, nullAct, log;
  214. Owned<CActivityBase> activity;
  215. CGraphBase *resultsGraph, *owner;
  216. CGraphDependencyArray dependsOn;
  217. Owned<IThorBoundLoopGraph> loopGraph; // really only here as master and slave derivatives set/use
  218. MemoryBuffer createCtxMb, startCtxMb;
  219. bool haveCreateCtx, haveStartCtx;
  220. public:
  221. IMPLEMENT_IINTERFACE;
  222. const void *queryFindParam() const { return &queryId(); } // for SimpleHashTableOf
  223. bool alreadyUpdated, isEof, newWhichBranch;
  224. EclHelperFactory helperFactory;
  225. CIOConnectionArray inputs, outputs, connectedInputs, connectedOutputs;
  226. CGraphArray associatedChildGraphs;
  227. unsigned whichBranch;
  228. Owned<IBitSet> whichBranchBitSet;
  229. Owned<IBitSet> sentActInitData;
  230. CGraphElementBase(CGraphBase &_owner, IPropertyTree &_xgmml);
  231. ~CGraphElementBase();
  232. void doconnect();
  233. void addInput(unsigned input, CGraphElementBase *inputAct, unsigned inputOutIdx);
  234. void clearConnections();
  235. virtual void connectInput(unsigned which, CGraphElementBase *input, unsigned inputOutIdx);
  236. void setResultsGraph(CGraphBase *_resultsGraph) { resultsGraph = _resultsGraph; }
  237. void addAssociatedChildGraph(CGraphBase *childGraph);
  238. void releaseIOs();
  239. void addDependsOn(CGraphBase *graph, int controlId);
  240. IThorGraphDependencyIterator *getDependsIterator() const;
  241. void ActPrintLog(const char *format, ...) __attribute__((format(printf, 2, 3)));
  242. void ActPrintLog(IException *e, const char *format, ...) __attribute__((format(printf, 3, 4)));
  243. void ActPrintLog(IException *e);
  244. void setBoundGraph(IThorBoundLoopGraph *graph) { loopGraph.set(graph); }
  245. IThorBoundLoopGraph *queryLoopGraph() { return loopGraph; }
  246. bool executeDependencies(size32_t parentExtractSz, const byte *parentExtract, int controlId, bool async);
  247. virtual void deserializeCreateContext(MemoryBuffer &mb);
  248. virtual void deserializeStartContext(MemoryBuffer &mb);
  249. virtual void serializeCreateContext(MemoryBuffer &mb); // called after onCreate and create() (of activity)
  250. virtual void serializeStartContext(MemoryBuffer &mb);
  251. virtual bool checkUpdate() { return false; }
  252. virtual void reset();
  253. void onStart(size32_t parentExtractSz, const byte *parentExtract);
  254. void onCreate();
  255. void abort(IException *e);
  256. virtual void preStart(size32_t parentExtractSz, const byte *parentExtract);
  257. const bool &isOnCreated() const { return onCreateCalled; }
  258. const bool &isPrepared() const { return prepared; }
  259. CGraphBase &queryOwner() const { return *owner; }
  260. CGraphBase *queryResultsGraph() const { return resultsGraph; }
  261. IThorGraphIterator *getAssociatedChildGraphs() const;
  262. IGraphTempHandler *queryTempHandler() const;
  263. CJobBase &queryJob() const;
  264. unsigned getInputs() const { return inputs.ordinality(); }
  265. unsigned getOutputs() const { return outputs.ordinality(); }
  266. bool isSource() const { return isActivitySource(kind); }
  267. bool isSink() const { return sink; }
  268. inline bool doLogging() const { return log; }
  269. inline void setLogging(bool _log) { log = _log; }
  270. // NB: in almost all cases queryLocal() == queryLocalData()
  271. // an exception is e.g. a locally executing keyedjoin, accessing a global key
  272. bool queryLocal() const { return isLocal; } // executed in isolation on each slave
  273. bool queryLocalData() const { return isLocalData; } // activity access local data only
  274. bool queryGrouped() const { return isGrouped; }
  275. bool queryLocalOrGrouped() { return isLocal || isGrouped; }
  276. CGraphElementBase *queryInput(unsigned index) const
  277. {
  278. if (inputs.isItem(index) && (NULL != inputs.item(index)))
  279. return inputs.item(index)->activity;
  280. return NULL;
  281. }
  282. IHThorArg *queryHelper() const { return baseHelper; }
  283. IPropertyTree &queryXGMML() const { return *xgmml; }
  284. CActivityBase *queryActivity() const { return activity; }
  285. const activity_id &queryOwnerId() const { return ownerId; }
  286. void createActivity(size32_t parentExtractSz, const byte *parentExtract);
  287. //
  288. const ThorActivityKind getKind() const { return kind; }
  289. const activity_id &queryId() const { return id; }
  290. StringBuffer &getEclText(StringBuffer& dst) const
  291. {
  292. dst.append(eclText.get());
  293. return dst;
  294. }
  295. virtual bool prepareContext(size32_t parentExtractSz, const byte *parentExtract, bool checkDependencies, bool shortCircuit, bool async);
  296. //
  297. virtual void initActivity();
  298. virtual CActivityBase *factory(ThorActivityKind kind) { assertex(false); return NULL; }
  299. virtual CActivityBase *factory() { return factory(getKind()); }
  300. virtual CActivityBase *factorySet(ThorActivityKind kind) { CActivityBase *_activity = factory(kind); activity.setown(_activity); return _activity; }
  301. virtual ICodeContext *queryCodeContext();
  302. };
  303. typedef CIArrayOf<CGraphElementBase> CGraphElementArray;
  304. typedef CopyCIArrayOf<CGraphElementBase> CGraphElementArrayCopy;
  305. typedef OwningSimpleHashTableOf<CGraphElementBase, activity_id> CGraphElementTable;
  306. typedef IIteratorOf<CGraphElementBase> IThorActivityIterator;
  307. typedef ArrayIIteratorOf<const CGraphElementArray, CGraphElementBase, IThorActivityIterator> CGraphElementArrayIterator;
  308. class CGraphElementIterator : public CInterface, implements IThorActivityIterator
  309. {
  310. SuperHashIteratorOf<CGraphElementBase> iter;
  311. public:
  312. IMPLEMENT_IINTERFACE;
  313. CGraphElementIterator(const CGraphElementTable &table) : iter(table) { }
  314. virtual bool first() { return iter.first(); }
  315. virtual bool next() { return iter.next(); }
  316. virtual bool isValid() { return iter.isValid(); }
  317. virtual CGraphElementBase & query() { return iter.query(); }
  318. CGraphElementBase & get() { CGraphElementBase &c = query(); c.Link(); return c; }
  319. };
  320. typedef OwningStringSuperHashTableOf<CFileUsageEntry> CFileUsageTable;
  321. class graph_decl CGraphTempHandler : public CInterface, implements IGraphTempHandler
  322. {
  323. protected:
  324. CFileUsageTable tmpFiles;
  325. CJobBase &job;
  326. mutable CriticalSection crit;
  327. bool errorOnMissing;
  328. public:
  329. IMPLEMENT_IINTERFACE;
  330. CGraphTempHandler(CJobBase &_job, bool _errorOnMissing) : job(_job), errorOnMissing(_errorOnMissing) { }
  331. ~CGraphTempHandler()
  332. {
  333. }
  334. virtual void beforeDispose()
  335. {
  336. clearTemps();
  337. }
  338. virtual bool removeTemp(const char *name) = 0;
  339. // IGraphTempHandler
  340. virtual void registerFile(const char *name, graph_id graphId, unsigned usageCount, bool temp, WUFileKind fileKind, StringArray *clusters);
  341. virtual void deregisterFile(const char *name, bool kept=false);
  342. virtual void clearTemps();
  343. virtual IFileUsageIterator *getIterator()
  344. {
  345. class CIterator : public CInterface, implements IFileUsageIterator
  346. {
  347. SuperHashIteratorOf<CFileUsageEntry> iter;
  348. public:
  349. IMPLEMENT_IINTERFACE;
  350. CIterator(CFileUsageTable &table) : iter(table) { }
  351. virtual bool first() { return iter.first(); }
  352. virtual bool next() { return iter.next(); }
  353. virtual bool isValid() { return iter.isValid(); }
  354. virtual CFileUsageEntry & query() { return iter.query(); }
  355. };
  356. return new CIterator(tmpFiles);
  357. }
  358. };
  359. interface IGraphCallback
  360. {
  361. virtual void runSubgraph(CGraphBase &graph, size32_t parentExtractSz, const byte *parentExtract) = 0;
  362. };
  363. class CJobBase;
  364. interface IPropertyTree;
  365. class graph_decl CGraphBase : public CInterface, implements IEclGraphResults, implements IThorChildGraph, implements IExceptionHandler
  366. {
  367. mutable CriticalSection crit;
  368. CriticalSection evaluateCrit;
  369. CGraphElementTable containers;
  370. CGraphElementArray sinks;
  371. bool sink, complete, global, localChild;
  372. mutable int localOnly;
  373. activity_id parentActivityId;
  374. IPropertyTree *xgmml;
  375. CGraphTable childGraphsTable;
  376. CGraphArrayCopy childGraphs;
  377. Owned<IGraphTempHandler> tmpHandler;
  378. void clean();
  379. class CGraphCodeContext : implements ICodeContextExt
  380. {
  381. ICodeContextExt *ctx;
  382. CGraphBase *graph;
  383. public:
  384. CGraphCodeContext() : graph(NULL), ctx(NULL) { }
  385. void setContext(CGraphBase *_graph, ICodeContextExt *_ctx)
  386. {
  387. graph = _graph;
  388. ctx = _ctx;
  389. }
  390. virtual const char *loadResource(unsigned id) { return ctx->loadResource(id); }
  391. virtual void setResultBool(const char *name, unsigned sequence, bool value) { ctx->setResultBool(name, sequence, value); }
  392. virtual void setResultData(const char *name, unsigned sequence, int len, const void * data) { ctx->setResultData(name, sequence, len, data); }
  393. virtual void setResultDecimal(const char * stepname, unsigned sequence, int len, int precision, bool isSigned, const void *val) { ctx->setResultDecimal(stepname, sequence, len, precision, isSigned, val); }
  394. virtual void setResultInt(const char *name, unsigned sequence, __int64 value) { ctx->setResultInt(name, sequence, value); }
  395. virtual void setResultRaw(const char *name, unsigned sequence, int len, const void * data) { ctx->setResultRaw(name, sequence, len, data); }
  396. virtual void setResultReal(const char * stepname, unsigned sequence, double value) { ctx->setResultReal(stepname, sequence, value); }
  397. virtual void setResultSet(const char *name, unsigned sequence, bool isAll, size32_t len, const void * data, ISetToXmlTransformer * transformer) { ctx->setResultSet(name, sequence, isAll, len, data, transformer); }
  398. virtual void setResultString(const char *name, unsigned sequence, int len, const char * str) { ctx->setResultString(name, sequence, len, str); }
  399. virtual void setResultUInt(const char *name, unsigned sequence, unsigned __int64 value) { ctx->setResultUInt(name, sequence, value); }
  400. virtual void setResultUnicode(const char *name, unsigned sequence, int len, UChar const * str) { ctx->setResultUnicode(name, sequence, len, str); }
  401. virtual void setResultVarString(const char * name, unsigned sequence, const char * value) { ctx->setResultVarString(name, sequence, value); }
  402. virtual void setResultVarUnicode(const char * name, unsigned sequence, UChar const * value) { ctx->setResultVarUnicode(name, sequence, value); }
  403. virtual bool getResultBool(const char * name, unsigned sequence) { return ctx->getResultBool(name, sequence); }
  404. virtual void getResultData(unsigned & tlen, void * & tgt, const char * name, unsigned sequence) { ctx->getResultData(tlen, tgt, name, sequence); }
  405. virtual void getResultDecimal(unsigned tlen, int precision, bool isSigned, void * tgt, const char * stepname, unsigned sequence) { ctx->getResultDecimal(tlen, precision, isSigned, tgt, stepname, sequence); }
  406. virtual void getResultRaw(unsigned & tlen, void * & tgt, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) { ctx->getResultRaw(tlen, tgt, name, sequence, xmlTransformer, csvTransformer); }
  407. virtual void getResultSet(bool & isAll, size32_t & tlen, void * & tgt, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) { ctx->getResultSet(isAll, tlen, tgt, name, sequence, xmlTransformer, csvTransformer); }
  408. virtual __int64 getResultInt(const char * name, unsigned sequence) { return ctx->getResultInt(name, sequence); }
  409. virtual double getResultReal(const char * name, unsigned sequence) { return ctx->getResultReal(name, sequence); }
  410. virtual void getResultString(unsigned & tlen, char * & tgt, const char * name, unsigned sequence) { ctx->getResultString(tlen, tgt, name, sequence); }
  411. virtual void getResultStringF(unsigned tlen, char * tgt, const char * name, unsigned sequence) { ctx->getResultStringF(tlen, tgt, name, sequence); }
  412. virtual void getResultUnicode(unsigned & tlen, UChar * & tgt, const char * name, unsigned sequence) { ctx->getResultUnicode(tlen, tgt, name, sequence); }
  413. virtual char *getResultVarString(const char * name, unsigned sequence) { return ctx->getResultVarString(name, sequence); }
  414. virtual UChar *getResultVarUnicode(const char * name, unsigned sequence) { return ctx->getResultVarUnicode(name, sequence); }
  415. virtual unsigned getResultHash(const char * name, unsigned sequence) { return ctx->getResultHash(name, sequence); }
  416. virtual const char *cloneVString(const char *str) const { return ctx->cloneVString(str); }
  417. virtual const char *cloneVString(size32_t len, const char *str) const { return ctx->cloneVString(len, str); }
  418. virtual char *getWuid() { return ctx->getWuid(); }
  419. virtual void getExternalResultRaw(unsigned & tlen, void * & tgt, const char * wuid, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) { ctx->getExternalResultRaw(tlen, tgt, wuid, stepname, sequence, xmlTransformer, csvTransformer); }
  420. virtual void executeGraph(const char * graphName, bool realThor, size32_t parentExtractSize, const void * parentExtract) { ctx->executeGraph(graphName, realThor, parentExtractSize, parentExtract); }
  421. virtual char * getExpandLogicalName(const char * logicalName) { return ctx->getExpandLogicalName(logicalName); }
  422. virtual void addWuException(const char * text, unsigned code, unsigned severity) { ctx->addWuException(text, code, severity); }
  423. virtual void addWuAssertFailure(unsigned code, const char * text, const char * filename, unsigned lineno, unsigned column, bool isAbort) { ctx->addWuAssertFailure(code, text, filename, lineno, column, isAbort); }
  424. virtual IUserDescriptor *queryUserDescriptor() { return ctx->queryUserDescriptor(); }
  425. virtual IThorChildGraph * resolveChildQuery(__int64 activityId, IHThorArg * colocal) { return ctx->resolveChildQuery(activityId, colocal); }
  426. virtual unsigned __int64 getDatasetHash(const char * name, unsigned __int64 hash) { return ctx->getDatasetHash(name, hash); }
  427. virtual unsigned getNodes() { return ctx->getNodes(); }
  428. virtual unsigned getNodeNum() { return ctx->getNodeNum(); }
  429. virtual char *getFilePart(const char *logicalPart, bool create) { return ctx->getFilePart(logicalPart, create); }
  430. virtual unsigned __int64 getFileOffset(const char *logicalPart) { return ctx->getFileOffset(logicalPart); }
  431. virtual IDistributedFileTransaction *querySuperFileTransaction() { return ctx->querySuperFileTransaction(); }
  432. virtual char *getJobName() { return ctx->getJobName(); }
  433. virtual char *getJobOwner() { return ctx->getJobOwner(); }
  434. virtual char *getClusterName() { return ctx->getClusterName(); }
  435. virtual char *getGroupName() { return ctx->getGroupName(); }
  436. virtual char * queryIndexMetaData(char const * lfn, char const * xpath) { return ctx->queryIndexMetaData(lfn, xpath); }
  437. virtual unsigned getPriority() const { return ctx->getPriority(); }
  438. virtual char *getPlatform() { return ctx->getPlatform(); }
  439. virtual char *getEnv(const char *name, const char *defaultValue) const { return ctx->getEnv(name, defaultValue); }
  440. virtual char *getOS() { return ctx->getOS(); }
  441. virtual IEclGraphResults * resolveLocalQuery(__int64 activityId) { return ctx->resolveLocalQuery(activityId); }
  442. virtual char *getEnv(const char *name, const char *defaultValue) { return ctx->getEnv(name, defaultValue); }
  443. virtual unsigned logString(const char * text) const { return ctx->logString(text); }
  444. virtual const IContextLogger &queryContextLogger() const { return ctx->queryContextLogger(); }
  445. virtual IEngineRowAllocator * getRowAllocator(IOutputMetaData * meta, unsigned activityId) const { return ctx->getRowAllocator(meta, activityId); }
  446. virtual void getResultRowset(size32_t & tcount, byte * * & tgt, const char * name, unsigned sequence, IEngineRowAllocator * _rowAllocator, bool isGrouped, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) { ctx->getResultRowset(tcount, tgt, name, sequence, _rowAllocator, isGrouped, xmlTransformer, csvTransformer); }
  447. virtual void getResultDictionary(size32_t & tcount, byte * * & tgt,IEngineRowAllocator * _rowAllocator, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer, IHThorHashLookupInfo * hasher) { ctx->getResultDictionary(tcount, tgt, _rowAllocator, name, sequence, xmlTransformer, csvTransformer, hasher); }
  448. virtual void getRowXML(size32_t & lenResult, char * & result, IOutputMetaData & info, const void * row, unsigned flags) { convertRowToXML(lenResult, result, info, row, flags); }
  449. virtual unsigned getGraphLoopCounter() const
  450. {
  451. return graph->queryLoopCounter(); // only called if value is valid
  452. }
  453. virtual IConstWUResult *getExternalResult(const char * wuid, const char *name, unsigned sequence) { return ctx->getExternalResult(wuid, name, sequence); }
  454. virtual IConstWUResult *getResultForGet(const char *name, unsigned sequence) { return ctx->getResultForGet(name, sequence); }
  455. virtual const void * fromXml(IEngineRowAllocator * _rowAllocator, size32_t len, const char * utf8, IXmlToRowTransformer * xmlTransformer, bool stripWhitespace)
  456. {
  457. return ctx->fromXml(_rowAllocator, len, utf8, xmlTransformer, stripWhitespace);
  458. }
  459. virtual IEngineContext *queryEngineContext()
  460. {
  461. return ctx->queryEngineContext();
  462. }
  463. } graphCodeContext;
  464. protected:
  465. Owned<IThorGraphResults> localResults, graphLoopResults;
  466. CGraphBase *owner, *parent;
  467. Owned<IException> abortException;
  468. CGraphElementArrayCopy ifs;
  469. Owned<IPropertyTree> node;
  470. IBarrier *startBarrier, *waitBarrier, *doneBarrier;
  471. mptag_t mpTag, startBarrierTag, waitBarrierTag, doneBarrierTag;
  472. bool created, connected, started, aborted, graphDone, prepared, sequential;
  473. bool reinit, sentInitData, sentStartCtx;
  474. CJobBase &job;
  475. graph_id graphId;
  476. mptag_t executeReplyTag;
  477. size32_t parentExtractSz; // keep track of sz when passed in, as may need to serialize later
  478. MemoryBuffer parentExtractMb; // retain copy, used if slave transmits to master (child graph 1st time initialization of global graph)
  479. unsigned counter;
  480. CReplyCancelHandler graphCancelHandler;
  481. class CGraphGraphActElementIterator : public CInterface, implements IThorActivityIterator
  482. {
  483. protected:
  484. CGraphBase &graph;
  485. IPropertyTree &xgmml;
  486. Owned<IPropertyTreeIterator> iter;
  487. CGraphElementBase *current;
  488. public:
  489. IMPLEMENT_IINTERFACE;
  490. CGraphGraphActElementIterator(CGraphBase &_graph, IPropertyTree &_xgmml) : graph(_graph), xgmml(_xgmml)
  491. {
  492. iter.setown(xgmml.getElements("node"));
  493. }
  494. virtual bool first()
  495. {
  496. if (iter->first())
  497. {
  498. IPropertyTree &node = iter->query();
  499. current = graph.queryElement(node.getPropInt("@id"));
  500. if (current)
  501. return true;
  502. else if (next())
  503. return true;
  504. }
  505. current = NULL;
  506. return false;
  507. }
  508. virtual bool next()
  509. {
  510. loop
  511. {
  512. if (!iter->next())
  513. break;
  514. IPropertyTree &node = iter->query();
  515. current = graph.queryElement(node.getPropInt("@id"));
  516. if (current)
  517. return true;
  518. }
  519. current = NULL;
  520. return false;
  521. }
  522. virtual bool isValid() { return NULL!=current; }
  523. virtual CGraphElementBase & query()
  524. {
  525. return *current;
  526. }
  527. CGraphElementBase & get() { CGraphElementBase &c = query(); c.Link(); return c; }
  528. };
  529. public:
  530. IMPLEMENT_IINTERFACE;
  531. PooledThreadHandle poolThreadHandle;
  532. CGraphArrayCopy dependentSubGraphs;
  533. CGraphBase(CJobBase &job);
  534. ~CGraphBase();
  535. const void *queryFindParam() const { return &queryGraphId(); } // for SimpleHashTableOf
  536. virtual void init() { }
  537. IThorActivityIterator *getTraverseIterator(bool all=false); // all traverses and includes conditionals, others traverses connected nodes only
  538. void GraphPrintLog(const char *msg, ...) __attribute__((format(printf, 2, 3)));
  539. void GraphPrintLog(IException *e, const char *msg, ...) __attribute__((format(printf, 3, 4)));
  540. void GraphPrintLog(IException *e);
  541. void createFromXGMML(IPropertyTree *node, CGraphBase *owner, CGraphBase *parent, CGraphBase *resultsGraph);
  542. const bool &queryAborted() const { return aborted; }
  543. CJobBase &queryJob() const { return job; }
  544. IGraphTempHandler *queryTempHandler() const { assertex(tmpHandler.get()); return tmpHandler; }
  545. CGraphBase *queryOwner() { return owner; }
  546. CGraphBase *queryParent() { return parent?parent:this; }
  547. bool syncInitData();
  548. bool isComplete() const { return complete; }
  549. bool isPrepared() const { return prepared; }
  550. bool isGlobal() const { return global; }
  551. bool isCreated() const { return created; }
  552. bool isStarted() const { return started; }
  553. bool isLocalOnly() const; // this graph and all upstream dependencies
  554. bool isLocalChild() const { return localChild; }
  555. void setCompleteEx(bool tf=true) { complete = tf; }
  556. void setGlobal(bool tf) { global = tf; }
  557. void setLogging(bool tf);
  558. const byte *setParentCtx(size32_t _parentExtractSz, const byte *parentExtract)
  559. {
  560. parentExtractSz = _parentExtractSz;
  561. MemoryBuffer newParentExtract(parentExtractSz, parentExtract);
  562. parentExtractMb.swapWith(newParentExtract);
  563. return (const byte *)parentExtractMb.toByteArray();
  564. }
  565. virtual ICodeContext *queryCodeContext() { return &graphCodeContext; }
  566. void setLoopCounter(unsigned _counter) { counter = _counter; }
  567. unsigned queryLoopCounter() const { return counter; }
  568. virtual void setComplete(bool tf=true) { complete=tf; }
  569. virtual void deserializeCreateContexts(MemoryBuffer &mb);
  570. virtual void deserializeStartContexts(MemoryBuffer &mb);
  571. virtual void serializeCreateContexts(MemoryBuffer &mb);
  572. virtual void serializeStartContexts(MemoryBuffer &mb);
  573. virtual void reset();
  574. void disconnectActivities()
  575. {
  576. CGraphElementIterator iter(containers);
  577. ForEach(iter)
  578. {
  579. CGraphElementBase &element = iter.query();
  580. element.releaseIOs();
  581. }
  582. }
  583. virtual void executeSubGraph(size32_t parentExtractSz, const byte *parentExtract);
  584. void join();
  585. virtual void execute(size32_t parentExtractSz, const byte *parentExtract, bool checkDependencies, bool async);
  586. IThorActivityIterator *getIterator()
  587. {
  588. return new CGraphGraphActElementIterator(*this, *xgmml);
  589. }
  590. IThorActivityIterator *getSinkIterator() const
  591. {
  592. return new CGraphElementArrayIterator(sinks);
  593. }
  594. IPropertyTree &queryXGMML() const { return *xgmml; }
  595. void addActivity(CGraphElementBase *element)
  596. {
  597. if (containers.find((activity_id &)element->queryId()))
  598. {
  599. element->Release();
  600. return;
  601. }
  602. containers.replace(*element);
  603. if (element->isSink())
  604. sinks.append(*LINK(element));
  605. }
  606. bool removeActivity(CGraphElementBase *element)
  607. {
  608. bool res = containers.removeExact(element);
  609. sinks.zap(* element);
  610. return res;
  611. }
  612. unsigned activityCount() const
  613. {
  614. Owned<IPropertyTreeIterator> iter = xgmml->getElements("node");
  615. unsigned count=0;
  616. ForEach(*iter)
  617. {
  618. ThorActivityKind kind = (ThorActivityKind) iter->query().getPropInt("att[@name=\"_kind\"]/@value", TAKnone);
  619. if (TAKsubgraph != kind)
  620. ++count;
  621. }
  622. return count;
  623. }
  624. CGraphElementBase *queryElement(activity_id id) const
  625. {
  626. CGraphElementBase *activity = containers.find(id);
  627. if (activity)
  628. return activity;
  629. if (owner)
  630. return owner->queryElement(id);
  631. return NULL;
  632. }
  633. bool isSink() const { return sink; }
  634. void setSink(bool tf)
  635. {
  636. sink = tf;
  637. xgmml->setPropBool("att[@name=\"rootGraph\"]/@value", tf);
  638. }
  639. const activity_id &queryParentActivityId() const { return parentActivityId; }
  640. const graph_id &queryGraphId() const { return graphId; }
  641. void addChildGraph(CGraphBase &graph);
  642. unsigned queryChildGraphCount() { return childGraphs.ordinality(); }
  643. CGraphBase *getChildGraph(graph_id gid)
  644. {
  645. CriticalBlock b(crit);
  646. return LINK(childGraphsTable.find(gid));
  647. }
  648. IThorGraphIterator *getChildGraphs() const;
  649. void executeChildGraphs(size32_t parentExtractSz, const byte *parentExtract);
  650. void doExecute(size32_t parentExtractSz, const byte *parentExtract, bool checkDependencies);
  651. void doExecuteChild(size32_t parentExtractSz, const byte *parentExtract);
  652. void executeChild(size32_t & retSize, void * & ret, size32_t parentExtractSz, const byte *parentExtract);
  653. void setResults(IThorGraphResults *results);
  654. virtual void executeChild(size32_t parentExtractSz, const byte *parentExtract, IThorGraphResults *results, IThorGraphResults *graphLoopResults);
  655. virtual void executeChild(size32_t parentExtractSz, const byte *parentExtract);
  656. virtual bool serializeStats(MemoryBuffer &mb) { return false; }
  657. virtual bool prepare(size32_t parentExtractSz, const byte *parentExtract, bool checkDependencies, bool shortCircuit, bool async);
  658. virtual void create(size32_t parentExtractSz, const byte *parentExtract);
  659. virtual bool preStart(size32_t parentExtractSz, const byte *parentExtract);
  660. virtual void start() = 0;
  661. virtual bool wait(unsigned timeout);
  662. virtual void done();
  663. virtual void end();
  664. virtual void abort(IException *e);
  665. virtual IThorGraphResults *createThorGraphResults(unsigned num);
  666. // IExceptionHandler
  667. virtual bool fireException(IException *e);
  668. virtual IThorResult *getResult(unsigned id, bool distributed=false);
  669. virtual IThorResult *getGraphLoopResult(unsigned id, bool distributed=false);
  670. virtual IThorResult *createResult(CActivityBase &activity, unsigned id, IThorGraphResults *results, IRowInterfaces *rowIf, bool distributed, unsigned spillPriority=SPILL_PRIORITY_RESULT);
  671. virtual IThorResult *createResult(CActivityBase &activity, unsigned id, IRowInterfaces *rowIf, bool distributed, unsigned spillPriority=SPILL_PRIORITY_RESULT);
  672. virtual IThorResult *createGraphLoopResult(CActivityBase &activity, IRowInterfaces *rowIf, bool distributed, unsigned spillPriority=SPILL_PRIORITY_RESULT);
  673. // IEclGraphResults
  674. virtual void getDictionaryResult(unsigned & count, byte * * & ret, unsigned id);
  675. virtual void getLinkedResult(unsigned & count, byte * * & ret, unsigned id);
  676. // IThorChildGraph
  677. // virtual void getResult(size32_t & retSize, void * & ret, unsigned id);
  678. // virtual void getLinkedResult(unsigned & count, byte * * & ret, unsigned id);
  679. virtual IEclGraphResults *evaluate(unsigned parentExtractSz, const byte * parentExtract);
  680. friend class CGraphElementBase;
  681. };
  682. class CGraphTableIterator : public CInterface, implements IThorGraphIterator
  683. {
  684. SuperHashIteratorOf<CGraphBase> iter;
  685. public:
  686. IMPLEMENT_IINTERFACE;
  687. CGraphTableIterator(const CGraphTable &table) : iter(table) { }
  688. virtual bool first() { return iter.first(); }
  689. virtual bool next() { return iter.next(); }
  690. virtual bool isValid() { return iter.isValid(); }
  691. virtual CGraphBase & query() { return iter.query(); }
  692. CGraphBase & get() { CGraphBase &c = query(); c.Link(); return c; }
  693. };
  694. interface IGraphExecutor : extends IInterface
  695. {
  696. virtual void add(CGraphBase *subGraph, IGraphCallback &callback, bool checkDependencies, size32_t parentExtractSz, const byte *parentExtract) = 0;
  697. virtual IThreadPool &queryGraphPool() = 0 ;
  698. virtual void wait() = 0;
  699. };
  700. interface ILoadedDllEntry;
  701. interface IConstWorkUnit;
  702. interface IThorAllocator;
  703. class CThorCodeContextBase;
  704. class graph_decl CJobBase : public CInterface, implements IDiskUsage, implements IExceptionHandler, implements IGraphCallback
  705. {
  706. protected:
  707. Owned<IThorAllocator> thorAllocator;
  708. Owned<IGraphExecutor> graphExecutor;
  709. CriticalSection crit;
  710. Owned<ILoadedDllEntry> querySo;
  711. CThorCodeContextBase *codeCtx;
  712. IUserDescriptor *userDesc;
  713. offset_t maxDiskUsage, diskUsage;
  714. StringAttr key, graphName;
  715. bool aborted, pausing, resumed;
  716. StringBuffer wuid, user, scope, token;
  717. mutable CriticalSection wuDirty;
  718. mutable bool dirty;
  719. CGraphTable subGraphs;
  720. CGraphTableCopy allGraphs; // for lookup, includes all childGraphs
  721. mptag_t mpJobTag, slavemptag;
  722. Owned<IGroup> jobGroup, slaveGroup;
  723. Owned<ICommunicator> jobComm;
  724. rank_t myrank;
  725. ITimeReporter *timeReporter;
  726. Owned<IPropertyTree> xgmml;
  727. Owned<IGraphTempHandler> tmpHandler;
  728. bool timeActivities;
  729. unsigned maxActivityCores, globalMemorySize;
  730. unsigned forceLogGraphIdMin, forceLogGraphIdMax;
  731. Owned<IContextLogger> logctx;
  732. class CThorPluginCtx : public SimplePluginCtx
  733. {
  734. public:
  735. virtual int ctxGetPropInt(const char *propName, int defaultValue) const
  736. {
  737. return globals->getPropInt(propName, defaultValue);
  738. }
  739. virtual const char *ctxQueryProp(const char *propName) const
  740. {
  741. return globals->queryProp(propName);
  742. }
  743. } pluginCtx;
  744. SafePluginMap *pluginMap;
  745. void removeAssociates(CGraphBase &graph)
  746. {
  747. CriticalBlock b(crit);
  748. allGraphs.removeExact(&graph);
  749. Owned<IThorGraphIterator> iter = graph.getChildGraphs();
  750. ForEach(*iter)
  751. {
  752. CGraphBase &child = iter->query();
  753. removeAssociates(child);
  754. }
  755. }
  756. public:
  757. IMPLEMENT_IINTERFACE;
  758. CJobBase(const char *graphName);
  759. ~CJobBase();
  760. void clean();
  761. void init();
  762. void setXGMML(IPropertyTree *_xgmml) { xgmml.set(_xgmml); }
  763. IPropertyTree *queryXGMML() { return xgmml; }
  764. const bool &queryAborted() const { return aborted; }
  765. const char *queryKey() const { return key; }
  766. const char *queryGraphName() const { return graphName; }
  767. bool queryForceLogging(graph_id graphId, bool def) const;
  768. ITimeReporter &queryTimeReporter() { return *timeReporter; }
  769. const IContextLogger &queryContextLogger() const { return *logctx; }
  770. virtual IGraphTempHandler *createTempHandler(bool errorOnMissing) = 0;
  771. virtual CGraphBase *createGraph() = 0;
  772. void joinGraph(CGraphBase &graph);
  773. void startGraph(CGraphBase &graph, IGraphCallback &callback, bool checkDependencies, size32_t parentExtractSize, const byte *parentExtract);
  774. void addDependencies(IPropertyTree *xgmml, bool failIfMissing=true);
  775. void addSubGraph(CGraphBase &graph)
  776. {
  777. CriticalBlock b(crit);
  778. subGraphs.replace(graph);
  779. allGraphs.replace(graph);
  780. }
  781. void associateGraph(CGraphBase &graph)
  782. {
  783. CriticalBlock b(crit);
  784. allGraphs.replace(graph);
  785. }
  786. void removeSubGraph(CGraphBase &graph)
  787. {
  788. CriticalBlock b(crit);
  789. removeAssociates(graph);
  790. subGraphs.removeExact(&graph);
  791. }
  792. IThorGraphIterator *getSubGraphs();
  793. CGraphBase *getGraph(graph_id gid)
  794. {
  795. CriticalBlock b(crit);
  796. return LINK(allGraphs.find(gid));
  797. }
  798. IEngineRowAllocator *getRowAllocator(IOutputMetaData * meta, unsigned activityId, roxiemem::RoxieHeapFlags flags=roxiemem::RHFnone) const;
  799. roxiemem::IRowManager *queryRowManager() const;
  800. IThorResult *getOwnedResult(graph_id gid, activity_id ownerId, unsigned resultId);
  801. IThorAllocator *queryThorAllocator() const { return thorAllocator; }
  802. bool queryUseCheckpoints() const;
  803. const bool &queryPausing() const { return pausing; }
  804. const bool &queryResumed() const { return resumed; }
  805. IGraphTempHandler *queryTempHandler() const { return tmpHandler; }
  806. ILoadedDllEntry &queryDllEntry() const { return *querySo; }
  807. ICodeContext &queryCodeContext() const;
  808. IUserDescriptor *queryUserDescriptor() const { return userDesc; }
  809. virtual IConstWorkUnit &queryWorkUnit() const { throwUnexpected(); }
  810. virtual void markWuDirty() { };
  811. virtual __int64 getWorkUnitValueInt(const char *prop, __int64 defVal) const = 0;
  812. virtual StringBuffer &getWorkUnitValue(const char *prop, StringBuffer &str) const = 0;
  813. virtual bool getWorkUnitValueBool(const char *prop, bool defVal) const = 0;
  814. const char *queryWuid() const { return wuid.str(); }
  815. const char *queryUser() const { return user.str(); }
  816. const char *queryScope() const { return scope.str(); }
  817. IDiskUsage &queryIDiskUsage() const { return *(IDiskUsage *)this; }
  818. void setDiskUsage(offset_t _diskUsage) { diskUsage = _diskUsage; }
  819. const offset_t queryMaxDiskUsage() const { return maxDiskUsage; }
  820. mptag_t querySlaveMpTag() const { return slavemptag; }
  821. mptag_t queryJobMpTag() const { return mpJobTag; }
  822. unsigned querySlaves() const { return slaveGroup->ordinality(); }
  823. ICommunicator &queryJobComm() const { return *jobComm; }
  824. IGroup &queryJobGroup() const { return *jobGroup; }
  825. const bool &queryTimeActivities() const { return timeActivities; }
  826. unsigned queryMaxDefaultActivityCores() const { return maxActivityCores; }
  827. IGroup &querySlaveGroup() const { return *slaveGroup; }
  828. const rank_t &queryMyRank() const { return myrank; }
  829. mptag_t allocateMPTag();
  830. void freeMPTag(mptag_t tag);
  831. mptag_t deserializeMPTag(MemoryBuffer &mb);
  832. bool getOptBool(const char *opt, bool dft=false);
  833. int getOptInt(const char *opt, int dft=0);
  834. unsigned getOptUInt(const char *opt, unsigned dft=0) { return (unsigned)getOptInt(opt, dft); }
  835. __int64 getOptInt64(const char *opt, __int64 dft=0);
  836. unsigned __int64 getOptUInt64(const char *opt, unsigned __int64 dft=0) { return (unsigned __int64)getOptInt64(opt, dft); }
  837. virtual void abort(IException *e);
  838. virtual IBarrier *createBarrier(mptag_t tag) { UNIMPLEMENTED; return NULL; }
  839. //
  840. virtual void addCreatedFile(const char *file) { assertex(false); }
  841. virtual __int64 addNodeDiskUsage(unsigned node, __int64 sz) { assertex(false); return 0; }
  842. // IDiskUsage
  843. virtual void increase(offset_t usage, const char *key=NULL);
  844. virtual void decrease(offset_t usage, const char *key=NULL);
  845. // IExceptionHandler
  846. virtual bool fireException(IException *e) = 0;
  847. // IGraphCallback
  848. virtual void runSubgraph(CGraphBase &graph, size32_t parentExtractSz, const byte *parentExtract);
  849. };
  850. interface IOutputMetaData;
  851. class graph_decl CActivityBase : public CInterface, implements IExceptionHandler, implements IRowInterfaces
  852. {
  853. Owned<IEngineRowAllocator> rowAllocator;
  854. Owned<IOutputRowSerializer> rowSerializer;
  855. Owned<IOutputRowDeserializer> rowDeserializer;
  856. CSingletonLock CABallocatorlock;
  857. CSingletonLock CABserializerlock;
  858. CSingletonLock CABdeserializerlock;
  859. protected:
  860. CGraphElementBase &container;
  861. Linked<IHThorArg> baseHelper;
  862. mptag_t mpTag; // to be used by any direct inter master<->slave communication
  863. bool abortSoon;
  864. const bool &timeActivities; // purely for access efficiency
  865. size32_t parentExtractSz;
  866. const byte *parentExtract;
  867. bool receiving, cancelledReceive, reInit;
  868. unsigned maxCores; // NB: only used by acts that sort at the moment
  869. Owned<IThorGraphResults> ownedResults; // NB: probably only to be used by loop results
  870. public:
  871. IMPLEMENT_IINTERFACE;
  872. CActivityBase(CGraphElementBase *container);
  873. ~CActivityBase();
  874. CGraphElementBase &queryContainer() const { return container; }
  875. CJobBase &queryJob() const { return container.queryJob(); }
  876. CGraphBase &queryGraph() const { return container.queryOwner(); }
  877. inline const mptag_t queryMpTag() const { return mpTag; }
  878. inline const bool &queryAbortSoon() const { return abortSoon; }
  879. inline IHThorArg *queryHelper() const { return baseHelper; }
  880. inline bool needReInit() const { return reInit; }
  881. inline const bool &queryTimeActivities() const { return timeActivities; }
  882. void onStart(size32_t _parentExtractSz, const byte *_parentExtract) { parentExtractSz = _parentExtractSz; parentExtract = _parentExtract; }
  883. bool receiveMsg(CMessageBuffer &mb, const rank_t rank, const mptag_t mpTag, rank_t *sender=NULL, unsigned timeout=MP_WAIT_FOREVER);
  884. void cancelReceiveMsg(const rank_t rank, const mptag_t mpTag);
  885. bool firstNode() { return 1 == container.queryJob().queryMyRank(); }
  886. bool lastNode() { return container.queryJob().querySlaves() == container.queryJob().queryMyRank(); }
  887. unsigned queryMaxCores() const { return maxCores; }
  888. IRowInterfaces *getRowInterfaces();
  889. virtual void setInput(unsigned index, CActivityBase *inputActivity, unsigned inputOutIdx) { }
  890. virtual void clearConnections() { }
  891. virtual void releaseIOs() { }
  892. virtual void preStart(size32_t parentExtractSz, const byte *parentExtract) { }
  893. virtual void startProcess(bool async=true) { }
  894. virtual bool wait(unsigned timeout) { return true; } // NB: true == success
  895. virtual void reset() { receiving = abortSoon = cancelledReceive = false; }
  896. virtual void done() { }
  897. virtual void kill();
  898. virtual void abort();
  899. virtual MemoryBuffer &queryInitializationData(unsigned slave) const = 0;
  900. virtual MemoryBuffer &getInitializationData(unsigned slave, MemoryBuffer &mb) const = 0;
  901. virtual IThorGraphResults *queryResults() { return ownedResults; }
  902. void ActPrintLog(const char *format, ...) __attribute__((format(printf, 2, 3)));
  903. void ActPrintLog(IException *e, const char *format, ...) __attribute__((format(printf, 3, 4)));
  904. void ActPrintLog(IException *e);
  905. // IExceptionHandler
  906. bool fireException(IException *e);
  907. virtual IEngineRowAllocator * queryRowAllocator();
  908. virtual IOutputRowSerializer * queryRowSerializer();
  909. virtual IOutputRowDeserializer * queryRowDeserializer();
  910. virtual IOutputMetaData *queryRowMetaData() { return baseHelper->queryOutputMeta(); }
  911. virtual unsigned queryActivityId() { return (unsigned)container.queryId(); }
  912. virtual ICodeContext *queryCodeContext() { return container.queryCodeContext(); }
  913. bool getOptBool(const char *prop, bool defVal=false) const;
  914. int getOptInt(const char *prop, int defVal=0) const;
  915. unsigned getOptUInt(const char *prop, unsigned defVal=0) const { return (unsigned)getOptInt(prop, defVal); }
  916. __int64 getOptInt64(const char *prop, __int64 defVal=0) const;
  917. unsigned __int64 getOptUInt64(const char *prop, unsigned __int64 defVal=0) const { return (unsigned __int64)getOptInt64(prop, defVal); }
  918. };
  919. interface IFileInProgressHandler : extends IInterface
  920. {
  921. virtual void add(const char *fip) = 0;
  922. virtual void remove(const char *fip) = 0;
  923. };
  924. class CFIPScope
  925. {
  926. StringAttr fip;
  927. public:
  928. CFIPScope() { }
  929. CFIPScope(const char *_fip) : fip(_fip)
  930. {
  931. queryThor().queryFileInProgressHandler().add(fip);
  932. }
  933. ~CFIPScope()
  934. {
  935. if (fip)
  936. queryThor().queryFileInProgressHandler().remove(fip);
  937. }
  938. void set(const char *_fip)
  939. {
  940. fip.set(_fip);
  941. }
  942. void clear()
  943. {
  944. fip.clear();
  945. }
  946. };
  947. interface IDelayedFile;
  948. interface IExpander;
  949. interface IThorFileCache : extends IInterface
  950. {
  951. virtual bool remove(IDelayedFile &dFile) = 0;
  952. virtual IDelayedFile *lookup(CActivityBase &activity, IPartDescriptor &partDesc, IExpander *expander=NULL) = 0;
  953. };
  954. class graph_decl CThorResourceBase : public CInterface, implements IThorResource
  955. {
  956. public:
  957. IMPLEMENT_IINTERFACE;
  958. // IThorResource
  959. virtual IThorFileCache &queryFileCache() { UNIMPLEMENTED; return *((IThorFileCache *)NULL); }
  960. virtual IBackup &queryBackup() { UNIMPLEMENTED; return *((IBackup *)NULL); }
  961. virtual IFileInProgressHandler &queryFileInProgressHandler() { UNIMPLEMENTED; return *((IFileInProgressHandler *)NULL); }
  962. };
  963. class graph_decl CThorGraphResults : public CInterface, implements IThorGraphResults
  964. {
  965. protected:
  966. class CThorUninitializedGraphResults : public CInterface, implements IThorResult
  967. {
  968. unsigned id;
  969. public:
  970. IMPLEMENT_IINTERFACE
  971. CThorUninitializedGraphResults(unsigned _id) { id = _id; }
  972. virtual IRowWriter *getWriter() { throw MakeStringException(0, "Graph Result %d accessed before it is created", id); }
  973. virtual void setResultStream(IRowWriterMultiReader *stream, rowcount_t count) { throw MakeStringException(0, "Graph Result %d accessed before it is created", id); }
  974. virtual IRowStream *getRowStream() { throw MakeStringException(0, "Graph Result %d accessed before it is created", id); }
  975. virtual IRowInterfaces *queryRowInterfaces() { throw MakeStringException(0, "Graph Result %d accessed before it is created", id); }
  976. virtual CActivityBase *queryActivity() { throw MakeStringException(0, "Graph Result %d accessed before it is created", id); }
  977. virtual bool isDistributed() const { throw MakeStringException(0, "Graph Result %d accessed before it is created", id); }
  978. virtual void serialize(MemoryBuffer &mb) { throw MakeStringException(0, "Graph Result %d accessed before it is created", id); }
  979. virtual void getResult(size32_t & retSize, void * & ret) { throw MakeStringException(0, "Graph Result %d accessed before it is created", id); }
  980. virtual void getLinkedResult(unsigned & count, byte * * & ret) { throw MakeStringException(0, "Graph Result %d accessed before it is created", id); }
  981. virtual void getDictionaryResult(unsigned & count, byte * * & ret) { throw MakeStringException(0, "Graph Result %d accessed before it is created", id); }
  982. };
  983. IArrayOf<IThorResult> results;
  984. CriticalSection cs;
  985. activity_id ownerId;
  986. void ensureAtLeast(unsigned id)
  987. {
  988. while (results.ordinality() < id)
  989. results.append(*new CThorUninitializedGraphResults(results.ordinality()));
  990. }
  991. public:
  992. IMPLEMENT_IINTERFACE;
  993. CThorGraphResults(unsigned _numResults) { ensureAtLeast(_numResults); ownerId = 0; }
  994. virtual void clear()
  995. {
  996. CriticalBlock procedure(cs);
  997. results.kill();
  998. }
  999. virtual IThorResult *getResult(unsigned id, bool distributed)
  1000. {
  1001. CriticalBlock procedure(cs);
  1002. ensureAtLeast(id+1);
  1003. // NB: stream static after this, i.e. nothing can be added to this result
  1004. return LINK(&results.item(id));
  1005. }
  1006. virtual IThorResult *createResult(CActivityBase &activity, unsigned id, IRowInterfaces *rowIf, bool distributed, unsigned spillPriority=SPILL_PRIORITY_RESULT);
  1007. virtual IThorResult *createResult(CActivityBase &activity, IRowInterfaces *rowIf, bool distributed, unsigned spillPriority=SPILL_PRIORITY_RESULT)
  1008. {
  1009. return createResult(activity, results.ordinality(), rowIf, distributed, spillPriority);
  1010. }
  1011. virtual unsigned addResult(IThorResult *result)
  1012. {
  1013. CriticalBlock procedure(cs);
  1014. unsigned id = results.ordinality();
  1015. setResult(id, result);
  1016. return id;
  1017. }
  1018. virtual void setResult(unsigned id, IThorResult *result)
  1019. {
  1020. CriticalBlock procedure(cs);
  1021. ensureAtLeast(id);
  1022. if (results.isItem(id))
  1023. results.replace(*LINK(result), id);
  1024. else
  1025. results.append(*LINK(result));
  1026. }
  1027. virtual unsigned count() { return results.ordinality(); }
  1028. virtual void getLinkedResult(unsigned & count, byte * * & ret, unsigned id)
  1029. {
  1030. Owned<IThorResult> result = getResult(id, true);
  1031. result->getLinkedResult(count, ret);
  1032. }
  1033. virtual void getDictionaryResult(unsigned & count, byte * * & ret, unsigned id)
  1034. {
  1035. Owned<IThorResult> result = getResult(id, true);
  1036. result->getLinkedResult(count, ret);
  1037. }
  1038. virtual void setOwner(activity_id _ownerId) { ownerId = _ownerId; }
  1039. virtual activity_id queryOwnerId() const { return ownerId; }
  1040. };
  1041. extern graph_decl IThorResult *createResult(CActivityBase &activity, IRowInterfaces *rowIf, bool distributed, unsigned spillPriority=SPILL_PRIORITY_RESULT);
  1042. class CGraphElementBase;
  1043. typedef CGraphElementBase *(*CreateFunc)(IPropertyTree &node, CGraphBase &owner, CGraphBase *resultsGraph);
  1044. extern graph_decl void registerCreateFunc(CreateFunc func);
  1045. extern graph_decl CGraphElementBase *createGraphElement(IPropertyTree &node, CGraphBase &owner, CGraphBase *resultsGraph);
  1046. extern graph_decl IThorBoundLoopGraph *createBoundLoopGraph(CGraphBase *graph, IOutputMetaData *resultMeta, unsigned activityId);
  1047. extern graph_decl bool isDiskInput(ThorActivityKind kind);
  1048. extern graph_decl bool isLoopActivity(CGraphElementBase &container);
  1049. #endif