thorcommon.cpp 51 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jexcept.hpp"
  14. #include "jmisc.hpp"
  15. #include "jthread.hpp"
  16. #include "jsocket.hpp"
  17. #include "jprop.hpp"
  18. #include "jdebug.hpp"
  19. #include "jlzw.hpp"
  20. #include "junicode.hpp"
  21. #include "eclhelper.hpp"
  22. #include "thorcommon.ipp"
  23. #include "eclrtl.hpp"
  24. #include "rtlread_imp.hpp"
  25. #include "thorstep.hpp"
  26. #define ROWAGG_PERROWOVERHEAD (sizeof(AggregateRowBuilder))
  27. RowAggregator::RowAggregator(IHThorHashAggregateExtra &_extra, IHThorRowAggregator & _helper) : helper(_helper)
  28. {
  29. comparer = _extra.queryCompareRowElement();
  30. hasher = _extra.queryHash();
  31. elementHasher = _extra.queryHashElement();
  32. elementComparer = _extra.queryCompareElements();
  33. cursor = NULL;
  34. eof = false;
  35. totalSize = overhead = 0;
  36. }
  // Destructor: delegates all clean-up to reset().
  RowAggregator::~RowAggregator()
  {
      reset();
  }
  // Records the allocator that addRow()/mergeElement() hand to each new AggregateRowBuilder.
  void RowAggregator::start(IEngineRowAllocator *_rowAllocator)
  {
      rowAllocator.set(_rowAllocator);
  }
  45. void RowAggregator::reset()
  46. {
  47. while (!eof)
  48. {
  49. AggregateRowBuilder *n = nextResult();
  50. if (n)
  51. n->Release();
  52. }
  53. SuperHashTable::releaseAll();
  54. eof = false;
  55. cursor = NULL;
  56. rowAllocator.clear();
  57. totalSize = overhead = 0;
  58. }
  59. AggregateRowBuilder &RowAggregator::addRow(const void * row)
  60. {
  61. AggregateRowBuilder *result;
  62. unsigned hash = hasher->hash(row);
  63. void * match = find(hash, row);
  64. if (match)
  65. {
  66. result = static_cast<AggregateRowBuilder *>(match);
  67. totalSize -= result->querySize();
  68. size32_t sz = helper.processNext(*result, row);
  69. result->setSize(sz);
  70. totalSize += sz;
  71. }
  72. else
  73. {
  74. Owned<AggregateRowBuilder> rowBuilder = new AggregateRowBuilder(rowAllocator, hash);
  75. helper.clearAggregate(*rowBuilder);
  76. size32_t sz = helper.processFirst(*rowBuilder, row);
  77. rowBuilder->setSize(sz);
  78. result = rowBuilder.getClear();
  79. addNew(result, hash);
  80. totalSize += sz;
  81. overhead += ROWAGG_PERROWOVERHEAD;
  82. }
  83. return *result;
  84. }
  85. void RowAggregator::mergeElement(const void * otherElement)
  86. {
  87. unsigned hash = elementHasher->hash(otherElement);
  88. void * match = findElement(hash, otherElement);
  89. if (match)
  90. {
  91. AggregateRowBuilder *rowBuilder = static_cast<AggregateRowBuilder *>(match);
  92. totalSize -= rowBuilder->querySize();
  93. size32_t sz = helper.mergeAggregate(*rowBuilder, otherElement);
  94. rowBuilder->setSize(sz);
  95. totalSize += sz;
  96. }
  97. else
  98. {
  99. Owned<AggregateRowBuilder> rowBuilder = new AggregateRowBuilder(rowAllocator, hash);
  100. rowBuilder->setSize(cloneRow(*rowBuilder, otherElement, rowAllocator->queryOutputMeta()));
  101. addNew(rowBuilder.getClear(), hash);
  102. }
  103. }
  104. const void * RowAggregator::getFindParam(const void *et) const
  105. {
  106. // Slightly odd name for this function... it actually gets the comparable element
  107. const AggregateRowBuilder *rb = static_cast<const AggregateRowBuilder*>(et);
  108. return rb->row();
  109. }
  110. bool RowAggregator::matchesFindParam(const void *et, const void *key, unsigned fphash) const
  111. {
  112. if (fphash != hashFromElement(et))
  113. return false;
  114. // et = element in the table (an AggregateRowBuilder) key = new row (in input row layout).
  115. return comparer->docompare(key, getFindParam(et)) == 0;
  116. }
  117. bool RowAggregator::matchesElement(const void *et, const void * searchET) const
  118. {
  119. return elementComparer->docompare(getFindParam(et), searchET) == 0;
  120. }
  121. AggregateRowBuilder *RowAggregator::nextResult()
  122. {
  123. void *ret = next(cursor);
  124. if (!ret)
  125. {
  126. eof = true;
  127. return NULL;
  128. }
  129. cursor = ret;
  130. return static_cast<AggregateRowBuilder *>(ret);
  131. }
  132. //=====================================================================================================
  133. void CStreamMerger::fillheap(const void * seek, unsigned numFields, const SmartStepExtra * stepExtra)
  134. {
  135. assertex(activeInputs == 0);
  136. for(unsigned i = 0; i < numInputs; i++)
  137. if(pullInput(i, seek, numFields, stepExtra))
  138. mergeheap[activeInputs++] = i;
  139. }
  140. void CStreamMerger::permute(const void * seek, unsigned numFields, const SmartStepExtra * stepExtra)
  141. {
  142. // the tree structure: element p has children p*2+1 and p*2+2, or element c has parent (unsigned)(c-1)/2
  143. // the heap property: no element should be smaller than its parent
  144. // the dedup variant: if(dedup), the top of the heap should also not be equal to either child
  145. // the method: establish this by starting with the parent of the bottom element and working up to the top element, sifting each down to its correct place
  146. if (activeInputs >= 2)
  147. for(unsigned p = (activeInputs-2)/2; p > 0; --p)
  148. siftDown(p);
  149. if(dedup)
  150. siftDownDedupTop(seek, numFields, stepExtra);
  151. else
  152. siftDown(0);
  153. }
  154. const void * CStreamMerger::consumeTop()
  155. {
  156. unsigned top = mergeheap[0];
  157. if (!pullConsumes)
  158. consumeInput(top);
  159. const void *next = pending[top];
  160. pending[top] = NULL;
  161. return next;
  162. }
  // Advances the merger until the input at the top of the heap holds a row that can be returned.
  //   seek, numFields  - optional seek position; when seek is non-NULL each candidate row is
  //                      compared against it with rangeCompare over numFields fields.
  //   wasCompleteMatch - assigned from pendingMatches[] on the first call and when a mismatch is
  //                      returned; left untouched on the paths that return because
  //                      pendingMatches[top] is already true.
  //   stepExtra        - only dereferenced when seek is non-NULL.
  // Returns true when the caller may take the row for input mergeheap[0]; false when no
  // inputs remain active.
  bool CStreamMerger::ensureNext(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra * stepExtra)
  {
      //wasCompleteMatch must be initialised from the actual row returned. (See bug #30388)
      if (first)
      {
          // First call since (re)initialisation: pull from every input and build the heap.
          fillheap(seek, numFields, stepExtra);
          permute(seek, numFields, stepExtra);
          first = false;
          if (activeInputs == 0)
              return false;
          unsigned top = mergeheap[0];
          wasCompleteMatch = pendingMatches[top];
          return true;
      }
      while (activeInputs)
      {
          unsigned top = mergeheap[0];
          const void *next = pending[top];
          if (next)
          {
              if (seek)
              {
                  int c = rangeCompare->docompare(next, seek, numFields);
                  // Only rows at or beyond the seek position are candidates.
                  if (c >= 0)
                  {
                      if (stepExtra->returnMismatches() && (c > 0))
                      {
                          wasCompleteMatch = pendingMatches[top];
                          return true;
                      }
                      else
                      {
                          if (pendingMatches[top])
                              return true;
                      }
                  }
              }
              else
              {
                  if (pendingMatches[top])
                      return true;
              }
              // The pending row cannot be returned: discard it, then fetch a replacement below.
              skipInput(top);
          }
          // If the input yields nothing more, replace the top with the last heap element;
          // promote(0) returning false means that was the only element left.
          if(!pullInput(top, seek, numFields, stepExtra))
              if(!promote(0))
                  return false;
          // we have changed the element at the top of the heap, so need to sift it down to maintain the heap property
          if(dedup)
              siftDownDedupTop(seek, numFields, stepExtra);
          else
              siftDown(0);
      }
      return false;
  }
  218. bool CStreamMerger::ensureNext()
  219. {
  220. bool isCompleteMatch = true;
  221. return ensureNext(NULL, 0, isCompleteMatch, NULL);
  222. }
  // Convenience overload: builds the heap with no seek row and no step information.
  void CStreamMerger::permute()
  {
      permute(NULL, 0, NULL);
  }
  227. bool CStreamMerger::promote(unsigned p)
  228. {
  229. activeInputs--;
  230. if(activeInputs == p)
  231. return false;
  232. mergeheap[p] = mergeheap[activeInputs];
  233. return true;
  234. }
  // Dedup-aware replacement for siftDown(0).
  //   seek, numFields, stepExtra - passed through unchanged to pullInput() whenever a
  //                                duplicate row is discarded and its input is re-read.
  // Rows that compare equal to the top are skipped (via skipInput) and replaced from
  // their input; when the two equal rows come from different inputs the lower-numbered
  // input is kept at the top.
  void CStreamMerger::siftDownDedupTop(const void * seek, unsigned numFields, const SmartStepExtra * stepExtra)
  {
      // same as siftDown(0), except that it also ensures that the top of the heap is not equal to either of its children
      if(activeInputs < 2)
          return;
      // c is the smaller child of the top; childcmp records how child 2 compared with child 1
      unsigned c = 1;
      int childcmp = 1;
      if(activeInputs >= 3)
      {
          childcmp = compare->docompare(pending[mergeheap[2]], pending[mergeheap[1]]);
          if(childcmp < 0)
              c = 2;
      }
      int cmp = compare->docompare(pending[mergeheap[c]], pending[mergeheap[0]]);
      if(cmp > 0)
          return;
      // the following loop ensures the correct property holds on the smaller branch, and that childcmp==0 iff the top matches the other branch
      while(cmp <= 0)
      {
          if(cmp == 0)
          {
              // Equal rows: keep the lower-numbered input at the top, discard the other's row
              if(mergeheap[c] < mergeheap[0])
              {
                  unsigned r = mergeheap[c];
                  mergeheap[c] = mergeheap[0];
                  mergeheap[0] = r;
              }
              unsigned top = mergeheap[c];
              skipInput(top);
              if(!pullInput(top, seek, numFields, stepExtra))
                  if(!promote(c))
                      break;
              siftDown(c);
          }
          else
          {
              // Child is strictly smaller: swap it with the top and sift the old top down
              unsigned r = mergeheap[c];
              mergeheap[c] = mergeheap[0];
              mergeheap[0] = r;
              if(siftDown(c))
                  break;
          }
          cmp = compare->docompare(pending[mergeheap[c]], pending[mergeheap[0]]);
      }
      // the following loop ensures the uniqueness property holds on the other branch too
      c = 3-c;
      if(activeInputs <= c)
          return;
      while(childcmp == 0)
      {
          if(mergeheap[c] < mergeheap[0])
          {
              unsigned r = mergeheap[c];
              mergeheap[c] = mergeheap[0];
              mergeheap[0] = r;
          }
          unsigned top = mergeheap[c];
          skipInput(top);
          if(!pullInput(top, seek, numFields, stepExtra))
              if(!promote(c))
                  break;
          siftDown(c);
          childcmp = compare->docompare(pending[mergeheap[c]], pending[mergeheap[0]]);
      }
  }
  // Clears any pending rows, then frees the three per-input arrays and nulls their
  // pointers (the destructor asserts that pending and mergeheap are NULL).
  void CStreamMerger::cleanup()
  {
      clearPending();
      delete [] pending;
      pending = NULL;
      delete [] pendingMatches;
      pendingMatches = NULL;
      delete [] mergeheap;
      mergeheap = NULL;
  }
  310. void CStreamMerger::clearPending()
  311. {
  312. if (pending && activeInputs)
  313. {
  314. for(unsigned i = 0; i < numInputs; i++)
  315. {
  316. if (pullConsumes)
  317. releaseRow(pending[i]);
  318. pending[i] = NULL;
  319. }
  320. activeInputs = 0;
  321. }
  322. first = true;
  323. }
  324. CStreamMerger::CStreamMerger(bool _pullConsumes)
  325. {
  326. pending = NULL;
  327. pendingMatches = NULL;
  328. mergeheap = NULL;
  329. compare = NULL;
  330. rangeCompare = NULL;
  331. dedup = false;
  332. activeInputs = 0;
  333. pullConsumes = _pullConsumes;
  334. numInputs = 0;
  335. first = true;
  336. }
  // Destructor: performs no clean-up itself; it only checks (in builds where assert is
  // active) that the pending and mergeheap arrays have already been released.
  CStreamMerger::~CStreamMerger()
  {
      //can't call cleanup() because virtual releaseRow() won't be defined.
      // NOTE: use assert rather than assertex as exceptions from within destructors are not handled well.
      assert(!pending && !mergeheap);
  }
  343. void CStreamMerger::init(ICompare * _compare, bool _dedup, IRangeCompare * _rangeCompare)
  344. {
  345. compare = _compare;
  346. dedup = _dedup;
  347. rangeCompare = _rangeCompare;
  348. }
  349. void CStreamMerger::initInputs(unsigned _numInputs)
  350. {
  351. assertex(!pending); // cleanup should have been called before reinitializing
  352. numInputs = _numInputs;
  353. mergeheap = new unsigned[numInputs];
  354. pending = new const void *[numInputs];
  355. pendingMatches = new bool [numInputs];
  356. for (unsigned i = 0; i < numInputs; i++)
  357. pending[i] = NULL;
  358. activeInputs = 0;
  359. first = true;
  360. }
  // Default implementation: always fails via throwUnexpected(). It is only called
  // when pullConsumes is false, in which case it is expected to be overridden.
  void CStreamMerger::consumeInput(unsigned i)
  {
      //should be over-ridden if pullConsumes is false;
      throwUnexpected();
  }
  // Discards the pending row for input i: calls consumeInput(i) first when
  // pullConsumes is false, then passes the row to releaseRow() and clears the slot.
  void CStreamMerger::skipInput(unsigned i)
  {
      if (!pullConsumes)
          consumeInput(i);
      releaseRow(pending[i]);
      pending[i] = NULL;
  }
  373. void CStreamMerger::primeRows(const void * * rows)
  374. {
  375. assertex(first && (activeInputs == 0));
  376. first = false;
  377. for(unsigned i = 0; i < numInputs; i++)
  378. {
  379. if ((pending[i] = rows[i]) != NULL)
  380. {
  381. mergeheap[activeInputs++] = i;
  382. pendingMatches[i] = true;
  383. }
  384. }
  385. permute();
  386. }
  387. const void * CStreamMerger::nextRow()
  388. {
  389. if (ensureNext())
  390. return consumeTop();
  391. return NULL;
  392. }
  393. const void * CStreamMerger::queryNextRow()
  394. {
  395. if (ensureNext())
  396. return pending[mergeheap[0]];
  397. return NULL;
  398. }
  399. unsigned CStreamMerger::queryNextInput()
  400. {
  401. if (ensureNext())
  402. return mergeheap[0];
  403. return NotFound;
  404. }
  405. const void * CStreamMerger::nextRowGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
  406. {
  407. if (ensureNext(seek, numFields, wasCompleteMatch, &stepExtra))
  408. return consumeTop();
  409. return NULL;
  410. }
  411. void CStreamMerger::skipRow()
  412. {
  413. assertex(!first);
  414. skipInput(mergeheap[0]);
  415. }
  416. //=====================================================================================================
  // Binds the serializer to the caller's buffer (held by reference) and starts
  // with no nested sections open.
  CThorDemoRowSerializer::CThorDemoRowSerializer(MemoryBuffer & _buffer) : buffer(_buffer)
  {
      nesting = 0;
  }
  // Appends len bytes starting at ptr to the buffer.
  void CThorDemoRowSerializer::put(size32_t len, const void * ptr)
  {
      buffer.append(len, ptr);
      //ok to flush if nesting == 0;
  }
  426. size32_t CThorDemoRowSerializer::beginNested()
  427. {
  428. nesting++;
  429. unsigned pos = buffer.length();
  430. buffer.append((size32_t)0);
  431. return pos;
  432. }
  433. void CThorDemoRowSerializer::endNested(size32_t sizePos)
  434. {
  435. unsigned pos = buffer.length();
  436. buffer.rewrite(sizePos);
  437. buffer.append((size32_t)(pos - (sizePos + sizeof(size32_t))));
  438. buffer.rewrite(pos);
  439. nesting--;
  440. }
  441. IOutputRowSerializer * CachedOutputMetaData::createDiskSerializer(ICodeContext * ctx, unsigned activityId) const
  442. {
  443. if (metaFlags & (MDFhasserialize|MDFneedserializedisk))
  444. return meta->createDiskSerializer(ctx, activityId);
  445. if (isFixedSize())
  446. return new CSimpleFixedRowSerializer(getFixedSize());
  447. return new CSimpleVariableRowSerializer(this);
  448. }
  449. IOutputRowDeserializer * CachedOutputMetaData::createDiskDeserializer(ICodeContext * ctx, unsigned activityId) const
  450. {
  451. if (metaFlags & (MDFhasserialize|MDFneedserializedisk))
  452. return meta->createDiskDeserializer(ctx, activityId);
  453. if (isFixedSize())
  454. return new CSimpleFixedRowDeserializer(getFixedSize());
  455. throwUnexpectedX("createDiskDeserializer variable meta has no serializer");
  456. }
  457. IOutputRowSerializer * CachedOutputMetaData::createInternalSerializer(ICodeContext * ctx, unsigned activityId) const
  458. {
  459. if (metaFlags & (MDFhasserialize|MDFneedserializeinternal))
  460. return meta->createInternalSerializer(ctx, activityId);
  461. if (isFixedSize())
  462. return new CSimpleFixedRowSerializer(getFixedSize());
  463. return new CSimpleVariableRowSerializer(this);
  464. }
  465. IOutputRowDeserializer * CachedOutputMetaData::createInternalDeserializer(ICodeContext * ctx, unsigned activityId) const
  466. {
  467. if (metaFlags & (MDFhasserialize|MDFneedserializeinternal))
  468. return meta->createInternalDeserializer(ctx, activityId);
  469. if (isFixedSize())
  470. return new CSimpleFixedRowDeserializer(getFixedSize());
  471. throwUnexpectedX("createInternalDeserializer variable meta has no serializer");
  472. }
  // Counts len bytes towards the running total; the data at ptr is not read or stored.
  void CSizingSerializer::put(size32_t len, const void * ptr)
  {
      totalsize += len;
  }
  477. size32_t CSizingSerializer::beginNested()
  478. {
  479. totalsize += sizeof(size32_t);
  480. return totalsize;
  481. }
  // Nothing to do when only sizing: the length prefix was already counted by beginNested().
  void CSizingSerializer::endNested(size32_t position)
  {
  }
  // Appends len bytes starting at ptr to the buffer.
  void CMemoryRowSerializer::put(size32_t len, const void * ptr)
  {
      buffer.append(len, ptr);
  }
  489. size32_t CMemoryRowSerializer::beginNested()
  490. {
  491. nesting++;
  492. unsigned pos = buffer.length();
  493. buffer.append((size32_t)0);
  494. return pos;
  495. }
  496. void CMemoryRowSerializer::endNested(size32_t sizePos)
  497. {
  498. size32_t sz = buffer.length()-(sizePos + sizeof(size32_t));
  499. buffer.writeDirect(sizePos,sizeof(sz),&sz);
  500. nesting--;
  501. }
  // Declares one local instance of each class below. Going by the function's name,
  // the purpose is a compile-time check: the file fails to compile if any of these
  // classes is left abstract.
  static void ensureClassesAreNotAbstract()
  {
      MemoryBuffer temp;
      CThorStreamDeserializerSource x1(NULL);
      CThorContiguousRowBuffer x2(NULL);
      CSizingSerializer x3;
      CMemoryRowSerializer x4(temp);
  }
  510. //=====================================================================================================
  // Visitor that links every child it is shown: rowsets via rtlLinkRowset() and
  // single rows via rtlLinkRow(). Used by cloneRow() after a raw copy of a row.
  class ChildRowLinkerWalker : implements IIndirectMemberVisitor
  {
  public:
      // Links the rowset; count is not used.
      virtual void visitRowset(size32_t count, byte * * rows)
      {
          rtlLinkRowset(rows);
      }
      // Links the single child row.
      virtual void visitRow(const byte * row)
      {
          rtlLinkRow(row);
      }
  };
  523. //the visitor callback is used to ensure link counts for children are updated.
  524. size32_t cloneRow(ARowBuilder & rowBuilder, const void * row, IOutputMetaData * meta)
  525. {
  526. size32_t rowSize = meta->getRecordSize(row); // TBD could be better?
  527. byte * self = rowBuilder.ensureCapacity(rowSize, NULL);
  528. memcpy(self, row, rowSize);
  529. if (meta->getMetaFlags() & MDFneeddestruct)
  530. {
  531. ChildRowLinkerWalker walker;
  532. meta->walkIndirectMembers(self, walker);
  533. }
  534. return rowSize;
  535. }
  536. //---------------------------------------------------------------------------------------------------
  537. extern const char * getActivityText(ThorActivityKind kind)
  538. {
  539. switch (kind)
  540. {
  541. case TAKnone: return "None";
  542. case TAKdiskwrite: return "Disk Write";
  543. case TAKsort: return "Sort";
  544. case TAKdedup: return "Dedup";
  545. case TAKfilter: return "Filter";
  546. case TAKsplit: return "Split";
  547. case TAKproject: return "Project";
  548. case TAKrollup: return "Rollup";
  549. case TAKiterate: return "Iterate";
  550. case TAKaggregate: return "Aggregate";
  551. case TAKhashaggregate: return "Hash Aggregate";
  552. case TAKfirstn: return "Firstn";
  553. case TAKsample: return "Sample";
  554. case TAKdegroup: return "Degroup";
  555. case TAKjoin: return "Join";
  556. case TAKhashjoin: return "Hash Join";
  557. case TAKlookupjoin: return "Lookup Join";
  558. case TAKselfjoin: return "Self Join";
  559. case TAKkeyedjoin: return "Keyed Join";
  560. case TAKgroup: return "Group";
  561. case TAKworkunitwrite: return "Output";
  562. case TAKfunnel: return "Funnel";
  563. case TAKapply: return "Apply";
  564. case TAKtemptable: return "Inline Dataset";
  565. case TAKinlinetable: return "Inline Dataset";
  566. case TAKtemprow: return "Inline Row";
  567. case TAKhashdistribute: return "Hash Distribute";
  568. case TAKhashdedup: return "Hash Dedup";
  569. case TAKnormalize: return "Normalize";
  570. case TAKremoteresult: return "Remote Result";
  571. case TAKpull: return "Pull";
  572. case TAKdenormalize: return "Denormalize";
  573. case TAKnormalizechild: return "Normalize Child";
  574. case TAKchilddataset: return "Child Dataset";
  575. case TAKselectn: return "Select Nth";
  576. case TAKenth: return "Enth";
  577. case TAKif: return "If";
  578. case TAKnull: return "Null";
  579. case TAKdistribution: return "Distribution";
  580. case TAKcountproject: return "Count Project";
  581. case TAKchoosesets: return "Choose Sets";
  582. case TAKpiperead: return "Pipe Read";
  583. case TAKpipewrite: return "Pipe Write";
  584. case TAKcsvwrite: return "Csv Write";
  585. case TAKpipethrough: return "Pipe Through";
  586. case TAKindexwrite: return "Index Write";
  587. case TAKchoosesetsenth: return "Choose Sets Enth";
  588. case TAKchoosesetslast: return "Choose Sets Last";
  589. case TAKfetch: return "Fetch";
  590. case TAKhashdenormalize: return "Hash Denormalize";
  591. case TAKworkunitread: return "Read";
  592. case TAKthroughaggregate: return "Through Aggregate";
  593. case TAKspill: return "Spill";
  594. case TAKcase: return "Case";
  595. case TAKlimit: return "Limit";
  596. case TAKcsvfetch: return "Csv Fetch";
  597. case TAKxmlwrite: return "Xml Write";
  598. case TAKparse: return "Parse";
  599. case TAKcountdisk: return "Count Disk";
  600. case TAKsideeffect: return "Simple Action";
  601. case TAKtopn: return "Top N";
  602. case TAKmerge: return "Merge";
  603. case TAKxmlfetch: return "Xml Fetch";
  604. case TAKxmlparse: return "Parse Xml";
  605. case TAKkeyeddistribute: return "Keyed Distribute";
  606. case TAKjoinlight: return "Lightweight Join";
  607. case TAKalljoin: return "All Join";
  608. case TAKsoap_rowdataset: return "SOAP dataset";
  609. case TAKsoap_rowaction: return "SOAP action";
  610. case TAKsoap_datasetdataset: return "SOAP dataset";
  611. case TAKsoap_datasetaction: return "SOAP action";
  612. case TAKkeydiff: return "Key Difference";
  613. case TAKkeypatch: return "Key Patch";
  614. case TAKkeyeddenormalize: return "Keyed Denormalize";
  615. case TAKsequential: return "Sequential";
  616. case TAKparallel: return "Parallel";
  617. case TAKchilditerator: return "Child Dataset";
  618. case TAKdatasetresult: return "Dataset Result";
  619. case TAKrowresult: return "Row Result";
  620. case TAKchildif: return "If";
  621. case TAKpartition: return "Partition Distribute";
  622. case TAKsubgraph: return "Sub Graph";
  623. case TAKlocalgraph: return "Local Graph";
  624. case TAKifaction: return "If Action";
  625. case TAKemptyaction: return "Empty Action";
  626. case TAKskiplimit: return "Skip Limit";
  627. case TAKdiskread: return "Disk Read";
  628. case TAKdisknormalize: return "Disk Normalize";
  629. case TAKdiskaggregate: return "Disk Aggregate";
  630. case TAKdiskcount: return "Disk Count";
  631. case TAKdiskgroupaggregate: return "Disk Grouped Aggregate";
  632. case TAKindexread: return "Index Read";
  633. case TAKindexnormalize: return "Index Normalize";
  634. case TAKindexaggregate: return "Index Aggregate";
  635. case TAKindexcount: return "Index Count";
  636. case TAKindexgroupaggregate: return "Index Grouped Aggregate";
  637. case TAKchildnormalize: return "Child Normalize";
  638. case TAKchildaggregate: return "Child Aggregate";
  639. case TAKchildgroupaggregate: return "Child Grouped Aggregate";
  640. case TAKchildthroughnormalize: return "Normalize";
  641. case TAKcsvread: return "Csv Read";
  642. case TAKxmlread: return "Xml Read";
  643. case TAKlocalresultread: return "Read Local Result";
  644. case TAKlocalresultwrite: return "Local Result";
  645. case TAKcombine: return "Combine";
  646. case TAKregroup: return "Regroup";
  647. case TAKrollupgroup: return "Rollup Group";
  648. case TAKcombinegroup: return "Combine Group";
  649. case TAKlookupdenormalize: return "Lookup Denormalize";
  650. case TAKalldenormalize: return "All Denormalize";
  651. case TAKdenormalizegroup: return "Denormalize Group";
  652. case TAKhashdenormalizegroup: return "Hash Denormalize Group";
  653. case TAKlookupdenormalizegroup: return "Lookup Denormalize Group";
  654. case TAKkeyeddenormalizegroup: return "Keyed Denormalize Group";
  655. case TAKalldenormalizegroup: return "All Denormalize Group";
  656. case TAKlocalresultspill: return "Spill Local Result";
  657. case TAKsimpleaction: return "Action";
  658. case TAKloopcount: return "Loop";
  659. case TAKlooprow: return "Loop";
  660. case TAKloopdataset: return "Loop";
  661. case TAKchildcase: return "Case";
  662. case TAKremotegraph: return "Remote";
  663. case TAKlibrarycall: return "Library Call";
  664. case TAKrawiterator: return "Child Dataset";
  665. case TAKlocalstreamread: return "Read Input";
  666. case TAKprocess: return "Process";
  667. case TAKgraphloop: return "Graph";
  668. case TAKparallelgraphloop: return "Graph";
  669. case TAKgraphloopresultread: return "Graph Input";
  670. case TAKgraphloopresultwrite: return "Graph Result";
  671. case TAKgrouped: return "Grouped";
  672. case TAKsorted: return "Sorted";
  673. case TAKdistributed: return "Distributed";
  674. case TAKnwayjoin: return "Join";
  675. case TAKnwaymerge: return "Merge";
  676. case TAKnwaymergejoin: return "Merge Join";
  677. case TAKnwayinput: return "Nway Input";
  678. case TAKnwaygraphloopresultread: return "Nway Graph Input";
  679. case TAKnwayselect: return "Select Nway Input";
  680. case TAKnonempty: return "Non Empty";
  681. case TAKcreaterowlimit: return "OnFail Limit";
  682. case TAKexistsaggregate: return "Exists";
  683. case TAKcountaggregate: return "Count";
  684. case TAKprefetchproject: return "Prefetch Project";
  685. case TAKprefetchcountproject: return "Prefetch Count Project";
  686. case TAKfiltergroup: return "Filter Group";
  687. case TAKmemoryspillread: return "Read Spill";
  688. case TAKmemoryspillwrite: return "Write Spill";
  689. case TAKmemoryspillsplit: return "Spill";
  690. case TAKsection: return "Section";
  691. case TAKlinkedrawiterator: return "Child Dataset";
  692. case TAKnormalizelinkedchild: return "Normalize";
  693. case TAKfilterproject: return "Filtered Project";
  694. case TAKcatch: return "Catch";
  695. case TAKskipcatch: return "Skip Catch";
  696. case TAKcreaterowcatch: return "OnFail Catch";
  697. case TAKsectioninput: return "Section Input";
  698. case TAKindexgroupcount: return "Index Grouped Count";
  699. case TAKindexgroupexists: return "Index Grouped Exists";
  700. case TAKhashdistributemerge: return "Distribute Merge";
  701. case TAKselfjoinlight: return "Lightweight Self Join";
  702. case TAKwhen_dataset: return "When";
  703. case TAKhttp_rowdataset: return "HTTP dataset";
  704. case TAKstreamediterator: return "Streamed Dataset";
  705. case TAKexternalsource: return "User Source";
  706. case TAKexternalsink: return "User Output";
  707. case TAKexternalprocess: return "User Proceess";
  708. case TAKwhen_action: return "When";
  709. case TAKshuffle: return "Shuffle";
  710. case TAKdictionaryworkunitwrite:return "Dictionary Write";
  711. case TAKdictionaryresultwrite: return "Dictionary Result";
  712. }
  713. throwUnexpected();
  714. }
  // Returns true for the activity kinds listed below (those treated as sources),
  // and false for every other kind. Some kinds also appear in isActivitySink().
  extern bool isActivitySource(ThorActivityKind kind)
  {
      switch (kind)
      {
      case TAKpiperead:
      case TAKtemptable:
      case TAKinlinetable:
      case TAKtemprow:
      case TAKworkunitread:
      case TAKnull:
      case TAKsideeffect:
      case TAKsoap_rowdataset:
      case TAKsoap_rowaction:
      case TAKkeydiff:
      case TAKkeypatch:
      case TAKchilditerator:
      case TAKlocalgraph:
      case TAKemptyaction:
      case TAKdiskread:
      case TAKdisknormalize:
      case TAKdiskaggregate:
      case TAKdiskcount:
      case TAKdiskgroupaggregate:
      case TAKindexread:
      case TAKindexnormalize:
      case TAKindexaggregate:
      case TAKindexcount:
      case TAKindexgroupaggregate:
      case TAKchildnormalize:
      case TAKchildaggregate:
      case TAKchildgroupaggregate:
      case TAKcsvread:
      case TAKxmlread:
      case TAKlocalresultread:
      case TAKsimpleaction:
      case TAKrawiterator:
      case TAKlocalstreamread:
      case TAKgraphloopresultread:
      case TAKnwaygraphloopresultread:
      case TAKlinkedrawiterator:
      case TAKindexgroupexists:
      case TAKindexgroupcount:
      case TAKstreamediterator:
      case TAKexternalsource:
          return true;
      }
      return false;
  }
  // Returns true for the activity kinds listed below (those treated as sinks),
  // and false for every other kind. Some kinds also appear in isActivitySource().
  extern bool isActivitySink(ThorActivityKind kind)
  {
      switch (kind)
      {
      case TAKdiskwrite:
      case TAKworkunitwrite:
      case TAKapply:
      case TAKremoteresult:
      case TAKdistribution:
      case TAKpipewrite:
      case TAKcsvwrite:
      case TAKindexwrite:
      case TAKxmlwrite:
      case TAKsoap_rowaction:
      case TAKsoap_datasetaction:
      case TAKkeydiff:
      case TAKkeypatch:
      case TAKdatasetresult:
      case TAKrowresult:
      case TAKemptyaction:
      case TAKlocalresultwrite:
      case TAKgraphloopresultwrite:
      case TAKsimpleaction:
      case TAKexternalsink:
      case TAKifaction:
      case TAKparallel:
      case TAKsequential:
      case TAKwhen_action:
      case TAKdictionaryworkunitwrite:
      case TAKdictionaryresultwrite:
          return true;
      }
      return false;
  }
  797. //------------------------------------------------------------------------------------------------
  798. byte * CStaticRowBuilder::ensureCapacity(size32_t required, const char * fieldName)
  799. {
  800. if (required <= maxLength)
  801. return static_cast<byte *>(self);
  802. rtlReportFieldOverflow(required, maxLength, fieldName);
  803. return NULL;
  804. }
  805. //=====================================================================================================
  806. CThorContiguousRowBuffer::CThorContiguousRowBuffer(ISerialStream * _in) : in(_in)
  807. {
  808. buffer = NULL;
  809. maxOffset = 0;
  810. readOffset = 0;
  811. }
  812. void CThorContiguousRowBuffer::doRead(size32_t len, void * ptr)
  813. {
  814. ensureAccessible(readOffset + len);
  815. memcpy(ptr, buffer+readOffset, len);
  816. readOffset += len;
  817. }
  // Reads len bytes into ptr via doRead() and returns len.
  size32_t CThorContiguousRowBuffer::read(size32_t len, void * ptr)
  {
      doRead(len, ptr);
      return len;
  }
  823. size32_t CThorContiguousRowBuffer::readSize()
  824. {
  825. size32_t value;
  826. doRead(sizeof(value), &value);
  827. return value;
  828. }
  829. size32_t CThorContiguousRowBuffer::readPackedInt(void * ptr)
  830. {
  831. size32_t size = sizePackedInt();
  832. doRead(size, ptr);
  833. return size;
  834. }
  835. size32_t CThorContiguousRowBuffer::readUtf8(ARowBuilder & target, size32_t offset, size32_t fixedSize, size32_t len)
  836. {
  837. if (len == 0)
  838. return 0;
  839. size32_t size = sizeUtf8(len);
  840. byte * self = target.ensureCapacity(fixedSize + size, NULL);
  841. doRead(size, self+offset);
  842. return size;
  843. }
  844. size32_t CThorContiguousRowBuffer::readVStr(ARowBuilder & target, size32_t offset, size32_t fixedSize)
  845. {
  846. size32_t size = sizeVStr();
  847. byte * self = target.ensureCapacity(fixedSize + size, NULL);
  848. doRead(size, self+offset);
  849. return size;
  850. }
  851. size32_t CThorContiguousRowBuffer::readVUni(ARowBuilder & target, size32_t offset, size32_t fixedSize)
  852. {
  853. size32_t size = sizeVUni();
  854. byte * self = target.ensureCapacity(fixedSize + size, NULL);
  855. doRead(size, self+offset);
  856. return size;
  857. }
  858. size32_t CThorContiguousRowBuffer::sizePackedInt()
  859. {
  860. ensureAccessible(readOffset+1);
  861. return rtlGetPackedSizeFromFirst(buffer[readOffset]);
  862. }
  // Returns the number of bytes occupied by the next len UTF-8 characters, starting
  // at the read position. The read position is not advanced. Returns 0 when len is 0.
  size32_t CThorContiguousRowBuffer::sizeUtf8(size32_t len)
  {
      if (len == 0)
          return 0;
      //The len is the number of utf characters, size depends on which characters are included.
      size32_t nextOffset = readOffset;
      while (len)
      {
          // Ask for at least the first byte of the next character, then consume as many
          // whole characters as the currently accessible data (up to maxOffset) allows.
          ensureAccessible(nextOffset+1);
          for (;nextOffset < maxOffset;)
          {
              nextOffset += readUtf8Size(buffer+nextOffset); // This function only accesses the first byte
              if (--len == 0)
                  break;
          }
      }
      return nextOffset - readOffset;
  }
  881. size32_t CThorContiguousRowBuffer::sizeVStr()
  882. {
  883. size32_t nextOffset = readOffset;
  884. loop
  885. {
  886. ensureAccessible(nextOffset+1);
  887. for (; nextOffset < maxOffset; nextOffset++)
  888. {
  889. if (buffer[nextOffset] == 0)
  890. return (nextOffset + 1) - readOffset;
  891. }
  892. }
  893. }
  894. size32_t CThorContiguousRowBuffer::sizeVUni()
  895. {
  896. size32_t nextOffset = readOffset;
  897. const size32_t sizeOfUChar = 2;
  898. loop
  899. {
  900. ensureAccessible(nextOffset+sizeOfUChar);
  901. for (; nextOffset+1 < maxOffset; nextOffset += sizeOfUChar)
  902. {
  903. if (buffer[nextOffset] == 0 && buffer[nextOffset+1] == 0)
  904. return (nextOffset + sizeOfUChar) - readOffset;
  905. }
  906. }
  907. }
  908. void CThorContiguousRowBuffer::reportReadFail()
  909. {
  910. throwUnexpected();
  911. }
  912. const byte * CThorContiguousRowBuffer::peek(size32_t maxSize)
  913. {
  914. if (maxSize+readOffset > maxOffset)
  915. doPeek(maxSize+readOffset);
  916. return buffer + readOffset;
  917. }
  918. offset_t CThorContiguousRowBuffer::beginNested()
  919. {
  920. size32_t len = readSize();
  921. return len+readOffset;
  922. }
  923. bool CThorContiguousRowBuffer::finishedNested(offset_t endPos)
  924. {
  925. return readOffset >= endPos;
  926. }
  927. void CThorContiguousRowBuffer::skip(size32_t size)
  928. {
  929. ensureAccessible(readOffset+size);
  930. readOffset += size;
  931. }
  932. void CThorContiguousRowBuffer::skipPackedInt()
  933. {
  934. size32_t size = sizePackedInt();
  935. ensureAccessible(readOffset+size);
  936. readOffset += size;
  937. }
  938. void CThorContiguousRowBuffer::skipUtf8(size32_t len)
  939. {
  940. size32_t size = sizeUtf8(len);
  941. ensureAccessible(readOffset+size);
  942. readOffset += size;
  943. }
  944. void CThorContiguousRowBuffer::skipVStr()
  945. {
  946. size32_t size = sizeVStr();
  947. ensureAccessible(readOffset+size);
  948. readOffset += size;
  949. }
  950. void CThorContiguousRowBuffer::skipVUni()
  951. {
  952. size32_t size = sizeVUni();
  953. ensureAccessible(readOffset+size);
  954. readOffset += size;
  955. }
  956. // ===========================================
  957. IRowInterfaces *createRowInterfaces(IOutputMetaData *meta, unsigned actid, ICodeContext *context)
  958. {
  959. class cRowInterfaces: public CSimpleInterface, implements IRowInterfaces
  960. {
  961. Linked<IOutputMetaData> meta;
  962. ICodeContext* context;
  963. unsigned actid;
  964. Linked<IEngineRowAllocator> allocator;
  965. Linked<IOutputRowSerializer> serializer;
  966. Linked<IOutputRowDeserializer> deserializer;
  967. CSingletonLock allocatorlock;
  968. CSingletonLock serializerlock;
  969. CSingletonLock deserializerlock;
  970. public:
  971. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  972. cRowInterfaces(IOutputMetaData *_meta,unsigned _actid, ICodeContext *_context)
  973. : meta(_meta)
  974. {
  975. context = _context;
  976. actid = _actid;
  977. }
  978. IEngineRowAllocator * queryRowAllocator()
  979. {
  980. if (allocatorlock.lock()) {
  981. if (!allocator&&meta)
  982. allocator.setown(context->getRowAllocator(meta, actid));
  983. allocatorlock.unlock();
  984. }
  985. return allocator;
  986. }
  987. IOutputRowSerializer * queryRowSerializer()
  988. {
  989. if (serializerlock.lock()) {
  990. if (!serializer&&meta)
  991. serializer.setown(meta->createDiskSerializer(context,actid));
  992. serializerlock.unlock();
  993. }
  994. return serializer;
  995. }
  996. IOutputRowDeserializer * queryRowDeserializer()
  997. {
  998. if (deserializerlock.lock()) {
  999. if (!deserializer&&meta)
  1000. deserializer.setown(meta->createDiskDeserializer(context,actid));
  1001. deserializerlock.unlock();
  1002. }
  1003. return deserializer;
  1004. }
  1005. IOutputMetaData *queryRowMetaData()
  1006. {
  1007. return meta;
  1008. }
  1009. unsigned queryActivityId()
  1010. {
  1011. return actid;
  1012. }
  1013. ICodeContext *queryCodeContext()
  1014. {
  1015. return context;
  1016. }
  1017. };
  1018. return new cRowInterfaces(meta,actid,context);
  1019. };
  1020. class CRowStreamReader : public CSimpleInterface, implements IExtRowStream
  1021. {
  1022. Linked<IFileIO> fileio;
  1023. Linked<IMemoryMappedFile> mmfile;
  1024. Linked<IOutputRowDeserializer> deserializer;
  1025. Linked<IEngineRowAllocator> allocator;
  1026. Owned<ISerialStream> strm;
  1027. CThorStreamDeserializerSource source;
  1028. Owned<ISourceRowPrefetcher> prefetcher;
  1029. CThorContiguousRowBuffer prefetchBuffer; // used if prefetcher set
  1030. bool grouped;
  1031. unsigned __int64 maxrows;
  1032. unsigned __int64 rownum;
  1033. bool eoi;
  1034. bool eos;
  1035. bool eog;
  1036. offset_t bufofs;
  1037. #ifdef TRACE_CREATE
  1038. static unsigned rdnum;
  1039. #endif
  1040. class : implements IFileSerialStreamCallback
  1041. {
  1042. public:
  1043. CRC32 crc;
  1044. void process(offset_t ofs, size32_t sz, const void *buf)
  1045. {
  1046. crc.tally(sz,buf);
  1047. }
  1048. } crccb;
  1049. public:
  1050. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  1051. CRowStreamReader(IFileIO *_fileio, IMemoryMappedFile *_mmfile, IRowInterfaces *rowif, offset_t _ofs, offset_t _len, unsigned __int64 _maxrows, bool _tallycrc, bool _grouped)
  1052. : fileio(_fileio), mmfile(_mmfile), allocator(rowif->queryRowAllocator()), prefetchBuffer(NULL)
  1053. {
  1054. #ifdef TRACE_CREATE
  1055. PROGLOG("CRowStreamReader %d = %p",++rdnum,this);
  1056. #endif
  1057. maxrows = _maxrows;
  1058. grouped = _grouped;
  1059. eoi = false;
  1060. eos = maxrows==0;
  1061. eog = false;
  1062. bufofs = 0;
  1063. rownum = 0;
  1064. if (fileio)
  1065. strm.setown(createFileSerialStream(fileio,_ofs,_len,(size32_t)-1, _tallycrc?&crccb:NULL));
  1066. else
  1067. strm.setown(createFileSerialStream(mmfile,_ofs,_len,_tallycrc?&crccb:NULL));
  1068. prefetcher.setown(rowif->queryRowMetaData()->createDiskPrefetcher(rowif->queryCodeContext(), rowif->queryActivityId()));
  1069. if (prefetcher)
  1070. prefetchBuffer.setStream(strm);
  1071. source.setStream(strm);
  1072. deserializer.set(rowif->queryRowDeserializer());
  1073. }
  1074. ~CRowStreamReader()
  1075. {
  1076. #ifdef TRACE_CREATE
  1077. PROGLOG("~CRowStreamReader %d = %p",rdnum--,this);
  1078. #endif
  1079. }
  1080. void reinit(offset_t _ofs,offset_t _len,unsigned __int64 _maxrows)
  1081. {
  1082. maxrows = _maxrows;
  1083. eoi = false;
  1084. eos = (maxrows==0)||(_len==0);
  1085. eog = false;
  1086. bufofs = 0;
  1087. rownum = 0;
  1088. strm->reset(_ofs,_len);
  1089. }
  1090. const void *nextRow()
  1091. {
  1092. if (eog) {
  1093. eog = false;
  1094. return NULL;
  1095. }
  1096. if (eos)
  1097. return NULL;
  1098. if (source.eos()) {
  1099. eos = true;
  1100. return NULL;
  1101. }
  1102. RtlDynamicRowBuilder rowBuilder(allocator);
  1103. size_t size = deserializer->deserialize(rowBuilder,source);
  1104. if (grouped && !eos) {
  1105. byte b;
  1106. source.read(sizeof(b),&b);
  1107. eog = (b==1);
  1108. }
  1109. if (++rownum==maxrows)
  1110. eos = true;
  1111. return rowBuilder.finalizeRowClear(size);
  1112. }
  1113. const void *prefetchRow(size32_t *sz)
  1114. {
  1115. if (eog)
  1116. eog = false;
  1117. else if (!eos) {
  1118. if (source.eos())
  1119. eos = true;
  1120. else {
  1121. assertex(prefetcher);
  1122. prefetcher->readAhead(prefetchBuffer);
  1123. const byte * ret = prefetchBuffer.queryRow();
  1124. if (sz)
  1125. *sz = prefetchBuffer.queryRowSize();
  1126. return ret;
  1127. }
  1128. }
  1129. if (sz)
  1130. sz = 0;
  1131. return NULL;
  1132. }
  1133. void prefetchDone()
  1134. {
  1135. prefetchBuffer.finishedRow();
  1136. if (grouped) {
  1137. byte b;
  1138. strm->get(sizeof(b),&b);
  1139. eog = (b==1);
  1140. }
  1141. }
  1142. virtual void stop()
  1143. {
  1144. stop(NULL);
  1145. }
  1146. void clear()
  1147. {
  1148. strm.clear();
  1149. source.clearStream();
  1150. fileio.clear();
  1151. }
  1152. void stop(CRC32 *crcout)
  1153. {
  1154. if (!eos) {
  1155. eos = true;
  1156. clear();
  1157. }
  1158. // NB CRC will only be right if stopped at eos
  1159. if (crcout)
  1160. *crcout = crccb.crc;
  1161. }
  1162. offset_t getOffset()
  1163. {
  1164. return source.tell();
  1165. }
  1166. };
  1167. #ifdef TRACE_CREATE
  1168. unsigned CRowStreamReader::rdnum;
  1169. #endif
  1170. bool UseMemoryMappedRead = false;
  1171. IExtRowStream *createRowStreamEx(IFile *file, IRowInterfaces *rowIf, offset_t offset, offset_t len, unsigned __int64 maxrows, unsigned rwFlags, IExpander *eexp)
  1172. {
  1173. bool compressed = TestRwFlag(rwFlags, rw_compress);
  1174. if (UseMemoryMappedRead && !compressed)
  1175. {
  1176. PROGLOG("Memory Mapped read of %s",file->queryFilename());
  1177. Owned<IMemoryMappedFile> mmfile = file->openMemoryMapped();
  1178. if (!mmfile)
  1179. return NULL;
  1180. return new CRowStreamReader(NULL, mmfile, rowIf, offset, len, maxrows, TestRwFlag(rwFlags, rw_crc), TestRwFlag(rwFlags, rw_grouped));
  1181. }
  1182. else
  1183. {
  1184. Owned<IFileIO> fileio;
  1185. if (compressed)
  1186. {
  1187. // JCSMORE should pass in a flag for rw_compressblkcrc I think, doesn't look like it (or anywhere else)
  1188. // checks the block crc's at the moment.
  1189. fileio.setown(createCompressedFileReader(file, eexp, UseMemoryMappedRead));
  1190. }
  1191. else
  1192. fileio.setown(file->open(IFOread));
  1193. if (!fileio)
  1194. return NULL;
  1195. return new CRowStreamReader(fileio, NULL, rowIf, offset, len, maxrows, TestRwFlag(rwFlags, rw_crc), TestRwFlag(rwFlags, rw_grouped));
  1196. }
  1197. }
  1198. IExtRowStream *createRowStream(IFile *file, IRowInterfaces *rowIf, unsigned rwFlags, IExpander *eexp)
  1199. {
  1200. return createRowStreamEx(file, rowIf, 0, (offset_t)-1, (unsigned __int64)-1, rwFlags, eexp);
  1201. }
  // Sets the UseMemoryMappedRead switch. The request is ignored unless the build defines
  // _DEBUG or __64BIT__.
  void useMemoryMappedRead(bool on)
  {
  #if defined(_DEBUG) || defined(__64BIT__)
      UseMemoryMappedRead = on;
  #endif
  }
  1208. #define ROW_WRITER_BUFFERSIZE (0x100000)
  1209. class CRowStreamWriter : public CSimpleInterface, private IRowSerializerTarget, implements IExtRowWriter
  1210. {
  1211. Linked<IFileIOStream> stream;
  1212. Linked<IOutputRowSerializer> serializer;
  1213. Linked<IEngineRowAllocator> allocator;
  1214. CRC32 crc;
  1215. bool grouped;
  1216. bool tallycrc;
  1217. unsigned nested;
  1218. MemoryAttr ma;
  1219. MemoryBuffer extbuf; // may need to spill to disk at some point
  1220. byte *buf;
  1221. size32_t bufpos;
  1222. bool autoflush;
  1223. #ifdef TRACE_CREATE
  1224. static unsigned wrnum;
  1225. #endif
  1226. void flushBuffer(bool final)
  1227. {
  1228. try
  1229. {
  1230. if (bufpos) {
  1231. stream->write(bufpos,buf);
  1232. if (tallycrc)
  1233. crc.tally(bufpos,buf);
  1234. bufpos = 0;
  1235. }
  1236. size32_t extpos = extbuf.length();
  1237. if (!extpos)
  1238. return;
  1239. if (!final)
  1240. extpos = (extpos/ROW_WRITER_BUFFERSIZE)*ROW_WRITER_BUFFERSIZE;
  1241. if (extpos) {
  1242. stream->write(extpos,extbuf.toByteArray());
  1243. if (tallycrc)
  1244. crc.tally(extpos,extbuf.toByteArray());
  1245. }
  1246. if (extpos<extbuf.length()) {
  1247. bufpos = extbuf.length()-extpos;
  1248. memcpy(buf,extbuf.toByteArray()+extpos,bufpos);
  1249. }
  1250. extbuf.clear();
  1251. }
  1252. catch (IException *e)
  1253. {
  1254. autoflush = false; // avoid follow-on errors
  1255. EXCLOG(e, "flushBuffer");
  1256. throw;
  1257. }
  1258. }
  1259. void streamFlush()
  1260. {
  1261. try
  1262. {
  1263. stream->flush();
  1264. }
  1265. catch (IException *e)
  1266. {
  1267. autoflush = false; // avoid follow-on errors
  1268. EXCLOG(e, "streamFlush");
  1269. throw;
  1270. }
  1271. }
  1272. public:
  1273. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  1274. CRowStreamWriter(IFileIOStream *_stream,IOutputRowSerializer *_serializer,IEngineRowAllocator *_allocator,bool _grouped, bool _tallycrc, bool _autoflush)
  1275. : stream(_stream), serializer(_serializer), allocator(_allocator)
  1276. {
  1277. #ifdef TRACE_CREATE
  1278. PROGLOG("createRowWriter %d = %p",++wrnum,this);
  1279. #endif
  1280. grouped = _grouped;
  1281. tallycrc = _tallycrc;
  1282. nested = 0;
  1283. buf = (byte *)ma.allocate(ROW_WRITER_BUFFERSIZE);
  1284. bufpos = 0;
  1285. autoflush = _autoflush;
  1286. }
  1287. ~CRowStreamWriter()
  1288. {
  1289. #ifdef TRACE_CREATE
  1290. PROGLOG("~createRowWriter %d = %p",wrnum--,this);
  1291. #endif
  1292. if (autoflush)
  1293. flush();
  1294. else if (bufpos+extbuf.length()) {
  1295. #ifdef _DEBUG
  1296. PrintStackReport();
  1297. #endif
  1298. WARNLOG("CRowStreamWriter closed with %d bytes unflushed",bufpos+extbuf.length());
  1299. }
  1300. }
  1301. void putRow(const void *row)
  1302. {
  1303. if (row) {
  1304. serializer->serialize(*this,(const byte *)row);
  1305. if (grouped) {
  1306. byte b = 0;
  1307. if (bufpos<ROW_WRITER_BUFFERSIZE)
  1308. buf[bufpos++] = b;
  1309. else
  1310. extbuf.append(b);
  1311. }
  1312. allocator->releaseRow(row);
  1313. }
  1314. else if (grouped) { // backpatch
  1315. byte b = 1;
  1316. if (extbuf.length())
  1317. extbuf.writeDirect(extbuf.length()-1,sizeof(b),&b);
  1318. else {
  1319. assertex(bufpos);
  1320. buf[bufpos-1] = b;
  1321. }
  1322. }
  1323. }
  1324. void flush()
  1325. {
  1326. flushBuffer(true);
  1327. streamFlush();
  1328. }
  1329. void flush(CRC32 *crcout)
  1330. {
  1331. flushBuffer(true);
  1332. streamFlush();
  1333. if (crcout)
  1334. *crcout = crc;
  1335. }
  1336. offset_t getPosition()
  1337. {
  1338. return stream->tell()+bufpos+extbuf.length();
  1339. }
  1340. void put(size32_t len, const void * ptr)
  1341. {
  1342. // first fill buf
  1343. loop {
  1344. if (bufpos<ROW_WRITER_BUFFERSIZE) {
  1345. size32_t wr = ROW_WRITER_BUFFERSIZE-bufpos;
  1346. if (wr>len)
  1347. wr = len;
  1348. memcpy(buf+bufpos,ptr,wr);
  1349. bufpos += wr;
  1350. len -= wr;
  1351. if (len==0)
  1352. break; // quick exit
  1353. ptr = (const byte *)ptr + wr;
  1354. }
  1355. if (nested) {
  1356. // have to append to ext buffer (will need to spill to disk here if gets *too* big)
  1357. extbuf.append(len,ptr);
  1358. break;
  1359. }
  1360. else
  1361. flushBuffer(false);
  1362. }
  1363. }
  1364. size32_t beginNested()
  1365. {
  1366. if (nested++==0)
  1367. if (bufpos==ROW_WRITER_BUFFERSIZE)
  1368. flushBuffer(false);
  1369. size32_t ret = bufpos+extbuf.length();
  1370. size32_t sz = 0;
  1371. put(sizeof(sz),&sz);
  1372. return ret;
  1373. }
  1374. void endNested(size32_t pos)
  1375. {
  1376. size32_t sz = bufpos+extbuf.length()-(pos + sizeof(size32_t));
  1377. size32_t wr = sizeof(size32_t);
  1378. byte *out = (byte *)&sz;
  1379. if (pos<ROW_WRITER_BUFFERSIZE) {
  1380. size32_t space = ROW_WRITER_BUFFERSIZE-pos;
  1381. if (space>wr)
  1382. space = wr;
  1383. memcpy(buf+pos,out,space);
  1384. wr -= space;
  1385. if (wr==0) {
  1386. --nested;
  1387. return; // quick exit
  1388. }
  1389. out += space;
  1390. pos += space;
  1391. }
  1392. extbuf.writeDirect(pos-ROW_WRITER_BUFFERSIZE,wr,out);
  1393. --nested;
  1394. }
  1395. };
  1396. #ifdef TRACE_CREATE
  1397. unsigned CRowStreamWriter::wrnum=0;
  1398. #endif
  1399. IExtRowWriter *createRowWriter(IFile *iFile, IRowInterfaces *rowIf, unsigned flags, ICompressor *compressor)
  1400. {
  1401. OwnedIFileIO iFileIO;
  1402. if (TestRwFlag(flags, rw_compress))
  1403. {
  1404. size32_t fixedSize = rowIf->queryRowMetaData()->querySerializedDiskMeta()->getFixedSize();
  1405. if (fixedSize && TestRwFlag(flags, rw_grouped))
  1406. ++fixedSize; // row writer will include a grouping byte
  1407. iFileIO.setown(createCompressedFileWriter(iFile, fixedSize, TestRwFlag(flags, rw_extend), TestRwFlag(flags, rw_compressblkcrc), compressor, TestRwFlag(flags, rw_fastlz)));
  1408. }
  1409. else
  1410. iFileIO.setown(iFile->open((flags & rw_extend)?IFOwrite:IFOcreate));
  1411. if (!iFileIO)
  1412. return NULL;
  1413. flags &= ~((unsigned)(rw_compress|rw_fastlz|rw_compressblkcrc));
  1414. return createRowWriter(iFileIO, rowIf, flags);
  1415. }
  1416. IExtRowWriter *createRowWriter(IFileIO *iFileIO, IRowInterfaces *rowIf, unsigned flags)
  1417. {
  1418. if (TestRwFlag(flags, rw_compress))
  1419. throw MakeStringException(0, "Unsupported createRowWriter flags");
  1420. Owned<IFileIOStream> stream;
  1421. if (TestRwFlag(flags, rw_buffered))
  1422. stream.setown(createBufferedIOStream(iFileIO));
  1423. else
  1424. stream.setown(createIOStream(iFileIO));
  1425. if (flags & rw_extend)
  1426. stream->seek(0, IFSend);
  1427. flags &= ~((unsigned)(rw_extend|rw_buffered));
  1428. return createRowWriter(stream, rowIf, flags);
  1429. }
  1430. IExtRowWriter *createRowWriter(IFileIOStream *strm, IRowInterfaces *rowIf, unsigned flags)
  1431. {
  1432. if (0 != (flags & (rw_compress|rw_fastlz|rw_extend|rw_buffered|rw_compressblkcrc)))
  1433. throw MakeStringException(0, "Unsupported createRowWriter flags");
  1434. Owned<CRowStreamWriter> writer = new CRowStreamWriter(strm, rowIf->queryRowSerializer(), rowIf->queryRowAllocator(), TestRwFlag(flags, rw_grouped), TestRwFlag(flags, rw_crc), TestRwFlag(flags, rw_autoflush));
  1435. return writer.getClear();
  1436. }
  1437. class CDiskMerger : public CInterface, implements IDiskMerger
  1438. {
  1439. IArrayOf<IFile> tempfiles;
  1440. IRowStream **strms;
  1441. Linked<IRecordSize> irecsize;
  1442. StringAttr tempnamebase;
  1443. Linked<IRowLinkCounter> linker;
  1444. Linked<IRowInterfaces> rowInterfaces;
  1445. public:
  1446. IMPLEMENT_IINTERFACE;
  1447. CDiskMerger(IRowInterfaces *_rowInterfaces, IRowLinkCounter *_linker, const char *_tempnamebase)
  1448. : rowInterfaces(_rowInterfaces), linker(_linker), tempnamebase(_tempnamebase)
  1449. {
  1450. strms = NULL;
  1451. }
  1452. ~CDiskMerger()
  1453. {
  1454. for (unsigned i=0;i<tempfiles.ordinality();i++) {
  1455. if (strms&&strms[i])
  1456. strms[i]->Release();
  1457. try
  1458. {
  1459. tempfiles.item(i).remove();
  1460. }
  1461. catch (IException * e)
  1462. {
  1463. //Exceptions inside destructors are bad.
  1464. EXCLOG(e);
  1465. e->Release();
  1466. }
  1467. }
  1468. free(strms);
  1469. }
  1470. IRowWriter *createWriteBlock()
  1471. {
  1472. StringBuffer tempname(tempnamebase);
  1473. tempname.append('.').append(tempfiles.ordinality()).append('_').append((__int64)GetCurrentThreadId()).append('_').append((unsigned)GetCurrentProcessId());
  1474. IFile *file = createIFile(tempname.str());
  1475. tempfiles.append(*file);
  1476. return createRowWriter(file, rowInterfaces);
  1477. }
  1478. void put(const void **rows,unsigned numrows)
  1479. {
  1480. Owned<IRowWriter> out = createWriteBlock();
  1481. for (unsigned i=0;i<numrows;i++)
  1482. out->putRow(rows[i]);
  1483. }
  1484. void putIndirect(const void ***rowptrs,unsigned numrows)
  1485. {
  1486. Owned<IRowWriter> out = createWriteBlock();
  1487. for (unsigned i=0;i<numrows;i++)
  1488. out->putRow(*(rowptrs[i]));
  1489. }
  1490. virtual void put(ISortedRowProvider *rows)
  1491. {
  1492. Owned<IRowWriter> out = createWriteBlock();
  1493. void * row;
  1494. while((row = rows->getNextSorted()) != NULL)
  1495. out->putRow(row);
  1496. }
  1497. IRowStream *merge(ICompare *icompare, bool partdedup)
  1498. {
  1499. unsigned numstrms = tempfiles.ordinality();
  1500. strms = (IRowStream **)calloc(numstrms,sizeof(IRowStream *));
  1501. unsigned i;
  1502. for (i=0;i<numstrms;i++) {
  1503. strms[i] = createRowStream(&tempfiles.item(i), rowInterfaces);
  1504. }
  1505. if (numstrms==1)
  1506. return LINK(strms[0]);
  1507. if (icompare)
  1508. return createRowStreamMerger(numstrms, strms, icompare, partdedup, linker);
  1509. return createConcatRowStream(numstrms,strms);
  1510. }
  1511. virtual count_t mergeTo(IRowWriter *dest, ICompare *icompare, bool partdedup)
  1512. {
  1513. count_t count = 0;
  1514. Owned<IRowStream> mergedStream = merge(icompare, partdedup);
  1515. loop
  1516. {
  1517. const void *row = mergedStream->nextRow();
  1518. if (!row)
  1519. return count;
  1520. dest->putRow(row); // takes ownership
  1521. ++count;
  1522. }
  1523. return count;
  1524. }
  1525. };
  1526. IDiskMerger *createDiskMerger(IRowInterfaces *rowInterfaces, IRowLinkCounter *linker, const char *tempnamebase)
  1527. {
  1528. return new CDiskMerger(rowInterfaces, linker, tempnamebase);
  1529. }