thorcommon.hpp 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef THORCOMMON_HPP
  14. #define THORCOMMON_HPP
  15. #include "jiface.hpp"
  16. #include "jcrc.hpp"
  17. #include "jlzw.hpp"
  18. #include "jsort.hpp"
  19. #include "jdebug.hpp"
  20. #include "jfile.hpp"
  21. #include "eclhelper.hpp"
  22. #include "rtldynfield.hpp"
  23. #include "thorhelper.hpp"
  24. #include "thorxmlwrite.hpp"
  // Default size limits. Each value is expressed in megabytes (see the "MB" marker on every line).
  static unsigned const defaultDaliResultOutputMax = 2000; // MB
  static unsigned const defaultDaliResultLimit = 10; // MB
  static unsigned const defaultMaxCsvRowSize = 10; // MB
  // Option/property names. The trailing comment on each line gives the meaning and default.
  #define OPT_OUTPUTLIMIT_LEGACY "outputLimit" // OUTPUT Mb limit (legacy property name, renamed to outputLimitMb in 5.2)
  #define OPT_OUTPUTLIMIT "outputLimitMb" // OUTPUT Mb limit (default = 10 [MB])
  #define OPT_MAXCSVROWSIZE "maxCsvRowSizeMb" // Upper limit on csv read line size (default = 10 [MB])
  #define OPT_VALIDATE_FILE_TYPE "validateFileType" // Validate that diskread file type matches (default = true)
  32. class THORHELPER_API CSizingSerializer : implements IRowSerializerTarget
  33. {
  34. size32_t totalsize;
  35. public:
  36. inline CSizingSerializer() { reset(); }
  37. inline void reset() { totalsize = 0; }
  38. inline size32_t size() { return totalsize; }
  39. virtual void put(size32_t len, const void * ptr);
  40. virtual size32_t beginNested(size32_t count);
  41. virtual void endNested(size32_t position);
  42. };
  43. class THORHELPER_API CMemoryRowSerializer: implements IRowSerializerTarget
  44. {
  45. MemoryBuffer & buffer;
  46. unsigned nesting;
  47. public:
  48. inline CMemoryRowSerializer(MemoryBuffer & _buffer)
  49. : buffer(_buffer)
  50. {
  51. nesting = 0;
  52. }
  53. virtual void put(size32_t len, const void * ptr);
  54. virtual size32_t beginNested(size32_t count);
  55. virtual void endNested(size32_t sizePos);
  56. };
  // useful package
  // Bundles, behind one interface, the row allocator, serializer, deserializer and
  // meta data together with an activity id and a code context.
  // (Instances are created with createRowInterfaces().)
  interface IRowInterfaces: extends IInterface
  {
  virtual IEngineRowAllocator * queryRowAllocator()=0;
  virtual IOutputRowSerializer * queryRowSerializer()=0;
  virtual IOutputRowDeserializer * queryRowDeserializer()=0;
  virtual IOutputMetaData *queryRowMetaData()=0;
  virtual unsigned queryActivityId() const=0;
  virtual ICodeContext *queryCodeContext()=0;
  };
  // Turns memory-mapped reading on or off. NOTE(review): scope of the setting (process wide?)
  // is not visible from this header - confirm in the implementation.
  extern THORHELPER_API void useMemoryMappedRead(bool on);
  // Builds an IRowInterfaces from a row meta, an activity id, allocator heap flags and a code context.
  extern THORHELPER_API IRowInterfaces *createRowInterfaces(IOutputMetaData *meta, unsigned actid, unsigned heapFlags, ICodeContext *context);
  // Bit flags for the row reader/writer factory functions. Each enumerator is a distinct bit,
  // so values are OR-ed together into an unsigned flags word and tested with TestRwFlag().
  enum RowReaderWriterFlags
  {
  rw_grouped = 0x1,
  rw_crc = 0x2,
  rw_extend = 0x4,
  rw_compress = 0x8,
  rw_compressblkcrc = 0x10, // block compression, this sets/checks crc's at block level
  rw_fastlz = 0x20, // if rw_compress
  rw_autoflush = 0x40,
  rw_buffered = 0x80,
  rw_lzw = 0x100, // if rw_compress
  rw_lz4 = 0x200, // if rw_compress
  rw_sparse = 0x400 // NB: mutually exclusive with rw_grouped
  };
  83. #define DEFAULT_RWFLAGS (rw_buffered|rw_autoflush|rw_compressblkcrc)
  84. inline bool TestRwFlag(unsigned flags, RowReaderWriterFlags flag) { return 0 != (flags & flag); }
  85. #define COMP_MASK (rw_compress|rw_compressblkcrc|rw_fastlz|rw_lzw|rw_lz4)
  86. #define COMP_TYPE_MASK (rw_fastlz|rw_lzw|rw_lz4)
  87. inline void setCompFlag(const char *compStr, unsigned &flags)
  88. {
  89. flags &= ~COMP_TYPE_MASK;
  90. if (!isEmptyString(compStr))
  91. {
  92. if (0 == stricmp("FLZ", compStr))
  93. flags |= rw_fastlz;
  94. else if (0 == stricmp("LZW", compStr))
  95. flags |= rw_lzw;
  96. else // not specifically FLZ or LZW so set to default LZ4
  97. flags |= rw_lz4;
  98. }
  99. else // default is LZ4
  100. flags |= rw_lz4;
  101. }
  102. inline unsigned getCompMethod(unsigned flags)
  103. {
  104. unsigned compMethod = COMPRESS_METHOD_LZ4;
  105. if (TestRwFlag(flags, rw_lzw))
  106. compMethod = COMPRESS_METHOD_LZW;
  107. else if (TestRwFlag(flags, rw_fastlz))
  108. compMethod = COMPRESS_METHOD_FASTLZ;
  109. return compMethod;
  110. }
  111. inline unsigned getCompMethod(const char *compStr)
  112. {
  113. unsigned compMethod = COMPRESS_METHOD_LZ4;
  114. if (!isEmptyString(compStr))
  115. {
  116. if (0 == stricmp("FLZ", compStr))
  117. compMethod = COMPRESS_METHOD_FASTLZ;
  118. else if (0 == stricmp("LZW", compStr))
  119. compMethod = COMPRESS_METHOD_LZW;
  120. }
  121. return compMethod;
  122. }
  // Extended row stream: IRowStream plus offset/progress queries, row prefetching,
  // re-initialisation over a different range, statistics and field filters.
  // NOTE(review): the exact semantics of each call live in the implementation, not in this header.
  interface IExtRowStream: extends IRowStream
  {
  virtual offset_t getOffset() const = 0;
  virtual offset_t getLastRowOffset() const = 0;
  virtual unsigned __int64 queryProgress() const = 0;
  // Keep the inherited stop() overload(s) visible; declaring stop(CRC32 *) below would otherwise hide them.
  using IRowStream::stop;
  virtual void stop(CRC32 *crcout) = 0;
  // prefetchRow()/prefetchDone() are presumably used as a pair around each prefetched row - confirm with implementation.
  virtual const byte *prefetchRow() = 0;
  virtual void prefetchDone() = 0;
  virtual void reinit(offset_t offset,offset_t len,unsigned __int64 maxrows) = 0;
  virtual unsigned __int64 getStatistic(StatisticKind kind) = 0;
  virtual void setFilters(IConstArrayOf<IFieldFilter> &filters) = 0;
  };
  // Extended row writer: IRowWriter plus a position query and a flush overload taking a CRC32 pointer.
  interface IExtRowWriter: extends IRowWriter
  {
  virtual offset_t getPosition() = 0;
  // Keep the inherited flush() overload(s) visible; declaring flush(CRC32 *) below would otherwise hide them.
  using IRowWriter::flush;
  virtual void flush(CRC32 *crcout) = 0;
  };
  // Treatment of empty rows. mapESRToRWFlags()/extractESRFromRWFlags() translate between these values
  // and reader/writer flags: ers_allow <-> rw_sparse, ers_eogonly <-> rw_grouped, ers_forbidden <-> neither flag.
  enum EmptyRowSemantics { ers_forbidden, ers_allow, ers_eogonly };
  143. inline unsigned mapESRToRWFlags(EmptyRowSemantics emptyRowSemantics)
  144. {
  145. switch (emptyRowSemantics)
  146. {
  147. case ers_allow:
  148. return rw_sparse;
  149. case ers_eogonly:
  150. return rw_grouped;
  151. default:
  152. return 0;
  153. }
  154. }
  155. inline EmptyRowSemantics extractESRFromRWFlags(unsigned rwFlags)
  156. {
  157. if (TestRwFlag(rwFlags, rw_sparse))
  158. return ers_allow;
  159. else if (TestRwFlag(rwFlags, rw_grouped))
  160. return ers_eogonly;
  161. else
  162. return ers_forbidden;
  163. }
  // Gives access to a record translator, an optional keyed translator and the actual format.
  // Instances are obtained from the getTranslators() overload that returns ITranslator *.
  interface ITranslator : extends IInterface
  {
  virtual IOutputMetaData &queryActualFormat() const = 0;
  virtual const IDynamicTransform &queryTranslator() const = 0;
  // Returned as a pointer (unlike the two accessors above) - presumably may be null when there is no keyed translator; confirm.
  virtual const IKeyTranslator *queryKeyedTranslator() const = 0;
  };
  interface IExpander;
  // Row stream (reader) factories.
  // Defaults visible here: flags = DEFAULT_RWFLAGS, offset = 0, len = (offset_t)-1, maxrows = (unsigned __int64)-1.
  // NOTE(review): the all-ones len/maxrows defaults presumably mean "no limit" - confirm in the implementation.
  extern THORHELPER_API IExtRowStream *createRowStreamEx(IFileIO *fileIO, IRowInterfaces *rowIf, offset_t offset, offset_t len, unsigned __int64 maxrows, unsigned rwFlags, ITranslator *translatorContainer=nullptr, IVirtualFieldCallback * _fieldCallback=nullptr);
  extern THORHELPER_API IExtRowStream *createRowStream(IFile *file, IRowInterfaces *rowif, unsigned flags=DEFAULT_RWFLAGS, IExpander *eexp=nullptr, ITranslator *translatorContainer=nullptr, IVirtualFieldCallback * _fieldCallback=nullptr);
  extern THORHELPER_API IExtRowStream *createRowStreamEx(IFile *file, IRowInterfaces *rowif, offset_t offset=0, offset_t len=(offset_t)-1, unsigned __int64 maxrows=(unsigned __int64)-1, unsigned flags=DEFAULT_RWFLAGS, IExpander *eexp=nullptr, ITranslator *translatorContainer=nullptr, IVirtualFieldCallback * _fieldCallback = nullptr);
  interface ICompressor;
  // Row writer factories, one per kind of destination (IFile, IFileIO, IFileIOStream).
  extern THORHELPER_API IExtRowWriter *createRowWriter(IFile *file, IRowInterfaces *rowIf, unsigned flags=DEFAULT_RWFLAGS, ICompressor *compressor=NULL, size32_t compressorBlkSz=0);
  extern THORHELPER_API IExtRowWriter *createRowWriter(IFileIO *fileIO, IRowInterfaces *rowIf, unsigned flags=DEFAULT_RWFLAGS, size32_t compressorBlkSz=0);
  extern THORHELPER_API IExtRowWriter *createRowWriter(IFileIOStream *strm, IRowInterfaces *rowIf, unsigned flags=DEFAULT_RWFLAGS); // strm should be unbuffered
  // Accepts batches of rows through the put variants (or a writer from createWriteBlock())
  // and later hands them back merged, either as a stream (merge) or written to a destination (mergeTo).
  // NOTE(review): the name and the 'tempnamebase' argument of createDiskMerger suggest the batches
  // are spilled to temporary files - confirm in the implementation.
  interface THORHELPER_API IDiskMerger : extends IInterface
  {
  virtual void put(const void **rows, unsigned numrows) = 0;
  virtual void putIndirect(const void ***rowptrs, unsigned numrows) = 0; // like put only with an additional dereference, i.e. row i is *(rowptrs[i])
  virtual void put(ISortedRowProvider * rows) = 0;
  virtual IRowStream *merge(ICompare *icompare,bool partdedup=false) = 0;
  virtual count_t mergeTo(IRowWriter *dest,ICompare *icompare,bool partdedup=false) = 0; // alternative to merge
  virtual IRowWriter *createWriteBlock() = 0;
  };
  // Creates an IDiskMerger for the given row interfaces, row link counter and temporary name base.
  extern THORHELPER_API IDiskMerger *createDiskMerger(IRowInterfaces *rowInterfaces, IRowLinkCounter *linker, const char *tempnamebase);
  // Activity timing is compiled in only when HAS_GOOD_CYCLE_COUNTER is defined;
  // TIME_ACTIVITIES selects between the real timer classes and the empty stubs further down.
  #ifdef HAS_GOOD_CYCLE_COUNTER
  #define TIME_ACTIVITIES
  #endif
  191. class THORHELPER_API ActivityTimeAccumulator
  192. {
  193. friend class ActivityTimer;
  194. public:
  195. ActivityTimeAccumulator()
  196. {
  197. reset();
  198. }
  199. public:
  200. cycle_t startCycles; // Wall clock time of first entry to this activity
  201. cycle_t totalCycles; // Time spent in this activity
  202. cycle_t endCycles; // Wall clock time of last entry to this activity
  203. unsigned __int64 firstRow; // Timestamp of first row (nanoseconds since epoch)
  204. cycle_t firstExitCycles; // Wall clock time of first exit from this activity
  205. cycle_t blockedCycles; // Time spent blocked
  206. // Return the total amount of time (in nanoseconds) spent in this activity (first entry to last exit)
  207. inline unsigned __int64 elapsed() const { return cycle_to_nanosec(endCycles-startCycles); }
  208. // Return the total amount of time (in nanoseconds) spent in the first call of this activity (first entry to first exit)
  209. inline unsigned __int64 latency() const { return cycle_to_nanosec(latencyCycles()); }
  210. inline cycle_t latencyCycles() const { return firstExitCycles-startCycles; }
  211. void addStatistics(IStatisticGatherer & builder) const;
  212. void addStatistics(CRuntimeStatisticCollection & merged) const;
  213. void merge(const ActivityTimeAccumulator & other);
  214. void reset()
  215. {
  216. startCycles = 0;
  217. totalCycles = 0;
  218. endCycles = 0;
  219. firstRow = 0;
  220. firstExitCycles = 0;
  221. blockedCycles = 0;
  222. }
  223. };
  224. #ifdef TIME_ACTIVITIES
  225. class ActivityTimer
  226. {
  227. unsigned __int64 startCycles;
  228. ActivityTimeAccumulator &accumulator;
  229. protected:
  230. const bool enabled;
  231. bool isFirstRow;
  232. public:
  233. ActivityTimer(ActivityTimeAccumulator &_accumulator, const bool _enabled)
  234. : accumulator(_accumulator), enabled(_enabled), isFirstRow(false)
  235. {
  236. if (enabled)
  237. {
  238. startCycles = get_cycles_now();
  239. if (!accumulator.firstRow)
  240. {
  241. isFirstRow = true;
  242. accumulator.startCycles = startCycles;
  243. accumulator.firstRow = getTimeStampNowValue();
  244. }
  245. }
  246. else
  247. startCycles = 0;
  248. }
  249. ~ActivityTimer()
  250. {
  251. if (enabled)
  252. {
  253. cycle_t nowCycles = get_cycles_now();
  254. accumulator.endCycles = nowCycles;
  255. cycle_t elapsedCycles = nowCycles - startCycles;
  256. accumulator.totalCycles += elapsedCycles;
  257. if (isFirstRow)
  258. accumulator.firstExitCycles = nowCycles;
  259. }
  260. }
  261. };
  262. class SimpleActivityTimer
  263. {
  264. cycle_t startCycles;
  265. cycle_t &accumulator;
  266. protected:
  267. const bool enabled;
  268. public:
  269. inline SimpleActivityTimer(cycle_t &_accumulator, const bool _enabled)
  270. : accumulator(_accumulator), enabled(_enabled)
  271. {
  272. if (enabled)
  273. startCycles = get_cycles_now();
  274. else
  275. startCycles = 0;
  276. }
  277. inline ~SimpleActivityTimer()
  278. {
  279. if (enabled)
  280. {
  281. cycle_t nowCycles = get_cycles_now();
  282. cycle_t elapsedCycles = nowCycles - startCycles;
  283. accumulator += elapsedCycles;
  284. }
  285. }
  286. };
  287. class BlockedActivityTimer
  288. {
  289. unsigned __int64 startCycles;
  290. ActivityTimeAccumulator &accumulator;
  291. protected:
  292. const bool enabled;
  293. public:
  294. BlockedActivityTimer(ActivityTimeAccumulator &_accumulator, const bool _enabled)
  295. : accumulator(_accumulator), enabled(_enabled)
  296. {
  297. if (enabled)
  298. startCycles = get_cycles_now();
  299. else
  300. startCycles = 0;
  301. }
  302. ~BlockedActivityTimer()
  303. {
  304. if (enabled)
  305. {
  306. cycle_t elapsedCycles = get_cycles_now() - startCycles;
  307. accumulator.blockedCycles += elapsedCycles;
  308. }
  309. }
  310. };
  311. #else
  // Stand-ins used when TIME_ACTIVITIES is not defined. They keep the constructor
  // signatures of the real timers but have empty bodies and no members, so code that
  // declares a timer still compiles while nothing is measured.
  struct ActivityTimer
  {
  inline ActivityTimer(ActivityTimeAccumulator &_accumulator, const bool _enabled) { }
  };
  struct SimpleActivityTimer
  {
  inline SimpleActivityTimer(cycle_t &_accumulator, const bool _enabled) { }
  };
  struct BlockedActivityTimer
  {
  inline BlockedActivityTimer(ActivityTimeAccumulator &_accumulator, const bool _enabled) { }
  };
  324. #endif
  // ICodeContext implementation that forwards each call to another ICodeContext ('ctx'),
  // which can be supplied at construction or swapped later with set().
  // Exceptions to pure forwarding: getRowXML() and getRowJSON() call convertRowToXML()/
  // convertRowToJSON() directly instead of going through ctx.
  // The target pointer is stored as-is (no link/release is done in this class) and is never
  // checked for null: calling a forwarding method before a context has been set dereferences
  // a null pointer.
  class THORHELPER_API IndirectCodeContext : implements ICodeContext
  {
  public:
  IndirectCodeContext(ICodeContext * _ctx = NULL) : ctx(_ctx) {}
  // Redirect all subsequent calls to _ctx.
  void set(ICodeContext * _ctx) { ctx = _ctx; }
  virtual const char *loadResource(unsigned id)
  {
  return ctx->loadResource(id);
  }
  virtual void setResultBool(const char *name, unsigned sequence, bool value)
  {
  ctx->setResultBool(name, sequence, value);
  }
  virtual void setResultData(const char *name, unsigned sequence, int len, const void * data)
  {
  ctx->setResultData(name, sequence, len, data);
  }
  virtual void setResultDecimal(const char * stepname, unsigned sequence, int len, int precision, bool isSigned, const void *val)
  {
  ctx->setResultDecimal(stepname, sequence, len, precision, isSigned, val);
  }
  virtual void setResultInt(const char *name, unsigned sequence, __int64 value, unsigned size)
  {
  ctx->setResultInt(name, sequence, value, size);
  }
  virtual void setResultRaw(const char *name, unsigned sequence, int len, const void * data)
  {
  ctx->setResultRaw(name, sequence, len, data);
  }
  virtual void setResultReal(const char * stepname, unsigned sequence, double value)
  {
  ctx->setResultReal(stepname, sequence, value);
  }
  virtual void setResultSet(const char *name, unsigned sequence, bool isAll, size32_t len, const void * data, ISetToXmlTransformer * transformer)
  {
  ctx->setResultSet(name, sequence, isAll, len, data, transformer);
  }
  virtual void setResultString(const char *name, unsigned sequence, int len, const char * str)
  {
  ctx->setResultString(name, sequence, len, str);
  }
  virtual void setResultUInt(const char *name, unsigned sequence, unsigned __int64 value, unsigned size)
  {
  ctx->setResultUInt(name, sequence, value, size);
  }
  virtual void setResultUnicode(const char *name, unsigned sequence, int len, UChar const * str)
  {
  ctx->setResultUnicode(name, sequence, len, str);
  }
  virtual void setResultVarString(const char * name, unsigned sequence, const char * value)
  {
  ctx->setResultVarString(name, sequence, value);
  }
  virtual void setResultVarUnicode(const char * name, unsigned sequence, UChar const * value)
  {
  ctx->setResultVarUnicode(name, sequence, value);
  }
  virtual bool getResultBool(const char * name, unsigned sequence)
  {
  return ctx->getResultBool(name, sequence);
  }
  virtual void getResultData(unsigned & tlen, void * & tgt, const char * name, unsigned sequence)
  {
  ctx->getResultData(tlen, tgt, name, sequence);
  }
  virtual void getResultDecimal(unsigned tlen, int precision, bool isSigned, void * tgt, const char * stepname, unsigned sequence)
  {
  ctx->getResultDecimal(tlen, precision, isSigned, tgt, stepname, sequence);
  }
  virtual void getResultRaw(unsigned & tlen, void * & tgt, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer)
  {
  ctx->getResultRaw(tlen, tgt, name, sequence, xmlTransformer, csvTransformer);
  }
  virtual void getResultSet(bool & isAll, size32_t & tlen, void * & tgt, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer)
  {
  ctx->getResultSet(isAll, tlen, tgt, name, sequence, xmlTransformer, csvTransformer);
  }
  virtual __int64 getResultInt(const char * name, unsigned sequence)
  {
  return ctx->getResultInt(name, sequence);
  }
  virtual double getResultReal(const char * name, unsigned sequence)
  {
  return ctx->getResultReal(name, sequence);
  }
  virtual void getResultString(unsigned & tlen, char * & tgt, const char * name, unsigned sequence)
  {
  ctx->getResultString(tlen, tgt, name, sequence);
  }
  virtual void getResultStringF(unsigned tlen, char * tgt, const char * name, unsigned sequence)
  {
  ctx->getResultStringF(tlen, tgt, name, sequence);
  }
  virtual void getResultUnicode(unsigned & tlen, UChar * & tgt, const char * name, unsigned sequence)
  {
  ctx->getResultUnicode(tlen, tgt, name, sequence);
  }
  virtual char *getResultVarString(const char * name, unsigned sequence)
  {
  return ctx->getResultVarString(name, sequence);
  }
  virtual UChar *getResultVarUnicode(const char * name, unsigned sequence)
  {
  return ctx->getResultVarUnicode(name, sequence);
  }
  virtual unsigned getResultHash(const char * name, unsigned sequence)
  {
  return ctx->getResultHash(name, sequence);
  }
  virtual unsigned getExternalResultHash(const char * wuid, const char * name, unsigned sequence)
  {
  return ctx->getExternalResultHash(wuid, name, sequence);
  }
  virtual char *getWuid()
  {
  return ctx->getWuid();
  }
  virtual void getExternalResultRaw(unsigned & tlen, void * & tgt, const char * wuid, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer)
  {
  ctx->getExternalResultRaw(tlen, tgt, wuid, stepname, sequence, xmlTransformer, csvTransformer);
  }
  virtual void executeGraph(const char * graphName, bool realThor, size32_t parentExtractSize, const void * parentExtract)
  {
  ctx->executeGraph(graphName, realThor, parentExtractSize, parentExtract);
  }
  virtual char * getExpandLogicalName(const char * logicalName)
  {
  return ctx->getExpandLogicalName(logicalName);
  }
  virtual void addWuException(const char * text, unsigned code, unsigned severity, const char *source)
  {
  ctx->addWuException(text, code, severity, source);
  }
  virtual void addWuAssertFailure(unsigned code, const char * text, const char * filename, unsigned lineno, unsigned column, bool isAbort)
  {
  ctx->addWuAssertFailure(code, text, filename, lineno, column, isAbort);
  }
  virtual IUserDescriptor *queryUserDescriptor()
  {
  return ctx->queryUserDescriptor();
  }
  virtual IThorChildGraph * resolveChildQuery(__int64 activityId, IHThorArg * colocal)
  {
  return ctx->resolveChildQuery(activityId, colocal);
  }
  virtual unsigned __int64 getDatasetHash(const char * name, unsigned __int64 hash)
  {
  return ctx->getDatasetHash(name, hash);
  }
  virtual unsigned getNodes()
  {
  return ctx->getNodes();
  }
  virtual unsigned getNodeNum()
  {
  return ctx->getNodeNum();
  }
  virtual char *getFilePart(const char *logicalPart, bool create)
  {
  return ctx->getFilePart(logicalPart, create);
  }
  virtual unsigned __int64 getFileOffset(const char *logicalPart)
  {
  return ctx->getFileOffset(logicalPart);
  }
  virtual IDistributedFileTransaction *querySuperFileTransaction()
  {
  return ctx->querySuperFileTransaction();
  }
  virtual char *getEnv(const char *name, const char *defaultValue) const
  {
  return ctx->getEnv(name, defaultValue);
  }
  virtual char *getJobName()
  {
  return ctx->getJobName();
  }
  virtual char *getJobOwner()
  {
  return ctx->getJobOwner();
  }
  virtual char *getClusterName()
  {
  return ctx->getClusterName();
  }
  virtual char *getGroupName()
  {
  return ctx->getGroupName();
  }
  virtual char * queryIndexMetaData(char const * lfn, char const * xpath)
  {
  return ctx->queryIndexMetaData(lfn, xpath);
  }
  virtual unsigned getPriority() const
  {
  return ctx->getPriority();
  }
  virtual char *getPlatform()
  {
  return ctx->getPlatform();
  }
  virtual char *getOS()
  {
  return ctx->getOS();
  }
  virtual IEclGraphResults * resolveLocalQuery(__int64 activityId)
  {
  return ctx->resolveLocalQuery(activityId);
  }
  // NOTE(review): non-const overload of getEnv, in addition to the const one declared earlier in this class.
  // Both forward identically. Whether ICodeContext declares a non-const getEnv (making this an override)
  // cannot be told from this header - if it does not, this overload is redundant; confirm before removing.
  virtual char *getEnv(const char *name, const char *defaultValue)
  {
  return ctx->getEnv(name, defaultValue);
  }
  virtual unsigned logString(const char *text) const
  {
  return ctx->logString(text);
  }
  virtual const IContextLogger &queryContextLogger() const
  {
  return ctx->queryContextLogger();
  }
  virtual IDebuggableContext *queryDebugContext() const
  {
  return ctx->queryDebugContext();
  }
  virtual IEngineRowAllocator * getRowAllocator(IOutputMetaData * meta, unsigned activityId) const
  {
  return ctx->getRowAllocator(meta, activityId);
  }
  virtual IEngineRowAllocator * getRowAllocatorEx(IOutputMetaData * meta, unsigned activityId, unsigned heapFlags) const
  {
  return ctx->getRowAllocatorEx(meta, activityId, heapFlags);
  }
  virtual const char *cloneVString(const char *str) const
  {
  return ctx->cloneVString(str);
  }
  virtual const char *cloneVString(size32_t len, const char *str) const
  {
  return ctx->cloneVString(len, str);
  }
  virtual void getResultRowset(size32_t & tcount, const byte * * & tgt, const char * name, unsigned sequence, IEngineRowAllocator * _rowAllocator, bool isGrouped, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) override
  {
  ctx->getResultRowset(tcount, tgt, name, sequence, _rowAllocator, isGrouped, xmlTransformer, csvTransformer);
  }
  virtual void getResultDictionary(size32_t & tcount, const byte * * & tgt, IEngineRowAllocator * _rowAllocator, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer, IHThorHashLookupInfo * hasher) override
  {
  ctx->getResultDictionary(tcount, tgt, _rowAllocator, name, sequence, xmlTransformer, csvTransformer, hasher);
  }
  // Not forwarded: converts the row here via convertRowToXML(), without involving ctx.
  virtual void getRowXML(size32_t & lenResult, char * & result, IOutputMetaData & info, const void * row, unsigned flags)
  {
  convertRowToXML(lenResult, result, info, row, flags);
  }
  // Not forwarded: converts the row here via convertRowToJSON(), without involving ctx.
  virtual void getRowJSON(size32_t & lenResult, char * & result, IOutputMetaData & info, const void * row, unsigned flags)
  {
  convertRowToJSON(lenResult, result, info, row, flags);
  }
  virtual const void * fromXml(IEngineRowAllocator * _rowAllocator, size32_t len, const char * utf8, IXmlToRowTransformer * xmlTransformer, bool stripWhitespace)
  {
  return ctx->fromXml(_rowAllocator, len, utf8, xmlTransformer, stripWhitespace);
  }
  virtual const void * fromJson(IEngineRowAllocator * _rowAllocator, size32_t len, const char * utf8, IXmlToRowTransformer * xmlTransformer, bool stripWhitespace)
  {
  return ctx->fromJson(_rowAllocator, len, utf8, xmlTransformer, stripWhitespace);
  }
  virtual IEngineContext *queryEngineContext()
  {
  return ctx->queryEngineContext();
  }
  virtual char *getDaliServers()
  {
  return ctx->getDaliServers();
  }
  virtual IWorkUnit *updateWorkUnit() const
  {
  return ctx->updateWorkUnit();
  }
  virtual ISectionTimer * registerTimer(unsigned activityId, const char * name)
  {
  return ctx->registerTimer(activityId, name);
  }
  virtual unsigned getGraphLoopCounter() const override
  {
  return ctx->getGraphLoopCounter();
  }
  virtual void addWuExceptionEx(const char * text, unsigned code, unsigned severity, unsigned audience, const char *source) override
  {
  ctx->addWuExceptionEx(text, code, severity, audience, source);
  }
  protected:
  // Context that calls are forwarded to; NULL until one is supplied via the constructor or set().
  ICodeContext * ctx;
  };
  // Activity kind helpers: classification as sink/source and a text form of the kind.
  extern THORHELPER_API bool isActivitySink(ThorActivityKind kind);
  extern THORHELPER_API bool isActivitySource(ThorActivityKind kind);
  extern THORHELPER_API const char * getActivityText(ThorActivityKind kind);
  // CPU affinity / memory binding helpers.
  // NOTE(review): the format of 'cpus' and 'optNodes' is not visible from this header - see the implementation.
  extern THORHELPER_API void setProcessAffinity(const char * cpus);
  extern THORHELPER_API void setAutoAffinity(unsigned curProcess, unsigned processPerNode, const char * optNodes);
  extern THORHELPER_API void bindMemoryToLocalNodes();
  // Layout information taken from a property tree: either as an IOutputMetaData, or written into
  // 'layoutBin' with a bool result.
  extern THORHELPER_API IOutputMetaData *getDaliLayoutInfo(IPropertyTree const &props);
  extern THORHELPER_API bool getDaliLayoutInfo(MemoryBuffer &layoutBin, IPropertyTree const &props);
  /* Returns a dynamic translator (as 1st parameter) given a generated expected format, the published format and the desired projectedFormat,
  * providing translation mode and crc's allow translation. Returns true if translator created.
  * NB: translator and keyedTranslator are expected to be empty before calling.
  */
  extern THORHELPER_API bool getTranslators(Owned<const IDynamicTransform> &translator, const char *tracing, unsigned expectedCrc, IOutputMetaData *expectedFormat, unsigned publishedCrc, IOutputMetaData *publishedFormat, unsigned projectedCrc, IOutputMetaData *projectedFormat, RecordTranslationMode mode);
  // Same as above, but will also return a key field translator in 2nd parameter. Returns true if translator created.
  extern THORHELPER_API bool getTranslators(Owned<const IDynamicTransform> &translator, Owned<const IKeyTranslator> &keyedTranslator, const char *tracing, unsigned expectedCrc, IOutputMetaData *expectedFormat, unsigned publishedCrc, IOutputMetaData *publishedFormat, unsigned projectedCrc, IOutputMetaData *projectedFormat, RecordTranslationMode mode);
  // Returns an ITranslator that gives access to a dynamic translator, keyed translator and the format used
  extern THORHELPER_API ITranslator *getTranslators(const char *tracing, unsigned expectedCrc, IOutputMetaData *expectedFormat, unsigned publishedCrc, IOutputMetaData *publishedFormat, unsigned projectedCrc, IOutputMetaData *projectedFormat, RecordTranslationMode mode);
  634. inline bool isActivityCodeSigned(IPropertyTree &graphNode)
  635. {
  636. if (!isEmptyString(graphNode.queryProp("att[@name=\"signedBy\"]/@value")))
  637. return true;
  638. return false;
  639. }
  #endif // THORCOMMON_HPP