thorcommon.ipp 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef THORCOMMON_IPP
  14. #define THORCOMMON_IPP
  15. #include <type_traits>
  16. #include "eclrtl.hpp"
  17. #include "thorcommon.hpp"
  18. #include "jsort.hpp"
  19. #include "rtlds_imp.hpp"
  20. #include "jfile.hpp"
  21. #ifdef THORHELPER_EXPORTS
  22. #define THORHELPER_API DECL_EXPORT
  23. #else
  24. #define THORHELPER_API DECL_IMPORT
  25. #endif
  26. //------------------------------------------------------------------------------------------------
  27. //An inline caching version of an IOutputMetaDataEx, which hides the backward compatibility issues, and caches
  28. //fixed record information (similar to CachedRecordSize) for efficient access.
  29. class THORHELPER_API CachedOutputMetaData
  30. {
  31. public:
  32. explicit inline CachedOutputMetaData(IOutputMetaData * _rs = NULL) { set(_rs); }
  33. inline void set(IOutputMetaData * _meta)
  34. {
  35. meta.set(_meta);
  36. if (_meta)
  37. {
  38. fixedSize = _meta->getFixedSize();
  39. minSize = _meta->getMinRecordSize();
  40. initialSize = minSize;
  41. metaFlags = meta->getMetaFlags();
  42. }
  43. else
  44. {
  45. initialSize = 0;
  46. fixedSize = 0;
  47. metaFlags = 0;
  48. minSize = 0;
  49. }
  50. }
  51. //extra helpers
  52. inline size32_t getInitialSize() const { return initialSize; }
  53. inline size32_t getMinRecordSize() const { return minSize; }
  54. inline bool isFixedSize() const { return (fixedSize != 0); }
  55. inline bool isVariableSize() const { return (fixedSize == 0); }
  56. //IRecordSize
  57. inline size32_t getRecordSize(const void *rec) const { return fixedSize ? fixedSize : meta->getRecordSize(rec); }
  58. inline size32_t getFixedSize() const { return fixedSize; }
  59. //IOutputMetaData
  60. inline bool isGrouped() const { return (metaFlags & MDFgrouped) != 0; }
  61. inline void toXML(const byte * self, IXmlWriter & out) { meta->toXML(self, out); }
  62. inline bool hasXML() const { return (metaFlags & MDFhasxml) != 0; }
  63. //v1 member functions (can be called on any interface)
  64. inline unsigned getMetaFlags() const { return metaFlags; }
  65. inline bool needsDestruct() const { return (metaFlags & MDFneeddestruct) != 0; }
  66. inline bool needsSerializeDisk() const { return (metaFlags & MDFneedserializedisk) != 0; }
  67. inline void destruct(byte * self)
  68. {
  69. if (metaFlags & MDFneeddestruct)
  70. meta->destruct(self);
  71. }
  72. IOutputRowSerializer * createDiskSerializer(ICodeContext * ctx, unsigned activityId) const;
  73. IOutputRowDeserializer * createDiskDeserializer(ICodeContext * ctx, unsigned activityId) const;
  74. IOutputRowSerializer * createInternalSerializer(ICodeContext * ctx, unsigned activityId) const;
  75. IOutputRowDeserializer * createInternalDeserializer(ICodeContext * ctx, unsigned activityId) const;
  76. inline IOutputMetaData * querySerializedDiskMeta() const
  77. {
  78. if (metaFlags & MDFneedserializedisk)
  79. return meta->querySerializedDiskMeta();
  80. return meta;
  81. }
  82. inline IOutputMetaData * queryChildMeta(unsigned i) const
  83. {
  84. return meta->queryChildMeta(i);
  85. }
  86. inline const RtlRecord &queryRecordAccessor(bool expand) const
  87. {
  88. return meta->queryRecordAccessor(expand);
  89. }
  90. const RtlTypeInfo * queryTypeInfo() const { return meta->queryTypeInfo(); }
  91. //cast operators.
  92. inline IOutputMetaData * queryOriginal() const { return meta; }
  93. inline operator IRecordSize * () const { return meta; }
  94. inline operator IOutputMetaData * () const { return meta; }
  95. private:
  96. Owned<IOutputMetaData> meta;
  97. size32_t fixedSize;
  98. size32_t initialSize;
  99. size32_t minSize;
  100. unsigned metaFlags;
  101. };
  102. template<class T> class CClassMeta : implements CInterfaceOf<IOutputMetaData>
  103. {
  104. public:
  105. virtual size32_t getRecordSize(const void *rec) { return sizeof(T); }
  106. virtual size32_t getMinRecordSize() const { return sizeof(T); }
  107. virtual size32_t getFixedSize() const { return sizeof(T); }
  108. virtual void toXML(const byte * self, IXmlWriter & out) { }
  109. virtual unsigned getVersion() const { return OUTPUTMETADATA_VERSION; }
  110. virtual unsigned getMetaFlags() { return std::is_pod<T>() ? 0 : MDFneeddestruct; }
  111. virtual const RtlTypeInfo * queryTypeInfo() const { return nullptr; }
  112. virtual void destruct(byte * self) { reinterpret_cast<T *>(self)->~T(); }
  113. virtual IOutputRowSerializer * createDiskSerializer(ICodeContext * ctx, unsigned activityId) { return NULL; }
  114. virtual IOutputRowDeserializer * createDiskDeserializer(ICodeContext * ctx, unsigned activityId) { return NULL; }
  115. virtual ISourceRowPrefetcher * createDiskPrefetcher() { return NULL; }
  116. virtual IOutputMetaData * querySerializedDiskMeta() { return this; }
  117. virtual IOutputRowSerializer * createInternalSerializer(ICodeContext * ctx, unsigned activityId) { return NULL; }
  118. virtual IOutputRowDeserializer * createInternalDeserializer(ICodeContext * ctx, unsigned activityId) { return NULL; }
  119. virtual void process(const byte * self, IFieldProcessor & target, unsigned from, unsigned to) {}
  120. virtual void walkIndirectMembers(const byte * self, IIndirectMemberVisitor & visitor) {}
  121. virtual IOutputMetaData * queryChildMeta(unsigned i) { return NULL; }
  122. virtual const RtlRecord &queryRecordAccessor(bool expand) const { throwUnexpected(); } // could provide a static implementation if needed
  123. };
  124. //------------------------------------------------------------------------------------------------
  125. //This class is only ever used to apply a delta to a self pointer, it is never finalized, and the builder must stay alive.
  126. class THORHELPER_API CPrefixedRowBuilder : implements RtlRowBuilderBase
  127. {
  128. public:
  129. inline CPrefixedRowBuilder(size32_t _offset, ARowBuilder & _builder) : offset(_offset), builder(_builder)
  130. {
  131. self = builder.getSelf()+offset;
  132. }
  133. virtual byte * ensureCapacity(size32_t required, const char * fieldName)
  134. {
  135. self = builder.ensureCapacity(offset+required, fieldName) + offset;
  136. return getSelf();
  137. }
  138. virtual byte * createSelf()
  139. {
  140. self = builder.getSelf()+offset;
  141. return self;
  142. }
  143. virtual IEngineRowAllocator *queryAllocator() const
  144. {
  145. return builder.queryAllocator();
  146. }
  147. protected:
  148. size32_t offset;
  149. ARowBuilder & builder;
  150. };
  151. //------------------------------------------------------------------------------------------------
  152. class THORHELPER_API AggregateRowBuilder : public RtlDynamicRowBuilder
  153. {
  154. public:
  155. void Link() const;
  156. bool Release() const;
  157. AggregateRowBuilder(IEngineRowAllocator *_rowAllocator, unsigned _elementHash)
  158. : RtlDynamicRowBuilder(_rowAllocator, true), elementHash(_elementHash)
  159. {
  160. size = 0;
  161. }
  162. inline unsigned queryHash() const
  163. {
  164. return elementHash;
  165. }
  166. inline void setSize(size32_t _newSize)
  167. {
  168. size = _newSize;
  169. }
  170. inline const void *finalizeRowClear()
  171. {
  172. return RtlDynamicRowBuilder::finalizeRowClear(size);
  173. }
  174. inline size32_t querySize() const
  175. {
  176. return size;
  177. }
  178. private:
  179. unsigned elementHash;
  180. size32_t size;
  181. };
  182. class THORHELPER_API RowAggregator : private SuperHashTable
  183. {
  184. // We have to be careful with ownership of the items in the hashtable. Because we free them as we iterate, we need to make sure
  185. // that we always do exactly one iteration through the hash table. We therefore DON'T free anything in onRemove.
  186. public:
  187. RowAggregator(IHThorHashAggregateExtra &_extra, IHThorRowAggregator & _helper);
  188. ~RowAggregator();
  189. IMPLEMENT_IINTERFACE
  190. void reset();
  191. void start(IEngineRowAllocator *rowAllocator, ICodeContext *ctx, unsigned activityId);
  192. AggregateRowBuilder &addRow(const void * row);
  193. void mergeElement(const void * otherElement);
  194. AggregateRowBuilder *nextResult();
  195. unsigned elementCount() const { return count(); }
  196. memsize_t queryMem() const { return SuperHashTable::queryMem() + totalSize + overhead; };
  197. protected:
  198. virtual void onAdd(void *et) {}
  199. virtual void onRemove(void *et) {}
  200. virtual unsigned getHashFromElement(const void *et) const { return hashFromElement(et); }
  201. virtual unsigned getHashFromFindParam(const void *fp) const { return hasher->hash(fp); }
  202. virtual const void * getFindParam(const void *et) const;
  203. virtual bool matchesFindParam(const void *et, const void *key, unsigned fphash) const;
  204. virtual bool matchesElement(const void *et, const void * searchET) const;
  205. private:
  206. void releaseAll(void); // No implementation.
  207. inline unsigned hashFromElement(const void * et) const
  208. {
  209. const AggregateRowBuilder *rb = static_cast<const AggregateRowBuilder*>(et);
  210. return rb->queryHash();
  211. }
  212. IHash * hasher;
  213. IHash * elementHasher;
  214. ICompare * comparer;
  215. ICompare * elementComparer;
  216. IHThorRowAggregator & helper;
  217. const void * cursor;
  218. bool eof;
  219. Owned<IEngineRowAllocator> rowAllocator;
  220. Owned<IEngineRowAllocator> rowBuilderAllocator;
  221. memsize_t totalSize, overhead;
  222. };
  223. //------------------------------------------------------------------------------------------------
  224. class THORHELPER_API CPrefixedRowSerializer : implements IOutputRowSerializer, public CInterface
  225. {
  226. public:
  227. IMPLEMENT_IINTERFACE;
  228. CPrefixedRowSerializer(size32_t _offset, IOutputRowSerializer *_original) : offset(_offset), original(_original)
  229. {
  230. }
  231. virtual void serialize(IRowSerializerTarget & out, const byte * self)
  232. {
  233. out.put(offset, self);
  234. original->serialize(out, self+offset);
  235. }
  236. protected:
  237. size32_t offset;
  238. Owned<IOutputRowSerializer> original;
  239. };
  240. class THORHELPER_API CPrefixedRowDeserializer : implements IOutputRowDeserializer, public CInterface
  241. {
  242. public:
  243. IMPLEMENT_IINTERFACE;
  244. CPrefixedRowDeserializer(size32_t _offset, IOutputRowDeserializer *_original) : offset(_offset), original(_original)
  245. {
  246. }
  247. virtual size32_t deserialize(ARowBuilder & rowBuilder, IRowDeserializerSource & in)
  248. {
  249. in.read(offset, rowBuilder.getSelf());
  250. //Need to apply a delta to the self return from getSelf()
  251. CPrefixedRowBuilder prefixedBuilder(offset, rowBuilder);
  252. return original->deserialize(prefixedBuilder, in)+offset;
  253. }
  254. protected:
  255. size32_t offset;
  256. Owned<IOutputRowDeserializer> original;
  257. };
  258. class THORHELPER_API CPrefixedRowPrefetcher : implements ISourceRowPrefetcher, public CInterface
  259. {
  260. public:
  261. IMPLEMENT_IINTERFACE;
  262. CPrefixedRowPrefetcher(size32_t _offset, ISourceRowPrefetcher *_original) : offset(_offset), original(_original)
  263. {
  264. }
  265. virtual void readAhead(IRowPrefetcherSource & in)
  266. {
  267. in.skip(offset);
  268. original->readAhead(in);
  269. }
  270. protected:
  271. size32_t offset;
  272. Owned<ISourceRowPrefetcher> original;
  273. };
  274. class THORHELPER_API CPrefixedOutputMeta : implements IOutputMetaData, public CInterface
  275. {
  276. public:
  277. IMPLEMENT_IINTERFACE;
  278. CPrefixedOutputMeta(size32_t _offset, IOutputMetaData *_original)
  279. {
  280. offset = _offset;
  281. original = _original;
  282. IOutputMetaData * originalSerialized = _original->querySerializedDiskMeta();
  283. if (originalSerialized != original)
  284. serializedMeta.setown(new CPrefixedOutputMeta(_offset, originalSerialized));
  285. }
  286. virtual size32_t getRecordSize(const void *rec)
  287. {
  288. if (rec)
  289. rec = ((const byte *) rec) + offset;
  290. return original->getRecordSize(rec) + offset;
  291. }
  292. virtual size32_t getMinRecordSize() const
  293. {
  294. return original->getMinRecordSize() + offset;
  295. }
  296. virtual size32_t getFixedSize() const
  297. {
  298. size32_t ret = original->getFixedSize();
  299. if (ret)
  300. ret += offset;
  301. return ret;
  302. }
  303. virtual void toXML(const byte * self, IXmlWriter & out) { original->toXML(self+offset, out); }
  304. virtual unsigned getVersion() const { return original->getVersion(); }
  305. virtual const RtlTypeInfo * queryTypeInfo() const { return nullptr; }
  306. virtual unsigned getMetaFlags() { return original->getMetaFlags(); }
  307. virtual void destruct(byte * self) { original->destruct(self+offset); }
  308. virtual IOutputRowSerializer * createDiskSerializer(ICodeContext * ctx, unsigned activityId)
  309. {
  310. return new CPrefixedRowSerializer(offset, original->createDiskSerializer(ctx, activityId));
  311. }
  312. virtual IOutputRowDeserializer * createDiskDeserializer(ICodeContext * ctx, unsigned activityId)
  313. {
  314. return new CPrefixedRowDeserializer(offset, original->createDiskDeserializer(ctx, activityId));
  315. }
  316. virtual ISourceRowPrefetcher * createDiskPrefetcher()
  317. {
  318. return new CPrefixedRowPrefetcher(offset, original->createDiskPrefetcher());
  319. }
  320. virtual IOutputMetaData * querySerializedDiskMeta()
  321. {
  322. if (serializedMeta)
  323. return serializedMeta.get();
  324. return this;
  325. }
  326. virtual IOutputRowSerializer * createInternalSerializer(ICodeContext * ctx, unsigned activityId)
  327. {
  328. return new CPrefixedRowSerializer(offset, original->createInternalSerializer(ctx, activityId));
  329. }
  330. virtual IOutputRowDeserializer * createInternalDeserializer(ICodeContext * ctx, unsigned activityId)
  331. {
  332. return new CPrefixedRowDeserializer(offset, original->createInternalDeserializer(ctx, activityId));
  333. }
  334. virtual void process(const byte * self, IFieldProcessor & target, unsigned from, unsigned to)
  335. {
  336. }
  337. virtual void walkIndirectMembers(const byte * self, IIndirectMemberVisitor & visitor)
  338. {
  339. original->walkIndirectMembers(self+offset, visitor);
  340. }
  341. virtual IOutputMetaData * queryChildMeta(unsigned i)
  342. {
  343. return original->queryChildMeta(i);
  344. }
  345. virtual const RtlRecord &queryRecordAccessor(bool expand) const
  346. {
  347. UNIMPLEMENTED; // If needed we could implement a version of RtlRecord that added/subtracted offset as needed
  348. }
  349. protected:
  350. size32_t offset;
  351. IOutputMetaData *original;
  352. Owned<IOutputMetaData> serializedMeta;
  353. };
  354. //------------------------------------------------------------------------------------------------
  355. class THORHELPER_API CSuffixedRowSerializer : implements IOutputRowSerializer, public CInterface
  356. {
  357. public:
  358. IMPLEMENT_IINTERFACE;
  359. CSuffixedRowSerializer(size32_t _offset, IOutputRowSerializer *_original) : offset(_offset), original(_original)
  360. {
  361. }
  362. virtual void serialize(IRowSerializerTarget & out, const byte * self)
  363. {
  364. original->serialize(out, self+offset);
  365. out.put(offset, self);
  366. }
  367. protected:
  368. size32_t offset;
  369. Owned<IOutputRowSerializer> original;
  370. };
  371. class THORHELPER_API CSuffixedRowDeserializer : implements IOutputRowDeserializer, public CInterface
  372. {
  373. public:
  374. IMPLEMENT_IINTERFACE;
  375. CSuffixedRowDeserializer(size32_t _offset, IOutputRowDeserializer *_original) : offset(_offset), original(_original)
  376. {
  377. }
  378. virtual size32_t deserialize(ARowBuilder & rowBuilder, IRowDeserializerSource & in)
  379. {
  380. size32_t size = original->deserialize(rowBuilder, in);
  381. in.read(offset, rowBuilder.getSelf()+size);
  382. return size+offset;
  383. }
  384. protected:
  385. size32_t offset;
  386. Owned<IOutputRowDeserializer> original;
  387. };
  388. class THORHELPER_API CSuffixedRowPrefetcher : implements ISourceRowPrefetcher, public CInterface
  389. {
  390. public:
  391. IMPLEMENT_IINTERFACE;
  392. CSuffixedRowPrefetcher(size32_t _offset, ISourceRowPrefetcher *_original) : offset(_offset), original(_original)
  393. {
  394. }
  395. virtual void readAhead(IRowPrefetcherSource & in)
  396. {
  397. original->readAhead(in);
  398. in.skip(offset);
  399. }
  400. protected:
  401. size32_t offset;
  402. Owned<ISourceRowPrefetcher> original;
  403. };
  404. class THORHELPER_API CSuffixedOutputMeta : implements IOutputMetaData, public CInterface
  405. {
  406. public:
  407. IMPLEMENT_IINTERFACE;
  408. CSuffixedOutputMeta(size32_t _offset, IOutputMetaData *_original) : original(_original)
  409. {
  410. offset = _offset;
  411. IOutputMetaData * originalSerialized = _original->querySerializedDiskMeta();
  412. if (originalSerialized != original)
  413. serializedMeta.setown(new CSuffixedOutputMeta(_offset, originalSerialized));
  414. }
  415. virtual size32_t getRecordSize(const void *rec)
  416. {
  417. return original->getRecordSize(rec) + offset;
  418. }
  419. virtual size32_t getMinRecordSize() const
  420. {
  421. return original->getMinRecordSize() + offset;
  422. }
  423. virtual size32_t getFixedSize() const
  424. {
  425. size32_t ret = original->getFixedSize();
  426. if (ret)
  427. ret += offset;
  428. return ret;
  429. }
  430. virtual void toXML(const byte * self, IXmlWriter & out) { original->toXML(self, out); }
  431. virtual unsigned getVersion() const { return original->getVersion(); }
  432. virtual const RtlTypeInfo * queryTypeInfo() const { return nullptr; }
  433. virtual unsigned getMetaFlags() { return original->getMetaFlags(); }
  434. virtual void destruct(byte * self) { original->destruct(self); }
  435. virtual IOutputRowSerializer * createDiskSerializer(ICodeContext * ctx, unsigned activityId)
  436. {
  437. return new CSuffixedRowSerializer(offset, original->createDiskSerializer(ctx, activityId));
  438. }
  439. virtual IOutputRowDeserializer * createDiskDeserializer(ICodeContext * ctx, unsigned activityId)
  440. {
  441. return new CSuffixedRowDeserializer(offset, original->createDiskDeserializer(ctx, activityId));
  442. }
  443. virtual ISourceRowPrefetcher * createDiskPrefetcher()
  444. {
  445. return new CSuffixedRowPrefetcher(offset, original->createDiskPrefetcher());
  446. }
  447. virtual IOutputMetaData * querySerializedDiskMeta()
  448. {
  449. if (serializedMeta)
  450. return serializedMeta.get();
  451. return this;
  452. }
  453. virtual IOutputRowSerializer * createInternalSerializer(ICodeContext * ctx, unsigned activityId)
  454. {
  455. return new CSuffixedRowSerializer(offset, original->createInternalSerializer(ctx, activityId));
  456. }
  457. virtual IOutputRowDeserializer * createInternalDeserializer(ICodeContext * ctx, unsigned activityId)
  458. {
  459. return new CSuffixedRowDeserializer(offset, original->createInternalDeserializer(ctx, activityId));
  460. }
  461. virtual void process(const byte * self, IFieldProcessor & target, unsigned from, unsigned to)
  462. {
  463. }
  464. virtual void walkIndirectMembers(const byte * self, IIndirectMemberVisitor & visitor)
  465. {
  466. original->walkIndirectMembers(self, visitor);
  467. }
  468. virtual IOutputMetaData * queryChildMeta(unsigned i)
  469. {
  470. return original->queryChildMeta(i);
  471. }
  472. virtual const RtlRecord &queryRecordAccessor(bool expand) const override { return original->queryRecordAccessor(expand); }
  473. protected:
  474. size32_t offset;
  475. Linked<IOutputMetaData> original;
  476. Owned<IOutputMetaData> serializedMeta;
  477. };
  478. //------------------------------------------------------------------------------------------------
  479. struct SmartStepExtra;
  480. class THORHELPER_API CStreamMerger : public CInterface
  481. {
  482. public:
  483. CStreamMerger(bool pullConsumes);
  484. ~CStreamMerger();
  485. void cleanup();
  486. void init(ICompare * _compare, bool _dedup, IRangeCompare * _rangeCompare);
  487. void initInputs(unsigned _numInputs);
  488. inline void done() { cleanup(); }
  489. const void * nextRow();
  490. const void * nextRowGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra);
  491. void primeRows(const void * * rows);
  492. const void * queryNextRow(); // look ahead at the next item
  493. unsigned queryNextInput(); // which input will the next item come from?
  494. inline void reset() { clearPending(); }
  495. void skipRow();
  496. protected:
  497. //The following function must fill in pending[i] AND pendingMatches[i]
  498. virtual bool pullInput(unsigned i, const void * seek, unsigned numFields, const SmartStepExtra * stepExtra) = 0;
  499. virtual void skipInput(unsigned i);
  500. virtual void consumeInput(unsigned i);
  501. virtual void releaseRow(const void * row) = 0;
  502. bool ensureNext();
  503. void permute();
  504. private:
  505. const void * consumeTop();
  506. bool ensureNext(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra * stepExtra);
  507. void clearPending();
  508. void fillheap(const void * seek, unsigned numFields, const SmartStepExtra * stepExtra);
  509. void permute(const void * seek, unsigned numFields, const SmartStepExtra * stepExtra);
  510. bool promote(unsigned p);
  511. bool siftDown(unsigned p) { return heap_push_down(p, activeInputs, mergeheap, pending, compare); }
  512. void siftDownDedupTop(const void * seek, unsigned numFields, const SmartStepExtra * stepExtra);
  513. protected:
  514. unsigned *mergeheap;
  515. unsigned activeInputs;
  516. unsigned numInputs;
  517. const void **pending;
  518. bool *pendingMatches;
  519. ICompare *compare;
  520. IRangeCompare * rangeCompare;
  521. bool first;
  522. bool dedup;
  523. bool pullConsumes; // true if pull consumes from input, and takes ownership, false if pullInput just looks ahead
  524. };
  525. class THORHELPER_API CRawRowSerializer : implements IRowSerializerTarget
  526. {
  527. public:
  528. CRawRowSerializer(size32_t _len, byte *_buf)
  529. : buffer(_buf), maxSize(_len)
  530. {
  531. pos = 0;
  532. nesting = 0;
  533. }
  534. virtual void put(size32_t len, const void * ptr)
  535. {
  536. assertex(pos+len <= maxSize);
  537. memcpy_iflen(buffer+pos, ptr, len);
  538. pos += len;
  539. }
  540. virtual size32_t beginNested(size32_t count)
  541. {
  542. nesting++;
  543. size32_t ret = pos;
  544. size32_t placeholder = 0;
  545. put(sizeof(placeholder), &placeholder);
  546. return ret;
  547. }
  548. virtual void endNested(size32_t position)
  549. {
  550. * (size32_t *) (buffer+position) = pos - (position + sizeof(size32_t));
  551. nesting--;
  552. }
  553. inline size32_t size() const { return pos; }
  554. protected:
  555. byte *buffer;
  556. unsigned maxSize;
  557. unsigned pos;
  558. unsigned nesting;
  559. };
  560. class THORHELPER_API CThorDemoRowSerializer : implements IRowSerializerTarget
  561. {
  562. public:
  563. CThorDemoRowSerializer(MemoryBuffer & _buffer);
  564. virtual void put(size32_t len, const void * ptr);
  565. virtual size32_t beginNested(size32_t count);
  566. virtual void endNested(size32_t position);
  567. protected:
  568. MemoryBuffer & buffer;
  569. unsigned nesting;
  570. };
  571. class THORHELPER_API CSimpleFixedRowSerializer : implements IOutputRowSerializer, public CInterface
  572. {
  573. public:
  574. CSimpleFixedRowSerializer(size32_t _fixedSize) : fixedSize(_fixedSize) {}
  575. IMPLEMENT_IINTERFACE
  576. virtual void serialize(IRowSerializerTarget & out, const byte * self)
  577. {
  578. out.put(fixedSize, self);
  579. }
  580. protected:
  581. size32_t fixedSize;
  582. };
  583. class THORHELPER_API CSimpleFixedRowDeserializer : implements IOutputRowDeserializer, public CInterface
  584. {
  585. public:
  586. CSimpleFixedRowDeserializer(size32_t _fixedSize) : fixedSize(_fixedSize) {}
  587. IMPLEMENT_IINTERFACE
  588. virtual size32_t deserialize(ARowBuilder & rowBuilder, IRowDeserializerSource & in)
  589. {
  590. in.read(fixedSize, rowBuilder.getSelf());
  591. return fixedSize;
  592. }
  593. protected:
  594. size32_t fixedSize;
  595. };
  596. class THORHELPER_API CSimpleVariableRowSerializer : implements IOutputRowSerializer, public CInterface
  597. {
  598. public:
  599. CSimpleVariableRowSerializer(const CachedOutputMetaData * _meta) : meta(_meta) {}
  600. IMPLEMENT_IINTERFACE
  601. virtual void serialize(IRowSerializerTarget & out, const byte * self)
  602. {
  603. out.put(meta->getRecordSize(self), self);
  604. }
  605. protected:
  606. const CachedOutputMetaData * meta; // assume lifetime is shorter than this meta
  607. };
  608. //This should never be created in practice - need to use a streamer. Pseudocode below for illustration purposes only
  609. #if 0
  610. class THORHELPER_API CSimpleVariableRowDeserializer : public CInterface, implements IOutputRowDeserializer
  611. {
  612. public:
  613. CSimpleVariableRowDeserializer(CachedOutputMetaData * _meta) : meta(_meta) {}
  614. IMPLEMENT_IINTERFACE
  615. virtual size32_t deserialize(ARowBuilder & rowBuilder, IRowDeserializerSource & in)
  616. {
  617. const byte * next = in.peek(meta->getMaxSize()); // This is wrong! We don't know the maximum size...
  618. size32_t size = meta->getRecordSize(next);
  619. in.read(size, rowBuilder.ensureCapacity(size, NULL));
  620. return size;
  621. }
  622. protected:
  623. CachedOutputMetaData * meta; // assume lifetime is shorter than this meta
  624. };
  625. #endif
  626. class NullDiskCallback : public IThorDiskCallback, extends CInterface
  627. {
  628. IMPLEMENT_IINTERFACE
  629. virtual unsigned __int64 getFilePosition(const void * row) { return 0; }
  630. virtual unsigned __int64 getLocalFilePosition(const void * row) { return 0; }
  631. virtual const char * queryLogicalFilename(const void * row) { return NULL; }
  632. virtual const byte * lookupBlob(unsigned __int64 id) { return nullptr; }
  633. };
  634. extern THORHELPER_API size32_t cloneRow(ARowBuilder & rowBuilder, const void * row, IOutputMetaData * meta);
  635. //=====================================================================================================
  636. class ChildRowLinkerWalker : implements IIndirectMemberVisitor
  637. {
  638. public:
  639. virtual void visitRowset(size32_t count, const byte * * rows) override
  640. {
  641. rtlLinkRowset(rows);
  642. }
  643. virtual void visitRow(const byte * row) override
  644. {
  645. rtlLinkRow(row);
  646. }
  647. };
  648. #endif // THORCOMMON_IPP