thorcommon.hpp 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef THORCOMMON_HPP
  14. #define THORCOMMON_HPP
  15. #include "jiface.hpp"
  16. #include "jcrc.hpp"
  17. #include "jsort.hpp"
  18. #include "eclhelper.hpp"
  19. #include "thorhelper.hpp"
  20. #include "thorxmlwrite.hpp"
  21. class THORHELPER_API CSizingSerializer : implements IRowSerializerTarget
  22. {
  23. size32_t totalsize;
  24. public:
  25. inline CSizingSerializer() { reset(); }
  26. inline void reset() { totalsize = 0; }
  27. inline size32_t size() { return totalsize; }
  28. void put(size32_t len, const void * ptr);
  29. size32_t beginNested();
  30. void endNested(size32_t position);
  31. };
  32. class THORHELPER_API CMemoryRowSerializer: implements IRowSerializerTarget
  33. {
  34. MemoryBuffer & buffer;
  35. unsigned nesting;
  36. public:
  37. inline CMemoryRowSerializer(MemoryBuffer & _buffer)
  38. : buffer(_buffer)
  39. {
  40. nesting = 0;
  41. }
  42. void put(size32_t len, const void * ptr);
  43. size32_t beginNested();
  44. void endNested(size32_t sizePos);
  45. };
  46. // useful package
  47. interface IRowInterfaces: extends IInterface
  48. {
  49. virtual IEngineRowAllocator * queryRowAllocator()=0;
  50. virtual IOutputRowSerializer * queryRowSerializer()=0;
  51. virtual IOutputRowDeserializer * queryRowDeserializer()=0;
  52. virtual IOutputMetaData *queryRowMetaData()=0;
  53. virtual unsigned queryActivityId()=0;
  54. virtual ICodeContext *queryCodeContext()=0;
  55. };
  56. extern THORHELPER_API void useMemoryMappedRead(bool on);
  57. extern THORHELPER_API IRowInterfaces *createRowInterfaces(IOutputMetaData *meta, unsigned actid, ICodeContext *context);
  // Bit flags accepted by the row stream / row writer factories.
  // Each enumerator is a distinct single bit so values can be OR-ed together.
  enum RowReaderWriterFlags
  {
      rw_grouped        = 0x01,
      rw_crc            = 0x02,
      rw_extend         = 0x04,
      rw_compress       = 0x08,
      rw_compressblkcrc = 0x10,   // block compression, this sets/checks crc's at block level
      rw_fastlz         = 0x20,   // if rw_compress
      rw_autoflush      = 0x40,
      rw_buffered       = 0x80
  };

  // Flag combination used as the default argument of the factories.
  #define DEFAULT_RWFLAGS (rw_buffered|rw_autoflush|rw_compressblkcrc)

  // Returns true when 'flag' is set in the bit mask 'flags'.
  inline bool TestRwFlag(unsigned flags, RowReaderWriterFlags flag)
  {
      return (flags & flag) != 0;
  }
  71. interface IExtRowStream: extends IRowStream
  72. {
  73. virtual offset_t getOffset() = 0;
  74. virtual void stop(CRC32 *crcout=NULL) = 0;
  75. virtual const void *prefetchRow(size32_t *sz=NULL) = 0;
  76. virtual void prefetchDone() = 0;
  77. virtual void reinit(offset_t offset,offset_t len,unsigned __int64 maxrows) = 0;
  78. };
  79. interface IExtRowWriter: extends IRowWriter
  80. {
  81. virtual offset_t getPosition() = 0;
  82. virtual void flush(CRC32 *crcout=NULL) = 0;
  83. };
  84. interface IExpander;
  85. extern THORHELPER_API IExtRowStream *createRowStream(IFile *file, IRowInterfaces *rowif, unsigned flags=DEFAULT_RWFLAGS, IExpander *eexp=NULL);
  86. extern THORHELPER_API IExtRowStream *createRowStreamEx(IFile *file, IRowInterfaces *rowif, offset_t offset=0, offset_t len=(offset_t)-1, unsigned __int64 maxrows=(unsigned __int64)-1, unsigned flags=DEFAULT_RWFLAGS, IExpander *eexp=NULL);
  87. interface ICompressor;
  88. extern THORHELPER_API IExtRowWriter *createRowWriter(IFile *file, IRowInterfaces *rowIf, unsigned flags=DEFAULT_RWFLAGS, ICompressor *compressor=NULL);
  89. extern THORHELPER_API IExtRowWriter *createRowWriter(IFileIO *fileIO, IRowInterfaces *rowIf, unsigned flags=DEFAULT_RWFLAGS);
  90. extern THORHELPER_API IExtRowWriter *createRowWriter(IFileIOStream *strm, IRowInterfaces *rowIf, unsigned flags=DEFAULT_RWFLAGS); // strm should be unbuffered
  91. interface THORHELPER_API IDiskMerger : extends IInterface
  92. {
  93. virtual void put(const void **rows, unsigned numrows) = 0;
  94. virtual void putIndirect(const void ***rowptrs, unsigned numrows) = 0; // like put only with an additional dereference, i.e. row i is *(rowptrs[i])
  95. virtual void put(ISortedRowProvider * rows) = 0;
  96. virtual IRowStream *merge(ICompare *icompare,bool partdedup=false) = 0;
  97. virtual count_t mergeTo(IRowWriter *dest,ICompare *icompare,bool partdedup=false) = 0; // alternative to merge
  98. virtual IRowWriter *createWriteBlock() = 0;
  99. };
  100. extern THORHELPER_API IDiskMerger *createDiskMerger(IRowInterfaces *rowInterfaces, IRowLinkCounter *linker, const char *tempnamebase);
  101. extern THORHELPER_API void testDiskSort();
  102. #define TIME_ACTIVITIES
  103. interface IActivityTimer : extends IInterface
  104. {
  105. virtual unsigned __int64 getCyclesAdjustment() const = 0;
  106. };
  107. #ifdef TIME_ACTIVITIES
  108. #include "jdebug.hpp"
  109. class ActivityTimer
  110. {
  111. unsigned __int64 startCycles;
  112. unsigned __int64 &accumulator;
  113. protected:
  114. const bool &enabled;
  115. IActivityTimer *iActivityTimer;
  116. public:
  117. inline ActivityTimer(unsigned __int64 &_accumulator, const bool &_enabled, IActivityTimer *_iActivityTimer) : accumulator(_accumulator), enabled(_enabled), iActivityTimer(_iActivityTimer)
  118. {
  119. if (enabled)
  120. {
  121. startCycles = get_cycles_now();
  122. if (iActivityTimer)
  123. startCycles -= iActivityTimer->getCyclesAdjustment();
  124. }
  125. }
  126. inline ~ActivityTimer()
  127. {
  128. if (enabled)
  129. {
  130. unsigned __int64 elapsedCycles = get_cycles_now() - startCycles;
  131. if (iActivityTimer)
  132. elapsedCycles -= iActivityTimer->getCyclesAdjustment();
  133. accumulator += elapsedCycles;
  134. }
  135. }
  136. };
  137. #else
  138. struct ActivityTimer
  139. {
  140. inline ActivityTimer(unsigned __int64 &_accumulator, const bool &_enabled, IActivityTimer *_iActivityTimer) { }
  141. };
  142. #endif
  143. class THORHELPER_API IndirectCodeContext : implements ICodeContext
  144. {
  145. public:
  146. IndirectCodeContext(ICodeContext * _ctx = NULL) : ctx(_ctx) {}
  147. void set(ICodeContext * _ctx) { ctx = _ctx; }
  148. virtual const char *loadResource(unsigned id)
  149. {
  150. return ctx->loadResource(id);
  151. }
  152. virtual void setResultBool(const char *name, unsigned sequence, bool value)
  153. {
  154. ctx->setResultBool(name, sequence, value);
  155. }
  156. virtual void setResultData(const char *name, unsigned sequence, int len, const void * data)
  157. {
  158. ctx->setResultData(name, sequence, len, data);
  159. }
  160. virtual void setResultDecimal(const char * stepname, unsigned sequence, int len, int precision, bool isSigned, const void *val)
  161. {
  162. ctx->setResultDecimal(stepname, sequence, len, precision, isSigned, val);
  163. }
  164. virtual void setResultInt(const char *name, unsigned sequence, __int64 value)
  165. {
  166. ctx->setResultInt(name, sequence, value);
  167. }
  168. virtual void setResultRaw(const char *name, unsigned sequence, int len, const void * data)
  169. {
  170. ctx->setResultRaw(name, sequence, len, data);
  171. }
  172. virtual void setResultReal(const char * stepname, unsigned sequence, double value)
  173. {
  174. ctx->setResultReal(stepname, sequence, value);
  175. }
  176. virtual void setResultSet(const char *name, unsigned sequence, bool isAll, size32_t len, const void * data, ISetToXmlTransformer * transformer)
  177. {
  178. ctx->setResultSet(name, sequence, isAll, len, data, transformer);
  179. }
  180. virtual void setResultString(const char *name, unsigned sequence, int len, const char * str)
  181. {
  182. ctx->setResultString(name, sequence, len, str);
  183. }
  184. virtual void setResultUInt(const char *name, unsigned sequence, unsigned __int64 value)
  185. {
  186. ctx->setResultUInt(name, sequence, value);
  187. }
  188. virtual void setResultUnicode(const char *name, unsigned sequence, int len, UChar const * str)
  189. {
  190. ctx->setResultUnicode(name, sequence, len, str);
  191. }
  192. virtual void setResultVarString(const char * name, unsigned sequence, const char * value)
  193. {
  194. ctx->setResultVarString(name, sequence, value);
  195. }
  196. virtual void setResultVarUnicode(const char * name, unsigned sequence, UChar const * value)
  197. {
  198. ctx->setResultVarUnicode(name, sequence, value);
  199. }
  200. virtual bool getResultBool(const char * name, unsigned sequence)
  201. {
  202. return ctx->getResultBool(name, sequence);
  203. }
  204. virtual void getResultData(unsigned & tlen, void * & tgt, const char * name, unsigned sequence)
  205. {
  206. ctx->getResultData(tlen, tgt, name, sequence);
  207. }
  208. virtual void getResultDecimal(unsigned tlen, int precision, bool isSigned, void * tgt, const char * stepname, unsigned sequence)
  209. {
  210. ctx->getResultDecimal(tlen, precision, isSigned, tgt, stepname, sequence);
  211. }
  212. virtual void getResultRaw(unsigned & tlen, void * & tgt, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer)
  213. {
  214. ctx->getResultRaw(tlen, tgt, name, sequence, xmlTransformer, csvTransformer);
  215. }
  216. virtual void getResultSet(bool & isAll, size32_t & tlen, void * & tgt, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer)
  217. {
  218. ctx->getResultSet(isAll, tlen, tgt, name, sequence, xmlTransformer, csvTransformer);
  219. }
  220. virtual __int64 getResultInt(const char * name, unsigned sequence)
  221. {
  222. return ctx->getResultInt(name, sequence);
  223. }
  224. virtual double getResultReal(const char * name, unsigned sequence)
  225. {
  226. return ctx->getResultReal(name, sequence);
  227. }
  228. virtual void getResultString(unsigned & tlen, char * & tgt, const char * name, unsigned sequence)
  229. {
  230. ctx->getResultString(tlen, tgt, name, sequence);
  231. }
  232. virtual void getResultStringF(unsigned tlen, char * tgt, const char * name, unsigned sequence)
  233. {
  234. ctx->getResultStringF(tlen, tgt, name, sequence);
  235. }
  236. virtual void getResultUnicode(unsigned & tlen, UChar * & tgt, const char * name, unsigned sequence)
  237. {
  238. ctx->getResultUnicode(tlen, tgt, name, sequence);
  239. }
  240. virtual char *getResultVarString(const char * name, unsigned sequence)
  241. {
  242. return ctx->getResultVarString(name, sequence);
  243. }
  244. virtual UChar *getResultVarUnicode(const char * name, unsigned sequence)
  245. {
  246. return ctx->getResultVarUnicode(name, sequence);
  247. }
  248. virtual unsigned getResultHash(const char * name, unsigned sequence)
  249. {
  250. return ctx->getResultHash(name, sequence);
  251. }
  252. virtual char *getWuid()
  253. {
  254. return ctx->getWuid();
  255. }
  256. virtual void getExternalResultRaw(unsigned & tlen, void * & tgt, const char * wuid, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer)
  257. {
  258. ctx->getExternalResultRaw(tlen, tgt, wuid, stepname, sequence, xmlTransformer, csvTransformer);
  259. }
  260. virtual char *getDaliServers()
  261. {
  262. return ctx->getDaliServers();
  263. }
  264. virtual void executeGraph(const char * graphName, bool realThor, size32_t parentExtractSize, const void * parentExtract)
  265. {
  266. ctx->executeGraph(graphName, realThor, parentExtractSize, parentExtract);
  267. }
  268. virtual __int64 countDiskFile(const char * lfn, unsigned recordSize)
  269. {
  270. return ctx->countDiskFile(lfn, recordSize);
  271. }
  272. virtual __int64 countIndex(__int64 activityId, IHThorCountIndexArg & arg)
  273. {
  274. return ctx->countIndex(activityId, arg);
  275. }
  276. virtual __int64 countDiskFile(__int64 activityId, IHThorCountFileArg & arg)
  277. {
  278. return ctx->countDiskFile(activityId, arg);
  279. }
  280. virtual char * getExpandLogicalName(const char * logicalName)
  281. {
  282. return ctx->getExpandLogicalName(logicalName);
  283. }
  284. virtual void addWuException(const char * text, unsigned code, unsigned severity)
  285. {
  286. ctx->addWuException(text, code, severity);
  287. }
  288. virtual void addWuAssertFailure(unsigned code, const char * text, const char * filename, unsigned lineno, unsigned column, bool isAbort)
  289. {
  290. ctx->addWuAssertFailure(code, text, filename, lineno, column, isAbort);
  291. }
  292. virtual IUserDescriptor *queryUserDescriptor()
  293. {
  294. return ctx->queryUserDescriptor();
  295. }
  296. virtual IThorChildGraph * resolveChildQuery(__int64 activityId, IHThorArg * colocal)
  297. {
  298. return ctx->resolveChildQuery(activityId, colocal);
  299. }
  300. virtual unsigned __int64 getDatasetHash(const char * name, unsigned __int64 hash)
  301. {
  302. return ctx->getDatasetHash(name, hash);
  303. }
  304. virtual unsigned getRecoveringCount()
  305. {
  306. return ctx->getRecoveringCount();
  307. }
  308. virtual unsigned getNodes()
  309. {
  310. return ctx->getNodes();
  311. }
  312. virtual unsigned getNodeNum()
  313. {
  314. return ctx->getNodeNum();
  315. }
  316. virtual char *getFilePart(const char *logicalPart, bool create)
  317. {
  318. return ctx->getFilePart(logicalPart, create);
  319. }
  320. virtual unsigned __int64 getFileOffset(const char *logicalPart)
  321. {
  322. return ctx->getFileOffset(logicalPart);
  323. }
  324. virtual IDistributedFileTransaction *querySuperFileTransaction()
  325. {
  326. return ctx->querySuperFileTransaction();
  327. }
  328. virtual char *getEnv(const char *name, const char *defaultValue) const
  329. {
  330. return ctx->getEnv(name, defaultValue);
  331. }
  332. virtual char *getJobName()
  333. {
  334. return ctx->getJobName();
  335. }
  336. virtual char *getJobOwner()
  337. {
  338. return ctx->getJobOwner();
  339. }
  340. virtual char *getClusterName()
  341. {
  342. return ctx->getClusterName();
  343. }
  344. virtual char *getGroupName()
  345. {
  346. return ctx->getGroupName();
  347. }
  348. virtual char * queryIndexMetaData(char const * lfn, char const * xpath)
  349. {
  350. return ctx->queryIndexMetaData(lfn, xpath);
  351. }
  352. virtual unsigned getPriority() const
  353. {
  354. return ctx->getPriority();
  355. }
  356. virtual char *getPlatform()
  357. {
  358. return ctx->getPlatform();
  359. }
  360. virtual char *getOS()
  361. {
  362. return ctx->getOS();
  363. }
  364. virtual ILocalGraph * resolveLocalQuery(__int64 activityId)
  365. {
  366. return ctx->resolveLocalQuery(activityId);
  367. }
  368. virtual char *getEnv(const char *name, const char *defaultValue)
  369. {
  370. return ctx->getEnv(name, defaultValue);
  371. }
  372. virtual unsigned logString(const char *text) const
  373. {
  374. return ctx->logString(text);
  375. }
  376. virtual const IContextLogger &queryContextLogger() const
  377. {
  378. return ctx->queryContextLogger();
  379. }
  380. virtual IDebuggableContext *queryDebugContext() const
  381. {
  382. return ctx->queryDebugContext();
  383. }
  384. virtual IEngineRowAllocator * getRowAllocator(IOutputMetaData * meta, unsigned activityId) const
  385. {
  386. return ctx->getRowAllocator(meta, activityId);
  387. }
  388. virtual void getResultRowset(size32_t & tcount, byte * * & tgt, const char * name, unsigned sequence, IEngineRowAllocator * _rowAllocator, IOutputRowDeserializer * deserializer, bool isGrouped, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer)
  389. {
  390. ctx->getResultRowset(tcount, tgt, name, sequence, _rowAllocator, deserializer, isGrouped, xmlTransformer, csvTransformer);
  391. }
  392. virtual void getRowXML(size32_t & lenResult, char * & result, IOutputMetaData & info, const void * row, unsigned flags)
  393. {
  394. convertRowToXML(lenResult, result, info, row, flags);
  395. }
  396. virtual const void * fromXml(IEngineRowAllocator * _rowAllocator, size32_t len, const char * utf8, IXmlToRowTransformer * xmlTransformer, bool stripWhitespace)
  397. {
  398. return ctx->fromXml(_rowAllocator, len, utf8, xmlTransformer, stripWhitespace);
  399. }
  400. protected:
  401. ICodeContext * ctx;
  402. };
  403. extern THORHELPER_API bool isActivitySink(ThorActivityKind kind);
  404. extern THORHELPER_API bool isActivitySource(ThorActivityKind kind);
  405. extern THORHELPER_API const char * getActivityText(ThorActivityKind kind);
  406. #endif // THORCOMMON_HPP